Fix a few Observable Counter and UpDownCounter bugs #1992

Closed
5 changes: 4 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
@@ -16,7 +16,10 @@
Custom exporters and processors can't directly access `LogData::LogRecord::attributes`, as
these are private to opentelemetry-sdk. Instead, they should now use the
`LogRecord::attributes_iter()` method to access them.

- Fixed various metric aggregation bugs related to `ObservableCounter` and
  `UpDownCounter`, including
  [#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517).
[#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990)
Contributor:
Suggested change:
- [#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990)
+ [#1992](https://github.com/open-telemetry/opentelemetry-rust/pull/1992)


## v0.24.1

14 changes: 14 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -15,6 +15,7 @@
/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
fn store(&self, value: T);
fn add(&self, value: T);
fn get_value(&self) -> T;
fn get_and_reset_value(&self) -> T;
@@ -90,6 +91,10 @@
}

impl AtomicTracker<u64> for AtomicU64 {
fn store(&self, value: u64) {
self.store(value, Ordering::Relaxed);
}

fn add(&self, value: u64) {
self.fetch_add(value, Ordering::Relaxed);
}
@@ -112,6 +117,10 @@
}

impl AtomicTracker<i64> for AtomicI64 {
fn store(&self, value: i64) {
self.store(value, Ordering::Relaxed);
}

Codecov / codecov/patch warning: added lines #L120–L122 in opentelemetry-sdk/src/metrics/internal/mod.rs were not covered by tests.

fn add(&self, value: i64) {
self.fetch_add(value, Ordering::Relaxed);
}
@@ -146,6 +155,11 @@
}

impl AtomicTracker<f64> for F64AtomicTracker {
fn store(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard = value;
}

Codecov / codecov/patch warning: added lines #L158–L161 in opentelemetry-sdk/src/metrics/internal/mod.rs were not covered by tests.

fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
84 changes: 54 additions & 30 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -25,40 +25,54 @@
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
count: AtomicUsize,
assign_only: bool, // if true, only assign incoming value, instead of adding to existing value
Contributor:
Could we rename this to something more intuitive like `is_sum_precomputed`? I think that would better explain the reasoning behind the logic when reading the code:

if self.is_sum_precomputed {
    value_to_update.store(measurement);
} else {
    value_to_update.add(measurement);
}
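For readers following this thread: observable (asynchronous) instruments report a precomputed, absolute total on each callback, so the aggregator must replace the stored value instead of adding to it. A minimal toy sketch of the two update modes (the `Tracker` type and field names here are illustrative, not the SDK's actual internals):

```rust
// Toy model of the two sum-update modes discussed above.
struct Tracker {
    value: u64,
    assign_only: bool, // true for precomputed sums (observable instruments)
}

impl Tracker {
    fn measure(&mut self, measurement: u64) {
        if self.assign_only {
            // Observable callback reports the absolute total: replace.
            self.value = measurement;
        } else {
            // Synchronous counter reports an increment: accumulate.
            self.value += measurement;
        }
    }
}
```

With `assign_only = true`, observing 100 on two consecutive callbacks leaves the value at 100; with `false` it would read 200, which is the double-counting class of bug this PR fixes.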

}

impl<T: Number<T>> Default for ValueMap<T> {
fn default() -> Self {
ValueMap::new()
ValueMap::new(false)

Codecov / codecov/patch warning: added line #L33 in opentelemetry-sdk/src/metrics/internal/sum.rs was not covered by tests.
}
}

impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
fn new(assign_only: bool) -> Self {
ValueMap {
values: RwLock::new(HashMap::new()),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
assign_only,
}
}
}

impl<T: Number<T>> ValueMap<T> {
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
if attrs.is_empty() {
self.no_attribute_value.add(measurement);
if self.assign_only {
Member Author:
@stormshield-fabs Could you help refactor this to use generics, so we don't have this duplication everywhere and get better perf?

Contributor:
Let's extract this repeated snippet to a method for now? It would also make it easier to review the future PR that would refactor it using generics.
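A sketch of the extraction the reviewer suggests, assuming a trait shaped like the SDK's internal `AtomicTracker` (the helper name `update_tracker` is hypothetical):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Minimal stand-in for the SDK's internal AtomicTracker trait.
trait AtomicTracker<T> {
    fn store(&self, value: T);
    fn add(&self, value: T);
    fn get_value(&self) -> T;
}

impl AtomicTracker<u64> for AtomicU64 {
    fn store(&self, value: u64) {
        // Explicit path to the inherent atomic store, to avoid recursing
        // into the trait method of the same name.
        AtomicU64::store(self, value, Ordering::Relaxed);
    }
    fn add(&self, value: u64) {
        self.fetch_add(value, Ordering::Relaxed);
    }
    fn get_value(&self) -> u64 {
        self.load(Ordering::Relaxed)
    }
}

// The extracted helper: one place to decide between replace and accumulate.
fn update_tracker<T>(tracker: &impl AtomicTracker<T>, measurement: T, assign_only: bool) {
    if assign_only {
        tracker.store(measurement);
    } else {
        tracker.add(measurement);
    }
}
```

Each of the duplicated `if self.assign_only { … } else { … }` sites in `measure` would then collapse to a single `update_tracker(value_to_update, measurement, self.assign_only)` call.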

self.no_attribute_value.store(measurement);
} else {
self.no_attribute_value.add(measurement);
}
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(values) = self.values.read() {
// Try incoming order first
if let Some(value_to_update) = values.get(attrs) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}
} else {
// Then try sorted order.
let sorted_attrs = AttributeSet::from(attrs).into_vec();
if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);

Codecov / codecov/patch warning: added line #L72 in opentelemetry-sdk/src/metrics/internal/sum.rs was not covered by tests.
} else {
value_to_update.add(measurement);
}
} else {
// Give up the lock, before acquiring write lock.
drop(values);
@@ -67,12 +81,24 @@
// write lock, in case another thread has added the
// value.
if let Some(value_to_update) = values.get(attrs) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}

Codecov / codecov/patch warning: added lines #L84–L88 in opentelemetry-sdk/src/metrics/internal/sum.rs were not covered by tests.
} else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}

Codecov / codecov/patch warning: added lines #L90–L94 in opentelemetry-sdk/src/metrics/internal/sum.rs were not covered by tests.
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
if self.assign_only {
new_value.store(measurement);
} else {
new_value.add(measurement);
}
let new_value = Arc::new(new_value);

// Insert original order
@@ -85,10 +111,18 @@
} else if let Some(overflow_value) =
values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
{
overflow_value.add(measurement);
if self.assign_only {
overflow_value.store(measurement);

Codecov / codecov/patch warning: added line #L115 in opentelemetry-sdk/src/metrics/internal/sum.rs was not covered by tests.
} else {
overflow_value.add(measurement);
}
} else {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
if self.assign_only {
new_value.store(measurement);

Codecov / codecov/patch warning: added line #L122 in opentelemetry-sdk/src/metrics/internal/sum.rs was not covered by tests.
} else {
new_value.add(measurement);
}
values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
@@ -114,7 +148,7 @@
/// were made in.
pub(crate) fn new(monotonic: bool) -> Self {
Sum {
value_map: ValueMap::new(),
value_map: ValueMap::new(false),
monotonic,
start: Mutex::new(SystemTime::now()),
}
@@ -282,7 +316,7 @@
impl<T: Number<T>> PrecomputedSum<T> {
pub(crate) fn new(monotonic: bool) -> Self {
PrecomputedSum {
value_map: ValueMap::new(),
value_map: ValueMap::new(true),
monotonic,
start: Mutex::new(SystemTime::now()),
reported: Mutex::new(Default::default()),
@@ -334,11 +368,16 @@
.has_no_value_attribute_value
.swap(false, Ordering::AcqRel)
{
let default = T::default();
let delta = self.value_map.no_attribute_value.get_value()
- *reported.get(&vec![]).unwrap_or(&default);
new_reported.insert(vec![], self.value_map.no_attribute_value.get_value());
Contributor:
We should be storing the computed delta value here rather than doing another atomic read. Otherwise, we risk storing an incorrect value from another update thread.

Member:
Good catch. The `self.value_map.no_attribute_value` fetched once in line 372 should be used as the `new_reported` value instead of fetching again.
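The fix the reviewers describe is to read the atomic once, then reuse that snapshot both for the exported delta and for the `reported` bookkeeping. A standalone sketch of the pattern (the function signature and map shapes here are illustrative, not the SDK's actual code):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

// Read the tracker once; use the same snapshot for the exported delta and
// for the value recorded in `reported`. Reading the atomic twice would let
// a concurrent update slip in between the two loads, making the recorded
// value disagree with the delta that was just exported.
fn collect_no_attr_delta(
    no_attribute_value: &AtomicU64,
    reported: &mut HashMap<Vec<String>, u64>,
) -> u64 {
    let current = no_attribute_value.load(Ordering::Relaxed); // single read
    let prev = *reported.get(&Vec::new()).unwrap_or(&0);
    reported.insert(Vec::new(), current); // reuse the snapshot
    current - prev
}
```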


s_data.data_points.push(DataPoint {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_and_reset_value(),
value: delta,
exemplars: vec![],
});
}
@@ -351,9 +390,7 @@
let default = T::default();
for (attrs, value) in values.drain() {
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
new_reported.insert(attrs.clone(), value.get_value());
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
@@ -408,11 +445,6 @@
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}
let mut new_reported = HashMap::with_capacity(n);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
};

if self
.value_map
@@ -432,24 +464,16 @@
Ok(v) => v,
Err(_) => return (0, None),
};
let default = T::default();
for (attrs, value) in values.iter() {
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
value: value.get_value(),
exemplars: vec![],
});
}

*reported = new_reported;
drop(reported); // drop before values guard is dropped

(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
59 changes: 49 additions & 10 deletions opentelemetry-sdk/src/metrics/mod.rs
@@ -271,39 +271,72 @@
async fn observable_counter_aggregation_cumulative_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4);
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4);
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4);
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]
async fn observable_counter_aggregation_delta_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4);
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
}

fn observable_counter_aggregation_helper(
temporality: Temporality,
start: u64,
increment: u64,
length: u64,
is_empty_attributes: bool,
) {
// Arrange
let mut test_context = TestContext::new(temporality);
let attributes = if is_empty_attributes {
vec![]
} else {
vec![KeyValue::new("key1", "value1")]
};
// The Observable counter reports values[0], values[1],....values[n] on each flush.
let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
println!("Testing with observable values: {:?}", values);
@@ -317,7 +350,7 @@
.with_callback(move |observer| {
let mut index = i.lock().unwrap();
if *index < values.len() {
observer.observe(values[*index], &[KeyValue::new("key1", "value1")]);
observer.observe(values[*index], &attributes);
*index += 1;
}
})
@@ -338,9 +371,14 @@
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}

// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
// find and validate datapoint
let data_point = if is_empty_attributes {
&sum.data_points[0]
} else {
find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected")
};

if let Temporality::Cumulative = temporality {
// Cumulative counter should have the value as is.
assert_eq!(data_point.value, *v);
@@ -629,8 +667,9 @@
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
Member Author:
This got accidentally fixed in some other PR, but this PR breaks it. Spatial aggregation is complex and needs a separate discussion altogether!

Member:
If there is no issue to track this, probably good to have one.

async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing
// cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing

Codecov / codecov/patch warning: added line #L672 in opentelemetry-sdk/src/metrics/mod.rs was not covered by tests.

// Arrange
let exporter = InMemoryMetricsExporter::default();