From 34f6e2bb1e2afd80689529d3eb67aff30f14acbf Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Thu, 20 Feb 2025 00:00:00 +0000 Subject: [PATCH 1/2] feat(rt/tokio): additive tokio and hyper i/o adaptors this commit introduces two new i/o adaptors, akin to `rt::tokio::TokioIo`. see this small program demonstrating the motivation for this: ```rust #![allow(unreachable_code, unused_variables, dead_code)] use { hyper::rt::{Read, Write}, hyper_util::rt::TokioIo, tokio::io::{AsyncRead, AsyncWrite}, tokio::net::TcpStream, }; fn is_tokio_io(_: &I) where I: AsyncRead + AsyncWrite, { } fn is_hyper_io(_: &I) where I: Read + Write, { } fn check_io_impls() { // `I: AsyncRead + AsyncWrite` does not implement hyper's `Read` and `Write`. let stream: TcpStream = todo!(); is_tokio_io(&stream); // is_hyper_io(&stream); does not compile // `TokioIo` does not implement `tokio::io` traits. let stream: TokioIo = TokioIo::new(stream); // is_tokio_io(&stream); does not compile is_hyper_io(&stream); // `TokioIo>` does not implement hyper's `Read` and `Write`. let stream: TokioIo> = TokioIo::new(stream); is_tokio_io(&stream); // is_hyper_io(&stream); does not compile } fn main() {} ``` `TokioIo` is not an adaptor that is additive with respect to its inner i/o transport. if it implements hyper's i/o, the wrapped type will implement tokio's _at the expense of hyper's_, and vice versa. this is often fine for simple programs, but this "switching" approach can interfere with larger applications' ability to make use of middleware that is generic across different kinds of i/o streams. this commit introduces types that allow a caller to _add_ `Read` or `Write` implementations to tokio readers and writers, or inversely, `AsyncRead` and `AsyncWrite` implementations to hyper readers and writers. Signed-off-by: katelyn martin --- src/rt/tokio.rs | 5 + src/rt/tokio/with_hyper_io.rs | 170 ++++++++++++++++++++++++++++++++ src/rt/tokio/with_tokio_io.rs | 178 ++++++++++++++++++++++++++++++++++ 3 files changed, 353 insertions(+) create mode 100644 src/rt/tokio/with_hyper_io.rs create mode 100644 src/rt/tokio/with_tokio_io.rs diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index bca79a4..e317ca5 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -12,6 +12,11 @@ use pin_project_lite::pin_project; #[cfg(feature = "tracing")] use tracing::instrument::Instrument; +pub use self::{with_hyper_io::WithHyperIo, with_tokio_io::WithTokioIo}; + +mod with_hyper_io; +mod with_tokio_io; + /// Future executor that utilises `tokio` threads. #[non_exhaustive] #[derive(Default, Debug, Clone)] diff --git a/src/rt/tokio/with_hyper_io.rs b/src/rt/tokio/with_hyper_io.rs new file mode 100644 index 0000000..9c5072d --- /dev/null +++ b/src/rt/tokio/with_hyper_io.rs @@ -0,0 +1,170 @@ +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Extends an underlying [`tokio`] I/O with [`hyper`] I/O implementations. + /// + /// This implements [`Read`] and [`Write`] given an inner type that implements [`AsyncRead`] + /// and [`AsyncWrite`], respectively. + #[derive(Debug)] + pub struct WithHyperIo { + #[pin] + inner: I, + } +} + +// ==== impl WithHyperIo ===== + +impl WithHyperIo { + /// Wraps the inner I/O in an [`WithHyperIo`] + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Returns a reference to the inner type. + pub fn inner(&self) -> &I { + &self.inner + } + + /// Returns a mutable reference to the inner type. + pub fn inner_mut(&mut self) -> &mut I { + &mut self.inner + } + + /// Consumes this wrapper and returns the inner type. + pub fn into_inner(self) -> I { + self.inner + } +} + +/// [`WithHyperIo`] is [`Read`] if `I` is [`AsyncRead`]. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +/// [`Read`]: hyper::rt::Read +impl hyper::rt::Read for WithHyperIo +where + I: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +/// [`WithHyperIo`] is [`Write`] if `I` is [`AsyncWrite`]. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +/// [`Write`]: hyper::rt::Write +impl hyper::rt::Write for WithHyperIo +where + I: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// [`WithHyperIo`] exposes its inner `I`'s [`AsyncRead`] implementation. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +impl tokio::io::AsyncRead for WithHyperIo +where + I: tokio::io::AsyncRead, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} + +/// [`WithHyperIo`] exposes its inner `I`'s [`AsyncWrite`] implementation. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +impl tokio::io::AsyncWrite for WithHyperIo +where + I: tokio::io::AsyncWrite, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} diff --git a/src/rt/tokio/with_tokio_io.rs b/src/rt/tokio/with_tokio_io.rs new file mode 100644 index 0000000..223e0ed --- /dev/null +++ b/src/rt/tokio/with_tokio_io.rs @@ -0,0 +1,178 @@ +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +pin_project! { + /// Extends an underlying [`hyper`] I/O with [`tokio`] I/O implementations. + /// + /// This implements [`AsyncRead`] and [`AsyncWrite`] given an inner type that implements + /// [`Read`] and [`Write`], respectively. + #[derive(Debug)] + pub struct WithTokioIo { + #[pin] + inner: I, + } +} + +// ==== impl WithTokioIo ===== + +/// [`WithTokioIo`] is [`AsyncRead`] if `I` is [`Read`]. +/// +/// [`AsyncRead`]: tokio::io::AsyncRead +/// [`Read`]: hyper::rt::Read +impl tokio::io::AsyncRead for WithTokioIo +where + I: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +/// [`WithTokioIo`] is [`AsyncWrite`] if `I` is [`Write`]. +/// +/// [`AsyncWrite`]: tokio::io::AsyncWrite +/// [`Write`]: hyper::rt::Write +impl tokio::io::AsyncWrite for WithTokioIo +where + I: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +/// [`WithTokioIo`] exposes its inner `I`'s [`Write`] implementation. +/// +/// [`Write`]: hyper::rt::Write +impl hyper::rt::Write for WithTokioIo +where + I: hyper::rt::Write, +{ + #[inline] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } +} + +impl WithTokioIo { + /// Wraps the inner I/O in an [`WithTokioIo`] + pub fn new(inner: I) -> Self { + Self { inner } + } + + /// Returns a reference to the inner type. + pub fn inner(&self) -> &I { + &self.inner + } + + /// Returns a mutable reference to the inner type. + pub fn inner_mut(&mut self) -> &mut I { + &mut self.inner + } + + /// Consumes this wrapper and returns the inner type. + pub fn into_inner(self) -> I { + self.inner + } +} + +/// [`WithTokioIo`] exposes its inner `I`'s [`Read`] implementation. +/// +/// [`Read`]: hyper::rt::Read +impl hyper::rt::Read for WithTokioIo +where + I: hyper::rt::Read, +{ + #[inline] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + self.project().inner.poll_read(cx, buf) + } +} From f8b1b1c6ce66773abedcb08a3b17122c3aa22a7f Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 17 Mar 2025 00:00:00 +0000 Subject: [PATCH 2/2] docs(rt/tokio): document io adaptors Signed-off-by: katelyn martin --- src/rt/tokio.rs | 54 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index e317ca5..46ffeba 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -1,4 +1,56 @@ -//! Tokio IO integration for hyper +//! [`tokio`] runtime components integration for [`hyper`]. +//! +//! [`hyper::rt`] exposes a set of traits to allow hyper to be agnostic to +//! its underlying asynchronous runtime. This submodule provides glue for +//! [`tokio`] users to bridge those types to [`hyper`]'s interfaces. +//! +//! # IO +//! +//! [`hyper`] abstracts over asynchronous readers and writers using [`Read`] +//! and [`Write`], while [`tokio`] abstracts over this using [`AsyncRead`] +//! and [`AsyncWrite`]. This submodule provides a collection of IO adaptors +//! to bridge these two IO ecosystems together: [`TokioIo`], +//! [`WithHyperIo`], and [`WithTokioIo`]. +//! +//! To compare and constrast these IO adaptors and to help explain which +//! is the proper choice for your needs, here is a table showing which IO +//! traits these implement, given two types `T` and `H` which implement +//! Tokio's and Hyper's corresponding IO traits: +//! +//! | | [`AsyncRead`] | [`AsyncWrite`] | [`Read`] | [`Write`] | +//! |--------------------|------------------|-------------------|--------------|--------------| +//! | `T` | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | +//! | `H` | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | +//! | [`TokioIo`] | ❌ **false** | ❌ **false** | ✅ **true** | ✅ **true** | +//! | [`TokioIo`] | ✅ **true** | ✅ **true** | ❌ **false** | ❌ **false** | +//! | [`WithHyperIo`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | +//! | [`WithHyperIo`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | +//! | [`WithTokioIo`] | ❌ **false** | ❌ **false** | ❌ **false** | ❌ **false** | +//! | [`WithTokioIo`] | ✅ **true** | ✅ **true** | ✅ **true** | ✅ **true** | +//! +//! For most situations, [`TokioIo`] is the proper choice. This should be +//! constructed, wrapping some underlying [`hyper`] or [`tokio`] IO, at the +//! call-site of a function like [`hyper::client::conn::http1::handshake`]. +//! +//! [`TokioIo`] switches across these ecosystems, but notably does not +//! preserve the existing IO trait implementations of its underlying IO. If +//! one wishes to _extend_ IO with additional implementations, +//! [`WithHyperIo`] and [`WithTokioIo`] are the correct choice. +//! +//! For example, a Tokio reader/writer can be wrapped in [`WithHyperIo`]. +//! That will implement _both_ sets of IO traits. Conversely, +//! [`WithTokioIo`] will implement both sets of IO traits given a +//! reader/writer that implements Hyper's [`Read`] and [`Write`]. +//! +//! See [`tokio::io`] and ["_Asynchronous IO_"][tokio-async-docs] for more +//! information. +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite +//! [`Read`]: hyper::rt::Read +//! [`Write`]: hyper::rt::Write +//! [tokio-async-docs]: https://docs.rs/tokio/latest/tokio/#asynchronous-io + use std::{ future::Future, pin::Pin,