Skip to content

Commit 02e11f5

Browse files
committed
feat(util): introduce a "fused" body combinator
this commit introduces a new `Body` combinator to the `http-body-util` library, `http_body_util::combinators::Fuse<B>`. this combinator is roughly equivalent to the `std::iter::Fuse<I>` iterator, which returns `None` after the inner iterator returns it once. while bodies *should* return `Poll::Ready(None)` indefinitely after reaching the end of the stream or returning an error, this combinator can help prevent further polling of an underlying body implementation, in the same manner that `std::iter::Iterator::fuse()` helps prevent an underlying iterator that might e.g. yield `Some(value)` after yielding `None`, or panic. Signed-off-by: katelyn martin <me+cratelyn@katelyn.world>
1 parent 1090bff commit 02e11f5

File tree

3 files changed

+247
-0
lines changed

3 files changed

+247
-0
lines changed
+232
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
use std::{
2+
pin::Pin,
3+
task::{Context, Poll},
4+
};
5+
6+
use http_body::{Body, Frame, SizeHint};
7+
8+
/// A "fused" [`Body`].
9+
///
10+
/// This [`Body`] yields [`Poll::Ready(None)`] forever after the underlying body yields
11+
/// [`Poll::Ready(None)`], or an error [`Poll::Ready(Some(Err(_)))`], once.
12+
///
13+
/// Bodies should ideally continue to return [`Poll::Ready(None)`] indefinitely after the end of
14+
/// the stream is reached. [`Fuse<B>`] avoids polling its underlying body `B` further after the
15+
/// underlying stream as ended, which can be useful for implementation that cannot uphold this
16+
/// guarantee.
17+
///
18+
/// This is akin to the functionality that [`std::iter::Iterator::fuse()`] provides for
19+
/// [`Iterator`][std::iter::Iterator]s.
20+
#[derive(Debug)]
21+
pub struct Fuse<B> {
22+
inner: Option<B>,
23+
}
24+
25+
impl<B> Fuse<B>
26+
where
27+
B: Body,
28+
{
29+
/// Returns a fused body.
30+
pub fn new(body: B) -> Self {
31+
Self {
32+
inner: if body.is_end_stream() {
33+
None
34+
} else {
35+
Some(body)
36+
},
37+
}
38+
}
39+
}
40+
41+
impl<B> Body for Fuse<B>
42+
where
43+
B: Body + Unpin,
44+
{
45+
type Data = B::Data;
46+
type Error = B::Error;
47+
48+
fn poll_frame(
49+
self: Pin<&mut Self>,
50+
cx: &mut Context<'_>,
51+
) -> Poll<Option<Result<Frame<B::Data>, B::Error>>> {
52+
let Self { inner } = self.get_mut();
53+
54+
let Some((frame, eos)) =
55+
inner
56+
.as_mut()
57+
.map(|mut inner| match Pin::new(&mut inner).poll_frame(cx) {
58+
frame @ Poll::Ready(Some(Ok(_))) => (frame, inner.is_end_stream()),
59+
end @ Poll::Ready(Some(Err(_)) | None) => (end, true),
60+
poll @ Poll::Pending => (poll, false),
61+
})
62+
else {
63+
return Poll::Ready(None);
64+
};
65+
66+
eos.then(|| inner.take());
67+
frame
68+
}
69+
70+
fn is_end_stream(&self) -> bool {
71+
self.inner.is_none()
72+
}
73+
74+
fn size_hint(&self) -> SizeHint {
75+
self.inner
76+
.as_ref()
77+
.map(B::size_hint)
78+
.unwrap_or_else(|| SizeHint::with_exact(0))
79+
}
80+
}
81+
82+
#[cfg(test)]
83+
mod tests {
84+
use super::*;
85+
use bytes::Bytes;
86+
use std::collections::VecDeque;
87+
88+
/// A value returned by a call to [`Body::poll_frame()`].
89+
type PollFrame = Poll<Option<Result<Frame<Bytes>, Error>>>;
90+
91+
type Error = &'static str;
92+
93+
struct Mock<'count> {
94+
poll_count: &'count mut u8,
95+
polls: VecDeque<PollFrame>,
96+
}
97+
98+
#[test]
99+
fn empty_never_polls() {
100+
let mut count = 0_u8;
101+
let empty = Mock::new(&mut count, []);
102+
debug_assert!(empty.is_end_stream());
103+
let fused = Fuse::new(empty);
104+
assert!(fused.inner.is_none());
105+
drop(fused);
106+
assert_eq!(count, 0);
107+
}
108+
109+
#[test]
110+
fn stops_polling_after_none() {
111+
let mut count = 0_u8;
112+
let empty = Mock::new(&mut count, [Poll::Ready(None)]);
113+
debug_assert!(!empty.is_end_stream());
114+
let mut fused = Fuse::new(empty);
115+
assert!(fused.inner.is_some());
116+
117+
let waker = futures_util::task::noop_waker();
118+
let mut cx = Context::from_waker(&waker);
119+
match Pin::new(&mut fused).poll_frame(&mut cx) {
120+
Poll::Ready(None) => {}
121+
other => panic!("unexpected poll outcome: {:?}", other),
122+
}
123+
124+
assert!(fused.inner.is_none());
125+
match Pin::new(&mut fused).poll_frame(&mut cx) {
126+
Poll::Ready(None) => {}
127+
other => panic!("unexpected poll outcome: {:?}", other),
128+
}
129+
130+
drop(fused);
131+
assert_eq!(count, 1);
132+
}
133+
134+
#[test]
135+
fn stops_polling_after_some_eos() {
136+
let mut count = 0_u8;
137+
let body = Mock::new(
138+
&mut count,
139+
[Poll::Ready(Some(Ok(Frame::data(Bytes::from_static(
140+
b"hello",
141+
)))))],
142+
);
143+
debug_assert!(!body.is_end_stream());
144+
let mut fused = Fuse::new(body);
145+
assert!(fused.inner.is_some());
146+
147+
let waker = futures_util::task::noop_waker();
148+
let mut cx = Context::from_waker(&waker);
149+
150+
match Pin::new(&mut fused).poll_frame(&mut cx) {
151+
Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes.into_data().expect("data"), "hello"),
152+
other => panic!("unexpected poll outcome: {:?}", other),
153+
}
154+
155+
assert!(fused.inner.is_none());
156+
match Pin::new(&mut fused).poll_frame(&mut cx) {
157+
Poll::Ready(None) => {}
158+
other => panic!("unexpected poll outcome: {:?}", other),
159+
}
160+
161+
drop(fused);
162+
assert_eq!(count, 1);
163+
}
164+
165+
#[test]
166+
fn stops_polling_after_some_error() {
167+
let mut count = 0_u8;
168+
let body = Mock::new(
169+
&mut count,
170+
[
171+
Poll::Ready(Some(Ok(Frame::data(Bytes::from_static(b"hello"))))),
172+
Poll::Ready(Some(Err("oh no"))),
173+
Poll::Ready(Some(Ok(Frame::data(Bytes::from_static(b"world"))))),
174+
],
175+
);
176+
debug_assert!(!body.is_end_stream());
177+
let mut fused = Fuse::new(body);
178+
assert!(fused.inner.is_some());
179+
180+
let waker = futures_util::task::noop_waker();
181+
let mut cx = Context::from_waker(&waker);
182+
183+
match Pin::new(&mut fused).poll_frame(&mut cx) {
184+
Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes.into_data().expect("data"), "hello"),
185+
other => panic!("unexpected poll outcome: {:?}", other),
186+
}
187+
188+
assert!(fused.inner.is_some());
189+
match Pin::new(&mut fused).poll_frame(&mut cx) {
190+
Poll::Ready(Some(Err("oh no"))) => {}
191+
other => panic!("unexpected poll outcome: {:?}", other),
192+
}
193+
194+
assert!(fused.inner.is_none());
195+
match Pin::new(&mut fused).poll_frame(&mut cx) {
196+
Poll::Ready(None) => {}
197+
other => panic!("unexpected poll outcome: {:?}", other),
198+
}
199+
200+
drop(fused);
201+
assert_eq!(count, 2);
202+
}
203+
204+
// === impl Mock ===
205+
206+
impl<'count> Mock<'count> {
207+
fn new(poll_count: &'count mut u8, polls: impl IntoIterator<Item = PollFrame>) -> Self {
208+
Self {
209+
poll_count,
210+
polls: polls.into_iter().collect(),
211+
}
212+
}
213+
}
214+
215+
impl Body for Mock<'_> {
216+
type Data = Bytes;
217+
type Error = &'static str;
218+
219+
fn poll_frame(
220+
self: Pin<&mut Self>,
221+
_cx: &mut Context<'_>,
222+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
223+
let Self { poll_count, polls } = self.get_mut();
224+
**poll_count = poll_count.saturating_add(1);
225+
polls.pop_front().unwrap_or(Poll::Ready(None))
226+
}
227+
228+
fn is_end_stream(&self) -> bool {
229+
self.polls.is_empty()
230+
}
231+
}
232+
}

http-body-util/src/combinators/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
mod box_body;
44
mod collect;
55
mod frame;
6+
mod fuse;
67
mod map_err;
78
mod map_frame;
89
mod with_trailers;
@@ -11,6 +12,7 @@ pub use self::{
1112
box_body::{BoxBody, UnsyncBoxBody},
1213
collect::Collect,
1314
frame::Frame,
15+
fuse::Fuse,
1416
map_err::MapErr,
1517
map_frame::MapFrame,
1618
with_trailers::WithTrailers,

http-body-util/src/lib.rs

+13
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@ pub trait BodyExt: http_body::Body {
142142
{
143143
BodyDataStream::new(self)
144144
}
145+
146+
/// Creates a "fused" body.
147+
///
148+
/// This [`Body`][http_body::Body] yields [`Poll::Ready(None)`] forever after the underlying
149+
/// body yields [`Poll::Ready(None)`], or an error [`Poll::Ready(Some(Err(_)))`], once.
150+
///
151+
/// See [`Fuse<B>`][combinators::Fuse] for more information.
152+
fn fuse(self) -> combinators::Fuse<Self>
153+
where
154+
Self: Sized,
155+
{
156+
combinators::Fuse::new(self)
157+
}
145158
}
146159

147160
impl<T: ?Sized> BodyExt for T where T: http_body::Body {}

0 commit comments

Comments
 (0)