Skip to content

Commit e69b763

Browse files
authored
Merge branch 'main' into main
2 parents e629842 + 68c9133 commit e69b763

File tree

11 files changed

+40
-21
lines changed

11 files changed

+40
-21
lines changed

opentelemetry-otlp/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
- The `OTEL_EXPORTER_OTLP_TIMEOUT`, `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`, `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT` and `OTEL_EXPORTER_OTLP_LOGS_TIMEOUT` are changed from seconds to miliseconds.
66
- Fixed `.with_headers()` in `HttpExporterBuilder` to correctly support multiple key/value pairs. [#2699](https://github.com/open-telemetry/opentelemetry-rust/pull/2699)
7+
- Fixed
8+
[#2770](https://github.com/open-telemetry/opentelemetry-rust/issues/2770)
9+
partially to properly handle `shutdown()` when using `http`. (`tonic` still
10+
does not do proper shutdown)
711

812
## 0.28.0
913

opentelemetry-otlp/src/exporter/http/logs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl LogExporter for OtlpHttpClient {
4646
Ok(())
4747
}
4848

49-
fn shutdown(&mut self) -> OTelSdkResult {
49+
fn shutdown(&self) -> OTelSdkResult {
5050
let mut client_guard = self.client.lock().map_err(|e| {
5151
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
5252
})?;

opentelemetry-otlp/src/exporter/tonic/logs.rs

+16-14
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,23 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
55
};
66
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
77
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
8+
use tokio::sync::Mutex;
89
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
910

1011
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
1112

1213
use super::BoxInterceptor;
13-
use tokio::sync::Mutex;
1414

1515
pub(crate) struct TonicLogsClient {
16-
inner: Option<ClientInner>,
16+
inner: Mutex<Option<ClientInner>>,
1717
#[allow(dead_code)]
1818
// <allow dead> would be removed once we support set_resource for metrics.
1919
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
2020
}
2121

2222
struct ClientInner {
2323
client: LogsServiceClient<Channel>,
24-
interceptor: Mutex<BoxInterceptor>,
24+
interceptor: BoxInterceptor,
2525
}
2626

2727
impl fmt::Debug for TonicLogsClient {
@@ -46,23 +46,21 @@ impl TonicLogsClient {
4646
otel_debug!(name: "TonicsLogsClientBuilt");
4747

4848
TonicLogsClient {
49-
inner: Some(ClientInner {
49+
inner: Mutex::new(Some(ClientInner {
5050
client,
51-
interceptor: Mutex::new(interceptor),
52-
}),
51+
interceptor,
52+
})),
5353
resource: Default::default(),
5454
}
5555
}
5656
}
5757

5858
impl LogExporter for TonicLogsClient {
5959
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
60-
let (mut client, metadata, extensions) = match &self.inner {
60+
let (mut client, metadata, extensions) = match self.inner.lock().await.as_mut() {
6161
Some(inner) => {
6262
let (m, e, _) = inner
6363
.interceptor
64-
.lock()
65-
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
6664
.call(Request::new(()))
6765
.map_err(|e| OTelSdkError::InternalFailure(format!("error: {:?}", e)))?
6866
.into_parts();
@@ -86,11 +84,15 @@ impl LogExporter for TonicLogsClient {
8684
Ok(())
8785
}
8886

89-
fn shutdown(&mut self) -> OTelSdkResult {
90-
match self.inner.take() {
91-
Some(_) => Ok(()), // Successfully took `inner`, indicating a successful shutdown.
92-
None => Err(OTelSdkError::AlreadyShutdown), // `inner` was already `None`, meaning it's already shut down.
93-
}
87+
fn shutdown(&self) -> OTelSdkResult {
88+
// TODO: Implement actual shutdown
89+
// Due to the use of tokio::sync::Mutex to guard
90+
// the inner client, we need to await the call to lock the mutex
91+
// and that requires async runtime.
92+
// It is possible to fix this by using
93+
// a dedicated thread just to handle shutdown.
94+
// But for now, we just return Ok.
95+
Ok(())
9496
}
9597

9698
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {

opentelemetry-otlp/src/logs.rs

+9
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,13 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
156156
SupportedTransportClient::Http(client) => client.set_resource(resource),
157157
}
158158
}
159+
160+
fn shutdown(&self) -> OTelSdkResult {
161+
match &self.client {
162+
#[cfg(feature = "grpc-tonic")]
163+
SupportedTransportClient::Tonic(client) => client.shutdown(),
164+
#[cfg(any(feature = "http-proto", feature = "http-json"))]
165+
SupportedTransportClient::Http(client) => client.shutdown(),
166+
}
167+
}
159168
}

opentelemetry-sdk/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@
6060
`LogProcessor` and `LogExporter` traits. `SdkLogger` no longer passes its
6161
`scope` name but instead passes the incoming `name` when invoking
6262
`event_enabled` on processors.
63+
- **Breaking** for custom LogExporter authors: `shutdown()` method in
64+
`LogExporter` trait no longer requires a mutable ref to `self`. If the exporter
65+
needs to mutate state, it should rely on interior mutability.
66+
[2764](https://github.com/open-telemetry/opentelemetry-rust/pull/2764)
6367

6468
## 0.28.0
6569

opentelemetry-sdk/src/logs/export.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub trait LogExporter: Send + Sync + Debug {
136136
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
137137

138138
/// Shuts down the exporter.
139-
fn shutdown(&mut self) -> OTelSdkResult {
139+
fn shutdown(&self) -> OTelSdkResult {
140140
Ok(())
141141
}
142142
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/in_memory_exporter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ impl LogExporter for InMemoryLogExporter {
211211
Ok(())
212212
}
213213

214-
fn shutdown(&mut self) -> OTelSdkResult {
214+
fn shutdown(&self) -> OTelSdkResult {
215215
self.shutdown_called
216216
.store(true, std::sync::atomic::Ordering::Relaxed);
217217
if self.should_reset_on_shutdown {

opentelemetry-sdk/src/logs/log_processor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub(crate) mod tests {
9292
Ok(())
9393
}
9494

95-
fn shutdown(&mut self) -> OTelSdkResult {
95+
fn shutdown(&self) -> OTelSdkResult {
9696
Ok(())
9797
}
9898

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ mod tests {
321321
Ok(())
322322
}
323323

324-
fn shutdown(&mut self) -> OTelSdkResult {
324+
fn shutdown(&self) -> OTelSdkResult {
325325
Ok(())
326326
}
327327

opentelemetry-sdk/src/logs/simple_log_processor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
117117
fn shutdown(&self) -> OTelSdkResult {
118118
self.is_shutdown
119119
.store(true, std::sync::atomic::Ordering::Relaxed);
120-
if let Ok(mut exporter) = self.exporter.lock() {
120+
if let Ok(exporter) = self.exporter.lock() {
121121
exporter.shutdown()
122122
} else {
123123
Err(OTelSdkError::InternalFailure(

opentelemetry-stdout/src/logs/exporter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
5757
}
5858
}
5959

60-
fn shutdown(&mut self) -> OTelSdkResult {
60+
fn shutdown(&self) -> OTelSdkResult {
6161
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
6262
Ok(())
6363
}

0 commit comments

Comments
 (0)