Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify LogExporter::Export interface #2041

Merged
merged 5 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::LogResult;
use opentelemetry::KeyValue;
use opentelemetry::{InstrumentationLibrary, KeyValue};
use opentelemetry_appender_tracing::layer as tracing_layer;
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider};
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, LoggerProvider};
use opentelemetry_sdk::Resource;
use pprof::criterion::{Output, PProfProfiler};
use tracing::error;
Expand All @@ -34,7 +34,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export<'a>(&mut self, _: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export(&mut self, _: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {
LogResult::Ok(())
}

Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::export::logs::LogExporter;
use opentelemetry_sdk::logs::LogRecord;

use super::OtlpHttpClient;

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {

Check warning on line 14 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L14

Added line #L14 was not covered by tests
let client = self
.client
.lock()
Expand All @@ -19,13 +21,7 @@
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch)? };
let (body, content_type) = { self.build_logs_export_body(batch)? };

Check warning on line 24 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L24

Added line #L24 was not covered by tests
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
8 changes: 5 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
OTEL_EXPORTER_OTLP_TIMEOUT,
};
use http::{HeaderName, HeaderValue, Uri};
#[cfg(feature = "logs")]
use opentelemetry::InstrumentationLibrary;
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
use opentelemetry_sdk::export::trace::SpanData;
#[cfg(feature = "logs")]
use opentelemetry_sdk::logs::LogRecord;
#[cfg(feature = "metrics")]
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use prost::Message;
Expand Down Expand Up @@ -328,7 +330,7 @@
#[cfg(feature = "logs")]
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
logs: Vec<(&LogRecord, &InstrumentationLibrary)>,

Check warning on line 333 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L333

Added line #L333 was not covered by tests
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
use opentelemetry_proto::tonic::collector::logs::v1::{
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
};
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use opentelemetry_sdk::export::logs::LogExporter;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::logs::LogRecord;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
Expand Down Expand Up @@ -54,7 +56,7 @@

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export<'a>(&mut self, batch: Vec<std::borrow::Cow<'a, LogData>>) -> LogResult<()> {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {

Check warning on line 59 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L59

Added line #L59 was not covered by tests
let (mut client, metadata, extensions) = match &mut self.inner {
Some(inner) => {
let (m, e, _) = inner
Expand All @@ -67,13 +69,7 @@
None => return Err(LogError::Other("exporter is already shut down".into())),
};

//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);

Check warning on line 72 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L72

Added line #L72 was not covered by tests

client
.export(Request::from_parts(
Expand Down
10 changes: 4 additions & 6 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
use async_trait::async_trait;
use std::fmt::Debug;

use opentelemetry::logs::LogError;
use opentelemetry::logs::{LogError, LogResult};
use opentelemetry::InstrumentationLibrary;

use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel, Resource};
use opentelemetry_sdk::{logs::LogRecord, runtime::RuntimeChannel, Resource};

/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION";
Expand Down Expand Up @@ -98,10 +99,7 @@

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export<'a>(
&mut self,
batch: Vec<std::borrow::Cow<'a, LogData>>,
) -> opentelemetry::logs::LogResult<()> {
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> {

Check warning on line 102 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L102

Added line #L102 was not covered by tests
self.client.export(batch).await
}

Expand Down
82 changes: 47 additions & 35 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
}
}

impl From<opentelemetry_sdk::logs::LogRecord> for LogRecord {
fn from(log_record: opentelemetry_sdk::logs::LogRecord) -> Self {
impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord {
fn from(log_record: &opentelemetry_sdk::logs::LogRecord) -> Self {
let trace_context = log_record.trace_context.as_ref();
let severity_number = match log_record.severity_number {
Some(Severity::Trace) => SeverityNumber::Trace,
Expand Down Expand Up @@ -118,7 +118,7 @@
},
severity_number: severity_number.into(),
severity_text: log_record.severity_text.map(Into::into).unwrap_or_default(),
body: log_record.body.map(Into::into),
body: log_record.body.clone().map(Into::into),
dropped_attributes_count: 0,
flags: trace_context
.map(|ctx| {
Expand All @@ -139,17 +139,23 @@

impl
From<(
opentelemetry_sdk::export::logs::LogData,
(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
),
&ResourceAttributesWithSchema,
)> for ResourceLogs
{
fn from(
data: (
opentelemetry_sdk::export::logs::LogData,
(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
),

Check warning on line 154 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L151-L154

Added lines #L151 - L154 were not covered by tests
&ResourceAttributesWithSchema,
),
) -> Self {
let (log_data, resource) = data;
let ((log_record, instrumentation), resource) = data;

Check warning on line 158 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L158

Added line #L158 was not covered by tests

ResourceLogs {
resource: Some(Resource {
Expand All @@ -158,37 +164,44 @@
}),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_logs: vec![ScopeLogs {
schema_url: log_data
.instrumentation
schema_url: instrumentation

Check warning on line 167 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L167

Added line #L167 was not covered by tests
.schema_url
.clone()
.map(Into::into)
.unwrap_or_default(),
scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()),
log_records: vec![log_data.record.into()],
scope: Some((instrumentation, log_record.target.clone()).into()),
log_records: vec![log_record.into()],

Check warning on line 173 in opentelemetry-proto/src/transform/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/logs.rs#L172-L173

Added lines #L172 - L173 were not covered by tests
}],
}
}
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
logs: Vec<(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
)>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
let scope_map = logs.iter().fold(
HashMap::new(),
|mut scope_map: HashMap<
Cow<'static, str>,
Vec<&opentelemetry_sdk::export::logs::LogData>,
Vec<(
&opentelemetry_sdk::logs::LogRecord,
&opentelemetry::InstrumentationLibrary,
)>,
>,
log| {
let key = log
.record
(log_record, instrumentation)| {
let key = log_record
.target
.clone()
.unwrap_or_else(|| log.instrumentation.name.clone());
scope_map.entry(key).or_default().push(log);
.unwrap_or_else(|| Cow::Owned(instrumentation.name.clone().into_owned()));
scope_map
.entry(key)
.or_default()
.push((log_record, instrumentation));
scope_map
},
);
Expand All @@ -197,13 +210,13 @@
.into_iter()
.map(|(key, log_data)| ScopeLogs {
scope: Some(InstrumentationScope::from((
&log_data.first().unwrap().instrumentation,
Some(key),
log_data.first().unwrap().1,
Some(key.into_owned().into()),
))),
schema_url: resource.schema_url.clone().unwrap_or_default(),
log_records: log_data
.into_iter()
.map(|log_data| log_data.record.clone().into())
.map(|(log_record, _)| log_record.into())
.collect(),
})
.collect();
Expand All @@ -223,30 +236,29 @@
mod tests {
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry::InstrumentationLibrary;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use std::time::SystemTime;

fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
fn create_test_log_data(
instrumentation_name: &str,
_message: &str,
) -> (LogRecord, InstrumentationLibrary) {
let mut logrecord = LogRecord::default();
logrecord.set_timestamp(SystemTime::now());
logrecord.set_observed_timestamp(SystemTime::now());
LogData {
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
record: logrecord,
}
let instrumentation =
InstrumentationLibrary::builder(instrumentation_name.to_string()).build();
(logrecord, instrumentation)
}

#[test]
fn test_group_logs_by_resource_and_scope_single_scope() {
let resource = Resource::default();
let log1 = create_test_log_data("test-lib", "Log 1");
let log2 = create_test_log_data("test-lib", "Log 2");
let (log_record1, instrum_lib1) = create_test_log_data("test-lib", "Log 1");
let (log_record2, instrum_lib2) = create_test_log_data("test-lib", "Log 2");

let logs = vec![log1, log2];
let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
Expand All @@ -263,10 +275,10 @@
#[test]
fn test_group_logs_by_resource_and_scope_multiple_scopes() {
let resource = Resource::default();
let log1 = create_test_log_data("lib1", "Log 1");
let log2 = create_test_log_data("lib2", "Log 2");
let (log_record1, instrum_lib1) = create_test_log_data("lib1", "Log 1");
let (log_record2, instrum_lib2) = create_test_log_data("lib2", "Log 2");

let logs = vec![log1, log2];
let logs = vec![(&log_record1, &instrum_lib1), (&log_record2, &instrum_lib2)];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);
Expand Down
12 changes: 12 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@
[#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021)
- Provide default implementation for `event_enabled` method in `LogProcessor`
trait that returns `true` always.
- **Breaking** [#2041](https://github.com/open-telemetry/opentelemetry-rust/pull/2041)
- The Exporter::export() interface is modified as below:
Previous Signature:
```rust
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
```

Updated Signature:
```rust
async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>;
```
This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures.

## v0.24.1

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use opentelemetry::logs::{
use opentelemetry::trace::Tracer;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::Key;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::logs::LogData;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::{Logger, LoggerProvider};
use opentelemetry_sdk::trace;
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, Criterion};

use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};

use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::logs::LogData;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LoggerProvider;
use pprof::criterion::{Output, PProfProfiler};
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity};
use opentelemetry_sdk::{
export::logs::LogData,
logs::LogData,
logs::{LogProcessor, LogRecord, Logger, LoggerProvider},
};

Expand Down
Loading
Loading