Skip to content

Commit f89c3ea

Browse files
committed
fix atomic
1 parent 90bbb2d commit f89c3ea

File tree

2 files changed

+38
-30
lines changed

2 files changed

+38
-30
lines changed

opentelemetry-sdk/src/metrics/internal/sum.rs

+37-29
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ impl<T: Number<T>> ValueMap<T> {
6767
// Use the 8 least significant bits directly, avoiding the modulus operation.
6868
hasher.finish() as u8 as usize
6969
}
70+
71+
fn try_increment(&self) -> bool {
72+
let current = self.total_unique_entries.load(Ordering::Acquire);
73+
if is_under_cardinality_limit(current) {
74+
// Attempt to increment atomically
75+
self.total_unique_entries
76+
.compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
77+
.is_ok()
78+
} else {
79+
false // Limit reached, do not increment
80+
}
81+
}
7082
}
7183

7284
impl<T: Number<T>> ValueMap<T> {
@@ -78,40 +90,36 @@ impl<T: Number<T>> ValueMap<T> {
7890
return;
7991
}
8092

93+
// Determine the bucket index for the attributes
8194
let bucket_index = Self::hash_to_bucket(&attrs);
82-
let (is_new_entry, should_use_overflow) = {
83-
let bucket_mutex = &self.buckets[bucket_index];
84-
let bucket_guard = bucket_mutex.lock().unwrap();
85-
86-
let is_new_entry = if let Some(bucket) = &*bucket_guard {
87-
!bucket.contains_key(&attrs)
88-
} else {
89-
true
90-
};
91-
92-
let should_use_overflow: bool = is_new_entry
93-
&& !is_under_cardinality_limit(self.total_unique_entries.load(Ordering::Relaxed));
94-
95-
(is_new_entry, should_use_overflow)
96-
};
97-
if is_new_entry && !should_use_overflow {
98-
self.total_unique_entries.fetch_add(1, Ordering::Relaxed);
95+
let mut bucket_guard = self.buckets[bucket_index].lock().unwrap();
96+
if let Some(bucket) = &mut *bucket_guard {
97+
// if the attribute is present in bucket, lets update the value and return
98+
// (we need to be fast here, as this is most common use-case)
99+
if let Some(value) = bucket.get_mut(&attrs) {
100+
*value += measurement;
101+
return;
102+
}
99103
}
100-
let final_bucket_index = if should_use_overflow {
101-
OVERFLOW_BUCKET_INDEX
102-
} else {
103-
bucket_index
104-
};
105-
let bucket_mutex = &self.buckets[final_bucket_index];
106-
let mut bucket_guard = bucket_mutex.lock().unwrap();
107-
let bucket = bucket_guard.get_or_insert_with(HashMap::new);
108-
let entry_key = if should_use_overflow {
109-
STREAM_OVERFLOW_ATTRIBUTE_SET.clone()
104+
105+
// if the attribute is not present in bucket, lets unlock the bucket
106+
drop(bucket_guard);
107+
// Attempt to first increment the total unique entries if under limit.
108+
let under_limit = self.try_increment();
109+
// Determine the bucket index for the attributes
110+
let (bucket_index, attrs) = if under_limit {
111+
(bucket_index, attrs) // the index remains same
110112
} else {
111-
attrs
113+
// TBD - Should we log, as this can flood the logs ?
114+
(OVERFLOW_BUCKET_INDEX, STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
112115
};
116+
117+
// Lock and update the relevant bucket
118+
let mut final_bucket_guard = self.buckets[bucket_index].lock().unwrap();
119+
let bucket = final_bucket_guard.get_or_insert_with(HashMap::default);
120+
113121
bucket
114-
.entry(entry_key)
122+
.entry(attrs)
115123
.and_modify(|e| *e += measurement)
116124
.or_insert(measurement);
117125
}

opentelemetry-sdk/src/metrics/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ mod tests {
263263
std::thread::sleep(std::time::Duration::from_millis(5));
264264

265265
let unique_measurements = 1999;
266-
let overflow_measurements = 4;
266+
let overflow_measurements = 1000;
267267
let total_measurements = unique_measurements + overflow_measurements;
268268

269269
let counter = std::sync::Arc::new(std::sync::Mutex::new(counter)); // Shared counter among threads

0 commit comments

Comments
 (0)