diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 3ecae355b5..a898105bad 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -8,6 +8,7 @@ use opentelemetry::{ Key, KeyValue, }; +use crate::metrics::internal::{AtomicTracker, Number}; use crate::{ attributes::AttributeSet, instrumentation::Scope, @@ -254,19 +255,24 @@ impl InstrumentId { } } -pub(crate) struct ResolvedMeasures { +pub(crate) struct ResolvedMeasures> { pub(crate) measures: Vec>>, + pub(crate) no_attribute_value: Arc>, } -impl SyncCounter for ResolvedMeasures { +impl> SyncCounter for ResolvedMeasures { fn add(&self, val: T, attrs: &[KeyValue]) { - for measure in &self.measures { - measure.call(val, AttributeSet::from(attrs)) + if attrs.is_empty() { + self.no_attribute_value.add(val); + } else { + for measure in &self.measures { + measure.call(val, AttributeSet::from(attrs)) + } } } } -impl SyncUpDownCounter for ResolvedMeasures { +impl> SyncUpDownCounter for ResolvedMeasures { fn add(&self, val: T, attrs: &[KeyValue]) { for measure in &self.measures { measure.call(val, AttributeSet::from(attrs)) @@ -274,7 +280,7 @@ impl SyncUpDownCounter for ResolvedMeasures { } } -impl SyncGauge for ResolvedMeasures { +impl> SyncGauge for ResolvedMeasures { fn record(&self, val: T, attrs: &[KeyValue]) { for measure in &self.measures { measure.call(val, AttributeSet::from(attrs)) @@ -282,7 +288,7 @@ impl SyncGauge for ResolvedMeasures { } } -impl SyncHistogram for ResolvedMeasures { +impl> SyncHistogram for ResolvedMeasures { fn record(&self, val: T, attrs: &[KeyValue]) { for measure in &self.measures { measure.call(val, AttributeSet::from(attrs)) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index ad923fea91..2c019cd79c 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -13,7 +13,7 @@ use super::{ histogram::Histogram, last_value::LastValue, sum::{PrecomputedSum, Sum}, - Number, + AtomicTracker, Number, }; const STREAM_CARDINALITY_LIMIT: u32 = 2000; @@ -150,8 +150,12 @@ impl> AggregateBuilder { } /// Builds a sum aggregate function input and output. - pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure, impl ComputeAggregation) { - let s = Arc::new(Sum::new(monotonic)); + pub(crate) fn sum( + &self, + monotonic: bool, + no_attribute_value: Arc>, + ) -> (impl Measure, impl ComputeAggregation) { + let s = Arc::new(Sum::new(monotonic, no_attribute_value)); let agg_sum = Arc::clone(&s); let t = self.temporality; @@ -217,6 +221,7 @@ mod tests { DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Histogram, HistogramDataPoint, Sum, }; + use crate::metrics::internal::AtomicallyUpdate; use std::time::SystemTime; use super::*; @@ -298,7 +303,8 @@ mod tests { #[test] fn sum_aggregation() { for temporality in [Temporality::Delta, Temporality::Cumulative] { - let (measure, agg) = AggregateBuilder::::new(Some(temporality), None).sum(true); + let (measure, agg) = AggregateBuilder::::new(Some(temporality), None) + .sum(true, Arc::new(u64::new_atomic_tracker())); let mut a = Sum { data_points: vec![ DataPoint { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 92bc3d947f..bcb33521ce 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -5,8 +5,9 @@ mod last_value; mod sum; use core::fmt; +use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Mutex; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; @@ -14,16 +15,22 @@ pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms -pub(crate) trait AtomicTracker: Sync + Send + 'static { +pub(crate) trait AtomicValue: Sync + Send + 'static { fn add(&self, value: T); - fn get_value(&self) -> T; - fn get_and_reset_value(&self) -> T; + fn get_value(&self, reset: bool) -> T; +} + +/// Keeps track if an atomic value has had a value set since the last reset +pub(crate) struct AtomicTracker> { + value: T, + has_value: AtomicBool, + _number: PhantomData, // Required for the N generic to be considered used } /// Marks a type that can have an atomic tracker generated for it pub(crate) trait AtomicallyUpdate { - type AtomicTracker: AtomicTracker; - fn new_atomic_tracker() -> Self::AtomicTracker; + type AtomicValue: AtomicValue; + fn new_atomic_tracker() -> AtomicTracker; } pub(crate) trait Number: @@ -89,87 +96,123 @@ impl Number for f64 { } } -impl AtomicTracker for AtomicU64 { +impl AtomicValue for AtomicU64 { fn add(&self, value: u64) { self.fetch_add(value, Ordering::Relaxed); } - fn get_value(&self) -> u64 { - self.load(Ordering::Relaxed) - } - - fn get_and_reset_value(&self) -> u64 { - self.swap(0, Ordering::Relaxed) + fn get_value(&self, reset: bool) -> u64 { + if reset { + self.swap(0, Ordering::Relaxed) + } else { + self.load(Ordering::Relaxed) + } } } impl AtomicallyUpdate for u64 { - type AtomicTracker = AtomicU64; + type AtomicValue = AtomicU64; - fn new_atomic_tracker() -> Self::AtomicTracker { - AtomicU64::new(0) + fn new_atomic_tracker() -> AtomicTracker { + AtomicTracker::new(AtomicU64::new(0)) } } -impl AtomicTracker for AtomicI64 { +impl AtomicValue for AtomicI64 { fn add(&self, value: i64) { self.fetch_add(value, Ordering::Relaxed); } - fn get_value(&self) -> i64 { - self.load(Ordering::Relaxed) - } - - fn get_and_reset_value(&self) -> i64 { - self.swap(0, Ordering::Relaxed) + fn get_value(&self, reset: bool) -> i64 { + if reset { + self.swap(0, Ordering::Relaxed) + } else { + self.load(Ordering::Relaxed) + } } } impl AtomicallyUpdate for i64 { - type AtomicTracker = AtomicI64; + type AtomicValue = AtomicI64; - fn new_atomic_tracker() -> Self::AtomicTracker { - AtomicI64::new(0) + fn new_atomic_tracker() -> AtomicTracker { + AtomicTracker::new(AtomicI64::new(0)) } } -pub(crate) struct F64AtomicTracker { +pub(crate) struct F64AtomicValue { inner: Mutex, // Floating points don't have true atomics, so we need to use mutex for them } -impl F64AtomicTracker { - fn new() -> Self { - F64AtomicTracker { +impl F64AtomicValue { + pub(crate) fn new() -> Self { + F64AtomicValue { inner: Mutex::new(0.0), } } } -impl AtomicTracker for F64AtomicTracker { +impl AtomicValue for F64AtomicValue { fn add(&self, value: f64) { let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); *guard += value; } - fn get_value(&self) -> f64 { - let guard = self.inner.lock().expect("F64 mutex was poisoned"); - *guard + fn get_value(&self, reset: bool) -> f64 { + let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); + if reset { + let value = *guard; + *guard = 0.0; + value + } else { + *guard + } } +} - fn get_and_reset_value(&self) -> f64 { - let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); - let value = *guard; - *guard = 0.0; +impl AtomicallyUpdate for f64 { + type AtomicValue = F64AtomicValue; - value + fn new_atomic_tracker() -> AtomicTracker { + AtomicTracker::new(F64AtomicValue::new()) } } -impl AtomicallyUpdate for f64 { - type AtomicTracker = F64AtomicTracker; +impl> AtomicTracker { + fn new(value: T) -> Self { + AtomicTracker { + value, + has_value: AtomicBool::new(false), + _number: PhantomData, + } + } + + pub(crate) fn add(&self, value: N) { + // Technically we lose atomicity from using 2 atomics. However, the `add()` is specifically + // designed mutate the value *then* set `has_value`, while the `get_value()` is designed + // to read `has_value` *then* read the value. This means that in a worst case race + // condition, the value added may get picked up from a previous `get_value()` call, but + // the `has_value` being true will be picked up in the next `get_value()` call. This really + // should only mean that the first export gets the added value, and the 2nd export will + // get a 0 value. + // + // This doesn't seem like a big deal, and we avoid the cost of locking. + self.value.add(value); + self.has_value.store(true, Ordering::Release); + } - fn new_atomic_tracker() -> Self::AtomicTracker { - F64AtomicTracker::new() + pub(crate) fn get_value(&self, reset: bool) -> Option { + let has_value = if reset { + self.has_value.swap(false, Ordering::AcqRel) + } else { + self.has_value.load(Ordering::Acquire) + }; + + if has_value { + Some(self.value.get_value(reset)) + } else { + None + } } } @@ -183,7 +226,7 @@ mod tests { atomic.add(15); atomic.add(10); - let value = atomic.get_value(); + let value = atomic.get_value(false).unwrap(); assert_eq!(value, 25); } @@ -192,11 +235,11 @@ mod tests { let atomic = u64::new_atomic_tracker(); atomic.add(15); - let value = atomic.get_and_reset_value(); - let value2 = atomic.get_value(); + let value = atomic.get_value(true); + let value2 = atomic.get_value(false); - assert_eq!(value, 15, "Incorrect first value"); - assert_eq!(value2, 0, "Incorrect second value"); + assert_eq!(value, Some(15), "Incorrect first value"); + assert_eq!(value2, None, "Incorrect second value"); } #[test] @@ -205,7 +248,7 @@ mod tests { atomic.add(15); atomic.add(-10); - let value = atomic.get_value(); + let value = atomic.get_value(false).unwrap(); assert_eq!(value, 5); } @@ -214,11 +257,11 @@ mod tests { let atomic = i64::new_atomic_tracker(); atomic.add(15); - let value = atomic.get_and_reset_value(); - let value2 = atomic.get_value(); + let value = atomic.get_value(true); + let value2 = atomic.get_value(false); - assert_eq!(value, 15, "Incorrect first value"); - assert_eq!(value2, 0, "Incorrect second value"); + assert_eq!(value, Some(15), "Incorrect first value"); + assert_eq!(value2, None, "Incorrect second value"); } #[test] @@ -227,7 +270,7 @@ mod tests { atomic.add(15.3); atomic.add(10.4); - let value = atomic.get_value(); + let value = atomic.get_value(false).unwrap(); assert!(f64::abs(25.7 - value) < 0.0001); } @@ -237,10 +280,10 @@ mod tests { let atomic = f64::new_atomic_tracker(); atomic.add(15.5); - let value = atomic.get_and_reset_value(); - let value2 = atomic.get_value(); + let value = atomic.get_value(true).unwrap(); + let value2 = atomic.get_value(false); assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value"); - assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value"); + assert_eq!(value2, None, "Expected no value from second get_value call"); } } diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 83a6b07858..4df59d9d2e 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,4 +1,5 @@ use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::{ collections::{hash_map::Entry, HashMap}, sync::Mutex, @@ -18,21 +19,15 @@ use super::{ struct ValueMap> { values: Mutex>, has_no_value_attribute_value: AtomicBool, - no_attribute_value: T::AtomicTracker, -} - -impl> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } + no_attribute_value: Arc>, } impl> ValueMap { - fn new() -> Self { + fn new(no_attribute_value: Arc>) -> Self { ValueMap { values: Mutex::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), - no_attribute_value: T::new_atomic_tracker(), + no_attribute_value, } } } @@ -79,9 +74,12 @@ impl> Sum { /// /// Each sum is scoped by attributes and the aggregation cycle the measurements /// were made in. - pub(crate) fn new(monotonic: bool) -> Self { + pub(crate) fn new( + monotonic: bool, + no_attribute_value: Arc>, + ) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(no_attribute_value), monotonic, start: Mutex::new(SystemTime::now()), } @@ -125,16 +123,12 @@ impl> Sum { } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_value_attribute_value - .swap(false, Ordering::AcqRel) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(true) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value, exemplars: vec![], }); } @@ -194,17 +188,12 @@ impl> Sum { } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - if self - .value_map - .has_no_value_attribute_value - .load(Ordering::Acquire) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(false) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_value(), + value, exemplars: vec![], }); } @@ -241,7 +230,7 @@ pub(crate) struct PrecomputedSum> { impl> PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(Arc::new(T::new_atomic_tracker())), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -291,16 +280,12 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; - if self - .value_map - .has_no_value_attribute_value - .swap(false, Ordering::AcqRel) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(true) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value, exemplars: vec![], }); } @@ -373,16 +358,12 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; - if self - .value_map - .has_no_value_attribute_value - .load(Ordering::Acquire) - { + if let Some(value) = self.value_map.no_attribute_value.get_value(false) { s_data.data_points.push(DataPoint { attributes: AttributeSet::default(), start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_value(), + value, exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index 542c6e1281..5855aabd6b 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -13,6 +13,7 @@ use opentelemetry::{ }; use crate::instrumentation::Scope; +use crate::metrics::internal::{AtomicTracker, AtomicallyUpdate}; use crate::metrics::{ instrument::{ Instrument, InstrumentKind, Observable, ObservableId, ResolvedMeasures, EMPTY_MEASURE_MSG, @@ -128,6 +129,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(u64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -165,6 +167,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(f64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -235,6 +238,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(i64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -274,6 +278,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(f64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -364,6 +369,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(u64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -401,6 +407,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(i64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -438,6 +445,7 @@ impl InstrumentProvider for SdkMeter { name.clone(), description.clone(), unit.clone().unwrap_or_default(), + Arc::new(f64::new_atomic_tracker()), )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -733,9 +741,12 @@ where description: Option>, unit: Unit, ) -> Result> { - let aggregators = self.measures(kind, name, description, unit)?; + let no_attribute_value = Arc::new(T::new_atomic_tracker()); + let aggregators = + self.measures(kind, name, description, unit, no_attribute_value.clone())?; Ok(ResolvedMeasures { measures: aggregators, + no_attribute_value, }) } @@ -745,6 +756,7 @@ where name: Cow<'static, str>, description: Option>, unit: Unit, + no_attribute_value: Arc>, ) -> Result>>> { let inst = Instrument { name, @@ -754,7 +766,7 @@ where scope: self.meter.scope.clone(), }; - self.resolve.measures(inst) + self.resolve.measures(inst, no_attribute_value) } } diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index cbb942b17e..a6025f2eb6 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -11,6 +11,7 @@ use opentelemetry::{ KeyValue, }; +use crate::metrics::internal::AtomicTracker; use crate::{ instrumentation::Scope, metrics::{ @@ -265,7 +266,11 @@ where /// /// If an instrument is determined to use a [aggregation::Aggregation::Drop], /// that instrument is not inserted nor returned. - fn instrument(&self, inst: Instrument) -> Result>>> { + fn instrument( + &self, + inst: Instrument, + no_attribute_value: Arc>, + ) -> Result>>> { let mut matched = false; let mut measures = vec![]; let mut errs = vec![]; @@ -288,14 +293,16 @@ where continue; // This aggregator has already been added } - let agg = match self.cached_aggregator(&inst.scope, kind, stream) { - Ok(Some(agg)) => agg, - Ok(None) => continue, // Drop aggregator. - Err(err) => { - errs.push(err); - continue; - } - }; + let agg = + match self.cached_aggregator(&inst.scope, kind, stream, no_attribute_value.clone()) + { + Ok(Some(agg)) => agg, + Ok(None) => continue, // Drop aggregator. + Err(err) => { + errs.push(err); + continue; + } + }; seen.insert(id); measures.push(agg); } @@ -317,7 +324,7 @@ where allowed_attribute_keys: None, }; - match self.cached_aggregator(&inst.scope, kind, stream) { + match self.cached_aggregator(&inst.scope, kind, stream, no_attribute_value) { Ok(agg) => { if errs.is_empty() { if let Some(agg) = agg { @@ -353,6 +360,7 @@ where scope: &Scope, kind: InstrumentKind, mut stream: Stream, + no_attribute_value: Arc>, ) -> Result>>> { let mut agg = stream .aggregation @@ -390,7 +398,7 @@ where .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>); let b = AggregateBuilder::new(Some(self.pipeline.reader.temporality(kind)), filter); - let (m, ca) = match aggregate_fn(b, &agg, kind) { + let (m, ca) = match aggregate_fn(b, &agg, kind, no_attribute_value) { Ok(Some((m, ca))) => (m, ca), other => return other.map(|fs| fs.map(|(m, _)| m)), // Drop aggregator or error }; @@ -459,6 +467,7 @@ fn aggregate_fn>( b: AggregateBuilder, agg: &aggregation::Aggregation, kind: InstrumentKind, + no_attribute_value: Arc>, ) -> Result>> { use aggregation::Aggregation; fn box_val( @@ -475,6 +484,7 @@ fn aggregate_fn>( b, &DefaultAggregationSelector::new().aggregation(kind), kind, + no_attribute_value, ), Aggregation::Drop => Ok(None), Aggregation::LastValue => Ok(Some(box_val(b.last_value()))), @@ -482,8 +492,10 @@ fn aggregate_fn>( let fns = match kind { InstrumentKind::ObservableCounter => box_val(b.precomputed_sum(true)), InstrumentKind::ObservableUpDownCounter => box_val(b.precomputed_sum(false)), - InstrumentKind::Counter | InstrumentKind::Histogram => box_val(b.sum(true)), - _ => box_val(b.sum(false)), + InstrumentKind::Counter | InstrumentKind::Histogram => { + box_val(b.sum(true, no_attribute_value)) + } + _ => box_val(b.sum(false, no_attribute_value)), }; Ok(Some(fns)) } @@ -717,11 +729,15 @@ where } /// The measures that must be updated by the instrument defined by key. - pub(crate) fn measures(&self, id: Instrument) -> Result>>> { + pub(crate) fn measures( + &self, + id: Instrument, + no_attribute_value: Arc>, + ) -> Result>>> { let (mut measures, mut errs) = (vec![], vec![]); for inserter in &self.inserters { - match inserter.instrument(id.clone()) { + match inserter.instrument(id.clone(), no_attribute_value.clone()) { Ok(ms) => measures.extend(ms), Err(err) => errs.push(err), }