From 3d7df98a24a9763c2770f81f4f86470f25987ef4 Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 09:33:57 +0530 Subject: [PATCH 01/12] remove dependency of urlencoding crate implement the url encode fn --- opentelemetry/Cargo.toml | 1 - opentelemetry/src/baggage.rs | 43 ++++++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index ae198db3d9..cbbe1c0dbd 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -26,7 +26,6 @@ futures-sink = "0.3" once_cell = { workspace = true } pin-project-lite = { workspace = true, optional = true } thiserror = { workspace = true } -urlencoding = "2.1.2" [target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies] js-sys = "0.3.63" diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index dc8de3914d..f22ce374e0 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -16,10 +16,10 @@ //! [W3C Baggage]: https://w3c.github.io/baggage use crate::{Context, Key, KeyValue, Value}; use once_cell::sync::Lazy; +use std::borrow::Cow; use std::collections::{hash_map, HashMap}; use std::fmt; use std::iter::FromIterator; -use urlencoding::encode; static DEFAULT_BAGGAGE: Lazy = Lazy::new(Baggage::default); @@ -282,10 +282,29 @@ impl FromIterator for Baggage { } } +fn encode(s: Cow<'_, str>) -> String { + let special_characters = ['.', '-', '_', '~']; + let mut encoded_string = String::new(); + + for char in s.as_ref().chars() { + if char.is_ascii_alphanumeric() || special_characters.contains(&char) { + encoded_string.push(char); + } else if char == ' ' { + encoded_string.push_str("%20"); + } else { + // Convert to a two-digit hexadecimal representation + for byte in char.to_string().as_bytes() { + encoded_string.push_str(&format!("%{:02X}", byte)); + } + } + } + encoded_string +} + impl fmt::Display for Baggage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for (i, (k, v)) in self.into_iter().enumerate() { - write!(f, "{}={}", k, encode(&v.0.as_str()))?; + write!(f, "{}={}", k, encode(v.0.as_str()))?; if !v.1.as_str().is_empty() { write!(f, ";{}", v.1)?; } @@ -473,6 +492,26 @@ mod tests { assert_eq!(baggage.len(), 0, "did not insert invalid key"); } + #[test] + fn test_ascii_values() { + let string1 = Cow::Borrowed("test_ 123"); + let string2 = Cow::Borrowed("Hello123"); + let string3 = Cow::Borrowed("This & That = More"); + let string4 = Cow::Borrowed("Unicode: 😊"); + let string5 = Cow::Borrowed("Non-ASCII: áéíóú"); + let string6 = Cow::Borrowed("Unsafe: ~!@#$%^&*()_+{}[];:'\\\"<>?,./"); + + assert_eq!(encode(string1), "test_%20123"); + assert_eq!(encode(string2), "Hello123"); + assert_eq!(encode(string3), "This%20%26%20That%20%3D%20More"); + assert_eq!(encode(string4), "Unicode%3A%20%F0%9F%98%8A"); + assert_eq!( + encode(string5), + "Non-ASCII%3A%20%C3%A1%C3%A9%C3%AD%C3%B3%C3%BA" + ); + assert_eq!(encode(string6), "Unsafe%3A%20~%21%40%23%24%25%5E%26%2A%28%29_%2B%7B%7D%5B%5D%3B%3A%27%5C%22%3C%3E%3F%2C.%2F"); + } + #[test] fn insert_too_much_baggage() { // too many key pairs From e49ad58a332eaffa0b83183c357b4a33232593b8 Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 09:39:18 +0530 Subject: [PATCH 02/12] add changes to changelog --- opentelemetry/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 22335a14f0..9613638a57 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -2,6 +2,9 @@ ## vNext +### Removed +- Remove `urlencoding` crate dependency. [#1613](https://github.com/open-telemetry/opentelemetry-rust/pull/1613) + ## v0.22.0 ### Added From 577718ccb3ceb776145672195e98067b5d380757 Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 09:51:09 +0530 Subject: [PATCH 03/12] format --- opentelemetry/src/baggage.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index f22ce374e0..0e57d6501e 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -290,14 +290,14 @@ fn encode(s: Cow<'_, str>) -> String { if char.is_ascii_alphanumeric() || special_characters.contains(&char) { encoded_string.push(char); } else if char == ' ' { - encoded_string.push_str("%20"); - } else { - // Convert to a two-digit hexadecimal representation - for byte in char.to_string().as_bytes() { - encoded_string.push_str(&format!("%{:02X}", byte)); - } + encoded_string.push_str("%20"); + } else { + // Convert to a two-digit hexadecimal representation + for byte in char.to_string().as_bytes() { + encoded_string.push_str(&format!("%{:02X}", byte)); } } + } encoded_string } From 78399b47d4f71cdd26a1905a524f4ee27b920db3 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 22:43:35 -0700 Subject: [PATCH 04/12] Use regular lock for simplespanprocessor (#1612) --- opentelemetry-sdk/CHANGELOG.md | 4 + opentelemetry-sdk/Cargo.toml | 5 +- opentelemetry-sdk/src/logs/log_processor.rs | 10 +- .../src/testing/trace/span_exporters.rs | 65 ++++----- opentelemetry-sdk/src/trace/span_processor.rs | 125 ++++++------------ 5 files changed, 84 insertions(+), 125 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 61d2dbb0b7..f5880ce3ae 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,10 @@ ## vNext +- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed + dependency on crossbeam-channel. + [1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files) + ## v0.22.1 ### Fixed diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index cc6f010683..cabcc84476 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -14,7 +14,6 @@ opentelemetry = { version = "0.22", path = "../opentelemetry/" } opentelemetry-http = { version = "0.11", path = "../opentelemetry-http", optional = true } async-std = { workspace = true, features = ["unstable"], optional = true } async-trait = { workspace = true, optional = true } -crossbeam-channel = { version = "0.5", optional = true } futures-channel = "0.3" futures-executor = { workspace = true } futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] } @@ -45,9 +44,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } [features] default = ["trace"] -trace = ["opentelemetry/trace", "crossbeam-channel", "rand", "async-trait", "percent-encoding"] +trace = ["opentelemetry/trace", "rand", "async-trait", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"] -logs = ["opentelemetry/logs", "crossbeam-channel", "async-trait", "serde_json"] +logs = ["opentelemetry/logs", "async-trait", "serde_json"] logs_level_enabled = ["logs", "opentelemetry/logs_level_enabled"] metrics = ["opentelemetry/metrics", "glob", "async-trait"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index bda7730283..57ace8dad6 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -52,12 +52,10 @@ pub trait LogProcessor: Send + Sync + Debug { fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool; } -/// A [`LogProcessor`] that exports synchronously when logs are emitted. -/// -/// # Examples -/// -/// Note that the simple processor exports synchronously every time a log is -/// emitted. If you find this limiting, consider the batch processor instead. +/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon +/// as they are emitted, without any batching. This is typically useful for +/// debugging and testing. For scenarios requiring higher +/// performance/throughput, consider using [BatchLogProcessor]. #[derive(Debug)] pub struct SimpleLogProcessor { exporter: Mutex>, diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 92666e229f..aa5ec2d651 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -7,13 +7,15 @@ use crate::{ InstrumentationLibrary, }; use async_trait::async_trait; -use crossbeam_channel::{unbounded, Receiver, SendError, Sender}; use futures_util::future::BoxFuture; pub use opentelemetry::testing::trace::TestSpan; use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; -use std::fmt::{Display, Formatter}; +use std::{ + fmt::{Display, Formatter}, + sync::{Arc, Mutex}, +}; pub fn new_test_export_span_data() -> SpanData { let config = Config::default(); @@ -40,42 +42,47 @@ pub fn new_test_export_span_data() -> SpanData { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TestSpanExporter { - tx_export: Sender, - tx_shutdown: Sender<()>, + pub export_called: Arc>, + pub shutdown_called: Arc>, +} + +impl Default for TestSpanExporter { + fn default() -> Self { + Self::new() + } +} + +impl TestSpanExporter { + pub fn new() -> Self { + TestSpanExporter { + export_called: Arc::new(Mutex::new(false)), + shutdown_called: Arc::new(Mutex::new(false)), + } + } + + pub fn is_export_called(&self) -> bool { + *self.export_called.lock().unwrap() + } + + pub fn is_shutdown_called(&self) -> bool { + *self.shutdown_called.lock().unwrap() + } } #[async_trait] impl SpanExporter for TestSpanExporter { - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { - for span_data in batch { - if let Err(err) = self - .tx_export - .send(span_data) - .map_err::(Into::into) - { - return Box::pin(std::future::ready(Err(Into::into(err)))); - } - } + fn export(&mut self, _batch: Vec) -> BoxFuture<'static, ExportResult> { + *self.export_called.lock().unwrap() = true; Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) { - let _ = self.tx_shutdown.send(()); // ignore error + *self.shutdown_called.lock().unwrap() = true; } } -pub fn new_test_exporter() -> (TestSpanExporter, Receiver, Receiver<()>) { - let (tx_export, rx_export) = unbounded(); - let (tx_shutdown, rx_shutdown) = unbounded(); - let exporter = TestSpanExporter { - tx_export, - tx_shutdown, - }; - (exporter, rx_export, rx_shutdown) -} - #[derive(Debug)] pub struct TokioSpanExporter { tx_export: tokio::sync::mpsc::UnboundedSender, @@ -139,12 +146,6 @@ impl From> for TestExportError { } } -impl From> for TestExportError { - fn from(err: SendError) -> Self { - TestExportError(err.to_string()) - } -} - /// A no-op instance of an [`SpanExporter`]. /// /// [`SpanExporter`]: crate::export::trace::SpanExporter diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index c9230a1603..1f8ec47c03 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -50,7 +50,8 @@ use opentelemetry::{ Context, }; use std::cmp::min; -use std::{env, fmt, str::FromStr, thread, time::Duration}; +use std::sync::Mutex; +use std::{env, fmt, str::FromStr, time::Duration}; /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; @@ -93,65 +94,19 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { fn shutdown(&mut self) -> TraceResult<()>; } -/// A [SpanProcessor] that passes finished spans to the configured `SpanExporter`, as -/// soon as they are finished, without any batching. +/// A [SpanProcessor] that passes finished spans to the configured +/// `SpanExporter`, as soon as they are finished, without any batching. This is +/// typically useful for debugging and testing. For scenarios requiring higher +/// performance/throughput, consider using [BatchSpanProcessor]. #[derive(Debug)] pub struct SimpleSpanProcessor { - message_sender: crossbeam_channel::Sender, + exporter: Mutex>, } impl SimpleSpanProcessor { - pub(crate) fn new(mut exporter: Box) -> Self { - let (message_sender, rx) = crossbeam_channel::unbounded(); - - let _ = thread::Builder::new() - .name("opentelemetry-exporter".to_string()) - .spawn(move || { - while let Ok(msg) = rx.recv() { - match msg { - Message::ExportSpan(span) => { - if let Err(err) = - futures_executor::block_on(exporter.export(vec![span])) - { - global::handle_error(err); - } - } - Message::Flush(sender) => { - Self::respond(&sender, "sync"); - } - Message::Shutdown(sender) => { - exporter.shutdown(); - - Self::respond(&sender, "shutdown"); - - return; - } - } - } - - exporter.shutdown(); - }); - - Self { message_sender } - } - - fn signal(&self, msg: fn(crossbeam_channel::Sender<()>) -> Message, description: &str) { - let (tx, rx) = crossbeam_channel::bounded(0); - - if self.message_sender.send(msg(tx)).is_ok() { - if let Err(err) = rx.recv() { - global::handle_error(TraceError::from(format!( - "error {description} span processor: {err:?}" - ))); - } - } - } - - fn respond(sender: &crossbeam_channel::Sender<()>, description: &str) { - if let Err(err) = sender.send(()) { - global::handle_error(TraceError::from(format!( - "could not send {description}: {err:?}" - ))); + pub(crate) fn new(exporter: Box) -> Self { + Self { + exporter: Mutex::new(exporter), } } } @@ -166,34 +121,34 @@ impl SpanProcessor for SimpleSpanProcessor { return; } - if let Err(err) = self.message_sender.send(Message::ExportSpan(span)) { - global::handle_error(TraceError::from(format!("error processing span {:?}", err))); + let result = self + .exporter + .lock() + .map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into())) + .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span]))); + + if let Err(err) = result { + global::handle_error(err); } } fn force_flush(&self) -> TraceResult<()> { - self.signal(Message::Flush, "flushing"); - + // Nothing to flush for simple span processor. Ok(()) } fn shutdown(&mut self) -> TraceResult<()> { - self.signal(Message::Shutdown, "shutting down"); - - Ok(()) + if let Ok(mut exporter) = self.exporter.lock() { + exporter.shutdown(); + Ok(()) + } else { + Err(TraceError::Other( + "SimpleSpanProcessor mutex poison at shutdown".into(), + )) + } } } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -// reason = "TODO: SpanData storing dropped_attribute_count separately triggered this clippy warning. -// Expecting to address that separately in the future."") -enum Message { - ExportSpan(SpanData), - Flush(crossbeam_channel::Sender<()>), - Shutdown(crossbeam_channel::Sender<()>), -} - /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. /// @@ -707,6 +662,7 @@ where #[cfg(all(test, feature = "testing", feature = "trace"))] mod tests { + // cargo test trace::span_processor::tests:: --features=trace,testing use super::{ BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, @@ -715,7 +671,7 @@ mod tests { use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::runtime; use crate::testing::trace::{ - new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, + new_test_export_span_data, new_tokio_test_exporter, TestSpanExporter, }; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, @@ -729,17 +685,17 @@ mod tests { #[test] fn simple_span_processor_on_end_calls_export() { - let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); - let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); processor.on_end(new_test_export_span_data()); - assert!(rx_export.recv().is_ok()); + assert!(exporter.is_export_called()); let _result = processor.shutdown(); } #[test] fn simple_span_processor_on_end_skips_export_if_not_sampled() { - let (exporter, rx_export, _rx_shutdown) = new_test_exporter(); - let processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); let unsampled = SpanData { span_context: SpanContext::empty_context(), parent_span_id: SpanId::INVALID, @@ -756,15 +712,16 @@ mod tests { instrumentation_lib: Default::default(), }; processor.on_end(unsampled); - assert!(rx_export.recv_timeout(Duration::from_millis(100)).is_err()); + assert!(!exporter.is_export_called()); } #[test] fn simple_span_processor_shutdown_calls_shutdown() { - let (exporter, _rx_export, rx_shutdown) = new_test_exporter(); - let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); + let exporter = TestSpanExporter::new(); + let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone())); + assert!(!exporter.is_shutdown_called()); let _result = processor.shutdown(); - assert!(rx_shutdown.try_recv().is_ok()); + assert!(exporter.is_shutdown_called()); } #[test] @@ -863,7 +820,7 @@ mod tests { (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), ]; temp_env::with_vars(env_vars.clone(), || { - let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio); + let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio); // export batch size cannot exceed max queue size assert_eq!(builder.config.max_export_batch_size, 500); assert_eq!( @@ -883,7 +840,7 @@ mod tests { env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); temp_env::with_vars(env_vars, || { - let builder = BatchSpanProcessor::builder(new_test_exporter().0, runtime::Tokio); + let builder = BatchSpanProcessor::builder(TestSpanExporter::new(), runtime::Tokio); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); }); From 35166c6f3c9de2ade1152366ea73de6502b6043a Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 11 Mar 2024 22:56:07 -0700 Subject: [PATCH 05/12] Add more tests to Metric SDK aggregation (#1600) --- opentelemetry-sdk/src/metrics/mod.rs | 184 +++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7d41cdf18d..50673f7d25 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -60,6 +60,7 @@ pub use view::*; #[cfg(all(test, feature = "testing"))] mod tests { + use self::data::ScopeMetrics; use super::*; use crate::metrics::data::{ResourceMetrics, Temporality}; use crate::metrics::reader::TemporalitySelector; @@ -70,6 +71,7 @@ mod tests { metrics::{MeterProvider as _, Unit}, KeyValue, }; + use std::borrow::Cow; // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! @@ -214,6 +216,179 @@ mod tests { assert_eq!(datapoint.value, 15); } + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn counter_duplicate_instrument_different_meter_no_merge() { + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter1 = meter_provider.meter("test.meter1"); + let meter2 = meter_provider.meter("test.meter2"); + let counter1 = meter1 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let counter2 = meter2 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let attribute = vec![KeyValue::new("key1", "value1")]; + counter1.add(10, &attribute); + counter2.add(5, &attribute); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!( + resource_metrics[0].scope_metrics.len() == 2, + "There should be 2 separate scope" + ); + assert!( + resource_metrics[0].scope_metrics[0].metrics.len() == 1, + "There should be single metric for the scope" + ); + assert!( + resource_metrics[0].scope_metrics[1].metrics.len() == 1, + "There should be single metric for the scope" + ); + + let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1"); + let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2"); + + if let Some(scope1) = scope1 { + let metric1 = &scope1.metrics[0]; + assert_eq!(metric1.name, "my_counter"); + assert_eq!(metric1.unit.as_str(), "my_unit"); + assert_eq!(metric1.description, "my_description"); + let sum1 = metric1 + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum1.data_points.len(), 1); + + let datapoint1 = &sum1.data_points[0]; + assert_eq!(datapoint1.value, 10); + } else { + panic!("No MetricScope found for 'test.meter1'"); + } + + if let Some(scope2) = scope2 { + let metric2 = &scope2.metrics[0]; + assert_eq!(metric2.name, "my_counter"); + assert_eq!(metric2.unit.as_str(), "my_unit"); + assert_eq!(metric2.description, "my_description"); + let sum2 = metric2 + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum2.data_points.len(), 1); + + let datapoint2 = &sum2.data_points[0]; + assert_eq!(datapoint2.value, 5); + } else { + panic!("No MetricScope found for 'test.meter2'"); + } + } + + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn instrumentation_scope_identity_test() { + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + // Meters are identical except for scope attributes, but scope attributes are not an identifying property. + // Hence there should be a single metric stream output for this test. + let meter1 = meter_provider.versioned_meter( + "test.meter", + Some("v0.1.0"), + Some("schema_url"), + Some(vec![KeyValue::new("key", "value1")]), + ); + let meter2 = meter_provider.versioned_meter( + "test.meter", + Some("v0.1.0"), + Some("schema_url"), + Some(vec![KeyValue::new("key", "value2")]), + ); + let counter1 = meter1 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let counter2 = meter2 + .u64_counter("my_counter") + .with_unit(Unit::new("my_unit")) + .with_description("my_description") + .init(); + + let attribute = vec![KeyValue::new("key1", "value1")]; + counter1.add(10, &attribute); + counter2.add(5, &attribute); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + println!("resource_metrics: {:?}", resource_metrics); + assert!( + resource_metrics[0].scope_metrics.len() == 1, + "There should be a single scope as the meters are identical" + ); + assert!( + resource_metrics[0].scope_metrics[0].metrics.len() == 1, + "There should be single metric for the scope as instruments are identical" + ); + + let scope = &resource_metrics[0].scope_metrics[0].scope; + assert_eq!(scope.name, "test.meter"); + assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0"))); + assert_eq!(scope.schema_url, Some(Cow::Borrowed("schema_url"))); + + // This is validating current behavior, but it is not guaranteed to be the case in the future, + // as this is a user error and SDK reserves right to change this behavior. + assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]); + + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_counter"); + assert_eq!(metric.unit.as_str(), "my_unit"); + assert_eq!(metric.description, "my_description"); + let sum = metric + .data + .as_any() + .downcast_ref::>() + .expect("Sum aggregation expected for Counter instruments by default"); + + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1); + + let datapoint = &sum.data_points[0]; + assert_eq!(datapoint.value, 15); + } + // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -692,6 +867,15 @@ mod tests { assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect."); } + fn find_scope_metric<'a>( + metrics: &'a [ScopeMetrics], + name: &'a str, + ) -> Option<&'a ScopeMetrics> { + metrics + .iter() + .find(|&scope_metric| scope_metric.scope.name == name) + } + struct DeltaTemporalitySelector(); impl TemporalitySelector for DeltaTemporalitySelector { fn temporality(&self, _kind: InstrumentKind) -> Temporality { From 3a48daeefcb468ac9d2de10c918d4948ed939c44 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 12 Mar 2024 08:13:34 -0700 Subject: [PATCH 06/12] More nit fixes to opentelemetry benches (#1617) --- opentelemetry/benches/attributes.rs | 27 +++++++++++++++++++++++---- opentelemetry/benches/metrics.rs | 8 ++------ opentelemetry/src/global/mod.rs | 4 ++-- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/opentelemetry/benches/attributes.rs b/opentelemetry/benches/attributes.rs index 6b811c2b3b..081af06c36 100644 --- a/opentelemetry/benches/attributes.rs +++ b/opentelemetry/benches/attributes.rs @@ -1,5 +1,6 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use opentelemetry::KeyValue; +use opentelemetry::{Key, KeyValue}; +use std::sync::Arc; // Run this benchmark with: // cargo bench --bench attributes @@ -9,20 +10,38 @@ fn criterion_benchmark(c: &mut Criterion) { } fn attributes_creation(c: &mut Criterion) { + c.bench_function("CreateOTelKey_Static", |b| { + b.iter(|| { + let _v1 = black_box(Key::new("attribute1")); + }); + }); + + c.bench_function("CreateOTelKey_Owned", |b| { + b.iter(|| { + let _v1 = black_box(Key::new(String::from("attribute1"))); + }); + }); + + c.bench_function("CreateOTelKey_Arc", |b| { + b.iter(|| { + let _v1 = black_box(Key::new(Arc::from("attribute1"))); + }); + }); + c.bench_function("CreateOTelKeyValue", |b| { b.iter(|| { let _v1 = black_box(KeyValue::new("attribute1", "value1")); }); }); - c.bench_function("CreateKeyValueTuple", |b| { + c.bench_function("CreateTupleKeyValue", |b| { b.iter(|| { let _v1 = black_box(("attribute1", "value1")); }); }); #[allow(clippy::useless_vec)] - c.bench_function("CreateVector_KeyValue", |b| { + c.bench_function("CreateOtelKeyValueVector", |b| { b.iter(|| { let _v1 = black_box(vec![ KeyValue::new("attribute1", "value1"), @@ -34,7 +53,7 @@ fn attributes_creation(c: &mut Criterion) { }); #[allow(clippy::useless_vec)] - c.bench_function("CreateVector_StringPairs", |b| { + c.bench_function("CreateTupleKeyValueVector", |b| { b.iter(|| { let _v1 = black_box(vec![ ("attribute1", "value1"), diff --git a/opentelemetry/benches/metrics.rs b/opentelemetry/benches/metrics.rs index bf562fe9b7..44c898d900 100644 --- a/opentelemetry/benches/metrics.rs +++ b/opentelemetry/benches/metrics.rs @@ -1,15 +1,11 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use opentelemetry::{ - metrics::{noop::NoopMeterProvider, Counter, MeterProvider as _}, - KeyValue, -}; +use opentelemetry::{global, metrics::Counter, KeyValue}; // Run this benchmark with: // cargo bench --bench metrics --features=metrics fn create_counter() -> Counter { - let meter_provider: NoopMeterProvider = NoopMeterProvider::default(); - let meter = meter_provider.meter("benchmarks"); + let meter = global::meter("benchmarks"); let counter = meter.u64_counter("counter_bench").init(); counter } diff --git a/opentelemetry/src/global/mod.rs b/opentelemetry/src/global/mod.rs index 790343968c..47b3688b19 100644 --- a/opentelemetry/src/global/mod.rs +++ b/opentelemetry/src/global/mod.rs @@ -127,8 +127,8 @@ //! pub fn my_traced_library_function() { //! // End users of your library will configure their global meter provider //! // so you can use the global meter without any setup -//! let tracer = global::meter("my-library-name"); -//! let counter = tracer.u64_counter("my_counter").init(); +//! let meter = global::meter("my-library-name"); +//! let counter = meter.u64_counter("my_counter").init(); //! //! // record metrics //! counter.add(1, &[KeyValue::new("mykey", "myvalue")]); From 849fbcc74dd082a120104613cf74dce535b050e9 Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 20:17:17 +0530 Subject: [PATCH 07/12] use str ref --- opentelemetry/src/baggage.rs | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index 0e57d6501e..d08f7f5b56 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -16,7 +16,6 @@ //! [W3C Baggage]: https://w3c.github.io/baggage use crate::{Context, Key, KeyValue, Value}; use once_cell::sync::Lazy; -use std::borrow::Cow; use std::collections::{hash_map, HashMap}; use std::fmt; use std::iter::FromIterator; @@ -282,11 +281,11 @@ impl FromIterator for Baggage { } } -fn encode(s: Cow<'_, str>) -> String { +fn encode(s: &str) -> String { let special_characters = ['.', '-', '_', '~']; let mut encoded_string = String::new(); - for char in s.as_ref().chars() { + for char in s.chars() { if char.is_ascii_alphanumeric() || special_characters.contains(&char) { encoded_string.push(char); } else if char == ' ' { @@ -304,7 +303,7 @@ fn encode(s: Cow<'_, str>) -> String { impl fmt::Display for Baggage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for (i, (k, v)) in self.into_iter().enumerate() { - write!(f, "{}={}", k, encode(v.0.as_str()))?; + write!(f, "{}={}", k, encode(v.0.as_str().as_ref()))?; if !v.1.as_str().is_empty() { write!(f, ";{}", v.1)?; } @@ -494,22 +493,22 @@ mod tests { #[test] fn test_ascii_values() { - let string1 = Cow::Borrowed("test_ 123"); - let string2 = Cow::Borrowed("Hello123"); - let string3 = Cow::Borrowed("This & That = More"); - let string4 = Cow::Borrowed("Unicode: 😊"); - let string5 = Cow::Borrowed("Non-ASCII: áéíóú"); - let string6 = Cow::Borrowed("Unsafe: ~!@#$%^&*()_+{}[];:'\\\"<>?,./"); - - assert_eq!(encode(string1), "test_%20123"); - assert_eq!(encode(string2), "Hello123"); - assert_eq!(encode(string3), "This%20%26%20That%20%3D%20More"); - assert_eq!(encode(string4), "Unicode%3A%20%F0%9F%98%8A"); + let string1 = "test_ 123"; + let string2 = "Hello123"; + let string3 = "This & That = More"; + let string4 = "Unicode: 😊"; + let string5 = "Non-ASCII: áéíóú"; + let string6 = "Unsafe: ~!@#$%^&*()_+{}[];:'\\\"<>?,./"; + + assert_eq!(encode(&string1), "test_%20123"); + assert_eq!(encode(&string2), "Hello123"); + assert_eq!(encode(&string3), "This%20%26%20That%20%3D%20More"); + assert_eq!(encode(&string4), "Unicode%3A%20%F0%9F%98%8A"); assert_eq!( - encode(string5), + encode(&string5), "Non-ASCII%3A%20%C3%A1%C3%A9%C3%AD%C3%B3%C3%BA" ); - assert_eq!(encode(string6), "Unsafe%3A%20~%21%40%23%24%25%5E%26%2A%28%29_%2B%7B%7D%5B%5D%3B%3A%27%5C%22%3C%3E%3F%2C.%2F"); + assert_eq!(encode(&string6), "Unsafe%3A%20~%21%40%23%24%25%5E%26%2A%28%29_%2B%7B%7D%5B%5D%3B%3A%27%5C%22%3C%3E%3F%2C.%2F"); } #[test] From 75d8fc2525e61792a13b468220aa9751e06956be Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 21:42:12 +0530 Subject: [PATCH 08/12] set len while initialising encoded string --- opentelemetry/src/baggage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index d08f7f5b56..5747ee23f4 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -283,7 +283,7 @@ impl FromIterator for Baggage { fn encode(s: &str) -> String { let special_characters = ['.', '-', '_', '~']; - let mut encoded_string = String::new(); + let mut encoded_string = String::with_capacity(s.len()); for char in s.chars() { if char.is_ascii_alphanumeric() || special_characters.contains(&char) { @@ -493,7 +493,7 @@ mod tests { #[test] fn test_ascii_values() { - let string1 = "test_ 123"; + let string1 = "test_ 123"; let string2 = "Hello123"; let string3 = "This & That = More"; let string4 = "Unicode: 😊"; From 2c7df6cc26473c724b5ecb290b84c84e342446c6 Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 22:26:37 +0530 Subject: [PATCH 09/12] convert char to bytes directly --- opentelemetry/src/baggage.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index 5747ee23f4..8894e8a095 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -291,8 +291,9 @@ fn encode(s: &str) -> String { } else if char == ' ' { encoded_string.push_str("%20"); } else { - // Convert to a two-digit hexadecimal representation - for byte in char.to_string().as_bytes() { + let mut buffer = [0; 4]; + let encoded_char = char.encode_utf8(&mut buffer); + for byte in encoded_char.as_bytes() { encoded_string.push_str(&format!("%{:02X}", byte)); } } From b176f8dea22a3798449909491ef31b5850c1cdde Mon Sep 17 00:00:00 2001 From: sree Date: Tue, 12 Mar 2024 22:31:45 +0530 Subject: [PATCH 10/12] fix lint issues --- opentelemetry/src/baggage.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index 8894e8a095..23e862c519 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -501,15 +501,15 @@ mod tests { let string5 = "Non-ASCII: áéíóú"; let string6 = "Unsafe: ~!@#$%^&*()_+{}[];:'\\\"<>?,./"; - assert_eq!(encode(&string1), "test_%20123"); - assert_eq!(encode(&string2), "Hello123"); - assert_eq!(encode(&string3), "This%20%26%20That%20%3D%20More"); - assert_eq!(encode(&string4), "Unicode%3A%20%F0%9F%98%8A"); + assert_eq!(encode(string1), "test_%20123"); + assert_eq!(encode(string2), "Hello123"); + assert_eq!(encode(string3), "This%20%26%20That%20%3D%20More"); + assert_eq!(encode(string4), "Unicode%3A%20%F0%9F%98%8A"); assert_eq!( - encode(&string5), + encode(string5), "Non-ASCII%3A%20%C3%A1%C3%A9%C3%AD%C3%B3%C3%BA" ); - assert_eq!(encode(&string6), "Unsafe%3A%20~%21%40%23%24%25%5E%26%2A%28%29_%2B%7B%7D%5B%5D%3B%3A%27%5C%22%3C%3E%3F%2C.%2F"); + assert_eq!(encode(string6), "Unsafe%3A%20~%21%40%23%24%25%5E%26%2A%28%29_%2B%7B%7D%5B%5D%3B%3A%27%5C%22%3C%3E%3F%2C.%2F"); } #[test] From 82f0108f000c5b277563101bea9716b34e8f62b4 Mon Sep 17 00:00:00 2001 From: sree Date: Wed, 13 Mar 2024 11:59:07 +0530 Subject: [PATCH 11/12] fix format issues --- opentelemetry/src/baggage.rs | 53 +++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index 23e862c519..78064f8566 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -282,22 +282,45 @@ impl FromIterator for Baggage { } fn encode(s: &str) -> String { - let special_characters = ['.', '-', '_', '~']; - let mut encoded_string = String::with_capacity(s.len()); - - for char in s.chars() { - if char.is_ascii_alphanumeric() || special_characters.contains(&char) { - encoded_string.push(char); - } else if char == ' ' { - encoded_string.push_str("%20"); - } else { - let mut buffer = [0; 4]; - let encoded_char = char.encode_utf8(&mut buffer); - for byte in encoded_char.as_bytes() { - encoded_string.push_str(&format!("%{:02X}", byte)); + let special_characters = [b'.', b'-', b'_', b'~']; + let mut encoded_string = String::with_capacity(s.len() * 3); + + let bytes = s.as_bytes(); + let mut i = 0; + + while i < bytes.len() { + let byte = bytes[i]; + + match byte { + b' ' => encoded_string.push_str("%20"), + byte if byte.is_ascii_alphanumeric() || special_characters.contains(&byte) => { + encoded_string.push(byte as char) + } + _ => { + if byte.is_ascii() { + encoded_string.push_str(&format!("%{:02X}", byte)); + } else { + let start = i; + let mut end = start + 1; + + while end < bytes.len() && !bytes[end].is_ascii() { + end += 1; + } + + // Encoding each byte of the multi-byte character + for &multi_byte in &bytes[start..end] { + encoded_string.push_str(&format!("%{:02X}", multi_byte)); + } + + // Adjust `i` to skip over the bytes we've just encoded + i = end - 1; + } } } + + i += 1; } + encoded_string } @@ -500,6 +523,8 @@ mod tests { let string4 = "Unicode: 😊"; let string5 = "Non-ASCII: áéíóú"; let string6 = "Unsafe: ~!@#$%^&*()_+{}[];:'\\\"<>?,./"; + let string7: &str = "🚀Unicode:"; + let string8 = "ΑΒΓ"; assert_eq!(encode(string1), "test_%20123"); assert_eq!(encode(string2), "Hello123"); @@ -510,6 +535,8 @@ mod tests { "Non-ASCII%3A%20%C3%A1%C3%A9%C3%AD%C3%B3%C3%BA" ); assert_eq!(encode(string6), "Unsafe%3A%20~%21%40%23%24%25%5E%26%2A%28%29_%2B%7B%7D%5B%5D%3B%3A%27%5C%22%3C%3E%3F%2C.%2F"); + assert_eq!(encode(string7), "%F0%9F%9A%80Unicode%3A"); + assert_eq!(encode(string8), "%CE%91%CE%92%CE%93"); } #[test] From 5f2cc10fe7449f26cf0b63d963278ad9bd379282 Mon Sep 17 00:00:00 2001 From: sree Date: Wed, 13 Mar 2024 17:50:45 +0530 Subject: [PATCH 12/12] simply encode logic --- opentelemetry/src/baggage.rs | 42 ++++++------------------------------ 1 file changed, 7 insertions(+), 35 deletions(-) diff --git a/opentelemetry/src/baggage.rs b/opentelemetry/src/baggage.rs index 78064f8566..62a8cb66b8 100644 --- a/opentelemetry/src/baggage.rs +++ b/opentelemetry/src/baggage.rs @@ -282,45 +282,17 @@ impl FromIterator for Baggage { } fn encode(s: &str) -> String { - let special_characters = [b'.', b'-', b'_', b'~']; - let mut encoded_string = String::with_capacity(s.len() * 3); + let mut encoded_string = String::with_capacity(s.len()); - let bytes = s.as_bytes(); - let mut i = 0; - - while i < bytes.len() { - let byte = bytes[i]; - - match byte { - b' ' => encoded_string.push_str("%20"), - byte if byte.is_ascii_alphanumeric() || special_characters.contains(&byte) => { - encoded_string.push(byte as char) - } - _ => { - if byte.is_ascii() { - encoded_string.push_str(&format!("%{:02X}", byte)); - } else { - let start = i; - let mut end = start + 1; - - while end < bytes.len() && !bytes[end].is_ascii() { - end += 1; - } - - // Encoding each byte of the multi-byte character - for &multi_byte in &bytes[start..end] { - encoded_string.push_str(&format!("%{:02X}", multi_byte)); - } - - // Adjust `i` to skip over the bytes we've just encoded - i = end - 1; - } + for byte in s.as_bytes() { + match *byte { + b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'.' | b'-' | b'_' | b'~' => { + encoded_string.push(*byte as char) } + b' ' => encoded_string.push_str("%20"), + _ => encoded_string.push_str(&format!("%{:02X}", byte)), } - - i += 1; } - encoded_string }