Skip to content

Commit

Permalink
Add worker id to on_thread_park callback
Browse files Browse the repository at this point in the history
It's possible to find out from WorkerMetrics which workers are not
actively polling for tasks.  However it's not possible to tell if
non-polling workers are merely idle (parked) or if they are stuck.
By adding the worker id (same usize as used in WorkerMetrics calls)
to the on_thread_park() and on_thread_unpark() callbacks it is
possible to track which specific workers are parked.  Then any
worker that is not polling tasks and is not parked is a worker
that is stuck.

With this information it's possible to create a watchdog that alerts
or kills the process if a worker is stuck for too long.
  • Loading branch information
theron-eg committed Feb 16, 2024
1 parent b32826b commit 009ab4d
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 21 deletions.
16 changes: 8 additions & 8 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
use crate::runtime::{blocking, driver, Callback, CallbackWorker, HistogramBuilder, Runtime};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use std::fmt;
Expand Down Expand Up @@ -73,10 +73,10 @@ pub struct Builder {
pub(super) before_stop: Option<Callback>,

/// To run before each worker thread is parked.
pub(super) before_park: Option<Callback>,
pub(super) before_park: Option<CallbackWorker>,

/// To run after each thread is unparked.
pub(super) after_unpark: Option<Callback>,
pub(super) after_unpark: Option<CallbackWorker>,

/// Customizable keep alive timeout for `BlockingPool`
pub(super) keep_alive: Option<Duration>,
Expand Down Expand Up @@ -592,7 +592,7 @@ impl Builder {
/// .worker_threads(1)
/// .on_thread_park({
/// let barrier = barrier.clone();
/// move || {
/// move |_| {
/// let barrier = barrier.clone();
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async move { barrier.wait().await; });
Expand Down Expand Up @@ -620,7 +620,7 @@ impl Builder {
/// let runtime = runtime::Builder::new_current_thread()
/// .on_thread_park({
/// let barrier = barrier.clone();
/// move || {
/// move |_| {
/// let barrier = barrier.clone();
/// if once.swap(false, Ordering::Relaxed) {
/// tokio::spawn(async move { barrier.wait().await; });
Expand All @@ -638,7 +638,7 @@ impl Builder {
#[cfg(not(loom))]
pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
F: Fn(usize) + Send + Sync + 'static,
{
self.before_park = Some(std::sync::Arc::new(f));
self
Expand All @@ -659,7 +659,7 @@ impl Builder {
/// # use tokio::runtime;
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_unpark(|| {
/// .on_thread_unpark(|_| {
/// println!("thread unparking");
/// })
/// .build();
Expand All @@ -673,7 +673,7 @@ impl Builder {
#[cfg(not(loom))]
pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
F: Fn(usize) + Send + Sync + 'static,
{
self.after_unpark = Some(std::sync::Arc::new(f));
self
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]
use crate::runtime::Callback;
use crate::runtime::CallbackWorker;
use crate::util::RngSeedGenerator;

pub(crate) struct Config {
Expand All @@ -16,10 +16,10 @@ pub(crate) struct Config {
pub(crate) local_queue_capacity: usize,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,
pub(crate) before_park: Option<CallbackWorker>,

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,
pub(crate) after_unpark: Option<CallbackWorker>,

/// The multi-threaded scheduler includes a per-worker LIFO slot used to
/// store the last scheduled task. This can improve certain usage patterns,
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,7 @@ cfg_rt! {

/// After thread starts / before thread stops
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;

/// Before thread parks / after thread unparks
type CallbackWorker = std::sync::Arc<dyn Fn(usize) + Send + Sync>;
}
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl Context {
let mut driver = core.driver.take().expect("driver missing");

if let Some(f) = &handle.shared.config.before_park {
let (c, ()) = self.enter(core, || f());
let (c, ()) = self.enter(core, || f(0));
core = c;
}

Expand All @@ -371,7 +371,7 @@ impl Context {
}

if let Some(f) = &handle.shared.config.after_unpark {
let (c, ()) = self.enter(core, || f());
let (c, ()) = self.enter(core, || f(0));
core = c;
}

Expand Down
14 changes: 9 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub(super) struct Worker {

/// Core data
struct Core {
/// Index of this core
index: usize,

/// Used to schedule bookkeeping tasks every so often.
tick: u32,

Expand Down Expand Up @@ -252,7 +255,7 @@ pub(super) fn create(
let mut worker_metrics = Vec::with_capacity(size);

// Create the local queues
for _ in 0..size {
for i in 0..size {
let (steal, run_queue) = queue::local();

let park = park.clone();
Expand All @@ -261,6 +264,7 @@ pub(super) fn create(
let stats = Stats::new(&metrics);

cores.push(Box::new(Core {
index: i,
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
Expand Down Expand Up @@ -306,10 +310,10 @@ pub(super) fn create(

let mut launch = Launch(vec![]);

for (index, core) in cores.drain(..).enumerate() {
for core in cores.drain(..) {
launch.0.push(Arc::new(Worker {
handle: handle.clone(),
index,
index: core.index,
core: AtomicCell::new(Some(core)),
}));
}
Expand Down Expand Up @@ -684,7 +688,7 @@ impl Context {
/// after all the IOs get dispatched
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.handle.shared.config.before_park {
f();
f(core.index);
}

if core.transition_to_parked(&self.worker) {
Expand All @@ -702,7 +706,7 @@ impl Context {
}

if let Some(f) = &self.worker.handle.shared.config.after_unpark {
f();
f(core.index);
}
core
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ impl Worker {

fn park(&mut self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
if let Some(f) = &cx.shared().config.before_park {
f();
f(core.index);
}

if self.can_transition_to_parked(&mut core) {
Expand All @@ -1140,7 +1140,7 @@ impl Worker {
}

if let Some(f) = &cx.shared().config.after_unpark {
f();
f(core.index);
}

Ok((None, core))
Expand Down
86 changes: 85 additions & 1 deletion tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::{atomic, Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

Expand Down Expand Up @@ -680,6 +680,90 @@ fn budget_exhaustion_yield_with_joins() {
assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[test]
fn on_thread_park_unpark() {
const THREADS: usize = 8;

// Keeps track whether or not each worker is parked
let mut bools = Vec::new();
for _ in 0..THREADS {
bools.push(atomic::AtomicBool::new(false));
}
let bools = Arc::new(bools);
let bools_park = bools.clone();
let bools_unpark = bools.clone();

let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(THREADS)
.enable_all()
.on_thread_park(move |worker| {
// worker is parked
bools_park[worker].store(true, atomic::Ordering::Release);
})
.on_thread_unpark(move |worker| {
bools_unpark[worker].store(false, atomic::Ordering::Release);
})
.build()
.unwrap();
let metrics = rt.metrics();

rt.block_on(async {
// Spawn some tasks to do things, but less than the number of workers. Some
// workers won't have any work to do and will stay parked the duration of the
// test. We rely on bools to distinguish between a busy (unparked) worker that
// isn't polling, vs. ones that are merely parked the entire time.
for _ in 0..(THREADS - 1) {
tokio::spawn(async {
loop {
tokio::time::sleep(Duration::from_millis(4)).await;
}
});
}

// Give the spawned tasks a chance to both poll and park. Not really necessary.
tokio::time::sleep(Duration::from_millis(30)).await;

let _ = tokio::spawn(async move {
let mut counts = Vec::new();
for ii in 0..THREADS {
counts.push(metrics.worker_poll_count(ii));
}

let start_time = std::time::Instant::now();
while start_time.elapsed() < Duration::from_millis(100) {
// Uncomment the line below and the test fails (current worker is no
// longer "stuck" and not yielding back to tokio).

// tokio::task::yield_now().await;
}

let mut stuck = 0;
for ii in 0..THREADS {
let parked = bools[ii].load(atomic::Ordering::Acquire);
// Uncomment below to verify that some workers are not doing any polls,
// yet only one of them is not parked.

// if !parked {
// println!("task {} is not parked", ii);
// }
// if metrics.worker_poll_count(ii) == counts[ii] {
// println!("task {} has same poll count", ii);
// }

if !parked && metrics.worker_poll_count(ii) == counts[ii] {
stuck += 1;
}
}

assert_eq!(
stuck, 1,
"should be exactly one non-polling, non-parked thread"
);
})
.await;
});
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
Expand Down

0 comments on commit 009ab4d

Please sign in to comment.