From 716cee231536a5a610253dc7c666d99cfd6b8c11 Mon Sep 17 00:00:00 2001 From: Scott Gerring Date: Mon, 10 Mar 2025 15:22:07 +0100 Subject: [PATCH] move FutureExt over too --- opentelemetry/src/context/future_ext.rs | 81 ++++++++++++++++++++++++- opentelemetry/src/context/mod.rs | 2 +- opentelemetry/src/trace/context.rs | 78 ------------------------ opentelemetry/src/trace/mod.rs | 2 +- 4 files changed, 82 insertions(+), 81 deletions(-) diff --git a/opentelemetry/src/context/future_ext.rs b/opentelemetry/src/context/future_ext.rs index 7c0831af34..a1847c4666 100644 --- a/opentelemetry/src/context/future_ext.rs +++ b/opentelemetry/src/context/future_ext.rs @@ -1,5 +1,84 @@ +use std::pin::Pin; +use std::task::Poll; +use futures_core::Stream; +use futures_sink::Sink; +use pin_project_lite::pin_project; use crate::Context; -use crate::trace::WithContext; +use std::task::Context as TaskContext; +impl FutureExt for T {} + +impl std::future::Future for WithContext { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + + this.inner.poll(task_cx) + } +} + +impl Stream for WithContext { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_next(this.inner, task_cx) + } +} + +pin_project! { + /// A future, stream, or sink that has an associated context. + #[derive(Clone, Debug)] + pub struct WithContext { + #[pin] + inner: T, + otel_cx: Context, + } +} + +impl> Sink for WithContext +where + T: Sink, +{ + type Error = T::Error; + + fn poll_ready( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_ready(this.inner, task_cx) + } + + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::start_send(this.inner, item) + } + + fn poll_flush( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _guard = this.otel_cx.clone().attach(); + T::poll_flush(this.inner, task_cx) + } + + fn poll_close( + self: Pin<&mut Self>, + task_cx: &mut TaskContext<'_>, + ) -> Poll> { + let this = self.project(); + let _enter = this.otel_cx.clone().attach(); + T::poll_close(this.inner, task_cx) + } +} + + /// Extension trait allowing futures, streams, and sinks to be traced with a span. pub trait FutureExt: Sized { diff --git a/opentelemetry/src/context/mod.rs b/opentelemetry/src/context/mod.rs index 042c34d0d8..47a9ac49f1 100644 --- a/opentelemetry/src/context/mod.rs +++ b/opentelemetry/src/context/mod.rs @@ -3,4 +3,4 @@ mod future_ext; pub use context_store::Context; pub use context_store::ContextGuard; -pub use future_ext::FutureExt; \ No newline at end of file +pub use future_ext::FutureExt; diff --git a/opentelemetry/src/trace/context.rs b/opentelemetry/src/trace/context.rs index 7db6404676..63c7610518 100644 --- a/opentelemetry/src/trace/context.rs +++ b/opentelemetry/src/trace/context.rs @@ -4,15 +4,10 @@ use crate::{ trace::{Span, SpanContext, Status}, Context, ContextGuard, KeyValue, }; -use futures_core::stream::Stream; -use futures_sink::Sink; -use pin_project_lite::pin_project; use std::{ borrow::Cow, error::Error, - pin::Pin, sync::Mutex, - task::{Context as TaskContext, Poll}, }; // Re-export for compatability. This used to be contained here. @@ -375,76 +370,3 @@ where Context::map_current(|cx| f(cx.span())) } -pin_project! { - /// A future, stream, or sink that has an associated context. - #[derive(Clone, Debug)] - pub struct WithContext { - #[pin] - pub(crate) inner: T, - pub(crate) otel_cx: Context, - } -} - -impl FutureExt for T {} - -impl std::future::Future for WithContext { - type Output = T::Output; - - fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - - this.inner.poll(task_cx) - } -} - -impl Stream for WithContext { - type Item = T::Item; - - fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_next(this.inner, task_cx) - } -} - -impl> Sink for WithContext -where - T: Sink, -{ - type Error = T::Error; - - fn poll_ready( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_ready(this.inner, task_cx) - } - - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::start_send(this.inner, item) - } - - fn poll_flush( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _guard = this.otel_cx.clone().attach(); - T::poll_flush(this.inner, task_cx) - } - - fn poll_close( - self: Pin<&mut Self>, - task_cx: &mut TaskContext<'_>, - ) -> Poll> { - let this = self.project(); - let _enter = this.otel_cx.clone().attach(); - T::poll_close(this.inner, task_cx) - } -} - diff --git a/opentelemetry/src/trace/mod.rs b/opentelemetry/src/trace/mod.rs index b5ae4dbf49..a3dcf77993 100644 --- a/opentelemetry/src/trace/mod.rs +++ b/opentelemetry/src/trace/mod.rs @@ -178,7 +178,7 @@ mod tracer_provider; pub use self::{ context::{ - get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext, + get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt }, span::{Span, SpanKind, Status}, span_context::{SpanContext, TraceState},