Skip to content

Commit

Permalink
rt: spawn_into with explicit group
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorErin committed Feb 28, 2025
1 parent 72c23ee commit 0791d6d
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 28 deletions.
13 changes: 9 additions & 4 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ impl Handle {
{
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
self.spawn_named(Box::pin(future), None, SpawnMeta::new_unnamed(fut_size))
} else {
self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
self.spawn_named(future, None, SpawnMeta::new_unnamed(fut_size))
}
}

Expand Down Expand Up @@ -329,7 +329,12 @@ impl Handle {
}

#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
pub(crate) fn spawn_named<F>(
&self,
future: F,
group: Option<usize>,
_meta: SpawnMeta<'_>,
) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand All @@ -345,7 +350,7 @@ impl Handle {
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _meta, id.as_u64());
self.inner.spawn(future, id)
self.inner.spawn(future, group, id)
}

#[track_caller]
Expand Down
21 changes: 19 additions & 2 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,27 @@ impl Runtime {
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.handle
.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
.spawn_named(Box::pin(future), None, SpawnMeta::new_unnamed(fut_size))
} else {
self.handle
.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
.spawn_named(future, None, SpawnMeta::new_unnamed(fut_size))
}
}

/// TODO(i.Erin)
#[track_caller]
pub fn spawn_into<F>(&self, group: Option<usize>, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.handle
.spawn_named(Box::pin(future), group, SpawnMeta::new_unnamed(fut_size))
} else {
self.handle
.spawn_named(future, group, SpawnMeta::new_unnamed(fut_size))
}
}

Expand Down
9 changes: 6 additions & 3 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,19 @@ cfg_rt! {
}
}

pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F, group: Option<usize>, id: Id) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
Handle::CurrentThread(h) => {
assert!(group.is_none());
current_thread::Handle::spawn(h, future, id)
},

#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, group, id),

#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id),
Expand Down
19 changes: 15 additions & 4 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,42 @@ pub(crate) struct Handle {

impl Handle {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(
me: &Arc<Self>,
future: F,
group: Option<usize>,
id: task::Id,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
Self::bind_new_task(me, future, group, id)
}

pub(crate) fn shutdown(&self) {
self.close();
}

pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
pub(super) fn bind_new_task<T>(
me: &Arc<Self>,
future: T,
group: Option<usize>,
id: task::Id,
) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

// TODO(i.Erin) hooks on group
me.task_hooks.spawn(&TaskMeta {
id,
_phantom: Default::default(),
});

me.schedule_option_task_without_yield(notified);
me.schedule_option_task_without_yield(notified, group);

handle
}
Expand Down
19 changes: 12 additions & 7 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ impl task::Schedule for Arc<Handle> {
}

fn schedule(&self, task: Notified) {
self.schedule_task(task, false);
self.schedule_task(task, None, false);
}

fn hooks(&self) -> TaskHarnessScheduleHooks {
Expand All @@ -1110,32 +1110,37 @@ impl task::Schedule for Arc<Handle> {
}

fn yield_now(&self, task: Notified) {
self.schedule_task(task, true);
self.schedule_task(task, None, true);
}
}

impl Handle {
pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
pub(super) fn schedule_task(&self, task: Notified, group: Option<usize>, is_yield: bool) {
with_current(|maybe_cx| {
if let Some(cx) = maybe_cx {
// Make sure the task is part of the **current** scheduler.
if self.ptr_eq(&cx.worker.handle) {
// And the current thread still holds a core
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, cx.worker.group, is_yield);
let group = group.unwrap_or(cx.worker.group);
self.schedule_local(core, task, group, is_yield);
return;
}
}
}

// Otherwise, use the inject queue.
self.push_remote_task(None, task);
self.push_remote_task(group, task);
});
}

pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
pub(super) fn schedule_option_task_without_yield(
&self,
task: Option<Notified>,
group: Option<usize>,
) {
if let Some(task) = task {
self.schedule_task(task, false);
self.schedule_task(task, group, false);
}
}

Expand Down
8 changes: 4 additions & 4 deletions tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ impl<'a> Builder<'a> {
{
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size))
super::spawn::spawn_inner(Box::pin(future), None, SpawnMeta::new(self.name, fut_size))
} else {
super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size))
super::spawn::spawn_inner(future, None, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -112,9 +112,9 @@ impl<'a> Builder<'a> {
{
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size))
handle.spawn_named(Box::pin(future), None, SpawnMeta::new(self.name, fut_size))
} else {
handle.spawn_named(future, SpawnMeta::new(self.name, fut_size))
handle.spawn_named(future, None, SpawnMeta::new(self.name, fut_size))
})
}

Expand Down
2 changes: 2 additions & 0 deletions tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ cfg_rt! {

mod spawn;
pub use spawn::spawn;
#[cfg(feature = "rt-multi-thread")]
pub use spawn::spawn_into;

cfg_rt_multi_thread! {
pub use blocking::block_in_place;
Expand Down
24 changes: 20 additions & 4 deletions tokio/src/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ cfg_rt! {
{
let fut_size = std::mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
spawn_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
spawn_inner(Box::pin(future), None, SpawnMeta::new_unnamed(fut_size))
} else {
spawn_inner(future, SpawnMeta::new_unnamed(fut_size))
spawn_inner(future, None, SpawnMeta::new_unnamed(fut_size))
}
}

#[track_caller]
pub(super) fn spawn_inner<T>(future: T, meta: SpawnMeta<'_>) -> JoinHandle<T::Output>
pub(super) fn spawn_inner<T>(future: T, group: Option<usize>, meta: SpawnMeta<'_>) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
Expand All @@ -199,9 +199,25 @@ cfg_rt! {
let id = task::Id::next();
let task = crate::util::trace::task(future, "task", meta, id.as_u64());

match context::with_current(|handle| handle.spawn(task, id)) {
match context::with_current(|handle| handle.spawn(task, group, id)) {
Ok(join_handle) => join_handle,
Err(e) => panic!("{}", e),
}
}
}

cfg_rt_multi_thread! {
/// TODO(i.Erin)
pub fn spawn_into<F>(future: F, group: usize) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let fut_size = std::mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
spawn_inner(Box::pin(future), Some(group), SpawnMeta::new_unnamed(fut_size))
} else {
spawn_inner(future, Some(group), SpawnMeta::new_unnamed(fut_size))
}
}
}

0 comments on commit 0791d6d

Please sign in to comment.