Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic): New function added to Connection Limits provider #2508

Merged
merged 7 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion quic/s2n-quic-core/src/connection/limits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

#[cfg(feature = "alloc")]
use crate::application::ServerName;
use crate::{
ack,
event::{api::SocketAddress, IntoEvent},
Expand All @@ -12,6 +13,8 @@ use crate::{
MaxDatagramFrameSize, MaxIdleTimeout, MigrationSupport, TransportParameters,
},
};
#[cfg(feature = "alloc")]
use bytes::Bytes;
use core::time::Duration;
use s2n_codec::decoder_invariant;

Expand Down Expand Up @@ -53,6 +56,30 @@ impl<'a> ConnectionInfo<'a> {
}
}

#[non_exhaustive]
#[derive(Debug)]
#[cfg(feature = "alloc")]
pub struct HandshakeInfo<'a> {
pub remote_address: SocketAddress<'a>,
pub server_name: Option<&'a ServerName>,
pub application_protocol: &'a Bytes,
}

#[cfg(feature = "alloc")]
impl<'a> HandshakeInfo<'a> {
pub fn new(
remote_address: &'a inet::SocketAddress,
server_name: Option<&'a ServerName>,
application_protocol: &'a Bytes,
) -> HandshakeInfo<'a> {
Self {
remote_address: remote_address.into_event(),
server_name,
application_protocol,
}
}
}

#[derive(Clone, Copy, Debug)]
pub struct Limits {
pub(crate) max_idle_timeout: MaxIdleTimeout,
Expand Down Expand Up @@ -397,16 +424,41 @@ impl Limits {
}
}

#[must_use]
#[derive(Debug)]
pub struct UpdatableLimits<'a>(&'a mut Limits);

impl<'a> UpdatableLimits<'a> {
pub fn new(limits: &'a mut Limits) -> UpdatableLimits<'a> {
UpdatableLimits(limits)
}

pub fn with_stream_batch_size(&mut self, size: u8) {
self.0.stream_batch_size = size;
}
}

/// Creates limits for a given connection
pub trait Limiter: 'static + Send {
fn on_connection(&mut self, info: &ConnectionInfo) -> Limits;

/// Provides another opportunity to change connection limits with information
/// from the handshake
#[inline]
#[cfg(feature = "alloc")]
fn on_post_handshake(&mut self, info: &HandshakeInfo, limits: &mut UpdatableLimits) {
let _ = info;
let _ = limits;
}
}

/// Implement Limiter for a Limits struct
impl Limiter for Limits {
fn on_connection(&mut self, _into: &ConnectionInfo) -> Limits {
*self
}
#[cfg(feature = "alloc")]
fn on_post_handshake(&mut self, _info: &HandshakeInfo, _limits: &mut UpdatableLimits) {}
}

#[cfg(test)]
Expand All @@ -429,4 +481,15 @@ mod tests {
assert!(limits.with_bidirectional_remote_data_window(data).is_ok());
assert!(limits.with_unidirectional_data_window(data).is_ok());
}

// Limits can be updated through the UpdatableLimits wrapper
#[test]
fn updatable_limits() {
let mut limits = Limits::default();
assert_eq!(limits.stream_batch_size, 1);
let mut updatable_limits = UpdatableLimits::new(&mut limits);
let new_size = 10;
updatable_limits.with_stream_batch_size(new_size);
assert_eq!(limits.stream_batch_size, new_size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl connection::Trait for TestConnection {
_subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
_datagram: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_conn_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), connection::Error> {
Ok(())
}
Expand All @@ -148,6 +149,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_conn_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand All @@ -163,6 +165,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_conn_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand All @@ -178,6 +181,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand All @@ -193,6 +197,7 @@ impl connection::Trait for TestConnection {
_packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
_datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
_dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError> {
Ok(())
}
Expand Down
14 changes: 13 additions & 1 deletion quic/s2n-quic-transport/src/connection/connection_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl<Config: endpoint::Config> ConnectionImpl<Config> {
subscriber: &mut Config::EventSubscriber,
datagram: &mut Config::DatagramEndpoint,
dc: &mut Config::DcEndpoint,
limits: &mut Config::ConnectionLimits,
) -> Result<(), connection::Error> {
let mut publisher = self.event_context.publisher(timestamp, subscriber);
let space_manager = &mut self.space_manager;
Expand All @@ -285,6 +286,7 @@ impl<Config: endpoint::Config> ConnectionImpl<Config> {
&mut publisher,
datagram,
dc,
limits,
) {
Poll::Ready(Ok(())) => {}
// use `from` instead of `into` so the location is correctly captured
Expand Down Expand Up @@ -658,6 +660,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
parameters.event_subscriber,
parameters.datagram_endpoint,
parameters.dc_endpoint,
parameters.limits_endpoint,
) {
connection.with_event_publisher(
parameters.timestamp,
Expand Down Expand Up @@ -1128,12 +1131,13 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
subscriber: &mut Config::EventSubscriber,
datagram: &mut Config::DatagramEndpoint,
dc: &mut Config::DcEndpoint,
conn_limits: &mut Config::ConnectionLimits,
) -> Result<(), connection::Error> {
// reset the queued state first so that new wakeup request are not missed
self.wakeup_handle.wakeup_handled();

// check if crypto progress can be made
self.update_crypto_state(timestamp, subscriber, datagram, dc)?;
self.update_crypto_state(timestamp, subscriber, datagram, dc, conn_limits)?;

// return an error if the application set one
self.error?;
Expand Down Expand Up @@ -1220,6 +1224,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut Config::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
connection_limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
//= https://www.rfc-editor.org/rfc/rfc9000#section-7.2
//= type=TODO
Expand Down Expand Up @@ -1261,6 +1266,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
)?;
}

Expand All @@ -1278,6 +1284,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut Config::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
connection_limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
if let Some((space, handshake_status)) = self.space_manager.initial_mut() {
let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber);
Expand Down Expand Up @@ -1339,6 +1346,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
subscriber,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
)?;

// notify the connection a packet was processed
Expand All @@ -1359,6 +1367,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut Config::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
connection_limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber);

Expand Down Expand Up @@ -1444,6 +1453,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
subscriber,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
)?;

// notify the connection a packet was processed
Expand All @@ -1464,6 +1474,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
packet_interceptor: &mut Config::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut Config::DcEndpoint,
limits_endpoint: &mut Config::ConnectionLimits,
) -> Result<(), ProcessingError> {
let mut publisher = self.event_context.publisher(datagram.timestamp, subscriber);

Expand Down Expand Up @@ -1561,6 +1572,7 @@ impl<Config: endpoint::Config> connection::Trait for ConnectionImpl<Config> {
&mut publisher,
datagram_endpoint,
dc_endpoint,
limits_endpoint,
)?;
}
// notify the connection a packet was processed
Expand Down
11 changes: 11 additions & 0 deletions quic/s2n-quic-transport/src/connection/connection_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
subscriber: &mut <Self::Config as endpoint::Config>::EventSubscriber,
datagram: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
conn_limits: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), connection::Error>;

// Packet handling
Expand All @@ -126,6 +127,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when an unprotected initial packet had been received
Expand All @@ -139,6 +141,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when a handshake packet had been received
Expand All @@ -152,6 +155,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when a short packet had been received
Expand All @@ -165,6 +169,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
) -> Result<(), ProcessingError>;

/// Is called when a version negotiation packet had been received
Expand Down Expand Up @@ -225,6 +230,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
check_for_stateless_reset: &mut bool,
) -> Result<(), connection::Error> {
macro_rules! emit_drop_reason {
Expand Down Expand Up @@ -286,6 +292,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
),
ProtectedPacket::VersionNegotiation(packet) => self.handle_version_negotiation_packet(
datagram,
Expand All @@ -303,6 +310,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
),
ProtectedPacket::ZeroRtt(packet) => self.handle_zero_rtt_packet(
datagram,
Expand All @@ -320,6 +328,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
),
ProtectedPacket::Retry(packet) => {
self.handle_retry_packet(datagram, path_id, packet, subscriber, packet_interceptor)
Expand Down Expand Up @@ -378,6 +387,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor: &mut <Self::Config as endpoint::Config>::PacketInterceptor,
datagram_endpoint: &mut <Self::Config as endpoint::Config>::DatagramEndpoint,
dc_endpoint: &mut <Self::Config as endpoint::Config>::DcEndpoint,
connection_limits_endpoint: &mut <Self::Config as endpoint::Config>::ConnectionLimits,
check_for_stateless_reset: &mut bool,
) -> Result<(), connection::Error> {
macro_rules! emit_drop_reason {
Expand Down Expand Up @@ -432,6 +442,7 @@ pub trait ConnectionTrait: 'static + Send + Sized {
packet_interceptor,
datagram_endpoint,
dc_endpoint,
connection_limits_endpoint,
check_for_stateless_reset,
);

Expand Down
2 changes: 2 additions & 0 deletions quic/s2n-quic-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,6 @@ pub struct Parameters<'a, Cfg: endpoint::Config> {
pub dc_endpoint: &'a mut Cfg::DcEndpoint,
/// The event subscriber for the endpoint
pub event_subscriber: &'a mut Cfg::EventSubscriber,
/// The connection limits provider
pub limits_endpoint: &'a mut Cfg::ConnectionLimits,
}
3 changes: 3 additions & 0 deletions quic/s2n-quic-transport/src/endpoint/initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
datagram_endpoint: endpoint_context.datagram,
dc_endpoint: endpoint_context.dc,
open_registry: None,
limits_endpoint: endpoint_context.connection_limits,
};

let mut connection = <Config as endpoint::Config>::Connection::new(connection_parameters)?;
Expand Down Expand Up @@ -367,6 +368,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
endpoint_context.packet_interceptor,
endpoint_context.datagram,
endpoint_context.dc,
endpoint_context.connection_limits,
)
.map_err(|err| {
use connection::ProcessingError;
Expand All @@ -391,6 +393,7 @@ impl<Config: endpoint::Config> endpoint::Endpoint<Config> {
endpoint_context.packet_interceptor,
endpoint_context.datagram,
endpoint_context.dc,
endpoint_context.connection_limits,
&mut false,
)?;

Expand Down
Loading
Loading