diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 1d8e177693..4bad1cc39e 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -19,7 +19,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = self.build_logs_export_body(batch)?; + let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) @@ -50,4 +50,8 @@ impl LogExporter for OtlpHttpClient { fn shutdown(&mut self) { let _ = self.client.lock().map(|mut c| c.take()); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 884971e028..fc46953ed1 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -5,6 +5,8 @@ use crate::{ }; use http::{HeaderName, HeaderValue, Uri}; use opentelemetry_http::HttpClient; +use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; + #[cfg(feature = "logs")] use opentelemetry_sdk::export::logs::LogData; #[cfg(feature = "trace")] @@ -274,6 +276,9 @@ struct OtlpHttpClient { headers: HashMap, protocol: Protocol, _timeout: Duration, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics and traces. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } impl OtlpHttpClient { @@ -291,6 +296,7 @@ impl OtlpHttpClient { headers, protocol, _timeout: timeout, + resource: ResourceAttributesWithSchema::default(), } } @@ -318,12 +324,15 @@ impl OtlpHttpClient { fn build_logs_export_body( &self, logs: Vec, + resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; + let resource_logs = logs + .into_iter() + .map(|log_event| (log_event, resource).into()) + .collect::>(); + let req = ExportLogsServiceRequest { resource_logs }; - let req = ExportLogsServiceRequest { - resource_logs: logs.into_iter().map(Into::into).collect(), - }; match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 4b5a5787e3..65759f7a4b 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,6 +1,5 @@ -use core::fmt; - use async_trait::async_trait; +use core::fmt; use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, @@ -12,6 +11,9 @@ use super::BoxInterceptor; pub(crate) struct TonicLogsClient { inner: Option, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics and traces. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -43,6 +45,7 @@ impl TonicLogsClient { client, interceptor, }), + resource: Default::default(), } } } @@ -62,13 +65,19 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; + let resource_logs = { + batch + .into_iter() + .map(|log_data| (log_data, &self.resource)) + .map(Into::into) + .collect() + }; + client .export(Request::from_parts( metadata, extensions, - ExportLogsServiceRequest { - resource_logs: batch.into_iter().map(Into::into).collect(), - }, + ExportLogsServiceRequest { resource_logs }, )) .await .map_err(crate::Error::from)?; @@ -79,4 +88,8 @@ impl LogExporter for TonicLogsClient { fn shutdown(&mut self) { let _ = self.inner.take(); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 43045a7742..714d4508a3 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -101,6 +101,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.client.set_resource(resource); + } } /// Recommended configuration for an OTLP exporter pipeline. diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index 9aa29e4b22..8197d90574 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -22,6 +22,23 @@ pub mod tonic { use opentelemetry::{Array, Value}; use std::borrow::Cow; + #[cfg(any(feature = "trace", feature = "logs"))] + #[derive(Debug, Default)] + pub struct ResourceAttributesWithSchema { + pub attributes: Attributes, + pub schema_url: Option, + } + + #[cfg(any(feature = "trace", feature = "logs"))] + impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema { + fn from(resource: &opentelemetry_sdk::Resource) -> Self { + ResourceAttributesWithSchema { + attributes: resource_attributes(resource), + schema_url: resource.schema_url().map(ToString::to_string), + } + } + } + #[cfg(any(feature = "trace", feature = "logs"))] use opentelemetry_sdk::Resource; @@ -52,7 +69,7 @@ pub mod tonic { } /// Wrapper type for Vec<`KeyValue`> - #[derive(Default)] + #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); impl From> for Attributes { diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index bc5c2697c0..335526685b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -7,7 +7,7 @@ pub mod tonic { resource::v1::Resource, Attributes, }, - transform::common::{to_nanos, tonic::resource_attributes}, + transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; @@ -110,18 +110,26 @@ pub mod tonic { } } - impl From for ResourceLogs { - fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self { + impl + From<( + opentelemetry_sdk::export::logs::LogData, + &ResourceAttributesWithSchema, + )> for ResourceLogs + { + fn from( + data: ( + opentelemetry_sdk::export::logs::LogData, + &ResourceAttributesWithSchema, + ), + ) -> Self { + let (log_data, resource) = data; + ResourceLogs { resource: Some(Resource { - attributes: resource_attributes(&log_data.resource).0, + attributes: resource.attributes.0.clone(), dropped_attributes_count: 0, }), - schema_url: log_data - .resource - .schema_url() - .map(Into::into) - .unwrap_or_default(), + schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { schema_url: log_data .instrumentation diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index d71d0ca4a1..19dac050a4 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -12,6 +12,14 @@ - **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and `ProcessResourceDetector` resource detectors, use the [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. +- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Improves performance by sending + Resource information to processors (and exporters) once, instead of sending with every log. If you are an author + of Processor, Exporter, the following are *BREAKING* changes. + - Implement `set_resource` method in your custom LogProcessor, which invokes exporter's `set_resource`. + - Implement `set_resource` method in your custom LogExporter. This method should save the resource object + in original or serialized format, to be merged with every log event during export. + - `LogData` doesn't have the resource attributes. The `LogExporter::export()` method needs to merge it + with the earlier preserved resource before export. - Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640) - Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643). - `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed. diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 39db15900f..8676db9d16 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -7,7 +7,7 @@ use opentelemetry::{ logs::{LogError, LogRecord, LogResult}, InstrumentationLibrary, }; -use std::{borrow::Cow, fmt::Debug}; +use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] @@ -21,17 +21,16 @@ pub trait LogExporter: Send + Sync + Debug { fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true } + /// Set the resource for the exporter. + fn set_resource(&mut self, _resource: &Resource) {} } -/// `LogData` associates a [`LogRecord`] with a [`Resource`] and -/// [`InstrumentationLibrary`]. +/// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] pub struct LogData { /// Log record pub record: LogRecord, - /// Resource for the emitter who produced this `LogData`. - pub resource: Cow<'static, Resource>, - /// Instrumentation details for the emitter who produced this `LogData`. + /// Instrumentation details for the emitter who produced this `LogEvent`. pub instrumentation: InstrumentationLibrary, } diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 1fb650b965..e87214228b 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -176,6 +176,10 @@ impl Builder { /// Create a new provider from this configuration. pub fn build(self) -> LoggerProvider { + // invoke set_resource on all the processors + for processor in &self.processors { + processor.set_resource(&self.config.resource); + } LoggerProvider { inner: Arc::new(LoggerProviderInner { processors: self.processors, @@ -221,20 +225,19 @@ impl opentelemetry::logs::Logger for Logger { /// Emit a `LogRecord`. fn emit(&self, record: LogRecord) { let provider = self.provider(); - let config = provider.config(); let processors = provider.log_processors(); let trace_context = Context::map_current(|cx| { cx.has_active_span() .then(|| TraceContext::from(cx.span().span_context())) }); + for p in processors { - let mut record = record.clone(); + let mut cloned_record = record.clone(); if let Some(ref trace_context) = trace_context { - record.trace_context = Some(trace_context.clone()) + cloned_record.trace_context = Some(trace_context.clone()); } let data = LogData { - record, - resource: config.resource.clone(), + record: cloned_record, instrumentation: self.instrumentation_library().clone(), }; p.emit(data); diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 45fc25ff5e..d3d69c87b8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,6 +1,7 @@ use crate::{ export::logs::{ExportResult, LogData, LogExporter}, runtime::{RuntimeChannel, TrySend}, + Resource, }; use futures_channel::oneshot; use futures_util::{ @@ -18,6 +19,7 @@ use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, str::FromStr, + sync::Arc, time::Duration, }; @@ -53,6 +55,9 @@ pub trait LogProcessor: Send + Sync + Debug { #[cfg(feature = "logs_level_enabled")] /// Check if logging is enabled fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; + + /// Set the resource for the log processor. + fn set_resource(&self, _resource: &Resource) {} } /// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon @@ -108,6 +113,12 @@ impl LogProcessor for SimpleLogProcessor { } } + fn set_resource(&self, resource: &Resource) { + if let Ok(mut exporter) = self.exporter.lock() { + exporter.set_resource(resource); + } + } + #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true @@ -163,6 +174,13 @@ impl LogProcessor for BatchLogProcessor { .map_err(|err| LogError::Other(err.into())) .and_then(std::convert::identity) } + + fn set_resource(&self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } } impl BatchLogProcessor { @@ -241,6 +259,11 @@ impl BatchLogProcessor { break; } + + // propagate the resource + BatchMessage::SetResource(resource) => { + exporter.set_resource(&resource); + } } } })); @@ -462,6 +485,8 @@ enum BatchMessage { Flush(Option>), /// Shut down the worker thread, push all logs in buffer to the backend. Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), } #[cfg(all(test, feature = "testing", feature = "logs"))] @@ -470,22 +495,52 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; - use crate::export::logs::LogData; - use crate::logs::{LogProcessor, SimpleLogProcessor}; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ + export::logs::{LogData, LogExporter}, logs::{ log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, }, - BatchConfig, BatchConfigBuilder, + BatchConfig, BatchConfigBuilder, Config, LogProcessor, LoggerProvider, + SimpleLogProcessor, }, runtime, testing::logs::InMemoryLogsExporter, + Resource, }; + use async_trait::async_trait; + use opentelemetry::{logs::LogResult, KeyValue}; + use std::sync::Arc; use std::time::Duration; + #[derive(Debug, Clone)] + struct MockLogExporter { + resource: Arc>, + } + + #[async_trait] + impl LogExporter for MockLogExporter { + async fn export(&mut self, _batch: Vec) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&mut self) {} + + fn set_resource(&mut self, resource: &Resource) { + let res = Arc::make_mut(&mut self.resource); + *res = Some(resource.clone()); + } + } + + // Implementation specific to the MockLogExporter, not part of the LogExporter trait + impl MockLogExporter { + fn get_resource(&self) -> Option { + (*self.resource).clone() + } + } + #[test] fn test_default_const_values() { assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY"); @@ -636,6 +691,47 @@ mod tests { assert_eq!(actual.max_queue_size, 4); } + #[test] + fn test_set_resource_simple_processor() { + let exporter = MockLogExporter { + resource: Arc::new(Some(Resource::default())), + }; + let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let _ = LoggerProvider::builder() + .with_log_processor(processor) + .with_config(Config::default().with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + ]))) + .build(); + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_set_resource_batch_processor() { + let exporter = MockLogExporter { + resource: Arc::new(Some(Resource::default())), + }; + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + let provider = LoggerProvider::builder() + .with_log_processor(processor) + .with_config(Config::default().with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + ]))) + .build(); + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); + provider.shutdown(); + } + #[tokio::test(flavor = "multi_thread")] async fn test_batch_shutdown() { // assert we will receive an error @@ -650,7 +746,6 @@ mod tests { ); processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); processor.force_flush().unwrap(); @@ -658,7 +753,6 @@ mod tests { // todo: expect to see errors here. How should we assert this? processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) @@ -673,7 +767,6 @@ mod tests { processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); @@ -686,7 +779,6 @@ mod tests { processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index e987209fb7..63143f0b12 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,6 +1,9 @@ use crate::export::logs::{LogData, LogExporter}; +use crate::Resource; use async_trait::async_trait; -use opentelemetry::logs::{LogError, LogResult}; +use opentelemetry::logs::{LogError, LogRecord, LogResult}; +use opentelemetry::InstrumentationLibrary; +use std::borrow::Cow; use std::sync::{Arc, Mutex}; /// An in-memory logs exporter that stores logs data in memory.. @@ -36,6 +39,7 @@ use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { logs: Arc>>, + resource: Arc>, should_reset_on_shutdown: bool, } @@ -45,6 +49,18 @@ impl Default for InMemoryLogsExporter { } } +/// `LogDataWithResource` associates a [`LogRecord`] with a [`Resource`] and +/// [`InstrumentationLibrary`]. +#[derive(Clone, Debug)] +pub struct LogDataWithResource { + /// Log record + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogData`. + pub instrumentation: InstrumentationLibrary, + /// Resource for the emitter who produced this `LogData`. + pub resource: Cow<'static, Resource>, +} + ///Builder for ['InMemoryLogsExporter']. /// # Example /// @@ -96,6 +112,7 @@ impl InMemoryLogsExporterBuilder { pub fn build(&self) -> InMemoryLogsExporter { InMemoryLogsExporter { logs: Arc::new(Mutex::new(Vec::new())), + resource: Arc::new(Mutex::new(Resource::default())), should_reset_on_shutdown: self.reset_on_shutdown, } } @@ -121,13 +138,20 @@ impl InMemoryLogsExporter { /// let emitted_logs = exporter.get_emitted_logs().unwrap(); /// ``` /// - pub fn get_emitted_logs(&self) -> LogResult> { - self.logs - .lock() - .map(|logs_guard| logs_guard.iter().cloned().collect()) - .map_err(LogError::from) - } + pub fn get_emitted_logs(&self) -> LogResult> { + let logs_guard = self.logs.lock().map_err(LogError::from)?; + let resource_guard = self.resource.lock().map_err(LogError::from)?; + let logs: Vec = logs_guard + .iter() + .map(|log_data| LogDataWithResource { + record: log_data.record.clone(), + resource: Cow::Owned(resource_guard.clone()), + instrumentation: log_data.instrumentation.clone(), + }) + .collect(); + Ok(logs) + } /// Clears the internal (in-memory) storage of logs. /// /// # Example @@ -161,4 +185,9 @@ impl LogExporter for InMemoryLogsExporter { self.reset(); } } + + fn set_resource(&mut self, resource: &Resource) { + let mut res_guard = self.resource.lock().expect("Resource lock poisoned"); + *res_guard = resource.clone(); + } } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index cece3b07e8..fcea153133 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -5,6 +5,7 @@ use opentelemetry::{ ExportError, }; use opentelemetry_sdk::export::logs::{ExportResult, LogData}; +use opentelemetry_sdk::Resource; use std::io::{stdout, Write}; type Encoder = @@ -18,6 +19,7 @@ type Encoder = pub struct LogExporter { writer: Option>, encoder: Encoder, + resource: Resource, } impl LogExporter { @@ -44,7 +46,8 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout async fn export(&mut self, batch: Vec) -> ExportResult { if let Some(writer) = &mut self.writer { - let result = (self.encoder)(writer, crate::logs::LogData::from(batch)) as LogResult<()>; + let log_data = crate::logs::transform::LogData::from((batch, &self.resource)); + let result = (self.encoder)(writer, log_data) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) } else { Err("exporter is shut down".into()) @@ -54,6 +57,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { fn shutdown(&mut self) { self.writer.take(); } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } } /// Stdout exporter's error @@ -127,6 +134,7 @@ impl LogExporterBuilder { pub fn build(self) -> LogExporter { LogExporter { writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), + resource: Resource::default(), encoder: self.encoder.unwrap_or_else(|| { Box::new(|writer, logs| { serde_json::to_writer(writer, &logs) diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 9612cf1ff6..10332953d2 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -15,18 +15,28 @@ pub struct LogData { resource_logs: Vec, } -impl From> for LogData { - fn from(sdk_logs: Vec) -> LogData { +impl + From<( + Vec, + &opentelemetry_sdk::Resource, + )> for LogData +{ + fn from( + (sdk_logs, sdk_resource): ( + Vec, + &opentelemetry_sdk::Resource, + ), + ) -> Self { let mut resource_logs = HashMap::::new(); for sdk_log in sdk_logs { - let resource_schema_url = sdk_log.resource.schema_url().map(|s| s.to_string().into()); + let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_log.instrumentation.schema_url.clone(); let scope: Scope = sdk_log.instrumentation.clone().into(); - let resource: Resource = sdk_log.resource.as_ref().into(); + let resource: Resource = sdk_resource.into(); let rl = resource_logs - .entry(sdk_log.resource.as_ref().into()) + .entry(sdk_resource.into()) .or_insert_with(move || ResourceLogs { resource, scope_logs: Vec::with_capacity(1),