Skip to content

Commit 267e305

Browse files
committed
Fix overflow handling in sum metrics: track total unique entries atomically and route overflow measurements to a dedicated overflow bucket
1 parent 61c8b0d commit 267e305

File tree

1 file changed

+49
-39
lines changed
  • opentelemetry-sdk/src/metrics/internal

1 file changed

+49
-39
lines changed

opentelemetry-sdk/src/metrics/internal/sum.rs

+49-39
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::atomic::{AtomicBool, Ordering};
1+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22
use std::{
33
sync::{Arc, Mutex},
44
time::SystemTime,
@@ -12,27 +12,26 @@ use std::hash::{Hash, Hasher};
1212
#[cfg(feature = "use_hashbrown")]
1313
use ahash::AHasher;
1414
#[cfg(feature = "use_hashbrown")]
15-
use hashbrown::{hash_map::Entry, HashMap};
15+
use hashbrown::HashMap;
1616

1717
#[cfg(not(feature = "use_hashbrown"))]
18-
use std::collections::{
19-
hash_map::{DefaultHasher, Entry},
20-
HashMap,
21-
};
18+
use std::collections::{hash_map::DefaultHasher, HashMap};
2219

2320
use super::{
2421
aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
2522
AtomicTracker, Number,
2623
};
2724

2825
const BUCKET_COUNT: usize = 256;
26+
const OVERFLOW_BUCKET_INDEX: usize = BUCKET_COUNT - 1; // Use the last bucket as overflow bucket
2927
type BucketValue<T> = Mutex<Option<HashMap<AttributeSet, T>>>;
3028
type Buckets<T> = Arc<[BucketValue<T>; BUCKET_COUNT]>;
3129
/// The storage for sums.
3230
struct ValueMap<T: Number<T>> {
3331
buckets: Buckets<T>,
3432
has_no_value_attribute_value: AtomicBool,
3533
no_attribute_value: T::AtomicTracker,
34+
total_unique_entries: AtomicUsize,
3635
}
3736

3837
impl<T: Number<T>> Default for ValueMap<T> {
@@ -53,19 +52,20 @@ impl<T: Number<T>> ValueMap<T> {
5352
buckets: Arc::new(buckets),
5453
has_no_value_attribute_value: AtomicBool::new(false),
5554
no_attribute_value: T::new_atomic_tracker(),
55+
total_unique_entries: AtomicUsize::new(0),
5656
}
5757
}
5858

5959
// Hash function to determine the bucket
60-
fn hash_to_bucket(key: &AttributeSet) -> u8 {
60+
fn hash_to_bucket(key: &AttributeSet) -> usize {
6161
#[cfg(not(feature = "use_hashbrown"))]
6262
let mut hasher = DefaultHasher::new();
6363
#[cfg(feature = "use_hashbrown")]
6464
let mut hasher = AHasher::default();
6565

6666
key.hash(&mut hasher);
6767
// Use the 8 least significant bits directly, avoiding the modulus operation.
68-
hasher.finish() as u8
68+
hasher.finish() as u8 as usize
6969
}
7070

7171
// Calculate the total length of data points across all buckets.
@@ -90,43 +90,47 @@ impl<T: Number<T>> ValueMap<T> {
9090
self.no_attribute_value.add(measurement);
9191
self.has_no_value_attribute_value
9292
.store(true, Ordering::Release);
93-
} else {
94-
let bucket_index = Self::hash_to_bucket(&attrs) as usize;
93+
return;
94+
}
95+
96+
let bucket_index = Self::hash_to_bucket(&attrs);
97+
let (is_new_entry, should_use_overflow) = {
9598
let bucket_mutex = &self.buckets[bucket_index];
99+
let bucket_guard = bucket_mutex.lock().unwrap();
96100

97-
let mut bucket_guard = match bucket_mutex.lock() {
98-
Ok(guard) => guard,
99-
Err(e) => {
100-
eprintln!("Failed to acquire lock due to: {:?}", e);
101-
return; // TBD - retry ?
102-
}
101+
let is_new_entry = if let Some(bucket) = &*bucket_guard {
102+
!bucket.contains_key(&attrs)
103+
} else {
104+
true
103105
};
104106

105-
if bucket_guard.is_none() {
106-
*bucket_guard = Some(HashMap::new());
107-
}
107+
let should_use_overflow = is_new_entry
108+
&& !is_under_cardinality_limit(
109+
self.total_unique_entries.load(Ordering::Relaxed) + 1,
110+
);
108111

109-
if let Some(ref mut values) = *bucket_guard {
110-
let size = values.len();
111-
match values.entry(attrs) {
112-
Entry::Occupied(mut occupied_entry) => {
113-
let sum = occupied_entry.get_mut();
114-
*sum += measurement;
115-
}
116-
Entry::Vacant(vacant_entry) => {
117-
if is_under_cardinality_limit(size) {
118-
vacant_entry.insert(measurement);
119-
} else {
120-
values
121-
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
122-
.and_modify(|val| *val += measurement)
123-
.or_insert(measurement);
124-
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
125-
}
126-
}
127-
}
128-
}
112+
(is_new_entry, should_use_overflow)
113+
};
114+
if is_new_entry && !should_use_overflow {
115+
self.total_unique_entries.fetch_add(1, Ordering::Relaxed);
129116
}
117+
let final_bucket_index = if should_use_overflow {
118+
OVERFLOW_BUCKET_INDEX
119+
} else {
120+
bucket_index
121+
};
122+
let bucket_mutex = &self.buckets[final_bucket_index];
123+
let mut bucket_guard = bucket_mutex.lock().unwrap();
124+
let bucket = bucket_guard.get_or_insert_with(HashMap::new);
125+
let entry_key = if should_use_overflow {
126+
STREAM_OVERFLOW_ATTRIBUTE_SET.clone()
127+
} else {
128+
attrs
129+
};
130+
bucket
131+
.entry(entry_key)
132+
.and_modify(|e| *e += measurement)
133+
.or_insert(measurement);
130134
}
131135
}
132136

@@ -215,6 +219,9 @@ impl<T: Number<T>> Sum<T> {
215219
value,
216220
exemplars: vec![],
217221
});
222+
self.value_map
223+
.total_unique_entries
224+
.fetch_sub(1, Ordering::Relaxed);
218225
}
219226
}
220227
}
@@ -406,6 +413,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
406413
value: delta,
407414
exemplars: vec![],
408415
});
416+
self.value_map
417+
.total_unique_entries
418+
.fetch_sub(1, Ordering::Relaxed);
409419
}
410420
}
411421
}

0 commit comments

Comments (0)