Skip to content

Commit

Permalink
Move SubmissionQueue::(try_)send_msg to msg module
Browse files Browse the repository at this point in the history
As standalone functions.

SubmissionQueue::(try_)send_msg functions are not yet deleted, just
marked as deprecated.
  • Loading branch information
Thomasdezeeuw committed Apr 20, 2024
1 parent 0a9ff3d commit d620de6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 34 deletions.
27 changes: 15 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ impl SubmissionQueue {
/// Setup a listener for user space messages.
///
/// The returned [`MsgListener`] will return all messages send using
/// [`SubmissionQueue::try_send_msg`] and [`SubmissionQueue::send_msg`]
/// using the returned `MsgToken`.
/// [`msg::try_send_msg`] and [`msg::send_msg`] using the returned
/// `MsgToken`.
///
/// # Notes
///
Expand All @@ -510,31 +510,34 @@ impl SubmissionQueue {
msg::msg_listener(self)
}

/// Try to send a message to iterator listening for message using `MsgToken`.
/// Try to send a message to iterator listening for message using [`MsgToken`].
///
/// This will use the io_uring submission queue to share `data` with the
/// receiving end. This means that it will wake up the thread if it's
/// currently [polling].
///
/// This will fail if the submission queue is currently full. See
/// [`SubmissionQueue::send_msg`] for a version that tries again when the
/// submission queue is full.
/// [`send_msg`] for a version that tries again when the submission queue is
/// full.
///
/// See [`msg_listener`] for examples.
///
/// This is deprecated, use [`msg::try_send_msg`] instead.
///
/// [polling]: Ring::poll
/// [`send_msg`]: msg::send_msg
/// [`msg_listener`]: msg::msg_listener
#[deprecated(note = "use a10::msg::try_send_msg instead")]
pub fn try_send_msg(&self, token: MsgToken, data: u32) -> io::Result<()> {
self.add_no_result(|submission| unsafe {
submission.msg(self.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0);
submission.no_completion_event();
})?;
Ok(())
msg::try_send_msg(self, token, data)
}

/// Send a message to iterator listening for message using `MsgToken`.
/// Send a message to iterator listening for message using [`MsgToken`].
///
/// This is deprecated, use [`msg::send_msg`] instead.
#[deprecated(note = "use a10::msg::send_msg instead")]
pub const fn send_msg<'a>(&'a self, token: MsgToken, data: u32) -> SendMsg<'a> {
SendMsg::new(self, token, data)
msg::send_msg(self, token, data)
}

/// Wait for an event specified in `mask` on the file descriptor `fd`.
Expand Down
57 changes: 38 additions & 19 deletions src/msg.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
//! User space messages.
//!
//! To setup a [`MsgListener`] use [`msg_listener`]. It returns the listener as
//! well as a [`MsgToken`], which can be used in
//! [`SubmissionQueue::try_send_msg`] and [`SubmissionQueue::send_msg`] to send
//! a message to the created `MsgListener`.
//! well as a [`MsgToken`], which can be used in [`try_send_msg`] and
//! [`send_msg`] to send a message to the created `MsgListener`.
use std::future::Future;
use std::io;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::task::{self, Poll};

use crate::{OpIndex, SubmissionQueue};

/// Token used to the messages.
///
/// See [`SubmissionQueue::msg_listener`].
/// See [`msg_listener`].
#[derive(Copy, Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct MsgToken(pub(crate) OpIndex);

/// Setup a listener for user space messages.
///
/// The returned [`MsgListener`] will return all messages send using
/// [`SubmissionQueue::try_send_msg`] and [`SubmissionQueue::send_msg`] using
/// the returned `MsgToken`.
/// [`try_send_msg`] and [`send_msg`] using the returned `MsgToken`.
///
/// # Notes
///
Expand All @@ -48,7 +47,7 @@ pub fn msg_listener(sq: SubmissionQueue) -> io::Result<(MsgListener, MsgToken)>
Ok((MsgListener { sq, op_index }, MsgToken(op_index)))
}

/// [`AsyncIterator`] behind [`SubmissionQueue::msg_listener`].
/// [`AsyncIterator`] behind [`msg_listener`].
///
/// [`AsyncIterator`]: std::async_iter::AsyncIterator
#[derive(Debug)]
Expand Down Expand Up @@ -86,28 +85,48 @@ impl std::async_iter::AsyncIterator for MsgListener {
}
}

/// [`Future`] behind [`SubmissionQueue::send_msg`].
/// Try to send a message to iterator listening for message using [`MsgToken`].
///
/// This will use the io_uring submission queue to share `data` with the
/// receiving end. This means that it will wake up the thread if it's
/// currently [polling].
///
/// This will fail if the submission queue is currently full. See [`send_msg`]
/// for a version that tries again when the submission queue is full.
///
/// See [`msg_listener`] for examples.
///
/// [polling]: crate::Ring::poll
#[allow(clippy::module_name_repetitions)]
pub fn try_send_msg(sq: &SubmissionQueue, token: MsgToken, data: u32) -> io::Result<()> {
sq.add_no_result(|submission| unsafe {
submission.msg(sq.shared.ring_fd.as_raw_fd(), (token.0).0 as u64, data, 0);
submission.no_completion_event();
})?;
Ok(())
}

/// Send a message to iterator listening for message using [`MsgToken`].
#[allow(clippy::module_name_repetitions)]
pub const fn send_msg<'sq>(sq: &'sq SubmissionQueue, token: MsgToken, data: u32) -> SendMsg<'sq> {
SendMsg { sq, token, data }
}

/// [`Future`] behind [`send_msg`].
#[derive(Debug)]
#[must_use = "`Future`s do nothing unless polled"]
#[allow(clippy::module_name_repetitions)]
pub struct SendMsg<'a> {
sq: &'a SubmissionQueue,
pub struct SendMsg<'sq> {
sq: &'sq SubmissionQueue,
token: MsgToken,
data: u32,
}

impl<'a> SendMsg<'a> {
/// Create a new `SendMsg`.
pub(crate) const fn new(sq: &'a SubmissionQueue, token: MsgToken, data: u32) -> SendMsg {
SendMsg { sq, token, data }
}
}

impl<'a> Future for SendMsg<'a> {
impl<'sq> Future for SendMsg<'sq> {
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.sq.try_send_msg(self.token, self.data) {
match try_send_msg(self.sq, self.token, self.data) {
Ok(()) => Poll::Ready(Ok(())),
Err(_) => {
self.sq.wait_for_submission(ctx.waker().clone());
Expand Down
8 changes: 5 additions & 3 deletions tests/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::{Duration, Instant};
use a10::cancel::Cancel;
use a10::fs::OpenOptions;
use a10::io::ReadBufPool;
use a10::msg::{msg_listener, MsgListener, MsgToken, SendMsg};
use a10::msg::{msg_listener, send_msg, try_send_msg, MsgListener, MsgToken, SendMsg};
use a10::poll::{multishot_poll, oneshot_poll, MultishotPoll, OneshotPoll};
use a10::{mem, process, AsyncFd, Config, Ring, SubmissionQueue};

Expand Down Expand Up @@ -284,8 +284,10 @@ fn message_sending() {
start_mulitshot_op(msg_listener.as_mut());

// Send some messages.
sq.try_send_msg(msg_token, DATA1).unwrap();
waker.block_on(pin!(sq.send_msg(msg_token, DATA2))).unwrap();
try_send_msg(&sq, msg_token, DATA1).unwrap();
waker
.block_on(pin!(send_msg(&sq, msg_token, DATA2)))
.unwrap();

assert_eq!(waker.block_on(next(msg_listener.as_mut())), Some(DATA1));
assert_eq!(waker.block_on(next(msg_listener.as_mut())), Some(DATA2));
Expand Down

0 comments on commit d620de6

Please sign in to comment.