Skip to content

Commit 7948912

Browse files
authored
Merge branch 'main' into trace-set-resource
2 parents 080d6ce + 0f6de5a commit 7948912

File tree

7 files changed

+205
-32
lines changed

7 files changed

+205
-32
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ jobs:
7171
- uses: actions/checkout@v4
7272
- uses: dtolnay/rust-toolchain@nightly
7373
with:
74-
toolchain: nightly-2024-02-07
74+
toolchain: nightly-2024-05-01
7575
components: rustfmt
7676
- name: external-type-check
7777
run: |

opentelemetry-sdk/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
- Removed `XrayIdGenerator`, which was marked deprecated since 0.21.3. Use
1414
[`opentelemetry-aws`](https://crates.io/crates/opentelemetry-aws), version
1515
0.10.0 or newer.
16+
- Performance Improvement - Counter/UpDownCounter instruments internally use
17+
`RwLock` instead of `Mutex` to reduce contention.
1618

1719
- **Breaking** [1726](https://github.com/open-telemetry/opentelemetry-rust/pull/1726)
1820
Update `LogProcessor::emit()` method to take mutable reference to `LogData`. This is breaking

opentelemetry-sdk/src/metrics/internal/sum.rs

+37-27
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::sync::atomic::{AtomicBool, Ordering};
22
use std::vec;
33
use std::{
4-
collections::{hash_map::Entry, HashMap},
5-
sync::Mutex,
4+
collections::HashMap,
5+
sync::{Mutex, RwLock},
66
time::SystemTime,
77
};
88

@@ -18,7 +18,7 @@ use super::{
1818

1919
/// The storage for sums.
2020
struct ValueMap<T: Number<T>> {
21-
values: Mutex<HashMap<AttributeSet, T>>,
21+
values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
2222
has_no_value_attribute_value: AtomicBool,
2323
no_attribute_value: T::AtomicTracker,
2424
}
@@ -32,7 +32,7 @@ impl<T: Number<T>> Default for ValueMap<T> {
3232
impl<T: Number<T>> ValueMap<T> {
3333
fn new() -> Self {
3434
ValueMap {
35-
values: Mutex::new(HashMap::new()),
35+
values: RwLock::new(HashMap::new()),
3636
has_no_value_attribute_value: AtomicBool::new(false),
3737
no_attribute_value: T::new_atomic_tracker(),
3838
}
@@ -45,21 +45,31 @@ impl<T: Number<T>> ValueMap<T> {
4545
self.no_attribute_value.add(measurement);
4646
self.has_no_value_attribute_value
4747
.store(true, Ordering::Release);
48-
} else if let Ok(mut values) = self.values.lock() {
49-
let size = values.len();
50-
match values.entry(attrs) {
51-
Entry::Occupied(mut occupied_entry) => {
52-
let sum = occupied_entry.get_mut();
53-
*sum += measurement;
54-
}
55-
Entry::Vacant(vacant_entry) => {
56-
if is_under_cardinality_limit(size) {
57-
vacant_entry.insert(measurement);
58-
} else if let Some(val) = values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET) {
59-
*val += measurement;
48+
} else if let Ok(values) = self.values.read() {
49+
if let Some(value_to_update) = values.get(&attrs) {
50+
value_to_update.add(measurement);
51+
return;
52+
} else {
53+
drop(values);
54+
if let Ok(mut values) = self.values.write() {
55+
// Recheck after acquiring write lock, in case another
56+
// thread has added the value.
57+
if let Some(value_to_update) = values.get(&attrs) {
58+
value_to_update.add(measurement);
59+
return;
60+
} else if is_under_cardinality_limit(values.len()) {
61+
let new_value = T::new_atomic_tracker();
62+
new_value.add(measurement);
63+
values.insert(attrs, new_value);
64+
} else if let Some(overflow_value) =
65+
values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
66+
{
67+
overflow_value.add(measurement);
6068
return;
6169
} else {
62-
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), measurement);
70+
let new_value = T::new_atomic_tracker();
71+
new_value.add(measurement);
72+
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
6373
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
6474
}
6575
}
@@ -114,7 +124,7 @@ impl<T: Number<T>> Sum<T> {
114124
s_data.is_monotonic = self.monotonic;
115125
s_data.data_points.clear();
116126

117-
let mut values = match self.value_map.values.lock() {
127+
let mut values = match self.value_map.values.write() {
118128
Ok(v) => v,
119129
Err(_) => return (0, None),
120130
};
@@ -149,7 +159,7 @@ impl<T: Number<T>> Sum<T> {
149159
.collect(),
150160
start_time: Some(prev_start),
151161
time: Some(t),
152-
value,
162+
value: value.get_value(),
153163
exemplars: vec![],
154164
});
155165
}
@@ -186,7 +196,7 @@ impl<T: Number<T>> Sum<T> {
186196
s_data.is_monotonic = self.monotonic;
187197
s_data.data_points.clear();
188198

189-
let values = match self.value_map.values.lock() {
199+
let values = match self.value_map.values.write() {
190200
Ok(v) => v,
191201
Err(_) => return (0, None),
192202
};
@@ -226,7 +236,7 @@ impl<T: Number<T>> Sum<T> {
226236
.collect(),
227237
start_time: Some(prev_start),
228238
time: Some(t),
229-
value: *value,
239+
value: value.get_value(),
230240
exemplars: vec![],
231241
});
232242
}
@@ -282,7 +292,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
282292
s_data.temporality = Temporality::Delta;
283293
s_data.is_monotonic = self.monotonic;
284294

285-
let mut values = match self.value_map.values.lock() {
295+
let mut values = match self.value_map.values.write() {
286296
Ok(v) => v,
287297
Err(_) => return (0, None),
288298
};
@@ -315,9 +325,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
315325

316326
let default = T::default();
317327
for (attrs, value) in values.drain() {
318-
let delta = value - *reported.get(&attrs).unwrap_or(&default);
328+
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
319329
if delta != default {
320-
new_reported.insert(attrs.clone(), value);
330+
new_reported.insert(attrs.clone(), value.get_value());
321331
}
322332
s_data.data_points.push(DataPoint {
323333
attributes: attrs
@@ -367,7 +377,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
367377
s_data.temporality = Temporality::Cumulative;
368378
s_data.is_monotonic = self.monotonic;
369379

370-
let values = match self.value_map.values.lock() {
380+
let values = match self.value_map.values.write() {
371381
Ok(v) => v,
372382
Err(_) => return (0, None),
373383
};
@@ -400,9 +410,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
400410

401411
let default = T::default();
402412
for (attrs, value) in values.iter() {
403-
let delta = *value - *reported.get(attrs).unwrap_or(&default);
413+
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
404414
if delta != default {
405-
new_reported.insert(attrs.clone(), *value);
415+
new_reported.insert(attrs.clone(), value.get_value());
406416
}
407417
s_data.data_points.push(DataPoint {
408418
attributes: attrs

opentelemetry-sdk/src/metrics/mod.rs

+88-1
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,15 @@ impl Hash for AttributeSet {
139139

140140
#[cfg(all(test, feature = "testing"))]
141141
mod tests {
142-
use self::data::{DataPoint, ScopeMetrics};
142+
use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
143143
use super::*;
144144
use crate::metrics::data::{ResourceMetrics, Temporality};
145145
use crate::metrics::reader::TemporalitySelector;
146146
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
147147
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
148148
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
149149
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
150+
use rand::{rngs, Rng, SeedableRng};
150151
use std::borrow::Cow;
151152
use std::sync::{Arc, Mutex};
152153

@@ -199,6 +200,20 @@ mod tests {
199200
counter_aggregation_helper(Temporality::Delta);
200201
}
201202

203+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
204+
async fn histogram_aggregation_cumulative() {
205+
// Run this test with stdout enabled to see output.
206+
// cargo test histogram_aggregation_cumulative --features=metrics,testing -- --nocapture
207+
histogram_aggregation_helper(Temporality::Cumulative);
208+
}
209+
210+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
211+
async fn histogram_aggregation_delta() {
212+
// Run this test with stdout enabled to see output.
213+
// cargo test histogram_aggregation_delta --features=metrics,testing -- --nocapture
214+
histogram_aggregation_helper(Temporality::Delta);
215+
}
216+
202217
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
203218
async fn updown_counter_aggregation_cumulative() {
204219
// Run this test with stdout enabled to see output.
@@ -1007,6 +1022,65 @@ mod tests {
10071022
assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
10081023
}
10091024

1025+
fn histogram_aggregation_helper(temporality: Temporality) {
1026+
// Arrange
1027+
let mut test_context = TestContext::new(temporality);
1028+
let histogram = test_context.meter().u64_histogram("my_histogram").init();
1029+
1030+
// Act
1031+
let mut rand = rngs::SmallRng::from_entropy();
1032+
let values_kv1 = (0..50)
1033+
.map(|_| rand.gen_range(0..100))
1034+
.collect::<Vec<u64>>();
1035+
for value in values_kv1.iter() {
1036+
histogram.record(*value, &[KeyValue::new("key1", "value1")]);
1037+
}
1038+
1039+
let values_kv2 = (0..30)
1040+
.map(|_| rand.gen_range(0..100))
1041+
.collect::<Vec<u64>>();
1042+
for value in values_kv2.iter() {
1043+
histogram.record(*value, &[KeyValue::new("key1", "value2")]);
1044+
}
1045+
1046+
test_context.flush_metrics();
1047+
1048+
// Assert
1049+
let histogram = test_context.get_aggregation::<data::Histogram<u64>>("my_histogram", None);
1050+
// Expecting 2 time-series.
1051+
assert_eq!(histogram.data_points.len(), 2);
1052+
if let Temporality::Cumulative = temporality {
1053+
assert_eq!(
1054+
histogram.temporality,
1055+
Temporality::Cumulative,
1056+
"Should produce cumulative"
1057+
);
1058+
} else {
1059+
assert_eq!(
1060+
histogram.temporality,
1061+
Temporality::Delta,
1062+
"Should produce delta"
1063+
);
1064+
}
1065+
1066+
// find and validate key1=value1 datapoint
1067+
let data_point1 =
1068+
find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
1069+
.expect("datapoint with key1=value1 expected");
1070+
assert_eq!(data_point1.count, values_kv1.len() as u64);
1071+
assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
1072+
assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
1073+
assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
1074+
1075+
let data_point2 =
1076+
find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value2")
1077+
.expect("datapoint with key1=value2 expected");
1078+
assert_eq!(data_point2.count, values_kv2.len() as u64);
1079+
assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
1080+
assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
1081+
assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
1082+
}
1083+
10101084
fn counter_aggregation_helper(temporality: Temporality) {
10111085
// Arrange
10121086
let mut test_context = TestContext::new(temporality);
@@ -1109,6 +1183,19 @@ mod tests {
11091183
})
11101184
}
11111185

1186+
fn find_histogram_datapoint_with_key_value<'a, T>(
1187+
data_points: &'a [HistogramDataPoint<T>],
1188+
key: &str,
1189+
value: &str,
1190+
) -> Option<&'a HistogramDataPoint<T>> {
1191+
data_points.iter().find(|&datapoint| {
1192+
datapoint
1193+
.attributes
1194+
.iter()
1195+
.any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
1196+
})
1197+
}
1198+
11121199
fn find_scope_metric<'a>(
11131200
metrics: &'a [ScopeMetrics],
11141201
name: &'a str,

stress/Cargo.toml

+7-2
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ version = "0.1.0"
44
edition = "2021"
55
publish = false
66

7-
[[bin]] # Bin to run the metrics stress tests
7+
[[bin]] # Bin to run the metrics stress tests for Counter
88
name = "metrics"
9-
path = "src/metrics.rs"
9+
path = "src/metrics_counter.rs"
10+
doc = false
11+
12+
[[bin]] # Bin to run the metrics stress tests for Histogram
13+
name = "metrics_histogram"
14+
path = "src/metrics_histogram.rs"
1015
doc = false
1116

1217
[[bin]] # Bin to run the metrics overflow stress tests

stress/src/metrics.rs stress/src/metrics_counter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
44
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
55
RAM: 64.0 GB
6-
3M /sec
6+
35M /sec
77
*/
88

99
use lazy_static::lazy_static;

stress/src/metrics_histogram.rs

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
Stress test results:
3+
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
4+
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
5+
RAM: 64.0 GB
6+
4.6M /sec
7+
*/
8+
9+
use lazy_static::lazy_static;
10+
use opentelemetry::{
11+
metrics::{Histogram, MeterProvider as _},
12+
KeyValue,
13+
};
14+
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
15+
use rand::{
16+
rngs::{self},
17+
Rng, SeedableRng,
18+
};
19+
use std::{borrow::Cow, cell::RefCell};
20+
21+
mod throughput;
22+
23+
lazy_static! {
24+
static ref PROVIDER: SdkMeterProvider = SdkMeterProvider::builder()
25+
.with_reader(ManualReader::builder().build())
26+
.build();
27+
static ref ATTRIBUTE_VALUES: [&'static str; 10] = [
28+
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
29+
"value10"
30+
];
31+
static ref HISTOGRAM: Histogram<u64> = PROVIDER
32+
.meter(<&str as Into<Cow<'static, str>>>::into("test"))
33+
.u64_histogram("hello")
34+
.init();
35+
}
36+
37+
thread_local! {
38+
/// Store random number generator for each thread
39+
static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
40+
}
41+
42+
fn main() {
43+
throughput::test_throughput(test_histogram);
44+
}
45+
46+
fn test_histogram() {
47+
let len = ATTRIBUTE_VALUES.len();
48+
let rands = CURRENT_RNG.with(|rng| {
49+
let mut rng = rng.borrow_mut();
50+
[
51+
rng.gen_range(0..len),
52+
rng.gen_range(0..len),
53+
rng.gen_range(0..len),
54+
]
55+
});
56+
let index_first_attribute = rands[0];
57+
let index_second_attribute = rands[1];
58+
let index_third_attribute = rands[2];
59+
60+
// each attribute has 10 possible values, so there are 1000 possible combinations (time-series)
61+
HISTOGRAM.record(
62+
1,
63+
&[
64+
KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]),
65+
KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]),
66+
KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]),
67+
],
68+
);
69+
}

0 commit comments

Comments
 (0)