Skip to content

Commit 2660047

Browse files
authored
Consolidate BatchConfig creation and validation via BatchConfigBuilder (#1480)
1 parent 27b19b6 commit 2660047

File tree

8 files changed

+320
-204
lines changed

8 files changed

+320
-204
lines changed

opentelemetry-jaeger/src/exporter/config/agent.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,13 @@ impl AgentPipeline {
237237
/// # Examples
238238
/// Set max queue size.
239239
/// ```rust
240-
/// use opentelemetry_sdk::trace::BatchConfig;
240+
/// use opentelemetry_sdk::trace::BatchConfigBuilder;
241241
///
242242
/// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
243243
/// .with_batch_processor_config(
244-
/// BatchConfig::default().with_max_queue_size(200)
244+
/// BatchConfigBuilder::default()
245+
/// .with_max_queue_size(200)
246+
/// .build()
245247
/// );
246248
///
247249
/// ```

opentelemetry-jaeger/src/exporter/config/collector/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -380,11 +380,13 @@ impl CollectorPipeline {
380380
/// # Examples
381381
/// Set max queue size.
382382
/// ```rust
383-
/// use opentelemetry_sdk::trace::BatchConfig;
383+
/// use opentelemetry_sdk::trace::BatchConfigBuilder;
384384
///
385385
/// let pipeline = opentelemetry_jaeger::new_collector_pipeline()
386386
/// .with_batch_processor_config(
387-
/// BatchConfig::default().with_max_queue_size(200)
387+
/// BatchConfigBuilder::default()
388+
/// .with_max_queue_size(200)
389+
/// .build()
388390
/// );
389391
///
390392
/// ```

opentelemetry-sdk/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
- **Breaking** Remove `TextMapCompositePropagator` [#1373](https://github.com/open-telemetry/opentelemetry-rust/pull/1373). Use `TextMapCompositePropagator` in opentelemetry API.
3030

3131
- [#1375](https://github.com/open-telemetry/opentelemetry-rust/pull/1375/) Fix metric collections during PeriodicReader shutdown
32+
- **Breaking** [#1480](https://github.com/open-telemetry/opentelemetry-rust/pull/1480) Remove fine grained `BatchConfig` configurations from `BatchLogProcessorBuilder` and `BatchSpanProcessorBuilder`. Use `BatchConfigBuilder` to construct a `BatchConfig` instance and pass it using `BatchLogProcessorBuilder::with_batch_config` or `BatchSpanProcessorBuilder::with_batch_config`.
33+
- **Breaking** [#1480](https://github.com/open-telemetry/opentelemetry-rust/pull/1480) Remove mutating functions from `BatchConfig`, use `BatchConfigBuilder` to construct a `BatchConfig` instance.
3234

3335
## v0.21.2
3436

opentelemetry-sdk/benches/batch_span_processor.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use opentelemetry::trace::{
55
use opentelemetry_sdk::export::trace::SpanData;
66
use opentelemetry_sdk::runtime::Tokio;
77
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
8-
use opentelemetry_sdk::trace::{BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor};
8+
use opentelemetry_sdk::trace::{
9+
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
10+
};
911
use opentelemetry_sdk::Resource;
1012
use std::borrow::Cow;
1113
use std::sync::Arc;
@@ -52,7 +54,11 @@ fn criterion_benchmark(c: &mut Criterion) {
5254
rt.block_on(async move {
5355
let span_processor =
5456
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
55-
.with_max_queue_size(10_000)
57+
.with_batch_config(
58+
BatchConfigBuilder::default()
59+
.with_max_queue_size(10_000)
60+
.build(),
61+
)
5662
.build();
5763
let mut shared_span_processor = Arc::new(span_processor);
5864
let mut handles = Vec::with_capacity(10);

opentelemetry-sdk/src/logs/log_processor.rs

+85-90
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use opentelemetry::{
1313
global,
1414
logs::{LogError, LogResult},
1515
};
16-
use std::{env, sync::Mutex};
16+
use std::{cmp::min, env, sync::Mutex};
1717
use std::{
1818
fmt::{self, Debug, Formatter},
1919
str::FromStr,
@@ -246,7 +246,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
246246
{
247247
BatchLogProcessorBuilder {
248248
exporter,
249-
config: BatchConfig::default(),
249+
config: Default::default(),
250250
runtime,
251251
}
252252
}
@@ -276,7 +276,8 @@ where
276276
}
277277
}
278278

279-
/// Batch log processor configuration
279+
/// Batch log processor configuration.
280+
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
280281
#[derive(Debug)]
281282
pub struct BatchConfig {
282283
/// The maximum queue size to buffer logs for delayed processing. If the
@@ -299,55 +300,36 @@ pub struct BatchConfig {
299300

300301
impl Default for BatchConfig {
301302
fn default() -> Self {
302-
let mut config = BatchConfig {
303+
BatchConfigBuilder::default().build()
304+
}
305+
}
306+
307+
/// A builder for creating [`BatchConfig`] instances.
308+
#[derive(Debug)]
309+
pub struct BatchConfigBuilder {
310+
max_queue_size: usize,
311+
scheduled_delay: Duration,
312+
max_export_batch_size: usize,
313+
max_export_timeout: Duration,
314+
}
315+
316+
impl Default for BatchConfigBuilder {
317+
/// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
318+
/// The values are overriden by environment variables if set.
319+
/// For a list of supported environment variables see [Batch LogRecord Processor](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md#batch-logrecord-processor).
320+
fn default() -> Self {
321+
BatchConfigBuilder {
303322
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
304323
scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
305324
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
306325
max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
307-
};
308-
309-
if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
310-
.ok()
311-
.and_then(|queue_size| usize::from_str(&queue_size).ok())
312-
{
313-
config.max_queue_size = max_queue_size;
314-
}
315-
316-
if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
317-
.ok()
318-
.and_then(|batch_size| usize::from_str(&batch_size).ok())
319-
{
320-
config.max_export_batch_size = max_export_batch_size;
321326
}
322-
323-
// max export batch size must be less or equal to max queue size.
324-
// we set max export batch size to max queue size if it's larger than max queue size.
325-
if config.max_export_batch_size > config.max_queue_size {
326-
config.max_export_batch_size = config.max_queue_size;
327-
}
328-
329-
if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
330-
.ok()
331-
.or_else(|| env::var("OTEL_BLRP_SCHEDULE_DELAY_MILLIS").ok())
332-
.and_then(|delay| u64::from_str(&delay).ok())
333-
{
334-
config.scheduled_delay = Duration::from_millis(scheduled_delay);
335-
}
336-
337-
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
338-
.ok()
339-
.or_else(|| env::var("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS").ok())
340-
.and_then(|s| u64::from_str(&s).ok())
341-
{
342-
config.max_export_timeout = Duration::from_millis(max_export_timeout);
343-
}
344-
345-
config
327+
.init_from_env_vars()
346328
}
347329
}
348330

349-
impl BatchConfig {
350-
/// Set max_queue_size for [`BatchConfig`].
331+
impl BatchConfigBuilder {
332+
/// Set max_queue_size for [`BatchConfigBuilder`].
351333
/// It's the maximum queue size to buffer logs for delayed processing.
352334
/// If the queue gets full it will drop the logs.
353335
/// The default value of is 2048.
@@ -356,23 +338,23 @@ impl BatchConfig {
356338
self
357339
}
358340

359-
/// Set scheduled_delay for [`BatchConfig`].
341+
/// Set scheduled_delay for [`BatchConfigBuilder`].
360342
/// It's the delay interval in milliseconds between two consecutive processing of batches.
361343
/// The default value is 1000 milliseconds.
362344
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
363345
self.scheduled_delay = scheduled_delay;
364346
self
365347
}
366348

367-
/// Set max_export_timeout for [`BatchConfig`].
349+
/// Set max_export_timeout for [`BatchConfigBuilder`].
368350
/// It's the maximum duration to export a batch of data.
369351
/// The default value is 30000 milliseconds.
370352
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
371353
self.max_export_timeout = max_export_timeout;
372354
self
373355
}
374356

375-
/// Set max_export_batch_size for [`BatchConfig`].
357+
/// Set max_export_batch_size for [`BatchConfigBuilder`].
376358
/// It's the maximum number of logs to process in a single batch. If there are
377359
/// more than one batch worth of logs then it processes multiple batches
378360
/// of logs one batch after the other without any delay.
@@ -381,6 +363,55 @@ impl BatchConfig {
381363
self.max_export_batch_size = max_export_batch_size;
382364
self
383365
}
366+
367+
/// Builds a `BatchConfig` enforcing the following invariants:
368+
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
369+
pub fn build(self) -> BatchConfig {
370+
// max export batch size must be less or equal to max queue size.
371+
// we set max export batch size to max queue size if it's larger than max queue size.
372+
let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
373+
374+
BatchConfig {
375+
max_queue_size: self.max_queue_size,
376+
scheduled_delay: self.scheduled_delay,
377+
max_export_timeout: self.max_export_timeout,
378+
max_export_batch_size,
379+
}
380+
}
381+
382+
fn init_from_env_vars(mut self) -> Self {
383+
if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
384+
.ok()
385+
.and_then(|queue_size| usize::from_str(&queue_size).ok())
386+
{
387+
self.max_queue_size = max_queue_size;
388+
}
389+
390+
if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
391+
.ok()
392+
.and_then(|batch_size| usize::from_str(&batch_size).ok())
393+
{
394+
self.max_export_batch_size = max_export_batch_size;
395+
}
396+
397+
if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
398+
.ok()
399+
.or_else(|| env::var("OTEL_BLRP_SCHEDULE_DELAY_MILLIS").ok())
400+
.and_then(|delay| u64::from_str(&delay).ok())
401+
{
402+
self.scheduled_delay = Duration::from_millis(scheduled_delay);
403+
}
404+
405+
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
406+
.ok()
407+
.or_else(|| env::var("OTEL_BLRP_EXPORT_TIMEOUT_MILLIS").ok())
408+
.and_then(|s| u64::from_str(&s).ok())
409+
{
410+
self.max_export_timeout = Duration::from_millis(max_export_timeout);
411+
}
412+
413+
self
414+
}
384415
}
385416

386417
/// A builder for creating [`BatchLogProcessor`] instances.
@@ -397,44 +428,6 @@ where
397428
E: LogExporter + 'static,
398429
R: RuntimeChannel,
399430
{
400-
/// Set max queue size for batches
401-
pub fn with_max_queue_size(self, size: usize) -> Self {
402-
let mut config = self.config;
403-
config.max_queue_size = size;
404-
405-
BatchLogProcessorBuilder { config, ..self }
406-
}
407-
408-
/// Set scheduled delay for batches
409-
pub fn with_scheduled_delay(self, delay: Duration) -> Self {
410-
let mut config = self.config;
411-
config.scheduled_delay = delay;
412-
413-
BatchLogProcessorBuilder { config, ..self }
414-
}
415-
416-
/// Set max timeout for exporting.
417-
pub fn with_max_timeout(self, timeout: Duration) -> Self {
418-
let mut config = self.config;
419-
config.max_export_timeout = timeout;
420-
421-
BatchLogProcessorBuilder { config, ..self }
422-
}
423-
424-
/// Set max export size for batches, should always less than or equals to max queue size.
425-
///
426-
/// If input is larger than max queue size, will lower it to be equal to max queue size
427-
pub fn with_max_export_batch_size(self, size: usize) -> Self {
428-
let mut config = self.config;
429-
if size > config.max_queue_size {
430-
config.max_export_batch_size = config.max_queue_size;
431-
} else {
432-
config.max_export_batch_size = size;
433-
}
434-
435-
BatchLogProcessorBuilder { config, ..self }
436-
}
437-
438431
/// Set the BatchConfig for [`BatchLogProcessorBuilder`]
439432
pub fn with_batch_config(self, config: BatchConfig) -> Self {
440433
BatchLogProcessorBuilder { config, ..self }
@@ -471,7 +464,7 @@ mod tests {
471464
OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
472465
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
473466
},
474-
BatchConfig,
467+
BatchConfig, BatchConfigBuilder,
475468
},
476469
runtime,
477470
testing::logs::InMemoryLogsExporter,
@@ -590,11 +583,12 @@ mod tests {
590583

591584
#[test]
592585
fn test_batch_config_with_fields() {
593-
let batch = BatchConfig::default()
586+
let batch = BatchConfigBuilder::default()
594587
.with_max_export_batch_size(1)
595588
.with_scheduled_delay(Duration::from_millis(2))
596589
.with_max_export_timeout(Duration::from_millis(3))
597-
.with_max_queue_size(4);
590+
.with_max_queue_size(4)
591+
.build();
598592

599593
assert_eq!(batch.max_export_batch_size, 1);
600594
assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
@@ -640,11 +634,12 @@ mod tests {
640634

641635
#[test]
642636
fn test_build_batch_log_processor_builder_with_custom_config() {
643-
let expected = BatchConfig::default()
637+
let expected = BatchConfigBuilder::default()
644638
.with_max_export_batch_size(1)
645639
.with_scheduled_delay(Duration::from_millis(2))
646640
.with_max_export_timeout(Duration::from_millis(3))
647-
.with_max_queue_size(4);
641+
.with_max_queue_size(4)
642+
.build();
648643

649644
let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio)
650645
.with_batch_config(expected);

opentelemetry-sdk/src/logs/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ mod log_processor;
77
pub use config::{config, Config};
88
pub use log_emitter::{Builder, Logger, LoggerProvider};
99
pub use log_processor::{
10-
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor,
10+
BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor,
11+
SimpleLogProcessor,
1112
};
1213

1314
#[cfg(all(test, feature = "testing"))]

opentelemetry-sdk/src/trace/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ pub use sampler::{Sampler, ShouldSample};
2626
pub use span::Span;
2727
pub use span_limit::SpanLimits;
2828
pub use span_processor::{
29-
BatchConfig, BatchSpanProcessor, BatchSpanProcessorBuilder, SimpleSpanProcessor, SpanProcessor,
29+
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
30+
SimpleSpanProcessor, SpanProcessor,
3031
};
3132
pub use tracer::Tracer;
3233

0 commit comments

Comments
 (0)