Skip to content

Commit c9388e4

Browse files
authored
Preallocate and keep memory for HashMap in Metric aggregation (#2343)
1 parent 3a4d12a commit c9388e4

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ use super::{
1212
precomputed_sum::PrecomputedSum, sum::Sum, Number,
1313
};
1414

15-
const STREAM_CARDINALITY_LIMIT: u32 = 2000;
15+
pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
1616

1717
/// Checks whether aggregator has hit cardinality limit for metric streams
1818
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
19-
size < STREAM_CARDINALITY_LIMIT as usize
19+
size < STREAM_CARDINALITY_LIMIT
2020
}
2121

2222
/// Receives measurements to be aggregated.

opentelemetry-sdk/src/metrics/internal/mod.rs

+24-12
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ mod sum;
77

88
use core::fmt;
99
use std::collections::{HashMap, HashSet};
10-
use std::mem::take;
10+
use std::mem::swap;
1111
use std::ops::{Add, AddAssign, DerefMut, Sub};
1212
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
1313
use std::sync::{Arc, RwLock};
1414

15-
use aggregate::is_under_cardinality_limit;
15+
use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
1616
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
1717
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
1818
use once_cell::sync::Lazy;
@@ -51,6 +51,11 @@ where
5151
{
5252
/// Trackers store the values associated with different attribute sets.
5353
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
54+
55+
/// Used by collect exclusively. The data type must match the one used in
56+
/// `trackers` to allow mem::swap.
57+
trackers_for_collect: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
58+
5459
/// Number of different attribute set stored in the `trackers` map.
5560
count: AtomicUsize,
5661
/// Indicates whether a value with no attributes has been stored.
@@ -67,7 +72,10 @@ where
6772
{
6873
fn new(config: A::InitConfig) -> Self {
6974
ValueMap {
70-
trackers: RwLock::new(HashMap::new()),
75+
trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
76+
// TODO: For cumulative, this is not required, so avoid this
77+
// pre-allocation.
78+
trackers_for_collect: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
7179
has_no_attribute_value: AtomicBool::new(false),
7280
no_attribute_tracker: A::create(&config),
7381
count: AtomicUsize::new(0),
@@ -170,19 +178,23 @@ where
170178
));
171179
}
172180

173-
let trackers = match self.trackers.write() {
174-
Ok(mut trackers) => {
181+
if let Ok(mut trackers_collect) = self.trackers_for_collect.write() {
182+
if let Ok(mut trackers_current) = self.trackers.write() {
183+
swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
175184
self.count.store(0, Ordering::SeqCst);
176-
take(trackers.deref_mut())
185+
} else {
186+
otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
187+
return;
177188
}
178-
Err(_) => todo!(),
179-
};
180189

181-
let mut seen = HashSet::new();
182-
for (attrs, tracker) in trackers.into_iter() {
183-
if seen.insert(Arc::as_ptr(&tracker)) {
184-
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
190+
let mut seen = HashSet::new();
191+
for (attrs, tracker) in trackers_collect.drain() {
192+
if seen.insert(Arc::as_ptr(&tracker)) {
193+
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
194+
}
185195
}
196+
} else {
197+
otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
186198
}
187199
}
188200
}

0 commit comments

Comments
 (0)