Skip to content

Commit 5d21ad1

Browse files
authored
feat(util): introduce channel::Sender::try_send() (#146)
pr #140 introduced a new channel-backed body. this body provides a new equivalent to the defunct `hyper::Body::channel()` interface that was exposed in hyper 0.14, and removed in the 1.0 major release. the previous `Sender` type also included a useful method, `try_send_data()`, which allows a thread to _synchronously_ attempt to send data _outside_ of an asynchronous context. <https://docs.rs/hyper/0.14.31/hyper/body/struct.Sender.html#method.try_send_data> this commit introduces a loosely equivalent `Sender::try_send()` method to provide users of `http-body-util` with a means to send `Frame<T>` values outside of an asynchronous context, without potentially scheduling the caller to be woken later or yielding should the channel be full. this function accepts a `Frame<D>` rather than a chunk of data, to fit in with the shift towards frame-oriented interfaces in `http-body` and `http-body-util`. Signed-off-by: katelyn martin <me+cratelyn@katelyn.world>
1 parent 1090bff commit 5d21ad1

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

http-body-util/src/channel.rs

+66
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,26 @@ impl<D, E> Sender<D, E> {
9494
self.send(Frame::trailers(trailers)).await
9595
}
9696

97+
/// Attempts to send a frame on this channel.
98+
///
99+
/// This function returns the unsent frame back as an `Err(_)` if the channel could not
100+
/// (currently) accept another frame.
101+
///
102+
/// # Note
103+
///
104+
/// This is mostly useful for when trying to send a frame from outside of an asynchronous
105+
/// context. If in an async context, prefer [`Sender::send_data()`] instead.
106+
pub fn try_send(&mut self, frame: Frame<D>) -> Result<(), Frame<D>> {
107+
let Self {
108+
tx_frame,
109+
tx_error: _,
110+
} = self;
111+
112+
tx_frame
113+
.try_send(frame)
114+
.map_err(tokio::sync::mpsc::error::TrySendError::into_inner)
115+
}
116+
97117
/// Aborts the body in an abnormal fashion.
98118
pub fn abort(self, error: E) {
99119
self.tx_error.send(error).ok();
@@ -193,6 +213,52 @@ mod tests {
193213
assert_eq!(collected.to_bytes(), "Hello!");
194214
}
195215

216+
#[tokio::test]
217+
async fn try_send_works() {
218+
let (mut tx, mut body) = Channel::<Bytes>::new(2);
219+
220+
// Send two messages, filling the channel's buffer.
221+
tx.try_send(Frame::data(Bytes::from("one")))
222+
.expect("can send one message");
223+
tx.try_send(Frame::data(Bytes::from("two")))
224+
.expect("can send two messages");
225+
226+
// Sending a value to a full channel should return it back to us.
227+
match tx.try_send(Frame::data(Bytes::from("three"))) {
228+
Err(frame) => assert_eq!(frame.into_data().unwrap(), "three"),
229+
Ok(()) => panic!("synchronously sending a value to a full channel should fail"),
230+
};
231+
232+
// Read the messages out of the body.
233+
assert_eq!(
234+
body.frame()
235+
.await
236+
.expect("yields result")
237+
.expect("yields frame")
238+
.into_data()
239+
.expect("yields data"),
240+
"one"
241+
);
242+
assert_eq!(
243+
body.frame()
244+
.await
245+
.expect("yields result")
246+
.expect("yields frame")
247+
.into_data()
248+
.expect("yields data"),
249+
"two"
250+
);
251+
252+
// Drop the body.
253+
drop(body);
254+
255+
// Sending a value to a closed channel should return it back to us.
256+
match tx.try_send(Frame::data(Bytes::from("closed"))) {
257+
Err(frame) => assert_eq!(frame.into_data().unwrap(), "closed"),
258+
Ok(()) => panic!("synchronously sending a value to a closed channel should fail"),
259+
};
260+
}
261+
196262
/// A stand-in for an error type, for unit tests.
197263
type Error = &'static str;
198264
/// An example error message.

0 commit comments

Comments
 (0)