diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index bca79a4..46ffeba 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -1,4 +1,56 @@ -//! Tokio IO integration for hyper +//! [`tokio`] runtime components integration for [`hyper`]. +//! +//! [`hyper::rt`] exposes a set of traits to allow hyper to be agnostic to +//! its underlying asynchronous runtime. This submodule provides glue for +//! [`tokio`] users to bridge those types to [`hyper`]'s interfaces. +//! +//! # IO +//! +//! [`hyper`] abstracts over asynchronous readers and writers using [`Read`] +//! and [`Write`], while [`tokio`] abstracts over this using [`AsyncRead`] +//! and [`AsyncWrite`]. This submodule provides a collection of IO adaptors +//! to bridge these two IO ecosystems together: [`TokioIo`], +//! [`WithHyperIo`], and [`WithTokioIo`]. +//! +//! To compare and constrast these IO adaptors and to help explain which +//! is the proper choice for your needs, here is a table showing which IO +//! traits these implement, given two types `T` and `H` which implement +//! Tokio's and Hyper's corresponding IO traits: +//! +//! | | [`AsyncRead`] | [`AsyncWrite`] | [`Read`] | [`Write`] | +//! |--------------------|------------------|-------------------|--------------|--------------| +//! | `T` | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | +//! | `H` | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | +//! | [`TokioIo`] | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | +//! | [`TokioIo`] | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | +//! | [`WithHyperIo`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | +//! | [`WithHyperIo`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | +//! | [`WithTokioIo`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | +//! | [`WithTokioIo`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | +//! +//! For most situations, [`TokioIo`] is the proper choice. This should be +//! constructed, wrapping some underlying [`hyper`] or [`tokio`] IO, at the +//! call-site of a function like [`hyper::client::conn::http1::handshake`]. +//! +//! [`TokioIo`] switches across these ecosystems, but notably does not +//! preserve the existing IO trait implementations of its underlying IO. If +//! one wishes to _extend_ IO with additional implementations, +//! [`WithHyperIo`] and [`WithTokioIo`] are the correct choice. +//! +//! For example, a Tokio reader/writer can be wrapped in [`WithHyperIo`]. +//! That will implement _both_ sets of IO traits. Conversely, +//! [`WithTokioIo`] will implement both sets of IO traits given a +//! reader/writer that implements Hyper's [`Read`] and [`Write`]. +//! +//! See [`tokio::io`] and ["_Asynchronous IO_"][tokio-async-docs] for more +//! information. +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite +//! [`Read`]: hyper::rt::Read +//! [`Write`]: hyper::rt::Write +//! [tokio-async-docs]: https://docs.rs/tokio/latest/tokio/#asynchronous-io + use std::{ future::Future, pin::Pin, @@ -12,6 +64,11 @@ use pin_project_lite::pin_project; #[cfg(feature = "tracing")] use tracing::instrument::Instrument; +pub use self::{with_hyper_io::WithHyperIo, with_tokio_io::WithTokioIo}; + +mod with_hyper_io; +mod with_tokio_io; + /// Future executor that utilises `tokio` threads. #[non_exhaustive] #[derive(Default, Debug, Clone)] diff --git a/src/rt/tokio/with_hyper_io.rs b/src/rt/tokio/with_hyper_io.rs new file mode 100644 index 0000000..9c5072d --- /dev/null +++ b/src/rt/tokio/with_hyper_io.rs @@ -0,0 +1,170 @@ +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Extends an underlying [`tokio`] I/O with [`hyper`] I/O implementations. + /// + /// This implements [`Read`] and [`Write`] given an inner type that implements [`AsyncRead`] + /// and [`AsyncWrite`], respectively. + #[derive(Debug)] + pub struct WithHyperIo { + #[pin] + inner: I, + } +} + +// ==== impl WithHyperIo ===== + +impl WithHyperIo { + /// Wraps the inner I/O in an [`WithHyperIo`] + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Returns a reference to the inner type. + pub fn inner(&self) -> &I { + &self.inner + } + + /// Returns a mutable reference to the inner type. + pub fn inner_mut(&mut self) -> &mut I { + &mut self.inner + } + + /// Consumes this wrapper and returns the inner type. + pub fn into_inner(self) -> I { + self.inner + } +} + +/// [`WithHyperIo`] is [`Read`] if `I` is [`AsyncRead`]. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +/// [`Read`]: hyper::rt::Read +impl hyper::rt::Read for WithHyperIo +where + I: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +/// [`WithHyperIo`] is [`Write`] if `I` is [`AsyncWrite`]. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +/// [`Write`]: hyper::rt::Write +impl hyper::rt::Write for WithHyperIo +where + I: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// [`WithHyperIo`] exposes its inner `I`'s [`AsyncRead`] implementation. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +impl tokio::io::AsyncRead for WithHyperIo +where + I: tokio::io::AsyncRead, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +/// [`WithHyperIo`] exposes its inner `I`'s [`AsyncWrite`] implementation. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +impl tokio::io::AsyncWrite for WithHyperIo +where + I: tokio::io::AsyncWrite, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} diff --git a/src/rt/tokio/with_tokio_io.rs b/src/rt/tokio/with_tokio_io.rs new file mode 100644 index 0000000..223e0ed --- /dev/null +++ b/src/rt/tokio/with_tokio_io.rs @@ -0,0 +1,178 @@ +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Extends an underlying [`hyper`] I/O with [`tokio`] I/O implementations. + /// + /// This implements [`AsyncRead`] and [`AsyncWrite`] given an inner type that implements + /// [`Read`] and [`Write`], respectively. + #[derive(Debug)] + pub struct WithTokioIo { + #[pin] + inner: I, + } +} + +// ==== impl WithTokioIo ===== + +/// [`WithTokioIo`] is [`AsyncRead`] if `I` is [`Read`]. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +/// [`Read`]: hyper::rt::Read +impl tokio::io::AsyncRead for WithTokioIo +where + I: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +/// [`WithTokioIo`] is [`AsyncWrite`] if `I` is [`Write`]. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +/// [`Write`]: hyper::rt::Write +impl tokio::io::AsyncWrite for WithTokioIo +where + I: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// [`WithTokioIo`] exposes its inner `I`'s [`Write`] implementation. +/// +/// [`Write`]: hyper::rt::Write +impl hyper::rt::Write for WithTokioIo +where + I: hyper::rt::Write, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} + +impl WithTokioIo { + /// Wraps the inner I/O in an [`WithTokioIo`] + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Returns a reference to the inner type. + pub fn inner(&self) -> &I { + &self.inner + } + + /// Returns a mutable reference to the inner type. + pub fn inner_mut(&mut self) -> &mut I { + &mut self.inner + } + + /// Consumes this wrapper and returns the inner type. + pub fn into_inner(self) -> I { + self.inner + } +} + +/// [`WithTokioIo`] exposes its inner `I`'s [`Read`] implementation. +/// +/// [`Read`]: hyper::rt::Read +impl hyper::rt::Read for WithTokioIo +where + I: hyper::rt::Read, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +}