@@ -1061,14 +1061,7 @@ impl Downstairs {
1061
1061
}
1062
1062
1063
1063
// Submit the initial repair jobs, which kicks everything off
1064
- self . begin_repair_for (
1065
- ExtentId ( 0 ) ,
1066
- Some ( self . ddef . unwrap ( ) . extent_count ( ) ) ,
1067
- false ,
1068
- & repair_downstairs,
1069
- source_downstairs,
1070
- up_state,
1071
- ) ;
1064
+ self . start_live_repair ( & repair_downstairs, source_downstairs, up_state) ;
1072
1065
1073
1066
info ! (
1074
1067
self . log,
@@ -1248,31 +1241,9 @@ impl Downstairs {
1248
1241
flush_job : PendingJob :: new ( flush_id) ,
1249
1242
}
1250
1243
} else {
1251
- // Keep going!
1252
- repair. active_extent = next_extent;
1253
-
1254
- let repair_downstairs = repair. repair_downstairs . clone ( ) ;
1255
- let active_extent = repair. active_extent ;
1256
- let aborting = repair. aborting_repair ;
1257
- let source_downstairs = repair. source_downstairs ;
1258
-
1259
1244
let repair_id = repair. id ;
1260
- let extent_count = repair. extent_count ;
1261
-
1262
- self . notify_live_repair_progress (
1263
- repair_id,
1264
- active_extent,
1265
- extent_count,
1266
- ) ;
1267
-
1268
- self . begin_repair_for (
1269
- active_extent,
1270
- None ,
1271
- aborting,
1272
- & repair_downstairs,
1273
- source_downstairs,
1274
- up_state,
1275
- ) ;
1245
+ self . notify_live_repair_progress ( repair_id, next_extent) ;
1246
+ self . send_next_live_repair ( up_state) ;
1276
1247
} ;
1277
1248
}
1278
1249
LiveRepairState :: FinalFlush { .. } => {
@@ -1386,29 +1357,74 @@ impl Downstairs {
1386
1357
}
1387
1358
}
1388
1359
1389
- /// Begins live-repair for the given extent
1360
+ /// Begins live-repair
1390
1361
///
1391
- /// Claims initial IDs and submits initial jobs. If `extent_count` is set,
1392
- /// then we also set `self.repair` here; otherwise, we update the current
1393
- /// state (`self.repair.as_mut().unwrap().state`).
1394
- ///
1395
- /// If `aborting` is true, then all of the submitted jobs are no-ops.
1362
+ /// Claims initial IDs and submits initial jobs, initializing `self.repair`
1396
1363
///
1397
1364
/// # Panics
1398
- /// If the upstairs is not in `UpstairsState::Active`, or we _are not_
1399
- /// aborting the repair but either (1) the source downstairs is not
1400
- /// `DsState::Active`, or (2) the repair downstairs are not all
1401
- /// `DsState::LiveRepair`.
1402
- #[ allow( clippy:: too_many_arguments) ]
1403
- fn begin_repair_for (
1365
+ /// If the upstairs is not in `UpstairsState::Active`, or either (1) the
1366
+ /// source downstairs is not `DsState::Active`, or (2) the repair downstairs
1367
+ /// are not all `DsState::LiveRepair`.
1368
+ fn start_live_repair (
1404
1369
& mut self ,
1405
- extent : ExtentId ,
1406
- extent_count : Option < u32 > ,
1407
- aborting : bool ,
1408
1370
repair_downstairs : & [ ClientId ] ,
1409
1371
source_downstairs : ClientId ,
1410
1372
up_state : & UpstairsState ,
1411
1373
) {
1374
+ let extent_count = self . ddef . unwrap ( ) . extent_count ( ) ;
1375
+ // Load a partially valid repair state (`min_id` and `state` are bogus);
1376
+ // `state` will be updated in `send_live_repair_jobs`, and `min_id`
1377
+ // will be set once we know the close job ID.
1378
+ self . repair = Some ( LiveRepairData {
1379
+ id : Uuid :: new_v4 ( ) ,
1380
+ extent_count,
1381
+ repair_downstairs : repair_downstairs. to_vec ( ) ,
1382
+ source_downstairs,
1383
+ aborting_repair : false ,
1384
+ active_extent : ExtentId ( 0 ) ,
1385
+ min_id : JobId ( 0 ) , // fixed below
1386
+ repair_job_ids : BTreeMap :: new ( ) ,
1387
+ state : LiveRepairState :: dummy ( ) , // fixed when sending jobs
1388
+ } ) ;
1389
+ let extent_repair_ids = self . send_live_repair_jobs ( up_state) ;
1390
+ self . repair . as_mut ( ) . unwrap ( ) . min_id = extent_repair_ids. close_id ;
1391
+ }
1392
+
1393
+ /// Increments `self.repair.active_extent` and sends new jobs
1394
+ ///
1395
+ /// # Panics
1396
+ /// If any state is invalid, or `self.repair` is `None`
1397
+ fn send_next_live_repair ( & mut self , up_state : & UpstairsState ) {
1398
+ // This invalidates repair state, but we're about to update it
1399
+ let repair = self . repair . as_mut ( ) . unwrap ( ) ;
1400
+ repair. active_extent += 1 ;
1401
+ self . send_live_repair_jobs ( up_state) ;
1402
+ }
1403
+
1404
+ /// Begins live-repair for the next extent, based on `self.repair`
1405
+ ///
1406
+ /// Claims initial IDs and submits live-repair jobs, updating
1407
+ /// `self.repair.state` with the new state. If `self.repair.aborting_repair` is
1408
+ /// true, then all of the submitted jobs are no-ops.
1409
+ ///
1410
+ /// # Panics
1411
+ /// - If `self.repair` is `None`
1412
+ /// - If the upstairs is not in `UpstairsState::Active`, or we _are not_
1413
+ /// aborting the repair but either (1) the source downstairs is not
1414
+ /// `DsState::Active`, or (2) the repair downstairs are not all
1415
+ /// `DsState::LiveRepair`.
1416
+ fn send_live_repair_jobs (
1417
+ & mut self ,
1418
+ up_state : & UpstairsState ,
1419
+ ) -> ExtentRepairIDs {
1420
+ // Copy what we need out of `self.repair` up front; it is reborrowed later
1421
+ let repair = self . repair . as_mut ( ) . unwrap ( ) ;
1422
+
1423
+ let repair_downstairs = repair. repair_downstairs . clone ( ) ;
1424
+ let extent = repair. active_extent ;
1425
+ let aborting = repair. aborting_repair ;
1426
+ let source_downstairs = repair. source_downstairs ;
1427
+
1412
1428
// Invariant checking to begin
1413
1429
assert ! (
1414
1430
matches!( up_state, UpstairsState :: Active ) ,
@@ -1422,7 +1438,7 @@ impl Downstairs {
1422
1438
self . clients[ source_downstairs] . state( ) ,
1423
1439
DsState :: Active
1424
1440
) ;
1425
- for & c in repair_downstairs {
1441
+ for & c in & repair_downstairs {
1426
1442
assert_eq ! ( self . clients[ c] . state( ) , DsState :: LiveRepair ) ;
1427
1443
}
1428
1444
}
@@ -1452,44 +1468,27 @@ impl Downstairs {
1452
1468
close_deps,
1453
1469
) ;
1454
1470
1455
- let state = LiveRepairState :: Closing {
1471
+ let repair = self . repair . as_mut ( ) . unwrap ( ) ; // reborrow
1472
+ repair. state = LiveRepairState :: Closing {
1456
1473
close_job : PendingJob :: new ( close_id) ,
1457
1474
repair_job : PendingJob :: new ( repair_id) ,
1458
1475
reopen_job : PendingJob :: new ( reopen_id) ,
1459
1476
noop_job : PendingJob :: new ( noop_id) ,
1460
1477
} ;
1461
- if let Some ( extent_count) = extent_count {
1462
- self . repair = Some ( LiveRepairData {
1463
- id : Uuid :: new_v4 ( ) ,
1464
- extent_count,
1465
- repair_downstairs : repair_downstairs. to_vec ( ) ,
1466
- source_downstairs,
1467
- aborting_repair : false ,
1468
- active_extent : ExtentId ( 0 ) ,
1469
- min_id : close_id,
1470
- repair_job_ids : BTreeMap :: new ( ) ,
1471
- state,
1472
- } ) ;
1473
- } else {
1474
- self . repair . as_mut ( ) . unwrap ( ) . state = state;
1475
- }
1476
1478
1477
1479
if aborting {
1478
- self . create_and_enqueue_noop_io ( vec ! [ noop_id] , reopen_id)
1480
+ self . create_and_enqueue_noop_io ( vec ! [ noop_id] , reopen_id) ;
1481
+ self . create_and_enqueue_noop_io ( close_deps, close_id) ;
1479
1482
} else {
1480
1483
self . create_and_enqueue_reopen_io ( extent, vec ! [ noop_id] , reopen_id) ;
1481
- } ;
1482
-
1483
- if aborting {
1484
- self . create_and_enqueue_noop_io ( close_deps, close_id)
1485
- } else {
1486
1484
self . create_and_enqueue_close_io (
1487
1485
extent,
1488
1486
close_deps,
1489
1487
close_id,
1490
- repair_downstairs,
1488
+ & repair_downstairs,
1491
1489
)
1492
1490
} ;
1491
+ extent_repair_ids
1493
1492
}
1494
1493
1495
1494
/// Creates a [DownstairsIO] job for an [IOop::ExtentLiveReopen], and
@@ -3836,9 +3835,9 @@ impl Downstairs {
3836
3835
& self ,
3837
3836
repair_id : Uuid ,
3838
3837
current_extent : ExtentId ,
3839
- extent_count : u32 ,
3840
3838
) {
3841
3839
if let Some ( notify) = & self . notify {
3840
+ let extent_count = self . ddef . unwrap ( ) . extent_count ( ) ;
3842
3841
notify. send ( NotifyRequest :: LiveRepairProgress {
3843
3842
upstairs_id : self . cfg . upstairs_id ,
3844
3843
repair_id,
0 commit comments