Skip to content

Commit c86632e

Browse files
authored
feat(util): introduce Sender::{capacity, max_capacity} (#147)
this commit introduces two methods, allowing callers to inspect the capacity, both available and maximum, of the channel-backed body. this allows callers to observe when the channel is full. X-Ref: #140 Signed-off-by: katelyn martin <me+cratelyn@katelyn.world>
1 parent f904761 commit c86632e

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

http-body-util/src/channel.rs

+59
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,65 @@ impl<D, E> Sender<D, E> {
114114
.map_err(tokio::sync::mpsc::error::TrySendError::into_inner)
115115
}
116116

117+
/// Returns the current capacity of the channel.
118+
///
119+
/// The capacity goes down when [`Frame<T>`]s are sent. The capacity goes up when these frames
120+
/// are received by the corresponding [`Channel<D, E>`]. This is distinct from
121+
/// [`max_capacity()`][Self::max_capacity], which always returns the buffer capacity initially
122+
/// specified when [`Channel::new()`][Channel::new] was called.
123+
///
124+
/// # Examples
125+
///
126+
/// ```
127+
/// use bytes::Bytes;
128+
/// use http_body_util::{BodyExt, channel::Channel};
129+
/// use std::convert::Infallible;
130+
///
131+
/// #[tokio::main]
132+
/// async fn main() {
133+
/// let (mut tx, mut body) = Channel::<Bytes, Infallible>::new(4);
134+
/// assert_eq!(tx.capacity(), 4);
135+
///
136+
/// // Sending a value decreases the available capacity.
137+
/// tx.send_data(Bytes::from("Hel")).await.unwrap();
138+
/// assert_eq!(tx.capacity(), 3);
139+
///
140+
/// // Reading a value increases the available capacity.
141+
/// let _ = body.frame().await;
142+
/// assert_eq!(tx.capacity(), 4);
143+
/// }
144+
/// ```
145+
pub fn capacity(&mut self) -> usize {
146+
self.tx_frame.capacity()
147+
}
148+
149+
/// Returns the maximum capacity of the channel.
150+
///
151+
/// This function always returns the buffer capacity initially specified when
152+
/// [`Channel::new()`][Channel::new] was called. This is distinct from
153+
/// [`capacity()`][Self::capacity], which returns the currently available capacity.
154+
///
155+
/// # Examples
156+
///
157+
/// ```
158+
/// use bytes::Bytes;
159+
/// use http_body_util::{BodyExt, channel::Channel};
160+
/// use std::convert::Infallible;
161+
///
162+
/// #[tokio::main]
163+
/// async fn main() {
164+
/// let (mut tx, mut body) = Channel::<Bytes, Infallible>::new(4);
165+
/// assert_eq!(tx.max_capacity(), 4);
166+
///
167+
/// // Sending a value buffers it, but does not affect the maximum capacity reported.
168+
/// tx.send_data(Bytes::from("Hel")).await.unwrap();
169+
/// assert_eq!(tx.max_capacity(), 4);
170+
/// }
171+
/// ```
172+
pub fn max_capacity(&mut self) -> usize {
173+
self.tx_frame.max_capacity()
174+
}
175+
117176
/// Aborts the body in an abnormal fashion.
118177
pub fn abort(self, error: E) {
119178
self.tx_error.send(error).ok();

0 commit comments

Comments
 (0)