diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
index c0d4263946..b61a18c710 100644
--- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -671,7 +671,7 @@ mod tests {
for v in test.values {
h.measure(v, &[]);
}
- let dp = h.value_map.no_attribute_tracker.lock().unwrap();
+ let dp = h.value_map.no_attribs.tracker.lock().unwrap();
assert_eq!(test.expected.max, dp.max);
assert_eq!(test.expected.min, dp.min);
@@ -720,7 +720,7 @@ mod tests {
for v in test.values {
h.measure(v, &[]);
}
- let dp = h.value_map.no_attribute_tracker.lock().unwrap();
+ let dp = h.value_map.no_attribs.tracker.lock().unwrap();
assert_eq!(test.expected.max, dp.max);
assert_eq!(test.expected.min, dp.min);
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 4eaea7972c..82886be5f1 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -6,11 +6,12 @@ mod precomputed_sum;
mod sum;
use core::fmt;
-use std::collections::{HashMap, HashSet};
-use std::mem::take;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -41,6 +42,11 @@ pub(crate) trait Aggregator {
fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}
+struct NoAttribs<A> {
+    tracker: A,
+    is_set: AtomicBool,
+}
+
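Aside for reviewers: the new struct simply pairs the tracker with its dirty flag. A minimal sketch of that flag-plus-tracker handshake, with `AtomicU64` standing in for the generic `tracker: A` (illustrative only, not the crate's API):

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

struct NoAttribs {
    total: AtomicU64,   // stand-in for the generic `tracker: A`
    is_set: AtomicBool, // set once the tracker holds any data
}

impl NoAttribs {
    fn update(&self, v: u64) {
        self.total.fetch_add(v, Ordering::Relaxed);
        // Release pairs with the Acquire load in `collect`: a reader that
        // sees `is_set == true` also sees the update above.
        self.is_set.store(true, Ordering::Release);
    }

    fn collect(&self) -> Option<u64> {
        if self.is_set.load(Ordering::Acquire) {
            Some(self.total.load(Ordering::Relaxed))
        } else {
            None // nothing was recorded without attributes
        }
    }
}

fn main() {
    let n = NoAttribs {
        total: AtomicU64::new(0),
        is_set: AtomicBool::new(false),
    };
    assert_eq!(n.collect(), None);
    n.update(3);
    assert_eq!(n.collect(), Some(3));
}
```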
/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
@@ -49,16 +55,17 @@ pub(crate) struct ValueMap<A>
where
A: Aggregator,
{
-    /// Trackers store the values associated with different attribute sets.
-    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
-    /// Number of different attribute set stored in the `trackers` map.
-    count: AtomicUsize,
-    /// Indicates whether a value with no attributes has been stored.
-    has_no_attribute_value: AtomicBool,
-    /// Tracker for values with no attributes attached.
-    no_attribute_tracker: A,
+    /// Fast-path tracker for measurements recorded with no attributes.
+    no_attribs: NoAttribs<A>,
+    /// Fast-path lookup keyed by attributes in the caller-provided order.
+    all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+    /// Different orderings of the same attribute keys must resolve to the same
+    /// tracker instance; this map guarantees that and also makes collection efficient.
+    sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Configuration for an Aggregator
config: A::InitConfig,
+    /// Swapped with `sorted_attribs` on every `collect_and_reset`.
+    for_collect_after_reset: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
}
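The struct now splits tracker storage across two maps: `all_attribs` serves reads keyed by attributes exactly as the caller passed them, while `sorted_attribs` owns the canonical entries so that permutations of the same keys share one tracker. A hedged sketch of the canonicalization this relies on, with a simplified `(String, i64)` pair standing in for `KeyValue` (the real `sort_and_dedup` may resolve duplicate keys differently):

```rust
// Simplified stand-in for opentelemetry's KeyValue.
type KeyValue = (String, i64);

// Hypothetical canonicalization: two orderings of the same keys collapse to
// one Vec, so both orderings map to the same tracker in `sorted_attribs`.
fn sort_and_dedup(attributes: &[KeyValue]) -> Vec<KeyValue> {
    let mut sorted = attributes.to_vec();
    sorted.sort_by(|a, b| a.0.cmp(&b.0)); // stable sort by key
    // Drop duplicate keys; the first occurrence wins in this sketch.
    sorted.dedup_by(|next, prev| next.0 == prev.0);
    sorted
}

fn main() {
    let a = vec![("b".to_string(), 2), ("a".to_string(), 1)];
    let b = vec![("a".to_string(), 1), ("b".to_string(), 2)];
    assert_eq!(sort_and_dedup(&a), sort_and_dedup(&b)); // one canonical key
}
```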
impl<A> ValueMap<A>
@@ -67,70 +74,72 @@ where
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
- trackers: RwLock::new(HashMap::new()),
- has_no_attribute_value: AtomicBool::new(false),
- no_attribute_tracker: A::create(&config),
- count: AtomicUsize::new(0),
+ no_attribs: NoAttribs {
+ tracker: A::create(&config),
+ is_set: AtomicBool::new(false),
+ },
+ all_attribs: RwLock::new(Default::default()),
+ sorted_attribs: Mutex::new(Default::default()),
config,
+ for_collect_after_reset: Mutex::new(Default::default()),
}
}
fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
if attributes.is_empty() {
- self.no_attribute_tracker.update(value);
- self.has_no_attribute_value.store(true, Ordering::Release);
+ self.no_attribs.tracker.update(value);
+ self.no_attribs.is_set.store(true, Ordering::Release);
return;
}
- let Ok(trackers) = self.trackers.read() else {
- return;
- };
-
// Try to retrieve and update the tracker with the attributes in the provided order first
- if let Some(tracker) = trackers.get(attributes) {
- tracker.update(value);
- return;
- }
+ match self.all_attribs.read() {
+ Ok(trackers) => {
+ if let Some(tracker) = trackers.get(attributes) {
+ tracker.update(value);
+ return;
+ }
+ }
+ Err(_) => return,
+ };
- // Try to retrieve and update the tracker with the attributes sorted.
+ // Get or create a tracker
let sorted_attrs = sort_and_dedup(attributes);
- if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
- tracker.update(value);
+ let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
return;
- }
+ };
+
+ let sorted_count = sorted_trackers.len();
+ let new_tracker = match sorted_trackers.entry(sorted_attrs) {
+ Entry::Occupied(occupied_entry) => {
+                // Do not return early: the collection phase may have cleared `all_attribs`,
+                // so the tracker still needs to be re-inserted there below.
+ occupied_entry.get().clone()
+ }
+ Entry::Vacant(vacant_entry) => {
+ if !is_under_cardinality_limit(sorted_count) {
+ sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
+ .or_insert_with(|| {
+ otel_warn!( name: "ValueMap.measure",
+ message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
+ );
+ Arc::new(A::create(&self.config))
+ })
+ .update(value);
+ return;
+ }
+ let new_tracker = Arc::new(A::create(&self.config));
+ vacant_entry.insert(new_tracker).clone()
+ }
+ };
+ drop(sorted_trackers);
- // Give up the read lock before acquiring the write lock.
- drop(trackers);
+ new_tracker.update(value);
- let Ok(mut trackers) = self.trackers.write() else {
+        // Insert the new tracker into `all_attribs` so subsequent lookups find it on the fast path.
+ let Ok(mut all_trackers) = self.all_attribs.write() else {
return;
};
-
- // Recheck both the provided and sorted orders after acquiring the write lock
- // in case another thread has pushed an update in the meantime.
- if let Some(tracker) = trackers.get(attributes) {
- tracker.update(value);
- } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
- tracker.update(value);
- } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
- let new_tracker = Arc::new(A::create(&self.config));
- new_tracker.update(value);
-
- // Insert tracker with the attributes in the provided and sorted orders
- trackers.insert(attributes.to_vec(), new_tracker.clone());
- trackers.insert(sorted_attrs, new_tracker);
-
- self.count.fetch_add(1, Ordering::SeqCst);
- } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
- overflow_value.update(value);
- } else {
- let new_tracker = A::create(&self.config);
- new_tracker.update(value);
- trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
- otel_warn!( name: "ValueMap.measure",
- message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
- );
- }
+ all_trackers.insert(attributes.to_vec(), new_tracker);
}
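The rewritten `measure` is a three-tier path: a shared-lock hit in `all_attribs`, then a get-or-create in `sorted_attribs` behind a `Mutex`, then publication back into `all_attribs` for the next call. A distilled, hypothetical version of that read-mostly pattern, using `String` keys and `AtomicU64` counters in place of attribute sets and trackers, and omitting the cardinality limit and poisoning handling:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

struct Trackers {
    fast: RwLock<HashMap<String, Arc<AtomicU64>>>,     // caller-order lookups
    canonical: Mutex<HashMap<String, Arc<AtomicU64>>>, // owning map
}

impl Trackers {
    fn measure(&self, key: &str, v: u64) {
        // Tier 1: shared lock, no allocation on a hit.
        if let Some(t) = self.fast.read().unwrap().get(key) {
            t.fetch_add(v, Ordering::Relaxed);
            return;
        }
        // Tier 2: get-or-create in the canonical map under the Mutex.
        let tracker = self
            .canonical
            .lock()
            .unwrap()
            .entry(key.to_owned())
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .clone();
        tracker.fetch_add(v, Ordering::Relaxed);
        // Tier 3: publish into the fast map so the next call takes tier 1.
        self.fast.write().unwrap().insert(key.to_owned(), tracker);
    }
}

fn main() {
    let t = Trackers {
        fast: RwLock::new(HashMap::new()),
        canonical: Mutex::new(HashMap::new()),
    };
    t.measure("route=/a", 1);
    t.measure("route=/a", 2);
    let total = t.canonical.lock().unwrap()["route=/a"].load(Ordering::Relaxed);
    assert_eq!(total, 3);
}
```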
/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -139,20 +148,23 @@ where
where
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
{
- prepare_data(dest, self.count.load(Ordering::SeqCst));
- if self.has_no_attribute_value.load(Ordering::Acquire) {
- dest.push(map_fn(vec![], &self.no_attribute_tracker));
- }
-
- let Ok(trackers) = self.trackers.read() else {
- return;
+ let trackers = match self.sorted_attribs.lock() {
+ Ok(trackers) => {
+                // It's important to release the lock as soon as possible,
+                // so we don't block insertion of new attribute sets.
+ trackers.clone()
+ }
+ Err(_) => return,
};
- let mut seen = HashSet::new();
- for (attrs, tracker) in trackers.iter() {
- if seen.insert(Arc::as_ptr(tracker)) {
- dest.push(map_fn(attrs.clone(), tracker));
- }
+ prepare_data(dest, trackers.len());
+
+ if self.no_attribs.is_set.load(Ordering::Acquire) {
+ dest.push(map_fn(vec![], &self.no_attribs.tracker));
+ }
+
+ for (attrs, tracker) in trackers.into_iter() {
+ dest.push(map_fn(attrs, &tracker));
}
}
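Read-only collection now snapshots the canonical map instead of walking `trackers` under a read lock and deduplicating by pointer. A sketch of the clone-under-lock idea (names and types are illustrative, not the crate's API):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

// Cloning a HashMap of Arcs copies only keys and refcounted handles, so the
// Mutex is held just long enough to take the snapshot.
fn collect_readonly(map: &Mutex<HashMap<String, Arc<AtomicU64>>>) -> Vec<(String, u64)> {
    let snapshot = match map.lock() {
        Ok(guard) => guard.clone(), // lock is released right after this clone
        Err(_) => return Vec::new(),
    };
    snapshot
        .into_iter()
        .map(|(attrs, tracker)| (attrs, tracker.load(Ordering::Relaxed)))
        .collect()
}

fn main() {
    let map = Mutex::new(HashMap::from([(
        "route=/a".to_string(),
        Arc::new(AtomicU64::new(7)),
    )]));
    assert_eq!(collect_readonly(&map), vec![("route=/a".to_string(), 7)]);
}
```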
@@ -162,27 +174,55 @@ where
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
- prepare_data(dest, self.count.load(Ordering::SeqCst));
- if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
+ let mut to_collect = self
+ .for_collect_after_reset
+ .lock()
+ .unwrap_or_else(|err| err.into_inner());
+        // Reset the sorted trackers so new attribute sets are written into a fresh hashmap.
+ match self.sorted_attribs.lock() {
+ Ok(mut trackers) => {
+ swap(trackers.deref_mut(), to_collect.deref_mut());
+ }
+ Err(_) => return,
+ };
+        // Clear `all_attribs` so all attribute sets start over in the fresh hashmap.
+ match self.all_attribs.write() {
+ Ok(mut all_trackers) => all_trackers.clear(),
+ Err(_) => return,
+ };
+
+ prepare_data(dest, to_collect.len());
+
+ if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
dest.push(map_fn(
vec![],
- self.no_attribute_tracker.clone_and_reset(&self.config),
+ self.no_attribs.tracker.clone_and_reset(&self.config),
));
}
- let trackers = match self.trackers.write() {
- Ok(mut trackers) => {
- self.count.store(0, Ordering::SeqCst);
- take(trackers.deref_mut())
- }
- Err(_) => todo!(),
- };
-
- let mut seen = HashSet::new();
- for (attrs, tracker) in trackers.into_iter() {
- if seen.insert(Arc::as_ptr(&tracker)) {
- dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
- }
+ for (attrs, mut tracker) in to_collect.drain() {
+            // Handles this special interleaving:
+            // measure-thread: gets the inserted tracker from `sorted_attribs` (still holds the Arc)
+            // collect-thread: swaps out `sorted_attribs` (clears it)
+            // collect-thread: clears `all_attribs`
+            // collect-thread: THIS LOOP: spins while the measure-thread still holds a tracker
+            // measure-thread: inserts the tracker into `all_attribs`
+            // collect-thread: exits this loop after clearing `all_attribs` again
+ let tracker = loop {
+ match Arc::try_unwrap(tracker) {
+ Ok(inner) => {
+ break inner;
+ }
+ Err(reinserted) => {
+ tracker = reinserted;
+ match self.all_attribs.write() {
+ Ok(mut all_trackers) => all_trackers.clear(),
+ Err(_) => return,
+ };
+ }
+ };
+ };
+ dest.push(map_fn(attrs, tracker));
}
}
}
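The drain loop's `Arc::try_unwrap` spin is the subtle part: the collector only takes ownership of a tracker once every in-flight `measure` call has dropped its clone, and clearing `all_attribs` again on each failed attempt removes the place where a racing writer could have re-published it. A standalone sketch of that ownership handshake, where `release_other_refs` is a hypothetical stand-in for clearing `all_attribs`:

```rust
use std::sync::Arc;

fn take_exclusive<T>(mut shared: Arc<T>, mut release_other_refs: impl FnMut()) -> T {
    loop {
        match Arc::try_unwrap(shared) {
            Ok(inner) => break inner, // we now own the tracker outright
            Err(still_shared) => {
                shared = still_shared;
                // Another thread may have re-published the Arc; clear that
                // publication point and retry.
                release_other_refs();
            }
        }
    }
}

fn main() {
    let a = Arc::new(41);
    let b = Arc::clone(&a);
    drop(b); // pretend the in-flight measure call finished
    assert_eq!(take_exclusive(a, || {}), 41);
}
```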
@@ -190,7 +230,7 @@ where
/// Clear and allocate exactly required amount of space for all attribute-sets
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
data.clear();
- let total_len = list_len + 2; // to account for no_attributes case + overflow state
+ let total_len = list_len + 1; // to account for no_attributes case
if total_len > data.capacity() {
data.reserve_exact(total_len - data.capacity());
}
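With the overflow tracker now stored inside the map itself, only the no-attribute data point needs an extra slot, hence `+ 2` becoming `+ 1`. For reference, the resulting function plus a small usage check (the `main` is illustrative):

```rust
/// Clear and reserve exactly the required capacity for all attribute sets.
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
    data.clear();
    let total_len = list_len + 1; // +1 for the no-attributes data point
    if total_len > data.capacity() {
        data.reserve_exact(total_len - data.capacity());
    }
}

fn main() {
    let mut dest: Vec<u32> = Vec::new();
    prepare_data(&mut dest, 10);
    assert!(dest.is_empty());
    assert!(dest.capacity() >= 11); // 10 attribute sets + the no-attribute slot
}
```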