Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce cost of no attribute counters #1519

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,26 @@

use core::fmt;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;

pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
fn add(&self, value: T);
fn get_value(&self) -> T;
fn get_and_reset_value(&self) -> T;
}

/// Marks a type that can have an atomic tracker generated for it
pub(crate) trait AtomicallyUpdate<T> {
type AtomicTracker: AtomicTracker<T>;
fn new_atomic_tracker() -> Self::AtomicTracker;
}

pub(crate) trait Number<T>:
Add<Output = T>
+ AddAssign
Expand All @@ -23,6 +39,7 @@
+ Send
+ Sync
+ 'static
+ AtomicallyUpdate<T>
{
fn min() -> Self;
fn max() -> Self;
Expand Down Expand Up @@ -71,3 +88,87 @@
self
}
}

impl AtomicTracker<u64> for AtomicU64 {
fn add(&self, value: u64) {
self.fetch_add(value, Ordering::Relaxed);
}

fn get_value(&self) -> u64 {
self.load(Ordering::Relaxed)
}

fn get_and_reset_value(&self) -> u64 {
self.swap(0, Ordering::Relaxed)
}
}

impl AtomicallyUpdate<u64> for u64 {
type AtomicTracker = AtomicU64;

fn new_atomic_tracker() -> Self::AtomicTracker {
AtomicU64::new(0)
}
}

impl AtomicTracker<i64> for AtomicI64 {
fn add(&self, value: i64) {
self.fetch_add(value, Ordering::Relaxed);
}

fn get_value(&self) -> i64 {
self.load(Ordering::Relaxed)
}

fn get_and_reset_value(&self) -> i64 {
self.swap(0, Ordering::Relaxed)
}
}

impl AtomicallyUpdate<i64> for i64 {
type AtomicTracker = AtomicI64;

fn new_atomic_tracker() -> Self::AtomicTracker {
AtomicI64::new(0)
}
}

pub(crate) struct F64AtomicTracker {
inner: Mutex<f64>, // Floating points don't have true atomics, so we need to use mutex for them
}

impl F64AtomicTracker {
fn new() -> Self {
F64AtomicTracker {
inner: Mutex::new(0.0),
}
}
}

impl AtomicTracker<f64> for F64AtomicTracker {
fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
}

Check warning on line 152 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L149-L152

Added lines #L149 - L152 were not covered by tests

fn get_value(&self) -> f64 {
let guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard
}

Check warning on line 157 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L154-L157

Added lines #L154 - L157 were not covered by tests

fn get_and_reset_value(&self) -> f64 {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
let value = *guard;
*guard = 0.0;

value
}

Check warning on line 165 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L159-L165

Added lines #L159 - L165 were not covered by tests
}

impl AtomicallyUpdate<f64> for f64 {
type AtomicTracker = F64AtomicTracker;

fn new_atomic_tracker() -> Self::AtomicTracker {
F64AtomicTracker::new()
}
}
105 changes: 94 additions & 11 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Mutex,
Expand All @@ -10,26 +11,39 @@

use super::{
aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
Number,
AtomicTracker, Number,
};

/// The storage for sums.
#[derive(Default)]
struct ValueMap<T: Number<T>> {
values: Mutex<HashMap<AttributeSet, T>>,
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
}

impl<T: Number<T>> Default for ValueMap<T> {
fn default() -> Self {
ValueMap::new()
}

Check warning on line 27 in opentelemetry-sdk/src/metrics/internal/sum.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/sum.rs#L25-L27

Added lines #L25 - L27 were not covered by tests
}

impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
ValueMap {
values: Mutex::new(HashMap::new()),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
}
}
}

impl<T: Number<T>> ValueMap<T> {
fn measure(&self, measurement: T, attrs: AttributeSet) {
if let Ok(mut values) = self.values.lock() {
if attrs.is_empty() {
self.no_attribute_value.add(measurement);
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(mut values) = self.values.lock() {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
Expand Down Expand Up @@ -103,14 +117,28 @@
Err(_) => return (0, None),
};

let n = values.len();
let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}

let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
if self
.value_map
.has_no_value_attribute_value
.swap(false, Ordering::AcqRel)
{
s_data.data_points.push(DataPoint {
attributes: AttributeSet::default(),
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_and_reset_value(),
exemplars: vec![],
});
}

for (attrs, value) in values.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
Expand All @@ -126,7 +154,10 @@
*start = t;
}

(n, new_agg.map(|a| Box::new(a) as Box<_>))
(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}

pub(crate) fn cumulative(
Expand Down Expand Up @@ -155,14 +186,29 @@
Err(_) => return (0, None),
};

let n = values.len();
let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}

let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

if self
.value_map
.has_no_value_attribute_value
.load(Ordering::Acquire)
{
s_data.data_points.push(DataPoint {
attributes: AttributeSet::default(),
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_value(),
exemplars: vec![],
});
}

// TODO: This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
Expand All @@ -177,7 +223,10 @@
});
}

(n, new_agg.map(|a| Box::new(a) as Box<_>))
(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}

Expand Down Expand Up @@ -230,7 +279,7 @@
Err(_) => return (0, None),
};

let n = values.len();
let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
Expand All @@ -242,6 +291,20 @@
Err(_) => return (0, None),
};

if self
.value_map
.has_no_value_attribute_value
.swap(false, Ordering::AcqRel)
{
s_data.data_points.push(DataPoint {
attributes: AttributeSet::default(),
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_and_reset_value(),
exemplars: vec![],
});

Check warning on line 305 in opentelemetry-sdk/src/metrics/internal/sum.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/sum.rs#L298-L305

Added lines #L298 - L305 were not covered by tests
}

let default = T::default();
for (attrs, value) in values.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
Expand All @@ -265,7 +328,10 @@
*reported = new_reported;
drop(reported); // drop before values guard is dropped

(n, new_agg.map(|a| Box::new(a) as Box<_>))
(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}

pub(crate) fn cumulative(
Expand Down Expand Up @@ -295,7 +361,7 @@
Err(_) => return (0, None),
};

let n = values.len();
let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
Expand All @@ -307,6 +373,20 @@
Err(_) => return (0, None),
};

if self
.value_map
.has_no_value_attribute_value
.load(Ordering::Acquire)
{
s_data.data_points.push(DataPoint {
attributes: AttributeSet::default(),
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_value(),
exemplars: vec![],
});

Check warning on line 387 in opentelemetry-sdk/src/metrics/internal/sum.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/sum.rs#L380-L387

Added lines #L380 - L387 were not covered by tests
}

let default = T::default();
for (attrs, value) in values.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
Expand All @@ -325,6 +405,9 @@
*reported = new_reported;
drop(reported); // drop before values guard is dropped

(n, new_agg.map(|a| Box::new(a) as Box<_>))
(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
}
}
Loading
Loading