From c589841f9bfe608f0bf8aaf950d81f33becbc8f8 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Mar 2025 14:42:19 +0100 Subject: [PATCH 1/4] Rejig module structure to facilitate moving more into context module --- opentelemetry/src/{context.rs => context/context_store.rs} | 7 ++++--- opentelemetry/src/context/mod.rs | 4 ++++ 2 files changed, 8 insertions(+), 3 deletions(-) rename opentelemetry/src/{context.rs => context/context_store.rs} (99%) create mode 100644 opentelemetry/src/context/mod.rs diff --git a/opentelemetry/src/context.rs b/opentelemetry/src/context/context_store.rs similarity index 99% rename from opentelemetry/src/context.rs rename to opentelemetry/src/context/context_store.rs index 3707bd3758..809eec43d8 100644 --- a/opentelemetry/src/context.rs +++ b/opentelemetry/src/context/context_store.rs @@ -1,3 +1,4 @@ + use crate::otel_warn; #[cfg(feature = "trace")] use crate::trace::context::SynchronizedSpan; @@ -78,7 +79,7 @@ thread_local! { #[derive(Clone, Default)] pub struct Context { #[cfg(feature = "trace")] - pub(super) span: Option>, + pub(crate) span: Option>, entries: Option>, } @@ -314,7 +315,7 @@ impl Context { } #[cfg(feature = "trace")] - pub(super) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self { + pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self { Context { span: Some(Arc::new(value)), entries: Context::map_current(|cx| cx.entries.clone()), @@ -322,7 +323,7 @@ impl Context { } #[cfg(feature = "trace")] - pub(super) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self { + pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self { Context { span: Some(Arc::new(value)), entries: self.entries.clone(), diff --git a/opentelemetry/src/context/mod.rs b/opentelemetry/src/context/mod.rs new file mode 100644 index 0000000000..0873567f88 --- /dev/null +++ b/opentelemetry/src/context/mod.rs @@ -0,0 +1,4 @@ +mod context_store; + +pub use context_store::Context; +pub use context_store::ContextGuard; \ No newline at end of file From f781de0c751c2c29f09b1f2cb2e99066a2b01222 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Mar 2025 15:05:50 +0100 Subject: [PATCH 2/4] move FutureExt into context mod --- opentelemetry/src/context/future_ext.rs | 31 ++++++++++++++++++++++ opentelemetry/src/context/mod.rs | 4 ++- opentelemetry/src/trace/context.rs | 35 ++++--------------------- 3 files changed, 39 insertions(+), 31 deletions(-) create mode 100644 opentelemetry/src/context/future_ext.rs diff --git a/opentelemetry/src/context/future_ext.rs b/opentelemetry/src/context/future_ext.rs new file mode 100644 index 0000000000..7c0831af34 --- /dev/null +++ b/opentelemetry/src/context/future_ext.rs @@ -0,0 +1,31 @@ +use crate::Context; +use crate::trace::WithContext; + +/// Extension trait allowing futures, streams, and sinks to be traced with a span. +pub trait FutureExt: Sized { + /// Attaches the provided [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as current while it is being polled. + /// + /// [`Context`]: Context + fn with_context(self, otel_cx: Context) -> WithContext { + WithContext { + inner: self, + otel_cx, + } + } + + /// Attaches the current [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as the default while it is being polled. + /// + /// [`Context`]: Context + fn with_current_context(self) -> WithContext { + let otel_cx = Context::current(); + self.with_context(otel_cx) + } +} diff --git a/opentelemetry/src/context/mod.rs b/opentelemetry/src/context/mod.rs index 0873567f88..042c34d0d8 100644 --- a/opentelemetry/src/context/mod.rs +++ b/opentelemetry/src/context/mod.rs @@ -1,4 +1,6 @@ mod context_store; +mod future_ext; pub use context_store::Context; -pub use context_store::ContextGuard; \ No newline at end of file +pub use context_store::ContextGuard; +pub use future_ext::FutureExt; \ No newline at end of file diff --git a/opentelemetry/src/trace/context.rs b/opentelemetry/src/trace/context.rs index 125c96f5f5..7db6404676 100644 --- a/opentelemetry/src/trace/context.rs +++ b/opentelemetry/src/trace/context.rs @@ -15,6 +15,9 @@ use std::{ task::{Context as TaskContext, Poll}, }; +// Re-export for compatability. This used to be contained here. +pub use crate::context::FutureExt; + const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan { span_context: SpanContext::NONE, inner: None, @@ -377,8 +380,8 @@ pin_project! { #[derive(Clone, Debug)] pub struct WithContext { #[pin] - inner: T, - otel_cx: Context, + pub(crate) inner: T, + pub(crate) otel_cx: Context, } } @@ -445,31 +448,3 @@ where } } -/// Extension trait allowing futures, streams, and sinks to be traced with a span. -pub trait FutureExt: Sized { - /// Attaches the provided [`Context`] to this type, returning a `WithContext` - /// wrapper. - /// - /// When the wrapped type is a future, stream, or sink, the attached context - /// will be set as current while it is being polled. - /// - /// [`Context`]: crate::Context - fn with_context(self, otel_cx: Context) -> WithContext { - WithContext { - inner: self, - otel_cx, - } - } - - /// Attaches the current [`Context`] to this type, returning a `WithContext` - /// wrapper. - /// - /// When the wrapped type is a future, stream, or sink, the attached context - /// will be set as the default while it is being polled. - /// - /// [`Context`]: crate::Context - fn with_current_context(self) -> WithContext { - let otel_cx = Context::current(); - self.with_context(otel_cx) - } -} From 716cee231536a5a610253dc7c666d99cfd6b8c11 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Mar 2025 15:22:07 +0100 Subject: [PATCH 3/4] move FutureExt over too --- opentelemetry/src/context/future_ext.rs | 81 ++++++++++++++++++++++++- opentelemetry/src/context/mod.rs | 2 +- opentelemetry/src/trace/context.rs | 78 ------------------------ opentelemetry/src/trace/mod.rs | 2 +- 4 files changed, 82 insertions(+), 81 deletions(-) diff --git a/opentelemetry/src/context/future_ext.rs b/opentelemetry/src/context/future_ext.rs index 7c0831af34..a1847c4666 100644 --- a/opentelemetry/src/context/future_ext.rs +++ b/opentelemetry/src/context/future_ext.rs @@ -1,5 +1,84 @@ +use std::pin::Pin; +use std::task::Poll; +use futures_core::Stream; +use futures_sink::Sink; +use pin_project_lite::pin_project; use crate::Context; -use crate::trace::WithContext; +use std::task::Context as TaskContext; +impl FutureExt for T {} + +impl std::future::Future for WithContext { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + + this.inner.poll(task_cx) + } +} + +impl Stream for WithContext { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_next(this.inner, task_cx) + } +} + +pin_project! { + /// A future, stream, or sink that has an associated context. + #[derive(Clone, Debug)] + pub struct WithContext { + #[pin] + inner: T, + otel_cx: Context, + } +} + +impl> Sink for WithContext +where + T: Sink, +{ + type Error = T::Error; + + fn poll_ready( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_ready(this.inner, task_cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::start_send(this.inner, item) + } + + fn poll_flush( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_flush(this.inner, task_cx) + } + + fn poll_close( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _enter = this.otel_cx.clone().attach(); + T::poll_close(this.inner, task_cx) + } +} + + /// Extension trait allowing futures, streams, and sinks to be traced with a span. pub trait FutureExt: Sized { diff --git a/opentelemetry/src/context/mod.rs b/opentelemetry/src/context/mod.rs index 042c34d0d8..47a9ac49f1 100644 --- a/opentelemetry/src/context/mod.rs +++ b/opentelemetry/src/context/mod.rs @@ -3,4 +3,4 @@ mod future_ext; pub use context_store::Context; pub use context_store::ContextGuard; -pub use future_ext::FutureExt; \ No newline at end of file +pub use future_ext::FutureExt; diff --git a/opentelemetry/src/trace/context.rs b/opentelemetry/src/trace/context.rs index 7db6404676..63c7610518 100644 --- a/opentelemetry/src/trace/context.rs +++ b/opentelemetry/src/trace/context.rs @@ -4,15 +4,10 @@ use crate::{ trace::{Span, SpanContext, Status}, Context, ContextGuard, KeyValue, }; -use futures_core::stream::Stream; -use futures_sink::Sink; -use pin_project_lite::pin_project; use std::{ borrow::Cow, error::Error, - pin::Pin, sync::Mutex, - task::{Context as TaskContext, Poll}, }; // Re-export for compatability. This used to be contained here. @@ -375,76 +370,3 @@ where Context::map_current(|cx| f(cx.span())) } -pin_project! { - /// A future, stream, or sink that has an associated context. - #[derive(Clone, Debug)] - pub struct WithContext { - #[pin] - pub(crate) inner: T, - pub(crate) otel_cx: Context, - } -} - -impl FutureExt for T {} - -impl std::future::Future for WithContext { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - - this.inner.poll(task_cx) - } -} - -impl Stream for WithContext { - type Item = T::Item; - - fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_next(this.inner, task_cx) - } -} - -impl> Sink for WithContext -where - T: Sink, -{ - type Error = T::Error; - - fn poll_ready( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_ready(this.inner, task_cx) - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::start_send(this.inner, item) - } - - fn poll_flush( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_flush(this.inner, task_cx) - } - - fn poll_close( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _enter = this.otel_cx.clone().attach(); - T::poll_close(this.inner, task_cx) - } -} - diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index b5ae4dbf49..a3dcf77993 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -178,7 +178,7 @@ mod tracer_provider; pub use self::{ context::{ - get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext, + get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt }, span::{Span, SpanKind, Status}, span_context::{SpanContext, TraceState}, From b35dec497632f18cee36bb8fce099e4d46c7807f Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Mar 2025 15:28:49 +0100 Subject: [PATCH 4/4] cargo formatting --- opentelemetry/src/context/context_store.rs | 1 - opentelemetry/src/context/future_ext.rs | 8 +++----- opentelemetry/src/trace/context.rs | 7 +------ opentelemetry/src/trace/mod.rs | 4 +--- 4 files changed, 5 insertions(+), 15 deletions(-) diff --git a/opentelemetry/src/context/context_store.rs b/opentelemetry/src/context/context_store.rs index 809eec43d8..3c4a3e5f1c 100644 --- a/opentelemetry/src/context/context_store.rs +++ b/opentelemetry/src/context/context_store.rs @@ -1,4 +1,3 @@ - use crate::otel_warn; #[cfg(feature = "trace")] use crate::trace::context::SynchronizedSpan; diff --git a/opentelemetry/src/context/future_ext.rs b/opentelemetry/src/context/future_ext.rs index a1847c4666..0321b0be25 100644 --- a/opentelemetry/src/context/future_ext.rs +++ b/opentelemetry/src/context/future_ext.rs @@ -1,10 +1,10 @@ -use std::pin::Pin; -use std::task::Poll; +use crate::Context; use futures_core::Stream; use futures_sink::Sink; use pin_project_lite::pin_project; -use crate::Context; +use std::pin::Pin; use std::task::Context as TaskContext; +use std::task::Poll; impl FutureExt for T {} impl std::future::Future for WithContext { @@ -78,8 +78,6 @@ where } } - - /// Extension trait allowing futures, streams, and sinks to be traced with a span. pub trait FutureExt: Sized { /// Attaches the provided [`Context`] to this type, returning a `WithContext` diff --git a/opentelemetry/src/trace/context.rs b/opentelemetry/src/trace/context.rs index 63c7610518..38f4d48d61 100644 --- a/opentelemetry/src/trace/context.rs +++ b/opentelemetry/src/trace/context.rs @@ -4,11 +4,7 @@ use crate::{ trace::{Span, SpanContext, Status}, Context, ContextGuard, KeyValue, }; -use std::{ - borrow::Cow, - error::Error, - sync::Mutex, -}; +use std::{borrow::Cow, error::Error, sync::Mutex}; // Re-export for compatability. This used to be contained here. pub use crate::context::FutureExt; @@ -369,4 +365,3 @@ where { Context::map_current(|cx| f(cx.span())) } - diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index a3dcf77993..a0dae489c4 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -177,9 +177,7 @@ mod tracer; mod tracer_provider; pub use self::{ - context::{ - get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt - }, + context::{get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt}, span::{Span, SpanKind, Status}, span_context::{SpanContext, TraceState}, tracer::{SamplingDecision, SamplingResult, SpanBuilder, Tracer},