Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove mut ref requirement for shutdown LogExporter #2764

Merged
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 49 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L49

Added line #L49 was not covered by tests
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
Expand Down
56 changes: 33 additions & 23 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
use std::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -46,30 +45,36 @@
otel_debug!(name: "TonicsLogsClientBuilt");

TonicLogsClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {

Check warning on line 48 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L48

Added line #L48 was not covered by tests
client,
interceptor: Mutex::new(interceptor),
}),
interceptor,
})),

Check warning on line 51 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L50-L51

Added lines #L50 - L51 were not covered by tests
resource: Default::default(),
}
}
}

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(OTelSdkError::AlreadyShutdown),
};
let (mut client, metadata, extensions) = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
OTelSdkError::InternalFailure(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))

Check warning on line 74 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L59-L74

Added lines #L59 - L74 were not covered by tests
}
None => Err(OTelSdkError::AlreadyShutdown),
})?;

Check warning on line 77 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L76-L77

Added lines #L76 - L77 were not covered by tests

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

Expand All @@ -86,8 +91,13 @@
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
fn shutdown(&self) -> OTelSdkResult {
match self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
.take()

Check warning on line 99 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L94-L99

Added lines #L94 - L99 were not covered by tests
{
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 139 in opentelemetry-sdk/src/logs/export.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/export.rs#L139

Added line #L139 was not covered by tests
Ok(())
}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {

Check warning on line 60 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L60

Added line #L60 was not covered by tests
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}
Expand Down