Skip to content

Commit fd858f6

Browse files
committed
use static vector at first level hash
1 parent 75c853e commit fd858f6

File tree

1 file changed

+71
-62
lines changed
  • opentelemetry-sdk/src/metrics/internal

1 file changed

+71
-62
lines changed

opentelemetry-sdk/src/metrics/internal/sum.rs

+71-62
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use super::{
1818

1919
/// The storage for sums.
2020
struct ValueMap<T: Number<T>> {
21-
buckets: Arc<HashMap<u8, Mutex<HashMap<AttributeSet, T>>>>,
21+
buckets: Arc<[Mutex<Option<HashMap<AttributeSet, T>>>; 256]>,
2222
has_no_value_attribute_value: AtomicBool,
2323
no_attribute_value: T::AtomicTracker,
2424
total_count: AtomicUsize,
@@ -32,12 +32,14 @@ impl<T: Number<T>> Default for ValueMap<T> {
3232

3333
impl<T: Number<T>> ValueMap<T> {
3434
fn new() -> Self {
35-
let mut outer_map = HashMap::new();
36-
for i in 0..=255 {
37-
outer_map.insert(i, Mutex::new(HashMap::new()));
38-
}
35+
let buckets = std::iter::repeat_with(|| Mutex::new(None))
36+
.take(256)
37+
.collect::<Vec<_>>()
38+
.try_into()
39+
.unwrap_or_else(|_| panic!("Incorrect length"));
40+
3941
ValueMap {
40-
buckets: Arc::new(outer_map),
42+
buckets: Arc::new(buckets),
4143
has_no_value_attribute_value: AtomicBool::new(false),
4244
no_attribute_value: T::new_atomic_tracker(),
4345
total_count: AtomicUsize::new(0),
@@ -48,7 +50,8 @@ impl<T: Number<T>> ValueMap<T> {
4850
fn hash_to_bucket(key: &AttributeSet) -> u8 {
4951
let mut hasher = DefaultHasher::new();
5052
key.hash(&mut hasher);
51-
(hasher.finish() % 256) as u8
53+
// Use the 8 least significant bits directly, avoiding the modulus operation.
54+
hasher.finish() as u8
5255
}
5356
}
5457

@@ -59,9 +62,15 @@ impl<T: Number<T>> ValueMap<T> {
5962
self.has_no_value_attribute_value
6063
.store(true, Ordering::Release);
6164
} else {
62-
let bucket_key = Self::hash_to_bucket(&attrs);
63-
if let Some(bucket) = self.buckets.get(&bucket_key) {
64-
let mut values = bucket.lock().unwrap();
65+
let bucket_index = Self::hash_to_bucket(&attrs) as usize; // Ensure index is usize for array indexing
66+
let bucket_mutex = &self.buckets[bucket_index];
67+
let mut bucket_guard = bucket_mutex.lock().unwrap();
68+
69+
if bucket_guard.is_none() {
70+
*bucket_guard = Some(HashMap::new()); // Initialize the bucket if it's None
71+
}
72+
73+
if let Some(ref mut values) = *bucket_guard {
6574
let size = values.len();
6675
match values.entry(attrs) {
6776
Entry::Occupied(mut occupied_entry) => {
@@ -158,18 +167,19 @@ impl<T: Number<T>> Sum<T> {
158167
});
159168
}
160169

161-
for (_, bucket) in self.value_map.buckets.iter() {
162-
let mut locked_bucket = bucket.lock().unwrap();
163-
for (attrs, value) in locked_bucket.drain() {
164-
s_data.data_points.push(DataPoint {
165-
attributes: attrs,
166-
start_time: Some(*self.start.lock().unwrap()),
167-
time: Some(t),
168-
value,
169-
exemplars: vec![],
170-
});
170+
for bucket_mutex in self.value_map.buckets.iter() {
171+
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
172+
for (attrs, value) in locked_bucket.drain() {
173+
s_data.data_points.push(DataPoint {
174+
attributes: attrs,
175+
start_time: Some(*self.start.lock().unwrap()),
176+
time: Some(t),
177+
value,
178+
exemplars: vec![],
179+
});
180+
}
181+
// The bucket is automatically cleared by the .drain() method
171182
}
172-
// The bucket is automatically cleared by the .drain() method
173183
}
174184

175185
// The delta collection cycle resets.
@@ -231,16 +241,17 @@ impl<T: Number<T>> Sum<T> {
231241
// are unbounded number of attribute sets being aggregated. Attribute
232242
// sets that become "stale" need to be forgotten so this will not
233243
// overload the system.
234-
for (_, bucket) in self.value_map.buckets.iter() {
235-
let locked_bucket = bucket.lock().unwrap();
236-
for (attrs, value) in locked_bucket.iter() {
237-
s_data.data_points.push(DataPoint {
238-
attributes: attrs.clone(),
239-
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time
240-
time: Some(t),
241-
value: *value,
242-
exemplars: vec![],
243-
});
244+
for bucket_mutex in self.value_map.buckets.iter() {
245+
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
246+
for (attrs, value) in locked_bucket.iter() {
247+
s_data.data_points.push(DataPoint {
248+
attributes: attrs.clone(),
249+
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time
250+
time: Some(t),
251+
value: *value,
252+
exemplars: vec![],
253+
});
254+
}
244255
}
245256
}
246257

@@ -322,24 +333,22 @@ impl<T: Number<T>> PrecomputedSum<T> {
322333
});
323334
}
324335

325-
// Iterating through each bucket to aggregate and drain data points
326-
for (_, bucket) in self.value_map.buckets.iter() {
327-
let mut locked_bucket = bucket.lock().unwrap();
328-
329-
// Drain operation to move out values from the bucket
330-
let default = T::default();
331-
for (attrs, value) in locked_bucket.drain() {
332-
let delta = value - *reported.get(&attrs).unwrap_or(&default);
333-
if delta != default {
334-
new_reported.insert(attrs.clone(), value);
336+
for bucket_mutex in self.value_map.buckets.iter() {
337+
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
338+
let default = T::default();
339+
for (attrs, value) in locked_bucket.drain() {
340+
let delta = value - *reported.get(&attrs).unwrap_or(&default);
341+
if delta != default {
342+
new_reported.insert(attrs.clone(), value);
343+
}
344+
s_data.data_points.push(DataPoint {
345+
attributes: attrs.clone(),
346+
start_time: Some(prev_start),
347+
time: Some(t),
348+
value: delta,
349+
exemplars: vec![],
350+
});
335351
}
336-
s_data.data_points.push(DataPoint {
337-
attributes: attrs.clone(),
338-
start_time: Some(prev_start),
339-
time: Some(t),
340-
value: delta,
341-
exemplars: vec![],
342-
});
343352
}
344353
}
345354

@@ -406,21 +415,21 @@ impl<T: Number<T>> PrecomputedSum<T> {
406415
}
407416

408417
let default = T::default();
409-
for (_, bucket) in self.value_map.buckets.iter() {
410-
let locked_bucket = bucket.lock().unwrap();
411-
412-
for (attrs, value) in locked_bucket.iter() {
413-
let delta = *value - *reported.get(attrs).unwrap_or(&default);
414-
if delta != default {
415-
new_reported.insert(attrs.clone(), *value);
418+
for bucket_mutex in self.value_map.buckets.iter() {
419+
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
420+
for (attrs, value) in locked_bucket.iter() {
421+
let delta = *value - *reported.get(attrs).unwrap_or(&default);
422+
if delta != default {
423+
new_reported.insert(attrs.clone(), *value);
424+
}
425+
s_data.data_points.push(DataPoint {
426+
attributes: attrs.clone(),
427+
start_time: Some(prev_start),
428+
time: Some(t),
429+
value: *value, // For cumulative, we use the value directly without calculating delta
430+
exemplars: vec![],
431+
});
416432
}
417-
s_data.data_points.push(DataPoint {
418-
attributes: attrs.clone(),
419-
start_time: Some(prev_start),
420-
time: Some(t),
421-
value: *value, // For cumulative, we use the value directly without calculating delta
422-
exemplars: vec![],
423-
});
424433
}
425434
}
426435

0 commit comments

Comments
 (0)