Commit 2456ceb

[server][dvc][vpj][controller] Add getPositionByTimestamp API in PubSubConsumerAdapter (#1540)
Introduced new default `getPositionByTimestamp` methods in `PubSubConsumerAdapter`
to fetch offsets based on timestamps, returning a `PubSubPosition` object.
These methods replace the `offsetForTime` API, which returned a numeric offset.
sushantmane authored Feb 24, 2025
1 parent 2f3a731 commit 2456ceb
Showing 3 changed files with 203 additions and 5 deletions.
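By way of illustration, a minimal caller-side sketch of the new API (the `adapter` and `partition` variables are hypothetical stand-ins for an existing `PubSubConsumerAdapter` and `PubSubTopicPartition`; the enclosing class is elided):

import java.time.Duration;

// Look up the position of the first message whose timestamp is >= the target.
// Unlike offsetForTime, which returns a raw Long offset, getPositionByTimestamp
// returns an opaque PubSubPosition; both now return null when no such message exists.
PubSubPosition position = adapter.getPositionByTimestamp(
    partition, System.currentTimeMillis() - 3_600_000L, Duration.ofSeconds(5));
if (position == null) {
  // No message at or after the target timestamp in this partition.
}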
ApacheKafkaConsumerAdapter.java
@@ -344,7 +344,7 @@ public boolean hasAnySubscription() {

@Override
public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition) {
- pubSubTopicPartition = Objects.requireNonNull(pubSubTopicPartition, "PubSubTopicPartition cannot be null");
+ Objects.requireNonNull(pubSubTopicPartition, "PubSubTopicPartition cannot be null");
String topic = pubSubTopicPartition.getPubSubTopic().getName();
int partition = pubSubTopicPartition.getPartitionNumber();
TopicPartition tp = new TopicPartition(topic, partition);
@@ -431,8 +431,8 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) {
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetMap =
this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), timeout);
- if (topicPartitionOffsetMap.isEmpty()) {
-   return -1L;
+ if (topicPartitionOffsetMap == null || topicPartitionOffsetMap.isEmpty()) {
+   return null;
}
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);
if (offsetAndTimestamp == null) {
@@ -460,8 +460,8 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
pubSubTopicPartition.getPartitionNumber());
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetMap =
this.kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
- if (topicPartitionOffsetMap.isEmpty()) {
-   return -1L;
+ if (topicPartitionOffsetMap == null || topicPartitionOffsetMap.isEmpty()) {
+   return null;
}
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);
if (offsetAndTimestamp == null) {
@@ -479,6 +479,27 @@ public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
}
}

@Override
public PubSubPosition getPositionByTimestamp(
PubSubTopicPartition pubSubTopicPartition,
long timestamp,
Duration timeout) {
Long offset = offsetForTime(pubSubTopicPartition, timestamp, timeout);
if (offset == null) {
return null;
}
return new ApacheKafkaOffsetPosition(offset);
}

@Override
public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
Long offset = offsetForTime(pubSubTopicPartition, timestamp);
if (offset == null) {
return null;
}
return new ApacheKafkaOffsetPosition(offset);
}

@Override
public Long beginningOffset(PubSubTopicPartition pubSubTopicPartition, Duration timeout) {
TopicPartition kafkaTp =
PubSubConsumerAdapter.java
@@ -178,6 +178,27 @@ default long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
*/
Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout);

/**
* Retrieves the offset of the first message with a timestamp greater than or equal to the target
* timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
* will be returned for the partition.
*
* @param pubSubTopicPartition The PubSub topic-partition for which to fetch the offset.
* @param timestamp The target timestamp to search for in milliseconds since the Unix epoch.
* @param timeout The maximum duration to wait for the operation to complete.
* @return The offset of the first message with a timestamp greater than or equal to the target timestamp,
* or {@code null} if no such message is found for the partition.
* @throws PubSubOpTimeoutException If the operation times out while fetching the offset.
* @throws PubSubClientException If there is an error while attempting to fetch the offset.
*/
@UnderDevelopment("Under development and may change in the future.")
default PubSubPosition getPositionByTimestamp(
PubSubTopicPartition pubSubTopicPartition,
long timestamp,
Duration timeout) {
throw new UnsupportedOperationException("getPositionByTimestamp is not supported");
}
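As a hedged usage sketch of the contract documented above, a caller might handle the null result and both documented exceptions along these lines (`adapter`, `partition`, and `targetTimestampMs` are illustrative names, not part of this change):

try {
  PubSubPosition position = adapter.getPositionByTimestamp(partition, targetTimestampMs, Duration.ofSeconds(5));
  if (position == null) {
    // No message with a timestamp >= targetTimestampMs exists in the partition.
  }
} catch (PubSubOpTimeoutException e) {
  // The lookup did not complete within the timeout; retry or propagate.
} catch (PubSubClientException e) {
  // Any other client-side failure while fetching the offset.
}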

/**
* Retrieves the offset of the first message with a timestamp greater than or equal to the target
* timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
@@ -192,6 +213,23 @@ default long getLatestOffset(PubSubTopicPartition pubSubTopicPartition) {
*/
Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp);

/**
* Retrieves the offset of the first message with a timestamp greater than or equal to the target
* timestamp for the specified PubSub topic-partition. If no such message is found, {@code null}
* will be returned for the partition.
*
* @param pubSubTopicPartition The PubSub topic-partition for which to fetch the offset.
* @param timestamp The target timestamp to search for in milliseconds since the Unix epoch.
* @return The offset of the first message with a timestamp greater than or equal to the target timestamp,
* or {@code null} if no such message is found for the partition.
* @throws PubSubOpTimeoutException If the operation times out while fetching the offset.
* @throws PubSubClientException If there is an error while attempting to fetch the offset.
*/
@UnderDevelopment("Under development and may change in the future.")
default PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
throw new UnsupportedOperationException("getPositionByTimestamp is not supported");
}
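For implementers, a sketch of how a non-Kafka adapter might override this default, mirroring the Kafka adapter above (the `resolveOffsetForTimestamp` helper and `MyBrokerPosition` type are hypothetical):

@Override
public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) {
  // Hypothetical broker-specific lookup; returns null when no message has a timestamp >= target.
  Long offset = resolveOffsetForTimestamp(pubSubTopicPartition, timestamp);
  return offset == null ? null : new MyBrokerPosition(offset);
}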

/**
* Retrieves the beginning offset for the specified PubSub topic-partition.
*
ApacheKafkaConsumerAdapterTest.java
@@ -61,6 +61,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -576,4 +577,142 @@ public void testEndPosition() {
position = kafkaConsumerAdapter.endPosition(pubSubTopicPartition);
assertEquals(position, PubSubPosition.LATEST);
}

@Test
public void testOffsetForTimeWithTimeoutSuccess() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;
Long expectedOffset = 500L;
OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(expectedOffset, timestamp);
Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.singletonMap(topicPartition, offsetAndTimestamp);

when(
internalKafkaConsumer
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
.thenReturn(mockResponse);

Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
assertEquals(actualOffset, expectedOffset);
}

@Test
public void testOffsetForTimeWithTimeoutReturnsNull() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;
Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.emptyMap();

when(
internalKafkaConsumer
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
.thenReturn(mockResponse);

Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
assertNull(actualOffset);
}

@Test(expectedExceptions = PubSubOpTimeoutException.class)
public void testOffsetForTimeWithTimeoutThrowsTimeoutException() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;

when(
internalKafkaConsumer
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500)))
.thenThrow(new TimeoutException("Test timeout"));

kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
}

@Test
public void testOffsetForTimeWithoutTimeoutSuccess() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;
Long expectedOffset = 500L;
OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(expectedOffset, timestamp);
Map<TopicPartition, OffsetAndTimestamp> mockResponse = Collections.singletonMap(topicPartition, offsetAndTimestamp);

when(internalKafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)))
.thenReturn(mockResponse);

Long actualOffset = kafkaConsumerAdapter.offsetForTime(pubSubTopicPartition, timestamp);
assertEquals(actualOffset, expectedOffset);
}

@Test
public void testGetPositionByTimestampWithTimeout() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;
long expectedOffset = 500L;

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResponse =
Collections.singletonMap(topicPartition, new OffsetAndTimestamp(expectedOffset, timestamp));

doReturn(offsetsForTimesResponse).when(internalKafkaConsumer)
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp), Duration.ofMillis(500));

PubSubPosition position =
kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp, Duration.ofMillis(500));
assertNotNull(position);
assertTrue(position instanceof ApacheKafkaOffsetPosition);
assertEquals(((ApacheKafkaOffsetPosition) position).getOffset(), expectedOffset);
}

@Test
public void testGetPositionByTimestampWithoutTimeout() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;
long expectedOffset = 500L;
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimesResponse =
Collections.singletonMap(topicPartition, new OffsetAndTimestamp(expectedOffset, timestamp));

doReturn(offsetsForTimesResponse).when(internalKafkaConsumer)
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));

PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp);
assertNotNull(position);
assertTrue(position instanceof ApacheKafkaOffsetPosition);
assertEquals(((ApacheKafkaOffsetPosition) position).getOffset(), expectedOffset);
}

@Test
public void testGetPositionByTimestampReturnsNull() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;

doReturn(Collections.emptyMap()).when(internalKafkaConsumer)
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));

PubSubPosition position = kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp);
assertNull(position);
}

@Test
public void testGetPositionByTimestampThrowsException() {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("test"), 0);
TopicPartition topicPartition =
new TopicPartition(pubSubTopicPartition.getTopicName(), pubSubTopicPartition.getPartitionNumber());
long timestamp = 1000000L;

doThrow(new RuntimeException("Simulate exception")).when(internalKafkaConsumer)
.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));

Exception e = expectThrows(
PubSubClientException.class,
() -> kafkaConsumerAdapter.getPositionByTimestamp(pubSubTopicPartition, timestamp));
assertTrue(e.getMessage().contains("Failed to fetch offset for time"), "Actual message: " + e.getMessage());
}
}
