Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Counter performance increase by incrementing atomics immediately when no attributes are provided #1563

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use opentelemetry::{
Key, KeyValue,
};

use crate::metrics::internal::{AtomicTracker, Number};
use crate::{
attributes::AttributeSet,
instrumentation::Scope,
Expand Down Expand Up @@ -254,35 +255,40 @@ impl InstrumentId {
}
}

pub(crate) struct ResolvedMeasures<T> {
pub(crate) struct ResolvedMeasures<T: Number<T>> {
pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
pub(crate) no_attribute_value: Arc<AtomicTracker<T, T::AtomicValue>>,
}

impl<T: Copy + 'static> SyncCounter<T> for ResolvedMeasures<T> {
impl<T: Copy + 'static + Number<T>> SyncCounter<T> for ResolvedMeasures<T> {
fn add(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
if attrs.is_empty() {
self.no_attribute_value.add(val);
} else {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
}
}
}
}

impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
impl<T: Copy + 'static + Number<T>> SyncUpDownCounter<T> for ResolvedMeasures<T> {
fn add(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
}
}
}

impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
impl<T: Copy + 'static + Number<T>> SyncGauge<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
}
}
}

impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
impl<T: Copy + 'static + Number<T>> SyncHistogram<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
Expand Down
14 changes: 10 additions & 4 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{
histogram::Histogram,
last_value::LastValue,
sum::{PrecomputedSum, Sum},
Number,
AtomicTracker, Number,
};

const STREAM_CARDINALITY_LIMIT: u32 = 2000;
Expand Down Expand Up @@ -150,8 +150,12 @@ impl<T: Number<T>> AggregateBuilder<T> {
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(monotonic));
pub(crate) fn sum(
&self,
monotonic: bool,
no_attribute_value: Arc<AtomicTracker<T, T::AtomicValue>>,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(monotonic, no_attribute_value));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

Expand Down Expand Up @@ -217,6 +221,7 @@ mod tests {
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
Histogram, HistogramDataPoint, Sum,
};
use crate::metrics::internal::AtomicallyUpdate;
use std::time::SystemTime;

use super::*;
Expand Down Expand Up @@ -298,7 +303,8 @@ mod tests {
#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
.sum(true, Arc::new(u64::new_atomic_tracker()));
let mut a = Sum {
data_points: vec![
DataPoint {
Expand Down
157 changes: 100 additions & 57 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,32 @@ mod last_value;
mod sum;

use core::fmt;
use std::marker::PhantomData;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;

pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
pub(crate) trait AtomicValue<T>: Sync + Send + 'static {
fn add(&self, value: T);
fn get_value(&self) -> T;
fn get_and_reset_value(&self) -> T;
fn get_value(&self, reset: bool) -> T;
}

/// Keeps track if an atomic value has had a value set since the last reset
pub(crate) struct AtomicTracker<N, T: AtomicValue<N>> {
value: T,
has_value: AtomicBool,
_number: PhantomData<N>, // Required for the N generic to be considered used
}

/// Marks a type that can have an atomic tracker generated for it
pub(crate) trait AtomicallyUpdate<T> {
type AtomicTracker: AtomicTracker<T>;
fn new_atomic_tracker() -> Self::AtomicTracker;
type AtomicValue: AtomicValue<T>;
fn new_atomic_tracker() -> AtomicTracker<T, Self::AtomicValue>;
}

pub(crate) trait Number<T>:
Expand Down Expand Up @@ -89,87 +96,123 @@ impl Number<f64> for f64 {
}
}

impl AtomicTracker<u64> for AtomicU64 {
impl AtomicValue<u64> for AtomicU64 {
fn add(&self, value: u64) {
self.fetch_add(value, Ordering::Relaxed);
}

fn get_value(&self) -> u64 {
self.load(Ordering::Relaxed)
}

fn get_and_reset_value(&self) -> u64 {
self.swap(0, Ordering::Relaxed)
fn get_value(&self, reset: bool) -> u64 {
if reset {
self.swap(0, Ordering::Relaxed)
} else {
self.load(Ordering::Relaxed)
}
}
}

impl AtomicallyUpdate<u64> for u64 {
type AtomicTracker = AtomicU64;
type AtomicValue = AtomicU64;

fn new_atomic_tracker() -> Self::AtomicTracker {
AtomicU64::new(0)
fn new_atomic_tracker() -> AtomicTracker<u64, Self::AtomicValue> {
AtomicTracker::new(AtomicU64::new(0))
}
}

impl AtomicTracker<i64> for AtomicI64 {
impl AtomicValue<i64> for AtomicI64 {
fn add(&self, value: i64) {
self.fetch_add(value, Ordering::Relaxed);
}

fn get_value(&self) -> i64 {
self.load(Ordering::Relaxed)
}

fn get_and_reset_value(&self) -> i64 {
self.swap(0, Ordering::Relaxed)
fn get_value(&self, reset: bool) -> i64 {
if reset {
self.swap(0, Ordering::Relaxed)
} else {
self.load(Ordering::Relaxed)
}
}
}

impl AtomicallyUpdate<i64> for i64 {
type AtomicTracker = AtomicI64;
type AtomicValue = AtomicI64;

fn new_atomic_tracker() -> Self::AtomicTracker {
AtomicI64::new(0)
fn new_atomic_tracker() -> AtomicTracker<i64, Self::AtomicValue> {
AtomicTracker::new(AtomicI64::new(0))
}
}

pub(crate) struct F64AtomicTracker {
pub(crate) struct F64AtomicValue {
inner: Mutex<f64>, // Floating points don't have true atomics, so we need to use mutex for them
}

impl F64AtomicTracker {
fn new() -> Self {
F64AtomicTracker {
impl F64AtomicValue {
pub(crate) fn new() -> Self {
F64AtomicValue {
inner: Mutex::new(0.0),
}
}
}

impl AtomicTracker<f64> for F64AtomicTracker {
impl AtomicValue<f64> for F64AtomicValue {
fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
}

fn get_value(&self) -> f64 {
let guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard
fn get_value(&self, reset: bool) -> f64 {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
if reset {
let value = *guard;
*guard = 0.0;
value
} else {
*guard
}
}
}

fn get_and_reset_value(&self) -> f64 {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
let value = *guard;
*guard = 0.0;
impl AtomicallyUpdate<f64> for f64 {
type AtomicValue = F64AtomicValue;

value
fn new_atomic_tracker() -> AtomicTracker<f64, Self::AtomicValue> {
AtomicTracker::new(F64AtomicValue::new())
}
}

impl AtomicallyUpdate<f64> for f64 {
type AtomicTracker = F64AtomicTracker;
impl<N, T: AtomicValue<N>> AtomicTracker<N, T> {
fn new(value: T) -> Self {
AtomicTracker {
value,
has_value: AtomicBool::new(false),
_number: PhantomData,
}
}

pub(crate) fn add(&self, value: N) {
// Technically we lose atomicity from using 2 atomics. However, the `add()` is specifically
// designed mutate the value *then* set `has_value`, while the `get_value()` is designed
// to read `has_value` *then* read the value. This means that in a worst case race
// condition, the value added may get picked up from a previous `get_value()` call, but
// the `has_value` being true will be picked up in the next `get_value()` call. This really
// should only mean that the first export gets the added value, and the 2nd export will
// get a 0 value.
//
// This doesn't seem like a big deal, and we avoid the cost of locking.
self.value.add(value);
self.has_value.store(true, Ordering::Release);
}

fn new_atomic_tracker() -> Self::AtomicTracker {
F64AtomicTracker::new()
pub(crate) fn get_value(&self, reset: bool) -> Option<N> {
let has_value = if reset {
self.has_value.swap(false, Ordering::AcqRel)
} else {
self.has_value.load(Ordering::Acquire)
};

if has_value {
Some(self.value.get_value(reset))
} else {
None
}
}
}

Expand All @@ -183,7 +226,7 @@ mod tests {
atomic.add(15);
atomic.add(10);

let value = atomic.get_value();
let value = atomic.get_value(false).unwrap();
assert_eq!(value, 25);
}

Expand All @@ -192,11 +235,11 @@ mod tests {
let atomic = u64::new_atomic_tracker();
atomic.add(15);

let value = atomic.get_and_reset_value();
let value2 = atomic.get_value();
let value = atomic.get_value(true);
let value2 = atomic.get_value(false);

assert_eq!(value, 15, "Incorrect first value");
assert_eq!(value2, 0, "Incorrect second value");
assert_eq!(value, Some(15), "Incorrect first value");
assert_eq!(value2, None, "Incorrect second value");
}

#[test]
Expand All @@ -205,7 +248,7 @@ mod tests {
atomic.add(15);
atomic.add(-10);

let value = atomic.get_value();
let value = atomic.get_value(false).unwrap();
assert_eq!(value, 5);
}

Expand All @@ -214,11 +257,11 @@ mod tests {
let atomic = i64::new_atomic_tracker();
atomic.add(15);

let value = atomic.get_and_reset_value();
let value2 = atomic.get_value();
let value = atomic.get_value(true);
let value2 = atomic.get_value(false);

assert_eq!(value, 15, "Incorrect first value");
assert_eq!(value2, 0, "Incorrect second value");
assert_eq!(value, Some(15), "Incorrect first value");
assert_eq!(value2, None, "Incorrect second value");
}

#[test]
Expand All @@ -227,7 +270,7 @@ mod tests {
atomic.add(15.3);
atomic.add(10.4);

let value = atomic.get_value();
let value = atomic.get_value(false).unwrap();

assert!(f64::abs(25.7 - value) < 0.0001);
}
Expand All @@ -237,10 +280,10 @@ mod tests {
let atomic = f64::new_atomic_tracker();
atomic.add(15.5);

let value = atomic.get_and_reset_value();
let value2 = atomic.get_value();
let value = atomic.get_value(true).unwrap();
let value2 = atomic.get_value(false);

assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
assert_eq!(value2, None, "Expected no value from second get_value call");
}
}
Loading
Loading