Skip to content

Commit 43988c8

Browse files
committed
feat(http/retry): ReplayBody<B> polls for frames
pr #3559 (dd4fbcd) refactored our trailer peeking body middleware to model its buffering in terms of the `Frame<T>` type used in `http-body`'s 1.0 release. this commit performs a similar change for the other piece of body middleware that super linkerd's retry facilities: `ReplayBody<B>`. the inner body `B` is now wrapped in the `ForwardCompatibleBody<B>` adapter, and we now poll it in terms of frames. NB: polling the underlying in terms of frames has a subtle knock-on effect regarding when we observe the trailers, in the liminal period between this refactor and the subsequent upgrade to hyper 1.0, whilst we must still implement the existing 0.4 interface for `Body` that includes `poll_trailers()`. see the comment above `replay_trailers` for more on this, describing why we now initialize this to `true`. relatedly, this is why we now longer delegate down to `B::poll_trailers` ourselves. it will have already been called by our adapter. `ReplayBody::is_end_stream()` now behaves identically when initially polling a body compared to subsequent replays. this is fine, as `is_end_stream()` is a hint that facilitates optimizations (hyperium/http-body#143). we do still report the end properly, we just won't be quite as prescient on the initial playthrough. see: - linkerd/linkerd2#8733. - #3559 Signed-off-by: katelyn martin <kate@buoyant.io>
1 parent a5eb1dc commit 43988c8

File tree

3 files changed

+66
-26
lines changed

3 files changed

+66
-26
lines changed

linkerd/http/retry/src/compat.rs

-3
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,17 @@ impl<B: Body> ForwardCompatibleBody<B> {
4747
}
4848

4949
/// Returns `true` when the end of stream has been reached.
50-
#[allow(unused, reason = "not yet used")]
5150
pub(crate) fn is_end_stream(&self) -> bool {
5251
self.inner.is_end_stream()
5352
}
5453

5554
/// Returns the bounds on the remaining length of the stream.
56-
#[allow(unused, reason = "not yet used")]
5755
pub(crate) fn size_hint(&self) -> SizeHint {
5856
self.inner.size_hint()
5957
}
6058
}
6159

6260
impl<B: Body + Unpin> ForwardCompatibleBody<B> {
63-
#[allow(unused, reason = "not yet used")]
6461
pub(crate) fn poll_frame(
6562
self: Pin<&mut Self>,
6663
cx: &mut Context<'_>,

linkerd/http/retry/src/replay.rs

+58-23
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct SharedState<B> {
6868
struct BodyState<B> {
6969
replay: Replay,
7070
trailers: Option<HeaderMap>,
71-
rest: B,
71+
rest: crate::compat::ForwardCompatibleBody<B>,
7272
is_completed: bool,
7373

7474
/// Maximum number of bytes to buffer.
@@ -104,13 +104,19 @@ impl<B: Body> ReplayBody<B> {
104104
state: Some(BodyState {
105105
replay: Default::default(),
106106
trailers: None,
107-
rest: body,
107+
rest: crate::compat::ForwardCompatibleBody::new(body),
108108
is_completed: false,
109109
max_bytes: max_bytes + 1,
110110
}),
111-
// The initial `ReplayBody` has nothing to replay
111+
// The initial `ReplayBody` has no data to replay.
112112
replay_body: false,
113-
replay_trailers: false,
113+
// NOTE(kate): When polling the inner body in terms of frames, we will not yield
114+
// `Ready(None)` from `Body::poll_data()` until we have reached the end of the
115+
// underlying stream. Once we have migrated to `http-body` v1, this field will be
116+
// initialized `false` thanks to the use of `Body::poll_frame()`, but for now we must
117+
// initialize this to true; `poll_trailers()` will be called after the trailers have
118+
// been observed previously, even for the initial body.
119+
replay_trailers: true,
114120
})
115121
}
116122

@@ -204,16 +210,33 @@ where
204210
// Poll the inner body for more data. If the body has ended, remember
205211
// that so that future clones will not try polling it again (as
206212
// described above).
207-
let data = {
213+
let data: B::Data = {
214+
use futures::{future::Either, ready};
215+
// Poll the inner body for the next frame.
208216
tracing::trace!("Polling initial body");
209-
match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) {
210-
Some(Ok(data)) => data,
211-
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
217+
let poll = Pin::new(&mut state.rest).poll_frame(cx).map_err(Into::into);
218+
let frame = match ready!(poll) {
219+
// The body yielded a new frame.
220+
Some(Ok(frame)) => frame,
221+
// The body yielded an error.
222+
Some(Err(error)) => return Poll::Ready(Some(Err(error))),
223+
// The body has reached the end of the stream.
212224
None => {
213225
tracing::trace!("Initial body completed");
214226
state.is_completed = true;
215227
return Poll::Ready(None);
216228
}
229+
};
230+
// Now, inspect the frame: was it a chunk of data, or a trailers frame?
231+
match Self::split_frame(frame) {
232+
Some(Either::Left(data)) => data,
233+
Some(Either::Right(trailers)) => {
234+
tracing::trace!("Initial body completed");
235+
state.trailers = Some(trailers);
236+
state.is_completed = true;
237+
return Poll::Ready(None);
238+
}
239+
None => return Poll::Ready(None),
217240
}
218241
};
219242

@@ -234,7 +257,7 @@ where
234257
/// NOT be polled until the previous body has been dropped.
235258
fn poll_trailers(
236259
self: Pin<&mut Self>,
237-
cx: &mut Context<'_>,
260+
_cx: &mut Context<'_>,
238261
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
239262
let this = self.get_mut();
240263
let state = Self::acquire_state(&mut this.state, &this.shared.body);
@@ -251,20 +274,6 @@ where
251274
}
252275
}
253276

254-
// If the inner body has previously ended, don't poll it again.
255-
if !state.rest.is_end_stream() {
256-
return Pin::new(&mut state.rest)
257-
.poll_trailers(cx)
258-
.map_ok(|tlrs| {
259-
// Record a copy of the inner body's trailers in the shared state.
260-
if state.trailers.is_none() {
261-
state.trailers.clone_from(&tlrs);
262-
}
263-
tlrs
264-
})
265-
.map_err(Into::into);
266-
}
267-
268277
Poll::Ready(Ok(None))
269278
}
270279

@@ -334,6 +343,32 @@ impl<B> Drop for ReplayBody<B> {
334343
}
335344
}
336345

346+
impl<B: Body> ReplayBody<B> {
347+
/// Splits a `Frame<T>` into a chunk of data or a header map.
348+
///
349+
/// Frames do not expose their inner enums, and instead expose `into_data()` and
350+
/// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))`
351+
/// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame.
352+
///
353+
/// This returns `None` if an unknown frame is provided, that is neither.
354+
///
355+
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
356+
fn split_frame(
357+
frame: crate::compat::Frame<B::Data>,
358+
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
359+
use {crate::compat::Frame, futures::future::Either};
360+
match frame.into_data().map_err(Frame::into_trailers) {
361+
Ok(data) => Some(Either::Left(data)),
362+
Err(Ok(trailers)) => Some(Either::Right(trailers)),
363+
Err(Err(_unknown)) => {
364+
// It's possible that some sort of unknown frame could be encountered.
365+
tracing::warn!("an unknown body frame has been buffered");
366+
None
367+
}
368+
}
369+
}
370+
}
371+
337372
// === impl BodyState ===
338373

339374
impl<B> BodyState<B> {

linkerd/http/retry/src/replay/tests.rs

+8
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ async fn eos_only_when_fully_replayed() {
345345
.expect("yields a frame")
346346
.into_data()
347347
.expect("yields a data frame");
348+
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
349+
assert!(initial.frame().await.is_none());
348350
assert!(initial.is_end_stream());
349351
assert!(!replay.is_end_stream());
350352
drop(initial);
@@ -634,6 +636,12 @@ async fn size_hint_is_correct_across_replays() {
634636

635637
// Read the body, check the size hint again.
636638
assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY));
639+
let initial = {
640+
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
641+
let mut body = crate::compat::ForwardCompatibleBody::new(initial);
642+
assert!(body.frame().await.is_none());
643+
body.into_inner()
644+
};
637645
debug_assert!(initial.is_end_stream());
638646
// TODO(kate): this currently misreports the *remaining* size of the body.
639647
// let size = initial.size_hint();

0 commit comments

Comments
 (0)