Skip to content

Commit

Permalink
Further changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Feb 28, 2025
1 parent 30d4522 commit d8c89ac
Show file tree
Hide file tree
Showing 32 changed files with 282 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ void onRecordReceivedFromStorage(
* TODO: Should we introduce a magic position to handle the zero-offset case?
* Or use {@link com.linkedin.venice.pubsub.api.PubSubPosition.EARLIEST} as an alternative.
*/
ApacheKafkaOffsetPosition.getKafkaPosition(0),
ApacheKafkaOffsetPosition.of(0),
0,
value.length * 8,
false);
Expand All @@ -337,7 +337,7 @@ void onCompletionForStorage(
null,
null,
getTopicPartition(partition),
ApacheKafkaOffsetPosition.getKafkaPosition(0),
ApacheKafkaOffsetPosition.of(0),
0,
0,
true));
Expand Down Expand Up @@ -562,7 +562,7 @@ void setStorageAndMetadataService(StorageService storageService, StorageMetadata
* Helper method to get offset in long value from VeniceChangeCoordinate.
*/
private long getOffset(VeniceChangeCoordinate veniceChangeCoordinate) {
return ((ApacheKafkaOffsetPosition) (veniceChangeCoordinate.getPosition())).getOffset();
return veniceChangeCoordinate.getPosition().getNumericOffset();
}

enum PollState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> chec
PubSubTopic topic = pubSubTopicRepository.getTopic(coordinate.getTopic());
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(topic, coordinate.getPartition());
internalSeek(Collections.singleton(coordinate.getPartition()), topic, foo -> {
Long topicOffset = ((ApacheKafkaOffsetPosition) coordinate.getPosition()).getOffset();
Long topicOffset = coordinate.getPosition().getNumericOffset();
pubSubConsumerSeek(pubSubTopicPartition, topicOffset);
}).join();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
Expand Down Expand Up @@ -330,10 +331,8 @@ public void testProcessRecordBytes_UpdatesBootstrapStateMap() throws IOException
bootstrappingVeniceChangelogConsumer.getBootstrapStateMap();
InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState bootstrapState =
new InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState();
bootstrapState.currentPubSubPosition = new VeniceChangeCoordinate(
TEST_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET_OLD),
TEST_PARTITION_ID_0);
bootstrapState.currentPubSubPosition =
new VeniceChangeCoordinate(TEST_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET_OLD), TEST_PARTITION_ID_0);
bootstrapStateMap.put(TEST_PARTITION_ID_0, bootstrapState);

ByteBuffer decompressedBytes = compressor.decompress(value);
Expand All @@ -347,8 +346,7 @@ public void testProcessRecordBytes_UpdatesBootstrapStateMap() throws IOException
TEST_OFFSET_NEW);

Assert.assertEquals(
((ApacheKafkaOffsetPosition) bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition())
.getOffset(),
bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition().getNumericOffset(),
TEST_OFFSET_NEW);
verify(mockStorageEngine, times(1)).put(eq(TEST_PARTITION_ID_0), eq(key), any(byte[].class));
}
Expand Down Expand Up @@ -385,10 +383,8 @@ public void testProcessRecordBytes_SyncOffsetAndUpdatesBootstrapStateMap() throw
bootstrappingVeniceChangelogConsumer.getBootstrapStateMap();
InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState bootstrapState =
new InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState();
bootstrapState.currentPubSubPosition = new VeniceChangeCoordinate(
TEST_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET_OLD),
TEST_PARTITION_ID_0);
bootstrapState.currentPubSubPosition =
new VeniceChangeCoordinate(TEST_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET_OLD), TEST_PARTITION_ID_0);
bootstrapStateMap.put(TEST_PARTITION_ID_0, bootstrapState);

ByteBuffer decompressedBytes = compressor.decompress(value);
Expand All @@ -402,8 +398,7 @@ public void testProcessRecordBytes_SyncOffsetAndUpdatesBootstrapStateMap() throw
TEST_OFFSET_NEW);

Assert.assertEquals(
((ApacheKafkaOffsetPosition) bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition())
.getOffset(),
bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition().getNumericOffset(),
TEST_OFFSET_NEW);
verify(storageEngine, times(1)).put(eq(TEST_PARTITION_ID_0), eq(key), any(byte[].class));
verify(storageEngineReloadedFromRepo, times(1)).sync(TEST_PARTITION_ID_0);
Expand Down Expand Up @@ -437,7 +432,7 @@ public void testStart_InvalidLocalCheckpoint_Throws() throws Exception {
VeniceChangeCoordinate.convertVeniceChangeCoordinateToStringAndEncode(
new VeniceChangeCoordinate(
TEST_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET_NEW),
ApacheKafkaOffsetPosition.of(TEST_OFFSET_NEW),
TEST_PARTITION_ID_2)));
lastOffsetRecord.setDatabaseInfo(databaseInfo);
when(mockStorageMetadataService.getLastOffset(anyString(), anyInt())).thenReturn(lastOffsetRecord);
Expand Down Expand Up @@ -538,6 +533,12 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructC
null);
KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key));
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
return new ImmutablePubSubMessage<>(
kafkaKey,
kafkaMessageEnvelope,
pubSubTopicPartition,
mock(PubSubPosition.class),
0,
0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class TestVeniceChangeCoordinate {

@Test
public void testReadAndWriteExternal() throws IOException, ClassNotFoundException {
PubSubPosition position = ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET);
PubSubPosition position = ApacheKafkaOffsetPosition.of(TEST_OFFSET);
VeniceChangeCoordinate veniceChangeCoordinate =
new VeniceChangeCoordinate(TEST_STORE_TOPIC, position, TEST_PARTITION);

Expand All @@ -46,38 +46,26 @@ public void testReadAndWriteExternal() throws IOException, ClassNotFoundExceptio

@Test
public void testComparePosition() {
VeniceChangeCoordinate veniceChangeCoordinate = new VeniceChangeCoordinate(
TEST_STORE_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET),
TEST_PARTITION);
VeniceChangeCoordinate veniceChangeCoordinate_1 = new VeniceChangeCoordinate(
TEST_STORE_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET),
TEST_PARTITION);
VeniceChangeCoordinate veniceChangeCoordinate =
new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION);
VeniceChangeCoordinate veniceChangeCoordinate_1 =
new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION);
Assert.assertEquals(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_1), 0);

VeniceChangeCoordinate veniceChangeCoordinate_2 = new VeniceChangeCoordinate(
TEST_STORE_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET),
TEST_PARTITION + 1);
VeniceChangeCoordinate veniceChangeCoordinate_2 =
new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION + 1);
Assert.assertThrows(VeniceException.class, () -> veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_2));

VeniceChangeCoordinate veniceChangeCoordinate_3 = new VeniceChangeCoordinate(
TEST_STORE_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET - 1),
TEST_PARTITION);
VeniceChangeCoordinate veniceChangeCoordinate_3 =
new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET - 1), TEST_PARTITION);
Assert.assertTrue(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_3) > 0);

VeniceChangeCoordinate veniceChangeCoordinate_4 = new VeniceChangeCoordinate(
TEST_STORE_TOPIC,
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET + 1),
TEST_PARTITION);
VeniceChangeCoordinate veniceChangeCoordinate_4 =
new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET + 1), TEST_PARTITION);
Assert.assertTrue(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_4) < 0);

VeniceChangeCoordinate veniceChangeCoordinate_5 = new VeniceChangeCoordinate(
TEST_STORE_TOPIC + "v2",
ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET),
TEST_PARTITION);
VeniceChangeCoordinate veniceChangeCoordinate_5 =
new VeniceChangeCoordinate(TEST_STORE_TOPIC + "v2", ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION);
Assert.assertTrue(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_5) < 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,14 @@ public class VeniceChangelogConsumerImplTest {
private RecordSerializer<String> valueSerializer;
private Schema rmdSchema;
private SchemaReader schemaReader;
private PubSubPosition mockPubSubPosition;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private final Schema valueSchema = AvroCompatibilityHelper.parse("\"string\"");

@BeforeMethod
public void setUp() {
storeName = Utils.getUniqueString();
mockPubSubPosition = mock(PubSubPosition.class);
schemaReader = mock(SchemaReader.class);
Schema keySchema = AvroCompatibilityHelper.parse("\"string\"");
doReturn(keySchema).when(schemaReader).getKeySchema();
Expand Down Expand Up @@ -268,8 +270,13 @@ public void testBootstrapState() {
kafkaMessageEnvelope.producerMetadata = new ProducerMetadata();
kafkaMessageEnvelope.producerMetadata.messageTimestamp = currentTimestamp - TimeUnit.MINUTES.toMillis(2);
kafkaMessageEnvelope.payloadUnion = controlMessage;
PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message =
new ImmutablePubSubMessage<>(KafkaKey.HEART_BEAT, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> message = new ImmutablePubSubMessage<>(
KafkaKey.HEART_BEAT,
kafkaMessageEnvelope,
pubSubTopicPartition,
mockPubSubPosition,
0,
0);
doReturn(currentTimestamp).when(veniceChangelogConsumer).getSubscribeTime();
veniceChangelogConsumer.maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
Assert.assertFalse(bootstrapStateMap.get(0));
Expand Down Expand Up @@ -610,7 +617,7 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructV
controlMessage.controlMessageType = ControlMessageType.VERSION_SWAP.getValue();
kafkaMessageEnvelope.payloadUnion = controlMessage;
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0);
}

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructChangeCaptureConsumerRecord(
Expand Down Expand Up @@ -644,7 +651,7 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructC
kafkaMessageEnvelope.setProducerMetadata(producerMetadata);
KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key));
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0);
}

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructConsumerRecord(
Expand All @@ -667,7 +674,7 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructC
null);
KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key));
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0);
}

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructEndOfPushMessage(
Expand All @@ -685,7 +692,9 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructE
controlMessage.controlMessageType = ControlMessageType.END_OF_PUSH.getValue();
kafkaMessageEnvelope.payloadUnion = controlMessage;
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, offset, 0, 0);
PubSubPosition mockPubSubPosition = mock(PubSubPosition.class);
doReturn(offset).when(mockPubSubPosition).getNumericOffset();
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0);
}

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructStartOfPushMessage(
Expand All @@ -703,7 +712,7 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> constructS
controlMessage.controlMessageType = ControlMessageType.START_OF_PUSH.getValue();
kafkaMessageEnvelope.payloadUnion = controlMessage;
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0);
}

private ChangelogClientConfig getChangelogClientConfig(D2ControllerClient d2ControllerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testHandleDeleteBeforeEOP() {
when(consumerRecord.getKey()).thenReturn(kafkaKey);
KafkaMessageEnvelope kafkaValue = new KafkaMessageEnvelope();
when(consumerRecord.getValue()).thenReturn(kafkaValue);
when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.getKafkaPosition(1));
when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.of(1));
kafkaValue.messageType = MessageType.DELETE.getValue();
Delete deletePayload = new Delete();
kafkaValue.payloadUnion = deletePayload;
Expand Down Expand Up @@ -384,7 +384,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
ByteBuffer updatedValueBytes = ByteUtils.prependIntHeaderToByteBuffer(valueBytes, 1);
ByteBuffer updatedRmdBytes = ByteBuffer.wrap(new byte[] { 0xa, 0xb });
PubSubMessage<KafkaKey, KafkaMessageEnvelope, PubSubPosition> consumerRecord = mock(PubSubMessage.class);
when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.getKafkaPosition(100L));
when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.of(100L));

Put updatedPut = new Put();
updatedPut.putValue = ByteUtils.prependIntHeaderToByteBuffer(updatedValueBytes, valueSchemaId, resultReuseInput);
Expand Down Expand Up @@ -620,7 +620,7 @@ public void testUnwrapByteBufferFromOldValueProvider() {
public void testGetUpstreamKafkaUrlFromKafkaValue() {
PubSubTopicPartition partition = new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic("topic"), 0);
long offset = 100;
PubSubPosition position = ApacheKafkaOffsetPosition.getKafkaPosition(offset);
PubSubPosition position = ApacheKafkaOffsetPosition.of(offset);
long timestamp = System.currentTimeMillis();
int payloadSize = 200;
String sourceKafka = "sourceKafkaURL";
Expand Down
Loading

0 comments on commit d8c89ac

Please sign in to comment.