From 2ca8b099b7abaa77309775a8d113ae5b6626be36 Mon Sep 17 00:00:00 2001 From: John Howard Date: Wed, 24 Jul 2024 14:14:05 -0700 Subject: [PATCH] Fix shutdown of single threaded trace batch provider Fixes https://github.com/open-telemetry/opentelemetry-rust/issues/1963 --- opentelemetry-sdk/CHANGELOG.md | 1 + opentelemetry-sdk/src/trace/span_processor.rs | 34 +++++++++++-------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index e985bdb21a..56e8e89922 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -3,6 +3,7 @@ ## vNext - `opentelemetry_sdk::logs::record::LogRecord` and `opentelemetry_sdk::logs::record::TraceContext` derive from `PartialEq` to facilitate Unit Testing. +- Fixed an issue causing a panic during shutdown when using the `TokioCurrentThread` tracing batch processor. ## v0.24.1 diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index b99dc45311..e18ed9cf55 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -449,22 +449,28 @@ impl BatchSpanProcessor { pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); - let ticker = runtime - .interval(config.scheduled_delay) - .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = runtime.clone(); - - let messages = Box::pin(stream::select(message_receiver, ticker)); - let processor = BatchSpanProcessorInternal { - spans: Vec::new(), - export_tasks: FuturesUnordered::new(), - runtime: timeout_runtime, - config, - exporter, - }; + let inner_runtime = runtime.clone(); // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(processor.run(messages))); + runtime.spawn(Box::pin(async move { + // Timer will take a reference to the current runtime, so its important we do this within the + // runtime.spawn() + let ticker = inner_runtime + .interval(config.scheduled_delay) + .map(|_| BatchMessage::Flush(None)); + let timeout_runtime = inner_runtime.clone(); + + let messages = Box::pin(stream::select(message_receiver, ticker)); + let processor = BatchSpanProcessorInternal { + spans: Vec::new(), + export_tasks: FuturesUnordered::new(), + runtime: timeout_runtime, + config, + exporter, + }; + + processor.run(messages).await + })); // Return batch processor with link to worker BatchSpanProcessor { message_sender }