Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use regular lock for simplespanprocessor #1612

Merged
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)

## v0.22.1

### Fixed
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
Expand Down Expand Up @@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
Expand Down
10 changes: 4 additions & 6 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug {
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
}

/// A [`LogProcessor`] that exports synchronously when logs are emitted.
///
/// # Examples
///
/// Note that the simple processor exports synchronously every time a log is
/// emitted. If you find this limiting, consider the batch processor instead.
/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
/// as they are emitted, without any batching. This is typically useful for
/// debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchSpanProcessor].
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
Expand Down
65 changes: 33 additions & 32 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
InstrumentationLibrary,
};
use async_trait::async_trait;
use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use std::fmt::{Display, Formatter};
use std::{
fmt::{Display, Formatter},
sync::{Arc, Mutex},
};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
Expand All @@ -40,42 +42,47 @@
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TestSpanExporter {
tx_export: Sender<SpanData>,
tx_shutdown: Sender<()>,
pub export_called: Arc<Mutex<bool>>,
pub shutdown_called: Arc<Mutex<bool>>,
}

impl Default for TestSpanExporter {
fn default() -> Self {
Self::new()
}

Check warning on line 54 in opentelemetry-sdk/src/testing/trace/span_exporters.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/testing/trace/span_exporters.rs#L52-L54

Added lines #L52 - L54 were not covered by tests
}

impl TestSpanExporter {
pub fn new() -> Self {
TestSpanExporter {
export_called: Arc::new(Mutex::new(false)),
shutdown_called: Arc::new(Mutex::new(false)),
}
}

pub fn is_export_called(&self) -> bool {
*self.export_called.lock().unwrap()
}

pub fn is_shutdown_called(&self) -> bool {
*self.shutdown_called.lock().unwrap()
}
}

#[async_trait]
impl SpanExporter for TestSpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Box::pin(std::future::ready(Err(Into::into(err))));
}
}
fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
*self.export_called.lock().unwrap() = true;
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
let _ = self.tx_shutdown.send(()); // ignore error
*self.shutdown_called.lock().unwrap() = true;
}
}

pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
let (tx_export, rx_export) = unbounded();
let (tx_shutdown, rx_shutdown) = unbounded();
let exporter = TestSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
Expand Down Expand Up @@ -139,12 +146,6 @@
}
}

impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
fn from(err: SendError<T>) -> Self {
TestExportError(err.to_string())
}
}

/// A no-op instance of an [`SpanExporter`].
///
/// [`SpanExporter`]: crate::export::trace::SpanExporter
Expand Down
125 changes: 41 additions & 84 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
Context,
};
use std::cmp::min;
use std::{env, fmt, str::FromStr, thread, time::Duration};
use std::sync::Mutex;
use std::{env, fmt, str::FromStr, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
Expand Down Expand Up @@ -93,65 +94,19 @@
fn shutdown(&mut self) -> TraceResult<()>;
}

/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as
/// soon as they are finished, without any batching.
/// A [SpanProcessor] that passes finished spans to the configured
/// `SpanExporter`, as soon as they are finished, without any batching. This is
/// typically useful for debugging and testing. For scenarios requiring higher
/// performance/throughput, consider using [BatchSpanProcessor].
#[derive(Debug)]
pub struct SimpleSpanProcessor {
message_sender: crossbeam_channel::Sender<Message>,
exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();
});

Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));
}
}
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
Self {
exporter: Mutex::new(exporter),
}
}
}
Expand All @@ -166,34 +121,34 @@
return;
}

if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);

Check warning on line 131 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L131

Added line #L131 was not covered by tests
}
}

fn force_flush(&self) -> TraceResult<()> {
self.signal(Message::Flush, "flushing");

// Nothing to flush for simple span processor.
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
self.signal(Message::Shutdown, "shutting down");

Ok(())
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))

Check warning on line 147 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L145-L147

Added lines #L145 - L147 were not covered by tests
}
}
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
// Expecting to address that separately in the future."")
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
Expand Down Expand Up @@ -707,6 +662,7 @@

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
// cargo test trace::span_processor::tests:: --features=trace,testing
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
Expand All @@ -715,7 +671,7 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::runtime;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter,
};
use crate::trace::span_processor::{
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
Expand All @@ -729,17 +685,17 @@

#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
processor.on_end(new_test_export_span_data());
assert!(rx_export.recv().is_ok());
assert!(exporter.is_export_called());
let _result = processor.shutdown();
}

#[test]
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
Expand All @@ -756,15 +712,16 @@
instrumentation_lib: Default::default(),
};
processor.on_end(unsampled);
assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
assert!(!exporter.is_export_called());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
assert!(!exporter.is_shutdown_called());
let _result = processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
assert!(exporter.is_shutdown_called());
}

#[test]
Expand Down Expand Up @@ -863,7 +820,7 @@
(OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
Expand All @@ -883,7 +840,7 @@
env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
Expand Down
Loading