Skip to content

Commit

Permalink
feat(s2n-quic-dc): implement queue allocator/dispatcher (#2517)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Mar 10, 2025
1 parent fb30a2b commit 25e2706
Show file tree
Hide file tree
Showing 14 changed files with 1,515 additions and 14 deletions.
2 changes: 2 additions & 0 deletions dc/s2n-quic-dc/src/socket/recv/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use s2n_quic_core::inet::{ExplicitCongestionNotification, SocketAddress};

/// Routes incoming packet segments to the appropriate destination
pub trait Router {
fn is_open(&self) -> bool;

#[inline(always)]
fn tag_len(&self) -> usize {
16
Expand Down
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/stream/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
mod ack;
pub mod application;
pub(crate) mod buffer;
pub mod dispatch;
mod error;
mod packet;
mod probes;
Expand Down
209 changes: 209 additions & 0 deletions dc/s2n-quic-dc/src/stream/recv/dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{credentials, packet, socket::recv::descriptor as desc, sync::ring_deque};
use s2n_quic_core::{inet::SocketAddress, varint::VarInt};
use tracing::debug;

mod descriptor;
mod free_list;
mod handle;
mod pool;
mod queue;
mod sender;

#[cfg(test)]
mod tests;

/// Allocate this many channels at a time
///
/// With `debug_assertions`, we allocate smaller pages to try and cover more
/// branches in the allocator logic around growth.
const PAGE_SIZE: usize = if cfg!(debug_assertions) { 8 } else { 256 };

pub type Error = queue::Error;
pub type Control = handle::Control<desc::Filled>;
pub type Stream = handle::Stream<desc::Filled>;

/// A queue allocator for registering a receiver to process packets
/// for a given ID.
#[derive(Clone)]
pub struct Allocator {
pool: pool::Pool<desc::Filled, PAGE_SIZE>,
}

impl Allocator {
pub fn new(
stream_capacity: impl Into<ring_deque::Capacity>,
control_capacity: impl Into<ring_deque::Capacity>,
) -> Self {
Self {
pool: pool::Pool::new(
VarInt::ZERO,
stream_capacity.into(),
control_capacity.into(),
),
}
}

/// Creates an allocator with a non-zero queue id
///
/// This is used for patterns where the `queue_id=0` is special and used to
/// indicate newly initialized flows waiting to be assigned. For example,
/// a client sends a packet with `queue_id=0` to a server and waits for the
/// server to respond with an actual `queue_id` for future packets from the client.
pub fn new_non_zero(
stream_capacity: impl Into<ring_deque::Capacity>,
control_capacity: impl Into<ring_deque::Capacity>,
) -> Self {
Self {
pool: pool::Pool::new(
VarInt::from_u8(1),
stream_capacity.into(),
control_capacity.into(),
),
}
}

#[inline]
pub fn dispatcher(&self) -> Dispatch {
Dispatch {
senders: self.pool.senders(),
is_open: true,
}
}

#[inline]
pub fn alloc(&mut self) -> Option<(Control, Stream)> {
self.pool.alloc()
}

#[inline]
pub fn alloc_or_grow(&mut self) -> (Control, Stream) {
self.pool.alloc_or_grow()
}
}

/// A dispatcher which routes packets to the specified queue, if
/// there is a registered receiver.
#[derive(Clone)]
pub struct Dispatch {
senders: sender::Senders<desc::Filled, PAGE_SIZE>,
is_open: bool,
}

impl Dispatch {
#[inline]
pub fn send_control(
&mut self,
queue_id: VarInt,
segment: desc::Filled,
) -> Result<Option<desc::Filled>, Error> {
let mut res = Err(Error::Unallocated);
self.senders.lookup(queue_id, |sender| {
res = sender.send_control(segment);
});

if matches!(res, Err(Error::Closed)) {
self.is_open = false;
}

res
}

#[inline]
pub fn send_stream(
&mut self,
queue_id: VarInt,
segment: desc::Filled,
) -> Result<Option<desc::Filled>, Error> {
let mut res = Err(Error::Unallocated);
self.senders.lookup(queue_id, |sender| {
res = sender.send_stream(segment);
});

if matches!(res, Err(Error::Closed)) {
self.is_open = false;
}

res
}
}

impl crate::socket::recv::router::Router for Dispatch {
#[inline(always)]
fn is_open(&self) -> bool {
self.is_open
}

#[inline(always)]
fn tag_len(&self) -> usize {
16
}

/// implement this so we don't get warnings about not handling it
#[inline(always)]
fn handle_control_packet(
&mut self,
_remote_address: SocketAddress,
_ecn: s2n_quic_core::inet::ExplicitCongestionNotification,
_packet: packet::control::decoder::Packet,
) {
}

#[inline]
fn dispatch_control_packet(
&mut self,
_tag: packet::control::Tag,
id: Option<packet::stream::Id>,
credentials: credentials::Credentials,
segment: desc::Filled,
) {
let Some(id) = id else {
return;
};

match self.send_control(id.queue_id, segment) {
Ok(None) => {}
Ok(Some(_prev)) => {
// TODO increment metrics
debug!(queue_id = %id.queue_id, "control queue overflow");
}
Err(_) => {
// TODO increment metrics
debug!(stream_id = ?id, ?credentials, "unroutable control packet");
}
}
}

/// implement this so we don't get warnings about not handling it
#[inline(always)]
fn handle_stream_packet(
&mut self,
_remote_address: SocketAddress,
_ecn: s2n_quic_core::inet::ExplicitCongestionNotification,
_packet: packet::stream::decoder::Packet,
) {
}

#[inline]
fn dispatch_stream_packet(
&mut self,
_tag: packet::stream::Tag,
id: packet::stream::Id,
credentials: credentials::Credentials,
segment: desc::Filled,
) {
match self.send_stream(id.queue_id, segment) {
Ok(None) => {}
Ok(Some(_prev)) => {
// TODO increment metrics
debug!(queue_id = %id.queue_id, "stream queue overflow");
}
Err(_) => {
// TODO increment metrics
debug!(stream_id = ?id, ?credentials, "unroutable stream packet");
}
}
}
}
Git LFS file not shown
Loading

0 comments on commit 25e2706

Please sign in to comment.