Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New InitAggregationData abstraction/trait #2500

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
marker,
mem::replace,
ops::DerefMut,
ops::{Deref, DerefMut},
sync::{Arc, Mutex},
time::SystemTime,
};
Expand Down Expand Up @@ -121,6 +121,65 @@ impl AttributeSetFilter {
}
}

pub(crate) trait InitAggregationData {
type Aggr;
fn create_new(&self, time: AggregateTime) -> Self::Aggr;
fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
}

pub(crate) enum AggregationData<'a, Aggr> {
Existing(&'a mut Aggr),
New(Aggr),
}

impl<'a, Aggr> AggregationData<'a, Aggr>
where
Aggr: Aggregation,
{
pub(crate) fn init(
init: &impl InitAggregationData<Aggr = Aggr>,
existing: Option<&'a mut dyn Aggregation>,
time: AggregateTime,
) -> Self {
match existing.and_then(|aggr| aggr.as_mut().downcast_mut::<Aggr>()) {
Some(existing) => {
init.reset_existing(existing, time);
AggregationData::Existing(existing)
}
None => AggregationData::New(init.create_new(time)),
}
}

pub(crate) fn into_new_boxed(self) -> Option<Box<dyn Aggregation>> {
match self {
AggregationData::Existing(_) => None,
AggregationData::New(aggregation) => {
Some(Box::new(aggregation) as Box<dyn Aggregation>)
}
}
}
}

impl<Aggr> Deref for AggregationData<'_, Aggr> {
type Target = Aggr;

fn deref(&self) -> &Self::Target {
match self {
AggregationData::Existing(existing) => existing,
AggregationData::New(new) => new,
}
}
}

impl<Aggr> DerefMut for AggregationData<'_, Aggr> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
AggregationData::Existing(existing) => existing,
AggregationData::New(new) => new,
}
}
}

/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
Expand Down
73 changes: 34 additions & 39 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
};

use super::{
aggregate::{AggregateTimeInitiator, AttributeSetFilter},
aggregate::{
AggregateTime, AggregateTimeInitiator, AggregationData, AttributeSetFilter,
InitAggregationData,
},
Aggregator, ComputeAggregation, Measure, Number, ValueMap,
};

Expand Down Expand Up @@ -384,26 +387,10 @@
}

fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.delta());

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, attr| {
.collect_and_reset(&mut s_data.data_points, |attributes, attr| {
let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
Expand Down Expand Up @@ -434,33 +421,17 @@
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}

fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.cumulative();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative());

self.value_map
.collect_readonly(&mut h.data_points, |attributes, attr| {
.collect_readonly(&mut s_data.data_points, |attributes, attr| {
let b = attr.lock().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
Expand Down Expand Up @@ -491,7 +462,7 @@
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}
}

Expand Down Expand Up @@ -524,6 +495,30 @@
}
}
}

impl<T> InitAggregationData for ExpoHistogram<T>
where
T: Number,
{
type Aggr = data::ExponentialHistogram<T>;

fn create_new(&self, time: AggregateTime) -> Self::Aggr {
data::ExponentialHistogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: self.temporality,
}
}

Check warning on line 512 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L505-L512

Added lines #L505 - L512 were not covered by tests

fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) {
existing.data_points.clear();
existing.start_time = time.start;
existing.time = time.current;
existing.temporality = self.temporality;
}
}

#[cfg(test)]
mod tests {
use std::{ops::Neg, time::SystemTime};
Expand Down
70 changes: 32 additions & 38 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use crate::metrics::data::{self, Aggregation};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

use super::aggregate::AggregateTimeInitiator;
use super::aggregate::AttributeSetFilter;
use super::aggregate::{
AggregateTime, AggregateTimeInitiator, AggregationData, InitAggregationData,
};
use super::ComputeAggregation;
use super::Measure;
use super::ValueMap;
Expand Down Expand Up @@ -108,26 +110,10 @@ impl<T: Number> Histogram<T> {
}

fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.delta());

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| {
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
Expand All @@ -153,32 +139,17 @@ impl<T: Number> Histogram<T> {
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}

fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.cumulative();
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.start_time = time.start;
h.time = time.current;
let mut s_data = AggregationData::init(self, dest, self.init_time.cumulative());

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
.collect_readonly(&mut s_data.data_points, |attributes, aggr| {
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
Expand All @@ -204,7 +175,7 @@ impl<T: Number> Histogram<T> {
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
(s_data.data_points.len(), s_data.into_new_boxed())
}
}

Expand Down Expand Up @@ -239,6 +210,29 @@ where
}
}

impl<T> InitAggregationData for Histogram<T>
where
T: Number,
{
type Aggr = data::Histogram<T>;

fn create_new(&self, time: AggregateTime) -> Self::Aggr {
data::Histogram {
data_points: vec![],
start_time: time.start,
time: time.current,
temporality: self.temporality,
}
}

fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime) {
existing.data_points.clear();
existing.start_time = time.start;
existing.time = time.current;
existing.temporality = self.temporality;
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading