Adding two level hashing in metrics hashmap #1564

Closed
wants to merge 31 commits into from
Changes from 23 commits
31 commits
98db4f3
initial commit
lalitb Feb 21, 2024
1c440e0
handle empty resource
lalitb Feb 21, 2024
75c853e
fix lint
lalitb Feb 21, 2024
fd858f6
use static vector at first level hash
lalitb Feb 23, 2024
1f05dcf
add method to calculate data point size
lalitb Feb 24, 2024
d191cf7
lint error
lalitb Feb 24, 2024
fd94caa
add hashbrown and ahash as optional dependency
lalitb Feb 24, 2024
da817b5
Merge branch 'main' into two-level-hash
lalitb Feb 24, 2024
498f088
Merge branch 'main' into two-level-hash
lalitb Feb 24, 2024
2930fe1
add const for bucket count
lalitb Feb 26, 2024
ebe4a38
Merge branch 'main' into two-level-hash
lalitb Feb 28, 2024
d8cbc4c
remove panic while converting vector to fixed array
lalitb Feb 28, 2024
cad1391
Merge branch 'main' into two-level-hash
lalitb Feb 29, 2024
61ae262
remove special handling of empty attributes
lalitb Feb 29, 2024
2b8549f
Merge branch 'main' into two-level-hash
lalitb Feb 29, 2024
44efee7
graceful unwrap
lalitb Mar 1, 2024
d8c56da
lint error, and disclaimer for hashbrown
lalitb Mar 1, 2024
7b0bac5
Merge branch 'main' into two-level-hash
lalitb Mar 1, 2024
61c8b0d
update hashbrown usage disclaimer
lalitb Mar 4, 2024
267e305
fix for overflow metrics
lalitb Mar 5, 2024
078c994
add tests
lalitb Mar 7, 2024
cc12da1
Merge branch 'main' into two-level-hash
lalitb Mar 7, 2024
90bbb2d
remove leftover method
lalitb Mar 7, 2024
f89c3ea
fix atomic
lalitb Mar 7, 2024
603305e
fix
lalitb Mar 7, 2024
21b0b3e
more comments
lalitb Mar 7, 2024
f33729c
fix race condition for concurrent same attribute insert
lalitb Mar 9, 2024
8b48564
fix lint
lalitb Mar 9, 2024
04c68c2
Merge branch 'main' into two-level-hash
lalitb Mar 9, 2024
fdb5020
Merge branch 'main' into two-level-hash
lalitb Mar 19, 2024
d973c4d
Merge branch 'main' into two-level-hash
hdost Apr 21, 2024
5 changes: 5 additions & 0 deletions opentelemetry-sdk/Cargo.toml
@@ -12,12 +12,14 @@ rust-version = "1.65"
[dependencies]
opentelemetry = { version = "0.22", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
ahash = { version = "0.8", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
hashbrown = { version = "0.14", optional = true }
once_cell = { workspace = true }
ordered-float = { workspace = true }
percent-encoding = { version = "2.0", optional = true }
@@ -54,6 +56,9 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std",
rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]
# Enable use_hashbrown to improve hashing performance in Metrics aggregation.
# This feature should be used carefully, especially when the key/value pairs in measurement attributes are derived from external or untrusted sources.
use_hashbrown = ["hashbrown", "ahash"]

[[bench]]
name = "context"
318 changes: 213 additions & 105 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,24 +1,37 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Mutex,
sync::{Arc, Mutex},
time::SystemTime,
};

use crate::attributes::AttributeSet;
use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use opentelemetry::{global, metrics::MetricsError};
use std::hash::{Hash, Hasher};

#[cfg(feature = "use_hashbrown")]
use ahash::AHasher;
#[cfg(feature = "use_hashbrown")]
use hashbrown::HashMap;

#[cfg(not(feature = "use_hashbrown"))]
use std::collections::{hash_map::DefaultHasher, HashMap};

use super::{
aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
AtomicTracker, Number,
};

const BUCKET_COUNT: usize = 256;
const OVERFLOW_BUCKET_INDEX: usize = BUCKET_COUNT - 1; // Use the last bucket as overflow bucket
type BucketValue<T> = Mutex<Option<HashMap<AttributeSet, T>>>;
type Buckets<T> = Arc<[BucketValue<T>; BUCKET_COUNT]>;
/// The storage for sums.
struct ValueMap<T: Number<T>> {
values: Mutex<HashMap<AttributeSet, T>>,
buckets: Buckets<T>,
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
total_unique_entries: AtomicUsize,
}

impl<T: Number<T>> Default for ValueMap<T> {
@@ -29,12 +42,31 @@

impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
let buckets = std::iter::repeat_with(|| Mutex::new(None))
.take(BUCKET_COUNT)
.collect::<Vec<_>>()
.try_into()
.unwrap(); // this will never fail as Vec length is fixed

ValueMap {
values: Mutex::new(HashMap::new()),
buckets: Arc::new(buckets),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
total_unique_entries: AtomicUsize::new(0),
}
}

// Hash function to determine the bucket
fn hash_to_bucket(key: &AttributeSet) -> usize {
#[cfg(not(feature = "use_hashbrown"))]
let mut hasher = DefaultHasher::new();
#[cfg(feature = "use_hashbrown")]
let mut hasher = AHasher::default();

key.hash(&mut hasher);
// Use the 8 least significant bits directly, avoiding the modulus operation.
hasher.finish() as u8 as usize
}
}
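
As an aside, here is a minimal standalone sketch (an editorial illustration, not part of the PR) of the bucketing trick used in hash_to_bucket above: truncating the 64-bit hash to its low byte yields a value in 0..=255, so with BUCKET_COUNT = 256 it is equivalent to hash % BUCKET_COUNT without the modulus.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const BUCKET_COUNT: usize = 256;

// Keep only the low 8 bits of the hash; identical to `hash % BUCKET_COUNT`
// because BUCKET_COUNT is 256.
fn bucket_index<K: Hash>(key: &K) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() as u8 as usize
}

fn main() {
    let idx = bucket_index(&("key1", "value1"));
    assert!(idx < BUCKET_COUNT);
    println!("bucket index: {idx}");
}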

impl<T: Number<T>> ValueMap<T> {
@@ -43,26 +75,45 @@
self.no_attribute_value.add(measurement);
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(mut values) = self.values.lock() {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let sum = occupied_entry.get_mut();
*sum += measurement;
}
Entry::Vacant(vacant_entry) => {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else {
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| *val += measurement)
.or_insert(measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
}
}
return;
}

let bucket_index = Self::hash_to_bucket(&attrs);
let (is_new_entry, should_use_overflow) = {
let bucket_mutex = &self.buckets[bucket_index];
let bucket_guard = bucket_mutex.lock().unwrap();

let is_new_entry = if let Some(bucket) = &*bucket_guard {
!bucket.contains_key(&attrs)
Member commented:

In the common path where the attributes already exist, we now have to acquire the lock once, do the lookup, release the lock, and then re-acquire the lock and do the lookup+update.

Apart from the perf hit, this loses the atomicity of the update. It is possible that, between releasing the lock and re-acquiring it, other entries were inserted and the limit was hit, so this attribute should instead be going into overflow.

We need to ensure atomicity and avoid the two-step lock-release-re-lock.

@lalitb (Member Author) replied, Mar 7, 2024:

Good observation; made the required changes. All operations should now be (theoretically) atomic, and perf should stay fast for the common case where the attributes already exist.

} else {
true
};

let should_use_overflow: bool = is_new_entry
&& !is_under_cardinality_limit(self.total_unique_entries.load(Ordering::Relaxed));

(is_new_entry, should_use_overflow)
};
if is_new_entry && !should_use_overflow {
self.total_unique_entries.fetch_add(1, Ordering::Relaxed);
}
let final_bucket_index = if should_use_overflow {
OVERFLOW_BUCKET_INDEX
} else {
bucket_index
};
let bucket_mutex = &self.buckets[final_bucket_index];
let mut bucket_guard = bucket_mutex.lock().unwrap();
let bucket = bucket_guard.get_or_insert_with(HashMap::new);
let entry_key = if should_use_overflow {
STREAM_OVERFLOW_ATTRIBUTE_SET.clone()
} else {
attrs
};
bucket
.entry(entry_key)
.and_modify(|e| *e += measurement)
.or_insert(measurement);
}
}
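
For reference, a minimal sketch of the single-lock pattern discussed in the review thread above, where the membership lookup, the cardinality check, and the insert/update all happen under one bucket lock. This is an editorial illustration with simplified String/u64 types and an assumed limit of 2000, not the PR's exact code; the limit check still reads a relaxed atomic shared across buckets, so it is approximate under concurrency.

// Editorial sketch (not part of the PR): sharded sum storage with an
// overflow bucket, using simplified key/value types.
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

const BUCKET_COUNT: usize = 256;
const OVERFLOW_BUCKET_INDEX: usize = BUCKET_COUNT - 1;
const CARDINALITY_LIMIT: usize = 2000; // assumed default limit, for illustration

struct ShardedSum {
    buckets: Vec<Mutex<HashMap<String, u64>>>,
    total_unique_entries: AtomicUsize,
}

impl ShardedSum {
    fn new() -> Self {
        ShardedSum {
            buckets: (0..BUCKET_COUNT).map(|_| Mutex::new(HashMap::new())).collect(),
            total_unique_entries: AtomicUsize::new(0),
        }
    }

    fn bucket_index(key: &str) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish() as u8 as usize
    }

    // Lookup, cardinality check, and insert/update all happen while the
    // target bucket's lock is held, so a concurrent measurement with the
    // same key cannot slip in between the check and the write.
    fn add(&self, key: &str, value: u64) {
        let mut bucket = self.buckets[Self::bucket_index(key)].lock().unwrap();
        if let Some(v) = bucket.get_mut(key) {
            *v += value;
            return;
        }
        if self.total_unique_entries.load(Ordering::Relaxed) < CARDINALITY_LIMIT {
            bucket.insert(key.to_string(), value);
            self.total_unique_entries.fetch_add(1, Ordering::Relaxed);
        } else {
            // Route new keys beyond the limit to the dedicated overflow bucket.
            drop(bucket);
            let mut overflow = self.buckets[OVERFLOW_BUCKET_INDEX].lock().unwrap();
            *overflow.entry("otel.metric.overflow".to_string()).or_insert(0) += value;
        }
    }
}

fn main() {
    let sums = ShardedSum::new();
    sums.add("key1=value1", 1);
    sums.add("key1=value1", 1);
    assert_eq!(sums.total_unique_entries.load(Ordering::Relaxed), 1);
}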

@@ -112,16 +163,10 @@
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

let mut values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}

let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
@@ -139,14 +184,37 @@
});
}

for (attrs, value) in values.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(prev_start),
time: Some(t),
value,
exemplars: vec![],
});
for bucket_mutex in self.value_map.buckets.iter() {
match bucket_mutex.lock() {
Ok(mut locked_bucket) => {
if let Some(ref mut bucket) = *locked_bucket {
for (attrs, value) in bucket.drain() {
// Correctly handle lock acquisition on self.start
let start_time = self.start.lock().map_or_else(
|_| SystemTime::now(), // In case of an error, use SystemTime::now()
|guard| *guard, // In case of success, dereference the guard to get the SystemTime
);

s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(start_time),
time: Some(t),
value,
exemplars: vec![],
});
self.value_map
.total_unique_entries
.fetch_sub(1, Ordering::Relaxed);
}
}
}
Err(e) => {
global::handle_error(MetricsError::Other(format!(
"Failed to acquire lock on bucket due to: {:?}",
e
)));
}

Codecov warning: added lines opentelemetry-sdk/src/metrics/internal/sum.rs#L211-L216 were not covered by tests.
}
}

// The delta collection cycle resets.
@@ -181,16 +249,10 @@
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

let values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}

let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
@@ -213,14 +275,31 @@
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
for (attrs, value) in values.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value,
exemplars: vec![],
});
for bucket_mutex in self.value_map.buckets.iter() {
// Handle potential lock failure gracefully
if let Ok(locked_bucket) = bucket_mutex.lock() {
if let Some(locked_bucket) = &*locked_bucket {
for (attrs, value) in locked_bucket.iter() {
// Handle potential lock failure on self.start and use current time as fallback
let start_time = self.start.lock().map_or_else(
|_| SystemTime::now(), // Use SystemTime::now() as fallback on error
|guard| *guard, // Dereference the guard to get the SystemTime on success
);

s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(start_time),
time: Some(t),
value: *value,
exemplars: vec![],
});
}
}
} else {
global::handle_error(MetricsError::Other(
"Failed to acquire lock on a bucket".into(),
));
}

Codecov warning: added lines opentelemetry-sdk/src/metrics/internal/sum.rs#L298-L302 were not covered by tests.
}

(
@@ -274,18 +353,13 @@
s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic;

let mut values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);

Codecov warning: added lines opentelemetry-sdk/src/metrics/internal/sum.rs#L358-L359 were not covered by tests.
}
let mut new_reported = HashMap::with_capacity(n);

let mut new_reported = HashMap::with_capacity(total_len);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
@@ -305,19 +379,39 @@
});
}

let default = T::default();
for (attrs, value) in values.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
for bucket_mutex in self.value_map.buckets.iter() {
match bucket_mutex.lock() {
Ok(mut locked_bucket) => {
if let Some(locked_bucket) = &mut *locked_bucket {
let default = T::default();
for (attrs, value) in locked_bucket.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
self.value_map
.total_unique_entries
.fetch_sub(1, Ordering::Relaxed);
}
}
}
Err(e) => {
// Log or handle the lock acquisition error if necessary
global::handle_error(MetricsError::Other(format!(
"Failed to acquire lock on bucket due to: {:?}",
e
)));
// Continue to the next bucket if the lock cannot be acquired
continue;

Codecov warning: added lines opentelemetry-sdk/src/metrics/internal/sum.rs#L405-L412 were not covered by tests.
}
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}

// The delta collection cycle resets.
@@ -356,18 +450,13 @@
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

let values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_unique_entries.load(Ordering::Relaxed) + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}
let mut new_reported = HashMap::with_capacity(n);

let mut new_reported = HashMap::with_capacity(total_len);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
@@ -388,18 +477,37 @@
}

let default = T::default();
for (attrs, value) in values.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
for bucket_mutex in self.value_map.buckets.iter() {
// Safely attempt to acquire the lock, handling any potential error.
let locked_bucket = match bucket_mutex.lock() {
Ok(bucket) => bucket,
Err(e) => {
// Log the error or handle it as needed.
global::handle_error(MetricsError::Other(format!(
"Failed to acquire lock on bucket due to: {:?}",
e
)));
continue; // Skip to the next bucket if the lock cannot be acquired.

Codecov warning: added lines opentelemetry-sdk/src/metrics/internal/sum.rs#L484-L490 were not covered by tests.
}
};

// Proceed only if the bucket is not empty.
if let Some(locked_bucket) = &*locked_bucket {
for (attrs, value) in locked_bucket.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
}

s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value, // For cumulative, directly use the value without calculating the delta.
exemplars: vec![],
});
}
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}

*reported = new_reported;
183 changes: 183 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
@@ -161,6 +161,189 @@ mod tests {
);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_overflow() {
// Run this test with stdout enabled to see output.
// cargo test counter --features=metrics,testing -- --nocapture

// Arrange
let exporter = InMemoryMetricsExporter::default();
// PeriodicReader with large interval to avoid auto-flush
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
.with_interval(std::time::Duration::from_secs(100000))
.build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter = meter_provider.meter("test");
let counter = meter
.u64_counter("my_counter")
.with_unit(Unit::new("my_unit"))
.init();

// sleep for ~5 millis to avoid recording during the first collect cycle
// (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
Member Author (lalitb): Will create an issue for this.

std::thread::sleep(std::time::Duration::from_millis(5));
let unique_measurements = 1999;
let overflow_measurements = 4;
// Generate measurements to enforce overflow
for i in 0..unique_measurements + overflow_measurements {
let attribute_value = format!("value{}", i); // Creates a unique attribute value for each measurement
counter.add(1, &[KeyValue::new("key1", attribute_value)]);
}
meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
// Every collect cycle produces a new ResourceMetrics (even if no data is collected).
// TBD = This needs to be fixed, and then below assert should validate for one entry
Member Author (lalitb): Will create an issue for this.

assert!(resource_metrics.len() == 2);
let metric = &resource_metrics[1].scope_metrics[0].metrics[0]; // second ResourceMetrics
assert_eq!(metric.name, "my_counter");
assert_eq!(metric.unit.as_str(), "my_unit");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for Counter instruments by default");

// Expecting 2000 unique time-series.
assert_eq!(sum.data_points.len(), unique_measurements + 1); // all overflow measurements are merged into one
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce cumulative by default."
);
// ensure that the overflow attribute is present
for data_point in &sum.data_points {
let mut overflow_attribute_present = false;
for attribute in data_point.attributes.iter() {
if attribute.0 == &opentelemetry::Key::from("otel.metric.overflow") {
overflow_attribute_present = true;
break;
}
}
if overflow_attribute_present {
assert_eq!(data_point.value, overflow_measurements as u64);
} else {
assert_eq!(data_point.value, 1);
}
}
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_concurrent_overflow() {
// Run this test with stdout enabled to see output.
// cargo test counter --features=metrics,testing -- --nocapture

// Arrange
let exporter = InMemoryMetricsExporter::default();
// PeriodicReader with large interval to avoid auto-flush
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
.with_interval(std::time::Duration::from_secs(100000))
.build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter = meter_provider.meter("test");
let counter = meter
.u64_counter("my_counter")
.with_unit(Unit::new("my_unit"))
.init();

// sleep for ~5 millis to avoid recording during the first collect cycle
// (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
std::thread::sleep(std::time::Duration::from_millis(5));

let unique_measurements = 1999;
let overflow_measurements = 4;
let total_measurements = unique_measurements + overflow_measurements;

let counter = std::sync::Arc::new(std::sync::Mutex::new(counter)); // Shared counter among threads

let num_threads = 4;
let measurements_per_thread = total_measurements / num_threads;
let remainder = total_measurements % num_threads; // Remainder to be added to the last thread

let mut handles = vec![];

for thread_id in 0..num_threads {
let counter_clone = std::sync::Arc::clone(&counter);
let start_index = thread_id * measurements_per_thread;
let end_index = if thread_id == num_threads - 1 {
start_index + measurements_per_thread + remainder // Add remainder to the last thread
} else {
start_index + measurements_per_thread
};

let handle = std::thread::spawn(move || {
for i in start_index..end_index {
let attribute_value = format!("value{}", i);
let kv = vec![KeyValue::new("key1", attribute_value)];

let counter = counter_clone.lock().unwrap();
counter.add(1, &kv);
}
});

handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
// Every collect cycle produces a new ResourceMetrics (even if no data is collected).
// TBD = This needs to be fixed, and then below assert should validate for one entry
assert!(resource_metrics.len() == 2);
let metric = &resource_metrics[1].scope_metrics[0].metrics[0]; // second ResourceMetrics
assert_eq!(metric.name, "my_counter");
assert_eq!(metric.unit.as_str(), "my_unit");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for Counter instruments by default");

// Expecting 2000 unique time-series.
assert_eq!(sum.data_points.len(), unique_measurements + 1); // all overflow measurements are merged into one
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce cumulative by default."
);

// ensure that the overflow attribute is present
for data_point in &sum.data_points {
let mut overflow_attribute_present = false;
for attribute in data_point.attributes.iter() {
if attribute.0 == &opentelemetry::Key::from("otel.metric.overflow") {
overflow_attribute_present = true;
break;
}
}
if overflow_attribute_present {
assert_eq!(data_point.value, overflow_measurements as u64);
} else {
assert_eq!(data_point.value, 1);
}
}
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
2 changes: 1 addition & 1 deletion stress/Cargo.toml
@@ -24,7 +24,7 @@ ctrlc = "3.2.5"
lazy_static = "1.4.0"
num_cpus = "1.15.0"
opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "use_hashbrown"] }
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
rand = { version = "0.8.4", features = ["small_rng"] }
tracing = { workspace = true, features = ["std"]}