diff --git a/Cargo.toml b/Cargo.toml index c939a86..727f7b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ futures-util = { version = "0.3", default-features = false } tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" tracing = { version = "0.1.35", default-features = false, features = ["std", "attributes"] } +tracing-error = "0.2.0" tracing-subscriber = { version = "0.3.0", default-features = false, features = ["registry", "std", "fmt"] } [target.'cfg(not(target_os = "windows"))'.dev-dependencies] diff --git a/benches/trace.rs b/benches/trace.rs index f4e6faf..83c1e56 100644 --- a/benches/trace.rs +++ b/benches/trace.rs @@ -1,13 +1,13 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::{ - trace::{SpanBuilder, Tracer as _, TracerProvider as _}, + trace::{Span, SpanBuilder, Tracer as _, TracerProvider as _}, Context, }; -use opentelemetry_sdk::trace::{Tracer, TracerProvider}; +use opentelemetry_sdk::trace::{Config, SpanLimits, Tracer, TracerProvider}; #[cfg(not(target_os = "windows"))] use pprof::criterion::{Output, PProfProfiler}; use std::time::SystemTime; -use tracing::trace_span; +use tracing::{trace, trace_span}; use tracing_subscriber::prelude::*; fn many_children(c: &mut Criterion) { @@ -55,6 +55,73 @@ fn many_children(c: &mut Criterion) { } } +fn many_events(c: &mut Criterion) { + let mut group = c.benchmark_group("otel_many_events"); + + group.bench_function("spec_baseline", |b| { + let provider = TracerProvider::default(); + let tracer = provider.tracer("bench"); + b.iter(|| { + fn dummy(tracer: &Tracer, cx: &Context) { + let mut span = tracer.start_with_context("child", cx); + for _ in 0..1000 { + span.add_event("name", Vec::new()); + } + } + + tracer.in_span("parent", |cx| dummy(&tracer, &cx)); + }); + }); + + { + let _subscriber = tracing_subscriber::registry() + .with(RegistryAccessLayer) + .set_default(); + group.bench_function("no_data_baseline", |b| b.iter(events_harness)); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(OtelDataLayer) + .set_default(); + group.bench_function("data_only_baseline", |b| b.iter(events_harness)); + } + + { + let provider = TracerProvider::default(); + let tracer = provider.tracer("bench"); + let otel_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_tracked_inactivity(false); + let _subscriber = tracing_subscriber::registry() + .with(otel_layer) + .set_default(); + + group.bench_function("full_filtered", |b| b.iter(events_harness)); + } + + { + let provider = TracerProvider::builder() + .with_config(Config { + span_limits: SpanLimits { + max_events_per_span: 1000, + ..SpanLimits::default() + }, + ..Config::default() + }) + .build(); + let tracer = provider.tracer("bench"); + let otel_layer = tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_tracked_inactivity(false); + let _subscriber = tracing_subscriber::registry() + .with(otel_layer) + .set_default(); + + group.bench_function("full_not_filtered", |b| b.iter(events_harness)); + } +} + struct NoDataSpan; struct RegistryAccessLayer; @@ -73,6 +140,23 @@ where extensions.insert(NoDataSpan); } + fn on_event( + &self, + event: &tracing_core::Event<'_>, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + let Some(parent) = event.parent().and_then(|id| ctx.span(id)).or_else(|| { + event + .is_contextual() + .then(|| ctx.lookup_current()) + .flatten() + }) else { + return; + }; + let mut extensions = parent.extensions_mut(); + extensions.get_mut::(); + } + fn on_close(&self, id: tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) { let span = ctx.span(&id).expect("Span not found, this is a bug"); let mut extensions = span.extensions_mut(); @@ -100,6 +184,29 @@ where ); } + fn on_event( + &self, + event: &tracing_core::Event<'_>, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + let Some(parent) = event.parent().and_then(|id| ctx.span(id)).or_else(|| { + event + .is_contextual() + .then(|| ctx.lookup_current()) + .flatten() + }) else { + return; + }; + let mut extensions = parent.extensions_mut(); + let builder = extensions + .get_mut::() + .expect("Builder not found in span, this is a bug"); + let events = builder.events.get_or_insert_with(Vec::new); + let otel_event = + opentelemetry::trace::Event::new(String::new(), SystemTime::now(), Vec::new(), 0); + events.push(otel_event); + } + fn on_close(&self, id: tracing::span::Id, ctx: tracing_subscriber::layer::Context<'_, S>) { let span = ctx.span(&id).expect("Span not found, this is a bug"); let mut extensions = span.extensions_mut(); @@ -124,16 +231,30 @@ fn tracing_harness() { dummy(); } +fn events_harness() { + fn dummy() { + let _child = trace_span!("child").entered(); + for _ in 0..1000 { + trace!("event"); + } + } + + let parent = trace_span!("parent"); + let _enter = parent.enter(); + + dummy(); +} + #[cfg(not(target_os = "windows"))] criterion_group! { name = benches; config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); - targets = many_children + targets = many_children, many_events } #[cfg(target_os = "windows")] criterion_group! { name = benches; config = Criterion::default(); - targets = many_children + targets = many_children, many_events } criterion_main!(benches); diff --git a/src/layer.rs b/src/layer.rs index 920373c..dc3203a 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1,15 +1,15 @@ use crate::{OtelData, PreSampledTracer}; use once_cell::unsync; use opentelemetry::{ - trace::{self as otel, noop, TraceContextExt}, + trace::{self as otel, noop, SpanBuilder, SpanKind, Status, TraceContextExt}, Context as OtelContext, Key, KeyValue, StringValue, Value, }; -use std::any::TypeId; use std::fmt; use std::marker; use std::thread; #[cfg(not(target_arch = "wasm32"))] use std::time::Instant; +use std::{any::TypeId, borrow::Cow}; use tracing_core::span::{self, Attributes, Id, Record}; use tracing_core::{field, Event, Subscriber}; #[cfg(feature = "tracing-log")] @@ -117,9 +117,45 @@ fn str_to_status(s: &str) -> otel::Status { } } +#[derive(Default)] +struct SpanBuilderUpdates { + name: Option>, + span_kind: Option, + status: Option, + attributes: Option>, +} + +impl SpanBuilderUpdates { + fn update(self, span_builder: &mut SpanBuilder) { + let Self { + name, + span_kind, + status, + attributes, + } = self; + + if let Some(name) = name { + span_builder.name = name; + } + if let Some(span_kind) = span_kind { + span_builder.span_kind = Some(span_kind); + } + if let Some(status) = status { + span_builder.status = status; + } + if let Some(attributes) = attributes { + if let Some(builder_attributes) = &mut span_builder.attributes { + builder_attributes.extend(attributes); + } else { + span_builder.attributes = Some(attributes); + } + } + } +} + struct SpanEventVisitor<'a, 'b> { event_builder: &'a mut otel::Event, - span_builder: Option<&'b mut otel::SpanBuilder>, + span_builder_updates: &'b mut Option, sem_conv_config: SemConvConfig, } @@ -186,9 +222,10 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { // In both cases, an event with an empty name and with an error attribute is created. "error" if self.event_builder.name.is_empty() => { if self.sem_conv_config.error_events_to_status { - if let Some(span) = &mut self.span_builder { - span.status = otel::Status::error(format!("{:?}", value)); - } + self.span_builder_updates + .get_or_insert_with(SpanBuilderUpdates::default) + .status + .replace(otel::Status::error(format!("{:?}", value))); } if self.sem_conv_config.error_events_to_exceptions { self.event_builder.name = EVENT_EXCEPTION_NAME.into(); @@ -225,9 +262,10 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { // In both cases, an event with an empty name and with an error attribute is created. "error" if self.event_builder.name.is_empty() => { if self.sem_conv_config.error_events_to_status { - if let Some(span) = &mut self.span_builder { - span.status = otel::Status::error(format!("{:?}", value)); - } + self.span_builder_updates + .get_or_insert_with(SpanBuilderUpdates::default) + .status + .replace(otel::Status::error(format!("{:?}", value))); } if self.sem_conv_config.error_events_to_exceptions { self.event_builder.name = EVENT_EXCEPTION_NAME.into(); @@ -288,25 +326,27 @@ impl<'a, 'b> field::Visit for SpanEventVisitor<'a, 'b> { } if self.sem_conv_config.error_records_to_exceptions { - if let Some(span) = &mut self.span_builder { - if let Some(attrs) = span.attributes.as_mut() { - attrs.push(KeyValue::new( - FIELD_EXCEPTION_MESSAGE, - Value::String(error_msg.clone().into()), - )); + let attributes = self + .span_builder_updates + .get_or_insert_with(SpanBuilderUpdates::default) + .attributes + .get_or_insert_with(Vec::new); - // NOTE: This is actually not the stacktrace of the exception. This is - // the "source chain". It represents the heirarchy of errors from the - // app level to the lowest level such as IO. It does not represent all - // of the callsites in the code that led to the error happening. - // `std::error::Error::backtrace` is a nightly-only API and cannot be - // used here until the feature is stabilized. - attrs.push(KeyValue::new( - FIELD_EXCEPTION_STACKTRACE, - Value::Array(chain.clone().into()), - )); - } - } + attributes.push(KeyValue::new( + FIELD_EXCEPTION_MESSAGE, + Value::String(error_msg.clone().into()), + )); + + // NOTE: This is actually not the stacktrace of the exception. This is + // the "source chain". It represents the heirarchy of errors from the + // app level to the lowest level such as IO. It does not represent all + // of the callsites in the code that led to the error happening. + // `std::error::Error::backtrace` is a nightly-only API and cannot be + // used here until the feature is stabilized. + attributes.push(KeyValue::new( + FIELD_EXCEPTION_STACKTRACE, + Value::Array(chain.clone().into()), + )); } self.event_builder @@ -354,16 +394,16 @@ struct SemConvConfig { } struct SpanAttributeVisitor<'a> { - span_builder: &'a mut otel::SpanBuilder, + span_builder_updates: &'a mut SpanBuilderUpdates, sem_conv_config: SemConvConfig, } impl<'a> SpanAttributeVisitor<'a> { fn record(&mut self, attribute: KeyValue) { - debug_assert!(self.span_builder.attributes.is_some()); - if let Some(v) = self.span_builder.attributes.as_mut() { - v.push(KeyValue::new(attribute.key, attribute.value)); - } + self.span_builder_updates + .attributes + .get_or_insert_with(Vec::new) + .push(KeyValue::new(attribute.key, attribute.value)); } } @@ -394,11 +434,11 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { /// [`Span`]: opentelemetry::trace::Span fn record_str(&mut self, field: &field::Field, value: &str) { match field.name() { - SPAN_NAME_FIELD => self.span_builder.name = value.to_string().into(), - SPAN_KIND_FIELD => self.span_builder.span_kind = str_to_span_kind(value), - SPAN_STATUS_CODE_FIELD => self.span_builder.status = str_to_status(value), + SPAN_NAME_FIELD => self.span_builder_updates.name = Some(value.to_string().into()), + SPAN_KIND_FIELD => self.span_builder_updates.span_kind = str_to_span_kind(value), + SPAN_STATUS_CODE_FIELD => self.span_builder_updates.status = Some(str_to_status(value)), SPAN_STATUS_MESSAGE_FIELD => { - self.span_builder.status = otel::Status::error(value.to_string()) + self.span_builder_updates.status = Some(otel::Status::error(value.to_string())) } _ => self.record(KeyValue::new(field.name(), value.to_string())), } @@ -410,15 +450,15 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { /// [`Span`]: opentelemetry::trace::Span fn record_debug(&mut self, field: &field::Field, value: &dyn fmt::Debug) { match field.name() { - SPAN_NAME_FIELD => self.span_builder.name = format!("{:?}", value).into(), + SPAN_NAME_FIELD => self.span_builder_updates.name = Some(format!("{:?}", value).into()), SPAN_KIND_FIELD => { - self.span_builder.span_kind = str_to_span_kind(&format!("{:?}", value)) + self.span_builder_updates.span_kind = str_to_span_kind(&format!("{:?}", value)) } SPAN_STATUS_CODE_FIELD => { - self.span_builder.status = str_to_status(&format!("{:?}", value)) + self.span_builder_updates.status = Some(str_to_status(&format!("{:?}", value))) } SPAN_STATUS_MESSAGE_FIELD => { - self.span_builder.status = otel::Status::error(format!("{:?}", value)) + self.span_builder_updates.status = Some(otel::Status::error(format!("{:?}", value))) } _ => self.record(Key::new(field.name()).string(format!("{:?}", value))), } @@ -904,10 +944,13 @@ where } } + let mut updates = SpanBuilderUpdates::default(); attrs.record(&mut SpanAttributeVisitor { - span_builder: &mut builder, + span_builder_updates: &mut updates, sem_conv_config: self.sem_conv_config, }); + + updates.update(&mut builder); extensions.insert(OtelData { builder, parent_cx }); } @@ -946,12 +989,14 @@ where /// [`attributes`]: opentelemetry::trace::SpanBuilder::attributes fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) { let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut updates = SpanBuilderUpdates::default(); + values.record(&mut SpanAttributeVisitor { + span_builder_updates: &mut updates, + sem_conv_config: self.sem_conv_config, + }); let mut extensions = span.extensions_mut(); if let Some(data) = extensions.get_mut::() { - values.record(&mut SpanAttributeVisitor { - span_builder: &mut data.builder, - sem_conv_config: self.sem_conv_config, - }); + updates.update(&mut data.builder); } } @@ -1023,10 +1068,6 @@ where #[cfg(not(feature = "tracing-log"))] let target = target.string(meta.target()); - // Move out extension data to not hold the extensions lock across the event.record() call, which could result in a deadlock - let mut otel_data = span.extensions_mut().remove::(); - let span_builder = otel_data.as_mut().map(|data| &mut data.builder); - let mut otel_event = otel::Event::new( String::new(), crate::time::now(), @@ -1034,13 +1075,17 @@ where 0, ); + let mut builder_updates = None; event.record(&mut SpanEventVisitor { event_builder: &mut otel_event, - span_builder, + span_builder_updates: &mut builder_updates, sem_conv_config: self.sem_conv_config, }); - if let Some(mut otel_data) = otel_data { + let mut extensions = span.extensions_mut(); + let otel_data = extensions.get_mut::(); + + if let Some(otel_data) = otel_data { let builder = &mut otel_data.builder; if builder.status == otel::Status::Unset @@ -1049,6 +1094,10 @@ where builder.status = otel::Status::error("") } + if let Some(builder_updates) = builder_updates { + builder_updates.update(builder); + } + if self.location { #[cfg(not(feature = "tracing-log"))] let normalized_meta: Option> = None; @@ -1085,8 +1134,6 @@ where } else { builder.events = Some(vec![otel_event]); } - - span.extensions_mut().replace(otel_data); } }; } @@ -1699,4 +1746,29 @@ mod tests { ) ); } + + #[test] + fn tracing_error_compatibility() { + let tracer = TestTracer(Arc::new(Mutex::new(None))); + let subscriber = tracing_subscriber::registry() + .with( + layer() + .with_error_fields_to_exceptions(false) + .with_tracer(tracer.clone()), + ) + .with(tracing_error::ErrorLayer::default()); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!("Blows up!", exception = tracing::field::Empty); + let _entered = span.enter(); + let context = tracing_error::SpanTrace::capture(); + + // This can cause a deadlock if `on_record` locks extensions while attributes are visited + span.record("exception", &tracing::field::debug(&context)); + // This can cause a deadlock if `on_event` locks extensions while the event is visited + tracing::info!(exception = &tracing::field::debug(&context), "hello"); + }); + + // No need to assert anything, as long as this finished (and did not panic), everything is ok. + } } diff --git a/tests/parallel.rs b/tests/parallel.rs new file mode 100644 index 0000000..6c9294b --- /dev/null +++ b/tests/parallel.rs @@ -0,0 +1,83 @@ +use futures_util::future::BoxFuture; +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_sdk::{ + export::trace::{ExportResult, SpanData, SpanExporter}, + trace::{Config, SpanLimits, Tracer, TracerProvider}, +}; +use std::sync::{Arc, Mutex}; +use tracing::level_filters::LevelFilter; +use tracing::Subscriber; +use tracing_opentelemetry::layer; +use tracing_subscriber::prelude::*; + +#[derive(Clone, Default, Debug)] +struct TestExporter(Arc>>); + +impl SpanExporter for TestExporter { + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + let spans = self.0.clone(); + Box::pin(async move { + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + }) + } +} + +fn test_tracer() -> ( + Tracer, + TracerProvider, + TestExporter, + impl Subscriber + Clone, +) { + let exporter = TestExporter::default(); + let provider = TracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .with_config(Config { + span_limits: SpanLimits { + max_events_per_span: u32::MAX, + ..SpanLimits::default() + }, + ..Config::default() + }) + .build(); + let tracer = provider.tracer("test"); + + let subscriber = tracing_subscriber::registry() + .with( + layer() + .with_tracer(tracer.clone()) + .with_filter(LevelFilter::TRACE), + ) + .with(tracing_subscriber::fmt::layer().with_filter(LevelFilter::DEBUG)); + + (tracer, provider, exporter, Arc::new(subscriber)) +} + +#[test] +fn multi_threading() { + let (_tracer, provider, exporter, subscriber) = test_tracer(); + + tracing::subscriber::with_default(subscriber.clone(), || { + let root = tracing::debug_span!("root"); + std::thread::scope(|scope| { + for _ in 0..10 { + scope.spawn(|| { + let _guard = tracing::subscriber::set_default(subscriber.clone()); + let _guard = root.enter(); + for _ in 0..1000 { + tracing::trace!("event"); + } + }); + } + }); + }); + + drop(provider); // flush all spans + let spans = exporter.0.lock().unwrap(); + + assert_eq!(spans.len(), 1); + + assert_eq!(spans.iter().next().unwrap().events.len(), 10_000); +} diff --git a/tests/parents.rs b/tests/parents.rs index 1621ee0..2934d20 100644 --- a/tests/parents.rs +++ b/tests/parents.rs @@ -33,7 +33,11 @@ fn test_tracer() -> (Tracer, TracerProvider, TestExporter, impl Subscriber) { let tracer = provider.tracer("test"); let subscriber = tracing_subscriber::registry() - .with(layer().with_tracer(tracer.clone()).with_filter(LevelFilter::DEBUG)) + .with( + layer() + .with_tracer(tracer.clone()) + .with_filter(LevelFilter::DEBUG), + ) .with(tracing_subscriber::fmt::layer().with_filter(LevelFilter::TRACE)); (tracer, provider, exporter, subscriber) @@ -81,7 +85,11 @@ fn explicit_parents_of_events() { let expected_root_events = ["1", "2", "5", "8", "9", "13"]; let root_span = spans.iter().find(|s| s.name == "root").unwrap(); - let actual_events: Vec<_> = root_span.events.iter().map(|event| event.name.to_string()).collect(); + let actual_events: Vec<_> = root_span + .events + .iter() + .map(|event| event.name.to_string()) + .collect(); assert_eq!(&expected_root_events, &actual_events[..]); } @@ -91,7 +99,11 @@ fn explicit_parents_of_events() { let expected_child_events = ["4", "6", "10", "14"]; let child_span = spans.iter().find(|s| s.name == "child").unwrap(); - let actual_events: Vec<_> = child_span.events.iter().map(|event| event.name.to_string()).collect(); + let actual_events: Vec<_> = child_span + .events + .iter() + .map(|event| event.name.to_string()) + .collect(); assert_eq!(&expected_child_events, &actual_events[..]); }