Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: overhaul task hooks #7197

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
3 changes: 0 additions & 3 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,7 @@
//! - [`task::Builder`]
//! - Some methods on [`task::JoinSet`]
//! - [`runtime::RuntimeMetrics`]
//! - [`runtime::Builder::on_task_spawn`]
//! - [`runtime::Builder::on_task_terminate`]
//! - [`runtime::Builder::unhandled_panic`]
//! - [`runtime::TaskMeta`]
//!
//! This flag enables **unstable** features. The public API of these features
//! may break in 1.x releases. To enable these features, the `--cfg
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,15 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
// let parent = with_c
let id = task::Id::next();
let fut =
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());

#[cfg(tokio_unstable)]
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id, None);

#[cfg(not(tokio_unstable))]
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
Expand Down
24 changes: 15 additions & 9 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#[cfg(feature = "test-util")]
use crate::runtime::scheduler;
use crate::runtime::task::{self, Task, TaskHarnessScheduleHooks};
use crate::runtime::task::{self, Task};
use crate::runtime::Handle;
#[cfg(tokio_unstable)]
use crate::runtime::{OptionalTaskHooksFactory, OptionalTaskHooksFactoryRef};

/// `task::Schedule` implementation that does nothing (except some bookkeeping
/// in test-util builds). This is unique to the blocking scheduler as tasks
Expand All @@ -12,7 +14,8 @@ use crate::runtime::Handle;
pub(crate) struct BlockingSchedule {
#[cfg(feature = "test-util")]
handle: Handle,
hooks: TaskHarnessScheduleHooks,
#[cfg(tokio_unstable)]
hooks_factory: OptionalTaskHooksFactory,
}

impl BlockingSchedule {
Expand All @@ -33,9 +36,8 @@ impl BlockingSchedule {
BlockingSchedule {
#[cfg(feature = "test-util")]
handle: handle.clone(),
hooks: TaskHarnessScheduleHooks {
task_terminate_callback: handle.inner.hooks().task_terminate_callback.clone(),
},
#[cfg(tokio_unstable)]
hooks_factory: handle.inner.hooks_factory(),
}
}
}
Expand All @@ -62,9 +64,13 @@ impl task::Schedule for BlockingSchedule {
unreachable!();
}

fn hooks(&self) -> TaskHarnessScheduleHooks {
TaskHarnessScheduleHooks {
task_terminate_callback: self.hooks.task_terminate_callback.clone(),
}
#[cfg(tokio_unstable)]
fn hooks_factory(&self) -> OptionalTaskHooksFactory {
self.hooks_factory.clone()
}

#[cfg(tokio_unstable)]
fn hooks_factory_ref(&self) -> OptionalTaskHooksFactoryRef<'_> {
self.hooks_factory.as_ref().map(AsRef::as_ref)
}
}
237 changes: 22 additions & 215 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
#![cfg_attr(loom, allow(unused_imports))]

use crate::runtime::blocking::BlockingPool;
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
use crate::runtime::scheduler::CurrentThread;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
#[cfg(tokio_unstable)]
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
use crate::runtime::{
metrics::HistogramConfiguration, LocalOptions, LocalRuntime, OptionalTaskHooksFactory,
TaskHookHarnessFactory,
};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use std::fmt;
use std::io;
#[cfg(tokio_unstable)]
use std::sync::Arc;
use std::thread::ThreadId;
use std::time::Duration;

Expand Down Expand Up @@ -85,19 +89,8 @@ pub struct Builder {
/// To run after each thread is unparked.
pub(super) after_unpark: Option<Callback>,

/// To run before each task is spawned.
pub(super) before_spawn: Option<TaskCallback>,

/// To run before each poll
#[cfg(tokio_unstable)]
pub(super) before_poll: Option<TaskCallback>,

/// To run after each poll
#[cfg(tokio_unstable)]
pub(super) after_poll: Option<TaskCallback>,

/// To run after each task is terminated.
pub(super) after_termination: Option<TaskCallback>,
pub(super) task_hook_harness_factory: OptionalTaskHooksFactory,

/// Customizable keep alive timeout for `BlockingPool`
pub(super) keep_alive: Option<Duration>,
Expand Down Expand Up @@ -311,13 +304,8 @@ impl Builder {
before_park: None,
after_unpark: None,

before_spawn: None,
after_termination: None,

#[cfg(tokio_unstable)]
before_poll: None,
#[cfg(tokio_unstable)]
after_poll: None,
task_hook_harness_factory: None,

keep_alive: None,

Expand Down Expand Up @@ -706,188 +694,19 @@ impl Builder {
self
}

/// Executes function `f` just before a task is spawned.
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
///
/// This can be used for bookkeeping or monitoring purposes.
///
/// Note: There can only be one spawn callback for a runtime; calling this function more
/// than once replaces the last callback defined, rather than adding to it.
///
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
/// # pub fn main() {
/// let runtime = runtime::Builder::new_current_thread()
/// .on_task_spawn(|_| {
/// println!("spawning task");
/// })
/// .build()
/// .unwrap();
/// Factory method for producing "fallback" task hook harnesses.
///
/// runtime.block_on(async {
/// tokio::task::spawn(std::future::ready(()));
///
/// for _ in 0..64 {
/// tokio::task::yield_now().await;
/// }
/// })
/// # }
/// ```
/// The order of operations for assigning the hook harness for a task are as follows:
/// 1. [crate::task::spawn_with_hooks], if used.
/// 2. [crate::runtime::task_hooks::TaskHookHarnessFactory], if it returns something other than [Option::None].
/// 3. This function.
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
where
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
self.before_spawn = Some(std::sync::Arc::new(f));
self
}

/// Executes function `f` just before a task is polled
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use std::sync::{atomic::AtomicUsize, Arc};
/// # use tokio::task::yield_now;
/// # pub fn main() {
/// let poll_start_counter = Arc::new(AtomicUsize::new(0));
/// let poll_start = poll_start_counter.clone();
/// let rt = tokio::runtime::Builder::new_multi_thread()
/// .enable_all()
/// .on_before_task_poll(move |meta| {
/// println!("task {} is about to be polled", meta.id())
/// })
/// .build()
/// .unwrap();
/// let task = rt.spawn(async {
/// yield_now().await;
/// });
/// let _ = rt.block_on(task);
///
/// # }
/// ```
#[cfg(tokio_unstable)]
pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
pub fn hook_harness_factory<T>(&mut self, hooks: T) -> &mut Self
where
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
T: TaskHookHarnessFactory + Send + Sync + 'static,
{
self.before_poll = Some(std::sync::Arc::new(f));
self
}

/// Executes function `f` just after a task is polled
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use std::sync::{atomic::AtomicUsize, Arc};
/// # use tokio::task::yield_now;
/// # pub fn main() {
/// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
/// let poll_stop = poll_stop_counter.clone();
/// let rt = tokio::runtime::Builder::new_multi_thread()
/// .enable_all()
/// .on_after_task_poll(move |meta| {
/// println!("task {} completed polling", meta.id());
/// })
/// .build()
/// .unwrap();
/// let task = rt.spawn(async {
/// yield_now().await;
/// });
/// let _ = rt.block_on(task);
///
/// # }
/// ```
#[cfg(tokio_unstable)]
pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
where
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
self.after_poll = Some(std::sync::Arc::new(f));
self
}

/// Executes function `f` just after a task is terminated.
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called.
///
/// This can be used for bookkeeping or monitoring purposes.
///
/// Note: There can only be one task termination callback for a runtime; calling this
/// function more than once replaces the last callback defined, rather than adding to it.
///
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
/// # pub fn main() {
/// let runtime = runtime::Builder::new_current_thread()
/// .on_task_terminate(|_| {
/// println!("killing task");
/// })
/// .build()
/// .unwrap();
///
/// runtime.block_on(async {
/// tokio::task::spawn(std::future::ready(()));
///
/// for _ in 0..64 {
/// tokio::task::yield_now().await;
/// }
/// })
/// # }
/// ```
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
where
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
{
self.after_termination = Some(std::sync::Arc::new(f));
self.task_hook_harness_factory = Some(Arc::new(hooks));
self
}

Expand Down Expand Up @@ -1508,12 +1327,8 @@ impl Builder {
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
after_termination: self.after_termination.clone(),
task_hook_factory: self.task_hook_harness_factory.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
Expand Down Expand Up @@ -1662,12 +1477,8 @@ cfg_rt_multi_thread! {
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
after_termination: self.after_termination.clone(),
task_hook_factory: self.task_hook_harness_factory.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
Expand Down Expand Up @@ -1715,12 +1526,8 @@ cfg_rt_multi_thread! {
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
before_spawn: self.before_spawn.clone(),
after_termination: self.after_termination.clone(),
#[cfg(tokio_unstable)]
before_poll: self.before_poll.clone(),
#[cfg(tokio_unstable)]
after_poll: self.after_poll.clone(),
task_hook_factory: self.task_hook_harness_factory.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
local_queue_capacity: self.local_queue_capacity,
Expand Down
Loading
Loading