Skip to content

Commit a242043

Browse files
committed
sync: add Receiver::poll_recv(..) method
this commit adds a `rx.poll_recv(&mut cx)` to the public interface of `tokio::sync::oneshot::Receiver<T>`. this method has the following signature: ```rust // tokio/src/sync/oneshot.rs impl<T> Receiver<T> { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> { // ... } } ``` this is similar to the `tokio::sync::mpsc::Receiver::poll_recv` and `tokio::sync::mpsc::UnboundedReceiver::poll_recv` methods, which have the following signature: ```rust // tokio/src/sync/mpsc/bounded.rs impl<T> Receiver<T> { pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { // ... } } ``` see: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv in particular, note the `&mut self` receiver of these methods, as opposed to the `Pin<&mut Self>` receiver in `Future::poll(..)`. today, a oneshot receiver must be pinned in order to be polled via `Future::poll(..)`. `tokio::sync::oneshot::Receiver::try_recv(..)` has an important but subtle difference from `poll_recv(..)`, alluded to in its documentation: > If a pending value exists in the channel, it is returned. If no value > has been sent, the current task will not be registered for future > notification. > > This function is useful to call from outside the context of an > asynchronous task. see hyperium/http-body#100 for an example use-case for this. if we *are* in the context of an asynchronous task, we may wish to poll on the receiver-end of the channel and register for future notification, indicating that we should be awoken later when a value is ready or when conditions yielding a spurious failure have passed. providing a means to poll a `&mut Receiver<T>` avoids the performance impact of boxing the receiver as an erased `dyn Future` trait object, or of using an `tokio::sync::mpsc::Receiver<T>`, or the ergonomic wrinkles of needing to rely on pin projection in asynchronous types that compose on top of oneshot channels. --- * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.poll_recv * https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#method.poll_recv * https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll * https://docs.rs/tokio/latest/tokio/sync/oneshot/struct.Receiver.html#method.try_recv * https://github.com/hyperium/http-body/pull/100/files#r1399818104 * hyperium/http-body#100
1 parent b3ff911 commit a242043

File tree

1 file changed

+57
-22
lines changed

1 file changed

+57
-22
lines changed

tokio/src/sync/oneshot.rs

+57-22
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,62 @@ impl<T> Receiver<T> {
931931
}
932932
}
933933

934+
/// Polls the channel for a value.
935+
///
936+
/// If a pending value exists in the channel, it is returned. If no value
937+
/// has been sent, the current task will be registered for future
938+
/// notification.
939+
///
940+
/// When the method returns `Poll::Pending`, the `Waker` in the provided
941+
/// `Context` is scheduled to receive a wakeup when a message is sent on any
942+
/// receiver, or when the channel is closed.
943+
///
944+
/// If this method returns `Poll::Pending` due to a spurious failure, then
945+
/// the `Waker` will be notified when the situation causing the spurious
946+
/// failure has been resolved. Note that receiving such a wakeup does not
947+
/// guarantee that the next call will succeed — it could fail with another
948+
/// spurious failure.
949+
///
950+
/// To attempt to receive a value from outside of the context of an
951+
/// asynchronous task, or to avoid spurious failures, see the `try_recv`
952+
/// method.
953+
///
954+
/// # Return
955+
///
956+
/// * `Poll::Pending` if no value has been sent yet but the channel is not
957+
/// closed, or if a spurious failure happens.
958+
/// * `Poll::Ready(Ok(T))` is a value is pending in the channel.
959+
/// * `Poll::Ready(Err(RecvError))` if the sender has dropped without
960+
/// sending a value.
961+
///
962+
/// # Panics
963+
///
964+
/// This function panics if it is called after a value has been received.
965+
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
966+
// If `inner` is `None`, then `poll()` has already completed.
967+
#[cfg(all(tokio_unstable, feature = "tracing"))]
968+
let _res_span = self.resource_span.clone().entered();
969+
#[cfg(all(tokio_unstable, feature = "tracing"))]
970+
let _ao_span = self.async_op_span.clone().entered();
971+
#[cfg(all(tokio_unstable, feature = "tracing"))]
972+
let _ao_poll_span = self.async_op_poll_span.clone().entered();
973+
974+
let ret = if let Some(inner) = self.inner.as_ref() {
975+
#[cfg(all(tokio_unstable, feature = "tracing"))]
976+
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
977+
978+
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
979+
let res = ready!(inner.poll_recv(cx))?;
980+
981+
res
982+
} else {
983+
panic!("called after complete");
984+
};
985+
986+
self.inner = None;
987+
Ready(Ok(ret))
988+
}
989+
934990
/// Attempts to receive a value.
935991
///
936992
/// If a pending value exists in the channel, it is returned. If no value
@@ -1096,28 +1152,7 @@ impl<T> Future for Receiver<T> {
10961152
type Output = Result<T, RecvError>;
10971153

10981154
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1099-
// If `inner` is `None`, then `poll()` has already completed.
1100-
#[cfg(all(tokio_unstable, feature = "tracing"))]
1101-
let _res_span = self.resource_span.clone().entered();
1102-
#[cfg(all(tokio_unstable, feature = "tracing"))]
1103-
let _ao_span = self.async_op_span.clone().entered();
1104-
#[cfg(all(tokio_unstable, feature = "tracing"))]
1105-
let _ao_poll_span = self.async_op_poll_span.clone().entered();
1106-
1107-
let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
1108-
#[cfg(all(tokio_unstable, feature = "tracing"))]
1109-
let res = ready!(trace_poll_op!("poll_recv", inner.poll_recv(cx)))?;
1110-
1111-
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
1112-
let res = ready!(inner.poll_recv(cx))?;
1113-
1114-
res
1115-
} else {
1116-
panic!("called after complete");
1117-
};
1118-
1119-
self.inner = None;
1120-
Ready(Ok(ret))
1155+
self.poll_recv(cx)
11211156
}
11221157
}
11231158

0 commit comments

Comments
 (0)