From 2a7b528f69dbf7c293241d413bffe56c440b49b1 Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Fri, 19 Jan 2024 14:45:18 -0500
Subject: [PATCH 01/10] Use from_bytes_until_nul to avoid problems with
 multiple nuls.

---
 src/config.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/config.rs b/src/config.rs
index 296d9f867..50cbbe00f 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -150,7 +150,11 @@ impl NativeClientConfig {
         }
 
         // Convert the C string to a Rust string.
-        Ok(String::from_utf8_lossy(&buf).to_string())
+        let cstr = CStr::from_bytes_until_nul(&buf)
+            .unwrap()
+            .to_string_lossy()
+            .into();
+        Ok(cstr)
     }
 }
 

From cae942346745b3c726513e19eecc9c71a9fade1f Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Fri, 19 Jan 2024 14:45:32 -0500
Subject: [PATCH 02/10] Return error instead of panicking.

---
 src/consumer/stream_consumer.rs | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/src/consumer/stream_consumer.rs b/src/consumer/stream_consumer.rs
index 5a7f60552..96e9c7cd4 100644
--- a/src/consumer/stream_consumer.rs
+++ b/src/consumer/stream_consumer.rs
@@ -207,11 +207,20 @@ where
     fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self> {
         let native_config = config.create_native_config()?;
         let poll_interval = {
-            let millis: u64 = native_config
-                .get("max.poll.interval.ms")?
-                .parse()
-                .expect("librdkafka validated config value is valid u64");
-            Duration::from_millis(millis)
+            let millis = native_config.get("max.poll.interval.ms")?;
+            match millis.parse() {
+                Ok(millis) => Duration::from_millis(millis),
+                Err(e) => {
+                    println!("Config string: '{}'", millis);
+                    println!("Error: '{}'", e);
+                    return Err(KafkaError::ClientConfig(
+                        RDKafkaConfRes::RD_KAFKA_CONF_INVALID,
+                        "max.poll.interval.ms".to_string(),
+                        format!("Invalid integer: {}", e),
+                        millis,
+                    ));
+                }
+            }
         };
 
         let base = Arc::new(BaseConsumer::new(config, native_config, context)?);

From d23a97e80504933dad8458f27cd79483343f9737 Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Fri, 19 Jan 2024 14:58:02 -0500
Subject: [PATCH 03/10] Bump MSRV to 1.69.0 to pick up cargo index improvement
 and `CStr::from_bytes_until_nul`

---
 .github/workflows/ci.yml | 2 +-
 Cargo.toml               | 2 +-
 Dockerfile               | 1 +
 README.md                | 2 +-
 changelog.md             | 1 +
 rdkafka-sys/Cargo.toml   | 2 +-
 rdkafka-sys/build.rs     | 2 +-
 rdkafka-sys/changelog.md | 1 +
 src/lib.rs               | 2 +-
 9 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 96708961d..06df67deb 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -7,7 +7,7 @@ on:
     branches: [master]
 
 env:
-  rust_version: 1.61.0
+  rust_version: 1.69.0
 
 jobs:
   lint:
diff --git a/Cargo.toml b/Cargo.toml
index e190bd18e..064c7ec08 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ keywords = ["kafka", "rdkafka"]
 categories = ["api-bindings"]
 edition = "2018"
 exclude = ["Cargo.lock"]
-rust-version = "1.61"
+rust-version = "1.69"
 
 [workspace]
 members = ["rdkafka-sys"]
diff --git a/Dockerfile b/Dockerfile
index 04086b51c..ace214610 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \
 
 RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17
 ENV PATH /root/.cargo/bin/:$PATH
+ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse
 
 # # Create dummy project for rdkafka
 # COPY Cargo.toml /rdkafka/
diff --git a/README.md b/README.md
index 6070ef6a7..cb44723f3 100644
--- a/README.md
+++ b/README.md
@@ -184,7 +184,7 @@ re-exported as rdkafka features.
 
 ### Minimum supported Rust version (MSRV)
 
-The current minimum supported Rust version (MSRV) is 1.61.0. Note that
+The current minimum supported Rust version (MSRV) is 1.69.0. Note that
 bumping the MSRV is not considered a breaking change. Any release of
 rust-rdkafka may bump the MSRV.
 
diff --git a/changelog.md b/changelog.md
index 1806d5807..9aeaff9c8 100644
--- a/changelog.md
+++ b/changelog.md
@@ -3,6 +3,7 @@
 See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
 
 ## Unreleased
+* Bump MSRV to 1.69 to pick up cargo index improvement and `CStr::from_bytes_until_nul`
 
 ## 0.36.2 (2024-01-16)
 
diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml
index 4870e15b3..ea937216e 100644
--- a/rdkafka-sys/Cargo.toml
+++ b/rdkafka-sys/Cargo.toml
@@ -10,7 +10,7 @@ description = "Native bindings to the librdkafka library"
 keywords = ["kafka", "rdkafka"]
 categories = ["external-ffi-bindings"]
 edition = "2018"
-rust-version = "1.61"
+rust-version = "1.69"
 
 [dependencies]
 num_enum = "0.5.0"
diff --git a/rdkafka-sys/build.rs b/rdkafka-sys/build.rs
index d615744f3..b7c3e42ca 100644
--- a/rdkafka-sys/build.rs
+++ b/rdkafka-sys/build.rs
@@ -77,7 +77,7 @@ fn main() {
         // Ensure that we are in the right directory
         let rdkafkasys_root = Path::new("rdkafka-sys");
         if rdkafkasys_root.exists() {
-            assert!(env::set_current_dir(&rdkafkasys_root).is_ok());
+            assert!(env::set_current_dir(rdkafkasys_root).is_ok());
         }
         if !Path::new("librdkafka/LICENSE").exists() {
             eprintln!("Setting up submodules");
diff --git a/rdkafka-sys/changelog.md b/rdkafka-sys/changelog.md
index 44451c738..f07565790 100644
--- a/rdkafka-sys/changelog.md
+++ b/rdkafka-sys/changelog.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## Unreleased
+* Bump MSRV to sync with rdkafka crate
 
 ## v4.7.0+2.2.0 (2023-11-07)
 
diff --git a/src/lib.rs b/src/lib.rs
index 79a8d113f..8854adc92 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -176,7 +176,7 @@
 //!
 //! ### Minimum supported Rust version (MSRV)
 //!
-//! The current minimum supported Rust version (MSRV) is 1.61.0. Note that
+//! The current minimum supported Rust version (MSRV) is 1.69.0. Note that
 //! bumping the MSRV is not considered a breaking change. Any release of
 //! rust-rdkafka may bump the MSRV.
 //!

From 33a028fd5794b8f7dc6c6b89472f9138625ebeab Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Fri, 19 Jan 2024 15:07:29 -0500
Subject: [PATCH 04/10] Make random suffix longer.

---
 tests/utils.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/utils.rs b/tests/utils.rs
index 447213672..3be24ce77 100644
--- a/tests/utils.rs
+++ b/tests/utils.rs
@@ -20,7 +20,7 @@ use rdkafka::TopicPartitionList;
 pub fn rand_test_topic(test_name: &str) -> String {
     let id = rand::thread_rng()
         .gen_ascii_chars()
-        .take(10)
+        .take(20)
         .collect::<String>();
     format!("__{}_{}", test_name, id)
 }

From 88bbbe0cdce5f4e79c9d264c7de34588de74a3cf Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Fri, 19 Jan 2024 15:31:15 -0500
Subject: [PATCH 05/10] Address updated lints.

---
 src/consumer/mod.rs             | 4 ++--
 src/message.rs                  | 8 ++++----
 src/producer/base_producer.rs   | 2 ++
 src/producer/future_producer.rs | 1 +
 src/topic_partition_list.rs     | 6 +++---
 5 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index 95e91ffb0..5ce8b05b1 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -100,12 +100,12 @@ pub trait ConsumerContext: ClientContext + Sized {
     /// Pre-rebalance callback. This method will run before the rebalance and
     /// should terminate its execution quickly.
     #[allow(unused_variables)]
-    fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
+    fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
 
     /// Post-rebalance callback. This method will run after the rebalance and
     /// should terminate its execution quickly.
     #[allow(unused_variables)]
-    fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
+    fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'_>) {}
 
     // TODO: convert pointer to structure
     /// Post commit callback. This method will run after a group of offsets was
diff --git a/src/message.rs b/src/message.rs
index 76bac9c39..7a422608e 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -425,20 +425,20 @@ impl<'a> Message for BorrowedMessage<'a> {
     type Headers = BorrowedHeaders;
 
     fn key(&self) -> Option<&[u8]> {
-        unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
+        unsafe { util::ptr_to_opt_slice(self.ptr.key, self.ptr.key_len) }
     }
 
     fn payload(&self) -> Option<&[u8]> {
-        unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
+        unsafe { util::ptr_to_opt_slice(self.ptr.payload, self.ptr.len) }
     }
 
     unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
-        util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
+        util::ptr_to_opt_mut_slice(self.ptr.payload, self.ptr.len)
     }
 
     fn topic(&self) -> &str {
         unsafe {
-            CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
+            CStr::from_ptr(rdsys::rd_kafka_topic_name(self.ptr.rkt))
                 .to_str()
                 .expect("Topic name is not valid UTF-8")
         }
diff --git a/src/producer/base_producer.rs b/src/producer/base_producer.rs
index 1cc6e05ce..7623869f2 100644
--- a/src/producer/base_producer.rs
+++ b/src/producer/base_producer.rs
@@ -425,6 +425,7 @@ where
     /// Note that this method will never block.
     // Simplifying the return type requires generic associated types, which are
     // unstable.
+    #[allow(clippy::result_large_err)]
     pub fn send<'a, K, P>(
         &self,
         mut record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
@@ -701,6 +702,7 @@ where
     /// See the documentation for [`BaseProducer::send`] for details.
     // Simplifying the return type requires generic associated types, which are
     // unstable.
+    #[allow(clippy::result_large_err)]
     pub fn send<'a, K, P>(
         &self,
         record: BaseRecord<'a, K, P, C::DeliveryOpaque>,
diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs
index 0769a16a8..baae2cc15 100644
--- a/src/producer/future_producer.rs
+++ b/src/producer/future_producer.rs
@@ -346,6 +346,7 @@ where
 
     /// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
     /// returned immediately, alongside the [`FutureRecord`] provided.
+    #[allow(clippy::result_large_err)]
     pub fn send_result<'a, K, P>(
         &self,
         record: FutureRecord<'a, K, P>,
diff --git a/src/topic_partition_list.rs b/src/topic_partition_list.rs
index 1d8e77ce9..16063bfbc 100644
--- a/src/topic_partition_list.rs
+++ b/src/topic_partition_list.rs
@@ -317,7 +317,7 @@ impl TopicPartitionList {
 
     /// Sets all partitions in the list to the specified offset.
     pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
-        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+        let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
         for elem_ptr in slice {
             let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
             elem.set_offset(offset)?;
@@ -327,7 +327,7 @@ impl TopicPartitionList {
 
     /// Returns all the elements of the list.
     pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
-        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+        let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
         let mut vec = Vec::with_capacity(slice.len());
         for elem_ptr in slice {
             vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
@@ -337,7 +337,7 @@ impl TopicPartitionList {
 
     /// Returns all the elements of the list that belong to the specified topic.
     pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
-        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
+        let slice = unsafe { slice::from_raw_parts_mut(self.ptr.elems, self.count()) };
         let mut vec = Vec::with_capacity(slice.len());
         for elem_ptr in slice {
             let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);

From 53df310b70f7f33819109c7c82bda0af63fae16b Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Mon, 22 Jan 2024 10:53:16 -0500
Subject: [PATCH 06/10] Test if sparse index is triggering clippy compiler
 error

---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index ace214610..6f9036d32 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \
 
 RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17
 ENV PATH /root/.cargo/bin/:$PATH
-ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse
+#ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse
 
 # # Create dummy project for rdkafka
 # COPY Cargo.toml /rdkafka/

From e86a46aaecf2890987b6421e3bfb731ac3e32279 Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Mon, 22 Jan 2024 13:09:54 -0500
Subject: [PATCH 07/10] Bump to 1.70 to see if compiler panic is fixed.

---
 .github/workflows/ci.yml | 2 +-
 Cargo.toml               | 2 +-
 Dockerfile               | 2 +-
 README.md                | 2 +-
 changelog.md             | 2 +-
 rdkafka-sys/Cargo.toml   | 2 +-
 src/lib.rs               | 2 +-
 7 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 06df67deb..d2988e35e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -7,7 +7,7 @@ on:
     branches: [master]
 
 env:
-  rust_version: 1.69.0
+  rust_version: 1.70.0
 
 jobs:
   lint:
diff --git a/Cargo.toml b/Cargo.toml
index 064c7ec08..b9045ce58 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ keywords = ["kafka", "rdkafka"]
 categories = ["api-bindings"]
 edition = "2018"
 exclude = ["Cargo.lock"]
-rust-version = "1.69"
+rust-version = "1.70"
 
 [workspace]
 members = ["rdkafka-sys"]
diff --git a/Dockerfile b/Dockerfile
index 6f9036d32..ace214610 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ RUN apt-get update && apt-get install -y build-essential \
 
 RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2019-10-17
 ENV PATH /root/.cargo/bin/:$PATH
-#ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse
+ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL sparse
 
 # # Create dummy project for rdkafka
 # COPY Cargo.toml /rdkafka/
diff --git a/README.md b/README.md
index cb44723f3..333fb5140 100644
--- a/README.md
+++ b/README.md
@@ -184,7 +184,7 @@ re-exported as rdkafka features.
 
 ### Minimum supported Rust version (MSRV)
 
-The current minimum supported Rust version (MSRV) is 1.69.0. Note that
+The current minimum supported Rust version (MSRV) is 1.70.0. Note that
 bumping the MSRV is not considered a breaking change. Any release of
 rust-rdkafka may bump the MSRV.
 
diff --git a/changelog.md b/changelog.md
index 9aeaff9c8..b354c0f97 100644
--- a/changelog.md
+++ b/changelog.md
@@ -3,7 +3,7 @@
 See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
 
 ## Unreleased
-* Bump MSRV to 1.69 to pick up cargo index improvement and `CStr::from_bytes_until_nul`
+* Bump MSRV to 1.70 to pick up cargo index improvement and `CStr::from_bytes_until_nul`
 
 ## 0.36.2 (2024-01-16)
 
diff --git a/rdkafka-sys/Cargo.toml b/rdkafka-sys/Cargo.toml
index ea937216e..7ea379ef8 100644
--- a/rdkafka-sys/Cargo.toml
+++ b/rdkafka-sys/Cargo.toml
@@ -10,7 +10,7 @@ description = "Native bindings to the librdkafka library"
 keywords = ["kafka", "rdkafka"]
 categories = ["external-ffi-bindings"]
 edition = "2018"
-rust-version = "1.69"
+rust-version = "1.70"
 
 [dependencies]
 num_enum = "0.5.0"
diff --git a/src/lib.rs b/src/lib.rs
index 8854adc92..46709c5a7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -176,7 +176,7 @@
 //!
 //! ### Minimum supported Rust version (MSRV)
 //!
-//! The current minimum supported Rust version (MSRV) is 1.69.0. Note that
+//! The current minimum supported Rust version (MSRV) is 1.70.0. Note that
 //! bumping the MSRV is not considered a breaking change. Any release of
 //! rust-rdkafka may bump the MSRV.
 //!

From fe1fb6543514825a944bef25ed735f5c54cf8337 Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Mon, 22 Jan 2024 15:08:37 -0500
Subject: [PATCH 08/10] Use tokio sleep instead of synchronous thread sleep.

---
 tests/test_low_consumers.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs
index e6642b688..25bde2a80 100644
--- a/tests/test_low_consumers.rs
+++ b/tests/test_low_consumers.rs
@@ -401,7 +401,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     assert!(consumer.poll(Duration::from_secs(0)).is_none());
 
     // Expect no wakeups for 1s.
-    thread::sleep(Duration::from_secs(1));
+    tokio::time::sleep(Duration::from_secs(1)).await;
     assert_eq!(wakeups.load(Ordering::SeqCst), 0);
 
     // Verify there are no messages waiting.
@@ -418,7 +418,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     // Add more messages to the topic. Expect no additional wakeups, as the
     // queue is not fully drained, for 1s.
     populate_topic(&topic_name, 2, &value_fn, &key_fn, None, None).await;
-    thread::sleep(Duration::from_secs(1));
+    tokio::time::sleep(Duration::from_secs(1)).await;
     assert_eq!(wakeups.load(Ordering::SeqCst), 1);
 
     // Drain the queue.
@@ -427,7 +427,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     assert!(queue.poll(None).is_some());
 
     // Expect no additional wakeups for 1s.
-    thread::sleep(Duration::from_secs(1));
+    tokio::time::sleep(Duration::from_secs(1)).await;
     assert_eq!(wakeups.load(Ordering::SeqCst), 1);
 
     // Add another message, and expect a wakeup.
@@ -435,7 +435,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     wait_for_wakeups(2);
 
     // Expect no additional wakeups for 1s.
-    thread::sleep(Duration::from_secs(1));
+    tokio::time::sleep(Duration::from_secs(1)).await;
     assert_eq!(wakeups.load(Ordering::SeqCst), 2);
 
     // Disable the queue and add another message.
@@ -443,7 +443,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     populate_topic(&topic_name, 1, &value_fn, &key_fn, None, None).await;
 
     // Expect no additional wakeups for 1s.
-    thread::sleep(Duration::from_secs(1));
+    tokio::time::sleep(Duration::from_secs(1)).await;
     assert_eq!(wakeups.load(Ordering::SeqCst), 2);
 }
 

From 15ccb4d54c7aa502fa00f4e5cfb5e9f82ae754e8 Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Mon, 22 Jan 2024 15:54:25 -0500
Subject: [PATCH 09/10] Test to see if we might sometimes get a spurious
 initial callback.

---
 tests/test_low_consumers.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs
index 25bde2a80..ff866e372 100644
--- a/tests/test_low_consumers.rs
+++ b/tests/test_low_consumers.rs
@@ -401,8 +401,8 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     assert!(consumer.poll(Duration::from_secs(0)).is_none());
 
     // Expect no wakeups for 1s.
-    tokio::time::sleep(Duration::from_secs(1)).await;
-    assert_eq!(wakeups.load(Ordering::SeqCst), 0);
+    //tokio::time::sleep(Duration::from_secs(1)).await;
+    //assert_eq!(wakeups.load(Ordering::SeqCst), 0);
 
     // Verify there are no messages waiting.
     assert!(consumer.poll(Duration::from_secs(0)).is_none());

From fea66c1a155be8add7532e64617116e7a452cd84 Mon Sep 17 00:00:00 2001
From: David Blewett <david.blewett@datadoghq.com>
Date: Mon, 22 Jan 2024 16:24:56 -0500
Subject: [PATCH 10/10] Restore initial test, move queue creation before assign
 and only clone once

---
 tests/test_low_consumers.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs
index ff866e372..aacf23364 100644
--- a/tests/test_low_consumers.rs
+++ b/tests/test_low_consumers.rs
@@ -365,6 +365,7 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
         .create_with_context(ConsumerTestContext { _n: 64 })
         .expect("Consumer creation failed");
     let consumer = Arc::new(consumer);
+    let mut queue = consumer.split_partition_queue(&topic_name, 0).unwrap();
 
     let mut tpl = TopicPartitionList::new();
     tpl.add_partition_offset(&topic_name, 0, Offset::Beginning)
@@ -372,11 +373,10 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     consumer.assign(&tpl).unwrap();
 
     let wakeups = Arc::new(AtomicUsize::new(0));
-    let mut queue = consumer.split_partition_queue(&topic_name, 0).unwrap();
+    let cb_wakeups = wakeups.clone();
     queue.set_nonempty_callback({
-        let wakeups = wakeups.clone();
         move || {
-            wakeups.fetch_add(1, Ordering::SeqCst);
+            cb_wakeups.fetch_add(1, Ordering::SeqCst);
         }
     });
 
@@ -401,8 +401,8 @@ async fn test_produce_consume_message_queue_nonempty_callback() {
     assert!(consumer.poll(Duration::from_secs(0)).is_none());
 
     // Expect no wakeups for 1s.
-    //tokio::time::sleep(Duration::from_secs(1)).await;
-    //assert_eq!(wakeups.load(Ordering::SeqCst), 0);
+    tokio::time::sleep(Duration::from_secs(1)).await;
+    assert_eq!(wakeups.load(Ordering::SeqCst), 0);
 
     // Verify there are no messages waiting.
     assert!(consumer.poll(Duration::from_secs(0)).is_none());