@@ -39,7 +39,9 @@ mod stats;
39
39
mod extent_inner_raw;
40
40
mod extent_inner_sqlite;
41
41
42
+ use extent:: ExtentState ;
42
43
use region:: Region ;
44
+ use repair:: RepairReadyRequest ;
43
45
44
46
pub use admin:: run_dropshot;
45
47
pub use dump:: dump_region;
@@ -1595,6 +1597,40 @@ where
1595
1597
. await
1596
1598
}
1597
1599
1600
+ // Repair info task
1601
+ // listen for the repair API to request if a specific extent is closed
1602
+ // or if the entire region is read only.
1603
+ async fn repair_info (
1604
+ ads : & Arc < Mutex < Downstairs > > ,
1605
+ mut repair_request_rx : mpsc:: Receiver < RepairReadyRequest > ,
1606
+ ) -> Result < ( ) > {
1607
+ info ! ( ads. lock( ) . await . log, "Started repair info task" ) ;
1608
+ while let Some ( repair_request) = repair_request_rx. recv ( ) . await {
1609
+ let ds = ads. lock ( ) . await ;
1610
+ let state = if ds. read_only {
1611
+ true
1612
+ } else {
1613
+ matches ! ( ds. region. extents[ repair_request. eid] , ExtentState :: Closed )
1614
+ } ;
1615
+ drop ( ds) ;
1616
+
1617
+ if let Err ( e) = repair_request. tx . send ( state) {
1618
+ info ! (
1619
+ ads. lock( ) . await . log,
1620
+ "Error {e} sending repair info for extent {}" ,
1621
+ repair_request. eid,
1622
+ ) ;
1623
+ } else {
1624
+ info ! (
1625
+ ads. lock( ) . await . log,
1626
+ "Repair info sent {state} for extent {}" , repair_request. eid,
1627
+ ) ;
1628
+ }
1629
+ }
1630
+ warn ! ( ads. lock( ) . await . log, "Repair info task ends" ) ;
1631
+ Ok ( ( ) )
1632
+ }
1633
+
1598
1634
async fn reply_task < WT > (
1599
1635
mut resp_channel_rx : mpsc:: Receiver < Message > ,
1600
1636
mut fw : FramedWrite < WT , CrucibleEncoder > ,
@@ -3255,23 +3291,42 @@ pub async fn start_downstairs(
3255
3291
let dss = d. clone ( ) ;
3256
3292
let repair_log = d. lock ( ) . await . log . new ( o ! ( "task" => "repair" . to_string( ) ) ) ;
3257
3293
3258
- let repair_listener =
3259
- match repair:: repair_main ( & dss, repair_address, & repair_log) . await {
3260
- Err ( e) => {
3261
- // TODO tear down other things if repair server can't be
3262
- // started?
3263
- bail ! ( "got {:?} from repair main" , e) ;
3264
- }
3294
+ let ( repair_request_tx, repair_request_rx) = mpsc:: channel ( 10 ) ;
3295
+ let repair_listener = match repair:: repair_main (
3296
+ & dss,
3297
+ repair_address,
3298
+ & repair_log,
3299
+ repair_request_tx,
3300
+ )
3301
+ . await
3302
+ {
3303
+ Err ( e) => {
3304
+ // TODO tear down other things if repair server can't be
3305
+ // started?
3306
+ bail ! ( "got {:?} from repair main" , e) ;
3307
+ }
3265
3308
3266
- Ok ( socket_addr) => socket_addr,
3267
- } ;
3309
+ Ok ( socket_addr) => socket_addr,
3310
+ } ;
3268
3311
3269
3312
{
3270
3313
let mut ds = d. lock ( ) . await ;
3271
3314
ds. repair_address = Some ( repair_listener) ;
3272
3315
}
3273
3316
info ! ( log, "Using repair address: {:?}" , repair_listener) ;
3274
3317
3318
+ let rd = d. clone ( ) ;
3319
+ let _repair_info_handle = tokio:: spawn ( async move {
3320
+ if let Err ( e) = repair_info ( & rd, repair_request_rx) . await {
3321
+ error ! (
3322
+ rd. lock( ) . await . log,
3323
+ "repair info exits with error: {:?}" , e
3324
+ ) ;
3325
+ } else {
3326
+ info ! ( rd. lock( ) . await . log, "repair info connection all done" ) ;
3327
+ }
3328
+ } ) ;
3329
+
3275
3330
// Optionally require SSL connections
3276
3331
let ssl_acceptor = if let Some ( cert_pem_path) = cert_pem {
3277
3332
let key_pem_path = key_pem. unwrap ( ) ;
0 commit comments