From 0369dbea5f9115f8434b4a15834da657969fc40c Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 10 Mar 2025 16:30:28 -0700 Subject: [PATCH 1/4] feat: Add experimental concurrent processor for logs --- opentelemetry-sdk/Cargo.toml | 6 ++ opentelemetry-sdk/benches/log_enabled.rs | 81 +++++++++++++++++++ .../src/logs/concurrent_log_processor.rs | 53 ++++++++++++ opentelemetry-sdk/src/logs/mod.rs | 4 + .../src/logs/simple_log_processor.rs | 17 +++- stress/Cargo.toml | 2 +- stress/src/logs.rs | 44 ++++------ 7 files changed, 175 insertions(+), 32 deletions(-) create mode 100644 opentelemetry-sdk/benches/log_enabled.rs create mode 100644 opentelemetry-sdk/src/logs/concurrent_log_processor.rs diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index ab7c13ba19..9ae4dcc433 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -56,6 +56,7 @@ internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] experimental_logs_batch_log_processor_with_async_runtime = ["logs"] +experimental_logs_concurrent_log_processor = ["logs"] experimental_trace_batch_span_processor_with_async_runtime = ["trace"] experimental_metrics_disable_name_validation = ["metrics"] @@ -88,6 +89,11 @@ harness = false name = "log_processor" harness = false +[[bench]] +name = "log_enabled" +harness = false +required-features = ["spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"] + [[bench]] name = "tracer_creation" harness = false diff --git a/opentelemetry-sdk/benches/log_enabled.rs b/opentelemetry-sdk/benches/log_enabled.rs new file mode 100644 index 0000000000..dc41262c5e --- /dev/null +++ b/opentelemetry-sdk/benches/log_enabled.rs @@ -0,0 +1,81 @@ +/* + The benchmark results: + criterion = "0.5.1" + Hardware: Apple M4 Pro + Total Number of Cores:   14 (10 performance and 4 efficiency) + | Test | Average time| + |---------------------------------------------|-------------| + | exporter_disabled_concurrent_processor | 1.9 ns | + | exporter_disabled_simple_processor | 5.0 ns | +*/ + +use criterion::{criterion_group, criterion_main, Criterion}; +use opentelemetry::logs::{Logger, LoggerProvider}; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::logs::concurrent_log_processor::ConcurrentExportProcessor; +use opentelemetry_sdk::logs::{ + LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor, +}; +use opentelemetry_sdk::Resource; +#[cfg(not(target_os = "windows"))] +use pprof::criterion::{Output, PProfProfiler}; + +#[derive(Debug)] +struct NoopExporter; +impl LogExporter for NoopExporter { + async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } + + fn event_enabled( + &self, + _level: opentelemetry::logs::Severity, + _target: &str, + _name: Option<&str>, + ) -> bool { + false + } + + fn set_resource(&mut self, _: &Resource) {} +} + +fn benchmark_exporter_enabled_false(c: &mut Criterion, name: &str, processor: T) +where + T: LogProcessor + Send + Sync + 'static, +{ + let provider = SdkLoggerProvider::builder() + .with_log_processor(processor) + .build(); + let logger = provider.logger("test_logger"); + + c.bench_function(name, |b| { + b.iter(|| { + logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name")); + }); + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let processor = ConcurrentExportProcessor::new(NoopExporter); + benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor); + let simple = SimpleLogProcessor::new(NoopExporter); + benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple); +} + +#[cfg(not(target_os = "windows"))] +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = criterion_benchmark +} +#[cfg(target_os = "windows")] +criterion_group! { + name = benches; + config = Criterion::default(); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs new file mode 100644 index 0000000000..f9ed0ad6f8 --- /dev/null +++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs @@ -0,0 +1,53 @@ +use opentelemetry::InstrumentationScope; + +use crate::error::OTelSdkResult; + +use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord}; + +/// A concurrent log processor calls exporter's export method on each emit. This +/// processor does not buffer logs. Note: This invokes exporter's export method +/// on the current thread without synchronization. i.e multiple export() calls +/// can happen simultaneously from different threads. This is not a problem if +/// the exporter is designed to handle that. As of now, exporters in the +/// opentelemetry-rust project (stdout/otlp) are not thread-safe. +/// This is intended to be used when exporting to operating system +/// tracing facilities like Windows ETW, Linux TracePoints etc. +#[derive(Debug)] +pub struct ConcurrentExportProcessor { + exporter: T, +} + +impl ConcurrentExportProcessor { + /// Creates a new `ConcurrentExportProcessor` with the given exporter. + pub fn new(exporter: T) -> Self { + Self { exporter } + } +} + +impl LogProcessor for ConcurrentExportProcessor { + fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { + let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; + let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); + } + + fn force_flush(&self) -> OTelSdkResult { + // TODO: invoke flush on exporter + // once https://github.com/open-telemetry/opentelemetry-rust/issues/2261 + // is resolved + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + self.exporter.shutdown() + } + + #[cfg(feature = "spec_unstable_logs_enabled")] + fn event_enabled( + &self, + level: opentelemetry::logs::Severity, + target: &str, + name: Option<&str>, + ) -> bool { + self.exporter.event_enabled(level, target, name) + } +} diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 0da96bb730..42219cfb26 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -27,6 +27,10 @@ pub use logger_provider::{LoggerProviderBuilder, SdkLoggerProvider}; pub use record::{SdkLogRecord, TraceContext}; pub use simple_log_processor::SimpleLogProcessor; +#[cfg(feature = "experimental_logs_concurrent_log_processor")] +/// Module for ConcurrentLogProcessor. +pub mod concurrent_log_processor; + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] /// Module for BatchLogProcessor with async runtime. pub mod log_processor_with_async_runtime; diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 3ff9a59287..f7094752d2 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -65,7 +65,8 @@ pub struct SimpleLogProcessor { } impl SimpleLogProcessor { - pub(crate) fn new(exporter: T) -> Self { + /// Creates a new instance of `SimpleLogProcessor`. + pub fn new(exporter: T) -> Self { SimpleLogProcessor { exporter: Mutex::new(exporter), is_shutdown: AtomicBool::new(false), @@ -131,6 +132,20 @@ impl LogProcessor for SimpleLogProcessor { exporter.set_resource(resource); } } + + #[cfg(feature = "spec_unstable_logs_enabled")] + fn event_enabled( + &self, + level: opentelemetry::logs::Severity, + target: &str, + name: Option<&str>, + ) -> bool { + if let Ok(exporter) = self.exporter.lock() { + exporter.event_enabled(level, target, name) + } else { + true + } + } } #[cfg(all(test, feature = "testing", feature = "logs"))] diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 8cc4e7e70a..1a0e41470e 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -44,7 +44,7 @@ ctrlc = { workspace = true } lazy_static = { workspace = true } num_cpus = { workspace = true } opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] } -opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] } +opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"] } opentelemetry-appender-tracing = { workspace = true, features = ["spec_unstable_logs_enabled"] } rand = { workspace = true, features = ["small_rng", "os_rng"] } tracing = { workspace = true, features = ["std"]} diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 1f57892380..53d95fa5ff 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -13,44 +13,29 @@ ~50 M/sec ~1.1 B/sec (when disabled) */ - -use opentelemetry::InstrumentationScope; use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::logs::concurrent_log_processor::ConcurrentExportProcessor; +use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; -use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}; +use opentelemetry_sdk::Resource; use tracing::error; use tracing_subscriber::prelude::*; mod throughput; -#[derive(Debug, Clone)] -struct MockLogExporter; - -impl LogExporter for MockLogExporter { - async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult { - Ok(()) - } -} - #[derive(Debug)] -pub struct MockLogProcessor { - exporter: MockLogExporter, +struct NoopExporter { enabled: bool, } - -impl LogProcessor for MockLogProcessor { - fn emit( - &self, - record: &mut opentelemetry_sdk::logs::SdkLogRecord, - scope: &InstrumentationScope, - ) { - let log_tuple = &[(record as &SdkLogRecord, scope)]; - let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); +impl NoopExporter { + fn new(enabled: bool) -> Self { + Self { enabled } } - - fn force_flush(&self) -> OTelSdkResult { +} +impl LogExporter for NoopExporter { + async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult { Ok(()) } @@ -66,18 +51,17 @@ impl LogProcessor for MockLogProcessor { ) -> bool { self.enabled } + + fn set_resource(&mut self, _: &Resource) {} } fn main() { // change this to false to test the throughput when enabled is false. - let enabled = true; + let enabled = false; // LoggerProvider with a no-op processor. let provider: SdkLoggerProvider = SdkLoggerProvider::builder() - .with_log_processor(MockLogProcessor { - exporter: MockLogExporter {}, - enabled, - }) + .with_log_processor(ConcurrentExportProcessor::new(NoopExporter::new(enabled))) .build(); // Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing. From 58715387dd0bdbc9def67b540bdbac5b8a739b53 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 10 Mar 2025 16:37:55 -0700 Subject: [PATCH 2/4] Share stress test result for simple --- stress/src/logs.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 53d95fa5ff..93fd28f1e9 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -12,6 +12,10 @@ Total Number of Cores: 14 (10 performance and 4 efficiency) ~50 M/sec ~1.1 B/sec (when disabled) + + With existing SimpleLogProcessor: + 3 M/sec (when enabled) (.with_log_processor(SimpleLogProcessor::new(NoopExporter::new(true)))) + 26 M/sec (when disabled) (.with_log_processor(SimpleLogProcessor::new(NoopExporter::new(false))) */ use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; @@ -57,7 +61,7 @@ impl LogExporter for NoopExporter { fn main() { // change this to false to test the throughput when enabled is false. - let enabled = false; + let enabled = true; // LoggerProvider with a no-op processor. let provider: SdkLoggerProvider = SdkLoggerProvider::builder() From a5ef65d8d32be5bf35bfb0e19e9b7bbfa0fc6523 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 10 Mar 2025 16:53:04 -0700 Subject: [PATCH 3/4] use name SimpleConcurrentProcessor --- opentelemetry-sdk/benches/log_enabled.rs | 4 ++-- opentelemetry-sdk/src/logs/concurrent_log_processor.rs | 6 +++--- stress/src/logs.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/benches/log_enabled.rs b/opentelemetry-sdk/benches/log_enabled.rs index dc41262c5e..3d0eae9a7a 100644 --- a/opentelemetry-sdk/benches/log_enabled.rs +++ b/opentelemetry-sdk/benches/log_enabled.rs @@ -12,7 +12,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{Logger, LoggerProvider}; use opentelemetry_sdk::error::OTelSdkResult; -use opentelemetry_sdk::logs::concurrent_log_processor::ConcurrentExportProcessor; +use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentProcessor; use opentelemetry_sdk::logs::{ LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor, }; @@ -60,7 +60,7 @@ where } fn criterion_benchmark(c: &mut Criterion) { - let processor = ConcurrentExportProcessor::new(NoopExporter); + let processor = SimpleConcurrentProcessor::new(NoopExporter); benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor); let simple = SimpleLogProcessor::new(NoopExporter); benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple); diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs index f9ed0ad6f8..5ac10f50ff 100644 --- a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs +++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs @@ -13,18 +13,18 @@ use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord}; /// This is intended to be used when exporting to operating system /// tracing facilities like Windows ETW, Linux TracePoints etc. #[derive(Debug)] -pub struct ConcurrentExportProcessor { +pub struct SimpleConcurrentProcessor { exporter: T, } -impl ConcurrentExportProcessor { +impl SimpleConcurrentProcessor { /// Creates a new `ConcurrentExportProcessor` with the given exporter. pub fn new(exporter: T) -> Self { Self { exporter } } } -impl LogProcessor for ConcurrentExportProcessor { +impl LogProcessor for SimpleConcurrentProcessor { fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 93fd28f1e9..d939ec44d5 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -19,7 +19,7 @@ */ use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; -use opentelemetry_sdk::logs::concurrent_log_processor::ConcurrentExportProcessor; +use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentProcessor; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; @@ -65,7 +65,7 @@ fn main() { // LoggerProvider with a no-op processor. let provider: SdkLoggerProvider = SdkLoggerProvider::builder() - .with_log_processor(ConcurrentExportProcessor::new(NoopExporter::new(enabled))) + .with_log_processor(SimpleConcurrentProcessor::new(NoopExporter::new(enabled))) .build(); // Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing. From 3fb7d490733ee55fe313c41fae7ca7ab9a3969a8 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 10 Mar 2025 18:30:44 -0700 Subject: [PATCH 4/4] rename and handle error --- opentelemetry-sdk/benches/log_enabled.rs | 4 ++-- .../src/logs/concurrent_log_processor.rs | 16 +++++++++++----- stress/src/logs.rs | 6 ++++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/benches/log_enabled.rs b/opentelemetry-sdk/benches/log_enabled.rs index 3d0eae9a7a..be1cba8081 100644 --- a/opentelemetry-sdk/benches/log_enabled.rs +++ b/opentelemetry-sdk/benches/log_enabled.rs @@ -12,7 +12,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{Logger, LoggerProvider}; use opentelemetry_sdk::error::OTelSdkResult; -use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentProcessor; +use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor; use opentelemetry_sdk::logs::{ LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor, }; @@ -60,7 +60,7 @@ where } fn criterion_benchmark(c: &mut Criterion) { - let processor = SimpleConcurrentProcessor::new(NoopExporter); + let processor = SimpleConcurrentLogProcessor::new(NoopExporter); benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor); let simple = SimpleLogProcessor::new(NoopExporter); benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple); diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs index 5ac10f50ff..bca2cec3f3 100644 --- a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs +++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs @@ -1,4 +1,4 @@ -use opentelemetry::InstrumentationScope; +use opentelemetry::{otel_info, InstrumentationScope}; use crate::error::OTelSdkResult; @@ -13,21 +13,27 @@ use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord}; /// This is intended to be used when exporting to operating system /// tracing facilities like Windows ETW, Linux TracePoints etc. #[derive(Debug)] -pub struct SimpleConcurrentProcessor { +pub struct SimpleConcurrentLogProcessor { exporter: T, } -impl SimpleConcurrentProcessor { +impl SimpleConcurrentLogProcessor { /// Creates a new `ConcurrentExportProcessor` with the given exporter. pub fn new(exporter: T) -> Self { Self { exporter } } } -impl LogProcessor for SimpleConcurrentProcessor { +impl LogProcessor for SimpleConcurrentLogProcessor { fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; - let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); + let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); + if let Err(err) = result { + otel_info!( + name: "SimpleConcurrentLogProcessor.Emit.ExportError", + error = format!("{}",err) + ); + } } fn force_flush(&self) -> OTelSdkResult { diff --git a/stress/src/logs.rs b/stress/src/logs.rs index d939ec44d5..c4efea9733 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -19,7 +19,7 @@ */ use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; -use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentProcessor; +use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; @@ -65,7 +65,9 @@ fn main() { // LoggerProvider with a no-op processor. let provider: SdkLoggerProvider = SdkLoggerProvider::builder() - .with_log_processor(SimpleConcurrentProcessor::new(NoopExporter::new(enabled))) + .with_log_processor(SimpleConcurrentLogProcessor::new(NoopExporter::new( + enabled, + ))) .build(); // Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.