From 81a70475c116050ae8a3c1601d41e51bdf85a5e2 Mon Sep 17 00:00:00 2001
From: Thomas de Zeeuw
Date: Wed, 1 Jan 2025 02:09:27 +0100
Subject: [PATCH] Use correct submission and completion sizes

As they may differ. Also renames the fields to make it clear they are
related to entries. Shared especially has so many fields that it's
difficult to determine which fields relate to which other fields; this
improves it somewhat.
---
 src/io_uring/cq.rs  | 38 +++++++++++++++++++++++++-------------
 src/io_uring/mod.rs | 40 ++++++++++++++++++++--------------------
 src/io_uring/sq.rs  |  6 +++---
 3 files changed, 48 insertions(+), 36 deletions(-)

diff --git a/src/io_uring/cq.rs b/src/io_uring/cq.rs
index 895015f..a4faf12 100644
--- a/src/io_uring/cq.rs
+++ b/src/io_uring/cq.rs
@@ -6,7 +6,7 @@ use std::{fmt, io, ptr};
 
 use crate::msg::MsgData;
 use crate::op::OpResult;
-use crate::sys::{self, libc, mmap, munmap, Shared};
+use crate::sys::{self, libc, load_atomic_u32, mmap, munmap, Shared};
 use crate::{syscall, OperationId};
 
 #[derive(Debug)]
@@ -24,6 +24,10 @@ pub(crate) struct Completions {
     /// Array of `len` completion entries shared with the kernel. The kernel
     /// modifies this array, we're only reading from it.
     entries: *const Completion,
+    /// Number of `entries`.
+    entries_len: u32,
+    /// Mask used to index into the `entries` queue.
+    entries_mask: u32,
 }
 
 impl Completions {
@@ -41,6 +45,14 @@ impl Completions {
             libc::off_t::from(libc::IORING_OFF_CQ_RING),
         )?;
 
+        let entries_len = unsafe {
+            load_atomic_u32(completion_queue.add(parameters.cq_off.ring_entries as usize))
+        };
+        debug_assert!(entries_len == parameters.cq_entries);
+        let entries_mask =
+            unsafe { load_atomic_u32(completion_queue.add(parameters.cq_off.ring_mask as usize)) };
+        debug_assert!(entries_mask == parameters.cq_entries - 1);
+
         unsafe {
             Ok(Completions {
                 ptr: completion_queue,
@@ -49,6 +61,8 @@ impl Completions {
                 head: completion_queue.add(parameters.cq_off.head as usize).cast(),
                 tail: completion_queue.add(parameters.cq_off.tail as usize).cast(),
                 entries: completion_queue.add(parameters.cq_off.cqes as usize).cast(),
+                entries_len,
+                entries_mask,
             })
         }
     }
@@ -138,7 +152,7 @@ impl crate::cq::Completions for Completions {
             local_head: head,
             head: self.head,
             tail,
-            ring_mask: shared.ring_mask,
+            mask: self.entries_mask,
             _lifetime: PhantomData,
         })
     }
@@ -150,7 +164,7 @@ impl crate::cq::Completions for Completions {
         // be outdated.
         let kernel_read = unsafe { (*shared.kernel_read).load(Ordering::Relaxed) };
         let pending_tail = shared.pending_tail.load(Ordering::Relaxed);
-        (shared.len - (pending_tail - kernel_read)) as usize
+        (self.entries_len - (pending_tail - kernel_read)) as usize
     }
 }
 
@@ -178,8 +192,8 @@ struct CompletionsIter<'a> {
     head: *mut AtomicU32,
     /// Tail of `entries`, i.e. number of completions the kernel wrote.
     tail: u32,
-    /// Same as [`Completions.ring_mask`].
-    ring_mask: u32,
+    /// Same as [`Completions.entries_mask`].
+    mask: u32,
     /// We're depend on the lifetime of [`sys::Shared`].
     _lifetime: PhantomData<&'a sys::Shared>,
 }
@@ -188,14 +202,12 @@ impl<'a> Iterator for CompletionsIter<'a> {
     type Item = &'a Completion;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let head = self.local_head;
-        let tail = self.tail;
-        if head < tail {
-            // SAFETY: the `ring_mask` ensures we can never get an `idx` larger
-            // then the size of the queue. We checked above that the kernel has
+        if self.local_head < self.tail {
+            // SAFETY: the `mask` ensures we can never get an `idx` larger than
+            // the size of the queue. We checked above that the kernel has
             // written the struct (and isn't writing to now) os we can safely
             // read from it.
-            let idx = (head & self.ring_mask) as usize;
+            let idx = (self.local_head & self.mask) as usize;
             let completion = unsafe { &*self.entries.add(idx) };
             self.local_head += 1;
             Some(completion)
@@ -270,7 +282,7 @@ impl crate::cq::Event for Completion {
             // Zero copy completed, we can now mark ourselves as done.
             OperationState::Single { .. } if self.is_notification() => true,
             OperationState::Single { result } => {
-                debug_assert!(result.result == -1);
+                debug_assert!(result.result == i32::MIN);
                 debug_assert!(result.flags == u16::MAX);
                 *result = completion;
                 // For zero copy this may be false, in which case we get a
@@ -319,7 +331,7 @@ impl crate::cq::OperationState for OperationState {
         OperationState::Single {
             result: CompletionResult {
                 flags: u16::MAX,
-                result: -1,
+                result: i32::MIN,
             },
         }
     }
diff --git a/src/io_uring/mod.rs b/src/io_uring/mod.rs
index 08a5a21..a8ad0b3 100644
--- a/src/io_uring/mod.rs
+++ b/src/io_uring/mod.rs
@@ -69,16 +69,9 @@ pub(crate) struct Shared {
     /// Used by [`Completions`] to determine the number of submissions to
     /// submit.
    pending_tail: AtomicU32,
-
-    // NOTE: the following two fields are constant.
-    /// Number of entries in the queue.
-    len: u32,
-    /// Mask used to index into the `sqes` queue.
-    ring_mask: u32,
     /// True if we're using a kernel thread to do submission polling, i.e. if
     /// `IORING_SETUP_SQPOLL` is enabled.
     kernel_thread: bool,
-
     // NOTE: the following fields reference mmaped pages shared with the kernel,
     // thus all need atomic/synchronised access.
     /// Flags set by the kernel to communicate state information.
@@ -91,6 +84,10 @@ pub(crate) struct Shared {
     ///
     /// This pointer is also used in the `unmmap` call.
     entries: *mut sq::Submission,
+    /// Number of `entries`.
+    entries_len: u32,
+    /// Mask used to index into the `entries` queue.
+    entries_mask: u32,
     /// Variable used to get an index into `array`. The lock must be held while
     /// writing into `array` to prevent race conditions with other threads.
     array_index: Mutex<u32>,
@@ -105,11 +102,6 @@ pub(crate) struct Shared {
 
 impl Shared {
     pub(crate) fn new(rfd: OwnedFd, parameters: &libc::io_uring_params) -> io::Result<Shared> {
-        /// Load a `u32` using relaxed ordering from `ptr`.
-        unsafe fn load_atomic_u32(ptr: *mut libc::c_void) -> u32 {
-            (*ptr.cast::<AtomicU32>()).load(Ordering::Relaxed)
-        }
-
         let submission_queue_size = parameters.sq_off.array
             + parameters.sq_entries * (size_of::<u32>() as u32);
         let submission_queue = mmap(
@@ -131,6 +123,14 @@ impl Shared {
                 _ = munmap(submission_queue, submission_queue_size as usize); // Can't handle two errors.
             })?;
 
+        let entries_len = unsafe {
+            load_atomic_u32(submission_queue.add(parameters.sq_off.ring_entries as usize))
+        };
+        debug_assert!(entries_len == parameters.sq_entries);
+        let entries_mask =
+            unsafe { load_atomic_u32(submission_queue.add(parameters.sq_off.ring_mask as usize)) };
+        debug_assert!(entries_mask == parameters.sq_entries - 1);
+
         // SAFETY: we do a whole bunch of pointer manipulations, the kernel
         // ensures all of this stuff is set up for us with the mmap calls above.
         #[allow(clippy::mutex_integer)] // For `array_index`, need to the lock for more.
@@ -139,21 +139,16 @@ impl Shared {
             rfd,
             ptr: submission_queue,
             size: submission_queue_size,
-            pending_tail: AtomicU32::new(0),
-            // Fields are constant, so we load them once.
-            len: load_atomic_u32(submission_queue.add(parameters.sq_off.ring_entries as usize)),
-            ring_mask: load_atomic_u32(
-                submission_queue.add(parameters.sq_off.ring_mask as usize),
-            ),
             kernel_thread: (parameters.flags & libc::IORING_SETUP_SQPOLL) != 0,
 
             // Fields are shared with the kernel.
             kernel_read: submission_queue.add(parameters.sq_off.head as usize).cast(),
             flags: submission_queue
                 .add(parameters.sq_off.flags as usize)
                 .cast(),
-            entries: submission_queue_entries.cast(),
+            entries_len,
+            entries_mask,
             array_index: Mutex::new(0),
             array: submission_queue
                 .add(parameters.sq_off.array as usize)
                 .cast(),
@@ -241,7 +236,7 @@ unsafe impl Sync for Shared {}
 impl Drop for Shared {
     fn drop(&mut self) {
         let ptr = self.entries.cast();
-        let size = self.len as usize * size_of::<sq::Submission>();
+        let size = self.entries_len as usize * size_of::<sq::Submission>();
         if let Err(err) = munmap(ptr, size) {
             log::warn!(ptr:? = ptr, size = size; "error unmapping io_uring entries: {err}");
         }
@@ -401,3 +396,8 @@ pub(crate) fn munmap(addr: *mut libc::c_void, len: libc::size_t) -> io::Result<()> {
         _ => Err(io::Error::last_os_error()),
     }
 }
+
+/// Load a `u32` using relaxed ordering from `ptr`.
+unsafe fn load_atomic_u32(ptr: *mut libc::c_void) -> u32 {
+    (*ptr.cast::<AtomicU32>()).load(Ordering::Relaxed)
+}
diff --git a/src/io_uring/sq.rs b/src/io_uring/sq.rs
index 1904e3c..41a4dee 100644
--- a/src/io_uring/sq.rs
+++ b/src/io_uring/sq.rs
@@ -38,7 +38,7 @@ impl crate::sq::Submissions for Submissions {
         let tail = shared
             .pending_tail
             .fetch_update(Ordering::AcqRel, Ordering::Acquire, |tail| {
-                if tail - kernel_read < shared.len {
+                if tail - kernel_read < shared.entries_len {
                     // Still an entry available.
                     Some(tail.wrapping_add(1))
                 } else {
@@ -55,7 +55,7 @@ impl crate::sq::Submissions for Submissions {
         // SAFETY: the `ring_mask` ensures we can never get an index larger
         // then the size of the queue. Above we've already ensured that
         // we're the only thread with mutable access to the entry.
-        let submission_index = tail & shared.ring_mask;
+        let submission_index = tail & shared.entries_mask;
         let submission = unsafe { &mut *shared.entries.add(submission_index as usize) };
 
         // Let the caller fill the `submission`.
@@ -91,7 +91,7 @@ impl crate::sq::Submissions for Submissions {
         // | `shared.tail.fetch_add` to 2.
 
         let mut array_index = shared.array_index.lock().unwrap();
-        let idx = (*array_index & shared.ring_mask) as usize;
+        let idx = (*array_index & shared.entries_mask) as usize;
         // SAFETY: `idx` is masked above to be within the correct bounds.
         unsafe { (*shared.array.add(idx)).store(submission_index, Ordering::Release) };
         // SAFETY: we filled the array above.
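
Note (illustration only, not part of the patch): the submission and completion ring
sizes differ whenever a ring is set up with the IORING_SETUP_CQSIZE flag, which asks
the kernel for a completion queue larger than the submission queue. The sketch below
is a minimal, standalone demonstration of that using a raw io_uring_setup(2) call; the
IoUringParams/IoSqringOffsets/IoCqringOffsets structs and the IORING_SETUP_CQSIZE
constant are local stand-ins mirroring the kernel UAPI (they are not this crate's
sys::libc bindings), and the syscall number comes from the libc crate. This is why
Completions now reads ring_entries and ring_mask from the completion ring's own
mmap'd offsets instead of reusing the submission-side values in Shared.

// Sketch: a completion ring sized independently of the submission ring.
use std::io;

/// Mirrors IORING_SETUP_CQSIZE from the kernel UAPI (assumed value).
const IORING_SETUP_CQSIZE: u32 = 1 << 3;

/// Local stand-in for the kernel's struct io_sqring_offsets.
#[repr(C)]
#[derive(Default)]
struct IoSqringOffsets {
    head: u32,
    tail: u32,
    ring_mask: u32,
    ring_entries: u32,
    flags: u32,
    dropped: u32,
    array: u32,
    resv1: u32,
    user_addr: u64,
}

/// Local stand-in for the kernel's struct io_cqring_offsets.
#[repr(C)]
#[derive(Default)]
struct IoCqringOffsets {
    head: u32,
    tail: u32,
    ring_mask: u32,
    ring_entries: u32,
    overflow: u32,
    cqes: u32,
    flags: u32,
    resv1: u32,
    user_addr: u64,
}

/// Local stand-in for the kernel's struct io_uring_params.
#[repr(C)]
#[derive(Default)]
struct IoUringParams {
    sq_entries: u32,
    cq_entries: u32,
    flags: u32,
    sq_thread_cpu: u32,
    sq_thread_idle: u32,
    features: u32,
    wq_fd: u32,
    resv: [u32; 3],
    sq_off: IoSqringOffsets,
    cq_off: IoCqringOffsets,
}

fn main() -> io::Result<()> {
    let mut parameters = IoUringParams::default();
    // Ask for a completion ring that is four times the submission ring.
    parameters.flags = IORING_SETUP_CQSIZE;
    parameters.cq_entries = 256;

    // io_uring_setup(2): 64 submission entries, parameters filled in by the kernel.
    let fd = unsafe {
        libc::syscall(
            libc::SYS_io_uring_setup,
            64 as libc::c_uint,
            &mut parameters as *mut IoUringParams,
        )
    };
    if fd < 0 {
        return Err(io::Error::last_os_error());
    }

    // The kernel reports the actual (power of two) sizes; sq_entries and cq_entries
    // differ here, so the completion side must use its own ring_entries and ring_mask.
    println!(
        "sq_entries: {}, cq_entries: {}",
        parameters.sq_entries, parameters.cq_entries
    );

    unsafe { libc::close(fd as libc::c_int) };
    Ok(())
}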