Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ValueMap to mod file to allow for code reuse #2012

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 126 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,136 @@
mod sum;

use core::fmt;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::metrics::MetricsError;
use opentelemetry::{global, KeyValue};

use crate::metrics::AttributeSet;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
pub(crate) trait Operation {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.add(value);
}
}

struct Assign;

impl Operation for Assign {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.store(value);
}
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
pub(crate) struct ValueMap<T: Number<T>, O> {
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: T::AtomicTracker,
phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
fn default() -> Self {
ValueMap::new()
}

Check warning on line 66 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L64-L66

Added lines #L64 - L66 were not covered by tests
}

impl<T: Number<T>, O> ValueMap<T, O> {
fn new() -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
phantom: PhantomData,
}
}
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
fn measure(&self, measurement: T, attributes: &[KeyValue]) {
if attributes.is_empty() {
O::update_tracker(&self.no_attribute_tracker, measurement);
self.has_no_attribute_value.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;

Check warning on line 90 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L90

Added line #L90 was not covered by tests
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
return;

Check warning on line 110 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L110

Added line #L110 was not covered by tests
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);

Check warning on line 116 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L116

Added line #L116 was not covered by tests
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);

Check warning on line 118 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L118

Added line #L118 was not covered by tests
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(T::new_atomic_tracker());
O::update_tracker(&*new_tracker, measurement);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
O::update_tracker(&**overflow_value, measurement);
} else {
let new_tracker = T::new_atomic_tracker();
O::update_tracker(&new_tracker, measurement);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
Expand Down
131 changes: 4 additions & 127 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,137 +1,14 @@
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::vec;
use std::{
collections::HashMap,
sync::{Mutex, RwLock},
time::SystemTime,
};
use std::{collections::HashMap, sync::Mutex, time::SystemTime};

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::AttributeSet;
use once_cell::sync::Lazy;
use opentelemetry::KeyValue;
use opentelemetry::{global, metrics::MetricsError};

use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number};

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
trait Operation {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T);
}

struct Increment;

impl Operation for Increment {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.add(value);
}
}

struct Assign;

impl Operation for Assign {
fn update_tracker<T: 'static, AT: AtomicTracker<T>>(tracker: &AT, value: T) {
tracker.store(value);
}
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
struct ValueMap<T: Number<T>, O> {
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: T::AtomicTracker,
phantom: PhantomData<O>,
}

impl<T: Number<T>, O> Default for ValueMap<T, O> {
fn default() -> Self {
ValueMap::new()
}
}

impl<T: Number<T>, O> ValueMap<T, O> {
fn new() -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
phantom: PhantomData,
}
}
}

impl<T: Number<T>, O: Operation> ValueMap<T, O> {
fn measure(&self, measurement: T, attributes: &[KeyValue]) {
if attributes.is_empty() {
O::update_tracker(&self.no_attribute_tracker, measurement);
self.has_no_attribute_value.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(T::new_atomic_tracker());
O::update_tracker(&*new_tracker, measurement);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
O::update_tracker(&**overflow_value, measurement);
} else {
let new_tracker = T::new_atomic_tracker();
O::update_tracker(&new_tracker, measurement);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}
use super::{Assign, Increment, ValueMap};
use super::{AtomicTracker, Number};

/// Summarizes a set of measurements made as their arithmetic sum.
pub(crate) struct Sum<T: Number<T>> {
Expand Down
Loading