From 25002df466a1266ea4c135036172208457c6fbae Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Mon, 16 Dec 2024 14:35:57 -0800 Subject: [PATCH 01/11] Handle batch log processing in a dedicated background thread --- opentelemetry-sdk/src/logs/log_emitter.rs | 3 +- opentelemetry-sdk/src/logs/log_processor.rs | 293 ++++++++++++-------- 2 files changed, 179 insertions(+), 117 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 18f0fbb228..d723b5b12d 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -197,9 +197,8 @@ impl Builder { pub fn with_batch_exporter( self, exporter: T, - runtime: R, ) -> Self { - let batch = BatchLogProcessor::builder(exporter, runtime).build(); + let batch = BatchLogProcessor::builder(exporter).build(); self.with_log_processor(batch) } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 0c95eae1e8..7f11a43424 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,14 +1,10 @@ use crate::{ export::logs::{ExportResult, LogBatch, LogExporter}, logs::{LogError, LogRecord, LogResult}, - runtime::{RuntimeChannel, TrySend}, Resource, }; -use futures_channel::oneshot; -use futures_util::{ - future::{self, Either}, - {pin_mut, stream, StreamExt as _}, -}; +use std::sync::mpsc::{self, SyncSender, RecvTimeoutError}; + #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; @@ -19,7 +15,9 @@ use std::{ fmt::{self, Debug, Formatter}, str::FromStr, sync::Arc, + thread, time::Duration, + time::Instant, }; /// Delay interval between two consecutive exports. @@ -39,6 +37,12 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size. const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +/// Default timeout for forceflush and shutdown. +const OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT: Duration = Duration::from_secs(1); + +/// environment variable name for forceflush and shutdown timeout. +const OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME: &str = "OTEL_LOG_EXPORT_INTERVAL"; + /// The interface for plugging into a [`Logger`]. /// /// [`Logger`]: crate::logs::Logger @@ -152,8 +156,12 @@ impl LogProcessor for SimpleLogProcessor { /// A [`LogProcessor`] that asynchronously buffers log records and reports /// them at a pre-configured interval. 
-pub struct BatchLogProcessor { - message_sender: R::Sender, +pub struct BatchLogProcessor { + message_sender: SyncSender, + handle: Mutex>>, + forceflush_timeout: Duration, + shutdown_timeout: Duration, + is_shutdown: AtomicBool, // Track dropped logs - we'll log this at shutdown dropped_logs_count: AtomicUsize, @@ -162,7 +170,7 @@ pub struct BatchLogProcessor { max_queue_size: usize, } -impl Debug for BatchLogProcessor { +impl Debug for BatchLogProcessor { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("BatchLogProcessor") .field("message_sender", &self.message_sender) @@ -170,8 +178,17 @@ impl Debug for BatchLogProcessor { } } -impl LogProcessor for BatchLogProcessor { +impl LogProcessor for BatchLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { + // noop after shutdown + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + otel_warn!( + name: "BatchLogProcessor.Emit.ProcessorShutdown", + message = "BatchLogProcessor has been shutdown. No further logs will be emitted." + ); + return; + } + let result = self.message_sender.try_send(BatchMessage::ExportLog(( record.clone(), instrumentation.clone(), @@ -189,17 +206,36 @@ impl LogProcessor for BatchLogProcessor { } fn force_flush(&self) -> LogResult<()> { - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) + if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + otel_warn!( + name: "BatchLogProcessor.ForceFlush.ProcessorShutdown", + message = "BatchLogProcessor has been shutdown. No further logs will be emitted." + ); + return LogResult::Err(LogError::Other("BatchLogProcessor is already shutdown".into())); + } + let (sender, receiver) = mpsc::sync_channel(1); + self.message_sender.try_send(BatchMessage::ForceFlush(sender)) .map_err(|err| LogError::Other(err.into()))?; - futures_executor::block_on(res_receiver) - .map_err(|err| LogError::Other(err.into())) - .and_then(std::convert::identity) + receiver.recv_timeout(self.forceflush_timeout).map_err(|err| { + if err == RecvTimeoutError::Timeout { + LogError::ExportTimedOut(self.forceflush_timeout) + } else { + LogError::Other(err.into()) + } + })? } fn shutdown(&self) -> LogResult<()> { + // test and set is_shutdown flag if it is not set + if self.is_shutdown.swap(true, std::sync::atomic::Ordering::Relaxed) { + otel_warn!( + name: "BatchLogProcessor.Shutdown.ProcessorShutdown", + message = "BatchLogProcessor has been shutdown. No further logs will be emitted." 
+ ); + return LogResult::Err(LogError::Other("BatchLogProcessor is already shutdown".into())); + } + let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; if dropped_logs > 0 { @@ -211,14 +247,36 @@ impl LogProcessor for BatchLogProcessor { ); } - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) + let (sender, receiver) = mpsc::sync_channel(1); + self.message_sender.try_send(BatchMessage::Shutdown(sender)) .map_err(|err| LogError::Other(err.into()))?; - futures_executor::block_on(res_receiver) - .map_err(|err| LogError::Other(err.into())) - .and_then(std::convert::identity) + receiver.recv_timeout(self.shutdown_timeout) + .map(|_| { + // join the background thread after receiving back the shutdown signal + if let Some(handle) = self.handle.lock().unwrap().take() { + handle.join().unwrap(); + } + LogResult::Ok(()) + }) + .map_err(|err| { + match err { + RecvTimeoutError::Timeout => { + otel_error!( + name: "BatchLogProcessor.Shutdown.Timeout", + message = "BatchLogProcessor shutdown timed out." + ); + LogError::ExportTimedOut(self.shutdown_timeout) + } + _ => { + otel_error!( + name: "BatchLogProcessor.Shutdown.Error", + error = format!("{}", err) + ); + LogError::Other(err.into()) + } + } + })? } fn set_resource(&self, resource: &Resource) { @@ -229,115 +287,127 @@ impl LogProcessor for BatchLogProcessor { } } -impl BatchLogProcessor { - pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { +impl BatchLogProcessor { + pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { let (message_sender, message_receiver) = - runtime.batch_message_channel(config.max_queue_size); - let inner_runtime = runtime.clone(); + mpsc::sync_channel(config.max_queue_size); let max_queue_size = config.max_queue_size; - // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { - // Timer will take a reference to the current runtime, so its important we do this within the - // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = inner_runtime.clone(); + let handle = thread::spawn(move || { + let mut last_export_time = Instant::now(); let mut logs = Vec::new(); - let mut messages = Box::pin(stream::select(message_receiver, ticker)); + logs.reserve(config.max_export_batch_size); + + loop { + let remaining_time_option = config.scheduled_delay.checked_sub(last_export_time.elapsed()); + let remaining_time = match remaining_time_option { + Some(remaining_time) => remaining_time, + None => config.scheduled_delay, + }; - while let Some(message) = messages.next().await { - match message { - // Log has finished, add to buffer of pending logs. 
- BatchMessage::ExportLog(log) => { + match message_receiver.recv_timeout(remaining_time) { + Ok(BatchMessage::ExportLog(log)) => { logs.push(log); - if logs.len() == config.max_export_batch_size { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - logs.split_off(0), - ) - .await; - - if let Err(err) = result { - otel_error!( - name: "BatchLogProcessor.Export.Error", - error = format!("{}", err) - ); - } + if logs.len() == config.max_export_batch_size || last_export_time.elapsed() >= config.scheduled_delay { + let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); } } - // Log batch interval time reached or a force flush has been invoked, export current spans. - BatchMessage::Flush(res_channel) => { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - logs.split_off(0), - ) - .await; - - if let Some(channel) = res_channel { - if let Err(send_error) = channel.send(result) { - otel_debug!( - name: "BatchLogProcessor.Flush.SendResultError", - error = format!("{:?}", send_error), - ); - } - } + Ok(BatchMessage::ForceFlush(sender)) => { + let result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + let _ = sender.send(result); } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - logs.split_off(0), - ) - .await; - - exporter.shutdown(); - - if let Err(send_error) = ch.send(result) { - otel_debug!( - name: "BatchLogProcessor.Shutdown.SendResultError", - error = format!("{:?}", send_error), - ); - } + Ok(BatchMessage::Shutdown(sender)) => { + let result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + let _ = sender.send(result); + + // + // break out the loop and return from the current background thread. + // break; } - // propagate the resource - BatchMessage::SetResource(resource) => { + Ok(BatchMessage::SetResource(resource)) => { exporter.set_resource(&resource); } + Err(RecvTimeoutError::Timeout) => { + // FIXME handle result + let _result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + } + Err(err) => { + otel_error!( + name: "BatchLogProcessor.ReceiveError", + error = format!("{}", err) + ); + }, } } - })); + }); + + + let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME) + .ok() + .and_then(|s| u64::from_str(&s).ok()) + .map(Duration::from_secs) + .unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT); // Return batch processor with link to worker BatchLogProcessor { message_sender, + handle: Mutex::new(Some(handle)), + forceflush_timeout, + shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable + is_shutdown: AtomicBool::new(false), dropped_logs_count: AtomicUsize::new(0), max_queue_size, } } /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchLogProcessorBuilder + pub fn builder(exporter: E) -> BatchLogProcessorBuilder where E: LogExporter, { BatchLogProcessorBuilder { exporter, config: Default::default(), - runtime, } } } +fn export_with_timeout_sync( + _: Duration, // TODO, enforcing timeout in exporter. 
+ exporter: &mut E, + batch: Vec<(LogRecord, InstrumentationScope)>, + last_export_time: &mut Instant, +) -> ExportResult +where + E: LogExporter + ?Sized, +{ + *last_export_time = Instant::now(); + + if batch.is_empty() { + return LogResult::Ok(()); + } + + let log_vec: Vec<(&LogRecord, &InstrumentationScope)> = batch + .iter() + .map(|log_data| (&log_data.0, &log_data.1)) + .collect(); + let export = exporter.export(LogBatch::new(log_vec.as_slice())); + let export_result = futures_executor::block_on(export); + + match export_result { + Ok(__) => LogResult::Ok(()), + Err(err) => { + otel_error!( + name: "BatchLogProcessor.ExportError", + error = format!("{}", err) + ); + LogResult::Err(err) + } + } +} + +/* async fn export_with_timeout( time_out: Duration, exporter: &mut E, @@ -366,6 +436,7 @@ where Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)), } } +*/ /// Batch log processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. @@ -510,16 +581,14 @@ impl BatchConfigBuilder { /// A builder for creating [`BatchLogProcessor`] instances. /// #[derive(Debug)] -pub struct BatchLogProcessorBuilder { +pub struct BatchLogProcessorBuilder { exporter: E, config: BatchConfig, - runtime: R, } -impl BatchLogProcessorBuilder +impl BatchLogProcessorBuilder where E: LogExporter + 'static, - R: RuntimeChannel, { /// Set the BatchConfig for [`BatchLogProcessorBuilder`] pub fn with_batch_config(self, config: BatchConfig) -> Self { @@ -527,8 +596,8 @@ where } /// Build a batch processor - pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime) + pub fn build(self) -> BatchLogProcessor { + BatchLogProcessor::new(Box::new(self.exporter), self.config) } } @@ -540,9 +609,11 @@ enum BatchMessage { ExportLog((LogRecord, InstrumentationScope)), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. - Flush(Option>), + // Flush(Option>), + /// ForceFlush flushes the current buffer to the backend. + ForceFlush(mpsc::SyncSender), /// Shut down the worker thread, push all logs in buffer to the backend. - Shutdown(oneshot::Sender), + Shutdown(mpsc::SyncSender), /// Set the resource for the exporter. 
SetResource(Arc), } @@ -565,7 +636,6 @@ mod tests { }, BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor, }, - runtime, testing::logs::InMemoryLogExporter, Resource, }; @@ -713,7 +783,7 @@ mod tests { ]; temp_env::with_vars(env_vars.clone(), || { let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); + BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -734,7 +804,7 @@ mod tests { temp_env::with_vars(env_vars, || { let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio); + BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); }); @@ -749,7 +819,7 @@ mod tests { .with_max_queue_size(4) .build(); - let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio) + let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()) .with_batch_config(expected); let actual = &builder.config; @@ -790,7 +860,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); let provider = LoggerProvider::builder() .with_log_processor(processor) @@ -806,7 +875,6 @@ mod tests { .build(), ) .build(); - tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking? assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); let _ = provider.shutdown(); } @@ -821,7 +889,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); let mut record = LogRecord::default(); @@ -866,7 +933,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); // @@ -881,7 +947,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::TokioCurrentThread, ); processor.shutdown().unwrap(); @@ -893,7 +958,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::Tokio, ); processor.shutdown().unwrap(); @@ -905,7 +969,6 @@ mod tests { let processor = BatchLogProcessor::new( Box::new(exporter.clone()), BatchConfig::default(), - runtime::TokioCurrentThread, ); processor.shutdown().unwrap(); From 2008fde83083f88e47fba6a0866b6261d783d897 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Mon, 16 Dec 2024 17:21:50 -0800 Subject: [PATCH 02/11] Box LogRecord and InstrumentationScope into heap --- opentelemetry-sdk/src/logs/log_processor.rs | 64 ++++++++------------- 1 file changed, 24 insertions(+), 40 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7f11a43424..9f567aa7f1 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -37,12 +37,6 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size. const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; -/// Default timeout for forceflush and shutdown. -const OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT: Duration = Duration::from_secs(1); - -/// environment variable name for forceflush and shutdown timeout. 
-const OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME: &str = "OTEL_LOG_EXPORT_INTERVAL"; - /// The interface for plugging into a [`Logger`]. /// /// [`Logger`]: crate::logs::Logger @@ -154,6 +148,23 @@ impl LogProcessor for SimpleLogProcessor { } } +/// Messages sent between application thread and batch log processor's work thread. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessage { + /// Export logs, usually called when the log is emitted. + ExportLog(Box<(LogRecord, InstrumentationScope)>), + /// Flush the current buffer to the backend, it can be triggered by + /// pre configured interval or a call to `force_push` function. + // Flush(Option>), + /// ForceFlush flushes the current buffer to the backend. + ForceFlush(mpsc::SyncSender), + /// Shut down the worker thread, push all logs in buffer to the backend. + Shutdown(mpsc::SyncSender), + /// Set the resource for the exporter. + SetResource(Arc), +} + /// A [`LogProcessor`] that asynchronously buffers log records and reports /// them at a pre-configured interval. pub struct BatchLogProcessor { @@ -189,10 +200,8 @@ impl LogProcessor for BatchLogProcessor { return; } - let result = self.message_sender.try_send(BatchMessage::ExportLog(( - record.clone(), - instrumentation.clone(), - ))); + let result = self.message_sender. + try_send(BatchMessage::ExportLog(Box::new((record.clone(), instrumentation.clone())))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { @@ -264,7 +273,7 @@ impl LogProcessor for BatchLogProcessor { RecvTimeoutError::Timeout => { otel_error!( name: "BatchLogProcessor.Shutdown.Timeout", - message = "BatchLogProcessor shutdown timed out." + message = "BatchLogProcessor shutdown timing out." ); LogError::ExportTimedOut(self.shutdown_timeout) } @@ -329,8 +338,7 @@ impl BatchLogProcessor { exporter.set_resource(&resource); } Err(RecvTimeoutError::Timeout) => { - // FIXME handle result - let _result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); } Err(err) => { otel_error!( @@ -342,19 +350,12 @@ impl BatchLogProcessor { } }); - - let forceflush_timeout = env::var(OTEL_LOGS_FORCEFLUSH_TIMEOUT_NAME) - .ok() - .and_then(|s| u64::from_str(&s).ok()) - .map(Duration::from_secs) - .unwrap_or(OTEL_LOGS_DEFAULT_FORCEFLUSH_TIMEOUT); - // Return batch processor with link to worker BatchLogProcessor { message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout, - shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable + forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable is_shutdown: AtomicBool::new(false), dropped_logs_count: AtomicUsize::new(0), max_queue_size, @@ -376,7 +377,7 @@ impl BatchLogProcessor { fn export_with_timeout_sync( _: Duration, // TODO, enforcing timeout in exporter. exporter: &mut E, - batch: Vec<(LogRecord, InstrumentationScope)>, + batch: Vec>, last_export_time: &mut Instant, ) -> ExportResult where @@ -601,23 +602,6 @@ where } } -/// Messages sent between application thread and batch log processor's work thread. -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export logs, usually called when the log is emitted. 
- ExportLog((LogRecord, InstrumentationScope)), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - // Flush(Option>), - /// ForceFlush flushes the current buffer to the backend. - ForceFlush(mpsc::SyncSender), - /// Shut down the worker thread, push all logs in buffer to the backend. - Shutdown(mpsc::SyncSender), - /// Set the resource for the exporter. - SetResource(Arc), -} - #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { use super::{ From 752ea93530c028d6c69e4ac9baf839f018e9fd76 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 11:10:11 -0800 Subject: [PATCH 03/11] Move previous BatchLogProcessor to a feature flag and rename it --- .../examples/logs-basic.rs | 2 +- opentelemetry-appender-tracing/src/layer.rs | 2 +- .../examples/basic-otlp-http/src/main.rs | 2 +- .../examples/basic-otlp/src/main.rs | 2 +- opentelemetry-sdk/Cargo.toml | 1 + opentelemetry-sdk/src/logs/log_emitter.rs | 4 +- opentelemetry-sdk/src/logs/log_processor.rs | 291 +++++++++++++++++- opentelemetry-sdk/src/logs/mod.rs | 5 + 8 files changed, 290 insertions(+), 19 deletions(-) diff --git a/opentelemetry-appender-log/examples/logs-basic.rs b/opentelemetry-appender-log/examples/logs-basic.rs index dc5bacc813..c2fda41b52 100644 --- a/opentelemetry-appender-log/examples/logs-basic.rs +++ b/opentelemetry-appender-log/examples/logs-basic.rs @@ -16,7 +16,7 @@ async fn main() { let exporter = LogExporter::default(); //Create a LoggerProvider and register the exporter let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .with_log_processor(BatchLogProcessor::builder(exporter).build()) .build(); // Setup Log Appender for the log crate. 
diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 9ba43f7d65..a8354822e1 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -296,7 +296,7 @@ mod tests { async fn batch_processor_no_deadlock() { let exporter: ReentrantLogExporter = ReentrantLogExporter; let logger_provider = LoggerProvider::builder() - .with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio) + .with_batch_exporter(exporter.clone()) .build(); let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 0d218a4b8a..2b5d878f9d 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -37,7 +37,7 @@ fn init_logs() -> Result Result { Ok(LoggerProvider::builder() .with_resource(RESOURCE.clone()) - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .build()) } diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 88bb5af6ac..2c3643dee1 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -56,6 +56,7 @@ internal-logs = ["tracing"] experimental_metrics_periodic_reader_no_runtime = ["metrics"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] +experimental_logs_batch_log_processor_with_async_runtime = ["logs"] [[bench]] name = "context" diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index d723b5b12d..e01974cbfa 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,5 +1,5 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; +use crate::{export::logs::LogExporter, Resource}; use crate::{logs::LogError, logs::LogResult}; use opentelemetry::{otel_debug, otel_info, trace::TraceContextExt, Context, InstrumentationScope}; @@ -194,7 +194,7 @@ impl Builder { } /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use. - pub fn with_batch_exporter( + pub fn with_batch_exporter( self, exporter: T, ) -> Self { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 9f567aa7f1..092bf4cf58 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -20,6 +20,16 @@ use std::{ time::Instant, }; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +use crate::runtime::{RuntimeChannel, TrySend}; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +use futures_channel::oneshot; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +use futures_util::{ + future::{self, Either}, + {pin_mut, stream, StreamExt as _}, +}; + /// Delay interval between two consecutive exports. const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. @@ -165,8 +175,8 @@ enum BatchMessage { SetResource(Arc), } -/// A [`LogProcessor`] that asynchronously buffers log records and reports -/// them at a pre-configured interval. 
+/// A [`LogProcessor`] that buffers log records and reports +/// them at a pre-configured interval from a dedicated background thread. pub struct BatchLogProcessor { message_sender: SyncSender, handle: Mutex>>, @@ -408,7 +418,238 @@ where } } -/* +/// +/// A builder for creating [`BatchLogProcessor`] instances. +/// +#[derive(Debug)] +pub struct BatchLogProcessorBuilder { + exporter: E, + config: BatchConfig, +} + +impl BatchLogProcessorBuilder +where + E: LogExporter + 'static, +{ + /// Set the BatchConfig for [`BatchLogProcessorBuilder`] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchLogProcessorBuilder { config, ..self } + } + + /// Build a batch processor + pub fn build(self) -> BatchLogProcessor { + BatchLogProcessor::new(Box::new(self.exporter), self.config) + } +} + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessageWithAsyncRuntime { + /// Export logs, usually called when the log is emitted. + ExportLog((LogRecord, InstrumentationScope)), + /// Flush the current buffer to the backend, it can be triggered by + /// pre configured interval or a call to `force_push` function. + Flush(Option>), + /// Shut down the worker thread, push all logs in buffer to the backend. + Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), +} + +/// A [`LogProcessor`] that asynchronously buffers log records and reports +/// them at a pre-configured interval. +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +pub struct BatchLogProcessorWithAsyncRuntime { + message_sender: R::Sender, + + // Track dropped logs - we'll log this at shutdown + dropped_logs_count: AtomicUsize, + + // Track the maximum queue size that was configured for this processor + max_queue_size: usize, +} + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl Debug for BatchLogProcessorWithAsyncRuntime { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("BatchLogProcessor") + .field("message_sender", &self.message_sender) + .finish() + } +} + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl LogProcessor for BatchLogProcessorWithAsyncRuntime { + fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { + let result = self.message_sender.try_send(BatchMessageWithAsyncRuntime::ExportLog(( + record.clone(), + instrumentation.clone(), + ))); + + // TODO - Implement throttling to prevent error flooding when the queue is full or closed. + if result.is_err() { + // Increment dropped logs count. The first time we have to drop a log, + // emit a warning. + if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", + message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. 
During Shutdown time, a log will be emitted with exact count of total logs dropped."); + } + } + } + + fn force_flush(&self) -> LogResult<()> { + let (res_sender, res_receiver) = oneshot::channel(); + self.message_sender + .try_send(BatchMessageWithAsyncRuntime::Flush(Some(res_sender))) + .map_err(|err| LogError::Other(err.into()))?; + + futures_executor::block_on(res_receiver) + .map_err(|err| LogError::Other(err.into())) + .and_then(std::convert::identity) + } + + fn shutdown(&self) -> LogResult<()> { + let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_logs > 0 { + otel_warn!( + name: "BatchLogProcessor.LogsDropped", + dropped_logs_count = dropped_logs, + max_queue_size = max_queue_size, + message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } + + let (res_sender, res_receiver) = oneshot::channel(); + self.message_sender + .try_send(BatchMessageWithAsyncRuntime::Shutdown(res_sender)) + .map_err(|err| LogError::Other(err.into()))?; + + futures_executor::block_on(res_receiver) + .map_err(|err| LogError::Other(err.into())) + .and_then(std::convert::identity) + } + + fn set_resource(&self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessageWithAsyncRuntime::SetResource(resource)); + } +} + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl BatchLogProcessorWithAsyncRuntime { + pub(crate) fn new(mut exporter: Box, config: BatchConfig, runtime: R) -> Self { + let (message_sender, message_receiver) = + runtime.batch_message_channel(config.max_queue_size); + let inner_runtime = runtime.clone(); + let max_queue_size = config.max_queue_size; + + // Spawn worker process via user-defined spawn function. + runtime.spawn(Box::pin(async move { + // Timer will take a reference to the current runtime, so its important we do this within the + // runtime.spawn() + let ticker = inner_runtime + .interval(config.scheduled_delay) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| BatchMessageWithAsyncRuntime::Flush(None)); + let timeout_runtime = inner_runtime.clone(); + let mut logs = Vec::new(); + let mut messages = Box::pin(stream::select(message_receiver, ticker)); + + while let Some(message) = messages.next().await { + match message { + // Log has finished, add to buffer of pending logs. + BatchMessageWithAsyncRuntime::ExportLog(log) => { + logs.push(log); + if logs.len() == config.max_export_batch_size { + let result = export_with_timeout( + config.max_export_timeout, + exporter.as_mut(), + &timeout_runtime, + logs.split_off(0), + ) + .await; + + if let Err(err) = result { + otel_error!( + name: "BatchLogProcessor.Export.Error", + error = format!("{}", err) + ); + } + } + } + // Log batch interval time reached or a force flush has been invoked, export current spans. 
+ BatchMessageWithAsyncRuntime::Flush(res_channel) => { + let result = export_with_timeout( + config.max_export_timeout, + exporter.as_mut(), + &timeout_runtime, + logs.split_off(0), + ) + .await; + + if let Some(channel) = res_channel { + if let Err(send_error) = channel.send(result) { + otel_debug!( + name: "BatchLogProcessor.Flush.SendResultError", + error = format!("{:?}", send_error), + ); + } + } + } + // Stream has terminated or processor is shutdown, return to finish execution. + BatchMessageWithAsyncRuntime::Shutdown(ch) => { + let result = export_with_timeout( + config.max_export_timeout, + exporter.as_mut(), + &timeout_runtime, + logs.split_off(0), + ) + .await; + + exporter.shutdown(); + + if let Err(send_error) = ch.send(result) { + otel_debug!( + name: "BatchLogProcessor.Shutdown.SendResultError", + error = format!("{:?}", send_error), + ); + } + break; + } + // propagate the resource + BatchMessageWithAsyncRuntime::SetResource(resource) => { + exporter.set_resource(&resource); + } + } + } + })); + + // Return batch processor with link to worker + BatchLogProcessorWithAsyncRuntime { + message_sender, + dropped_logs_count: AtomicUsize::new(0), + max_queue_size, + } + } + + /// Create a new batch processor builder + pub fn builder(exporter: E, runtime: R) -> BatchLogProcessorWithAsyncRuntimeBuilder + where + E: LogExporter, + { + BatchLogProcessorWithAsyncRuntimeBuilder { + exporter, + config: Default::default(), + runtime, + } + } +} + +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] async fn export_with_timeout( time_out: Duration, exporter: &mut E, @@ -437,7 +678,6 @@ where Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)), } } -*/ /// Batch log processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. @@ -579,26 +819,30 @@ impl BatchConfigBuilder { } } -/// A builder for creating [`BatchLogProcessor`] instances. +/// A builder for creating [`BatchLogProcessorWithAsyncRuntime`] instances. 
/// #[derive(Debug)] -pub struct BatchLogProcessorBuilder { +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +pub struct BatchLogProcessorWithAsyncRuntimeBuilder { exporter: E, config: BatchConfig, + runtime: R, } -impl BatchLogProcessorBuilder +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +impl BatchLogProcessorWithAsyncRuntimeBuilder where E: LogExporter + 'static, + R: RuntimeChannel, { - /// Set the BatchConfig for [`BatchLogProcessorBuilder`] + /// Set the BatchConfig for [`BatchLogProcessorWithAsyncRuntimeBuilder`] pub fn with_batch_config(self, config: BatchConfig) -> Self { - BatchLogProcessorBuilder { config, ..self } + BatchLogProcessorWithAsyncRuntimeBuilder { config, ..self } } /// Build a batch processor - pub fn build(self) -> BatchLogProcessor { - BatchLogProcessor::new(Box::new(self.exporter), self.config) + pub fn build(self) -> BatchLogProcessorWithAsyncRuntime { + BatchLogProcessorWithAsyncRuntime::new(Box::new(self.exporter), self.config, self.runtime) } } @@ -633,6 +877,11 @@ mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + use super::BatchLogProcessorWithAsyncRuntime; + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + use crate::runtime; + #[derive(Debug, Clone)] struct MockLogExporter { resource: Arc>>, @@ -911,8 +1160,7 @@ mod tests { } #[tokio::test(flavor = "current_thread")] - #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] - async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_multi_thread() { + async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new( Box::new(exporter.clone()), @@ -925,6 +1173,23 @@ mod tests { processor.shutdown().unwrap(); } + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] + #[tokio::test(flavor = "current_thread")] + #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] + async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread() { + let exporter = InMemoryLogExporterBuilder::default().build(); + let processor = BatchLogProcessorWithAsyncRuntime::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + + // + // deadloack happens in shutdown with tokio current_thread runtime + // + processor.shutdown().unwrap(); + } + #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 3643f7ecf2..b1341b9e96 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -12,6 +12,11 @@ pub use log_processor::{ }; pub use record::{LogRecord, TraceContext}; +#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] +pub use log_processor::{ + BatchLogProcessorWithAsyncRuntime, BatchLogProcessorWithAsyncRuntimeBuilder, +}; + #[cfg(all(test, feature = "testing"))] mod tests { use super::*; From 0ea817a118e95831f72f3df01b4931057264cb6a Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 11:31:23 -0800 Subject: [PATCH 04/11] Add changelog and 
format --- opentelemetry-sdk/src/logs/log_emitter.rs | 5 +- opentelemetry-sdk/src/logs/log_processor.rs | 176 +++++++++++--------- opentelemetry/CHANGELOG.md | 52 ++++++ 3 files changed, 149 insertions(+), 84 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index e01974cbfa..479ca36dd2 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -194,10 +194,7 @@ impl Builder { } /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use. - pub fn with_batch_exporter( - self, - exporter: T, - ) -> Self { + pub fn with_batch_exporter(self, exporter: T) -> Self { let batch = BatchLogProcessor::builder(exporter).build(); self.with_log_processor(batch) } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 092bf4cf58..29da8ead6b 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -3,7 +3,7 @@ use crate::{ logs::{LogError, LogRecord, LogResult}, Resource, }; -use std::sync::mpsc::{self, SyncSender, RecvTimeoutError}; +use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; @@ -210,8 +210,12 @@ impl LogProcessor for BatchLogProcessor { return; } - let result = self.message_sender. - try_send(BatchMessage::ExportLog(Box::new((record.clone(), instrumentation.clone())))); + let result = self + .message_sender + .try_send(BatchMessage::ExportLog(Box::new(( + record.clone(), + instrumentation.clone(), + )))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { @@ -226,33 +230,39 @@ impl LogProcessor for BatchLogProcessor { fn force_flush(&self) -> LogResult<()> { if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - otel_warn!( - name: "BatchLogProcessor.ForceFlush.ProcessorShutdown", - message = "BatchLogProcessor has been shutdown. No further logs will be emitted." - ); - return LogResult::Err(LogError::Other("BatchLogProcessor is already shutdown".into())); + return LogResult::Err(LogError::Other( + "BatchLogProcessor is already shutdown".into(), + )); } let (sender, receiver) = mpsc::sync_channel(1); - self.message_sender.try_send(BatchMessage::ForceFlush(sender)) + self.message_sender + .try_send(BatchMessage::ForceFlush(sender)) .map_err(|err| LogError::Other(err.into()))?; - receiver.recv_timeout(self.forceflush_timeout).map_err(|err| { - if err == RecvTimeoutError::Timeout { - LogError::ExportTimedOut(self.forceflush_timeout) - } else { - LogError::Other(err.into()) - } - })? + receiver + .recv_timeout(self.forceflush_timeout) + .map_err(|err| { + if err == RecvTimeoutError::Timeout { + LogError::ExportTimedOut(self.forceflush_timeout) + } else { + LogError::Other(err.into()) + } + })? } fn shutdown(&self) -> LogResult<()> { // test and set is_shutdown flag if it is not set - if self.is_shutdown.swap(true, std::sync::atomic::Ordering::Relaxed) { + if self + .is_shutdown + .swap(true, std::sync::atomic::Ordering::Relaxed) + { otel_warn!( name: "BatchLogProcessor.Shutdown.ProcessorShutdown", message = "BatchLogProcessor has been shutdown. No further logs will be emitted." 
); - return LogResult::Err(LogError::Other("BatchLogProcessor is already shutdown".into())); + return LogResult::Err(LogError::AlreadyShutdown( + "BatchLogProcessor is already shutdown".into(), + )); } let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); @@ -267,10 +277,12 @@ impl LogProcessor for BatchLogProcessor { } let (sender, receiver) = mpsc::sync_channel(1); - self.message_sender.try_send(BatchMessage::Shutdown(sender)) + self.message_sender + .try_send(BatchMessage::Shutdown(sender)) .map_err(|err| LogError::Other(err.into()))?; - receiver.recv_timeout(self.shutdown_timeout) + receiver + .recv_timeout(self.shutdown_timeout) .map(|_| { // join the background thread after receiving back the shutdown signal if let Some(handle) = self.handle.lock().unwrap().take() { @@ -278,22 +290,20 @@ impl LogProcessor for BatchLogProcessor { } LogResult::Ok(()) }) - .map_err(|err| { - match err { - RecvTimeoutError::Timeout => { - otel_error!( - name: "BatchLogProcessor.Shutdown.Timeout", - message = "BatchLogProcessor shutdown timing out." - ); - LogError::ExportTimedOut(self.shutdown_timeout) - } - _ => { - otel_error!( - name: "BatchLogProcessor.Shutdown.Error", - error = format!("{}", err) - ); - LogError::Other(err.into()) - } + .map_err(|err| match err { + RecvTimeoutError::Timeout => { + otel_error!( + name: "BatchLogProcessor.Shutdown.Timeout", + message = "BatchLogProcessor shutdown timing out." + ); + LogError::ExportTimedOut(self.shutdown_timeout) + } + _ => { + otel_error!( + name: "BatchLogProcessor.Shutdown.Error", + error = format!("{}", err) + ); + LogError::Other(err.into()) } })? } @@ -308,8 +318,7 @@ impl LogProcessor for BatchLogProcessor { impl BatchLogProcessor { pub(crate) fn new(mut exporter: Box, config: BatchConfig) -> Self { - let (message_sender, message_receiver) = - mpsc::sync_channel(config.max_queue_size); + let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size); let max_queue_size = config.max_queue_size; let handle = thread::spawn(move || { @@ -318,7 +327,9 @@ impl BatchLogProcessor { logs.reserve(config.max_export_batch_size); loop { - let remaining_time_option = config.scheduled_delay.checked_sub(last_export_time.elapsed()); + let remaining_time_option = config + .scheduled_delay + .checked_sub(last_export_time.elapsed()); let remaining_time = match remaining_time_option { Some(remaining_time) => remaining_time, None => config.scheduled_delay, @@ -327,16 +338,33 @@ impl BatchLogProcessor { match message_receiver.recv_timeout(remaining_time) { Ok(BatchMessage::ExportLog(log)) => { logs.push(log); - if logs.len() == config.max_export_batch_size || last_export_time.elapsed() >= config.scheduled_delay { - let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + if logs.len() == config.max_export_batch_size + || last_export_time.elapsed() >= config.scheduled_delay + { + let _ = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); } } Ok(BatchMessage::ForceFlush(sender)) => { - let result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + let result = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); let _ = sender.send(result); } Ok(BatchMessage::Shutdown(sender)) => { - let result = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + 
let result = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); let _ = sender.send(result); // @@ -348,14 +376,19 @@ impl BatchLogProcessor { exporter.set_resource(&resource); } Err(RecvTimeoutError::Timeout) => { - let _ = export_with_timeout_sync(remaining_time, exporter.as_mut(), logs.split_off(0), &mut last_export_time); + let _ = export_with_timeout_sync( + remaining_time, + exporter.as_mut(), + logs.split_off(0), + &mut last_export_time, + ); } Err(err) => { otel_error!( name: "BatchLogProcessor.ReceiveError", error = format!("{}", err) ); - }, + } } } }); @@ -482,10 +515,12 @@ impl Debug for BatchLogProcessorWithAsyncRuntime { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] impl LogProcessor for BatchLogProcessorWithAsyncRuntime { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) { - let result = self.message_sender.try_send(BatchMessageWithAsyncRuntime::ExportLog(( - record.clone(), - instrumentation.clone(), - ))); + let result = self + .message_sender + .try_send(BatchMessageWithAsyncRuntime::ExportLog(( + record.clone(), + instrumentation.clone(), + ))); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { @@ -1015,8 +1050,7 @@ mod tests { (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), ]; temp_env::with_vars(env_vars.clone(), || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default()); + let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -1036,8 +1070,7 @@ mod tests { env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); temp_env::with_vars(env_vars, || { - let builder = - BatchLogProcessor::builder(InMemoryLogExporter::default()); + let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); }); @@ -1052,8 +1085,8 @@ mod tests { .with_max_queue_size(4) .build(); - let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()) - .with_batch_config(expected); + let builder = + BatchLogProcessor::builder(InMemoryLogExporter::default()).with_batch_config(expected); let actual = &builder.config; assert_eq!(actual.max_export_batch_size, 1); @@ -1090,10 +1123,7 @@ mod tests { let exporter = MockLogExporter { resource: Arc::new(Mutex::new(None)), }; - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); let provider = LoggerProvider::builder() .with_log_processor(processor) .with_resource( @@ -1119,10 +1149,7 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default() .keep_records_on_shutdown() .build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); let mut record = LogRecord::default(); let instrumentation = InstrumentationScope::default(); @@ -1162,10 +1189,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + 
let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); // // deadloack happens in shutdown with tokio current_thread runtime @@ -1176,7 +1200,8 @@ mod tests { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] #[tokio::test(flavor = "current_thread")] #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"] - async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread() { + async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread( + ) { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessorWithAsyncRuntime::new( Box::new(exporter.clone()), @@ -1193,10 +1218,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); processor.shutdown().unwrap(); } @@ -1204,10 +1226,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); processor.shutdown().unwrap(); } @@ -1215,10 +1234,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() { let exporter = InMemoryLogExporterBuilder::default().build(); - let processor = BatchLogProcessor::new( - Box::new(exporter.clone()), - BatchConfig::default(), - ); + let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); processor.shutdown().unwrap(); } diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index f82bbd9638..687c9c507b 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -59,6 +59,58 @@ let counter = meter.u64_counter("my_counter").build(); These changes shouldn't directly affect the users of OpenTelemetry crate, as these constructs are used in SDK and Exporters. If you are an author of an sdk component/plug-in, like an exporter etc. please use these types from sdk. Refer [CHANGELOG.md](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/CHANGELOG.md) for more details, under same version section. - **Breaking** [2291](https://github.com/open-telemetry/opentelemetry-rust/pull/2291) Rename `logs_level_enabled flag` to `spec_unstable_logs_enabled`. Please enable this updated flag if the feature is needed. This flag will be removed once the feature is stabilized in the specifications. +- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436) + + `BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated + background thread is created to do the batch processing and exporting. + + For users who prefer the previous behavior of relying on a specific + `Runtime`, they can do so by enabling the feature flag + **`experimental_logs_batch_log_processor_with_async_runtime`**. + + 1. 
*Default Implementation, requires no async runtime* (**Recommended**) The + new default implementation does not require a runtime argument. Replace the + builder method accordingly: + - *Before:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter).build()) + .build(); + ``` + + 2. *Async Runtime Support* + If your application cannot spin up new threads or you prefer using async + runtimes, enable the + "experimental_logs_batch_log_processor_with_async_runtime" feature flag and + adjust code as below. + + - *Before:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessorWithAsyncRuntime::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + *Requirements:* + - Enable the feature flag: + `experimental_logs_batch_log_processor_with_async_runtime`. + - Continue enabling one of the async runtime feature flags: `rt-tokio`, + `rt-tokio-current-thread`, or `rt-async-std`. + ## v0.26.0 Released 2024-Sep-30 From 5c1fd73dd6a0d724cc654f0a6397f3de08b6d7ce Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 11:33:48 -0800 Subject: [PATCH 05/11] Move changelog to SDK --- opentelemetry-sdk/CHANGELOG.md | 53 +++++++++++++++++++++++++++++++++- opentelemetry/CHANGELOG.md | 53 ---------------------------------- 2 files changed, 52 insertions(+), 54 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 06b5a76703..61991a9b75 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -95,7 +95,6 @@ - Bump msrv to 1.75.0. - - *Breaking* : [#2314](https://github.com/open-telemetry/opentelemetry-rust/pull/2314) - The LogRecord struct has been updated: - All fields are now pub(crate) instead of pub. @@ -105,6 +104,58 @@ - Upgrade the tracing crate used for internal logging to version 0.1.40 or later. This is necessary because the internal logging macros utilize the name field as metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/open-telemetry/opentelemetry-rust/pull/2418) +- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436) + + `BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated + background thread is created to do the batch processing and exporting. + + For users who prefer the previous behavior of relying on a specific + `Runtime`, they can do so by enabling the feature flag + **`experimental_logs_batch_log_processor_with_async_runtime`**. + + 1. *Default Implementation, requires no async runtime* (**Recommended**) The + new default implementation does not require a runtime argument. Replace the + builder method accordingly: + - *Before:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter).build()) + .build(); + ``` + + 2. 
*Async Runtime Support* + If your application cannot spin up new threads or you prefer using async + runtimes, enable the + "experimental_logs_batch_log_processor_with_async_runtime" feature flag and + adjust code as below. + + - *Before:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let logger_provider = LoggerProvider::builder() + .with_log_processor(BatchLogProcessorWithAsyncRuntime::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + *Requirements:* + - Enable the feature flag: + `experimental_logs_batch_log_processor_with_async_runtime`. + - Continue enabling one of the async runtime feature flags: `rt-tokio`, + `rt-tokio-current-thread`, or `rt-async-std`. + ## 0.27.1 Released 2024-Nov-27 diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 687c9c507b..fa94f47be1 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -59,59 +59,6 @@ let counter = meter.u64_counter("my_counter").build(); These changes shouldn't directly affect the users of OpenTelemetry crate, as these constructs are used in SDK and Exporters. If you are an author of an sdk component/plug-in, like an exporter etc. please use these types from sdk. Refer [CHANGELOG.md](https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-sdk/CHANGELOG.md) for more details, under same version section. - **Breaking** [2291](https://github.com/open-telemetry/opentelemetry-rust/pull/2291) Rename `logs_level_enabled flag` to `spec_unstable_logs_enabled`. Please enable this updated flag if the feature is needed. This flag will be removed once the feature is stabilized in the specifications. -- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436) - - `BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated - background thread is created to do the batch processing and exporting. - - For users who prefer the previous behavior of relying on a specific - `Runtime`, they can do so by enabling the feature flag - **`experimental_logs_batch_log_processor_with_async_runtime`**. - - 1. *Default Implementation, requires no async runtime* (**Recommended**) The - new default implementation does not require a runtime argument. Replace the - builder method accordingly: - - *Before:* - ```rust - let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) - .build(); - ``` - - - *After:* - ```rust - let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessor::builder(exporter).build()) - .build(); - ``` - - 2. *Async Runtime Support* - If your application cannot spin up new threads or you prefer using async - runtimes, enable the - "experimental_logs_batch_log_processor_with_async_runtime" feature flag and - adjust code as below. - - - *Before:* - ```rust - let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build()) - .build(); - ``` - - - *After:* - ```rust - let logger_provider = LoggerProvider::builder() - .with_log_processor(BatchLogProcessorWithAsyncRuntime::builder(exporter, runtime::Tokio).build()) - .build(); - ``` - - *Requirements:* - - Enable the feature flag: - `experimental_logs_batch_log_processor_with_async_runtime`. 
- - Continue enabling one of the async runtime feature flags: `rt-tokio`, - `rt-tokio-current-thread`, or `rt-async-std`. - - ## v0.26.0 Released 2024-Sep-30 From 8e676147b8632d4601289f772bf4d05a2edc5140 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 11:47:52 -0800 Subject: [PATCH 06/11] Fix CI test --- opentelemetry-sdk/src/logs/log_processor.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 29da8ead6b..dd37af09ce 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -384,8 +384,9 @@ impl BatchLogProcessor { ); } Err(err) => { + // TODO: this should not happen! Log the error and continue for now. otel_error!( - name: "BatchLogProcessor.ReceiveError", + name: "BatchLogProcessor.InternalError", error = format!("{}", err) ); } @@ -417,6 +418,7 @@ impl BatchLogProcessor { } } +#[allow(clippy::vec_box)] fn export_with_timeout_sync( _: Duration, // TODO, enforcing timeout in exporter. exporter: &mut E, @@ -440,7 +442,7 @@ where let export_result = futures_executor::block_on(export); match export_result { - Ok(__) => LogResult::Ok(()), + Ok(_) => LogResult::Ok(()), Err(err) => { otel_error!( name: "BatchLogProcessor.ExportError", @@ -717,6 +719,7 @@ where /// Batch log processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. #[derive(Debug)] +#[allow(dead_code)] pub struct BatchConfig { /// The maximum queue size to buffer logs for delayed processing. If the /// queue gets full it drops the logs. The default value of is 2048. @@ -1138,6 +1141,10 @@ mod tests { .build(), ) .build(); + + // wait for the batch processor to process the resource. + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5); let _ = provider.shutdown(); } From 8d6e116edd41426f24b851507da0f2886e5dedab Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 11:50:10 -0800 Subject: [PATCH 07/11] Remove runtime from use for log-basic --- opentelemetry-appender-log/examples/logs-basic.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/opentelemetry-appender-log/examples/logs-basic.rs b/opentelemetry-appender-log/examples/logs-basic.rs index c2fda41b52..b2daba7e97 100644 --- a/opentelemetry-appender-log/examples/logs-basic.rs +++ b/opentelemetry-appender-log/examples/logs-basic.rs @@ -7,7 +7,6 @@ use log::{error, info, warn, Level}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider}; -use opentelemetry_sdk::runtime; use opentelemetry_stdout::LogExporter; #[tokio::main] @@ -21,8 +20,7 @@ async fn main() { // Setup Log Appender for the log crate. let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider); - log::set_boxed_logger(Box::new(otel_log_appender)).unwrap(); - log::set_max_level(Level::Info.to_level_filter()); + log::set_boxed_logger(Box::new(otel_log_appender)).unwrap(); log::set_max_level(Level::Info.to_level_filter()); // Emit logs using macros from the log crate. 
let fruit = "apple"; From aef732d2133f91bb6fb21ad502fbb736b7c5a13e Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 11:52:39 -0800 Subject: [PATCH 08/11] Fix lint in log-basic example --- opentelemetry-appender-log/examples/logs-basic.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-appender-log/examples/logs-basic.rs b/opentelemetry-appender-log/examples/logs-basic.rs index b2daba7e97..e1faf255b7 100644 --- a/opentelemetry-appender-log/examples/logs-basic.rs +++ b/opentelemetry-appender-log/examples/logs-basic.rs @@ -20,7 +20,8 @@ async fn main() { // Setup Log Appender for the log crate. let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider); - log::set_boxed_logger(Box::new(otel_log_appender)).unwrap(); log::set_max_level(Level::Info.to_level_filter()); + log::set_boxed_logger(Box::new(otel_log_appender)).unwrap(); + log::set_max_level(Level::Info.to_level_filter()); // Emit logs using macros from the log crate. let fruit = "apple"; From f169f063e8bc9d2832eaf8d6882e1823dbf2d471 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 14:00:50 -0800 Subject: [PATCH 09/11] Fix integration test and address feedback --- opentelemetry-otlp/tests/integration_test/tests/logs.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 8498a24913..b9810a7bbc 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -28,7 +28,7 @@ fn init_logs() -> Result { let exporter = exporter_builder.build()?; Ok(LoggerProvider::builder() - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .with_resource( Resource::builder_empty() .with_service_name("logs-integration-test") diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index dd37af09ce..831ec1e522 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1198,9 +1198,6 @@ mod tests { let exporter = InMemoryLogExporterBuilder::default().build(); let processor = BatchLogProcessor::new(Box::new(exporter.clone()), BatchConfig::default()); - // - // deadloack happens in shutdown with tokio current_thread runtime - // processor.shutdown().unwrap(); } From 45ad6d6d5870c2cad72bc920a7eb5a28e64b780c Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 14:49:04 -0800 Subject: [PATCH 10/11] Remove unused use of runtime in integration test --- opentelemetry-otlp/tests/integration_test/tests/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index b9810a7bbc..a2110e3ab9 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -8,7 +8,7 @@ use log::{info, Level}; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::LogExporter; use opentelemetry_sdk::logs::LoggerProvider; -use opentelemetry_sdk::{logs as sdklogs, runtime, Resource}; +use opentelemetry_sdk::{logs as sdklogs, Resource}; use std::fs::File; use std::os::unix::fs::MetadataExt; use std::time::Duration; From a04683ae249a218a7588edfa59b30627eb411ba6 Mon Sep 17 00:00:00 2001 From: Tom Tan Date: Wed, 18 Dec 2024 
15:19:37 -0800 Subject: [PATCH 11/11] Add wait before calling shutdown --- opentelemetry-otlp/tests/integration_test/tests/logs.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index a2110e3ab9..5ae774cc76 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -48,6 +48,9 @@ pub async fn test_logs() -> Result<()> { log::set_max_level(Level::Info.to_level_filter()); info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99); + + // TODO: remove below wait before calling logger_provider.shutdown() + tokio::time::sleep(Duration::from_secs(10)).await; let _ = logger_provider.shutdown(); tokio::time::sleep(Duration::from_secs(10)).await;
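
---

The patches above migrate the examples and integration tests to the runtime-free `BatchLogProcessor`. For reference, here is a minimal end-to-end sketch of the resulting usage, assembled from the pieces touched in these diffs. It assumes the `opentelemetry-stdout` `LogExporter` (constructed via `LogExporter::default()`) and the `log`-crate bridge from the `logs-basic` example, and it uses a plain `fn main` on the assumption that nothing else in the program needs an async runtime; it is a sketch of the new default path, not a verbatim copy of any file in this series.

```rust
use log::{info, Level};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_sdk::logs::{BatchLogProcessor, LoggerProvider};
use opentelemetry_stdout::LogExporter;

fn main() {
    // Build the provider with the new default batch processor: no runtime
    // argument is passed and no async runtime feature flag is required.
    let exporter = LogExporter::default();
    let logger_provider = LoggerProvider::builder()
        .with_log_processor(BatchLogProcessor::builder(exporter).build())
        .build();

    // Bridge the `log` crate to OpenTelemetry, as in the logs-basic example.
    let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
    log::set_boxed_logger(Box::new(otel_log_appender)).unwrap();
    log::set_max_level(Level::Info.to_level_filter());

    info!("hello from the runtime-free batch log processor");

    // Shut down the provider so the dedicated background thread flushes and
    // exports any buffered records before the process exits.
    let _ = logger_provider.shutdown();
}
```

The 10-second sleeps added to the integration test in PATCH 11/11 are a test-only workaround (flagged with a TODO in the patch itself) and are deliberately left out of this sketch.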