
Commit 53cb06e

Implement cardinality limits for metric streams (#1066)
1 parent dcd4366 commit 53cb06e

6 files changed (+143 -37 lines)


opentelemetry-sdk/CHANGELOG.md (+6)

@@ -2,7 +2,13 @@
 
 ## Unreleased
 
+### Added
+
+- Implement cardinality limits for metric streams
+  [#1066](https://github.com/open-telemetry/opentelemetry-rust/pull/1066).
+
 ### Removed
+
 - Samplers no longer has access to `InstrumentationLibrary` as one of parameters
   to `should_sample`.
   [#1041](https://github.com/open-telemetry/opentelemetry-rust/pull/1041).
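
Note on the behavior this entry describes: once a single metric stream accumulates too many distinct attribute sets, further measurements are folded into one extra data point carrying the `otel.metric.overflow = "true"` attribute (see the aggregator changes below). A minimal sketch of the recording side, assuming an already configured `Counter<u64>` and `Context` (provider and reader setup omitted); the helper name `record_per_user` is illustrative only:

use opentelemetry_api::{metrics::Counter, Context, KeyValue};

// Illustrative helper (not part of the commit): records one count per
// distinct `user_id` attribute set. Past the stream cardinality limit,
// the SDK aggregates the excess into the single data point whose only
// attribute is `otel.metric.overflow = "true"`.
fn record_per_user(cx: &Context, counter: &Counter<u64>, users: &[String]) {
    for user in users {
        counter.add(cx, 1, &[KeyValue::new("user_id", user.clone())]);
    }
}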

opentelemetry-sdk/benches/metric.rs (+21)

@@ -171,6 +171,7 @@ fn bench_counter(
 fn counters(c: &mut Criterion) {
     let (cx, _, cntr) = bench_counter(None, "cumulative");
     let (cx2, _, cntr2) = bench_counter(None, "delta");
+    let (cx3, _, cntr3) = bench_counter(None, "cumulative");
 
     let mut group = c.benchmark_group("Counter");
     group.bench_function("AddNoAttrs", |b| b.iter(|| cntr.add(&cx, 1, &[])));
@@ -278,6 +279,26 @@ fn counters(c: &mut Criterion) {
             )
         })
     });
+
+    const MAX_DATA_POINTS: i64 = 2000;
+    let mut max_attributes: Vec<KeyValue> = Vec::new();
+
+    for i in 0..MAX_DATA_POINTS - 2 {
+        max_attributes.push(KeyValue::new(i.to_string(), i))
+    }
+
+    group.bench_function("AddOneTillMaxAttr", |b| {
+        b.iter(|| cntr3.add(&cx3, 1, &max_attributes))
+    });
+
+    for i in MAX_DATA_POINTS..MAX_DATA_POINTS * 2 {
+        max_attributes.push(KeyValue::new(i.to_string(), i))
+    }
+
+    group.bench_function("AddMaxAttr", |b| {
+        b.iter(|| cntr3.add(&cx3, 1, &max_attributes))
+    });
+
     group.bench_function("AddInvalidAttr", |b| {
         b.iter(|| cntr.add(&cx, 1, &[KeyValue::new("", "V"), KeyValue::new("K", "V")]))
     });
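
For context, the two new `cntr3` cases measure `add` with a single, very large attribute list: `AddOneTillMaxAttr` uses 1,998 key-value pairs (just under `MAX_DATA_POINTS` = 2000), and `AddMaxAttr` grows the same list to 3,998. They run through the usual Criterion flow, for example `cargo bench --bench metric` (assuming the bench target keeps its existing `metric` name).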

opentelemetry-sdk/src/metrics/internal/aggregator.rs (+13 -1)

@@ -1,6 +1,13 @@
+use crate::{attributes::AttributeSet, metrics::data::Aggregation};
+use once_cell::sync::Lazy;
+use opentelemetry_api::KeyValue;
 use std::sync::Arc;
 
-use crate::{attributes::AttributeSet, metrics::data::Aggregation};
+const STREAM_CARDINALITY_LIMIT: u32 = 2000;
+pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(|| {
+    let key_values: [KeyValue; 1] = [KeyValue::new("otel.metric.overflow", "true")];
+    AttributeSet::from(&key_values[..])
+});
 
 /// Forms an aggregation from a collection of recorded measurements.
 pub(crate) trait Aggregator<T>: Send + Sync {
@@ -15,6 +22,11 @@ pub(crate) trait Aggregator<T>: Send + Sync {
     fn as_precompute_aggregator(&self) -> Option<Arc<dyn PrecomputeAggregator<T>>> {
         None
     }
+
+    /// Checks whether aggregator has hit cardinality limit for metric streams
+    fn is_under_cardinality_limit(&self, size: usize) -> bool {
+        size < STREAM_CARDINALITY_LIMIT as usize - 1
+    }
 }
 
 /// An `Aggregator` that receives values to aggregate that have been pre-computed by the caller.
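
The admission check above is worth a quick worked example: `size` is the number of series already stored for the stream, so `size < 2000 - 1` admits a new regular series only while fewer than 1,999 exist, which leaves room for the dedicated overflow series within the 2,000-data-point cap. A standalone sketch of that boundary (a free function mirroring the trait method, not SDK code):

const STREAM_CARDINALITY_LIMIT: u32 = 2000;

// Mirrors the default trait method above as a free function.
fn is_under_cardinality_limit(size: usize) -> bool {
    size < STREAM_CARDINALITY_LIMIT as usize - 1
}

fn main() {
    // With 1998 series stored, a 1999th regular series is still admitted.
    assert!(is_under_cardinality_limit(1998));
    // With 1999 series stored, new attribute sets are routed to the single
    // `otel.metric.overflow = "true"` series instead.
    assert!(!is_under_cardinality_limit(1999));
}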

opentelemetry-sdk/src/metrics/internal/histogram.rs (+35 -18)

@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{hash_map::Entry, HashMap},
     sync::{Arc, Mutex},
     time::SystemTime,
 };
@@ -9,8 +9,9 @@ use crate::metrics::{
     aggregation,
     data::{self, Aggregation},
 };
+use opentelemetry_api::{global, metrics::MetricsError};
 
-use super::{Aggregator, Number};
+use super::{aggregator::STREAM_OVERFLOW_ATTRIBUTE_SET, Aggregator, Number};
 
 #[derive(Default)]
 struct Buckets<T> {
@@ -78,22 +79,38 @@
             Ok(guard) => guard,
             Err(_) => return,
         };
-
-        let b = values.entry(attrs).or_insert_with(|| {
-            // N+1 buckets. For example:
-            //
-            // bounds = [0, 5, 10]
-            //
-            // Then,
-            //
-            // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
-            let mut b = Buckets::new(self.bounds.len() + 1);
-            // Ensure min and max are recorded values (not zero), for new buckets.
-            (b.min, b.max) = (measurement, measurement);
-
-            b
-        });
-        b.bin(idx, measurement)
+        let size = values.len();
+
+        match values.entry(attrs) {
+            Entry::Occupied(mut occupied_entry) => occupied_entry.get_mut().bin(idx, measurement),
+            Entry::Vacant(vacant_entry) => {
+                if self.is_under_cardinality_limit(size) {
+                    // N+1 buckets. For example:
+                    //
+                    // bounds = [0, 5, 10]
+                    //
+                    // Then,
+                    //
+                    // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
+                    let mut b = Buckets::new(self.bounds.len() + 1);
+                    // Ensure min and max are recorded values (not zero), for new buckets.
+                    (b.min, b.max) = (measurement, measurement);
+                    b.bin(idx, measurement);
+                    vacant_entry.insert(b);
+                } else {
+                    values
+                        .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
+                        .and_modify(|val| val.bin(idx, measurement))
+                        .or_insert_with(|| {
+                            let mut b = Buckets::new(self.bounds.len() + 1);
+                            (b.min, b.max) = (measurement, measurement);
+                            b.bin(idx, measurement);
+                            b
+                        });
+                    global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
+                }
+            }
+        }
     }
 
     fn aggregation(&self) -> Option<Box<dyn Aggregation>> {

opentelemetry-sdk/src/metrics/internal/last_value.rs (+20 -4)

@@ -1,13 +1,14 @@
 use std::{
-    collections::HashMap,
+    collections::{hash_map::Entry, HashMap},
     sync::{Arc, Mutex},
     time::SystemTime,
 };
 
 use crate::attributes::AttributeSet;
 use crate::metrics::data::{self, Gauge};
+use opentelemetry_api::{global, metrics::MetricsError};
 
-use super::{Aggregator, Number};
+use super::{aggregator::STREAM_OVERFLOW_ATTRIBUTE_SET, Aggregator, Number};
 
 /// Timestamped measurement data.
 struct DataPointValue<T> {
@@ -28,11 +29,26 @@ struct LastValue<T> {
 
 impl<T: Number<T>> Aggregator<T> for LastValue<T> {
     fn aggregate(&self, measurement: T, attrs: AttributeSet) {
-        let d = DataPointValue {
+        let d: DataPointValue<T> = DataPointValue {
            timestamp: SystemTime::now(),
            value: measurement,
        };
-        let _ = self.values.lock().map(|mut values| values.insert(attrs, d));
+        if let Ok(mut values) = self.values.lock() {
+            let size = values.len();
+            match values.entry(attrs) {
+                Entry::Occupied(mut occupied_entry) => {
+                    occupied_entry.insert(d);
+                }
+                Entry::Vacant(vacant_entry) => {
+                    if self.is_under_cardinality_limit(size) {
+                        vacant_entry.insert(d);
+                    } else {
+                        values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), d);
+                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
+                    }
+                }
+            }
+        }
     }
 
     fn aggregation(&self) -> Option<Box<dyn crate::metrics::data::Aggregation>> {

opentelemetry-sdk/src/metrics/internal/sum.rs (+48 -14)

@@ -1,13 +1,17 @@
 use std::{
-    collections::HashMap,
+    collections::{hash_map::Entry, HashMap},
     sync::{Arc, Mutex},
     time::SystemTime,
 };
 
 use crate::attributes::AttributeSet;
 use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
+use opentelemetry_api::{global, metrics::MetricsError};
 
-use super::{aggregator::PrecomputeAggregator, Aggregator, Number};
+use super::{
+    aggregator::{PrecomputeAggregator, STREAM_OVERFLOW_ATTRIBUTE_SET},
+    Aggregator, Number,
+};
 
 /// The storage for sums.
 #[derive(Default)]
@@ -26,10 +30,24 @@ impl<T: Number<T>> ValueMap<T> {
 impl<T: Number<T>> Aggregator<T> for ValueMap<T> {
     fn aggregate(&self, measurement: T, attrs: AttributeSet) {
         if let Ok(mut values) = self.values.lock() {
-            values
-                .entry(attrs)
-                .and_modify(|val| *val += measurement)
-                .or_insert(measurement);
+            let size = values.len();
+            match values.entry(attrs) {
+                Entry::Occupied(mut occupied_entry) => {
+                    let sum = occupied_entry.get_mut();
+                    *sum += measurement;
+                }
+                Entry::Vacant(vacant_entry) => {
+                    if self.is_under_cardinality_limit(size) {
+                        vacant_entry.insert(measurement);
+                    } else {
+                        values
+                            .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
+                            .and_modify(|val| *val += measurement)
+                            .or_insert(measurement);
+                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
+                    }
+                }
+            }
         }
     }
 
@@ -211,14 +229,30 @@ impl<T: Number<T>> Aggregator<T> for PrecomputedMap<T> {
             Ok(guard) => guard,
             Err(_) => return,
         };
-
-        values
-            .entry(attrs)
-            .and_modify(|v| v.measured = measurement)
-            .or_insert(PrecomputedValue {
-                measured: measurement,
-                ..Default::default()
-            });
+        let size = values.len();
+        match values.entry(attrs) {
+            Entry::Occupied(mut occupied_entry) => {
+                let v = occupied_entry.get_mut();
+                v.measured = measurement;
+            }
+            Entry::Vacant(vacant_entry) => {
+                if self.is_under_cardinality_limit(size) {
+                    vacant_entry.insert(PrecomputedValue {
+                        measured: measurement,
+                        ..Default::default()
+                    });
+                } else {
+                    values.insert(
+                        STREAM_OVERFLOW_ATTRIBUTE_SET.clone(),
+                        PrecomputedValue {
+                            measured: measurement,
+                            ..Default::default()
+                        },
+                    );
+                    global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
+                }
+            }
+        }
     }
 
     fn aggregation(&self) -> Option<Box<dyn Aggregation>> {
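
All three aggregators (sum, last value, histogram) share the same upsert shape: read `values.len()` before calling `entry()` (which borrows the map mutably), update occupied entries in place, admit a vacant entry only while under the limit, and otherwise fold the measurement into the overflow series and report the condition through the global error handler. A simplified, self-contained model of that pattern over plain `std` types, with string keys standing in for `AttributeSet`, a `u64` sum standing in for the aggregated value, and `eprintln!` standing in for `global::handle_error` (illustrative names, not the SDK's internals):

use std::collections::{hash_map::Entry, HashMap};

const STREAM_CARDINALITY_LIMIT: usize = 2000;
const OVERFLOW_KEY: &str = "otel.metric.overflow=true";

/// Illustrative stand-in for the SDK's sum storage.
struct SumModel {
    values: HashMap<String, u64>,
}

impl SumModel {
    fn aggregate(&mut self, measurement: u64, attrs: String) {
        let size = self.values.len();
        match self.values.entry(attrs) {
            // Existing series: update in place.
            Entry::Occupied(mut occupied) => *occupied.get_mut() += measurement,
            Entry::Vacant(vacant) => {
                if size < STREAM_CARDINALITY_LIMIT - 1 {
                    // Under the limit: admit the new series.
                    vacant.insert(measurement);
                } else {
                    // At the limit: fold the measurement into the overflow series.
                    *self.values.entry(OVERFLOW_KEY.to_string()).or_insert(0) += measurement;
                    // Stand-in for global::handle_error in the SDK.
                    eprintln!("maximum data points for metric stream exceeded");
                }
            }
        }
    }
}

fn main() {
    let mut sum = SumModel { values: HashMap::new() };
    for i in 0..2500u64 {
        sum.aggregate(1, format!("user_id={}", i));
    }
    // 1999 regular series plus the overflow series: 2000 data points total.
    assert_eq!(sum.values.len(), 2000);
    // The remaining 501 measurements were folded into the overflow series.
    assert_eq!(sum.values[OVERFLOW_KEY], 501);
}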
