diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 311a8a4bfc..2e298b2963 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -4,6 +4,10 @@ - The `OTEL_EXPORTER_OTLP_TIMEOUT`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` and `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` are changed from seconds to miliseconds. - Fixed `.with_headers()` in `HttpExporterBuilder` to correctly support multiple key/value pairs. [#2699](https://github.com/open-telemetry/opentelemetry-rust/pull/2699) +- Fixed + [#2770](https://github.com/open-telemetry/opentelemetry-rust/issues/2770) + partially to properly handle `shutdown()` when using `http`. (`tonic` still + does not do proper shutdown) ## 0.28.0 diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index d108e59c5c..e25d94bc20 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -46,7 +46,7 @@ impl LogExporter for OtlpHttpClient { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { let mut client_guard = self.client.lock().map_err(|e| { OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e)) })?; diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index cdbed23be2..cbcf5284b3 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -5,15 +5,15 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ }; use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult}; use opentelemetry_sdk::logs::{LogBatch, LogExporter}; +use tokio::sync::Mutex; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; -use tokio::sync::Mutex; pub(crate) struct TonicLogsClient { - inner: Option, + inner: Mutex>, #[allow(dead_code)] // would be removed once we support set_resource for metrics. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, @@ -21,7 +21,7 @@ pub(crate) struct TonicLogsClient { struct ClientInner { client: LogsServiceClient, - interceptor: Mutex, + interceptor: BoxInterceptor, } impl fmt::Debug for TonicLogsClient { @@ -46,10 +46,10 @@ impl TonicLogsClient { otel_debug!(name: "TonicsLogsClientBuilt"); TonicLogsClient { - inner: Some(ClientInner { + inner: Mutex::new(Some(ClientInner { client, - interceptor: Mutex::new(interceptor), - }), + interceptor, + })), resource: Default::default(), } } @@ -57,12 +57,10 @@ impl TonicLogsClient { impl LogExporter for TonicLogsClient { async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { - let (mut client, metadata, extensions) = match &self.inner { + let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() { Some(inner) => { let (m, e, _) = inner .interceptor - .lock() - .await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here .call(Request::new(())) .map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))? .into_parts(); @@ -86,11 +84,15 @@ impl LogExporter for TonicLogsClient { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { - match self.inner.take() { - Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown. - None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down. - } + fn shutdown(&self) -> OTelSdkResult { + // TODO: Implement actual shutdown + // Due to the use of tokio::sync::Mutex to guard + // the inner client, we need to await the call to lock the mutex + // and that requires async runtime. + // It is possible to fix this by using + // a dedicated thread just to handle shutdown. + // But for now, we just return Ok. + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 4beede9086..67a8ade3b8 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -156,4 +156,13 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter { SupportedTransportClient::Http(client) => client.set_resource(resource), } } + + fn shutdown(&self) -> OTelSdkResult { + match &self.client { + #[cfg(feature = "grpc-tonic")] + SupportedTransportClient::Tonic(client) => client.shutdown(), + #[cfg(any(feature = "http-proto", feature = "http-json"))] + SupportedTransportClient::Http(client) => client.shutdown(), + } + } } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 93c3b92abb..80847e7d87 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -60,6 +60,10 @@ `LogProcessor` and `LogExporter` traits. `SdkLogger` no longer passes its `scope` name but instead passes the incoming `name` when invoking `event_enabled` on processors. +- **Breaking** for custom LogExporter authors: `shutdown()` method in + `LogExporter` trait no longer requires a mutable ref to `self`. If the exporter + needs to mutate state, it should rely on interior mutability. + [2764](https://github.com/open-telemetry/opentelemetry-rust/pull/2764) ## 0.28.0 diff --git a/opentelemetry-sdk/src/logs/export.rs b/opentelemetry-sdk/src/logs/export.rs index 83322199f8..5ef91c77c8 100644 --- a/opentelemetry-sdk/src/logs/export.rs +++ b/opentelemetry-sdk/src/logs/export.rs @@ -136,7 +136,7 @@ pub trait LogExporter: Send + Sync + Debug { ) -> impl std::future::Future + Send; /// Shuts down the exporter. - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { Ok(()) } #[cfg(feature = "spec_unstable_logs_enabled")] diff --git a/opentelemetry-sdk/src/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/logs/in_memory_exporter.rs index 0a0c4ee593..eb83033bdb 100644 --- a/opentelemetry-sdk/src/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/logs/in_memory_exporter.rs @@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { self.shutdown_called .store(true, std::sync::atomic::Ordering::Relaxed); if self.should_reset_on_shutdown { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index cc634f063c..bffbbb0bb5 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -92,7 +92,7 @@ pub(crate) mod tests { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index 6b684f4d63..f86a0d27af 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -321,7 +321,7 @@ mod tests { Ok(()) } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { Ok(()) } diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index f4b57ccc5f..3ff9a59287 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -117,7 +117,7 @@ impl LogProcessor for SimpleLogProcessor { fn shutdown(&self) -> OTelSdkResult { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); - if let Ok(mut exporter) = self.exporter.lock() { + if let Ok(exporter) = self.exporter.lock() { exporter.shutdown() } else { Err(OTelSdkError::InternalFailure( diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 518dbfe2a3..c7a8397a30 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -57,7 +57,7 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter { } } - fn shutdown(&mut self) -> OTelSdkResult { + fn shutdown(&self) -> OTelSdkResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) }