Commit 3443940

fraillt (Mindaugas Vinkelis) authored and Mindaugas Vinkelis committed
Unify Histogram and ExpHistogram aggregation
1 parent ff9d50b commit 3443940

File tree: 7 files changed, +522 -474 lines

opentelemetry-sdk/benches/metric.rs (+36 -16)

@@ -3,6 +3,7 @@ use std::sync::{Arc, Weak};
 use criterion::{criterion_group, criterion_main, Bencher, Criterion};
 use opentelemetry::{
+    global,
     metrics::{Counter, Histogram, MeterProvider as _, Result},
     Key, KeyValue,
 };
@@ -344,7 +345,7 @@ fn counters(c: &mut Criterion) {
 const MAX_BOUND: usize = 100000;
 
 fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
-    let mut bounds = vec![0; bound_count];
+    let mut bounds: Vec<usize> = vec![0; bound_count];
     #[allow(clippy::needless_range_loop)]
     for i in 0..bounds.len() {
         bounds[i] = i * MAX_BOUND / bound_count
@@ -394,22 +395,41 @@ fn histograms(c: &mut Criterion) {
             );
         }
     }
-    group.bench_function("CollectOne", |b| benchmark_collect_histogram(b, 1));
-    group.bench_function("CollectFive", |b| benchmark_collect_histogram(b, 5));
-    group.bench_function("CollectTen", |b| benchmark_collect_histogram(b, 10));
-    group.bench_function("CollectTwentyFive", |b| benchmark_collect_histogram(b, 25));
+    for metrics_size in [1, 5, 20] {
+        for attr_sets in [1, 5, 20, 200] {
+            group.bench_function(
+                format!("Collect{metrics_size}Metric{attr_sets}AttrSets"),
+                |b| benchmark_collect_histogram(b, metrics_size, attr_sets),
+            );
+        }
+    }
 }
 
-fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
+fn benchmark_collect_histogram(b: &mut Bencher, metrics_size: usize, attr_sets: usize) {
     let r = SharedReader(Arc::new(ManualReader::default()));
-    let mtr = SdkMeterProvider::builder()
-        .with_reader(r.clone())
-        .build()
-        .meter("sdk/metric/bench/histogram");
-
-    for i in 0..n {
-        let h = mtr.u64_histogram(format!("fake_data_{i}")).init();
-        h.record(1, &[]);
+    let provider = SdkMeterProvider::builder().with_reader(r.clone()).build();
+    let mtr = provider.meter("sdk/metric/bench/histogram");
+    global::set_meter_provider(provider);
+
+    let mut rng = rand::thread_rng();
+    for m in 0..metrics_size {
+        let h = mtr.u64_histogram(format!("fake_data_{m}")).init();
+        for _att in 0..attr_sets {
+            let mut attributes: Vec<KeyValue> = Vec::new();
+            for _i in 0..rng.gen_range(0..3).try_into().unwrap() {
+                attributes.push(KeyValue::new(
+                    format!(
+                        "K{}",
+                        TryInto::<i32>::try_into(rng.gen_range(0..10)).unwrap()
+                    ),
+                    format!(
+                        "V{}",
+                        TryInto::<i32>::try_into(rng.gen_range(0..10)).unwrap()
+                    ),
+                ))
+            }
+            h.record(1, &attributes)
+        }
     }
 
     let mut rm = ResourceMetrics {
@@ -418,8 +438,8 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
     };
 
     b.iter(|| {
-        r.collect(&mut rm).unwrap();
-        assert_eq!(rm.scope_metrics[0].metrics.len(), n);
+        r.collect(&mut rm).unwrap();
+        assert_eq!(rm.scope_metrics[0].metrics.len(), metrics_size);
    })
 }

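Note: the nested loops above expand the former four fixed Collect* cases into a 3 x 4 matrix of benchmark IDs such as Collect5Metric20AttrSets (metrics_size = 5, attr_sets = 20). Assuming the crate keeps its usual Criterion bench target for benches/metric.rs (an assumption, not shown in this diff), a single cell can be run with a substring filter, e.g. cargo bench --bench metric -- Collect5Metric20AttrSets.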
New file (+183 lines)

use std::{
    collections::HashMap,
    fmt::Debug,
    ops::Deref,
    sync::{Arc, Mutex, RwLock},
};

use opentelemetry::{global, metrics::MetricsError, KeyValue};

use crate::metrics::AttributeSet;

use super::{
    aggregate::is_under_cardinality_limit, Number, STREAM_OVERFLOW_ATTRIBUTES,
    STREAM_OVERFLOW_ATTRIBUTES_ERR,
};

/// Aggregator interface
pub(crate) trait Aggregator<T>: Debug
where
    T: Number,
{
    /// A static configuration that is needed by aggregators,
    /// e.g. the bucket size at creation time and the bucket list on aggregator update.
    type Config;

    /// Called every time a new attribute-set is stored.
    fn create(init: &Self::Config) -> Self;

    /// Called for each measurement.
    fn update(&mut self, config: &Self::Config, measurement: T);
}

/// Hashing and sorting are expensive, so we keep two maps;
/// the sorted one is mainly needed for a fast collection phase.
struct WithAttribsAggregators<A> {
    // All attribute combinations go into this map.
    all: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
    sorted: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
}

/// This type is responsible for two things:
/// * routing each measurement to the aggregator for a specific attribute-set
/// * collecting all attribute-sets + aggregators (either read-only OR with reset)
///
/// Even though its responsibility is simple to understand,
/// the implementation is considerably more complex in order to stay performant.
pub(crate) struct AttributeSetAggregation<T, A>
where
    T: Number,
    A: Aggregator<T>,
{
    /// Aggregator for values with no attributes attached.
    no_attribs: Mutex<Option<A>>,
    list: RwLock<WithAttribsAggregators<A>>,
    /// Configuration required to create and update the [`Aggregator`]
    config: A::Config,
}

impl<T, A> AttributeSetAggregation<T, A>
where
    T: Number,
    A: Aggregator<T>,
{
    /// Initialize aggregation by specifying the [`Aggregator`] configuration.
    pub(crate) fn new(init_data: A::Config) -> Self {
        Self {
            no_attribs: Mutex::new(None),
            list: RwLock::new(WithAttribsAggregators {
                all: Default::default(),
                sorted: Default::default(),
            }),
            config: init_data,
        }
    }

    /// Update the aggregator selected by the provided attributes.
    pub(crate) fn measure(&self, attrs: &[KeyValue], measurement: T) {
        if attrs.is_empty() {
            if let Ok(mut aggr) = self.no_attribs.lock() {
                aggr.get_or_insert_with(|| A::create(&self.config))
                    .update(&self.config, measurement);
            }
            return;
        }
        let Ok(list) = self.list.read() else {
            return;
        };
        if let Some(aggr) = list.all.get(attrs) {
            if let Ok(mut aggr) = aggr.lock() {
                aggr.update(&self.config, measurement);
            }
            return;
        }
        drop(list);
        let Ok(mut list) = self.list.write() else {
            return;
        };

        // Recheck in case another thread already inserted
        if let Some(aggr) = list.all.get(attrs) {
            if let Ok(mut aggr) = aggr.lock() {
                aggr.update(&self.config, measurement);
            }
        } else if is_under_cardinality_limit(list.all.len()) {
            let mut aggr = A::create(&self.config);
            aggr.update(&self.config, measurement);
            let aggr = Arc::new(Mutex::new(aggr));
            list.all.insert(attrs.into(), aggr.clone());
            let sorted_attribs = AttributeSet::from(attrs).into_vec();
            list.sorted.insert(sorted_attribs, aggr);
        } else if let Some(aggr) = list.sorted.get(STREAM_OVERFLOW_ATTRIBUTES.deref()) {
            if let Ok(mut aggr) = aggr.lock() {
                aggr.update(&self.config, measurement);
            }
        } else {
            let mut aggr = A::create(&self.config);
            aggr.update(&self.config, measurement);
            list.sorted.insert(
                STREAM_OVERFLOW_ATTRIBUTES.clone(),
                Arc::new(Mutex::new(aggr)),
            );
            global::handle_error(MetricsError::Other(STREAM_OVERFLOW_ATTRIBUTES_ERR.into()));
        }
    }

    /// Iterate through all attribute sets and populate `DataPoints` in read-only mode.
    pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
    {
        let Ok(list) = self.list.read() else {
            return;
        };
        prepare_data(dest, list.sorted.len());
        if let Ok(aggr) = self.no_attribs.lock() {
            if let Some(aggr) = aggr.deref() {
                dest.push(map_fn(Default::default(), aggr));
            }
        };
        dest.extend(
            list.sorted
                .iter()
                .filter_map(|(k, v)| v.lock().ok().map(|v| map_fn(k.clone(), &v))),
        )
    }

    /// Iterate through all attribute sets and populate `DataPoints`, while also consuming (resetting) the aggregators.
    pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
    {
        let Ok(mut list) = self.list.write() else {
            return;
        };
        prepare_data(dest, list.sorted.len());
        if let Ok(mut aggr) = self.no_attribs.lock() {
            if let Some(aggr) = aggr.take() {
                dest.push(map_fn(Default::default(), aggr));
            }
        };
        list.all.clear();
        dest.extend(list.sorted.drain().filter_map(|(k, v)| {
            Arc::try_unwrap(v)
                .expect("this is the last instance, so we cannot fail to get it")
                .into_inner()
                .ok()
                .map(|v| map_fn(k, v))
        }));
    }

    pub(crate) fn config(&self) -> &A::Config {
        &self.config
    }
}

/// Clear the buffer and reserve exactly the required amount of space for all attribute-sets.
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
    data.clear();
    let total_len = list_len + 1; // to account for the no-attributes case
    if total_len > data.capacity() {
        data.reserve_exact(total_len - data.capacity());
    }
}
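To make the new contract concrete, below is a minimal sketch of how a bucket-counting aggregator could implement Aggregator<T> and be driven through AttributeSetAggregation. It is illustrative only, not the commit's actual Histogram or ExpHistogram code: the names bucket_sketch, BucketBounds, BucketCounts, and example_usage are hypothetical, and the sketch assumes it sits in a submodule of this same internal file so that the pub(crate) items above are reachable.

// Illustrative sketch only; not part of the commit. All names below are hypothetical.
#[allow(dead_code)]
mod bucket_sketch {
    use super::{Aggregator, AttributeSetAggregation};
    use opentelemetry::KeyValue;

    /// Hypothetical static config: the sorted upper bounds of the buckets.
    #[derive(Debug)]
    struct BucketBounds(Vec<f64>);

    /// Hypothetical per-attribute-set state: one counter per bucket, plus an overflow bucket.
    #[derive(Debug)]
    struct BucketCounts(Vec<u64>);

    impl Aggregator<u64> for BucketCounts {
        type Config = BucketBounds;

        // Called once per new attribute set: size the counters from the shared config.
        fn create(init: &Self::Config) -> Self {
            BucketCounts(vec![0; init.0.len() + 1])
        }

        // Called per measurement: locate the bucket for the value and bump its counter.
        fn update(&mut self, config: &Self::Config, measurement: u64) {
            let idx = config.0.partition_point(|bound| *bound < measurement as f64);
            self.0[idx] += 1;
        }
    }

    fn example_usage() {
        // One aggregation map for the instrument, created from the static bucket configuration.
        let hist: AttributeSetAggregation<u64, BucketCounts> =
            AttributeSetAggregation::new(BucketBounds(vec![1.0, 5.0, 10.0]));

        // Hot path: record a measurement for a given attribute set.
        hist.measure(&[KeyValue::new("K1", "V1")], 7);

        // Collection with reset: drain every (attributes, counts) pair into data points.
        let mut points: Vec<(Vec<KeyValue>, Vec<u64>)> = Vec::new();
        hist.collect_and_reset(&mut points, |attrs, aggr| (attrs, aggr.0));
    }
}

The split between all (keys in caller-provided attribute order, used on the hot path) and sorted (canonicalized keys, drained at collection time) is what lets measure take only a read lock for attribute sets that have already been seen.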
