Skip to content

Commit b2c8912

Browse files
authored
Merge pull request #1080 from fabriziosestito/feat/readiness-probe-http
feat: expose readiness probe over http
2 parents c9410c7 + 6d36ffe commit b2c8912

File tree

6 files changed

+66
-68
lines changed

cli-docs.md

+3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ This document contains the help content for the `policy-server` command-line pro
6767
Default value: `2`
6868
* `--port <PORT>` — Listen on PORT
6969

70+
Default value: `3000`
71+
* `--readiness-probe-port <READINESS_PROBE_PORT>` — Expose readiness endpoint on READINESS_PROBE_PORT
72+
7073
Default value: `3000`
7174
* `--sigstore-cache-dir <SIGSTORE_CACHE_DIR>` — Directory used to cache sigstore data
7275

src/cli.rs

+7
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ pub(crate) fn build_cli() -> Command {
6868
.env("KUBEWARDEN_PORT")
6969
.help("Listen on PORT"),
7070

71+
Arg::new("readiness-probe-port")
72+
.long("readiness-probe-port")
73+
.value_name("READINESS_PROBE_PORT")
74+
.default_value("3000")
75+
.env("KUBEWARDEN_READINESS_PROBE_PORT")
76+
.help("Expose readiness endpoint on READINESS_PROBE_PORT"),
77+
7178
Arg::new("workers")
7279
.long("workers")
7380
.value_name("WORKERS_NUMBER")

src/config.rs

+13
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ lazy_static! {
2828

2929
pub struct Config {
3030
pub addr: SocketAddr,
31+
pub readiness_probe_addr: SocketAddr,
3132
pub sources: Option<Sources>,
3233
pub policies: HashMap<String, PolicyOrPolicyGroup>,
3334
pub policies_download_dir: PathBuf,
@@ -60,6 +61,7 @@ impl Config {
6061
pub fn from_args(matches: &ArgMatches) -> Result<Self> {
6162
// init some variables based on the cli parameters
6263
let addr = api_bind_address(matches)?;
64+
let readiness_probe_addr = readiness_probe_bind_address(matches)?;
6365

6466
let policies = policies(matches)?;
6567
let policies_download_dir = matches
@@ -142,6 +144,7 @@ impl Config {
142144

143145
Ok(Self {
144146
addr,
147+
readiness_probe_addr,
145148
sources,
146149
policies,
147150
policies_download_dir,
@@ -176,6 +179,16 @@ fn api_bind_address(matches: &clap::ArgMatches) -> Result<SocketAddr> {
176179
.map_err(|e| anyhow!("error parsing arguments: {}", e))
177180
}
178181

182+
fn readiness_probe_bind_address(matches: &clap::ArgMatches) -> Result<SocketAddr> {
183+
format!(
184+
"{}:{}",
185+
matches.get_one::<String>("address").unwrap(),
186+
matches.get_one::<String>("readiness-probe-port").unwrap()
187+
)
188+
.parse()
189+
.map_err(|e| anyhow!("error parsing arguments: {}", e))
190+
}
191+
179192
fn build_tls_config(matches: &clap::ArgMatches) -> Result<TlsConfig> {
180193
let cert_file = matches.get_one::<String>("cert-file").unwrap().to_owned();
181194
let key_file = matches.get_one::<String>("key-file").unwrap().to_owned();

src/lib.rs

+32-17
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use profiling::activate_memory_profiling;
3434
use rayon::prelude::*;
3535
use std::{fs, net::SocketAddr, sync::Arc};
3636
use tokio::{
37-
sync::{oneshot, Semaphore},
37+
sync::{oneshot, Notify, Semaphore},
3838
time,
3939
};
4040
use tower_http::trace::{self, TraceLayer};
@@ -65,10 +65,12 @@ pub static malloc_conf: &[u8] = b"background_thread:true,tcache_max:4096,dirty_d
6565

6666
pub struct PolicyServer {
6767
router: Router,
68+
readiness_probe_router: Router,
6869
callback_handler: CallbackHandler,
6970
callback_handler_shutdown_channel_tx: oneshot::Sender<()>,
7071
addr: SocketAddr,
7172
tls_config: Option<RustlsConfig>,
73+
readiness_probe_addr: SocketAddr,
7274
}
7375

7476
impl PolicyServer {
@@ -211,10 +213,7 @@ impl PolicyServer {
211213
TraceLayer::new_for_http()
212214
.make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
213215
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
214-
)
215-
// Adding the readiness probe handler after the tracing layer to avoid logging
216-
// See: https://github.com/tokio-rs/axum/discussions/355
217-
.route("/readiness", get(readiness_handler));
216+
);
218217

219218
if config.enable_pprof {
220219
activate_memory_profiling().await?;
@@ -225,40 +224,56 @@ impl PolicyServer {
225224
router = Router::new().merge(router).merge(pprof_router);
226225
}
227226

227+
let readiness_probe_router = Router::new().route("/readiness", get(readiness_handler));
228+
228229
Ok(Self {
229230
router,
231+
readiness_probe_router,
230232
callback_handler,
231233
callback_handler_shutdown_channel_tx,
232234
addr: config.addr,
233235
tls_config,
236+
readiness_probe_addr: config.readiness_probe_addr,
234237
})
235238
}
236239

237240
pub async fn run(self) -> Result<()> {
238-
// Start the CallbackHandler
241+
let notify = Notify::new();
242+
239243
let mut callback_handler = self.callback_handler;
240244
let callback_handler = tokio::spawn(async move {
241245
info!(status = "init", "CallbackHandler task");
242246
callback_handler.loop_eval().await;
243247
info!(status = "exit", "CallbackHandler task");
244248
});
245249

246-
if let Some(tls_config) = self.tls_config {
247-
axum_server::bind_rustls(self.addr, tls_config)
248-
.serve(self.router.into_make_service())
249-
.await?;
250-
} else {
251-
axum_server::bind(self.addr)
252-
.serve(self.router.into_make_service())
253-
.await?;
250+
let api_server = async {
251+
if let Some(tls_config) = self.tls_config {
252+
let server_with_tls = axum_server::bind_rustls(self.addr, tls_config);
253+
notify.notify_one();
254+
255+
server_with_tls.serve(self.router.into_make_service()).await
256+
} else {
257+
let server = axum_server::bind(self.addr);
258+
notify.notify_one();
259+
260+
server.serve(self.router.into_make_service()).await
261+
}
262+
};
263+
264+
let readiness_probe_server = async {
265+
notify.notified().await;
266+
267+
axum_server::bind(self.readiness_probe_addr)
268+
.serve(self.readiness_probe_router.into_make_service())
269+
.await
254270
};
255271

256-
// Stop the CallbackHandler
272+
tokio::try_join!(api_server, readiness_probe_server)?;
273+
257274
self.callback_handler_shutdown_channel_tx
258275
.send(())
259276
.expect("Cannot send shutdown signal to CallbackHandler");
260-
261-
// Wait for the CallbackHandler to exit
262277
callback_handler
263278
.await
264279
.expect("Cannot wait for CallbackHandler to exit");

tests/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ pub(crate) fn default_test_config() -> Config {
106106

107107
Config {
108108
addr: SocketAddr::from(([127, 0, 0, 1], 3001)),
109+
readiness_probe_addr: SocketAddr::from(([127, 0, 0, 1], 3002)),
109110
sources: None,
110111
policies,
111112
policies_download_dir: tempdir().unwrap().into_path(),

tests/integration_test.rs

+10-51
Original file line numberDiff line numberDiff line change
@@ -654,23 +654,11 @@ mod certificate_reload_helpers {
654654
}
655655
}
656656

657-
pub async fn policy_server_is_ready(
658-
address: &str,
659-
client_tls_pem_bundle: Option<String>,
660-
) -> anyhow::Result<StatusCode> {
657+
pub async fn policy_server_is_ready(address: &str) -> anyhow::Result<StatusCode> {
661658
// wait for the server to start
662-
let mut client_builder = reqwest::Client::builder();
663-
664-
if let Some(tls_data) = client_tls_pem_bundle {
665-
let identity = reqwest::Identity::from_pem(tls_data.as_bytes())?;
666-
client_builder = client_builder.identity(identity)
667-
};
668-
let client = client_builder
669-
.danger_accept_invalid_certs(true)
670-
.build()
671-
.unwrap();
659+
let client = reqwest::Client::builder().build().unwrap();
672660

673-
let url = reqwest::Url::parse(&format!("https://{address}/readiness")).unwrap();
661+
let url = reqwest::Url::parse(&format!("http://{address}/readiness")).unwrap();
674662
let response = client.get(url).send().await?;
675663
Ok(response.status())
676664
}
@@ -704,8 +692,9 @@ async fn test_detect_certificate_rotation() {
704692
});
705693
config.policies = HashMap::new();
706694

707-
let domain_ip = config.addr.ip().to_string();
708-
let domain_port = config.addr.port().to_string();
695+
let host = config.addr.ip().to_string();
696+
let port = config.addr.port().to_string();
697+
let readiness_probe_port = config.readiness_probe_addr.port().to_string();
709698

710699
tokio::spawn(async move {
711700
let api_server = policy_server::PolicyServer::new_from_config(config)
@@ -719,29 +708,22 @@ async fn test_detect_certificate_rotation() {
719708
.with_max_delay(Duration::from_secs(30))
720709
.with_max_times(5);
721710

722-
let client_cert = tls_data_client.cert.clone();
723-
let client_key = tls_data_client.key.clone();
724711
let status_code = (|| async {
725-
policy_server_is_ready(
726-
format!("{domain_ip}:{domain_port}").as_str(),
727-
Some(format!("{client_cert}\n{client_key}")),
728-
)
729-
.await
712+
policy_server_is_ready(format!("{host}:{readiness_probe_port}").as_str()).await
730713
})
731714
.retry(exponential_backoff)
732715
.await
733716
.unwrap();
734717
assert_eq!(status_code, reqwest::StatusCode::OK);
735718

736-
check_tls_san_name(&domain_ip, &domain_port, hostname1)
719+
check_tls_san_name(&host, &port, hostname1)
737720
.await
738721
.expect("certificate served doesn't use the expected SAN name");
739722

740723
// Generate a new certificate and key, and switch to them
741724

742725
let hostname2 = "cert2.example.com";
743726
let tls_data2 = create_cert(hostname2);
744-
let client_ca2 = create_cert(hostname2);
745727

746728
// write only the cert file
747729
std::fs::write(&cert_file, tls_data2.cert).unwrap();
@@ -750,7 +732,7 @@ async fn test_detect_certificate_rotation() {
750732
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
751733

752734
// the old certificate should still be in use, since we didn't also change the key
753-
check_tls_san_name(&domain_ip, &domain_port, hostname1)
735+
check_tls_san_name(&host, &port, hostname1)
754736
.await
755737
.expect("certificate should not have been changed");
756738

@@ -760,32 +742,9 @@ async fn test_detect_certificate_rotation() {
760742
// give inotify some time to ensure it detected the cert change,
761743
// also give axum some time to complete the certificate reload
762744
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
763-
check_tls_san_name(&domain_ip, &domain_port, hostname2)
745+
check_tls_san_name(&host, &port, hostname2)
764746
.await
765747
.expect("certificate hasn't been reloaded");
766-
767-
// Let test if the server is reloading client certificate
768-
std::fs::write(&client_ca, client_ca2.cert.clone()).unwrap();
769-
770-
// give inotify some time to ensure it detected the cert change
771-
tokio::time::sleep(std::time::Duration::from_secs(4)).await;
772-
773-
assert!(policy_server_is_ready(
774-
format!("{domain_ip}:{domain_port}").as_str(),
775-
Some(format!("{client_cert}\n{client_key}")),
776-
)
777-
.await
778-
.is_err());
779-
780-
let client_cert = client_ca2.cert.clone();
781-
let client_key = client_ca2.key.clone();
782-
let status_code = policy_server_is_ready(
783-
format!("{domain_ip}:{domain_port}").as_str(),
784-
Some(format!("{client_cert}\n{client_key}")),
785-
)
786-
.await
787-
.unwrap();
788-
assert_eq!(status_code, reqwest::StatusCode::OK);
789748
}
790749

791750
#[tokio::test]

0 commit comments

Comments
 (0)