Skip to content

Commit 82ed8e0

Browse files
authored
Sepatate Aggregate measure and collect functions (open-telemetry#2492)
1 parent 37d2e51 commit 82ed8e0

File tree

8 files changed

+107
-167
lines changed

8 files changed

+107
-167
lines changed

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+40-54
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,6 @@ pub(crate) trait Measure<T>: Send + Sync + 'static {
2727
fn call(&self, measurement: T, attrs: &[KeyValue]);
2828
}
2929

30-
impl<F, T> Measure<T> for F
31-
where
32-
F: Fn(T, &[KeyValue]) + Send + Sync + 'static,
33-
{
34-
fn call(&self, measurement: T, attrs: &[KeyValue]) {
35-
self(measurement, attrs)
36-
}
37-
}
38-
3930
/// Stores the aggregate of measurements into the aggregation and returns the number
4031
/// of aggregate data-points output.
4132
pub(crate) trait ComputeAggregation: Send + Sync + 'static {
@@ -47,15 +38,23 @@ pub(crate) trait ComputeAggregation: Send + Sync + 'static {
4738
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
4839
}
4940

50-
impl<T> ComputeAggregation for T
41+
/// Separate `measure` and `collect` functions for an aggregate.
42+
pub(crate) struct AggregateFns<T> {
43+
pub(crate) measure: Arc<dyn Measure<T>>,
44+
pub(crate) collect: Arc<dyn ComputeAggregation>,
45+
}
46+
47+
/// Creates aggregate functions out of aggregate instance
48+
impl<A, T> From<A> for AggregateFns<T>
5149
where
52-
T: Fn(Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>)
53-
+ Send
54-
+ Sync
55-
+ 'static,
50+
A: Measure<T> + ComputeAggregation,
5651
{
57-
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
58-
self(dest)
52+
fn from(value: A) -> Self {
53+
let inst = Arc::new(value);
54+
Self {
55+
measure: inst.clone(),
56+
collect: inst,
57+
}
5958
}
6059
}
6160

@@ -144,30 +143,18 @@ impl<T: Number> AggregateBuilder<T> {
144143
}
145144

146145
/// Builds a last-value aggregate function input and output.
147-
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
148-
let lv = Arc::new(LastValue::new(self.temporality, self.filter.clone()));
149-
(lv.clone(), lv)
146+
pub(crate) fn last_value(&self) -> AggregateFns<T> {
147+
LastValue::new(self.temporality, self.filter.clone()).into()
150148
}
151149

152150
/// Builds a precomputed sum aggregate function input and output.
153-
pub(crate) fn precomputed_sum(
154-
&self,
155-
monotonic: bool,
156-
) -> (impl Measure<T>, impl ComputeAggregation) {
157-
let s = Arc::new(PrecomputedSum::new(
158-
self.temporality,
159-
self.filter.clone(),
160-
monotonic,
161-
));
162-
163-
(s.clone(), s)
151+
pub(crate) fn precomputed_sum(&self, monotonic: bool) -> AggregateFns<T> {
152+
PrecomputedSum::new(self.temporality, self.filter.clone(), monotonic).into()
164153
}
165154

166155
/// Builds a sum aggregate function input and output.
167-
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
168-
let s = Arc::new(Sum::new(self.temporality, self.filter.clone(), monotonic));
169-
170-
(s.clone(), s)
156+
pub(crate) fn sum(&self, monotonic: bool) -> AggregateFns<T> {
157+
Sum::new(self.temporality, self.filter.clone(), monotonic).into()
171158
}
172159

173160
/// Builds a histogram aggregate function input and output.
@@ -176,16 +163,15 @@ impl<T: Number> AggregateBuilder<T> {
176163
boundaries: Vec<f64>,
177164
record_min_max: bool,
178165
record_sum: bool,
179-
) -> (impl Measure<T>, impl ComputeAggregation) {
180-
let h = Arc::new(Histogram::new(
166+
) -> AggregateFns<T> {
167+
Histogram::new(
181168
self.temporality,
182169
self.filter.clone(),
183170
boundaries,
184171
record_min_max,
185172
record_sum,
186-
));
187-
188-
(h.clone(), h)
173+
)
174+
.into()
189175
}
190176

191177
/// Builds an exponential histogram aggregate function input and output.
@@ -195,17 +181,16 @@ impl<T: Number> AggregateBuilder<T> {
195181
max_scale: i8,
196182
record_min_max: bool,
197183
record_sum: bool,
198-
) -> (impl Measure<T>, impl ComputeAggregation) {
199-
let h = Arc::new(ExpoHistogram::new(
184+
) -> AggregateFns<T> {
185+
ExpoHistogram::new(
200186
self.temporality,
201187
self.filter.clone(),
202188
max_size,
203189
max_scale,
204190
record_min_max,
205191
record_sum,
206-
));
207-
208-
(h.clone(), h)
192+
)
193+
.into()
209194
}
210195
}
211196

@@ -221,7 +206,7 @@ mod tests {
221206

222207
#[test]
223208
fn last_value_aggregation() {
224-
let (measure, agg) =
209+
let AggregateFns { measure, collect } =
225210
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
226211
let mut a = Gauge {
227212
data_points: vec![GaugeDataPoint {
@@ -235,7 +220,7 @@ mod tests {
235220
let new_attributes = [KeyValue::new("b", 2)];
236221
measure.call(2, &new_attributes[..]);
237222

238-
let (count, new_agg) = agg.call(Some(&mut a));
223+
let (count, new_agg) = collect.call(Some(&mut a));
239224

240225
assert_eq!(count, 1);
241226
assert!(new_agg.is_none());
@@ -247,7 +232,7 @@ mod tests {
247232
#[test]
248233
fn precomputed_sum_aggregation() {
249234
for temporality in [Temporality::Delta, Temporality::Cumulative] {
250-
let (measure, agg) =
235+
let AggregateFns { measure, collect } =
251236
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
252237
let mut a = Sum {
253238
data_points: vec![
@@ -274,7 +259,7 @@ mod tests {
274259
let new_attributes = [KeyValue::new("b", 2)];
275260
measure.call(3, &new_attributes[..]);
276261

277-
let (count, new_agg) = agg.call(Some(&mut a));
262+
let (count, new_agg) = collect.call(Some(&mut a));
278263

279264
assert_eq!(count, 1);
280265
assert!(new_agg.is_none());
@@ -289,7 +274,8 @@ mod tests {
289274
#[test]
290275
fn sum_aggregation() {
291276
for temporality in [Temporality::Delta, Temporality::Cumulative] {
292-
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
277+
let AggregateFns { measure, collect } =
278+
AggregateBuilder::<u64>::new(temporality, None).sum(true);
293279
let mut a = Sum {
294280
data_points: vec![
295281
SumDataPoint {
@@ -315,7 +301,7 @@ mod tests {
315301
let new_attributes = [KeyValue::new("b", 2)];
316302
measure.call(3, &new_attributes[..]);
317303

318-
let (count, new_agg) = agg.call(Some(&mut a));
304+
let (count, new_agg) = collect.call(Some(&mut a));
319305

320306
assert_eq!(count, 1);
321307
assert!(new_agg.is_none());
@@ -330,7 +316,7 @@ mod tests {
330316
#[test]
331317
fn explicit_bucket_histogram_aggregation() {
332318
for temporality in [Temporality::Delta, Temporality::Cumulative] {
333-
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
319+
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
334320
.explicit_bucket_histogram(vec![1.0], true, true);
335321
let mut a = Histogram {
336322
data_points: vec![HistogramDataPoint {
@@ -354,7 +340,7 @@ mod tests {
354340
let new_attributes = [KeyValue::new("b", 2)];
355341
measure.call(3, &new_attributes[..]);
356342

357-
let (count, new_agg) = agg.call(Some(&mut a));
343+
let (count, new_agg) = collect.call(Some(&mut a));
358344

359345
assert_eq!(count, 1);
360346
assert!(new_agg.is_none());
@@ -373,7 +359,7 @@ mod tests {
373359
#[test]
374360
fn exponential_histogram_aggregation() {
375361
for temporality in [Temporality::Delta, Temporality::Cumulative] {
376-
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
362+
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
377363
.exponential_bucket_histogram(4, 20, true, true);
378364
let mut a = ExponentialHistogram {
379365
data_points: vec![ExponentialHistogramDataPoint {
@@ -406,7 +392,7 @@ mod tests {
406392
let new_attributes = [KeyValue::new("b", 2)];
407393
measure.call(3, &new_attributes[..]);
408394

409-
let (count, new_agg) = agg.call(Some(&mut a));
395+
let (count, new_agg) = collect.call(Some(&mut a));
410396

411397
assert_eq!(count, 1);
412398
assert!(new_agg.is_none());

0 commit comments

Comments
 (0)