Skip to content

Commit 32c8af5

Browse files
committed
feat: introduce pprof debugging endpoint
Allow CPU profiling using pprof. There's a new boolean flag `--enable-pprof` that exposes a new HTTP endpoint named `/debug/pprof/cpu`. By doing a GET request against the endpoint, a pprof profile is generated and downloaded. The endpoint takes two optional query parameters: - interval: the length in seconds of the profile (30 secs by default) - frequency: profiling frequency (99 Hz by default) Signed-off-by: Flavio Castelli <fcastelli@suse.com>
1 parent 7f2f59c commit 32c8af5

File tree

8 files changed

+522
-83
lines changed

8 files changed

+522
-83
lines changed

Cargo.lock

+279-70
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ edition = "2021"
1414
anyhow = "1.0"
1515
clap = { version = "4.5", features = ["cargo", "env"] }
1616
daemonize = "0.5"
17+
futures = "0.3"
1718
humansize = "2.1"
1819
itertools = "0.12.1"
1920
k8s-openapi = { version = "0.21.1", default-features = false, features = [
2021
"v1_29",
2122
] }
2223
lazy_static = "1.4.0"
24+
mime = "0.3"
2325
num_cpus = "1.16.0"
2426
opentelemetry-otlp = { version = "0.14.0", features = ["metrics", "tonic"] }
2527
opentelemetry = { version = "0.21", default-features = false, features = [
@@ -28,8 +30,10 @@ opentelemetry = { version = "0.21", default-features = false, features = [
2830
] }
2931
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
3032
procfs = "0.16"
33+
pprof = { version = "0.13", features = ["prost-codec"] }
3134
policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.16.1" }
3235
rayon = "1.9"
36+
regex = "1.10"
3337
serde_json = "1.0"
3438
serde = { version = "1.0", features = ["derive"] }
3539
serde_yaml = "0.9.32"
@@ -42,7 +46,7 @@ tracing-opentelemetry = "0.22.0"
4246
tracing-subscriber = { version = "0.3", features = ["ansi", "fmt", "json"] }
4347
semver = { version = "1.0.22", features = ["serde"] }
4448
mockall_double = "0.3"
45-
axum = { version = "0.7.4", features = ["macros"] }
49+
axum = { version = "0.7.4", features = ["macros", "query"] }
4650
axum-server = { version = "0.6", features = ["tls-rustls"] }
4751
tower-http = { version = "0.5.2", features = ["trace"] }
4852

src/api/handlers.rs

+70-10
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,30 @@
11
use axum::{
2-
extract::{self, FromRequest},
3-
http::StatusCode,
2+
extract::{self, FromRequest, Query},
3+
http::{header, StatusCode},
44
response::IntoResponse,
55
Json,
66
};
77
use policy_evaluator::{
88
admission_request::AdmissionRequest, admission_response::AdmissionResponse,
99
policy_evaluator::ValidateRequest,
1010
};
11-
use serde::Serialize;
11+
12+
use serde::{Deserialize, Serialize};
1213
use std::sync::Arc;
1314
use tokio::task;
1415
use tracing::{debug, error, Span};
1516

16-
use crate::api::{
17-
admission_review::{AdmissionReviewRequest, AdmissionReviewResponse},
18-
api_error::ApiError,
19-
raw_review::{RawReviewRequest, RawReviewResponse},
20-
service::{evaluate, RequestOrigin},
21-
state::ApiServerState,
17+
use crate::{
18+
api::{
19+
admission_review::{AdmissionReviewRequest, AdmissionReviewResponse},
20+
api_error::ApiError,
21+
raw_review::{RawReviewRequest, RawReviewResponse},
22+
service::{evaluate, RequestOrigin},
23+
state::ApiServerState,
24+
},
25+
profiling,
2226
};
23-
use crate::evaluation::errors::EvaluationError;
27+
use crate::{evaluation::errors::EvaluationError, profiling::ReportGenerationError};
2428

2529
// create an extractor that internally uses `axum::Json` but has a custom rejection
2630
#[derive(FromRequest)]
@@ -169,6 +173,50 @@ pub(crate) async fn validate_raw_handler(
169173
Ok(Json(RawReviewResponse::new(response)))
170174
}
171175

176+
/// Optional query-string parameters accepted by `pprof_get_cpu`.
///
/// A field that is missing from the request falls back to the matching
/// `profiling::default_profiling_*` function named in its `serde(default)`
/// attribute.
#[derive(Deserialize)]
177+
pub(crate) struct ProfileParams {
178+
/// profiling frequency (Hz)
179+
#[serde(default = "profiling::default_profiling_frequency")]
180+
pub frequency: i32,
181+
182+
/// profiling time interval (seconds)
183+
#[serde(default = "profiling::default_profiling_interval")]
184+
pub interval: u64,
185+
}
186+
187+
// Generate a pprof CPU profile using google's pprof format
188+
// The report is generated and sent to the user as binary data
189+
pub(crate) async fn pprof_get_cpu(
190+
profiling_params: Query<ProfileParams>,
191+
) -> Result<impl axum::response::IntoResponse, (StatusCode, ApiError)> {
192+
let frequency = profiling_params.frequency;
193+
let interval = profiling_params.interval;
194+
195+
let end = async move {
196+
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
197+
Ok(())
198+
};
199+
200+
let body = profiling::start_one_cpu_profile(end, frequency)
201+
.await
202+
.map_err(handle_pprof_error)?;
203+
204+
let mut headers = header::HeaderMap::new();
205+
headers.insert(
206+
header::CONTENT_DISPOSITION,
207+
r#"attachment; filename="cpu_profile"#.parse().unwrap(),
208+
);
209+
headers.insert(
210+
header::CONTENT_LENGTH,
211+
body.len().to_string().parse().unwrap(),
212+
);
213+
headers.insert(
214+
header::CONTENT_TYPE,
215+
mime::APPLICATION_OCTET_STREAM.to_string().parse().unwrap(),
216+
);
217+
218+
Ok((headers, body))
219+
}
172220
pub(crate) async fn readiness_handler() -> StatusCode {
173221
StatusCode::OK
174222
}
@@ -261,3 +309,15 @@ fn handle_evaluation_error(error: EvaluationError) -> (StatusCode, ApiError) {
261309
}
262310
}
263311
}
312+
313+
fn handle_pprof_error(error: ReportGenerationError) -> (StatusCode, ApiError) {
314+
error!("pprof error: {}", error);
315+
316+
(
317+
StatusCode::INTERNAL_SERVER_ERROR,
318+
ApiError {
319+
status: StatusCode::INTERNAL_SERVER_ERROR,
320+
message: "Something went wrong".to_owned(),
321+
},
322+
)
323+
}

src/cli.rs

+5
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ pub(crate) fn build_cli() -> Command {
160160
.env("KUBEWARDEN_IGNORE_KUBERNETES_CONNECTION_FAILURE")
161161
.action(ArgAction::SetTrue)
162162
.help("Do not exit with an error if the Kubernetes connection fails. This will cause context aware policies to break when there's no connection with Kubernetes."),
163+
Arg::new("enable-pprof")
164+
.long("enable-pprof")
165+
.env("KUBEWARDEN_ENABLE_PPROF")
166+
.action(ArgAction::SetTrue)
167+
.help("Enable pprof profiling"),
163168
];
164169
args.sort_by(|a, b| a.get_id().cmp(b.get_id()));
165170

src/config.rs

+6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct Config {
4141
pub log_fmt: String,
4242
pub log_no_color: bool,
4343
pub daemon: bool,
44+
pub enable_pprof: bool,
4445
pub daemon_pid_file: String,
4546
pub daemon_stdout_file: Option<String>,
4647
pub daemon_stderr_file: Option<String>,
@@ -131,6 +132,10 @@ impl Config {
131132
key_file,
132133
})
133134
};
135+
let enable_pprof = matches
136+
.get_one::<bool>("enable-pprof")
137+
.expect("clap should have assigned a default value")
138+
.to_owned();
134139

135140
Ok(Self {
136141
addr,
@@ -152,6 +157,7 @@ impl Config {
152157
daemon_pid_file,
153158
daemon_stdout_file,
154159
daemon_stderr_file,
160+
enable_pprof,
155161
})
156162
}
157163
}

src/lib.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod test_utils;
77
pub mod api;
88
pub mod config;
99
pub mod metrics;
10+
pub mod profiling;
1011
pub mod tracing;
1112

1213
use ::tracing::{debug, error, info, Level};
@@ -29,7 +30,7 @@ use tokio::time;
2930
use tower_http::trace::{self, TraceLayer};
3031

3132
use crate::api::handlers::{
32-
audit_handler, readiness_handler, validate_handler, validate_raw_handler,
33+
audit_handler, pprof_get_cpu, readiness_handler, validate_handler, validate_raw_handler,
3334
};
3435
use crate::api::state::ApiServerState;
3536
use crate::evaluation::{
@@ -176,7 +177,7 @@ impl PolicyServer {
176177
None
177178
};
178179

179-
let router = Router::new()
180+
let mut router = Router::new()
180181
.route("/audit/:policy_id", post(audit_handler))
181182
.route("/validate/:policy_id", post(validate_handler))
182183
.route("/validate_raw/:policy_id", post(validate_raw_handler))
@@ -187,6 +188,10 @@ impl PolicyServer {
187188
.make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
188189
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
189190
);
191+
if config.enable_pprof {
192+
let pprof_router = Router::new().route("/debug/pprof/cpu", get(pprof_get_cpu));
193+
router = Router::new().merge(router).merge(pprof_router);
194+
}
190195

191196
Ok(Self {
192197
router,

src/profiling.rs

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use futures::{
2+
future::BoxFuture,
3+
task::{Context, Poll},
4+
Future, FutureExt,
5+
};
6+
use lazy_static::lazy_static;
7+
use pprof::protos::Message;
8+
use regex::Regex;
9+
use std::{pin::Pin, sync::Mutex};
10+
use thiserror::Error;
11+
12+
lazy_static! {
13+
// If it's `Some`, a CPU profile is already in progress.
14+
static ref CPU_PROFILE_ACTIVE: Mutex<Option<()>> = Mutex::new(None);
15+
16+
// To normalize thread names.
17+
static ref THREAD_NAME_RE: Regex =
18+
Regex::new(r"^(?P<thread_name>[a-z-_ :]+?)(-?\d)*$").unwrap();
19+
static ref THREAD_NAME_REPLACE_SEPERATOR_RE: Regex = Regex::new(r"[_ ]").unwrap();
20+
}
21+
22+
#[derive(Debug, Error)]
23+
pub enum ReportGenerationError {
24+
#[error("CPU profile already running")]
25+
CPUAlreadyProfiling,
26+
27+
#[error("pprof error: {0}")]
28+
PprofError(#[from] pprof::Error),
29+
30+
#[error("cannot encode report to pprof format: {0}")]
31+
EncodeError(String),
32+
}
33+
34+
/// Default sampling frequency: 99 Hz, chosen to avoid coinciding with special periods.
35+
pub fn default_profiling_frequency() -> i32 {
36+
99
37+
}
38+
39+
/// Default profiling interval time - 30 seconds
40+
pub fn default_profiling_interval() -> u64 {
41+
30
42+
}
43+
44+
/// Trigger one cpu profile.
45+
pub async fn start_one_cpu_profile<F>(
46+
end: F,
47+
frequency: i32,
48+
) -> Result<Vec<u8>, ReportGenerationError>
49+
where
50+
F: Future<Output = Result<(), ReportGenerationError>> + Send + 'static,
51+
{
52+
if CPU_PROFILE_ACTIVE.lock().unwrap().is_some() {
53+
return Err(ReportGenerationError::CPUAlreadyProfiling);
54+
}
55+
56+
let on_start = || {
57+
let mut activate = CPU_PROFILE_ACTIVE.lock().unwrap();
58+
assert!(activate.is_none());
59+
*activate = Some(());
60+
let guard = pprof::ProfilerGuardBuilder::default()
61+
.frequency(frequency)
62+
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
63+
.build()?;
64+
Ok(guard)
65+
};
66+
67+
let on_end = move |guard: pprof::ProfilerGuard<'static>| {
68+
let report = guard
69+
.report()
70+
.frames_post_processor(move |frames| {
71+
let name = extract_thread_name(&frames.thread_name);
72+
frames.thread_name = name;
73+
})
74+
.build()?;
75+
let mut body = Vec::new();
76+
let profile = report.pprof()?;
77+
78+
profile
79+
.encode(&mut body)
80+
.map_err(|e| ReportGenerationError::EncodeError(e.to_string()))?;
81+
82+
drop(guard);
83+
*CPU_PROFILE_ACTIVE.lock().unwrap() = None;
84+
85+
Ok(body)
86+
};
87+
88+
ProfileRunner::new(on_start, on_end, end.boxed())?.await
89+
}
90+
91+
fn extract_thread_name(thread_name: &str) -> String {
92+
THREAD_NAME_RE
93+
.captures(thread_name)
94+
.and_then(|cap| {
95+
cap.name("thread_name").map(|thread_name| {
96+
THREAD_NAME_REPLACE_SEPERATOR_RE
97+
.replace_all(thread_name.as_str(), "-")
98+
.into_owned()
99+
})
100+
})
101+
.unwrap_or_else(|| thread_name.to_owned())
102+
}
103+
104+
type OnEndFn<I, T> = Box<dyn FnOnce(I) -> Result<T, ReportGenerationError> + Send + 'static>;
105+
106+
struct ProfileRunner<I, T> {
107+
item: Option<I>,
108+
on_end: Option<OnEndFn<I, T>>,
109+
end: BoxFuture<'static, Result<(), ReportGenerationError>>,
110+
}
111+
112+
impl<I, T> Unpin for ProfileRunner<I, T> {}
113+
114+
impl<I, T> ProfileRunner<I, T> {
115+
fn new<F1, F2>(
116+
on_start: F1,
117+
on_end: F2,
118+
end: BoxFuture<'static, Result<(), ReportGenerationError>>,
119+
) -> Result<Self, ReportGenerationError>
120+
where
121+
F1: FnOnce() -> Result<I, ReportGenerationError>,
122+
F2: FnOnce(I) -> Result<T, ReportGenerationError> + Send + 'static,
123+
{
124+
let item = on_start()?;
125+
Ok(ProfileRunner {
126+
item: Some(item),
127+
on_end: Some(Box::new(on_end) as OnEndFn<I, T>),
128+
end,
129+
})
130+
}
131+
}
132+
133+
impl<I, T> Future for ProfileRunner<I, T> {
134+
type Output = Result<T, ReportGenerationError>;
135+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136+
match self.end.as_mut().poll(cx) {
137+
Poll::Ready(res) => {
138+
let item = self.item.take().unwrap();
139+
let on_end = self.on_end.take().unwrap();
140+
let r = match (res, on_end(item)) {
141+
(Ok(_), r) => r,
142+
(Err(errmsg), _) => Err(errmsg),
143+
};
144+
Poll::Ready(r)
145+
}
146+
Poll::Pending => Poll::Pending,
147+
}
148+
}
149+
}

tests/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub(crate) async fn app() -> Router {
6969
daemon_pid_file: "policy_server.pid".to_owned(),
7070
daemon_stdout_file: None,
7171
daemon_stderr_file: None,
72+
enable_pprof: true,
7273
};
7374

7475
let server = PolicyServer::new_from_config(config).await.unwrap();

0 commit comments

Comments
 (0)