Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove mut ref requirement for shutdown LogExporter #2764

Merged
4 changes: 4 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- The `OTEL_EXPORTER_OTLP_TIMEOUT`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` and `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` are changed from seconds to miliseconds.
- Fixed `.with_headers()` in `HttpExporterBuilder` to correctly support multiple key/value pairs. [#2699](https://github.com/open-telemetry/opentelemetry-rust/pull/2699)
- Fixed
[#2770](https://github.com/open-telemetry/opentelemetry-rust/issues/2770)
partially to properly handle `shutdown()` when using `http`. (`tonic` still
does not do proper shutdown)

## 0.28.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 49 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L49

Added line #L49 was not covered by tests
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
Expand Down
30 changes: 16 additions & 14 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use tokio::sync::Mutex;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -46,23 +46,21 @@
otel_debug!(name: "TonicsLogsClientBuilt");

TonicLogsClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {

Check warning on line 49 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L49

Added line #L49 was not covered by tests
client,
interceptor: Mutex::new(interceptor),
}),
interceptor,
})),

Check warning on line 52 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L51-L52

Added lines #L51 - L52 were not covered by tests
resource: Default::default(),
}
}
}

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {

Check warning on line 60 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L60

Added line #L60 was not covered by tests
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
Expand All @@ -86,11 +84,15 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
fn shutdown(&self) -> OTelSdkResult {
// TODO: Implement actual shutdown
// Due to the use of tokio::sync::Mutex to guard
// the inner client, we need to await the call to lock the mutex
// and that requires async runtime.
// It is possible to fix this by using
// a dedicated thread just to handle shutdown.
// But for now, we just return Ok.
Ok(())

Check warning on line 95 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L87-L95

Added lines #L87 - L95 were not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,13 @@
SupportedTransportClient::Http(client) => client.set_resource(resource),
}
}

fn shutdown(&self) -> OTelSdkResult {
match &self.client {

Check warning on line 161 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L160-L161

Added lines #L160 - L161 were not covered by tests
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.shutdown(),

Check warning on line 163 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L163

Added line #L163 was not covered by tests
#[cfg(any(feature = "http-proto", feature = "http-json"))]
SupportedTransportClient::Http(client) => client.shutdown(),

Check warning on line 165 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L165

Added line #L165 was not covered by tests
}
}

Check warning on line 167 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L167

Added line #L167 was not covered by tests
}
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
`LogProcessor` and `LogExporter` traits. `SdkLogger` no longer passes its
`scope` name but instead passes the incoming `name` when invoking
`event_enabled` on processors.
- **Breaking** for custom LogExporter authors: `shutdown()` method in
`LogExporter` trait no longer requires a mutable ref to `self`. If the exporter
needs to mutate state, it should rely on interior mutability.
[2764](https://github.com/open-telemetry/opentelemetry-rust/pull/2764)

## 0.28.0

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 139 in opentelemetry-sdk/src/logs/export.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/export.rs#L139

Added line #L139 was not covered by tests
Ok(())
}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 60 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L60

Added line #L60 was not covered by tests
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}
Expand Down