Skip to content

Commit 1f35467

Browse files
authored
BatchSpanProcessor with dedicated thread. (#2456)
1 parent 6209c06 commit 1f35467

File tree

18 files changed

+1324
-555
lines changed

18 files changed

+1324
-555
lines changed

examples/tracing-grpc/src/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ fn init_tracer() -> sdktrace::TracerProvider {
1313
global::set_text_map_propagator(TraceContextPropagator::new());
1414
// Install stdout exporter pipeline to be able to retrieve the collected spans.
1515
let provider = sdktrace::TracerProvider::builder()
16-
.with_batch_exporter(SpanExporter::default(), Tokio)
16+
.with_batch_exporter(SpanExporter::default())
1717
.build();
1818

1919
global::set_tracer_provider(provider.clone());

examples/tracing-grpc/src/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ fn init_tracer() -> TracerProvider {
1515
global::set_text_map_propagator(TraceContextPropagator::new());
1616
// Install stdout exporter pipeline to be able to retrieve the collected spans.
1717
let provider = TracerProvider::builder()
18-
.with_batch_exporter(SpanExporter::default(), Tokio)
18+
.with_batch_exporter(SpanExporter::default())
1919
.build();
2020

2121
global::set_tracer_provider(provider.clone());

examples/tracing-jaeger/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn init_tracer_provider() -> Result<opentelemetry_sdk::trace::TracerProvider, Tr
1414
.build()?;
1515

1616
Ok(TracerProvider::builder()
17-
.with_batch_exporter(exporter, runtime::Tokio)
17+
.with_batch_exporter(exporter)
1818
.with_resource(
1919
Resource::builder()
2020
.with_service_name("tracing-jaeger")

opentelemetry-otlp/examples/basic-otlp-http/src/main.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
4949
.build()?;
5050

5151
Ok(TracerProvider::builder()
52-
// TODO: Enable BatchExporter after
53-
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
54-
.with_simple_exporter(exporter)
52+
.with_batch_exporter(exporter)
5553
.with_resource(RESOURCE.clone())
5654
.build())
5755
}
@@ -73,7 +71,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric
7371

7472
// #[tokio::main]
7573
// TODO: Re-enable tokio::main, if needed, after
76-
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
7774
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
7875
let logger_provider = init_logs()?;
7976

opentelemetry-otlp/examples/basic-otlp/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
2727
.build()?;
2828
Ok(sdktrace::TracerProvider::builder()
2929
.with_resource(RESOURCE.clone())
30-
.with_batch_exporter(exporter, runtime::Tokio)
30+
.with_batch_exporter(exporter)
3131
.build())
3232
}
3333

opentelemetry-otlp/tests/integration_test/tests/traces.rs

+45-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
3535
let exporter = exporter_builder.build()?;
3636

3737
Ok(opentelemetry_sdk::trace::TracerProvider::builder()
38-
.with_batch_exporter(exporter, runtime::Tokio)
38+
.with_batch_exporter(exporter)
3939
.with_resource(
4040
Resource::builder_empty()
4141
.with_service_name("basic-otlp-tracing-example")
@@ -141,6 +141,50 @@ pub fn test_serde() -> Result<()> {
141141
Ok(())
142142
}
143143

144+
#[test]
145+
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
146+
pub fn span_batch_non_tokio_main() -> Result<()> {
147+
// Initialize the tracer provider inside a tokio runtime
148+
// as this allows tonic client to capture the runtime,
149+
// but actual export occurs from the dedicated std::thread
150+
// created by BatchSpanProcessor.
151+
152+
use anyhow::Ok;
153+
let rt = tokio::runtime::Runtime::new()?;
154+
let tracer_provider = rt.block_on(async {
155+
// While we're here setup our collector container too, as this needs tokio to run
156+
let _ = test_utils::start_collector_container().await;
157+
init_tracer_provider()
158+
})?;
159+
160+
let tracer = global::tracer("ex.com/basic");
161+
162+
tracer.in_span("operation", |cx| {
163+
let span = cx.span();
164+
span.add_event(
165+
"Nice operation!".to_string(),
166+
vec![KeyValue::new("bogons", 100)],
167+
);
168+
span.set_attribute(KeyValue::new(ANOTHER_KEY, "yes"));
169+
170+
tracer.in_span("Sub operation...", |cx| {
171+
let span = cx.span();
172+
span.set_attribute(KeyValue::new(LEMONS_KEY, "five"));
173+
174+
span.add_event("Sub span event", vec![]);
175+
});
176+
});
177+
178+
tracer_provider.shutdown()?;
179+
180+
// Give it a second to flush
181+
std::thread::sleep(Duration::from_secs(2));
182+
183+
// Validate results
184+
assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?;
185+
Ok(())
186+
}
187+
144188
///
145189
/// Make sure we stop the collector container, otherwise it will sit around hogging our
146190
/// ports and subsequent test runs will fail.

opentelemetry-otlp/tests/smoke.rs

-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ async fn smoke_tracer() {
100100
.with_metadata(metadata)
101101
.build()
102102
.expect("NON gzip-tonic SpanExporter failed to build"),
103-
opentelemetry_sdk::runtime::Tokio,
104103
)
105104
.build();
106105

opentelemetry-sdk/CHANGELOG.md

+52
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,58 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
159159
- Continue enabling one of the async runtime feature flags: `rt-tokio`,
160160
`rt-tokio-current-thread`, or `rt-async-std`.
161161

162+
- **Breaking** [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456)
163+
164+
`BatchSpanProcessor` no longer requires an async runtime by default. Instead, a dedicated
165+
background thread is created to do the batch processing and exporting.
166+
167+
For users who prefer the previous behavior of relying on a specific
168+
`Runtime`, they can do so by enabling the feature flag
169+
**`experimental_trace_batch_span_processor_with_async_runtime`**.
170+
171+
1. *Default Implementation, requires no async runtime* (**Recommended**) The
172+
new default implementation does not require a runtime argument. Replace the
173+
builder method accordingly:
174+
- *Before:*
175+
```rust
176+
let tracer_provider = TracerProvider::builder()
177+
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
178+
.build();
179+
```
180+
181+
- *After:*
182+
```rust
183+
let tracer_provider = TracerProvider::builder()
184+
.with_span_processor(BatchSpanProcessor::builder(exporter).build())
185+
.build();
186+
```
187+
188+
2. *Async Runtime Support*
189+
If your application cannot spin up new threads or you prefer using async
190+
runtimes, enable the
191+
"experimental_trace_batch_span_processor_with_async_runtime" feature flag and
192+
adjust code as below.
193+
194+
- *Before:*
195+
```rust
196+
let tracer_provider = TracerProvider::builder()
197+
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
198+
.build();
199+
```
200+
201+
- *After:*
202+
```rust
203+
let tracer_provider = TracerProvider::builder()
204+
.with_span_processor(span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
205+
.build();
206+
```
207+
208+
*Requirements:*
209+
- Enable the feature flag:
210+
`experimental_trace_batch_span_processor_with_async_runtime`.
211+
- Continue enabling one of the async runtime feature flags: `rt-tokio`,
212+
`rt-tokio-current-thread`, or `rt-async-std`.
213+
162214
## 0.27.1
163215

164216
Released 2024-Nov-27

opentelemetry-sdk/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ internal-logs = ["tracing"]
5656
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
5757
spec_unstable_metrics_views = ["metrics"]
5858
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
59+
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]
60+
5961

6062
[[bench]]
6163
name = "context"

opentelemetry-sdk/benches/batch_span_processor.rs

+10-13
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use opentelemetry::trace::{
33
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
44
};
55
use opentelemetry_sdk::export::trace::SpanData;
6-
use opentelemetry_sdk::runtime::Tokio;
76
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
87
use opentelemetry_sdk::trace::{
98
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
@@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) {
4948
b.iter(|| {
5049
let rt = Runtime::new().unwrap();
5150
rt.block_on(async move {
52-
let span_processor =
53-
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
54-
.with_batch_config(
55-
BatchConfigBuilder::default()
56-
.with_max_queue_size(10_000)
57-
.build(),
58-
)
59-
.build();
51+
let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new())
52+
.with_batch_config(
53+
BatchConfigBuilder::default()
54+
.with_max_queue_size(10_000)
55+
.build(),
56+
)
57+
.build();
6058
let mut shared_span_processor = Arc::new(span_processor);
6159
let mut handles = Vec::with_capacity(10);
6260
for _ in 0..task_num {
@@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) {
7068
}));
7169
}
7270
futures_util::future::join_all(handles).await;
73-
let _ =
74-
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
75-
.unwrap()
76-
.shutdown();
71+
let _ = Arc::<BatchSpanProcessor>::get_mut(&mut shared_span_processor)
72+
.unwrap()
73+
.shutdown();
7774
});
7875
})
7976
},

opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
2222
///# async fn main() {
2323
/// let exporter = InMemorySpanExporterBuilder::new().build();
2424
/// let provider = TracerProvider::builder()
25-
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone(), runtime::Tokio).build())
25+
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone()).build())
2626
/// .build();
2727
///
2828
/// global::set_tracer_provider(provider.clone());

opentelemetry-sdk/src/trace/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ mod sampler;
1515
mod span;
1616
mod span_limit;
1717
mod span_processor;
18+
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
19+
/// Experimental feature to use async runtime with batch span processor.
20+
pub mod span_processor_with_async_runtime;
1821
mod tracer;
1922

2023
pub use config::{config, Config};
@@ -30,11 +33,13 @@ pub use span_processor::{
3033
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
3134
SimpleSpanProcessor, SpanProcessor,
3235
};
36+
3337
pub use tracer::Tracer;
3438

3539
#[cfg(feature = "jaeger_remote_sampler")]
3640
pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder};
3741

42+
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
3843
#[cfg(test)]
3944
mod runtime_tests;
4045

opentelemetry-sdk/src/trace/provider.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
/// provider.shutdown();
6363
/// }
6464
/// ```
65-
use crate::runtime::RuntimeChannel;
6665
use crate::trace::{
6766
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
6867
};
@@ -296,12 +295,8 @@ impl Builder {
296295
}
297296

298297
/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
299-
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
300-
self,
301-
exporter: T,
302-
runtime: R,
303-
) -> Self {
304-
let batch = BatchSpanProcessor::builder(exporter, runtime).build();
298+
pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
299+
let batch = BatchSpanProcessor::builder(exporter).build();
305300
self.with_span_processor(batch)
306301
}
307302

opentelemetry-sdk/src/trace/runtime_tests.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,12 @@ fn build_batch_tracer_provider<R: RuntimeChannel>(
5252
runtime: R,
5353
) -> crate::trace::TracerProvider {
5454
use crate::trace::TracerProvider;
55+
let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
56+
exporter, runtime,
57+
)
58+
.build();
5559
TracerProvider::builder()
56-
.with_batch_exporter(exporter, runtime)
60+
.with_span_processor(processor)
5761
.build()
5862
}
5963

0 commit comments

Comments
 (0)