Skip to content

Commit 3399573

Browse files
committed
initial commit
1 parent 19d3223 commit 3399573

File tree

11 files changed

+62
-48
lines changed

11 files changed

+62
-48
lines changed

opentelemetry-appender-tracing/benches/logs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct NoopExporter {
3333

3434
#[async_trait]
3535
impl LogExporter for NoopExporter {
36-
async fn export(&mut self, _: Vec<LogData>) -> LogResult<()> {
36+
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
3737
LogResult::Ok(())
3838
}
3939

@@ -54,7 +54,7 @@ impl NoopProcessor {
5454
}
5555

5656
impl LogProcessor for NoopProcessor {
57-
fn emit(&self, _: LogData) {
57+
fn emit(&self, _: &mut LogData) {
5858
// no-op
5959
}
6060

opentelemetry-otlp/src/exporter/http/logs.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::OtlpHttpClient;
99

1010
#[async_trait]
1111
impl LogExporter for OtlpHttpClient {
12-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
12+
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
1313
let client = self
1414
.client
1515
.lock()
@@ -19,7 +19,12 @@ impl LogExporter for OtlpHttpClient {
1919
_ => Err(LogError::Other("exporter is already shut down".into())),
2020
})?;
2121

22-
let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
22+
//TBD :avoid cloning and work only on borrowed logdata
23+
let owned_batch = batch.into_iter()
24+
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
25+
.collect::<Vec<LogData>>();
26+
27+
let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
2328
let mut request = http::Request::builder()
2429
.method(Method::POST)
2530
.uri(&self.collector_endpoint)

opentelemetry-otlp/src/exporter/tonic/logs.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ impl TonicLogsClient {
5252

5353
#[async_trait]
5454
impl LogExporter for TonicLogsClient {
55-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
55+
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
56+
// clone if batch is borrowed
5657
let (mut client, metadata, extensions) = match &mut self.inner {
5758
Some(inner) => {
5859
let (m, e, _) = inner
@@ -65,9 +66,11 @@ impl LogExporter for TonicLogsClient {
6566
None => return Err(LogError::Other("exporter is already shut down".into())),
6667
};
6768

69+
// TBD: We can avoid cloning and work only on logdata reference
6870
let resource_logs = {
6971
batch
7072
.into_iter()
73+
.map(|log_data_cow| (log_data_cow.into_owned()))
7174
.map(|log_data| (log_data, &self.resource))
7275
.map(Into::into)
7376
.collect()

opentelemetry-otlp/src/logs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl LogExporter {
9898

9999
#[async_trait]
100100
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
101-
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
101+
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> opentelemetry::logs::LogResult<()> {
102102
self.client.export(batch).await
103103
}
104104

opentelemetry-sdk/benches/log.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ struct VoidExporter;
1919

2020
#[async_trait]
2121
impl LogExporter for VoidExporter {
22-
async fn export(&mut self, _batch: Vec<LogData>) -> LogResult<()> {
22+
async fn export<'a>(&mut self, _batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
2323
LogResult::Ok(())
2424
}
2525
}

opentelemetry-sdk/src/export/logs/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ use opentelemetry::{
99
InstrumentationLibrary,
1010
};
1111
use std::fmt::Debug;
12+
use std::borrow::Cow;
1213

1314
/// `LogExporter` defines the interface that log exporters should implement.
1415
#[async_trait]
1516
pub trait LogExporter: Send + Sync + Debug {
1617
/// Exports a batch of [`LogData`].
17-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
18+
//async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()>;
19+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
20+
1821
/// Shuts down the exporter.
1922
fn shutdown(&mut self) {}
2023
#[cfg(feature = "logs_level_enabled")]

opentelemetry-sdk/src/logs/log_emitter.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -236,17 +236,19 @@ impl opentelemetry::logs::Logger for Logger {
236236
cx.has_active_span()
237237
.then(|| TraceContext::from(cx.span().span_context()))
238238
});
239+
let mut log_record = record;
240+
if let Some(ref trace_context) = trace_context {
241+
log_record.trace_context = Some(trace_context.clone());
242+
}
243+
244+
let mut data = LogData {
245+
record: log_record,
246+
instrumentation: self.instrumentation_library().clone(),
247+
};
239248

240249
for p in processors {
241-
let mut cloned_record = record.clone();
242-
if let Some(ref trace_context) = trace_context {
243-
cloned_record.trace_context = Some(trace_context.clone());
244-
}
245-
let data = LogData {
246-
record: cloned_record,
247-
instrumentation: self.instrumentation_library().clone(),
248-
};
249-
p.emit(data);
250+
//let mut cloned_record = record.clone();
251+
p.emit(&mut data);
250252
}
251253
}
252254

@@ -303,7 +305,7 @@ mod tests {
303305
}
304306

305307
impl LogProcessor for ShutdownTestLogProcessor {
306-
fn emit(&self, _data: LogData) {
308+
fn emit(&self, _data: &mut LogData) {
307309
self.is_shutdown
308310
.lock()
309311
.map(|is_shutdown| {
@@ -535,7 +537,7 @@ mod tests {
535537
}
536538

537539
impl LogProcessor for LazyLogProcessor {
538-
fn emit(&self, _data: LogData) {
540+
fn emit(&self, _data: &mut LogData) {
539541
// nothing to do.
540542
}
541543

opentelemetry-sdk/src/logs/log_processor.rs

+20-21
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::{
2222
sync::Arc,
2323
time::Duration,
2424
};
25+
use std::borrow::Cow;
2526

2627
/// Delay interval between two consecutive exports.
2728
const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
@@ -45,7 +46,7 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
4546
/// [`Logger`]: crate::logs::Logger
4647
pub trait LogProcessor: Send + Sync + Debug {
4748
/// Called when a log record is ready to processed and exported.
48-
fn emit(&self, data: LogData);
49+
fn emit(&self, data: &mut LogData);
4950
/// Force the logs lying in the cache to be exported.
5051
fn force_flush(&self) -> LogResult<()>;
5152
/// Shuts down the processor.
@@ -80,7 +81,7 @@ impl SimpleLogProcessor {
8081
}
8182

8283
impl LogProcessor for SimpleLogProcessor {
83-
fn emit(&self, data: LogData) {
84+
fn emit(&self, data: &mut LogData) {
8485
// noop after shutdown
8586
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
8687
return;
@@ -90,7 +91,7 @@ impl LogProcessor for SimpleLogProcessor {
9091
.exporter
9192
.lock()
9293
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
93-
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![data])));
94+
.and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)])));
9495
if let Err(err) = result {
9596
global::handle_error(err);
9697
}
@@ -140,8 +141,8 @@ impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
140141
}
141142

142143
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
143-
fn emit(&self, data: LogData) {
144-
let result = self.message_sender.try_send(BatchMessage::ExportLog(data));
144+
fn emit(&self, data: &mut LogData) {
145+
let result = self.message_sender.try_send(BatchMessage::ExportLog(data.clone()));
145146

146147
if let Err(err) = result {
147148
global::handle_error(LogError::Other(err.into()));
@@ -201,7 +202,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
201202
match message {
202203
// Log has finished, add to buffer of pending logs.
203204
BatchMessage::ExportLog(log) => {
204-
logs.push(log);
205+
logs.push(Cow::Owned(log));
205206

206207
if logs.len() == config.max_export_batch_size {
207208
let result = export_with_timeout(
@@ -285,11 +286,11 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
285286
}
286287
}
287288

288-
async fn export_with_timeout<R, E>(
289+
async fn export_with_timeout<'a, R, E>(
289290
time_out: Duration,
290291
exporter: &mut E,
291292
runtime: &R,
292-
batch: Vec<LogData>,
293+
batch: Vec<Cow<'a, LogData>>,
293294
) -> ExportResult
294295
where
295296
R: RuntimeChannel,
@@ -496,6 +497,7 @@ mod tests {
496497
OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
497498
};
498499
use crate::testing::logs::InMemoryLogsExporterBuilder;
500+
use std::borrow::Cow;
499501
use crate::{
500502
export::logs::{LogData, LogExporter},
501503
logs::{
@@ -522,7 +524,7 @@ mod tests {
522524

523525
#[async_trait]
524526
impl LogExporter for MockLogExporter {
525-
async fn export(&mut self, _batch: Vec<LogData>) -> LogResult<()> {
527+
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
526528
Ok(())
527529
}
528530

@@ -744,17 +746,15 @@ mod tests {
744746
BatchConfig::default(),
745747
runtime::Tokio,
746748
);
747-
processor.emit(LogData {
749+
let mut log_data = LogData {
748750
record: Default::default(),
749751
instrumentation: Default::default(),
750-
});
752+
};
753+
processor.emit(&mut log_data);
751754
processor.force_flush().unwrap();
752755
processor.shutdown().unwrap();
753756
// todo: expect to see errors here. How should we assert this?
754-
processor.emit(LogData {
755-
record: Default::default(),
756-
instrumentation: Default::default(),
757-
});
757+
processor.emit(&mut log_data);
758758
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
759759
}
760760

@@ -765,10 +765,12 @@ mod tests {
765765
.build();
766766
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
767767

768-
processor.emit(LogData {
768+
let mut log_data = LogData{
769769
record: Default::default(),
770770
instrumentation: Default::default(),
771-
});
771+
};
772+
773+
processor.emit(&mut log_data);
772774

773775
processor.shutdown().unwrap();
774776

@@ -777,10 +779,7 @@ mod tests {
777779
.load(std::sync::atomic::Ordering::Relaxed);
778780
assert!(is_shutdown);
779781

780-
processor.emit(LogData {
781-
record: Default::default(),
782-
instrumentation: Default::default(),
783-
});
782+
processor.emit(&mut log_data);
784783

785784
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
786785
}

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,12 @@ impl InMemoryLogsExporter {
175175

176176
#[async_trait]
177177
impl LogExporter for InMemoryLogsExporter {
178-
async fn export(&mut self, batch: Vec<LogData>) -> LogResult<()> {
179-
self.logs
180-
.lock()
181-
.map(|mut logs_guard| logs_guard.append(&mut batch.clone()))
182-
.map_err(LogError::from)
178+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
179+
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
180+
for log in batch.into_iter() {
181+
logs_guard.push((*log).clone());
182+
}
183+
Ok(())
183184
}
184185
fn shutdown(&mut self) {
185186
if self.should_reset_on_shutdown {

opentelemetry-stdout/src/logs/exporter.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use opentelemetry::{
77
use opentelemetry_sdk::export::logs::{ExportResult, LogData};
88
use opentelemetry_sdk::Resource;
99
use std::io::{stdout, Write};
10+
use std::borrow::Cow;
1011

1112
type Encoder =
1213
Box<dyn Fn(&mut dyn Write, crate::logs::transform::LogData) -> LogResult<()> + Send + Sync>;
@@ -44,9 +45,9 @@ impl fmt::Debug for LogExporter {
4445
#[async_trait]
4546
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
4647
/// Export spans to stdout
47-
async fn export(&mut self, batch: Vec<LogData>) -> ExportResult {
48+
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> ExportResult {
4849
if let Some(writer) = &mut self.writer {
49-
let log_data = crate::logs::transform::LogData::from((batch, &self.resource));
50+
let log_data = crate::logs::transform::LogData::from((batch.into_iter().map(Cow::into_owned).collect(), &self.resource));
5051
let result = (self.encoder)(writer, log_data) as LogResult<()>;
5152
result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into()))
5253
} else {

stress/src/logs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod throughput;
99
pub struct NoOpLogProcessor;
1010

1111
impl LogProcessor for NoOpLogProcessor {
12-
fn emit(&self, _data: opentelemetry_sdk::export::logs::LogData) {}
12+
fn emit(&self, _data: &mut opentelemetry_sdk::export::logs::LogData) {}
1313

1414
fn force_flush(&self) -> opentelemetry::logs::LogResult<()> {
1515
Ok(())

0 commit comments

Comments
 (0)