Metrics aggregate collector generic over temporality #2506

Open · wants to merge 2 commits into base: main
16 changes: 16 additions & 0 deletions opentelemetry-sdk/src/metrics/data/mod.rs
@@ -53,6 +53,14 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
fn as_mut(&mut self) -> &mut dyn any::Any;
}

/// Allows access to the data points of an [Aggregation].
pub(crate) trait AggregationDataPoints {
/// The type of data point in the aggregation.
type DataPoint;
/// The data points of the aggregation.
fn points(&mut self) -> &mut Vec<Self::DataPoint>;
}

/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct GaugeDataPoint<T> {
@@ -228,6 +236,14 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
}
}

impl<T> AggregationDataPoints for ExponentialHistogram<T> {
type DataPoint = ExponentialHistogramDataPoint<T>;

fn points(&mut self) -> &mut Vec<Self::DataPoint> {
&mut self.data_points
}
}

/// A single exponential histogram data point in a time series.
#[derive(Debug, PartialEq)]
pub struct ExponentialHistogramDataPoint<T> {
56 changes: 44 additions & 12 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -8,11 +8,19 @@ use std::{

use opentelemetry::KeyValue;

use crate::metrics::{data::Aggregation, Temporality};
use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
collector::{Collector, CumulativeValueMap, DeltaValueMap},
exponential_histogram::{ExpoHistogram, ExpoHistogramBucketConfig},
histogram::Histogram,
last_value::LastValue,
precomputed_sum::PrecomputedSum,
sum::Sum,
Number,
};

pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
@@ -58,6 +66,7 @@ where
}
}

#[derive(Clone, Copy)]
pub(crate) struct AggregateTime {
pub start: SystemTime,
pub current: SystemTime,
@@ -121,6 +130,12 @@ impl AttributeSetFilter {
}
}

pub(crate) trait InitAggregationData {
type Aggr: Aggregation + AggregationDataPoints;
fn create_new(&self, time: AggregateTime) -> Self::Aggr;
fn reset_existing(&self, existing: &mut Self::Aggr, time: AggregateTime);
}

/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
@@ -182,15 +197,32 @@ impl<T: Number> AggregateBuilder<T> {
record_min_max: bool,
record_sum: bool,
) -> AggregateFns<T> {
ExpoHistogram::new(
self.temporality,
self.filter.clone(),
max_size,
max_scale,
record_min_max,
record_sum,
)
.into()
match self.temporality {
Temporality::Delta => ExpoHistogram::new(
Collector::new(
self.filter.clone(),
DeltaValueMap::new(ExpoHistogramBucketConfig {
max_size: max_size as i32,
max_scale,
}),
),
record_sum,
record_min_max,
)
.into(),
_ => ExpoHistogram::new(
Collector::new(
self.filter.clone(),
CumulativeValueMap::new(ExpoHistogramBucketConfig {
max_size: max_size as i32,
max_scale,
}),
),
record_sum,
record_min_max,
)
.into(),
}
}
}

198 changes: 198 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/collector.rs
@@ -0,0 +1,198 @@
use opentelemetry::KeyValue;

use crate::metrics::{
data::{Aggregation, AggregationDataPoints},
Temporality,
};

use super::{
aggregate::{AggregateTime, AttributeSetFilter},
AggregateTimeInitiator, Aggregator, InitAggregationData, ValueMap,
};

/// Aggregates measurements for attribute sets and collects these aggregates into data points for a specific temporality
pub(crate) trait AggregateMap: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP;
}

/// Higher-level abstraction (compared to [`AggregateMap`]) that also performs attribute filtering and collection into aggregation data
pub(crate) trait AggregateCollector: Send + Sync + 'static {
const TEMPORALITY: Temporality;
type Aggr: Aggregator;

fn measure(&self, value: <Self::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]);
utpilla (Contributor) commented on Jan 17, 2025:

It's getting a bit too confusing with these new, coupled traits. Could we separate the concerns here? Could we update the Collector trait to only have collect-related methods and the AggregateMap trait to only have update-related methods? You could then keep an impl of both traits as fields of the ExpoHistogram struct.

fraillt (Contributor, Author) replied on Jan 21, 2025:

Not sure it's possible to have this separation; basically, this whole thing is like an onion. Starting from the center:

  1. trait AggregateMap — implements the core data structure, optimized for updating AND collecting aggregates for a specific temporality (e.g. DeltaValueMap and CumulativeValueMap).
  2. trait AggregateCollector — provides attribute filtering AND collection of impl-specific DataPoints into dyn Aggregation (including time initialization for the specific temporality). There's only one implementation, Collector. The reason we have this trait is that Collector itself is also generic (currently only over trait AggregateMap, but it might also become generic over an "aggregate-filter"), which makes trait bounds for implementations very verbose. An important property of this implementation is that it holds all the code common to aggregations.
  3. Specific aggregations (e.g. Sum, LastValue, Histogram, etc.) — implement the Measure and ComputeAggregation traits, which are used by the SDK. They also implement InitAggregationData, which is used by the AggregateCollector to make the whole collection phase reusable.

The thing I like about this is that 1) and 2) (i.e. the impls for trait AggregateMap and trait AggregateCollector) are common code for absolutely all aggregates. The only aggregate-specific logic is in three traits: Measure, ComputeAggregation, InitAggregationData.

Regarding splitting:

  • trait AggregateMap cannot be split, because its implementation has an optimized internal structure that only it knows how to update and collect.
  • Since trait AggregateCollector depends on trait AggregateMap, it also cannot be split into several pieces.
  • The same fields within a specific implementation can be used in both the measure and collect phases (e.g. the Histogram::bounds field), so splitting a concrete implementation is not optimal either.
  • So the AggregateFns approach is probably the most efficient way to do it (e.g. Arc and clone, so one instance implements Measure and another ComputeAggregation).

BTW, I agree that this gets a bit confusing, but I think better naming would solve the problem here... though that's the hardest part :)
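The layered design described in this comment can be sketched in miniature. The following is a hypothetical, heavily simplified illustration (attributes reduced to a `&str` key, values to `u64`, `measure` taking `&mut self`, and no time handling); the real traits and names differ, as the diff shows:

```rust
use std::collections::HashMap;

/// Layer 1: a temporality-specific store (cf. DeltaValueMap / CumulativeValueMap).
trait AggregateMap {
    fn measure(&mut self, value: u64, attrs: &str);
    fn collect_points(&mut self) -> Vec<(String, u64)>;
}

/// Layer 2: the common wrapper (cf. Collector<AM>); filtering and time
/// initialization would live here, shared by all aggregations.
struct Collector<AM: AggregateMap> {
    inner: AM,
}

impl<AM: AggregateMap> Collector<AM> {
    fn measure(&mut self, value: u64, attrs: &str) {
        // Attribute filtering would happen here before delegating.
        self.inner.measure(value, attrs);
    }
    fn collect(&mut self) -> Vec<(String, u64)> {
        self.inner.collect_points()
    }
}

/// Layer 3: a concrete delta-sum aggregation built on the store.
struct DeltaSum {
    points: HashMap<String, u64>,
}

impl AggregateMap for DeltaSum {
    fn measure(&mut self, value: u64, attrs: &str) {
        *self.points.entry(attrs.to_string()).or_insert(0) += value;
    }
    fn collect_points(&mut self) -> Vec<(String, u64)> {
        // Delta temporality: drain on collect so the next interval starts fresh.
        self.points.drain().collect()
    }
}

fn main() {
    let mut c = Collector { inner: DeltaSum { points: HashMap::new() } };
    c.measure(3, "k=v");
    c.measure(4, "k=v");
    let pts = c.collect();
    assert_eq!(pts, vec![("k=v".to_string(), 7)]);
    // After a delta collect, the store is empty again.
    assert!(c.collect().is_empty());
}
```

The point of the layering is that only `DeltaSum` here is aggregation-specific; `Collector` would be written once and reused by every aggregation.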

fraillt (Contributor, Author) replied:

Mm... maybe I have one more idea. Instead of making the specific aggregation "the final thing" (which implements Measure and ComputeAggregation), I could make Collector the final thing, and specific aggregations would have a trait which provides the specific functionality... I will probably create a separate revision for that, so we can compare both at the same time. Ok?

fraillt (Contributor, Author) replied:

#2530 — same thing, new design. IMO it turned out very well; I like this new design much more. If you share that opinion, then we can close this PR.


fn collect<InitAggregate, F>(
&self,
aggregate: &InitAggregate,
dest: Option<&mut dyn Aggregation>,
create_point: F,
) -> (usize, Option<Box<dyn Aggregation>>)
where
InitAggregate: InitAggregationData,
F: FnMut(
Vec<KeyValue>,
&Self::Aggr,
) -> <InitAggregate::Aggr as AggregationDataPoints>::DataPoint;
}

pub(crate) struct Collector<AM> {
filter: AttributeSetFilter,
aggregate_map: AM,
time: AggregateTimeInitiator,
}

impl<AM> Collector<AM>
where
AM: AggregateMap,
{
pub(crate) fn new(filter: AttributeSetFilter, aggregate_map: AM) -> Self {
Self {
filter,
aggregate_map,
time: AggregateTimeInitiator::default(),
}
}

fn init_time(&self) -> AggregateTime {
if let Temporality::Delta = AM::TEMPORALITY {
self.time.delta()
} else {
self.time.cumulative()
}
}
}

impl<AM> AggregateCollector for Collector<AM>
where
AM: AggregateMap,
{
const TEMPORALITY: Temporality = AM::TEMPORALITY;

type Aggr = AM::Aggr;

fn measure(&self, value: <AM::Aggr as Aggregator>::PreComputedValue, attributes: &[KeyValue]) {
self.filter.apply(attributes, |filtered_attrs| {
self.aggregate_map.measure(value, filtered_attrs);
});
}

fn collect<InitAggregate, F>(
&self,
aggregate: &InitAggregate,
dest: Option<&mut dyn Aggregation>,
create_point: F,
) -> (usize, Option<Box<dyn Aggregation>>)
where
InitAggregate: InitAggregationData,
F: FnMut(
Vec<KeyValue>,
&AM::Aggr,
) -> <InitAggregate::Aggr as AggregationDataPoints>::DataPoint,
{
let time = self.init_time();
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<InitAggregate::Aggr>());
let mut new_agg = if s_data.is_none() {
Some(aggregate.create_new(time))

[Codecov / codecov/patch annotation: added line collector.rs#L103 was not covered by tests]
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
aggregate.reset_existing(s_data, time);
self.aggregate_map
.collect_data_points(s_data.points(), create_point);

(
s_data.points().len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}
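The reuse-or-create pattern in `collect` above (downcast the caller-supplied `dest`; if the type doesn't match, allocate a fresh aggregation and fill that instead, returning it boxed) can be sketched in isolation. This is a simplified illustration with a hypothetical `reuse_or_create` helper and the aggregation payload reduced to a `u32`:

```rust
use std::any::Any;

// Hypothetical sketch of the "reuse dest if the type matches, else
// allocate" pattern; returns the written value plus the newly created
// box, if one was needed (mirroring collect's Option<Box<dyn Aggregation>>).
fn reuse_or_create(dest: Option<&mut dyn Any>) -> (u32, Option<Box<u32>>) {
    let existing = dest.and_then(|d| d.downcast_mut::<u32>());
    let mut new_val = if existing.is_none() {
        Some(Box::new(0u32))
    } else {
        None
    };
    let slot: &mut u32 = match existing {
        Some(e) => e,
        None => &mut **new_val.as_mut().expect("present when existing is none"),
    };
    *slot = 42; // reset/refill the aggregation data in place
    let written = *slot;
    (written, new_val)
}

fn main() {
    // No reusable destination: a new boxed value is created and returned.
    let (v, created) = reuse_or_create(None);
    assert_eq!(v, 42);
    assert!(created.is_some());

    // Matching destination: updated in place, nothing allocated.
    let mut existing: u32 = 0;
    let (v, created) = reuse_or_create(Some(&mut existing as &mut dyn Any));
    assert_eq!(v, 42);
    assert!(created.is_none());
    assert_eq!(existing, 42);
}
```

This is why `collect` returns `(usize, Option<Box<dyn Aggregation>>)`: the `Option` is `Some` only when the destination could not be reused.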

/// For now, uses [`ValueMap`] under the hood (which supports both Delta and Cumulative) to implement `AggregateMap` for Delta temporality.
/// Later this could be specialized to support only Delta temporality.
pub(crate) struct DeltaValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> DeltaValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for DeltaValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Delta;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0
.collect_and_reset(dest, |attributes, aggr| map_fn(attributes, &aggr));
}
}

/// For now, uses [`ValueMap`] under the hood (which supports both Delta and Cumulative) to implement `AggregateMap` for Cumulative temporality.
/// Later this could be specialized to support only Cumulative temporality.
pub(crate) struct CumulativeValueMap<A>(ValueMap<A>)
where
A: Aggregator;

impl<A> CumulativeValueMap<A>
where
A: Aggregator,
{
pub(crate) fn new(config: A::InitConfig) -> Self {
Self(ValueMap::new(config))
}
}

impl<A> AggregateMap for CumulativeValueMap<A>
where
A: Aggregator,
<A as Aggregator>::InitConfig: Send + Sync,
{
const TEMPORALITY: Temporality = Temporality::Cumulative;

type Aggr = A;

fn measure(
&self,
value: <Self::Aggr as Aggregator>::PreComputedValue,
attributes: &[KeyValue],
) {
self.0.measure(value, attributes);
}

fn collect_data_points<DP, MapFn>(&self, dest: &mut Vec<DP>, map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &Self::Aggr) -> DP,
{
self.0.collect_readonly(dest, map_fn);
}
}