From d8c89ac2bcadaf1f09e9ba99c8c8659c5c446044 Mon Sep 17 00:00:00 2001 From: Sushant Mane Date: Fri, 28 Feb 2025 02:07:16 -0800 Subject: [PATCH] Further changes --- ...lBootstrappingVeniceChangelogConsumer.java | 6 +-- .../consumer/VeniceChangelogConsumerImpl.java | 2 +- ...tstrappingVeniceChangelogConsumerTest.java | 29 +++++----- .../consumer/TestVeniceChangeCoordinate.java | 38 +++++-------- .../VeniceChangelogConsumerImplTest.java | 23 +++++--- .../ActiveActiveStoreIngestionTaskTest.java | 6 +-- .../consumer/IngestionBatchProcessorTest.java | 38 ++++++------- .../KafkaClusterBasedRecordThrottlerTest.java | 1 + .../consumer/KafkaConsumerServiceTest.java | 1 + .../LeaderFollowerStoreIngestionTaskTest.java | 3 +- .../consumer/LeaderProducerCallbackTest.java | 1 + .../consumer/SharedKafkaConsumerTest.java | 1 + .../consumer/StoreBufferServiceTest.java | 53 ++++++++++++------- .../consumer/StoreIngestionTaskTest.java | 45 +++++++++++----- .../KafkaDataIntegrityValidatorTest.java | 10 +++- .../validation/TestPartitionTracker.java | 28 +++++----- .../venice/TestAdminToolConsumption.java | 38 ++++++++++--- .../linkedin/venice/TestKafkaTopicDumper.java | 14 ++--- .../kafka/KafkaInputRecordReaderTest.java | 11 +++- .../venice/pubsub/ImmutablePubSubMessage.java | 2 +- .../kafka/ApacheKafkaOffsetPosition.java | 11 +++- .../venice/pubsub/api/PubSubPosition.java | 18 ++++++- .../memory/InstanceSizeEstimatorTest.java | 2 +- .../pubsub/PubSubPositionFactoryTest.java | 4 +- .../kafka/ApacheKafkaOffsetPositionTest.java | 42 +++++++-------- .../ApacheKafkaConsumerAdapterTest.java | 2 +- .../manager/TopicMetadataFetcherTest.java | 4 +- .../venice/utils/DictionaryUtilsTest.java | 8 +-- .../consumer/TestChangelogConsumer.java | 2 +- .../venice/endToEnd/PartialUpdateTest.java | 5 +- .../consumer/poll/AbstractPollStrategy.java | 2 +- .../venice/utils/ChunkingTestUtils.java | 10 ++-- 32 files changed, 282 insertions(+), 178 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java index c71f221656..fbdf81fc85 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java @@ -314,7 +314,7 @@ void onRecordReceivedFromStorage( * TODO: Should we introduce a magic position to handle the zero-offset case? * Or use {@link com.linkedin.venice.pubsub.api.PubSubPosition.EARLIEST} as an alternative. */ - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0, value.length * 8, false); @@ -337,7 +337,7 @@ void onCompletionForStorage( null, null, getTopicPartition(partition), - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0, 0, true)); @@ -562,7 +562,7 @@ void setStorageAndMetadataService(StorageService storageService, StorageMetadata * Helper method to get offset in long value from VeniceChangeCoordinate. */ private long getOffset(VeniceChangeCoordinate veniceChangeCoordinate) { - return ((ApacheKafkaOffsetPosition) (veniceChangeCoordinate.getPosition())).getOffset(); + return veniceChangeCoordinate.getPosition().getNumericOffset(); } enum PollState { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index 22d858be80..d096f78713 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -426,7 +426,7 @@ public CompletableFuture seekToCheckpoint(Set chec PubSubTopic topic = pubSubTopicRepository.getTopic(coordinate.getTopic()); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(topic, coordinate.getPartition()); internalSeek(Collections.singleton(coordinate.getPartition()), topic, foo -> { - Long topicOffset = ((ApacheKafkaOffsetPosition) coordinate.getPosition()).getOffset(); + Long topicOffset = coordinate.getPosition().getNumericOffset(); pubSubConsumerSeek(pubSubTopicPartition, topicOffset); }).join(); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java index 59b5164f98..8bacac2e29 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java @@ -47,6 +47,7 @@ import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.SchemaEntry; @@ -330,10 +331,8 @@ public void testProcessRecordBytes_UpdatesBootstrapStateMap() throws IOException bootstrappingVeniceChangelogConsumer.getBootstrapStateMap(); InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState bootstrapState = new InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState(); - bootstrapState.currentPubSubPosition = new VeniceChangeCoordinate( - TEST_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET_OLD), - TEST_PARTITION_ID_0); + bootstrapState.currentPubSubPosition = + new VeniceChangeCoordinate(TEST_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET_OLD), TEST_PARTITION_ID_0); bootstrapStateMap.put(TEST_PARTITION_ID_0, bootstrapState); ByteBuffer decompressedBytes = compressor.decompress(value); @@ -347,8 +346,7 @@ public void testProcessRecordBytes_UpdatesBootstrapStateMap() throws IOException TEST_OFFSET_NEW); Assert.assertEquals( - ((ApacheKafkaOffsetPosition) bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition()) - .getOffset(), + bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition().getNumericOffset(), TEST_OFFSET_NEW); verify(mockStorageEngine, times(1)).put(eq(TEST_PARTITION_ID_0), eq(key), any(byte[].class)); } @@ -385,10 +383,8 @@ public void testProcessRecordBytes_SyncOffsetAndUpdatesBootstrapStateMap() throw bootstrappingVeniceChangelogConsumer.getBootstrapStateMap(); InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState bootstrapState = new InternalLocalBootstrappingVeniceChangelogConsumer.BootstrapState(); - bootstrapState.currentPubSubPosition = new VeniceChangeCoordinate( - TEST_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET_OLD), - TEST_PARTITION_ID_0); + bootstrapState.currentPubSubPosition = + new VeniceChangeCoordinate(TEST_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET_OLD), TEST_PARTITION_ID_0); bootstrapStateMap.put(TEST_PARTITION_ID_0, bootstrapState); ByteBuffer decompressedBytes = compressor.decompress(value); @@ -402,8 +398,7 @@ public void testProcessRecordBytes_SyncOffsetAndUpdatesBootstrapStateMap() throw TEST_OFFSET_NEW); Assert.assertEquals( - ((ApacheKafkaOffsetPosition) bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition()) - .getOffset(), + bootstrapStateMap.get(TEST_PARTITION_ID_0).currentPubSubPosition.getPosition().getNumericOffset(), TEST_OFFSET_NEW); verify(storageEngine, times(1)).put(eq(TEST_PARTITION_ID_0), eq(key), any(byte[].class)); verify(storageEngineReloadedFromRepo, times(1)).sync(TEST_PARTITION_ID_0); @@ -437,7 +432,7 @@ public void testStart_InvalidLocalCheckpoint_Throws() throws Exception { VeniceChangeCoordinate.convertVeniceChangeCoordinateToStringAndEncode( new VeniceChangeCoordinate( TEST_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET_NEW), + ApacheKafkaOffsetPosition.of(TEST_OFFSET_NEW), TEST_PARTITION_ID_2))); lastOffsetRecord.setDatabaseInfo(databaseInfo); when(mockStorageMetadataService.getLastOffset(anyString(), anyInt())).thenReturn(lastOffsetRecord); @@ -538,6 +533,12 @@ private PubSubMessage constructC null); KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key)); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + return new ImmutablePubSubMessage<>( + kafkaKey, + kafkaMessageEnvelope, + pubSubTopicPartition, + mock(PubSubPosition.class), + 0, + 0); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TestVeniceChangeCoordinate.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TestVeniceChangeCoordinate.java index 364318c8c3..5b7aa3fc7f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TestVeniceChangeCoordinate.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/TestVeniceChangeCoordinate.java @@ -21,7 +21,7 @@ public class TestVeniceChangeCoordinate { @Test public void testReadAndWriteExternal() throws IOException, ClassNotFoundException { - PubSubPosition position = ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET); + PubSubPosition position = ApacheKafkaOffsetPosition.of(TEST_OFFSET); VeniceChangeCoordinate veniceChangeCoordinate = new VeniceChangeCoordinate(TEST_STORE_TOPIC, position, TEST_PARTITION); @@ -46,38 +46,26 @@ public void testReadAndWriteExternal() throws IOException, ClassNotFoundExceptio @Test public void testComparePosition() { - VeniceChangeCoordinate veniceChangeCoordinate = new VeniceChangeCoordinate( - TEST_STORE_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET), - TEST_PARTITION); - VeniceChangeCoordinate veniceChangeCoordinate_1 = new VeniceChangeCoordinate( - TEST_STORE_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET), - TEST_PARTITION); + VeniceChangeCoordinate veniceChangeCoordinate = + new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION); + VeniceChangeCoordinate veniceChangeCoordinate_1 = + new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION); Assert.assertEquals(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_1), 0); - VeniceChangeCoordinate veniceChangeCoordinate_2 = new VeniceChangeCoordinate( - TEST_STORE_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET), - TEST_PARTITION + 1); + VeniceChangeCoordinate veniceChangeCoordinate_2 = + new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION + 1); Assert.assertThrows(VeniceException.class, () -> veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_2)); - VeniceChangeCoordinate veniceChangeCoordinate_3 = new VeniceChangeCoordinate( - TEST_STORE_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET - 1), - TEST_PARTITION); + VeniceChangeCoordinate veniceChangeCoordinate_3 = + new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET - 1), TEST_PARTITION); Assert.assertTrue(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_3) > 0); - VeniceChangeCoordinate veniceChangeCoordinate_4 = new VeniceChangeCoordinate( - TEST_STORE_TOPIC, - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET + 1), - TEST_PARTITION); + VeniceChangeCoordinate veniceChangeCoordinate_4 = + new VeniceChangeCoordinate(TEST_STORE_TOPIC, ApacheKafkaOffsetPosition.of(TEST_OFFSET + 1), TEST_PARTITION); Assert.assertTrue(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_4) < 0); - VeniceChangeCoordinate veniceChangeCoordinate_5 = new VeniceChangeCoordinate( - TEST_STORE_TOPIC + "v2", - ApacheKafkaOffsetPosition.getKafkaPosition(TEST_OFFSET), - TEST_PARTITION); + VeniceChangeCoordinate veniceChangeCoordinate_5 = + new VeniceChangeCoordinate(TEST_STORE_TOPIC + "v2", ApacheKafkaOffsetPosition.of(TEST_OFFSET), TEST_PARTITION); Assert.assertTrue(veniceChangeCoordinate.comparePosition(veniceChangeCoordinate_5) < 0); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 9847005507..c2e2fdbf4c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -84,12 +84,14 @@ public class VeniceChangelogConsumerImplTest { private RecordSerializer valueSerializer; private Schema rmdSchema; private SchemaReader schemaReader; + private PubSubPosition mockPubSubPosition; private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); private final Schema valueSchema = AvroCompatibilityHelper.parse("\"string\""); @BeforeMethod public void setUp() { storeName = Utils.getUniqueString(); + mockPubSubPosition = mock(PubSubPosition.class); schemaReader = mock(SchemaReader.class); Schema keySchema = AvroCompatibilityHelper.parse("\"string\""); doReturn(keySchema).when(schemaReader).getKeySchema(); @@ -268,8 +270,13 @@ public void testBootstrapState() { kafkaMessageEnvelope.producerMetadata = new ProducerMetadata(); kafkaMessageEnvelope.producerMetadata.messageTimestamp = currentTimestamp - TimeUnit.MINUTES.toMillis(2); kafkaMessageEnvelope.payloadUnion = controlMessage; - PubSubMessage message = - new ImmutablePubSubMessage<>(KafkaKey.HEART_BEAT, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + PubSubMessage message = new ImmutablePubSubMessage<>( + KafkaKey.HEART_BEAT, + kafkaMessageEnvelope, + pubSubTopicPartition, + mockPubSubPosition, + 0, + 0); doReturn(currentTimestamp).when(veniceChangelogConsumer).getSubscribeTime(); veniceChangelogConsumer.maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition); Assert.assertFalse(bootstrapStateMap.get(0)); @@ -610,7 +617,7 @@ private PubSubMessage constructV controlMessage.controlMessageType = ControlMessageType.VERSION_SWAP.getValue(); kafkaMessageEnvelope.payloadUnion = controlMessage; PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0); } private PubSubMessage constructChangeCaptureConsumerRecord( @@ -644,7 +651,7 @@ private PubSubMessage constructC kafkaMessageEnvelope.setProducerMetadata(producerMetadata); KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key)); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0); } private PubSubMessage constructConsumerRecord( @@ -667,7 +674,7 @@ private PubSubMessage constructC null); KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key)); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(changeCaptureVersionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0); } private PubSubMessage constructEndOfPushMessage( @@ -685,7 +692,9 @@ private PubSubMessage constructE controlMessage.controlMessageType = ControlMessageType.END_OF_PUSH.getValue(); kafkaMessageEnvelope.payloadUnion = controlMessage; PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, offset, 0, 0); + PubSubPosition mockPubSubPosition = mock(PubSubPosition.class); + doReturn(offset).when(mockPubSubPosition).getNumericOffset(); + return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0); } private PubSubMessage constructStartOfPushMessage( @@ -703,7 +712,7 @@ private PubSubMessage constructS controlMessage.controlMessageType = ControlMessageType.START_OF_PUSH.getValue(); kafkaMessageEnvelope.payloadUnion = controlMessage; PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition); - return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0); + return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, mockPubSubPosition, 0, 0); } private ChangelogClientConfig getChangelogClientConfig(D2ControllerClient d2ControllerClient) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java index f267211d61..325d1af9fc 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java @@ -141,7 +141,7 @@ public void testHandleDeleteBeforeEOP() { when(consumerRecord.getKey()).thenReturn(kafkaKey); KafkaMessageEnvelope kafkaValue = new KafkaMessageEnvelope(); when(consumerRecord.getValue()).thenReturn(kafkaValue); - when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.getKafkaPosition(1)); + when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.of(1)); kafkaValue.messageType = MessageType.DELETE.getValue(); Delete deletePayload = new Delete(); kafkaValue.payloadUnion = deletePayload; @@ -384,7 +384,7 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio ByteBuffer updatedValueBytes = ByteUtils.prependIntHeaderToByteBuffer(valueBytes, 1); ByteBuffer updatedRmdBytes = ByteBuffer.wrap(new byte[] { 0xa, 0xb }); PubSubMessage consumerRecord = mock(PubSubMessage.class); - when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.getKafkaPosition(100L)); + when(consumerRecord.getOffset()).thenReturn(ApacheKafkaOffsetPosition.of(100L)); Put updatedPut = new Put(); updatedPut.putValue = ByteUtils.prependIntHeaderToByteBuffer(updatedValueBytes, valueSchemaId, resultReuseInput); @@ -620,7 +620,7 @@ public void testUnwrapByteBufferFromOldValueProvider() { public void testGetUpstreamKafkaUrlFromKafkaValue() { PubSubTopicPartition partition = new PubSubTopicPartitionImpl(TOPIC_REPOSITORY.getTopic("topic"), 0); long offset = 100; - PubSubPosition position = ApacheKafkaOffsetPosition.getKafkaPosition(offset); + PubSubPosition position = ApacheKafkaOffsetPosition.of(offset); long timestamp = System.currentTimeMillis(); int payloadSize = 200; String sourceKafka = "sourceKafkaURL"; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java index f5f677f5b5..e1bce6ce0f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/IngestionBatchProcessorTest.java @@ -26,6 +26,7 @@ import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.utils.DaemonThreadFactory; @@ -52,28 +53,28 @@ public void isAllMessagesFromRTTopicTest() { mock(KafkaKey.class), mock(KafkaMessageEnvelope.class), versionTopicPartition, - 1, + mock(PubSubPosition.class), 100, 100); PubSubMessage vtMessage2 = new ImmutablePubSubMessage<>( mock(KafkaKey.class), mock(KafkaMessageEnvelope.class), versionTopicPartition, - 2, + mock(PubSubPosition.class), 101, 100); PubSubMessage rtMessage1 = new ImmutablePubSubMessage<>( mock(KafkaKey.class), mock(KafkaMessageEnvelope.class), rtTopicPartition, - 1, + mock(PubSubPosition.class), 100, 100); PubSubMessage rtMessage2 = new ImmutablePubSubMessage<>( mock(KafkaKey.class), mock(KafkaMessageEnvelope.class), rtTopicPartition, - 2, + mock(PubSubPosition.class), 101, 100); @@ -98,14 +99,14 @@ public void lockKeysTest() { new KafkaKey(MessageType.PUT, key1), mock(KafkaMessageEnvelope.class), rtTopicPartition, - 1, + mock(PubSubPosition.class), 100, 100); PubSubMessage rtMessage2 = new ImmutablePubSubMessage<>( new KafkaKey(MessageType.PUT, key2), mock(KafkaMessageEnvelope.class), rtTopicPartition, - 2, + mock(PubSubPosition.class), 101, 100); @@ -163,14 +164,14 @@ public void processTest() { new KafkaKey(MessageType.PUT, key1), mock(KafkaMessageEnvelope.class), rtTopicPartition, - 1, + mock(PubSubPosition.class), 100, 100); PubSubMessage rtMessage2 = new ImmutablePubSubMessage<>( new KafkaKey(MessageType.PUT, key2), mock(KafkaMessageEnvelope.class), rtTopicPartition, - 2, + mock(PubSubPosition.class), 101, 100); @@ -200,21 +201,22 @@ public void processTest() { mockAggVersionedIngestionStats, mockHostLevelIngestionStats); - List> result = batchProcessor.process( - Arrays.asList(rtMessage1, rtMessage2), - mock(PartitionConsumptionState.class), - 1, - "test_kafka", - 1, - 1, - 1); + List> result = + batchProcessor.process( + Arrays.asList(rtMessage1, rtMessage2), + mock(PartitionConsumptionState.class), + 1, + "test_kafka", + 1, + 1, + 1); assertEquals(result.size(), 2); - PubSubMessageProcessedResultWrapper resultForKey1 = result.get(0); + PubSubMessageProcessedResultWrapper resultForKey1 = result.get(0); assertEquals( resultForKey1.getProcessedResult().getWriteComputeResultWrapper().getNewPut().putValue.array(), "value1".getBytes()); - PubSubMessageProcessedResultWrapper resultForKey2 = result.get(1); + PubSubMessageProcessedResultWrapper resultForKey2 = result.get(1); assertEquals( resultForKey2.getProcessedResult().getWriteComputeResultWrapper().getNewPut().putValue.array(), "value2".getBytes()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaClusterBasedRecordThrottlerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaClusterBasedRecordThrottlerTest.java index d01adad51f..16101d40a6 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaClusterBasedRecordThrottlerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaClusterBasedRecordThrottlerTest.java @@ -8,6 +8,7 @@ import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java index 1a262b9351..0f14f3983d 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceTest.java @@ -29,6 +29,7 @@ import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index f9f24237da..25a04dbecf 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -39,6 +39,7 @@ import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -375,7 +376,7 @@ private PubSubMessageProcessedResultWrapper getMockMessage(int seqNumber) { ControlMessage controlMessage = mock(ControlMessage.class); doReturn(controlMessage).when(kafkaValue).getPayloadUnion(); doReturn(ControlMessageType.START_OF_SEGMENT.getValue()).when(controlMessage).getControlMessageType(); - doReturn((long) seqNumber).when(pubSubMessage).getOffset(); + doReturn(ApacheKafkaOffsetPosition.of(seqNumber)).when(pubSubMessage).getOffset(); return pubSubMessageProcessedResultWrapper; } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java index 4d342bbdd0..f3f57736c7 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallbackTest.java @@ -17,6 +17,7 @@ import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.InMemoryLogAppender; import com.linkedin.venice.utils.Utils; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java index c56630d58d..1e9356b14f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java @@ -14,6 +14,7 @@ import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.utils.SystemTime; diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java index 9b87e16754..dbd6e7980e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java @@ -25,6 +25,7 @@ import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.utils.DataProviderUtils; @@ -48,9 +49,11 @@ public class StoreBufferServiceTest { private static final int TIMEOUT_IN_MS = 1000; private final MetricsRepository mockMetricRepo = mock(MetricsRepository.class); private StoreBufferServiceStats mockedStats; + PubSubPosition mockPosition; @BeforeMethod public void setUp() { + mockPosition = mock(PubSubPosition.class); final Sensor mockSensor = mock(Sensor.class); doReturn(mockSensor).when(mockMetricRepo).sensor(anyString(), any()); mockedStats = mock(StoreBufferServiceStats.class); @@ -61,6 +64,7 @@ public void testRun(boolean queueLeaderWrites) throws Exception { StoreBufferService bufferService = new StoreBufferService(1, 10000, 1000, queueLeaderWrites, mockedStats); StoreIngestionTask mockTask = mock(StoreIngestionTask.class); String topic = Utils.getUniqueString("test_topic") + "_v1"; + PubSubPosition mockPosition = mock(PubSubPosition.class); int partition1 = 1; int partition2 = 2; int partition3 = 3; @@ -72,13 +76,13 @@ public void testRun(boolean queueLeaderWrites) throws Exception { PubSubTopicPartition pubSubTopicPartition4 = new PubSubTopicPartitionImpl(pubSubTopic, partition4); String kafkaUrl = "blah"; PubSubMessage cr1 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mockPosition, 0, 0); PubSubMessage cr2 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, mockPosition, 0, 0); PubSubMessage cr3 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition3, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition3, mockPosition, 0, 0); PubSubMessage cr4 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition4, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition4, mockPosition, 0, 0); bufferService.putConsumerRecord(cr1, mockTask, null, partition1, kafkaUrl, 0L); bufferService.putConsumerRecord(cr2, mockTask, null, partition2, kafkaUrl, 0L); @@ -106,6 +110,7 @@ public void testRunWhenThrowException(boolean queueLeaderWrites) throws Exceptio StoreBufferService bufferService = new StoreBufferService(1, 10000, 1000, queueLeaderWrites, mockedStats); StoreIngestionTask mockTask = mock(StoreIngestionTask.class); String topic = Utils.getUniqueString("test_topic") + "_v1"; + PubSubPosition mockPosition = mock(PubSubPosition.class); int partition1 = 1; int partition2 = 2; PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topic); @@ -113,9 +118,9 @@ public void testRunWhenThrowException(boolean queueLeaderWrites) throws Exceptio PubSubTopicPartition pubSubTopicPartition2 = new PubSubTopicPartitionImpl(pubSubTopic, partition2); String kafkaUrl = "blah"; PubSubMessage cr1 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mockPosition, 0, 0); PubSubMessage cr2 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, mockPosition, 0, 0); Exception e = new VeniceException("test_exception"); doThrow(e).when(mockTask).processConsumerRecord(cr1, null, partition1, kafkaUrl, 0L); @@ -136,12 +141,13 @@ public void testDrainBufferedRecordsWhenNotExists(boolean queueLeaderWrites) thr StoreBufferService bufferService = new StoreBufferService(1, 10000, 1000, queueLeaderWrites, mockedStats); StoreIngestionTask mockTask = mock(StoreIngestionTask.class); String topic = Utils.getUniqueString("test_topic") + "_v1"; + PubSubPosition mockPosition = mock(PubSubPosition.class); int partition = 1; PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topic); PubSubTopicPartition pubSubTopicPartition1 = new PubSubTopicPartitionImpl(pubSubTopic, partition); String kafkaUrl = "blah"; PubSubMessage cr = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mockPosition, 0, 0); bufferService.start(); bufferService.putConsumerRecord(cr, mockTask, null, partition, kafkaUrl, 0L); int nonExistingPartition = 2; @@ -160,9 +166,10 @@ public void testDrainBufferedRecordsWhenExists(boolean queueLeaderWrites) throws int partition = 1; PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topic); PubSubTopicPartition pubSubTopicPartition1 = new PubSubTopicPartitionImpl(pubSubTopic, partition); + PubSubPosition mockPosition = mock(PubSubPosition.class); String kafkaUrl = "blah"; PubSubMessage cr = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, 100, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mockPosition, 0, 0); bufferService.start(); bufferService.putConsumerRecord(cr, mockTask, null, partition, kafkaUrl, 0L); bufferService.internalDrainBufferedRecordsFromTopicPartition(pubSubTopicPartition1, 3, 50); @@ -187,8 +194,13 @@ public void testGetDrainerIndexForConsumerRecordSeparateDrainer(boolean queueLea doReturn(queueLeaderWrites).when(serverConfig).isStoreWriterBufferAfterLeaderLogicEnabled(); SeparatedStoreBufferService bufferService = new SeparatedStoreBufferService(serverConfig, mockMetricRepo); for (int partition = 0; partition < partitionCount; ++partition) { - PubSubMessage cr = - new ImmutablePubSubMessage<>(key, value, new PubSubTopicPartitionImpl(pubSubTopic, partition), 100, 0, 0); + PubSubMessage cr = new ImmutablePubSubMessage<>( + key, + value, + new PubSubTopicPartitionImpl(pubSubTopic, partition), + mock(PubSubPosition.class), + 0, + 0); int drainerIndex; if (partition < 16) { drainerIndex = bufferService.sortedStoreBufferServiceDelegate.getDrainerIndexForConsumerRecord(cr, partition); @@ -217,8 +229,13 @@ public void testGetDrainerIndexForConsumerRecord(boolean queueLeaderWrites) { } StoreBufferService bufferService = new StoreBufferService(8, 10000, 1000, queueLeaderWrites, mockedStats); for (int partition = 0; partition < partitionCount; ++partition) { - PubSubMessage cr = - new ImmutablePubSubMessage<>(key, value, new PubSubTopicPartitionImpl(pubSubTopic, partition), 100, 0, 0); + PubSubMessage cr = new ImmutablePubSubMessage<>( + key, + value, + new PubSubTopicPartitionImpl(pubSubTopic, partition), + mockPosition, + 0, + 0); int drainerIndex = bufferService.getDrainerIndexForConsumerRecord(cr, partition); ++drainerPartitionCount[drainerIndex]; } @@ -240,9 +257,9 @@ public void testRunWhenThrowVeniceCheckSumFailException(boolean queueLeaderWrite PubSubTopicPartition pubSubTopicPartition2 = new PubSubTopicPartitionImpl(pubSubTopic, partition2); String kafkaUrl = "blah"; PubSubMessage cr1 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mockPosition, 0, 0); PubSubMessage cr2 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, -1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, mockPosition, 0, 0); Exception e = new VeniceChecksumException("test_exception"); doThrow(e).when(mockTask).processConsumerRecord(cr1, null, partition1, kafkaUrl, 0L); @@ -283,13 +300,13 @@ public void testPutConsumerRecord(boolean queueLeaderWrites) throws InterruptedE PubSubTopicPartition pubSubTopicPartition2 = new PubSubTopicPartitionImpl(pubSubTopic, partition2); String kafkaUrl = "blah"; PubSubMessage cr1 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, 0, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mock(PubSubPosition.class), 0, 0); PubSubMessage cr2 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, 0, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, mock(PubSubPosition.class), 0, 0); PubSubMessage cr3 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, 1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition1, mock(PubSubPosition.class), 0, 0); PubSubMessage cr4 = - new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, 1, 0, 0); + new ImmutablePubSubMessage<>(key, value, pubSubTopicPartition2, mock(PubSubPosition.class), 0, 0); doReturn(true).when(mockTask).isHybridMode(); bufferService.putConsumerRecord(cr1, mockTask, null, partition1, kafkaUrl, 0); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index a318084d28..6ff5b71c42 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -151,10 +151,12 @@ import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -387,6 +389,7 @@ public static Object[][] sortedInputAndAAConfigProvider() { private PubSubTopic pubSubTopic; private PubSubTopicPartition fooTopicPartition; private PubSubTopicPartition barTopicPartition; + private PubSubPosition mockedPubSubPosition; private Runnable runnableForKillNonCurrentVersion; @@ -500,6 +503,7 @@ public void methodSetUp() throws Exception { pubSubTopic = pubSubTopicRepository.getTopic(topic); fooTopicPartition = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO); barTopicPartition = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_BAR); + mockedPubSubPosition = mock(PubSubPosition.class); inMemoryLocalKafkaBroker = new InMemoryKafkaBroker("local"); inMemoryLocalKafkaBroker.createTopic(topic, PARTITION_COUNT); @@ -3713,7 +3717,7 @@ public void testGetAndUpdateLeaderCompletedState(HybridConfig hybridConfig, Node KafkaKey.HEART_BEAT, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), - 0, + mockedPubSubPosition, 0, 0, pubSubMessageHeaders); @@ -4305,7 +4309,7 @@ public void testProduceToStoreBufferService(AAConfig aaConfig) throws Exception kafkaKey, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), - 0, + mockedPubSubPosition, 0, 0); @@ -4379,8 +4383,13 @@ private void verifyStats( @Test public void testShouldPersistRecord() throws Exception { - PubSubMessage pubSubMessage = - new ImmutablePubSubMessage(null, null, new PubSubTopicPartitionImpl(pubSubTopic, 1), 0, 0, 0); + PubSubMessage pubSubMessage = new ImmutablePubSubMessage( + null, + null, + new PubSubTopicPartitionImpl(pubSubTopic, 1), + mockedPubSubPosition, + 0, + 0); runTest(Collections.singleton(PARTITION_FOO), () -> { assertFalse(storeIngestionTaskUnderTest.shouldPersistRecord(pubSubMessage, null)); @@ -4427,8 +4436,13 @@ public void testShouldPersistRecord() throws Exception { PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateSupplier.get(); PubSubTopic wrongTopic = pubSubTopicRepository.getTopic("blah_v1"); - PubSubMessage pubSubMessage2 = - new ImmutablePubSubMessage(null, null, new PubSubTopicPartitionImpl(wrongTopic, 1), 0, 0, 0); + PubSubMessage pubSubMessage2 = new ImmutablePubSubMessage( + null, + null, + new PubSubTopicPartitionImpl(wrongTopic, 1), + mockedPubSubPosition, + 0, + 0); when(partitionConsumptionState.getLeaderFollowerState()).thenReturn(STANDBY); assertFalse(storeIngestionTaskUnderTest.shouldPersistRecord(pubSubMessage2, partitionConsumptionState)); @@ -4852,7 +4866,7 @@ public void testSITRecordTransformer(AAConfig aaConfig) throws Exception { kafkaKey, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), - 0, + mockedPubSubPosition, 0, 0); @@ -4920,7 +4934,7 @@ public void testSITRecordTransformerUndefinedOutputValueClassAndSchema(AAConfig kafkaKey, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), - 0, + mockedPubSubPosition, 0, 0); @@ -4988,7 +5002,7 @@ public void testSITRecordTransformerError(AAConfig aaConfig) throws Exception { kafkaKey, kafkaMessageEnvelope, new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO), - 0, + mockedPubSubPosition, 0, 0); @@ -5214,6 +5228,7 @@ public void testCheckAndHandleUpstreamOffsetRewind() { // Benign rewind final long messageOffset = 10; + PubSubPosition messagePosition = ApacheKafkaOffsetPosition.of(messageOffset); KafkaKey key = new KafkaKey(MessageType.PUT, "test_key".getBytes()); KafkaMessageEnvelope messsageEnvelope = new KafkaMessageEnvelope(); LeaderMetadata leaderMetadata = new LeaderMetadata(); @@ -5233,7 +5248,7 @@ public void testCheckAndHandleUpstreamOffsetRewind() { new PubSubTopicPartitionImpl( new TestPubSubTopic("test_store_v1", "test_store", PubSubTopicType.VERSION_TOPIC), 1), - messageOffset, + messagePosition, -1, 1000); AbstractStorageEngine mockStorageEngine2 = mock(AbstractStorageEngine.class); @@ -5276,7 +5291,9 @@ public void testCheckAndHandleUpstreamOffsetRewind() { assertTrue( exception.getMessage().contains("Failing the job because lossy rewind happens before receiving EndOfPush.")); // Verify that the VT offset is also in the error message - assertTrue(exception.getMessage().contains("received message at offset: " + messageOffset)); + assertTrue( + exception.getMessage().contains("received message at offset: " + messagePosition), + "Actual message: " + exception.getMessage()); verify(mockStats2).recordPotentiallyLossyLeaderOffsetRewind(storeName, version); } @@ -5401,7 +5418,7 @@ public void testGetTopicManager() throws Exception { } @Test - public void testShouldProcessRecordForGlobalRtDivMessage() throws Exception { + public void testShouldProcessRecordForGlobalRtDivMessage() { // Set up the environment. StoreIngestionTaskFactory.Builder builder = mock(StoreIngestionTaskFactory.Builder.class); StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); @@ -5462,7 +5479,7 @@ public void testShouldProcessRecordForGlobalRtDivMessage() throws Exception { PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(versionTopic, PARTITION_FOO); PubSubTopicPartition rtPartition = new PubSubTopicPartitionImpl(rtTopic, PARTITION_FOO); PubSubMessage vtRecord = - new ImmutablePubSubMessage<>(key, value, versionTopicPartition, 1, 0, 0); + new ImmutablePubSubMessage<>(key, value, versionTopicPartition, ApacheKafkaOffsetPosition.of(1), 0, 0); PartitionConsumptionState pcs = mock(PartitionConsumptionState.class); when(pcs.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER); @@ -5482,7 +5499,7 @@ public void testShouldProcessRecordForGlobalRtDivMessage() throws Exception { doReturn(pubSubTopicRepository.getTopic(rtTopicName)).when(offsetRecord).getLeaderTopic(any()); PubSubMessage rtRecord = - new ImmutablePubSubMessage<>(key, value, rtPartition, 0, 0, 0); + new ImmutablePubSubMessage<>(key, value, rtPartition, ApacheKafkaOffsetPosition.of(0), 0, 0); assertFalse(ingestionTask.shouldProcessRecord(rtRecord), "RT DIV from RT should not be processed"); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/KafkaDataIntegrityValidatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/KafkaDataIntegrityValidatorTest.java index 86826e8910..0e433c04c6 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/KafkaDataIntegrityValidatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/KafkaDataIntegrityValidatorTest.java @@ -27,7 +27,9 @@ import com.linkedin.venice.pubsub.ImmutablePubSubMessage; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.serialization.KafkaKeySerializer; import com.linkedin.venice.utils.DataProviderUtils; @@ -339,6 +341,12 @@ private static PubSubMessage bui when(offsetRecord.getMaxMessageTimeInMs()).thenReturn(brokerTimestamp); } - return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, topicPartition, offset, brokerTimestamp, 0); + return new ImmutablePubSubMessage<>( + kafkaKey, + messageEnvelope, + topicPartition, + ApacheKafkaOffsetPosition.of(offset), + brokerTimestamp, + 0); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java index c428b15a34..33ac9ef10a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java @@ -18,7 +18,9 @@ import com.linkedin.venice.pubsub.ImmutablePubSubMessage; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.TestUtils; @@ -146,7 +148,7 @@ public void testSequenceNumber(boolean isVersionTopic) { getControlMessageKey(startOfSegmentMessage), startOfSegmentMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, controlMessageConsumerRecord, false, Lazy.FALSE); @@ -160,7 +162,7 @@ public void testSequenceNumber(boolean isVersionTopic) { firstMessageKey, firstMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, firstConsumerRecord, false, Lazy.FALSE); @@ -175,7 +177,7 @@ public void testSequenceNumber(boolean isVersionTopic) { secondMessageKey, secondMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); Assert.assertThrows( @@ -192,7 +194,7 @@ public void testSequenceNumber(boolean isVersionTopic) { thirdMessageKey, thirdMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); // It doesn't matter whether EOP is true/false. The result is same. @@ -208,7 +210,7 @@ public void testSequenceNumber(boolean isVersionTopic) { fourthMessageKey, fourthMessage, pubSubTopicPartition, - offset, + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, fourthConsumerRecord, false, Lazy.TRUE); @@ -242,7 +244,7 @@ public void testNewSegmentWithSeqNumberAfterEOP( firstMessageKey, firstMessage, pubSubTopicPartition, - offset, + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis(), 0); partitionTracker.validateMessage(type, firstConsumerRecord, endOfPushReceived, Lazy.FALSE); @@ -274,7 +276,7 @@ public void testSegmentNumber(boolean isVersionTopic) { getControlMessageKey(startOfSegmentMessage), startOfSegmentMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, controlMessageConsumerRecord, true, Lazy.FALSE); @@ -289,7 +291,7 @@ public void testSegmentNumber(boolean isVersionTopic) { firstMessageKey, firstMessage, pubSubTopicPartition, - offset, + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis() + 1000, 0); /** @@ -335,7 +337,7 @@ public void testDuplicateMsgsDetected(boolean isVersionTopic, CheckSumType check getControlMessageKey(startOfSegmentMessage), startOfSegmentMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, controlMessageConsumerRecord, true, Lazy.FALSE); @@ -351,7 +353,7 @@ public void testDuplicateMsgsDetected(boolean isVersionTopic, CheckSumType check getControlMessageKey(endOfSegmentMessage), endOfSegmentMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, controlMessageConsumerRecord, true, Lazy.TRUE); @@ -366,7 +368,7 @@ public void testDuplicateMsgsDetected(boolean isVersionTopic, CheckSumType check firstMessageKey, firstMessage, pubSubTopicPartition, - offset, + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis() + 1000, 0); Assert.assertThrows( @@ -402,7 +404,7 @@ public void testMidSegmentCheckSumStates(boolean isVersionTopic, CheckSumType ch getControlMessageKey(startOfSegmentMessage), startOfSegmentMessage, pubSubTopicPartition, - offset++, + ApacheKafkaOffsetPosition.of(offset++), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, controlMessageConsumerRecord, true, Lazy.FALSE); @@ -424,7 +426,7 @@ public void testMidSegmentCheckSumStates(boolean isVersionTopic, CheckSumType ch firstMessageKey, firstMessage, pubSubTopicPartition, - offset, + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis() + 1000, 0); partitionTracker.validateMessage(type, firstConsumerRecord, true, Lazy.TRUE); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java index a12f5e074f..4a882eb39c 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java @@ -29,8 +29,10 @@ import com.linkedin.venice.pubsub.ImmutablePubSubMessage; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.utils.Utils; import java.nio.ByteBuffer; @@ -105,8 +107,13 @@ private List> prep put.putValue = ByteBuffer.wrap(putValueBytes); put.replicationMetadataPayload = ByteBuffer.allocate(0); messageEnvelope.payloadUnion = put; - PubSubMessage pubSubMessage = - new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 0, 0, 20); + PubSubMessage pubSubMessage = new ImmutablePubSubMessage<>( + kafkaKey, + messageEnvelope, + pubSubTopicPartition, + ApacheKafkaOffsetPosition.of(0), + 0, + 20); pubSubMessageList.add(pubSubMessage); } return pubSubMessageList; @@ -155,10 +162,20 @@ public void testAdminToolConsumption() { messageEnvelope2.payloadUnion = delete; PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), assignedPartition); - PubSubMessage pubSubMessage1 = - new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 0, 0, 20); - PubSubMessage pubSubMessage2 = - new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope2, pubSubTopicPartition, 1, 0, 10); + PubSubMessage pubSubMessage1 = new ImmutablePubSubMessage<>( + kafkaKey, + messageEnvelope, + pubSubTopicPartition, + ApacheKafkaOffsetPosition.of(0), + 0, + 20); + PubSubMessage pubSubMessage2 = new ImmutablePubSubMessage<>( + kafkaKey, + messageEnvelope2, + pubSubTopicPartition, + ApacheKafkaOffsetPosition.of(1), + 0, + 10); KafkaKey kafkaControlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); EndOfPush endOfPush = new EndOfPush(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); @@ -172,8 +189,13 @@ public void testAdminToolConsumption() { kafkaMessageEnvelope.producerMetadata.messageSequenceNumber = 0; kafkaMessageEnvelope.producerMetadata.segmentNumber = 0; kafkaMessageEnvelope.producerMetadata.producerGUID = new GUID(); - PubSubMessage pubSubMessage3 = - new ImmutablePubSubMessage<>(kafkaControlMessageKey, kafkaMessageEnvelope, pubSubTopicPartition, 2, 0, 20); + PubSubMessage pubSubMessage3 = new ImmutablePubSubMessage<>( + kafkaControlMessageKey, + kafkaMessageEnvelope, + pubSubTopicPartition, + ApacheKafkaOffsetPosition.of(2), + 0, + 20); List> pubSubMessageList = new ArrayList<>(); pubSubMessageList.add(pubSubMessage1); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java index fea4dfdb76..d41619bd0f 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestKafkaTopicDumper.java @@ -2,7 +2,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -289,7 +288,7 @@ public void testTopicSwitchMessageLogging() { kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(120), + ApacheKafkaOffsetPosition.of(120), 0, 0, null); @@ -310,7 +309,7 @@ public void testTopicSwitchMessageLogging() { kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(120), + ApacheKafkaOffsetPosition.of(120), 0, 0, null); @@ -322,7 +321,7 @@ public void testTopicSwitchMessageLogging() { regularMsgKey, null, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(120), + ApacheKafkaOffsetPosition.of(120), 0, 0, null); @@ -453,9 +452,10 @@ private List> crea long startOffset) { List> messages = new ArrayList<>(); for (int i = 0; i < count; i++) { - PubSubMessage message = - mock(PubSubMessage.class, RETURNS_DEEP_STUBS); - when(message.getOffset().getNumericOffset()).thenReturn(startOffset + i); + PubSubMessage message = mock(PubSubMessage.class); + PubSubPosition pubSubPosition = mock(PubSubPosition.class); + when(pubSubPosition.getNumericOffset()).thenReturn(startOffset + i); + when(message.getOffset()).thenReturn(pubSubPosition); messages.add(message); } return messages; diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReaderTest.java index 69be74f62d..13db4d7839 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReaderTest.java @@ -21,8 +21,10 @@ import com.linkedin.venice.pubsub.ImmutablePubSubMessage; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; import com.linkedin.venice.utils.ByteUtils; @@ -74,7 +76,14 @@ public void testNext() throws IOException { put.putValue = ByteBuffer.wrap(valueBytes); put.replicationMetadataPayload = ByteBuffer.allocate(0); messageEnvelope.payloadUnion = put; - consumerRecordList.add(new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, i, -1, -1)); + consumerRecordList.add( + new ImmutablePubSubMessage<>( + kafkaKey, + messageEnvelope, + pubSubTopicPartition, + ApacheKafkaOffsetPosition.of(i), + -1, + -1)); } Map>> recordsMap = diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java index 3a34e27e8c..f729281fc2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java @@ -96,6 +96,6 @@ public String toString() { public int getHeapSize() { /** The {@link #topicPartition} is supposed to be a shared instance, and is therefore ignored. */ return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getObjectSize(key) - + InstanceSizeEstimator.getObjectSize(value); + + InstanceSizeEstimator.getObjectSize(value) + InstanceSizeEstimator.getObjectSize(pubSubPosition); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPosition.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPosition.java index 34e73b1eca..4f6bc8afb0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPosition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPosition.java @@ -3,6 +3,7 @@ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.io.ZeroCopyByteArrayOutputStream; +import com.linkedin.venice.memory.ClassSizeEstimator; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.pubsub.PubSubPositionType; import com.linkedin.venice.pubsub.api.PubSubPosition; @@ -19,6 +20,7 @@ public class ApacheKafkaOffsetPosition implements PubSubPosition { private static final ThreadLocal DECODER = new ThreadLocal<>(); private static final ThreadLocal ENCODER = new ThreadLocal<>(); + private final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(ApacheKafkaOffsetPosition.class); private final long offset; @@ -83,7 +85,7 @@ public long getOffset() { @Override public String toString() { - return "KafkaOffset: " + offset; + return String.format("KafkaOffset: %s", offset); } @Override @@ -110,7 +112,7 @@ public long getNumericOffset() { return offset; } - public static ApacheKafkaOffsetPosition getKafkaPosition(long offset) { + public static ApacheKafkaOffsetPosition of(long offset) { return new ApacheKafkaOffsetPosition(offset); } @@ -134,4 +136,9 @@ public PubSubPositionWireFormat getPositionWireFormat() { } return wireFormat; } + + @Override + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubPosition.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubPosition.java index 1591df406a..db917f01e2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubPosition.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubPosition.java @@ -2,13 +2,15 @@ import com.linkedin.venice.annotation.RestrictedApi; import com.linkedin.venice.annotation.UnderDevelopment; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.pubsub.PubSubPositionFactory; /** * Represents a position of a message in a partition of a topic. */ -public interface PubSubPosition { +public interface PubSubPosition extends Measurable { /** * A special position representing the earliest available message in a partition. All pub-sub adapters must support * this position, and all pub-sub client implementations should interpret it as the earliest retrievable message in @@ -16,6 +18,13 @@ public interface PubSubPosition { * in the underlying pub-sub system. */ PubSubPosition EARLIEST = new PubSubPosition() { + private final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(PubSubPosition.class); + + @Override + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD; + } + @Override public int comparePosition(PubSubPosition other) { throw new IllegalStateException("Cannot compare EARLIEST position"); @@ -59,6 +68,13 @@ public long getNumericOffset() { * in the underlying pub-sub system. */ PubSubPosition LATEST = new PubSubPosition() { + private final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(PubSubPosition.class); + + @Override + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD; + } + @Override public int comparePosition(PubSubPosition other) { throw new IllegalStateException("Cannot compare LATEST position"); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java index b7a02dfca6..831ce3e8e1 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java @@ -99,7 +99,7 @@ public void testInstanceMeasurement() { kafkaKeySupplier.get(), kmeSupplier.get(), pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0, 0); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubPositionFactoryTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubPositionFactoryTest.java index eaf30606a6..770d0e5aa9 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubPositionFactoryTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/PubSubPositionFactoryTest.java @@ -17,7 +17,7 @@ public class PubSubPositionFactoryTest { @Test public void testConvertToPositionForApacheKafkaPosition() { - ApacheKafkaOffsetPosition position = ApacheKafkaOffsetPosition.getKafkaPosition(123); + ApacheKafkaOffsetPosition position = ApacheKafkaOffsetPosition.of(123); PubSubPositionWireFormat wireFormat = position.getPositionWireFormat(); PubSubPosition position1 = PubSubPositionFactory.getPositionFromWireFormat(wireFormat); @@ -34,7 +34,7 @@ public void testConvertToPositionForUnsupportedPosition() { @Test public void testConvertToPositionFromWireFormatPositionBytes() { - ApacheKafkaOffsetPosition kafkaPosition = ApacheKafkaOffsetPosition.getKafkaPosition(567); + ApacheKafkaOffsetPosition kafkaPosition = ApacheKafkaOffsetPosition.of(567); PubSubPositionWireFormat kafkaPositionWireFormat = kafkaPosition.getPositionWireFormat(); InternalAvroSpecificSerializer wireFormatSerializer = AvroProtocolDefinition.PUBSUB_POSITION_WIRE_FORMAT.getSerializer(); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPositionTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPositionTest.java index 697d169798..e6724f3882 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPositionTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaOffsetPositionTest.java @@ -18,9 +18,9 @@ public class ApacheKafkaOffsetPositionTest { @Test public void testComparePosition() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); - ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.getKafkaPosition(2); - ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.getKafkaPosition(3); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); + ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.of(2); + ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.of(3); assertTrue(position1.comparePosition(position2) < 0); assertTrue(position2.comparePosition(position1) > 0); @@ -31,9 +31,9 @@ public void testComparePosition() { @Test public void testDiff() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); - ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.getKafkaPosition(2); - ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.getKafkaPosition(3); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); + ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.of(2); + ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.of(3); assertEquals(position1.diff(position2), -1); assertEquals(position2.diff(position1), 1); @@ -44,33 +44,33 @@ public void testDiff() { @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Cannot compare ApacheKafkaOffsetPosition with null") public void testComparePositionWithNull() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); position1.comparePosition(null); } @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Cannot compare ApacheKafkaOffsetPosition with null") public void testDiffWithNull() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); position1.diff(null); } @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Cannot compare ApacheKafkaOffsetPosition with .*") public void testComparePositionWithDifferentType() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); position1.comparePosition(mock(PubSubPosition.class)); } @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Cannot compare ApacheKafkaOffsetPosition with .*") public void testDiffWithDifferentType() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); position1.diff(mock(PubSubPosition.class)); } @Test public void testEquals() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); - ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.getKafkaPosition(2); - ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.getKafkaPosition(1); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); + ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.of(2); + ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.of(1); assertEquals(position1, position1); assertEquals(position1, position3); @@ -82,9 +82,9 @@ public void testEquals() { @Test public void testHashCode() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); - ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.getKafkaPosition(2); - ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.getKafkaPosition(1); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); + ApacheKafkaOffsetPosition position2 = ApacheKafkaOffsetPosition.of(2); + ApacheKafkaOffsetPosition position3 = ApacheKafkaOffsetPosition.of(1); assertEquals(position1.hashCode(), position1.hashCode()); assertEquals(position1.hashCode(), position3.hashCode()); assertNotEquals(position1.hashCode(), position2.hashCode()); @@ -92,13 +92,13 @@ public void testHashCode() { @Test public void testToString() { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1); - assertEquals(position1.toString(), "ApacheKafkaOffsetPosition{offset=1}"); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1); + assertEquals(position1.toString(), "KafkaOffset: 1"); } @Test public void testGetOffset() throws IOException { - ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.getKafkaPosition(1234); + ApacheKafkaOffsetPosition position1 = ApacheKafkaOffsetPosition.of(1234); assertEquals(position1.getOffset(), 1234); PubSubPositionWireFormat wireFormat = position1.getPositionWireFormat(); ApacheKafkaOffsetPosition position2 = new ApacheKafkaOffsetPosition(wireFormat.rawBytes); @@ -113,7 +113,7 @@ public void testGetOffsetWithNull() throws IOException { @Test public void testGetPositionWireFormat() throws IOException { - ApacheKafkaOffsetPosition kafkaPosition = ApacheKafkaOffsetPosition.getKafkaPosition(Long.MAX_VALUE); + ApacheKafkaOffsetPosition kafkaPosition = ApacheKafkaOffsetPosition.of(Long.MAX_VALUE); PubSubPositionWireFormat wireFormat = kafkaPosition.getPositionWireFormat(); assertEquals(wireFormat.type, PubSubPositionType.APACHE_KAFKA_OFFSET); ApacheKafkaOffsetPosition kafkaPosition2 = new ApacheKafkaOffsetPosition(wireFormat.rawBytes); @@ -123,6 +123,6 @@ public void testGetPositionWireFormat() throws IOException { @Test(expectedExceptions = IllegalArgumentException.class) void testIllegalPosition() { - ApacheKafkaOffsetPosition.getKafkaPosition(OffsetRecord.LOWEST_OFFSET - 1); + ApacheKafkaOffsetPosition.of(OffsetRecord.LOWEST_OFFSET - 1); } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java index b6836bc0b4..e72ec017b8 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java @@ -168,7 +168,7 @@ public void testSubscribeWithInvalidPubSubPositionType() { @Test public void testSubscribeWithApacheKafkaOffsetPosition() { - ApacheKafkaOffsetPosition offsetPosition = ApacheKafkaOffsetPosition.getKafkaPosition(50); + ApacheKafkaOffsetPosition offsetPosition = ApacheKafkaOffsetPosition.of(50); when(internalKafkaConsumer.assignment()).thenReturn(Collections.emptySet()); kafkaConsumerAdapter.subscribe(pubSubTopicPartition, offsetPosition); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java index 2d28a4016e..43d5e77bda 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/manager/TopicMetadataFetcherTest.java @@ -398,7 +398,7 @@ private PubSubMessage getHeartBeatPubSubMessage(PubSubTopicPartition topicPartit key, val, topicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(offset), + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis(), 512); } @@ -414,7 +414,7 @@ private PubSubMessage getPubSubMessage(PubSubTopicPartition topicPartition, bool key, val, topicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(offset), + ApacheKafkaOffsetPosition.of(offset), System.currentTimeMillis(), 512); } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java index 382264e4db..d45bc8c992 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java @@ -63,7 +63,7 @@ public void testGetDictionary() { controlMessageKey, sopWithDictionaryValue, topicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0L, 0); doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary))) @@ -99,7 +99,7 @@ public void testGetDictionaryReturnsNullWhenNoDictionary() { controlMessageKey, sopWithDictionaryValue, topicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0L, 0); doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary))) @@ -133,7 +133,7 @@ public void testGetDictionaryReturnsNullWhenNoSOP() { dataMessageKey, putMessageValue, topicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0L, 0); doReturn(Collections.singletonMap(topicPartition, Collections.singletonList(sopWithDictionary))) @@ -172,7 +172,7 @@ public void testGetDictionaryWaitsTillTopicHasRecords() { controlMessageKey, sopWithDictionaryValue, topicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(0), + ApacheKafkaOffsetPosition.of(0), 0L, 0); doReturn(Collections.emptyMap()) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java index 9c4775df54..69abd0ee40 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java @@ -667,7 +667,7 @@ public void testAAIngestionWithStoreView() throws Exception { polledChangeEvents.clear(); // Seek to a bogus checkpoint - PubSubPosition badPubSubPosition = ApacheKafkaOffsetPosition.getKafkaPosition(1337L); + PubSubPosition badPubSubPosition = ApacheKafkaOffsetPosition.of(1337L); VeniceChangeCoordinate badCoordinate = new MockVeniceChangeCoordinate(storeName + "_v777777", badPubSubPosition, 0); Set badCheckpointSet = new HashSet<>(); badCheckpointSet.add(badCoordinate); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java index bb204b07fe..075b3f9f1f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java @@ -1024,9 +1024,10 @@ public void testNonAARewind() throws Exception { veniceProducer.stop(); } try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) { - VersionCreationResponse response = parentControllerClient.emptyPush(storeName, "test_push_id_v2", 1000); + VersionCreationResponse response = + TestUtils.assertCommand(parentControllerClient.emptyPush(storeName, "test_push_id_v2", 1000)); TestUtils.waitForNonDeterministicPushCompletion( - Version.composeKafkaTopic(storeName, 2), + response.getKafkaTopic(), parentControllerClient, 30, TimeUnit.SECONDS); diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/unit/kafka/consumer/poll/AbstractPollStrategy.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/unit/kafka/consumer/poll/AbstractPollStrategy.java index 1f6800f321..efde583c98 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/unit/kafka/consumer/poll/AbstractPollStrategy.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/unit/kafka/consumer/poll/AbstractPollStrategy.java @@ -91,7 +91,7 @@ public synchronized Map crea kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(newOffset), + ApacheKafkaOffsetPosition.of(newOffset), 0, 20); } @@ -131,7 +131,7 @@ public static PubSubMessage crea kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(newOffset), + ApacheKafkaOffsetPosition.of(newOffset), 0, 20); } @@ -165,7 +165,7 @@ public static PubSubMessage crea kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(1), + ApacheKafkaOffsetPosition.of(1), 0, 20); } @@ -191,7 +191,7 @@ public static PubSubMessage crea kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(1), + ApacheKafkaOffsetPosition.of(1), 0, serializedValue.length); } @@ -213,7 +213,7 @@ public static PubSubMessage crea kafkaKey, messageEnvelope, pubSubTopicPartition, - ApacheKafkaOffsetPosition.getKafkaPosition(1), + ApacheKafkaOffsetPosition.of(1), 0, serializedValue.length); }