Skip to content

Commit 16c0e10

Browse files
authored
Global Log handler cleanup - Logs SDK (#2184)
1 parent 3f5c230 commit 16c0e10

File tree

4 files changed

+82
-73
lines changed

4 files changed

+82
-73
lines changed

opentelemetry-sdk/src/logs/log_emitter.rs

+26-15
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
22
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
3-
use opentelemetry::otel_warn;
43
use opentelemetry::{
5-
global,
64
logs::{LogError, LogResult},
5+
otel_debug,
76
trace::TraceContextExt,
87
Context, InstrumentationLibrary,
98
};
@@ -126,17 +125,10 @@ impl LoggerProvider {
126125
if errs.is_empty() {
127126
Ok(())
128127
} else {
129-
otel_warn!(
130-
name: "logger_provider_shutdown_error",
131-
error = format!("{:?}", errs)
132-
);
133-
Err(LogError::Other(format!("{:?}", errs).into()))
128+
Err(LogError::Other(format!("{errs:?}").into()))
134129
}
135130
} else {
136-
otel_warn!(
137-
name: "logger_provider_already_shutdown"
138-
);
139-
Err(LogError::Other("logger provider already shut down".into()))
131+
Err(LogError::AlreadyShutdown("LoggerProvider".to_string()))
140132
}
141133
}
142134
}
@@ -154,6 +146,24 @@ impl LoggerProviderInner {
154146
let mut errs = vec![];
155147
for processor in &self.processors {
156148
if let Err(err) = processor.shutdown() {
149+
// Log at debug level because:
150+
// - The error is also returned to the user for handling (if applicable)
151+
// - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown,
152+
// which is non-actionable by the user
153+
match err {
154+
// specific handling for mutex poisoning
155+
LogError::MutexPoisoned(_) => {
156+
otel_debug!(
157+
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
158+
);
159+
}
160+
_ => {
161+
otel_debug!(
162+
name: "LoggerProvider.Drop.ShutdownError",
163+
error = format!("{err}")
164+
);
165+
}
166+
}
157167
errs.push(err);
158168
}
159169
}
@@ -164,10 +174,11 @@ impl LoggerProviderInner {
164174
impl Drop for LoggerProviderInner {
165175
fn drop(&mut self) {
166176
if !self.is_shutdown.load(Ordering::Relaxed) {
167-
let errs = self.shutdown();
168-
if !errs.is_empty() {
169-
global::handle_error(LogError::Other(format!("{:?}", errs).into()));
170-
}
177+
let _ = self.shutdown(); // errors are handled within shutdown
178+
} else {
179+
otel_debug!(
180+
name: "LoggerProvider.Drop.AlreadyShutdown"
181+
);
171182
}
172183
}
173184
}

opentelemetry-sdk/src/logs/log_processor.rs

+34-56
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ use futures_util::{
1212
#[cfg(feature = "logs_level_enabled")]
1313
use opentelemetry::logs::Severity;
1414
use opentelemetry::{
15-
global,
1615
logs::{LogError, LogResult},
17-
otel_error, otel_warn, InstrumentationLibrary,
16+
otel_debug, otel_error, otel_warn, InstrumentationLibrary,
1817
};
1918

2019
use std::sync::atomic::AtomicBool;
@@ -99,26 +98,36 @@ impl LogProcessor for SimpleLogProcessor {
9998
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
10099
// noop after shutdown
101100
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
101+
// This is a warning, as the user is trying to log after the processor has been shut down.
102102
otel_warn!(
103-
name: "simple_log_processor_emit_after_shutdown"
103+
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
104104
);
105105
return;
106106
}
107107

108108
let result = self
109109
.exporter
110110
.lock()
111-
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
111+
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
112112
.and_then(|mut exporter| {
113113
let log_tuple = &[(record as &LogRecord, instrumentation)];
114114
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
115115
});
116-
if let Err(err) = result {
117-
otel_error!(
118-
name: "simple_log_processor_emit_error",
119-
error = format!("{:?}", err)
120-
);
121-
global::handle_error(err);
116+
// Handle errors with specific static names
117+
match result {
118+
Err(LogError::MutexPoisoned(_)) => {
119+
// logging as debug as this is not a user error
120+
otel_debug!(
121+
name: "SimpleLogProcessor.Emit.MutexPoisoning",
122+
);
123+
}
124+
Err(err) => {
125+
otel_error!(
126+
name: "SimpleLogProcessor.Emit.ExportError",
127+
error = format!("{}",err)
128+
);
129+
}
130+
_ => {}
122131
}
123132
}
124133

@@ -133,12 +142,7 @@ impl LogProcessor for SimpleLogProcessor {
133142
exporter.shutdown();
134143
Ok(())
135144
} else {
136-
otel_error!(
137-
name: "simple_log_processor_shutdown_error"
138-
);
139-
Err(LogError::Other(
140-
"simple logprocessor mutex poison during shutdown".into(),
141-
))
145+
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
142146
}
143147
}
144148

@@ -170,12 +174,12 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
170174
instrumentation.clone(),
171175
)));
172176

177+
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
173178
if let Err(err) = result {
174179
otel_error!(
175-
name: "batch_log_processor_emit_error",
176-
error = format!("{:?}", err)
180+
name: "BatchLogProcessor.Export.Error",
181+
error = format!("{}", err)
177182
);
178-
global::handle_error(LogError::Other(err.into()));
179183
}
180184
}
181185

@@ -243,10 +247,9 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
243247

244248
if let Err(err) = result {
245249
otel_error!(
246-
name: "batch_log_processor_export_error",
247-
error = format!("{:?}", err)
250+
name: "BatchLogProcessor.Export.Error",
251+
error = format!("{}", err)
248252
);
249-
global::handle_error(err);
250253
}
251254
}
252255
}
@@ -261,24 +264,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
261264
.await;
262265

263266
if let Some(channel) = res_channel {
264-
if let Err(result) = channel.send(result) {
265-
global::handle_error(LogError::from(format!(
266-
"failed to send flush result: {:?}",
267-
result
268-
)));
269-
otel_error!(
270-
name: "batch_log_processor_flush_error",
271-
error = format!("{:?}", result),
272-
message = "Failed to send flush result"
267+
if let Err(send_error) = channel.send(result) {
268+
otel_debug!(
269+
name: "BatchLogProcessor.Flush.SendResultError",
270+
error = format!("{:?}", send_error),
273271
);
274272
}
275-
} else if let Err(err) = result {
276-
otel_error!(
277-
name: "batch_log_processor_flush_error",
278-
error = format!("{:?}", err),
279-
message = "Flush failed"
280-
);
281-
global::handle_error(err);
282273
}
283274
}
284275
// Stream has terminated or processor is shutdown, return to finish execution.
@@ -293,21 +284,14 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
293284

294285
exporter.shutdown();
295286

296-
if let Err(result) = ch.send(result) {
297-
otel_error!(
298-
name: "batch_log_processor_shutdown_error",
299-
error = format!("{:?}", result),
300-
message = "Failed to send shutdown result"
287+
if let Err(send_error) = ch.send(result) {
288+
otel_debug!(
289+
name: "BatchLogProcessor.Shutdown.SendResultError",
290+
error = format!("{:?}", send_error),
301291
);
302-
global::handle_error(LogError::from(format!(
303-
"failed to send batch processor shutdown result: {:?}",
304-
result
305-
)));
306292
}
307-
308293
break;
309294
}
310-
311295
// propagate the resource
312296
BatchMessage::SetResource(resource) => {
313297
exporter.set_resource(&resource);
@@ -357,13 +341,7 @@ where
357341
pin_mut!(timeout);
358342
match future::select(export, timeout).await {
359343
Either::Left((export_res, _)) => export_res,
360-
Either::Right((_, _)) => {
361-
otel_error!(
362-
name: "export_with_timeout_timeout",
363-
timeout_duration = time_out.as_millis()
364-
);
365-
ExportResult::Err(LogError::ExportTimedOut(time_out))
366-
}
344+
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
367345
}
368346
}
369347

opentelemetry/src/global/internal_logging.rs

+14-2
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,13 @@ macro_rules! otel_warn {
6666
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
6767
#[cfg(feature = "internal-logs")]
6868
{
69-
tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
69+
tracing::warn!(name: $name,
70+
target: env!("CARGO_PKG_NAME"),
71+
$($key = {
72+
$value
73+
}),+,
74+
""
75+
)
7076
}
7177
#[cfg(not(feature = "internal-logs"))]
7278
{
@@ -136,7 +142,13 @@ macro_rules! otel_error {
136142
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
137143
#[cfg(feature = "internal-logs")]
138144
{
139-
tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
145+
tracing::error!(name: $name,
146+
target: env!("CARGO_PKG_NAME"),
147+
$($key = {
148+
$value
149+
}),+,
150+
""
151+
)
140152
}
141153
#[cfg(not(feature = "internal-logs"))]
142154
{

opentelemetry/src/logs/mod.rs

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ pub enum LogError {
3030
#[error("Exporter timed out after {} seconds", .0.as_secs())]
3131
ExportTimedOut(Duration),
3232

33+
/// Processor has already been shut down
34+
#[error("{0} already shutdown")]
35+
AlreadyShutdown(String),
36+
37+
/// Mutex lock poisoning
38+
#[error("mutex lock poisioning for {0}")]
39+
MutexPoisoned(String),
40+
3341
/// Other errors propagated from log SDK that weren't covered above.
3442
#[error(transparent)]
3543
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),

0 commit comments

Comments
 (0)