
Commit d772d38

Merge branch 'main' into addlink
2 parents: 971db9f + a80dedf

File tree: 9 files changed (+339, -162 lines)


opentelemetry-sdk/CHANGELOG.md (+4)

@@ -2,6 +2,10 @@
 
 ## vNext
 
+- Fix SimpleSpanProcessor to be consistent with its log counterpart, and remove
+  the dependency on crossbeam-channel.
+  [1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)
+
 ## v0.22.1
 
 ### Fixed
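The changelog entry above concerns SimpleSpanProcessor, which is installed through `TracerProvider::builder().with_simple_exporter(...)`. A minimal sketch of that wiring, assuming the SDK's `testing` feature for `NoopSpanExporter` (any `SpanExporter` works the same way; this example is illustrative and not part of the commit):

```rust
use opentelemetry::trace::{Tracer, TracerProvider as _};
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::TracerProvider;

fn main() {
    // `with_simple_exporter` installs a SimpleSpanProcessor: each finished span
    // is handed to the exporter synchronously when it ends, with no background
    // worker (and, after this change, no crossbeam-channel).
    let provider = TracerProvider::builder()
        .with_simple_exporter(NoopSpanExporter::new())
        .build();

    let tracer = provider.tracer("example");
    tracer.in_span("demo-span", |_cx| {
        // the span is exported as soon as this closure returns
    });
}
```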

opentelemetry-sdk/Cargo.toml (+2, -3)

@@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" }
 opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true }
 async-std = { workspace = true, features = ["unstable"], optional = true }
 async-trait = { workspace = true, optional = true }
-crossbeam-channel = { version = "0.5", optional = true }
 futures-channel = "0.3"
 futures-executor = { workspace = true }
 futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
@@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
 
 [features]
 default = ["trace"]
-trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"]
+trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"]
 jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
-logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"]
+logs = ["opentelemetry/logs", "async-trait", "serde_json"]
 logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"]
 metrics = ["opentelemetry/metrics", "glob", "async-trait"]
 testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]

opentelemetry-sdk/src/logs/log_processor.rs (+4, -6)

@@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug {
     fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
 }
 
-/// A [`LogProcessor`] that exports synchronously when logs are emitted.
-///
-/// # Examples
-///
-/// Note that the simple processor exports synchronously every time a log is
-/// emitted. If you find this limiting, consider the batch processor instead.
+/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
+/// as they are emitted, without any batching. This is typically useful for
+/// debugging and testing. For scenarios requiring higher
+/// performance/throughput, consider using [BatchLogProcessor].
 #[derive(Debug)]
 pub struct SimpleLogProcessor {
     exporter: Mutex<Box<dyn LogExporter>>,
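The only change in this file is the doc comment on SimpleLogProcessor. As an illustration of the trade-off it describes, here is a minimal sketch of choosing between the two processors, assuming the SDK's `logs` and `testing` features so that `InMemoryLogsExporter` is available (any `LogExporter` works the same way; the snippet is illustrative, not part of the commit):

```rust
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::testing::logs::InMemoryLogsExporter;

fn main() {
    // `with_simple_exporter` installs a SimpleLogProcessor: every emitted log
    // record is passed straight to the exporter, with no batching -- handy for
    // debugging and tests.
    let provider = LoggerProvider::builder()
        .with_simple_exporter(InMemoryLogsExporter::default())
        .build();

    // For throughput-sensitive deployments, `with_batch_exporter(exporter,
    // runtime::Tokio)` would install a BatchLogProcessor instead (requires an
    // async-runtime feature such as `rt-tokio`).
    drop(provider);
}
```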

opentelemetry-sdk/src/metrics/mod.rs (+184)

@@ -60,6 +60,7 @@ pub use view::*;
 
 #[cfg(all(test, feature = "testing"))]
 mod tests {
+    use self::data::ScopeMetrics;
     use super::*;
     use crate::metrics::data::{ResourceMetrics, Temporality};
     use crate::metrics::reader::TemporalitySelector;
@@ -70,6 +71,7 @@ mod tests {
         metrics::{MeterProvider as _, Unit},
         KeyValue,
     };
+    use std::borrow::Cow;
 
     // "multi_thread" tokio flavor must be used else flush won't
     // be able to make progress!
@@ -214,6 +216,179 @@ mod tests {
         assert_eq!(datapoint.value, 15);
     }
 
+    // "multi_thread" tokio flavor must be used else flush won't
+    // be able to make progress!
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_duplicate_instrument_different_meter_no_merge() {
+        // Arrange
+        let exporter = InMemoryMetricsExporter::default();
+        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+
+        // Act
+        let meter1 = meter_provider.meter("test.meter1");
+        let meter2 = meter_provider.meter("test.meter2");
+        let counter1 = meter1
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .with_description("my_description")
+            .init();
+
+        let counter2 = meter2
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .with_description("my_description")
+            .init();
+
+        let attribute = vec![KeyValue::new("key1", "value1")];
+        counter1.add(10, &attribute);
+        counter2.add(5, &attribute);
+
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        assert!(
+            resource_metrics[0].scope_metrics.len() == 2,
+            "There should be 2 separate scope"
+        );
+        assert!(
+            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
+            "There should be single metric for the scope"
+        );
+        assert!(
+            resource_metrics[0].scope_metrics[1].metrics.len() == 1,
+            "There should be single metric for the scope"
+        );
+
+        let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
+        let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");
+
+        if let Some(scope1) = scope1 {
+            let metric1 = &scope1.metrics[0];
+            assert_eq!(metric1.name, "my_counter");
+            assert_eq!(metric1.unit.as_str(), "my_unit");
+            assert_eq!(metric1.description, "my_description");
+            let sum1 = metric1
+                .data
+                .as_any()
+                .downcast_ref::<data::Sum<u64>>()
+                .expect("Sum aggregation expected for Counter instruments by default");
+
+            // Expecting 1 time-series.
+            assert_eq!(sum1.data_points.len(), 1);
+
+            let datapoint1 = &sum1.data_points[0];
+            assert_eq!(datapoint1.value, 10);
+        } else {
+            panic!("No MetricScope found for 'test.meter1'");
+        }
+
+        if let Some(scope2) = scope2 {
+            let metric2 = &scope2.metrics[0];
+            assert_eq!(metric2.name, "my_counter");
+            assert_eq!(metric2.unit.as_str(), "my_unit");
+            assert_eq!(metric2.description, "my_description");
+            let sum2 = metric2
+                .data
+                .as_any()
+                .downcast_ref::<data::Sum<u64>>()
+                .expect("Sum aggregation expected for Counter instruments by default");
+
+            // Expecting 1 time-series.
+            assert_eq!(sum2.data_points.len(), 1);
+
+            let datapoint2 = &sum2.data_points[0];
+            assert_eq!(datapoint2.value, 5);
+        } else {
+            panic!("No MetricScope found for 'test.meter2'");
+        }
+    }
+
+    // "multi_thread" tokio flavor must be used else flush won't
+    // be able to make progress!
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn instrumentation_scope_identity_test() {
+        // Arrange
+        let exporter = InMemoryMetricsExporter::default();
+        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
+        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
+
+        // Act
+        // Meters are identical except for scope attributes, but scope attributes are not an identifying property.
+        // Hence there should be a single metric stream output for this test.
+        let meter1 = meter_provider.versioned_meter(
+            "test.meter",
+            Some("v0.1.0"),
+            Some("schema_url"),
+            Some(vec![KeyValue::new("key", "value1")]),
+        );
+        let meter2 = meter_provider.versioned_meter(
+            "test.meter",
+            Some("v0.1.0"),
+            Some("schema_url"),
+            Some(vec![KeyValue::new("key", "value2")]),
+        );
+        let counter1 = meter1
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .with_description("my_description")
+            .init();
+
+        let counter2 = meter2
+            .u64_counter("my_counter")
+            .with_unit(Unit::new("my_unit"))
+            .with_description("my_description")
+            .init();
+
+        let attribute = vec![KeyValue::new("key1", "value1")];
+        counter1.add(10, &attribute);
+        counter2.add(5, &attribute);
+
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        println!("resource_metrics: {:?}", resource_metrics);
+        assert!(
+            resource_metrics[0].scope_metrics.len() == 1,
+            "There should be a single scope as the meters are identical"
+        );
+        assert!(
+            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
+            "There should be single metric for the scope as instruments are identical"
+        );
+
+        let scope = &resource_metrics[0].scope_metrics[0].scope;
+        assert_eq!(scope.name, "test.meter");
+        assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0")));
+        assert_eq!(scope.schema_url, Some(Cow::Borrowed("schema_url")));
+
+        // This is validating current behavior, but it is not guaranteed to be the case in the future,
+        // as this is a user error and SDK reserves right to change this behavior.
+        assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]);
+
+        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
+        assert_eq!(metric.name, "my_counter");
+        assert_eq!(metric.unit.as_str(), "my_unit");
+        assert_eq!(metric.description, "my_description");
+        let sum = metric
+            .data
+            .as_any()
+            .downcast_ref::<data::Sum<u64>>()
+            .expect("Sum aggregation expected for Counter instruments by default");
+
+        // Expecting 1 time-series.
+        assert_eq!(sum.data_points.len(), 1);
+
+        let datapoint = &sum.data_points[0];
+        assert_eq!(datapoint.value, 15);
+    }
+
     // "multi_thread" tokio flavor must be used else flush won't
     // be able to make progress!
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -692,6 +867,15 @@ mod tests {
         assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
     }
 
+    fn find_scope_metric<'a>(
+        metrics: &'a [ScopeMetrics],
+        name: &'a str,
+    ) -> Option<&'a ScopeMetrics> {
+        metrics
+            .iter()
+            .find(|&scope_metric| scope_metric.scope.name == name)
+    }
+
     struct DeltaTemporalitySelector();
     impl TemporalitySelector for DeltaTemporalitySelector {
         fn temporality(&self, _kind: InstrumentKind) -> Temporality {

opentelemetry-sdk/src/testing/trace/span_exporters.rs (+33, -32)

@@ -7,13 +7,15 @@ use crate::{
     InstrumentationLibrary,
 };
 use async_trait::async_trait;
-use crossbeam_channel::{unbounded, Receiver, SendError, Sender};
 use futures_util::future::BoxFuture;
 pub use opentelemetry::testing::trace::TestSpan;
 use opentelemetry::trace::{
     SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
 };
-use std::fmt::{Display, Formatter};
+use std::{
+    fmt::{Display, Formatter},
+    sync::{Arc, Mutex},
+};
 
 pub fn new_test_export_span_data() -> SpanData {
     let config = Config::default();
@@ -40,42 +42,47 @@ pub fn new_test_export_span_data() -> SpanData {
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct TestSpanExporter {
-    tx_export: Sender<SpanData>,
-    tx_shutdown: Sender<()>,
+    pub export_called: Arc<Mutex<bool>>,
+    pub shutdown_called: Arc<Mutex<bool>>,
+}
+
+impl Default for TestSpanExporter {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl TestSpanExporter {
+    pub fn new() -> Self {
+        TestSpanExporter {
+            export_called: Arc::new(Mutex::new(false)),
+            shutdown_called: Arc::new(Mutex::new(false)),
+        }
+    }
+
+    pub fn is_export_called(&self) -> bool {
+        *self.export_called.lock().unwrap()
+    }
+
+    pub fn is_shutdown_called(&self) -> bool {
+        *self.shutdown_called.lock().unwrap()
+    }
 }
 
 #[async_trait]
 impl SpanExporter for TestSpanExporter {
-    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
-        for span_data in batch {
-            if let Err(err) = self
-                .tx_export
-                .send(span_data)
-                .map_err::<TestExportError, _>(Into::into)
-            {
-                return Box::pin(std::future::ready(Err(Into::into(err))));
-            }
-        }
+    fn export(&mut self, _batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
+        *self.export_called.lock().unwrap() = true;
         Box::pin(std::future::ready(Ok(())))
     }
 
     fn shutdown(&mut self) {
-        let _ = self.tx_shutdown.send(()); // ignore error
+        *self.shutdown_called.lock().unwrap() = true;
    }
 }
 
-pub fn new_test_exporter() -> (TestSpanExporter, Receiver<SpanData>, Receiver<()>) {
-    let (tx_export, rx_export) = unbounded();
-    let (tx_shutdown, rx_shutdown) = unbounded();
-    let exporter = TestSpanExporter {
-        tx_export,
-        tx_shutdown,
-    };
-    (exporter, rx_export, rx_shutdown)
-}
-
 #[derive(Debug)]
 pub struct TokioSpanExporter {
     tx_export: tokio::sync::mpsc::UnboundedSender<SpanData>,
@@ -139,12 +146,6 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for TestExportError {
     }
 }
 
-impl<T> From<crossbeam_channel::SendError<T>> for TestExportError {
-    fn from(err: SendError<T>) -> Self {
-        TestExportError(err.to_string())
-    }
-}
-
 /// A no-op instance of an [`SpanExporter`].
 ///
 /// [`SpanExporter`]: crate::export::trace::SpanExporter
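With the channel-based plumbing removed (including the old `new_test_exporter()` helper), tests assert on the shared flags instead of draining receivers. A rough usage sketch, assuming the `testing` feature re-exports these items and that tokio is available for the async test (the test name is illustrative):

```rust
use opentelemetry_sdk::export::trace::SpanExporter;
use opentelemetry_sdk::testing::trace::{new_test_export_span_data, TestSpanExporter};

#[tokio::test]
async fn exporter_flags_are_set() {
    let mut exporter = TestSpanExporter::new();
    assert!(!exporter.is_export_called());

    // `export` returns a future; awaiting it flips the shared flag.
    exporter
        .export(vec![new_test_export_span_data()])
        .await
        .unwrap();
    assert!(exporter.is_export_called());

    exporter.shutdown();
    assert!(exporter.is_shutdown_called());
}
```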
