Skip to content

Commit a10904d

Browse files
committed
feat(rt/tokio): additive tokio and hyper i/o adaptors
this commit introduces two new i/o adaptors, akin to `rt::tokio::TokioIo<I>`. see this small program demonstrating the motivation for this: ```rust #![allow(unreachable_code, unused_variables, dead_code)] use { hyper::rt::{Read, Write}, hyper_util::rt::TokioIo, tokio::io::{AsyncRead, AsyncWrite}, tokio::net::TcpStream, }; fn is_tokio_io<I>(_: &I) where I: AsyncRead + AsyncWrite, { } fn is_hyper_io<I>(_: &I) where I: Read + Write, { } fn check_io_impls() { // `I: AsyncRead + AsyncWrite` does not implement hyper's `Read` and `Write`. let stream: TcpStream = todo!(); is_tokio_io(&stream); // is_hyper_io(&stream); does not compile // `TokioIo<I: AsyncRead + AsyncWrite>` does not implement `tokio::io` traits. let stream: TokioIo<TcpStream> = TokioIo::new(stream); // is_tokio_io(&stream); does not compile is_hyper_io(&stream); // `TokioIo<TokioIo<I: AsyncRead + AsyncWrite>>` does not implement hyper's `Read` and `Write`. let stream: TokioIo<TokioIo<TcpStream>> = TokioIo::new(stream); is_tokio_io(&stream); // is_hyper_io(&stream); does not compile } fn main() {} ``` `TokioIo<I>` is not an adaptor that is additive with respect to its inner i/o transport. if it implements hyper's i/o, the wrapped type will implement tokio's _at the expense of hyper's_, and vice versa. this is often fine for simple programs, but this "switching" approach can interfere with larger applications' ability to make use of middleware that is generic across different kinds of i/o streams. this commit introduces types that allow a caller to _add_ `Read` or `Write` implementations to tokio readers and writers, or inversely, `AsyncRead` and `AsyncWrite` implementations to hyper readers and writers. Signed-off-by: katelyn martin <me+cratelyn@katelyn.world>
1 parent b90ff7d commit a10904d

File tree

3 files changed

+353
-0
lines changed

3 files changed

+353
-0
lines changed

src/rt/tokio.rs

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ use pin_project_lite::pin_project;
1313
#[cfg(feature = "tracing")]
1414
use tracing::instrument::Instrument;
1515

16+
pub use self::{with_hyper_io::WithHyperIo, with_tokio_io::WithTokioIo};
17+
18+
mod with_hyper_io;
19+
mod with_tokio_io;
20+
1621
/// Future executor that utilises `tokio` threads.
1722
#[non_exhaustive]
1823
#[derive(Default, Debug, Clone)]

src/rt/tokio/with_hyper_io.rs

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
use pin_project_lite::pin_project;
2+
use std::{
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
pin_project! {
8+
/// Extends an underlying [`tokio`] I/O with [`hyper`] I/O implementations.
9+
///
10+
/// This implements [`Read`] and [`Write`] given an inner type that implements [`AsyncRead`]
11+
/// and [`AsyncWrite`], respectively.
12+
#[derive(Debug)]
13+
pub struct WithHyperIo<I> {
14+
#[pin]
15+
inner: I,
16+
}
17+
}
18+
19+
// ==== impl WithHyperIo =====
20+
21+
impl<I> WithHyperIo<I> {
22+
/// Wraps the inner I/O in an [`WithHyperIo<I>`]
23+
pub fn new(inner: I) -> Self {
24+
Self { inner }
25+
}
26+
27+
/// Returns a reference to the inner type.
28+
pub fn inner(&self) -> &I {
29+
&self.inner
30+
}
31+
32+
/// Returns a mutable reference to the inner type.
33+
pub fn inner_mut(&mut self) -> &mut I {
34+
&mut self.inner
35+
}
36+
37+
/// Consumes this wrapper and returns the inner type.
38+
pub fn into_inner(self) -> I {
39+
self.inner
40+
}
41+
}
42+
43+
/// [`WithHyperIo<I>`] is [`Read`] if `I` is [`AsyncRead`].
44+
///
45+
/// [`AsyncRead`]: tokio::io::AsyncRead
46+
/// [`Read`]: hyper::rt::Read
47+
impl<I> hyper::rt::Read for WithHyperIo<I>
48+
where
49+
I: tokio::io::AsyncRead,
50+
{
51+
fn poll_read(
52+
self: Pin<&mut Self>,
53+
cx: &mut Context<'_>,
54+
mut buf: hyper::rt::ReadBufCursor<'_>,
55+
) -> Poll<Result<(), std::io::Error>> {
56+
let n = unsafe {
57+
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
58+
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
59+
Poll::Ready(Ok(())) => tbuf.filled().len(),
60+
other => return other,
61+
}
62+
};
63+
64+
unsafe {
65+
buf.advance(n);
66+
}
67+
Poll::Ready(Ok(()))
68+
}
69+
}
70+
71+
/// [`WithHyperIo<I>`] is [`Write`] if `I` is [`AsyncWrite`].
72+
///
73+
/// [`AsyncWrite`]: tokio::io::AsyncWrite
74+
/// [`Write`]: hyper::rt::Write
75+
impl<I> hyper::rt::Write for WithHyperIo<I>
76+
where
77+
I: tokio::io::AsyncWrite,
78+
{
79+
fn poll_write(
80+
self: Pin<&mut Self>,
81+
cx: &mut Context<'_>,
82+
buf: &[u8],
83+
) -> Poll<Result<usize, std::io::Error>> {
84+
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
85+
}
86+
87+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
88+
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
89+
}
90+
91+
fn poll_shutdown(
92+
self: Pin<&mut Self>,
93+
cx: &mut Context<'_>,
94+
) -> Poll<Result<(), std::io::Error>> {
95+
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
96+
}
97+
98+
fn is_write_vectored(&self) -> bool {
99+
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
100+
}
101+
102+
fn poll_write_vectored(
103+
self: Pin<&mut Self>,
104+
cx: &mut Context<'_>,
105+
bufs: &[std::io::IoSlice<'_>],
106+
) -> Poll<Result<usize, std::io::Error>> {
107+
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
108+
}
109+
}
110+
111+
/// [`WithHyperIo<I>`] exposes its inner `I`'s [`AsyncRead`] implementation.
112+
///
113+
/// [`AsyncRead`]: tokio::io::AsyncRead
114+
impl<I> tokio::io::AsyncRead for WithHyperIo<I>
115+
where
116+
I: tokio::io::AsyncRead,
117+
{
118+
#[inline]
119+
fn poll_read(
120+
self: Pin<&mut Self>,
121+
cx: &mut Context<'_>,
122+
buf: &mut tokio::io::ReadBuf<'_>,
123+
) -> Poll<Result<(), std::io::Error>> {
124+
self.project().inner.poll_read(cx, buf)
125+
}
126+
}
127+
128+
/// [`WithHyperIo<I>`] exposes its inner `I`'s [`AsyncWrite`] implementation.
129+
///
130+
/// [`AsyncWrite`]: tokio::io::AsyncWrite
131+
impl<I> tokio::io::AsyncWrite for WithHyperIo<I>
132+
where
133+
I: tokio::io::AsyncWrite,
134+
{
135+
#[inline]
136+
fn poll_write(
137+
self: Pin<&mut Self>,
138+
cx: &mut Context<'_>,
139+
buf: &[u8],
140+
) -> Poll<Result<usize, std::io::Error>> {
141+
self.project().inner.poll_write(cx, buf)
142+
}
143+
144+
#[inline]
145+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
146+
self.project().inner.poll_flush(cx)
147+
}
148+
149+
#[inline]
150+
fn poll_shutdown(
151+
self: Pin<&mut Self>,
152+
cx: &mut Context<'_>,
153+
) -> Poll<Result<(), std::io::Error>> {
154+
self.project().inner.poll_shutdown(cx)
155+
}
156+
157+
#[inline]
158+
fn is_write_vectored(&self) -> bool {
159+
self.inner.is_write_vectored()
160+
}
161+
162+
#[inline]
163+
fn poll_write_vectored(
164+
self: Pin<&mut Self>,
165+
cx: &mut Context<'_>,
166+
bufs: &[std::io::IoSlice<'_>],
167+
) -> Poll<Result<usize, std::io::Error>> {
168+
self.project().inner.poll_write_vectored(cx, bufs)
169+
}
170+
}

src/rt/tokio/with_tokio_io.rs

+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use pin_project_lite::pin_project;
2+
use std::{
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
pin_project! {
8+
/// Extends an underlying [`hyper`] I/O with [`tokio`] I/O implementations.
9+
///
10+
/// This implements [`AsyncRead`] and [`AsyncWrite`] given an inner type that implements
11+
/// [`Read`] and [`Write`], respectively.
12+
#[derive(Debug)]
13+
pub struct WithTokioIo<I> {
14+
#[pin]
15+
inner: I,
16+
}
17+
}
18+
19+
// ==== impl WithTokioIo =====
20+
21+
/// [`WithTokioIo<I>`] is [`AsyncRead`] if `I` is [`Read`].
22+
///
23+
/// [`AsyncRead`]: tokio::io::AsyncRead
24+
/// [`Read`]: hyper::rt::Read
25+
impl<I> tokio::io::AsyncRead for WithTokioIo<I>
26+
where
27+
I: hyper::rt::Read,
28+
{
29+
fn poll_read(
30+
self: Pin<&mut Self>,
31+
cx: &mut Context<'_>,
32+
tbuf: &mut tokio::io::ReadBuf<'_>,
33+
) -> Poll<Result<(), std::io::Error>> {
34+
//let init = tbuf.initialized().len();
35+
let filled = tbuf.filled().len();
36+
let sub_filled = unsafe {
37+
let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
38+
39+
match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
40+
Poll::Ready(Ok(())) => buf.filled().len(),
41+
other => return other,
42+
}
43+
};
44+
45+
let n_filled = filled + sub_filled;
46+
// At least sub_filled bytes had to have been initialized.
47+
let n_init = sub_filled;
48+
unsafe {
49+
tbuf.assume_init(n_init);
50+
tbuf.set_filled(n_filled);
51+
}
52+
53+
Poll::Ready(Ok(()))
54+
}
55+
}
56+
57+
/// [`WithTokioIo<I>`] is [`AsyncWrite`] if `I` is [`Write`].
58+
///
59+
/// [`AsyncWrite`]: tokio::io::AsyncWrite
60+
/// [`Write`]: hyper::rt::Write
61+
impl<I> tokio::io::AsyncWrite for WithTokioIo<I>
62+
where
63+
I: hyper::rt::Write,
64+
{
65+
fn poll_write(
66+
self: Pin<&mut Self>,
67+
cx: &mut Context<'_>,
68+
buf: &[u8],
69+
) -> Poll<Result<usize, std::io::Error>> {
70+
hyper::rt::Write::poll_write(self.project().inner, cx, buf)
71+
}
72+
73+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
74+
hyper::rt::Write::poll_flush(self.project().inner, cx)
75+
}
76+
77+
fn poll_shutdown(
78+
self: Pin<&mut Self>,
79+
cx: &mut Context<'_>,
80+
) -> Poll<Result<(), std::io::Error>> {
81+
hyper::rt::Write::poll_shutdown(self.project().inner, cx)
82+
}
83+
84+
fn is_write_vectored(&self) -> bool {
85+
hyper::rt::Write::is_write_vectored(&self.inner)
86+
}
87+
88+
fn poll_write_vectored(
89+
self: Pin<&mut Self>,
90+
cx: &mut Context<'_>,
91+
bufs: &[std::io::IoSlice<'_>],
92+
) -> Poll<Result<usize, std::io::Error>> {
93+
hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
94+
}
95+
}
96+
97+
/// [`WithTokioIo<I>`] exposes its inner `I`'s [`Write`] implementation.
98+
///
99+
/// [`Write`]: hyper::rt::Write
100+
impl<I> hyper::rt::Write for WithTokioIo<I>
101+
where
102+
I: hyper::rt::Write,
103+
{
104+
#[inline]
105+
fn poll_write(
106+
self: Pin<&mut Self>,
107+
cx: &mut Context<'_>,
108+
buf: &[u8],
109+
) -> Poll<Result<usize, std::io::Error>> {
110+
self.project().inner.poll_write(cx, buf)
111+
}
112+
113+
#[inline]
114+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
115+
self.project().inner.poll_flush(cx)
116+
}
117+
118+
#[inline]
119+
fn poll_shutdown(
120+
self: Pin<&mut Self>,
121+
cx: &mut Context<'_>,
122+
) -> Poll<Result<(), std::io::Error>> {
123+
self.project().inner.poll_shutdown(cx)
124+
}
125+
126+
#[inline]
127+
fn is_write_vectored(&self) -> bool {
128+
self.inner.is_write_vectored()
129+
}
130+
131+
#[inline]
132+
fn poll_write_vectored(
133+
self: Pin<&mut Self>,
134+
cx: &mut Context<'_>,
135+
bufs: &[std::io::IoSlice<'_>],
136+
) -> Poll<Result<usize, std::io::Error>> {
137+
self.project().inner.poll_write_vectored(cx, bufs)
138+
}
139+
}
140+
141+
impl<I> WithTokioIo<I> {
142+
/// Wraps the inner I/O in an [`WithTokioIo<I>`]
143+
pub fn new(inner: I) -> Self {
144+
Self { inner }
145+
}
146+
147+
/// Returns a reference to the inner type.
148+
pub fn inner(&self) -> &I {
149+
&self.inner
150+
}
151+
152+
/// Returns a mutable reference to the inner type.
153+
pub fn inner_mut(&mut self) -> &mut I {
154+
&mut self.inner
155+
}
156+
157+
/// Consumes this wrapper and returns the inner type.
158+
pub fn into_inner(self) -> I {
159+
self.inner
160+
}
161+
}
162+
163+
/// [`WithTokioIo<I>`] exposes its inner `I`'s [`Read`] implementation.
164+
///
165+
/// [`Read`]: hyper::rt::Read
166+
impl<I> hyper::rt::Read for WithTokioIo<I>
167+
where
168+
I: hyper::rt::Read,
169+
{
170+
#[inline]
171+
fn poll_read(
172+
self: Pin<&mut Self>,
173+
cx: &mut Context<'_>,
174+
buf: hyper::rt::ReadBufCursor<'_>,
175+
) -> Poll<Result<(), std::io::Error>> {
176+
self.project().inner.poll_read(cx, buf)
177+
}
178+
}

0 commit comments

Comments
 (0)