@@ -73,17 +73,6 @@ pub(crate) struct Downstairs {
73
73
/// The active list of IO for the downstairs.
74
74
pub ( crate ) ds_active : ActiveJobs ,
75
75
76
- /// The number of write bytes that haven't finished yet
77
- ///
78
- /// This is used to configure backpressure to the host, because writes
79
- /// (uniquely) will return before actually being completed by a Downstairs
80
- /// and can clog up the queues.
81
- ///
82
- /// It is stored in the Downstairs because from the perspective of the
83
- /// Upstairs, writes complete immediately; only the Downstairs is actually
84
- /// tracking the pending jobs.
85
- write_bytes_outstanding : BackpressureBytes ,
86
-
87
76
/// The next Job ID this Upstairs should use for downstairs work.
88
77
next_id : JobId ,
89
78
@@ -139,44 +128,6 @@ pub(crate) struct Downstairs {
139
128
reqwest_client : reqwest:: Client ,
140
129
}
141
130
142
- /// Helper struct to contain a count of backpressure bytes
143
- #[ derive( Debug ) ]
144
- struct BackpressureBytes ( u64 ) ;
145
-
146
- impl BackpressureBytes {
147
- fn new ( ) -> Self {
148
- BackpressureBytes ( 0 )
149
- }
150
-
151
- /// Ensures that the given `DownstairsIO` is counted for backpressure
152
- ///
153
- /// This is idempotent: if the job has already been counted, indicated by
154
- /// the `DownstairsIO::backpressure_bytes` member being `Some(..)`, it will
155
- /// not be counted again.
156
- fn increment ( & mut self , io : & mut DownstairsIO ) {
157
- match & io. work {
158
- IOop :: Write { data, .. } | IOop :: WriteUnwritten { data, .. } => {
159
- if io. backpressure_bytes . is_none ( ) {
160
- let n = data. len ( ) as u64 ;
161
- io. backpressure_bytes = Some ( n) ;
162
- self . 0 += n;
163
- }
164
- }
165
- _ => ( ) ,
166
- } ;
167
- }
168
-
169
- /// Remove the given job's contribution to backpressure
170
- ///
171
- /// This is idempotent: `DownstairsIO::backpressure_bytes` is set to `None`
172
- /// by this function call, so it's harmless to call repeatedly.
173
- fn decrement ( & mut self , io : & mut DownstairsIO ) {
174
- if let Some ( n) = io. backpressure_bytes . take ( ) {
175
- self . 0 = self . 0 . checked_sub ( n) . unwrap ( ) ;
176
- }
177
- }
178
- }
179
-
180
131
/// State machine for a live-repair operation
181
132
///
182
133
/// We pass through all states (except `FinalFlush`) in order for each extent,
@@ -331,7 +282,6 @@ impl Downstairs {
331
282
cfg,
332
283
next_flush : 0 ,
333
284
ds_active : ActiveJobs :: new ( ) ,
334
- write_bytes_outstanding : BackpressureBytes :: new ( ) ,
335
285
completed : AllocRingBuffer :: new ( 2048 ) ,
336
286
completed_jobs : AllocRingBuffer :: new ( 8 ) ,
337
287
next_id : JobId ( 1000 ) ,
@@ -936,10 +886,6 @@ impl Downstairs {
936
886
if self . clients [ client_id] . replay_job ( job) {
937
887
count += 1 ;
938
888
}
939
-
940
- // Make sure this job counts for backpressure (this is a no-op if
941
- // the job is already counted).
942
- self . write_bytes_outstanding . increment ( job) ;
943
889
} ) ;
944
890
info ! (
945
891
self . log,
@@ -1519,7 +1465,7 @@ impl Downstairs {
1519
1465
replay : false ,
1520
1466
data : None ,
1521
1467
read_validations : Vec :: new ( ) ,
1522
- backpressure_bytes : None ,
1468
+ backpressure_bytes : ClientMap :: new ( ) ,
1523
1469
}
1524
1470
}
1525
1471
@@ -1643,7 +1589,7 @@ impl Downstairs {
1643
1589
replay : false ,
1644
1590
data : None ,
1645
1591
read_validations : Vec :: new ( ) ,
1646
- backpressure_bytes : None ,
1592
+ backpressure_bytes : ClientMap :: new ( ) ,
1647
1593
}
1648
1594
}
1649
1595
@@ -1784,7 +1730,7 @@ impl Downstairs {
1784
1730
replay : false ,
1785
1731
data : None ,
1786
1732
read_validations : Vec :: new ( ) ,
1787
- backpressure_bytes : None ,
1733
+ backpressure_bytes : ClientMap :: new ( ) ,
1788
1734
}
1789
1735
}
1790
1736
@@ -1842,7 +1788,7 @@ impl Downstairs {
1842
1788
replay : false ,
1843
1789
data : None ,
1844
1790
read_validations : Vec :: new ( ) ,
1845
- backpressure_bytes : None ,
1791
+ backpressure_bytes : ClientMap :: new ( ) ,
1846
1792
} ;
1847
1793
self . enqueue ( io) ;
1848
1794
ds_id
@@ -1926,7 +1872,7 @@ impl Downstairs {
1926
1872
replay : false ,
1927
1873
data : None ,
1928
1874
read_validations : Vec :: new ( ) ,
1929
- backpressure_bytes : None ,
1875
+ backpressure_bytes : ClientMap :: new ( ) ,
1930
1876
} ;
1931
1877
self . enqueue ( io) ;
1932
1878
ds_id
@@ -1957,7 +1903,7 @@ impl Downstairs {
1957
1903
replay : false ,
1958
1904
data : None ,
1959
1905
read_validations : Vec :: new ( ) ,
1960
- backpressure_bytes : None ,
1906
+ backpressure_bytes : ClientMap :: new ( ) ,
1961
1907
}
1962
1908
}
1963
1909
@@ -2366,7 +2312,7 @@ impl Downstairs {
2366
2312
replay : false ,
2367
2313
data : None ,
2368
2314
read_validations : Vec :: new ( ) ,
2369
- backpressure_bytes : None ,
2315
+ backpressure_bytes : ClientMap :: new ( ) ,
2370
2316
} ;
2371
2317
2372
2318
self . enqueue ( fl) ;
@@ -2493,7 +2439,7 @@ impl Downstairs {
2493
2439
replay : false ,
2494
2440
data : None ,
2495
2441
read_validations : Vec :: new ( ) ,
2496
- backpressure_bytes : None ,
2442
+ backpressure_bytes : ClientMap :: new ( ) ,
2497
2443
} ;
2498
2444
2499
2445
self . enqueue ( io) ;
@@ -2580,7 +2526,6 @@ impl Downstairs {
2580
2526
}
2581
2527
2582
2528
// Note whether this job is a write. NOTE(review): backpressure counting is no longer done here; presumably it now happens per client -- confirm
2583
- self . write_bytes_outstanding . increment ( & mut io) ;
2584
2529
let is_write = matches ! ( io. work, IOop :: Write { .. } ) ;
2585
2530
2586
2531
// Puts the IO onto the downstairs work queue.
@@ -2963,12 +2908,17 @@ impl Downstairs {
2963
2908
// they are completed (in `process_io_completion_inner`),
2964
2909
// **not** when they are retired. We'll do a sanity check here
2965
2910
// and print a warning if that's not the case.
2966
- if job. backpressure_bytes . is_some ( ) {
2967
- warn ! (
2968
- self . log,
2969
- "job {ds_id} had pending backpressure bytes"
2970
- ) ;
2971
- self . write_bytes_outstanding . decrement ( & mut job) ;
2911
+ for c in ClientId :: iter ( ) {
2912
+ if job. backpressure_bytes . contains ( & c) {
2913
+ warn ! (
2914
+ self . log,
2915
+ "job {ds_id} had pending backpressure bytes \
2916
+ for client {c}"
2917
+ ) ;
2918
+ self . clients [ c]
2919
+ . write_bytes_outstanding
2920
+ . decrement ( & mut job, c) ;
2921
+ }
2972
2922
}
2973
2923
}
2974
2924
@@ -3467,14 +3417,6 @@ impl Downstairs {
3467
3417
self . ackable_work . insert ( ds_id) ;
3468
3418
}
3469
3419
3470
- // Write bytes no longer count for backpressure once all 3x downstairs
3471
- // have returned (although they'll continue to be stored until they are
3472
- // retired by the next flush).
3473
- let wc = job. state_count ( ) ;
3474
- if ( wc. error + wc. skipped + wc. done ) == 3 {
3475
- self . write_bytes_outstanding . decrement ( job) ;
3476
- }
3477
-
3478
3420
/*
3479
3421
* If all 3 jobs are done, we can check here to see if we can
3480
3422
* remove this job from the DS list. If we have completed the ack
@@ -3492,6 +3434,7 @@ impl Downstairs {
3492
3434
// If we are a write or a flush with one success, then
3493
3435
// we must switch our state to failed. This condition is
3494
3436
// handled when we check the job result.
3437
+ let wc = job. state_count ( ) ;
3495
3438
if ( wc. error + wc. skipped + wc. done ) == 3 {
3496
3439
self . ackable_work . insert ( ds_id) ;
3497
3440
debug ! ( self . log, "[{}] Set AckReady {}" , client_id, job. ds_id) ;
@@ -3589,7 +3532,12 @@ impl Downstairs {
3589
3532
}
3590
3533
3591
3534
/// Returns the outstanding write-byte count reported for backpressure.
///
/// Each client carries its own `write_bytes_outstanding` counter. The value
/// returned is the largest counter among the clients whose state is
/// `DsState::Active`, or 0 when no client is in that state.
pub ( crate ) fn write_bytes_outstanding ( & self ) -> u64 {
3592
- self . write_bytes_outstanding . 0
3535
+ self . clients
3536
+ . iter ( )
3537
+ . filter ( |c| matches ! ( c. state( ) , DsState :: Active ) )
3538
+ . map ( |c| c. write_bytes_outstanding . get ( ) )
3539
+ . max ( )
3540
+ . unwrap_or ( 0 )
3593
3541
}
3594
3542
3595
3543
/// Marks a single job as acked
0 commit comments