-
Notifications
You must be signed in to change notification settings - Fork 504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a few Observable Counter and UpDownCounter bugs #1992
Changes from 3 commits
a4c51ec
223eea7
cc3d928
b1104fa
55539d4
150a086
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,40 +25,54 @@ struct ValueMap<T: Number<T>> { | |
has_no_value_attribute_value: AtomicBool, | ||
no_attribute_value: T::AtomicTracker, | ||
count: AtomicUsize, | ||
assign_only: bool, // if true, only assign incoming value, instead of adding to existing value | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we rename this to something more intuitive like if self.is_sum_precomputed {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
} |
||
} | ||
|
||
impl<T: Number<T>> Default for ValueMap<T> { | ||
fn default() -> Self { | ||
ValueMap::new() | ||
ValueMap::new(false) | ||
} | ||
} | ||
|
||
impl<T: Number<T>> ValueMap<T> { | ||
fn new() -> Self { | ||
fn new(assign_only: bool) -> Self { | ||
ValueMap { | ||
values: RwLock::new(HashMap::new()), | ||
has_no_value_attribute_value: AtomicBool::new(false), | ||
no_attribute_value: T::new_atomic_tracker(), | ||
count: AtomicUsize::new(0), | ||
assign_only, | ||
} | ||
} | ||
} | ||
|
||
impl<T: Number<T>> ValueMap<T> { | ||
fn measure(&self, measurement: T, attrs: &[KeyValue]) { | ||
if attrs.is_empty() { | ||
self.no_attribute_value.add(measurement); | ||
if self.assign_only { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @stormshield-fabs Could you help refactor this to use generics, so we don't have this duplication everywhere and get better perf? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's extract this repeated snippet to a method for now? It would also make it easier to review the future PR that would refactor it using generics. |
||
self.no_attribute_value.store(measurement); | ||
} else { | ||
self.no_attribute_value.add(measurement); | ||
} | ||
self.has_no_value_attribute_value | ||
.store(true, Ordering::Release); | ||
} else if let Ok(values) = self.values.read() { | ||
// Try incoming order first | ||
if let Some(value_to_update) = values.get(attrs) { | ||
value_to_update.add(measurement); | ||
if self.assign_only { | ||
value_to_update.store(measurement); | ||
} else { | ||
value_to_update.add(measurement); | ||
} | ||
} else { | ||
// Then try sorted order. | ||
let sorted_attrs = AttributeSet::from(attrs).into_vec(); | ||
if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { | ||
value_to_update.add(measurement); | ||
if self.assign_only { | ||
value_to_update.store(measurement); | ||
} else { | ||
value_to_update.add(measurement); | ||
} | ||
} else { | ||
// Give up the lock, before acquiring write lock. | ||
drop(values); | ||
|
@@ -67,12 +81,24 @@ impl<T: Number<T>> ValueMap<T> { | |
// write lock, in case another thread has added the | ||
// value. | ||
if let Some(value_to_update) = values.get(attrs) { | ||
value_to_update.add(measurement); | ||
if self.assign_only { | ||
value_to_update.store(measurement); | ||
} else { | ||
value_to_update.add(measurement); | ||
} | ||
} else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { | ||
value_to_update.add(measurement); | ||
if self.assign_only { | ||
value_to_update.store(measurement); | ||
} else { | ||
value_to_update.add(measurement); | ||
} | ||
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { | ||
let new_value = T::new_atomic_tracker(); | ||
new_value.add(measurement); | ||
if self.assign_only { | ||
new_value.store(measurement); | ||
} else { | ||
new_value.add(measurement); | ||
} | ||
let new_value = Arc::new(new_value); | ||
|
||
// Insert original order | ||
|
@@ -85,10 +111,18 @@ impl<T: Number<T>> ValueMap<T> { | |
} else if let Some(overflow_value) = | ||
values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) | ||
{ | ||
overflow_value.add(measurement); | ||
if self.assign_only { | ||
overflow_value.store(measurement); | ||
} else { | ||
overflow_value.add(measurement); | ||
} | ||
} else { | ||
let new_value = T::new_atomic_tracker(); | ||
new_value.add(measurement); | ||
if self.assign_only { | ||
new_value.store(measurement); | ||
} else { | ||
new_value.add(measurement); | ||
} | ||
values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value)); | ||
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); | ||
} | ||
|
@@ -114,7 +148,7 @@ impl<T: Number<T>> Sum<T> { | |
/// were made in. | ||
pub(crate) fn new(monotonic: bool) -> Self { | ||
Sum { | ||
value_map: ValueMap::new(), | ||
value_map: ValueMap::new(false), | ||
monotonic, | ||
start: Mutex::new(SystemTime::now()), | ||
} | ||
|
@@ -282,7 +316,7 @@ pub(crate) struct PrecomputedSum<T: Number<T>> { | |
impl<T: Number<T>> PrecomputedSum<T> { | ||
pub(crate) fn new(monotonic: bool) -> Self { | ||
PrecomputedSum { | ||
value_map: ValueMap::new(), | ||
value_map: ValueMap::new(true), | ||
monotonic, | ||
start: Mutex::new(SystemTime::now()), | ||
reported: Mutex::new(Default::default()), | ||
|
@@ -334,11 +368,16 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
.has_no_value_attribute_value | ||
.swap(false, Ordering::AcqRel) | ||
{ | ||
let default = T::default(); | ||
let delta = self.value_map.no_attribute_value.get_value() | ||
- *reported.get(&vec![]).unwrap_or(&default); | ||
new_reported.insert(vec![], self.value_map.no_attribute_value.get_value()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be storing the computed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. The |
||
|
||
s_data.data_points.push(DataPoint { | ||
attributes: vec![], | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: self.value_map.no_attribute_value.get_and_reset_value(), | ||
value: delta, | ||
exemplars: vec![], | ||
}); | ||
} | ||
|
@@ -351,9 +390,7 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
let default = T::default(); | ||
for (attrs, value) in values.drain() { | ||
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default); | ||
if delta != default { | ||
new_reported.insert(attrs.clone(), value.get_value()); | ||
} | ||
new_reported.insert(attrs.clone(), value.get_value()); | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
|
@@ -408,11 +445,6 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
.data_points | ||
.reserve_exact(n - s_data.data_points.capacity()); | ||
} | ||
let mut new_reported = HashMap::with_capacity(n); | ||
let mut reported = match self.reported.lock() { | ||
Ok(r) => r, | ||
Err(_) => return (0, None), | ||
}; | ||
|
||
if self | ||
.value_map | ||
|
@@ -432,24 +464,16 @@ impl<T: Number<T>> PrecomputedSum<T> { | |
Ok(v) => v, | ||
Err(_) => return (0, None), | ||
}; | ||
let default = T::default(); | ||
for (attrs, value) in values.iter() { | ||
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default); | ||
if delta != default { | ||
new_reported.insert(attrs.clone(), value.get_value()); | ||
} | ||
s_data.data_points.push(DataPoint { | ||
attributes: attrs.clone(), | ||
start_time: Some(prev_start), | ||
time: Some(t), | ||
value: delta, | ||
value: value.get_value(), | ||
exemplars: vec![], | ||
}); | ||
} | ||
|
||
*reported = new_reported; | ||
drop(reported); // drop before values guard is dropped | ||
|
||
( | ||
s_data.data_points.len(), | ||
new_agg.map(|a| Box::new(a) as Box<_>), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -271,39 +271,72 @@ mod tests { | |
async fn observable_counter_aggregation_cumulative_non_zero_increment() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4); | ||
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
async fn observable_counter_aggregation_delta_non_zero_increment() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4); | ||
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
async fn observable_counter_aggregation_cumulative_zero_increment() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4); | ||
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
#[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"] | ||
async fn observable_counter_aggregation_delta_zero_increment() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4); | ||
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
async fn observable_counter_aggregation_delta_zero_increment_no_attrs() { | ||
// Run this test with stdout enabled to see output. | ||
// cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture | ||
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true); | ||
} | ||
|
||
fn observable_counter_aggregation_helper( | ||
temporality: Temporality, | ||
start: u64, | ||
increment: u64, | ||
length: u64, | ||
is_empty_attributes: bool, | ||
) { | ||
// Arrange | ||
let mut test_context = TestContext::new(temporality); | ||
let attributes = if is_empty_attributes { | ||
vec![] | ||
} else { | ||
vec![KeyValue::new("key1", "value1")] | ||
}; | ||
// The Observable counter reports values[0], values[1],....values[n] on each flush. | ||
let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect(); | ||
println!("Testing with observable values: {:?}", values); | ||
|
@@ -317,7 +350,7 @@ mod tests { | |
.with_callback(move |observer| { | ||
let mut index = i.lock().unwrap(); | ||
if *index < values.len() { | ||
observer.observe(values[*index], &[KeyValue::new("key1", "value1")]); | ||
observer.observe(values[*index], &attributes); | ||
*index += 1; | ||
} | ||
}) | ||
|
@@ -338,9 +371,14 @@ mod tests { | |
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); | ||
} | ||
|
||
// find and validate key1=value1 datapoint | ||
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") | ||
.expect("datapoint with key1=value1 expected"); | ||
// find and validate datapoint | ||
let data_point = if is_empty_attributes { | ||
&sum.data_points[0] | ||
} else { | ||
find_datapoint_with_key_value(&sum.data_points, "key1", "value1") | ||
.expect("datapoint with key1=value1 expected") | ||
}; | ||
|
||
if let Temporality::Cumulative = temporality { | ||
// Cumulative counter should have the value as is. | ||
assert_eq!(data_point.value, *v); | ||
|
@@ -629,8 +667,9 @@ mod tests { | |
} | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
#[ignore = "Spatial aggregation is not yet implemented."] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this got accidentally fixed in some other PR, but this PR breaks it. spatial aggregation is complex and needs a separate discussion altogether! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there is no issue to track this, probably good to have one. |
||
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { | ||
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing | ||
// metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter | ||
|
||
// Arrange | ||
let exporter = InMemoryMetricsExporter::default(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.