Skip to content

Commit 7694d19

Browse files
authored
Merge branch 'main' into cijothomas/periodicreader-help
2 parents 972ce98 + 013d51a commit 7694d19

File tree

9 files changed

+143
-49
lines changed

9 files changed

+143
-49
lines changed

opentelemetry-prometheus/examples/hyper.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use opentelemetry_sdk::metrics::SdkMeterProvider;
1515
use prometheus::{Encoder, Registry, TextEncoder};
1616
use std::net::SocketAddr;
1717
use std::sync::Arc;
18-
use std::time::SystemTime;
18+
use opentelemetry::time::now;
1919
use tokio::net::TcpListener;
2020

2121
static HANDLER_ALL: Lazy<[KeyValue; 1]> = Lazy::new(|| [KeyValue::new("handler", "all")]);
@@ -25,7 +25,7 @@ async fn serve_req(
2525
state: Arc<AppState>,
2626
) -> Result<Response<Full<Bytes>>, hyper::Error> {
2727
println!("Receiving request at path {}", req.uri());
28-
let request_start = SystemTime::now();
28+
let request_start = now();
2929

3030
state.http_counter.add(1, HANDLER_ALL.as_ref());
3131

opentelemetry-sdk/CHANGELOG.md

+33
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,39 @@ let processor = BatchLogProcessor::builder(exporter)
337337
.build();
338338
```
339339

340+
- *Breaking*: The `BatchSpanProcessor` no longer supports configuration of `max_export_timeout`
341+
or the `OTEL_BLRP_EXPORT_TIMEOUT` environment variable. Timeout handling is now the
342+
responsibility of the exporter.
343+
For example, in the OTLP Span exporter, the export timeout can be configured using:
344+
- The environment variables `OTEL_EXPORTER_OTLP_TIMEOUT` or `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`.
345+
- The opentelemetry_otlp API, via `.with_tonic().with_timeout()` or `.with_http().with_timeout()`.
346+
Before:
347+
```rust
348+
let processor = BatchSpanProcessor::builder(exporter)
349+
.with_batch_config(
350+
BatchConfigBuilder::default()
351+
.with_max_queue_size(2048)
352+
.with_max_export_batch_size(512)
353+
.with_scheduled_delay(Duration::from_secs(5))
354+
.with_max_export_timeout(Duration::from_secs(30)) // Previously configurable
355+
.build(),
356+
)
357+
.build();
358+
```
359+
360+
After:
361+
```rust
362+
let processor = BatchSpanProcessor::builder(exporter)
363+
.with_batch_config(
364+
BatchConfigBuilder::default()
365+
.with_max_queue_size(2048)
366+
.with_max_export_batch_size(512)
367+
.with_scheduled_delay(Duration::from_secs(5)) // No `max_export_timeout`
368+
.build(),
369+
)
370+
.build();
371+
```
372+
340373
## 0.27.1
341374

342375
Released 2024-Nov-27

opentelemetry-sdk/benches/batch_span_processor.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
2+
use opentelemetry::time::now;
23
use opentelemetry::trace::{
34
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
45
};
@@ -8,7 +9,6 @@ use opentelemetry_sdk::trace::{
89
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
910
};
1011
use std::sync::Arc;
11-
use std::time::SystemTime;
1212
use tokio::runtime::Runtime;
1313

1414
fn get_span_data() -> Vec<SpanData> {
@@ -24,8 +24,8 @@ fn get_span_data() -> Vec<SpanData> {
2424
parent_span_id: SpanId::from_u64(12),
2525
span_kind: SpanKind::Client,
2626
name: Default::default(),
27-
start_time: SystemTime::now(),
28-
end_time: SystemTime::now(),
27+
start_time: now(),
28+
end_time: now(),
2929
attributes: Vec::new(),
3030
dropped_attributes_count: 0,
3131
events: SpanEvents::default(),

opentelemetry-sdk/benches/log.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ RAM: 64.0 GB
1515
| Logging_Comparable_To_Appender | 87 ns |
1616
*/
1717

18+
use opentelemetry::time::now;
1819
use std::collections::HashMap;
19-
use std::time::SystemTime;
2020

2121
use criterion::{criterion_group, criterion_main, Criterion};
2222

@@ -111,7 +111,7 @@ fn logging_comparable_to_appender(c: &mut Criterion) {
111111
c.bench_function("Logging_Comparable_To_Appender", |b| {
112112
b.iter(|| {
113113
let mut log_record = logger.create_log_record();
114-
let now = SystemTime::now();
114+
let now = now();
115115
log_record.set_observed_timestamp(now);
116116
log_record.set_target("my-target".to_string());
117117
log_record.set_event_name("CheckoutFailed");
@@ -253,7 +253,7 @@ fn criterion_benchmark(c: &mut Criterion) {
253253
logger.emit(log_record);
254254
});
255255

256-
let now = SystemTime::now();
256+
let now = now();
257257
log_benchmark_group(c, "full-log", |logger| {
258258
let mut log_record = logger.create_log_record();
259259
log_record.set_body("full log".into());

opentelemetry-sdk/benches/log_exporter.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
| LogExporterWithoutFuture | 92 ns |
1111
*/
1212

13+
use opentelemetry::time::now;
1314
use std::sync::Mutex;
14-
use std::time::SystemTime;
1515

1616
use async_trait::async_trait;
1717
use criterion::{criterion_group, criterion_main, Criterion};
@@ -126,7 +126,7 @@ fn exporter_with_future(c: &mut Criterion) {
126126
c.bench_function("LogExporterWithFuture", |b| {
127127
b.iter(|| {
128128
let mut log_record = logger.create_log_record();
129-
let now = SystemTime::now();
129+
let now = now();
130130
log_record.set_observed_timestamp(now);
131131
log_record.set_target("my-target".to_string());
132132
log_record.set_event_name("CheckoutFailed");
@@ -152,7 +152,7 @@ fn exporter_without_future(c: &mut Criterion) {
152152
c.bench_function("LogExporterWithoutFuture", |b| {
153153
b.iter(|| {
154154
let mut log_record = logger.create_log_record();
155-
let now = SystemTime::now();
155+
let now = now();
156156
log_record.set_observed_timestamp(now);
157157
log_record.set_target("my-target".to_string());
158158
log_record.set_event_name("CheckoutFailed");

opentelemetry-sdk/benches/log_processor.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
| log_clone_and_send_to_channel_processor | 403 ns |
1212
*/
1313

14+
use opentelemetry::time::now;
1415
use std::{
1516
sync::{Arc, Mutex},
1617
thread::sleep,
17-
time::SystemTime,
1818
};
1919

2020
use criterion::{criterion_group, criterion_main, Criterion};
@@ -29,7 +29,7 @@ use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, Logger, Logger
2929

3030
fn create_log_record(logger: &Logger) -> LogRecord {
3131
let mut log_record = logger.create_log_record();
32-
let now = SystemTime::now();
32+
let now = now();
3333
log_record.set_observed_timestamp(now);
3434
log_record.set_target("my-target".to_string());
3535
log_record.set_event_name("CheckoutFailed");

opentelemetry-sdk/src/logs/log_processor.rs

+20-6
Original file line numberDiff line numberDiff line change
@@ -228,19 +228,33 @@ type LogsData = Box<(LogRecord, InstrumentationScope)>;
228228
/// individually. It uses a **dedicated background thread** to manage and export logs
229229
/// asynchronously, ensuring that the application's main execution flow is not blocked.
230230
///
231-
/// - This processor supports the following configurations:
232-
/// - **Queue size**: Maximum number of log records that can be buffered.
233-
/// - **Batch size**: Maximum number of log records to include in a single export.
234-
/// - **Scheduled delay**: Frequency at which the batch is exported.
231+
/// This processor supports the following configurations:
232+
/// - **Queue size**: Maximum number of log records that can be buffered.
233+
/// - **Batch size**: Maximum number of log records to include in a single export.
234+
/// - **Scheduled delay**: Frequency at which the batch is exported.
235235
///
236236
/// When using this processor with the OTLP Exporter, the following exporter
237237
/// features are supported:
238-
/// - `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
239-
/// runtime.
238+
/// - `grpc-tonic`: Requires `LoggerProvider` to be created within a tokio runtime.
240239
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
241240
///
242241
/// In other words, other clients like `reqwest` and `hyper` are not supported.
243242
///
243+
/// `BatchLogProcessor` buffers logs in memory and exports them in batches. An
244+
/// export is triggered when `max_export_batch_size` is reached or every
245+
/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
246+
/// the `force_flush` method. Shutdown also triggers an export of all buffered
247+
/// logs and is recommended to be called before the application exits to ensure
248+
/// all buffered logs are exported.
249+
///
250+
/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
251+
/// is a blocking call ,should not be called from your main thread. This can
252+
/// cause deadlock. Instead, call `shutdown()` from a separate thread or use
253+
/// tokio's `spawn_blocking`.
254+
///
255+
/// [`shutdown()`]: crate::logs::LoggerProvider::shutdown
256+
/// [`force_flush()`]: crate::logs::LoggerProvider::force_flush
257+
///
244258
/// ### Using a BatchLogProcessor:
245259
///
246260
/// ```rust

opentelemetry-sdk/src/metrics/periodic_reader.rs

+40-15
Original file line numberDiff line numberDiff line change
@@ -91,25 +91,50 @@ where
9191
}
9292
}
9393

94-
/// A [MetricReader] that continuously collects and exports metrics at a set
95-
/// interval.
94+
/// A `MetricReader` that periodically collects and exports metrics at a configurable interval.
9695
///
97-
/// By default, `PeriodicReader` will collect and export metrics every 60
98-
/// seconds. The export time is not counted towards the interval between
99-
/// attempts. `PeriodicReader` itself does not enforce a timeout. Instead, the
100-
/// timeout is passed on to the configured exporter for each export attempt.
96+
/// By default, [`PeriodicReader`] collects and exports metrics every **60 seconds**.
97+
/// The time taken for export is **not** included in the interval. Use [`PeriodicReaderBuilder`]
98+
/// to customize the interval.
10199
///
102-
/// `PeriodicReader` spawns a background thread to handle the periodic
103-
/// collection and export of metrics. The background thread will continue to run
104-
/// until `shutdown()` is called.
100+
/// [`PeriodicReader`] spawns a background thread to handle metric collection and export.
101+
/// This thread remains active until [`shutdown()`] is called.
105102
///
106-
/// When using this reader with the OTLP Exporter, the following exporter
107-
/// features are supported:
108-
/// - `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
109-
/// runtime.
110-
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
103+
/// ## Collection Process
104+
/// "Collection" refers to gathering aggregated metrics from the SDK's internal storage.
105+
/// During this phase, callbacks from observable instruments are also triggered.
111106
///
112-
/// In other words, other clients like `reqwest` and `hyper` are not supported.
107+
/// [`PeriodicReader`] does **not** enforce a timeout for collection. If an
108+
/// observable callback takes too long, it may delay the next collection cycle.
109+
/// If a callback never returns, it **will stall** all metric collection (and exports)
110+
/// indefinitely.
111+
///
112+
/// ## Exporter Compatibility
113+
/// When used with the [`OTLP Exporter`](https://docs.rs/opentelemetry-otlp), the following
114+
/// transport options are supported:
115+
///
116+
/// - **`grpc-tonic`**: Requires [`MeterProvider`] to be initialized within a `tokio` runtime.
117+
/// - **`reqwest-blocking-client`**: Works with both a standard (`main`) function and `tokio::main`.
118+
///
119+
/// [`PeriodicReader`] does **not** enforce a timeout for exports either. Instead,
120+
/// the configured exporter is responsible for enforcing timeouts. If an export operation
121+
/// never returns, [`PeriodicReader`] will **stop exporting new metrics**, stalling
122+
/// metric collection.
123+
///
124+
/// ## Manual Export & Shutdown
125+
/// Users can manually trigger an export via [`force_flush()`]. Calling [`shutdown()`]
126+
/// exports any remaining metrics and should be done before application exit to ensure
127+
/// all data is sent.
128+
///
129+
/// **Warning**: If using **tokio’s current-thread runtime**, calling [`shutdown()`]
130+
/// from the main thread may cause a deadlock. To prevent this, call [`shutdown()`]
131+
/// from a separate thread or use tokio's `spawn_blocking`.
132+
///
133+
/// [`PeriodicReader`]: crate::metrics::PeriodicReader
134+
/// [`PeriodicReaderBuilder`]: crate::metrics::PeriodicReaderBuilder
135+
/// [`MeterProvider`]: crate::metrics::SdkMeterProvider
136+
/// [`shutdown()`]: crate::metrics::SdkMeterProvider::shutdown
137+
/// [`force_flush()`]: crate::metrics::SdkMeterProvider::force_flush
113138
///
114139
/// # Example
115140
///

opentelemetry-sdk/src/trace/span_processor.rs

+37-15
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,6 @@ use crate::trace::ExportResult;
217217
/// .with_max_queue_size(1024) // Buffer up to 1024 spans.
218218
/// .with_max_export_batch_size(256) // Export in batches of up to 256 spans.
219219
/// .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds.
220-
/// .with_max_export_timeout(Duration::from_secs(10)) // Timeout after 10 seconds.
221220
/// .build(),
222221
/// )
223222
/// .build();
@@ -253,7 +252,38 @@ enum BatchMessage {
253252
SetResource(Arc<Resource>),
254253
}
255254

256-
/// A batch span processor with a dedicated background thread.
255+
/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
256+
/// in batches to the configured `SpanExporter`. This processor is ideal for
257+
/// high-throughput environments, as it minimizes the overhead of exporting spans
258+
/// individually. It uses a **dedicated background thread** to manage and export spans
259+
/// asynchronously, ensuring that the application's main execution flow is not blocked.
260+
///
261+
/// This processor supports the following configurations:
262+
/// - **Queue size**: Maximum number of spans that can be buffered.
263+
/// - **Batch size**: Maximum number of spans to include in a single export.
264+
/// - **Scheduled delay**: Frequency at which the batch is exported.
265+
///
266+
/// When using this processor with the OTLP Exporter, the following exporter
267+
/// features are supported:
268+
/// - `grpc-tonic`: Requires `TracerProvider` to be created within a tokio runtime.
269+
/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
270+
///
271+
/// In other words, other clients like `reqwest` and `hyper` are not supported.
272+
///
273+
/// `BatchSpanProcessor` buffers spans in memory and exports them in batches. An
274+
/// export is triggered when `max_export_batch_size` is reached or every
275+
/// `scheduled_delay` milliseconds. Users can explicitly trigger an export using
276+
/// the `force_flush` method. Shutdown also triggers an export of all buffered
277+
/// spans and is recommended to be called before the application exits to ensure
278+
/// all buffered spans are exported.
279+
///
280+
/// **Warning**: When using tokio's current-thread runtime, `shutdown()`, which
281+
/// is a blocking call ,should not be called from your main thread. This can
282+
/// cause deadlock. Instead, call `shutdown()` from a separate thread or use
283+
/// tokio's `spawn_blocking`.
284+
///
285+
/// [`shutdown()`]: crate::trace::TracerProvider::shutdown
286+
/// [`force_flush()`]: crate::trace::TracerProvider::force_flush
257287
#[derive(Debug)]
258288
pub struct BatchSpanProcessor {
259289
span_sender: SyncSender<SpanData>, // Data channel to store spans
@@ -443,20 +473,14 @@ impl BatchSpanProcessor {
443473
}
444474

445475
let count_of_spans = spans.len(); // Count of spans that will be exported
446-
let result = Self::export_with_timeout_sync(
447-
config.max_export_timeout,
448-
exporter,
449-
spans,
450-
last_export_time,
451-
); // This method clears the spans vec after exporting
476+
let result = Self::export_batch_sync(exporter, spans, last_export_time); // This method clears the spans vec after exporting
452477

453478
current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
454479
result
455480
}
456481

457482
#[allow(clippy::vec_box)]
458-
fn export_with_timeout_sync<E>(
459-
_: Duration, // TODO, enforcing timeout in exporter.
483+
fn export_batch_sync<E>(
460484
exporter: &mut E,
461485
batch: &mut Vec<SpanData>,
462486
last_export_time: &mut Instant,
@@ -740,6 +764,7 @@ impl BatchConfigBuilder {
740764
/// Set max_export_timeout for [`BatchConfigBuilder`].
741765
/// It's the maximum duration to export a batch of data.
742766
/// The The default value is 30000 milliseconds.
767+
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
743768
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
744769
self.max_export_timeout = max_export_timeout;
745770
self
@@ -960,10 +985,11 @@ mod tests {
960985
let batch = BatchConfigBuilder::default()
961986
.with_max_export_batch_size(10)
962987
.with_scheduled_delay(Duration::from_millis(10))
963-
.with_max_export_timeout(Duration::from_millis(10))
964988
.with_max_queue_size(10);
965989
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
966990
let batch = batch.with_max_concurrent_exports(10);
991+
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
992+
let batch = batch.with_max_export_timeout(Duration::from_millis(10));
967993
let batch = batch.build();
968994
assert_eq!(batch.max_export_batch_size, 10);
969995
assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
@@ -1037,7 +1063,6 @@ mod tests {
10371063
.with_max_queue_size(10)
10381064
.with_max_export_batch_size(10)
10391065
.with_scheduled_delay(Duration::from_secs(5))
1040-
.with_max_export_timeout(Duration::from_secs(2))
10411066
.build();
10421067
let processor = BatchSpanProcessor::new(exporter, config);
10431068

@@ -1060,7 +1085,6 @@ mod tests {
10601085
.with_max_queue_size(10)
10611086
.with_max_export_batch_size(10)
10621087
.with_scheduled_delay(Duration::from_secs(5))
1063-
.with_max_export_timeout(Duration::from_secs(2))
10641088
.build();
10651089
let processor = BatchSpanProcessor::new(exporter, config);
10661090

@@ -1090,7 +1114,6 @@ mod tests {
10901114
.with_max_queue_size(10)
10911115
.with_max_export_batch_size(10)
10921116
.with_scheduled_delay(Duration::from_secs(5))
1093-
.with_max_export_timeout(Duration::from_secs(2))
10941117
.build();
10951118
let processor = BatchSpanProcessor::new(exporter, config);
10961119

@@ -1126,7 +1149,6 @@ mod tests {
11261149
let config = BatchConfigBuilder::default()
11271150
.with_max_queue_size(2) // Small queue size to test span dropping
11281151
.with_scheduled_delay(Duration::from_secs(5))
1129-
.with_max_export_timeout(Duration::from_secs(2))
11301152
.build();
11311153
let processor = BatchSpanProcessor::new(exporter, config);
11321154

0 commit comments

Comments
 (0)