From 57fa974c47df0b512baa2e63accd80941c9c530f Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 21 Mar 2024 18:15:05 +0000 Subject: [PATCH 01/22] initial commit --- opentelemetry-otlp/src/exporter/http/logs.rs | 11 ++++-- opentelemetry-otlp/src/exporter/http/mod.rs | 3 ++ opentelemetry-otlp/src/exporter/tonic/logs.rs | 8 +++- opentelemetry-otlp/src/logs.rs | 8 +++- opentelemetry-sdk/benches/log.rs | 6 ++- opentelemetry-sdk/src/export/logs/mod.rs | 12 +++++- opentelemetry-sdk/src/logs/log_emitter.rs | 12 +++--- opentelemetry-sdk/src/logs/log_processor.rs | 37 ++++++++++++++++--- .../src/testing/logs/in_memory_exporter.rs | 32 ++++++++++++---- opentelemetry-stdout/src/logs/exporter.rs | 14 +++++-- opentelemetry-stdout/src/logs/transform.rs | 24 ++++++++---- stress/src/logs.rs | 6 ++- 12 files changed, 133 insertions(+), 40 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 6bb8b76493..eef40ae77e 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::export::logs::{LogEvent, LogExporter}; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export(&mut self, batch: Vec) -> LogResult<()> { let client = self .client .lock() @@ -50,10 +50,15 @@ impl LogExporter for OtlpHttpClient { fn shutdown(&mut self) { let _ = self.client.lock().map(|mut c| c.take()); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + let mut res = self.resource.lock().unwrap(); + *res = resource.clone(); + } } #[cfg(feature = "http-proto")] -fn build_body(logs: Vec) -> LogResult<(Vec, &'static str)> { +fn build_body(logs: Vec) -> LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use prost::Message; diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index a8b4e3d0c3..87b7d6a5cd 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -4,6 +4,7 @@ use crate::{ }; use http::{HeaderName, HeaderValue, Uri}; use opentelemetry_http::HttpClient; +use opentelemetry_sdk::Resource; use std::collections::HashMap; use std::env; use std::str::FromStr; @@ -255,6 +256,7 @@ struct OtlpHttpClient { collector_endpoint: Uri, headers: HashMap, _timeout: Duration, + resource: Arc>, } impl OtlpHttpClient { @@ -270,6 +272,7 @@ impl OtlpHttpClient { collector_endpoint, headers, _timeout: timeout, + resource: Arc::new(Mutex::new(Resource::default())), } } } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 4b5a5787e3..6c6f71af5c 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -5,7 +5,7 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::export::logs::{LogEvent, LogExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -49,7 +49,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export(&mut self, batch: Vec) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner @@ -79,4 +79,8 @@ impl LogExporter for TonicLogsClient { fn shutdown(&mut self) { let _ = self.inner.take(); } + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) { + todo!("set_resource") + } } diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index a9ec267d0c..b8aae5a9b7 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -16,7 +16,7 @@ use opentelemetry::{ global, logs::{LogError, LoggerProvider}, }; -use opentelemetry_sdk::{self, export::logs::LogData, runtime::RuntimeChannel}; +use opentelemetry_sdk::{self, export::logs::LogEvent, runtime::RuntimeChannel}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -100,9 +100,13 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { + async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) { + todo!("set_resource"); + } } /// Recommended configuration for an OTLP exporter pipeline. diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index 9b66ad0447..ca3338dd10 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -8,7 +8,7 @@ use opentelemetry::logs::{AnyValue, LogRecord, LogResult, Logger, LoggerProvider use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::Key; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::export::logs::{LogEvent, LogExporter}; use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::trace::{config, Sampler, TracerProvider}; @@ -17,9 +17,11 @@ struct VoidExporter; #[async_trait] impl LogExporter for VoidExporter { - async fn export(&mut self, _batch: Vec) -> LogResult<()> { + async fn export(&mut self, _batch: Vec) -> LogResult<()> { LogResult::Ok(()) } + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} } fn log_benchmark_group(c: &mut Criterion, name: &str, f: F) { diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 39db15900f..eb1fa7d887 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -13,7 +13,7 @@ use std::{borrow::Cow, fmt::Debug}; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export(&mut self, batch: Vec) -> LogResult<()>; + async fn export(&mut self, batch: Vec) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -21,6 +21,16 @@ pub trait LogExporter: Send + Sync + Debug { fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true } + /// Set the resource for the exporter. + fn set_resource(&mut self, resource: &Resource); +} +/// `LogEvent` represents a single log event without resource context. +#[derive(Clone, Debug)] +pub struct LogEvent { + /// Log record + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, } /// `LogData` associates a [`LogRecord`] with a [`Resource`] and diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 475cf72f77..de74144dfe 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,6 +1,6 @@ use super::{BatchLogProcessor, Config, LogProcessor, SimpleLogProcessor}; use crate::{ - export::logs::{LogData, LogExporter}, + export::logs::{LogEvent, LogExporter}, runtime::RuntimeChannel, }; use opentelemetry::{ @@ -200,7 +200,6 @@ impl opentelemetry::logs::Logger for Logger { /// Emit a `LogRecord`. fn emit(&self, record: LogRecord) { let provider = self.provider(); - let config = provider.config(); let processors = provider.log_processors(); let trace_context = Context::map_current(|cx| { cx.has_active_span() @@ -211,9 +210,8 @@ impl opentelemetry::logs::Logger for Logger { if let Some(ref trace_context) = trace_context { record.trace_context = Some(trace_context.clone()) } - let data = LogData { + let data = LogEvent { record, - resource: config.resource.clone(), instrumentation: self.instrumentation_library().clone(), }; p.emit(data); @@ -345,7 +343,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: LogData) { + fn emit(&self, _data: LogEvent) { // nothing to do. } @@ -363,5 +361,9 @@ mod tests { fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true } + + fn set_resource(&mut self, _resource: &crate::Resource) { + // nothing to do. + } } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 57ace8dad6..9c3faa6413 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,6 +1,7 @@ use crate::{ - export::logs::{ExportResult, LogData, LogExporter}, + export::logs::{ExportResult, LogEvent, LogExporter}, runtime::{RuntimeChannel, TrySend}, + Resource, }; use futures_channel::oneshot; use futures_util::{ @@ -17,6 +18,7 @@ use std::{cmp::min, env, sync::Mutex}; use std::{ fmt::{self, Debug, Formatter}, str::FromStr, + sync::Arc, time::Duration, }; @@ -42,7 +44,7 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; /// [`Logger`]: crate::logs::Logger pub trait LogProcessor: Send + Sync + Debug { /// Called when a log record is ready to processed and exported. - fn emit(&self, data: LogData); + fn emit(&self, data: LogEvent); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -50,6 +52,9 @@ pub trait LogProcessor: Send + Sync + Debug { #[cfg(feature = "logs_level_enabled")] /// Check if logging is enabled fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; + + /// Set the resource for the log processor. + fn set_resource(&mut self, resource: &Resource); } /// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon @@ -70,7 +75,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: LogData) { + fn emit(&self, data: LogEvent) { let result = self .exporter .lock() @@ -96,6 +101,12 @@ impl LogProcessor for SimpleLogProcessor { } } + fn set_resource(&mut self, resource: &Resource) { + if let Ok(mut exporter) = self.exporter.lock() { + exporter.set_resource(resource); + } + } + #[cfg(feature = "logs_level_enabled")] fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true @@ -117,7 +128,7 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: LogData) { + fn emit(&self, data: LogEvent) { let result = self.message_sender.try_send(BatchMessage::ExportLog(data)); if let Err(err) = result { @@ -151,6 +162,13 @@ impl LogProcessor for BatchLogProcessor { .map_err(|err| LogError::Other(err.into())) .and_then(std::convert::identity) } + + fn set_resource(&mut self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } } impl BatchLogProcessor { @@ -229,6 +247,11 @@ impl BatchLogProcessor { break; } + + // propagate the resource + BatchMessage::SetResource(resource) => { + exporter.set_resource(&*resource); + } } } })); @@ -254,7 +277,7 @@ async fn export_with_timeout( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec, + batch: Vec, ) -> ExportResult where R: RuntimeChannel, @@ -444,12 +467,14 @@ where #[derive(Debug)] enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog(LogData), + ExportLog(LogEvent), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), /// Shut down the worker thread, push all logs in buffer to the backend. Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), } #[cfg(all(test, feature = "testing", feature = "logs"))] diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index aca67dc20b..aed5d8c693 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,6 +1,8 @@ -use crate::export::logs::{LogData, LogExporter}; +use crate::export::logs::{LogData, LogEvent, LogExporter}; +use crate::Resource; use async_trait::async_trait; use opentelemetry::logs::{LogError, LogResult}; +use std::borrow::Cow; use std::sync::{Arc, Mutex}; /// An in-memory logs exporter that stores logs data in memory.. @@ -35,7 +37,8 @@ use std::sync::{Arc, Mutex}; /// #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { - logs: Arc>>, + logs: Arc>>, + resource: Arc>, } impl Default for InMemoryLogsExporter { @@ -91,6 +94,7 @@ impl InMemoryLogsExporterBuilder { pub fn build(&self) -> InMemoryLogsExporter { InMemoryLogsExporter { logs: Arc::new(Mutex::new(Vec::new())), + resource: Arc::new(Mutex::new(Resource::default())), } } } @@ -108,12 +112,19 @@ impl InMemoryLogsExporter { /// ``` /// pub fn get_emitted_logs(&self) -> LogResult> { - self.logs - .lock() - .map(|logs_guard| logs_guard.iter().cloned().collect()) - .map_err(LogError::from) - } + let logs_guard = self.logs.lock().map_err(LogError::from)?; + let resource_guard = self.resource.lock().map_err(LogError::from)?; + let logs: Vec = logs_guard + .iter() + .map(|log_event| LogData { + record: log_event.record.clone(), + resource: Cow::Owned(resource_guard.clone()), + instrumentation: log_event.instrumentation.clone(), + }) + .collect(); + Ok(logs) + } /// Clears the internal (in-memory) storage of logs. /// /// # Example @@ -136,7 +147,7 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export(&mut self, batch: Vec) -> LogResult<()> { self.logs .lock() .map(|mut logs_guard| logs_guard.append(&mut batch.clone())) @@ -145,4 +156,9 @@ impl LogExporter for InMemoryLogsExporter { fn shutdown(&mut self) { self.reset(); } + + fn set_resource(&mut self, resource: &Resource) { + let mut res_guard = self.resource.lock().expect("Resource lock poisoned"); + *res_guard = resource.clone(); + } } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index cece3b07e8..0f70a4bc14 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -4,7 +4,8 @@ use opentelemetry::{ logs::{LogError, LogResult}, ExportError, }; -use opentelemetry_sdk::export::logs::{ExportResult, LogData}; +use opentelemetry_sdk::export::logs::{ExportResult, LogEvent}; +use opentelemetry_sdk::Resource; use std::io::{stdout, Write}; type Encoder = @@ -18,6 +19,7 @@ type Encoder = pub struct LogExporter { writer: Option>, encoder: Encoder, + resource: Resource, } impl LogExporter { @@ -42,9 +44,10 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: Vec) -> ExportResult { + async fn export(&mut self, batch: Vec) -> ExportResult { if let Some(writer) = &mut self.writer { - let result = (self.encoder)(writer, crate::logs::LogData::from(batch)) as LogResult<()>; + let log_data = crate::logs::transform::LogData::from((batch, &self.resource)); + let result = (self.encoder)(writer, log_data) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) } else { Err("exporter is shut down".into()) @@ -54,6 +57,10 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { fn shutdown(&mut self) { self.writer.take(); } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } } /// Stdout exporter's error @@ -127,6 +134,7 @@ impl LogExporterBuilder { pub fn build(self) -> LogExporter { LogExporter { writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), + resource: Resource::default(), encoder: self.encoder.unwrap_or_else(|| { Box::new(|writer, logs| { serde_json::to_writer(writer, &logs) diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 9612cf1ff6..218ffb580b 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -15,18 +15,28 @@ pub struct LogData { resource_logs: Vec, } -impl From> for LogData { - fn from(sdk_logs: Vec) -> LogData { +impl + From<( + Vec, + &opentelemetry_sdk::Resource, + )> for LogData +{ + fn from( + (sdk_logs, sdk_resource): ( + Vec, + &opentelemetry_sdk::Resource, + ), + ) -> Self { let mut resource_logs = HashMap::::new(); for sdk_log in sdk_logs { - let resource_schema_url = sdk_log.resource.schema_url().map(|s| s.to_string().into()); + let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_log.instrumentation.schema_url.clone(); let scope: Scope = sdk_log.instrumentation.clone().into(); - let resource: Resource = sdk_log.resource.as_ref().into(); + let resource: Resource = sdk_resource.into(); let rl = resource_logs - .entry(sdk_log.resource.as_ref().into()) + .entry(sdk_resource.into()) .or_insert_with(move || ResourceLogs { resource, scope_logs: Vec::with_capacity(1), @@ -95,8 +105,8 @@ struct LogRecord { trace_id: Option, } -impl From for LogRecord { - fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { +impl From for LogRecord { + fn from(value: opentelemetry_sdk::export::logs::LogEvent) -> Self { LogRecord { trace_id: value .record diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 27ddb40965..5af87feef3 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -38,7 +38,7 @@ where pub struct NoOpLogProcessor; impl LogProcessor for NoOpLogProcessor { - fn emit(&self, _data: opentelemetry_sdk::export::logs::LogData) {} + fn emit(&self, _data: opentelemetry_sdk::export::logs::LogEvent) {} fn force_flush(&self) -> opentelemetry::logs::LogResult<()> { Ok(()) @@ -56,6 +56,10 @@ impl LogProcessor for NoOpLogProcessor { ) -> bool { true } + + fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) { + todo!("set_resource") + } } fn main() { From 597a2af58ed8c0ae07f52b0d788185a527472820 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 27 Mar 2024 06:39:16 +0000 Subject: [PATCH 02/22] more changes --- opentelemetry-otlp/src/exporter/http/logs.rs | 23 ++++++++++---- opentelemetry-otlp/src/exporter/tonic/logs.rs | 17 +++++++++-- opentelemetry-proto/src/transform/logs.rs | 30 ++++++++++++------- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index eef40ae77e..7a6b531564 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -19,7 +19,10 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = build_body(batch)?; + let (body, content_type) = { + let resource = self.resource.lock().unwrap(); + build_body(batch, &*resource)? + }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) @@ -58,13 +61,18 @@ impl LogExporter for OtlpHttpClient { } #[cfg(feature = "http-proto")] -fn build_body(logs: Vec) -> LogResult<(Vec, &'static str)> { +fn build_body( + logs: Vec, + resource: &opentelemetry_sdk::Resource, +) -> LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use prost::Message; + let resource_logs = logs + .into_iter() + .map(|log_event| (log_event, resource).into()) + .collect::>(); - let req = ExportLogsServiceRequest { - resource_logs: logs.into_iter().map(Into::into).collect(), - }; + let req = ExportLogsServiceRequest { resource_logs }; let mut buf = vec![]; req.encode(&mut buf).map_err(crate::Error::from)?; @@ -72,7 +80,10 @@ fn build_body(logs: Vec) -> LogResult<(Vec, &'static str)> { } #[cfg(not(feature = "http-proto"))] -fn build_body(logs: Vec) -> LogResult<(Vec, &'static str)> { +fn build_body( + logs: Vec, + resource: &opentelemetry_sdk::Resource, +) -> LogResult<(Vec, &'static str)> { Err(LogsError::Other( "No http protocol configured. Enable one via `http-proto`".into(), )) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 6c6f71af5c..54e0939e3b 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,5 +1,7 @@ use core::fmt; +use std::sync::{Arc, Mutex}; + use async_trait::async_trait; use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ @@ -12,6 +14,7 @@ use super::BoxInterceptor; pub(crate) struct TonicLogsClient { inner: Option, + resource: Arc>, } struct ClientInner { @@ -43,6 +46,7 @@ impl TonicLogsClient { client, interceptor, }), + resource: Arc::new(Mutex::new(opentelemetry_sdk::Resource::default())), } } } @@ -62,13 +66,20 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; + let resource_logs = { + let resource = self.resource.lock().unwrap(); + batch + .into_iter() + .map(|log_event| (log_event, &*resource)) + .map(Into::into) + .collect() + }; + client .export(Request::from_parts( metadata, extensions, - ExportLogsServiceRequest { - resource_logs: batch.into_iter().map(Into::into).collect(), - }, + ExportLogsServiceRequest { resource_logs }, )) .await .map_err(crate::Error::from)?; diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index bc5c2697c0..1fd34c3ded 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -110,27 +110,35 @@ pub mod tonic { } } - impl From for ResourceLogs { - fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self { + impl + From<( + opentelemetry_sdk::export::logs::LogEvent, + &opentelemetry_sdk::Resource, + )> for ResourceLogs + { + fn from( + data: ( + opentelemetry_sdk::export::logs::LogEvent, + &opentelemetry_sdk::Resource, + ), + ) -> Self { + let (log_event, resource) = data; + ResourceLogs { resource: Some(Resource { - attributes: resource_attributes(&log_data.resource).0, + attributes: resource_attributes(resource).0, dropped_attributes_count: 0, }), - schema_url: log_data - .resource - .schema_url() - .map(Into::into) - .unwrap_or_default(), + schema_url: resource.schema_url().map(Into::into).unwrap_or_default(), scope_logs: vec![ScopeLogs { - schema_url: log_data + schema_url: log_event .instrumentation .schema_url .clone() .map(Into::into) .unwrap_or_default(), - scope: Some(log_data.instrumentation.into()), - log_records: vec![log_data.record.into()], + scope: Some(log_event.instrumentation.into()), + log_records: vec![log_event.record.into()], }], } } From 732b9d6aa8b32ca071486a3e40a167185c6d9f63 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 27 Mar 2024 06:50:49 +0000 Subject: [PATCH 03/22] lint error --- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 9c3faa6413..f7c535d191 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -250,7 +250,7 @@ impl BatchLogProcessor { // propagate the resource BatchMessage::SetResource(resource) => { - exporter.set_resource(&*resource); + exporter.set_resource(&resource); } } } From c18499a7b55a28465081f15c255c7639322f9a9a Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 27 Mar 2024 07:09:23 +0000 Subject: [PATCH 04/22] fix lint --- opentelemetry-otlp/src/exporter/http/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 7a6b531564..c402a4fc57 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -21,7 +21,7 @@ impl LogExporter for OtlpHttpClient { let (body, content_type) = { let resource = self.resource.lock().unwrap(); - build_body(batch, &*resource)? + build_body(batch, &resource)? }; let mut request = http::Request::builder() .method(Method::POST) From 00c0eaa3e05141eb373706ee7fdc0a3e0f0c8bb5 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 27 Mar 2024 20:05:57 +0000 Subject: [PATCH 05/22] lint errors, rename LogEvent to LogData ; and LogData to LogDataWithResources --- opentelemetry-otlp/src/exporter/http/logs.rs | 8 ++++---- opentelemetry-otlp/src/exporter/http/mod.rs | 2 ++ opentelemetry-otlp/src/exporter/tonic/logs.rs | 4 ++-- opentelemetry-otlp/src/logs.rs | 4 ++-- opentelemetry-proto/src/transform/logs.rs | 4 ++-- opentelemetry-sdk/benches/log.rs | 4 ++-- opentelemetry-sdk/src/export/logs/mod.rs | 14 +++++++------- opentelemetry-sdk/src/logs/log_emitter.rs | 6 +++--- opentelemetry-sdk/src/logs/log_processor.rs | 12 ++++++------ .../src/testing/logs/in_memory_exporter.rs | 16 ++++++++-------- opentelemetry-stdout/src/logs/exporter.rs | 4 ++-- opentelemetry-stdout/src/logs/transform.rs | 8 ++++---- stress/src/logs.rs | 2 +- 13 files changed, 45 insertions(+), 43 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index c402a4fc57..389a6eea43 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry_sdk::export::logs::{LogEvent, LogExporter}; +use opentelemetry_sdk::export::logs::{LogData, LogExporter}; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export(&mut self, batch: Vec) -> LogResult<()> { let client = self .client .lock() @@ -62,7 +62,7 @@ impl LogExporter for OtlpHttpClient { #[cfg(feature = "http-proto")] fn build_body( - logs: Vec, + logs: Vec, resource: &opentelemetry_sdk::Resource, ) -> LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; @@ -81,7 +81,7 @@ fn build_body( #[cfg(not(feature = "http-proto"))] fn build_body( - logs: Vec, + logs: Vec, resource: &opentelemetry_sdk::Resource, ) -> LogResult<(Vec, &'static str)> { Err(LogsError::Other( diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 87b7d6a5cd..b8b83bce78 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -256,6 +256,8 @@ struct OtlpHttpClient { collector_endpoint: Uri, headers: HashMap, _timeout: Duration, + #[allow(dead_code)] + // this would be removed once we support set_resource for metrics and traces. resource: Arc>, } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 54e0939e3b..ac834b42e6 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -7,7 +7,7 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogEvent, LogExporter}; +use opentelemetry_sdk::export::logs::{LogData, LogExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -53,7 +53,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export(&mut self, batch: Vec) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index b8aae5a9b7..6e6f9c3757 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -16,7 +16,7 @@ use opentelemetry::{ global, logs::{LogError, LoggerProvider}, }; -use opentelemetry_sdk::{self, export::logs::LogEvent, runtime::RuntimeChannel}; +use opentelemetry_sdk::{self, export::logs::LogData, runtime::RuntimeChannel}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -100,7 +100,7 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { + async fn export(&mut self, batch: Vec) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 1fd34c3ded..5206377555 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -112,13 +112,13 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogEvent, + opentelemetry_sdk::export::logs::LogData, &opentelemetry_sdk::Resource, )> for ResourceLogs { fn from( data: ( - opentelemetry_sdk::export::logs::LogEvent, + opentelemetry_sdk::export::logs::LogData, &opentelemetry_sdk::Resource, ), ) -> Self { diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index ca3338dd10..14bfae84c7 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -8,7 +8,7 @@ use opentelemetry::logs::{AnyValue, LogRecord, LogResult, Logger, LoggerProvider use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::Key; -use opentelemetry_sdk::export::logs::{LogEvent, LogExporter}; +use opentelemetry_sdk::export::logs::{LogData, LogExporter}; use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::trace::{config, Sampler, TracerProvider}; @@ -17,7 +17,7 @@ struct VoidExporter; #[async_trait] impl LogExporter for VoidExporter { - async fn export(&mut self, _batch: Vec) -> LogResult<()> { + async fn export(&mut self, _batch: Vec) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index eb1fa7d887..d068f03bc2 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -13,7 +13,7 @@ use std::{borrow::Cow, fmt::Debug}; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export(&mut self, batch: Vec) -> LogResult<()>; + async fn export(&mut self, batch: Vec) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -24,25 +24,25 @@ pub trait LogExporter: Send + Sync + Debug { /// Set the resource for the exporter. fn set_resource(&mut self, resource: &Resource); } -/// `LogEvent` represents a single log event without resource context. +/// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] -pub struct LogEvent { +pub struct LogData { /// Log record pub record: LogRecord, /// Instrumentation details for the emitter who produced this `LogEvent`. pub instrumentation: InstrumentationLibrary, } -/// `LogData` associates a [`LogRecord`] with a [`Resource`] and +/// `LogDataWithResource` associates a [`LogRecord`] with a [`Resource`] and /// [`InstrumentationLibrary`]. #[derive(Clone, Debug)] -pub struct LogData { +pub struct LogDataWithResource { /// Log record pub record: LogRecord, - /// Resource for the emitter who produced this `LogData`. - pub resource: Cow<'static, Resource>, /// Instrumentation details for the emitter who produced this `LogData`. pub instrumentation: InstrumentationLibrary, + /// Resource for the emitter who produced this `LogData`. + pub resource: Cow<'static, Resource>, } /// Describes the result of an export. diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index de74144dfe..55362b51b9 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,6 +1,6 @@ use super::{BatchLogProcessor, Config, LogProcessor, SimpleLogProcessor}; use crate::{ - export::logs::{LogEvent, LogExporter}, + export::logs::{LogData, LogExporter}, runtime::RuntimeChannel, }; use opentelemetry::{ @@ -210,7 +210,7 @@ impl opentelemetry::logs::Logger for Logger { if let Some(ref trace_context) = trace_context { record.trace_context = Some(trace_context.clone()) } - let data = LogEvent { + let data = LogData { record, instrumentation: self.instrumentation_library().clone(), }; @@ -343,7 +343,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: LogEvent) { + fn emit(&self, _data: LogData) { // nothing to do. } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index f7c535d191..c10d05114e 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,5 +1,5 @@ use crate::{ - export::logs::{ExportResult, LogEvent, LogExporter}, + export::logs::{ExportResult, LogData, LogExporter}, runtime::{RuntimeChannel, TrySend}, Resource, }; @@ -44,7 +44,7 @@ const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; /// [`Logger`]: crate::logs::Logger pub trait LogProcessor: Send + Sync + Debug { /// Called when a log record is ready to processed and exported. - fn emit(&self, data: LogEvent); + fn emit(&self, data: LogData); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -75,7 +75,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: LogEvent) { + fn emit(&self, data: LogData) { let result = self .exporter .lock() @@ -128,7 +128,7 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: LogEvent) { + fn emit(&self, data: LogData) { let result = self.message_sender.try_send(BatchMessage::ExportLog(data)); if let Err(err) = result { @@ -277,7 +277,7 @@ async fn export_with_timeout( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec, + batch: Vec, ) -> ExportResult where R: RuntimeChannel, @@ -467,7 +467,7 @@ where #[derive(Debug)] enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog(LogEvent), + ExportLog(LogData), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index aed5d8c693..0d4434ae6c 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::{LogData, LogEvent, LogExporter}; +use crate::export::logs::{LogData, LogDataWithResource, LogExporter}; use crate::Resource; use async_trait::async_trait; use opentelemetry::logs::{LogError, LogResult}; @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex}; /// #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { - logs: Arc>>, + logs: Arc>>, resource: Arc>, } @@ -111,15 +111,15 @@ impl InMemoryLogsExporter { /// let emitted_logs = exporter.get_emitted_logs().unwrap(); /// ``` /// - pub fn get_emitted_logs(&self) -> LogResult> { + pub fn get_emitted_logs(&self) -> LogResult> { let logs_guard = self.logs.lock().map_err(LogError::from)?; let resource_guard = self.resource.lock().map_err(LogError::from)?; - let logs: Vec = logs_guard + let logs: Vec = logs_guard .iter() - .map(|log_event| LogData { - record: log_event.record.clone(), + .map(|log_data| LogDataWithResource { + record: log_data.record.clone(), resource: Cow::Owned(resource_guard.clone()), - instrumentation: log_event.instrumentation.clone(), + instrumentation: log_data.instrumentation.clone(), }) .collect(); @@ -147,7 +147,7 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export(&mut self, batch: Vec) -> LogResult<()> { + async fn export(&mut self, batch: Vec) -> LogResult<()> { self.logs .lock() .map(|mut logs_guard| logs_guard.append(&mut batch.clone())) diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 0f70a4bc14..fcea153133 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -4,7 +4,7 @@ use opentelemetry::{ logs::{LogError, LogResult}, ExportError, }; -use opentelemetry_sdk::export::logs::{ExportResult, LogEvent}; +use opentelemetry_sdk::export::logs::{ExportResult, LogData}; use opentelemetry_sdk::Resource; use std::io::{stdout, Write}; @@ -44,7 +44,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export(&mut self, batch: Vec) -> ExportResult { + async fn export(&mut self, batch: Vec) -> ExportResult { if let Some(writer) = &mut self.writer { let log_data = crate::logs::transform::LogData::from((batch, &self.resource)); let result = (self.encoder)(writer, log_data) as LogResult<()>; diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 218ffb580b..10332953d2 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -17,13 +17,13 @@ pub struct LogData { impl From<( - Vec, + Vec, &opentelemetry_sdk::Resource, )> for LogData { fn from( (sdk_logs, sdk_resource): ( - Vec, + Vec, &opentelemetry_sdk::Resource, ), ) -> Self { @@ -105,8 +105,8 @@ struct LogRecord { trace_id: Option, } -impl From for LogRecord { - fn from(value: opentelemetry_sdk::export::logs::LogEvent) -> Self { +impl From for LogRecord { + fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { LogRecord { trace_id: value .record diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 5af87feef3..3564aa79b3 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -38,7 +38,7 @@ where pub struct NoOpLogProcessor; impl LogProcessor for NoOpLogProcessor { - fn emit(&self, _data: opentelemetry_sdk::export::logs::LogEvent) {} + fn emit(&self, _data: opentelemetry_sdk::export::logs::LogData) {} fn force_flush(&self) -> opentelemetry::logs::LogResult<()> { Ok(()) From 5361996bb634a56ee50d6bf0c4929ba0f1dbdf8f Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 04:24:22 +0000 Subject: [PATCH 06/22] optimize otlp --- opentelemetry-otlp/src/exporter/http/logs.rs | 10 +++------- opentelemetry-otlp/src/exporter/http/mod.rs | 6 +++--- opentelemetry-otlp/src/exporter/tonic/logs.rs | 18 ++++++++---------- opentelemetry-proto/src/transform/common.rs | 19 ++++++++++++++++++- opentelemetry-proto/src/transform/logs.rs | 18 +++++++++--------- 5 files changed, 41 insertions(+), 30 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 389a6eea43..d1c0aba363 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -19,10 +19,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = { - let resource = self.resource.lock().unwrap(); - build_body(batch, &resource)? - }; + let (body, content_type) = { build_body(batch, &self.resource)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) @@ -55,15 +52,14 @@ impl LogExporter for OtlpHttpClient { } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { - let mut res = self.resource.lock().unwrap(); - *res = resource.clone(); + self.resource = resource.into(); } } #[cfg(feature = "http-proto")] fn build_body( logs: Vec, - resource: &opentelemetry_sdk::Resource, + resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use prost::Message; diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index b8b83bce78..d54a490f0d 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -4,7 +4,7 @@ use crate::{ }; use http::{HeaderName, HeaderValue, Uri}; use opentelemetry_http::HttpClient; -use opentelemetry_sdk::Resource; +use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; use std::collections::HashMap; use std::env; use std::str::FromStr; @@ -258,7 +258,7 @@ struct OtlpHttpClient { _timeout: Duration, #[allow(dead_code)] // this would be removed once we support set_resource for metrics and traces. - resource: Arc>, + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } impl OtlpHttpClient { @@ -274,7 +274,7 @@ impl OtlpHttpClient { collector_endpoint, headers, _timeout: timeout, - resource: Arc::new(Mutex::new(Resource::default())), + resource: ResourceAttributesWithSchema::default(), } } } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index ac834b42e6..65759f7a4b 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -1,8 +1,5 @@ -use core::fmt; - -use std::sync::{Arc, Mutex}; - use async_trait::async_trait; +use core::fmt; use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, @@ -14,7 +11,9 @@ use super::BoxInterceptor; pub(crate) struct TonicLogsClient { inner: Option, - resource: Arc>, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics and traces. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -46,7 +45,7 @@ impl TonicLogsClient { client, interceptor, }), - resource: Arc::new(Mutex::new(opentelemetry_sdk::Resource::default())), + resource: Default::default(), } } } @@ -67,10 +66,9 @@ impl LogExporter for TonicLogsClient { }; let resource_logs = { - let resource = self.resource.lock().unwrap(); batch .into_iter() - .map(|log_event| (log_event, &*resource)) + .map(|log_data| (log_data, &self.resource)) .map(Into::into) .collect() }; @@ -91,7 +89,7 @@ impl LogExporter for TonicLogsClient { let _ = self.inner.take(); } - fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) { - todo!("set_resource") + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); } } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index 9aa29e4b22..8197d90574 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -22,6 +22,23 @@ pub mod tonic { use opentelemetry::{Array, Value}; use std::borrow::Cow; + #[cfg(any(feature = "trace", feature = "logs"))] + #[derive(Debug, Default)] + pub struct ResourceAttributesWithSchema { + pub attributes: Attributes, + pub schema_url: Option, + } + + #[cfg(any(feature = "trace", feature = "logs"))] + impl From<&opentelemetry_sdk::Resource> for ResourceAttributesWithSchema { + fn from(resource: &opentelemetry_sdk::Resource) -> Self { + ResourceAttributesWithSchema { + attributes: resource_attributes(resource), + schema_url: resource.schema_url().map(ToString::to_string), + } + } + } + #[cfg(any(feature = "trace", feature = "logs"))] use opentelemetry_sdk::Resource; @@ -52,7 +69,7 @@ pub mod tonic { } /// Wrapper type for Vec<`KeyValue`> - #[derive(Default)] + #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); impl From> for Attributes { diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 5206377555..3e41f23da1 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -7,7 +7,7 @@ pub mod tonic { resource::v1::Resource, Attributes, }, - transform::common::{to_nanos, tonic::resource_attributes}, + transform::common::{to_nanos, tonic::ResourceAttributesWithSchema}, }; use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity}; @@ -113,32 +113,32 @@ pub mod tonic { impl From<( opentelemetry_sdk::export::logs::LogData, - &opentelemetry_sdk::Resource, + &ResourceAttributesWithSchema, )> for ResourceLogs { fn from( data: ( opentelemetry_sdk::export::logs::LogData, - &opentelemetry_sdk::Resource, + &ResourceAttributesWithSchema, ), ) -> Self { - let (log_event, resource) = data; + let (log_data, resource) = data; ResourceLogs { resource: Some(Resource { - attributes: resource_attributes(resource).0, + attributes: resource.attributes.0.clone(), dropped_attributes_count: 0, }), - schema_url: resource.schema_url().map(Into::into).unwrap_or_default(), + schema_url: resource.schema_url.clone().unwrap(), scope_logs: vec![ScopeLogs { - schema_url: log_event + schema_url: log_data .instrumentation .schema_url .clone() .map(Into::into) .unwrap_or_default(), - scope: Some(log_event.instrumentation.into()), - log_records: vec![log_event.record.into()], + scope: Some(log_data.instrumentation.into()), + log_records: vec![log_data.record.into()], }], } } From a0dc96e720f4cf129f4d155aa472b193fe59f597 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 04:25:36 +0000 Subject: [PATCH 07/22] leftover comments --- opentelemetry-otlp/src/exporter/http/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index d54a490f0d..53a72d0d3e 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -257,7 +257,7 @@ struct OtlpHttpClient { headers: HashMap, _timeout: Duration, #[allow(dead_code)] - // this would be removed once we support set_resource for metrics and traces. + // would be removed once we support set_resource for metrics and traces. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } From 36d9900a38bd8078bee1b7d5f4f1065aabe5ea08 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 07:00:32 +0000 Subject: [PATCH 08/22] propagate resource --- opentelemetry-sdk/src/export/logs/mod.rs | 15 ++------------- opentelemetry-sdk/src/logs/log_emitter.rs | 6 +++++- opentelemetry-sdk/src/logs/log_processor.rs | 6 +++--- .../src/testing/logs/in_memory_exporter.rs | 14 +++++++++++++- stress/src/logs.rs | 2 +- 5 files changed, 24 insertions(+), 19 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index d068f03bc2..ef02f359f3 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -7,7 +7,7 @@ use opentelemetry::{ logs::{LogError, LogRecord, LogResult}, InstrumentationLibrary, }; -use std::{borrow::Cow, fmt::Debug}; +use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] @@ -24,6 +24,7 @@ pub trait LogExporter: Send + Sync + Debug { /// Set the resource for the exporter. fn set_resource(&mut self, resource: &Resource); } + /// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] pub struct LogData { @@ -33,17 +34,5 @@ pub struct LogData { pub instrumentation: InstrumentationLibrary, } -/// `LogDataWithResource` associates a [`LogRecord`] with a [`Resource`] and -/// [`InstrumentationLibrary`]. -#[derive(Clone, Debug)] -pub struct LogDataWithResource { - /// Log record - pub record: LogRecord, - /// Instrumentation details for the emitter who produced this `LogData`. - pub instrumentation: InstrumentationLibrary, - /// Resource for the emitter who produced this `LogData`. - pub resource: Cow<'static, Resource>, -} - /// Describes the result of an export. pub type ExportResult = Result<(), LogError>; diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 55362b51b9..faef0dbdc4 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -156,6 +156,10 @@ impl Builder { /// Create a new provider from this configuration. pub fn build(self) -> LoggerProvider { + // invoke set_resource by invoking on all the processors + for processor in &self.processors { + processor.set_resource(&self.config.resource); + } LoggerProvider { inner: Arc::new(LoggerProviderInner { processors: self.processors, @@ -362,7 +366,7 @@ mod tests { true } - fn set_resource(&mut self, _resource: &crate::Resource) { + fn set_resource(&self, _resource: &crate::Resource) { // nothing to do. } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index c10d05114e..b255a53e53 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -54,7 +54,7 @@ pub trait LogProcessor: Send + Sync + Debug { fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; /// Set the resource for the log processor. - fn set_resource(&mut self, resource: &Resource); + fn set_resource(&self, resource: &Resource); } /// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon @@ -101,7 +101,7 @@ impl LogProcessor for SimpleLogProcessor { } } - fn set_resource(&mut self, resource: &Resource) { + fn set_resource(&self, resource: &Resource) { if let Ok(mut exporter) = self.exporter.lock() { exporter.set_resource(resource); } @@ -163,7 +163,7 @@ impl LogProcessor for BatchLogProcessor { .and_then(std::convert::identity) } - fn set_resource(&mut self, resource: &Resource) { + fn set_resource(&self, resource: &Resource) { let resource = Arc::new(resource.clone()); let _ = self .message_sender diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 0d4434ae6c..5b73e700c0 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::{LogData, LogDataWithResource, LogExporter}; +use crate::export::logs::{LogData, LogExporter}; use crate::Resource; use async_trait::async_trait; use opentelemetry::logs::{LogError, LogResult}; @@ -47,6 +47,18 @@ impl Default for InMemoryLogsExporter { } } +/// `LogDataWithResource` associates a [`LogRecord`] with a [`Resource`] and +/// [`InstrumentationLibrary`]. +#[derive(Clone, Debug)] +pub struct LogDataWithResource { + /// Log record + pub record: opentelemetry::logs::LogRecord, + /// Instrumentation details for the emitter who produced this `LogData`. + pub instrumentation: opentelemetry::InstrumentationLibrary, + /// Resource for the emitter who produced this `LogData`. + pub resource: Cow<'static, Resource>, +} + ///Builder for ['InMemoryLogsExporter']. /// # Example /// diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 3564aa79b3..20517b2cd5 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -57,7 +57,7 @@ impl LogProcessor for NoOpLogProcessor { true } - fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) { + fn set_resource(&self, _resource: &opentelemetry_sdk::Resource) { todo!("set_resource") } } From 7ecf10660e8cb54731d94817ffa11640810b74af Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 07:06:10 +0000 Subject: [PATCH 09/22] update changelog --- opentelemetry-sdk/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 275cf46761..1e5d3a659e 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -12,6 +12,10 @@ - **Breaking** [#1624](https://github.com/open-telemetry/opentelemetry-rust/pull/1624) Remove `OsResourceDetector` and `ProcessResourceDetector` resource detectors, use the [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. +- [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Send resource attributes once to + processor and exporter, and not for every event. + - Add `set_resource` method in LogProcessor an Exporter + - Propagate resource attributes to processor and exporter during LoggerProvider creation. ## v0.22.1 From f031e29fb2b1221891ac1706ab298c2766de7a36 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 07:27:55 +0000 Subject: [PATCH 10/22] fix doc error --- opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 5b73e700c0..0ab182c868 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,7 +1,8 @@ use crate::export::logs::{LogData, LogExporter}; use crate::Resource; use async_trait::async_trait; -use opentelemetry::logs::{LogError, LogResult}; +use opentelemetry::logs::{LogError, LogRecord, LogResult}; +use opentelemetry::InstrumentationLibrary; use std::borrow::Cow; use std::sync::{Arc, Mutex}; @@ -52,9 +53,9 @@ impl Default for InMemoryLogsExporter { #[derive(Clone, Debug)] pub struct LogDataWithResource { /// Log record - pub record: opentelemetry::logs::LogRecord, + pub record: LogRecord, /// Instrumentation details for the emitter who produced this `LogData`. - pub instrumentation: opentelemetry::InstrumentationLibrary, + pub instrumentation: InstrumentationLibrary, /// Resource for the emitter who produced this `LogData`. pub resource: Cow<'static, Resource>, } From 112a053b981fa9560a2a911a144df42bbdc0c06b Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 17:19:22 +0000 Subject: [PATCH 11/22] add tests --- opentelemetry-sdk/src/logs/log_emitter.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 74 ++++++++++++++++++++- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index faef0dbdc4..4372743838 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -156,7 +156,7 @@ impl Builder { /// Create a new provider from this configuration. pub fn build(self) -> LoggerProvider { - // invoke set_resource by invoking on all the processors + // invoke set_resource on all the processors for processor in &self.processors { processor.set_resource(&self.config.resource); } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b255a53e53..c072bf4818 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -484,18 +484,49 @@ mod tests { OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; use crate::{ + export::logs::{LogData, LogExporter}, logs::{ log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, }, - BatchConfig, BatchConfigBuilder, + BatchConfig, BatchConfigBuilder, Config, LoggerProvider, SimpleLogProcessor, }, runtime, testing::logs::InMemoryLogsExporter, + Resource, }; + use async_trait::async_trait; + use opentelemetry::{logs::LogResult, KeyValue}; + use std::sync::Arc; use std::time::Duration; + #[derive(Debug, Clone)] + struct MockLogExporter { + resource: Arc>, + } + + #[async_trait] + impl LogExporter for MockLogExporter { + async fn export(&mut self, _batch: Vec) -> LogResult<()> { + Ok(()) + } + + fn shutdown(&mut self) {} + + fn set_resource(&mut self, resource: &Resource) { + let res = Arc::make_mut(&mut self.resource); + *res = Some(resource.clone()); + } + } + + // Implementation specific to the MockLogExporter, not part of the LogExporter trait + impl MockLogExporter { + fn get_resource(&self) -> Option { + (*self.resource).clone() + } + } + #[test] fn test_default_const_values() { assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY"); @@ -645,4 +676,45 @@ mod tests { assert_eq!(actual.max_export_timeout, Duration::from_millis(3)); assert_eq!(actual.max_queue_size, 4); } + + #[test] + fn test_set_resource_simple_processor() { + let exporter = MockLogExporter { + resource: Arc::new(Some(Resource::default())), + }; + let processor = SimpleLogProcessor::new(Box::new(exporter.clone())); + let _ = LoggerProvider::builder() + .with_log_processor(processor) + .with_config(Config::default().with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + ]))) + .build(); + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_set_resource_batch_processor() { + let exporter = MockLogExporter { + resource: Arc::new(Some(Resource::default())), + }; + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + let mut provider = LoggerProvider::builder() + .with_log_processor(processor) + .with_config(Config::default().with_resource(Resource::new(vec![ + KeyValue::new("k1", "v1"), + KeyValue::new("k2", "v3"), + KeyValue::new("k3", "v3"), + KeyValue::new("k4", "v4"), + ]))) + .build(); + assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); + provider.shutdown(); + } } From becd7798563ba3c3a0156ae2d5682b9f46bc58f0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 18:35:50 +0000 Subject: [PATCH 12/22] Remove todo --- stress/src/logs.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 20517b2cd5..efd9144e0d 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -57,9 +57,7 @@ impl LogProcessor for NoOpLogProcessor { true } - fn set_resource(&self, _resource: &opentelemetry_sdk::Resource) { - todo!("set_resource") - } + fn set_resource(&self, _resource: &opentelemetry_sdk::Resource) {} } fn main() { From aaa3c89ccb2280feb3468c50582ea27d50c9edb6 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 19:27:10 +0000 Subject: [PATCH 13/22] add empty implementation for set_resource for processor and exporter --- opentelemetry-sdk/src/export/logs/mod.rs | 2 +- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index ef02f359f3..8676db9d16 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -22,7 +22,7 @@ pub trait LogExporter: Send + Sync + Debug { true } /// Set the resource for the exporter. - fn set_resource(&mut self, resource: &Resource); + fn set_resource(&mut self, _resource: &Resource) {} } /// `LogData` represents a single log event without resource context. diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index c072bf4818..9370516a31 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -54,7 +54,7 @@ pub trait LogProcessor: Send + Sync + Debug { fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; /// Set the resource for the log processor. - fn set_resource(&self, resource: &Resource); + fn set_resource(&self, _resource: &Resource) {} } /// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon From 71a4efd932c1c37a6e0b26550b4e11475203a2bc Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 12:28:55 -0700 Subject: [PATCH 14/22] Update opentelemetry-sdk/CHANGELOG.md Co-authored-by: Cijo Thomas --- opentelemetry-sdk/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 1e5d3a659e..25551d7d41 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -14,7 +14,7 @@ [`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead. - [#1636](https://github.com/open-telemetry/opentelemetry-rust/pull/1636) [Logs SDK] Send resource attributes once to processor and exporter, and not for every event. - - Add `set_resource` method in LogProcessor an Exporter + - Add `set_resource` method in LogProcessor and Exporter traits - Propagate resource attributes to processor and exporter during LoggerProvider creation. ## v0.22.1 From e4d5a70fcd5148eae4c4ed20e2bcdc055790d3af Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 28 Mar 2024 20:46:59 +0000 Subject: [PATCH 15/22] leftover commit --- opentelemetry-otlp/src/logs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 6e6f9c3757..92890228f0 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -104,8 +104,8 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { self.client.export(batch).await } - fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) { - todo!("set_resource"); + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.client.set_resource(resource); } } From de7dc51d8780ef56903e9260edcea830096db9e2 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 29 Mar 2024 16:27:41 +0000 Subject: [PATCH 16/22] remove redundant set_resource definitions --- opentelemetry-sdk/benches/log.rs | 2 -- opentelemetry-sdk/src/logs/log_emitter.rs | 4 ---- 2 files changed, 6 deletions(-) diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index 14bfae84c7..9b66ad0447 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -20,8 +20,6 @@ impl LogExporter for VoidExporter { async fn export(&mut self, _batch: Vec) -> LogResult<()> { LogResult::Ok(()) } - - fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {} } fn log_benchmark_group(c: &mut Criterion, name: &str, f: F) { diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 4372743838..bc2e0cd232 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -365,9 +365,5 @@ mod tests { fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { true } - - fn set_resource(&self, _resource: &crate::Resource) { - // nothing to do. - } } } From 0634469154b450e0a5ce260bf6634fe976e3403c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 2 Apr 2024 20:15:00 +0000 Subject: [PATCH 17/22] avoid log data cloning --- opentelemetry-sdk/src/logs/log_emitter.rs | 35 +++++++++++++++++------ stress/Cargo.toml | 4 +-- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index bc2e0cd232..e7757523be 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -209,16 +209,33 @@ impl opentelemetry::logs::Logger for Logger { cx.has_active_span() .then(|| TraceContext::from(cx.span().span_context())) }); - for p in processors { - let mut record = record.clone(); - if let Some(ref trace_context) = trace_context { - record.trace_context = Some(trace_context.clone()) + // Prioritize the scenario where there is likely to be only one processor. + if processors.len() == 1 { + // If there is exactly one processor, use the record directly. + if let Some(p) = processors.first() { + let mut record = record; // Move the record directly, avoiding clone. + if let Some(ref trace_context) = trace_context { + record.trace_context = Some(trace_context.clone()); + } + let data = LogData { + record, + instrumentation: self.instrumentation_library().clone(), + }; + p.emit(data); + } + } else { + // For multiple processors, clone the record for each. + for p in processors { + let mut cloned_record = record.clone(); + if let Some(ref trace_context) = trace_context { + cloned_record.trace_context = Some(trace_context.clone()); + } + let data = LogData { + record: cloned_record, + instrumentation: self.instrumentation_library().clone(), + }; + p.emit(data); } - let data = LogData { - record, - instrumentation: self.instrumentation_library().clone(), - }; - p.emit(data); } } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 7ee1379b9d..331f6b2752 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -23,8 +23,8 @@ doc = false ctrlc = "3.2.5" lazy_static = "1.4.0" num_cpus = "1.15.0" -opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace"] } -opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace"] } +opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace", "logs_level_enabled"] } +opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "logs_level_enabled"] } opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"} rand = { version = "0.8.4", features = ["small_rng"] } tracing = { workspace = true, features = ["std"]} From b220635f935b09ae5494cab6b1006f1c5f176fe4 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 25 Apr 2024 23:44:26 +0000 Subject: [PATCH 18/22] remove single processor optimization --- opentelemetry-sdk/src/logs/log_emitter.rs | 36 +++++++---------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index d953abe0e0..a75060c0fb 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -216,33 +216,17 @@ impl opentelemetry::logs::Logger for Logger { cx.has_active_span() .then(|| TraceContext::from(cx.span().span_context())) }); - // Prioritize the scenario where there is likely to be only one processor. - if processors.len() == 1 { - // If there is exactly one processor, use the record directly. - if let Some(p) = processors.first() { - let mut record = record; // Move the record directly, avoiding clone. - if let Some(ref trace_context) = trace_context { - record.trace_context = Some(trace_context.clone()); - } - let data = LogData { - record, - instrumentation: self.instrumentation_library().clone(), - }; - p.emit(data); - } - } else { - // For multiple processors, clone the record for each. - for p in processors { - let mut cloned_record = record.clone(); - if let Some(ref trace_context) = trace_context { - cloned_record.trace_context = Some(trace_context.clone()); - } - let data = LogData { - record: cloned_record, - instrumentation: self.instrumentation_library().clone(), - }; - p.emit(data); + + for p in processors { + let mut cloned_record = record.clone(); + if let Some(ref trace_context) = trace_context { + cloned_record.trace_context = Some(trace_context.clone()); } + let data = LogData { + record: cloned_record, + instrumentation: self.instrumentation_library().clone(), + }; + p.emit(data); } } From 98c11b195f08d275d4194c2bd061773be16dde02 Mon Sep 17 00:00:00 2001 From: Lalit Date: Thu, 25 Apr 2024 22:09:23 -0700 Subject: [PATCH 19/22] resolve merge conflict --- opentelemetry-otlp/src/exporter/http/logs.rs | 2 +- opentelemetry-otlp/src/exporter/http/mod.rs | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index bd2d5822ea..4bad1cc39e 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -19,7 +19,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = {self.build_logs_export_body(batch, &self.resource)?}; + let (body, content_type) = { self.build_logs_export_body(batch, &self.resource)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 5f0cd7c2e3..fc46953ed1 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -324,12 +324,15 @@ impl OtlpHttpClient { fn build_logs_export_body( &self, logs: Vec, + resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; + let resource_logs = logs + .into_iter() + .map(|log_event| (log_event, resource).into()) + .collect::>(); + let req = ExportLogsServiceRequest { resource_logs }; - let req = ExportLogsServiceRequest { - resource_logs: logs.into_iter().map(Into::into).collect(), - }; match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { From 2ce3e0f18f268bd84018126eaafeddec7de5b58b Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 26 Apr 2024 09:37:49 -0700 Subject: [PATCH 20/22] review comment - remove set_resource overwrite method --- stress/src/logs.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 24176be096..aaa3a1d7ae 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -27,8 +27,6 @@ impl LogProcessor for NoOpLogProcessor { ) -> bool { true } - - fn set_resource(&self, _resource: &opentelemetry_sdk::Resource) {} } fn main() { From 0aef2fc94e872730a6c22f89584e413c66de64b1 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 26 Apr 2024 12:00:48 -0700 Subject: [PATCH 21/22] fix merge conflict --- opentelemetry-proto/src/transform/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 3e41f23da1..335526685b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -129,7 +129,7 @@ pub mod tonic { attributes: resource.attributes.0.clone(), dropped_attributes_count: 0, }), - schema_url: resource.schema_url.clone().unwrap(), + schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { schema_url: log_data .instrumentation From 896f67203e35505caca182d5efacd66948fc0cf0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 30 Apr 2024 11:20:38 -0700 Subject: [PATCH 22/22] fix merge conflict --- opentelemetry-sdk/src/logs/log_processor.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b746b2bd5d..d3d69c87b8 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -495,8 +495,6 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; - use crate::export::logs::LogData; - use crate::logs::{LogProcessor, SimpleLogProcessor}; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ export::logs::{LogData, LogExporter}, @@ -505,7 +503,8 @@ mod tests { OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, }, - BatchConfig, BatchConfigBuilder, Config, LoggerProvider, SimpleLogProcessor, + BatchConfig, BatchConfigBuilder, Config, LogProcessor, LoggerProvider, + SimpleLogProcessor, }, runtime, testing::logs::InMemoryLogsExporter, @@ -720,7 +719,7 @@ mod tests { BatchConfig::default(), runtime::Tokio, ); - let mut provider = LoggerProvider::builder() + let provider = LoggerProvider::builder() .with_log_processor(processor) .with_config(Config::default().with_resource(Resource::new(vec![ KeyValue::new("k1", "v1"), @@ -732,7 +731,7 @@ mod tests { assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 4); provider.shutdown(); } - + #[tokio::test(flavor = "multi_thread")] async fn test_batch_shutdown() { // assert we will receive an error @@ -747,7 +746,6 @@ mod tests { ); processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); processor.force_flush().unwrap(); @@ -755,7 +753,6 @@ mod tests { // todo: expect to see errors here. How should we assert this? processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); assert_eq!(1, exporter.get_emitted_logs().unwrap().len()) @@ -770,7 +767,6 @@ mod tests { processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), }); @@ -783,7 +779,6 @@ mod tests { processor.emit(LogData { record: Default::default(), - resource: Default::default(), instrumentation: Default::default(), });