diff --git a/src/fs.rs b/src/fs.rs index 5f55e19f..2d9e7c41 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -1,4 +1,4 @@ -//! Asynchronous filesystem manipulation operations. +//! Filesystem manipulation operations. //! //! To open a file ([`AsyncFd`]) use [`open_file`] or [`OpenOptions`]. diff --git a/src/mem.rs b/src/mem.rs index 38a3dd4f..21ee5db6 100644 --- a/src/mem.rs +++ b/src/mem.rs @@ -1,4 +1,4 @@ -//! Asynchronous memory operations. +//! Memory operations. use std::future::Future; use std::io; diff --git a/src/net.rs b/src/net.rs index b8620096..5ca78bc8 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,4 +1,4 @@ -//! Asynchronous networking. +//! Networking. //! //! To create a new socket ([`AsyncFd`]) use the [`socket`] function, which //! issues a non-blocking `socket(2)` call. @@ -1027,6 +1027,7 @@ op_future! { let (ptr, len) = SocketAddress::as_mut_ptr(&mut address.0); address.1 = len; submission.accept(fd.fd(), ptr, &mut address.1, flags); + submission.set_async(); D::create_flags(submission); }, map_result: |this, (address,), fd| { @@ -1049,6 +1050,7 @@ op_async_iter! { setup_state: flags: libc::c_int, setup: |submission, this, flags| unsafe { submission.multishot_accept(this.fd.fd(), flags); + submission.set_async(); D::create_flags(submission); }, map_result: |this, _flags, fd| { diff --git a/src/op.rs b/src/op.rs index 88a8a507..fddf1025 100644 --- a/src/op.rs +++ b/src/op.rs @@ -257,6 +257,12 @@ impl Submission { self.inner.flags |= libc::IOSQE_CQE_SKIP_SUCCESS; } + /// Don't attempt to do the operation non-blocking first, always execute it + /// in an async manner. + pub(crate) fn set_async(&mut self) { + self.inner.flags |= libc::IOSQE_ASYNC; + } + /// Set the flag to use direct descriptors. pub(crate) fn use_direct_fd(&mut self) { self.inner.flags |= libc::IOSQE_FIXED_FILE; diff --git a/src/poll.rs b/src/poll.rs index a4aecf92..9b9cbeea 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -60,6 +60,7 @@ impl<'sq> Future for OneshotPoll<'sq> { ctx, |submission, (fd, mask)| unsafe { submission.poll(fd, mask as u32); + submission.set_async(); } ); @@ -91,6 +92,7 @@ impl<'sq> Drop for OneshotPoll<'sq> { if let OpState::Running(op_index) = self.state { let result = self.sq.cancel_op(op_index, (), |submission| unsafe { submission.remove_poll(op_index); + submission.set_async(); // We'll get a canceled completion event if we succeeded, which // is sufficient to cleanup the operation. submission.no_completion_event(); diff --git a/src/process.rs b/src/process.rs index ab689fe8..aa10ed4e 100644 --- a/src/process.rs +++ b/src/process.rs @@ -481,6 +481,8 @@ op_future! { setup: |submission, fd, (info,), _unused| unsafe { let ptr = (**info).as_mut_ptr().cast(); submission.read_at(fd.fd(), ptr, size_of::() as u32, NO_OFFSET); + submission.set_async(); + D::use_flags(submission); }, map_result: |this, (info,), n| { #[allow(clippy::cast_sign_loss)] // Negative values are mapped to errors. @@ -524,6 +526,7 @@ impl ReceiveSignals { size_of::() as u32, NO_OFFSET, ); + submission.set_async(); D::use_flags(submission); }); match result { diff --git a/tests/async_fd/io.rs b/tests/async_fd/io.rs index 6ea9fb4b..be14dca6 100644 --- a/tests/async_fd/io.rs +++ b/tests/async_fd/io.rs @@ -763,11 +763,9 @@ fn cancel_all_twice_accept() { let n = waker .block_on(listener.cancel_all()) .expect("failed to cancel all calls"); - assert_eq!(n, 1); - let n = waker - .block_on(listener.cancel_all()) - .expect("failed to cancel all calls"); - assert_eq!(n, 0); + // Because the the accept call is asynchronous we can cancel up to one + // operations. + assert!(n <= 1); expect_io_errno(waker.block_on(accept), libc::ECANCELED); }