diff --git a/opentelemetry/src/context.rs b/opentelemetry/src/context/context_store.rs similarity index 99% rename from opentelemetry/src/context.rs rename to opentelemetry/src/context/context_store.rs index 3707bd3758..3c4a3e5f1c 100644 --- a/opentelemetry/src/context.rs +++ b/opentelemetry/src/context/context_store.rs @@ -78,7 +78,7 @@ thread_local! { #[derive(Clone, Default)] pub struct Context { #[cfg(feature = "trace")] - pub(super) span: Option>, + pub(crate) span: Option>, entries: Option>, } @@ -314,7 +314,7 @@ impl Context { } #[cfg(feature = "trace")] - pub(super) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self { + pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self { Context { span: Some(Arc::new(value)), entries: Context::map_current(|cx| cx.entries.clone()), @@ -322,7 +322,7 @@ impl Context { } #[cfg(feature = "trace")] - pub(super) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self { + pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self { Context { span: Some(Arc::new(value)), entries: self.entries.clone(), diff --git a/opentelemetry/src/context/future_ext.rs b/opentelemetry/src/context/future_ext.rs new file mode 100644 index 0000000000..0321b0be25 --- /dev/null +++ b/opentelemetry/src/context/future_ext.rs @@ -0,0 +1,108 @@ +use crate::Context; +use futures_core::Stream; +use futures_sink::Sink; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::Context as TaskContext; +use std::task::Poll; +impl FutureExt for T {} + +impl std::future::Future for WithContext { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + + this.inner.poll(task_cx) + } +} + +impl Stream for WithContext { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_next(this.inner, task_cx) + } +} + +pin_project! { + /// A future, stream, or sink that has an associated context. + #[derive(Clone, Debug)] + pub struct WithContext { + #[pin] + inner: T, + otel_cx: Context, + } +} + +impl> Sink for WithContext +where + T: Sink, +{ + type Error = T::Error; + + fn poll_ready( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_ready(this.inner, task_cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::start_send(this.inner, item) + } + + fn poll_flush( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_flush(this.inner, task_cx) + } + + fn poll_close( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _enter = this.otel_cx.clone().attach(); + T::poll_close(this.inner, task_cx) + } +} + +/// Extension trait allowing futures, streams, and sinks to be traced with a span. +pub trait FutureExt: Sized { + /// Attaches the provided [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as current while it is being polled. + /// + /// [`Context`]: Context + fn with_context(self, otel_cx: Context) -> WithContext { + WithContext { + inner: self, + otel_cx, + } + } + + /// Attaches the current [`Context`] to this type, returning a `WithContext` + /// wrapper. + /// + /// When the wrapped type is a future, stream, or sink, the attached context + /// will be set as the default while it is being polled. + /// + /// [`Context`]: Context + fn with_current_context(self) -> WithContext { + let otel_cx = Context::current(); + self.with_context(otel_cx) + } +} diff --git a/opentelemetry/src/context/mod.rs b/opentelemetry/src/context/mod.rs new file mode 100644 index 0000000000..47a9ac49f1 --- /dev/null +++ b/opentelemetry/src/context/mod.rs @@ -0,0 +1,6 @@ +mod context_store; +mod future_ext; + +pub use context_store::Context; +pub use context_store::ContextGuard; +pub use future_ext::FutureExt; diff --git a/opentelemetry/src/trace/context.rs b/opentelemetry/src/trace/context.rs index 125c96f5f5..38f4d48d61 100644 --- a/opentelemetry/src/trace/context.rs +++ b/opentelemetry/src/trace/context.rs @@ -4,16 +4,10 @@ use crate::{ trace::{Span, SpanContext, Status}, Context, ContextGuard, KeyValue, }; -use futures_core::stream::Stream; -use futures_sink::Sink; -use pin_project_lite::pin_project; -use std::{ - borrow::Cow, - error::Error, - pin::Pin, - sync::Mutex, - task::{Context as TaskContext, Poll}, -}; +use std::{borrow::Cow, error::Error, sync::Mutex}; + +// Re-export for compatability. This used to be contained here. +pub use crate::context::FutureExt; const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan { span_context: SpanContext::NONE, @@ -371,105 +365,3 @@ where { Context::map_current(|cx| f(cx.span())) } - -pin_project! { - /// A future, stream, or sink that has an associated context. - #[derive(Clone, Debug)] - pub struct WithContext { - #[pin] - inner: T, - otel_cx: Context, - } -} - -impl FutureExt for T {} - -impl std::future::Future for WithContext { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - - this.inner.poll(task_cx) - } -} - -impl Stream for WithContext { - type Item = T::Item; - - fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_next(this.inner, task_cx) - } -} - -impl> Sink for WithContext -where - T: Sink, -{ - type Error = T::Error; - - fn poll_ready( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_ready(this.inner, task_cx) - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::start_send(this.inner, item) - } - - fn poll_flush( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_flush(this.inner, task_cx) - } - - fn poll_close( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _enter = this.otel_cx.clone().attach(); - T::poll_close(this.inner, task_cx) - } -} - -/// Extension trait allowing futures, streams, and sinks to be traced with a span. -pub trait FutureExt: Sized { - /// Attaches the provided [`Context`] to this type, returning a `WithContext` - /// wrapper. - /// - /// When the wrapped type is a future, stream, or sink, the attached context - /// will be set as current while it is being polled. - /// - /// [`Context`]: crate::Context - fn with_context(self, otel_cx: Context) -> WithContext { - WithContext { - inner: self, - otel_cx, - } - } - - /// Attaches the current [`Context`] to this type, returning a `WithContext` - /// wrapper. - /// - /// When the wrapped type is a future, stream, or sink, the attached context - /// will be set as the default while it is being polled. - /// - /// [`Context`]: crate::Context - fn with_current_context(self) -> WithContext { - let otel_cx = Context::current(); - self.with_context(otel_cx) - } -} diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index b5ae4dbf49..a0dae489c4 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -177,9 +177,7 @@ mod tracer; mod tracer_provider; pub use self::{ - context::{ - get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext, - }, + context::{get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt}, span::{Span, SpanKind, Status}, span_context::{SpanContext, TraceState}, tracer::{SamplingDecision, SamplingResult, SpanBuilder, Tracer},