Skip to content

Commit 4957840

Browse files
authored
Merge branch 'main' into otel-otlp-http-timeout
2 parents 4deb02a + aece641 commit 4957840

File tree

7 files changed

+494
-361
lines changed

7 files changed

+494
-361
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
pub mod logs_asserter;
2-
pub mod metrics_asserter;
2+
pub mod metric_helpers;
33
pub mod test_utils;
44
pub mod trace_asserter;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
#![cfg(unix)]
2+
use crate::test_utils;
3+
use anyhow::Result;
4+
use anyhow::{Context, Ok};
5+
use opentelemetry_otlp::MetricExporter;
6+
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider};
7+
use opentelemetry_sdk::Resource;
8+
use serde_json::Value;
9+
use std::fs;
10+
use std::fs::File;
11+
use std::io::BufReader;
12+
use std::io::Read;
13+
use std::time::Duration;
14+
use tracing::info;
15+
16+
static RESULT_PATH: &str = "actual/metrics.json";
17+
pub const SLEEP_DURATION: Duration = Duration::from_secs(5);
18+
19+
///
20+
/// Creates an exporter using the appropriate HTTP or gRPC client based on
21+
/// the configured features.
22+
///
23+
fn create_exporter() -> MetricExporter {
24+
let exporter_builder = MetricExporter::builder();
25+
26+
#[cfg(feature = "tonic-client")]
27+
let exporter_builder = exporter_builder.with_tonic();
28+
#[cfg(not(feature = "tonic-client"))]
29+
#[cfg(any(
30+
feature = "hyper-client",
31+
feature = "reqwest-client",
32+
feature = "reqwest-blocking-client"
33+
))]
34+
let exporter_builder = exporter_builder.with_http();
35+
36+
exporter_builder
37+
.build()
38+
.expect("Failed to build MetricExporter")
39+
}
40+
41+
/// Initializes the OpenTelemetry metrics pipeline
42+
fn init_meter_provider() -> SdkMeterProvider {
43+
let exporter = create_exporter();
44+
let reader = PeriodicReader::builder(exporter).build();
45+
let resource = Resource::builder_empty()
46+
.with_service_name("metrics-integration-test")
47+
.build();
48+
let meter_provider = MeterProviderBuilder::default()
49+
.with_resource(resource)
50+
.with_reader(reader)
51+
.build();
52+
opentelemetry::global::set_meter_provider(meter_provider.clone());
53+
meter_provider
54+
}
55+
56+
///
57+
/// Performs setup for metrics tests using the Tokio runtime.
58+
///
59+
pub async fn setup_metrics_tokio() -> SdkMeterProvider {
60+
let _ = test_utils::start_collector_container().await;
61+
// Truncate results
62+
_ = File::create(RESULT_PATH).expect("it's good");
63+
info!("Truncated metrics file");
64+
65+
init_meter_provider()
66+
}
67+
68+
///
69+
/// Performs setup for metrics tests.
70+
///
71+
pub fn setup_metrics_non_tokio(
72+
initialize_metric_in_tokio: bool,
73+
) -> (SdkMeterProvider, tokio::runtime::Runtime) {
74+
let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
75+
let meter_provider: SdkMeterProvider = if initialize_metric_in_tokio {
76+
// Initialize the logger provider inside the Tokio runtime
77+
rt.block_on(async {
78+
// Setup the collector container inside Tokio runtime
79+
let _ = test_utils::start_collector_container().await;
80+
init_meter_provider()
81+
})
82+
} else {
83+
rt.block_on(async {
84+
let _ = test_utils::start_collector_container().await;
85+
});
86+
87+
// Initialize the logger provider outside the Tokio runtime
88+
init_meter_provider()
89+
};
90+
91+
(meter_provider, rt)
92+
}
93+
94+
///
95+
/// Check that the results contain the given string.
96+
///
97+
pub fn assert_metrics_results_contains(expected_content: &str) -> Result<()> {
98+
// let contents = fs::read_to_string(test_utils::METRICS_FILE)?;
99+
let file = File::open(test_utils::METRICS_FILE)?;
100+
let mut contents = String::new();
101+
let mut reader = std::io::BufReader::new(&file);
102+
reader.read_to_string(&mut contents)?;
103+
assert!(
104+
contents.contains(expected_content),
105+
"Expected content {} not found in actual content {}",
106+
expected_content,
107+
contents
108+
);
109+
Ok(())
110+
}
111+
112+
///
113+
/// Retrieves the latest metrics for the given scope. Each test should use
114+
/// its own scope, so that we can easily pull the data for it out from the rest
115+
/// of the data.
116+
///
117+
/// This will also retrieve the resource attached to the scope.
118+
///
119+
pub fn fetch_latest_metrics_for_scope(scope_name: &str) -> Result<Value> {
120+
// Open the file and fetch the contents
121+
let contents = fs::read_to_string(test_utils::METRICS_FILE)?;
122+
123+
// Find the last parseable metrics line that contains the desired scope
124+
let json_line = contents
125+
.lines()
126+
.rev()
127+
.find_map(|line| {
128+
// Attempt to parse the line as JSON
129+
serde_json::from_str::<Value>(line)
130+
.ok()
131+
.and_then(|mut json_line| {
132+
// Check if it contains the specified scope
133+
if let Some(resource_metrics) = json_line
134+
.get_mut("resourceMetrics")
135+
.and_then(|v| v.as_array_mut())
136+
{
137+
resource_metrics.retain_mut(|resource| {
138+
if let Some(scope_metrics) = resource
139+
.get_mut("scopeMetrics")
140+
.and_then(|v| v.as_array_mut())
141+
{
142+
scope_metrics.retain(|scope| {
143+
scope
144+
.get("scope")
145+
.and_then(|s| s.get("name"))
146+
.and_then(|name| name.as_str())
147+
.map_or(false, |n| n == scope_name)
148+
});
149+
150+
// Keep the resource only if it has any matching `ScopeMetrics`
151+
!scope_metrics.is_empty()
152+
} else {
153+
false
154+
}
155+
});
156+
157+
// If any resource metrics remain, return this line
158+
if !resource_metrics.is_empty() {
159+
return Some(json_line);
160+
}
161+
}
162+
163+
None
164+
})
165+
})
166+
.with_context(|| {
167+
format!(
168+
"No valid JSON line containing scope `{}` found.",
169+
scope_name
170+
)
171+
})?;
172+
173+
Ok(json_line)
174+
}
175+
176+
///
177+
/// Check that the metrics for the given scope match what we expect. This
178+
/// includes zeroing out timestamps, which we reasonably expect not to match.
179+
///
180+
pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
181+
// Define the results file path
182+
let results_file_path = format!("./expected/metrics/{}.json", scope_name);
183+
184+
// Fetch the actual metrics for the given scope
185+
let actual_metrics = fetch_latest_metrics_for_scope(scope_name)
186+
.context(format!("Failed to fetch metrics for scope: {}", scope_name))?;
187+
188+
// Read the expected metrics from the results file
189+
let expected_metrics = {
190+
let file = File::open(&results_file_path).context(format!(
191+
"Failed to open results file: {}",
192+
results_file_path
193+
))?;
194+
read_metrics_from_json(file)
195+
}?;
196+
197+
// Compare the actual metrics with the expected metrics
198+
MetricsAsserter::new(actual_metrics, expected_metrics).assert();
199+
200+
Ok(())
201+
}
202+
203+
pub fn read_metrics_from_json(file: File) -> Result<Value> {
204+
// Create a buffered reader for the file
205+
let mut reader = BufReader::new(file);
206+
let mut contents = String::new();
207+
208+
// Read the file contents into a string
209+
reader
210+
.read_to_string(&mut contents)
211+
.expect("Failed to read json file");
212+
213+
// Parse the contents into a JSON Value
214+
let metrics_data: Value = serde_json::from_str(&contents)?;
215+
Ok(metrics_data)
216+
}
217+
218+
pub struct MetricsAsserter {
219+
results: Value,
220+
expected: Value,
221+
}
222+
223+
impl MetricsAsserter {
224+
pub fn new(results: Value, expected: Value) -> Self {
225+
MetricsAsserter { results, expected }
226+
}
227+
228+
pub fn assert(mut self) {
229+
// Normalize JSON by cleaning out timestamps
230+
Self::zero_out_timestamps(&mut self.results);
231+
Self::zero_out_timestamps(&mut self.expected);
232+
233+
// Perform the assertion
234+
assert_eq!(
235+
self.results, self.expected,
236+
"Metrics did not match. Results: {:#?}, Expected: {:#?}",
237+
self.results, self.expected
238+
);
239+
}
240+
241+
/// Recursively removes or zeros out timestamp fields in the JSON
242+
fn zero_out_timestamps(value: &mut Value) {
243+
match value {
244+
Value::Object(map) => {
245+
for (key, val) in map.iter_mut() {
246+
if key == "startTimeUnixNano" || key == "timeUnixNano" {
247+
*val = Value::String("0".to_string());
248+
} else {
249+
Self::zero_out_timestamps(val);
250+
}
251+
}
252+
}
253+
Value::Array(array) => {
254+
for item in array.iter_mut() {
255+
Self::zero_out_timestamps(item);
256+
}
257+
}
258+
_ => {}
259+
}
260+
}
261+
}

opentelemetry-otlp/tests/integration_test/src/metrics_asserter.rs

-64
This file was deleted.

opentelemetry-otlp/tests/integration_test/src/test_utils.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ pub async fn start_collector_container() -> Result<()> {
9999
.await?;
100100

101101
let container = Arc::new(container_instance);
102-
otel_debug!(
102+
otel_info!(
103103
name: "Container started",
104104
ports = format!("{:?}", container.ports().await));
105105

@@ -108,6 +108,8 @@ pub async fn start_collector_container() -> Result<()> {
108108

109109
// Store the container in COLLECTOR_ARC
110110
*arc_guard = Some(Arc::clone(&container));
111+
} else {
112+
otel_info!(name: "OTel Collector already running");
111113
}
112114

113115
Ok(())

0 commit comments

Comments
 (0)