Skip to content

Commit

Permalink
feat(metrics): add custom bucket support for http metrics layer
Browse files Browse the repository at this point in the history
  • Loading branch information
ttys3 committed Feb 13, 2025
1 parent 2c6834f commit 89ed763
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ opentelemetry-prometheus = { version = "0.28.0"}
opentelemetry_sdk = { version = "0.28.0", features = ["rt-tokio"] }
prometheus = "0.13.4"
tokio = { version = "1.43", features = ["macros"] }
axum-test = "17.2.0"
236 changes: 163 additions & 73 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ impl Default for PathSkipper {
pub struct HttpMetricsLayerBuilder {
skipper: PathSkipper,
is_tls: bool,
duration_buckets: Option<Vec<f64>>,
size_buckets: Option<Vec<f64>>,
}

impl HttpMetricsLayerBuilder {
Expand All @@ -216,6 +218,16 @@ impl HttpMetricsLayerBuilder {
self
}

pub fn with_duration_buckets(mut self, buckets: Vec<f64>) -> Self {
self.duration_buckets = Some(buckets);
self
}

pub fn with_size_buckets(mut self, buckets: Vec<f64>) -> Self {
self.size_buckets = Some(buckets);
self
}

pub fn build(self) -> HttpMetricsLayer {
let provider = global::meter_provider();
let meter = provider.meter_with_scope(
Expand All @@ -224,27 +236,33 @@ impl HttpMetricsLayerBuilder {
.build(),
);

// request_duration_seconds
let duration_buckets = self.duration_buckets.unwrap_or_else(||
HTTP_REQ_DURATION_HISTOGRAM_BUCKETS.to_vec()
);

let size_buckets = self.size_buckets.unwrap_or_else(||
HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec()
);

let req_duration = meter
.f64_histogram("http.server.request.duration")
.with_unit("s")
.with_description("The HTTP request latencies in seconds.")
.with_boundaries(HTTP_REQ_DURATION_HISTOGRAM_BUCKETS.to_vec())
.with_boundaries(duration_buckets)
.build();

// request_size_bytes
let req_size = meter
.u64_histogram("http.server.request.size")
.with_unit("By")
.with_description("The HTTP request sizes in bytes.")
.with_boundaries(HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec())
.with_boundaries(size_buckets.clone())
.build();

let res_size = meter
.u64_histogram("http.server.response.size")
.with_unit("By")
.with_description("The HTTP response sizes in bytes.")
.with_boundaries(HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec())
.with_boundaries(size_buckets)
.build();

// no u64_up_down_counter because up_down_counter maybe < 0 since it allow negative values
Expand Down Expand Up @@ -443,16 +461,26 @@ where

#[cfg(test)]
mod tests {
use crate::HttpMetricsLayer;
use crate::HttpMetricsLayerBuilder;
use axum::extract::State;
use crate::HTTP_REQ_DURATION_HISTOGRAM_BUCKETS;
use crate::HTTP_REQ_SIZE_HISTOGRAM_BUCKETS;
use axum::routing::get;
use axum::Router;
use axum_test::TestServer;
use opentelemetry::{global, Context, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, Registry, TextEncoder};
use std::sync::Arc;


async fn metrics_handler() -> String {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}

#[tokio::test]
async fn test_prometheus_exporter() {
let _cx = Context::current();
Expand Down Expand Up @@ -487,87 +515,149 @@ mod tests {
println!("{}", String::from_utf8(result).unwrap());
}


#[tokio::test]
async fn test_prom_exporter_builder() {
let metrics = HttpMetricsLayerBuilder::new().build();
let _app = Router::<HttpMetricsLayer>::new()
// export metrics at `/metrics` endpoint
.route(
"/metrics",
get(|| async {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}),
)
.route("/", get(handler))
.route("/hello", get(handler))
.route("/world", get(handler))
async fn test_builder_with_arced_skipper() {
let exporter = opentelemetry_prometheus::exporter().with_registry(prometheus::default_registry().clone()).build().unwrap();
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
global::set_meter_provider(provider.clone());

let metrics = HttpMetricsLayerBuilder::new()
.with_skipper(crate::PathSkipper::new_with_fn(Arc::new(|s: &str| s.starts_with("/skip"))))
.build();

let app: Router = Router::new()
.route("/metrics", get(metrics_handler))
.route("/skip", get(|| async { "skip this handler" }))
.route("/record", get(|| async { "record this handler" }))
// add the metrics middleware
.layer(metrics);

async fn handler() -> &'static str {
"<h1>Hello, World!</h1>"
}
// Create test server
let server = TestServer::new(app).unwrap();

// Make a test request
let response = server.get("/skip").await;
assert_eq!(response.status_code(), 200);

let response = server.get("/record").await;
assert_eq!(response.status_code(), 200);
println!("/record response: {:}", String::from_utf8(response.as_bytes().to_vec()).unwrap());

let response = server.get("/metrics").await;
assert_eq!(response.status_code(), 200);

let metrics_str = String::from_utf8(response.as_bytes().to_vec()).unwrap();
println!("/metrics response: {:}", metrics_str);

assert!(!metrics_str.contains("http_route=\"/skip\""));
assert!(metrics_str.contains("http_route=\"/record\""));
}

#[tokio::test]
async fn test_builder_with_state_router() {
#[derive(Clone)]
struct AppState {}
async fn test_custom_buckets() {
// Custom buckets for testing
let custom_duration_buckets = vec![0.11, 0.22, 0.33, 0.44];
let custom_size_buckets = vec![1024.0, 4096.0, 16384.0];

let metrics = HttpMetricsLayerBuilder::new().build();
let _app: Router<AppState> = Router::new()
.route(
"/metrics",
get(|| async {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}),
)
.route("/", get(handler))
.route("/hello", get(handler))
.route("/world", get(handler))
// add the metrics middleware
.layer(metrics)
.with_state(AppState {});
let registry = Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
global::set_meter_provider(provider.clone());

// Create metrics layer with custom buckets
let metrics = HttpMetricsLayerBuilder::new()
.with_duration_buckets(custom_duration_buckets.clone())
.with_size_buckets(custom_size_buckets.clone())
.build();

let app = Router::<()>::new()
.route("/metrics", get(metrics_handler))
.route("/test", get(|| async { "test" }))
.layer(metrics);

// Create test server
let server = TestServer::new(app).unwrap();

// Make a test request
let response = server.get("/test").await;
assert_eq!(response.status_code(), 200);

async fn handler(_state: State<AppState>) -> &'static str {
"<h1>Hello, World!</h1>"
// Get the metrics output
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut output = Vec::new();
encoder.encode(&metric_families, &mut output).unwrap();
let metrics_str = String::from_utf8(output).unwrap();

// print the metrics output
println!("metrics_str: {:?}", metrics_str);

// Verify that our custom buckets are present in the output
// Duration buckets
for bucket in custom_duration_buckets {
assert!(
metrics_str.contains(&format!("le=\"{}\"", bucket)),
"Duration bucket {} not found in metrics output", bucket
);
}

// Size buckets
for bucket in custom_size_buckets {
assert!(
metrics_str.contains(&format!("le=\"{}\"", bucket)),
"Size bucket {} not found in metrics output", bucket
);
}
}

#[tokio::test]
async fn test_builder_with_arced_skipper() {
#[derive(Clone)]
struct AppState {}
async fn test_default_buckets() {
let registry = Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
global::set_meter_provider(provider.clone());

let metrics = HttpMetricsLayerBuilder::new()
.with_skipper(crate::PathSkipper::new_with_fn(Arc::new(|_: &str| true)))
.build();
let _app: Router<AppState> = Router::new()
.route(
"/metrics",
get(|| async {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
// return metrics
String::from_utf8(buffer).unwrap()
}),
)
.route("/", get(handler))
// add the metrics middleware
.layer(metrics)
.with_state(AppState {});
// Create metrics layer with default buckets
let metrics = HttpMetricsLayerBuilder::new().build();

let app = Router::<()>::new()
.route("/test", get(|| async { "test" }))
.layer(metrics);

// Create test server
let server = TestServer::new(app).unwrap();

// Make a test request
let response = server.get("/test").await;
assert_eq!(response.status_code(), 200);

// Get the metrics output
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut output = Vec::new();
encoder.encode(&metric_families, &mut output).unwrap();
let metrics_str = String::from_utf8(output).unwrap();

// Verify that default buckets are present
for bucket in HTTP_REQ_DURATION_HISTOGRAM_BUCKETS {
assert!(
metrics_str.contains(&format!("le=\"{}\"", bucket)),
"Default duration bucket {} not found in metrics output", bucket
);
}

async fn handler(_state: State<AppState>) -> &'static str {
"<h1>Hello, World!</h1>"
for bucket in HTTP_REQ_SIZE_HISTOGRAM_BUCKETS {
assert!(
metrics_str.contains(&format!("le=\"{}\"", bucket)),
"Default size bucket {} not found in metrics output", bucket
);
}
}
}

0 comments on commit 89ed763

Please sign in to comment.