From 5fc07d205a95f82ae72cf203f3e997103cd18786 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Tue, 28 Nov 2023 00:35:09 +1100 Subject: [PATCH] Fix `PidfdReaper::drop`: do not spawn any task there To avoid potential headaches such as: - The drop being run outside of a Tokio runtime context - The runtime being present but not actually running Since this is the initial implementation, I chose to use the `OrphanQueue` also used by the signal-driven `Reaper` impl. It's simple and easy to implement without having to add runtime driver and guaranteed to work. Further improvement can be done on the basis of this PR. Signed-off-by: Jiahao XU --- tokio/src/io/poll_evented.rs | 5 -- tokio/src/process/unix/mod.rs | 4 +- tokio/src/process/unix/pidfd_reaper.rs | 89 +++++++++++++++++--------- tokio/src/runtime/io/registration.rs | 5 -- 4 files changed, 60 insertions(+), 43 deletions(-) diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index bce3cd85f3b..e8caad60371 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -144,11 +144,6 @@ impl PollEvented { .map_err(io::Error::from) .map_ok(|_| ()) } - - #[cfg(all(feature = "process", target_os = "linux"))] - pub(crate) fn scheduler_handle(&self) -> &scheduler::Handle { - self.registration.scheduler_handle() - } } feature! { diff --git a/tokio/src/process/unix/mod.rs b/tokio/src/process/unix/mod.rs index 5a88ba8303c..e04c5373bfc 100644 --- a/tokio/src/process/unix/mod.rs +++ b/tokio/src/process/unix/mod.rs @@ -106,7 +106,7 @@ impl OrphanQueue for GlobalOrphanQueue { pub(crate) enum Child { SignalReaper(Reaper), #[cfg(target_os = "linux")] - PidfdReaper(pidfd_reaper::PidfdReaper), + PidfdReaper(pidfd_reaper::PidfdReaper), } impl fmt::Debug for Child { @@ -122,7 +122,7 @@ pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result { return Ok(SpawnedChild { child: Child::PidfdReaper(pidfd_reaper), diff --git a/tokio/src/process/unix/pidfd_reaper.rs b/tokio/src/process/unix/pidfd_reaper.rs index 713b79a8c3f..163b7879d33 100644 --- a/tokio/src/process/unix/pidfd_reaper.rs +++ b/tokio/src/process/unix/pidfd_reaper.rs @@ -1,7 +1,9 @@ use crate::{ io::{interest::Interest, PollEvented}, - process::{imp::orphan::Wait, kill::Kill}, - runtime::Handle, + process::{ + imp::{orphan::Wait, OrphanQueue}, + kill::Kill, + }, }; use libc::{syscall, SYS_pidfd_open, __errno_location, ENOSYS, PIDFD_NONBLOCK}; @@ -109,27 +111,39 @@ where } #[derive(Debug)] -pub(crate) struct PidfdReaper(Option>); +pub(crate) struct PidfdReaper +where + W: Wait + Unpin, + Q: OrphanQueue + Unpin, +{ + inner: Option>, + orphan_queue: Q, +} -impl Deref for PidfdReaper +impl Deref for PidfdReaper where - W: Wait + Send + Sync + Unpin + 'static, + W: Wait + Unpin, + Q: OrphanQueue + Unpin, { type Target = W; fn deref(&self) -> &Self::Target { - &self.0.as_ref().expect("inner has gone away").inner + &self.inner.as_ref().expect("inner has gone away").inner } } -impl PidfdReaper +impl PidfdReaper where - W: Wait + Send + Sync + Unpin + 'static, + W: Wait + Unpin, + Q: OrphanQueue + Unpin, { - pub(crate) fn new(inner: W) -> Result, W)> { + pub(crate) fn new(inner: W, orphan_queue: Q) -> Result, W)> { if let Some(pidfd) = Pidfd::open(inner.id()) { match PollEvented::new_with_interest(pidfd, Interest::READABLE) { - Ok(pidfd) => Ok(Self(Some(PidfdReaperInner { pidfd, inner }))), + Ok(pidfd) => Ok(Self { + inner: Some(PidfdReaperInner { pidfd, inner }), + orphan_queue, + }), Err(io_error) => Err((Some(io_error), inner)), } } else { @@ -138,20 +152,21 @@ where } pub(crate) fn inner_mut(&mut self) -> &mut W { - &mut self.0.as_mut().expect("inner has gone away").inner + &mut self.inner.as_mut().expect("inner has gone away").inner } } -impl Future for PidfdReaper +impl Future for PidfdReaper where - W: Wait + Send + Sync + Unpin + 'static, + W: Wait + Unpin, + Q: OrphanQueue + Unpin, { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Pin::new( Pin::into_inner(self) - .0 + .inner .as_mut() .expect("inner has gone away"), ) @@ -159,38 +174,38 @@ where } } -impl Kill for PidfdReaper +impl Kill for PidfdReaper where - W: Wait + Send + Sync + Unpin + Kill + 'static, + W: Wait + Unpin + Kill, + Q: OrphanQueue + Unpin, { fn kill(&mut self) -> io::Result<()> { self.inner_mut().kill() } } -impl Drop for PidfdReaper +impl Drop for PidfdReaper where - W: Wait + Send + Sync + Unpin + 'static, + W: Wait + Unpin, + Q: OrphanQueue + Unpin, { fn drop(&mut self) { - let mut reaper_inner = self.0.take().expect("inner has gone away"); - if let Ok(Some(_)) = reaper_inner.inner.try_wait() { + let mut orphan = self.inner.take().expect("inner has gone away").inner; + if let Ok(Some(_)) = orphan.try_wait() { return; } - Handle { - inner: reaper_inner.pidfd.scheduler_handle().clone(), - } - .spawn(async move { - let _ = reaper_inner.await; - }); + self.orphan_queue.push_orphan(orphan); } } #[cfg(all(test, not(loom), not(miri)))] mod test { use super::*; - use crate::runtime::{Builder as RuntimeBuilder, Runtime}; + use crate::{ + process::unix::orphan::test::MockQueue, + runtime::{Builder as RuntimeBuilder, Runtime}, + }; use std::process::{Command, Output}; fn create_runtime() -> Runtime { @@ -223,13 +238,17 @@ mod test { return; } + let queue = MockQueue::new(); + run_test(async { let child = Command::new("true").spawn().unwrap(); - let pidfd_reaper = PidfdReaper::new(child).unwrap(); + let pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); let exit_status = pidfd_reaper.await.unwrap(); assert!(exit_status.success()); }); + + assert!(queue.all_enqueued.borrow().is_empty()); } #[test] @@ -239,15 +258,19 @@ mod test { return; } + let queue = MockQueue::new(); + run_test(async { let child = Command::new("sleep").arg("1800").spawn().unwrap(); - let mut pidfd_reaper = PidfdReaper::new(child).unwrap(); + let mut pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); pidfd_reaper.kill().unwrap(); let exit_status = pidfd_reaper.await.unwrap(); assert!(!exit_status.success()); }); + + assert!(queue.all_enqueued.borrow().is_empty()); } #[test] @@ -257,9 +280,13 @@ mod test { return; } + let queue = MockQueue::new(); + run_test(async { - let child = Command::new("true").spawn().unwrap(); - let _pidfd_reaper = PidfdReaper::new(child).unwrap(); + let child = Command::new("sleep").arg("1800").spawn().unwrap(); + let _pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); }); + + assert_eq!(queue.all_enqueued.borrow().len(), 1); } } diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index f560aa5f497..759589863eb 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -231,11 +231,6 @@ impl Registration { fn handle(&self) -> &Handle { self.handle.driver().io() } - - #[cfg(all(feature = "process", target_os = "linux"))] - pub(crate) fn scheduler_handle(&self) -> &scheduler::Handle { - &self.handle - } } impl Drop for Registration {