Skip to content

Commit 4883d2d

Browse files
authored
Support LogProcessors chaining through mutable reference (#1726)
1 parent 33abef2 commit 4883d2d

File tree

12 files changed

+225
-54
lines changed

12 files changed

+225
-54
lines changed

opentelemetry-appender-tracing/benches/logs.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
| noop_layer_disabled | 12 ns |
1111
| noop_layer_enabled | 25 ns |
1212
| ot_layer_disabled | 19 ns |
13-
| ot_layer_enabled | 588 ns |
13+
| ot_layer_enabled | 446 ns |
1414
*/
1515

1616
use async_trait::async_trait;
@@ -33,7 +33,7 @@ struct NoopExporter {
3333

3434
#[async_trait]
3535
impl LogExporter for NoopExporter {
36-
async fn export(&mut self, _: Vec<LogData>) -> LogResult<()> {
36+
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
3737
LogResult::Ok(())
3838
}
3939

@@ -54,7 +54,7 @@ impl NoopProcessor {
5454
}
5555

5656
impl LogProcessor for NoopProcessor {
57-
fn emit(&self, _: LogData) {
57+
fn emit(&self, _: &mut LogData) {
5858
// no-op
5959
}
6060

opentelemetry-otlp/src/exporter/http/logs.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::OtlpHttpClient;
99

1010
#[async_trait]
1111
impl LogExporter for OtlpHttpClient {
12-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
12+
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
1313
let client = self
1414
.client
1515
.lock()
@@ -19,7 +19,13 @@ impl LogExporter for OtlpHttpClient {
1919
_ => Err(LogError::Other("exporter is already shut down".into())),
2020
})?;
2121

22-
let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
22+
//TODO: avoid cloning here.
23+
let owned_batch = batch
24+
.into_iter()
25+
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
26+
.collect::<Vec<LogData>>();
27+
28+
let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
2329
let mut request = http::Request::builder()
2430
.method(Method::POST)
2531
.uri(&self.collector_endpoint)

opentelemetry-otlp/src/exporter/tonic/logs.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl TonicLogsClient {
5252

5353
#[async_trait]
5454
impl LogExporter for TonicLogsClient {
55-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
55+
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
5656
let (mut client, metadata, extensions) = match &mut self.inner {
5757
Some(inner) => {
5858
let (m, e, _) = inner
@@ -65,9 +65,11 @@ impl LogExporter for TonicLogsClient {
6565
None => return Err(LogError::Other("exporter is already shut down".into())),
6666
};
6767

68+
// TODO: Avoid cloning here.
6869
let resource_logs = {
6970
batch
7071
.into_iter()
72+
.map(|log_data_cow| (log_data_cow.into_owned()))
7173
.map(|log_data| (log_data, &self.resource))
7274
.map(Into::into)
7375
.collect()

opentelemetry-otlp/src/logs.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ impl LogExporter {
9898

9999
#[async_trait]
100100
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
101-
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
101+
async fn export<'a>(
102+
&mut self,
103+
batch: Vec<std::borrow::Cow<'a, LogData>>,
104+
) -> opentelemetry::logs::LogResult<()> {
102105
self.client.export(batch).await
103106
}
104107

opentelemetry-sdk/CHANGELOG.md

+12
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@
1111
logger provider.
1212
- Removed dependency on `ordered-float`.
1313

14+
- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
15+
Update `LogProcessor::emit() method to take mutable reference to LogData. This is breaking
16+
change for LogProcessor developers. If the processor needs to invoke the exporter
17+
asynchronously, it should clone the data to ensure it can be safely processed without
18+
lifetime issues. Any changes made to the log data before cloning in this method will be
19+
reflected in the next log processor in the chain, as well as to the exporter.
20+
- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
21+
Update `LogExporter::export() method to accept a batch of log data, which can be either a
22+
reference or owned `LogData`. If the exporter needs to process the log data
23+
asynchronously, it should clone the log data to ensure it can be safely processed without
24+
lifetime issues.
25+
1426
## v0.23.0
1527

1628
- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed

opentelemetry-sdk/benches/log.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
//! run with `$ cargo bench --bench log --features=logs -- --exact <test_name>` to run specific test for logs
2+
//! So to run test named "full-log-with-attributes/with-context" you would run `$ cargo bench --bench log --features=logs -- --exact full-log-with-attributes/with-context`
3+
//! To run all tests for logs you would run `$ cargo bench --bench log --features=logs`
4+
//!
5+
16
use std::collections::HashMap;
27
use std::time::SystemTime;
38

@@ -19,7 +24,7 @@ struct VoidExporter;
1924

2025
#[async_trait]
2126
impl LogExporter for VoidExporter {
22-
async fn export(&mut self, _batch: Vec<LogData>) -> LogResult<()> {
27+
async fn export<'a>(&mut self, _batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
2328
LogResult::Ok(())
2429
}
2530
}

opentelemetry-sdk/src/export/logs/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ use opentelemetry::{
88
logs::{LogError, LogResult},
99
InstrumentationLibrary,
1010
};
11+
use std::borrow::Cow;
1112
use std::fmt::Debug;
1213

1314
/// `LogExporter` defines the interface that log exporters should implement.
1415
#[async_trait]
1516
pub trait LogExporter: Send + Sync + Debug {
1617
/// Exports a batch of [`LogData`].
17-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
18+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
1819
/// Shuts down the exporter.
1920
fn shutdown(&mut self) {}
2021
#[cfg(feature = "logs_level_enabled")]

opentelemetry-sdk/src/logs/log_emitter.rs

+15-14
Original file line numberDiff line numberDiff line change
@@ -256,20 +256,21 @@ impl opentelemetry::logs::Logger for Logger {
256256
cx.has_active_span()
257257
.then(|| TraceContext::from(cx.span().span_context()))
258258
});
259+
let mut log_record = record;
260+
if let Some(ref trace_context) = trace_context {
261+
log_record.trace_context = Some(trace_context.clone());
262+
}
263+
if log_record.observed_timestamp.is_none() {
264+
log_record.observed_timestamp = Some(SystemTime::now());
265+
}
266+
267+
let mut data = LogData {
268+
record: log_record,
269+
instrumentation: self.instrumentation_library().clone(),
270+
};
259271

260272
for p in processors {
261-
let mut cloned_record = record.clone();
262-
if let Some(ref trace_context) = trace_context {
263-
cloned_record.trace_context = Some(trace_context.clone());
264-
}
265-
if cloned_record.observed_timestamp.is_none() {
266-
cloned_record.observed_timestamp = Some(SystemTime::now());
267-
}
268-
let data = LogData {
269-
record: cloned_record,
270-
instrumentation: self.instrumentation_library().clone(),
271-
};
272-
p.emit(data);
273+
p.emit(&mut data);
273274
}
274275
}
275276

@@ -326,7 +327,7 @@ mod tests {
326327
}
327328

328329
impl LogProcessor for ShutdownTestLogProcessor {
329-
fn emit(&self, _data: LogData) {
330+
fn emit(&self, _data: &mut LogData) {
330331
self.is_shutdown
331332
.lock()
332333
.map(|is_shutdown| {
@@ -561,7 +562,7 @@ mod tests {
561562
}
562563

563564
impl LogProcessor for LazyLogProcessor {
564-
fn emit(&self, _data: LogData) {
565+
fn emit(&self, _data: &mut LogData) {
565566
// nothing to do.
566567
}
567568

0 commit comments

Comments
 (0)