From 0791d6df847980927e8a01615779fea7738232ea Mon Sep 17 00:00:00 2001 From: Igor Erin Date: Fri, 28 Feb 2025 19:36:37 +0300 Subject: [PATCH] rt: spawn_into with explicit group --- tokio/src/runtime/handle.rs | 13 ++++++---- tokio/src/runtime/runtime.rs | 21 ++++++++++++++-- tokio/src/runtime/scheduler/mod.rs | 9 ++++--- .../runtime/scheduler/multi_thread/handle.rs | 19 +++++++++++---- .../runtime/scheduler/multi_thread/worker.rs | 19 +++++++++------ tokio/src/task/builder.rs | 8 +++---- tokio/src/task/mod.rs | 2 ++ tokio/src/task/spawn.rs | 24 +++++++++++++++---- 8 files changed, 87 insertions(+), 28 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 91f13d6c2ed..b52827cbaf3 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -192,9 +192,9 @@ impl Handle { { let fut_size = mem::size_of::(); if fut_size > BOX_FUTURE_THRESHOLD { - self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + self.spawn_named(Box::pin(future), None, SpawnMeta::new_unnamed(fut_size)) } else { - self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) + self.spawn_named(future, None, SpawnMeta::new_unnamed(fut_size)) } } @@ -329,7 +329,12 @@ impl Handle { } #[track_caller] - pub(crate) fn spawn_named(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle + pub(crate) fn spawn_named( + &self, + future: F, + group: Option, + _meta: SpawnMeta<'_>, + ) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, @@ -345,7 +350,7 @@ impl Handle { let future = super::task::trace::Trace::root(future); #[cfg(all(tokio_unstable, feature = "tracing"))] let future = crate::util::trace::task(future, "task", _meta, id.as_u64()); - self.inner.spawn(future, id) + self.inner.spawn(future, group, id) } #[track_caller] diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index ea6bb247941..58259376e4a 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -247,10 +247,27 @@ impl Runtime { let fut_size = mem::size_of::(); if fut_size > BOX_FUTURE_THRESHOLD { self.handle - .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + .spawn_named(Box::pin(future), None, SpawnMeta::new_unnamed(fut_size)) } else { self.handle - .spawn_named(future, SpawnMeta::new_unnamed(fut_size)) + .spawn_named(future, None, SpawnMeta::new_unnamed(fut_size)) + } + } + + /// TODO(i.Erin) + #[track_caller] + pub fn spawn_into(&self, group: Option, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.handle + .spawn_named(Box::pin(future), group, SpawnMeta::new_unnamed(fut_size)) + } else { + self.handle + .spawn_named(future, group, SpawnMeta::new_unnamed(fut_size)) } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index f09da02c20d..077330bbdc3 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -138,16 +138,19 @@ cfg_rt! { } } - pub(crate) fn spawn(&self, future: F, id: Id) -> JoinHandle + pub(crate) fn spawn(&self, future: F, group: Option, id: Id) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { match self { - Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id), + Handle::CurrentThread(h) => { + assert!(group.is_none()); + current_thread::Handle::spawn(h, future, id) + }, #[cfg(feature = "rt-multi-thread")] - Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id), + Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, group, id), #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id), diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 4075713c979..97a19c5d6ac 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -36,31 +36,42 @@ pub(crate) struct Handle { impl Handle { /// Spawns a future onto the thread pool - pub(crate) fn spawn(me: &Arc, future: F, id: task::Id) -> JoinHandle + pub(crate) fn spawn( + me: &Arc, + future: F, + group: Option, + id: task::Id, + ) -> JoinHandle where F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { - Self::bind_new_task(me, future, id) + Self::bind_new_task(me, future, group, id) } pub(crate) fn shutdown(&self) { self.close(); } - pub(super) fn bind_new_task(me: &Arc, future: T, id: task::Id) -> JoinHandle + pub(super) fn bind_new_task( + me: &Arc, + future: T, + group: Option, + id: task::Id, + ) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); + // TODO(i.Erin) hooks on group me.task_hooks.spawn(&TaskMeta { id, _phantom: Default::default(), }); - me.schedule_option_task_without_yield(notified); + me.schedule_option_task_without_yield(notified, group); handle } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e4fc595ed34..a21cf5046a7 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1100,7 +1100,7 @@ impl task::Schedule for Arc { } fn schedule(&self, task: Notified) { - self.schedule_task(task, false); + self.schedule_task(task, None, false); } fn hooks(&self) -> TaskHarnessScheduleHooks { @@ -1110,32 +1110,37 @@ impl task::Schedule for Arc { } fn yield_now(&self, task: Notified) { - self.schedule_task(task, true); + self.schedule_task(task, None, true); } } impl Handle { - pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { + pub(super) fn schedule_task(&self, task: Notified, group: Option, is_yield: bool) { with_current(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. if self.ptr_eq(&cx.worker.handle) { // And the current thread still holds a core if let Some(core) = cx.core.borrow_mut().as_mut() { - self.schedule_local(core, task, cx.worker.group, is_yield); + let group = group.unwrap_or(cx.worker.group); + self.schedule_local(core, task, group, is_yield); return; } } } // Otherwise, use the inject queue. - self.push_remote_task(None, task); + self.push_remote_task(group, task); }); } - pub(super) fn schedule_option_task_without_yield(&self, task: Option) { + pub(super) fn schedule_option_task_without_yield( + &self, + task: Option, + group: Option, + ) { if let Some(task) = task { - self.schedule_task(task, false); + self.schedule_task(task, group, false); } } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 6053352a01c..13840bab0ab 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -91,9 +91,9 @@ impl<'a> Builder<'a> { { let fut_size = mem::size_of::(); Ok(if fut_size > BOX_FUTURE_THRESHOLD { - super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) + super::spawn::spawn_inner(Box::pin(future), None, SpawnMeta::new(self.name, fut_size)) } else { - super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size)) + super::spawn::spawn_inner(future, None, SpawnMeta::new(self.name, fut_size)) }) } @@ -112,9 +112,9 @@ impl<'a> Builder<'a> { { let fut_size = mem::size_of::(); Ok(if fut_size > BOX_FUTURE_THRESHOLD { - handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) + handle.spawn_named(Box::pin(future), None, SpawnMeta::new(self.name, fut_size)) } else { - handle.spawn_named(future, SpawnMeta::new(self.name, fut_size)) + handle.spawn_named(future, None, SpawnMeta::new(self.name, fut_size)) }) } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index f0c6f71c15a..4ed4dbfb394 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -274,6 +274,8 @@ cfg_rt! { mod spawn; pub use spawn::spawn; + #[cfg(feature = "rt-multi-thread")] + pub use spawn::spawn_into; cfg_rt_multi_thread! { pub use blocking::block_in_place; diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 7c748226121..77ce7e0e446 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -170,14 +170,14 @@ cfg_rt! { { let fut_size = std::mem::size_of::(); if fut_size > BOX_FUTURE_THRESHOLD { - spawn_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + spawn_inner(Box::pin(future), None, SpawnMeta::new_unnamed(fut_size)) } else { - spawn_inner(future, SpawnMeta::new_unnamed(fut_size)) + spawn_inner(future, None, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_inner(future: T, meta: SpawnMeta<'_>) -> JoinHandle + pub(super) fn spawn_inner(future: T, group: Option, meta: SpawnMeta<'_>) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, @@ -199,9 +199,25 @@ cfg_rt! { let id = task::Id::next(); let task = crate::util::trace::task(future, "task", meta, id.as_u64()); - match context::with_current(|handle| handle.spawn(task, id)) { + match context::with_current(|handle| handle.spawn(task, group, id)) { Ok(join_handle) => join_handle, Err(e) => panic!("{}", e), } } } + +cfg_rt_multi_thread! { + /// TODO(i.Erin) + pub fn spawn_into(future: F, group: usize) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let fut_size = std::mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_inner(Box::pin(future), Some(group), SpawnMeta::new_unnamed(fut_size)) + } else { + spawn_inner(future, Some(group), SpawnMeta::new_unnamed(fut_size)) + } + } +}