@@ -10,15 +10,22 @@ use crate::{
10
10
use crucible_common:: x509:: TLSContext ;
11
11
use crucible_protocol:: { ReconciliationId , CRUCIBLE_MESSAGE_VERSION } ;
12
12
13
- use std:: { collections:: BTreeSet , net:: SocketAddr , sync:: Arc } ;
13
+ use std:: {
14
+ collections:: BTreeSet ,
15
+ net:: SocketAddr ,
16
+ sync:: {
17
+ atomic:: { AtomicU64 , Ordering } ,
18
+ Arc ,
19
+ } ,
20
+ } ;
14
21
15
22
use futures:: StreamExt ;
16
23
use slog:: { debug, error, info, o, warn, Logger } ;
17
24
use tokio:: {
18
25
io:: AsyncWriteExt ,
19
26
net:: { TcpSocket , TcpStream } ,
20
27
sync:: { mpsc, oneshot} ,
21
- time:: sleep_until,
28
+ time:: { sleep_until, Duration } ,
22
29
} ;
23
30
use tokio_util:: codec:: { Encoder , FramedRead } ;
24
31
use uuid:: Uuid ;
@@ -216,6 +223,9 @@ pub(crate) struct DownstairsClient {
216
223
217
224
/// Session ID for a clients connection to a downstairs.
218
225
connection_id : ConnectionId ,
226
+
227
+ /// Per-client delay, shared with the [`DownstairsClient`]
228
+ client_delay_us : Arc < AtomicU64 > ,
219
229
}
220
230
221
231
impl DownstairsClient {
@@ -226,6 +236,7 @@ impl DownstairsClient {
226
236
log : Logger ,
227
237
tls_context : Option < Arc < crucible_common:: x509:: TLSContext > > ,
228
238
) -> Self {
239
+ let client_delay_us = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
229
240
Self {
230
241
cfg,
231
242
client_task : Self :: new_io_task (
@@ -234,6 +245,7 @@ impl DownstairsClient {
234
245
false , // do not start the task until GoActive
235
246
client_id,
236
247
tls_context. clone ( ) ,
248
+ client_delay_us. clone ( ) ,
237
249
& log,
238
250
) ,
239
251
client_id,
@@ -253,6 +265,7 @@ impl DownstairsClient {
253
265
repair_info : None ,
254
266
io_state_count : ClientIOStateCount :: new ( ) ,
255
267
connection_id : ConnectionId ( 0 ) ,
268
+ client_delay_us,
256
269
}
257
270
}
258
271
@@ -262,6 +275,7 @@ impl DownstairsClient {
262
275
/// client will disappear into the void.
263
276
#[ cfg( test) ]
264
277
fn test_default ( ) -> Self {
278
+ let client_delay_us = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
265
279
let cfg = Arc :: new ( UpstairsConfig {
266
280
encryption_context : None ,
267
281
upstairs_id : Uuid :: new_v4 ( ) ,
@@ -290,6 +304,7 @@ impl DownstairsClient {
290
304
repair_info : None ,
291
305
io_state_count : ClientIOStateCount :: new ( ) ,
292
306
connection_id : ConnectionId ( 0 ) ,
307
+ client_delay_us,
293
308
}
294
309
}
295
310
@@ -652,6 +667,7 @@ impl DownstairsClient {
652
667
connect,
653
668
self . client_id ,
654
669
self . tls_context . clone ( ) ,
670
+ self . client_delay_us . clone ( ) ,
655
671
& self . log ,
656
672
) ;
657
673
}
@@ -662,6 +678,7 @@ impl DownstairsClient {
662
678
connect : bool ,
663
679
client_id : ClientId ,
664
680
tls_context : Option < Arc < TLSContext > > ,
681
+ client_delay_us : Arc < AtomicU64 > ,
665
682
log : & Logger ,
666
683
) -> ClientTaskHandle {
667
684
#[ cfg( test) ]
@@ -672,6 +689,7 @@ impl DownstairsClient {
672
689
connect,
673
690
client_id,
674
691
tls_context,
692
+ client_delay_us,
675
693
log,
676
694
)
677
695
} else {
@@ -685,6 +703,7 @@ impl DownstairsClient {
685
703
connect,
686
704
client_id,
687
705
tls_context,
706
+ client_delay_us,
688
707
log,
689
708
)
690
709
}
@@ -695,6 +714,7 @@ impl DownstairsClient {
695
714
connect : bool ,
696
715
client_id : ClientId ,
697
716
tls_context : Option < Arc < TLSContext > > ,
717
+ client_delay_us : Arc < AtomicU64 > ,
698
718
log : & Logger ,
699
719
) -> ClientTaskHandle {
700
720
// These channels must support at least MAX_ACTIVE_COUNT messages;
@@ -730,6 +750,7 @@ impl DownstairsClient {
730
750
log : log. clone ( ) ,
731
751
} ,
732
752
delay,
753
+ client_delay_us,
733
754
log,
734
755
} ;
735
756
c. run ( ) . await
@@ -1242,6 +1263,8 @@ impl DownstairsClient {
1242
1263
return false ;
1243
1264
}
1244
1265
1266
+ job. reply_time [ self . client_id ] = Some ( std:: time:: Instant :: now ( ) ) ;
1267
+
1245
1268
let mut jobs_completed_ok = job. state_count ( ) . completed_ok ( ) ;
1246
1269
let mut ackable = false ;
1247
1270
@@ -2220,6 +2243,11 @@ impl DownstairsClient {
2220
2243
None
2221
2244
}
2222
2245
}
2246
+
2247
+ /// Sets the per-client delay
2248
+ pub ( crate ) fn set_delay_us ( & self , delay : u64 ) {
2249
+ self . client_delay_us . store ( delay, Ordering :: Relaxed ) ;
2250
+ }
2223
2251
}
2224
2252
2225
2253
/// How to handle "promote to active" requests
@@ -2420,6 +2448,9 @@ struct ClientIoTask {
2420
2448
/// Handle for the rx task
2421
2449
recv_task : ClientRxTask ,
2422
2450
2451
+ /// Shared handle to receive per-client backpressure delay
2452
+ client_delay_us : Arc < AtomicU64 > ,
2453
+
2423
2454
log : Logger ,
2424
2455
}
2425
2456
@@ -2692,6 +2723,24 @@ impl ClientIoTask {
2692
2723
+ std:: marker:: Send
2693
2724
+ ' static ,
2694
2725
{
2726
+ // Delay communication with this client based on backpressure, to keep
2727
+ // the three clients relatively in sync with each other.
2728
+ //
2729
+ // We don't need to delay writes, because they're already constrained by
2730
+ // the global backpressure system and cannot build up an unbounded
2731
+ // queue. This is admittedly quite subtle; see crucible#1167 for
2732
+ // discussions and graphs.
2733
+ if !matches ! (
2734
+ m,
2735
+ ClientRequest :: Message ( Message :: Write { .. } )
2736
+ | ClientRequest :: RawMessage ( RawMessage :: Write { .. } , ..)
2737
+ ) {
2738
+ let d = self . client_delay_us . load ( Ordering :: Relaxed ) ;
2739
+ if d > 0 {
2740
+ tokio:: time:: sleep ( Duration :: from_micros ( d) ) . await ;
2741
+ }
2742
+ }
2743
+
2695
2744
// There's some duplication between this function and `cmd_loop` above,
2696
2745
// but it's not obvious whether there's a cleaner way to organize stuff.
2697
2746
tokio:: select! {
0 commit comments