From 828354f1cbd4c8ec0a564c25b4277c5fc8aa11fa Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 30 Apr 2024 16:07:31 +0100 Subject: [PATCH 1/2] Collect span attributes Signed-off-by: Kai Fricke --- opentelemetry-appender-tracing/src/layer.rs | 341 +++++++++++++++++++- 1 file changed, 330 insertions(+), 11 deletions(-) diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index ef2a691dcc..9ceebbe824 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -3,6 +3,7 @@ use opentelemetry::{ Key, }; use std::borrow::Cow; +use std::collections::HashMap; use tracing_core::{Level, Metadata}; #[cfg(feature = "experimental_metadata_attributes")] use tracing_log::NormalizeEvent; @@ -122,12 +123,41 @@ impl tracing::field::Visit for EventVisitor { // TODO: Remaining field types from AnyValue : Bytes, ListAny, Boolean } +#[derive(Debug, Default)] +struct OtelAttrs(HashMap); + +impl OtelAttrs { + fn set_metadata(&mut self, metadata: &Metadata) { + self.0.insert("name".into(), metadata.name().into()); + } + + fn init(&mut self, attrs: &tracing_core::span::Attributes<'_>) { + let mut visitor = EventVisitor::default(); + attrs.values().record(&mut visitor); + + self.0.extend(visitor.log_record_attributes); + } + + fn update(&mut self, record: &tracing_core::span::Record<'_>) { + let mut visitor = EventVisitor::default(); + record.record(&mut visitor); + + self.0.extend(visitor.log_record_attributes); + } + + pub fn get_attrs(&self) -> &HashMap { + &self.0 + } +} + pub struct OpenTelemetryTracingBridge where P: LoggerProvider + Send + Sync, L: Logger + Send + Sync, { logger: L, + collect_span_attributes: bool, + flatten_span_attributes: bool, _phantom: std::marker::PhantomData

, // P is not used. } @@ -142,22 +172,34 @@ where .logger_builder(INSTRUMENTATION_LIBRARY_NAME) .with_version(Cow::Borrowed(env!("CARGO_PKG_VERSION"))) .build(), + collect_span_attributes: false, + flatten_span_attributes: false, _phantom: Default::default(), } } + + pub fn with_collect_span_attributes(self, enable: bool) -> Self { + OpenTelemetryTracingBridge { + collect_span_attributes: enable, + ..self + } + } + + pub fn with_flatten_span_attributes(self, enable: bool) -> Self { + OpenTelemetryTracingBridge { + flatten_span_attributes: enable, + ..self + } + } } impl Layer for OpenTelemetryTracingBridge where - S: tracing::Subscriber, + S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, P: LoggerProvider + Send + Sync + 'static, L: Logger + Send + Sync + 'static, { - fn on_event( - &self, - event: &tracing::Event<'_>, - _ctx: tracing_subscriber::layer::Context<'_, S>, - ) { + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { #[cfg(feature = "experimental_metadata_attributes")] let normalized_meta = event.normalized_metadata(); #[cfg(feature = "experimental_metadata_attributes")] @@ -175,6 +217,40 @@ where let mut visitor = EventVisitor::default(); visitor.visit_metadata(meta); + + // Add span fields. + if self.collect_span_attributes { + if let Some(span_id) = event.parent().or(ctx.current_span().id()) { + let start_span = ctx.span(span_id).expect("Span not found, this is a bug"); + + // Build tree of span attributes. The leaf will be the outermost attribute. + // If attributes are flattened and there are conflicts, this will keep + // the values of the leaf span. + let mut last_span_attrs: HashMap = HashMap::new(); + for span in start_span.scope().from_root() { + if let Some(otel_attrs) = span.extensions().get::() { + let mut attrs = otel_attrs.get_attrs().clone(); + + if self.flatten_span_attributes { + last_span_attrs.extend(attrs) + } else { + if !last_span_attrs.is_empty() { + attrs.insert("span".into(), AnyValue::Map(last_span_attrs)); + } + last_span_attrs = attrs; + } + } else { + break; + } + } + if !last_span_attrs.is_empty() { + visitor + .log_record_attributes + .push(("span".into(), AnyValue::Map(last_span_attrs))); + } + } + } + // Visit fields. event.record(&mut visitor); visitor.push_to_otel_log_record(&mut log_record); @@ -182,6 +258,55 @@ where self.logger.emit(log_record); } + fn on_new_span( + &self, + attrs: &tracing_core::span::Attributes<'_>, + id: &tracing_core::span::Id, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + if !self.collect_span_attributes { + return; + } + + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + + if extensions.get_mut::().is_none() { + let meta = attrs.metadata(); + + let mut otel_attrs = OtelAttrs::default(); + otel_attrs.set_metadata(meta); + otel_attrs.init(attrs); + + extensions.insert(otel_attrs); + } + } + + fn on_record( + &self, + id: &tracing_core::span::Id, + record: &tracing_core::span::Record<'_>, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + if !self.collect_span_attributes { + return; + } + + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + + // Set values on existing OtelAttrs if they exist. + if let Some(otel_attrs) = extensions.get_mut::() { + let _ = otel_attrs.update(record); + return; + } + + // Otherwise create new OtelAttrs. + let mut otel_attrs = OtelAttrs::default(); + otel_attrs.update(record); + extensions.insert(otel_attrs); + } + #[cfg(feature = "logs_level_enabled")] fn event_enabled( &self, @@ -206,7 +331,7 @@ const fn severity_of_level(level: &Level) -> Severity { #[cfg(test)] mod tests { - use crate::layer; + use super::OpenTelemetryTracingBridge; use opentelemetry::logs::Severity; use opentelemetry::trace::TracerProvider as _; use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer}; @@ -214,6 +339,7 @@ mod tests { use opentelemetry_sdk::logs::LoggerProvider; use opentelemetry_sdk::testing::logs::InMemoryLogsExporter; use opentelemetry_sdk::trace::{config, Sampler, TracerProvider}; + use std::collections::HashMap; use tracing::error; use tracing_subscriber::layer::SubscriberExt; @@ -226,7 +352,7 @@ mod tests { .with_simple_exporter(exporter.clone()) .build(); - let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); + let layer = OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); // avoiding setting tracing subscriber as global as that does not @@ -292,7 +418,7 @@ mod tests { .with_simple_exporter(exporter.clone()) .build(); - let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); + let layer = OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); // avoiding setting tracing subscriber as global as that does not @@ -389,7 +515,7 @@ mod tests { .with_simple_exporter(exporter.clone()) .build(); - let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); + let layer = OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); // avoiding setting tracing subscriber as global as that does not @@ -455,7 +581,7 @@ mod tests { .with_simple_exporter(exporter.clone()) .build(); - let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider); + let layer = OpenTelemetryTracingBridge::new(&logger_provider); let subscriber = tracing_subscriber::registry().with(layer); // avoiding setting tracing subscriber as global as that does not @@ -543,4 +669,197 @@ mod tests { assert!(attributes_key.contains(&Key::new("log.target"))); } } + + #[test] + fn tracing_appender_with_span_data_nested() { + let exporter: InMemoryLogsExporter = InMemoryLogsExporter::default(); + let logger_provider = LoggerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + + let layer = + OpenTelemetryTracingBridge::new(&logger_provider).with_collect_span_attributes(true); + let subscriber = tracing_subscriber::registry().with(layer); + + let _guard = tracing::subscriber::set_default(subscriber); + + let outer_span = + tracing::info_span!("outer_span", foo = "bar", set_later = tracing::field::Empty); + + outer_span.in_scope(|| { + tracing::info!("Simple log message"); + tracing::info!(baz = 42, "Log message with metrics"); + + outer_span.record("set_later", 99); + + tracing::info!("After record (first)"); + + outer_span.record("set_later", 77); + + tracing::info!("After record (update)"); + + let nested_span = tracing::info_span!("nested_span", nested_val = 1337, foo = "oof"); + nested_span.in_scope(|| tracing::info!(foo = "overwrite", "This is in a nested span")); + }); + + let emitted = exporter.get_emitted_logs().expect("No logs emitted"); + assert!(emitted.len() == 5); + + let mut outer_span = HashMap::new(); + outer_span.insert(Key::new("name"), AnyValue::from("outer_span")); + outer_span.insert(Key::new("foo"), AnyValue::from("bar")); + + // Simple log message + assert!(emitted[0] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + + // Log message with metrics + assert!(emitted[1] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + assert!(emitted[1] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("baz"), AnyValue::from(42)))); + + // After record (first) + outer_span.insert(Key::new("set_later"), AnyValue::from(99)); + assert!(emitted[2] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + + // After record (update) + outer_span.insert(Key::new("set_later"), AnyValue::from(77)); + assert!(emitted[3] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + + // nested_span + let mut nested_span = HashMap::new(); + nested_span.insert(Key::new("name"), AnyValue::from("nested_span")); + nested_span.insert(Key::new("nested_val"), AnyValue::from(1337)); + nested_span.insert(Key::new("foo"), AnyValue::from("oof")); + + nested_span.insert(Key::new("span"), AnyValue::Map(outer_span)); + + assert!(emitted[4] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(nested_span.clone())))); + } + + #[test] + fn tracing_appender_with_span_data_flattened() { + let exporter: InMemoryLogsExporter = InMemoryLogsExporter::default(); + let logger_provider = LoggerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + + let layer = OpenTelemetryTracingBridge::new(&logger_provider) + .with_collect_span_attributes(true) + .with_flatten_span_attributes(true); + let subscriber = tracing_subscriber::registry().with(layer); + + let _guard = tracing::subscriber::set_default(subscriber); + + let outer_span = + tracing::info_span!("outer_span", foo = "bar", set_later = tracing::field::Empty); + + outer_span.in_scope(|| { + tracing::info!("Simple log message"); + tracing::info!(baz = 42, "Log message with metrics"); + + outer_span.record("set_later", 99); + + tracing::info!("After record (first)"); + + outer_span.record("set_later", 77); + + tracing::info!("After record (update)"); + + let nested_span = tracing::info_span!("nested_span", nested_val = 1337, foo = "oof"); + nested_span.in_scope(|| tracing::info!(foo = "overwrite", "This is in a nested span")); + }); + + let emitted = exporter.get_emitted_logs().expect("No logs emitted"); + assert!(emitted.len() == 5); + + let mut outer_span = HashMap::new(); + outer_span.insert(Key::new("name"), AnyValue::from("outer_span")); + outer_span.insert(Key::new("foo"), AnyValue::from("bar")); + + // Simple log message + assert!(emitted[0] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + + // Log message with metrics + assert!(emitted[1] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + assert!(emitted[1] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("baz"), AnyValue::from(42)))); + + // After record (first) + outer_span.insert(Key::new("set_later"), AnyValue::from(99)); + assert!(emitted[2] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + + // After record (update) + outer_span.insert(Key::new("set_later"), AnyValue::from(77)); + assert!(emitted[3] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(outer_span.clone())))); + + // nested_span + // Use outer_span data as the source + let mut nested_span = outer_span.clone(); + nested_span.insert(Key::new("name"), AnyValue::from("nested_span")); + nested_span.insert(Key::new("nested_val"), AnyValue::from(1337)); + // This value gets overwritten + nested_span.insert(Key::new("foo"), AnyValue::from("oof")); + + // Because we flatten, the outer span will not be included in the nested_span + + assert!(emitted[4] + .record + .attributes + .as_ref() + .expect("Attributes") + .contains(&(Key::new("span"), AnyValue::Map(nested_span.clone())))); + } } From 279061aa7a8fc590fe6cfab0c05f58efed9c978e Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 30 Apr 2024 16:11:58 +0100 Subject: [PATCH 2/2] changelog Signed-off-by: Kai Fricke --- opentelemetry-appender-tracing/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-appender-tracing/CHANGELOG.md b/opentelemetry-appender-tracing/CHANGELOG.md index d7c0522624..313cff48fe 100644 --- a/opentelemetry-appender-tracing/CHANGELOG.md +++ b/opentelemetry-appender-tracing/CHANGELOG.md @@ -3,6 +3,7 @@ ## vNext - Removed unwanted dependency on opentelemetry-sdk. +- Added support to collect attributes from tracing spans. ## v0.3.0