Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use regular lock for simplespanprocessor #1612

Merged
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,10 @@

## vNext

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)

## v0.22.1

### Fixed
5 changes: 2 additions & 3 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
@@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
59 changes: 27 additions & 32 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
@@ -7,13 +7,15 @@
InstrumentationLibrary,
};
use async_trait::async_trait;
use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use std::fmt::{Display, Formatter};
use std::{
fmt::{Display, Formatter},
sync::{Arc, Mutex},
};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
@@ -40,42 +42,41 @@
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TestSpanExporter {
tx_export: Sender<SpanData>,
tx_shutdown: Sender<()>,
pub export_called: Arc<Mutex<bool>>,
pub shutdown_called: Arc<Mutex<bool>>,
}

impl TestSpanExporter {
pub fn new() -> Self {

Check failure on line 52 in opentelemetry-sdk/src/testing/trace/span_exporters.rs

GitHub Actions / lint

you should consider adding a `Default` implementation for `TestSpanExporter`
TestSpanExporter {
export_called: Arc::new(Mutex::new(false)),
shutdown_called: Arc::new(Mutex::new(false)),
}
}

pub fn is_export_called(&self) -> bool {
*self.export_called.lock().unwrap()
}

pub fn is_shutdown_called(&self) -> bool {
*self.shutdown_called.lock().unwrap()
}
}

#[async_trait]
impl SpanExporter for TestSpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Box::pin(std::future::ready(Err(Into::into(err))));
}
}
fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
*self.export_called.lock().unwrap() = true;
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
let _ = self.tx_shutdown.send(()); // ignore error
*self.shutdown_called.lock().unwrap() = true;
}
}

pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
let (tx_export, rx_export) = unbounded();
let (tx_shutdown, rx_shutdown) = unbounded();
let exporter = TestSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
@@ -139,12 +140,6 @@
}
}

impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
fn from(err: SendError<T>) -> Self {
TestExportError(err.to_string())
}
}

/// A no-op instance of an [`SpanExporter`].
///
/// [`SpanExporter`]: crate::export::trace::SpanExporter
119 changes: 37 additions & 82 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
@@ -50,7 +50,8 @@
Context,
};
use std::cmp::min;
use std::{env, fmt, str::FromStr, thread, time::Duration};
use std::sync::Mutex;
use std::{env, fmt, str::FromStr, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
@@ -97,61 +98,13 @@
/// soon as they are finished, without any batching.
#[derive(Debug)]
pub struct SimpleSpanProcessor {
message_sender: crossbeam_channel::Sender<Message>,
exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();
});

Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));
}
}
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
Self {
exporter: Mutex::new(exporter),
}
}
}
@@ -166,34 +119,34 @@
return;
}

if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);

Check warning on line 129 in opentelemetry-sdk/src/trace/span_processor.rs

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L129

Added line #L129 was not covered by tests
}
}

fn force_flush(&self) -> TraceResult<()> {
self.signal(Message::Flush, "flushing");

// Nothing to flush for simple span processor.
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
self.signal(Message::Shutdown, "shutting down");

Ok(())
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))

Check warning on line 145 in opentelemetry-sdk/src/trace/span_processor.rs

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L143-L145

Added lines #L143 - L145 were not covered by tests
}
}
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
// Expecting to address that separately in the future."")
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
@@ -707,6 +660,7 @@

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
// cargo test trace::span_processor::tests:: --features=trace,testing
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
@@ -715,7 +669,7 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::runtime;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter,
};
use crate::trace::span_processor::{
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
@@ -729,17 +683,17 @@

#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
processor.on_end(new_test_export_span_data());
assert!(rx_export.recv().is_ok());
assert!(exporter.is_export_called());
let _result = processor.shutdown();
}

#[test]
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
@@ -756,15 +710,16 @@
instrumentation_lib: Default::default(),
};
processor.on_end(unsampled);
assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
assert!(!exporter.is_export_called());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
assert!(!exporter.is_shutdown_called());
let _result = processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
assert!(exporter.is_shutdown_called());
}

#[test]
@@ -863,7 +818,7 @@
(OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
@@ -883,7 +838,7 @@
env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});