Skip to content

Commit bd8e1bb

Browse files
Replace requested_handshakes map with immediate request
This avoids issues with bounding the map, and allows us to schedule handshakes immediately. We avoid a building herd of re-requested handshakes by rescheduling a handshake by handshake period just after our attempt to handshake.
1 parent b871ad0 commit bd8e1bb

File tree

5 files changed

+72
-65
lines changed

5 files changed

+72
-65
lines changed

dc/s2n-quic-dc/src/path/secret/map.rs

+2-7
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,8 @@ impl Map {
9090
self.store.contains(peer)
9191
}
9292

93-
/// Check whether we would like to (re-)handshake with this peer.
94-
///
95-
/// Note that this is distinct from `contains`, we may already have *some* credentials for a
96-
/// peer but still be interested in handshaking (e.g., due to periodic refresh of the
97-
/// credentials).
98-
pub fn needs_handshake(&self, peer: &SocketAddr) -> bool {
99-
self.store.needs_handshake(peer)
93+
pub fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
94+
self.store.register_request_handshake(cb);
10095
}
10196

10297
/// Gets the [`Peer`] entry for the given address

dc/s2n-quic-dc/src/path/secret/map/cleaner.rs

+4-23
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl Cleaner {
103103
let mut address_entries_initial = 0usize;
104104
let mut address_entries_retired = 0usize;
105105
let mut address_entries_active = 0usize;
106+
let mut handshake_requests = 0usize;
106107

107108
// For non-retired entries, if it's time for them to handshake again, request a
108109
// handshake to happen. This handshake will currently happen on the next request for this
@@ -115,7 +116,9 @@ impl Cleaner {
115116
current_epoch.saturating_sub(retired_at) < eviction_cycles
116117
} else {
117118
if entry.rehandshake_time() <= now {
119+
handshake_requests += 1;
118120
state.request_handshake(*entry.peer());
121+
entry.rehandshake_time_reschedule(state.rehandshake_period());
119122
}
120123

121124
// always retain
@@ -152,28 +155,6 @@ impl Cleaner {
152155
retained
153156
});
154157

155-
// Iteration order should be effectively random, so this effectively just prunes the list
156-
// periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note
157-
// that peers the application is actively interested in will typically bypass this list, so
158-
// this is mostly a risk of delaying regular re-handshaking with very large cardinalities.
159-
//
160-
// FIXME: Long or mid-term it likely makes sense to replace this data structure with a
161-
// fuzzy set of some kind and/or just moving to immediate background handshake attempts.
162-
const MAX_REQUESTED_HANDSHAKES: usize = 5000;
163-
164-
let mut handshake_requests = 0usize;
165-
let mut handshake_requests_retired = 0usize;
166-
state.requested_handshakes.pin().retain(|_| {
167-
handshake_requests += 1;
168-
let retain = handshake_requests < MAX_REQUESTED_HANDSHAKES;
169-
170-
if !retain {
171-
handshake_requests_retired += 1;
172-
}
173-
174-
retain
175-
});
176-
177158
let id_entries = id_entries_initial - id_entries_retired;
178159
let address_entries = address_entries_initial - address_entries_retired;
179160

@@ -192,7 +173,7 @@ impl Cleaner {
192173
address_entries_initial_utilization: utilization(address_entries_initial),
193174
address_entries_retired,
194175
handshake_requests,
195-
handshake_requests_retired,
176+
handshake_requests_retired: 0,
196177
},
197178
);
198179
}

dc/s2n-quic-dc/src/path/secret/map/entry.rs

+28-9
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use s2n_quic_core::{dc, varint::VarInt};
2222
use std::{
2323
net::SocketAddr,
2424
sync::{
25-
atomic::{AtomicU8, Ordering},
25+
atomic::{AtomicU32, AtomicU8, Ordering},
2626
Arc,
2727
},
2828
time::{Duration, Instant},
@@ -34,7 +34,7 @@ mod tests;
3434
#[derive(Debug)]
3535
pub(super) struct Entry {
3636
creation_time: Instant,
37-
rehandshake_delta_secs: u32,
37+
rehandshake_delta_secs: AtomicU32,
3838
peer: SocketAddr,
3939
secret: schedule::Secret,
4040
retired: IsRetired,
@@ -72,6 +72,7 @@ impl SizeOf for Entry {
7272
}
7373

7474
impl SizeOf for AtomicU8 {}
75+
impl SizeOf for AtomicU32 {}
7576

7677
impl Entry {
7778
pub fn new(
@@ -88,20 +89,19 @@ impl Entry {
8889
.fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);
8990

9091
assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
91-
Self {
92+
let entry = Self {
9293
creation_time: Instant::now(),
93-
// Schedule another handshake sometime in [5 minutes, rehandshake_time] from now.
94-
rehandshake_delta_secs: rand::thread_rng().gen_range(
95-
std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(),
96-
) as u32,
94+
rehandshake_delta_secs: AtomicU32::new(0),
9795
peer,
9896
secret,
9997
retired: Default::default(),
10098
sender,
10199
receiver,
102100
parameters,
103101
accessed: AtomicU8::new(0),
104-
}
102+
};
103+
entry.rehandshake_time_reschedule(rehandshake_time);
104+
entry
105105
}
106106

107107
#[cfg(any(test, feature = "testing"))]
@@ -246,7 +246,26 @@ impl Entry {
246246
}
247247

248248
pub fn rehandshake_time(&self) -> Instant {
249-
self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs))
249+
self.creation_time
250+
+ Duration::from_secs(u64::from(
251+
self.rehandshake_delta_secs.load(Ordering::Relaxed),
252+
))
253+
}
254+
255+
/// Reschedule the handshake some time into the future.
256+
pub fn rehandshake_time_reschedule(&self, rehandshake_period: Duration) {
257+
// The goal of rescheduling is to avoid continuously re-handshaking for N (possibly stale)
258+
// peers every cleaner loop, instead we defer handshakes out again. This effectively acts
259+
// as a (slow) retry mechanism.
260+
let delta = rand::thread_rng().gen_range(
261+
std::cmp::min(rehandshake_period.as_secs(), 360)..rehandshake_period.as_secs(),
262+
) as u32;
263+
// This can't practically overflow -- each time we add we push out the next add by at least
264+
// that much time. The fastest this loops is then running once every 360 seconds and adding
265+
// 360 each time. That takes (2**32/360)*360 to fill u32, which happens after 136 years of
266+
// continuous execution.
267+
self.rehandshake_delta_secs
268+
.fetch_add(delta, Ordering::Relaxed);
250269
}
251270

252271
pub fn age(&self) -> Duration {

dc/s2n-quic-dc/src/path/secret/map/state.rs

+36-24
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use s2n_quic_core::{
1717
use std::{
1818
hash::{BuildHasherDefault, Hasher},
1919
net::{Ipv4Addr, SocketAddr},
20-
sync::{Arc, Mutex, Weak},
20+
sync::{Arc, Mutex, RwLock, Weak},
2121
time::Duration,
2222
};
2323

@@ -64,11 +64,6 @@ where
6464
// needed.
6565
pub(super) peers: fixed_map::Map<SocketAddr, Arc<Entry>>,
6666

67-
// Stores the set of SocketAddr for which we received a UnknownPathSecret packet.
68-
// When handshake_with is called we will allow a new handshake if this contains a socket, this
69-
// is a temporary solution until we implement proper background handshaking.
70-
pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,
71-
7267
// All known entries.
7368
pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,
7469

@@ -78,6 +73,9 @@ where
7873
// FIXME: This will get replaced with sending on a handshake socket associated with the map.
7974
pub(super) control_socket: Arc<std::net::UdpSocket>,
8075

76+
#[allow(clippy::type_complexity)]
77+
pub(super) request_handshake: RwLock<Option<Box<dyn Fn(SocketAddr) + Send + Sync>>>,
78+
8179
cleaner: Cleaner,
8280

8381
init_time: Timestamp,
@@ -131,13 +129,13 @@ where
131129
rehandshake_period: Duration::from_secs(3600 * 24),
132130
peers: fixed_map::Map::with_capacity(capacity, Default::default()),
133131
ids: fixed_map::Map::with_capacity(capacity, Default::default()),
134-
requested_handshakes: Default::default(),
135132
cleaner: Cleaner::new(),
136133
signer,
137134
control_socket,
138135
init_time,
139136
clock,
140137
subscriber,
138+
request_handshake: RwLock::new(None),
141139
};
142140

143141
let state = Arc::new(state);
@@ -152,19 +150,36 @@ where
152150
}
153151

154152
pub fn request_handshake(&self, peer: SocketAddr) {
155-
// The length is reset as part of cleanup to 5000.
156-
let handshakes = self.requested_handshakes.pin();
157-
if handshakes.len() <= 6000 {
158-
handshakes.insert(peer);
159-
self.subscriber()
160-
.on_path_secret_map_background_handshake_requested(
161-
event::builder::PathSecretMapBackgroundHandshakeRequested {
162-
peer_address: SocketAddress::from(peer).into_event(),
163-
},
164-
);
153+
self.subscriber()
154+
.on_path_secret_map_background_handshake_requested(
155+
event::builder::PathSecretMapBackgroundHandshakeRequested {
156+
peer_address: SocketAddress::from(peer).into_event(),
157+
},
158+
);
159+
160+
// Normally we'd expect callers to use the Subscriber to register interest in this, but the
161+
// Map is typically created *before* the s2n_quic::Client with the dc provider registered.
162+
//
163+
// Users of the state tracker typically register the callback when creating a new s2n-quic
164+
// client to handshake into this map.
165+
if let Some(callback) = self
166+
.request_handshake
167+
.read()
168+
.unwrap_or_else(|e| e.into_inner())
169+
.as_deref()
170+
{
171+
(callback)(peer);
165172
}
166173
}
167174

175+
fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
176+
// FIXME: Maybe panic if already initialized?
177+
*self
178+
.request_handshake
179+
.write()
180+
.unwrap_or_else(|e| e.into_inner()) = Some(cb);
181+
}
182+
168183
fn handle_unknown_secret(
169184
&self,
170185
packet: &control::unknown_path_secret::Packet,
@@ -370,17 +385,10 @@ where
370385
self.peers.contains_key(peer)
371386
}
372387

373-
fn needs_handshake(&self, peer: &SocketAddr) -> bool {
374-
self.requested_handshakes.pin().contains(peer)
375-
}
376-
377388
fn on_new_path_secrets(&self, entry: Arc<Entry>) {
378389
let id = *entry.id();
379390
let peer = entry.peer();
380391

381-
// On insert clear our interest in a handshake.
382-
self.requested_handshakes.pin().remove(peer);
383-
384392
let (same, other) = self.ids.insert(id, entry.clone());
385393
if same.is_some() {
386394
// FIXME: Make insertion fallible and fail handshakes instead?
@@ -445,6 +453,10 @@ where
445453
});
446454
}
447455

456+
fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
457+
self.register_request_handshake(cb);
458+
}
459+
448460
fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>> {
449461
self.peers.get_by_key(peer)
450462
}

dc/s2n-quic-dc/src/path/secret/map/store.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ pub trait Store: 'static + Send + Sync {
2727

2828
fn contains(&self, peer: &SocketAddr) -> bool;
2929

30-
fn needs_handshake(&self, peer: &SocketAddr) -> bool;
31-
3230
fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;
3331

3432
fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;
@@ -47,6 +45,8 @@ pub trait Store: 'static + Send + Sync {
4745

4846
fn rehandshake_period(&self) -> Duration;
4947

48+
fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>);
49+
5050
fn check_dedup(
5151
&self,
5252
entry: &Entry,

0 commit comments

Comments
 (0)