From fbd3890a9eba159f88dae1a8eb19a4e17c5e55c5 Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Thu, 27 Feb 2025 13:33:56 -0700 Subject: [PATCH] feat(s2n-quic-dc): add channel recv buffer impl --- dc/s2n-quic-dc/src/stream.rs | 6 + dc/s2n-quic-dc/src/stream/client/tokio.rs | 3 +- dc/s2n-quic-dc/src/stream/recv/application.rs | 3 +- dc/s2n-quic-dc/src/stream/recv/buffer.rs | 12 +- .../src/stream/recv/buffer/channel.rs | 134 ++++++++++++++++++ .../src/stream/recv/buffer/local.rs | 3 +- .../src/stream/recv/dispatch/handle.rs | 17 ++- .../src/stream/recv/dispatch/queue.rs | 84 ++++++++--- .../src/stream/recv/dispatch/tests.rs | 5 +- dc/s2n-quic-dc/src/stream/recv/shared.rs | 13 +- dc/s2n-quic-dc/src/stream/recv/worker.rs | 3 +- .../src/stream/server/tokio/tcp/worker.rs | 1 + dc/s2n-quic-dc/src/stream/server/tokio/udp.rs | 1 + 13 files changed, 244 insertions(+), 41 deletions(-) create mode 100644 dc/s2n-quic-dc/src/stream/recv/buffer/channel.rs diff --git a/dc/s2n-quic-dc/src/stream.rs b/dc/s2n-quic-dc/src/stream.rs index a110eb2e1..df4864d31 100644 --- a/dc/s2n-quic-dc/src/stream.rs +++ b/dc/s2n-quic-dc/src/stream.rs @@ -70,3 +70,9 @@ impl TransportFeatures { is_feature!(is_stream, STREAM); is_feature!(is_connected, CONNECTED); } + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Actor { + Application, + Worker, +} diff --git a/dc/s2n-quic-dc/src/stream/client/tokio.rs b/dc/s2n-quic-dc/src/stream/client/tokio.rs index 2ec393e57..4146d66d9 100644 --- a/dc/s2n-quic-dc/src/stream/client/tokio.rs +++ b/dc/s2n-quic-dc/src/stream/client/tokio.rs @@ -161,5 +161,6 @@ where #[inline] fn recv_buffer() -> recv::shared::RecvBuffer { // TODO replace this with a parameter once everything is in place - recv::buffer::Local::new(msg::recv::Message::new(9000), None) + let recv_buffer = recv::buffer::Local::new(msg::recv::Message::new(9000), None); + recv::buffer::Either::A(recv_buffer) } diff --git a/dc/s2n-quic-dc/src/stream/recv/application.rs b/dc/s2n-quic-dc/src/stream/recv/application.rs index 7ab64ff52..f0a348052 100644 --- a/dc/s2n-quic-dc/src/stream/recv/application.rs +++ b/dc/s2n-quic-dc/src/stream/recv/application.rs @@ -5,7 +5,7 @@ use crate::{ clock::Timer, event::{self, ConnectionPublisher as _}, msg, - stream::{recv, runtime, shared::ArcShared, socket}, + stream::{recv, runtime, shared::ArcShared, socket, Actor}, }; use core::{ fmt, @@ -265,6 +265,7 @@ where let recv = reader.poll_fill_recv_buffer( cx, + Actor::Application, self.sockets.read_application(), &self.shared.clock, &self.shared.subscriber, diff --git a/dc/s2n-quic-dc/src/stream/recv/buffer.rs b/dc/s2n-quic-dc/src/stream/recv/buffer.rs index 35e5088b2..87333d78e 100644 --- a/dc/s2n-quic-dc/src/stream/recv/buffer.rs +++ b/dc/s2n-quic-dc/src/stream/recv/buffer.rs @@ -3,14 +3,16 @@ use crate::{ event, - stream::{recv, socket::Socket, TransportFeatures}, + stream::{recv, socket::Socket, Actor, TransportFeatures}, }; use core::task::{Context, Poll}; use std::io; +pub mod channel; mod dispatch; mod local; +pub use channel::Channel; pub use dispatch::Dispatch; pub use local::Local; @@ -20,6 +22,7 @@ pub trait Buffer { fn poll_fill( &mut self, cx: &mut Context, + actor: Actor, socket: &S, publisher: &mut Pub, ) -> Poll> @@ -36,7 +39,7 @@ pub trait Buffer { R: Dispatch; } -#[allow(dead_code)] // TODO remove this once we start using the channel buffer +#[derive(Debug)] pub enum Either { A(A), B(B), @@ -59,6 +62,7 @@ where fn poll_fill( &mut self, cx: &mut Context, + actor: Actor, socket: &S, publisher: &mut Pub, ) -> Poll> @@ -67,8 +71,8 @@ where Pub: event::ConnectionPublisher, { match self { - Self::A(a) => a.poll_fill(cx, socket, publisher), - Self::B(b) => b.poll_fill(cx, socket, publisher), + Self::A(a) => a.poll_fill(cx, actor, socket, publisher), + Self::B(b) => b.poll_fill(cx, actor, socket, publisher), } } diff --git a/dc/s2n-quic-dc/src/stream/recv/buffer/channel.rs b/dc/s2n-quic-dc/src/stream/recv/buffer/channel.rs new file mode 100644 index 000000000..99e39c4d9 --- /dev/null +++ b/dc/s2n-quic-dc/src/stream/recv/buffer/channel.rs @@ -0,0 +1,134 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::Dispatch; +use crate::{ + event, + socket::recv::descriptor::Filled, + stream::{ + recv::{ + self, + dispatch::{Control, Stream}, + }, + socket::Socket, + Actor, TransportFeatures, + }, +}; +use core::task::{Context, Poll}; +use s2n_quic_core::ensure; +use std::{collections::VecDeque, io}; + +#[derive(Debug)] +pub struct Channel { + pending: VecDeque, + receiver: Recv, +} + +impl Channel { + #[inline] + pub fn new(receiver: Recv) -> Self { + Self { + pending: VecDeque::new(), + receiver, + } + } +} + +macro_rules! impl_buffer { + ($recv:ident) => { + impl super::Buffer for Channel<$recv> { + #[inline] + fn is_empty(&self) -> bool { + self.pending.is_empty() + } + + #[inline] + fn poll_fill( + &mut self, + cx: &mut Context, + actor: Actor, + socket: &S, + publisher: &mut Pub, + ) -> Poll> + where + S: ?Sized + Socket, + Pub: event::ConnectionPublisher, + { + // check if we've already filled the queue + ensure!(self.pending.is_empty(), Ok(1).into()); + + let capacity = u16::MAX as usize; + + // the socket isn't actually used since we're relying on another task to fill the `receiver` channel + let _ = socket; + + let result = self + .receiver + .poll_swap(cx, actor, &mut self.pending) + .map_err(|_err| io::Error::from(io::ErrorKind::BrokenPipe)); + + match result { + Poll::Ready(Ok(())) => { + let committed_len = self + .pending + .iter() + .map(|segment| { + debug_assert!( + !segment.is_empty(), + "the channel should not contain empty packets" + ); + segment.len() as usize + }) + .sum::(); + publisher.on_stream_read_socket_flushed( + event::builder::StreamReadSocketFlushed { + capacity, + committed_len, + }, + ); + Ok(committed_len).into() + } + Poll::Ready(Err(error)) => { + let errno = error.raw_os_error(); + publisher.on_stream_read_socket_errored( + event::builder::StreamReadSocketErrored { capacity, errno }, + ); + Err(error).into() + } + Poll::Pending => { + publisher.on_stream_read_socket_blocked( + event::builder::StreamReadSocketBlocked { capacity }, + ); + Poll::Pending + } + } + } + + #[inline] + fn process( + &mut self, + features: TransportFeatures, + router: &mut R, + ) -> Result<(), recv::Error> + where + R: Dispatch, + { + debug_assert!( + !features.is_stream(), + "only datagram oriented transport is supported" + ); + + for mut segment in self.pending.drain(..) { + let remote_addr = segment.remote_address().get(); + let ecn = segment.ecn(); + router.on_datagram_segment(&remote_addr, ecn, segment.payload_mut())?; + } + + Ok(()) + } + } + }; +} + +impl_buffer!(Stream); +impl_buffer!(Control); diff --git a/dc/s2n-quic-dc/src/stream/recv/buffer/local.rs b/dc/s2n-quic-dc/src/stream/recv/buffer/local.rs index 0b965cc96..70905ac20 100644 --- a/dc/s2n-quic-dc/src/stream/recv/buffer/local.rs +++ b/dc/s2n-quic-dc/src/stream/recv/buffer/local.rs @@ -4,7 +4,7 @@ use super::Dispatch; use crate::{ event, msg, - stream::{recv, server::handshake, socket::Socket, TransportFeatures}, + stream::{recv, server::handshake, socket::Socket, Actor, TransportFeatures}, }; use core::task::{Context, Poll}; use s2n_codec::{DecoderBufferMut, DecoderError}; @@ -37,6 +37,7 @@ impl super::Buffer for Local { fn poll_fill( &mut self, cx: &mut Context, + _actor: Actor, socket: &S, publisher: &mut Pub, ) -> Poll> diff --git a/dc/s2n-quic-dc/src/stream/recv/dispatch/handle.rs b/dc/s2n-quic-dc/src/stream/recv/dispatch/handle.rs index 34554a8f9..7aa9a0b6d 100644 --- a/dc/s2n-quic-dc/src/stream/recv/dispatch/handle.rs +++ b/dc/s2n-quic-dc/src/stream/recv/dispatch/handle.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{descriptor::Descriptor, queue::Error}; -use crate::sync::ring_deque; +use crate::{stream::Actor, sync::ring_deque}; use core::{ fmt, task::{Context, Poll}, @@ -41,22 +41,27 @@ macro_rules! impl_recv { } #[inline] - pub async fn recv(&self) -> Result { - core::future::poll_fn(|cx| self.poll_recv(cx)).await + pub async fn recv(&self, actor: Actor) -> Result { + core::future::poll_fn(|cx| self.poll_recv(cx, actor)).await } #[inline] - pub fn poll_recv(&self, cx: &mut Context) -> Poll> { - unsafe { self.descriptor.$field().poll_pop(cx) } + pub fn poll_recv( + &self, + cx: &mut Context, + actor: Actor, + ) -> Poll> { + unsafe { self.descriptor.$field().poll_pop(cx, actor) } } #[inline] pub fn poll_swap( &self, cx: &mut Context, + actor: Actor, out: &mut VecDeque, ) -> Poll> { - unsafe { self.descriptor.$field().poll_swap(cx, out) } + unsafe { self.descriptor.$field().poll_swap(cx, actor, out) } } } diff --git a/dc/s2n-quic-dc/src/stream/recv/dispatch/queue.rs b/dc/s2n-quic-dc/src/stream/recv/dispatch/queue.rs index 69cbdbdeb..db151156d 100644 --- a/dc/s2n-quic-dc/src/stream/recv/dispatch/queue.rs +++ b/dc/s2n-quic-dc/src/stream/recv/dispatch/queue.rs @@ -1,7 +1,10 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -use crate::sync::ring_deque::{Capacity, Closed, RecvWaker}; +use crate::{ + stream::Actor, + sync::ring_deque::{Capacity, Closed, RecvWaker}, +}; use core::task::{Context, Poll}; use s2n_quic_core::ensure; use std::{collections::VecDeque, sync::Mutex, task::Waker}; @@ -27,7 +30,17 @@ struct Inner { capacity: usize, is_open: bool, has_receiver: bool, - waker: Option, + app_waker: Option, + worker_waker: Option, +} + +impl Inner { + fn wake_all(&mut self) -> Wakers { + Wakers { + app_waker: self.app_waker.take(), + worker_waker: self.worker_waker.take(), + } + } } pub struct Queue { @@ -43,7 +56,8 @@ impl Queue { capacity: capacity.max, is_open: true, has_receiver: false, - waker: None, + app_waker: None, + worker_waker: None, }), } } @@ -65,11 +79,8 @@ impl Queue { trace!(has_overflow = prev.is_some(), "push"); inner.queue.push_back(value); - let waker = inner.waker.take(); + let _wakers = inner.wake_all(); drop(inner); - if let Some(waker) = waker { - waker.wake(); - } Ok(prev) } @@ -90,11 +101,8 @@ impl Queue { trace!(has_overflow = prev.is_some(), "push"); inner.queue.push_back(value); - let waker = inner.waker.take(); + let _wakers = inner.wake_all(); drop(inner); - if let Some(waker) = waker { - waker.wake(); - } prev } @@ -112,28 +120,46 @@ impl Queue { } #[inline] - pub fn poll_pop(&self, cx: &mut Context) -> Poll> { + pub fn poll_pop(&self, cx: &mut Context, actor: Actor) -> Poll> { let mut inner = self.lock()?; - trace!(has_items = !inner.queue.is_empty(), "poll_pop"); + trace!(has_items = !inner.queue.is_empty(), ?actor, "poll_pop"); if let Some(item) = inner.queue.pop_front() { Ok(item).into() } else { ensure!(inner.is_open, Err(Closed).into()); - inner.waker.update(cx); + match actor { + Actor::Application => &mut inner.app_waker, + Actor::Worker => &mut inner.worker_waker, + } + .update(cx); Poll::Pending } } #[inline] - pub fn poll_swap(&self, cx: &mut Context, items: &mut VecDeque) -> Poll> { + pub fn poll_swap( + &self, + cx: &mut Context, + actor: Actor, + items: &mut VecDeque, + ) -> Poll> { + debug_assert!(items.is_empty(), "destination items should be empty"); + let mut inner = self.lock()?; - trace!(items = 0, "poll_swap"); if inner.queue.is_empty() { ensure!(inner.is_open, Err(Closed).into()); - inner.waker.update(cx); + match actor { + Actor::Application => &mut inner.app_waker, + Actor::Worker => &mut inner.worker_waker, + } + .update(cx); + drop(inner); + trace!(items = 0, ?actor, "poll_swap"); return Poll::Pending; } core::mem::swap(items, &mut inner.queue); + drop(inner); + trace!(items = items.len(), ?actor, "poll_swap"); Ok(()).into() } @@ -158,7 +184,8 @@ impl Queue { }; trace!("closing receiver"); inner.has_receiver = false; - inner.waker = None; + inner.app_waker = None; + inner.worker_waker = None; inner.queue.clear(); } @@ -172,9 +199,7 @@ impl Queue { // Leave the remaining items in the queue in case the receiver wants them. // Notify the receiver that the queue is now closed - if let Some(waker) = inner.waker.take() { - waker.wake(); - } + let _wakers = inner.wake_all(); } #[inline] @@ -182,3 +207,20 @@ impl Queue { self.inner.lock().map_err(|_| Closed) } } + +struct Wakers { + app_waker: Option, + worker_waker: Option, +} + +impl Drop for Wakers { + #[inline] + fn drop(&mut self) { + if let Some(waker) = self.app_waker.take() { + waker.wake(); + } + if let Some(waker) = self.worker_waker.take() { + waker.wake(); + } + } +} diff --git a/dc/s2n-quic-dc/src/stream/recv/dispatch/tests.rs b/dc/s2n-quic-dc/src/stream/recv/dispatch/tests.rs index 797d89a1a..e9a049751 100644 --- a/dc/s2n-quic-dc/src/stream/recv/dispatch/tests.rs +++ b/dc/s2n-quic-dc/src/stream/recv/dispatch/tests.rs @@ -4,6 +4,7 @@ use super::*; use crate::{ socket::recv, + stream::Actor, testing::{ext::*, sim}, }; use bolero::{check, TypeGenerator}; @@ -284,13 +285,13 @@ fn alloc_drop_notify() { let (stream, control) = alloc.alloc_or_grow(); async move { - stream.recv().await.unwrap_err(); + stream.recv(Actor::Application).await.unwrap_err(); } .primary() .spawn(); async move { - control.recv().await.unwrap_err(); + control.recv(Actor::Application).await.unwrap_err(); } .primary() .spawn(); diff --git a/dc/s2n-quic-dc/src/stream/recv/shared.rs b/dc/s2n-quic-dc/src/stream/recv/shared.rs index 86ea5e66a..dd804271b 100644 --- a/dc/s2n-quic-dc/src/stream/recv/shared.rs +++ b/dc/s2n-quic-dc/src/stream/recv/shared.rs @@ -9,7 +9,7 @@ use crate::{ recv::{self, buffer::Buffer as _}, shared::{self, ArcShared, Half}, socket::{self, Socket}, - TransportFeatures, + Actor, TransportFeatures, }, task::waker::worker::Waker as WorkerWaker, }; @@ -28,7 +28,7 @@ use std::{ }, }; -pub type RecvBuffer = recv::buffer::Local; +pub type RecvBuffer = recv::buffer::Either; /// Who will send ACKs? #[derive(Clone, Copy, Debug, Default)] @@ -318,6 +318,7 @@ impl Inner { pub fn poll_fill_recv_buffer( &mut self, cx: &mut Context, + actor: Actor, socket: &S, clock: &C, subscriber: &shared::Subscriber, @@ -327,8 +328,12 @@ impl Inner { C: ?Sized + Clock, Sub: event::Subscriber, { - self.buffer - .poll_fill(cx, socket, &mut subscriber.publisher(clock.get_time())) + self.buffer.poll_fill( + cx, + actor, + socket, + &mut subscriber.publisher(clock.get_time()), + ) } #[inline] diff --git a/dc/s2n-quic-dc/src/stream/recv/worker.rs b/dc/s2n-quic-dc/src/stream/recv/worker.rs index 36c9fd29d..25ef1bd17 100644 --- a/dc/s2n-quic-dc/src/stream/recv/worker.rs +++ b/dc/s2n-quic-dc/src/stream/recv/worker.rs @@ -5,7 +5,7 @@ use crate::{ allocator::Allocator, clock::Timer, event, msg, - stream::{shared::ArcShared, socket::Socket}, + stream::{shared::ArcShared, socket::Socket, Actor}, }; use core::task::{Context, Poll}; use s2n_quic_core::{buffer, endpoint, ensure, ready, time::clock::Timer as _}; @@ -306,6 +306,7 @@ where let res = recv.poll_fill_recv_buffer( cx, + Actor::Worker, &self.socket, &self.shared.clock, &self.shared.subscriber, diff --git a/dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs b/dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs index 002fc33a0..9f62845df 100644 --- a/dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs +++ b/dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs @@ -317,6 +317,7 @@ impl WorkerState { // TCP doesn't use the route key so just pick 0 let queue_id = VarInt::ZERO; let recv_buffer = recv::buffer::Local::new(recv_buffer.take(), None); + let recv_buffer = recv::buffer::Either::A(recv_buffer); let stream_builder = match endpoint::accept_stream( now, diff --git a/dc/s2n-quic-dc/src/stream/server/tokio/udp.rs b/dc/s2n-quic-dc/src/stream/server/tokio/udp.rs index da9645b5a..3de474416 100644 --- a/dc/s2n-quic-dc/src/stream/server/tokio/udp.rs +++ b/dc/s2n-quic-dc/src/stream/server/tokio/udp.rs @@ -114,6 +114,7 @@ where // TODO allocate a queue for this stream let queue_id = VarInt::ZERO; let recv_buffer = recv::buffer::Local::new(self.recv_buffer.take(), Some(handshake)); + let recv_buffer = recv::buffer::Either::A(recv_buffer); let stream = match endpoint::accept_stream( now,