From aa7a407b401f362bdbf6eb144227e828a9f51589 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Fri, 13 Sep 2024 17:56:34 +0300
Subject: [PATCH 1/6] Add DescribeTopics support for admin client API

---
 Cargo.lock               |   7 ++
 Cargo.toml               |   1 +
 rdkafka-sys/src/types.rs |  15 +++
 src/admin.rs             | 261 +++++++++++++++++++++++++++++++++++----
 4 files changed, 263 insertions(+), 21 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 708bf88b1..33ff75b18 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1095,6 +1095,7 @@ dependencies = [
  "smol",
  "tokio",
  "tracing",
+ "uuid",
 ]
 
 [[package]]
@@ -1412,6 +1413,12 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
 
+[[package]]
+name = "uuid"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
+
 [[package]]
 name = "vcpkg"
 version = "0.2.8"
diff --git a/Cargo.toml b/Cargo.toml
index b9045ce58..80e4dae6e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ serde_json = "1.0.0"
 slab = "0.4"
 tokio = { version = "1.18", features = ["rt", "time"], optional = true }
 tracing = { version = "0.1.30", optional = true }
+uuid = "1.10.0"
 
 [dev-dependencies]
 async-std = { version = "1.9.0", features = ["attributes"] }
diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs
index 0005073ba..d227ddb1e 100644
--- a/rdkafka-sys/src/types.rs
+++ b/rdkafka-sys/src/types.rs
@@ -99,6 +99,18 @@ pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;
 /// Native rdkafka mock cluster.
 pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t;
 
+/// Native rdkafka topic collection.
+pub type RDKafkaTopicCollection = bindings::rd_kafka_TopicCollection_t;
+
+/// Native rdkafka node.
+pub type RDKafkaNode = bindings::rd_kafka_Node_t;
+
+/// Native rdkafka topic description.
+pub type RDKafkaTopicDescription = bindings::rd_kafka_TopicDescription_t;
+
+/// Native rdkafka topic partition info.
+pub type RDKafkaTopicPartitionInfo = bindings::rd_kafka_TopicPartitionInfo_t;
+
 // ENUMS
 
 /// Client types.
@@ -119,6 +131,9 @@ pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
 /// Config source.
 pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
 
+/// ACL operation.
+pub use bindings::rd_kafka_AclOperation_t as RDKafkaAclOperation;
+
 // Errors enum
 
 /// Native rdkafka error code.
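[The four aliases this hunk adds are bare C pointer types. On the Rust side, the admin.rs half of this patch owns them through the crate's usual pairing of NativePtr with a KafkaDrop impl that names the matching destructor (it does exactly that for RDKafkaTopicCollection below, via rd_kafka_TopicCollection_destroy). A compressed, self-contained sketch of that pattern; FakeCollection and fake_destroy are stand-ins for the bindings, and the real definitions live in src/util.rs:

use std::ptr::NonNull;

struct FakeCollection; // stand-in for rd_kafka_TopicCollection_t
unsafe extern "C" fn fake_destroy(p: *mut FakeCollection) {
    drop(Box::from_raw(p)) // the real DROP would call into librdkafka
}

/// A type that knows which C destructor frees it.
unsafe trait KafkaDrop {
    const TYPE: &'static str;
    const DROP: unsafe extern "C" fn(*mut Self);
}

unsafe impl KafkaDrop for FakeCollection {
    const TYPE: &'static str = "topic collection";
    const DROP: unsafe extern "C" fn(*mut Self) = fake_destroy;
}

/// RAII owner of a C-allocated object; runs DROP exactly once.
struct NativePtr<T: KafkaDrop>(NonNull<T>);

impl<T: KafkaDrop> NativePtr<T> {
    /// Returns None for a null pointer instead of an invalid owner.
    unsafe fn from_ptr(ptr: *mut T) -> Option<Self> {
        NonNull::new(ptr).map(Self)
    }
    fn ptr(&self) -> *mut T {
        self.0.as_ptr()
    }
}

impl<T: KafkaDrop> Drop for NativePtr<T> {
    fn drop(&mut self) {
        unsafe { T::DROP(self.0.as_ptr()) }
    }
}

fn main() {
    let raw = Box::into_raw(Box::new(FakeCollection));
    let owned = unsafe { NativePtr::from_ptr(raw) }.unwrap();
    let _ = owned.ptr();
    // fake_destroy runs here, when `owned` leaves scope.
}

Encoding the destructor as an associated const ties the lifetime of each native object to a Rust value, so the admin futures below can hand raw pointers to librdkafka without manual cleanup paths.]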
diff --git a/src/admin.rs b/src/admin.rs
index 0418f0cac..398fe83ff 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -28,6 +28,7 @@ use crate::log::{trace, warn};
 use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
 use crate::TopicPartitionList;
 
+use uuid::Uuid;
 //
 // ********** ADMIN CLIENT **********
 //
@@ -53,9 +54,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         topics: I,
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
+        ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
     where
-        I: IntoIterator<Item = &'a NewTopic<'a>>,
+            I: IntoIterator<Item = &'a NewTopic<'a>>,
     {
         match self.create_topics_inner(topics, opts) {
             Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
@@ -69,7 +70,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-        I: IntoIterator<Item = &'a NewTopic<'a>>,
+            I: IntoIterator<Item = &'a NewTopic<'a>>,
     {
         let mut native_topics = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -98,7 +99,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         topic_names: &[&str],
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
+        ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
         match self.delete_topics_inner(topic_names, opts) {
             Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -137,7 +138,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         group_names: &[&str],
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
+        ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
         match self.delete_groups_inner(group_names, opts) {
             Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -183,9 +184,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         partitions: I,
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
+        ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
     where
-        I: IntoIterator<Item = &'a NewPartitions<'a>>,
+            I: IntoIterator<Item = &'a NewPartitions<'a>>,
     {
         match self.create_partitions_inner(partitions, opts) {
             Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
@@ -199,7 +200,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-        I: IntoIterator<Item = &'a NewPartitions<'a>>,
+            I: IntoIterator<Item = &'a NewPartitions<'a>>,
     {
         let mut native_partitions = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -236,7 +237,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         offsets: &TopicPartitionList,
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
+        ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
         match self.delete_records_inner(offsets, opts) {
             Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -252,7 +253,7 @@ impl<C: ClientContext> AdminClient<C> {
         let delete_records = unsafe {
             NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
         }
-        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
+            .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
         let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
         unsafe {
             rdsys::rd_kafka_DeleteRecords(
@@ -275,9 +276,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         configs: I,
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
+        ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
     where
-        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
+            I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
     {
         match self.describe_configs_inner(configs, opts) {
             Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
@@ -291,7 +292,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
+            I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
     {
         let mut native_configs = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -315,7 +316,7 @@ impl<C: ClientContext> AdminClient<C> {
                     typ,
                     name.as_ptr(),
                 ))
-                .unwrap()
+                    .unwrap()
             });
         }
         let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
@@ -340,9 +341,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         configs: I,
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
+        ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
     where
-        I: IntoIterator<Item = &'a AlterConfig<'a>>,
+            I: IntoIterator<Item = &'a AlterConfig<'a>>,
     {
         match self.alter_configs_inner(configs, opts) {
             Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
@@ -356,7 +357,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-        I: IntoIterator<Item = &'a AlterConfig<'a>>,
+            I: IntoIterator<Item = &'a AlterConfig<'a>>,
     {
         let mut native_configs = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -376,6 +377,43 @@ impl<C: ClientContext> AdminClient<C> {
         Ok(rx)
     }
 
+    pub fn describe_topics(
+        &self,
+        topic_names: &[&str],
+        opts: &AdminOptions,
+        ) -> impl Future<Output = KafkaResult<Vec<TopicDescription>>> {
+        match self.describe_topics_inner(topic_names, opts) {
+            Ok(rx) => Either::Left(DescribeTopicsFuture { rx }),
+            Err(err) => Either::Right(future::err(err)),
+        }
+    }
+
+    fn describe_topics_inner(
+        &self,
+        topic_names: &[&str],
+        opts: &AdminOptions,
+    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
+        let topic_names_string_array = topic_names
+            .iter()
+            .map(|tn| CString::new(*tn).map(|s| s.as_ptr()))
+            .collect::<Result<Vec<_>, _>>()?
+            .as_mut_ptr();
+        let native_topic_collection = unsafe {
+            NativeTopicCollection::from_ptr(rdsys::rd_kafka_TopicCollection_of_topic_names(topic_names_string_array, topic_names.len())).unwrap()
+        };
+        let mut err_buf = ErrBuf::new();
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_DescribeTopics(
+                self.client.native_ptr(),
+                native_topic_collection.ptr(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
     /// Returns the client underlying this admin client.
     pub fn inner(&self) -> &Client<C> {
         &self.client
@@ -542,7 +580,7 @@ impl AdminOptions {
                 client,
                 RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
             ))
-            .unwrap()
+                .unwrap()
         };
 
         if let Some(timeout) = self.request_timeout {
@@ -737,7 +775,7 @@ impl<'a> NewTopic<'a> {
                 err_buf.capacity(),
             ))
         }
-        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
+            .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
 
         if let TopicReplication::Variable(assignment) = self.replication {
             for (partition_id, broker_ids) in assignment.iter().enumerate() {
@@ -946,7 +984,7 @@ impl<'a> NewPartitions<'a> {
                 err_buf.capacity(),
             ))
         }
-        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
+            .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
 
         if let Some(assignment) = self.assignment {
             for (partition_id, broker_ids) in assignment.iter().enumerate() {
@@ -1249,7 +1287,7 @@ impl Future for DescribeConfigsFuture {
 
 /// The result of an individual AlterConfig operation.
 pub type AlterConfigsResult =
-    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
+Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
 
 /// Configuration for an AlterConfig operation.
 pub struct AlterConfig<'a> {
@@ -1340,3 +1378,184 @@ impl Future for AlterConfigsFuture {
         Poll::Ready(Ok(out))
     }
 }
+
+//
+// Describe topics handling
+//
+
+#[derive(Debug)]
+pub struct Node {
+    pub id: i32,
+    pub host: String,
+    pub port: u16,
+    pub rack: Option<String>,
+}
+
+#[derive(Debug)]
+pub struct TopicPartitionInfo {
+    pub partition: i32,
+    pub leader: Node,
+    pub isr: Vec<Node>,
+    pub replicas: Vec<Node>,
+}
+
+#[derive(Debug, Eq, PartialEq)]
+pub enum AclOperation {
+    Unknown,
+    Any,
+    All,
+    Read,
+    Write,
+    Create,
+    Delete,
+    Alter,
+    Describe,
+    ClusterAction,
+    DescribeConfigs,
+    AlterConfigs,
+    IdempotentWrite,
+}
+
+#[derive(Debug)]
+pub struct TopicDescription {
+    pub name: String,
+    pub topic_id: Uuid,
+    pub partitions: Vec<TopicPartitionInfo>,
+    pub is_internal: bool,
+    pub authorized_operations: Vec<AclOperation>,
+}
+
+type NativeTopicCollection = NativePtr<RDKafkaTopicCollection>;
+
+unsafe impl KafkaDrop for RDKafkaTopicCollection {
+    const TYPE: &'static str = "topic collection";
+    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_TopicCollection_destroy;
+}
+
+
+struct DescribeTopicsFuture {
+    rx: oneshot::Receiver<NativeEvent>,
+}
+
+impl Future for DescribeTopicsFuture {
+    type Output = KafkaResult<Vec<TopicDescription>>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
+        event.check_error()?;
+        let res = unsafe { rdsys::rd_kafka_event_DescribeTopics_result(event.ptr()) };
+        if res.is_null() {
+            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
+                "describe topics request received response of incorrect type ({})",
+                typ
+            ))));
+        }
+        let mut n = 0;
+        let topic_descriptions = unsafe { rdsys::rd_kafka_DescribeTopics_result_topics(res, &mut n) };
+        let mut out = Vec::with_capacity(n);
+        for i in 0..n {
+            let topic_description = unsafe { *topic_descriptions.add(i) };
+            let topic_description = TopicDescription {
+                name: unsafe { cstr_to_owned(rdsys::rd_kafka_TopicDescription_name(topic_description)) },
+                topic_id: extract_topic_id(topic_description),
+                partitions: extract_partitions(topic_description),
+                is_internal: unsafe { rdsys::rd_kafka_TopicDescription_is_internal(topic_description) } != 0,
+                authorized_operations: extract_authorized_operations(topic_description)?,
+            };
+            out.push(topic_description);
+        }
+        Poll::Ready(Ok(out))
+    }
+}
+
+fn extract_topic_id(topic_description: *const RDKafkaTopicDescription) -> Uuid {
+    let topic_id = unsafe { rdsys::rd_kafka_TopicDescription_topic_id(topic_description) };
+    let high_bits = unsafe { rdsys::rd_kafka_Uuid_most_significant_bits(topic_id) } as u64;
+    let low_bits = unsafe { rdsys::rd_kafka_Uuid_least_significant_bits(topic_id) } as u64;
+    Uuid::from_u64_pair(high_bits, low_bits)
+}
+
+fn extract_partitions(topic_description: *const RDKafkaTopicDescription) -> Vec<TopicPartitionInfo> {
+    let mut n = 0;
+    let partitions = unsafe { rdsys::rd_kafka_TopicDescription_partitions(topic_description, &mut n) };
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        let partition = unsafe { *partitions.add(i) };
+        let leader = extract_node(unsafe { rdsys::rd_kafka_TopicPartitionInfo_leader(partition) });
+        let isr = extract_isr(partition);
+        let replicas = extract_replicas(partition );
+        out.push(TopicPartitionInfo {
+            partition: unsafe { rdsys::rd_kafka_TopicPartitionInfo_partition(partition) },
+            leader,
+            isr,
+            replicas,
+        });
+    }
+    out
+}
+
+fn extract_node(node: *const RDKafkaNode) -> Node {
+    let rack = unsafe { rdsys::rd_kafka_Node_rack(node) };
+    let rack = if rack.is_null() {
+        None
+    } else {
+        Some(unsafe { cstr_to_owned(rack) })
+    };
+    Node {
+        id: unsafe { rdsys::rd_kafka_Node_id(node) },
+        host: unsafe { cstr_to_owned(rdsys::rd_kafka_Node_host(node)) },
+        port: unsafe { rdsys::rd_kafka_Node_port(node) },
+        rack,
+    }
+}
+
+fn extract_isr(partition: *const RDKafkaTopicPartitionInfo) -> Vec<Node> {
+    let mut n = 0;
+    let nodes = unsafe { rdsys::rd_kafka_TopicPartitionInfo_isr(partition, &mut n) };
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        out.push(extract_node(unsafe { *nodes.add(i) }));
+    }
+    out
+}
+
+fn extract_replicas(nodes: *const RDKafkaTopicPartitionInfo) -> Vec<Node> {
+    let mut n = 0;
+    let nodes = unsafe { rdsys::rd_kafka_TopicPartitionInfo_replicas(nodes, &mut n) };
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        out.push(extract_node(unsafe { *nodes.add(i) }));
+    }
+    out
+}
+
+
+fn extract_authorized_operations(topic_description: *const RDKafkaTopicDescription) -> KafkaResult<Vec<AclOperation>> {
+    let mut n = 0;
+    let operations = unsafe { rdsys::rd_kafka_TopicDescription_authorized_operations(topic_description, &mut n) };
+    let mut out = Vec::with_capacity(n);
+    for i in 0..n {
+        let operation = unsafe { *operations.add(i) };
+        out.push(match operation {
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_UNKNOWN => AclOperation::Unknown,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ANY => AclOperation::Any,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALL => AclOperation::All,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_READ => AclOperation::Read,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_WRITE => AclOperation::Write,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CREATE => AclOperation::Create,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DELETE => AclOperation::Delete,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER => AclOperation::Alter,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE => AclOperation::Describe,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION => AclOperation::ClusterAction,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS => AclOperation::DescribeConfigs,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS => AclOperation::AlterConfigs,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE => AclOperation::IdempotentWrite,
+            _ => return Err(KafkaError::AdminOpCreation(format!(
+                "bogus acl operation in kafka response: {:?}",
+                operation
+            )))
+        });
+    }
+    Ok(out)
+}
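[Patch 1's extract_topic_id stitches the two signed 64-bit halves that librdkafka reports back into a uuid::Uuid. The round trip in isolation, using the same uuid crate the patch adds as a dependency; the literal id is arbitrary, and the i64 casts mirror the C API's signed most/least-significant-bits representation:

use uuid::Uuid;

fn main() {
    let topic_id = Uuid::from_u64_pair(0xa1a2_a3a4_b1b2_c1c2, 0x81d2_d3d4_d5d6_d7d8);

    // librdkafka hands the halves back as i64, so the `as u64` casts in
    // extract_topic_id are lossless reinterpretations of the sign bit,
    // not value conversions.
    let (high, low) = topic_id.as_u64_pair();
    let (high_c, low_c) = (high as i64, low as i64);

    let rebuilt = Uuid::from_u64_pair(high_c as u64, low_c as u64);
    assert_eq!(topic_id, rebuilt);
    println!("round-tripped topic id: {rebuilt}");
}]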
From 5da2ef19193c7d9bc59776b92bca5a6c4d009f65 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 16 Sep 2024 14:28:26 +0300
Subject: [PATCH 2/6] Format project

---
 src/admin.rs | 102 +++++++++++++++++++++++++++++++--------------------
 1 file changed, 62 insertions(+), 40 deletions(-)

diff --git a/src/admin.rs b/src/admin.rs
index 398fe83ff..d22ba9aad 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -54,9 +54,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         topics: I,
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
+    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
     where
-            I: IntoIterator<Item = &'a NewTopic<'a>>,
+        I: IntoIterator<Item = &'a NewTopic<'a>>,
     {
         match self.create_topics_inner(topics, opts) {
             Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
@@ -70,7 +70,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-            I: IntoIterator<Item = &'a NewTopic<'a>>,
+        I: IntoIterator<Item = &'a NewTopic<'a>>,
     {
         let mut native_topics = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -99,7 +99,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         topic_names: &[&str],
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
+    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
         match self.delete_topics_inner(topic_names, opts) {
             Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -138,7 +138,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         group_names: &[&str],
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
+    ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
         match self.delete_groups_inner(group_names, opts) {
             Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -184,9 +184,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         partitions: I,
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
+    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
     where
-            I: IntoIterator<Item = &'a NewPartitions<'a>>,
+        I: IntoIterator<Item = &'a NewPartitions<'a>>,
     {
         match self.create_partitions_inner(partitions, opts) {
             Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
@@ -200,7 +200,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-            I: IntoIterator<Item = &'a NewPartitions<'a>>,
+        I: IntoIterator<Item = &'a NewPartitions<'a>>,
     {
         let mut native_partitions = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -237,7 +237,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         offsets: &TopicPartitionList,
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
+    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
         match self.delete_records_inner(offsets, opts) {
             Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -253,7 +253,7 @@ impl<C: ClientContext> AdminClient<C> {
         let delete_records = unsafe {
             NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
         }
-            .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
+        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
         let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
         unsafe {
             rdsys::rd_kafka_DeleteRecords(
@@ -276,9 +276,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         configs: I,
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
+    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
     where
-            I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
+        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
     {
         match self.describe_configs_inner(configs, opts) {
             Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
@@ -292,7 +292,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-            I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
+        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
     {
         let mut native_configs = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -316,7 +316,7 @@ impl<C: ClientContext> AdminClient<C> {
                     typ,
                     name.as_ptr(),
                 ))
-                    .unwrap()
+                .unwrap()
             });
         }
         let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
@@ -341,9 +341,9 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         configs: I,
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
+    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
     where
-            I: IntoIterator<Item = &'a AlterConfig<'a>>,
+        I: IntoIterator<Item = &'a AlterConfig<'a>>,
     {
         match self.alter_configs_inner(configs, opts) {
             Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
@@ -357,7 +357,7 @@ impl<C: ClientContext> AdminClient<C> {
         opts: &AdminOptions,
     ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
     where
-            I: IntoIterator<Item = &'a AlterConfig<'a>>,
+        I: IntoIterator<Item = &'a AlterConfig<'a>>,
     {
         let mut native_configs = Vec::new();
         let mut err_buf = ErrBuf::new();
@@ -381,7 +381,7 @@ impl<C: ClientContext> AdminClient<C> {
         &self,
         topic_names: &[&str],
         opts: &AdminOptions,
-        ) -> impl Future<Output = KafkaResult<Vec<TopicDescription>>> {
+    ) -> impl Future<Output = KafkaResult<Vec<TopicDescription>>> {
         match self.describe_topics_inner(topic_names, opts) {
             Ok(rx) => Either::Left(DescribeTopicsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
@@ -399,7 +399,11 @@ impl<C: ClientContext> AdminClient<C> {
             .collect::<Result<Vec<_>, _>>()?
             .as_mut_ptr();
         let native_topic_collection = unsafe {
-            NativeTopicCollection::from_ptr(rdsys::rd_kafka_TopicCollection_of_topic_names(topic_names_string_array, topic_names.len())).unwrap()
+            NativeTopicCollection::from_ptr(rdsys::rd_kafka_TopicCollection_of_topic_names(
+                topic_names_string_array,
+                topic_names.len(),
+            ))
+            .unwrap()
         };
         let mut err_buf = ErrBuf::new();
         let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
@@ -584,7 +584,7 @@ impl AdminOptions {
                 client,
                 RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
             ))
-                .unwrap()
+            .unwrap()
         };
 
         if let Some(timeout) = self.request_timeout {
@@ -779,7 +779,7 @@ impl<'a> NewTopic<'a> {
                 err_buf.capacity(),
             ))
         }
-            .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
+        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
 
         if let TopicReplication::Variable(assignment) = self.replication {
             for (partition_id, broker_ids) in assignment.iter().enumerate() {
@@ -988,7 +988,7 @@ impl<'a> NewPartitions<'a> {
                 err_buf.capacity(),
             ))
         }
-            .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
+        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
 
         if let Some(assignment) = self.assignment {
             for (partition_id, broker_ids) in assignment.iter().enumerate() {
@@ -1291,7 +1291,7 @@ impl Future for DescribeConfigsFuture {
 
 /// The result of an individual AlterConfig operation.
 pub type AlterConfigsResult =
-Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
+    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
 
 /// Configuration for an AlterConfig operation.
 pub struct AlterConfig<'a> {
@@ -1432,7 +1436,6 @@ unsafe impl KafkaDrop for RDKafkaTopicCollection {
     const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_TopicCollection_destroy;
 }
 
-
 struct DescribeTopicsFuture {
     rx: oneshot::Receiver<NativeEvent>,
 }
@@ -1452,15 +1455,20 @@ impl Future for DescribeTopicsFuture {
             ))));
         }
         let mut n = 0;
-        let topic_descriptions = unsafe { rdsys::rd_kafka_DescribeTopics_result_topics(res, &mut n) };
+        let topic_descriptions =
+            unsafe { rdsys::rd_kafka_DescribeTopics_result_topics(res, &mut n) };
         let mut out = Vec::with_capacity(n);
         for i in 0..n {
             let topic_description = unsafe { *topic_descriptions.add(i) };
             let topic_description = TopicDescription {
-                name: unsafe { cstr_to_owned(rdsys::rd_kafka_TopicDescription_name(topic_description)) },
+                name: unsafe {
+                    cstr_to_owned(rdsys::rd_kafka_TopicDescription_name(topic_description))
+                },
                 topic_id: extract_topic_id(topic_description),
                 partitions: extract_partitions(topic_description),
-                is_internal: unsafe { rdsys::rd_kafka_TopicDescription_is_internal(topic_description) } != 0,
+                is_internal: unsafe {
+                    rdsys::rd_kafka_TopicDescription_is_internal(topic_description)
+                } != 0,
                 authorized_operations: extract_authorized_operations(topic_description)?,
             };
             out.push(topic_description);
@@ -1476,15 +1484,18 @@ fn extract_topic_id(topic_description: *const RDKafkaTopicDescription) -> Uuid {
     Uuid::from_u64_pair(high_bits, low_bits)
 }
 
-fn extract_partitions(topic_description: *const RDKafkaTopicDescription) -> Vec<TopicPartitionInfo> {
+fn extract_partitions(
+    topic_description: *const RDKafkaTopicDescription,
+) -> Vec<TopicPartitionInfo> {
     let mut n = 0;
-    let partitions = unsafe { rdsys::rd_kafka_TopicDescription_partitions(topic_description, &mut n) };
+    let partitions =
+        unsafe { rdsys::rd_kafka_TopicDescription_partitions(topic_description, &mut n) };
     let mut out = Vec::with_capacity(n);
     for i in 0..n {
         let partition = unsafe { *partitions.add(i) };
         let leader = extract_node(unsafe { rdsys::rd_kafka_TopicPartitionInfo_leader(partition) });
         let isr = extract_isr(partition);
-        let replicas = extract_replicas(partition );
+        let replicas = extract_replicas(partition);
         out.push(TopicPartitionInfo {
             partition: unsafe { rdsys::rd_kafka_TopicPartitionInfo_partition(partition) },
             leader,
            isr,
             replicas,
         });
     }
     out
@@ -1530,10 +1541,13 @@ fn extract_replicas(nodes: *const RDKafkaTopicPartitionInfo) -> Vec<Node> {
     out
 }
 
-
-fn extract_authorized_operations(topic_description: *const RDKafkaTopicDescription) -> KafkaResult<Vec<AclOperation>> {
+fn extract_authorized_operations(
+    topic_description: *const RDKafkaTopicDescription,
+) -> KafkaResult<Vec<AclOperation>> {
     let mut n = 0;
-    let operations = unsafe { rdsys::rd_kafka_TopicDescription_authorized_operations(topic_description, &mut n) };
+    let operations = unsafe {
+        rdsys::rd_kafka_TopicDescription_authorized_operations(topic_description, &mut n)
+    };
     let mut out = Vec::with_capacity(n);
     for i in 0..n {
         let operation = unsafe { *operations.add(i) };
@@ -1547,14 +1561,22 @@ fn extract_authorized_operations(
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DELETE => AclOperation::Delete,
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER => AclOperation::Alter,
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE => AclOperation::Describe,
-            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION => AclOperation::ClusterAction,
-            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS => AclOperation::DescribeConfigs,
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION => {
+                AclOperation::ClusterAction
+            }
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS => {
+                AclOperation::DescribeConfigs
+            }
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS => AclOperation::AlterConfigs,
-            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE => AclOperation::IdempotentWrite,
-            _ => return Err(KafkaError::AdminOpCreation(format!(
-                "bogus acl operation in kafka response: {:?}",
-                operation
-            )))
+            RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE => {
+                AclOperation::IdempotentWrite
+            }
+            _ => {
+                return Err(KafkaError::AdminOpCreation(format!(
+                    "bogus acl operation in kafka response: {:?}",
+                    operation
+                )))
+            }
         });
     }
     Ok(out)
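[Besides docs, options, and tests, patch 3 below rewrites describe_topics_inner for a real bug: patch 1 collected as_ptr() values of CString temporaries that are dropped inside their closure, so every pointer handed to librdkafka dangled. The pitfall and the fix in miniature; takes_string_array is a hypothetical stand-in for rd_kafka_TopicCollection_of_topic_names:

use std::ffi::CString;
use std::os::raw::c_char;

// Stand-in for rd_kafka_TopicCollection_of_topic_names(char **, size_t):
// reads each NUL-terminated entry of a C string array.
unsafe fn takes_string_array(ptrs: *mut *const c_char, len: usize) {
    for i in 0..len {
        let mut p = *ptrs.add(i);
        while *p != 0 {
            p = p.add(1); // walk to the NUL, as C code would
        }
    }
}

fn main() {
    let names = ["first_topic", "second_topic"];

    // Wrong (patch 1's shape): each CString is a temporary that dies at the
    // end of its closure, so every collected pointer dangles immediately.
    //
    // let ptrs: Vec<*const c_char> = names
    //     .iter()
    //     .map(|n| CString::new(*n).unwrap().as_ptr())
    //     .collect();

    // Right (patch 3's shape): own the CStrings for the whole call, then
    // borrow pointers from them with .iter() -- consuming the Vec with
    // .into_iter() would drop the CStrings and dangle again.
    let cstrings: Vec<CString> = names.iter().map(|n| CString::new(*n).unwrap()).collect();
    let mut ptrs: Vec<*const c_char> = cstrings.iter().map(|s| s.as_ptr()).collect();

    unsafe { takes_string_array(ptrs.as_mut_ptr(), ptrs.len()) };
    // `cstrings` drops here, after the FFI call has returned.
}]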
From 192c7968c628f6dae7912afee4894becae6e25fe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 16 Sep 2024 14:40:53 +0300
Subject: [PATCH 3/6] Add missing docs, admin options and tests

---
 rdkafka-sys/src/types.rs |   6 ++
 src/admin.rs             | 183 ++++++++++++++++++++++++++++++++------
 tests/test_admin.rs      |  75 ++++++++++++++++
 3 files changed, 243 insertions(+), 21 deletions(-)

diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs
index d227ddb1e..3d35f0cc8 100644
--- a/rdkafka-sys/src/types.rs
+++ b/rdkafka-sys/src/types.rs
@@ -131,6 +131,12 @@ pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
 /// Config source.
 pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
 
+/// Isolation level.
+pub use bindings::rd_kafka_IsolationLevel_t as RDKafkaIsolationLevel;
+
+/// Consumer group state.
+pub use bindings::rd_kafka_consumer_group_state_t as RDKafkaConsumerGroupState;
+
 /// ACL operation.
 pub use bindings::rd_kafka_AclOperation_t as RDKafkaAclOperation;
 
diff --git a/src/admin.rs b/src/admin.rs
index d22ba9aad..2ed4609df 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -377,31 +377,45 @@ impl<C: ClientContext> AdminClient<C> {
         Ok(rx)
     }
 
-    pub fn describe_topics(
+    /// Describe topics as specified by the `topic_names` array.
+    pub fn describe_topics<'a, I>(
         &self,
-        topic_names: &[&str],
+        topic_names: I,
         opts: &AdminOptions,
-    ) -> impl Future<Output = KafkaResult<Vec<TopicDescription>>> {
+    ) -> impl Future<Output = KafkaResult<Vec<TopicDescriptionResult>>>
+    where
+        I: IntoIterator<Item = &'a str>,
+    {
         match self.describe_topics_inner(topic_names, opts) {
             Ok(rx) => Either::Left(DescribeTopicsFuture { rx }),
             Err(err) => Either::Right(future::err(err)),
         }
     }
 
-    fn describe_topics_inner(
+    fn describe_topics_inner<'a, I>(
         &self,
-        topic_names: &[&str],
+        topic_names: I,
         opts: &AdminOptions,
-    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
-        let topic_names_string_array = topic_names
+    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
+    where
+        I: IntoIterator<Item = &'a str>,
+    {
+        let topic_names_cstrings = topic_names
+            .into_iter()
+            .map(|t| CString::new(t))
+            .collect::<Result<Vec<CString>, _>>()?;
+
+        // Don't consume topic_names_cstrings here because pointers become invalid.
+        // Use .iter() instead of .into_iter()
+        let mut topic_names_ptrs = topic_names_cstrings
             .iter()
-            .map(|tn| CString::new(*tn).map(|s| s.as_ptr()))
-            .collect::<Result<Vec<_>, _>>()?
-            .as_mut_ptr();
+            .map(|s| s.as_ptr())
+            .collect::<Vec<_>>();
+
         let native_topic_collection = unsafe {
             NativeTopicCollection::from_ptr(rdsys::rd_kafka_TopicCollection_of_topic_names(
-                topic_names_string_array,
-                topic_names.len(),
+                topic_names_ptrs.as_mut_ptr(),
+                topic_names_ptrs.len(),
             ))
             .unwrap()
         };
@@ -525,6 +539,10 @@ pub struct AdminOptions {
     operation_timeout: Option<Timeout>,
     validate_only: bool,
     broker_id: Option<i32>,
+    require_stable_offsets: bool,
+    include_authorized_operations: bool,
+    match_consumer_group_states: Option<Vec<RDKafkaConsumerGroupState>>,
+    isolation_level: Option<RDKafkaIsolationLevel>,
 }
 
 impl AdminOptions {
@@ -574,6 +592,39 @@ impl AdminOptions {
         self
     }
 
+    /// Whether the broker should return stable offsets (transaction-committed).
+    ///
+    /// Defaults to false.
+    pub fn require_stable_offsets(mut self, require_stable_offsets: bool) -> Self {
+        self.require_stable_offsets = require_stable_offsets;
+        self
+    }
+
+    /// Whether the broker should return authorized operations.
+    ///
+    /// Defaults to false.
+    pub fn include_authorized_operations(mut self, include_authorized_operations: bool) -> Self {
+        self.include_authorized_operations = include_authorized_operations;
+        self
+    }
+
+    /// List of consumer group states to query for.
+    pub fn match_consumer_group_states<T: Into<Vec<RDKafkaConsumerGroupState>>>(
+        mut self,
+        match_consumer_group_states: T,
+    ) -> Self {
+        self.match_consumer_group_states = Some(match_consumer_group_states.into());
+        self
+    }
+
+    /// Isolation Level needed for list Offset to query for.
+    ///
+    /// Defaults to [`IsolationLevel::ReadUncommitted`]
+    pub fn isolation_level<T: Into<RDKafkaIsolationLevel>>(mut self, isolation_level: T) -> Self {
+        self.isolation_level = Some(isolation_level.into());
+        self
+    }
+
     fn to_native(
         &self,
         client: *mut RDKafka,
@@ -635,6 +686,48 @@ impl AdminOptions {
             check_rdkafka_invalid_arg(res, err_buf)?;
         }
 
+        if self.require_stable_offsets {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_require_stable_offsets(
+                    native_opts.ptr(),
+                    1, // true
+                )
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if self.include_authorized_operations {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_include_authorized_operations(
+                    native_opts.ptr(),
+                    1, // true
+                )
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if let Some(match_consumer_group_states) = &self.match_consumer_group_states {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_match_consumer_group_states(
+                    native_opts.ptr(),
+                    match_consumer_group_states.as_ptr(),
+                    match_consumer_group_states.len(),
+                )
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
+        if let Some(isolation_level) = self.isolation_level {
+            let res = unsafe {
+                rdsys::rd_kafka_AdminOptions_set_isolation_level(native_opts.ptr(), isolation_level)
+            };
+            let res = unsafe { rdsys::rd_kafka_error_code(res) };
+            check_rdkafka_invalid_arg(res, err_buf)?;
+        }
+
         let (tx, rx) = oneshot::channel();
         let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
         unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };
@@ -1387,46 +1480,79 @@ impl Future for AlterConfigsFuture {
 // Describe topics handling
 //
 
+/// The result of a DescribeTopics operation.
+pub type TopicDescriptionResult = Result<TopicDescription, (TopicDescription, RDKafkaErrorCode)>;
+
+/// Node represents a broker.
 #[derive(Debug)]
 pub struct Node {
+    /// Node id.
     pub id: i32,
+    /// Node host.
     pub host: String,
+    /// Node port.
     pub port: u16,
+    /// (Optional) Node rack id.
     pub rack: Option<String>,
 }
 
+/// TopicPartition result type in DescribeTopics result.
 #[derive(Debug)]
 pub struct TopicPartitionInfo {
+    /// Partition id.
     pub partition: i32,
+    /// Leader of the partition.
    pub leader: Node,
+    /// List of in sync replica nodes.
     pub isr: Vec<Node>,
+    /// List of replica nodes.
     pub replicas: Vec<Node>,
 }
 
+/// Apache Kafka ACL operation types. Common type for multiple Admin API functions.
 #[derive(Debug, Eq, PartialEq)]
 pub enum AclOperation {
+    /// Unknown
     Unknown,
+    /// In a filter, matches any AclOperation
     Any,
+    /// ALL operation
     All,
+    /// READ operation
     Read,
+    /// WRITE operation
     Write,
+    /// CREATE operation
     Create,
+    /// DELETE operation
     Delete,
+    /// ALTER operation
     Alter,
+    /// DESCRIBE operation
     Describe,
+    /// CLUSTER_ACTION operation
     ClusterAction,
+    /// DESCRIBE_CONFIGS operation
     DescribeConfigs,
+    /// ALTER_CONFIGS operation
     AlterConfigs,
+    /// IDEMPOTENT_WRITE operation
     IdempotentWrite,
 }
 
+/// DescribeTopics result.
 #[derive(Debug)]
 pub struct TopicDescription {
+    /// Topic name.
     pub name: String,
+    /// Topic id.
     pub topic_id: Uuid,
+    /// Partitions.
     pub partitions: Vec<TopicPartitionInfo>,
+    /// Is the topic internal to Kafka?
     pub is_internal: bool,
-    pub authorized_operations: Vec<AclOperation>,
+    /// Operations allowed for topic. It may be None if operations were not requested.
+    pub authorized_operations: Option<Vec<AclOperation>>,
 }
 
 type NativeTopicCollection = NativePtr<RDKafkaTopicCollection>;
@@ -1441,7 +1567,7 @@ struct DescribeTopicsFuture {
 }
 
 impl Future for DescribeTopicsFuture {
-    type Output = KafkaResult<Vec<TopicDescription>>;
+    type Output = KafkaResult<Vec<TopicDescriptionResult>>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
         event.check_error()?;
@@ -1460,6 +1586,11 @@ impl Future for DescribeTopicsFuture {
         let mut out = Vec::with_capacity(n);
         for i in 0..n {
             let topic_description = unsafe { *topic_descriptions.add(i) };
+
+            let err = unsafe {
+                let err = rdsys::rd_kafka_TopicDescription_error(topic_description);
+                rdsys::rd_kafka_error_code(err)
+            };
             let topic_description = TopicDescription {
                 name: unsafe {
                     cstr_to_owned(rdsys::rd_kafka_TopicDescription_name(topic_description))
@@ -1471,7 +1602,12 @@ impl Future for DescribeTopicsFuture {
                 } != 0,
                 authorized_operations: extract_authorized_operations(topic_description)?,
             };
-            out.push(topic_description);
+
+            if err.is_error() {
+                out.push(Err((topic_description, err.into())));
+            } else {
+                out.push(Ok(topic_description));
+            }
         }
         Poll::Ready(Ok(out))
     }
@@ -1543,15 +1679,19 @@ fn extract_replicas(nodes: *const RDKafkaTopicPartitionInfo) -> Vec<Node> {
 
 fn extract_authorized_operations(
     topic_description: *const RDKafkaTopicDescription,
-) -> KafkaResult<Vec<AclOperation>> {
+) -> KafkaResult<Option<Vec<AclOperation>>> {
     let mut n = 0;
     let operations = unsafe {
         rdsys::rd_kafka_TopicDescription_authorized_operations(topic_description, &mut n)
     };
+
+    if operations.is_null() {
+        return Ok(None);
+    }
+
     let mut out = Vec::with_capacity(n);
     for i in 0..n {
-        let operation = unsafe { *operations.add(i) };
-        out.push(match operation {
+        let operation = match unsafe { *operations.add(i) } {
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_UNKNOWN => AclOperation::Unknown,
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ANY => AclOperation::Any,
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALL => AclOperation::All,
@@ -1574,10 +1714,11 @@ fn extract_authorized_operations(
             RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE => {
                 AclOperation::IdempotentWrite
             }
             _ => {
                 return Err(KafkaError::AdminOpCreation(format!(
                     "bogus acl operation in kafka response: {:?}",
-                    operation
+                    unsafe { *operations.add(i) }
                 )))
             }
-        });
+        };
+        out.push(operation);
     }
-    Ok(out)
+    Ok(Some(out))
 }
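[Patch 3's tests follow. As a quick orientation first: the new knobs compose with the existing AdminOptions builder as sketched below (against this branch only; the notes on which admin calls honor each flag come from patch 6's doc additions):

use std::time::Duration;

use rdkafka::admin::AdminOptions;

fn main() {
    // All four new fields default to off/None via AdminOptions::new().
    let opts = AdminOptions::new()
        .request_timeout(Some(Duration::from_secs(5)))
        // honored by ListConsumerGroupOffsets
        .require_stable_offsets(true)
        // honored by DescribeTopics, DescribeCluster, DescribeConsumerGroups
        .include_authorized_operations(true);
    let _ = opts;
}]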
diff --git a/tests/test_admin.rs b/tests/test_admin.rs
index 4cf0e9a81..268348563 100644
--- a/tests/test_admin.rs
+++ b/tests/test_admin.rs
@@ -537,6 +537,81 @@ async fn test_configs() {
     assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
 }
 
+#[tokio::test]
+async fn test_describe_topics() {
+    let admin_client = create_admin_client();
+    let opts = AdminOptions::new();
+
+    // Create a new topic with a single partition whose replication factor is 1.
+    let first_name = rand_test_topic("first_topic");
+    let first_topic = NewTopic::new(&first_name, 1, TopicReplication::Fixed(1));
+    let res = admin_client
+        .create_topics([&first_topic], &opts)
+        .await
+        .expect("topic creation failed");
+    assert_eq!(res, &[Ok(first_name.clone())]);
+
+    // Describe the created topic and verify its properties.
+    let res = admin_client
+        .describe_topics([first_name.as_ref()], &opts)
+        .await
+        .expect("describe topics failed");
+    assert_eq!(res.len(), 1);
+    let topic_description = res[0].as_ref().expect("describe topics failed");
+    assert_eq!(topic_description.name, first_name);
+    assert_eq!(topic_description.partitions.len(), 1);
+    assert_eq!(topic_description.partitions[0].replicas.len(), 1);
+    assert_eq!(topic_description.is_internal, false);
+    assert_eq!(topic_description.authorized_operations, None);
+
+    // Create a second topic with 2 partitions whose replication factors are 1.
+    let second_name = rand_test_topic("second_topic");
+    let second_topic = NewTopic::new(&second_name, 2, TopicReplication::Fixed(1));
+    let res = admin_client
+        .create_topics([&second_topic], &opts)
+        .await
+        .expect("topic creation failed");
+    assert_eq!(res, &[Ok(second_name.clone())]);
+
+    // Describe both topics and verify their properties.
+    let res = admin_client
+        .describe_topics([first_name.as_ref(), second_name.as_ref()], &opts)
+        .await
+        .expect("describe topics failed");
+
+    assert_eq!(res.len(), 2);
+
+    let first_topic_description = res[0].as_ref().expect("describe topics failed");
+    assert_eq!(first_topic_description.name, first_name);
+    assert_eq!(first_topic_description.partitions.len(), 1);
+    assert_eq!(first_topic_description.partitions[0].replicas.len(), 1);
+    assert_eq!(first_topic_description.is_internal, false);
+    assert_eq!(first_topic_description.authorized_operations, None);
+
+    let second_topic_description = res[1].as_ref().expect("describe topics failed");
+    assert_eq!(second_topic_description.name, second_name);
+    assert_eq!(second_topic_description.partitions.len(), 2);
+    assert_eq!(second_topic_description.partitions[0].replicas.len(), 1);
+    assert_eq!(second_topic_description.partitions[1].replicas.len(), 1);
+    assert_eq!(second_topic_description.is_internal, false);
+    assert_eq!(second_topic_description.authorized_operations, None);
+
+    // Include authorized operations in the description options and describe both topics again.
+    let opts = opts.include_authorized_operations(true);
+    let res = admin_client
+        .describe_topics([first_name.as_ref(), second_name.as_ref()], &opts)
+        .await
+        .expect("describe topics failed");
+
+    assert_eq!(res.len(), 2);
+
+    let first_topic_description = res[0].as_ref().expect("describe topics failed");
+    assert!(first_topic_description.authorized_operations.is_some());
+
+    let second_topic_description = res[1].as_ref().expect("describe topics failed");
+    assert!(second_topic_description.authorized_operations.is_some());
+}
+
 #[tokio::test]
 async fn test_groups() {
     let admin_client = create_admin_client();

From 37fb257ff2351f6a167961c83441015673b19ee3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 16 Sep 2024 16:30:55 +0300
Subject: [PATCH 4/6] Add DescribeCluster support for admin client API

---
 src/admin.rs        | 136 +++++++++++++++++++++++++++++++++-----------
 tests/test_admin.rs |  22 +++++++
 2 files changed, 132 insertions(+), 26 deletions(-)

diff --git a/src/admin.rs b/src/admin.rs
index 2ed4609df..418a02aa2 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -432,6 +432,33 @@ impl<C: ClientContext> AdminClient<C> {
         Ok(rx)
     }
 
+    /// Describe the Kafka cluster.
+    pub fn describe_cluster(
+        &self,
+        opts: &AdminOptions,
+    ) -> impl Future<Output = KafkaResult<ClusterDescription>> {
+        match self.describe_cluster_inner(opts) {
+            Ok(rx) => Either::Left(DescribeClusterFuture { rx }),
+            Err(err) => Either::Right(future::err(err)),
+        }
+    }
+
+    fn describe_cluster_inner(
+        &self,
+        opts: &AdminOptions,
+    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
+        let mut err_buf = ErrBuf::new();
+        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
+        unsafe {
+            rdsys::rd_kafka_DescribeCluster(
+                self.client.native_ptr(),
+                native_opts.ptr(),
+                self.queue.ptr(),
+            );
+        }
+        Ok(rx)
+    }
+
     /// Returns the client underlying this admin client.
     pub fn inner(&self) -> &Client<C> {
         &self.client
@@ -1591,6 +1618,15 @@ impl Future for DescribeTopicsFuture {
                 rdsys::rd_kafka_error_code(err)
             };
 
+            let mut n_operations = 0;
+            let operations = unsafe {
+                rdsys::rd_kafka_TopicDescription_authorized_operations(
+                    topic_description,
+                    &mut n_operations,
+                )
+            };
+
             let topic_description = TopicDescription {
                 name: unsafe {
                     cstr_to_owned(rdsys::rd_kafka_TopicDescription_name(topic_description))
@@ -1600,7 +1636,7 @@ impl Future for DescribeTopicsFuture {
                 is_internal: unsafe {
                     rdsys::rd_kafka_TopicDescription_is_internal(topic_description)
                 } != 0,
-                authorized_operations: extract_authorized_operations(topic_description)?,
+                authorized_operations: extract_authorized_operations(operations, n_operations)?,
             };
 
             if err.is_error() {
@@ -1629,14 +1665,19 @@ fn extract_partitions(
     let mut out = Vec::with_capacity(n);
     for i in 0..n {
         let partition = unsafe { *partitions.add(i) };
-        let leader = extract_node(unsafe { rdsys::rd_kafka_TopicPartitionInfo_leader(partition) });
-        let isr = extract_isr(partition);
-        let replicas = extract_replicas(partition);
+
+        let mut n_isr = 0;
+        let isr = unsafe { rdsys::rd_kafka_TopicPartitionInfo_isr(partition, &mut n_isr) };
+
+        let mut n_replicas = 0;
+        let replicas =
+            unsafe { rdsys::rd_kafka_TopicPartitionInfo_replicas(partition, &mut n_replicas) };
+
         out.push(TopicPartitionInfo {
             partition: unsafe { rdsys::rd_kafka_TopicPartitionInfo_partition(partition) },
-            leader,
-            isr,
-            replicas,
+            leader: extract_node(unsafe { rdsys::rd_kafka_TopicPartitionInfo_leader(partition) }),
+            isr: extract_nodes(isr, n_isr),
+            replicas: extract_nodes(replicas, n_replicas),
         });
     }
     out
@@ -1657,19 +1698,7 @@ fn extract_node(node: *const RDKafkaNode) -> Node {
     }
 }
 
-fn extract_isr(partition: *const RDKafkaTopicPartitionInfo) -> Vec<Node> {
-    let mut n = 0;
-    let nodes = unsafe { rdsys::rd_kafka_TopicPartitionInfo_isr(partition, &mut n) };
-    let mut out = Vec::with_capacity(n);
-    for i in 0..n {
-        out.push(extract_node(unsafe { *nodes.add(i) }));
-    }
-    out
-}
-
-fn extract_replicas(nodes: *const RDKafkaTopicPartitionInfo) -> Vec<Node> {
-    let mut n = 0;
-    let nodes = unsafe { rdsys::rd_kafka_TopicPartitionInfo_replicas(nodes, &mut n) };
+fn extract_nodes(nodes: *mut *const RDKafkaNode, n: usize) -> Vec<Node> {
     let mut out = Vec::with_capacity(n);
     for i in 0..n {
         out.push(extract_node(unsafe { *nodes.add(i) }));
     }
     out
 }
 
 fn extract_authorized_operations(
-    topic_description: *const RDKafkaTopicDescription,
+    operations: *const RDKafkaAclOperation,
+    n: usize,
 ) -> KafkaResult<Option<Vec<AclOperation>>> {
-    let mut n = 0;
-    let operations = unsafe {
-        rdsys::rd_kafka_TopicDescription_authorized_operations(topic_description, &mut n)
-    };
-
     if operations.is_null() {
         return Ok(None);
     }
@@ -1722,3 +1747,62 @@ fn extract_authorized_operations(
     }
     Ok(Some(out))
 }
+
+//
+// Describe cluster handling
+//
+
+/// DescribeCluster result.
+#[derive(Debug)]
+pub struct ClusterDescription {
+    /// Cluster id.
+    pub cluster_id: String,
+    /// Current controller.
+    pub controller: Node,
+    /// Brokers in the cluster.
+    pub nodes: Vec<Node>,
+    /// Operations allowed for cluster. It may be None if operations were not requested.
+    pub authorized_operations: Option<Vec<AclOperation>>,
+}
+
+struct DescribeClusterFuture {
+    rx: oneshot::Receiver<NativeEvent>,
+}
+
+impl Future for DescribeClusterFuture {
+    type Output = KafkaResult<ClusterDescription>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
+        event.check_error()?;
+        let res = unsafe { rdsys::rd_kafka_event_DescribeCluster_result(event.ptr()) };
+        if res.is_null() {
+            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
+            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
+                "describe cluster request received response of incorrect type ({})",
+                typ
+            ))));
+        }
+
+        let mut n_nodes = 0;
+        let nodes = unsafe { rdsys::rd_kafka_DescribeCluster_result_nodes(res, &mut n_nodes) };
+
+        let mut n_operations = 0;
+        let operations = unsafe {
+            rdsys::rd_kafka_DescribeCluster_result_authorized_operations(res, &mut n_operations)
+        };
+
+        let out = Ok(ClusterDescription {
+            cluster_id: unsafe {
+                cstr_to_owned(rdsys::rd_kafka_DescribeCluster_result_cluster_id(res))
+            },
+            controller: extract_node(unsafe {
+                rdsys::rd_kafka_DescribeCluster_result_controller(res)
+            }),
+            nodes: extract_nodes(nodes, n_nodes),
+            authorized_operations: extract_authorized_operations(operations, n_operations)?,
+        });
+
+        Poll::Ready(out)
+    }
+}
diff --git a/tests/test_admin.rs b/tests/test_admin.rs
index 268348563..54b4bd01f 100644
--- a/tests/test_admin.rs
+++ b/tests/test_admin.rs
@@ -612,6 +612,28 @@ async fn test_describe_topics() {
     assert!(second_topic_description.authorized_operations.is_some());
 }
 
+#[tokio::test]
+async fn test_describe_cluster() {
+    let admin_client = create_admin_client();
+    let opts = AdminOptions::new();
+
+    // Describe the cluster and verify its properties.
+    let res = admin_client
+        .describe_cluster(&opts)
+        .await
+        .expect("describe cluster failed");
+    assert!(res.nodes.len() > 0);
+    assert_eq!(res.authorized_operations, None);
+
+    // Describe the cluster with authorized operations and verify the properties.
+    let opts = opts.include_authorized_operations(true);
+    let res = admin_client
+        .describe_cluster(&opts)
+        .await
+        .expect("describe cluster failed");
+    assert!(res.authorized_operations.is_some());
+}
+
 #[tokio::test]
 async fn test_groups() {
     let admin_client = create_admin_client();
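[Patch 4 in use — a hedged end-to-end sketch, assuming a build of this branch, a tokio runtime with the macros feature (as the tests use), and a broker reachable at the placeholder address:

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::ClientConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `localhost:9092` is a placeholder broker address.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    let opts = AdminOptions::new().include_authorized_operations(true);
    let cluster = admin.describe_cluster(&opts).await?;

    println!(
        "cluster {}: controller node {}, {} brokers, ops: {:?}",
        cluster.cluster_id,
        cluster.controller.id,
        cluster.nodes.len(),
        cluster.authorized_operations,
    );
    Ok(())
}]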
From 2c0424271bacf685a05093c900d13acb6fff4917 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 16 Sep 2024 16:41:04 +0300
Subject: [PATCH 5/6] Fix clippy warnings

---
 src/admin.rs        | 2 +-
 tests/test_admin.rs | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/admin.rs b/src/admin.rs
index 418a02aa2..d843f9823 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -402,7 +402,7 @@ impl<C: ClientContext> AdminClient<C> {
         let topic_names_cstrings = topic_names
             .into_iter()
-            .map(|t| CString::new(t))
+            .map(CString::new)
             .collect::<Result<Vec<CString>, _>>()?;
 
         // Don't consume topic_names_cstrings here because pointers become invalid.
diff --git a/tests/test_admin.rs b/tests/test_admin.rs
index 54b4bd01f..6b1c9121c 100644
--- a/tests/test_admin.rs
+++ b/tests/test_admin.rs
@@ -561,7 +561,7 @@ async fn test_describe_topics() {
     assert_eq!(topic_description.name, first_name);
     assert_eq!(topic_description.partitions.len(), 1);
     assert_eq!(topic_description.partitions[0].replicas.len(), 1);
-    assert_eq!(topic_description.is_internal, false);
+    assert!(!topic_description.is_internal);
     assert_eq!(topic_description.authorized_operations, None);
 
     // Create a second topic with 2 partitions whose replication factors are 1.
@@ -585,7 +585,7 @@ async fn test_describe_topics() {
     assert_eq!(first_topic_description.name, first_name);
     assert_eq!(first_topic_description.partitions.len(), 1);
     assert_eq!(first_topic_description.partitions[0].replicas.len(), 1);
-    assert_eq!(first_topic_description.is_internal, false);
+    assert!(!first_topic_description.is_internal);
     assert_eq!(first_topic_description.authorized_operations, None);
 
     let second_topic_description = res[1].as_ref().expect("describe topics failed");
@@ -593,7 +593,7 @@ async fn test_describe_topics() {
     assert_eq!(second_topic_description.partitions.len(), 2);
     assert_eq!(second_topic_description.partitions[0].replicas.len(), 1);
     assert_eq!(second_topic_description.partitions[1].replicas.len(), 1);
-    assert_eq!(second_topic_description.is_internal, false);
+    assert!(!second_topic_description.is_internal);
     assert_eq!(second_topic_description.authorized_operations, None);
 
     // Include authorized operations in the description options and describe both topics again.
@@ -622,7 +622,7 @@ async fn test_describe_cluster() {
         .describe_cluster(&opts)
         .await
         .expect("describe cluster failed");
-    assert!(res.nodes.len() > 0);
+    assert!(!res.nodes.is_empty());
     assert_eq!(res.authorized_operations, None);
 
     // Describe the cluster with authorized operations and verify the properties.

From 8f907bde748f7dfde21ae56a9510b8961a83e839 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alihan=20=C3=87elikcan?=
Date: Mon, 16 Sep 2024 16:52:36 +0300
Subject: [PATCH 6/6] Add when values are valid in AdminOptions

---
 src/admin.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/src/admin.rs b/src/admin.rs
index d843f9823..25decfab5 100644
--- a/src/admin.rs
+++ b/src/admin.rs
@@ -594,7 +594,7 @@ impl AdminOptions {
     /// If unset (the default), the API calls will return immediately after
     /// triggering the operation.
     ///
-    /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls
+    /// Only the CreatePartitions, CreateTopics, DeleteTopics, and DeleteRecords API calls
     /// respect this option.
     pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
         self.operation_timeout = timeout.map(Into::into);
         self
@@ -605,6 +605,9 @@ impl AdminOptions {
     /// requested operation.
     ///
     /// Defaults to false.
+    ///
+    /// Only the CreateTopics, CreatePartitions, AlterConfigs, and IncrementalAlterConfigs
+    /// API calls respect this option.
     pub fn validate_only(mut self, validate_only: bool) -> Self {
         self.validate_only = validate_only;
         self
@@ -622,6 +625,8 @@ impl AdminOptions {
     /// Whether the broker should return stable offsets (transaction-committed).
     ///
     /// Defaults to false.
+    ///
+    /// Only the ListConsumerGroupOffsets API call respects this option.
     pub fn require_stable_offsets(mut self, require_stable_offsets: bool) -> Self {
         self.require_stable_offsets = require_stable_offsets;
         self
@@ -630,12 +635,17 @@ impl AdminOptions {
     /// Whether the broker should return authorized operations.
     ///
     /// Defaults to false.
+    ///
+    /// Only the DescribeConsumerGroups, DescribeCluster, and DescribeTopics API calls
+    /// respect this option.
     pub fn include_authorized_operations(mut self, include_authorized_operations: bool) -> Self {
         self.include_authorized_operations = include_authorized_operations;
         self
     }
 
     /// List of consumer group states to query for.
+    ///
+    /// Only the ListConsumerGroups API call respects this option.
     pub fn match_consumer_group_states<T: Into<Vec<RDKafkaConsumerGroupState>>>(
         mut self,
         match_consumer_group_states: T,
     ) -> Self {
         self.match_consumer_group_states = Some(match_consumer_group_states.into());
         self
     }
 
     /// Isolation Level needed for list Offset to query for.
     ///
-    /// Defaults to [`IsolationLevel::ReadUncommitted`]
+    /// Defaults to read uncommitted.
     pub fn isolation_level<T: Into<RDKafkaIsolationLevel>>(mut self, isolation_level: T) -> Self {
         self.isolation_level = Some(isolation_level.into());
         self
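[End-to-end, the series leaves the admin client able to describe topics with per-topic error handling. A hedged usage sketch mirroring test_describe_topics — placeholder broker address and topic name, tokio assumed as in the tests:

use rdkafka::admin::{AdminClient, AdminOptions};
use rdkafka::client::DefaultClientContext;
use rdkafka::ClientConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // Request authorized operations too (patch 3/6 options).
    let opts = AdminOptions::new().include_authorized_operations(true);
    let results = admin.describe_topics(["my-topic"], &opts).await?;

    // Each element is a TopicDescriptionResult: Ok(description) on success,
    // or Err((description, error_code)) for a per-topic broker error.
    for result in results {
        match result {
            Ok(desc) => println!(
                "{}: {} partitions, internal={}, ops={:?}",
                desc.name,
                desc.partitions.len(),
                desc.is_internal,
                desc.authorized_operations
            ),
            Err((desc, err)) => eprintln!("{}: {}", desc.name, err),
        }
    }
    Ok(())
}]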