-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[server][dvc] Replace Long offset with PubSubPosition in PubSubMessage #1559
base: main
Are you sure you want to change the base?
[server][dvc] Replace Long offset with PubSubPosition in PubSubMessage #1559
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR Overview
This PR replaces occurrences of PubSubMessage with a Long offset type by PubSubMessage where the offset is now a PubSubPosition. The changes update related method signatures, constructors, and consumer record processing to use getNumericOffset() when a long value is needed.
- Updated offset handling in consumer record processing methods across several modules.
- Modified method signatures and type parameters in classes such as ActiveActiveStoreIngestionTask, StoreIngestionTask, and others.
- Unified the conversion from PubSubPosition to long using getNumericOffset() in the appropriate contexts.
Reviewed Changes
File | Description |
---|---|
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java | Converted offset retrieval to use getNumericOffset() in record processing. |
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Updated PubSubMessage type parameters and method signatures from Long to PubSubPosition. |
Multiple files (e.g., VeniceChangelogConsumerImpl.java, KafkaConsumerService.java, IngestionBatchProcessor.java) | Consistently replaced Long offset usage by PubSubPosition and updated related offset extraction calls. |
Copilot reviewed 72 out of 72 changed files in this pull request and generated no comments.
Comments suppressed due to low confidence (3)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java:482
- Ensure that the conversion from PubSubPosition to a numeric offset using getNumericOffset() is applied consistently across all consumer record processing logic to avoid type mismatches and unintended behavior.
long sourceOffset = consumerRecord.getOffset().getNumericOffset();
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java:33
- Verify that using PubSubPosition directly in the VeniceChangeCoordinate constructor integrates smoothly with downstream processing expectations and that any numeric offset extraction happens where needed.
this.offset = new VeniceChangeCoordinate(this.topicPartition.getPubSubTopic().getName(), pubSubPosition, this.topicPartition.getPartitionNumber());
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java:1691
- Confirm that all leader and follower processing logic correctly uses getNumericOffset() to retrieve a long value from PubSubPosition, ensuring consistency with the new offset type.
long sourceTopicOffset = consumerRecord.getOffset().getNumericOffset();
…ubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>
f98a60d
to
4eee3c6
Compare
4eee3c6
to
d8c89ac
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR Overview
This PR fully replaces usages of Long offsets with PubSubPosition in PubSubMessage and its related processing logic in several modules. Key changes include:
- Updating method signatures, constructors, and record processing logic to use PubSubPosition and its getNumericOffset() accessor.
- Refactoring of multiple files to consistently propagate offset conversions and remove legacy ApacheKafkaOffsetPosition usage.
Reviewed Changes
File | Description |
---|---|
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java | Updated bootstrap record handling with new PubSubPosition wrapping and a new TODO regarding the zero-offset case. |
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java | Changed delegate and processing methods to extract numeric offsets via getNumericOffset() from PubSubPosition. |
Other files in the client package | Applied similar refactorings to method signatures, variable types, and documentation to replace Long with PubSubPosition. |
Copilot reviewed 74 out of 74 changed files in this pull request and generated no comments.
Comments suppressed due to low confidence (2)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java:313
- [nitpick] Consider replacing the magic 'zero-offset' value with a well-defined constant (e.g., PubSubPosition.EARLIEST) to improve code clarity and prevent future misinterpretations.
+ /**
+ * TODO: Should we introduce a magic position to handle the zero-offset case?
+ * Or use {@link com.linkedin.venice.pubsub.api.PubSubPosition.EARLIEST} as an alternative.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java:482
- Ensure that invoking getNumericOffset() consistently returns the correct long value across all code paths and consider reviewing related tests to verify that the new offset conversion behaves as expected.
long sourceOffset = consumerRecord.getOffset().getNumericOffset();
Replace Long offset with PubSubPosition in PubSubMessage
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>
withPubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>
.PubSubPosition::getNumericOffset
to facilitate the transition from numeric offsetsto
PubSubPosition
. It will be removed once the codebase fully adoptsPubSubPosition
.How was this PR tested?
Does this PR introduce any user-facing changes?