From baf4bfd61eac214975424f64d74ce60340026645 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 5 Mar 2025 02:03:45 +0100 Subject: [PATCH] fix: Allow overlapping context scopes (#2378) Co-authored-by: Cijo Thomas --- opentelemetry/CHANGELOG.md | 1 + opentelemetry/src/context.rs | 280 +++++++++++++++++++++++++++++------ 2 files changed, 238 insertions(+), 43 deletions(-) diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 4247640d8b..6071e4211d 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -10,6 +10,7 @@ - *Breaking* Changed value type of `Baggage` from `Value` to `StringValue` - Updated `Baggage` constants to reflect latest standard (`MAX_KEY_VALUE_PAIRS` - 180 -> 64, `MAX_BYTES_FOR_ONE_PAIR` - removed) and increased insert performance see #[2284](https://github.com/open-telemetry/opentelemetry-rust/pull/2284). - *Breaking* Align `Baggage.remove()` signature with `.get()` to take the key as a reference +- Changed `Context` to use a stack to properly handle out of order dropping of `ContextGuard`. This imposes a limit of `65535` nested contexts on a single thread. See #[2378](https://github.com/open-telemetry/opentelemetry-rust/pull/2378) and #[1887](https://github.com/open-telemetry/opentelemetry-rust/issues/1887). - Added additional `name: Option<&str>` parameter to the `event_enabled` method on the `Logger` trait. This allows implementations (SDK, processor, exporters) diff --git a/opentelemetry/src/context.rs b/opentelemetry/src/context.rs index ecc7f7a6a5..3707bd3758 100644 --- a/opentelemetry/src/context.rs +++ b/opentelemetry/src/context.rs @@ -1,3 +1,4 @@ +use crate::otel_warn; #[cfg(feature = "trace")] use crate::trace::context::SynchronizedSpan; use std::any::{Any, TypeId}; @@ -9,7 +10,7 @@ use std::marker::PhantomData; use std::sync::Arc; thread_local!
{ - static CURRENT_CONTEXT: RefCell = RefCell::new(Context::default()); + static CURRENT_CONTEXT: RefCell = RefCell::new(ContextStack::default()); } /// An execution-scoped collection of values. @@ -78,9 +79,11 @@ thread_local! { pub struct Context { #[cfg(feature = "trace")] pub(super) span: Option>, - entries: HashMap, BuildHasherDefault>, + entries: Option>, } +type EntryMap = HashMap, BuildHasherDefault>; + impl Context { /// Creates an empty `Context`. /// @@ -110,7 +113,7 @@ impl Context { /// do_work() /// ``` pub fn current() -> Self { - Context::map_current(|cx| cx.clone()) + Self::map_current(|cx| cx.clone()) } /// Applies a function to the current context returning its value. @@ -122,7 +125,7 @@ impl Context { /// Note: This function will panic if you attempt to attach another context /// while the current one is still borrowed. pub fn map_current(f: impl FnOnce(&Context) -> T) -> T { - CURRENT_CONTEXT.with(|cx| f(&cx.borrow())) + CURRENT_CONTEXT.with(|cx| cx.borrow().map_current_cx(f)) } /// Returns a clone of the current thread's context with the given value. @@ -152,12 +155,7 @@ impl Context { /// assert_eq!(all_current_and_b.get::(), Some(&ValueB(42))); /// ``` pub fn current_with_value(value: T) -> Self { - let mut new_context = Context::current(); - new_context - .entries - .insert(TypeId::of::(), Arc::new(value)); - - new_context + Self::map_current(|cx| cx.with_value(value)) } /// Returns a reference to the entry for the corresponding value type. @@ -183,8 +181,9 @@ impl Context { /// ``` pub fn get(&self) -> Option<&T> { self.entries - .get(&TypeId::of::()) - .and_then(|rc| rc.downcast_ref()) + .as_ref()? + .get(&TypeId::of::())? + .downcast_ref() } /// Returns a copy of the context with the new value included. 
@@ -215,12 +214,20 @@ impl Context { /// assert_eq!(cx_with_a_and_b.get::(), Some(&ValueB(42))); /// ``` pub fn with_value(&self, value: T) -> Self { - let mut new_context = self.clone(); - new_context - .entries - .insert(TypeId::of::(), Arc::new(value)); - - new_context + let entries = if let Some(current_entries) = &self.entries { + let mut inner_entries = (**current_entries).clone(); + inner_entries.insert(TypeId::of::(), Arc::new(value)); + Some(Arc::new(inner_entries)) + } else { + let mut entries = EntryMap::default(); + entries.insert(TypeId::of::(), Arc::new(value)); + Some(Arc::new(entries)) + }; + Context { + entries, + #[cfg(feature = "trace")] + span: self.span.clone(), + } } /// Replaces the current context on this thread with this context. @@ -298,12 +305,10 @@ impl Context { /// assert_eq!(Context::current().get::(), None); /// ``` pub fn attach(self) -> ContextGuard { - let previous_cx = CURRENT_CONTEXT - .try_with(|current| current.replace(self)) - .ok(); + let cx_id = CURRENT_CONTEXT.with(|cx| cx.borrow_mut().push(self)); ContextGuard { - previous_cx, + cx_pos: cx_id, _marker: PhantomData, } } @@ -328,7 +333,7 @@ impl Context { impl fmt::Debug for Context { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut dbg = f.debug_struct("Context"); - let mut entries = self.entries.len(); + let mut entries = self.entries.as_ref().map_or(0, |e| e.len()); #[cfg(feature = "trace")] { if let Some(span) = &self.span { @@ -344,17 +349,19 @@ impl fmt::Debug for Context { } /// A guard that resets the current context to the prior context when dropped. -#[allow(missing_debug_implementations)] +#[derive(Debug)] pub struct ContextGuard { - previous_cx: Option, - // ensure this type is !Send as it relies on thread locals + // The position of the context in the stack. This is used to pop the context. 
+ cx_pos: u16, + // Ensure this type is !Send as it relies on thread locals _marker: PhantomData<*const ()>, } impl Drop for ContextGuard { fn drop(&mut self) { - if let Some(previous_cx) = self.previous_cx.take() { - let _ = CURRENT_CONTEXT.try_with(|current| current.replace(previous_cx)); + let id = self.cx_pos; + if id > ContextStack::BASE_POS && id < ContextStack::MAX_POS { + CURRENT_CONTEXT.with(|context_stack| context_stack.borrow_mut().pop_id(id)); } } } @@ -381,17 +388,116 @@ impl Hasher for IdHasher { } } +/// A stack for keeping track of the [`Context`] instances that have been attached +/// to a thread. +/// +/// The stack allows for popping of contexts by position, which is used to do out +/// of order dropping of [`ContextGuard`] instances. Only when the top of the +/// stack is popped, the topmost [`Context`] is actually restored. +/// +/// The stack relies on the fact that it is thread local and that the +/// [`ContextGuard`] instances that are constructed using ids from it can't be +/// moved to other threads. That means that the ids are always valid and that +/// they are always within the bounds of the stack. +struct ContextStack { + /// This is the current [`Context`] that is active on this thread, and the top + /// of the [`ContextStack`]. It is always present, and if the `stack` is empty + /// it's an empty [`Context`]. + /// + /// Having this here allows for fast access to the current [`Context`]. + current_cx: Context, + /// A `stack` of the other contexts that have been attached to the thread. + stack: Vec>, + /// Ensure this type is !Send as it relies on thread locals + _marker: PhantomData<*const ()>, +} + +impl ContextStack { + const BASE_POS: u16 = 0; + const MAX_POS: u16 = u16::MAX; + const INITIAL_CAPACITY: usize = 8; + + #[inline(always)] + fn push(&mut self, cx: Context) -> u16 { + // The next id is the length of the `stack`, plus one since we have the + // top of the [`ContextStack`] as the `current_cx`. 
+ let next_id = self.stack.len() + 1; + if next_id < ContextStack::MAX_POS.into() { + let current_cx = std::mem::replace(&mut self.current_cx, cx); + self.stack.push(Some(current_cx)); + next_id as u16 + } else { + // This is an overflow, log it and ignore it. + otel_warn!( + name: "Context.AttachFailed", + message = format!("Too many contexts. Max limit is {}. \ + Context::current() remains unchanged as this attach failed. \ + Dropping the returned ContextGuard will have no impact on Context::current().", + ContextStack::MAX_POS) + ); + ContextStack::MAX_POS + } + } + + #[inline(always)] + fn pop_id(&mut self, pos: u16) { + if pos == ContextStack::BASE_POS || pos == ContextStack::MAX_POS { + // The empty context is always at the bottom of the [`ContextStack`] + // and cannot be popped, and the overflow position is invalid, so do + // nothing. + return; + } + let len: u16 = self.stack.len() as u16; + // Are we at the top of the [`ContextStack`]? + if pos == len { + // Shrink the stack if possible to clear out any out of order pops. + while let Some(None) = self.stack.last() { + _ = self.stack.pop(); + } + // Restore the previous context. This will always happen since the + // empty context is always at the bottom of the stack if the + // [`ContextStack`] is not empty. + if let Some(Some(next_cx)) = self.stack.pop() { + self.current_cx = next_cx; + } + } else { + // This is an out of order pop. + if pos >= len { + // This is an invalid id, ignore it. + return; + } + // Clear out the entry at the given id. 
+ _ = self.stack[pos as usize].take(); + } + } + + #[inline(always)] + fn map_current_cx(&self, f: impl FnOnce(&Context) -> T) -> T { + f(&self.current_cx) + } +} + +impl Default for ContextStack { + fn default() -> Self { + ContextStack { + current_cx: Context::default(), + stack: Vec::with_capacity(ContextStack::INITIAL_CAPACITY), + _marker: PhantomData, + } + } +} + #[cfg(test)] mod tests { use super::*; + #[derive(Debug, PartialEq)] + struct ValueA(u64); + #[derive(Debug, PartialEq)] + struct ValueB(u64); + #[test] fn context_immutable() { - #[derive(Debug, PartialEq)] - struct ValueA(u64); - #[derive(Debug, PartialEq)] - struct ValueB(u64); - // start with Current, which should be an empty context let cx = Context::current(); assert_eq!(cx.get::(), None); @@ -424,26 +530,22 @@ mod tests { #[test] fn nested_contexts() { - #[derive(Debug, PartialEq)] - struct ValueA(&'static str); - #[derive(Debug, PartialEq)] - struct ValueB(u64); - let _outer_guard = Context::new().with_value(ValueA("a")).attach(); + let _outer_guard = Context::new().with_value(ValueA(1)).attach(); // Only value `a` is set let current = Context::current(); - assert_eq!(current.get(), Some(&ValueA("a"))); + assert_eq!(current.get(), Some(&ValueA(1))); assert_eq!(current.get::(), None); { let _inner_guard = Context::current_with_value(ValueB(42)).attach(); // Both values are set in inner context let current = Context::current(); - assert_eq!(current.get(), Some(&ValueA("a"))); + assert_eq!(current.get(), Some(&ValueA(1))); assert_eq!(current.get(), Some(&ValueB(42))); assert!(Context::map_current(|cx| { - assert_eq!(cx.get(), Some(&ValueA("a"))); + assert_eq!(cx.get(), Some(&ValueA(1))); assert_eq!(cx.get(), Some(&ValueB(42))); true })); @@ -451,13 +553,105 @@ mod tests { // Resets to only value `a` when inner guard is dropped let current = Context::current(); - assert_eq!(current.get(), Some(&ValueA("a"))); + assert_eq!(current.get(), Some(&ValueA(1))); assert_eq!(current.get::(), None); 
assert!(Context::map_current(|cx| { - assert_eq!(cx.get(), Some(&ValueA("a"))); + assert_eq!(cx.get(), Some(&ValueA(1))); assert_eq!(cx.get::(), None); true })); } + + #[test] + fn overlapping_contexts() { + let outer_guard = Context::new().with_value(ValueA(1)).attach(); + + // Only value `a` is set + let current = Context::current(); + assert_eq!(current.get(), Some(&ValueA(1))); + assert_eq!(current.get::(), None); + + let inner_guard = Context::current_with_value(ValueB(42)).attach(); + // Both values are set in inner context + let current = Context::current(); + assert_eq!(current.get(), Some(&ValueA(1))); + assert_eq!(current.get(), Some(&ValueB(42))); + + assert!(Context::map_current(|cx| { + assert_eq!(cx.get(), Some(&ValueA(1))); + assert_eq!(cx.get(), Some(&ValueB(42))); + true + })); + + drop(outer_guard); + + // `inner_guard` is still alive so both `ValueA` and `ValueB` should still be accessible + let current = Context::current(); + assert_eq!(current.get(), Some(&ValueA(1))); + assert_eq!(current.get(), Some(&ValueB(42))); + + drop(inner_guard); + + // Both guards are dropped and neither value should be accessible. 
+ let current = Context::current(); + assert_eq!(current.get::(), None); + assert_eq!(current.get::(), None); + } + + #[test] + fn too_many_contexts() { + let mut guards: Vec = Vec::with_capacity(ContextStack::MAX_POS as usize); + let stack_max_pos = ContextStack::MAX_POS as u64; + // Fill the stack up until the last position + for i in 1..stack_max_pos { + let cx_guard = Context::current().with_value(ValueB(i)).attach(); + assert_eq!(Context::current().get(), Some(&ValueB(i))); + assert_eq!(cx_guard.cx_pos, i as u16); + guards.push(cx_guard); + } + // Let's overflow the stack a couple of times + for _ in 0..16 { + let cx_guard = Context::current().with_value(ValueA(1)).attach(); + assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS); + assert_eq!(Context::current().get::(), None); + assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 1))); + guards.push(cx_guard); + } + // Drop the overflow contexts + for _ in 0..16 { + guards.pop(); + assert_eq!(Context::current().get::(), None); + assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 1))); + } + // Drop one more so we can add a new one + guards.pop(); + assert_eq!(Context::current().get::(), None); + assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2))); + // Push a new context and see that it works + let cx_guard = Context::current().with_value(ValueA(2)).attach(); + assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS - 1); + assert_eq!(Context::current().get(), Some(&ValueA(2))); + assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2))); + guards.push(cx_guard); + // Let's overflow the stack a couple of times again + for _ in 0..16 { + let cx_guard = Context::current().with_value(ValueA(1)).attach(); + assert_eq!(cx_guard.cx_pos, ContextStack::MAX_POS); + assert_eq!(Context::current().get(), Some(&ValueA(2))); + assert_eq!(Context::current().get(), Some(&ValueB(stack_max_pos - 2))); + guards.push(cx_guard); + } + } + + #[test] + fn context_stack_pop_id() { 
+ // This is to get full line coverage of the `pop_id` function. + // In real life the `Drop` implementation of `ContextGuard` ensures that + // the ids are valid and inside the bounds. + let mut stack = ContextStack::default(); + stack.pop_id(ContextStack::BASE_POS); + stack.pop_id(ContextStack::MAX_POS); + stack.pop_id(4711); + } }