
Commit

Replace PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> with PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>
sushantmane committed Feb 27, 2025
1 parent 2ac7854 commit f98a60d
Showing 70 changed files with 805 additions and 573 deletions.
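At its core, the change swaps the third type parameter of PubSubMessage (the position type) from a raw Kafka offset (Long) to the broker-agnostic PubSubPosition interface. A minimal before/after sketch of a consumer-side call site; the class and method names here are illustrative, and only the types and the getOffset()/getNumericOffset() calls come from the diff below:

import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubPosition;

class PositionMigrationSketch {
  // Before: PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message;
  //         long offset = message.getOffset();

  void handle(PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message) {
    // After: the position is an opaque PubSubPosition object...
    PubSubPosition position = message.getOffset();
    // ...and call sites that still need the raw Kafka offset unwrap it
    // explicitly, as the deserialization paths in this commit do.
    long numericOffset = position.getNumericOffset();
  }
}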
@@ -1,7 +1,7 @@
 package com.linkedin.davinci.consumer;
 
-import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import java.util.Objects;
 
@@ -19,7 +19,7 @@ public ImmutableChangeCapturePubSubMessage(
       K key,
       V value,
       PubSubTopicPartition topicPartition,
-      long offset,
+      PubSubPosition pubSubPosition,
       long timestamp,
       int payloadSize,
       boolean isEndOfBootstrap) {
@@ -30,7 +30,7 @@ public ImmutableChangeCapturePubSubMessage(
     this.payloadSize = payloadSize;
     this.offset = new VeniceChangeCoordinate(
         this.topicPartition.getPubSubTopic().getName(),
-        new ApacheKafkaOffsetPosition(offset),
+        pubSubPosition,
         this.topicPartition.getPartitionNumber());
     this.isEndOfBootstrap = isEndOfBootstrap;
   }
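One consequence visible in the constructor above: callers now hand a ready-made PubSubPosition to VeniceChangeCoordinate instead of the message class wrapping a raw offset itself. A sketch of the new call shape, assuming the same com.linkedin.davinci.consumer package as the diff so VeniceChangeCoordinate needs no import (the helper name is hypothetical):

import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;

class CoordinateSketch {
  // Mirrors the constructor call above: topic name, position, partition number.
  VeniceChangeCoordinate toCoordinate(PubSubTopicPartition topicPartition, PubSubPosition position) {
    return new VeniceChangeCoordinate(
        topicPartition.getPubSubTopic().getName(),
        position,
        topicPartition.getPartitionNumber());
  }
}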
@@ -310,7 +310,11 @@ void onRecordReceivedFromStorage(
         keyDeserializer.deserialize(key),
         changeEvent,
         getTopicPartition(partition),
-        0,
+        /**
+         * TODO: Should we introduce a magic position to handle the zero-offset case?
+         * Or use {@link com.linkedin.venice.pubsub.api.PubSubPosition.EARLIEST} as an alternative.
+         */
+        ApacheKafkaOffsetPosition.getKafkaPosition(0),
         0,
         value.length * 8,
         false);
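The TODO above concerns the placeholder: where the old code passed a literal 0 offset, the new signature needs some PubSubPosition. The commit wraps offset 0 in a Kafka-specific position via ApacheKafkaOffsetPosition.getKafkaPosition(0); the alternative it floats is a broker-agnostic sentinel. A sketch of both options, assuming PubSubPosition.EARLIEST is a static member as the {@link} reference suggests:

import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.PubSubPosition;

class ZeroPositionSketch {
  // What the commit does: wrap offset 0 in a Kafka-specific position.
  PubSubPosition kafkaZero() {
    return ApacheKafkaOffsetPosition.getKafkaPosition(0);
  }

  // Alternative raised in the TODO: a broker-agnostic earliest sentinel.
  PubSubPosition earliest() {
    return PubSubPosition.EARLIEST;
  }
}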
@@ -328,7 +332,15 @@ void onCompletionForStorage(
     bootstrapCompletedCount++;
     if (bootstrapCompletedCount == bootstrapStateMap.size()) {
       // Add a dummy record to mark the end of the bootstrap.
-      resultSet.add(new ImmutableChangeCapturePubSubMessage<>(null, null, getTopicPartition(partition), 0, 0, 0, true));
+      resultSet.add(
+          new ImmutableChangeCapturePubSubMessage<>(
+              null,
+              null,
+              getTopicPartition(partition),
+              ApacheKafkaOffsetPosition.getKafkaPosition(0),
+              0,
+              0,
+              true));
     }
 
     // Notify that we've caught up
@@ -6,9 +6,9 @@
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
 import com.linkedin.venice.message.KafkaKey;
-import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
 import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.utils.lazy.Lazy;
@@ -123,7 +123,7 @@ protected CompletableFuture<Void> internalSeekToEndOfPush(Set<Integer> partition
     // filter out from the user
     PubSubConsumerAdapter consumerAdapter = internalSeekConsumer.get().getPubSubConsumer();
 
-    Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> polledResults;
+    Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>>> polledResults;
     Map<Integer, Boolean> endOfPushConsumedPerPartitionMap = new HashMap<>();
     Set<VeniceChangeCoordinate> checkpoints = new HashSet<>();
 
@@ -141,11 +141,11 @@ protected CompletableFuture<Void> internalSeekToEndOfPush(Set<Integer> partition
       counter++;
       polledResults = consumerAdapter.poll(5000L);
       // Loop through all polled messages
-      for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> entry: polledResults
+      for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>>> entry: polledResults
           .entrySet()) {
         PubSubTopicPartition pubSubTopicPartition = entry.getKey();
-        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messageList = entry.getValue();
-        for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messageList) {
+        List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>> messageList = entry.getValue();
+        for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message: messageList) {
           if (message.getKey().isControlMessage()) {
             ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
             ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
@@ -155,7 +155,7 @@ protected CompletableFuture<Void> internalSeekToEndOfPush(Set<Integer> partition
             endOfPushConsumedPerPartitionMap.put(pubSubTopicPartition.getPartitionNumber(), true);
             VeniceChangeCoordinate coordinate = new VeniceChangeCoordinate(
                 pubSubTopicPartition.getPubSubTopic().getName(),
-                new ApacheKafkaOffsetPosition(message.getOffset()),
+                message.getOffset(),
                 pubSubTopicPartition.getPartitionNumber());
             checkpoints.add(coordinate);
             Set<Integer> unsubSet = new HashSet<>();
@@ -36,6 +36,7 @@
 import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
 import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubTopicDoesNotExistException;
@@ -611,15 +612,15 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
       String topicSuffix,
       boolean includeControlMessage) {
     List<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessages = new ArrayList<>();
-    Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> messagesMap;
+    Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>>> messagesMap;
     synchronized (pubSubConsumer) {
       messagesMap = pubSubConsumer.poll(timeoutInMs);
     }
-    for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> entry: messagesMap
+    for (Map.Entry<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>>> entry: messagesMap
         .entrySet()) {
       PubSubTopicPartition pubSubTopicPartition = entry.getKey();
-      List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> messageList = entry.getValue();
-      for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message: messageList) {
+      List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>> messageList = entry.getValue();
+      for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message: messageList) {
         maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
         if (message.getKey().isControlMessage()) {
           ControlMessage controlMessage = (ControlMessage) message.getValue().getPayloadUnion();
@@ -672,7 +673,7 @@ protected Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> i
   }
 
   void maybeUpdatePartitionToBootstrapMap(
-      PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message,
+      PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message,
       PubSubTopicPartition pubSubTopicPartition) {
     if (getSubscribeTime() - message.getValue().producerMetadata.messageTimestamp <= TimeUnit.MINUTES.toMillis(1)) {
       getPartitionToBootstrapState().put(pubSubTopicPartition.getPartitionNumber(), true);
@@ -745,7 +746,7 @@ protected <T> T processRecordBytes(
   }
 
   protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(
-      PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message,
+      PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message,
       PubSubTopicPartition pubSubTopicPartition) {
     Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubChangeEventMessage = Optional.empty();
     byte[] keyBytes = message.getKey().getKey();
@@ -811,7 +812,7 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
                 put.getSchemaId(),
                 keyBytes,
                 put.getPutValue(),
-                message.getOffset(),
+                message.getOffset().getNumericOffset(),
                 deserializerProvider,
                 readerSchemaId,
                 compressor);
@@ -828,7 +829,7 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
             put.getPutValue(),
             pubSubTopicPartition,
             readerSchemaId,
-            message.getOffset());
+            message.getOffset().getNumericOffset());
       } catch (Exception ex) {
         throw new VeniceException(ex);
       }
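Note the pattern in the two hunks above: the code keeps the opaque PubSubPosition end to end and unwraps the numeric offset only at the boundary where a long is still expected (the record deserialization helpers). A sketch of that boundary; deserializeRecord is a hypothetical stand-in for those helpers:

import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubPosition;

class NumericBoundarySketch {
  void process(PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message) {
    // Keep the opaque position for coordinates and checkpoints...
    PubSubPosition position = message.getOffset();
    // ...and extract the long only where a legacy numeric offset is required.
    deserializeRecord(message.getKey().getKey(), position.getNumericOffset());
  }

  // Hypothetical placeholder for the legacy helpers that still take a long offset.
  private void deserializeRecord(byte[] keyBytes, long offset) {
  }
}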
@@ -960,7 +961,7 @@ private PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> convertChangeEv
       RecordChangeEvent recordChangeEvent,
       K currentKey,
       PubSubTopicPartition pubSubTopicPartition,
-      Long offset,
+      PubSubPosition pubSubPosition,
       Long timestamp,
       int payloadSize) {
     V currentValue = null;
@@ -980,7 +981,7 @@ private PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> convertChangeEv
         currentKey,
         changeEvent,
         pubSubTopicPartition,
-        offset,
+        pubSubPosition,
         timestamp,
         payloadSize,
         false);
@@ -4,6 +4,7 @@
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.service.AbstractVeniceService;
@@ -36,7 +37,7 @@ public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTop
   public abstract void startConsumptionIntoDataReceiver(
       PartitionReplicaIngestionContext partitionReplicaIngestionContext,
       long lastReadOffset,
-      ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver);
+      ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition>>> consumedDataReceiver);
 
   public abstract long getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition);
 
@@ -3,6 +3,7 @@
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.service.AbstractVeniceService;
 import java.util.concurrent.CompletableFuture;
@@ -13,7 +14,7 @@
  */
 public abstract class AbstractStoreBufferService extends AbstractVeniceService {
   public abstract void putConsumerRecord(
-      PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
+      PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> consumerRecord,
       StoreIngestionTask ingestionTask,
       LeaderProducedRecordContext leaderProducedRecordContext,
       int partition,
@@ -5,6 +5,7 @@
 import com.linkedin.venice.kafka.protocol.Put;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.writer.VeniceWriter;
 import java.nio.ByteBuffer;
@@ -16,7 +17,7 @@ public class ActiveActiveProducerCallback extends LeaderProducerCallback {
 
   public ActiveActiveProducerCallback(
       LeaderFollowerStoreIngestionTask ingestionTask,
-      PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> sourceConsumerRecord,
+      PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> sourceConsumerRecord,
       PartitionConsumptionState partitionConsumptionState,
       LeaderProducedRecordContext leaderProducedRecordContext,
       int partition,

0 comments on commit f98a60d
