Skip to content

Commit 3f74371

Browse files
committed
Add get_watermark_offsets
1 parent 52546d0 commit 3f74371

File tree

4 files changed

+60
-0
lines changed

4 files changed

+60
-0
lines changed

src/consumer/base_consumer.rs

+21
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,27 @@ where
656656
}
657657
}
658658

659+
fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)> {
660+
let mut low = -1;
661+
let mut high = -1;
662+
let topic_c = CString::new(topic.to_string())?;
663+
let result = unsafe {
664+
rdsys::rd_kafka_get_watermark_offsets(
665+
self.client.native_ptr(),
666+
topic_c.as_ptr(),
667+
partition,
668+
&mut low as *mut i64,
669+
&mut high as *mut i64,
670+
)
671+
};
672+
673+
if result.is_error() {
674+
Err(KafkaError::MetadataFetch(result.into()))
675+
} else {
676+
Ok((low, high))
677+
}
678+
}
679+
659680
fn position(&self) -> KafkaResult<TopicPartitionList> {
660681
let tpl = self.assignment()?;
661682
let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };

src/consumer/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,15 @@ where
369369
T: Into<Timeout>,
370370
Self: Sized;
371371

372+
/// Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
373+
///
374+
/// The low offset is updated periodically (if statistics.interval.ms is set) while the
375+
/// high offset is updated on each fetched message set from the broker.
376+
///
377+
/// If there is no cached offset (either low or high, or both) then OFFSET_INVALID will
378+
/// be returned for the respective offset.
379+
fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)>;
380+
372381
/// Retrieve current positions (offsets) for topics and partitions.
373382
fn position(&self) -> KafkaResult<TopicPartitionList>;
374383

src/consumer/stream_consumer.rs

+4
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,10 @@ where
493493
self.base.offsets_for_times(timestamps, timeout)
494494
}
495495

496+
fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)> {
497+
self.base.get_watermark_offsets(topic, partition)
498+
}
499+
496500
fn position(&self) -> KafkaResult<TopicPartitionList> {
497501
self.base.position()
498502
}

tests/test_high_consumers.rs

+26
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,19 @@ async fn test_consumer_commit_message() {
314314
(0, 12)
315315
);
316316

317+
assert_eq!(
318+
consumer.get_watermark_offsets(&topic_name, 0).unwrap(),
319+
(0, 10)
320+
);
321+
assert_eq!(
322+
consumer.get_watermark_offsets(&topic_name, 1).unwrap(),
323+
(0, 11)
324+
);
325+
assert_eq!(
326+
consumer.get_watermark_offsets(&topic_name, 2).unwrap(),
327+
(0, 12)
328+
);
329+
317330
let mut assignment = TopicPartitionList::new();
318331
assignment
319332
.add_partition_offset(&topic_name, 0, Offset::Stored)
@@ -399,6 +412,19 @@ async fn test_consumer_store_offset_commit() {
399412
(0, 12)
400413
);
401414

415+
assert_eq!(
416+
consumer.get_watermark_offsets(&topic_name, 0).unwrap(),
417+
(0, 10)
418+
);
419+
assert_eq!(
420+
consumer.get_watermark_offsets(&topic_name, 1).unwrap(),
421+
(0, 11)
422+
);
423+
assert_eq!(
424+
consumer.get_watermark_offsets(&topic_name, 2).unwrap(),
425+
(0, 12)
426+
);
427+
402428
let mut assignment = TopicPartitionList::new();
403429
assignment
404430
.add_partition_offset(&topic_name, 0, Offset::Stored)

0 commit comments

Comments
 (0)