Skip to content

Commit 56b4475

Browse files
committed
Use ValueMap for Gauge
1 parent 399846e commit 56b4475

File tree

1 file changed

+41
-52
lines changed

1 file changed

+41
-52
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,71 @@
11
use std::{
2-
collections::{hash_map::Entry, HashMap},
3-
sync::Mutex,
2+
collections::HashSet,
3+
sync::{atomic::Ordering, Arc},
44
time::SystemTime,
55
};
66

7-
use crate::{metrics::data::DataPoint, metrics::AttributeSet};
8-
use opentelemetry::{global, metrics::MetricsError, KeyValue};
7+
use crate::metrics::data::DataPoint;
8+
use opentelemetry::KeyValue;
99

10-
use super::{
11-
aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
12-
Number,
13-
};
14-
15-
/// Timestamped measurement data.
16-
struct DataPointValue<T> {
17-
timestamp: SystemTime,
18-
value: T,
19-
}
10+
use super::{Assign, AtomicTracker, Number, ValueMap};
2011

2112
/// Summarizes a set of measurements as the last one made.
22-
#[derive(Default)]
23-
pub(crate) struct LastValue<T> {
24-
values: Mutex<HashMap<AttributeSet, DataPointValue<T>>>,
13+
pub(crate) struct LastValue<T: Number<T>> {
14+
value_map: ValueMap<T, Assign>,
2515
}
2616

2717
impl<T: Number<T>> LastValue<T> {
2818
pub(crate) fn new() -> Self {
29-
Self::default()
19+
LastValue {
20+
value_map: ValueMap::new(),
21+
}
3022
}
3123

3224
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
33-
let d: DataPointValue<T> = DataPointValue {
34-
timestamp: SystemTime::now(),
35-
value: measurement,
36-
};
37-
38-
let attrs: AttributeSet = attrs.into();
39-
if let Ok(mut values) = self.values.lock() {
40-
let size = values.len();
41-
match values.entry(attrs) {
42-
Entry::Occupied(mut occupied_entry) => {
43-
occupied_entry.insert(d);
44-
}
45-
Entry::Vacant(vacant_entry) => {
46-
if is_under_cardinality_limit(size) {
47-
vacant_entry.insert(d);
48-
} else {
49-
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), d);
50-
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
51-
}
52-
}
53-
}
54-
}
25+
self.value_map.measure(measurement, attrs);
5526
}
5627

5728
pub(crate) fn compute_aggregation(&self, dest: &mut Vec<DataPoint<T>>) {
29+
let t = SystemTime::now();
5830
dest.clear();
59-
let mut values = match self.values.lock() {
60-
Ok(guard) if !guard.is_empty() => guard,
61-
_ => return,
62-
};
6331

64-
let n = values.len();
32+
// Max number of data points need to account for the special casing
33+
// of the no attribute value + overflow attribute.
34+
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
6535
if n > dest.capacity() {
6636
dest.reserve_exact(n - dest.capacity());
6737
}
6838

69-
for (attrs, value) in values.drain() {
39+
if self
40+
.value_map
41+
.has_no_attribute_value
42+
.swap(false, Ordering::AcqRel)
43+
{
7044
dest.push(DataPoint {
71-
attributes: attrs
72-
.iter()
73-
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
74-
.collect(),
75-
time: Some(value.timestamp),
76-
value: value.value,
45+
attributes: vec![],
7746
start_time: None,
47+
time: Some(t),
48+
value: self.value_map.no_attribute_tracker.get_and_reset_value(),
7849
exemplars: vec![],
7950
});
8051
}
52+
53+
let mut trackers = match self.value_map.trackers.write() {
54+
Ok(v) => v,
55+
_ => return,
56+
};
57+
58+
let mut seen = HashSet::new();
59+
for (attrs, tracker) in trackers.drain() {
60+
if seen.insert(Arc::as_ptr(&tracker)) {
61+
dest.push(DataPoint {
62+
attributes: attrs.clone(),
63+
start_time: None,
64+
time: Some(t),
65+
value: tracker.get_value(),
66+
exemplars: vec![],
67+
});
68+
}
69+
}
8170
}
8271
}

0 commit comments

Comments
 (0)