Skip to content

Commit d4b8fe5

Browse files
committed
Make SpanProcessor::{on_end,force_flush,shutdown} async
1 parent d8253bf commit d4b8fe5

File tree

7 files changed

+134
-90
lines changed

7 files changed

+134
-90
lines changed

opentelemetry-sdk/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ rustdoc-args = ["--cfg", "docsrs"]
3939
[dev-dependencies]
4040
criterion = { workspace = true, features = ["html_reports"] }
4141
temp-env = { workspace = true }
42+
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
4243

4344
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
4445
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

opentelemetry-sdk/benches/batch_span_processor.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ fn criterion_benchmark(c: &mut Criterion) {
6464
let spans = get_span_data();
6565
handles.push(tokio::spawn(async move {
6666
for span in spans {
67-
span_processor.on_end(span);
67+
span_processor.on_end(span).await;
6868
tokio::task::yield_now().await;
6969
}
7070
}));
@@ -73,7 +73,8 @@ fn criterion_benchmark(c: &mut Criterion) {
7373
let _ =
7474
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
7575
.unwrap()
76-
.shutdown();
76+
.shutdown()
77+
.await;
7778
});
7879
})
7980
},

opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex};
3737
/// cx.span().add_event("handling this...", Vec::new());
3838
/// cx.span().end();
3939
///
40-
/// let results = provider.force_flush();
40+
/// let results = provider.force_flush().await;
4141
/// for result in results {
4242
/// if let Err(e) = result {
4343
/// println!("{:?}", e)

opentelemetry-sdk/src/trace/provider.rs

+30-24
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::trace::{
1414
};
1515
use crate::{export::trace::SpanExporter, trace::SpanProcessor};
1616
use crate::{InstrumentationLibrary, Resource};
17+
use futures_util::StreamExt;
1718
use once_cell::sync::{Lazy, OnceCell};
1819
use opentelemetry::trace::TraceError;
1920
use opentelemetry::{global, trace::TraceResult};
@@ -49,11 +50,14 @@ pub(crate) struct TracerProviderInner {
4950

5051
impl Drop for TracerProviderInner {
5152
fn drop(&mut self) {
52-
for processor in &mut self.processors {
53-
if let Err(err) = processor.shutdown() {
54-
global::handle_error(err);
53+
let processors = std::mem::take(&mut self.processors);
54+
crate::util::spawn_future(async move {
55+
for processor in processors {
56+
if let Err(err) = processor.shutdown().await {
57+
global::handle_error(err);
58+
}
5559
}
56-
}
60+
})
5761
}
5862
}
5963

@@ -121,13 +125,14 @@ impl TracerProvider {
121125
/// provider
122126
/// }
123127
///
124-
/// fn main() {
128+
/// #[tokio::main]
129+
/// async fn main() {
125130
/// let provider = init_tracing();
126131
///
127132
/// // create spans..
128133
///
129134
/// // force all spans to flush
130-
/// for result in provider.force_flush() {
135+
/// for result in provider.force_flush().await {
131136
/// if let Err(err) = result {
132137
/// // .. handle flush error
133138
/// }
@@ -141,17 +146,17 @@ impl TracerProvider {
141146
/// global::shutdown_tracer_provider();
142147
/// }
143148
/// ```
144-
pub fn force_flush(&self) -> Vec<TraceResult<()>> {
145-
self.span_processors()
146-
.iter()
147-
.map(|processor| processor.force_flush())
149+
pub async fn force_flush(&self) -> Vec<TraceResult<()>> {
150+
futures_util::stream::iter(self.span_processors())
151+
.then(|processor| processor.force_flush())
148152
.collect()
153+
.await
149154
}
150155

151156
/// Shuts down the current `TracerProvider`.
152157
///
153158
/// Note that shut down doesn't means the TracerProvider has dropped
154-
pub fn shutdown(&self) -> TraceResult<()> {
159+
pub async fn shutdown(&self) -> TraceResult<()> {
155160
if self
156161
.is_shutdown
157162
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
@@ -161,7 +166,7 @@ impl TracerProvider {
161166
// it's up to the processor to properly block new spans after shutdown
162167
let mut errs = vec![];
163168
for processor in &self.inner.processors {
164-
if let Err(err) = processor.shutdown() {
169+
if let Err(err) = processor.shutdown().await {
165170
errs.push(err);
166171
}
167172
}
@@ -348,6 +353,7 @@ mod tests {
348353
}
349354
}
350355

356+
#[async_trait::async_trait]
351357
impl SpanProcessor for TestSpanProcessor {
352358
fn on_start(&self, _span: &mut Span, _cx: &Context) {
353359
self.assert_info
@@ -356,19 +362,19 @@ mod tests {
356362
.fetch_add(1, Ordering::SeqCst);
357363
}
358364

359-
fn on_end(&self, _span: SpanData) {
365+
async fn on_end(&self, _span: SpanData) {
360366
// ignore
361367
}
362368

363-
fn force_flush(&self) -> TraceResult<()> {
369+
async fn force_flush(&self) -> TraceResult<()> {
364370
if self.success {
365371
Ok(())
366372
} else {
367373
Err(TraceError::from("cannot export"))
368374
}
369375
}
370376

371-
fn shutdown(&self) -> TraceResult<()> {
377+
async fn shutdown(&self) -> TraceResult<()> {
372378
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
373379
Ok(())
374380
} else {
@@ -378,13 +384,13 @@ mod tests {
378384
Ordering::SeqCst,
379385
Ordering::SeqCst,
380386
);
381-
self.force_flush()
387+
self.force_flush().await
382388
}
383389
}
384390
}
385391

386-
#[test]
387-
fn test_force_flush() {
392+
#[tokio::test]
393+
async fn test_force_flush() {
388394
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
389395
processors: vec![
390396
Box::from(TestSpanProcessor::new(true)),
@@ -393,7 +399,7 @@ mod tests {
393399
config: Default::default(),
394400
});
395401

396-
let results = tracer_provider.force_flush();
402+
let results = tracer_provider.force_flush().await;
397403
assert_eq!(results.len(), 2);
398404
}
399405

@@ -527,8 +533,8 @@ mod tests {
527533
assert_eq!(no_service_name.config().resource.len(), 0)
528534
}
529535

530-
#[test]
531-
fn test_shutdown_noops() {
536+
#[tokio::test]
537+
async fn test_shutdown_noops() {
532538
let processor = TestSpanProcessor::new(false);
533539
let assert_handle = processor.assert_info();
534540
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
@@ -545,12 +551,12 @@ mod tests {
545551

546552
assert!(assert_handle.started_span_count(2));
547553

548-
let shutdown = |tracer_provider: super::TracerProvider| {
549-
let _ = tracer_provider.shutdown(); // shutdown once
554+
let shutdown = |tracer_provider: super::TracerProvider| async move {
555+
let _ = tracer_provider.shutdown().await; // shutdown once
550556
};
551557

552558
// assert tracer provider can be shutdown using on a cloned version
553-
shutdown(tracer_provider.clone());
559+
shutdown(tracer_provider.clone()).await;
554560

555561
// after shutdown we should get noop tracer
556562
let noop_tracer = tracer_provider.tracer("noop");

opentelemetry-sdk/src/trace/span.rs

+44-19
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use opentelemetry::KeyValue;
1414
use std::borrow::Cow;
1515
use std::time::SystemTime;
1616

17+
use super::TracerProvider;
18+
1719
/// Single operation within a trace.
1820
#[derive(Debug)]
1921
pub struct Span {
@@ -192,24 +194,45 @@ impl opentelemetry::trace::Span for Span {
192194

193195
/// Finishes the span with given timestamp.
194196
fn end_with_timestamp(&mut self, timestamp: SystemTime) {
195-
self.ensure_ended_and_exported(Some(timestamp));
197+
if let Some(fut) = self.ensure_ended_and_exported(Some(timestamp)) {
198+
crate::util::spawn_future(fut)
199+
}
196200
}
197201
}
198202

199203
impl Span {
200-
fn ensure_ended_and_exported(&mut self, timestamp: Option<SystemTime>) {
204+
fn ensure_ended_and_exported(
205+
&mut self,
206+
timestamp: Option<SystemTime>,
207+
) -> Option<impl std::future::Future<Output = ()>> {
201208
// skip if data has already been exported
202-
let mut data = match self.data.take() {
209+
let data = match self.data.take() {
203210
Some(data) => data,
204-
None => return,
211+
None => return None,
205212
};
206213

207214
let provider = self.tracer.provider();
208215
// skip if provider has been shut down
209216
if provider.is_shutdown() {
210-
return;
217+
return None;
211218
}
212219

220+
Some(Self::ensure_ended_and_exported_impl(
221+
data,
222+
provider.clone(),
223+
self.tracer.clone(),
224+
self.span_context.clone(),
225+
timestamp,
226+
))
227+
}
228+
229+
async fn ensure_ended_and_exported_impl(
230+
mut data: SpanData,
231+
provider: TracerProvider,
232+
tracer: crate::trace::Tracer,
233+
span_context: SpanContext,
234+
timestamp: Option<SystemTime>,
235+
) {
213236
// ensure end time is set via explicit end or implicitly on drop
214237
if let Some(timestamp) = timestamp {
215238
data.end_time = timestamp;
@@ -220,19 +243,19 @@ impl Span {
220243
match provider.span_processors() {
221244
[] => {}
222245
[processor] => {
223-
processor.on_end(build_export_data(
224-
data,
225-
self.span_context.clone(),
226-
&self.tracer,
227-
));
246+
processor
247+
.on_end(build_export_data(data, span_context, &tracer))
248+
.await;
228249
}
229250
processors => {
230251
for processor in processors {
231-
processor.on_end(build_export_data(
232-
data.clone(),
233-
self.span_context.clone(),
234-
&self.tracer,
235-
));
252+
processor
253+
.on_end(build_export_data(
254+
data.clone(),
255+
span_context.clone(),
256+
&tracer,
257+
))
258+
.await;
236259
}
237260
}
238261
}
@@ -242,7 +265,9 @@ impl Span {
242265
impl Drop for Span {
243266
/// Report span on inner drop
244267
fn drop(&mut self) {
245-
self.ensure_ended_and_exported(None);
268+
if let Some(fut) = self.ensure_ended_and_exported(None) {
269+
crate::util::spawn_future(fut)
270+
}
246271
}
247272
}
248273

@@ -705,8 +730,8 @@ mod tests {
705730
assert_eq!(event_vec.len(), DEFAULT_MAX_EVENT_PER_SPAN as usize);
706731
}
707732

708-
#[test]
709-
fn test_span_exported_data() {
733+
#[tokio::test]
734+
async fn test_span_exported_data() {
710735
let provider = crate::trace::TracerProvider::builder()
711736
.with_simple_exporter(NoopSpanExporter::new())
712737
.build();
@@ -719,7 +744,7 @@ mod tests {
719744
let exported_data = span.exported_data();
720745
assert!(exported_data.is_some());
721746

722-
provider.shutdown().expect("shutdown panicked");
747+
provider.shutdown().await.expect("shutdown panicked");
723748
let dropped_span = tracer.start("span_with_dropped_provider");
724749
// return none if the provider has already been dropped
725750
assert!(dropped_span.exported_data().is_none());

0 commit comments

Comments
 (0)