Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Move FutureExt into context #2776

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ thread_local! {
#[derive(Clone, Default)]
pub struct Context {
#[cfg(feature = "trace")]
pub(super) span: Option<Arc<SynchronizedSpan>>,
pub(crate) span: Option<Arc<SynchronizedSpan>>,
entries: Option<Arc<EntryMap>>,
}

Expand Down Expand Up @@ -314,15 +314,15 @@ impl Context {
}

#[cfg(feature = "trace")]
pub(super) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
Context {
span: Some(Arc::new(value)),
entries: Context::map_current(|cx| cx.entries.clone()),
}
}

#[cfg(feature = "trace")]
pub(super) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
Context {
span: Some(Arc::new(value)),
entries: self.entries.clone(),
Expand Down
108 changes: 108 additions & 0 deletions opentelemetry/src/context/future_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::Context;
use futures_core::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::Context as TaskContext;
use std::task::Poll;
impl<T: Sized> FutureExt for T {}

impl<T: std::future::Future> std::future::Future for WithContext<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();

this.inner.poll(task_cx)
}

Check warning on line 18 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L13-L18

Added lines #L13 - L18 were not covered by tests
}

impl<T: Stream> Stream for WithContext<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_next(this.inner, task_cx)
}

Check warning on line 28 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L24-L28

Added lines #L24 - L28 were not covered by tests
}

pin_project! {
/// A future, stream, or sink that has an associated context.
#[derive(Clone, Debug)]
pub struct WithContext<T> {
#[pin]
inner: T,
otel_cx: Context,
}
}

impl<I, T: Sink<I>> Sink<I> for WithContext<T>
where
T: Sink<I>,
{
type Error = T::Error;

fn poll_ready(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_ready(this.inner, task_cx)
}

Check warning on line 54 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L47-L54

Added lines #L47 - L54 were not covered by tests

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::start_send(this.inner, item)
}

Check warning on line 60 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L56-L60

Added lines #L56 - L60 were not covered by tests

fn poll_flush(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_flush(this.inner, task_cx)
}

Check warning on line 69 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L62-L69

Added lines #L62 - L69 were not covered by tests

fn poll_close(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _enter = this.otel_cx.clone().attach();
T::poll_close(this.inner, task_cx)
}

Check warning on line 78 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L71-L78

Added lines #L71 - L78 were not covered by tests
}

/// Extension trait allowing futures, streams, and sinks to be traced with a span.
pub trait FutureExt: Sized {
/// Attaches the provided [`Context`] to this type, returning a `WithContext`
/// wrapper.
///
/// When the wrapped type is a future, stream, or sink, the attached context
/// will be set as current while it is being polled.
///
/// [`Context`]: Context
fn with_context(self, otel_cx: Context) -> WithContext<Self> {
WithContext {
inner: self,
otel_cx,
}
}

Check warning on line 95 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L90-L95

Added lines #L90 - L95 were not covered by tests

/// Attaches the current [`Context`] to this type, returning a `WithContext`
/// wrapper.
///
/// When the wrapped type is a future, stream, or sink, the attached context
/// will be set as the default while it is being polled.
///
/// [`Context`]: Context
fn with_current_context(self) -> WithContext<Self> {
let otel_cx = Context::current();
self.with_context(otel_cx)
}

Check warning on line 107 in opentelemetry/src/context/future_ext.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/context/future_ext.rs#L104-L107

Added lines #L104 - L107 were not covered by tests
}
6 changes: 6 additions & 0 deletions opentelemetry/src/context/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod context_store;
mod future_ext;

pub use context_store::Context;
pub use context_store::ContextGuard;
pub use future_ext::FutureExt;
116 changes: 4 additions & 112 deletions opentelemetry/src/trace/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,10 @@ use crate::{
trace::{Span, SpanContext, Status},
Context, ContextGuard, KeyValue,
};
use futures_core::stream::Stream;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::{
borrow::Cow,
error::Error,
pin::Pin,
sync::Mutex,
task::{Context as TaskContext, Poll},
};
use std::{borrow::Cow, error::Error, sync::Mutex};

// Re-export for compatability. This used to be contained here.
pub use crate::context::FutureExt;

const NOOP_SPAN: SynchronizedSpan = SynchronizedSpan {
span_context: SpanContext::NONE,
Expand Down Expand Up @@ -371,105 +365,3 @@ where
{
Context::map_current(|cx| f(cx.span()))
}

pin_project! {
/// A future, stream, or sink that has an associated context.
#[derive(Clone, Debug)]
pub struct WithContext<T> {
#[pin]
inner: T,
otel_cx: Context,
}
}

impl<T: Sized> FutureExt for T {}

impl<T: std::future::Future> std::future::Future for WithContext<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();

this.inner.poll(task_cx)
}
}

impl<T: Stream> Stream for WithContext<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, task_cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_next(this.inner, task_cx)
}
}

impl<I, T: Sink<I>> Sink<I> for WithContext<T>
where
T: Sink<I>,
{
type Error = T::Error;

fn poll_ready(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_ready(this.inner, task_cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::start_send(this.inner, item)
}

fn poll_flush(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _guard = this.otel_cx.clone().attach();
T::poll_flush(this.inner, task_cx)
}

fn poll_close(
self: Pin<&mut Self>,
task_cx: &mut TaskContext<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let _enter = this.otel_cx.clone().attach();
T::poll_close(this.inner, task_cx)
}
}

/// Extension trait allowing futures, streams, and sinks to be traced with a span.
pub trait FutureExt: Sized {
/// Attaches the provided [`Context`] to this type, returning a `WithContext`
/// wrapper.
///
/// When the wrapped type is a future, stream, or sink, the attached context
/// will be set as current while it is being polled.
///
/// [`Context`]: crate::Context
fn with_context(self, otel_cx: Context) -> WithContext<Self> {
WithContext {
inner: self,
otel_cx,
}
}

/// Attaches the current [`Context`] to this type, returning a `WithContext`
/// wrapper.
///
/// When the wrapped type is a future, stream, or sink, the attached context
/// will be set as the default while it is being polled.
///
/// [`Context`]: crate::Context
fn with_current_context(self) -> WithContext<Self> {
let otel_cx = Context::current();
self.with_context(otel_cx)
}
}
4 changes: 1 addition & 3 deletions opentelemetry/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ mod tracer;
mod tracer_provider;

pub use self::{
context::{
get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt, WithContext,
},
context::{get_active_span, mark_span_as_active, FutureExt, SpanRef, TraceContextExt},
span::{Span, SpanKind, Status},
span_context::{SpanContext, TraceState},
tracer::{SamplingDecision, SamplingResult, SpanBuilder, Tracer},
Expand Down