
Commit ac0ea9f

fraillt, cijothomas, and lalitb authored
Add collect methods on ValueMap (#2267)
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
Co-authored-by: Lalit Kumar Bhasin <labhas@microsoft.com>
1 parent 1fd871a commit ac0ea9f

5 files changed: +170 −423 lines

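The new `collect_and_reset` and `collect_readonly` methods on the internal `ValueMap` are not themselves visible in the hunks below, only their call sites. Judging from those call sites, their shape is roughly the sketch that follows: a destination vector plus a closure that maps each attribute set and its tracker to a data point. This is a simplified illustration, not the SDK's actual implementation; the `Tracker` type, the `Vec`-based storage, and all field names are placeholders.

```rust
use std::sync::Mutex;

use opentelemetry::KeyValue;

/// Placeholder for a per-attribute-set aggregator (the real SDK uses e.g.
/// a histogram bucket tracker or a last-value tracker).
struct Tracker {
    sum: Mutex<u64>,
}

/// Simplified stand-in for the SDK's internal ValueMap; only the shape of
/// the two new collect methods matters here.
struct ValueMap {
    trackers: Mutex<Vec<(Vec<KeyValue>, Tracker)>>,
}

impl ValueMap {
    /// Delta collection: drain every tracker, let `map_fn` turn the owned
    /// tracker into a data point, and leave the map empty for the next cycle.
    fn collect_and_reset<DP>(
        &self,
        dest: &mut Vec<DP>,
        map_fn: impl Fn(Vec<KeyValue>, Tracker) -> DP,
    ) {
        let mut trackers = self.trackers.lock().unwrap();
        for (attrs, tracker) in trackers.drain(..) {
            dest.push(map_fn(attrs, tracker));
        }
    }

    /// Cumulative collection: hand `map_fn` a shared reference so the
    /// trackers keep accumulating across collection cycles.
    fn collect_readonly<DP>(
        &self,
        dest: &mut Vec<DP>,
        map_fn: impl Fn(Vec<KeyValue>, &Tracker) -> DP,
    ) {
        let trackers = self.trackers.lock().unwrap();
        for (attrs, tracker) in trackers.iter() {
            dest.push(map_fn(attrs.clone(), tracker));
        }
    }
}
```

Whatever the exact internals, the per-aggregator `delta`/`cumulative` code in the diffs below no longer special-cases the no-attribute tracker, reserves capacity, or deduplicates trackers; that bookkeeping moves behind these two calls.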

opentelemetry-sdk/src/metrics/internal/histogram.rs

+38 −146
@@ -1,6 +1,5 @@
-use std::collections::HashSet;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
+use std::mem::replace;
+use std::ops::DerefMut;
 use std::{sync::Mutex, time::SystemTime};

 use crate::metrics::data::HistogramDataPoint;
@@ -37,6 +36,14 @@ where
             buckets: Mutex::new(Buckets::<T>::new(*count)),
         }
     }
+
+    fn clone_and_reset(&self, count: &usize) -> Self {
+        let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
+        let cloned = replace(current.deref_mut(), Buckets::new(*count));
+        Self {
+            buckets: Mutex::new(cloned),
+        }
+    }
 }

 #[derive(Default)]
@@ -73,16 +80,6 @@ impl<T: Number> Buckets<T> {
             self.max = value
         }
     }
-
-    fn reset(&mut self) {
-        for item in &mut self.counts {
-            *item = 0;
-        }
-        self.count = Default::default();
-        self.total = Default::default();
-        self.min = T::max();
-        self.max = T::min();
-    }
 }

 /// Summarizes a set of measurements as a histogram with explicitly defined
@@ -139,11 +136,6 @@ impl<T: Number> Histogram<T> {
         dest: Option<&mut dyn Aggregation>,
     ) -> (usize, Option<Box<dyn Aggregation>>) {
         let t = SystemTime::now();
-        let start = self
-            .start
-            .lock()
-            .map(|s| *s)
-            .unwrap_or_else(|_| SystemTime::now());
         let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
         let mut new_agg = if h.is_none() {
             Some(data::Histogram {
@@ -155,24 +147,22 @@ impl<T: Number> Histogram<T> {
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Delta;
-        h.data_points.clear();
-
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }

-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: vec![],
-                    start_time: start,
+        let prev_start = self
+            .start
+            .lock()
+            .map(|mut start| replace(start.deref_mut(), t))
+            .unwrap_or(t);
+
+        self.value_map
+            .collect_and_reset(&mut h.data_points, |attributes, aggr| {
+                let b = aggr
+                    .buckets
+                    .into_inner()
+                    .unwrap_or_else(|err| err.into_inner());
+                HistogramDataPoint {
+                    attributes,
+                    start_time: prev_start,
                     time: t,
                     count: b.count,
                     bounds: self.bounds.clone(),
@@ -193,54 +183,8 @@ impl<T: Number> Histogram<T> {
                         None
                     },
                     exemplars: vec![],
-                });
-
-                b.reset();
-            }
-        }
-
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                if let Ok(b) = tracker.buckets.lock() {
-                    h.data_points.push(HistogramDataPoint {
-                        attributes: attrs.clone(),
-                        start_time: start,
-                        time: t,
-                        count: b.count,
-                        bounds: self.bounds.clone(),
-                        bucket_counts: b.counts.clone(),
-                        sum: if self.record_sum {
-                            b.total
-                        } else {
-                            T::default()
-                        },
-                        min: if self.record_min_max {
-                            Some(b.min)
-                        } else {
-                            None
-                        },
-                        max: if self.record_min_max {
-                            Some(b.max)
-                        } else {
-                            None
-                        },
-                        exemplars: vec![],
-                    });
                 }
-            }
-        }
-
-        // The delta collection cycle resets.
-        if let Ok(mut start) = self.start.lock() {
-            *start = t;
-        }
-        self.value_map.count.store(0, Ordering::SeqCst);
+            });

         (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
@@ -250,11 +194,6 @@ impl<T: Number> Histogram<T> {
         dest: Option<&mut dyn Aggregation>,
     ) -> (usize, Option<Box<dyn Aggregation>>) {
         let t = SystemTime::now();
-        let start = self
-            .start
-            .lock()
-            .map(|s| *s)
-            .unwrap_or_else(|_| SystemTime::now());
         let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
         let mut new_agg = if h.is_none() {
             Some(data::Histogram {
@@ -266,24 +205,19 @@ impl<T: Number> Histogram<T> {
         };
         let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
         h.temporality = Temporality::Cumulative;
-        h.data_points.clear();

-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
+        let prev_start = self
+            .start
+            .lock()
+            .map(|s| *s)
+            .unwrap_or_else(|_| SystemTime::now());

-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: vec![],
-                    start_time: start,
+        self.value_map
+            .collect_readonly(&mut h.data_points, |attributes, aggr| {
+                let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
+                HistogramDataPoint {
+                    attributes,
+                    start_time: prev_start,
                     time: t,
                     count: b.count,
                     bounds: self.bounds.clone(),
@@ -304,50 +238,8 @@ impl<T: Number> Histogram<T> {
                         None
                     },
                     exemplars: vec![],
-                });
-            }
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        // TODO: This will use an unbounded amount of memory if there
-        // are unbounded number of attribute sets being aggregated. Attribute
-        // sets that become "stale" need to be forgotten so this will not
-        // overload the system.
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                if let Ok(b) = tracker.buckets.lock() {
-                    h.data_points.push(HistogramDataPoint {
-                        attributes: attrs.clone(),
-                        start_time: start,
-                        time: t,
-                        count: b.count,
-                        bounds: self.bounds.clone(),
-                        bucket_counts: b.counts.clone(),
-                        sum: if self.record_sum {
-                            b.total
-                        } else {
-                            T::default()
-                        },
-                        min: if self.record_min_max {
-                            Some(b.min)
-                        } else {
-                            None
-                        },
-                        max: if self.record_min_max {
-                            Some(b.max)
-                        } else {
-                            None
-                        },
-                        exemplars: vec![],
-                    });
                 }
-            }
-        }
+            });

         (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
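Both files in this commit also add a `clone_and_reset` method to their tracker types (taking the bucket count `&usize` for the histogram above, `&()` for the last-value tracker below), which suggests that `ValueMap` drives delta collection through a small per-aggregator contract. The trait below is an assumption inferred from this diff, with illustrative names, not the SDK's actual definition:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical contract the trackers in this commit appear to share;
/// the trait name, method set, and associated type are guesses.
trait Aggregator<T> {
    /// Per-instrument configuration: the bucket count for histograms,
    /// `()` for last-value.
    type InitConfig;

    /// Fold one measurement into the tracker.
    fn update(&self, value: T);

    /// Snapshot the current state into a new tracker and reset this one
    /// in place; this is what delta collection needs.
    fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}

/// Toy last-value tracker mirroring the shape of the one in the diff below.
struct LastValueTracker {
    value: AtomicU64,
}

impl Aggregator<u64> for LastValueTracker {
    type InitConfig = ();

    fn update(&self, value: u64) {
        self.value.store(value, Ordering::Relaxed);
    }

    fn clone_and_reset(&self, _: &()) -> Self {
        // Take the current value and zero the original in one step.
        Self {
            value: AtomicU64::new(self.value.swap(0, Ordering::Relaxed)),
        }
    }
}
```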
opentelemetry-sdk/src/metrics/internal/last_value.rs
@@ -1,8 +1,4 @@
-use std::{
-    collections::HashSet,
-    sync::{atomic::Ordering, Arc, Mutex},
-    time::SystemTime,
-};
+use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};

 use crate::metrics::data::DataPoint;
 use opentelemetry::KeyValue;
@@ -33,6 +29,12 @@ where
     fn update(&self, value: T) {
         self.value.store(value)
     }
+
+    fn clone_and_reset(&self, _: &()) -> Self {
+        Self {
+            value: T::new_atomic_tracker(self.value.get_and_reset_value()),
+        }
+    }
 }

 /// Summarizes a set of measurements as the last one made.
@@ -56,102 +58,31 @@ impl<T: Number> LastValue<T> {

     pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
         let t = SystemTime::now();
-        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
-        dest.clear();
-
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > dest.capacity() {
-            dest.reserve_exact(n - dest.capacity());
-        }
-
-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            dest.push(DataPoint {
-                attributes: vec![],
+        let prev_start = self
+            .start
+            .lock()
+            .map(|mut start| replace(start.deref_mut(), t))
+            .unwrap_or(t);
+        self.value_map
+            .collect_and_reset(dest, |attributes, aggr| DataPoint {
+                attributes,
                 start_time: Some(prev_start),
                 time: Some(t),
-                value: self
-                    .value_map
-                    .no_attribute_tracker
-                    .value
-                    .get_and_reset_value(),
+                value: aggr.value.get_value(),
                 exemplars: vec![],
             });
-        }
-
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            _ => return,
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.value.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
-
-        // The delta collection cycle resets.
-        if let Ok(mut start) = self.start.lock() {
-            *start = t;
-        }
-        self.value_map.count.store(0, Ordering::SeqCst);
     }

     pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
         let t = SystemTime::now();
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
-
-        dest.clear();
-
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > dest.capacity() {
-            dest.reserve_exact(n - dest.capacity());
-        }
-
-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            dest.push(DataPoint {
-                attributes: vec![],
+        self.value_map
+            .collect_readonly(dest, |attributes, aggr| DataPoint {
+                attributes,
                 start_time: Some(prev_start),
                 time: Some(t),
-                value: self.value_map.no_attribute_tracker.value.get_value(),
+                value: aggr.value.get_value(),
                 exemplars: vec![],
             });
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            _ => return,
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                dest.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.value.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
     }
 }
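Both delta paths now use the same start-time swap: a single lock acquisition reads the previous start and installs the new collection time, replacing the separate read at the top and reset at the bottom of the old functions. A standalone illustration of that idiom (the `swap_start` helper name is illustrative, not part of the SDK):

```rust
use std::mem::replace;
use std::ops::DerefMut;
use std::sync::Mutex;
use std::time::SystemTime;

/// Read the previous start time and store the new collection time `t`
/// under one lock acquisition; fall back to `t` if the lock is poisoned.
fn swap_start(start: &Mutex<SystemTime>, t: SystemTime) -> SystemTime {
    start
        .lock()
        .map(|mut guard| replace(guard.deref_mut(), t))
        .unwrap_or(t)
}

fn main() {
    let start = Mutex::new(SystemTime::UNIX_EPOCH);
    let now = SystemTime::now();
    let prev = swap_start(&start, now);
    assert_eq!(prev, SystemTime::UNIX_EPOCH);
    assert_eq!(*start.lock().unwrap(), now);
}
```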
