Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SimpleProcessor for Logs simplified #2825

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ impl InMemoryLogExporter {

impl LogExporter for InMemoryLogExporter {
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
if self.is_shutdown_called() {
return Err(OTelSdkError::AlreadyShutdown);
}
let mut logs_guard = self.logs.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e))
})?;
Expand Down
84 changes: 25 additions & 59 deletions opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@
//! +-----+---------------+ +-----------------------+ +-------------------+
//! ```

use crate::error::{OTelSdkError, OTelSdkResult};
use crate::error::OTelSdkResult;
use crate::logs::log_processor::LogProcessor;
use crate::{
logs::{LogBatch, LogExporter, SdkLogRecord},
Resource,
};

use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use opentelemetry::{otel_warn, InstrumentationScope};

use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Mutex;

/// A [`LogProcessor`] designed for testing and debugging purpose, that immediately
Expand Down Expand Up @@ -60,54 +59,37 @@
///
#[derive(Debug)]
pub struct SimpleLogProcessor<T: LogExporter> {
exporter: Mutex<T>,
is_shutdown: AtomicBool,
exporter: T,
export_mutex: Mutex<()>,
}

impl<T: LogExporter> SimpleLogProcessor<T> {
/// Creates a new instance of `SimpleLogProcessor`.
pub fn new(exporter: T) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
exporter,
export_mutex: Mutex::new(()),
}
}
}

impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
// export() does not require mutable self and can be called in parallel
// with other export() calls. However, OTel Spec requires that
// existing export() must be completed before the next export() call.
let _guard = self.export_mutex.lock().unwrap();

// We now have exclusive access to export
let result = {
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)))
};
if let Err(err) = result {
otel_warn!(
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)

Check warning on line 91 in opentelemetry-sdk/src/logs/simple_log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/simple_log_processor.rs#L91

Added line #L91 was not covered by tests
);
return;
}

let result = self
.exporter
.lock()
.map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
.and_then(|exporter| {
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
// Handle errors with specific static names
match result {
Err(OTelSdkError::InternalFailure(_)) => {
// logging as debug as this is not a user error
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);
}
Err(err) => {
otel_error!(
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)
);
}
_ => {}
}
}

Expand All @@ -116,21 +98,11 @@
}

fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
Copy link
Contributor

@utpilla utpilla Mar 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is not complicated. It's simple enough.

I don't find the Mutex<()> approach making things simpler. In fact, from a readability/maintenance standpoint, the existing export code is much better than the one introduced in this PR. If you're using a Mutex to guard access to some resource, then it's best to wrap it with Mutex instead of having to manually ensure that we acquire a Mutex.

I don't see the need to go against the Rust way of doing things in this case. The current SimpleProcessor neither has the perf requirements nor the complexity to switch to manual dependency on acquiring Mutex.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree at all!
It felt quite awkward to Mutex lock to do EventEnabled(), though it did not require mutability!
Not worried about performance at all here, just does not feel right to do that.

Regd. Rust idiomatic way to prevent multiple export at the same time - I think it'd be better to model export() as requiring mutable self, but we know that will limit us from achieving higher perf when we need it!

If all agree this PR is wrong direction, I can abandon it. (Its harmless with/without this PR, as simple processor is just a learning/test purpose component only)

.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
"SimpleLogProcessor mutex poison at shutdown".into(),
))
}
self.exporter.shutdown()
}

fn set_resource(&mut self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
self.exporter.set_resource(resource);
}

#[cfg(feature = "spec_unstable_logs_enabled")]
Expand All @@ -141,11 +113,7 @@
target: &str,
name: Option<&str>,
) -> bool {
if let Ok(exporter) = self.exporter.lock() {
exporter.event_enabled(level, target, name)
} else {
true
}
self.exporter.event_enabled(level, target, name)
}
}

Expand Down Expand Up @@ -232,13 +200,11 @@

processor.shutdown().unwrap();

let is_shutdown = processor
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed);
assert!(is_shutdown);

processor.emit(&mut record, &instrumentation);

// Emit was called after shutdown. While SimpleLogProcessor
// does not care, the exporter in this case does,
// and it ignores the export() calls after shutdown.
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
assert!(exporter.is_shutdown_called());
}
Expand Down
Loading