diff --git a/Cargo.lock b/Cargo.lock index f6d11d80f..d75b55cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1254,7 +1254,7 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.36.2" +version = "0.36.3" dependencies = [ "async-std", "backoff", diff --git a/Cargo.toml b/Cargo.toml index b9045ce58..21fd21afa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rdkafka" -version = "0.36.2" +version = "0.36.3" authors = ["Federico Giraud "] repository = "https://github.com/fede1024/rust-rdkafka" readme = "README.md" diff --git a/src/client.rs b/src/client.rs index 231bcf908..395baa22e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -227,6 +227,8 @@ impl From> for Option { pub struct Client { native: NativeClient, context: Arc, + /// A poll error callback for main queue + pub queue_poll_error_cb: Option, } impl Client { @@ -282,6 +284,7 @@ impl Client { Ok(Client { native: unsafe { NativeClient::from_ptr(client_ptr) }, context, + queue_poll_error_cb: config.queue_poll_error_cb, }) } diff --git a/src/config.rs b/src/config.rs index 1f5e34247..bda6da833 100644 --- a/src/config.rs +++ b/src/config.rs @@ -187,6 +187,8 @@ pub struct ClientConfig { /// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list /// of available levels. pub log_level: RDKafkaLogLevel, + /// Poll error callback + pub queue_poll_error_cb: Option, } impl Default for ClientConfig { @@ -201,6 +203,7 @@ impl ClientConfig { ClientConfig { conf_map: HashMap::new(), log_level: log_level_from_global_config(), + queue_poll_error_cb: None, } } @@ -248,6 +251,12 @@ impl ClientConfig { self } + /// Sets the error callback for poll method. + pub fn set_queue_poll_error_cb(&mut self, error_cb: fn(String)) -> &mut ClientConfig { + self.queue_poll_error_cb = Some(error_cb); + self + } + /// Builds a native librdkafka configuration. pub fn create_native_config(&self) -> KafkaResult { let conf = unsafe { NativeClientConfig::from_ptr(rdsys::rd_kafka_conf_new()) }; diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs index 7edd4befd..f87a7cbe0 100644 --- a/src/producer/base_producer.rs +++ b/src/producer/base_producer.rs @@ -367,6 +367,14 @@ where let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) }; match evtype { rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev), + rdsys::RD_KAFKA_EVENT_ERROR if self.client.queue_poll_error_cb.is_some() => { + let queue_poll_error_cb = self.client.queue_poll_error_cb.unwrap(); + let event_error_str = unsafe { + let event_error_str = rdsys::rd_kafka_event_error_string(ev.ptr()); + CStr::from_ptr(event_error_str).to_string_lossy() + }; + queue_poll_error_cb(String::from(event_error_str)) + }, _ => { let evname = unsafe { let evname = rdsys::rd_kafka_event_name(ev.ptr());