diff --git a/src/lib.rs b/src/lib.rs index 10811107..f8633dde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -484,9 +484,9 @@ impl SubmissionQueue { /// Setup a listener for user space messages. /// - /// The returned [`MsgListener`] iterator will return all messages send - /// using [`SubmissionQueue::try_send_msg`] and - /// [`SubmissionQueue::send_msg`] using the returned `MsgToken`. + /// The returned [`MsgListener`] will return all messages send using + /// [`msg::try_send_msg`] and [`msg::send_msg`] using the returned + /// `MsgToken`. /// /// # Notes /// @@ -503,34 +503,41 @@ impl SubmissionQueue { /// don't use `MsgToken` after it became invalid. Furthermore to ensure /// the creation of it succeeds it should be done early in the lifetime of /// `Ring`. + /// + /// This is deprecated, use [`msg::msg_listener`] instead. + #[deprecated(note = "use a10::msg::msg_listener instead")] pub fn msg_listener(self) -> io::Result<(MsgListener, MsgToken)> { - MsgListener::new(self) + msg::msg_listener(self) } - /// Try to send a message to iterator listening for message using `MsgToken`. + /// Try to send a message to iterator listening for message using [`MsgToken`]. /// /// This will use the io_uring submission queue to share `data` with the /// receiving end. This means that it will wake up the thread if it's /// currently [polling]. /// /// This will fail if the submission queue is currently full. See - /// [`SubmissionQueue::send_msg`] for a version that tries again when the - /// submission queue is full. + /// [`send_msg`] for a version that tries again when the submission queue is + /// full. + /// + /// See [`msg_listener`] for examples. /// - /// See [`SubmissionQueue::msg_listener`] for examples. + /// This is deprecated, use [`msg::try_send_msg`] instead. /// /// [polling]: Ring::poll + /// [`send_msg`]: msg::send_msg + /// [`msg_listener`]: msg::msg_listener + #[deprecated(note = "use a10::msg::try_send_msg instead")] pub fn try_send_msg(&self, token: MsgToken, data: u32) -> io::Result<()> { - self.add_no_result(|submission| unsafe { - submission.msg(self.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0); - submission.no_completion_event(); - })?; - Ok(()) + msg::try_send_msg(self, token, data) } - /// Send a message to iterator listening for message using `MsgToken`. + /// Send a message to iterator listening for message using [`MsgToken`]. + /// + /// This is deprecated, use [`msg::send_msg`] instead. + #[deprecated(note = "use a10::msg::send_msg instead")] pub const fn send_msg<'a>(&'a self, token: MsgToken, data: u32) -> SendMsg<'a> { - SendMsg::new(self, token, data) + msg::send_msg(self, token, data) } /// Wait for an event specified in `mask` on the file descriptor `fd`. @@ -546,13 +553,15 @@ impl SubmissionQueue { /// Returns an [`AsyncIterator`] that returns multiple events as specified /// in `mask` on the file descriptor `fd`. /// - /// This is not the same as calling [`SubmissionQueue::oneshot_poll`] in a + /// This is not the same as calling [`oneshot_poll`] in a /// loop as this uses a multishot operation, which means only a single /// operation is created kernel side, making this more efficient. /// /// This is deprecated, use [`poll::multishot_poll`] instead. /// /// [`AsyncIterator`]: std::async_iter::AsyncIterator + /// + /// [`oneshot_poll`]: poll::oneshot_poll #[deprecated(note = "use a10::poll::multishot_poll instead")] pub fn multishot_poll<'a>(&'a self, fd: BorrowedFd, mask: libc::c_int) -> MultishotPoll<'a> { poll::multishot_poll(self, fd, mask) diff --git a/src/msg.rs b/src/msg.rs index 536568f0..49d5316a 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -1,12 +1,12 @@ //! User space messages. //! -//! To setup a [`MsgListener`] use [`SubmissionQueue::msg_listener`]. It returns -//! the listener as well as a [`MsgToken`], which can be used in -//! [`SubmissionQueue::try_send_msg`] and [`SubmissionQueue::send_msg`] to send -//! a message to the created `MsgListener`. +//! To setup a [`MsgListener`] use [`msg_listener`]. It returns the listener as +//! well as a [`MsgToken`], which can be used in [`try_send_msg`] and +//! [`send_msg`] to send a message to the created `MsgListener`. use std::future::Future; use std::io; +use std::os::fd::AsRawFd; use std::pin::Pin; use std::task::{self, Poll}; @@ -14,12 +14,40 @@ use crate::{OpIndex, SubmissionQueue}; /// Token used to the messages. /// -/// See [`SubmissionQueue::msg_listener`]. +/// See [`msg_listener`]. #[derive(Copy, Clone, Debug)] #[allow(clippy::module_name_repetitions)] pub struct MsgToken(pub(crate) OpIndex); -/// [`AsyncIterator`] behind [`SubmissionQueue::msg_listener`]. +/// Setup a listener for user space messages. +/// +/// The returned [`MsgListener`] will return all messages send using +/// [`try_send_msg`] and [`send_msg`] using the returned `MsgToken`. +/// +/// # Notes +/// +/// This will return an error if too many operations are already queued, +/// this is usually resolved by calling [`Ring::poll`]. +/// +/// The returned `MsgToken` has an implicitly lifetime linked to +/// `MsgListener`. If `MsgListener` is dropped the `MsgToken` will +/// become invalid. +/// +/// Due to the limitations mentioned above it's advised to consider the +/// usefulness of the type severly limited. The returned `MsgListener` +/// iterator should live for the entire lifetime of the `Ring`, to ensure we +/// don't use `MsgToken` after it became invalid. Furthermore to ensure +/// the creation of it succeeds it should be done early in the lifetime of +/// `Ring`. +/// +/// [`Ring::poll`]: crate::Ring::poll +#[allow(clippy::module_name_repetitions)] +pub fn msg_listener(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)> { + let op_index = sq.queue_multishot()?; + Ok((MsgListener { sq, op_index }, MsgToken(op_index))) +} + +/// [`AsyncIterator`] behind [`msg_listener`]. /// /// [`AsyncIterator`]: std::async_iter::AsyncIterator #[derive(Debug)] @@ -31,12 +59,6 @@ pub struct MsgListener { } impl MsgListener { - /// Create a new `MsgListener`. - pub(crate) fn new(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)> { - let op_index = sq.queue_multishot()?; - Ok((MsgListener { sq, op_index }, MsgToken(op_index))) - } - /// This is the same as the `AsyncIterator::poll_next` function, but then /// available on stable Rust. pub fn poll_next(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll> { @@ -63,28 +85,48 @@ impl std::async_iter::AsyncIterator for MsgListener { } } -/// [`Future`] behind [`SubmissionQueue::send_msg`]. +/// Try to send a message to iterator listening for message using [`MsgToken`]. +/// +/// This will use the io_uring submission queue to share `data` with the +/// receiving end. This means that it will wake up the thread if it's +/// currently [polling]. +/// +/// This will fail if the submission queue is currently full. See [`send_msg`] +/// for a version that tries again when the submission queue is full. +/// +/// See [`msg_listener`] for examples. +/// +/// [polling]: crate::Ring::poll +#[allow(clippy::module_name_repetitions)] +pub fn try_send_msg(sq: &SubmissionQueue, token: MsgToken, data: u32) -> io::Result<()> { + sq.add_no_result(|submission| unsafe { + submission.msg(sq.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0); + submission.no_completion_event(); + })?; + Ok(()) +} + +/// Send a message to iterator listening for message using [`MsgToken`]. +#[allow(clippy::module_name_repetitions)] +pub const fn send_msg<'sq>(sq: &'sq SubmissionQueue, token: MsgToken, data: u32) -> SendMsg<'sq> { + SendMsg { sq, token, data } +} + +/// [`Future`] behind [`send_msg`]. #[derive(Debug)] #[must_use = "`Future`s do nothing unless polled"] #[allow(clippy::module_name_repetitions)] -pub struct SendMsg<'a> { - sq: &'a SubmissionQueue, +pub struct SendMsg<'sq> { + sq: &'sq SubmissionQueue, token: MsgToken, data: u32, } -impl<'a> SendMsg<'a> { - /// Create a new `SendMsg`. - pub(crate) const fn new(sq: &'a SubmissionQueue, token: MsgToken, data: u32) -> SendMsg { - SendMsg { sq, token, data } - } -} - -impl<'a> Future for SendMsg<'a> { +impl<'sq> Future for SendMsg<'sq> { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll { - match self.sq.try_send_msg(self.token, self.data) { + match try_send_msg(self.sq, self.token, self.data) { Ok(()) => Poll::Ready(Ok(())), Err(_) => { self.sq.wait_for_submission(ctx.waker().clone()); diff --git a/src/poll.rs b/src/poll.rs index 861e0d7c..0bd24815 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,10 +1,9 @@ //! Poll for file descriptor events. //! //! To wait for events on a file descriptor use: -//! * [`SubmissionQueue::oneshot_poll`] a [`Future`] returning a single -//! [`PollEvent`]. -//! * [`SubmissionQueue::multishot_poll`] an [`AsyncIterator`] returning -//! multiple [`PollEvent`]s. +//! * [`oneshot_poll`] a [`Future`] returning a single [`PollEvent`]. +//! * [`multishot_poll`] an [`AsyncIterator`] returning multiple +//! [`PollEvent`]s. //! //! [`AsyncIterator`]: std::async_iter::AsyncIterator @@ -36,7 +35,7 @@ pub fn oneshot_poll<'sq>( } } -/// [`Future`] behind [`SubmissionQueue::oneshot_poll`]. +/// [`Future`] behind [`oneshot_poll`]. #[derive(Debug)] #[must_use = "`Future`s do nothing unless polled"] #[allow(clippy::module_name_repetitions)] @@ -104,9 +103,9 @@ impl<'sq> Drop for OneshotPoll<'sq> { /// Returns an [`AsyncIterator`] that returns multiple events as specified /// in `mask` on the file descriptor `fd`. /// -/// This is not the same as calling [`SubmissionQueue::oneshot_poll`] in a -/// loop as this uses a multishot operation, which means only a single -/// operation is created kernel side, making this more efficient. +/// This is not the same as calling [`oneshot_poll`] in a loop as this uses a +/// multishot operation, which means only a single operation is created kernel +/// side, making this more efficient. /// /// [`AsyncIterator`]: std::async_iter::AsyncIterator #[allow(clippy::module_name_repetitions)] @@ -121,7 +120,7 @@ pub fn multishot_poll<'sq>( } } -/// [`AsyncIterator`] behind [`SubmissionQueue::multishot_poll`]. +/// [`AsyncIterator`] behind [`multishot_poll`]. /// /// [`AsyncIterator`]: std::async_iter::AsyncIterator #[derive(Debug)] diff --git a/tests/ring.rs b/tests/ring.rs index d8aac82c..18b75eaf 100644 --- a/tests/ring.rs +++ b/tests/ring.rs @@ -18,7 +18,7 @@ use std::time::{Duration, Instant}; use a10::cancel::Cancel; use a10::fs::OpenOptions; use a10::io::ReadBufPool; -use a10::msg::{MsgListener, MsgToken, SendMsg}; +use a10::msg::{msg_listener, send_msg, try_send_msg, MsgListener, MsgToken, SendMsg}; use a10::poll::{multishot_poll, oneshot_poll, MultishotPoll, OneshotPoll}; use a10::{mem, process, AsyncFd, Config, Ring, SubmissionQueue}; @@ -279,13 +279,15 @@ fn message_sending() { is_send::(); is_sync::(); - let (msg_listener, msg_token) = sq.clone().msg_listener().unwrap(); + let (msg_listener, msg_token) = msg_listener(sq.clone()).unwrap(); let mut msg_listener = pin!(msg_listener); start_mulitshot_op(msg_listener.as_mut()); // Send some messages. - sq.try_send_msg(msg_token, DATA1).unwrap(); - waker.block_on(pin!(sq.send_msg(msg_token, DATA2))).unwrap(); + try_send_msg(&sq, msg_token, DATA1).unwrap(); + waker + .block_on(pin!(send_msg(&sq, msg_token, DATA2))) + .unwrap(); assert_eq!(waker.block_on(next(msg_listener.as_mut())), Some(DATA1)); assert_eq!(waker.block_on(next(msg_listener.as_mut())), Some(DATA2));