From 242b9f4cc8a7f086c2cd45aef238e077308380a7 Mon Sep 17 00:00:00 2001 From: Matthew Shapiro Date: Fri, 9 Feb 2024 19:05:16 +0000 Subject: [PATCH 1/4] experiment gets 2ns, requires test fixes --- opentelemetry-sdk/src/metrics/instrument.rs | 20 +++++--- .../src/metrics/internal/aggregate.rs | 7 +-- opentelemetry-sdk/src/metrics/internal/mod.rs | 2 +- opentelemetry-sdk/src/metrics/internal/sum.rs | 51 +++++++++---------- opentelemetry-sdk/src/metrics/meter.rs | 15 +++++- opentelemetry-sdk/src/metrics/pipeline.rs | 19 ++++--- 6 files changed, 65 insertions(+), 49 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 3ecae355b5..1fba3d1564 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -13,6 +13,7 @@ use crate::{ instrumentation::Scope, metrics::{aggregation::Aggregation, internal::Measure}, }; +use crate::metrics::internal::{AtomicTracker, Number}; pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument"; @@ -254,19 +255,24 @@ impl InstrumentId { } } -pub(crate) struct ResolvedMeasures { +pub(crate) struct ResolvedMeasures> { pub(crate) measures: Vec>>, + pub(crate) no_attribute_value: Arc } -impl SyncCounter for ResolvedMeasures { +impl> SyncCounter for ResolvedMeasures { fn add(&self, val: T, attrs: &[KeyValue]) { - for measure in &self.measures { - measure.call(val, AttributeSet::from(attrs)) + if attrs.is_empty() { + self.no_attribute_value.add(val); + } else { + for measure in &self.measures { + measure.call(val, AttributeSet::from(attrs)) + } } } } -impl SyncUpDownCounter for ResolvedMeasures { +impl> SyncUpDownCounter for ResolvedMeasures { fn add(&self, val: T, attrs: &[KeyValue]) { for measure in &self.measures { measure.call(val, AttributeSet::from(attrs)) @@ -274,7 +280,7 @@ impl SyncUpDownCounter for ResolvedMeasures { } } -impl SyncGauge for ResolvedMeasures { +impl> SyncGauge for ResolvedMeasures { fn record(&self, val: T, attrs: &[KeyValue]) { for measure in &self.measures { measure.call(val, AttributeSet::from(attrs)) @@ -282,7 +288,7 @@ impl SyncGauge for ResolvedMeasures { } } -impl SyncHistogram for ResolvedMeasures { +impl> SyncHistogram for ResolvedMeasures { fn record(&self, val: T, attrs: &[KeyValue]) { for measure in &self.measures { measure.call(val, AttributeSet::from(attrs)) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 08d6feec04..af25a6b80d 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -150,8 +150,8 @@ impl> AggregateBuilder { } /// Builds a sum aggregate function input and output. - pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(Sum::new(monotonic)); + pub(crate) fn sum(&self, monotonic: bool, no_attribute_value: Arc) -> (impl Measure, impl ComputeAggregation) { + let s = Arc::new(Sum::new(monotonic, no_attribute_value)); let agg_sum = Arc::clone(&s); let t = self.temporality; @@ -213,6 +213,7 @@ impl> AggregateBuilder { #[cfg(test)] mod tests { + use std::sync::atomic::AtomicU64; use crate::metrics::data::{ DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Histogram, HistogramDataPoint, Sum, @@ -298,7 +299,7 @@ mod tests { #[test] fn sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None).sum(true); + let (measure, agg) = AggregateBuilder::::new(Some(temporality), None).sum(true, Arc::new(AtomicU64::new(0))); let mut a = Sum { data_points: vec![ DataPoint { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 92bc3d947f..1a9f3656db 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -138,7 +138,7 @@ pub(crate) struct F64AtomicTracker { } impl F64AtomicTracker { - fn new() -> Self { + pub(crate) fn new() -> Self { F64AtomicTracker { inner: Mutex::new(0.0), } diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 83a6b07858..289fc6235a 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -4,6 +4,7 @@ use std::{ sync::Mutex, time::SystemTime, }; +use std::sync::Arc; use crate::attributes::AttributeSet; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; @@ -18,21 +19,15 @@ use super::{ struct ValueMap> { values: Mutex>, has_no_value_attribute_value: AtomicBool, - no_attribute_value: T::AtomicTracker, -} - -impl> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } + no_attribute_value: Arc, } impl> ValueMap { - fn new() -> Self { + fn new(no_attribute_value: Arc) -> Self { ValueMap { values: Mutex::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), - no_attribute_value: T::new_atomic_tracker(), + no_attribute_value, } } } @@ -79,9 +74,9 @@ impl> Sum { /// /// Each sum is scoped by attributes and the aggregation cycle the measurements /// were made in. - pub(crate) fn new(monotonic: bool) -> Self { + pub(crate) fn new(monotonic: bool, no_attribute_value: Arc) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(no_attribute_value), monotonic, start: Mutex::new(SystemTime::now()), } @@ -125,10 +120,10 @@ impl> Sum { } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_value_attribute_value - .swap(false, Ordering::AcqRel) + // if self + // .value_map + // .has_no_value_attribute_value + // .swap(false, Ordering::AcqRel) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), @@ -195,10 +190,10 @@ impl> Sum { let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_value_attribute_value - .load(Ordering::Acquire) + // if self + // .value_map + // .has_no_value_attribute_value + // .load(Ordering::Acquire) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), @@ -241,7 +236,7 @@ pub(crate) struct PrecomputedSum> { impl> PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(Arc::new(T::new_atomic_tracker())), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -291,10 +286,10 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; - if self - .value_map - .has_no_value_attribute_value - .swap(false, Ordering::AcqRel) + // if self + // .value_map + // .has_no_value_attribute_value + // .swap(false, Ordering::AcqRel) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), @@ -373,10 +368,10 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; - if self - .value_map - .has_no_value_attribute_value - .load(Ordering::Acquire) + // if self + // .value_map + // .has_no_value_attribute_value + // .load(Ordering::Acquire) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index c801adcb0a..9ae957e81b 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -20,6 +20,7 @@ use crate::metrics::{ internal::{self, Number}, pipeline::{Pipelines, Resolver}, }; +use crate::metrics::internal::{AtomicallyUpdate, F64AtomicTracker}; // maximum length of instrument name const INSTRUMENT_NAME_MAX_LENGTH: usize = 255; @@ -128,6 +129,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(u64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -165,6 +167,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(F64AtomicTracker::new()), )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -235,6 +238,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(i64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -274,6 +278,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(F64AtomicTracker::new()), )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -364,6 +369,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(u64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -401,6 +407,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(i64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -438,6 +445,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(f64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -733,9 +741,11 @@ where description: Option>, unit: Unit, ) -> Result> { - let aggregators = self.measures(kind, name, description, unit)?; + let no_attribute_value = Arc::new(T::new_atomic_tracker()); + let aggregators = self.measures(kind, name, description, unit, no_attribute_value.clone())?; Ok(ResolvedMeasures { measures: aggregators, + no_attribute_value, }) } @@ -745,6 +755,7 @@ where name: Cow<'static, str>, description: Option>, unit: Unit, + no_attribute_value: Arc, ) -> Result>>> { let inst = Instrument { name, @@ -754,7 +765,7 @@ where scope: self.meter.scope.clone(), }; - self.resolve.measures(inst) + self.resolve.measures(inst, no_attribute_value) } } diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index be0dde9736..377b413ebf 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -265,7 +265,7 @@ where /// /// If an instrument is determined to use a [aggregation::Aggregation::Drop], /// that instrument is not inserted nor returned. - fn instrument(&self, inst: Instrument) -> Result>>> { + fn instrument(&self, inst: Instrument, no_attribute_value: Arc) -> Result>>> { let mut matched = false; let mut measures = vec![]; let mut errs = vec![]; @@ -288,7 +288,7 @@ where continue; // This aggregator has already been added } - let agg = match self.cached_aggregator(&inst.scope, kind, stream) { + let agg = match self.cached_aggregator(&inst.scope, kind, stream, no_attribute_value.clone()) { Ok(Some(agg)) => agg, Ok(None) => continue, // Drop aggregator. Err(err) => { @@ -317,7 +317,7 @@ where allowed_attribute_keys: None, }; - match self.cached_aggregator(&inst.scope, kind, stream) { + match self.cached_aggregator(&inst.scope, kind, stream, no_attribute_value) { Ok(agg) => { if errs.is_empty() { if let Some(agg) = agg { @@ -353,6 +353,7 @@ where scope: &Scope, kind: InstrumentKind, mut stream: Stream, + no_attribute_value: Arc, ) -> Result>>> { let mut agg = stream .aggregation @@ -391,7 +392,7 @@ where .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>); let b = AggregateBuilder::new(Some(self.pipeline.reader.temporality(kind)), filter); - let (m, ca) = match aggregate_fn(b, &agg, kind) { + let (m, ca) = match aggregate_fn(b, &agg, kind, no_attribute_value) { Ok(Some((m, ca))) => (m, ca), other => return other.map(|fs| fs.map(|(m, _)| m)), // Drop aggregator or error }; @@ -460,6 +461,7 @@ fn aggregate_fn>( b: AggregateBuilder, agg: &aggregation::Aggregation, kind: InstrumentKind, + no_attribute_value: Arc, ) -> Result>> { use aggregation::Aggregation; fn box_val( @@ -476,6 +478,7 @@ fn aggregate_fn>( b, &DefaultAggregationSelector::new().aggregation(kind), kind, + no_attribute_value, ), Aggregation::Drop => Ok(None), Aggregation::LastValue => Ok(Some(box_val(b.last_value()))), @@ -483,8 +486,8 @@ fn aggregate_fn>( let fns = match kind { InstrumentKind::ObservableCounter => box_val(b.precomputed_sum(true)), InstrumentKind::ObservableUpDownCounter => box_val(b.precomputed_sum(false)), - InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true)), - _ => box_val(b.sum(false)), + InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true, no_attribute_value)), + _ => box_val(b.sum(false, no_attribute_value)), }; Ok(Some(fns)) } @@ -718,11 +721,11 @@ where } /// The measures that must be updated by the instrument defined by key. - pub(crate) fn measures(&self, id: Instrument) -> Result>>> { + pub(crate) fn measures(&self, id: Instrument, no_attribute_value: Arc) -> Result>>> { let (mut measures, mut errs) = (vec![], vec![]); for inserter in &self.inserters { - match inserter.instrument(id.clone()) { + match inserter.instrument(id.clone(), no_attribute_value.clone()) { Ok(ms) => measures.extend(ms), Err(err) => errs.push(err), } From 8c3d9c12e0fb894c6a4851167c803a1f3015c73b Mon Sep 17 00:00:00 2001 From: Matthew Shapiro Date: Fri, 16 Feb 2024 17:06:10 -0500 Subject: [PATCH 2/4] Refactored concept of atomic tracker to know when it's been updated --- opentelemetry-sdk/src/metrics/instrument.rs | 4 +- .../src/metrics/internal/aggregate.rs | 13 +- opentelemetry-sdk/src/metrics/internal/mod.rs | 130 +++++++++++------- opentelemetry-sdk/src/metrics/internal/sum.rs | 44 ++---- opentelemetry-sdk/src/metrics/meter.rs | 11 +- opentelemetry-sdk/src/metrics/pipeline.rs | 39 ++++-- 6 files changed, 140 insertions(+), 101 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 1fba3d1564..a898105bad 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -8,12 +8,12 @@ use opentelemetry::{ Key, KeyValue, }; +use crate::metrics::internal::{AtomicTracker, Number}; use crate::{ attributes::AttributeSet, instrumentation::Scope, metrics::{aggregation::Aggregation, internal::Measure}, }; -use crate::metrics::internal::{AtomicTracker, Number}; pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument"; @@ -257,7 +257,7 @@ impl InstrumentId { pub(crate) struct ResolvedMeasures> { pub(crate) measures: Vec>>, - pub(crate) no_attribute_value: Arc + pub(crate) no_attribute_value: Arc>, } impl> SyncCounter for ResolvedMeasures { diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index af25a6b80d..af95ee91c7 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -13,7 +13,7 @@ use super::{ histogram::Histogram, last_value::LastValue, sum::{PrecomputedSum, Sum}, - Number, + AtomicTracker, Number, }; const STREAM_CARDINALITY_LIMIT: u32 = 2000; @@ -150,7 +150,11 @@ impl> AggregateBuilder { } /// Builds a sum aggregate function input and output. - pub(crate) fn sum(&self, monotonic: bool, no_attribute_value: Arc) -> (impl Measure, impl ComputeAggregation) { + pub(crate) fn sum( + &self, + monotonic: bool, + no_attribute_value: Arc>, + ) -> (impl Measure, impl ComputeAggregation) { let s = Arc::new(Sum::new(monotonic, no_attribute_value)); let agg_sum = Arc::clone(&s); let t = self.temporality; @@ -213,11 +217,11 @@ impl> AggregateBuilder { #[cfg(test)] mod tests { - use std::sync::atomic::AtomicU64; use crate::metrics::data::{ DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Histogram, HistogramDataPoint, Sum, }; + use crate::metrics::internal::AtomicallyUpdate; use std::time::SystemTime; use super::*; @@ -299,7 +303,8 @@ mod tests { #[test] fn sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None).sum(true, Arc::new(AtomicU64::new(0))); + let (measure, agg) = AggregateBuilder::::new(Some(temporality), None) + .sum(true, Arc::new(u64::new_atomic_tracker())); let mut a = Sum { data_points: vec![ DataPoint { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 1a9f3656db..4a1ddfbda5 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -5,8 +5,9 @@ mod last_value; mod sum; use core::fmt; +use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Mutex; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; @@ -14,16 +15,22 @@ pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms -pub(crate) trait AtomicTracker: Sync + Send + 'static { +pub(crate) trait AtomicValue: Sync + Send + 'static { fn add(&self, value: T); - fn get_value(&self) -> T; - fn get_and_reset_value(&self) -> T; + fn get_value(&self, reset: bool) -> T; +} + +/// Keeps track if an atomic value has had a value set since the last reset +pub(crate) struct AtomicTracker> { + value: T, + has_value: AtomicBool, + _number: PhantomData, // Required for the N generic to be considered used } /// Marks a type that can have an atomic tracker generated for it pub(crate) trait AtomicallyUpdate { - type AtomicTracker: AtomicTracker; - fn new_atomic_tracker() -> Self::AtomicTracker; + type AtomicValue: AtomicValue; + fn new_atomic_tracker() -> AtomicTracker; } pub(crate) trait Number: @@ -89,47 +96,47 @@ impl Number for f64 { } } -impl AtomicTracker for AtomicU64 { +impl AtomicValue for AtomicU64 { fn add(&self, value: u64) { self.fetch_add(value, Ordering::Relaxed); } - fn get_value(&self) -> u64 { - self.load(Ordering::Relaxed) - } - - fn get_and_reset_value(&self) -> u64 { - self.swap(0, Ordering::Relaxed) + fn get_value(&self, reset: bool) -> u64 { + if reset { + self.swap(0, Ordering::Relaxed) + } else { + self.load(Ordering::Relaxed) + } } } impl AtomicallyUpdate for u64 { - type AtomicTracker = AtomicU64; + type AtomicValue = AtomicU64; - fn new_atomic_tracker() -> Self::AtomicTracker { - AtomicU64::new(0) + fn new_atomic_tracker() -> AtomicTracker { + AtomicTracker::new(AtomicU64::new(0)) } } -impl AtomicTracker for AtomicI64 { +impl AtomicValue for AtomicI64 { fn add(&self, value: i64) { self.fetch_add(value, Ordering::Relaxed); } - fn get_value(&self) -> i64 { - self.load(Ordering::Relaxed) - } - - fn get_and_reset_value(&self) -> i64 { - self.swap(0, Ordering::Relaxed) + fn get_value(&self, reset: bool) -> i64 { + if reset { + self.swap(0, Ordering::Relaxed) + } else { + self.load(Ordering::Relaxed) + } } } impl AtomicallyUpdate for i64 { - type AtomicTracker = AtomicI64; + type AtomicValue = AtomicI64; - fn new_atomic_tracker() -> Self::AtomicTracker { - AtomicI64::new(0) + fn new_atomic_tracker() -> AtomicTracker { + AtomicTracker::new(AtomicI64::new(0)) } } @@ -145,31 +152,58 @@ impl F64AtomicTracker { } } -impl AtomicTracker for F64AtomicTracker { +impl AtomicValue for F64AtomicTracker { fn add(&self, value: f64) { let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); *guard += value; } - fn get_value(&self) -> f64 { - let guard = self.inner.lock().expect("F64 mutex was poisoned"); - *guard + fn get_value(&self, reset: bool) -> f64 { + let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); + if reset { + let value = *guard; + *guard = 0.0; + value + } else { + *guard + } } +} - fn get_and_reset_value(&self) -> f64 { - let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); - let value = *guard; - *guard = 0.0; +impl AtomicallyUpdate for f64 { + type AtomicValue = F64AtomicTracker; - value + fn new_atomic_tracker() -> AtomicTracker { + AtomicTracker::new(F64AtomicTracker::new()) } } -impl AtomicallyUpdate for f64 { - type AtomicTracker = F64AtomicTracker; +impl> AtomicTracker { + fn new(value: T) -> Self { + AtomicTracker { + value, + has_value: AtomicBool::new(false), + _number: PhantomData::default(), + } + } + + pub(crate) fn add(&self, value: N) { + self.value.add(value); + self.has_value.store(true, Ordering::Release); + } - fn new_atomic_tracker() -> Self::AtomicTracker { - F64AtomicTracker::new() + pub(crate) fn get_value(&self, reset: bool) -> Option { + let has_value = if reset { + self.has_value.swap(false, Ordering::AcqRel) + } else { + self.has_value.load(Ordering::Acquire) + }; + + if has_value { + Some(self.value.get_value(reset)) + } else { + None + } } } @@ -183,7 +217,7 @@ mod tests { atomic.add(15); atomic.add(10); - let value = atomic.get_value(); + let value = atomic.get_value(false).unwrap(); assert_eq!(value, 25); } @@ -192,8 +226,8 @@ mod tests { let atomic = u64::new_atomic_tracker(); atomic.add(15); - let value = atomic.get_and_reset_value(); - let value2 = atomic.get_value(); + let value = atomic.get_value(true).unwrap(); + let value2 = atomic.get_value(false).unwrap(); assert_eq!(value, 15, "Incorrect first value"); assert_eq!(value2, 0, "Incorrect second value"); @@ -205,7 +239,7 @@ mod tests { atomic.add(15); atomic.add(-10); - let value = atomic.get_value(); + let value = atomic.get_value(false).unwrap(); assert_eq!(value, 5); } @@ -214,8 +248,8 @@ mod tests { let atomic = i64::new_atomic_tracker(); atomic.add(15); - let value = atomic.get_and_reset_value(); - let value2 = atomic.get_value(); + let value = atomic.get_value(true).unwrap(); + let value2 = atomic.get_value(false).unwrap(); assert_eq!(value, 15, "Incorrect first value"); assert_eq!(value2, 0, "Incorrect second value"); @@ -227,7 +261,7 @@ mod tests { atomic.add(15.3); atomic.add(10.4); - let value = atomic.get_value(); + let value = atomic.get_value(false).unwrap(); assert!(f64::abs(25.7 - value) < 0.0001); } @@ -237,8 +271,8 @@ mod tests { let atomic = f64::new_atomic_tracker(); atomic.add(15.5); - let value = atomic.get_and_reset_value(); - let value2 = atomic.get_value(); + let value = atomic.get_value(true).unwrap(); + let value2 = atomic.get_value(false).unwrap(); assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value"); assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value"); diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 289fc6235a..4df59d9d2e 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,10 +1,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::{ collections::{hash_map::Entry, HashMap}, sync::Mutex, time::SystemTime, }; -use std::sync::Arc; use crate::attributes::AttributeSet; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; @@ -19,11 +19,11 @@ use super::{ struct ValueMap> { values: Mutex>, has_no_value_attribute_value: AtomicBool, - no_attribute_value: Arc, + no_attribute_value: Arc>, } impl> ValueMap { - fn new(no_attribute_value: Arc) -> Self { + fn new(no_attribute_value: Arc>) -> Self { ValueMap { values: Mutex::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), @@ -74,7 +74,10 @@ impl> Sum { /// /// Each sum is scoped by attributes and the aggregation cycle the measurements /// were made in. - pub(crate) fn new(monotonic: bool, no_attribute_value: Arc) -> Self { + pub(crate) fn new( + monotonic: bool, + no_attribute_value: Arc>, + ) -> Self { Sum { value_map: ValueMap::new(no_attribute_value), monotonic, @@ -120,16 +123,12 @@ impl> Sum { } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - // if self - // .value_map - // .has_no_value_attribute_value - // .swap(false, Ordering::AcqRel) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(true) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value, exemplars: vec![], }); } @@ -189,17 +188,12 @@ impl> Sum { } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - // if self - // .value_map - // .has_no_value_attribute_value - // .load(Ordering::Acquire) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(false) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_value(), + value, exemplars: vec![], }); } @@ -286,16 +280,12 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; - // if self - // .value_map - // .has_no_value_attribute_value - // .swap(false, Ordering::AcqRel) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(true) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value, exemplars: vec![], }); } @@ -368,16 +358,12 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; - // if self - // .value_map - // .has_no_value_attribute_value - // .load(Ordering::Acquire) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(false) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_value(), + value, exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index 9ae957e81b..d04076281d 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -13,6 +13,7 @@ use opentelemetry::{ }; use crate::instrumentation::Scope; +use crate::metrics::internal::{AtomicTracker, AtomicallyUpdate}; use crate::metrics::{ instrument::{ Instrument, InstrumentKind, Observable, ObservableId, ResolvedMeasures, EMPTY_MEASURE_MSG, @@ -20,7 +21,6 @@ use crate::metrics::{ internal::{self, Number}, pipeline::{Pipelines, Resolver}, }; -use crate::metrics::internal::{AtomicallyUpdate, F64AtomicTracker}; // maximum length of instrument name const INSTRUMENT_NAME_MAX_LENGTH: usize = 255; @@ -167,7 +167,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), - Arc::new(F64AtomicTracker::new()), + Arc::new(f64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -278,7 +278,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), - Arc::new(F64AtomicTracker::new()), + Arc::new(f64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -742,7 +742,8 @@ where unit: Unit, ) -> Result> { let no_attribute_value = Arc::new(T::new_atomic_tracker()); - let aggregators = self.measures(kind, name, description, unit, no_attribute_value.clone())?; + let aggregators = + self.measures(kind, name, description, unit, no_attribute_value.clone())?; Ok(ResolvedMeasures { measures: aggregators, no_attribute_value, @@ -755,7 +756,7 @@ where name: Cow<'static, str>, description: Option>, unit: Unit, - no_attribute_value: Arc, + no_attribute_value: Arc>, ) -> Result>>> { let inst = Instrument { name, diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 377b413ebf..2fcdf76e2f 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -11,6 +11,7 @@ use opentelemetry::{ KeyValue, }; +use crate::metrics::internal::AtomicTracker; use crate::{ instrumentation::Scope, metrics::{ @@ -265,7 +266,11 @@ where /// /// If an instrument is determined to use a [aggregation::Aggregation::Drop], /// that instrument is not inserted nor returned. - fn instrument(&self, inst: Instrument, no_attribute_value: Arc) -> Result>>> { + fn instrument( + &self, + inst: Instrument, + no_attribute_value: Arc>, + ) -> Result>>> { let mut matched = false; let mut measures = vec![]; let mut errs = vec![]; @@ -288,14 +293,16 @@ where continue; // This aggregator has already been added } - let agg = match self.cached_aggregator(&inst.scope, kind, stream, no_attribute_value.clone()) { - Ok(Some(agg)) => agg, - Ok(None) => continue, // Drop aggregator. - Err(err) => { - errs.push(err); - continue; - } - }; + let agg = + match self.cached_aggregator(&inst.scope, kind, stream, no_attribute_value.clone()) + { + Ok(Some(agg)) => agg, + Ok(None) => continue, // Drop aggregator. + Err(err) => { + errs.push(err); + continue; + } + }; seen.insert(id); measures.push(agg); } @@ -353,7 +360,7 @@ where scope: &Scope, kind: InstrumentKind, mut stream: Stream, - no_attribute_value: Arc, + no_attribute_value: Arc>, ) -> Result>>> { let mut agg = stream .aggregation @@ -461,7 +468,7 @@ fn aggregate_fn>( b: AggregateBuilder, agg: &aggregation::Aggregation, kind: InstrumentKind, - no_attribute_value: Arc, + no_attribute_value: Arc>, ) -> Result>> { use aggregation::Aggregation; fn box_val( @@ -486,7 +493,9 @@ fn aggregate_fn>( let fns = match kind { InstrumentKind::ObservableCounter => box_val(b.precomputed_sum(true)), InstrumentKind::ObservableUpDownCounter => box_val(b.precomputed_sum(false)), - InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true, no_attribute_value)), + InstrumentKind::Counter | InstrumentKind::Histogram => { + box_val(b.sum(true, no_attribute_value)) + } _ => box_val(b.sum(false, no_attribute_value)), }; Ok(Some(fns)) @@ -721,7 +730,11 @@ where } /// The measures that must be updated by the instrument defined by key. - pub(crate) fn measures(&self, id: Instrument, no_attribute_value: Arc) -> Result>>> { + pub(crate) fn measures( + &self, + id: Instrument, + no_attribute_value: Arc>, + ) -> Result>>> { let (mut measures, mut errs) = (vec![], vec![]); for inserter in &self.inserters { From 056a195e6f576deb877fe8598b1105d20a532cef Mon Sep 17 00:00:00 2001 From: Matthew Shapiro Date: Tue, 20 Feb 2024 09:32:01 -0500 Subject: [PATCH 3/4] Finish --- opentelemetry-sdk/src/metrics/internal/mod.rs | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 4a1ddfbda5..3ef23b35f2 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -140,19 +140,19 @@ impl AtomicallyUpdate for i64 { } } -pub(crate) struct F64AtomicTracker { +pub(crate) struct F64AtomicValue { inner: Mutex, // Floating points don't have true atomics, so we need to use mutex for them } -impl F64AtomicTracker { +impl F64AtomicValue { pub(crate) fn new() -> Self { - F64AtomicTracker { + F64AtomicValue { inner: Mutex::new(0.0), } } } -impl AtomicValue for F64AtomicTracker { +impl AtomicValue for F64AtomicValue { fn add(&self, value: f64) { let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); *guard += value; @@ -171,10 +171,10 @@ impl AtomicValue for F64AtomicTracker { } impl AtomicallyUpdate for f64 { - type AtomicValue = F64AtomicTracker; + type AtomicValue = F64AtomicValue; fn new_atomic_tracker() -> AtomicTracker { - AtomicTracker::new(F64AtomicTracker::new()) + AtomicTracker::new(F64AtomicValue::new()) } } @@ -188,6 +188,15 @@ impl> AtomicTracker { } pub(crate) fn add(&self, value: N) { + // Technically we lose atomicity from using 2 atomics. However, the `add()` is specifically + // designed mutate the value *then* set `has_value`, while the `get_value()` is designed + // to read `has_value` *then* read the value. This means that in a worst case race + // condition, the value added may get picked up from a previous `get_value()` call, but + // the `has_value` being true will be picked up in the next `get_value()` call. This really + // should only mean that the first export gets the added value, and the 2nd export will + // get a 0 value. + // + // This doesn't seem like a big deal, and we avoid the cost of locking. self.value.add(value); self.has_value.store(true, Ordering::Release); } @@ -226,11 +235,11 @@ mod tests { let atomic = u64::new_atomic_tracker(); atomic.add(15); - let value = atomic.get_value(true).unwrap(); - let value2 = atomic.get_value(false).unwrap(); + let value = atomic.get_value(true); + let value2 = atomic.get_value(false); - assert_eq!(value, 15, "Incorrect first value"); - assert_eq!(value2, 0, "Incorrect second value"); + assert_eq!(value, Some(15), "Incorrect first value"); + assert_eq!(value2, None, "Incorrect second value"); } #[test] @@ -248,11 +257,11 @@ mod tests { let atomic = i64::new_atomic_tracker(); atomic.add(15); - let value = atomic.get_value(true).unwrap(); - let value2 = atomic.get_value(false).unwrap(); + let value = atomic.get_value(true); + let value2 = atomic.get_value(false); - assert_eq!(value, 15, "Incorrect first value"); - assert_eq!(value2, 0, "Incorrect second value"); + assert_eq!(value, Some(15), "Incorrect first value"); + assert_eq!(value2, None, "Incorrect second value"); } #[test] @@ -272,9 +281,9 @@ mod tests { atomic.add(15.5); let value = atomic.get_value(true).unwrap(); - let value2 = atomic.get_value(false).unwrap(); + let value2 = atomic.get_value(false); assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value"); - assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value"); + assert_eq!(value2, None, "Expected no value from second get_value call"); } } From 4e1cc8eb64500d5487029112ea10a9849705c6a6 Mon Sep 17 00:00:00 2001 From: Matthew Shapiro Date: Tue, 20 Feb 2024 09:49:50 -0500 Subject: [PATCH 4/4] Clippy lint fix --- opentelemetry-sdk/src/metrics/internal/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 3ef23b35f2..bcb33521ce 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -183,7 +183,7 @@ impl> AtomicTracker { AtomicTracker { value, has_value: AtomicBool::new(false), - _number: PhantomData::default(), + _number: PhantomData, } }