Skip to content

Commit 9e2e1b4

Browse files
authored
Merge branch 'main' into cijothomas/test-for-observablecounter
2 parents 9bcdae8 + 3f327a1 commit 9e2e1b4

File tree

6 files changed

+131
-105
lines changed

6 files changed

+131
-105
lines changed

examples/metrics-advanced/src/main.rs

+10-15
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
8080
// Record measurements using the histogram instrument.
8181
histogram.record(
8282
10.5,
83-
[
83+
&[
8484
KeyValue::new("mykey1", "myvalue1"),
8585
KeyValue::new("mykey2", "myvalue2"),
8686
KeyValue::new("mykey3", "myvalue3"),
8787
KeyValue::new("mykey4", "myvalue4"),
88-
]
89-
.as_ref(),
88+
],
9089
);
9190

9291
// Example 2 - Drop unwanted attributes using view.
@@ -98,13 +97,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
9897
// attribute.
9998
counter.add(
10099
10,
101-
[
100+
&[
102101
KeyValue::new("mykey1", "myvalue1"),
103102
KeyValue::new("mykey2", "myvalue2"),
104103
KeyValue::new("mykey3", "myvalue3"),
105104
KeyValue::new("mykey4", "myvalue4"),
106-
]
107-
.as_ref(),
105+
],
108106
);
109107

110108
// Example 3 - Change Aggregation configuration using View.
@@ -122,35 +120,32 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
122120
// the change of boundaries.
123121
histogram2.record(
124122
1.5,
125-
[
123+
&[
126124
KeyValue::new("mykey1", "myvalue1"),
127125
KeyValue::new("mykey2", "myvalue2"),
128126
KeyValue::new("mykey3", "myvalue3"),
129127
KeyValue::new("mykey4", "myvalue4"),
130-
]
131-
.as_ref(),
128+
],
132129
);
133130

134131
histogram2.record(
135132
1.2,
136-
[
133+
&[
137134
KeyValue::new("mykey1", "myvalue1"),
138135
KeyValue::new("mykey2", "myvalue2"),
139136
KeyValue::new("mykey3", "myvalue3"),
140137
KeyValue::new("mykey4", "myvalue4"),
141-
]
142-
.as_ref(),
138+
],
143139
);
144140

145141
histogram2.record(
146142
1.23,
147-
[
143+
&[
148144
KeyValue::new("mykey1", "myvalue1"),
149145
KeyValue::new("mykey2", "myvalue2"),
150146
KeyValue::new("mykey3", "myvalue3"),
151147
KeyValue::new("mykey4", "myvalue4"),
152-
]
153-
.as_ref(),
148+
],
154149
);
155150

156151
// Metrics are exported by default every 30 seconds when using stdout exporter,

examples/metrics-basic/src/main.rs

+14-21
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
3434
// Record measurements using the Counter instrument.
3535
counter.add(
3636
10,
37-
[
37+
&[
3838
KeyValue::new("mykey1", "myvalue1"),
3939
KeyValue::new("mykey2", "myvalue2"),
40-
]
41-
.as_ref(),
40+
],
4241
);
4342

4443
// Create a ObservableCounter instrument and register a callback that reports the measurement.
@@ -52,11 +51,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
5251
observer.observe_u64(
5352
&observable_counter,
5453
100,
55-
[
54+
&[
5655
KeyValue::new("mykey1", "myvalue1"),
5756
KeyValue::new("mykey2", "myvalue2"),
58-
]
59-
.as_ref(),
57+
],
6058
)
6159
})?;
6260

@@ -66,11 +64,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
6664
// Record measurements using the UpCounter instrument.
6765
updown_counter.add(
6866
-10,
69-
[
67+
&[
7068
KeyValue::new("mykey1", "myvalue1"),
7169
KeyValue::new("mykey2", "myvalue2"),
72-
]
73-
.as_ref(),
70+
],
7471
);
7572

7673
// Create a Observable UpDownCounter instrument and register a callback that reports the measurement.
@@ -84,11 +81,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
8481
observer.observe_i64(
8582
&observable_up_down_counter,
8683
100,
87-
[
84+
&[
8885
KeyValue::new("mykey1", "myvalue1"),
8986
KeyValue::new("mykey2", "myvalue2"),
90-
]
91-
.as_ref(),
87+
],
9288
)
9389
})?;
9490

@@ -101,11 +97,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
10197
// Record measurements using the histogram instrument.
10298
histogram.record(
10399
10.5,
104-
[
100+
&[
105101
KeyValue::new("mykey1", "myvalue1"),
106102
KeyValue::new("mykey2", "myvalue2"),
107-
]
108-
.as_ref(),
103+
],
109104
);
110105

111106
// Note that there is no ObservableHistogram instrument.
@@ -122,11 +117,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
122117

123118
gauge.record(
124119
1.0,
125-
[
120+
&[
126121
KeyValue::new("mykey1", "myvalue1"),
127122
KeyValue::new("mykey2", "myvalue2"),
128-
]
129-
.as_ref(),
123+
],
130124
);
131125
}
132126

@@ -142,11 +136,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
142136
observer.observe_f64(
143137
&observable_gauge,
144138
1.0,
145-
[
139+
&[
146140
KeyValue::new("mykey1", "myvalue1"),
147141
KeyValue::new("mykey2", "myvalue2"),
148-
]
149-
.as_ref(),
142+
],
150143
)
151144
})?;
152145

opentelemetry-otlp/tests/integration_test/src/asserter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ impl TraceAsserter {
1919
self.assert_resource_span_eq(&self.results, &self.expected);
2020
}
2121

22-
fn assert_resource_span_eq(&self, results: &Vec<ResourceSpans>, expected: &Vec<ResourceSpans>) {
22+
fn assert_resource_span_eq(&self, results: &[ResourceSpans], expected: &[ResourceSpans]) {
2323
let mut results_spans = Vec::new();
2424
let mut expected_spans = Vec::new();
2525

opentelemetry-sdk/CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
- `LoggerProviderInner` is no longer `pub (crate)`
4747
- `Logger.provider()` now returns `&LoggerProvider` instead of an `Option<LoggerProvider>`
4848

49+
### Fixed
50+
51+
- [#1481](https://github.com/open-telemetry/opentelemetry-rust/pull/1481) Fix error message caused by race condition when using PeriodicReader
52+
4953
## v0.21.2
5054

5155
### Fixed

opentelemetry-sdk/src/metrics/periodic_reader.rs

+76-42
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
env, fmt,
2+
env, fmt, mem,
33
sync::{Arc, Mutex, Weak},
44
time::Duration,
55
};
@@ -127,37 +127,39 @@ where
127127
/// Create a [PeriodicReader] with the given config.
128128
pub fn build(self) -> PeriodicReader {
129129
let (message_sender, message_receiver) = mpsc::channel(256);
130-
let ticker = self
131-
.runtime
132-
.interval(self.interval)
133-
.map(|_| Message::Export);
134130

135-
let messages = Box::pin(stream::select(message_receiver, ticker));
136-
let reader = PeriodicReader {
131+
let worker = move |reader: &PeriodicReader| {
132+
let ticker = self
133+
.runtime
134+
.interval(self.interval)
135+
.map(|_| Message::Export);
136+
137+
let messages = Box::pin(stream::select(message_receiver, ticker));
138+
139+
let runtime = self.runtime.clone();
140+
self.runtime.spawn(Box::pin(
141+
PeriodicReaderWorker {
142+
reader: reader.clone(),
143+
timeout: self.timeout,
144+
runtime,
145+
rm: ResourceMetrics {
146+
resource: Resource::empty(),
147+
scope_metrics: Vec::new(),
148+
},
149+
}
150+
.run(messages),
151+
));
152+
};
153+
154+
PeriodicReader {
137155
exporter: Arc::new(self.exporter),
138156
inner: Arc::new(Mutex::new(PeriodicReaderInner {
139157
message_sender,
140-
sdk_producer: None,
141158
is_shutdown: false,
142159
external_producers: self.producers,
160+
sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
143161
})),
144-
};
145-
146-
let runtime = self.runtime.clone();
147-
self.runtime.spawn(Box::pin(
148-
PeriodicReaderWorker {
149-
reader: reader.clone(),
150-
timeout: self.timeout,
151-
runtime,
152-
rm: ResourceMetrics {
153-
resource: Resource::empty(),
154-
scope_metrics: Vec::new(),
155-
},
156-
}
157-
.run(messages),
158-
));
159-
160-
reader
162+
}
161163
}
162164
}
163165

@@ -223,9 +225,9 @@ impl fmt::Debug for PeriodicReader {
223225

224226
struct PeriodicReaderInner {
225227
message_sender: mpsc::Sender<Message>,
226-
sdk_producer: Option<Weak<dyn SdkProducer>>,
227228
is_shutdown: bool,
228229
external_producers: Vec<Box<dyn MetricProducer>>,
230+
sdk_producer_or_worker: ProducerOrWorker,
229231
}
230232

231233
#[derive(Debug)]
@@ -235,6 +237,11 @@ enum Message {
235237
Shutdown(oneshot::Sender<Result<()>>),
236238
}
237239

240+
enum ProducerOrWorker {
241+
Producer(Weak<dyn SdkProducer>),
242+
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
243+
}
244+
238245
struct PeriodicReaderWorker<RT: Runtime> {
239246
reader: PeriodicReader,
240247
timeout: Duration,
@@ -311,14 +318,19 @@ impl MetricReader for PeriodicReader {
311318
Err(_) => return,
312319
};
313320

314-
// Only register once. If producer is already set, do nothing.
315-
if inner.sdk_producer.is_none() {
316-
inner.sdk_producer = Some(pipeline);
317-
} else {
318-
global::handle_error(MetricsError::Other(
319-
"duplicate meter registration, did not register manual reader".into(),
320-
))
321-
}
321+
let worker = match &mut inner.sdk_producer_or_worker {
322+
ProducerOrWorker::Producer(_) => {
323+
// Only register once. If producer is already set, do nothing.
324+
global::handle_error(MetricsError::Other(
325+
"duplicate meter registration, did not register manual reader".into(),
326+
));
327+
return;
328+
}
329+
ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
330+
};
331+
332+
inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
333+
worker(self);
322334
}
323335

324336
fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
@@ -327,14 +339,14 @@ impl MetricReader for PeriodicReader {
327339
return Err(MetricsError::Other("reader is shut down".into()));
328340
}
329341

330-
match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
331-
Some(producer) => producer.produce(rm)?,
332-
None => {
333-
return Err(MetricsError::Other(
334-
"reader is shut down or not registered".into(),
335-
))
336-
}
337-
};
342+
if let Some(producer) = match &inner.sdk_producer_or_worker {
343+
ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
344+
ProducerOrWorker::Worker(_) => None,
345+
} {
346+
producer.produce(rm)?;
347+
} else {
348+
return Err(MetricsError::Other("reader is not registered".into()));
349+
}
338350

339351
let mut errs = vec![];
340352
for producer in &inner.external_producers {
@@ -392,3 +404,25 @@ impl MetricReader for PeriodicReader {
392404
shutdown_result
393405
}
394406
}
407+
408+
#[cfg(all(test, feature = "testing"))]
409+
mod tests {
410+
use super::PeriodicReader;
411+
use crate::{
412+
metrics::data::ResourceMetrics, metrics::reader::MetricReader, runtime,
413+
testing::metrics::InMemoryMetricsExporter, Resource,
414+
};
415+
416+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
417+
async fn unregistered_collect() {
418+
let exporter = InMemoryMetricsExporter::default();
419+
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
420+
421+
let mut rm = ResourceMetrics {
422+
resource: Resource::empty(),
423+
scope_metrics: Vec::new(),
424+
};
425+
let result = reader.collect(&mut rm);
426+
result.expect_err("error expected when reader is not registered");
427+
}
428+
}

0 commit comments

Comments
 (0)