Skip to content

Commit

Permalink
move FutureExt over too
Browse files Browse the repository at this point in the history
  • Loading branch information
scottgerring committed Mar 10, 2025
1 parent f781de0 commit 716cee2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 81 deletions.
81 changes: 80 additions & 1 deletion opentelemetry/src/context/future_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,84 @@
use std::pin::Pin;
use std::task::Poll;
use futures_core::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use crate::Context;
use crate::trace::WithContext;
use std::task::Context as TaskContext;
impl<T: Sized> FutureExt for T {}

impl<T: std::future::Future> std::future::Future for WithContext<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();

this.inner.poll(task_cx)
}
}

impl<T: Stream> Stream for WithContext<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_next(this.inner, task_cx)
}
}

pin_project! {
/// A future, stream, or sink that has an associated context.
#[derive(Clone, Debug)]
pub struct WithContext<T> {
#[pin]
inner: T,
otel_cx: Context,
}
}

impl<I, T: Sink<I>> Sink<I> for WithContext<T>
where
T: Sink<I>,
{
type Error = T::Error;

fn poll_ready(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_ready(this.inner, task_cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::start_send(this.inner, item)
}

fn poll_flush(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_flush(this.inner, task_cx)
}

fn poll_close(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _enter = this.otel_cx.clone().attach();
T::poll_close(this.inner, task_cx)
}
}



/// Extension trait allowing futures, streams, and sinks to be traced with a span.
pub trait FutureExt: Sized {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ mod future_ext;

pub use context_store::Context;
pub use context_store::ContextGuard;
pub use future_ext::FutureExt;
pub use future_ext::FutureExt;
78 changes: 0 additions & 78 deletions opentelemetry/src/trace/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@ use crate::{
trace::{Span, SpanContext, Status},
Context, ContextGuard, KeyValue,
};
use futures_core::stream::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::{
borrow::Cow,
error::Error,
pin::Pin,
sync::Mutex,
task::{Context as TaskContext, Poll},
};

// Re-export for compatability. This used to be contained here.
Expand Down Expand Up @@ -375,76 +370,3 @@ where
Context::map_current(|cx| f(cx.span()))
}

pin_project! {
/// A future, stream, or sink that has an associated context.
#[derive(Clone, Debug)]
pub struct WithContext<T> {
#[pin]
pub(crate) inner: T,
pub(crate) otel_cx: Context,
}
}

impl<T: Sized> FutureExt for T {}

impl<T: std::future::Future> std::future::Future for WithContext<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();

this.inner.poll(task_cx)
}
}

impl<T: Stream> Stream for WithContext<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_next(this.inner, task_cx)
}
}

impl<I, T: Sink<I>> Sink<I> for WithContext<T>
where
T: Sink<I>,
{
type Error = T::Error;

fn poll_ready(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_ready(this.inner, task_cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::start_send(this.inner, item)
}

fn poll_flush(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_flush(this.inner, task_cx)
}

fn poll_close(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _enter = this.otel_cx.clone().attach();
T::poll_close(this.inner, task_cx)
}
}

2 changes: 1 addition & 1 deletion opentelemetry/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ mod tracer_provider;

pub use self::{
context::{
get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext,
get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt
},
span::{Span, SpanKind, Status},
span_context::{SpanContext, TraceState},
Expand Down

0 comments on commit 716cee2

Please sign in to comment.