Skip to content

Commit

Permalink
macros: docs about select! alternatives (#7110)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ddystopia authored Feb 15, 2025
1 parent 8e13417 commit 34cdcc7
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 0 deletions.
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" }
futures = { version = "0.3.0", features = ["async-await"] }
mockall = "0.11.1"
async-stream = "0.3"
futures-concurrency = "7.6.3"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
socket2 = "0.5.5"
Expand Down
147 changes: 147 additions & 0 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,153 @@ macro_rules! doc {
/// }
/// }
/// ```
/// # Alternatives from the Ecosystem
///
/// The `select!` macro is a powerful tool for managing multiple asynchronous
/// branches, enabling tasks to run concurrently within the same thread. However,
/// its use can introduce challenges, particularly around cancellation safety, which
/// can lead to subtle and hard-to-debug errors. For many use cases, ecosystem
/// alternatives may be preferable as they mitigate these concerns by offering
/// clearer syntax, more predictable control flow, and reducing the need to manually
/// handle issues like fuse semantics or cancellation safety.
///
/// ## Merging Streams
///
/// For cases where `loop { select! { ... } }` is used to poll multiple tasks,
/// stream merging offers a concise alternative, inherently handle cancellation-safe
/// processing, removing the risk of data loss. Libraries such as [`tokio_stream`],
/// [`futures::stream`] and [`futures_concurrency`] provide tools for merging
/// streams and handling their outputs sequentially.
///
/// [`tokio_stream`]: https://docs.rs/tokio-stream/latest/tokio_stream/
/// [`futures::stream`]: https://docs.rs/futures/latest/futures/stream/
/// [`futures_concurrency`]: https://docs.rs/futures-concurrency/latest/futures_concurrency/
///
/// ### Example with `select!`
///
/// ```
/// struct File;
/// struct Channel;
/// struct Socket;
///
/// impl Socket {
/// async fn read_packet(&mut self) -> Vec<u8> {
/// vec![]
/// }
/// }
///
/// async fn read_send(_file: &mut File, _channel: &mut Channel) {
/// // do work that is not cancel safe
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // open our IO types
/// let mut file = File;
/// let mut channel = Channel;
/// let mut socket = Socket;
///
/// loop {
/// tokio::select! {
/// _ = read_send(&mut file, &mut channel) => { /* ... */ },
/// _data = socket.read_packet() => { /* ... */ }
/// _ = futures::future::ready(()) => break
/// }
/// }
/// }
///
/// ```
///
/// ### Moving to `merge`
///
/// By using merge, you can unify multiple asynchronous tasks into a single stream,
/// eliminating the need to manage tasks manually and reducing the risk of
/// unintended behavior like data loss.
///
/// ```
/// use std::pin::pin;
///
/// use futures::stream::unfold;
/// use tokio_stream::StreamExt;
///
/// struct File;
/// struct Channel;
/// struct Socket;
///
/// impl Socket {
/// async fn read_packet(&mut self) -> Vec<u8> {
/// vec![]
/// }
/// }
///
/// async fn read_send(_file: &mut File, _channel: &mut Channel) {
/// // do work that is not cancel safe
/// }
///
/// enum Message {
/// Stop,
/// Sent,
/// Data(Vec<u8>),
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // open our IO types
/// let file = File;
/// let channel = Channel;
/// let socket = Socket;
///
/// let a = unfold((file, channel), |(mut file, mut channel)| async {
/// read_send(&mut file, &mut channel).await;
/// Some((Message::Sent, (file, channel)))
/// });
/// let b = unfold(socket, |mut socket| async {
/// let data = socket.read_packet().await;
/// Some((Message::Data(data), socket))
/// });
/// let c = tokio_stream::iter([Message::Stop]);
///
/// let mut s = pin!(a.merge(b).merge(c));
/// while let Some(msg) = s.next().await {
/// match msg {
/// Message::Data(_data) => { /* ... */ }
/// Message::Sent => continue,
/// Message::Stop => break,
/// }
/// }
/// }
/// ```
///
/// ## Racing Futures
///
/// If you need to wait for the first completion among several asynchronous tasks,
/// ecosystem utilities such as
/// [`futures`](https://docs.rs/futures/latest/futures/),
/// [`futures-lite`](https://docs.rs/futures-lite/latest/futures_lite/) or
/// [`futures-concurrency`](https://docs.rs/futures-concurrency/latest/futures_concurrency/)
/// provide streamlined syntax for racing futures:
///
/// - [`futures_concurrency::future::Race`](https://docs.rs/futures-concurrency/latest/futures_concurrency/future/trait.Race.html)
/// - [`futures::select`](https://docs.rs/futures/latest/futures/macro.select.html)
/// - [`futures::stream::select_all`](https://docs.rs/futures/latest/futures/stream/select_all/index.html) (for streams)
/// - [`futures_lite::future::or`](https://docs.rs/futures-lite/latest/futures_lite/future/fn.or.html)
/// - [`futures_lite::future::race`](https://docs.rs/futures-lite/latest/futures_lite/future/fn.race.html)
///
/// ```
/// use futures_concurrency::future::Race;
///
/// #[tokio::main]
/// async fn main() {
/// let task_a = async { Ok("ok") };
/// let task_b = async { Err("error") };
/// let result = (task_a, task_b).race().await;
///
/// match result {
/// Ok(output) => println!("First task completed with: {output}"),
/// Err(err) => eprintln!("Error occurred: {err}"),
/// }
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
$select
Expand Down

0 comments on commit 34cdcc7

Please sign in to comment.