Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global Log handler cleanup - Logs SDK #2184

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::otel_warn;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_debug,
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
Expand Down Expand Up @@ -126,17 +125,10 @@
if errs.is_empty() {
Ok(())
} else {
otel_warn!(
name: "logger_provider_shutdown_error",
error = format!("{:?}", errs)
);
Err(LogError::Other(format!("{:?}", errs).into()))
Err(LogError::Other(format!("{errs:?}").into()))

Check warning on line 128 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L128

Added line #L128 was not covered by tests
}
} else {
otel_warn!(
name: "logger_provider_already_shutdown"
);
Err(LogError::Other("logger provider already shut down".into()))
Err(LogError::AlreadyShutdown("LoggerProvider".to_string()))
}
}
}
Expand All @@ -154,6 +146,24 @@
let mut errs = vec![];
for processor in &self.processors {
if let Err(err) = processor.shutdown() {
// Log at debug level because:
// - The error is also returned to the user for handling (if applicable)
// - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown,
// which is non-actionable by the user
match err {

Check warning on line 153 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L153

Added line #L153 was not covered by tests
// specific handling for mutex poisioning
LogError::MutexPoisoned(_) => {
otel_debug!(
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
);

Check warning on line 158 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L156-L158

Added lines #L156 - L158 were not covered by tests
}
_ => {
otel_debug!(

Check warning on line 161 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L161

Added line #L161 was not covered by tests
name: "LoggerProvider.Drop.ShutdownError",
error = format!("{err}")

Check warning on line 163 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L163

Added line #L163 was not covered by tests
);
}
}
errs.push(err);
}
}
Expand All @@ -164,10 +174,11 @@
impl Drop for LoggerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
let errs = self.shutdown();
if !errs.is_empty() {
global::handle_error(LogError::Other(format!("{:?}", errs).into()));
}
let _ = self.shutdown(); // errors are handled within shutdown
} else {
otel_debug!(
name: "LoggerProvider.Drop.AlreadyShutdown"
);
}
}
}
Expand Down
90 changes: 34 additions & 56 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_error, otel_warn, InstrumentationLibrary,
otel_debug, otel_error, otel_warn, InstrumentationLibrary,
};

use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -99,26 +98,36 @@
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
otel_warn!(
name: "simple_log_processor_emit_after_shutdown"
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
);
return;
}

let result = self
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
otel_error!(
name: "simple_log_processor_emit_error",
error = format!("{:?}", err)
);
global::handle_error(err);
// Handle errors with specific static names
match result {

Check warning on line 117 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L117

Added line #L117 was not covered by tests
Err(LogError::MutexPoisoned(_)) => {
// logging as debug as this is not a user error
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);

Check warning on line 122 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L120-L122

Added lines #L120 - L122 were not covered by tests
}
Err(err) => {
otel_error!(

Check warning on line 125 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L124-L125

Added lines #L124 - L125 were not covered by tests
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)

Check warning on line 127 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L127

Added line #L127 was not covered by tests
);
}
_ => {}
}
}

Expand All @@ -133,12 +142,7 @@
exporter.shutdown();
Ok(())
} else {
otel_error!(
name: "simple_log_processor_shutdown_error"
);
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))

Check warning on line 145 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L145

Added line #L145 was not covered by tests
}
}

Expand Down Expand Up @@ -170,12 +174,12 @@
instrumentation.clone(),
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if let Err(err) = result {
otel_error!(
name: "batch_log_processor_emit_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be triggered when channel is full? If yes, we need to rethink this, as this can spam the log output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this error only triggers when channel is full or closed. We need to add some throttling or logic to prevent flooding - have added the TODO for now, as we need common strategy for such flooding.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree we need a common strategy, but lets remove the error log from here. It'll flood as-is when buffer is full.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove altogether, or make it otel_debug for now - with comment to change it to otel_error once throttling is ready.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

either of them are fine with me, though I slightly prefer removing altogether, as I don't know if we can ship a throttling solution for next release.

error = format!("{}", err)

Check warning on line 181 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L181

Added line #L181 was not covered by tests
);
global::handle_error(LogError::Other(err.into()));
}
}

Expand Down Expand Up @@ -243,10 +247,9 @@

if let Err(err) = result {
otel_error!(
name: "batch_log_processor_export_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)

Check warning on line 251 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L251

Added line #L251 was not covered by tests
);
global::handle_error(err);
}
}
}
Expand All @@ -261,24 +264,12 @@
.await;

if let Some(channel) = res_channel {
if let Err(result) = channel.send(result) {
global::handle_error(LogError::from(format!(
"failed to send flush result: {:?}",
result
)));
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", result),
message = "Failed to send flush result"
if let Err(send_error) = channel.send(result) {
otel_debug!(

Check warning on line 268 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L268

Added line #L268 was not covered by tests
name: "BatchLogProcessor.Flush.SendResultError",
error = format!("{:?}", send_error),

Check warning on line 270 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L270

Added line #L270 was not covered by tests
);
}
} else if let Err(err) = result {
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", err),
message = "Flush failed"
);
global::handle_error(err);
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
Expand All @@ -293,21 +284,14 @@

exporter.shutdown();

if let Err(result) = ch.send(result) {
otel_error!(
name: "batch_log_processor_shutdown_error",
error = format!("{:?}", result),
message = "Failed to send shutdown result"
if let Err(send_error) = ch.send(result) {
otel_debug!(

Check warning on line 288 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L288

Added line #L288 was not covered by tests
name: "BatchLogProcessor.Shutdown.SendResultError",
error = format!("{:?}", send_error),

Check warning on line 290 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L290

Added line #L290 was not covered by tests
);
global::handle_error(LogError::from(format!(
"failed to send batch processor shutdown result: {:?}",
result
)));
}

break;
}

// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
Expand Down Expand Up @@ -357,13 +341,7 @@
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => {
otel_error!(
name: "export_with_timeout_timeout",
timeout_duration = time_out.as_millis()
);
ExportResult::Err(LogError::ExportTimedOut(time_out))
}
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),

Check warning on line 344 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L344

Added line #L344 was not covered by tests
}
}

Expand Down
16 changes: 14 additions & 2 deletions opentelemetry/src/global/internal_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ macro_rules! otel_warn {
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::warn!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
Expand Down Expand Up @@ -136,7 +142,13 @@ macro_rules! otel_error {
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::error!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub enum LogError {
#[error("Exporter timed out after {} seconds", .0.as_secs())]
ExportTimedOut(Duration),

/// Processor is already shutdown
#[error("{0} already shutdown")]
AlreadyShutdown(String),

/// Mutex lock poisoning
#[error("mutex lock poisioning for {0}")]
MutexPoisoned(String),

/// Other errors propagated from log SDK that weren't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
Expand Down
Loading