Skip to content

Commit

Permalink
Merge branch 'main' into chore/semantic-commits
Browse files Browse the repository at this point in the history
  • Loading branch information
scottgerring authored Feb 24, 2025
2 parents bf22abe + bc5e6ce commit a627eef
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 127 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ you're more than welcome to participate!
### Approvers

* [Shaun Cox](https://github.com/shaun-cox)
* [Scott Gerring](https://github.com/scottgerring)

### Emeritus

Expand Down
16 changes: 12 additions & 4 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,25 @@
}
}
```

- **Breaking** The SpanExporter::export() method no longer requires a mutable reference to self.
Before:
Before:

```rust
async fn export(&mut self, batch: Vec<SpanData>) -> OTelSdkResult
```
After:
```rust

After:

```rust
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult
```

Custom exporters will need to internally synchronize any mutable state, if applicable.


- Bug Fix: `BatchLogProcessor` now correctly calls `shutdown` on the exporter
when its `shutdown` is invoked.

## 0.28.0

Released 2025-Feb-10
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ impl BatchLogProcessor {
&current_batch_size,
&config,
);
let _ = exporter.shutdown();
let _ = sender.send(result);

otel_debug!(
Expand Down Expand Up @@ -925,7 +926,8 @@ mod tests {
processor.shutdown().unwrap();
// todo: expect to see errors here. How should we assert this?
processor.emit(&mut record, &instrumentation);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
assert!(exporter.is_shutdown_called());
}

#[tokio::test(flavor = "current_thread")]
Expand Down
11 changes: 11 additions & 0 deletions opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::logs::{LogBatch, LogExporter};
use crate::Resource;
use opentelemetry::InstrumentationScope;
use std::borrow::Cow;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};

type LogResult<T> = Result<T, OTelSdkError>;
Expand Down Expand Up @@ -42,6 +43,7 @@ pub struct InMemoryLogExporter {
logs: Arc<Mutex<Vec<OwnedLogData>>>,
resource: Arc<Mutex<Resource>>,
should_reset_on_shutdown: bool,
shutdown_called: Arc<AtomicBool>,
}

impl Default for InMemoryLogExporter {
Expand Down Expand Up @@ -124,6 +126,7 @@ impl InMemoryLogExporterBuilder {
logs: Arc::new(Mutex::new(Vec::new())),
resource: Arc::new(Mutex::new(Resource::builder().build())),
should_reset_on_shutdown: self.reset_on_shutdown,
shutdown_called: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -137,6 +140,12 @@ impl InMemoryLogExporterBuilder {
}

impl InMemoryLogExporter {
/// Returns true if shutdown was called.
pub fn is_shutdown_called(&self) -> bool {
self.shutdown_called
.load(std::sync::atomic::Ordering::Relaxed)
}

/// Returns the logs emitted via Logger as a vector of `LogDataWithResource`.
///
/// # Example
Expand Down Expand Up @@ -203,6 +212,8 @@ impl LogExporter for InMemoryLogExporter {
}

fn shutdown(&mut self) -> OTelSdkResult {
self.shutdown_called
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.should_reset_on_shutdown {
self.reset();
}
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ mod tests {

processor.emit(&mut record, &instrumentation);

assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
assert!(exporter.is_shutdown_called());
}

#[test]
Expand Down
19 changes: 19 additions & 0 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,25 @@ mod tests {
assert_eq!(provider.inner.meters.lock().unwrap().len(), 5);
}

#[test]
fn same_meter_reused_same_scope_attributes() {
let meter_provider = super::SdkMeterProvider::builder().build();
let make_scope = |attributes| {
InstrumentationScope::builder("test.meter")
.with_version("v0.1.0")
.with_schema_url("http://example.com")
.with_attributes(attributes)
.build()
};

let _meter1 =
meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
let _meter2 =
meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));

assert_eq!(meter_provider.inner.meters.lock().unwrap().len(), 1);
}

#[test]
fn with_resource_multiple_calls_ensure_additive() {
let builder = SdkMeterProvider::builder()
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ mod tests {
.build();

// Act
// Meters are identical except for scope attributes, but scope attributes are not an identifying property.
// Meters are identical.
// Hence there should be a single metric stream output for this test.
let make_scope = |attributes| {
InstrumentationScope::builder("test.meter")
Expand All @@ -795,7 +795,7 @@ mod tests {
let meter1 =
meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));
let meter2 =
meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value2")]));
meter_provider.meter_with_scope(make_scope(vec![KeyValue::new("key", "value1")]));

let counter1 = meter1
.u64_counter("my_counter")
Expand Down
1 change: 1 addition & 0 deletions opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- *Breaking* Moved `TraceError` enum from `opentelemetry::trace::TraceError` to `opentelemetry_sdk::trace::TraceError`
- *Breaking* Moved `TraceResult` type alias from `opentelemetry::trace::TraceResult` to `opentelemetry_sdk::trace::TraceResult`
- {PLACEHOLDER} - Remove the above completely. // TODO fill this when changes are actually in.
- Bug Fix: `InstrumentationScope` implementation for `PartialEq` and `Hash` fixed to include Attributes also.

## 0.28.0

Expand Down
192 changes: 189 additions & 3 deletions opentelemetry/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::borrow::{Borrow, Cow};
use std::sync::Arc;
use std::{fmt, hash};

use std::hash::{Hash, Hasher};

/// The key part of attribute [KeyValue] pairs.
///
/// See the [attribute naming] spec for guidelines.
Expand Down Expand Up @@ -399,6 +401,42 @@ impl KeyValue {
}
}

struct F64Hashable(f64);

impl PartialEq for F64Hashable {
fn eq(&self, other: &Self) -> bool {
self.0.to_bits() == other.0.to_bits()
}
}

impl Eq for F64Hashable {}

impl Hash for F64Hashable {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.to_bits().hash(state);
}
}

impl Hash for KeyValue {
fn hash<H: Hasher>(&self, state: &mut H) {
self.key.hash(state);
match &self.value {
Value::F64(f) => F64Hashable(*f).hash(state),
Value::Array(a) => match a {
Array::Bool(b) => b.hash(state),
Array::I64(i) => i.hash(state),
Array::F64(f) => f.iter().for_each(|f| F64Hashable(*f).hash(state)),
Array::String(s) => s.hash(state),
},
Value::Bool(b) => b.hash(state),
Value::I64(i) => i.hash(state),
Value::String(s) => s.hash(state),
};
}
}

impl Eq for KeyValue {}

/// Information about a library or crate providing instrumentation.
///
/// An instrumentation scope should be named to follow any naming conventions
Expand Down Expand Up @@ -427,22 +465,33 @@ pub struct InstrumentationScope {
attributes: Vec<KeyValue>,
}

// Uniqueness for InstrumentationScope does not depend on attributes
impl Eq for InstrumentationScope {}

impl PartialEq for InstrumentationScope {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.version == other.version
&& self.schema_url == other.schema_url
&& {
let mut self_attrs = self.attributes.clone();
let mut other_attrs = other.attributes.clone();
self_attrs.sort_unstable_by(|a, b| a.key.cmp(&b.key));
other_attrs.sort_unstable_by(|a, b| a.key.cmp(&b.key));
self_attrs == other_attrs
}
}
}

impl Eq for InstrumentationScope {}

impl hash::Hash for InstrumentationScope {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.version.hash(state);
self.schema_url.hash(state);
let mut sorted_attrs = self.attributes.clone();
sorted_attrs.sort_unstable_by(|a, b| a.key.cmp(&b.key));
for attribute in sorted_attrs {
attribute.hash(state);
}
}
}

Expand Down Expand Up @@ -561,3 +610,140 @@ impl InstrumentationScopeBuilder {
}
}
}

#[cfg(test)]
mod tests {
use std::hash::{Hash, Hasher};

use crate::{InstrumentationScope, KeyValue};

use rand::random;
use std::collections::hash_map::DefaultHasher;
use std::f64;

#[test]
fn kv_float_equality() {
let kv1 = KeyValue::new("key", 1.0);
let kv2 = KeyValue::new("key", 1.0);
assert_eq!(kv1, kv2);

let kv1 = KeyValue::new("key", 1.0);
let kv2 = KeyValue::new("key", 1.01);
assert_ne!(kv1, kv2);

let kv1 = KeyValue::new("key", f64::NAN);
let kv2 = KeyValue::new("key", f64::NAN);
assert_ne!(kv1, kv2, "NAN is not equal to itself");

for float_val in [
f64::INFINITY,
f64::NEG_INFINITY,
f64::MAX,
f64::MIN,
f64::MIN_POSITIVE,
]
.iter()
{
let kv1 = KeyValue::new("key", *float_val);
let kv2 = KeyValue::new("key", *float_val);
assert_eq!(kv1, kv2);
}

for _ in 0..100 {
let random_value = random::<f64>();
let kv1 = KeyValue::new("key", random_value);
let kv2 = KeyValue::new("key", random_value);
assert_eq!(kv1, kv2);
}
}

#[test]
fn kv_float_hash() {
for float_val in [
f64::NAN,
f64::INFINITY,
f64::NEG_INFINITY,
f64::MAX,
f64::MIN,
f64::MIN_POSITIVE,
]
.iter()
{
let kv1 = KeyValue::new("key", *float_val);
let kv2 = KeyValue::new("key", *float_val);
assert_eq!(hash_helper(&kv1), hash_helper(&kv2));
}

for _ in 0..100 {
let random_value = random::<f64>();
let kv1 = KeyValue::new("key", random_value);
let kv2 = KeyValue::new("key", random_value);
assert_eq!(hash_helper(&kv1), hash_helper(&kv2));
}
}

fn hash_helper<T: Hash>(item: &T) -> u64 {
let mut hasher = DefaultHasher::new();
item.hash(&mut hasher);
hasher.finish()
}

#[test]
fn instrumentation_scope_equality() {
let scope1 = InstrumentationScope::builder("my-crate")
.with_version("v0.1.0")
.with_schema_url("https://opentelemetry.io/schemas/1.17.0")
.with_attributes([KeyValue::new("k", "v")])
.build();
let scope2 = InstrumentationScope::builder("my-crate")
.with_version("v0.1.0")
.with_schema_url("https://opentelemetry.io/schemas/1.17.0")
.with_attributes([KeyValue::new("k", "v")])
.build();
assert_eq!(scope1, scope2);
}

#[test]
fn instrumentation_scope_equality_attributes_diff_order() {
let scope1 = InstrumentationScope::builder("my-crate")
.with_version("v0.1.0")
.with_schema_url("https://opentelemetry.io/schemas/1.17.0")
.with_attributes([KeyValue::new("k1", "v1"), KeyValue::new("k2", "v2")])
.build();
let scope2 = InstrumentationScope::builder("my-crate")
.with_version("v0.1.0")
.with_schema_url("https://opentelemetry.io/schemas/1.17.0")
.with_attributes([KeyValue::new("k2", "v2"), KeyValue::new("k1", "v1")])
.build();
assert_eq!(scope1, scope2);

// assert hash are same for both scopes
let mut hasher1 = std::collections::hash_map::DefaultHasher::new();
scope1.hash(&mut hasher1);
let mut hasher2 = std::collections::hash_map::DefaultHasher::new();
scope2.hash(&mut hasher2);
assert_eq!(hasher1.finish(), hasher2.finish());
}

#[test]
fn instrumentation_scope_equality_different_attributes() {
let scope1 = InstrumentationScope::builder("my-crate")
.with_version("v0.1.0")
.with_schema_url("https://opentelemetry.io/schemas/1.17.0")
.with_attributes([KeyValue::new("k1", "v1"), KeyValue::new("k2", "v2")])
.build();
let scope2 = InstrumentationScope::builder("my-crate")
.with_version("v0.1.0")
.with_schema_url("https://opentelemetry.io/schemas/1.17.0")
.with_attributes([KeyValue::new("k2", "v3"), KeyValue::new("k4", "v5")])
.build();
assert_ne!(scope1, scope2);

// assert hash are same for both scopes
let mut hasher1 = std::collections::hash_map::DefaultHasher::new();
scope1.hash(&mut hasher1);
let mut hasher2 = std::collections::hash_map::DefaultHasher::new();
scope2.hash(&mut hasher2);
assert_ne!(hasher1.finish(), hasher2.finish());
}
}
Loading

0 comments on commit a627eef

Please sign in to comment.