From 2456cebc68def4ec8c25f2420c096bc3740b8c0f Mon Sep 17 00:00:00 2001
From: Sushant Mane
Date: Mon, 24 Feb 2025 11:57:45 -0800
Subject: [PATCH] [server][dvc][vpj][controller] Add getPositionByTimestamp API
 in PubSubConsumerAdapter (#1540)

Introduced new default methods `getPositionByTimestamp` in
`PubSubConsumerAdapter` that resolve the position of the first message with a
timestamp at or after a given timestamp, returning a `PubSubPosition` object.
These are intended to supersede the `offsetForTime` API, which returns a raw
numeric offset.
---
 .../consumer/ApacheKafkaConsumerAdapter.java  |  31 +++-
 .../pubsub/api/PubSubConsumerAdapter.java     |  38 +++++
 .../ApacheKafkaConsumerAdapterTest.java       | 139 ++++++++++++++++++
 3 files changed, 203 insertions(+), 5 deletions(-)

diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java
index 1e12cafc6a..aadb7ffdb1 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java
@@ -344,7 +344,7 @@ public boolean hasAnySubscription() {
 
   @Override
   public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition) {
-    pubSubTopicPartition = Objects.requireNonNull(pubSubTopicPartition, "PubSubTopicPartition cannot be null");
+    Objects.requireNonNull(pubSubTopicPartition, "PubSubTopicPartition cannot be null");
     String topic = pubSubTopicPartition.getPubSubTopic().getName();
     int partition = pubSubTopicPartition.getPartitionNumber();
     TopicPartition tp = new TopicPartition(topic, partition);
@@ -431,8 +431,8 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timest
         new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
     Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetMap =
         this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), timeout);
-    if (topicPartitionOffsetMap.isEmpty()) {
-      return -1L;
+    if (topicPartitionOffsetMap == null || topicPartitionOffsetMap.isEmpty()) {
+      return null;
     }
     OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);
     if (offsetAndTimestamp == null) {
@@ -460,8 +460,8 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timest
         pubSubTopicPartition.getPartitionNumber());
     Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetMap =
         this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
-    if (topicPartitionOffsetMap.isEmpty()) {
-      return -1L;
+    if (topicPartitionOffsetMap == null || topicPartitionOffsetMap.isEmpty()) {
+      return null;
     }
     OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);
     if (offsetAndTimestamp == null) {
@@ -479,6 +479,27 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timest
     }
   }
 
+  @Override
+  public PubSubPosition getPositionByTimestamp(
+      PubSubTopicPartition pubSubTopicPartition,
+      long timestamp,
+      Duration timeout) {
+    Long offset = offsetForTime(pubSubTopicPartition, timestamp, timeout);
+    if (offset == null) {
+      return null;
+    }
+    return new ApacheKafkaOffsetPosition(offset);
+  }
+
+  @Override
+  public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
+    Long offset = offsetForTime(pubSubTopicPartition, timestamp);
+    if (offset == null) {
+      return null;
+    }
+    return new ApacheKafkaOffsetPosition(offset);
+  }
+
   @Override
   public Long beginningOffset(PubSubTopicPartition pubSubTopicPartition, Duration timeout) {
     TopicPartition kafkaTp =
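A minimal caller-side sketch of the new adapter API (the topic name, the
timestamp math, and the `consumerAdapter` handle are illustrative, not part of
this patch). The point to note is the null contract: absence of a matching
message is signaled with `null`, not with the old `-1L` sentinel:

    // Find where to start consuming: the first message at or after "one hour ago".
    PubSubTopicPartition partition =
        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test_topic"), 0);
    long targetTs = System.currentTimeMillis() - Duration.ofHours(1).toMillis();
    PubSubPosition position =
        consumerAdapter.getPositionByTimestamp(partition, targetTs, Duration.ofSeconds(5));
    if (position == null) {
      // No message with timestamp >= targetTs exists in this partition;
      // the caller picks a fallback, e.g. seek to the end position instead.
    }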
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubConsumerAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubConsumerAdapter.java
index c8b5993ab1..876e632932 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubConsumerAdapter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubConsumerAdapter.java
@@ -178,6 +178,27 @@ default long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
    */
   Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout);
 
+  /**
+   * Retrieves the position of the first message with a timestamp greater than or equal to the target
+   * timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
+   * will be returned for the partition.
+   *
+   * @param pubSubTopicPartition The PubSub topic-partition for which to fetch the position.
+   * @param timestamp The target timestamp to search for in milliseconds since the Unix epoch.
+   * @param timeout The maximum duration to wait for the operation to complete.
+   * @return The position of the first message with a timestamp greater than or equal to the target timestamp,
+   *         or {@code null} if no such message is found for the partition.
+   * @throws PubSubOpTimeoutException If the operation times out while fetching the position.
+   * @throws PubSubClientException If there is an error while attempting to fetch the position.
+   */
+  @UnderDevelopment("Under development and may change in the future.")
+  default PubSubPosition getPositionByTimestamp(
+      PubSubTopicPartition pubSubTopicPartition,
+      long timestamp,
+      Duration timeout) {
+    throw new UnsupportedOperationException("getPositionByTimestamp is not supported");
+  }
+
   /**
    * Retrieves the offset of the first message with a timestamp greater than or equal to the target
    * timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
    * will be returned for the partition.
    *
    * @param pubSubTopicPartition The PubSub topic-partition for which to fetch the offset.
    * @param timestamp The target timestamp to search for in milliseconds since the Unix epoch.
    * @return The offset of the first message with a timestamp greater than or equal to the target timestamp,
    *         or {@code null} if no such message is found for the partition.
    * @throws PubSubOpTimeoutException If the operation times out while fetching the offset.
    * @throws PubSubClientException If there is an error while attempting to fetch the offset.
    */
@@ -192,6 +213,23 @@ default long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
    */
   Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp);
 
+  /**
+   * Retrieves the position of the first message with a timestamp greater than or equal to the target
+   * timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
+   * will be returned for the partition.
+   *
+   * @param pubSubTopicPartition The PubSub topic-partition for which to fetch the position.
+   * @param timestamp The target timestamp to search for in milliseconds since the Unix epoch.
+   * @return The position of the first message with a timestamp greater than or equal to the target timestamp,
+   *         or {@code null} if no such message is found for the partition.
+   * @throws PubSubOpTimeoutException If the operation times out while fetching the position.
+   * @throws PubSubClientException If there is an error while attempting to fetch the position.
+   */
+  @UnderDevelopment("Under development and may change in the future.")
+  default PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
+    throw new UnsupportedOperationException("getPositionByTimestamp is not supported");
+  }
+
   /**
    * Retrieves the beginning offset for the specified PubSub topic-partition.
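Since the interface methods are defaults that throw
`UnsupportedOperationException`, each adapter opts in by overriding them. A
rough sketch of such an override for an adapter that already implements
`offsetForTime`, mirroring the Kafka adapter above (`SomeAdapterPosition` is a
hypothetical stand-in for that adapter's own `PubSubPosition` type, not a real
class):

    @Override
    public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
      // Reuse the existing timestamp-to-offset lookup...
      Long offset = offsetForTime(pubSubTopicPartition, timestamp);
      // ...and preserve the documented contract: null means no message at or after the timestamp.
      return offset == null ? null : new SomeAdapterPosition(offset); // SomeAdapterPosition is hypothetical
    }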
   *
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java
index 8d7f17b801..0d9069bae6 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java
@@ -61,6 +61,7 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -576,4 +577,142 @@ public void testEndPosition() {
     position = kafkaConsumerAdapter.endPosition(pubSubTopicPartition);
     assertEquals(position, PubSubPosition.LATEST);
   }
+
+  @Test
+  public void testOffsetForTimeWithTimeoutSuccess() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    Long expectedOffset = 500L;
+    OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(expectedOffset, timestamp);
+    Map<TopicPartition, OffsetAndTimestamp> mockResponse =
+        Collections.singletonMap(topicPartition, offsetAndTimestamp);
+
+    when(
+        internalKafkaConsumer
+            .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
+                .thenReturn(mockResponse);
+
+    Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+    assertEquals(actualOffset, expectedOffset);
+  }
+
+  @Test
+  public void testOffsetForTimeWithTimeoutReturnsNull() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.emptyMap();
+
+    when(
+        internalKafkaConsumer
+            .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
+                .thenReturn(mockResponse);
+
+    Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+    assertNull(actualOffset);
+  }
+
+  @Test(expectedExceptions = PubSubOpTimeoutException.class)
+  public void testOffsetForTimeWithTimeoutThrowsTimeoutException() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+
+    when(
+        internalKafkaConsumer
+            .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
+                .thenThrow(new TimeoutException("Test timeout"));
+
+    kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+  }
+
+  @Test
+  public void testOffsetForTimeWithoutTimeoutSuccess() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    Long expectedOffset = 500L;
+    OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(expectedOffset, timestamp);
+    Map<TopicPartition, OffsetAndTimestamp> mockResponse =
+        Collections.singletonMap(topicPartition, offsetAndTimestamp);
+
+    when(internalKafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)))
+        .thenReturn(mockResponse);
+
+    Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp);
+    assertEquals(actualOffset, expectedOffset);
+  }
+
+  @Test
+  public void testGetPositionByTimestampWithTimeout() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    long expectedOffset = 500L;
+
+    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResponse =
+        Collections.singletonMap(topicPartition, new OffsetAndTimestamp(expectedOffset, timestamp));
+
+    doReturn(offsetsForTimesResponse).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500));
+
+    PubSubPosition position =
+        kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
+    assertNotNull(position);
+    assertTrue(position instanceof ApacheKafkaOffsetPosition);
+    assertEquals(((ApacheKafkaOffsetPosition) position).getOffset(), expectedOffset);
+  }
+
+  @Test
+  public void testGetPositionByTimestampWithoutTimeout() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+    long expectedOffset = 500L;
+    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResponse =
+        Collections.singletonMap(topicPartition, new OffsetAndTimestamp(expectedOffset, timestamp));
+
+    doReturn(offsetsForTimesResponse).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
+
+    PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp);
+    assertNotNull(position);
+    assertTrue(position instanceof ApacheKafkaOffsetPosition);
+    assertEquals(((ApacheKafkaOffsetPosition) position).getOffset(), expectedOffset);
+  }
+
+  @Test
+  public void testGetPositionByTimestampReturnsNull() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+
+    doReturn(Collections.emptyMap()).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
+
+    PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp);
+    assertNull(position);
+  }
+
+  @Test
+  public void testGetPositionByTimestampThrowsException() {
+    PubSubTopicPartition pubSubTopicPartition =
+        new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
+    TopicPartition topicPartition =
+        new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
+    long timestamp = 1000000L;
+
+    doThrow(new RuntimeException("Simulate exception")).when(internalKafkaConsumer)
+        .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
+
+    Exception e = expectThrows(
+        PubSubClientException.class,
+        () -> kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp));
+    assertTrue(e.getMessage().contains("Failed to fetch offset for time"), "Actual message: " + e.getMessage());
+  }
 }
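Because `offsetForTime` now returns `null` instead of the `-1L` sentinel when
no matching message exists, call sites that compare against the sentinel need
a corresponding update: a `Long` that is `null` would throw a
NullPointerException when unboxed by an `== -1L` comparison. A before/after
sketch of a caller (illustrative, not from this patch):

    // Before this patch: absence was signaled with a -1L sentinel.
    Long offset = consumerAdapter.offsetForTime(partition, targetTs);
    if (offset == -1L) { // the Long unboxes for this comparison
      // no message at or after targetTs
    }

    // After this patch: absence is signaled with null.
    Long offset = consumerAdapter.offsetForTime(partition, targetTs);
    if (offset == null) {
      // no message at or after targetTs; or migrate to getPositionByTimestamp
    }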