diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
index c0d4263946..b61a18c710 100644
--- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -671,7 +671,7 @@ mod tests {
         for v in test.values {
             h.measure(v, &[]);
         }
-        let dp = h.value_map.no_attribute_tracker.lock().unwrap();
+        let dp = h.value_map.no_attribs.tracker.lock().unwrap();
 
         assert_eq!(test.expected.max, dp.max);
         assert_eq!(test.expected.min, dp.min);
@@ -720,7 +720,7 @@ mod tests {
         for v in test.values {
             h.measure(v, &[]);
         }
-        let dp = h.value_map.no_attribute_tracker.lock().unwrap();
+        let dp = h.value_map.no_attribs.tracker.lock().unwrap();
 
         assert_eq!(test.expected.max, dp.max);
         assert_eq!(test.expected.min, dp.min);
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 4eaea7972c..82886be5f1 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -6,11 +6,12 @@ mod precomputed_sum;
 mod sum;
 
 use core::fmt;
-use std::collections::{HashMap, HashSet};
-use std::mem::take;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::mem::swap;
 use std::ops::{Add, AddAssign, DerefMut, Sub};
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
 
 use aggregate::is_under_cardinality_limit;
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -41,6 +42,11 @@ pub(crate) trait Aggregator {
     fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
 }
 
+struct NoAttribs<A> {
+    tracker: A,
+    is_set: AtomicBool,
+}
+
 /// The storage for sums.
 ///
 /// This structure is parametrized by an `Operation` that indicates how
@@ -49,16 +55,17 @@ pub(crate) struct ValueMap<A>
 where
     A: Aggregator,
 {
-    /// Trackers store the values associated with different attribute sets.
-    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
-    /// Number of different attribute set stored in the `trackers` map.
-    count: AtomicUsize,
-    /// Indicates whether a value with no attributes has been stored.
-    has_no_attribute_value: AtomicBool,
-    /// Tracker for values with no attributes attached.
-    no_attribute_tracker: A,
+    // Fast path: dedicated tracker for measurements recorded with no attributes.
+    no_attribs: NoAttribs<A>,
+    // Fast path: trackers keyed by attributes in the exact order the caller provided them.
+    all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+    // Different orderings of the same attribute keys must map to the same tracker
+    // instance; this map guarantees that, and it also enables efficient collection.
+    sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
     /// Configuration for an Aggregator
     config: A::InitConfig,
+    /// Swapped with `sorted_attribs` on every `collect_and_reset`.
+    for_collect_after_reset: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
 }
 
 impl<A> ValueMap<A>
@@ -67,70 +74,72 @@ where
 {
     fn new(config: A::InitConfig) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
-            has_no_attribute_value: AtomicBool::new(false),
-            no_attribute_tracker: A::create(&config),
-            count: AtomicUsize::new(0),
+            no_attribs: NoAttribs {
+                tracker: A::create(&config),
+                is_set: AtomicBool::new(false),
+            },
+            all_attribs: RwLock::new(Default::default()),
+            sorted_attribs: Mutex::new(Default::default()),
             config,
+            for_collect_after_reset: Mutex::new(Default::default()),
         }
     }
 
     fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
         if attributes.is_empty() {
-            self.no_attribute_tracker.update(value);
-            self.has_no_attribute_value.store(true, Ordering::Release);
+            self.no_attribs.tracker.update(value);
+            self.no_attribs.is_set.store(true, Ordering::Release);
             return;
         }
 
-        let Ok(trackers) = self.trackers.read() else {
-            return;
-        };
-
-        // Try to retrieve and update the tracker with the attributes in the provided order first
-        if let Some(tracker) = trackers.get(attributes) {
-            tracker.update(value);
-            return;
-        }
+        match self.all_attribs.read() {
+            Ok(trackers) => {
+                if let Some(tracker) = trackers.get(attributes) {
+                    tracker.update(value);
+                    return;
+                }
+            }
+            Err(_) => return,
+        };
 
-        // Try to retrieve and update the tracker with the attributes sorted.
+        // Get or create a tracker.
         let sorted_attrs = sort_and_dedup(attributes);
-        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            tracker.update(value);
+        let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
             return;
-        }
+        };
+
+        let sorted_count = sorted_trackers.len();
+        let new_tracker = match sorted_trackers.entry(sorted_attrs) {
+            Entry::Occupied(occupied_entry) => {
+                // Do not return early: a concurrent collection may clear `all_attribs`, so the tracker is reinserted below.
+                occupied_entry.get().clone()
+            }
+            Entry::Vacant(vacant_entry) => {
+                if !is_under_cardinality_limit(sorted_count) {
+                    sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
+                        .or_insert_with(|| {
+                            otel_warn!( name: "ValueMap.measure",
+                                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
+                            );
+                            Arc::new(A::create(&self.config))
+                        })
+                        .update(value);
+                    return;
+                }
+                let new_tracker = Arc::new(A::create(&self.config));
+                vacant_entry.insert(new_tracker).clone()
+            }
+        };
+        drop(sorted_trackers);
 
-        // Give up the read lock before acquiring the write lock.
-        drop(trackers);
+        new_tracker.update(value);
 
-        let Ok(mut trackers) = self.trackers.write() else {
+        // Insert the new tracker into `all_attribs` so the fast path finds it next time.
+        let Ok(mut all_trackers) = self.all_attribs.write() else {
             return;
         };
-
-        // Recheck both the provided and sorted orders after acquiring the write lock
-        // in case another thread has pushed an update in the meantime.
-        if let Some(tracker) = trackers.get(attributes) {
-            tracker.update(value);
-        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            tracker.update(value);
-        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
-            let new_tracker = Arc::new(A::create(&self.config));
-            new_tracker.update(value);
-
-            // Insert tracker with the attributes in the provided and sorted orders
-            trackers.insert(attributes.to_vec(), new_tracker.clone());
-            trackers.insert(sorted_attrs, new_tracker);
-
-            self.count.fetch_add(1, Ordering::SeqCst);
-        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
-            overflow_value.update(value);
-        } else {
-            let new_tracker = A::create(&self.config);
-            new_tracker.update(value);
-            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
-            otel_warn!( name: "ValueMap.measure",
-                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
-            );
-        }
+        all_trackers.insert(attributes.to_vec(), new_tracker);
     }
 
     /// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -139,20 +148,23 @@ where
         MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
     {
-        prepare_data(dest, self.count.load(Ordering::SeqCst));
-        if self.has_no_attribute_value.load(Ordering::Acquire) {
-            dest.push(map_fn(vec![], &self.no_attribute_tracker));
-        }
-
-        let Ok(trackers) = self.trackers.read() else {
-            return;
+        let trackers = match self.sorted_attribs.lock() {
+            Ok(trackers) => {
+                // It's important to release the lock as quickly as possible,
+                // so we don't block insertion of new attribute sets.
+                trackers.clone()
+            }
+            Err(_) => return,
         };
 
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                dest.push(map_fn(attrs.clone(), tracker));
-            }
+        prepare_data(dest, trackers.len());
+
+        if self.no_attribs.is_set.load(Ordering::Acquire) {
+            dest.push(map_fn(vec![], &self.no_attribs.tracker));
+        }
+
+        for (attrs, tracker) in trackers.into_iter() {
+            dest.push(map_fn(attrs, &tracker));
         }
     }
 
@@ -162,27 +174,55 @@ where
         MapFn: FnMut(Vec<KeyValue>, A) -> Res,
     {
-        prepare_data(dest, self.count.load(Ordering::SeqCst));
-        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
+        let mut to_collect = self
+            .for_collect_after_reset
+            .lock()
+            .unwrap_or_else(|err| err.into_inner());
+        // Reset the sorted trackers, so new attribute sets are written into a fresh hashmap.
+        match self.sorted_attribs.lock() {
+            Ok(mut trackers) => {
+                swap(trackers.deref_mut(), to_collect.deref_mut());
+            }
+            Err(_) => return,
+        };
+        // Clear `all_attribs`, so all attribute sets start using the fresh hashmap.
+        match self.all_attribs.write() {
+            Ok(mut all_trackers) => all_trackers.clear(),
+            Err(_) => return,
+        };
+
+        prepare_data(dest, to_collect.len());
+
+        if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
             dest.push(map_fn(
                 vec![],
-                self.no_attribute_tracker.clone_and_reset(&self.config),
+                self.no_attribs.tracker.clone_and_reset(&self.config),
             ));
         }
 
-        let trackers = match self.trackers.write() {
-            Ok(mut trackers) => {
-                self.count.store(0, Ordering::SeqCst);
-                take(trackers.deref_mut())
-            }
-            Err(_) => todo!(),
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.into_iter() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
-            }
+        for (attrs, mut tracker) in to_collect.drain() {
+            // Handles a special race:
+            // measure-thread: gets the tracker inserted into `sorted_attribs` (holds a strong reference)
+            // collect-thread: swaps out `sorted_attribs` (clearing it)
+            // collect-thread: clears `all_attribs`
+            // collect-thread: THIS LOOP: spins while the measure-thread still holds its reference
+            // measure-thread: inserts the tracker into `all_attribs`
+            // collect-thread: exits this loop after clearing `all_attribs` again
+            let tracker = loop {
+                match Arc::try_unwrap(tracker) {
+                    Ok(inner) => {
+                        break inner;
+                    }
+                    Err(reinserted) => {
+                        tracker = reinserted;
+                        match self.all_attribs.write() {
+                            Ok(mut all_trackers) => all_trackers.clear(),
+                            Err(_) => return,
+                        };
+                    }
+                };
+            };
+            dest.push(map_fn(attrs, tracker));
         }
     }
 }
@@ -190,7 +230,7 @@ where
 /// Clear and allocate exactly required amount of space for all attribute-sets
 fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
     data.clear();
-    let total_len = list_len + 2; // to account for no_attributes case + overflow state
+    let total_len = list_len + 1; // to account for no_attributes case
     if total_len > data.capacity() {
         data.reserve_exact(total_len - data.capacity());
     }
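Reviewer note: the ordering contract between `measure` and `collect_and_reset` is the subtle part of this change. `measure` clones an `Arc` tracker out of `sorted_attribs` and updates it after releasing the lock, so `collect_and_reset` must not consume a tracker until every in-flight clone has been dropped. The standalone sketch below reproduces that pattern with a plain counter. It is illustrative only: `Counter`, the `String` key type, and the single-map simplification (no `all_attribs` fast-path layer, hence no clearing on retry) are assumptions of this sketch, not SDK code.

use std::collections::HashMap;
use std::mem::swap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the SDK's `Aggregator`: a single atomic counter.
#[derive(Default)]
struct Counter(AtomicU64);

fn main() {
    // Plays the role of `sorted_attribs`; `all_attribs` is omitted for brevity.
    let active: Arc<Mutex<HashMap<String, Arc<Counter>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    // "measure-thread": clone a tracker out of the map, update it outside the lock.
    let measure = {
        let active = Arc::clone(&active);
        thread::spawn(move || {
            for i in 0..10_000u64 {
                let tracker = active
                    .lock()
                    .unwrap()
                    .entry(format!("attr-{}", i % 8))
                    .or_default()
                    .clone(); // strong ref held across the unlock, like `new_tracker`
                tracker.0.fetch_add(1, Ordering::Relaxed);
            }
        })
    };

    // "collect-thread": swap the map out, then drain it. `Arc::try_unwrap`
    // succeeds only once the measure-thread drops its clone, so no update is lost.
    let mut total = 0u64;
    for _ in 0..100 {
        let mut snapshot = HashMap::new();
        swap(&mut *active.lock().unwrap(), &mut snapshot);
        for (_attrs, mut tracker) in snapshot.drain() {
            let tracker = loop {
                match Arc::try_unwrap(tracker) {
                    Ok(inner) => break inner,        // sole owner: the value is final
                    Err(shared) => tracker = shared, // updater still holds a ref: retry
                }
            };
            total += tracker.0.into_inner();
        }
    }
    measure.join().unwrap();
    // Pick up whatever the updater wrote after the last swap.
    for (_attrs, tracker) in active.lock().unwrap().drain() {
        total += Arc::try_unwrap(tracker).map(|c| c.0.into_inner()).unwrap_or(0);
    }
    assert_eq!(total, 10_000);
    println!("collected {total} updates");
}

The design point carried over from the diff: `Arc::try_unwrap` doubles as the synchronization primitive, since it succeeds only when the collector holds the last strong reference, at which point the tracker's value is final. The real implementation additionally clears `all_attribs` on every retry so a racing `measure` cannot keep the reference alive indefinitely.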