Skip to content

Commit bf20372

Browse files
cijothomas and TommyCpp
authored May 12, 2024
Refactor Metrics tests and add more (#1737)
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
1 parent 2a526d7 commit bf20372

File tree

1 file changed

+192
-243
lines changed
  • opentelemetry-sdk/src/metrics

1 file changed

+192
-243
lines changed
 

‎opentelemetry-sdk/src/metrics/mod.rs

+192-243
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub use view::*;
6060

6161
#[cfg(all(test, feature = "testing"))]
6262
mod tests {
63-
use self::data::ScopeMetrics;
63+
use self::data::{DataPoint, ScopeMetrics};
6464
use super::*;
6565
use crate::metrics::data::{ResourceMetrics, Temporality};
6666
use crate::metrics::reader::TemporalitySelector;
@@ -73,98 +73,41 @@ mod tests {
7373
};
7474
use std::borrow::Cow;
7575

76+
// Run all tests in this mod
77+
// cargo test metrics::tests --features=metrics,testing
78+
79+
// Note for all tests in this mod:
7680
// "multi_thread" tokio flavor must be used else flush won't
7781
// be able to make progress!
82+
7883
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
79-
async fn counter_aggregation() {
84+
async fn counter_aggregation_cumulative() {
8085
// Run this test with stdout enabled to see output.
81-
// cargo test counter --features=metrics,testing -- --nocapture
82-
83-
// Arrange
84-
let exporter = InMemoryMetricsExporter::default();
85-
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
86-
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
87-
88-
// Act
89-
let meter = meter_provider.meter("test");
90-
let counter = meter
91-
.u64_counter("my_counter")
92-
.with_unit(Unit::new("my_unit"))
93-
.init();
94-
counter.add(1, &[KeyValue::new("key1", "value1")]);
95-
counter.add(1, &[KeyValue::new("key1", "value1")]);
96-
counter.add(1, &[KeyValue::new("key1", "value1")]);
97-
counter.add(1, &[KeyValue::new("key1", "value1")]);
98-
counter.add(1, &[KeyValue::new("key1", "value1")]);
99-
100-
counter.add(1, &[KeyValue::new("key1", "value2")]);
101-
counter.add(1, &[KeyValue::new("key1", "value2")]);
102-
counter.add(1, &[KeyValue::new("key1", "value2")]);
103-
104-
meter_provider.force_flush().unwrap();
105-
106-
// Assert
107-
let resource_metrics = exporter
108-
.get_finished_metrics()
109-
.expect("metrics are expected to be exported.");
110-
assert!(!resource_metrics.is_empty());
111-
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
112-
assert_eq!(metric.name, "my_counter");
113-
assert_eq!(metric.unit.as_str(), "my_unit");
114-
let sum = metric
115-
.data
116-
.as_any()
117-
.downcast_ref::<data::Sum<u64>>()
118-
.expect("Sum aggregation expected for Counter instruments by default");
86+
// cargo test counter_aggregation_cumulative --features=metrics,testing -- --nocapture
87+
counter_aggregation_helper(Temporality::Cumulative);
88+
}
11989

120-
// Expecting 2 time-series.
121-
assert_eq!(sum.data_points.len(), 2);
122-
assert!(sum.is_monotonic, "Counter should produce monotonic.");
123-
assert_eq!(
124-
sum.temporality,
125-
data::Temporality::Cumulative,
126-
"Should produce cumulative by default."
127-
);
90+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
91+
async fn counter_aggregation_delta() {
92+
// Run this test with stdout enabled to see output.
93+
// cargo test counter_aggregation_delta --features=metrics,testing -- --nocapture
94+
counter_aggregation_helper(Temporality::Delta);
95+
}
12896

129-
// find and validate key1=value1 datapoint
130-
let mut data_point1 = None;
131-
for datapoint in &sum.data_points {
132-
if datapoint
133-
.attributes
134-
.iter()
135-
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1")
136-
{
137-
data_point1 = Some(datapoint);
138-
}
139-
}
140-
assert_eq!(
141-
data_point1
142-
.expect("datapoint with key1=value1 expected")
143-
.value,
144-
5
145-
);
97+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
98+
async fn updown_counter_aggregation_cumulative() {
99+
// Run this test with stdout enabled to see output.
100+
// cargo test updown_counter_aggregation_cumulative --features=metrics,testing -- --nocapture
101+
updown_counter_aggregation_helper(Temporality::Cumulative);
102+
}
146103

147-
// find and validate key1=value2 datapoint
148-
let mut data_point1 = None;
149-
for datapoint in &sum.data_points {
150-
if datapoint
151-
.attributes
152-
.iter()
153-
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2")
154-
{
155-
data_point1 = Some(datapoint);
156-
}
157-
}
158-
assert_eq!(
159-
data_point1
160-
.expect("datapoint with key1=value2 expected")
161-
.value,
162-
3
163-
);
104+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
105+
async fn updown_counter_aggregation_delta() {
106+
// Run this test with stdout enabled to see output.
107+
// cargo test updown_counter_aggregation_delta --features=metrics,testing -- --nocapture
108+
updown_counter_aggregation_helper(Temporality::Delta);
164109
}
165110

166-
// "multi_thread" tokio flavor must be used else flush won't
167-
// be able to make progress!
168111
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
169112
async fn observable_counter_aggregation() {
170113
// Run this test with stdout enabled to see output.
@@ -248,8 +191,6 @@ mod tests {
248191
);
249192
}
250193

251-
// "multi_thread" tokio flavor must be used else flush won't
252-
// be able to make progress!
253194
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
254195
async fn counter_duplicate_instrument_merge() {
255196
// Arrange
@@ -301,8 +242,6 @@ mod tests {
301242
assert_eq!(datapoint.value, 15);
302243
}
303244

304-
// "multi_thread" tokio flavor must be used else flush won't
305-
// be able to make progress!
306245
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
307246
async fn counter_duplicate_instrument_different_meter_no_merge() {
308247
// Arrange
@@ -392,8 +331,6 @@ mod tests {
392331
}
393332
}
394333

395-
// "multi_thread" tokio flavor must be used else flush won't
396-
// be able to make progress!
397334
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
398335
async fn instrumentation_scope_identity_test() {
399336
// Arrange
@@ -474,8 +411,6 @@ mod tests {
474411
assert_eq!(datapoint.value, 15);
475412
}
476413

477-
// "multi_thread" tokio flavor must be used else flush won't
478-
// be able to make progress!
479414
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
480415
async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
481416
// Run this test with stdout enabled to see output.
@@ -527,8 +462,6 @@ mod tests {
527462
);
528463
}
529464

530-
// "multi_thread" tokio flavor must be used else flush won't
531-
// be able to make progress!
532465
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
533466
// #[ignore = "Spatial aggregation is not yet implemented."]
534467
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
@@ -605,8 +538,6 @@ mod tests {
605538
assert_eq!(data_point.value, 300);
606539
}
607540

608-
// "multi_thread" tokio flavor must be used else flush won't
609-
// be able to make progress!
610541
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
611542
async fn spatial_aggregation_when_view_drops_attributes_counter() {
612543
// cargo test spatial_aggregation_when_view_drops_attributes_counter --features=metrics,testing
@@ -683,21 +614,16 @@ mod tests {
683614
assert_eq!(data_point.value, 30);
684615
}
685616

686-
// "multi_thread" tokio flavor must be used else flush won't
687-
// be able to make progress!
688617
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
689618
async fn counter_aggregation_attribute_order() {
690619
// Run this test with stdout enabled to see output.
691620
// cargo test counter_aggregation_attribute_order --features=metrics,testing -- --nocapture
692621

693622
// Arrange
694-
let exporter = InMemoryMetricsExporter::default();
695-
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
696-
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
623+
let mut test_context = TestContext::new(Temporality::Delta);
624+
let counter = test_context.u64_counter("test", "my_counter", None);
697625

698626
// Act
699-
let meter = meter_provider.meter("test");
700-
let counter = meter.u64_counter("my_counter").init();
701627
// Add the same set of attributes in different order. (they are expected
702628
// to be treated as same attributes)
703629
counter.add(
@@ -748,33 +674,12 @@ mod tests {
748674
KeyValue::new("B", "b"),
749675
],
750676
);
677+
test_context.flush_metrics();
751678

752-
meter_provider.force_flush().unwrap();
753-
754-
// Assert
755-
let resource_metrics = exporter
756-
.get_finished_metrics()
757-
.expect("metrics are expected to be exported.");
758-
assert!(!resource_metrics.is_empty());
759-
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
760-
assert_eq!(metric.name, "my_counter");
761-
let sum = metric
762-
.data
763-
.as_any()
764-
.downcast_ref::<data::Sum<u64>>()
765-
.expect("Sum aggregation expected for Counter instruments by default");
679+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
766680

767681
// Expecting 1 time-series.
768-
assert_eq!(
769-
sum.data_points.len(),
770-
1,
771-
"Expected only one data point as attributes are same, but just reordered."
772-
);
773-
assert_eq!(
774-
sum.temporality,
775-
data::Temporality::Cumulative,
776-
"Should produce cumulative by default."
777-
);
682+
assert_eq!(sum.data_points.len(), 1);
778683

779684
// validate the sole datapoint
780685
let data_point1 = &sum.data_points[0];
@@ -783,13 +688,13 @@ mod tests {
783688

784689
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
785690
async fn no_attr_cumulative_counter() {
786-
let mut test_context = TestContext::new(Some(Temporality::Cumulative));
787-
let counter = test_context.u64_counter("test", "my_counter", "my_unit");
691+
let mut test_context = TestContext::new(Temporality::Cumulative);
692+
let counter = test_context.u64_counter("test", "my_counter", None);
788693

789694
counter.add(50, &[]);
790695
test_context.flush_metrics();
791696

792-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
697+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
793698

794699
assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
795700
assert!(sum.is_monotonic, "Should produce monotonic.");
@@ -806,13 +711,13 @@ mod tests {
806711

807712
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
808713
async fn no_attr_delta_counter() {
809-
let mut test_context = TestContext::new(Some(Temporality::Delta));
810-
let counter = test_context.u64_counter("test", "my_counter", "my_unit");
714+
let mut test_context = TestContext::new(Temporality::Delta);
715+
let counter = test_context.u64_counter("test", "my_counter", None);
811716

812717
counter.add(50, &[]);
813718
test_context.flush_metrics();
814719

815-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
720+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
816721

817722
assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
818723
assert!(sum.is_monotonic, "Should produce monotonic.");
@@ -825,13 +730,13 @@ mod tests {
825730

826731
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
827732
async fn no_attr_cumulative_up_down_counter() {
828-
let mut test_context = TestContext::new(Some(Temporality::Cumulative));
829-
let counter = test_context.i64_up_down_counter("test", "my_counter", "my_unit");
733+
let mut test_context = TestContext::new(Temporality::Cumulative);
734+
let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
830735

831736
counter.add(50, &[]);
832737
test_context.flush_metrics();
833738

834-
let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", "my_unit");
739+
let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", Some("my_unit"));
835740

836741
assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
837742
assert!(!sum.is_monotonic, "Should not produce monotonic.");
@@ -848,13 +753,13 @@ mod tests {
848753

849754
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
850755
async fn no_attr_delta_up_down_counter() {
851-
let mut test_context = TestContext::new(Some(Temporality::Delta));
852-
let counter = test_context.i64_up_down_counter("test", "my_counter", "my_unit");
756+
let mut test_context = TestContext::new(Temporality::Delta);
757+
let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
853758

854759
counter.add(50, &[]);
855760
test_context.flush_metrics();
856761

857-
let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", "my_unit");
762+
let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", Some("my_unit"));
858763

859764
assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
860765
assert!(!sum.is_monotonic, "Should not produce monotonic.");
@@ -867,16 +772,16 @@ mod tests {
867772

868773
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
869774
async fn no_attr_cumulative_counter_value_added_after_export() {
870-
let mut test_context = TestContext::new(Some(Temporality::Cumulative));
871-
let counter = test_context.u64_counter("test", "my_counter", "my_unit");
775+
let mut test_context = TestContext::new(Temporality::Cumulative);
776+
let counter = test_context.u64_counter("test", "my_counter", None);
872777

873778
counter.add(50, &[]);
874779
test_context.flush_metrics();
875-
let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
780+
let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
876781

877782
counter.add(5, &[]);
878783
test_context.flush_metrics();
879-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
784+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
880785

881786
assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
882787
assert!(sum.is_monotonic, "Should produce monotonic.");
@@ -893,16 +798,16 @@ mod tests {
893798

894799
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
895800
async fn no_attr_delta_counter_value_reset_after_export() {
896-
let mut test_context = TestContext::new(Some(Temporality::Delta));
897-
let counter = test_context.u64_counter("test", "my_counter", "my_unit");
801+
let mut test_context = TestContext::new(Temporality::Delta);
802+
let counter = test_context.u64_counter("test", "my_counter", None);
898803

899804
counter.add(50, &[]);
900805
test_context.flush_metrics();
901-
let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
806+
let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
902807

903808
counter.add(5, &[]);
904809
test_context.flush_metrics();
905-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
810+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
906811

907812
assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
908813
assert!(sum.is_monotonic, "Should produce monotonic.");
@@ -919,16 +824,16 @@ mod tests {
919824

920825
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
921826
async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
922-
let mut test_context = TestContext::new(Some(Temporality::Delta));
923-
let counter = test_context.u64_counter("test", "my_counter", "my_unit");
827+
let mut test_context = TestContext::new(Temporality::Delta);
828+
let counter = test_context.u64_counter("test", "my_counter", None);
924829

925830
counter.add(50, &[]);
926831
test_context.flush_metrics();
927-
let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
832+
let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
928833

929834
counter.add(50, &[KeyValue::new("a", "b")]);
930835
test_context.flush_metrics();
931-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
836+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
932837

933838
let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
934839

@@ -938,27 +843,17 @@ mod tests {
938843
);
939844
}
940845

941-
// "multi_thread" tokio flavor must be used else flush won't
942-
// be able to make progress!
943846
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
944847
#[ignore = "Known bug: https://github.com/open-telemetry/opentelemetry-rust/issues/1598"]
945848
async fn delta_memory_efficiency_test() {
946849
// Run this test with stdout enabled to see output.
947850
// cargo test delta_memory_efficiency_test --features=metrics,testing -- --nocapture
948851

949852
// Arrange
950-
let exporter = InMemoryMetricsExporterBuilder::new()
951-
.with_temporality_selector(DeltaTemporalitySelector())
952-
.build();
953-
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
954-
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
853+
let mut test_context = TestContext::new(Temporality::Delta);
854+
let counter = test_context.u64_counter("test", "my_counter", None);
955855

956856
// Act
957-
let meter = meter_provider.meter("test");
958-
let counter = meter
959-
.u64_counter("my_counter")
960-
.with_unit(Unit::new("my_unit"))
961-
.init();
962857
counter.add(1, &[KeyValue::new("key1", "value1")]);
963858
counter.add(1, &[KeyValue::new("key1", "value1")]);
964859
counter.add(1, &[KeyValue::new("key1", "value1")]);
@@ -968,78 +863,137 @@ mod tests {
968863
counter.add(1, &[KeyValue::new("key1", "value2")]);
969864
counter.add(1, &[KeyValue::new("key1", "value2")]);
970865
counter.add(1, &[KeyValue::new("key1", "value2")]);
866+
test_context.flush_metrics();
971867

972-
meter_provider.force_flush().unwrap();
973-
974-
// Assert
975-
let resource_metrics = exporter
976-
.get_finished_metrics()
977-
.expect("metrics are expected to be exported.");
978-
assert!(!resource_metrics.is_empty());
979-
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
980-
assert_eq!(metric.name, "my_counter");
981-
assert_eq!(metric.unit.as_str(), "my_unit");
982-
let sum = metric
983-
.data
984-
.as_any()
985-
.downcast_ref::<data::Sum<u64>>()
986-
.expect("Sum aggregation expected for Counter instruments by default");
868+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
987869

988870
// Expecting 2 time-series.
989871
assert_eq!(sum.data_points.len(), 2);
990-
assert!(sum.is_monotonic, "Counter should produce monotonic.");
991-
assert_eq!(
992-
sum.temporality,
993-
data::Temporality::Delta,
994-
"Should produce Delta as configured"
995-
);
996872

997873
// find and validate key1=value1 datapoint
998-
let mut data_point1 = None;
999-
for datapoint in &sum.data_points {
1000-
if datapoint
1001-
.attributes
1002-
.iter()
1003-
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1")
1004-
{
1005-
data_point1 = Some(datapoint);
1006-
}
1007-
}
1008-
assert_eq!(
1009-
data_point1
1010-
.expect("datapoint with key1=value1 expected")
1011-
.value,
1012-
5
1013-
);
874+
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
875+
.expect("datapoint with key1=value1 expected");
876+
assert_eq!(data_point1.value, 5);
1014877

1015878
// find and validate key1=value2 datapoint
1016-
let mut data_point1 = None;
1017-
for datapoint in &sum.data_points {
1018-
if datapoint
1019-
.attributes
1020-
.iter()
1021-
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2")
1022-
{
1023-
data_point1 = Some(datapoint);
1024-
}
1025-
}
1026-
assert_eq!(
1027-
data_point1
1028-
.expect("datapoint with key1=value2 expected")
1029-
.value,
1030-
3
1031-
);
879+
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
880+
.expect("datapoint with key1=value2 expected");
881+
assert_eq!(data_point1.value, 3);
1032882

1033883
// flush again, and validate that nothing is flushed
1034884
// as delta temporality.
1035-
meter_provider.force_flush().unwrap();
1036-
let resource_metrics = exporter
885+
test_context.flush_metrics();
886+
887+
let resource_metrics = test_context
888+
.exporter
1037889
.get_finished_metrics()
1038890
.expect("metrics are expected to be exported.");
1039891
println!("resource_metrics: {:?}", resource_metrics);
1040892
assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1041893
}
1042894

895+
fn counter_aggregation_helper(temporality: Temporality) {
896+
// Arrange
897+
let mut test_context = TestContext::new(temporality);
898+
let counter = test_context.u64_counter("test", "my_counter", None);
899+
900+
// Act
901+
counter.add(1, &[KeyValue::new("key1", "value1")]);
902+
counter.add(1, &[KeyValue::new("key1", "value1")]);
903+
counter.add(1, &[KeyValue::new("key1", "value1")]);
904+
counter.add(1, &[KeyValue::new("key1", "value1")]);
905+
counter.add(1, &[KeyValue::new("key1", "value1")]);
906+
907+
counter.add(1, &[KeyValue::new("key1", "value2")]);
908+
counter.add(1, &[KeyValue::new("key1", "value2")]);
909+
counter.add(1, &[KeyValue::new("key1", "value2")]);
910+
911+
test_context.flush_metrics();
912+
913+
// Assert
914+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
915+
// Expecting 2 time-series.
916+
assert_eq!(sum.data_points.len(), 2);
917+
assert!(sum.is_monotonic, "Counter should produce monotonic.");
918+
if let Temporality::Cumulative = temporality {
919+
assert_eq!(
920+
sum.temporality,
921+
Temporality::Cumulative,
922+
"Should produce cumulative"
923+
);
924+
} else {
925+
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
926+
}
927+
928+
// find and validate key1=value2 datapoint
929+
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
930+
.expect("datapoint with key1=value1 expected");
931+
assert_eq!(data_point1.value, 5);
932+
933+
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
934+
.expect("datapoint with key1=value2 expected");
935+
assert_eq!(data_point1.value, 3);
936+
}
937+
938+
fn updown_counter_aggregation_helper(temporality: Temporality) {
939+
// Arrange
940+
let mut test_context = TestContext::new(temporality);
941+
let counter = test_context.i64_up_down_counter("test", "my_counter", None);
942+
943+
// Act
944+
counter.add(1, &[KeyValue::new("key1", "value1")]);
945+
counter.add(1, &[KeyValue::new("key1", "value1")]);
946+
counter.add(1, &[KeyValue::new("key1", "value1")]);
947+
counter.add(1, &[KeyValue::new("key1", "value1")]);
948+
counter.add(1, &[KeyValue::new("key1", "value1")]);
949+
950+
counter.add(1, &[KeyValue::new("key1", "value2")]);
951+
counter.add(1, &[KeyValue::new("key1", "value2")]);
952+
counter.add(1, &[KeyValue::new("key1", "value2")]);
953+
954+
test_context.flush_metrics();
955+
956+
// Assert
957+
let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", None);
958+
// Expecting 2 time-series.
959+
assert_eq!(sum.data_points.len(), 2);
960+
assert!(
961+
!sum.is_monotonic,
962+
"UpDownCounter should produce non-monotonic."
963+
);
964+
if let Temporality::Cumulative = temporality {
965+
assert_eq!(
966+
sum.temporality,
967+
Temporality::Cumulative,
968+
"Should produce cumulative"
969+
);
970+
} else {
971+
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
972+
}
973+
974+
// find and validate key1=value1 datapoint
975+
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
976+
.expect("datapoint with key1=value1 expected");
977+
assert_eq!(data_point1.value, 5);
978+
979+
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
980+
.expect("datapoint with key1=value2 expected");
981+
assert_eq!(data_point1.value, 3);
982+
}
983+
984+
fn find_datapoint_with_key_value<'a, T>(
985+
data_points: &'a [DataPoint<T>],
986+
key: &str,
987+
value: &str,
988+
) -> Option<&'a DataPoint<T>> {
989+
data_points.iter().find(|&datapoint| {
990+
datapoint
991+
.attributes
992+
.iter()
993+
.any(|(k, v)| k.as_str() == key && v.as_str() == value)
994+
})
995+
}
996+
1043997
fn find_scope_metric<'a>(
1044998
metrics: &'a [ScopeMetrics],
1045999
name: &'a str,
@@ -1049,13 +1003,6 @@ mod tests {
10491003
.find(|&scope_metric| scope_metric.scope.name == name)
10501004
}
10511005

1052-
struct DeltaTemporalitySelector();
1053-
impl TemporalitySelector for DeltaTemporalitySelector {
1054-
fn temporality(&self, _kind: InstrumentKind) -> Temporality {
1055-
Temporality::Delta
1056-
}
1057-
}
1058-
10591006
struct TestContext {
10601007
exporter: InMemoryMetricsExporter,
10611008
meter_provider: SdkMeterProvider,
@@ -1065,7 +1012,7 @@ mod tests {
10651012
}
10661013

10671014
impl TestContext {
1068-
fn new(temporality: Option<Temporality>) -> Self {
1015+
fn new(temporality: Temporality) -> Self {
10691016
struct TestTemporalitySelector(Temporality);
10701017
impl TemporalitySelector for TestTemporalitySelector {
10711018
fn temporality(&self, _kind: InstrumentKind) -> Temporality {
@@ -1074,9 +1021,7 @@ mod tests {
10741021
}
10751022

10761023
let mut exporter = InMemoryMetricsExporterBuilder::new();
1077-
if let Some(temporality) = temporality {
1078-
exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality));
1079-
}
1024+
exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality));
10801025

10811026
let exporter = exporter.build();
10821027
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
@@ -1093,26 +1038,28 @@ mod tests {
10931038
&self,
10941039
meter_name: &'static str,
10951040
counter_name: &'static str,
1096-
unit_name: &'static str,
1041+
unit: Option<&'static str>,
10971042
) -> Counter<u64> {
1098-
self.meter_provider
1099-
.meter(meter_name)
1100-
.u64_counter(counter_name)
1101-
.with_unit(Unit::new(unit_name))
1102-
.init()
1043+
let meter = self.meter_provider.meter(meter_name);
1044+
let mut counter_builder = meter.u64_counter(counter_name);
1045+
if let Some(unit_name) = unit {
1046+
counter_builder = counter_builder.with_unit(Unit::new(unit_name));
1047+
}
1048+
counter_builder.init()
11031049
}
11041050

11051051
fn i64_up_down_counter(
11061052
&self,
11071053
meter_name: &'static str,
11081054
counter_name: &'static str,
1109-
unit_name: &'static str,
1055+
unit: Option<&'static str>,
11101056
) -> UpDownCounter<i64> {
1111-
self.meter_provider
1112-
.meter(meter_name)
1113-
.i64_up_down_counter(counter_name)
1114-
.with_unit(Unit::new(unit_name))
1115-
.init()
1057+
let meter = self.meter_provider.meter(meter_name);
1058+
let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
1059+
if let Some(unit_name) = unit {
1060+
updown_counter_builder = updown_counter_builder.with_unit(Unit::new(unit_name));
1061+
}
1062+
updown_counter_builder.init()
11161063
}
11171064

11181065
fn flush_metrics(&self) {
@@ -1122,7 +1069,7 @@ mod tests {
11221069
fn get_aggregation<T: data::Aggregation>(
11231070
&mut self,
11241071
counter_name: &str,
1125-
unit_name: &str,
1072+
unit_name: Option<&str>,
11261073
) -> &T {
11271074
self.resource_metrics = self
11281075
.exporter
@@ -1145,7 +1092,9 @@ mod tests {
11451092

11461093
let metric = &resource_metric.scope_metrics[0].metrics[0];
11471094
assert_eq!(metric.name, counter_name);
1148-
assert_eq!(metric.unit.as_str(), unit_name);
1095+
if let Some(expected_unit) = unit_name {
1096+
assert_eq!(metric.unit.as_str(), expected_unit);
1097+
}
11491098

11501099
metric
11511100
.data

0 commit comments

Comments
 (0)
Please sign in to comment.