Skip to content

Commit 080d6ce

Browse files
authored
Merge branch 'main' into trace-set-resource
2 parents 5f7a564 + 0ce6a6d commit 080d6ce

File tree

7 files changed

+53
-11
lines changed

7 files changed

+53
-11
lines changed

opentelemetry-sdk/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
asynchronously, it should clone the log data to ensure it can be safely processed without
2727
lifetime issues.
2828

29+
- **Breaking** [1836](https://github.com/open-telemetry/opentelemetry-rust/pull/1836) `SpanProcessor::shutdown` now takes an immutable reference to self. Any reference can call shutdown on the processor. After the first call to `shutdown` the processor will not process any new spans.
30+
2931
## v0.23.0
3032

3133
- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed

opentelemetry-sdk/src/trace/provider.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ mod tests {
269269
}
270270
}
271271

272-
fn shutdown(&mut self) -> TraceResult<()> {
272+
fn shutdown(&self) -> TraceResult<()> {
273273
self.force_flush()
274274
}
275275

opentelemetry-sdk/src/trace/span_processor.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
9292
fn force_flush(&self) -> TraceResult<()>;
9393
/// Shuts down the processor. Called when SDK is shut down. This is an
9494
/// opportunity for processors to do any cleanup required.
95-
fn shutdown(&mut self) -> TraceResult<()>;
95+
fn shutdown(&self) -> TraceResult<()>;
9696
/// Set the resource for the log processor.
9797
fn set_resource(&mut self, _resource: &Resource);
9898
}
@@ -140,7 +140,7 @@ impl SpanProcessor for SimpleSpanProcessor {
140140
Ok(())
141141
}
142142

143-
fn shutdown(&mut self) -> TraceResult<()> {
143+
fn shutdown(&self) -> TraceResult<()> {
144144
if let Ok(mut exporter) = self.exporter.lock() {
145145
exporter.shutdown();
146146
Ok(())
@@ -258,7 +258,7 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
258258
.and_then(|identity| identity)
259259
}
260260

261-
fn shutdown(&mut self) -> TraceResult<()> {
261+
fn shutdown(&self) -> TraceResult<()> {
262262
let (res_sender, res_receiver) = oneshot::channel();
263263
self.message_sender
264264
.try_send(BatchMessage::Shutdown(res_sender))
@@ -709,7 +709,7 @@ mod tests {
709709
#[test]
710710
fn simple_span_processor_on_end_calls_export() {
711711
let exporter = InMemorySpanExporterBuilder::new().build();
712-
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
712+
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
713713
let span_data = new_test_export_span_data();
714714
processor.on_end(span_data.clone());
715715
assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
@@ -741,7 +741,7 @@ mod tests {
741741
#[test]
742742
fn simple_span_processor_shutdown_calls_shutdown() {
743743
let exporter = InMemorySpanExporterBuilder::new().build();
744-
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
744+
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
745745
let span_data = new_test_export_span_data();
746746
processor.on_end(span_data.clone());
747747
assert!(!exporter.get_finished_spans().unwrap().is_empty());
@@ -897,7 +897,7 @@ mod tests {
897897
scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
898898
..Default::default()
899899
};
900-
let mut processor =
900+
let processor =
901901
BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
902902
let handle = tokio::spawn(async move {
903903
loop {
@@ -999,7 +999,7 @@ mod tests {
999999
delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
10001000
delay_fn: async_std::task::sleep,
10011001
};
1002-
let mut processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
1002+
let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
10031003
processor.on_end(new_test_export_span_data());
10041004
let flush_res = processor.force_flush();
10051005
if time_out {
@@ -1023,7 +1023,7 @@ mod tests {
10231023
delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
10241024
delay_fn: tokio::time::sleep,
10251025
};
1026-
let mut processor =
1026+
let processor =
10271027
BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
10281028
tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
10291029
processor.on_end(new_test_export_span_data());

stress/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ tracing = { workspace = true, features = ["std"]}
4141
tracing-core = { workspace = true }
4242
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
4343
num-format = "0.4.4"
44+
sysinfo = { version = "0.30.12", optional = true }
45+
46+
[features]
47+
stats = ["sysinfo"]

stress/README.md

+8
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,11 @@ Throughput: 3,905,200 iterations/sec
3636
Throughput: 4,106,600 iterations/sec
3737
Throughput: 5,075,400 iterations/sec
3838
```
39+
40+
## Feature flags
41+
42+
"stats" - Prints memory and CPU usage. Has slight impact on throughput.
43+
44+
```sh
45+
cargo run --release --bin metrics --feature=stats
46+
```

stress/src/throughput.rs

+29-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33
use std::sync::Arc;
44
use std::thread;
55
use std::time::{Duration, Instant};
6+
#[cfg(feature = "stats")]
7+
use sysinfo::{Pid, System};
68

79
const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
810
const BATCH_SIZE: u64 = 1000;
@@ -27,7 +29,7 @@ where
2729
})
2830
.expect("Error setting Ctrl-C handler");
2931
let num_threads = num_cpus::get();
30-
println!("Number of threads: {}", num_threads);
32+
println!("Number of threads: {}\n", num_threads);
3133
let mut handles = Vec::with_capacity(num_threads);
3234
let func_arc = Arc::new(func);
3335
let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();
@@ -42,6 +44,12 @@ where
4244
let mut start_time = Instant::now();
4345
let mut end_time = start_time;
4446
let mut total_count_old: u64 = 0;
47+
48+
#[cfg(feature = "stats")]
49+
let pid = Pid::from(std::process::id() as usize);
50+
#[cfg(feature = "stats")]
51+
let mut system = System::new_all();
52+
4553
loop {
4654
let elapsed = end_time.duration_since(start_time).as_secs();
4755
if elapsed >= SLIDING_WINDOW_SIZE {
@@ -56,6 +64,26 @@ where
5664
"Throughput: {} iterations/sec",
5765
throughput.to_formatted_string(&Locale::en)
5866
);
67+
68+
#[cfg(feature = "stats")]
69+
{
70+
system.refresh_all();
71+
if let Some(process) = system.process(pid) {
72+
println!(
73+
"Memory usage: {:.2} MB",
74+
process.memory() as f64 / (1024.0 * 1024.0)
75+
);
76+
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
77+
println!(
78+
"Virtual memory usage: {:.2} MB",
79+
process.virtual_memory() as f64 / (1024.0 * 1024.0)
80+
);
81+
} else {
82+
println!("Process not found");
83+
}
84+
}
85+
86+
println!("\n");
5987
start_time = Instant::now();
6088
}
6189

stress/src/traces.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl SpanProcessor for NoOpSpanProcessor {
4242
Ok(())
4343
}
4444

45-
fn shutdown(&mut self) -> TraceResult<()> {
45+
fn shutdown(&self) -> TraceResult<()> {
4646
Ok(())
4747
}
4848

0 commit comments

Comments
 (0)