From 280baa3b9910609f2b7b044ab1ed732bd4a74d3f Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 12:30:23 -0700 Subject: [PATCH 1/5] Use regular lock for simplespanprocessor --- opentelemetry-sdk/Cargo.toml | 5 +- .../src/testing/trace/span_exporters.rs | 59 ++++----- opentelemetry-sdk/src/trace/span_processor.rs | 119 ++++++------------ 3 files changed, 66 insertions(+), 117 deletions(-) diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index cc6f010683..cabcc84476 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" } opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true } async-std = { workspace = true, features = ["unstable"], optional = true } async-trait = { workspace = true, optional = true } -crossbeam-channel = { version = "0.5", optional = true } futures-channel = "0.3" futures-executor = { workspace = true } futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] } @@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } [features] default = ["trace"] -trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"] +trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"] -logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"] +logs = ["opentelemetry/logs", "async-trait", "serde_json"] logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"] metrics = ["opentelemetry/metrics", "glob", "async-trait"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 92666e229f..2aaaed1bcc 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -7,13 +7,15 @@ use crate::{ InstrumentationLibrary, }; use async_trait::async_trait; -use crossbeam_channel::{unbounded, Receiver, SendError, Sender}; use futures_util::future::BoxFuture; pub use opentelemetry::testing::trace::TestSpan; use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; -use std::fmt::{Display, Formatter}; +use std::{ + fmt::{Display, Formatter}, + sync::{Arc, Mutex}, +}; pub fn new_test_export_span_data() -> SpanData { let config = Config::default(); @@ -40,42 +42,41 @@ pub fn new_test_export_span_data() -> SpanData { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TestSpanExporter { - tx_export: Sender, - tx_shutdown: Sender<()>, + pub export_called: Arc>, + pub shutdown_called: Arc>, +} + +impl TestSpanExporter { + pub fn new() -> Self { + TestSpanExporter { + export_called: Arc::new(Mutex::new(false)), + shutdown_called: Arc::new(Mutex::new(false)), + } + } + + pub fn is_export_called(&self) -> bool { + *self.export_called.lock().unwrap() + } + + pub fn is_shutdown_called(&self) -> bool { + *self.shutdown_called.lock().unwrap() + } } #[async_trait] impl SpanExporter for TestSpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - for span_data in batch { - if let Err(err) = self - .tx_export - .send(span_data) - .map_err::(Into::into) - { - return Box::pin(std::future::ready(Err(Into::into(err)))); - } - } + fn export(&mut self, _batch: Vec) -> BoxFuture<'static, ExportResult> { + *self.export_called.lock().unwrap() = true; Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) { - let _ = self.tx_shutdown.send(()); // ignore error + *self.shutdown_called.lock().unwrap() = true; } } -pub fn new_test_exporter() -> (TestSpanExporter, Receiver, Receiver<()>) { - let (tx_export, rx_export) = unbounded(); - let (tx_shutdown, rx_shutdown) = unbounded(); - let exporter = TestSpanExporter { - tx_export, - tx_shutdown, - }; - (exporter, rx_export, rx_shutdown) -} - #[derive(Debug)] pub struct TokioSpanExporter { tx_export: tokio::sync::mpsc::UnboundedSender, @@ -139,12 +140,6 @@ impl From> for TestExportError { } } -impl From> for TestExportError { - fn from(err: SendError) -> Self { - TestExportError(err.to_string()) - } -} - /// A no-op instance of an [`SpanExporter`]. /// /// [`SpanExporter`]: crate::export::trace::SpanExporter diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index c9230a1603..85ee4d2b71 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -50,7 +50,8 @@ use opentelemetry::{ Context, }; use std::cmp::min; -use std::{env, fmt, str::FromStr, thread, time::Duration}; +use std::sync::Mutex; +use std::{env, fmt, str::FromStr, time::Duration}; /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; @@ -97,61 +98,13 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// soon as they are finished, without any batching. #[derive(Debug)] pub struct SimpleSpanProcessor { - message_sender: crossbeam_channel::Sender, + exporter: Mutex>, } impl SimpleSpanProcessor { - pub(crate) fn new(mut exporter: Box) -> Self { - let (message_sender, rx) = crossbeam_channel::unbounded(); - - let _ = thread::Builder::new() - .name("opentelemetry-exporter".to_string()) - .spawn(move || { - while let Ok(msg) = rx.recv() { - match msg { - Message::ExportSpan(span) => { - if let Err(err) = - futures_executor::block_on(exporter.export(vec![span])) - { - global::handle_error(err); - } - } - Message::Flush(sender) => { - Self::respond(&sender, "sync"); - } - Message::Shutdown(sender) => { - exporter.shutdown(); - - Self::respond(&sender, "shutdown"); - - return; - } - } - } - - exporter.shutdown(); - }); - - Self { message_sender } - } - - fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) { - let (tx, rx) = crossbeam_channel::bounded(0); - - if self.message_sender.send(msg(tx)).is_ok() { - if let Err(err) = rx.recv() { - global::handle_error(TraceError::from(format!( - "error {description} span processor: {err:?}" - ))); - } - } - } - - fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) { - if let Err(err) = sender.send(()) { - global::handle_error(TraceError::from(format!( - "could not send {description}: {err:?}" - ))); + pub(crate) fn new(exporter: Box) -> Self { + Self { + exporter: Mutex::new(exporter), } } } @@ -166,34 +119,34 @@ impl SpanProcessor for SimpleSpanProcessor { return; } - if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) { - global::handle_error(TraceError::from(format!("error processing span {:?}", err))); + let result = self + .exporter + .lock() + .map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into())) + .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span]))); + + if let Err(err) = result { + global::handle_error(err); } } fn force_flush(&self) -> TraceResult<()> { - self.signal(Message::Flush, "flushing"); - + // Nothing to flush for simple span processor. Ok(()) } fn shutdown(&mut self) -> TraceResult<()> { - self.signal(Message::Shutdown, "shutting down"); - - Ok(()) + if let Ok(mut exporter) = self.exporter.lock() { + exporter.shutdown(); + Ok(()) + } else { + Err(TraceError::Other( + "SimpleSpanProcessor mutex poison at shutdown".into(), + )) + } } } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning. -// Expecting to address that separately in the future."") -enum Message { - ExportSpan(SpanData), - Flush(crossbeam_channel::Sender<()>), - Shutdown(crossbeam_channel::Sender<()>), -} - /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. /// @@ -707,6 +660,7 @@ where #[cfg(all(test, feature = "testing", feature = "trace"))] mod tests { + // cargo test trace::span_processor::tests:: --features=trace,testing use super::{ BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, @@ -715,7 +669,7 @@ mod tests { use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::runtime; use crate::testing::trace::{ - new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, + new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter, }; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, @@ -729,17 +683,17 @@ mod tests { #[test] fn simple_span_processor_on_end_calls_export() { - let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); - let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); processor.on_end(new_test_export_span_data()); - assert!(rx_export.recv().is_ok()); + assert!(exporter.is_export_called()); let _result = processor.shutdown(); } #[test] fn simple_span_processor_on_end_skips_export_if_not_sampled() { - let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); - let processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); let unsampled = SpanData { span_context: SpanContext::empty_context(), parent_span_id: SpanId::INVALID, @@ -756,15 +710,16 @@ mod tests { instrumentation_lib: Default::default(), }; processor.on_end(unsampled); - assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err()); + assert!(!exporter.is_export_called()); } #[test] fn simple_span_processor_shutdown_calls_shutdown() { - let (exporter, _rx_export, rx_shutdown) = new_test_exporter(); - let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + assert!(!exporter.is_shutdown_called()); let _result = processor.shutdown(); - assert!(rx_shutdown.try_recv().is_ok()); + assert!(exporter.is_shutdown_called()); } #[test] @@ -863,7 +818,7 @@ mod tests { (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), ]; temp_env::with_vars(env_vars.clone(), || { - let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio); + let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio); // export batch size cannot exceed max queue size assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -883,7 +838,7 @@ mod tests { env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); temp_env::with_vars(env_vars, || { - let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio); + let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); }); From 2f775e00c8627000afab977a6dc68e6ff3684e60 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 12:42:52 -0700 Subject: [PATCH 2/5] Add changelog --- opentelemetry-sdk/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 61d2dbb0b7..f5880ce3ae 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,10 @@ ## vNext +- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed + dependency on crossbeam-channel. + [1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files) + ## v0.22.1 ### Fixed From 36b5aa890006a4ffb80323adca0491391b0d75a1 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 16:09:25 -0700 Subject: [PATCH 3/5] clippy suggestion --- opentelemetry-sdk/src/testing/trace/span_exporters.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 2aaaed1bcc..aa5ec2d651 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -48,6 +48,12 @@ pub struct TestSpanExporter { pub shutdown_called: Arc>, } +impl Default for TestSpanExporter { + fn default() -> Self { + Self::new() + } +} + impl TestSpanExporter { pub fn new() -> Self { TestSpanExporter { From d7553e7b83fad21c5dac465ba1efb9deaa20a118 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 22:27:13 -0700 Subject: [PATCH 4/5] add doc warning about simple --- opentelemetry-sdk/src/logs/log_processor.rs | 10 ++++------ opentelemetry-sdk/src/trace/span_processor.rs | 6 ++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index bda7730283..21572aeba3 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug { fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; } -/// A [`LogProcessor`] that exports synchronously when logs are emitted. -/// -/// # Examples -/// -/// Note that the simple processor exports synchronously every time a log is -/// emitted. If you find this limiting, consider the batch processor instead. +/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon +/// as they are emitted, without any batching. This is typically useful for +/// debugging and testing. For scenarios requiring higher +/// performance/throughput, consider using [BatchSpanProcessor]. #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex>, diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 85ee4d2b71..1f8ec47c03 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -94,8 +94,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { fn shutdown(&mut self) -> TraceResult<()>; } -/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as -/// soon as they are finished, without any batching. +/// A [SpanProcessor] that passes finished spans to the configured +/// `SpanExporter`, as soon as they are finished, without any batching. This is +/// typically useful for debugging and testing. For scenarios requiring higher +/// performance/throughput, consider using [BatchSpanProcessor]. #[derive(Debug)] pub struct SimpleSpanProcessor { exporter: Mutex>, From 7596bac713cae169cc882f3eb332afd38f7f238d Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 22:32:09 -0700 Subject: [PATCH 5/5] fix link --- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 21572aeba3..57ace8dad6 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -55,7 +55,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon /// as they are emitted, without any batching. This is typically useful for /// debugging and testing. For scenarios requiring higher -/// performance/throughput, consider using [BatchSpanProcessor]. +/// performance/throughput, consider using [BatchLogProcessor]. #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex>,