Skip to content

Commit dd02489

Browse files
authoredJul 17, 2023
Merge pull request #496 from flavio/mem-reduction
fix(perf): reduce memory usage
2 parents 9fe6473 + ef7724b commit dd02489

File tree

4 files changed

+114
-4
lines changed

4 files changed

+114
-4
lines changed
 

‎Cargo.lock

+56
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ edition = "2021"
1414
anyhow = "1.0"
1515
clap = { version = "4.2", features = [ "cargo", "env" ] }
1616
daemonize = "0.5"
17+
humansize = "2.1"
1718
itertools = "0.11.0"
1819
k8s-openapi = { version = "0.18.0", default-features = false, features = ["v1_26"] }
1920
lazy_static = "1.4.0"
2021
num_cpus = "1.16.0"
2122
opentelemetry-otlp = { version = "0.10.0", features = ["metrics", "tonic"] }
2223
opentelemetry = { version = "0.17", default-features = false, features = ["metrics", "trace", "rt-tokio", "serialize"] }
24+
procfs = "0.15"
2325
policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.11.0" }
2426
rayon = "1.6"
2527
serde_json = "1.0"

‎src/main.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use policy_evaluator::policy_fetcher::verify::FulcioAndRekorData;
99
use policy_evaluator::{callback_handler::CallbackHandlerBuilder, kube};
1010
use std::{fs, path::PathBuf, process, sync::RwLock, thread};
1111
use tokio::{runtime::Runtime, sync::mpsc, sync::oneshot};
12-
use tracing::{debug, error, info};
12+
use tracing::{debug, error, info, warn};
1313

1414
mod admission_review;
1515
mod api;
@@ -335,6 +335,7 @@ fn main() -> Result<()> {
335335
}
336336
}
337337
info!(status = "done", "worker pool bootstrap");
338+
memory_usage(pool_size);
338339

339340
// All is good, we can start listening for incoming requests through the
340341
// web server
@@ -367,6 +368,37 @@ fn main() -> Result<()> {
367368
Ok(())
368369
}
369370

371+
fn memory_usage(pool_size: usize) {
372+
let process = match procfs::process::Process::myself() {
373+
Ok(p) => p,
374+
Err(e) => {
375+
warn!(error =? e, "cannot access process stats");
376+
return;
377+
}
378+
};
379+
let mem_stats = match process.statm() {
380+
Ok(s) => s,
381+
Err(e) => {
382+
warn!(error =? e, "cannot access process memory stats");
383+
return;
384+
}
385+
};
386+
387+
let formatter = humansize::make_format(humansize::DECIMAL);
388+
389+
let vm_size = mem_stats.size * procfs::page_size();
390+
let vm_rss = mem_stats.resident * procfs::page_size();
391+
392+
debug!(
393+
VmSize = formatter(vm_size),
394+
VmSizeBytes = vm_size,
395+
VmRSS = formatter(vm_rss),
396+
VmRSSBytes = vm_rss,
397+
pool_size,
398+
"memory usage"
399+
);
400+
}
401+
370402
fn fatal_error(msg: String) {
371403
let trace_system_ready = TRACE_SYSTEM_INITIALIZED.read().unwrap();
372404
if *trace_system_ready {

‎src/worker_pool.rs

+23-3
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,11 @@ impl WorkerPool {
163163
}
164164
};
165165

166-
let precompiled_policies =
166+
// Use a reference counter to share access to precompiled policies
167+
// between workers. This reduces memory usage
168+
let precompiled_policies: Arc<PrecompiledPolicies> =
167169
match precompile_policies(&engine, &bootstrap_data.fetched_policies) {
168-
Ok(pp) => pp,
170+
Ok(pp) => Arc::new(pp),
169171
Err(e) => {
170172
eprintln!("{e}");
171173
std::process::exit(1);
@@ -202,10 +204,16 @@ impl WorkerPool {
202204
warn!("policy timeout protection is disabled");
203205
}
204206

207+
// Use a reference counter to share access to policies
208+
// between workers. This reduces memory usage
209+
let policies = Arc::new(bootstrap_data.policies);
210+
205211
for n in 1..=pool_size {
206212
let (tx, rx) = mpsc::channel::<EvalRequest>(32);
207213
worker_tx_chans.push(tx);
208214

215+
// Each worker has its own wasmtime::Engine, sharing the
216+
// same engine across all the workers leads to bad performance
209217
let engine = match wasmtime::Engine::new(&wasmtime_config) {
210218
Ok(e) => e,
211219
Err(e) => {
@@ -225,16 +233,17 @@ impl WorkerPool {
225233
};
226234
worker_engines.push(engine.clone());
227235

228-
let policies = bootstrap_data.policies.clone();
229236
let modules = precompiled_policies.clone();
230237
let b = barrier.clone();
231238
let canary = boot_canary.clone();
232239
let callback_handler_tx = self.callback_handler_tx.clone();
233240
let always_accept_admission_reviews_on_namespace =
234241
self.always_accept_admission_reviews_on_namespace.clone();
242+
let policies = policies.clone();
235243

236244
let join = thread::spawn(move || -> Result<()> {
237245
info!(spawned = n, total = pool_size, "spawning worker");
246+
238247
let mut worker = match Worker::new(
239248
rx,
240249
&policies,
@@ -252,6 +261,10 @@ impl WorkerPool {
252261
return Err(anyhow!("Worker {} couldn't start: {}", n, e));
253262
}
254263
};
264+
// Drop the Arc references ASAP, they are no longer needed
265+
// at this point
266+
drop(policies);
267+
drop(modules);
255268
b.wait();
256269

257270
debug!(id = n, "worker loop start");
@@ -262,6 +275,13 @@ impl WorkerPool {
262275
});
263276
join_handles.push(join);
264277
}
278+
279+
// Deallocate all the memory used by the precompiled policies since
280+
// they are no longer needed. Without this explicit cleanup
281+
// the reference would be dropped right before Policy Server exits,
282+
// meaning a lot of memory would have been consumed without a valid reason
283+
// during the whole execution time
284+
drop(precompiled_policies);
265285
barrier.wait();
266286

267287
if !boot_canary.load(Ordering::SeqCst) {

0 commit comments

Comments
 (0)
Please sign in to comment.