Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use regular lock for simplespanprocessor #1612

Merged
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)

## v0.22.1

### Fixed
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
Expand Down Expand Up @@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[features]
default = ["trace"]
trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
Expand Down
65 changes: 33 additions & 32 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
InstrumentationLibrary,
};
use async_trait::async_trait;
use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use std::fmt::{Display, Formatter};
use std::{
fmt::{Display, Formatter},
sync::{Arc, Mutex},
};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
Expand All @@ -40,42 +42,47 @@
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TestSpanExporter {
tx_export: Sender<SpanData>,
tx_shutdown: Sender<()>,
pub export_called: Arc<Mutex<bool>>,
pub shutdown_called: Arc<Mutex<bool>>,
}

impl Default for TestSpanExporter {
fn default() -> Self {
Self::new()
}

Check warning on line 54 in opentelemetry-sdk/src/testing/trace/span_exporters.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/testing/trace/span_exporters.rs#L52-L54

Added lines #L52 - L54 were not covered by tests
}

impl TestSpanExporter {
pub fn new() -> Self {
TestSpanExporter {
export_called: Arc::new(Mutex::new(false)),
shutdown_called: Arc::new(Mutex::new(false)),
}
}

pub fn is_export_called(&self) -> bool {
*self.export_called.lock().unwrap()
}

pub fn is_shutdown_called(&self) -> bool {
*self.shutdown_called.lock().unwrap()
}
}

#[async_trait]
impl SpanExporter for TestSpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
for span_data in batch {
if let Err(err) = self
.tx_export
.send(span_data)
.map_err::<TestExportError, _>(Into::into)
{
return Box::pin(std::future::ready(Err(Into::into(err))));
}
}
fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
*self.export_called.lock().unwrap() = true;
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
let _ = self.tx_shutdown.send(()); // ignore error
*self.shutdown_called.lock().unwrap() = true;
}
}

pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
let (tx_export, rx_export) = unbounded();
let (tx_shutdown, rx_shutdown) = unbounded();
let exporter = TestSpanExporter {
tx_export,
tx_shutdown,
};
(exporter, rx_export, rx_shutdown)
}

#[derive(Debug)]
pub struct TokioSpanExporter {
tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
Expand Down Expand Up @@ -139,12 +146,6 @@
}
}

impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
fn from(err: SendError<T>) -> Self {
TestExportError(err.to_string())
}
}

/// A no-op instance of an [`SpanExporter`].
///
/// [`SpanExporter`]: crate::export::trace::SpanExporter
Expand Down
119 changes: 37 additions & 82 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
Context,
};
use std::cmp::min;
use std::{env, fmt, str::FromStr, thread, time::Duration};
use std::sync::Mutex;
use std::{env, fmt, str::FromStr, time::Duration};

/// Delay interval between two consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
Expand Down Expand Up @@ -97,61 +98,13 @@
/// soon as they are finished, without any batching.
#[derive(Debug)]
pub struct SimpleSpanProcessor {
message_sender: crossbeam_channel::Sender<Message>,
exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
let (message_sender, rx) = crossbeam_channel::unbounded();

let _ = thread::Builder::new()
.name("opentelemetry-exporter".to_string())
.spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Message::ExportSpan(span) => {
if let Err(err) =
futures_executor::block_on(exporter.export(vec![span]))
{
global::handle_error(err);
}
}
Message::Flush(sender) => {
Self::respond(&sender, "sync");
}
Message::Shutdown(sender) => {
exporter.shutdown();

Self::respond(&sender, "shutdown");

return;
}
}
}

exporter.shutdown();
});

Self { message_sender }
}

fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
let (tx, rx) = crossbeam_channel::bounded(0);

if self.message_sender.send(msg(tx)).is_ok() {
if let Err(err) = rx.recv() {
global::handle_error(TraceError::from(format!(
"error {description} span processor: {err:?}"
)));
}
}
}

fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
if let Err(err) = sender.send(()) {
global::handle_error(TraceError::from(format!(
"could not send {description}: {err:?}"
)));
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
Self {
exporter: Mutex::new(exporter),
}
}
}
Expand All @@ -166,34 +119,34 @@
return;
}

if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
let result = self
.exporter
.lock()
.map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

if let Err(err) = result {
global::handle_error(err);

Check warning on line 129 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L129

Added line #L129 was not covered by tests
}
}

fn force_flush(&self) -> TraceResult<()> {
self.signal(Message::Flush, "flushing");

// Nothing to flush for simple span processor.
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
self.signal(Message::Shutdown, "shutting down");

Ok(())
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))

Check warning on line 145 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L143-L145

Added lines #L143 - L145 were not covered by tests
}
}
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
// Expecting to address that separately in the future."")
enum Message {
ExportSpan(SpanData),
Flush(crossbeam_channel::Sender<()>),
Shutdown(crossbeam_channel::Sender<()>),
}

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
Expand Down Expand Up @@ -707,6 +660,7 @@

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
// cargo test trace::span_processor::tests:: --features=trace,testing
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
Expand All @@ -715,7 +669,7 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::runtime;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter,
};
use crate::trace::span_processor::{
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
Expand All @@ -729,17 +683,17 @@

#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
processor.on_end(new_test_export_span_data());
assert!(rx_export.recv().is_ok());
assert!(exporter.is_export_called());
let _result = processor.shutdown();
}

#[test]
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
Expand All @@ -756,15 +710,16 @@
instrumentation_lib: Default::default(),
};
processor.on_end(unsampled);
assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
assert!(!exporter.is_export_called());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
let exporter = TestSpanExporter::new();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
assert!(!exporter.is_shutdown_called());
let _result = processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
assert!(exporter.is_shutdown_called());
}

#[test]
Expand Down Expand Up @@ -863,7 +818,7 @@
(OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
// export batch size cannot exceed max queue size
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
Expand All @@ -883,7 +838,7 @@
env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

temp_env::with_vars(env_vars, || {
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
Expand Down
Loading