Skip to content

Commit 7752afb

Browse files
feat(s2n-quic-dc): update MTU on dc path when MTU is updated (#2327)
* feat(s2n-quic-dc): update MTU on dc path when MTU is updated * remove std * add tests and update map * typo * add test * add clone test * use fetch_min * add entry to the handshakingpath
1 parent 0fbb99d commit 7752afb

19 files changed

+421
-65
lines changed

dc/s2n-quic-dc/src/path/secret/map.rs

+20-8
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ impl Map {
298298
) -> Option<(seal::Once, Credentials, ApplicationParams)> {
299299
let state = self.state.peers.get_by_key(&peer)?;
300300
let (sealer, credentials) = state.uni_sealer();
301-
Some((sealer, credentials, state.parameters))
301+
Some((sealer, credentials, state.parameters.clone()))
302302
}
303303

304304
pub fn open_once(
@@ -319,7 +319,7 @@ impl Map {
319319
let state = self.state.peers.get_by_key(&peer)?;
320320
let keys = state.bidi_local(features);
321321

322-
Some((keys, state.parameters))
322+
Some((keys, state.parameters.clone()))
323323
}
324324

325325
pub fn pair_for_credentials(
@@ -330,7 +330,7 @@ impl Map {
330330
) -> Option<(Bidirectional, ApplicationParams)> {
331331
let state = self.pre_authentication(credentials, control_out)?;
332332

333-
let params = state.parameters;
333+
let params = state.parameters.clone();
334334
let keys = state.bidi_remote(self.clone(), credentials, features);
335335

336336
Some((keys, params))
@@ -684,13 +684,13 @@ impl Entry {
684684
secret: schedule::Secret,
685685
sender: sender::State,
686686
receiver: receiver::State,
687-
mut parameters: ApplicationParams,
687+
parameters: ApplicationParams,
688688
rehandshake_time: Duration,
689689
) -> Self {
690690
// clamp max datagram size to a well-known value
691-
parameters.max_datagram_size = parameters
691+
parameters
692692
.max_datagram_size
693-
.min(crate::stream::MAX_DATAGRAM_SIZE as _);
693+
.fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);
694694

695695
assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
696696
Self {
@@ -911,6 +911,7 @@ pub struct HandshakingPath {
911911
parameters: ApplicationParams,
912912
endpoint_type: s2n_quic_core::endpoint::Type,
913913
secret: Option<schedule::Secret>,
914+
entry: Option<Arc<Entry>>,
914915
map: Map,
915916
}
916917

@@ -924,9 +925,10 @@ impl HandshakingPath {
924925
Self {
925926
peer: connection_info.remote_address.clone().into(),
926927
dc_version: connection_info.dc_version,
927-
parameters: connection_info.application_params,
928+
parameters: connection_info.application_params.clone(),
928929
endpoint_type,
929930
secret: None,
931+
entry: None,
930932
map,
931933
}
932934
}
@@ -1018,12 +1020,22 @@ impl dc::Path for HandshakingPath {
10181020
.expect("peer tokens are only received after secrets are ready"),
10191021
sender,
10201022
receiver,
1021-
self.parameters,
1023+
self.parameters.clone(),
10221024
self.map.state.rehandshake_period,
10231025
);
10241026
let entry = Arc::new(entry);
1027+
self.entry = Some(entry.clone());
10251028
self.map.insert(entry);
10261029
}
1030+
1031+
fn on_mtu_updated(&mut self, mtu: u16) {
1032+
if let Some(entry) = self.entry.as_ref() {
1033+
entry
1034+
.parameters
1035+
.max_datagram_size
1036+
.store(mtu, Ordering::Relaxed);
1037+
}
1038+
}
10271039
}
10281040

10291041
#[cfg(test)]

dc/s2n-quic-dc/src/stream/endpoint.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use s2n_quic_core::{
1919
inet::{ExplicitCongestionNotification, SocketAddress},
2020
varint::VarInt,
2121
};
22-
use std::{io, sync::Arc};
22+
use std::{
23+
io,
24+
sync::{atomic::Ordering, Arc},
25+
};
2326
use tracing::{debug_span, Instrument as _};
2427

2528
type Result<T = (), E = io::Error> = core::result::Result<T, E>;
@@ -193,7 +196,7 @@ where
193196
let flow = flow::non_blocking::State::new(flow_offset);
194197

195198
let path = send::path::Info {
196-
max_datagram_size: parameters.max_datagram_size,
199+
max_datagram_size: parameters.max_datagram_size.load(Ordering::Relaxed),
197200
send_quantum,
198201
ecn: ExplicitCongestionNotification::Ect0,
199202
next_expected_control_packet: VarInt::ZERO,

dc/s2n-quic-dc/src/stream/send/state.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ use s2n_quic_core::{
4242
varint::VarInt,
4343
};
4444
use slotmap::SlotMap;
45-
use std::collections::{BinaryHeap, VecDeque};
45+
use std::{
46+
collections::{BinaryHeap, VecDeque},
47+
sync::atomic::Ordering,
48+
};
4649
use tracing::{debug, trace};
4750

4851
pub mod probe;
@@ -118,7 +121,7 @@ pub struct PeerActivity {
118121
impl State {
119122
#[inline]
120123
pub fn new(stream_id: stream::Id, params: &ApplicationParams) -> Self {
121-
let max_datagram_size = params.max_datagram_size;
124+
let max_datagram_size = params.max_datagram_size.load(Ordering::Relaxed);
122125
let initial_max_data = params.remote_max_data;
123126
let local_max_data = params.local_send_max_data;
124127

quic/s2n-quic-core/src/dc.rs

+69-4
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ use crate::{
1111
transport::parameters::{DcSupportedVersions, InitialFlowControlLimits},
1212
varint::VarInt,
1313
};
14-
use core::{num::NonZeroU32, time::Duration};
14+
use core::{
15+
num::NonZeroU32,
16+
sync::atomic::{AtomicU16, Ordering},
17+
time::Duration,
18+
};
1519

1620
mod disabled;
1721
mod traits;
@@ -91,25 +95,37 @@ impl<'a> DatagramInfo<'a> {
9195
}
9296

9397
/// Various settings relevant to the dc path
94-
#[derive(Clone, Copy, Debug)]
98+
#[derive(Debug)]
9599
#[non_exhaustive]
96100
pub struct ApplicationParams {
97-
pub max_datagram_size: u16,
101+
pub max_datagram_size: AtomicU16,
98102
pub remote_max_data: VarInt,
99103
pub local_send_max_data: VarInt,
100104
pub local_recv_max_data: VarInt,
101105
// Actually a Duration, stored as milliseconds to shrink this struct
102106
pub max_idle_timeout: Option<NonZeroU32>,
103107
}
104108

109+
impl Clone for ApplicationParams {
110+
fn clone(&self) -> Self {
111+
Self {
112+
max_datagram_size: AtomicU16::new(self.max_datagram_size.load(Ordering::Relaxed)),
113+
remote_max_data: self.remote_max_data,
114+
local_send_max_data: self.local_send_max_data,
115+
local_recv_max_data: self.local_recv_max_data,
116+
max_idle_timeout: self.max_idle_timeout,
117+
}
118+
}
119+
}
120+
105121
impl ApplicationParams {
106122
pub fn new(
107123
max_datagram_size: u16,
108124
peer_flow_control_limits: &InitialFlowControlLimits,
109125
limits: &Limits,
110126
) -> Self {
111127
Self {
112-
max_datagram_size,
128+
max_datagram_size: AtomicU16::new(max_datagram_size),
113129
remote_max_data: peer_flow_control_limits.max_data,
114130
local_send_max_data: limits.initial_stream_limits().max_data_bidi_local,
115131
local_recv_max_data: limits.initial_stream_limits().max_data_bidi_remote,
@@ -125,3 +141,52 @@ impl ApplicationParams {
125141
Some(Duration::from_millis(self.max_idle_timeout?.get() as u64))
126142
}
127143
}
144+
145+
#[cfg(test)]
146+
mod tests {
147+
use crate::{
148+
connection::Limits, dc::ApplicationParams, transport::parameters::InitialFlowControlLimits,
149+
varint::VarInt,
150+
};
151+
use std::{sync::atomic::Ordering, time::Duration};
152+
153+
#[test]
154+
fn clone() {
155+
let initial_flow_control_limits = InitialFlowControlLimits {
156+
max_data: VarInt::from_u32(2222),
157+
..Default::default()
158+
};
159+
160+
let limits = Limits {
161+
bidirectional_local_data_window: 1234.try_into().unwrap(),
162+
bidirectional_remote_data_window: 6789.try_into().unwrap(),
163+
max_idle_timeout: Duration::from_millis(999).try_into().unwrap(),
164+
..Default::default()
165+
};
166+
167+
let params = ApplicationParams::new(9000, &initial_flow_control_limits, &limits);
168+
169+
assert_eq!(9000, params.max_datagram_size.load(Ordering::Relaxed));
170+
assert_eq!(limits.max_idle_timeout(), params.max_idle_timeout());
171+
assert_eq!(1234, params.local_send_max_data.as_u64());
172+
assert_eq!(6789, params.local_recv_max_data.as_u64());
173+
assert_eq!(2222, params.remote_max_data.as_u64());
174+
175+
let cloned_params = params.clone();
176+
177+
assert_eq!(
178+
params.max_datagram_size.load(Ordering::Relaxed),
179+
cloned_params.max_datagram_size.load(Ordering::Relaxed)
180+
);
181+
assert_eq!(params.max_idle_timeout, cloned_params.max_idle_timeout);
182+
assert_eq!(
183+
params.local_send_max_data,
184+
cloned_params.local_send_max_data
185+
);
186+
assert_eq!(
187+
params.local_recv_max_data,
188+
cloned_params.local_recv_max_data
189+
);
190+
assert_eq!(params.remote_max_data, cloned_params.remote_max_data);
191+
}
192+
}

quic/s2n-quic-core/src/dc/disabled.rs

+4
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,8 @@ impl Path for () {
4444
) {
4545
unimplemented!()
4646
}
47+
48+
fn on_mtu_updated(&mut self, _mtu: u16) {
49+
unimplemented!()
50+
}
4751
}

quic/s2n-quic-core/src/dc/testing.rs

+13-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
};
1111
use core::{num::NonZeroU32, time::Duration};
1212
use std::sync::{
13-
atomic::{AtomicU8, Ordering},
13+
atomic::{AtomicU16, AtomicU8, Ordering},
1414
Arc,
1515
};
1616

@@ -36,14 +36,19 @@ pub struct MockDcPath {
3636
pub on_peer_stateless_reset_tokens_count: u8,
3737
pub stateless_reset_tokens: Vec<stateless_reset::Token>,
3838
pub peer_stateless_reset_tokens: Vec<stateless_reset::Token>,
39+
pub mtu: u16,
3940
}
4041

4142
impl dc::Endpoint for MockDcEndpoint {
4243
type Path = MockDcPath;
4344

44-
fn new_path(&mut self, _connection_info: &ConnectionInfo) -> Option<Self::Path> {
45+
fn new_path(&mut self, connection_info: &ConnectionInfo) -> Option<Self::Path> {
4546
Some(MockDcPath {
4647
stateless_reset_tokens: self.stateless_reset_tokens.clone(),
48+
mtu: connection_info
49+
.application_params
50+
.max_datagram_size
51+
.load(Ordering::Relaxed),
4752
..Default::default()
4853
})
4954
}
@@ -76,10 +81,15 @@ impl dc::Path for MockDcPath {
7681
self.peer_stateless_reset_tokens
7782
.extend(stateless_reset_tokens);
7883
}
84+
85+
fn on_mtu_updated(&mut self, mtu: u16) {
86+
self.mtu = mtu
87+
}
7988
}
8089

90+
#[allow(clippy::declare_interior_mutable_const)]
8191
pub const TEST_APPLICATION_PARAMS: ApplicationParams = ApplicationParams {
82-
max_datagram_size: 1472,
92+
max_datagram_size: AtomicU16::new(1472),
8393
remote_max_data: VarInt::from_u32(1u32 << 25),
8494
local_send_max_data: VarInt::from_u32(1u32 << 25),
8595
local_recv_max_data: VarInt::from_u32(1u32 << 25),

quic/s2n-quic-core/src/dc/traits.rs

+10
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ pub trait Path: 'static + Send {
4545
&mut self,
4646
stateless_reset_tokens: impl Iterator<Item = &'a stateless_reset::Token>,
4747
);
48+
49+
/// Called when the MTU has been updated for the path
50+
fn on_mtu_updated(&mut self, mtu: u16);
4851
}
4952

5053
impl<P: Path> Path for Option<P> {
@@ -69,4 +72,11 @@ impl<P: Path> Path for Option<P> {
6972
path.on_peer_stateless_reset_tokens(stateless_reset_tokens)
7073
}
7174
}
75+
76+
#[inline]
77+
fn on_mtu_updated(&mut self, max_datagram_size: u16) {
78+
if let Some(path) = self {
79+
path.on_mtu_updated(max_datagram_size)
80+
}
81+
}
7282
}

0 commit comments

Comments
 (0)