Skip to content

Commit a2c5b56

Browse files
committed
review: use sync::oneshot for error channel
this applies a review suggestion here: https://github.com/hyperium/http-body/pull/100/files#r1399781061 this commit refactors the channel-backed body in hyperium#100, changing the `mpsc::Receiver<E>` used to transmit an error into a `oneshot::Receiver<E>`. this should improve memory usage, and make the channel a smaller structure. in order to achieve this, some minor adjustments are made: * use pin projection, projecting pinnedness to the oneshot receiver, polling it via `core::future::Future::poll(..)` to yield a body frame. * add `Debug` bounds were needed. as an alternative, see tokio-rs/tokio#7059, which proposed a `poll_recv(..)` inherent method for a oneshot channel receiver.
1 parent a10e443 commit a2c5b56

File tree

1 file changed

+22
-28
lines changed

1 file changed

+22
-28
lines changed

http-body-util/src/channel.rs

+22-28
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@ use std::{
99
use bytes::Buf;
1010
use http::HeaderMap;
1111
use http_body::{Body, Frame};
12-
use tokio::sync::mpsc;
13-
14-
/// A body backed by a channel.
15-
pub struct Channel<D, E = std::convert::Infallible> {
16-
rx_frame: mpsc::Receiver<Frame<D>>,
17-
rx_error: mpsc::Receiver<E>,
12+
use pin_project_lite::pin_project;
13+
use tokio::sync::{mpsc, oneshot};
14+
15+
pin_project! {
16+
/// A body backed by a channel.
17+
pub struct Channel<D, E = std::convert::Infallible> {
18+
rx_frame: mpsc::Receiver<Frame<D>>,
19+
#[pin]
20+
rx_error: oneshot::Receiver<E>,
21+
}
1822
}
1923

2024
impl<D, E> Channel<D, E> {
@@ -25,7 +29,7 @@ impl<D, E> Channel<D, E> {
2529
/// provided buffer capacity must be at least 1.
2630
pub fn new(buffer: usize) -> (Sender<D, E>, Self) {
2731
let (tx_frame, rx_frame) = mpsc::channel(buffer);
28-
let (tx_error, rx_error) = mpsc::channel(1);
32+
let (tx_error, rx_error) = oneshot::channel();
2933
(Sender { tx_frame, tx_error }, Self { rx_frame, rx_error })
3034
}
3135
}
@@ -38,24 +42,27 @@ where
3842
type Error = E;
3943

4044
fn poll_frame(
41-
mut self: Pin<&mut Self>,
45+
self: Pin<&mut Self>,
4246
cx: &mut Context<'_>,
4347
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
44-
match self.rx_frame.poll_recv(cx) {
48+
let this = self.project();
49+
50+
match this.rx_frame.poll_recv(cx) {
4551
Poll::Ready(frame) => return Poll::Ready(frame.map(Ok)),
4652
Poll::Pending => {}
4753
}
4854

49-
match self.rx_error.poll_recv(cx) {
50-
Poll::Ready(err) => return Poll::Ready(err.map(Err)),
55+
use core::future::Future;
56+
match this.rx_error.poll(cx) {
57+
Poll::Ready(err) => return Poll::Ready(err.ok().map(Err)),
5158
Poll::Pending => {}
5259
}
5360

5461
Poll::Pending
5562
}
5663
}
5764

58-
impl<D, E> std::fmt::Debug for Channel<D, E> {
65+
impl<D, E: std::fmt::Debug> std::fmt::Debug for Channel<D, E> {
5966
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6067
f.debug_struct("Channel")
6168
.field("rx_frame", &self.rx_frame)
@@ -67,7 +74,7 @@ impl<D, E> std::fmt::Debug for Channel<D, E> {
6774
/// A sender half created through [`Channel::new`].
6875
pub struct Sender<D, E = std::convert::Infallible> {
6976
tx_frame: mpsc::Sender<Frame<D>>,
70-
tx_error: mpsc::Sender<E>,
77+
tx_error: oneshot::Sender<E>,
7178
}
7279

7380
impl<D, E> Sender<D, E> {
@@ -88,24 +95,11 @@ impl<D, E> Sender<D, E> {
8895

8996
/// Aborts the body in an abnormal fashion.
9097
pub fn abort(self, error: E) {
91-
match self.tx_error.try_send(error) {
92-
Ok(_) => {}
93-
Err(err) => {
94-
match err {
95-
mpsc::error::TrySendError::Full(_) => {
96-
// Channel::new creates the error channel with space for 1 message and we
97-
// only send once because this method consumes `self`. So the receiver
98-
// can't be full.
99-
unreachable!("error receiver should never be full")
100-
}
101-
mpsc::error::TrySendError::Closed(_) => {}
102-
}
103-
}
104-
}
98+
self.tx_error.send(error).ok();
10599
}
106100
}
107101

108-
impl<D, E> std::fmt::Debug for Sender<D, E> {
102+
impl<D, E: std::fmt::Debug> std::fmt::Debug for Sender<D, E> {
109103
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110104
f.debug_struct("Sender")
111105
.field("tx_frame", &self.tx_frame)

0 commit comments

Comments
 (0)