diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index ab7c13ba19..9ae4dcc433 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -56,6 +56,7 @@ internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] experimental_logs_batch_log_processor_with_async_runtime = ["logs"] +experimental_logs_concurrent_log_processor = ["logs"] experimental_trace_batch_span_processor_with_async_runtime = ["trace"] experimental_metrics_disable_name_validation = ["metrics"] @@ -88,6 +89,11 @@ harness = false name = "log_processor" harness = false +[[bench]] +name = "log_enabled" +harness = false +required-features = ["spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"] + [[bench]] name = "tracer_creation" harness = false diff --git a/opentelemetry-sdk/benches/log_enabled.rs b/opentelemetry-sdk/benches/log_enabled.rs new file mode 100644 index 0000000000..be1cba8081 --- /dev/null +++ b/opentelemetry-sdk/benches/log_enabled.rs @@ -0,0 +1,81 @@ +/* + The benchmark results: + criterion = "0.5.1" + Hardware: Apple M4 Pro + Total Number of Cores:   14 (10 performance and 4 efficiency) + | Test | Average time| + |---------------------------------------------|-------------| + | exporter_disabled_concurrent_processor | 1.9 ns | + | exporter_disabled_simple_processor | 5.0 ns | +*/ + +use criterion::{criterion_group, criterion_main, Criterion}; +use opentelemetry::logs::{Logger, LoggerProvider}; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor; +use opentelemetry_sdk::logs::{ + LogBatch, LogExporter, LogProcessor, SdkLoggerProvider, SimpleLogProcessor, +}; +use opentelemetry_sdk::Resource; +#[cfg(not(target_os = "windows"))] +use pprof::criterion::{Output, PProfProfiler}; + +#[derive(Debug)] +struct NoopExporter; +impl LogExporter for NoopExporter { + async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } + + fn event_enabled( + &self, + _level: opentelemetry::logs::Severity, + _target: &str, + _name: Option<&str>, + ) -> bool { + false + } + + fn set_resource(&mut self, _: &Resource) {} +} + +fn benchmark_exporter_enabled_false(c: &mut Criterion, name: &str, processor: T) +where + T: LogProcessor + Send + Sync + 'static, +{ + let provider = SdkLoggerProvider::builder() + .with_log_processor(processor) + .build(); + let logger = provider.logger("test_logger"); + + c.bench_function(name, |b| { + b.iter(|| { + logger.event_enabled(opentelemetry::logs::Severity::Debug, "target", Some("name")); + }); + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let processor = SimpleConcurrentLogProcessor::new(NoopExporter); + benchmark_exporter_enabled_false(c, "exporter_disabled_concurrent_processor", processor); + let simple = SimpleLogProcessor::new(NoopExporter); + benchmark_exporter_enabled_false(c, "exporter_disabled_simple_processor", simple); +} + +#[cfg(not(target_os = "windows"))] +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = criterion_benchmark +} +#[cfg(target_os = "windows")] +criterion_group! { + name = benches; + config = Criterion::default(); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/opentelemetry-sdk/src/logs/concurrent_log_processor.rs b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs new file mode 100644 index 0000000000..bca2cec3f3 --- /dev/null +++ b/opentelemetry-sdk/src/logs/concurrent_log_processor.rs @@ -0,0 +1,59 @@ +use opentelemetry::{otel_info, InstrumentationScope}; + +use crate::error::OTelSdkResult; + +use super::{LogBatch, LogExporter, LogProcessor, SdkLogRecord}; + +/// A concurrent log processor calls exporter's export method on each emit. This +/// processor does not buffer logs. Note: This invokes exporter's export method +/// on the current thread without synchronization. i.e multiple export() calls +/// can happen simultaneously from different threads. This is not a problem if +/// the exporter is designed to handle that. As of now, exporters in the +/// opentelemetry-rust project (stdout/otlp) are not thread-safe. +/// This is intended to be used when exporting to operating system +/// tracing facilities like Windows ETW, Linux TracePoints etc. +#[derive(Debug)] +pub struct SimpleConcurrentLogProcessor { + exporter: T, +} + +impl SimpleConcurrentLogProcessor { + /// Creates a new `ConcurrentExportProcessor` with the given exporter. + pub fn new(exporter: T) -> Self { + Self { exporter } + } +} + +impl LogProcessor for SimpleConcurrentLogProcessor { + fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { + let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; + let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); + if let Err(err) = result { + otel_info!( + name: "SimpleConcurrentLogProcessor.Emit.ExportError", + error = format!("{}",err) + ); + } + } + + fn force_flush(&self) -> OTelSdkResult { + // TODO: invoke flush on exporter + // once https://github.com/open-telemetry/opentelemetry-rust/issues/2261 + // is resolved + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + self.exporter.shutdown() + } + + #[cfg(feature = "spec_unstable_logs_enabled")] + fn event_enabled( + &self, + level: opentelemetry::logs::Severity, + target: &str, + name: Option<&str>, + ) -> bool { + self.exporter.event_enabled(level, target, name) + } +} diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 0da96bb730..42219cfb26 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -27,6 +27,10 @@ pub use logger_provider::{LoggerProviderBuilder, SdkLoggerProvider}; pub use record::{SdkLogRecord, TraceContext}; pub use simple_log_processor::SimpleLogProcessor; +#[cfg(feature = "experimental_logs_concurrent_log_processor")] +/// Module for ConcurrentLogProcessor. +pub mod concurrent_log_processor; + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] /// Module for BatchLogProcessor with async runtime. pub mod log_processor_with_async_runtime; diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 3ff9a59287..f7094752d2 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -65,7 +65,8 @@ pub struct SimpleLogProcessor { } impl SimpleLogProcessor { - pub(crate) fn new(exporter: T) -> Self { + /// Creates a new instance of `SimpleLogProcessor`. + pub fn new(exporter: T) -> Self { SimpleLogProcessor { exporter: Mutex::new(exporter), is_shutdown: AtomicBool::new(false), @@ -131,6 +132,20 @@ impl LogProcessor for SimpleLogProcessor { exporter.set_resource(resource); } } + + #[cfg(feature = "spec_unstable_logs_enabled")] + fn event_enabled( + &self, + level: opentelemetry::logs::Severity, + target: &str, + name: Option<&str>, + ) -> bool { + if let Ok(exporter) = self.exporter.lock() { + exporter.event_enabled(level, target, name) + } else { + true + } + } } #[cfg(all(test, feature = "testing", feature = "logs"))] diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 8cc4e7e70a..1a0e41470e 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -44,7 +44,7 @@ ctrlc = { workspace = true } lazy_static = { workspace = true } num_cpus = { workspace = true } opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] } -opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled"] } +opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "spec_unstable_logs_enabled", "experimental_logs_concurrent_log_processor"] } opentelemetry-appender-tracing = { workspace = true, features = ["spec_unstable_logs_enabled"] } rand = { workspace = true, features = ["small_rng", "os_rng"] } tracing = { workspace = true, features = ["std"]} diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 1f57892380..c4efea9733 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -12,45 +12,34 @@ Total Number of Cores: 14 (10 performance and 4 efficiency) ~50 M/sec ~1.1 B/sec (when disabled) -*/ -use opentelemetry::InstrumentationScope; + With existing SimpleLogProcessor: + 3 M/sec (when enabled) (.with_log_processor(SimpleLogProcessor::new(NoopExporter::new(true)))) + 26 M/sec (when disabled) (.with_log_processor(SimpleLogProcessor::new(NoopExporter::new(false))) +*/ use opentelemetry_appender_tracing::layer; use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::logs::concurrent_log_processor::SimpleConcurrentLogProcessor; +use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; -use opentelemetry_sdk::logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}; +use opentelemetry_sdk::Resource; use tracing::error; use tracing_subscriber::prelude::*; mod throughput; -#[derive(Debug, Clone)] -struct MockLogExporter; - -impl LogExporter for MockLogExporter { - async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult { - Ok(()) - } -} - #[derive(Debug)] -pub struct MockLogProcessor { - exporter: MockLogExporter, +struct NoopExporter { enabled: bool, } - -impl LogProcessor for MockLogProcessor { - fn emit( - &self, - record: &mut opentelemetry_sdk::logs::SdkLogRecord, - scope: &InstrumentationScope, - ) { - let log_tuple = &[(record as &SdkLogRecord, scope)]; - let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))); +impl NoopExporter { + fn new(enabled: bool) -> Self { + Self { enabled } } - - fn force_flush(&self) -> OTelSdkResult { +} +impl LogExporter for NoopExporter { + async fn export(&self, _: LogBatch<'_>) -> OTelSdkResult { Ok(()) } @@ -66,6 +55,8 @@ impl LogProcessor for MockLogProcessor { ) -> bool { self.enabled } + + fn set_resource(&mut self, _: &Resource) {} } fn main() { @@ -74,10 +65,9 @@ fn main() { // LoggerProvider with a no-op processor. let provider: SdkLoggerProvider = SdkLoggerProvider::builder() - .with_log_processor(MockLogProcessor { - exporter: MockLogExporter {}, + .with_log_processor(SimpleConcurrentLogProcessor::new(NoopExporter::new( enabled, - }) + ))) .build(); // Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.