Skip to content

Commit 2ac2b18

Browse files
TommyCpp and lalitb authored
feat: add shutdown in TracerProvider (open-telemetry#1855)
Co-authored-by: Lalit Kumar Bhasin <lalit_fin@yahoo.com> Co-authored-by: Lalit Kumar Bhasin <labhas@microsoft.com>
1 parent bc1f94c commit 2ac2b18

File tree

5 files changed

+180
-31
lines changed

5 files changed

+180
-31
lines changed

opentelemetry-sdk/src/logs/log_emitter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {
7777

7878
fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
7979
// If the provider is shut down, new loggers will refer to a no-op logger provider.
80-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
80+
if self.is_shutdown.load(Ordering::Relaxed) {
8181
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
8282
}
8383
Logger::new(library, self.clone())

opentelemetry-sdk/src/trace/provider.rs

+164-17
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,37 @@
99
//! not duplicate this data to avoid that different [`Tracer`] instances
1010
//! of the [`TracerProvider`] have different versions of these data.
1111
use crate::runtime::RuntimeChannel;
12-
use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer};
12+
use crate::trace::{
13+
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
14+
};
1315
use crate::{export::trace::SpanExporter, trace::SpanProcessor};
1416
use crate::{InstrumentationLibrary, Resource};
15-
use once_cell::sync::OnceCell;
17+
use once_cell::sync::{Lazy, OnceCell};
18+
use opentelemetry::trace::TraceError;
1619
use opentelemetry::{global, trace::TraceResult};
1720
use std::borrow::Cow;
21+
use std::sync::atomic::{AtomicBool, Ordering};
1822
use std::sync::Arc;
1923

2024
/// Default tracer name if empty string is provided.
2125
const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer";
2226
static PROVIDER_RESOURCE: OnceCell<Resource> = OnceCell::new();
2327

28+
// a no-op tracer provider used as a placeholder when the provider is shut down
29+
static NOOP_TRACER_PROVIDER: Lazy<TracerProvider> = Lazy::new(|| TracerProvider {
30+
inner: Arc::new(TracerProviderInner {
31+
processors: Vec::new(),
32+
config: Config {
33+
// cannot use default here as the default resource is not empty
34+
sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
35+
id_generator: Box::<RandomIdGenerator>::default(),
36+
span_limits: SpanLimits::default(),
37+
resource: Cow::Owned(Resource::empty()),
38+
},
39+
}),
40+
is_shutdown: Arc::new(AtomicBool::new(true)),
41+
});
42+
2443
/// TracerProvider inner type
2544
#[derive(Debug)]
2645
pub(crate) struct TracerProviderInner {
@@ -39,9 +58,14 @@ impl Drop for TracerProviderInner {
3958
}
4059

4160
/// Creator and registry of named [`Tracer`] instances.
61+
///
62+
/// `TracerProvider` is a lightweight container holding pointers to `SpanProcessor` and other components.
63+
/// Cloning and dropping them will not stop the span processing. To stop span processing, users
64+
/// must either call the `shutdown` method explicitly, or drop every clone of `TracerProvider`.
4265
#[derive(Clone, Debug)]
4366
pub struct TracerProvider {
4467
inner: Arc<TracerProviderInner>,
68+
is_shutdown: Arc<AtomicBool>,
4569
}
4670

4771
impl Default for TracerProvider {
@@ -52,8 +76,11 @@ impl Default for TracerProvider {
5276

5377
impl TracerProvider {
5478
/// Build a new tracer provider
55-
pub(crate) fn new(inner: Arc<TracerProviderInner>) -> Self {
56-
TracerProvider { inner }
79+
pub(crate) fn new(inner: TracerProviderInner) -> Self {
80+
TracerProvider {
81+
inner: Arc::new(inner),
82+
is_shutdown: Arc::new(AtomicBool::new(false)),
83+
}
5784
}
5885

5986
/// Create a new [`TracerProvider`] builder.
@@ -71,6 +98,12 @@ impl TracerProvider {
7198
&self.inner.config
7299
}
73100

101+
/// Returns `true` if the provider has been shut down.
102+
/// Don't start or export spans once the provider is shut down.
103+
pub(crate) fn is_shutdown(&self) -> bool {
104+
self.is_shutdown.load(Ordering::Relaxed)
105+
}
106+
74107
/// Force flush all remaining spans in span processors and return results.
75108
///
76109
/// # Examples
@@ -114,11 +147,41 @@ impl TracerProvider {
114147
.map(|processor| processor.force_flush())
115148
.collect()
116149
}
150+
151+
/// Shuts down the current `TracerProvider`.
152+
///
153+
/// Note that shutting down does not mean the `TracerProvider` has been dropped.
154+
pub fn shutdown(&self) -> TraceResult<()> {
155+
if self
156+
.is_shutdown
157+
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
158+
.is_ok()
159+
{
160+
// propagate the shutdown signal to processors
161+
// it's up to the processor to properly block new spans after shutdown
162+
let mut errs = vec![];
163+
for processor in &self.inner.processors {
164+
if let Err(err) = processor.shutdown() {
165+
errs.push(err);
166+
}
167+
}
168+
169+
if errs.is_empty() {
170+
Ok(())
171+
} else {
172+
Err(TraceError::Other(format!("{errs:?}").into()))
173+
}
174+
} else {
175+
Err(TraceError::Other(
176+
"tracer provider already shut down".into(),
177+
))
178+
}
179+
}
117180
}
118181

119182
impl opentelemetry::trace::TracerProvider for TracerProvider {
120183
/// This implementation of `TracerProvider` produces `Tracer` instances.
121-
type Tracer = crate::trace::Tracer;
184+
type Tracer = Tracer;
122185

123186
/// Create a new versioned `Tracer` instance.
124187
fn versioned_tracer(
@@ -152,7 +215,10 @@ impl opentelemetry::trace::TracerProvider for TracerProvider {
152215
}
153216

154217
fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
155-
Tracer::new(library, Arc::downgrade(&self.inner))
218+
if self.is_shutdown.load(Ordering::Relaxed) {
219+
return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
220+
}
221+
Tracer::new(library, self.clone())
156222
}
157223
}
158224

@@ -226,9 +292,7 @@ impl Builder {
226292
p.set_resource(config.resource.as_ref());
227293
}
228294

229-
TracerProvider {
230-
inner: Arc::new(TracerProviderInner { processors, config }),
231-
}
295+
TracerProvider::new(TracerProviderInner { processors, config })
232296
}
233297
}
234298

@@ -241,24 +305,59 @@ mod tests {
241305
use crate::trace::provider::TracerProviderInner;
242306
use crate::trace::{Config, Span, SpanProcessor};
243307
use crate::Resource;
244-
use opentelemetry::trace::{TraceError, TraceResult};
308+
use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider};
245309
use opentelemetry::{Context, Key, KeyValue, Value};
246310
use std::borrow::Cow;
247311
use std::env;
312+
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
248313
use std::sync::Arc;
249314

315+
// fields below are wrapped in an `Arc` so we can assert on them
316+
#[derive(Default, Debug)]
317+
struct AssertInfo {
318+
started_span: AtomicU32,
319+
is_shutdown: AtomicBool,
320+
}
321+
322+
#[derive(Default, Debug, Clone)]
323+
struct SharedAssertInfo(Arc<AssertInfo>);
324+
325+
impl SharedAssertInfo {
326+
fn started_span_count(&self, count: u32) -> bool {
327+
self.0.started_span.load(Ordering::SeqCst) == count
328+
}
329+
}
330+
250331
#[derive(Debug)]
251332
struct TestSpanProcessor {
252333
success: bool,
334+
assert_info: SharedAssertInfo,
335+
}
336+
337+
impl TestSpanProcessor {
338+
fn new(success: bool) -> TestSpanProcessor {
339+
TestSpanProcessor {
340+
success,
341+
assert_info: SharedAssertInfo::default(),
342+
}
343+
}
344+
345+
// get handle to assert info
346+
fn assert_info(&self) -> SharedAssertInfo {
347+
self.assert_info.clone()
348+
}
253349
}
254350

255351
impl SpanProcessor for TestSpanProcessor {
256352
fn on_start(&self, _span: &mut Span, _cx: &Context) {
257-
unimplemented!()
353+
self.assert_info
354+
.0
355+
.started_span
356+
.fetch_add(1, Ordering::SeqCst);
258357
}
259358

260359
fn on_end(&self, _span: SpanData) {
261-
unimplemented!()
360+
// ignore
262361
}
263362

264363
fn force_flush(&self) -> TraceResult<()> {
@@ -270,19 +369,29 @@ mod tests {
270369
}
271370

272371
fn shutdown(&self) -> TraceResult<()> {
273-
self.force_flush()
372+
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
373+
Ok(())
374+
} else {
375+
let _ = self.assert_info.0.is_shutdown.compare_exchange(
376+
false,
377+
true,
378+
Ordering::SeqCst,
379+
Ordering::SeqCst,
380+
);
381+
self.force_flush()
382+
}
274383
}
275384
}
276385

277386
#[test]
278387
fn test_force_flush() {
279-
let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner {
388+
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
280389
processors: vec![
281-
Box::from(TestSpanProcessor { success: true }),
282-
Box::from(TestSpanProcessor { success: false }),
390+
Box::from(TestSpanProcessor::new(true)),
391+
Box::from(TestSpanProcessor::new(false)),
283392
],
284393
config: Default::default(),
285-
}));
394+
});
286395

287396
let results = tracer_provider.force_flush();
288397
assert_eq!(results.len(), 2);
@@ -417,4 +526,42 @@ mod tests {
417526

418527
assert_eq!(no_service_name.config().resource.len(), 0)
419528
}
529+
530+
#[test]
531+
fn test_shutdown_noops() {
532+
let processor = TestSpanProcessor::new(false);
533+
let assert_handle = processor.assert_info();
534+
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
535+
processors: vec![Box::from(processor)],
536+
config: Default::default(),
537+
});
538+
539+
let test_tracer_1 = tracer_provider.tracer("test1");
540+
let _ = test_tracer_1.start("test");
541+
542+
assert!(assert_handle.started_span_count(1));
543+
544+
let _ = test_tracer_1.start("test");
545+
546+
assert!(assert_handle.started_span_count(2));
547+
548+
let shutdown = |tracer_provider: super::TracerProvider| {
549+
let _ = tracer_provider.shutdown(); // shutdown once
550+
};
551+
552+
// assert the tracer provider can be shut down through a cloned handle
553+
shutdown(tracer_provider.clone());
554+
555+
// after shutdown we should get noop tracer
556+
let noop_tracer = tracer_provider.tracer("noop");
557+
// noop tracer cannot start anything
558+
let _ = noop_tracer.start("test");
559+
assert!(assert_handle.started_span_count(2));
560+
// noop tracer's tracer provider should be shutdown
561+
assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));
562+
563+
// existing tracers become no-ops after shutdown
564+
let _ = test_tracer_1.start("test");
565+
assert!(assert_handle.started_span_count(2));
566+
}
420567
}

opentelemetry-sdk/src/trace/span.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,11 @@ impl Span {
204204
None => return,
205205
};
206206

207+
let provider = self.tracer.provider();
207208
// skip if provider has been shut down
208-
let provider = match self.tracer.provider() {
209-
Some(provider) => provider,
210-
None => return,
211-
};
209+
if provider.is_shutdown() {
210+
return;
211+
}
212212

213213
// ensure end time is set via explicit end or implicitly on drop
214214
if let Some(timestamp) = timestamp {
@@ -719,7 +719,7 @@ mod tests {
719719
let exported_data = span.exported_data();
720720
assert!(exported_data.is_some());
721721

722-
drop(provider);
722+
provider.shutdown().expect("shutdown panicked");
723723
let dropped_span = tracer.start("span_with_dropped_provider");
724724
// return none if the provider has already been shut down
725725
assert!(dropped_span.exported_data().is_none());

opentelemetry-sdk/src/trace/span_processor.rs

+2
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
9292
fn force_flush(&self) -> TraceResult<()>;
9393
/// Shuts down the processor. Called when SDK is shut down. This is an
9494
/// opportunity for processors to do any cleanup required.
95+
///
96+
/// Implementations should make sure `shutdown` can be called multiple times.
9597
fn shutdown(&self) -> TraceResult<()>;
9698
/// Set the resource for the span processor.
9799
fn set_resource(&mut self, _resource: &Resource) {}

opentelemetry-sdk/src/trace/tracer.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
//! Docs: <https://github.com/open-telemetry/opentelemetry-specification/blob/v1.3.0/specification/trace/api.md#tracer>
1010
use crate::{
1111
trace::{
12-
provider::{TracerProvider, TracerProviderInner},
12+
provider::TracerProvider,
1313
span::{Span, SpanData},
1414
SpanLimits, SpanLinks,
1515
},
@@ -20,15 +20,15 @@ use opentelemetry::{
2020
Context, KeyValue,
2121
};
2222
use std::fmt;
23-
use std::sync::{Arc, Weak};
23+
use std::sync::Arc;
2424

2525
use super::SpanEvents;
2626

2727
/// `Tracer` implementation to create and manage spans
2828
#[derive(Clone)]
2929
pub struct Tracer {
3030
instrumentation_lib: Arc<InstrumentationLibrary>,
31-
provider: Weak<TracerProviderInner>,
31+
provider: TracerProvider,
3232
}
3333

3434
impl fmt::Debug for Tracer {
@@ -46,7 +46,7 @@ impl Tracer {
4646
/// Create a new tracer (used internally by `TracerProvider`s).
4747
pub(crate) fn new(
4848
instrumentation_lib: Arc<InstrumentationLibrary>,
49-
provider: Weak<TracerProviderInner>,
49+
provider: TracerProvider,
5050
) -> Self {
5151
Tracer {
5252
instrumentation_lib,
@@ -55,8 +55,8 @@ impl Tracer {
5555
}
5656

5757
/// TracerProvider associated with this tracer.
58-
pub(crate) fn provider(&self) -> Option<TracerProvider> {
59-
self.provider.upgrade().map(TracerProvider::new)
58+
pub(crate) fn provider(&self) -> &TracerProvider {
59+
&self.provider
6060
}
6161

6262
/// Instrumentation library information of this tracer.
@@ -175,7 +175,8 @@ impl opentelemetry::trace::Tracer for Tracer {
175175
/// spans in the trace.
176176
fn build_with_context(&self, mut builder: SpanBuilder, parent_cx: &Context) -> Self::Span {
177177
let provider = self.provider();
178-
if provider.is_none() {
178+
// no point starting a span if the tracer provider has already been shut down
179+
if provider.is_shutdown() {
179180
return Span::new(
180181
SpanContext::empty_context(),
181182
None,
@@ -184,7 +185,6 @@ impl opentelemetry::trace::Tracer for Tracer {
184185
);
185186
}
186187

187-
let provider = provider.unwrap();
188188
let config = provider.config();
189189
let span_id = builder
190190
.span_id

0 commit comments

Comments
 (0)