Skip to content

Commit 8c60749

Browse files
authored
[Logs SDK] Send resource once to processor and exporter, and not for every event. (open-telemetry#1636)
1 parent 213fa3f commit 8c60749

File tree

13 files changed

+254
-50
lines changed

13 files changed

+254
-50
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ impl LogExporter for OtlpHttpClient {
1919
_ => Err(LogError::Other("exporter is already shut down".into())),
2020
})?;
2121

22-
let (body, content_type) = self.build_logs_export_body(batch)?;
22+
let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? };
2323
let mut request = http::Request::builder()
2424
.method(Method::POST)
2525
.uri(&self.collector_endpoint)
@@ -50,4 +50,8 @@ impl LogExporter for OtlpHttpClient {
5050
fn shutdown(&mut self) {
5151
let _ = self.client.lock().map(|mut c| c.take());
5252
}
53+
54+
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
55+
self.resource = resource.into();
56+
}
5357
}

opentelemetry-otlp/src/exporter/http/mod.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use crate::{
55
};
66
use http::{HeaderName, HeaderValue, Uri};
77
use opentelemetry_http::HttpClient;
8+
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
9+
810
#[cfg(feature = "logs")]
911
use opentelemetry_sdk::export::logs::LogData;
1012
#[cfg(feature = "trace")]
@@ -274,6 +276,9 @@ struct OtlpHttpClient {
274276
headers: HashMap<HeaderName, HeaderValue>,
275277
protocol: Protocol,
276278
_timeout: Duration,
279+
#[allow(dead_code)]
280+
// <allow dead> would be removed once we support set_resource for metrics and traces.
281+
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
277282
}
278283

279284
impl OtlpHttpClient {
@@ -291,6 +296,7 @@ impl OtlpHttpClient {
291296
headers,
292297
protocol,
293298
_timeout: timeout,
299+
resource: ResourceAttributesWithSchema::default(),
294300
}
295301
}
296302

@@ -318,12 +324,15 @@ impl OtlpHttpClient {
318324
fn build_logs_export_body(
319325
&self,
320326
logs: Vec<LogData>,
327+
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
321328
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
322329
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
330+
let resource_logs = logs
331+
.into_iter()
332+
.map(|log_event| (log_event, resource).into())
333+
.collect::<Vec<_>>();
334+
let req = ExportLogsServiceRequest { resource_logs };
323335

324-
let req = ExportLogsServiceRequest {
325-
resource_logs: logs.into_iter().map(Into::into).collect(),
326-
};
327336
match self.protocol {
328337
#[cfg(feature = "http-json")]
329338
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {

opentelemetry-otlp/src/exporter/tonic/logs.rs

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use core::fmt;
2-
31
use async_trait::async_trait;
2+
use core::fmt;
43
use opentelemetry::logs::{LogError, LogResult};
54
use opentelemetry_proto::tonic::collector::logs::v1::{
65
logs_service_client::LogsServiceClient, ExportLogsServiceRequest,
@@ -12,6 +11,9 @@ use super::BoxInterceptor;
1211

1312
pub(crate) struct TonicLogsClient {
1413
inner: Option<ClientInner>,
14+
#[allow(dead_code)]
15+
// <allow dead> would be removed once we support set_resource for metrics and traces.
16+
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
1517
}
1618

1719
struct ClientInner {
@@ -43,6 +45,7 @@ impl TonicLogsClient {
4345
client,
4446
interceptor,
4547
}),
48+
resource: Default::default(),
4649
}
4750
}
4851
}
@@ -62,13 +65,19 @@ impl LogExporter for TonicLogsClient {
6265
None => return Err(LogError::Other("exporter is already shut down".into())),
6366
};
6467

68+
let resource_logs = {
69+
batch
70+
.into_iter()
71+
.map(|log_data| (log_data, &self.resource))
72+
.map(Into::into)
73+
.collect()
74+
};
75+
6576
client
6677
.export(Request::from_parts(
6778
metadata,
6879
extensions,
69-
ExportLogsServiceRequest {
70-
resource_logs: batch.into_iter().map(Into::into).collect(),
71-
},
80+
ExportLogsServiceRequest { resource_logs },
7281
))
7382
.await
7483
.map_err(crate::Error::from)?;
@@ -79,4 +88,8 @@ impl LogExporter for TonicLogsClient {
7988
fn shutdown(&mut self) {
8089
let _ = self.inner.take();
8190
}
91+
92+
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
93+
self.resource = resource.into();
94+
}
8295
}

opentelemetry-otlp/src/logs.rs

+4
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
101101
async fn export(&mut self, batch: Vec<LogData>) -> opentelemetry::logs::LogResult<()> {
102102
self.client.export(batch).await
103103
}
104+
105+
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
106+
self.client.set_resource(resource);
107+
}
104108
}
105109

106110
/// Recommended configuration for an OTLP exporter pipeline.

opentelemetry-proto/src/transform/common.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,23 @@ pub mod tonic {
2222
use opentelemetry::{Array, Value};
2323
use std::borrow::Cow;
2424

25+
#[cfg(any(feature = "trace", feature = "logs"))]
26+
#[derive(Debug, Default)]
27+
pub struct ResourceAttributesWithSchema {
28+
pub attributes: Attributes,
29+
pub schema_url: Option<String>,
30+
}
31+
32+
#[cfg(any(feature = "trace", feature = "logs"))]
33+
impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema {
34+
fn from(resource: &opentelemetry_sdk::Resource) -> Self {
35+
ResourceAttributesWithSchema {
36+
attributes: resource_attributes(resource),
37+
schema_url: resource.schema_url().map(ToString::to_string),
38+
}
39+
}
40+
}
41+
2542
#[cfg(any(feature = "trace", feature = "logs"))]
2643
use opentelemetry_sdk::Resource;
2744

@@ -52,7 +69,7 @@ pub mod tonic {
5269
}
5370

5471
/// Wrapper type for Vec<`KeyValue`>
55-
#[derive(Default)]
72+
#[derive(Default, Debug)]
5673
pub struct Attributes(pub ::std::vec::Vec<crate::proto::tonic::common::v1::KeyValue>);
5774

5875
impl From<Vec<opentelemetry::KeyValue>> for Attributes {

opentelemetry-proto/src/transform/logs.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub mod tonic {
77
resource::v1::Resource,
88
Attributes,
99
},
10-
transform::common::{to_nanos, tonic::resource_attributes},
10+
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
1111
};
1212
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
1313

@@ -110,18 +110,26 @@ pub mod tonic {
110110
}
111111
}
112112

113-
impl From<opentelemetry_sdk::export::logs::LogData> for ResourceLogs {
114-
fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self {
113+
impl
114+
From<(
115+
opentelemetry_sdk::export::logs::LogData,
116+
&ResourceAttributesWithSchema,
117+
)> for ResourceLogs
118+
{
119+
fn from(
120+
data: (
121+
opentelemetry_sdk::export::logs::LogData,
122+
&ResourceAttributesWithSchema,
123+
),
124+
) -> Self {
125+
let (log_data, resource) = data;
126+
115127
ResourceLogs {
116128
resource: Some(Resource {
117-
attributes: resource_attributes(&log_data.resource).0,
129+
attributes: resource.attributes.0.clone(),
118130
dropped_attributes_count: 0,
119131
}),
120-
schema_url: log_data
121-
.resource
122-
.schema_url()
123-
.map(Into::into)
124-
.unwrap_or_default(),
132+
schema_url: resource.schema_url.clone().unwrap_or_default(),
125133
scope_logs: vec![ScopeLogs {
126134
schema_url: log_data
127135
.instrumentation

opentelemetry-sdk/CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@
1212
- **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and
1313
`ProcessResourceDetector` resource detectors, use the
1414
[`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead.
15+
- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Improves performance by sending
16+
Resource information to processors (and exporters) once, instead of sending with every log. If you are an author
17+
of Processor, Exporter, the following are *BREAKING* changes.
18+
- Implement `set_resource` method in your custom LogProcessor, which invokes exporter's `set_resource`.
19+
- Implement `set_resource` method in your custom LogExporter. This method should save the resource object
20+
in original or serialized format, to be merged with every log event during export.
21+
- `LogData` doesn't have the resource attributes. The `LogExporter::export()` method needs to merge it
22+
with the earlier preserved resource before export.
1523
- Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640)
1624
- Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643).
1725
- `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed.

opentelemetry-sdk/src/export/logs/mod.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use opentelemetry::{
77
logs::{LogError, LogRecord, LogResult},
88
InstrumentationLibrary,
99
};
10-
use std::{borrow::Cow, fmt::Debug};
10+
use std::fmt::Debug;
1111

1212
/// `LogExporter` defines the interface that log exporters should implement.
1313
#[async_trait]
@@ -21,17 +21,16 @@ pub trait LogExporter: Send + Sync + Debug {
2121
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
2222
true
2323
}
24+
/// Set the resource for the exporter.
25+
fn set_resource(&mut self, _resource: &Resource) {}
2426
}
2527

26-
/// `LogData` associates a [`LogRecord`] with a [`Resource`] and
27-
/// [`InstrumentationLibrary`].
28+
/// `LogData` represents a single log event without resource context.
2829
#[derive(Clone, Debug)]
2930
pub struct LogData {
3031
/// Log record
3132
pub record: LogRecord,
32-
/// Resource for the emitter who produced this `LogData`.
33-
pub resource: Cow<'static, Resource>,
34-
/// Instrumentation details for the emitter who produced this `LogData`.
33+
/// Instrumentation details for the emitter who produced this `LogEvent`.
3534
pub instrumentation: InstrumentationLibrary,
3635
}
3736

opentelemetry-sdk/src/logs/log_emitter.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ impl Builder {
176176

177177
/// Create a new provider from this configuration.
178178
pub fn build(self) -> LoggerProvider {
179+
// invoke set_resource on all the processors
180+
for processor in &self.processors {
181+
processor.set_resource(&self.config.resource);
182+
}
179183
LoggerProvider {
180184
inner: Arc::new(LoggerProviderInner {
181185
processors: self.processors,
@@ -221,20 +225,19 @@ impl opentelemetry::logs::Logger for Logger {
221225
/// Emit a `LogRecord`.
222226
fn emit(&self, record: LogRecord) {
223227
let provider = self.provider();
224-
let config = provider.config();
225228
let processors = provider.log_processors();
226229
let trace_context = Context::map_current(|cx| {
227230
cx.has_active_span()
228231
.then(|| TraceContext::from(cx.span().span_context()))
229232
});
233+
230234
for p in processors {
231-
let mut record = record.clone();
235+
let mut cloned_record = record.clone();
232236
if let Some(ref trace_context) = trace_context {
233-
record.trace_context = Some(trace_context.clone())
237+
cloned_record.trace_context = Some(trace_context.clone());
234238
}
235239
let data = LogData {
236-
record,
237-
resource: config.resource.clone(),
240+
record: cloned_record,
238241
instrumentation: self.instrumentation_library().clone(),
239242
};
240243
p.emit(data);

0 commit comments

Comments
 (0)