Skip to content

Commit

Permalink
server: add child reaper
Browse files Browse the repository at this point in the history
to be able to catch children exiting

Signed-off-by: Peter Hunt <pehunt@redhat.com>
  • Loading branch information
haircommander committed Dec 3, 2021
1 parent ac8f527 commit b544347
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 5 deletions.
114 changes: 114 additions & 0 deletions conmon-rs/server/src/child_reaper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! Child process reaping and management.
use anyhow::{bail, Result};
use getset::Getters;
use log::{debug, error};
use nix::sys::signal::{kill, Signal};
use nix::sys::wait::{waitpid, WaitStatus};
use nix::unistd::Pid;
use std::path::PathBuf;
use std::thread;
use std::{collections::HashMap, fs::File, io::Write, sync::Arc, sync::Mutex};

impl ChildReaper {
pub fn init_reaper(&self) -> Result<()> {
let children = Arc::clone(&self.children);
thread::spawn(move || {
loop {
let children = Arc::clone(&children);
match waitpid(Pid::from_raw(-1), None) {
Ok(status) => {
if let WaitStatus::Exited(child_pid, exit_status) = status {
// Immediately spawn a thread to reduce risk of dropping
// a SIGCHLD.
thread::spawn(move || {
if let Err(e) = Self::reap_child(children, child_pid, exit_status) {
error!("Failed to reap child for pid {}: {}", child_pid, e);
}
});
}
}

Err(err) => {
// TODO FIXME this busy loops right now while there are no children.
// There should be a broadcast mechanism so we only run this loop
// while there are children
if err != nix::errno::Errno::ECHILD {
error!("caught error in reading for sigchld {}", err);
}
}
}
}
});
Ok(())
}

fn reap_child(
locked_map: Arc<Mutex<HashMap<i32, ReapableChild>>>,
child_pid: Pid,
exit_status: i32,
) -> Result<()> {
debug!("caught signal for pid {}", child_pid);

let mut map = locked_map.lock().unwrap();
let child = match map.remove(&(i32::from(child_pid))) {
Some(c) => c,
None => {
// If we have an unregistered PID, then there's nothing to do.
return Ok(());
}
};

debug!(
"PID {} associated with container {} exited with {}",
child_pid, child.id, exit_status
);
if let Err(e) = write_to_exit_paths(exit_status, child.exit_paths) {
error!(
"failed to write to exit paths process for id {} :{}",
child.id, e
);
}
Ok(())
}

pub fn add_child(&self, id: String, pid: i32, exit_paths: Vec<PathBuf>) -> Result<()> {
let locked_children = Arc::clone(&self.children);
let mut map = locked_children.lock().unwrap();

let reapable_child = ReapableChild { id, exit_paths };
if let Some(old) = map.insert(pid, reapable_child) {
bail!("Repeat PID for container {} found", old.id);
}
Ok(())
}
pub fn kill_children(&self, s: Signal) -> Result<()> {
for (pid, kc) in Arc::clone(&self.children).lock().unwrap().iter() {
debug!("killing pid {} for container {}", pid, kc.id);
kill(Pid::from_raw(*pid), s)?;
}
Ok(())
}
}

#[derive(Debug, Default)]
pub struct ChildReaper {
children: Arc<Mutex<HashMap<i32, ReapableChild>>>,
}

#[derive(Debug, Getters)]
pub struct ReapableChild {
#[getset(get)]
id: String,
#[getset(get)]
exit_paths: Vec<PathBuf>,
}

fn write_to_exit_paths(code: i32, paths: Vec<PathBuf>) -> Result<()> {
let code_str = format!("{}", code);
for path in paths {
debug!("writing exit code {} to {}", code, path.display());
File::create(path)?.write_all(code_str.as_bytes())?;
}
Ok(())
}
27 changes: 24 additions & 3 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use getset::{Getters, MutGetters};
use log::{debug, info};
use nix::{
libc::_exit,
sys::signal::Signal,
unistd::{fork, ForkResult},
};
use std::{fs::File, io::Write, path::Path};
use std::{fs::File, io::Write, path::Path, sync::Arc};
use tokio::{
fs,
net::UnixListener,
Expand All @@ -24,6 +25,7 @@ use tokio::{
use tokio_util::compat::TokioAsyncReadCompatExt;
use twoparty::VatNetwork;

mod child_reaper;
mod config;
mod console;
mod cri_logger;
Expand All @@ -35,6 +37,9 @@ pub struct Server {
#[doc = "The main conmon configuration."]
#[getset(get, get_mut)]
config: config::Config,

#[getset(get, get_mut)]
reaper: Arc<child_reaper::ChildReaper>,
}

impl Server {
Expand Down Expand Up @@ -97,7 +102,12 @@ impl Server {
async fn spawn_tasks(self) -> Result<()> {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let socket = self.config().socket().to_path_buf();
tokio::spawn(Self::start_sigterm_handler(socket, shutdown_tx));
tokio::spawn(Self::start_signal_handler(
Arc::clone(self.reaper()),
socket,
shutdown_tx,
));
Arc::clone(self.reaper()).init_reaper()?;

task::spawn_blocking(move || {
let rt = runtime::Handle::current();
Expand All @@ -110,27 +120,38 @@ impl Server {
.await?
}

async fn start_sigterm_handler<T: AsRef<Path>>(
async fn start_signal_handler<T: AsRef<Path>>(
reaper: Arc<child_reaper::ChildReaper>,
socket: T,
shutdown_tx: oneshot::Sender<()>,
) -> Result<()> {
let mut sigterm = signal(SignalKind::terminate())?;
let mut sigint = signal(SignalKind::interrupt())?;
let handled_sig: Signal;

tokio::select! {
_ = sigterm.recv() => {
info!("Received SIGTERM");
handled_sig = Signal::SIGTERM;
}
_ = sigint.recv() => {
info!("Received SIGINT");
handled_sig = Signal::SIGINT;
}
};

let _ = shutdown_tx.send(());

// TODO FIXME Ideally we would drop after socket file is removed,
// but the removal is taking longer than 10 seconds, indicating someone
// is keeping it open...
reaper.kill_children(handled_sig)?;

debug!("Removing socket file {}", socket.as_ref().display());
fs::remove_file(socket)
.await
.context("remove existing socket file")?;

Ok(())
}

Expand Down
18 changes: 16 additions & 2 deletions conmon-rs/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl conmon::Server for Server {
.spawn());

let id = pry!(req.get_id()).to_string();

let status = pry!(child.wait());
debug!("Status for container ID {} is {}", id, status);
if let Some(console) = maybe_console {
Expand All @@ -61,9 +62,13 @@ impl conmon::Server for Server {
.context("wait for console socket connection"));
}

let pid = pry_err!(pry!(fs::read_to_string(pidfile)).parse::<u32>());
let pid = pry_err!(pry!(fs::read_to_string(pidfile)).parse::<i32>());
let exit_paths = pry!(path_vec_from_text_list(pry!(req.get_exit_paths())));

pry_err!(self.reaper().add_child(id, pid, exit_paths));

results.get().init_response().set_container_pid(pid);
// TODO FIXME why convert?
results.get().init_response().set_container_pid(pid as u32);
Promise::ok(())
}
}
Expand All @@ -75,3 +80,12 @@ fn pidfile_from_params(params: &conmon::CreateContainerParams) -> capnp::Result<
debug!("pidfile is {}", pidfile_pathbuf.display());
Ok(pidfile_pathbuf)
}

fn path_vec_from_text_list(tl: capnp::text_list::Reader) -> Result<Vec<PathBuf>, capnp::Error> {
let mut v: Vec<PathBuf> = vec![];
for t in tl {
let t_str = t?.to_string();
v.push(PathBuf::from(t_str));
}
Ok(v)
}

0 comments on commit b544347

Please sign in to comment.