From 435396ff606e5a25be374615cc4d55380618800d Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Thu, 15 Feb 2024 03:03:15 +0330 Subject: [PATCH 1/4] implement try_new and try_with_interest for AsyncFd --- tokio/src/io/async_fd.rs | 110 ++++++++++++++++++++++++++++++++++--- tokio/src/io/mod.rs | 2 +- tokio/tests/io_async_fd.rs | 30 ++++++++++ tokio/tests/io_panic.rs | 45 +++++++++++++++ 4 files changed, 179 insertions(+), 8 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index aaf17584198..3c3da842707 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -3,6 +3,8 @@ use crate::runtime::io::{ReadyEvent, Registration}; use crate::runtime::scheduler; use mio::unix::SourceFd; +use std::error::Error; +use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; use std::{task::Context, task::Poll}; @@ -249,15 +251,69 @@ impl AsyncFd { handle: scheduler::Handle, interest: Interest, ) -> io::Result { - let fd = inner.as_raw_fd(); + Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into) + } - let registration = - Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle)?; + /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object + /// implementing [`AsRawFd`]. The backing file descriptor is cached at the + /// time of creation. + /// + /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more + /// control, use [`AsyncFd::with_interest`]. + /// + /// This method must be called in the context of a tokio runtime. + /// + /// In the case of failure, it returns [`AsyncFdError`] that contains the original object + /// passed to this function. + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt` + /// feature flag is not enabled. + #[inline] + #[track_caller] + pub fn try_new(inner: T) -> Result> + where + T: AsRawFd, + { + Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE) + } - Ok(AsyncFd { - registration, - inner: Some(inner), - }) + /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object + /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing + /// file descriptor is cached at the time of creation. + /// + /// In the case of failure, it returns [`AsyncFdError`] that contains the original object + /// passed to this function. + /// + /// # Panics + /// + /// This function panics if there is no current reactor set, or if the `rt` + /// feature flag is not enabled. + #[inline] + #[track_caller] + pub fn try_with_interest(inner: T, interest: Interest) -> Result> + where + T: AsRawFd, + { + Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest) + } + + #[track_caller] + pub(crate) fn try_new_with_handle_and_interest( + inner: T, + handle: scheduler::Handle, + interest: Interest, + ) -> Result> { + let fd = inner.as_raw_fd(); + + match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) { + Ok(registration) => Ok(AsyncFd { + registration, + inner: Some(inner), + }), + Err(cause) => Err(AsyncFdError { inner, cause }), + } } /// Returns a shared reference to the backing object of this [`AsyncFd`]. @@ -1257,3 +1313,43 @@ impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard< /// [`try_io`]: method@AsyncFdReadyGuard::try_io #[derive(Debug)] pub struct TryIoError(()); + +/// Error returned by [`try_new`] or [`try_with_interest`]. +/// +/// [`try_new`]: AsyncFd::try_new +/// [`try_with_interest`]: AsyncFd::try_with_interest +pub struct AsyncFdError { + inner: T, + cause: io::Error, +} + +impl AsyncFdError { + /// Returns the original object passed to [`try_new`] or [`try_with_interest`] + /// alongside the error that caused these functions to fail. + /// + /// [`try_new`]: AsyncFd::try_new + /// [`try_with_interest`]: AsyncFd::try_with_interest + pub fn into_parts(self) -> (T, io::Error) { + (self.inner, self.cause) + } +} + +impl fmt::Display for AsyncFdError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.cause, f) + } +} + +impl fmt::Debug for AsyncFdError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&self.cause, f) + } +} + +impl Error for AsyncFdError {} + +impl From> for io::Error { + fn from(value: AsyncFdError) -> Self { + value.cause + } +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 5e903c04842..b8e49145baa 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -245,7 +245,7 @@ cfg_net_unix! { pub mod unix { //! Asynchronous IO structures specific to Unix-like operating systems. - pub use super::async_fd::{AsyncFd, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError}; + pub use super::async_fd::{AsyncFd, AsyncFdError, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError}; } } diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index 1fb203a6524..6f8a10aefbc 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -18,6 +18,7 @@ use nix::unistd::{close, read, write}; use futures::poll; use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard}; +use tokio::io::Interest; use tokio_test::{assert_err, assert_pending}; struct TestWaker { @@ -834,3 +835,32 @@ async fn await_error_readiness_invalid_address() { let guard = fd.ready(Interest::ERROR).await.unwrap(); assert_eq!(guard.ready(), Ready::ERROR); } + +#[derive(Debug, PartialEq, Eq)] +struct InvalidSource; + +impl AsRawFd for InvalidSource { + fn as_raw_fd(&self) -> RawFd { + -1 + } +} + +#[tokio::test] +async fn try_new() { + let original = Arc::new(InvalidSource); + + let error = AsyncFd::try_new(original.clone()).unwrap_err(); + let (returned, _cause) = error.into_parts(); + + assert!(Arc::ptr_eq(&original, &returned)); +} + +#[tokio::test] +async fn try_with_interest() { + let original = Arc::new(InvalidSource); + + let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err(); + let (returned, _cause) = error.into_parts(); + + assert!(Arc::ptr_eq(&original, &returned)); +} diff --git a/tokio/tests/io_panic.rs b/tokio/tests/io_panic.rs index b2cbad2751d..9e4cda21f3b 100644 --- a/tokio/tests/io_panic.rs +++ b/tokio/tests/io_panic.rs @@ -175,3 +175,48 @@ fn async_fd_with_interest_panic_caller() -> Result<(), Box> { Ok(()) } + +#[test] +#[cfg(unix)] +fn async_fd_try_new_panic_caller() -> Result<(), Box> { + use tokio::io::unix::AsyncFd; + use tokio::runtime::Builder; + + let panic_location_file = test_panic(|| { + // Runtime without `enable_io` so it has no IO driver set. + let rt = Builder::new_current_thread().build().unwrap(); + rt.block_on(async { + let fd = unix::MockFd; + + let _ = AsyncFd::try_new(fd); + }); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + +#[test] +#[cfg(unix)] +fn async_fd_try_with_interest_panic_caller() -> Result<(), Box> { + use tokio::io::unix::AsyncFd; + use tokio::io::Interest; + use tokio::runtime::Builder; + + let panic_location_file = test_panic(|| { + // Runtime without `enable_io` so it has no IO driver set. + let rt = Builder::new_current_thread().build().unwrap(); + rt.block_on(async { + let fd = unix::MockFd; + + let _ = AsyncFd::try_with_interest(fd, Interest::READABLE); + }); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} From 5e90ce39d018f243cae04d9d151e6c773568fa7c Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Sun, 10 Mar 2024 00:38:11 +0330 Subject: [PATCH 2/4] implement source and rename --- tokio/src/io/async_fd.rs | 26 +++++++++++++++----------- tokio/src/io/mod.rs | 2 +- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 3c3da842707..dad6607541d 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -272,7 +272,7 @@ impl AsyncFd { /// feature flag is not enabled. #[inline] #[track_caller] - pub fn try_new(inner: T) -> Result> + pub fn try_new(inner: T) -> Result> where T: AsRawFd, { @@ -292,7 +292,7 @@ impl AsyncFd { /// feature flag is not enabled. #[inline] #[track_caller] - pub fn try_with_interest(inner: T, interest: Interest) -> Result> + pub fn try_with_interest(inner: T, interest: Interest) -> Result> where T: AsRawFd, { @@ -304,7 +304,7 @@ impl AsyncFd { inner: T, handle: scheduler::Handle, interest: Interest, - ) -> Result> { + ) -> Result> { let fd = inner.as_raw_fd(); match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) { @@ -312,7 +312,7 @@ impl AsyncFd { registration, inner: Some(inner), }), - Err(cause) => Err(AsyncFdError { inner, cause }), + Err(cause) => Err(AsyncFdTryNewError { inner, cause }), } } @@ -1318,12 +1318,12 @@ pub struct TryIoError(()); /// /// [`try_new`]: AsyncFd::try_new /// [`try_with_interest`]: AsyncFd::try_with_interest -pub struct AsyncFdError { +pub struct AsyncFdTryNewError { inner: T, cause: io::Error, } -impl AsyncFdError { +impl AsyncFdTryNewError { /// Returns the original object passed to [`try_new`] or [`try_with_interest`] /// alongside the error that caused these functions to fail. /// @@ -1334,22 +1334,26 @@ impl AsyncFdError { } } -impl fmt::Display for AsyncFdError { +impl fmt::Display for AsyncFdTryNewError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.cause, f) } } -impl fmt::Debug for AsyncFdError { +impl fmt::Debug for AsyncFdTryNewError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.cause, f) } } -impl Error for AsyncFdError {} +impl Error for AsyncFdTryNewError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.cause) + } +} -impl From> for io::Error { - fn from(value: AsyncFdError) -> Self { +impl From> for io::Error { + fn from(value: AsyncFdTryNewError) -> Self { value.cause } } diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index b8e49145baa..7dab413ceb6 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -245,7 +245,7 @@ cfg_net_unix! { pub mod unix { //! Asynchronous IO structures specific to Unix-like operating systems. - pub use super::async_fd::{AsyncFd, AsyncFdError, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError}; + pub use super::async_fd::{AsyncFd, AsyncFdTryNewError, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError}; } } From c35168155a4f0b696e108bd2fbc32c2db66cea8b Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Sun, 10 Mar 2024 00:40:31 +0330 Subject: [PATCH 3/4] fix docs --- tokio/src/io/async_fd.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index dad6607541d..6171f2d651c 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -263,7 +263,7 @@ impl AsyncFd { /// /// This method must be called in the context of a tokio runtime. /// - /// In the case of failure, it returns [`AsyncFdError`] that contains the original object + /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object /// passed to this function. /// /// # Panics @@ -283,7 +283,7 @@ impl AsyncFd { /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing /// file descriptor is cached at the time of creation. /// - /// In the case of failure, it returns [`AsyncFdError`] that contains the original object + /// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object /// passed to this function. /// /// # Panics From 32aaa1170bec69cbbc77dbd026410222ec9cf274 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Wed, 13 Mar 2024 19:50:34 +0330 Subject: [PATCH 4/4] Update tokio/src/io/async_fd.rs Co-authored-by: Folkert de Vries --- tokio/src/io/async_fd.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 6171f2d651c..96d0518a6e5 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -259,7 +259,7 @@ impl AsyncFd { /// time of creation. /// /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more - /// control, use [`AsyncFd::with_interest`]. + /// control, use [`AsyncFd::try_with_interest`]. /// /// This method must be called in the context of a tokio runtime. ///