Skip to content

Commit 1a77749

Browse files
authored
Merge branch 'main' into logrecord-instrlib-cow
2 parents d14416f + 056d2ae commit 1a77749

File tree

11 files changed

+508
-391
lines changed

11 files changed

+508
-391
lines changed

opentelemetry-sdk/Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,15 @@ name = "span_builder"
6161
harness = false
6262

6363
[[bench]]
64-
name = "metric_counter"
64+
name = "metrics_counter"
6565
harness = false
6666

6767
[[bench]]
68-
name = "metric_gauge"
68+
name = "metrics_gauge"
6969
harness = false
7070

7171
[[bench]]
72-
name = "metric_histogram"
72+
name = "metrics_histogram"
7373
harness = false
7474

7575
[[bench]]

opentelemetry-sdk/benches/metric_counter.rs opentelemetry-sdk/benches/metrics_counter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ static ATTRIBUTE_VALUES: [&str; 10] = [
3636
];
3737

3838
// Run this benchmark with:
39-
// cargo bench --bench metric_counter
39+
// cargo bench --bench metrics_counter
4040
fn create_counter(name: &'static str) -> Counter<u64> {
4141
let meter_provider: SdkMeterProvider = SdkMeterProvider::builder()
4242
.with_reader(ManualReader::builder().build())

opentelemetry-sdk/benches/metric_gauge.rs opentelemetry-sdk/benches/metrics_gauge.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ static ATTRIBUTE_VALUES: [&str; 10] = [
3232
];
3333

3434
// Run this benchmark with:
35-
// cargo bench --bench metric_gauge
35+
// cargo bench --bench metrics_gauge
3636
fn create_gauge() -> Gauge<u64> {
3737
let meter_provider: SdkMeterProvider = SdkMeterProvider::builder()
3838
.with_reader(ManualReader::builder().build())

opentelemetry-sdk/benches/metric_histogram.rs opentelemetry-sdk/benches/metrics_histogram.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static ATTRIBUTE_VALUES: [&str; 10] = [
3434
];
3535

3636
// Run this benchmark with:
37-
// cargo bench --bench metric_histogram
37+
// cargo bench --bench metrics_histogram
3838
fn create_histogram(name: &'static str) -> Histogram<u64> {
3939
let meter_provider: SdkMeterProvider = SdkMeterProvider::builder()
4040
.with_reader(ManualReader::builder().build())

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,8 @@ use opentelemetry::KeyValue;
55
use crate::metrics::data::{Aggregation, Gauge, Temporality};
66

77
use super::{
8-
exponential_histogram::ExpoHistogram,
9-
histogram::Histogram,
10-
last_value::LastValue,
11-
sum::{PrecomputedSum, Sum},
12-
Number,
8+
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
9+
precomputed_sum::PrecomputedSum, sum::Sum, Number,
1310
};
1411

1512
const STREAM_CARDINALITY_LIMIT: u32 = 2000;

opentelemetry-sdk/src/metrics/internal/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod aggregate;
22
mod exponential_histogram;
33
mod histogram;
44
mod last_value;
5+
mod precomputed_sum;
56
mod sum;
67

78
use core::fmt;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
use opentelemetry::KeyValue;
2+
3+
use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
4+
5+
use super::{Assign, AtomicTracker, Number, ValueMap};
6+
use std::{
7+
collections::{HashMap, HashSet},
8+
sync::{atomic::Ordering, Arc, Mutex},
9+
time::SystemTime,
10+
};
11+
12+
/// Summarizes a set of pre-computed sums as their arithmetic sum.
13+
pub(crate) struct PrecomputedSum<T: Number<T>> {
14+
value_map: ValueMap<T, T, Assign>,
15+
monotonic: bool,
16+
start: Mutex<SystemTime>,
17+
reported: Mutex<HashMap<Vec<KeyValue>, T>>,
18+
}
19+
20+
impl<T: Number<T>> PrecomputedSum<T> {
21+
pub(crate) fn new(monotonic: bool) -> Self {
22+
PrecomputedSum {
23+
value_map: ValueMap::new(),
24+
monotonic,
25+
start: Mutex::new(SystemTime::now()),
26+
reported: Mutex::new(Default::default()),
27+
}
28+
}
29+
30+
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
31+
// The argument index is not applicable to PrecomputedSum.
32+
self.value_map.measure(measurement, attrs, 0);
33+
}
34+
35+
pub(crate) fn delta(
36+
&self,
37+
dest: Option<&mut dyn Aggregation>,
38+
) -> (usize, Option<Box<dyn Aggregation>>) {
39+
let t = SystemTime::now();
40+
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
41+
42+
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
43+
let mut new_agg = if s_data.is_none() {
44+
Some(data::Sum {
45+
data_points: vec![],
46+
temporality: Temporality::Delta,
47+
is_monotonic: self.monotonic,
48+
})
49+
} else {
50+
None
51+
};
52+
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
53+
s_data.data_points.clear();
54+
s_data.temporality = Temporality::Delta;
55+
s_data.is_monotonic = self.monotonic;
56+
57+
// Max number of data points need to account for the special casing
58+
// of the no attribute value + overflow attribute.
59+
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
60+
if n > s_data.data_points.capacity() {
61+
s_data
62+
.data_points
63+
.reserve_exact(n - s_data.data_points.capacity());
64+
}
65+
let mut new_reported = HashMap::with_capacity(n);
66+
let mut reported = match self.reported.lock() {
67+
Ok(r) => r,
68+
Err(_) => return (0, None),
69+
};
70+
71+
if self
72+
.value_map
73+
.has_no_attribute_value
74+
.swap(false, Ordering::AcqRel)
75+
{
76+
let value = self.value_map.no_attribute_tracker.get_value();
77+
let delta = value - *reported.get(&vec![]).unwrap_or(&T::default());
78+
new_reported.insert(vec![], value);
79+
80+
s_data.data_points.push(DataPoint {
81+
attributes: vec![],
82+
start_time: Some(prev_start),
83+
time: Some(t),
84+
value: delta,
85+
exemplars: vec![],
86+
});
87+
}
88+
89+
let mut trackers = match self.value_map.trackers.write() {
90+
Ok(v) => v,
91+
Err(_) => return (0, None),
92+
};
93+
94+
let mut seen = HashSet::new();
95+
for (attrs, tracker) in trackers.drain() {
96+
if seen.insert(Arc::as_ptr(&tracker)) {
97+
let value = tracker.get_value();
98+
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
99+
new_reported.insert(attrs.clone(), value);
100+
s_data.data_points.push(DataPoint {
101+
attributes: attrs.clone(),
102+
start_time: Some(prev_start),
103+
time: Some(t),
104+
value: delta,
105+
exemplars: vec![],
106+
});
107+
}
108+
}
109+
110+
// The delta collection cycle resets.
111+
if let Ok(mut start) = self.start.lock() {
112+
*start = t;
113+
}
114+
self.value_map.count.store(0, Ordering::SeqCst);
115+
116+
*reported = new_reported;
117+
drop(reported); // drop before values guard is dropped
118+
119+
(
120+
s_data.data_points.len(),
121+
new_agg.map(|a| Box::new(a) as Box<_>),
122+
)
123+
}
124+
125+
pub(crate) fn cumulative(
126+
&self,
127+
dest: Option<&mut dyn Aggregation>,
128+
) -> (usize, Option<Box<dyn Aggregation>>) {
129+
let t = SystemTime::now();
130+
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
131+
132+
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
133+
let mut new_agg = if s_data.is_none() {
134+
Some(data::Sum {
135+
data_points: vec![],
136+
temporality: Temporality::Cumulative,
137+
is_monotonic: self.monotonic,
138+
})
139+
} else {
140+
None
141+
};
142+
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
143+
s_data.data_points.clear();
144+
s_data.temporality = Temporality::Cumulative;
145+
s_data.is_monotonic = self.monotonic;
146+
147+
// Max number of data points need to account for the special casing
148+
// of the no attribute value + overflow attribute.
149+
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
150+
if n > s_data.data_points.capacity() {
151+
s_data
152+
.data_points
153+
.reserve_exact(n - s_data.data_points.capacity());
154+
}
155+
156+
if self
157+
.value_map
158+
.has_no_attribute_value
159+
.load(Ordering::Acquire)
160+
{
161+
s_data.data_points.push(DataPoint {
162+
attributes: vec![],
163+
start_time: Some(prev_start),
164+
time: Some(t),
165+
value: self.value_map.no_attribute_tracker.get_value(),
166+
exemplars: vec![],
167+
});
168+
}
169+
170+
let trackers = match self.value_map.trackers.write() {
171+
Ok(v) => v,
172+
Err(_) => return (0, None),
173+
};
174+
175+
let mut seen = HashSet::new();
176+
for (attrs, tracker) in trackers.iter() {
177+
if seen.insert(Arc::as_ptr(tracker)) {
178+
s_data.data_points.push(DataPoint {
179+
attributes: attrs.clone(),
180+
start_time: Some(prev_start),
181+
time: Some(t),
182+
value: tracker.get_value(),
183+
exemplars: vec![],
184+
});
185+
}
186+
}
187+
188+
(
189+
s_data.data_points.len(),
190+
new_agg.map(|a| Box::new(a) as Box<_>),
191+
)
192+
}
193+
}

0 commit comments

Comments
 (0)