Skip to content

Commit cc4a9c0

Browse files
committed
Merge branch 'utpilla/Update-OTLP-Exporter-Trace-Pipeline' of https://github.com/utpilla/opentelemetry-rust into utpilla/Update-OTLP-Exporter-Trace-Pipeline
2 parents dd142bc + f46fb90 commit cc4a9c0

File tree

6 files changed

+118
-14
lines changed

6 files changed

+118
-14
lines changed

opentelemetry-sdk/benches/metric_counter.rs

+29-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use opentelemetry::{
1919
};
2020
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
2121
use rand::{
22-
rngs::{self, SmallRng},
22+
rngs::{self},
2323
Rng, SeedableRng,
2424
};
2525
use std::cell::RefCell;
@@ -107,14 +107,35 @@ fn counter_add(c: &mut Criterion) {
107107
});
108108
});
109109

110-
c.bench_function("Random_Generator_5", |b| {
110+
// Cause overflow.
111+
for v in 0..2001 {
112+
counter.add(100, &[KeyValue::new("A", v.to_string())]);
113+
}
114+
c.bench_function("Counter_Overflow", |b| {
111115
b.iter(|| {
112-
let mut rng = SmallRng::from_entropy();
113-
let _i1 = rng.gen_range(0..4);
114-
let _i2 = rng.gen_range(0..4);
115-
let _i3 = rng.gen_range(0..10);
116-
let _i4 = rng.gen_range(0..10);
117-
let _i5 = rng.gen_range(0..10);
116+
// 4*4*10*10 = 1600 time series.
117+
let rands = CURRENT_RNG.with(|rng| {
118+
let mut rng = rng.borrow_mut();
119+
[
120+
rng.gen_range(0..4),
121+
rng.gen_range(0..4),
122+
rng.gen_range(0..10),
123+
rng.gen_range(0..10),
124+
]
125+
});
126+
let index_first_attribute = rands[0];
127+
let index_second_attribute = rands[1];
128+
let index_third_attribute = rands[2];
129+
let index_forth_attribute = rands[3];
130+
counter.add(
131+
1,
132+
&[
133+
KeyValue::new("attribute1", attribute_values[index_first_attribute]),
134+
KeyValue::new("attribute2", attribute_values[index_second_attribute]),
135+
KeyValue::new("attribute3", attribute_values[index_third_attribute]),
136+
KeyValue::new("attribute4", attribute_values[index_forth_attribute]),
137+
],
138+
);
118139
});
119140
});
120141

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(
2424

2525
/// Checks whether aggregator has hit cardinality limit for metric streams
2626
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
27-
size < STREAM_CARDINALITY_LIMIT as usize - 1
27+
size < STREAM_CARDINALITY_LIMIT as usize
2828
}
2929

3030
/// Receives measurements to be aggregated.

opentelemetry-sdk/src/metrics/internal/sum.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,12 @@ impl<T: Number<T>> ValueMap<T> {
5555
Entry::Vacant(vacant_entry) => {
5656
if is_under_cardinality_limit(size) {
5757
vacant_entry.insert(measurement);
58+
} else if let Some(val) = values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET) {
59+
*val += measurement;
60+
return;
5861
} else {
59-
values
60-
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
61-
.and_modify(|val| *val += measurement)
62-
.or_insert(measurement);
63-
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
62+
values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), measurement);
63+
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
6464
}
6565
}
6666
}

opentelemetry-sdk/src/metrics/mod.rs

+29
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,35 @@ mod tests {
161161
// "multi_thread" tokio flavor must be used else flush won't
162162
// be able to make progress!
163163

164+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
165+
async fn counter_overflow_delta() {
166+
// Arrange
167+
let mut test_context = TestContext::new(Temporality::Delta);
168+
let counter = test_context.u64_counter("test", "my_counter", None);
169+
170+
// Act
171+
// Record measurements with A:0, A:1, ..., A:1999, which just fit within the 2000 limit
172+
for v in 0..2000 {
173+
counter.add(100, &[KeyValue::new("A", v.to_string())]);
174+
}
175+
176+
// All of the below will now go into overflow.
177+
counter.add(100, &[KeyValue::new("A", "foo")]);
178+
counter.add(100, &[KeyValue::new("A", "another")]);
179+
counter.add(100, &[KeyValue::new("A", "yet_another")]);
180+
test_context.flush_metrics();
181+
182+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
183+
184+
// Expecting 2001 metric points. (2000 + 1 overflow)
185+
assert_eq!(sum.data_points.len(), 2001);
186+
187+
let data_point =
188+
find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
189+
.expect("overflow point expected");
190+
assert_eq!(data_point.value, 300);
191+
}
192+
164193
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
165194
async fn counter_aggregation_cumulative() {
166195
// Run this test with stdout enabled to see output.

stress/Cargo.toml

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ name = "metrics"
99
path = "src/metrics.rs"
1010
doc = false
1111

12+
[[bin]] # Bin to run the metrics overflow stress tests
13+
name = "metrics_overflow"
14+
path = "src/metrics_overflow.rs"
15+
doc = false
16+
1217
[[bin]] # Bin to run the logs stress tests
1318
name = "logs"
1419
path = "src/logs.rs"

stress/src/metrics_overflow.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
Stress test results:
3+
OS: Ubuntu 22.04.3 LTS (5.15.146.1-microsoft-standard-WSL2)
4+
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16 vCPUs
5+
RAM: 64.0 GB
6+
4.5M /sec
7+
*/
8+
9+
use lazy_static::lazy_static;
10+
use opentelemetry::{
11+
metrics::{Counter, MeterProvider as _},
12+
KeyValue,
13+
};
14+
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
15+
use rand::{
16+
rngs::{self},
17+
Rng, SeedableRng,
18+
};
19+
use std::{borrow::Cow, cell::RefCell};
20+
21+
mod throughput;
22+
23+
lazy_static! {
24+
static ref PROVIDER: SdkMeterProvider = SdkMeterProvider::builder()
25+
.with_reader(ManualReader::builder().build())
26+
.build();
27+
static ref COUNTER: Counter<u64> = PROVIDER
28+
.meter(<&str as Into<Cow<'static, str>>>::into("test"))
29+
.u64_counter("hello")
30+
.init();
31+
}
32+
33+
thread_local! {
34+
/// Store random number generator for each thread
35+
static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
36+
}
37+
38+
fn main() {
39+
throughput::test_throughput(test_counter);
40+
}
41+
42+
fn test_counter() {
43+
// The main goal of this test is to ensure that OTel SDK is not growing its
44+
// memory usage indefinitely even when user code misbehaves by producing
45+
// unbounded metric points (unique time series).
46+
// It also checks that the SDK's internal logging is done in a bounded way.
47+
let rand = CURRENT_RNG.with(|rng| rng.borrow_mut().gen_range(0..100000000));
48+
COUNTER.add(1, &[KeyValue::new("A", rand)]);
49+
}

0 commit comments

Comments
 (0)