Skip to content

Commit 978c964

Browse files
committed
Fix op timeout computation logic on poll_queue
1 parent 3b13940 commit 978c964

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

src/consumer/base_consumer.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::mem::ManuallyDrop;
55
use std::os::raw::c_void;
66
use std::ptr;
77
use std::sync::Arc;
8-
use std::time::Duration;
8+
use std::time::{Duration, Instant};
99

1010
use log::{error, warn};
1111
use rdkafka_sys as rdsys;
@@ -123,6 +123,7 @@ where
123123
queue: &NativeQueue,
124124
timeout: T,
125125
) -> Option<KafkaResult<BorrowedMessage<'_>>> {
126+
let now = Instant::now();
126127
let mut timeout = timeout.into();
127128
let min_poll_interval = self.context().main_queue_min_poll_interval();
128129
loop {
@@ -158,10 +159,10 @@ where
158159
}
159160
}
160161

161-
if op_timeout >= timeout {
162+
timeout = timeout.saturating_sub(now.elapsed());
163+
if timeout.is_zero() {
162164
return None;
163165
}
164-
timeout -= op_timeout;
165166
}
166167
}
167168

src/util.rs

+16
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,22 @@ impl Timeout {
4848
Timeout::Never => -1,
4949
}
5050
}
51+
52+
/// Saturating `Duration` subtraction to Timeout.
53+
pub(crate) fn saturating_sub(&self, rhs: Duration) -> Timeout {
54+
match (self, rhs) {
55+
(Timeout::After(lhs), rhs) => Timeout::After(lhs.saturating_sub(rhs)),
56+
(Timeout::Never, _) => Timeout::Never,
57+
}
58+
}
59+
60+
/// Returns `true` if the timeout is zero.
61+
pub(crate) fn is_zero(&self) -> bool {
62+
match self {
63+
Timeout::After(d) => d.is_zero(),
64+
Timeout::Never => false,
65+
}
66+
}
5167
}
5268

5369
impl std::ops::SubAssign for Timeout {

0 commit comments

Comments
 (0)