From 0284d1b5c8ea5aff5b30c254200fb0a46c21d67c Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Wed, 5 Mar 2025 01:07:18 +0330 Subject: [PATCH] macros: make `select!` budget-aware (#7164) --- tokio/src/macros/select.rs | 4 ++++ tokio/src/macros/support.rs | 18 +++++++++++++++++- tokio/src/task/coop/mod.rs | 23 +++++++++++++++++++---- tokio/tests/macros_select.rs | 24 ++++++++++++++++++++++++ 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index 10a71aa7b10..18cc44b988a 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -660,6 +660,10 @@ doc! {macro_rules! select { let mut futures = &mut futures; $crate::macros::support::poll_fn(|cx| { + // Return `Pending` when the task budget is depleted since budget-aware futures + // are going to yield anyway and other futures will not cooperate. + ::std::task::ready!($crate::macros::support::poll_budget_available(cx)); + // Track if any branch returns pending. If no branch completes // **or** returns pending, this implies that all branches are // disabled. diff --git a/tokio/src/macros/support.rs b/tokio/src/macros/support.rs index 8588f75c323..ff3ccb4068f 100644 --- a/tokio/src/macros/support.rs +++ b/tokio/src/macros/support.rs @@ -7,8 +7,24 @@ cfg_macros! { pub fn thread_rng_n(n: u32) -> u32 { crate::runtime::context::thread_rng_n(n) } + + cfg_coop! { + #[doc(hidden)] + #[inline] + pub fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> { + crate::task::coop::poll_budget_available(cx) + } + } + + cfg_not_coop! { + #[doc(hidden)] + #[inline] + pub fn poll_budget_available(_: &mut Context<'_>) -> Poll<()> { + Poll::Ready(()) + } + } } pub use std::future::{Future, IntoFuture}; pub use std::pin::Pin; -pub use std::task::Poll; +pub use std::task::{Context, Poll}; diff --git a/tokio/src/task/coop/mod.rs b/tokio/src/task/coop/mod.rs index f05f02050fd..58f0c848d82 100644 --- a/tokio/src/task/coop/mod.rs +++ b/tokio/src/task/coop/mod.rs @@ -305,12 +305,27 @@ cfg_coop! { Poll::Ready(restore) } else { - defer(cx); + register_waker(cx); Poll::Pending } }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained())))) } + /// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise. + /// + /// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when + /// polling for budget availability. + #[inline] + pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> { + if has_budget_remaining() { + Poll::Ready(()) + } else { + register_waker(cx); + + Poll::Pending + } + } + cfg_rt! { cfg_unstable_metrics! { #[inline(always)] @@ -326,7 +341,7 @@ cfg_coop! { fn inc_budget_forced_yield_count() {} } - fn defer(cx: &mut Context<'_>) { + fn register_waker(cx: &mut Context<'_>) { context::defer(cx.waker()); } } @@ -335,8 +350,8 @@ cfg_coop! { #[inline(always)] fn inc_budget_forced_yield_count() {} - fn defer(cx: &mut Context<'_>) { - cx.waker().wake_by_ref(); + fn register_waker(cx: &mut Context<'_>) { + cx.waker().wake_by_ref() } } diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 0c5ae6d9ab0..704e61337d2 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -716,3 +716,27 @@ async fn temporary_lifetime_extension() { () = &mut std::future::ready(()) => {}, } } + +#[tokio::test] +async fn select_is_budget_aware() { + const BUDGET: usize = 128; + + let task = || { + Box::pin(async move { + tokio::select! { + biased; + + () = tokio::task::coop::consume_budget() => {}, + () = std::future::ready(()) => {} + } + }) + }; + + for _ in 0..BUDGET { + let poll = futures::poll!(&mut task()); + assert!(poll.is_ready()); + } + + let poll = futures::poll!(&mut task()); + assert!(poll.is_pending()); +}