Skip to content

Commit

Permalink
feat(s2n-quic-dc): add channel recv buffer impl
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Mar 12, 2025
1 parent c8de638 commit fbd3890
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 41 deletions.
6 changes: 6 additions & 0 deletions dc/s2n-quic-dc/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ impl TransportFeatures {
is_feature!(is_stream, STREAM);
is_feature!(is_connected, CONNECTED);
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Actor {
Application,
Worker,
}
3 changes: 2 additions & 1 deletion dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,6 @@ where
#[inline]
fn recv_buffer() -> recv::shared::RecvBuffer {
// TODO replace this with a parameter once everything is in place
recv::buffer::Local::new(msg::recv::Message::new(9000), None)
let recv_buffer = recv::buffer::Local::new(msg::recv::Message::new(9000), None);
recv::buffer::Either::A(recv_buffer)
}
3 changes: 2 additions & 1 deletion dc/s2n-quic-dc/src/stream/recv/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
clock::Timer,
event::{self, ConnectionPublisher as _},
msg,
stream::{recv, runtime, shared::ArcShared, socket},
stream::{recv, runtime, shared::ArcShared, socket, Actor},
};
use core::{
fmt,
Expand Down Expand Up @@ -265,6 +265,7 @@ where

let recv = reader.poll_fill_recv_buffer(
cx,
Actor::Application,
self.sockets.read_application(),
&self.shared.clock,
&self.shared.subscriber,
Expand Down
12 changes: 8 additions & 4 deletions dc/s2n-quic-dc/src/stream/recv/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

use crate::{
event,
stream::{recv, socket::Socket, TransportFeatures},
stream::{recv, socket::Socket, Actor, TransportFeatures},
};
use core::task::{Context, Poll};
use std::io;

pub mod channel;
mod dispatch;
mod local;

pub use channel::Channel;
pub use dispatch::Dispatch;
pub use local::Local;

Expand All @@ -20,6 +22,7 @@ pub trait Buffer {
fn poll_fill<S, Pub>(
&mut self,
cx: &mut Context,
actor: Actor,
socket: &S,
publisher: &mut Pub,
) -> Poll<io::Result<usize>>
Expand All @@ -36,7 +39,7 @@ pub trait Buffer {
R: Dispatch;
}

#[allow(dead_code)] // TODO remove this once we start using the channel buffer
#[derive(Debug)]
pub enum Either<A, B> {
A(A),
B(B),
Expand All @@ -59,6 +62,7 @@ where
fn poll_fill<S, Pub>(
&mut self,
cx: &mut Context,
actor: Actor,
socket: &S,
publisher: &mut Pub,
) -> Poll<io::Result<usize>>
Expand All @@ -67,8 +71,8 @@ where
Pub: event::ConnectionPublisher,
{
match self {
Self::A(a) => a.poll_fill(cx, socket, publisher),
Self::B(b) => b.poll_fill(cx, socket, publisher),
Self::A(a) => a.poll_fill(cx, actor, socket, publisher),
Self::B(b) => b.poll_fill(cx, actor, socket, publisher),
}
}

Expand Down
134 changes: 134 additions & 0 deletions dc/s2n-quic-dc/src/stream/recv/buffer/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::Dispatch;
use crate::{
event,
socket::recv::descriptor::Filled,
stream::{
recv::{
self,
dispatch::{Control, Stream},
},
socket::Socket,
Actor, TransportFeatures,
},
};
use core::task::{Context, Poll};
use s2n_quic_core::ensure;
use std::{collections::VecDeque, io};

#[derive(Debug)]
pub struct Channel<Recv = Stream> {
pending: VecDeque<Filled>,
receiver: Recv,
}

impl<Recv> Channel<Recv> {
#[inline]
pub fn new(receiver: Recv) -> Self {
Self {
pending: VecDeque::new(),
receiver,
}
}
}

macro_rules! impl_buffer {
($recv:ident) => {
impl super::Buffer for Channel<$recv> {
#[inline]
fn is_empty(&self) -> bool {
self.pending.is_empty()
}

#[inline]
fn poll_fill<S, Pub>(
&mut self,
cx: &mut Context,
actor: Actor,
socket: &S,
publisher: &mut Pub,
) -> Poll<io::Result<usize>>
where
S: ?Sized + Socket,
Pub: event::ConnectionPublisher,
{
// check if we've already filled the queue
ensure!(self.pending.is_empty(), Ok(1).into());

let capacity = u16::MAX as usize;

// the socket isn't actually used since we're relying on another task to fill the `receiver` channel
let _ = socket;

let result = self
.receiver
.poll_swap(cx, actor, &mut self.pending)
.map_err(|_err| io::Error::from(io::ErrorKind::BrokenPipe));

match result {
Poll::Ready(Ok(())) => {
let committed_len = self
.pending
.iter()
.map(|segment| {
debug_assert!(
!segment.is_empty(),
"the channel should not contain empty packets"
);
segment.len() as usize
})
.sum::<usize>();
publisher.on_stream_read_socket_flushed(
event::builder::StreamReadSocketFlushed {
capacity,
committed_len,
},
);
Ok(committed_len).into()
}
Poll::Ready(Err(error)) => {
let errno = error.raw_os_error();
publisher.on_stream_read_socket_errored(
event::builder::StreamReadSocketErrored { capacity, errno },
);
Err(error).into()
}
Poll::Pending => {
publisher.on_stream_read_socket_blocked(
event::builder::StreamReadSocketBlocked { capacity },
);
Poll::Pending
}
}
}

#[inline]
fn process<R>(
&mut self,
features: TransportFeatures,
router: &mut R,
) -> Result<(), recv::Error>
where
R: Dispatch,
{
debug_assert!(
!features.is_stream(),
"only datagram oriented transport is supported"
);

for mut segment in self.pending.drain(..) {
let remote_addr = segment.remote_address().get();
let ecn = segment.ecn();
router.on_datagram_segment(&remote_addr, ecn, segment.payload_mut())?;
}

Ok(())
}
}
};
}

impl_buffer!(Stream);
impl_buffer!(Control);
3 changes: 2 additions & 1 deletion dc/s2n-quic-dc/src/stream/recv/buffer/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::Dispatch;
use crate::{
event, msg,
stream::{recv, server::handshake, socket::Socket, TransportFeatures},
stream::{recv, server::handshake, socket::Socket, Actor, TransportFeatures},
};
use core::task::{Context, Poll};
use s2n_codec::{DecoderBufferMut, DecoderError};
Expand Down Expand Up @@ -37,6 +37,7 @@ impl super::Buffer for Local {
fn poll_fill<S, Pub>(
&mut self,
cx: &mut Context,
_actor: Actor,
socket: &S,
publisher: &mut Pub,
) -> Poll<io::Result<usize>>
Expand Down
17 changes: 11 additions & 6 deletions dc/s2n-quic-dc/src/stream/recv/dispatch/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{descriptor::Descriptor, queue::Error};
use crate::sync::ring_deque;
use crate::{stream::Actor, sync::ring_deque};
use core::{
fmt,
task::{Context, Poll},
Expand Down Expand Up @@ -41,22 +41,27 @@ macro_rules! impl_recv {
}

#[inline]
pub async fn recv(&self) -> Result<T, ring_deque::Closed> {
core::future::poll_fn(|cx| self.poll_recv(cx)).await
pub async fn recv(&self, actor: Actor) -> Result<T, ring_deque::Closed> {
core::future::poll_fn(|cx| self.poll_recv(cx, actor)).await
}

#[inline]
pub fn poll_recv(&self, cx: &mut Context) -> Poll<Result<T, ring_deque::Closed>> {
unsafe { self.descriptor.$field().poll_pop(cx) }
pub fn poll_recv(
&self,
cx: &mut Context,
actor: Actor,
) -> Poll<Result<T, ring_deque::Closed>> {
unsafe { self.descriptor.$field().poll_pop(cx, actor) }
}

#[inline]
pub fn poll_swap(
&self,
cx: &mut Context,
actor: Actor,
out: &mut VecDeque<T>,
) -> Poll<Result<(), ring_deque::Closed>> {
unsafe { self.descriptor.$field().poll_swap(cx, out) }
unsafe { self.descriptor.$field().poll_swap(cx, actor, out) }
}
}

Expand Down
Loading

0 comments on commit fbd3890

Please sign in to comment.