diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7d41cdf18d..50673f7d25 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -60,6 +60,7 @@ pub use view::*; #[cfg(all(test, feature = "testing"))] mod tests { + use self::data::ScopeMetrics; use super::*; use crate::metrics::data::{ResourceMetrics, Temporality}; use crate::metrics::reader::TemporalitySelector; @@ -70,6 +71,7 @@ mod tests { metrics::{MeterProvider as _, Unit}, KeyValue, }; + use std::borrow::Cow; // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! @@ -214,6 +216,179 @@ mod tests { assert_eq!(datapoint.value, 15); } + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_duplicate_instrument_different_meter_no_merge() { + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter1 = meter_provider.meter("test.meter1"); + let meter2 = meter_provider.meter("test.meter2"); + let counter1 = meter1 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let counter2 = meter2 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let attribute = vec![KeyValue::new("key1", "value1")]; + counter1.add(10, &attribute); + counter2.add(5, &attribute); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!( + resource_metrics[0].scope_metrics.len() == 2, + "There should be 2 separate scope" + ); + assert!( + resource_metrics[0].scope_metrics[0].metrics.len() == 1, + "There should be single metric for the scope" + ); + assert!( + resource_metrics[0].scope_metrics[1].metrics.len() == 1, + "There should be single metric for the scope" + ); + + let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1"); + let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2"); + + if let Some(scope1) = scope1 { + let metric1 = &scope1.metrics[0]; + assert_eq!(metric1.name, "my_counter"); + assert_eq!(metric1.unit.as_str(), "my_unit"); + assert_eq!(metric1.description, "my_description"); + let sum1 = metric1 + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum1.data_points.len(), 1); + + let datapoint1 = &sum1.data_points[0]; + assert_eq!(datapoint1.value, 10); + } else { + panic!("No MetricScope found for 'test.meter1'"); + } + + if let Some(scope2) = scope2 { + let metric2 = &scope2.metrics[0]; + assert_eq!(metric2.name, "my_counter"); + assert_eq!(metric2.unit.as_str(), "my_unit"); + assert_eq!(metric2.description, "my_description"); + let sum2 = metric2 + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum2.data_points.len(), 1); + + let datapoint2 = &sum2.data_points[0]; + assert_eq!(datapoint2.value, 5); + } else { + panic!("No MetricScope found for 'test.meter2'"); + } + } + + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn instrumentation_scope_identity_test() { + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + // Meters are identical except for scope attributes, but scope attributes are not an identifying property. + // Hence there should be a single metric stream output for this test. + let meter1 = meter_provider.versioned_meter( + "test.meter", + Some("v0.1.0"), + Some("schema_url"), + Some(vec![KeyValue::new("key", "value1")]), + ); + let meter2 = meter_provider.versioned_meter( + "test.meter", + Some("v0.1.0"), + Some("schema_url"), + Some(vec![KeyValue::new("key", "value2")]), + ); + let counter1 = meter1 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let counter2 = meter2 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let attribute = vec![KeyValue::new("key1", "value1")]; + counter1.add(10, &attribute); + counter2.add(5, &attribute); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + println!("resource_metrics: {:?}", resource_metrics); + assert!( + resource_metrics[0].scope_metrics.len() == 1, + "There should be a single scope as the meters are identical" + ); + assert!( + resource_metrics[0].scope_metrics[0].metrics.len() == 1, + "There should be single metric for the scope as instruments are identical" + ); + + let scope = &resource_metrics[0].scope_metrics[0].scope; + assert_eq!(scope.name, "test.meter"); + assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0"))); + assert_eq!(scope.schema_url, Some(Cow::Borrowed("schema_url"))); + + // This is validating current behavior, but it is not guaranteed to be the case in the future, + // as this is a user error and SDK reserves right to change this behavior. + assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]); + + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_counter"); + assert_eq!(metric.unit.as_str(), "my_unit"); + assert_eq!(metric.description, "my_description"); + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1); + + let datapoint = &sum.data_points[0]; + assert_eq!(datapoint.value, 15); + } + // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -692,6 +867,15 @@ mod tests { assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect."); } + fn find_scope_metric<'a>( + metrics: &'a [ScopeMetrics], + name: &'a str, + ) -> Option<&'a ScopeMetrics> { + metrics + .iter() + .find(|&scope_metric| scope_metric.scope.name == name) + } + struct DeltaTemporalitySelector(); impl TemporalitySelector for DeltaTemporalitySelector { fn temporality(&self, _kind: InstrumentKind) -> Temporality {