@@ -50,7 +50,8 @@ use opentelemetry::{
50
50
Context ,
51
51
} ;
52
52
use std:: cmp:: min;
53
- use std:: { env, fmt, str:: FromStr , thread, time:: Duration } ;
53
+ use std:: sync:: Mutex ;
54
+ use std:: { env, fmt, str:: FromStr , time:: Duration } ;
54
55
55
56
/// Delay interval between two consecutive exports.
56
57
const OTEL_BSP_SCHEDULE_DELAY : & str = "OTEL_BSP_SCHEDULE_DELAY" ;
@@ -93,65 +94,19 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
93
94
fn shutdown ( & mut self ) -> TraceResult < ( ) > ;
94
95
}
95
96
96
- /// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as
97
- /// soon as they are finished, without any batching.
97
+ /// A [SpanProcessor] that passes finished spans to the configured
98
+ /// `SpanExporter`, as soon as they are finished, without any batching. This is
99
+ /// typically useful for debugging and testing. For scenarios requiring higher
100
+ /// performance/throughput, consider using [BatchSpanProcessor].
98
101
#[ derive( Debug ) ]
99
102
pub struct SimpleSpanProcessor {
100
- message_sender : crossbeam_channel :: Sender < Message > ,
103
+ exporter : Mutex < Box < dyn SpanExporter > > ,
101
104
}
102
105
103
106
impl SimpleSpanProcessor {
104
- pub ( crate ) fn new ( mut exporter : Box < dyn SpanExporter > ) -> Self {
105
- let ( message_sender, rx) = crossbeam_channel:: unbounded ( ) ;
106
-
107
- let _ = thread:: Builder :: new ( )
108
- . name ( "opentelemetry-exporter" . to_string ( ) )
109
- . spawn ( move || {
110
- while let Ok ( msg) = rx. recv ( ) {
111
- match msg {
112
- Message :: ExportSpan ( span) => {
113
- if let Err ( err) =
114
- futures_executor:: block_on ( exporter. export ( vec ! [ span] ) )
115
- {
116
- global:: handle_error ( err) ;
117
- }
118
- }
119
- Message :: Flush ( sender) => {
120
- Self :: respond ( & sender, "sync" ) ;
121
- }
122
- Message :: Shutdown ( sender) => {
123
- exporter. shutdown ( ) ;
124
-
125
- Self :: respond ( & sender, "shutdown" ) ;
126
-
127
- return ;
128
- }
129
- }
130
- }
131
-
132
- exporter. shutdown ( ) ;
133
- } ) ;
134
-
135
- Self { message_sender }
136
- }
137
-
138
- fn signal ( & self , msg : fn ( crossbeam_channel:: Sender < ( ) > ) -> Message , description : & str ) {
139
- let ( tx, rx) = crossbeam_channel:: bounded ( 0 ) ;
140
-
141
- if self . message_sender . send ( msg ( tx) ) . is_ok ( ) {
142
- if let Err ( err) = rx. recv ( ) {
143
- global:: handle_error ( TraceError :: from ( format ! (
144
- "error {description} span processor: {err:?}"
145
- ) ) ) ;
146
- }
147
- }
148
- }
149
-
150
- fn respond ( sender : & crossbeam_channel:: Sender < ( ) > , description : & str ) {
151
- if let Err ( err) = sender. send ( ( ) ) {
152
- global:: handle_error ( TraceError :: from ( format ! (
153
- "could not send {description}: {err:?}"
154
- ) ) ) ;
107
+ pub ( crate ) fn new ( exporter : Box < dyn SpanExporter > ) -> Self {
108
+ Self {
109
+ exporter : Mutex :: new ( exporter) ,
155
110
}
156
111
}
157
112
}
@@ -166,34 +121,34 @@ impl SpanProcessor for SimpleSpanProcessor {
166
121
return ;
167
122
}
168
123
169
- if let Err ( err) = self . message_sender . send ( Message :: ExportSpan ( span) ) {
170
- global:: handle_error ( TraceError :: from ( format ! ( "error processing span {:?}" , err) ) ) ;
124
+ let result = self
125
+ . exporter
126
+ . lock ( )
127
+ . map_err ( |_| TraceError :: Other ( "SimpleSpanProcessor mutex poison" . into ( ) ) )
128
+ . and_then ( |mut exporter| futures_executor:: block_on ( exporter. export ( vec ! [ span] ) ) ) ;
129
+
130
+ if let Err ( err) = result {
131
+ global:: handle_error ( err) ;
171
132
}
172
133
}
173
134
174
135
fn force_flush ( & self ) -> TraceResult < ( ) > {
175
- self . signal ( Message :: Flush , "flushing" ) ;
176
-
136
+ // Nothing to flush for simple span processor.
177
137
Ok ( ( ) )
178
138
}
179
139
180
140
fn shutdown ( & mut self ) -> TraceResult < ( ) > {
181
- self . signal ( Message :: Shutdown , "shutting down" ) ;
182
-
183
- Ok ( ( ) )
141
+ if let Ok ( mut exporter) = self . exporter . lock ( ) {
142
+ exporter. shutdown ( ) ;
143
+ Ok ( ( ) )
144
+ } else {
145
+ Err ( TraceError :: Other (
146
+ "SimpleSpanProcessor mutex poison at shutdown" . into ( ) ,
147
+ ) )
148
+ }
184
149
}
185
150
}
186
151
187
- #[ derive( Debug ) ]
188
- #[ allow( clippy:: large_enum_variant) ]
189
- // reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning.
190
- // Expecting to address that separately in the future."")
191
- enum Message {
192
- ExportSpan ( SpanData ) ,
193
- Flush ( crossbeam_channel:: Sender < ( ) > ) ,
194
- Shutdown ( crossbeam_channel:: Sender < ( ) > ) ,
195
- }
196
-
197
152
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
198
153
/// them at a preconfigured interval.
199
154
///
@@ -707,6 +662,7 @@ where
707
662
708
663
#[ cfg( all( test, feature = "testing" , feature = "trace" ) ) ]
709
664
mod tests {
665
+ // cargo test trace::span_processor::tests:: --features=trace,testing
710
666
use super :: {
711
667
BatchSpanProcessor , SimpleSpanProcessor , SpanProcessor , OTEL_BSP_EXPORT_TIMEOUT ,
712
668
OTEL_BSP_MAX_EXPORT_BATCH_SIZE , OTEL_BSP_MAX_QUEUE_SIZE , OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT ,
@@ -715,7 +671,7 @@ mod tests {
715
671
use crate :: export:: trace:: { ExportResult , SpanData , SpanExporter } ;
716
672
use crate :: runtime;
717
673
use crate :: testing:: trace:: {
718
- new_test_export_span_data, new_test_exporter , new_tokio_test_exporter ,
674
+ new_test_export_span_data, new_tokio_test_exporter , TestSpanExporter ,
719
675
} ;
720
676
use crate :: trace:: span_processor:: {
721
677
OTEL_BSP_EXPORT_TIMEOUT_DEFAULT , OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT ,
@@ -729,17 +685,17 @@ mod tests {
729
685
730
686
#[ test]
731
687
fn simple_span_processor_on_end_calls_export ( ) {
732
- let ( exporter, rx_export , _rx_shutdown ) = new_test_exporter ( ) ;
733
- let mut processor = SimpleSpanProcessor :: new ( Box :: new ( exporter) ) ;
688
+ let exporter = TestSpanExporter :: new ( ) ;
689
+ let mut processor = SimpleSpanProcessor :: new ( Box :: new ( exporter. clone ( ) ) ) ;
734
690
processor. on_end ( new_test_export_span_data ( ) ) ;
735
- assert ! ( rx_export . recv ( ) . is_ok ( ) ) ;
691
+ assert ! ( exporter . is_export_called ( ) ) ;
736
692
let _result = processor. shutdown ( ) ;
737
693
}
738
694
739
695
#[ test]
740
696
fn simple_span_processor_on_end_skips_export_if_not_sampled ( ) {
741
- let ( exporter, rx_export , _rx_shutdown ) = new_test_exporter ( ) ;
742
- let processor = SimpleSpanProcessor :: new ( Box :: new ( exporter) ) ;
697
+ let exporter = TestSpanExporter :: new ( ) ;
698
+ let processor = SimpleSpanProcessor :: new ( Box :: new ( exporter. clone ( ) ) ) ;
743
699
let unsampled = SpanData {
744
700
span_context : SpanContext :: empty_context ( ) ,
745
701
parent_span_id : SpanId :: INVALID ,
@@ -756,15 +712,16 @@ mod tests {
756
712
instrumentation_lib : Default :: default ( ) ,
757
713
} ;
758
714
processor. on_end ( unsampled) ;
759
- assert ! ( rx_export . recv_timeout ( Duration :: from_millis ( 100 ) ) . is_err ( ) ) ;
715
+ assert ! ( !exporter . is_export_called ( ) ) ;
760
716
}
761
717
762
718
#[ test]
763
719
fn simple_span_processor_shutdown_calls_shutdown ( ) {
764
- let ( exporter, _rx_export, rx_shutdown) = new_test_exporter ( ) ;
765
- let mut processor = SimpleSpanProcessor :: new ( Box :: new ( exporter) ) ;
720
+ let exporter = TestSpanExporter :: new ( ) ;
721
+ let mut processor = SimpleSpanProcessor :: new ( Box :: new ( exporter. clone ( ) ) ) ;
722
+ assert ! ( !exporter. is_shutdown_called( ) ) ;
766
723
let _result = processor. shutdown ( ) ;
767
- assert ! ( rx_shutdown . try_recv ( ) . is_ok ( ) ) ;
724
+ assert ! ( exporter . is_shutdown_called ( ) ) ;
768
725
}
769
726
770
727
#[ test]
@@ -863,7 +820,7 @@ mod tests {
863
820
( OTEL_BSP_EXPORT_TIMEOUT , Some ( "2046" ) ) ,
864
821
] ;
865
822
temp_env:: with_vars ( env_vars. clone ( ) , || {
866
- let builder = BatchSpanProcessor :: builder ( new_test_exporter ( ) . 0 , runtime:: Tokio ) ;
823
+ let builder = BatchSpanProcessor :: builder ( TestSpanExporter :: new ( ) , runtime:: Tokio ) ;
867
824
// export batch size cannot exceed max queue size
868
825
assert_eq ! ( builder. config. max_export_batch_size, 500 ) ;
869
826
assert_eq ! (
@@ -883,7 +840,7 @@ mod tests {
883
840
env_vars. push ( ( OTEL_BSP_MAX_QUEUE_SIZE , Some ( "120" ) ) ) ;
884
841
885
842
temp_env:: with_vars ( env_vars, || {
886
- let builder = BatchSpanProcessor :: builder ( new_test_exporter ( ) . 0 , runtime:: Tokio ) ;
843
+ let builder = BatchSpanProcessor :: builder ( TestSpanExporter :: new ( ) , runtime:: Tokio ) ;
887
844
assert_eq ! ( builder. config. max_export_batch_size, 120 ) ;
888
845
assert_eq ! ( builder. config. max_queue_size, 120 ) ;
889
846
} ) ;
0 commit comments