From 2142583b75ec816e4edeb2f7da8b1beeb8706fb0 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 6 May 2024 23:11:05 -0700 Subject: [PATCH] Metric attributes need sort with keys only --- opentelemetry-sdk/src/attributes/set.rs | 42 +---------- opentelemetry-sdk/src/metrics/mod.rs | 98 +++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 41 deletions(-) diff --git a/opentelemetry-sdk/src/attributes/set.rs b/opentelemetry-sdk/src/attributes/set.rs index 743ac49fe7..644f44e0f5 100644 --- a/opentelemetry-sdk/src/attributes/set.rs +++ b/opentelemetry-sdk/src/attributes/set.rs @@ -39,47 +39,7 @@ impl PartialOrd for HashKeyValue { impl Ord for HashKeyValue { fn cmp(&self, other: &Self) -> Ordering { - match self.0.key.cmp(&other.0.key) { - Ordering::Equal => match type_order(&self.0.value).cmp(&type_order(&other.0.value)) { - Ordering::Equal => match (&self.0.value, &other.0.value) { - (Value::F64(f), Value::F64(of)) => OrderedFloat(*f).cmp(&OrderedFloat(*of)), - (Value::Array(Array::Bool(b)), Value::Array(Array::Bool(ob))) => b.cmp(ob), - (Value::Array(Array::I64(i)), Value::Array(Array::I64(oi))) => i.cmp(oi), - (Value::Array(Array::String(s)), Value::Array(Array::String(os))) => s.cmp(os), - (Value::Array(Array::F64(f)), Value::Array(Array::F64(of))) => { - match f.len().cmp(&of.len()) { - Ordering::Equal => f - .iter() - .map(|x| OrderedFloat(*x)) - .collect::>() - .cmp(&of.iter().map(|x| OrderedFloat(*x)).collect()), - other => other, - } - } - (Value::Bool(b), Value::Bool(ob)) => b.cmp(ob), - (Value::I64(i), Value::I64(oi)) => i.cmp(oi), - (Value::String(s), Value::String(os)) => s.cmp(os), - _ => Ordering::Equal, - }, - other => other, // 2nd order by value types - }, - other => other, // 1st order by key - } - } -} - -fn type_order(v: &Value) -> u8 { - match v { - Value::Bool(_) => 1, - Value::I64(_) => 2, - Value::F64(_) => 3, - Value::String(_) => 4, - Value::Array(a) => match a { - Array::Bool(_) => 5, - Array::I64(_) => 6, - Array::F64(_) => 7, - Array::String(_) => 8, - }, + self.0.key.cmp(&other.0.key) } } diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 80ff89469b..2c7ec799b7 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -607,6 +607,104 @@ mod tests { assert_eq!(data_point.value, 30); } + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_aggregation_attribute_order() { + // Run this test with stdout enabled to see output. + // cargo test counter_aggregation_attribute_order --features=metrics,testing -- --nocapture + + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter = meter_provider.meter("test"); + let counter = meter.u64_counter("my_counter").init(); + // Add the same set of attributes in different order. (they are expected + // to be treated as same attributes) + counter.add( + 1, + &[ + KeyValue::new("A", "a"), + KeyValue::new("B", "b"), + KeyValue::new("C", "c"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("A", "a"), + KeyValue::new("C", "c"), + KeyValue::new("B", "b"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("B", "b"), + KeyValue::new("A", "a"), + KeyValue::new("C", "c"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("B", "b"), + KeyValue::new("C", "c"), + KeyValue::new("A", "a"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("C", "c"), + KeyValue::new("B", "b"), + KeyValue::new("A", "a"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("C", "c"), + KeyValue::new("A", "a"), + KeyValue::new("B", "b"), + ], + ); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_counter"); + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!( + sum.data_points.len(), + 1, + "Expected only one data point as attributes are same, but just reordered." + ); + assert_eq!( + sum.temporality, + data::Temporality::Cumulative, + "Should produce cumulative by default." + ); + + // validate the sole datapoint + let data_point1 = &sum.data_points[0]; + assert_eq!(data_point1.value, 6); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn no_attr_cumulative_counter() { let mut test_context = TestContext::new(Some(Temporality::Cumulative));