Skip to content

Commit d1f2ae8

Browse files
authored
Merge branch 'main' into cijothomas/metricsdk-add-moretesetes
2 parents f92e64e + d5bf258 commit d1f2ae8

File tree

5 files changed

+84
-125
lines changed

5 files changed

+84
-125
lines changed

opentelemetry-sdk/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## vNext
44

5+
- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
6+
dependency on crossbeam-channel.
7+
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)
8+
59
## v0.22.1
610

711
### Fixed

opentelemetry-sdk/Cargo.toml

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
1414
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
1515
async-std = { workspace = true, features = ["unstable"], optional = true }
1616
async-trait = { workspace = true, optional = true }
17-
crossbeam-channel = { version = "0.5", optional = true }
1817
futures-channel = "0.3"
1918
futures-executor = { workspace = true }
2019
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
@@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
4544

4645
[features]
4746
default = ["trace"]
48-
trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
47+
trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
4948
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
50-
logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
49+
logs = ["opentelemetry/logs", "async-trait", "serde_json"]
5150
logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
5251
metrics = ["opentelemetry/metrics", "glob", "async-trait"]
5352
testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]

opentelemetry-sdk/src/logs/log_processor.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug {
5252
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
5353
}
5454

55-
/// A [`LogProcessor`] that exports synchronously when logs are emitted.
56-
///
57-
/// # Examples
58-
///
59-
/// Note that the simple processor exports synchronously every time a log is
60-
/// emitted. If you find this limiting, consider the batch processor instead.
55+
/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
56+
/// as they are emitted, without any batching. This is typically useful for
57+
/// debugging and testing. For scenarios requiring higher
58+
/// performance/throughput, consider using [BatchLogProcessor].
6159
#[derive(Debug)]
6260
pub struct SimpleLogProcessor {
6361
exporter: Mutex<Box<dyn LogExporter>>,

opentelemetry-sdk/src/testing/trace/span_exporters.rs

+33-32
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ use crate::{
77
InstrumentationLibrary,
88
};
99
use async_trait::async_trait;
10-
use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
1110
use futures_util::future::BoxFuture;
1211
pub use opentelemetry::testing::trace::TestSpan;
1312
use opentelemetry::trace::{
1413
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
1514
};
16-
use std::fmt::{Display, Formatter};
15+
use std::{
16+
fmt::{Display, Formatter},
17+
sync::{Arc, Mutex},
18+
};
1719

1820
pub fn new_test_export_span_data() -> SpanData {
1921
let config = Config::default();
@@ -40,42 +42,47 @@ pub fn new_test_export_span_data() -> SpanData {
4042
}
4143
}
4244

43-
#[derive(Debug)]
45+
#[derive(Clone, Debug)]
4446
pub struct TestSpanExporter {
45-
tx_export: Sender<SpanData>,
46-
tx_shutdown: Sender<()>,
47+
pub export_called: Arc<Mutex<bool>>,
48+
pub shutdown_called: Arc<Mutex<bool>>,
49+
}
50+
51+
impl Default for TestSpanExporter {
52+
fn default() -> Self {
53+
Self::new()
54+
}
55+
}
56+
57+
impl TestSpanExporter {
58+
pub fn new() -> Self {
59+
TestSpanExporter {
60+
export_called: Arc::new(Mutex::new(false)),
61+
shutdown_called: Arc::new(Mutex::new(false)),
62+
}
63+
}
64+
65+
pub fn is_export_called(&self) -> bool {
66+
*self.export_called.lock().unwrap()
67+
}
68+
69+
pub fn is_shutdown_called(&self) -> bool {
70+
*self.shutdown_called.lock().unwrap()
71+
}
4772
}
4873

4974
#[async_trait]
5075
impl SpanExporter for TestSpanExporter {
51-
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
52-
for span_data in batch {
53-
if let Err(err) = self
54-
.tx_export
55-
.send(span_data)
56-
.map_err::<TestExportError, _>(Into::into)
57-
{
58-
return Box::pin(std::future::ready(Err(Into::into(err))));
59-
}
60-
}
76+
fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
77+
*self.export_called.lock().unwrap() = true;
6178
Box::pin(std::future::ready(Ok(())))
6279
}
6380

6481
fn shutdown(&mut self) {
65-
let _ = self.tx_shutdown.send(()); // ignore error
82+
*self.shutdown_called.lock().unwrap() = true;
6683
}
6784
}
6885

69-
pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
70-
let (tx_export, rx_export) = unbounded();
71-
let (tx_shutdown, rx_shutdown) = unbounded();
72-
let exporter = TestSpanExporter {
73-
tx_export,
74-
tx_shutdown,
75-
};
76-
(exporter, rx_export, rx_shutdown)
77-
}
78-
7986
#[derive(Debug)]
8087
pub struct TokioSpanExporter {
8188
tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
@@ -139,12 +146,6 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for TestExportError {
139146
}
140147
}
141148

142-
impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
143-
fn from(err: SendError<T>) -> Self {
144-
TestExportError(err.to_string())
145-
}
146-
}
147-
148149
/// A no-op instance of an [`SpanExporter`].
149150
///
150151
/// [`SpanExporter`]: crate::export::trace::SpanExporter

opentelemetry-sdk/src/trace/span_processor.rs

+41-84
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ use opentelemetry::{
5050
Context,
5151
};
5252
use std::cmp::min;
53-
use std::{env, fmt, str::FromStr, thread, time::Duration};
53+
use std::sync::Mutex;
54+
use std::{env, fmt, str::FromStr, time::Duration};
5455

5556
/// Delay interval between two consecutive exports.
5657
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
@@ -93,65 +94,19 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
9394
fn shutdown(&mut self) -> TraceResult<()>;
9495
}
9596

96-
/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as
97-
/// soon as they are finished, without any batching.
97+
/// A [SpanProcessor] that passes finished spans to the configured
98+
/// `SpanExporter`, as soon as they are finished, without any batching. This is
99+
/// typically useful for debugging and testing. For scenarios requiring higher
100+
/// performance/throughput, consider using [BatchSpanProcessor].
98101
#[derive(Debug)]
99102
pub struct SimpleSpanProcessor {
100-
message_sender: crossbeam_channel::Sender<Message>,
103+
exporter: Mutex<Box<dyn SpanExporter>>,
101104
}
102105

103106
impl SimpleSpanProcessor {
104-
pub(crate) fn new(mut exporter: Box<dyn SpanExporter>) -> Self {
105-
let (message_sender, rx) = crossbeam_channel::unbounded();
106-
107-
let _ = thread::Builder::new()
108-
.name("opentelemetry-exporter".to_string())
109-
.spawn(move || {
110-
while let Ok(msg) = rx.recv() {
111-
match msg {
112-
Message::ExportSpan(span) => {
113-
if let Err(err) =
114-
futures_executor::block_on(exporter.export(vec![span]))
115-
{
116-
global::handle_error(err);
117-
}
118-
}
119-
Message::Flush(sender) => {
120-
Self::respond(&sender, "sync");
121-
}
122-
Message::Shutdown(sender) => {
123-
exporter.shutdown();
124-
125-
Self::respond(&sender, "shutdown");
126-
127-
return;
128-
}
129-
}
130-
}
131-
132-
exporter.shutdown();
133-
});
134-
135-
Self { message_sender }
136-
}
137-
138-
fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) {
139-
let (tx, rx) = crossbeam_channel::bounded(0);
140-
141-
if self.message_sender.send(msg(tx)).is_ok() {
142-
if let Err(err) = rx.recv() {
143-
global::handle_error(TraceError::from(format!(
144-
"error {description} span processor: {err:?}"
145-
)));
146-
}
147-
}
148-
}
149-
150-
fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) {
151-
if let Err(err) = sender.send(()) {
152-
global::handle_error(TraceError::from(format!(
153-
"could not send {description}: {err:?}"
154-
)));
107+
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
108+
Self {
109+
exporter: Mutex::new(exporter),
155110
}
156111
}
157112
}
@@ -166,34 +121,34 @@ impl SpanProcessor for SimpleSpanProcessor {
166121
return;
167122
}
168123

169-
if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) {
170-
global::handle_error(TraceError::from(format!("error processing span {:?}", err)));
124+
let result = self
125+
.exporter
126+
.lock()
127+
.map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
128+
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));
129+
130+
if let Err(err) = result {
131+
global::handle_error(err);
171132
}
172133
}
173134

174135
fn force_flush(&self) -> TraceResult<()> {
175-
self.signal(Message::Flush, "flushing");
176-
136+
// Nothing to flush for simple span processor.
177137
Ok(())
178138
}
179139

180140
fn shutdown(&mut self) -> TraceResult<()> {
181-
self.signal(Message::Shutdown, "shutting down");
182-
183-
Ok(())
141+
if let Ok(mut exporter) = self.exporter.lock() {
142+
exporter.shutdown();
143+
Ok(())
144+
} else {
145+
Err(TraceError::Other(
146+
"SimpleSpanProcessor mutex poison at shutdown".into(),
147+
))
148+
}
184149
}
185150
}
186151

187-
#[derive(Debug)]
188-
#[allow(clippy::large_enum_variant)]
189-
// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
190-
// Expecting to address that separately in the future."")
191-
enum Message {
192-
ExportSpan(SpanData),
193-
Flush(crossbeam_channel::Sender<()>),
194-
Shutdown(crossbeam_channel::Sender<()>),
195-
}
196-
197152
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
198153
/// them at a preconfigured interval.
199154
///
@@ -707,6 +662,7 @@ where
707662

708663
#[cfg(all(test, feature = "testing", feature = "trace"))]
709664
mod tests {
665+
// cargo test trace::span_processor::tests:: --features=trace,testing
710666
use super::{
711667
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
712668
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
@@ -715,7 +671,7 @@ mod tests {
715671
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
716672
use crate::runtime;
717673
use crate::testing::trace::{
718-
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
674+
new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter,
719675
};
720676
use crate::trace::span_processor::{
721677
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
@@ -729,17 +685,17 @@ mod tests {
729685

730686
#[test]
731687
fn simple_span_processor_on_end_calls_export() {
732-
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
733-
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
688+
let exporter = TestSpanExporter::new();
689+
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
734690
processor.on_end(new_test_export_span_data());
735-
assert!(rx_export.recv().is_ok());
691+
assert!(exporter.is_export_called());
736692
let _result = processor.shutdown();
737693
}
738694

739695
#[test]
740696
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
741-
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
742-
let processor = SimpleSpanProcessor::new(Box::new(exporter));
697+
let exporter = TestSpanExporter::new();
698+
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
743699
let unsampled = SpanData {
744700
span_context: SpanContext::empty_context(),
745701
parent_span_id: SpanId::INVALID,
@@ -756,15 +712,16 @@ mod tests {
756712
instrumentation_lib: Default::default(),
757713
};
758714
processor.on_end(unsampled);
759-
assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err());
715+
assert!(!exporter.is_export_called());
760716
}
761717

762718
#[test]
763719
fn simple_span_processor_shutdown_calls_shutdown() {
764-
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
765-
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
720+
let exporter = TestSpanExporter::new();
721+
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
722+
assert!(!exporter.is_shutdown_called());
766723
let _result = processor.shutdown();
767-
assert!(rx_shutdown.try_recv().is_ok());
724+
assert!(exporter.is_shutdown_called());
768725
}
769726

770727
#[test]
@@ -863,7 +820,7 @@ mod tests {
863820
(OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
864821
];
865822
temp_env::with_vars(env_vars.clone(), || {
866-
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
823+
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
867824
// export batch size cannot exceed max queue size
868825
assert_eq!(builder.config.max_export_batch_size, 500);
869826
assert_eq!(
@@ -883,7 +840,7 @@ mod tests {
883840
env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));
884841

885842
temp_env::with_vars(env_vars, || {
886-
let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio);
843+
let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio);
887844
assert_eq!(builder.config.max_export_batch_size, 120);
888845
assert_eq!(builder.config.max_queue_size, 120);
889846
});

0 commit comments

Comments
 (0)