@@ -10,15 +10,22 @@ use crate::{
10
10
use crucible_common:: x509:: TLSContext ;
11
11
use crucible_protocol:: { ReconciliationId , CRUCIBLE_MESSAGE_VERSION } ;
12
12
13
- use std:: { collections:: BTreeSet , net:: SocketAddr , sync:: Arc } ;
13
+ use std:: {
14
+ collections:: BTreeSet ,
15
+ net:: SocketAddr ,
16
+ sync:: {
17
+ atomic:: { AtomicU64 , Ordering } ,
18
+ Arc ,
19
+ } ,
20
+ } ;
14
21
15
22
use futures:: StreamExt ;
16
23
use slog:: { debug, error, info, o, warn, Logger } ;
17
24
use tokio:: {
18
25
io:: AsyncWriteExt ,
19
26
net:: { TcpSocket , TcpStream } ,
20
27
sync:: { mpsc, oneshot} ,
21
- time:: sleep_until,
28
+ time:: { sleep_until, Duration } ,
22
29
} ;
23
30
use tokio_util:: codec:: { Encoder , FramedRead } ;
24
31
use uuid:: Uuid ;
@@ -154,6 +161,12 @@ pub(crate) struct DownstairsClient {
154
161
/// IO state counters
155
162
pub ( crate ) io_state_count : ClientIOStateCount ,
156
163
164
+ /// Bytes in queues for this client
165
+ ///
166
+ /// This includes read, write, and write-unwritten jobs, and is used to
167
+ /// estimate per-client backpressure to keep the 3x downstairs in sync.
168
+ pub ( crate ) bytes_outstanding : u64 ,
169
+
157
170
/// UUID for this downstairs region
158
171
///
159
172
/// Unpopulated until provided by `Message::RegionInfo`
@@ -216,6 +229,9 @@ pub(crate) struct DownstairsClient {
216
229
217
230
/// Session ID for a clients connection to a downstairs.
218
231
connection_id : ConnectionId ,
232
+
233
+ /// Per-client delay, shared with the [`DownstairsClient`]
234
+ client_delay_us : Arc < AtomicU64 > ,
219
235
}
220
236
221
237
impl DownstairsClient {
@@ -226,6 +242,7 @@ impl DownstairsClient {
226
242
log : Logger ,
227
243
tls_context : Option < Arc < crucible_common:: x509:: TLSContext > > ,
228
244
) -> Self {
245
+ let client_delay_us = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
229
246
Self {
230
247
cfg,
231
248
client_task : Self :: new_io_task (
@@ -234,6 +251,7 @@ impl DownstairsClient {
234
251
false , // do not start the task until GoActive
235
252
client_id,
236
253
tls_context. clone ( ) ,
254
+ client_delay_us. clone ( ) ,
237
255
& log,
238
256
) ,
239
257
client_id,
@@ -252,7 +270,9 @@ impl DownstairsClient {
252
270
region_metadata : None ,
253
271
repair_info : None ,
254
272
io_state_count : ClientIOStateCount :: new ( ) ,
273
+ bytes_outstanding : 0 ,
255
274
connection_id : ConnectionId ( 0 ) ,
275
+ client_delay_us,
256
276
}
257
277
}
258
278
@@ -262,6 +282,7 @@ impl DownstairsClient {
262
282
/// client will disappear into the void.
263
283
#[ cfg( test) ]
264
284
fn test_default ( ) -> Self {
285
+ let client_delay_us = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
265
286
let cfg = Arc :: new ( UpstairsConfig {
266
287
encryption_context : None ,
267
288
upstairs_id : Uuid :: new_v4 ( ) ,
@@ -289,7 +310,9 @@ impl DownstairsClient {
289
310
region_metadata : None ,
290
311
repair_info : None ,
291
312
io_state_count : ClientIOStateCount :: new ( ) ,
313
+ bytes_outstanding : 0 ,
292
314
connection_id : ConnectionId ( 0 ) ,
315
+ client_delay_us,
293
316
}
294
317
}
295
318
@@ -407,9 +430,26 @@ impl DownstairsClient {
407
430
job : & mut DownstairsIO ,
408
431
new_state : IOState ,
409
432
) -> IOState {
433
+ let is_running =
434
+ matches ! ( new_state, IOState :: New | IOState :: InProgress ) ;
410
435
self . io_state_count . incr ( & new_state) ;
411
436
let old_state = job. state . insert ( self . client_id , new_state) ;
437
+ let was_running =
438
+ matches ! ( old_state, IOState :: New | IOState :: InProgress ) ;
412
439
self . io_state_count . decr ( & old_state) ;
440
+
441
+ // Update our bytes-in-flight counter
442
+ if was_running && !is_running {
443
+ self . bytes_outstanding = self
444
+ . bytes_outstanding
445
+ . checked_sub ( job. work . job_bytes ( ) )
446
+ . unwrap ( ) ;
447
+ } else if is_running && !was_running {
448
+ // This should only happen if a job is replayed, but that still
449
+ // counts!
450
+ self . bytes_outstanding += job. work . job_bytes ( ) ;
451
+ }
452
+
413
453
old_state
414
454
}
415
455
@@ -471,8 +511,12 @@ impl DownstairsClient {
471
511
}
472
512
473
513
/// Sets this job as skipped and moves it to `skipped_jobs`
514
+ ///
515
+ /// # Panics
516
+ /// If the job is not new or in-progress
474
517
pub ( crate ) fn skip_job ( & mut self , job : & mut DownstairsIO ) {
475
- self . set_job_state ( job, IOState :: Skipped ) ;
518
+ let prev_state = self . set_job_state ( job, IOState :: Skipped ) ;
519
+ assert ! ( matches!( prev_state, IOState :: New | IOState :: InProgress ) ) ;
476
520
self . skipped_jobs . insert ( job. ds_id ) ;
477
521
}
478
522
@@ -628,6 +672,7 @@ impl DownstairsClient {
628
672
}
629
673
630
674
self . connection_id . update ( ) ;
675
+
631
676
// Restart with a short delay
632
677
self . start_task ( true , auto_promote) ;
633
678
}
@@ -652,6 +697,7 @@ impl DownstairsClient {
652
697
connect,
653
698
self . client_id ,
654
699
self . tls_context . clone ( ) ,
700
+ self . client_delay_us . clone ( ) ,
655
701
& self . log ,
656
702
) ;
657
703
}
@@ -662,6 +708,7 @@ impl DownstairsClient {
662
708
connect : bool ,
663
709
client_id : ClientId ,
664
710
tls_context : Option < Arc < TLSContext > > ,
711
+ client_delay_us : Arc < AtomicU64 > ,
665
712
log : & Logger ,
666
713
) -> ClientTaskHandle {
667
714
#[ cfg( test) ]
@@ -672,6 +719,7 @@ impl DownstairsClient {
672
719
connect,
673
720
client_id,
674
721
tls_context,
722
+ client_delay_us,
675
723
log,
676
724
)
677
725
} else {
@@ -685,6 +733,7 @@ impl DownstairsClient {
685
733
connect,
686
734
client_id,
687
735
tls_context,
736
+ client_delay_us,
688
737
log,
689
738
)
690
739
}
@@ -695,6 +744,7 @@ impl DownstairsClient {
695
744
connect : bool ,
696
745
client_id : ClientId ,
697
746
tls_context : Option < Arc < TLSContext > > ,
747
+ client_delay_us : Arc < AtomicU64 > ,
698
748
log : & Logger ,
699
749
) -> ClientTaskHandle {
700
750
// These channels must support at least MAX_ACTIVE_COUNT messages;
@@ -730,6 +780,7 @@ impl DownstairsClient {
730
780
log : log. clone ( ) ,
731
781
} ,
732
782
delay,
783
+ client_delay_us,
733
784
log,
734
785
} ;
735
786
c. run ( ) . await
@@ -945,6 +996,9 @@ impl DownstairsClient {
945
996
IOState :: New
946
997
}
947
998
} ;
999
+ if r == IOState :: New {
1000
+ self . bytes_outstanding += io. work . job_bytes ( ) ;
1001
+ }
948
1002
self . io_state_count . incr ( & r) ;
949
1003
r
950
1004
}
@@ -1261,9 +1315,8 @@ impl DownstairsClient {
1261
1315
}
1262
1316
} ;
1263
1317
1264
- let old_state = job. state . insert ( self . client_id , new_state. clone ( ) ) ;
1265
- self . io_state_count . decr ( & old_state) ;
1266
- self . io_state_count . incr ( & new_state) ;
1318
+ // Update the state, maintaining various counters
1319
+ let old_state = self . set_job_state ( job, new_state. clone ( ) ) ;
1267
1320
1268
1321
/*
1269
1322
* Verify the job was InProgress
@@ -2209,6 +2262,10 @@ impl DownstairsClient {
2209
2262
( self . io_state_count . new + self . io_state_count . in_progress ) as usize
2210
2263
}
2211
2264
2265
+ pub ( crate ) fn total_bytes_outstanding ( & self ) -> usize {
2266
+ self . bytes_outstanding as usize
2267
+ }
2268
+
2212
2269
/// Returns a unique ID for the current connection, or `None`
2213
2270
///
2214
2271
/// This can be used to disambiguate between messages returned from
@@ -2220,6 +2277,16 @@ impl DownstairsClient {
2220
2277
None
2221
2278
}
2222
2279
}
2280
+
2281
+ /// Sets the per-client delay
2282
+ pub ( crate ) fn set_delay_us ( & self , delay : u64 ) {
2283
+ self . client_delay_us . store ( delay, Ordering :: Relaxed ) ;
2284
+ }
2285
+
2286
+ /// Looks up the per-client delay
2287
+ pub ( crate ) fn get_delay_us ( & self ) -> u64 {
2288
+ self . client_delay_us . load ( Ordering :: Relaxed )
2289
+ }
2223
2290
}
2224
2291
2225
2292
/// How to handle "promote to active" requests
@@ -2420,6 +2487,9 @@ struct ClientIoTask {
2420
2487
/// Handle for the rx task
2421
2488
recv_task : ClientRxTask ,
2422
2489
2490
+ /// Shared handle to receive per-client backpressure delay
2491
+ client_delay_us : Arc < AtomicU64 > ,
2492
+
2423
2493
log : Logger ,
2424
2494
}
2425
2495
@@ -2692,6 +2762,24 @@ impl ClientIoTask {
2692
2762
+ std:: marker:: Send
2693
2763
+ ' static ,
2694
2764
{
2765
+ // Delay communication with this client based on backpressure, to keep
2766
+ // the three clients relatively in sync with each other.
2767
+ //
2768
+ // We don't need to delay writes, because they're already constrained by
2769
+ // the global backpressure system and cannot build up an unbounded
2770
+ // queue. This is admittedly quite subtle; see crucible#1167 for
2771
+ // discussions and graphs.
2772
+ if !matches ! (
2773
+ m,
2774
+ ClientRequest :: Message ( Message :: Write { .. } )
2775
+ | ClientRequest :: RawMessage ( RawMessage :: Write { .. } , ..)
2776
+ ) {
2777
+ let d = self . client_delay_us . load ( Ordering :: Relaxed ) ;
2778
+ if d > 0 {
2779
+ tokio:: time:: sleep ( Duration :: from_micros ( d) ) . await ;
2780
+ }
2781
+ }
2782
+
2695
2783
// There's some duplication between this function and `cmd_loop` above,
2696
2784
// but it's not obvious whether there's a cleaner way to organize stuff.
2697
2785
tokio:: select! {
0 commit comments