Skip to content

Commit 845f2f0

Browse files
authored
Simpler usage of Observable instruments (#1715)
1 parent 348ec9e commit 845f2f0

File tree

4 files changed

+148
-84
lines changed

4 files changed

+148
-84
lines changed

examples/metrics-basic/src/main.rs

+43-52
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,21 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
4343
);
4444

4545
// Create a ObservableCounter instrument and register a callback that reports the measurement.
46-
let observable_counter = meter
46+
let _observable_counter = meter
4747
.u64_observable_counter("my_observable_counter")
4848
.with_description("My observable counter example description")
4949
.with_unit(Unit::new("myunit"))
50+
.with_callback(|observer| {
51+
observer.observe(
52+
100,
53+
&[
54+
KeyValue::new("mykey1", "myvalue1"),
55+
KeyValue::new("mykey2", "myvalue2"),
56+
],
57+
)
58+
})
5059
.init();
5160

52-
meter.register_callback(&[observable_counter.as_any()], move |observer| {
53-
observer.observe_u64(
54-
&observable_counter,
55-
100,
56-
&[
57-
KeyValue::new("mykey1", "myvalue1"),
58-
KeyValue::new("mykey2", "myvalue2"),
59-
],
60-
)
61-
})?;
62-
6361
// Create a UpCounter Instrument.
6462
let updown_counter = meter.i64_up_down_counter("my_updown_counter").init();
6563

@@ -73,23 +71,21 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
7371
);
7472

7573
// Create a Observable UpDownCounter instrument and register a callback that reports the measurement.
76-
let observable_up_down_counter = meter
74+
let _observable_up_down_counter = meter
7775
.i64_observable_up_down_counter("my_observable_updown_counter")
7876
.with_description("My observable updown counter example description")
7977
.with_unit(Unit::new("myunit"))
78+
.with_callback(|observer| {
79+
observer.observe(
80+
100,
81+
&[
82+
KeyValue::new("mykey1", "myvalue1"),
83+
KeyValue::new("mykey2", "myvalue2"),
84+
],
85+
)
86+
})
8087
.init();
8188

82-
meter.register_callback(&[observable_up_down_counter.as_any()], move |observer| {
83-
observer.observe_i64(
84-
&observable_up_down_counter,
85-
100,
86-
&[
87-
KeyValue::new("mykey1", "myvalue1"),
88-
KeyValue::new("mykey2", "myvalue2"),
89-
],
90-
)
91-
})?;
92-
9389
// Create a Histogram Instrument.
9490
let histogram = meter
9591
.f64_histogram("my_histogram")
@@ -108,41 +104,36 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
108104
// Note that there is no ObservableHistogram instrument.
109105

110106
// Create a Gauge Instrument.
111-
{
112-
let gauge = meter
113-
.f64_gauge("my_gauge")
114-
.with_description("A gauge set to 1.0")
115-
.with_unit(Unit::new("myunit"))
116-
.init();
117-
118-
gauge.record(
119-
1.0,
120-
&[
121-
KeyValue::new("mykey1", "myvalue1"),
122-
KeyValue::new("mykey2", "myvalue2"),
123-
],
124-
);
125-
}
107+
let gauge = meter
108+
.f64_gauge("my_gauge")
109+
.with_description("A gauge set to 1.0")
110+
.with_unit(Unit::new("myunit"))
111+
.init();
112+
113+
gauge.record(
114+
1.0,
115+
&[
116+
KeyValue::new("mykey1", "myvalue1"),
117+
KeyValue::new("mykey2", "myvalue2"),
118+
],
119+
);
126120

127121
// Create a ObservableGauge instrument and register a callback that reports the measurement.
128-
let observable_gauge = meter
122+
let _observable_gauge = meter
129123
.f64_observable_gauge("my_observable_gauge")
130124
.with_description("An observable gauge set to 1.0")
131125
.with_unit(Unit::new("myunit"))
126+
.with_callback(|observer| {
127+
observer.observe(
128+
1.0,
129+
&[
130+
KeyValue::new("mykey1", "myvalue1"),
131+
KeyValue::new("mykey2", "myvalue2"),
132+
],
133+
)
134+
})
132135
.init();
133136

134-
// Register a callback that reports the measurement.
135-
meter.register_callback(&[observable_gauge.as_any()], move |observer| {
136-
observer.observe_f64(
137-
&observable_gauge,
138-
1.0,
139-
&[
140-
KeyValue::new("mykey1", "myvalue1"),
141-
KeyValue::new("mykey2", "myvalue2"),
142-
],
143-
)
144-
})?;
145-
146137
// Metrics are exported by default every 30 seconds when using stdout exporter,
147138
// however shutting down the MeterProvider here instantly flushes
148139
// the metrics, instead of waiting for the 30 sec interval.

opentelemetry-otlp/examples/basic-otlp/src/main.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
115115
let tracer = global::tracer("ex.com/basic");
116116
let meter = global::meter("ex.com/basic");
117117

118-
let gauge = meter
118+
let _gauge = meter
119119
.f64_observable_gauge("ex.com.one")
120120
.with_description("A gauge set to 1.0")
121+
.with_callback(|observer| observer.observe(1.0, COMMON_ATTRIBUTES.as_ref()))
121122
.init();
122123

123-
meter.register_callback(&[gauge.as_any()], move |observer| {
124-
observer.observe_f64(&gauge, 1.0, COMMON_ATTRIBUTES.as_ref())
125-
})?;
126-
127124
let histogram = meter.f64_histogram("ex.com.two").init();
128125
histogram.record(5.5, COMMON_ATTRIBUTES.as_ref());
129126

opentelemetry-sdk/src/metrics/mod.rs

+99-23
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,91 @@ mod tests {
163163
);
164164
}
165165

166+
// "multi_thread" tokio flavor must be used else flush won't
167+
// be able to make progress!
168+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
169+
async fn observable_counter_aggregation() {
170+
// Run this test with stdout enabled to see output.
171+
// cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture
172+
173+
// Arrange
174+
let exporter = InMemoryMetricsExporter::default();
175+
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
176+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
177+
178+
// Act
179+
let meter = meter_provider.meter("test");
180+
let _counter = meter
181+
.u64_observable_counter("my_observable_counter")
182+
.with_unit(Unit::new("my_unit"))
183+
.with_callback(|observer| {
184+
observer.observe(100, &[KeyValue::new("key1", "value1")]);
185+
observer.observe(200, &[KeyValue::new("key1", "value2")]);
186+
})
187+
.init();
188+
189+
meter_provider.force_flush().unwrap();
190+
191+
// Assert
192+
let resource_metrics = exporter
193+
.get_finished_metrics()
194+
.expect("metrics are expected to be exported.");
195+
assert!(!resource_metrics.is_empty());
196+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
197+
assert_eq!(metric.name, "my_observable_counter");
198+
assert_eq!(metric.unit.as_str(), "my_unit");
199+
let sum = metric
200+
.data
201+
.as_any()
202+
.downcast_ref::<data::Sum<u64>>()
203+
.expect("Sum aggregation expected for ObservableCounter instruments by default");
204+
205+
// Expecting 2 time-series.
206+
assert_eq!(sum.data_points.len(), 2);
207+
assert!(sum.is_monotonic, "Counter should produce monotonic.");
208+
assert_eq!(
209+
sum.temporality,
210+
data::Temporality::Cumulative,
211+
"Should produce cumulative by default."
212+
);
213+
214+
// find and validate key1=value1 datapoint
215+
let mut data_point1 = None;
216+
for datapoint in &sum.data_points {
217+
if datapoint
218+
.attributes
219+
.iter()
220+
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value1")
221+
{
222+
data_point1 = Some(datapoint);
223+
}
224+
}
225+
assert_eq!(
226+
data_point1
227+
.expect("datapoint with key1=value1 expected")
228+
.value,
229+
100
230+
);
231+
232+
// find and validate key1=value2 datapoint
233+
let mut data_point1 = None;
234+
for datapoint in &sum.data_points {
235+
if datapoint
236+
.attributes
237+
.iter()
238+
.any(|(k, v)| k.as_str() == "key1" && v.as_str() == "value2")
239+
{
240+
data_point1 = Some(datapoint);
241+
}
242+
}
243+
assert_eq!(
244+
data_point1
245+
.expect("datapoint with key1=value2 expected")
246+
.value,
247+
200
248+
);
249+
}
250+
166251
// "multi_thread" tokio flavor must be used else flush won't
167252
// be able to make progress!
168253
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -445,7 +530,7 @@ mod tests {
445530
// "multi_thread" tokio flavor must be used else flush won't
446531
// be able to make progress!
447532
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
448-
#[ignore = "Spatial aggregation is not yet implemented."]
533+
// #[ignore = "Spatial aggregation is not yet implemented."]
449534
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
450535
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing
451536

@@ -465,43 +550,34 @@ mod tests {
465550

466551
// Act
467552
let meter = meter_provider.meter("test");
468-
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();
469-
470-
// Normally, these callbacks would generate 3 time-series, but since the view
471-
// drops all attributes, we expect only 1 time-series.
472-
meter
473-
.register_callback(&[observable_counter.as_any()], move |observer| {
474-
observer.observe_u64(
475-
&observable_counter,
553+
let _observable_counter = meter
554+
.u64_observable_counter("my_observable_counter")
555+
.with_callback(|observer| {
556+
observer.observe(
476557
100,
477-
[
558+
&[
478559
KeyValue::new("statusCode", "200"),
479560
KeyValue::new("verb", "get"),
480-
]
481-
.as_ref(),
561+
],
482562
);
483563

484-
observer.observe_u64(
485-
&observable_counter,
564+
observer.observe(
486565
100,
487-
[
566+
&[
488567
KeyValue::new("statusCode", "200"),
489568
KeyValue::new("verb", "post"),
490-
]
491-
.as_ref(),
569+
],
492570
);
493571

494-
observer.observe_u64(
495-
&observable_counter,
572+
observer.observe(
496573
100,
497-
[
574+
&[
498575
KeyValue::new("statusCode", "500"),
499576
KeyValue::new("verb", "get"),
500-
]
501-
.as_ref(),
577+
],
502578
);
503579
})
504-
.expect("Expected to register callback");
580+
.init();
505581

506582
meter_provider.force_flush().unwrap();
507583

opentelemetry-sdk/src/metrics/periodic_reader.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -428,12 +428,12 @@ mod tests {
428428
// Act
429429
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
430430
let meter = meter_provider.meter("test");
431-
let counter = meter.u64_observable_counter("testcounter").init();
432-
meter
433-
.register_callback(&[counter.as_any()], move |_| {
431+
let _counter = meter
432+
.u64_observable_counter("testcounter")
433+
.with_callback(move |_| {
434434
sender.send(()).expect("channel should still be open");
435435
})
436-
.expect("callback registration should succeed");
436+
.init();
437437

438438
_ = meter_provider.force_flush();
439439

0 commit comments

Comments
 (0)