Skip to content

Commit

Permalink
fix tonic mutability
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas committed Mar 8, 2025
1 parent 95627b9 commit db055b2
Showing 1 changed file with 35 additions and 22 deletions.
57 changes: 35 additions & 22 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
use std::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
inner: Mutex<Option<ClientInner>>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: tokio::sync::Mutex<BoxInterceptor>,
interceptor: BoxInterceptor,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -45,30 +45,38 @@ impl TonicLogsClient {
otel_debug!(name: "TonicsLogsClientBuilt");

TonicLogsClient {
inner: Some(ClientInner {
inner: Mutex::new(Some(ClientInner {

Check warning on line 48 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L48

Added line #L48 was not covered by tests
client,
interceptor: tokio::sync::Mutex::new(interceptor),
}),
interceptor: interceptor,
})),

Check warning on line 51 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L50-L51

Added lines #L50 - L51 were not covered by tests
resource: Default::default(),
}
}
}

impl LogExporter for TonicLogsClient {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
.call(Request::new(()))
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
.into_parts();
(inner.client.clone(), m, e)
}
None => return Err(OTelSdkError::AlreadyShutdown),
};
let (mut client, metadata, extensions) = self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {e:?}")))
.and_then(|mut inner| match &mut *inner {
Some(inner) => {
let (m, e, _) = inner
.interceptor
.call(Request::new(()))
.map_err(|e| {
OTelSdkError::InternalFailure(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))

Check warning on line 74 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L59-L74

Added lines #L59 - L74 were not covered by tests
}
None => Err(OTelSdkError::InternalFailure(
"exporter is already shut down".into(),
)),
})?;

Check warning on line 79 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L76-L79

Added lines #L76 - L79 were not covered by tests

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

Expand All @@ -85,8 +93,13 @@ impl LogExporter for TonicLogsClient {
Ok(())
}

fn shutdown(&mut self) -> OTelSdkResult {
match self.inner.take() {
fn shutdown(&self) -> OTelSdkResult {
match self
.inner
.lock()
.map_err(|e| OTelSdkError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
.take()

Check warning on line 101 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L96-L101

Added lines #L96 - L101 were not covered by tests
{
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
}
Expand Down

0 comments on commit db055b2

Please sign in to comment.