Skip to content

Commit 61a6e5e

Browse files
authored
Merge branch 'main' into http-json-example
2 parents 4f2b65a + 2dd47fd commit 61a6e5e

File tree

10 files changed

+216
-52
lines changed

10 files changed

+216
-52
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ jobs:
7171
- uses: actions/checkout@v4
7272
- uses: dtolnay/rust-toolchain@nightly
7373
with:
74-
toolchain: nightly-2024-02-07
74+
toolchain: nightly-2024-05-01
7575
components: rustfmt
7676
- name: external-type-check
7777
run: |

opentelemetry-prometheus/tests/integration_test.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -406,18 +406,18 @@ fn gather_and_compare(registry: prometheus::Registry, expected: String, name: &'
406406
let metric_families = registry.gather();
407407
encoder.encode(&metric_families, &mut output).unwrap();
408408

409+
let expected = get_platform_specific_string(expected);
409410
let output_string = get_platform_specific_string(String::from_utf8(output).unwrap());
410411

411412
assert_eq!(output_string, expected, "{name}");
412413
}
413414

414415
/// Returns a String which uses the platform specific new line feed character.
415416
fn get_platform_specific_string(input: String) -> String {
416-
if cfg!(windows) {
417-
input.replace('\n', "\r\n")
418-
} else {
419-
input
417+
if cfg!(windows) && !input.ends_with("\r\n") && input.ends_with('\n') {
418+
return input.replace('\n', "\r\n");
420419
}
420+
input
421421
}
422422

423423
#[test]
@@ -812,6 +812,7 @@ fn duplicate_metrics() {
812812
.expected_files
813813
.into_iter()
814814
.map(|f| fs::read_to_string(Path::new("./tests/data").join(f)).expect(f))
815+
.map(get_platform_specific_string)
815816
.collect();
816817
gather_and_compare_multi(registry, possible_matches, tc.name);
817818
}

opentelemetry-proto/tests/grpc_build.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const TONIC_INCLUDES: &[&str] = &["src/proto/opentelemetry-proto", "src/proto"];
1818

1919
#[test]
2020
fn build_tonic() {
21-
let before_build = build_content_map(TONIC_OUT_DIR, false);
21+
let before_build = build_content_map(TONIC_OUT_DIR, true);
2222

2323
let out_dir = TempDir::new().expect("failed to create temp dir to store the generated files");
2424

@@ -130,13 +130,12 @@ fn build_content_map(path: impl AsRef<Path>, normalize_line_feed: bool) -> HashM
130130
.collect()
131131
}
132132

133-
/// Returns a String with the platform specific new line feed character.
133+
/// Returns a String which uses the platform specific new line feed character.
134134
fn get_platform_specific_string(input: String) -> String {
135-
if cfg!(windows) {
136-
input.replace('\n', "\r\n")
137-
} else {
138-
input
135+
if cfg!(windows) && !input.ends_with("\r\n") && input.ends_with('\n') {
136+
return input.replace('\n', "\r\n");
139137
}
138+
input
140139
}
141140

142141
fn ensure_files_are_same(

opentelemetry-sdk/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
- Removed `XrayIdGenerator`, which was marked deprecated since 0.21.3. Use
1414
[`opentelemetry-aws`](https://crates.io/crates/opentelemetry-aws), version
1515
0.10.0 or newer.
16+
- Performance Improvement - Counter/UpDownCounter instruments internally use
17+
`RwLock` instead of `Mutex` to reduce contention.
1618

1719
- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
1820
Update `LogProcessor::emit() method to take mutable reference to LogData. This is breaking

opentelemetry-sdk/src/metrics/internal/sum.rs

+37-27
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::sync::atomic::{AtomicBool, Ordering};
22
use std::vec;
33
use std::{
4-
collections::{hash_map::Entry, HashMap},
5-
sync::Mutex,
4+
collections::HashMap,
5+
sync::{Mutex, RwLock},
66
time::SystemTime,
77
};
88

@@ -18,7 +18,7 @@ use super::{
1818

1919
/// The storage for sums.
2020
struct ValueMap<T: Number<T>> {
21-
values: Mutex<HashMap<AttributeSet, T>>,
21+
values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
2222
has_no_value_attribute_value: AtomicBool,
2323
no_attribute_value: T::AtomicTracker,
2424
}
@@ -32,7 +32,7 @@ impl<T: Number<T>> Default for ValueMap<T> {
3232
impl<T: Number<T>> ValueMap<T> {
3333
fn new() -> Self {
3434
ValueMap {
35-
values: Mutex::new(HashMap::new()),
35+
values: RwLock::new(HashMap::new()),
3636
has_no_value_attribute_value: AtomicBool::new(false),
3737
no_attribute_value: T::new_atomic_tracker(),
3838
}
@@ -45,21 +45,31 @@ impl<T: Number<T>> ValueMap<T> {
4545
self.no_attribute_value.add(measurement);
4646
self.has_no_value_attribute_value
4747
.store(true, Ordering::Release);
48-
} else if let Ok(mut values) = self.values.lock() {
49-
let size = values.len();
50-
match values.entry(attrs) {
51-
Entry::Occupied(mut occupied_entry) => {
52-
let sum = occupied_entry.get_mut();
53-
*sum += measurement;
54-
}
55-
Entry::Vacant(vacant_entry) => {
56-
if is_under_cardinality_limit(size) {
57-
vacant_entry.insert(measurement);
58-
} else if let Some(val) = values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET) {
59-
*val += measurement;
48+
} else if let Ok(values) = self.values.read() {
49+
if let Some(value_to_update) = values.get(&attrs) {
50+
value_to_update.add(measurement);
51+
return;
52+
} else {
53+
drop(values);
54+
if let Ok(mut values) = self.values.write() {
55+
// Recheck after acquiring write lock, in case another
56+
// thread has added the value.
57+
if let Some(value_to_update) = values.get(&attrs) {
58+
value_to_update.add(measurement);
59+
return;
60+
} else if is_under_cardinality_limit(values.len()) {
61+
let new_value = T::new_atomic_tracker();
62+
new_value.add(measurement);
63+
values.insert(attrs, new_value);
64+
} else if let Some(overflow_value) =
65+
values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
66+
{
67+
overflow_value.add(measurement);
6068
return;
6169
} else {
62-
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), measurement);
70+
let new_value = T::new_atomic_tracker();
71+
new_value.add(measurement);
72+
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
6373
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
6474
}
6575
}
@@ -114,7 +124,7 @@ impl<T: Number<T>> Sum<T> {
114124
s_data.is_monotonic = self.monotonic;
115125
s_data.data_points.clear();
116126

117-
let mut values = match self.value_map.values.lock() {
127+
let mut values = match self.value_map.values.write() {
118128
Ok(v) => v,
119129
Err(_) => return (0, None),
120130
};
@@ -149,7 +159,7 @@ impl<T: Number<T>> Sum<T> {
149159
.collect(),
150160
start_time: Some(prev_start),
151161
time: Some(t),
152-
value,
162+
value: value.get_value(),
153163
exemplars: vec![],
154164
});
155165
}
@@ -186,7 +196,7 @@ impl<T: Number<T>> Sum<T> {
186196
s_data.is_monotonic = self.monotonic;
187197
s_data.data_points.clear();
188198

189-
let values = match self.value_map.values.lock() {
199+
let values = match self.value_map.values.write() {
190200
Ok(v) => v,
191201
Err(_) => return (0, None),
192202
};
@@ -226,7 +236,7 @@ impl<T: Number<T>> Sum<T> {
226236
.collect(),
227237
start_time: Some(prev_start),
228238
time: Some(t),
229-
value: *value,
239+
value: value.get_value(),
230240
exemplars: vec![],
231241
});
232242
}
@@ -282,7 +292,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
282292
s_data.temporality = Temporality::Delta;
283293
s_data.is_monotonic = self.monotonic;
284294

285-
let mut values = match self.value_map.values.lock() {
295+
let mut values = match self.value_map.values.write() {
286296
Ok(v) => v,
287297
Err(_) => return (0, None),
288298
};
@@ -315,9 +325,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
315325

316326
let default = T::default();
317327
for (attrs, value) in values.drain() {
318-
let delta = value - *reported.get(&attrs).unwrap_or(&default);
328+
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
319329
if delta != default {
320-
new_reported.insert(attrs.clone(), value);
330+
new_reported.insert(attrs.clone(), value.get_value());
321331
}
322332
s_data.data_points.push(DataPoint {
323333
attributes: attrs
@@ -367,7 +377,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
367377
s_data.temporality = Temporality::Cumulative;
368378
s_data.is_monotonic = self.monotonic;
369379

370-
let values = match self.value_map.values.lock() {
380+
let values = match self.value_map.values.write() {
371381
Ok(v) => v,
372382
Err(_) => return (0, None),
373383
};
@@ -400,9 +410,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
400410

401411
let default = T::default();
402412
for (attrs, value) in values.iter() {
403-
let delta = *value - *reported.get(attrs).unwrap_or(&default);
413+
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
404414
if delta != default {
405-
new_reported.insert(attrs.clone(), *value);
415+
new_reported.insert(attrs.clone(), value.get_value());
406416
}
407417
s_data.data_points.push(DataPoint {
408418
attributes: attrs

opentelemetry-sdk/src/metrics/mod.rs

+88-1
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,15 @@ impl Hash for AttributeSet {
139139

140140
#[cfg(all(test, feature = "testing"))]
141141
mod tests {
142-
use self::data::{DataPoint, ScopeMetrics};
142+
use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
143143
use super::*;
144144
use crate::metrics::data::{ResourceMetrics, Temporality};
145145
use crate::metrics::reader::TemporalitySelector;
146146
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
147147
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
148148
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
149149
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
150+
use rand::{rngs, Rng, SeedableRng};
150151
use std::borrow::Cow;
151152
use std::sync::{Arc, Mutex};
152153

@@ -199,6 +200,20 @@ mod tests {
199200
counter_aggregation_helper(Temporality::Delta);
200201
}
201202

203+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
204+
async fn histogram_aggregation_cumulative() {
205+
// Run this test with stdout enabled to see output.
206+
// cargo test histogram_aggregation_cumulative --features=metrics,testing -- --nocapture
207+
histogram_aggregation_helper(Temporality::Cumulative);
208+
}
209+
210+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
211+
async fn histogram_aggregation_delta() {
212+
// Run this test with stdout enabled to see output.
213+
// cargo test histogram_aggregation_delta --features=metrics,testing -- --nocapture
214+
histogram_aggregation_helper(Temporality::Delta);
215+
}
216+
202217
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
203218
async fn updown_counter_aggregation_cumulative() {
204219
// Run this test with stdout enabled to see output.
@@ -1007,6 +1022,65 @@ mod tests {
10071022
assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
10081023
}
10091024

1025+
fn histogram_aggregation_helper(temporality: Temporality) {
1026+
// Arrange
1027+
let mut test_context = TestContext::new(temporality);
1028+
let histogram = test_context.meter().u64_histogram("my_histogram").init();
1029+
1030+
// Act
1031+
let mut rand = rngs::SmallRng::from_entropy();
1032+
let values_kv1 = (0..50)
1033+
.map(|_| rand.gen_range(0..100))
1034+
.collect::<Vec<u64>>();
1035+
for value in values_kv1.iter() {
1036+
histogram.record(*value, &[KeyValue::new("key1", "value1")]);
1037+
}
1038+
1039+
let values_kv2 = (0..30)
1040+
.map(|_| rand.gen_range(0..100))
1041+
.collect::<Vec<u64>>();
1042+
for value in values_kv2.iter() {
1043+
histogram.record(*value, &[KeyValue::new("key1", "value2")]);
1044+
}
1045+
1046+
test_context.flush_metrics();
1047+
1048+
// Assert
1049+
let histogram = test_context.get_aggregation::<data::Histogram<u64>>("my_histogram", None);
1050+
// Expecting 2 time-series.
1051+
assert_eq!(histogram.data_points.len(), 2);
1052+
if let Temporality::Cumulative = temporality {
1053+
assert_eq!(
1054+
histogram.temporality,
1055+
Temporality::Cumulative,
1056+
"Should produce cumulative"
1057+
);
1058+
} else {
1059+
assert_eq!(
1060+
histogram.temporality,
1061+
Temporality::Delta,
1062+
"Should produce delta"
1063+
);
1064+
}
1065+
1066+
// find and validate key1=value2 datapoint
1067+
let data_point1 =
1068+
find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
1069+
.expect("datapoint with key1=value1 expected");
1070+
assert_eq!(data_point1.count, values_kv1.len() as u64);
1071+
assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
1072+
assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
1073+
assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
1074+
1075+
let data_point2 =
1076+
find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value2")
1077+
.expect("datapoint with key1=value2 expected");
1078+
assert_eq!(data_point2.count, values_kv2.len() as u64);
1079+
assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
1080+
assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
1081+
assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
1082+
}
1083+
10101084
fn counter_aggregation_helper(temporality: Temporality) {
10111085
// Arrange
10121086
let mut test_context = TestContext::new(temporality);
@@ -1109,6 +1183,19 @@ mod tests {
11091183
})
11101184
}
11111185

1186+
fn find_histogram_datapoint_with_key_value<'a, T>(
1187+
data_points: &'a [HistogramDataPoint<T>],
1188+
key: &str,
1189+
value: &str,
1190+
) -> Option<&'a HistogramDataPoint<T>> {
1191+
data_points.iter().find(|&datapoint| {
1192+
datapoint
1193+
.attributes
1194+
.iter()
1195+
.any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
1196+
})
1197+
}
1198+
11121199
fn find_scope_metric<'a>(
11131200
metrics: &'a [ScopeMetrics],
11141201
name: &'a str,

stress/Cargo.toml

+7-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ version = "0.1.0"
44
edition = "2021"
55
publish = false
66

7-
[[bin]] # Bin to run the metrics stress tests
7+
[[bin]] # Bin to run the metrics stress tests for Counter
88
name = "metrics"
9-
path = "src/metrics.rs"
9+
path = "src/metrics_counter.rs"
10+
doc = false
11+
12+
[[bin]] # Bin to run the metrics stress tests for Histogram
13+
name = "metrics_histogram"
14+
path = "src/metrics_histogram.rs"
1015
doc = false
1116

1217
[[bin]] # Bin to run the metrics overflow stress tests

stress/src/metrics.rs stress/src/metrics_counter.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
44
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
55
RAM: 64.0 GB
6-
3M /sec
6+
35 M /sec
77
*/
88

99
use lazy_static::lazy_static;
@@ -16,7 +16,7 @@ use rand::{
1616
rngs::{self},
1717
Rng, SeedableRng,
1818
};
19-
use std::{borrow::Cow, cell::RefCell};
19+
use std::cell::RefCell;
2020

2121
mod throughput;
2222

@@ -28,10 +28,7 @@ lazy_static! {
2828
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
2929
"value10"
3030
];
31-
static ref COUNTER: Counter<u64> = PROVIDER
32-
.meter(<&str as Into<Cow<'static, str>>>::into("test"))
33-
.u64_counter("hello")
34-
.init();
31+
static ref COUNTER: Counter<u64> = PROVIDER.meter("test").u64_counter("hello").init();
3532
}
3633

3734
thread_local! {

0 commit comments

Comments
 (0)