From 36be0c56495d81fac979c3b5a71f358bd25f419b Mon Sep 17 00:00:00 2001 From: Ignytis <155588001+ignytis@users.noreply.github.com> Date: Wed, 3 Apr 2024 21:50:29 +0300 Subject: [PATCH 1/3] Poll errror callbacks --- src/client.rs | 3 +++ src/config.rs | 11 ++++++++++- src/producer/base_producer.rs | 8 ++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 1b9f6bd1c..4772f0253 100644 --- a/src/client.rs +++ b/src/client.rs @@ -212,6 +212,8 @@ impl NativeClient { pub struct Client { native: NativeClient, context: Arc, + /// A poll error callback for main queue + pub queue_poll_error_cb: Option, } impl Client { @@ -260,6 +262,7 @@ impl Client { Ok(Client { native: unsafe { NativeClient::from_ptr(client_ptr) }, context, + queue_poll_error_cb: config.queue_poll_error_cb, }) } diff --git a/src/config.rs b/src/config.rs index 296d9f867..5b4c854b3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,7 +23,7 @@ //! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md use std::collections::HashMap; -use std::ffi::{CStr, CString}; +use std::ffi::CString; use std::iter::FromIterator; use std::os::raw::c_char; use std::ptr; @@ -161,6 +161,8 @@ pub struct ClientConfig { /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list /// of available levels. pub log_level: RDKafkaLogLevel, + /// Poll error callback + pub queue_poll_error_cb: Option, } impl Default for ClientConfig { @@ -175,6 +177,7 @@ impl ClientConfig { ClientConfig { conf_map: HashMap::new(), log_level: log_level_from_global_config(), + queue_poll_error_cb: None, } } @@ -222,6 +225,12 @@ impl ClientConfig { self } + /// Sets the error callback for poll method. + pub fn set_queue_poll_error_cb(&mut self, error_cb: fn(String)) -> &mut ClientConfig { + self.queue_poll_error_cb = Some(error_cb); + self + } + /// Builds a native librdkafka configuration. pub fn create_native_config(&self) -> KafkaResult { let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) }; diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 1cc6e05ce..fdc8ed6dc 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -367,6 +367,14 @@ where let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; match evtype { rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), + rdsys::RD_KAFKA_EVENT_ERROR => if let Some(queue_poll_error_cb) = self.client.queue_poll_error_cb { + let event_error_str = unsafe { + let event_error_str = rdsys::rd_kafka_event_error_string(ev.ptr()); + CStr::from_ptr(event_error_str).to_string_lossy() + }; + let event_error_str = String::from(event_error_str); + queue_poll_error_cb(event_error_str) + } _ => { let evname = unsafe { let evname = rdsys::rd_kafka_event_name(ev.ptr()); From 295e8b7107628ceec37ced3a047000451db6c0f7 Mon Sep 17 00:00:00 2001 From: Ignytis <155588001+ignytis@users.noreply.github.com> Date: Wed, 10 Apr 2024 21:14:33 +0300 Subject: [PATCH 2/3] Version bump --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 708bf88b1..e7f2fbeb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.36.2" +version = "0.36.3" dependencies = [ "async-std", "backoff", diff --git a/Cargo.toml b/Cargo.toml index e190bd18e..c1ce558a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rdkafka" -version = "0.36.2" +version = "0.36.3" authors = ["Federico Giraud "] repository = "https://github.com/fede1024/rust-rdkafka" readme = "README.md" From f5782a45a60acab1f19753194e73436d9d30e5ff Mon Sep 17 00:00:00 2001 From: Ignytis <155588001+ignytis@users.noreply.github.com> Date: Wed, 10 Apr 2024 21:36:14 +0300 Subject: [PATCH 3/3] Fall back to default behavior if no callback is provided --- src/producer/base_producer.rs | 42 ++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index fdc8ed6dc..bdecadcaf 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -362,26 +362,28 @@ where /// Regular calls to `poll` are required to process the events and execute /// the message delivery callbacks. pub fn poll>(&self, timeout: T) { - let event = self.client().poll_event(&self.queue, timeout.into()); - if let Some(ev) = event { - let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; - match evtype { - rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), - rdsys::RD_KAFKA_EVENT_ERROR => if let Some(queue_poll_error_cb) = self.client.queue_poll_error_cb { - let event_error_str = unsafe { - let event_error_str = rdsys::rd_kafka_event_error_string(ev.ptr()); - CStr::from_ptr(event_error_str).to_string_lossy() - }; - let event_error_str = String::from(event_error_str); - queue_poll_error_cb(event_error_str) - } - _ => { - let evname = unsafe { - let evname = rdsys::rd_kafka_event_name(ev.ptr()); - CStr::from_ptr(evname).to_string_lossy() - }; - warn!("Ignored event '{}' on base producer poll", evname); - } + let ev = match self.client().poll_event(&self.queue, timeout.into()) { + Some(e) => e, + None => return, + }; + + let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; + match evtype { + rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), + rdsys::RD_KAFKA_EVENT_ERROR if self.client.queue_poll_error_cb.is_some() => { + let queue_poll_error_cb = self.client.queue_poll_error_cb.unwrap(); + let event_error_str = unsafe { + let event_error_str = rdsys::rd_kafka_event_error_string(ev.ptr()); + CStr::from_ptr(event_error_str).to_string_lossy() + }; + queue_poll_error_cb(String::from(event_error_str)) + } + _ => { + let evname = unsafe { + let evname = rdsys::rd_kafka_event_name(ev.ptr()); + CStr::from_ptr(evname).to_string_lossy() + }; + warn!("Ignored event '{}' on base producer poll", evname); } } }