Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server][dvc] Replace Long offset with PubSubPosition in PubSubMessage #1559

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

sushantmane
Copy link
Collaborator

@sushantmane sushantmane commented Feb 27, 2025

Replace Long offset with PubSubPosition in PubSubMessage

  • Replaced PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> with
    PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>.
  • Updated method signatures, constructors, and consumer record processing logic.
  • Introduced PubSubPosition::getNumericOffset to facilitate the transition from numeric offsets
    to PubSubPosition. It will be removed once the codebase fully adopts PubSubPosition.

How was this PR tested?

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

@sushantmane sushantmane requested a review from Copilot February 27, 2025 22:13

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Overview

This PR replaces occurrences of PubSubMessage with a Long offset type by PubSubMessage where the offset is now a PubSubPosition. The changes update related method signatures, constructors, and consumer record processing to use getNumericOffset() when a long value is needed.

  • Updated offset handling in consumer record processing methods across several modules.
  • Modified method signatures and type parameters in classes such as ActiveActiveStoreIngestionTask, StoreIngestionTask, and others.
  • Unified the conversion from PubSubPosition to long using getNumericOffset() in the appropriate contexts.

Reviewed Changes

File Description
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java Converted offset retrieval to use getNumericOffset() in record processing.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java Updated PubSubMessage type parameters and method signatures from Long to PubSubPosition.
Multiple files (e.g., VeniceChangelogConsumerImpl.java, KafkaConsumerService.java, IngestionBatchProcessor.java) Consistently replaced Long offset usage by PubSubPosition and updated related offset extraction calls.

Copilot reviewed 72 out of 72 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (3)

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java:482

  • Ensure that the conversion from PubSubPosition to a numeric offset using getNumericOffset() is applied consistently across all consumer record processing logic to avoid type mismatches and unintended behavior.
long sourceOffset = consumerRecord.getOffset().getNumericOffset();

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java:33

  • Verify that using PubSubPosition directly in the VeniceChangeCoordinate constructor integrates smoothly with downstream processing expectations and that any numeric offset extraction happens where needed.
this.offset = new VeniceChangeCoordinate(this.topicPartition.getPubSubTopic().getName(), pubSubPosition, this.topicPartition.getPartitionNumber());

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java:1691

  • Confirm that all leader and follower processing logic correctly uses getNumericOffset() to retrieve a long value from PubSubPosition, ensuring consistency with the new offset type.
long sourceTopicOffset = consumerRecord.getOffset().getNumericOffset();
@sushantmane sushantmane force-pushed the li-pubsub-message-with-position branch from f98a60d to 4eee3c6 Compare February 28, 2025 10:37
@sushantmane sushantmane changed the title [server][dvc][controller][vpj] Replace PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> with PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> [server][dvc][controller][vpj][cc] Replace Long offset with PubSubPosition in PubSubMessage Feb 28, 2025
@sushantmane sushantmane changed the title [server][dvc][controller][vpj][cc] Replace Long offset with PubSubPosition in PubSubMessage [server][dvc][controller] Replace Long offset with PubSubPosition in PubSubMessage Feb 28, 2025
@sushantmane sushantmane changed the title [server][dvc][controller] Replace Long offset with PubSubPosition in PubSubMessage [server][dvc] Replace Long offset with PubSubPosition in PubSubMessage Feb 28, 2025
@sushantmane sushantmane force-pushed the li-pubsub-message-with-position branch from 4eee3c6 to d8c89ac Compare February 28, 2025 11:53
@sushantmane sushantmane marked this pull request as ready for review February 28, 2025 11:54
@sushantmane sushantmane requested a review from Copilot February 28, 2025 11:55

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Overview

This PR fully replaces usages of Long offsets with PubSubPosition in PubSubMessage and its related processing logic in several modules. Key changes include:

  • Updating method signatures, constructors, and record processing logic to use PubSubPosition and its getNumericOffset() accessor.
  • Refactoring of multiple files to consistently propagate offset conversions and remove legacy ApacheKafkaOffsetPosition usage.

Reviewed Changes

File Description
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java Updated bootstrap record handling with new PubSubPosition wrapping and a new TODO regarding the zero-offset case.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java Changed delegate and processing methods to extract numeric offsets via getNumericOffset() from PubSubPosition.
Other files in the client package Applied similar refactorings to method signatures, variable types, and documentation to replace Long with PubSubPosition.

Copilot reviewed 74 out of 74 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (2)

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java:313

  • [nitpick] Consider replacing the magic 'zero-offset' value with a well-defined constant (e.g., PubSubPosition.EARLIEST) to improve code clarity and prevent future misinterpretations.
+        /**
+         * TODO: Should we introduce a magic position to handle the zero-offset case?
+         * Or use {@link com.linkedin.venice.pubsub.api.PubSubPosition.EARLIEST} as an alternative.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java:482

  • Ensure that invoking getNumericOffset() consistently returns the correct long value across all code paths and consider reviewing related tests to verify that the new offset conversion behaves as expected.
long sourceOffset = consumerRecord.getOffset().getNumericOffset();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant