diff --git a/src/io_uring/cq.rs b/src/io_uring/cq.rs
index 895015f..a4faf12 100644
--- a/src/io_uring/cq.rs
+++ b/src/io_uring/cq.rs
@@ -6,7 +6,7 @@ use std::{fmt, io, ptr};
 
 use crate::msg::MsgData;
 use crate::op::OpResult;
-use crate::sys::{self, libc, mmap, munmap, Shared};
+use crate::sys::{self, libc, load_atomic_u32, mmap, munmap, Shared};
 use crate::{syscall, OperationId};
 
 #[derive(Debug)]
@@ -24,6 +24,10 @@ pub(crate) struct Completions {
     /// Array of `len` completion entries shared with the kernel. The kernel
     /// modifies this array, we're only reading from it.
     entries: *const Completion,
+    /// Number of `entries`.
+    entries_len: u32,
+    /// Mask used to index into the `entries` queue.
+    entries_mask: u32,
 }
 
 impl Completions {
@@ -41,6 +45,14 @@ impl Completions {
             libc::off_t::from(libc::IORING_OFF_CQ_RING),
         )?;
 
+        let entries_len = unsafe {
+            load_atomic_u32(completion_queue.add(parameters.cq_off.ring_entries as usize))
+        };
+        debug_assert!(entries_len == parameters.cq_entries);
+        let entries_mask =
+            unsafe { load_atomic_u32(completion_queue.add(parameters.cq_off.ring_mask as usize)) };
+        debug_assert!(entries_mask == parameters.cq_entries - 1);
+
         unsafe {
             Ok(Completions {
                 ptr: completion_queue,
@@ -49,6 +61,8 @@ impl Completions {
                 head: completion_queue.add(parameters.cq_off.head as usize).cast(),
                 tail: completion_queue.add(parameters.cq_off.tail as usize).cast(),
                 entries: completion_queue.add(parameters.cq_off.cqes as usize).cast(),
+                entries_len,
+                entries_mask,
             })
         }
     }
@@ -138,7 +152,7 @@ impl crate::cq::Completions for Completions {
             local_head: head,
             head: self.head,
             tail,
-            ring_mask: shared.ring_mask,
+            mask: self.entries_mask,
             _lifetime: PhantomData,
         })
     }
@@ -150,7 +164,7 @@ impl crate::cq::Completions for Completions {
         // be outdated.
         let kernel_read = unsafe { (*shared.kernel_read).load(Ordering::Relaxed) };
         let pending_tail = shared.pending_tail.load(Ordering::Relaxed);
-        (shared.len - (pending_tail - kernel_read)) as usize
+        (self.entries_len - (pending_tail - kernel_read)) as usize
     }
 }
 
@@ -178,8 +192,8 @@ struct CompletionsIter<'a> {
     head: *mut AtomicU32,
     /// Tail of `entries`, i.e. number of completions the kernel wrote.
     tail: u32,
-    /// Same as [`Completions.ring_mask`].
-    ring_mask: u32,
+    /// Same as [`Completions.entries_mask`].
+    mask: u32,
     /// We're depend on the lifetime of [`sys::Shared`].
     _lifetime: PhantomData<&'a sys::Shared>,
 }
@@ -188,14 +202,12 @@ impl<'a> Iterator for CompletionsIter<'a> {
     type Item = &'a Completion;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let head = self.local_head;
-        let tail = self.tail;
-        if head < tail {
-            // SAFETY: the `ring_mask` ensures we can never get an `idx` larger
-            // then the size of the queue. We checked above that the kernel has
+        if self.local_head < self.tail {
+            // SAFETY: the `mask` ensures we can never get an `idx` larger than
+            // the size of the queue. We checked above that the kernel has
             // written the struct (and isn't writing to now) os we can safely
             // read from it.
-            let idx = (head & self.ring_mask) as usize;
+            let idx = (self.local_head & self.mask) as usize;
             let completion = unsafe { &*self.entries.add(idx) };
             self.local_head += 1;
             Some(completion)
@@ -270,7 +282,7 @@ impl crate::cq::Event for Completion {
             // Zero copy completed, we can now mark ourselves as done.
             OperationState::Single { .. } if self.is_notification() => true,
             OperationState::Single { result } => {
-                debug_assert!(result.result == -1);
+                debug_assert!(result.result == i32::MIN);
                 debug_assert!(result.flags == u16::MAX);
                 *result = completion;
                 // For zero copy this may be false, in which case we get a
@@ -319,7 +331,7 @@ impl crate::cq::OperationState for OperationState {
         OperationState::Single {
             result: CompletionResult {
                 flags: u16::MAX,
-                result: -1,
+                result: i32::MIN,
             },
         }
     }
diff --git a/src/io_uring/mod.rs b/src/io_uring/mod.rs
index 08a5a21..a8ad0b3 100644
--- a/src/io_uring/mod.rs
+++ b/src/io_uring/mod.rs
@@ -69,16 +69,9 @@ pub(crate) struct Shared {
     /// Used by [`Completions`] to determine the number of submissions to
     /// submit.
     pending_tail: AtomicU32,
-
-    // NOTE: the following two fields are constant.
-    /// Number of entries in the queue.
-    len: u32,
-    /// Mask used to index into the `sqes` queue.
-    ring_mask: u32,
     /// True if we're using a kernel thread to do submission polling, i.e. if
     /// `IORING_SETUP_SQPOLL` is enabled.
     kernel_thread: bool,
-
     // NOTE: the following fields reference mmaped pages shared with the kernel,
     // thus all need atomic/synchronised access.
     /// Flags set by the kernel to communicate state information.
@@ -91,6 +84,10 @@ pub(crate) struct Shared {
     ///
     /// This pointer is also used in the `unmmap` call.
     entries: *mut sq::Submission,
+    /// Number of `entries`.
+    entries_len: u32,
+    /// Mask used to index into the `entries` queue.
+    entries_mask: u32,
     /// Variable used to get an index into `array`. The lock must be held while
     /// writing into `array` to prevent race conditions with other threads.
     array_index: Mutex<u32>,
@@ -105,11 +102,6 @@ impl Shared {
     pub(crate) fn new(rfd: OwnedFd, parameters: &libc::io_uring_params) -> io::Result<Shared> {
-        /// Load a `u32` using relaxed ordering from `ptr`.
-        unsafe fn load_atomic_u32(ptr: *mut libc::c_void) -> u32 {
-            (*ptr.cast::<AtomicU32>()).load(Ordering::Relaxed)
-        }
-
         let submission_queue_size =
             parameters.sq_off.array + parameters.sq_entries * (size_of::<u32>() as u32);
         let submission_queue = mmap(
@@ -131,6 +123,14 @@ impl Shared {
             _ = munmap(submission_queue, submission_queue_size as usize); // Can't handle two errors.
         })?;
 
+        let entries_len = unsafe {
+            load_atomic_u32(submission_queue.add(parameters.sq_off.ring_entries as usize))
+        };
+        debug_assert!(entries_len == parameters.sq_entries);
+        let entries_mask =
+            unsafe { load_atomic_u32(submission_queue.add(parameters.sq_off.ring_mask as usize)) };
+        debug_assert!(entries_mask == parameters.sq_entries - 1);
+
         // SAFETY: we do a whole bunch of pointer manipulations, the kernel
         // ensures all of this stuff is set up for us with the mmap calls above.
         #[allow(clippy::mutex_integer)] // For `array_index`, need to the lock for more.
@@ -139,21 +139,16 @@ impl Shared {
             rfd,
             ptr: submission_queue,
             size: submission_queue_size,
-            pending_tail: AtomicU32::new(0),
-            // Fields are constant, so we load them once.
-            len: load_atomic_u32(submission_queue.add(parameters.sq_off.ring_entries as usize)),
-            ring_mask: load_atomic_u32(
-                submission_queue.add(parameters.sq_off.ring_mask as usize),
-            ),
             kernel_thread: (parameters.flags & libc::IORING_SETUP_SQPOLL) != 0,
             // Fields are shared with the kernel.
             kernel_read: submission_queue.add(parameters.sq_off.head as usize).cast(),
             flags: submission_queue
                 .add(parameters.sq_off.flags as usize)
                 .cast(),
-            entries: submission_queue_entries.cast(),
+            entries_len,
+            entries_mask,
             array_index: Mutex::new(0),
             array: submission_queue
                 .add(parameters.sq_off.array as usize)
                 .cast(),
@@ -241,7 +236,7 @@ unsafe impl Sync for Shared {}
 impl Drop for Shared {
     fn drop(&mut self) {
         let ptr = self.entries.cast();
-        let size = self.len as usize * size_of::<sq::Submission>();
+        let size = self.entries_len as usize * size_of::<sq::Submission>();
         if let Err(err) = munmap(ptr, size) {
             log::warn!(ptr:? = ptr, size = size; "error unmapping io_uring entries: {err}");
         }
@@ -401,3 +396,8 @@ pub(crate) fn munmap(addr: *mut libc::c_void, len: libc::size_t) -> io::Result<()> {
         _ => Err(io::Error::last_os_error()),
     }
 }
+
+/// Load a `u32` using relaxed ordering from `ptr`.
+unsafe fn load_atomic_u32(ptr: *mut libc::c_void) -> u32 {
+    (*ptr.cast::<AtomicU32>()).load(Ordering::Relaxed)
+}
diff --git a/src/io_uring/sq.rs b/src/io_uring/sq.rs
index 1904e3c..41a4dee 100644
--- a/src/io_uring/sq.rs
+++ b/src/io_uring/sq.rs
@@ -38,7 +38,7 @@ impl crate::sq::Submissions for Submissions {
         let tail = shared
             .pending_tail
             .fetch_update(Ordering::AcqRel, Ordering::Acquire, |tail| {
-                if tail - kernel_read < shared.len {
+                if tail - kernel_read < shared.entries_len {
                     // Still an entry available.
                     Some(tail.wrapping_add(1))
                 } else {
@@ -55,7 +55,7 @@ impl crate::sq::Submissions for Submissions {
         // SAFETY: the `ring_mask` ensures we can never get an index larger
         // then the size of the queue. Above we've already ensured that
         // we're the only thread with mutable access to the entry.
-        let submission_index = tail & shared.ring_mask;
+        let submission_index = tail & shared.entries_mask;
         let submission = unsafe { &mut *shared.entries.add(submission_index as usize) };
 
         // Let the caller fill the `submission`.
@@ -91,7 +91,7 @@ impl crate::sq::Submissions for Submissions {
         // | `shared.tail.fetch_add` to 2.
         let mut array_index = shared.array_index.lock().unwrap();
-        let idx = (*array_index & shared.ring_mask) as usize;
+        let idx = (*array_index & shared.entries_mask) as usize;
         // SAFETY: `idx` is masked above to be within the correct bounds.
         unsafe { (*shared.array.add(idx)).store(submission_index, Ordering::Release) };
         // SAFETY: we filled the array above.
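
Illustrative note (not part of the patch): every hunk above relies on the same io_uring ring invariant that the new `debug_assert!`s spell out after loading the values from the mmaped queue: `ring_entries` is a power of two and `ring_mask == ring_entries - 1`. The standalone Rust sketch below, using made-up values, shows why masking a free-running `u32` head/tail counter with `entries_mask` always yields an in-bounds index, even across integer wrap-around, and why subtracting two such counters mirrors the `entries_len - (pending_tail - kernel_read)` space calculation.

// Standalone sketch with hypothetical values; not part of the patched crate.
fn main() {
    let entries_len: u32 = 8; // io_uring ring sizes are always a power of two.
    let entries_mask: u32 = entries_len - 1; // What the debug_assert!s check.

    // Heads and tails are free-running counters; only the masked value indexes the ring.
    let mut head: u32 = u32::MAX - 2; // Deliberately close to wrapping around.
    for _ in 0..6 {
        let idx = (head & entries_mask) as usize;
        assert!(idx < entries_len as usize); // Always in bounds, before and after the wrap.
        println!("head = {head:>10} -> idx = {idx}");
        head = head.wrapping_add(1);
    }

    // Wrapping subtraction of two free-running counters gives the in-flight count,
    // so `entries_len - in_flight` is the space left in the queue.
    let kernel_read: u32 = u32::MAX - 1;
    let pending_tail: u32 = kernel_read.wrapping_add(3);
    assert_eq!(entries_len - pending_tail.wrapping_sub(kernel_read), 5);
}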