Skip to content

Commit 56dff8e

Browse files
committedJan 25, 2021
WIP: support running different policies
Initial iteration, everything seems to be working, but there's still some plumbing to do (settings, cli flags,...)
1 parent e32e6c2 commit 56dff8e

File tree

2 files changed

+125
-31
lines changed

2 files changed

+125
-31
lines changed
 

‎src/main.rs

+35-31
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@ use hyper::service::{make_service_fn, service_fn};
33
use hyper::Server;
44
use std::{net::SocketAddr, thread};
55
use std::process;
6-
use tokio::{runtime::Runtime, sync::mpsc};
6+
use tokio::{runtime::Runtime, sync::mpsc::channel};
77

88
mod admission_review;
99
mod api;
1010
mod wasm;
1111
mod wasm_fetcher;
12+
mod worker;
1213

13-
#[tokio::main]
14-
async fn main() {
14+
use crate::wasm::EvalRequest;
15+
16+
fn main() {
1517
let matches = App::new("policy-server")
1618
.version("0.0.1")
1719
.about("Kubernetes admission controller powered by Chimera WASM policies")
@@ -84,6 +86,13 @@ async fn main() {
8486
}
8587
};
8688

89+
let rt = match Runtime::new() {
90+
Ok(r) => { r },
91+
Err(error) => {
92+
return fatal_error(format!("Error initializing tokio runtime: {}", error));
93+
}
94+
};
95+
8796
let fetcher = match wasm_fetcher::parse_wasm_url(
8897
matches.value_of("wasm-uri").unwrap(),
8998
matches.is_present("wasm-remote-insecure"),
@@ -94,42 +103,37 @@ async fn main() {
94103
return fatal_error(format!("Error parsing arguments: {}", error));
95104
}
96105
};
97-
let wasm_path = match fetcher.fetch().await {
98-
Ok(p) => { p },
99-
Err(error) => {
100-
return fatal_error(format!("Error fetching WASM module: {}", error));
101-
}
106+
let wasm_path = match rt.block_on(async { fetcher.fetch().await }) {
107+
Ok(p) =>p,
108+
Err(error) => { return fatal_error(format!("Error fetching WASM module: {}", error));}
102109
};
103110

104-
let (tx, mut rx) = mpsc::channel::<wasm::EvalRequest>(32);
111+
let (api_tx, api_rx) = channel::<EvalRequest>(32);
112+
113+
let mut wasm_modules = Vec::<String>::new();
114+
wasm_modules.push(wasm_path);
105115

106-
let rt = Runtime::new().unwrap();
107116
let wasm_thread = thread::spawn(move || {
108-
let mut policy_evaluator = match wasm::PolicyEvaluator::new(&wasm_path) {
109-
Ok(e) => { e },
110-
Err(error) => {
111-
return fatal_error(format!("Error initializing policy evaluator for {}: {}", wasm_path, error));
112-
}
113-
};
114-
rt.block_on(async move {
115-
while let Some(req) = rx.recv().await {
116-
let resp = policy_evaluator.validate(req.req);
117-
let _ = req.resp_chan.send(resp);
118-
}
119-
});
120-
});
117+
let worker_pool = worker::WorkerPool::new(3, wasm_modules.clone(), api_rx).unwrap();
121118

122-
let make_svc = make_service_fn(|_conn| {
123-
let svc_tx = tx.clone();
124-
async move { Ok::<_, hyper::Error>(service_fn(move |req| api::route(req, svc_tx.clone()))) }
119+
worker_pool.run();
125120
});
126121

127-
let server = Server::bind(&addr).serve(make_svc);
128-
println!("Started server on {}", addr);
122+
rt.block_on( async {
123+
let make_svc = make_service_fn(|_conn| {
124+
let svc_tx = api_tx.clone();
125+
async move {
126+
Ok::<_, hyper::Error>(service_fn(move |req| api::route(req, svc_tx.clone()))) }
127+
});
128+
129+
let server = Server::bind(&addr).serve(make_svc);
130+
println!("Started server on {}", addr);
131+
132+
if let Err(e) = server.await {
133+
eprintln!("server error: {}", e);
134+
}
135+
});
129136

130-
if let Err(e) = server.await {
131-
eprintln!("server error: {}", e);
132-
}
133137
wasm_thread.join().unwrap();
134138
}
135139

‎src/worker.rs

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
use crate::wasm::{EvalRequest, PolicyEvaluator};
2+
use anyhow::Result;
3+
use std::{collections::HashMap, thread, vec::Vec};
4+
use tokio::sync::mpsc::{channel, Receiver, Sender};
5+
6+
pub(crate) struct Worker {
7+
evaluators: HashMap<String, PolicyEvaluator>,
8+
channel_rx: Receiver<EvalRequest>,
9+
}
10+
11+
impl Worker {
12+
pub(crate) fn new(rx: Receiver<EvalRequest>, wasm_modules: Vec<String>) -> Result<Worker> {
13+
let mut evs: HashMap<String, PolicyEvaluator> = HashMap::new();
14+
15+
for w in wasm_modules {
16+
let policy_evaluator = PolicyEvaluator::new(&w)?;
17+
//TODO: no hard coded value
18+
evs.insert("validate".into(), policy_evaluator);
19+
}
20+
21+
Ok(Worker {
22+
evaluators: evs,
23+
channel_rx: rx,
24+
})
25+
}
26+
27+
pub(crate) fn run(mut self) {
28+
while let Some(req) = self.channel_rx.blocking_recv() {
29+
//TODO: handle error
30+
let policy_id = "validate".to_string();
31+
match self.evaluators.get_mut(&policy_id) {
32+
Some(policy_evaluator) => {
33+
let resp = policy_evaluator.validate(req.req);
34+
let _ = req.resp_chan.send(resp);
35+
}
36+
None => continue,
37+
}
38+
}
39+
}
40+
}
41+
42+
pub(crate) struct WorkerPool {
43+
pool_size: usize,
44+
worker_tx_chans: Vec<Sender<EvalRequest>>,
45+
api_rx: Receiver<EvalRequest>,
46+
}
47+
48+
impl WorkerPool {
49+
pub(crate) fn new(
50+
size: usize,
51+
wasm_modules: Vec<String>,
52+
rx: Receiver<EvalRequest>,
53+
) -> Result<WorkerPool> {
54+
let mut tx_chans = Vec::<Sender<EvalRequest>>::new();
55+
56+
for n in 1..=size {
57+
let (tx, rx) = channel::<EvalRequest>(32);
58+
tx_chans.push(tx);
59+
let wasm_modules = wasm_modules.clone();
60+
61+
thread::spawn(move || {
62+
let worker = Worker::new(rx, wasm_modules).unwrap();
63+
64+
//TODO: better logging
65+
println!("worker {} loop start", n);
66+
worker.run();
67+
println!("worker {} loop exit", n);
68+
});
69+
}
70+
71+
Ok(WorkerPool {
72+
pool_size: size,
73+
worker_tx_chans: tx_chans,
74+
api_rx: rx,
75+
})
76+
}
77+
78+
pub(crate) fn run(mut self) {
79+
let mut next_worker_id = 0;
80+
while let Some(req) = self.api_rx.blocking_recv() {
81+
let _ = self.worker_tx_chans[next_worker_id].blocking_send(req);
82+
next_worker_id += 1;
83+
if next_worker_id >= self.pool_size {
84+
next_worker_id = 0;
85+
}
86+
}
87+
88+
//TODO: should we also `join` the children threads here?
89+
}
90+
}

0 commit comments

Comments
 (0)