Skip to content

Commit

Permalink
macros: make select! budget-aware (#7164)
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej authored Mar 4, 2025
1 parent 710bc80 commit 0284d1b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 5 deletions.
4 changes: 4 additions & 0 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,10 @@ doc! {macro_rules! select {
let mut futures = &mut futures;

$crate::macros::support::poll_fn(|cx| {
// Return `Pending` when the task budget is depleted since budget-aware futures
// are going to yield anyway and other futures will not cooperate.
::std::task::ready!($crate::macros::support::poll_budget_available(cx));

// Track if any branch returns pending. If no branch completes
// **or** returns pending, this implies that all branches are
// disabled.
Expand Down
18 changes: 17 additions & 1 deletion tokio/src/macros/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,24 @@ cfg_macros! {
pub fn thread_rng_n(n: u32) -> u32 {
crate::runtime::context::thread_rng_n(n)
}

cfg_coop! {
#[doc(hidden)]
#[inline]
pub fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
crate::task::coop::poll_budget_available(cx)
}
}

cfg_not_coop! {
#[doc(hidden)]
#[inline]
pub fn poll_budget_available(_: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}
}

pub use std::future::{Future, IntoFuture};
pub use std::pin::Pin;
pub use std::task::Poll;
pub use std::task::{Context, Poll};
23 changes: 19 additions & 4 deletions tokio/src/task/coop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,27 @@ cfg_coop! {

Poll::Ready(restore)
} else {
defer(cx);
register_waker(cx);
Poll::Pending
}
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
}

/// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise.
///
/// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when
/// polling for budget availability.
#[inline]
pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
if has_budget_remaining() {
Poll::Ready(())
} else {
register_waker(cx);

Poll::Pending
}
}

cfg_rt! {
cfg_unstable_metrics! {
#[inline(always)]
Expand All @@ -326,7 +341,7 @@ cfg_coop! {
fn inc_budget_forced_yield_count() {}
}

fn defer(cx: &mut Context<'_>) {
fn register_waker(cx: &mut Context<'_>) {
context::defer(cx.waker());
}
}
Expand All @@ -335,8 +350,8 @@ cfg_coop! {
#[inline(always)]
fn inc_budget_forced_yield_count() {}

fn defer(cx: &mut Context<'_>) {
cx.waker().wake_by_ref();
fn register_waker(cx: &mut Context<'_>) {
cx.waker().wake_by_ref()
}
}

Expand Down
24 changes: 24 additions & 0 deletions tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,27 @@ async fn temporary_lifetime_extension() {
() = &mut std::future::ready(()) => {},
}
}

#[tokio::test]
async fn select_is_budget_aware() {
const BUDGET: usize = 128;

let task = || {
Box::pin(async move {
tokio::select! {
biased;

() = tokio::task::coop::consume_budget() => {},
() = std::future::ready(()) => {}
}
})
};

for _ in 0..BUDGET {
let poll = futures::poll!(&mut task());
assert!(poll.is_ready());
}

let poll = futures::poll!(&mut task());
assert!(poll.is_pending());
}

0 comments on commit 0284d1b

Please sign in to comment.