Commit 0b35424
Create a new Delivery struct that includes Timestamp (fede1024#555)
* Add `Timestamp` to the `OwnedDeliveryResult` tuple. **Note:** This is an API-breaking change. Currently the `Timestamp` value is dropped as part of the conversion from `BorrowedMessage` to `OwnedDeliveryResult`; however, this value is updated once the message has been published, and callers should have access to the updated value. As far as I can tell, this is the **only** other value that is updated as part of publishing. An alternative to expanding the tuple would be to replace it with a struct. This is a bit more disruptive now, but would make adding values in the future a backwards-compatible change, so it depends on how likely it is that there will be further new values. This could also be done in a way that isn't a breaking change, but that would involve duplicating functions and copying some types, i.e. `DeliveryFuture`. That level of complexity doesn't seem justified given the pre-1.0 status.
* Switch from a tuple to a struct in `OwnedDeliveryResult`.
* Remove an unused import.
* Rename the struct.
* Add a breaking-change notice to the changelog.
* Respect existing line lengths.
1 parent a1be711 commit 0b35424
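
For callers, the practical effect of this change is that the future returned by `FutureProducer::send` now resolves to a `Delivery` struct rather than a `(partition, offset)` tuple. A minimal migration sketch, assuming a producer has already been built; the topic name, key, and payload are placeholders rather than anything from this commit:

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

async fn send_one(producer: &FutureProducer) {
    // Hypothetical record; topic, key, and payload are illustrative only.
    let record = FutureRecord::to("example-topic").key("k").payload("hello");

    match producer.send(record, Timeout::Never).await {
        // Before this commit: Ok((partition, offset)) => { ... }
        // After this commit, the Ok variant is the new `Delivery` struct.
        Ok(delivery) => println!(
            "delivered to partition {} at offset {} with timestamp {:?}",
            delivery.partition, delivery.offset, delivery.timestamp
        ),
        Err((err, _owned_message)) => eprintln!("delivery failed: {}", err),
    }
}
```

Because `Delivery` is a plain struct, call sites that only need some fields can also destructure it with a `..` rest pattern, which is what keeps future field additions from being source-breaking for most callers.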

5 files changed: +32, -13 lines changed

changelog.md (+4)

@@ -10,6 +10,10 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
 * Address wakeup races introduced by pivoting to the event API.
 * Update `BaseProducer::poll` to not return early, and instead continue
   looping until the passed timeout is reached.
+* **Breaking change.** Change the signature of `OwnedDeliveryResult`. The
+  `Ok` variant is now a `Delivery` struct, rather than a tuple. This allows
+  for including `Timestamp` as a result field, and means that adding values
+  in the future will not require a breaking change.

 ## 0.36.2 (2024-01-16)

src/message.rs (+1, -1)

@@ -17,7 +17,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
 use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};

 /// Timestamp of a Kafka message.
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Copy)]
 pub enum Timestamp {
     /// Timestamp not available.
     NotAvailable,
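
The `PartialOrd`/`Ord` derives added above are what make delivery timestamps directly comparable, which the updated producer test relies on. A small illustrative sketch of the derived ordering; the millisecond values here are made up:

```rust
use rdkafka::Timestamp;

fn main() {
    // Illustrative values only (milliseconds since the Unix epoch).
    let earlier = Timestamp::CreateTime(1_700_000_000_000);
    let later = Timestamp::CreateTime(1_700_000_000_500);

    // Same variant: compared by the wrapped millisecond value.
    assert!(earlier < later);

    // Different variants compare in declaration order, so `NotAvailable`
    // sorts before the timestamp-carrying variants.
    assert!(Timestamp::NotAvailable < earlier);

    // The updated test uses the same derived ordering against the current time.
    assert!(earlier < Timestamp::now());
}
```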

src/producer/future_producer.rs (+21, -8)

@@ -132,14 +132,23 @@ pub struct FutureProducerContext<C: ClientContext + 'static> {
     wrapped_context: C,
 }

-/// Represents the result of message production as performed from the
-/// `FutureProducer`.
+/// Contains information about a successfully delivered message
+#[derive(Debug, PartialEq, Eq)]
+pub struct Delivery {
+    /// The partition the message was delivered to
+    pub partition: i32,
+    /// The offset within the partition
+    pub offset: i64,
+    /// The timestamp associated with the message
+    pub timestamp: Timestamp,
+}
+
+/// Represents the result of message production as performed from the FutureProducer.
 ///
-/// If message delivery was successful, `OwnedDeliveryResult` will return the
-/// partition and offset of the message. If the message failed to be delivered
-/// an error will be returned, together with an owned copy of the original
-/// message.
-pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
+/// If message delivery was successful, returns a `Delivery` containing the partition,
+/// offset and timestamp. If the message failed to be delivered, returns the error and
+/// an owned copy of the original message.
+pub type OwnedDeliveryResult = Result<Delivery, (KafkaError, OwnedMessage)>;

 // Delegates all the methods calls to the wrapped context.
 impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
@@ -183,7 +192,11 @@ where
         tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
     ) {
         let owned_delivery_result = match *delivery_result {
-            Ok(ref message) => Ok((message.partition(), message.offset())),
+            Ok(ref message) => Ok(Delivery {
+                partition: message.partition(),
+                offset: message.offset(),
+                timestamp: message.timestamp(),
+            }),
             Err((ref error, ref message)) => Err((error.clone(), message.detach())),
         };
         let _ = tx.send(owned_delivery_result); // TODO: handle error
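
The `delivery` callback above is where the new struct is filled in from the `BorrowedMessage` that librdkafka reports back. For comparison, a hypothetical custom `ProducerContext` (not part of this commit, and assuming the `ProducerContext` and `DeliveryResult` items exported from `rdkafka::producer`) could read the same three fields directly:

```rust
use rdkafka::client::ClientContext;
use rdkafka::message::Message;
use rdkafka::producer::{DeliveryResult, ProducerContext};

// Hypothetical context that logs the same fields the new `Delivery` struct carries.
struct LoggingContext;

impl ClientContext for LoggingContext {}

impl ProducerContext for LoggingContext {
    type DeliveryOpaque = ();

    fn delivery(&self, delivery_result: &DeliveryResult<'_>, _opaque: Self::DeliveryOpaque) {
        match delivery_result {
            Ok(message) => println!(
                "partition={} offset={} timestamp={:?}",
                message.partition(),
                message.offset(),
                message.timestamp()
            ),
            Err((error, _message)) => eprintln!("delivery failed: {}", error),
        }
    }
}
```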

tests/test_high_producers.rs (+5, -3)

@@ -11,6 +11,7 @@ use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 use rdkafka::message::{Header, Headers, Message, OwnedHeaders};
 use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
 use rdkafka::util::Timeout;
+use rdkafka::Timestamp;

 use crate::utils::*;

@@ -44,9 +45,10 @@ async fn test_future_producer_send() {
     let results: Vec<_> = results.collect().await;
     assert!(results.len() == 10);
     for (i, result) in results.into_iter().enumerate() {
-        let (partition, offset) = result.unwrap();
-        assert_eq!(partition, 1);
-        assert_eq!(offset, i as i64);
+        let delivered = result.unwrap();
+        assert_eq!(delivered.partition, 1);
+        assert_eq!(delivered.offset, i as i64);
+        assert!(delivered.timestamp < Timestamp::now());
     }
 }

tests/utils.rs (+1, -1)

@@ -146,7 +146,7 @@ where
     let mut message_map = HashMap::new();
     for (id, future) in futures {
         match future.await {
-            Ok((partition, offset)) => message_map.insert((partition, offset), id),
+            Ok(delivered) => message_map.insert((delivered.partition, delivered.offset), id),
             Err((kafka_error, _message)) => panic!("Delivery failed: {}", kafka_error),
         };
     }
