From e4f9bcb5775a8cbbc848aedea3ad49aa60dd1dae Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 11 Jan 2024 22:58:23 +1000 Subject: [PATCH] process: use pidfd on Linux when available (#6152) Signed-off-by: Jiahao XU --- tokio/src/io/poll_evented.rs | 19 ++ tokio/src/process/unix/mod.rs | 57 +++- tokio/src/process/unix/pidfd_reaper.rs | 317 +++++++++++++++++++++++ tokio/tests/process_change_of_runtime.rs | 34 +++ 4 files changed, 414 insertions(+), 13 deletions(-) create mode 100644 tokio/src/process/unix/pidfd_reaper.rs create mode 100644 tokio/tests/process_change_of_runtime.rs diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index cb5bffd54a9..67beb5b1551 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -136,6 +136,25 @@ impl PollEvented { self.registration.deregister(&mut inner)?; Ok(inner) } + + #[cfg(all(feature = "process", target_os = "linux"))] + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.registration + .poll_read_ready(cx) + .map_err(io::Error::from) + .map_ok(|_| ()) + } + + /// Re-register under new runtime with `interest`. + #[cfg(all(feature = "process", target_os = "linux"))] + pub(crate) fn reregister(&mut self, interest: Interest) -> io::Result<()> { + let io = self.io.as_mut().unwrap(); // As io shouldn't ever be None, just unwrap here. + let _ = self.registration.deregister(io); + self.registration = + Registration::new_with_interest_and_handle(io, interest, scheduler::Handle::current())?; + + Ok(()) + } } feature! { diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index 5b55b7a52f7..c9d1035f53d 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -27,6 +27,9 @@ use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; mod reap; use reap::Reaper; +#[cfg(all(target_os = "linux", feature = "rt"))] +mod pidfd_reaper; + use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; use crate::process::kill::Kill; use crate::process::SpawnedChild; @@ -100,15 +103,15 @@ impl OrphanQueue for GlobalOrphanQueue { } #[must_use = "futures do nothing unless polled"] -pub(crate) struct Child { - inner: Reaper, +pub(crate) enum Child { + SignalReaper(Reaper), + #[cfg(all(target_os = "linux", feature = "rt"))] + PidfdReaper(pidfd_reaper::PidfdReaper), } impl fmt::Debug for Child { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Child") - .field("pid", &self.inner.id()) - .finish() + fmt.debug_struct("Child").field("pid", &self.id()).finish() } } @@ -118,12 +121,24 @@ pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result { + return Ok(SpawnedChild { + child: Child::PidfdReaper(pidfd_reaper), + stdin, + stdout, + stderr, + }) + } + Err((Some(err), _child)) => return Err(err), + Err((None, child_returned)) => child = child_returned, + } + let signal = signal(SignalKind::child())?; Ok(SpawnedChild { - child: Child { - inner: Reaper::new(child, GlobalOrphanQueue, signal), - }, + child: Child::SignalReaper(Reaper::new(child, GlobalOrphanQueue, signal)), stdin, stdout, stderr, @@ -132,25 +147,41 @@ pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result u32 { - self.inner.id() + match self { + Self::SignalReaper(signal_reaper) => signal_reaper.id(), + #[cfg(all(target_os = "linux", feature = "rt"))] + Self::PidfdReaper(pidfd_reaper) => pidfd_reaper.id(), + } + } + + fn std_child(&mut self) -> &mut StdChild { + match self { + Self::SignalReaper(signal_reaper) => signal_reaper.inner_mut(), + #[cfg(all(target_os = "linux", feature = "rt"))] + Self::PidfdReaper(pidfd_reaper) => pidfd_reaper.inner_mut(), + } } pub(crate) fn try_wait(&mut self) -> io::Result> { - self.inner.inner_mut().try_wait() + self.std_child().try_wait() } } impl Kill for Child { fn kill(&mut self) -> io::Result<()> { - self.inner.kill() + self.std_child().kill() } } impl Future for Child { type Output = io::Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.inner).poll(cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::into_inner(self) { + Self::SignalReaper(signal_reaper) => Pin::new(signal_reaper).poll(cx), + #[cfg(all(target_os = "linux", feature = "rt"))] + Self::PidfdReaper(pidfd_reaper) => Pin::new(pidfd_reaper).poll(cx), + } } } diff --git a/tokio/src/process/unix/pidfd_reaper.rs b/tokio/src/process/unix/pidfd_reaper.rs new file mode 100644 index 00000000000..45d23471f84 --- /dev/null +++ b/tokio/src/process/unix/pidfd_reaper.rs @@ -0,0 +1,317 @@ +use crate::{ + io::{interest::Interest, PollEvented}, + process::{ + imp::{orphan::Wait, OrphanQueue}, + kill::Kill, + }, + util::error::RUNTIME_SHUTTING_DOWN_ERROR, +}; + +use libc::{syscall, SYS_pidfd_open, ENOSYS, PIDFD_NONBLOCK}; +use mio::{event::Source, unix::SourceFd}; +use std::{ + fs::File, + future::Future, + io, + marker::Unpin, + ops::Deref, + os::unix::io::{AsRawFd, FromRawFd, RawFd}, + pin::Pin, + process::ExitStatus, + sync::atomic::{AtomicBool, Ordering::Relaxed}, + task::{Context, Poll}, +}; + +#[derive(Debug)] +struct Pidfd { + fd: File, +} + +impl Pidfd { + fn open(pid: u32) -> Option { + // Store false (0) to reduce executable size + static NO_PIDFD_SUPPORT: AtomicBool = AtomicBool::new(false); + + if NO_PIDFD_SUPPORT.load(Relaxed) { + return None; + } + + // Safety: The following function calls invovkes syscall pidfd_open, + // which takes two parameter: pidfd_open(fd: c_int, flag: c_int) + let fd = unsafe { syscall(SYS_pidfd_open, pid, PIDFD_NONBLOCK) }; + if fd == -1 { + let errno = io::Error::last_os_error().raw_os_error().unwrap(); + + if errno == ENOSYS { + NO_PIDFD_SUPPORT.store(true, Relaxed) + } + + None + } else { + // Safety: pidfd_open returns -1 on error or a valid fd with ownership. + Some(Pidfd { + fd: unsafe { File::from_raw_fd(fd as i32) }, + }) + } + } +} + +impl AsRawFd for Pidfd { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + +impl Source for Pidfd { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).register(registry, token, interest) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).reregister(registry, token, interest) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).deregister(registry) + } +} + +#[derive(Debug)] +struct PidfdReaperInner +where + W: Unpin, +{ + inner: W, + pidfd: PollEvented, +} + +#[allow(deprecated)] +fn is_rt_shutdown_err(err: &io::Error) -> bool { + if let Some(inner) = err.get_ref() { + // Using `Error::description()` is more efficient than `format!("{inner}")`, + // so we use it here even if it is deprecated. + err.kind() == io::ErrorKind::Other + && inner.source().is_none() + && inner.description() == RUNTIME_SHUTTING_DOWN_ERROR + } else { + false + } +} + +impl Future for PidfdReaperInner +where + W: Wait + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = Pin::into_inner(self); + + match ready!(this.pidfd.poll_read_ready(cx)) { + Err(err) if is_rt_shutdown_err(&err) => { + this.pidfd.reregister(Interest::READABLE)?; + ready!(this.pidfd.poll_read_ready(cx))? + } + res => res?, + } + Poll::Ready(Ok(this + .inner + .try_wait()? + .expect("pidfd is ready to read, the process should have exited"))) + } +} + +#[derive(Debug)] +pub(crate) struct PidfdReaper +where + W: Wait + Unpin, + Q: OrphanQueue + Unpin, +{ + inner: Option>, + orphan_queue: Q, +} + +impl Deref for PidfdReaper +where + W: Wait + Unpin, + Q: OrphanQueue + Unpin, +{ + type Target = W; + + fn deref(&self) -> &Self::Target { + &self.inner.as_ref().expect("inner has gone away").inner + } +} + +impl PidfdReaper +where + W: Wait + Unpin, + Q: OrphanQueue + Unpin, +{ + pub(crate) fn new(inner: W, orphan_queue: Q) -> Result, W)> { + if let Some(pidfd) = Pidfd::open(inner.id()) { + match PollEvented::new_with_interest(pidfd, Interest::READABLE) { + Ok(pidfd) => Ok(Self { + inner: Some(PidfdReaperInner { pidfd, inner }), + orphan_queue, + }), + Err(io_error) => Err((Some(io_error), inner)), + } + } else { + Err((None, inner)) + } + } + + pub(crate) fn inner_mut(&mut self) -> &mut W { + &mut self.inner.as_mut().expect("inner has gone away").inner + } +} + +impl Future for PidfdReaper +where + W: Wait + Unpin, + Q: OrphanQueue + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new( + Pin::into_inner(self) + .inner + .as_mut() + .expect("inner has gone away"), + ) + .poll(cx) + } +} + +impl Kill for PidfdReaper +where + W: Wait + Unpin + Kill, + Q: OrphanQueue + Unpin, +{ + fn kill(&mut self) -> io::Result<()> { + self.inner_mut().kill() + } +} + +impl Drop for PidfdReaper +where + W: Wait + Unpin, + Q: OrphanQueue + Unpin, +{ + fn drop(&mut self) { + let mut orphan = self.inner.take().expect("inner has gone away").inner; + if let Ok(Some(_)) = orphan.try_wait() { + return; + } + + self.orphan_queue.push_orphan(orphan); + } +} + +#[cfg(all(test, not(loom), not(miri)))] +mod test { + use super::*; + use crate::{ + process::unix::orphan::test::MockQueue, + runtime::{Builder as RuntimeBuilder, Runtime}, + }; + use std::process::{Command, Output}; + + fn create_runtime() -> Runtime { + RuntimeBuilder::new_current_thread() + .enable_io() + .build() + .unwrap() + } + + fn run_test(fut: impl Future) { + create_runtime().block_on(fut) + } + + fn is_pidfd_available() -> bool { + let Output { stdout, status, .. } = Command::new("uname").arg("-r").output().unwrap(); + assert!(status.success()); + let stdout = String::from_utf8_lossy(&stdout); + + let mut kernel_version_iter = stdout.split_once('-').unwrap().0.split('.'); + let major: u32 = kernel_version_iter.next().unwrap().parse().unwrap(); + let minor: u32 = kernel_version_iter.next().unwrap().parse().unwrap(); + + major >= 6 || (major == 5 && minor >= 10) + } + + #[test] + fn test_pidfd_reaper_poll() { + if !is_pidfd_available() { + eprintln!("pidfd is not available on this linux kernel, skip this test"); + return; + } + + let queue = MockQueue::new(); + + run_test(async { + let child = Command::new("true").spawn().unwrap(); + let pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); + + let exit_status = pidfd_reaper.await.unwrap(); + assert!(exit_status.success()); + }); + + assert!(queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn test_pidfd_reaper_kill() { + if !is_pidfd_available() { + eprintln!("pidfd is not available on this linux kernel, skip this test"); + return; + } + + let queue = MockQueue::new(); + + run_test(async { + let child = Command::new("sleep").arg("1800").spawn().unwrap(); + let mut pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); + + pidfd_reaper.kill().unwrap(); + + let exit_status = pidfd_reaper.await.unwrap(); + assert!(!exit_status.success()); + }); + + assert!(queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn test_pidfd_reaper_drop() { + if !is_pidfd_available() { + eprintln!("pidfd is not available on this linux kernel, skip this test"); + return; + } + + let queue = MockQueue::new(); + + let mut child = Command::new("sleep").arg("1800").spawn().unwrap(); + + run_test(async { + let _pidfd_reaper = PidfdReaper::new(&mut child, &queue).unwrap(); + }); + + assert_eq!(queue.all_enqueued.borrow().len(), 1); + + child.kill().unwrap(); + child.wait().unwrap(); + } +} diff --git a/tokio/tests/process_change_of_runtime.rs b/tokio/tests/process_change_of_runtime.rs new file mode 100644 index 00000000000..94efe35b146 --- /dev/null +++ b/tokio/tests/process_change_of_runtime.rs @@ -0,0 +1,34 @@ +#![cfg(feature = "process")] +#![warn(rust_2018_idioms)] +// This tests test the behavior of `process::Command::spawn` when it is used +// outside runtime, and when `process::Child::wait ` is used in a different +// runtime from which `process::Command::spawn` is used. +#![cfg(all(unix, not(target_os = "freebsd")))] + +use std::process::Stdio; +use tokio::{process::Command, runtime::Runtime}; + +#[test] +fn process_spawned_and_wait_in_different_runtime() { + let mut child = Runtime::new().unwrap().block_on(async { + Command::new("true") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .spawn() + .unwrap() + }); + Runtime::new().unwrap().block_on(async { + let _ = child.wait().await.unwrap(); + }); +} + +#[test] +#[should_panic( + expected = "there is no reactor running, must be called from the context of a Tokio 1.x runtime" +)] +fn process_spawned_outside_runtime() { + let _ = Command::new("true") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .spawn(); +}