@@ -1182,18 +1182,27 @@ where
1182
1182
let resp_channel_tx = resp_channel_tx. clone ( ) ;
1183
1183
tokio:: spawn ( async move {
1184
1184
while let Some ( m) = message_channel_rx. recv ( ) . await {
1185
- if let Err ( e ) = Downstairs :: proc_frame (
1185
+ match Downstairs :: proc_frame (
1186
1186
& adc,
1187
1187
upstairs_connection,
1188
1188
m,
1189
1189
& resp_channel_tx,
1190
- & tx,
1191
1190
)
1192
1191
. await
1193
1192
{
1194
- bail ! ( "Proc frame returns error: {}" , e) ;
1193
+ // If we added work, tell the work task to get busy.
1194
+ Ok ( Some ( new_ds_id) ) => {
1195
+ cdt:: work__start!( || new_ds_id. 0 ) ;
1196
+ tx. send ( ( ) ) . await ?;
1197
+ }
1198
+ // If we handled the job locally, nothing to do here
1199
+ Ok ( None ) => ( ) ,
1200
+ Err ( e) => {
1201
+ bail ! ( "Proc frame returns error: {}" , e) ;
1202
+ }
1195
1203
}
1196
1204
}
1205
+
1197
1206
Ok ( ( ) )
1198
1207
} )
1199
1208
} ;
@@ -2231,26 +2240,62 @@ impl Downstairs {
2231
2240
Ok ( ( ) )
2232
2241
}
2233
2242
2234
- /*
2235
- * A new IO request has been received.
2236
- * If the message is a ping, send the correct response. If the message is an
2237
- * IO, then put the new IO the work hashmap. If the message is a repair
2238
- * message, then we handle it right here.
2239
- */
2243
+ /// Handle a new message from the upstairs
2244
+ ///
2245
+ /// If the message is an IO, then put the new IO the work hashmap. If the
2246
+ /// message is a repair message, then we handle it right here.
2240
2247
async fn proc_frame (
2241
2248
ad : & Mutex < Downstairs > ,
2242
2249
upstairs_connection : UpstairsConnection ,
2243
2250
m : Message ,
2244
2251
resp_tx : & mpsc:: Sender < Message > ,
2245
- job_channel_tx : & mpsc :: Sender < ( ) > ,
2246
- ) -> Result < ( ) > {
2247
- let new_ds_id = match m {
2252
+ ) -> Result < Option < JobId > > {
2253
+ // Initial check against upstairs and session ID
2254
+ match m {
2248
2255
Message :: Write {
2249
2256
upstairs_id,
2250
2257
session_id,
2251
- job_id,
2252
- dependencies,
2253
- writes,
2258
+ ..
2259
+ }
2260
+ | Message :: WriteUnwritten {
2261
+ upstairs_id,
2262
+ session_id,
2263
+ ..
2264
+ }
2265
+ | Message :: Flush {
2266
+ upstairs_id,
2267
+ session_id,
2268
+ ..
2269
+ }
2270
+ | Message :: ReadRequest {
2271
+ upstairs_id,
2272
+ session_id,
2273
+ ..
2274
+ }
2275
+ | Message :: ExtentLiveClose {
2276
+ upstairs_id,
2277
+ session_id,
2278
+ ..
2279
+ }
2280
+ | Message :: ExtentLiveFlushClose {
2281
+ upstairs_id,
2282
+ session_id,
2283
+ ..
2284
+ }
2285
+ | Message :: ExtentLiveRepair {
2286
+ upstairs_id,
2287
+ session_id,
2288
+ ..
2289
+ }
2290
+ | Message :: ExtentLiveReopen {
2291
+ upstairs_id,
2292
+ session_id,
2293
+ ..
2294
+ }
2295
+ | Message :: ExtentLiveNoOp {
2296
+ upstairs_id,
2297
+ session_id,
2298
+ ..
2254
2299
} => {
2255
2300
if !is_message_valid (
2256
2301
upstairs_connection,
@@ -2260,8 +2305,19 @@ impl Downstairs {
2260
2305
)
2261
2306
. await ?
2262
2307
{
2263
- return Ok ( ( ) ) ;
2308
+ return Ok ( None ) ;
2264
2309
}
2310
+ }
2311
+ _ => ( ) ,
2312
+ }
2313
+
2314
+ let r = match m {
2315
+ Message :: Write {
2316
+ job_id,
2317
+ dependencies,
2318
+ writes,
2319
+ ..
2320
+ } => {
2265
2321
cdt:: submit__write__start!( || job_id. 0 ) ;
2266
2322
2267
2323
let new_write = IOop :: Write {
@@ -2274,25 +2330,14 @@ impl Downstairs {
2274
2330
Some ( job_id)
2275
2331
}
2276
2332
Message :: Flush {
2277
- upstairs_id,
2278
- session_id,
2279
2333
job_id,
2280
2334
dependencies,
2281
2335
flush_number,
2282
2336
gen_number,
2283
2337
snapshot_details,
2284
2338
extent_limit,
2339
+ ..
2285
2340
} => {
2286
- if !is_message_valid (
2287
- upstairs_connection,
2288
- upstairs_id,
2289
- session_id,
2290
- resp_tx,
2291
- )
2292
- . await ?
2293
- {
2294
- return Ok ( ( ) ) ;
2295
- }
2296
2341
cdt:: submit__flush__start!( || job_id. 0 ) ;
2297
2342
2298
2343
let new_flush = IOop :: Flush {
@@ -2308,22 +2353,11 @@ impl Downstairs {
2308
2353
Some ( job_id)
2309
2354
}
2310
2355
Message :: WriteUnwritten {
2311
- upstairs_id,
2312
- session_id,
2313
2356
job_id,
2314
2357
dependencies,
2315
2358
writes,
2359
+ ..
2316
2360
} => {
2317
- if !is_message_valid (
2318
- upstairs_connection,
2319
- upstairs_id,
2320
- session_id,
2321
- resp_tx,
2322
- )
2323
- . await ?
2324
- {
2325
- return Ok ( ( ) ) ;
2326
- }
2327
2361
cdt:: submit__writeunwritten__start!( || job_id. 0 ) ;
2328
2362
2329
2363
let new_write = IOop :: WriteUnwritten {
@@ -2336,22 +2370,11 @@ impl Downstairs {
2336
2370
Some ( job_id)
2337
2371
}
2338
2372
Message :: ReadRequest {
2339
- upstairs_id,
2340
- session_id,
2341
2373
job_id,
2342
2374
dependencies,
2343
2375
requests,
2376
+ ..
2344
2377
} => {
2345
- if !is_message_valid (
2346
- upstairs_connection,
2347
- upstairs_id,
2348
- session_id,
2349
- resp_tx,
2350
- )
2351
- . await ?
2352
- {
2353
- return Ok ( ( ) ) ;
2354
- }
2355
2378
cdt:: submit__read__start!( || job_id. 0 ) ;
2356
2379
2357
2380
let new_read = IOop :: Read {
@@ -2365,23 +2388,11 @@ impl Downstairs {
2365
2388
}
2366
2389
// These are for repair while taking live IO
2367
2390
Message :: ExtentLiveClose {
2368
- upstairs_id,
2369
- session_id,
2370
2391
job_id,
2371
2392
dependencies,
2372
2393
extent_id,
2394
+ ..
2373
2395
} => {
2374
- if !is_message_valid (
2375
- upstairs_connection,
2376
- upstairs_id,
2377
- session_id,
2378
- resp_tx,
2379
- )
2380
- . await ?
2381
- {
2382
- return Ok ( ( ) ) ;
2383
- }
2384
-
2385
2396
cdt:: submit__el__close__start!( || job_id. 0 ) ;
2386
2397
// TODO: Add dtrace probes
2387
2398
let ext_close = IOop :: ExtentClose {
@@ -2394,25 +2405,13 @@ impl Downstairs {
2394
2405
Some ( job_id)
2395
2406
}
2396
2407
Message :: ExtentLiveFlushClose {
2397
- upstairs_id,
2398
- session_id,
2399
2408
job_id,
2400
2409
dependencies,
2401
2410
extent_id,
2402
2411
flush_number,
2403
2412
gen_number,
2413
+ ..
2404
2414
} => {
2405
- if !is_message_valid (
2406
- upstairs_connection,
2407
- upstairs_id,
2408
- session_id,
2409
- resp_tx,
2410
- )
2411
- . await ?
2412
- {
2413
- return Ok ( ( ) ) ;
2414
- }
2415
-
2416
2415
cdt:: submit__el__flush__close__start!( || job_id. 0 ) ;
2417
2416
// Do both the flush, and then the close
2418
2417
let new_flush = IOop :: ExtentFlushClose {
@@ -2427,25 +2426,12 @@ impl Downstairs {
2427
2426
Some ( job_id)
2428
2427
}
2429
2428
Message :: ExtentLiveRepair {
2430
- upstairs_id,
2431
- session_id,
2432
2429
job_id,
2433
2430
dependencies,
2434
2431
extent_id,
2435
2432
source_repair_address,
2436
2433
..
2437
2434
} => {
2438
- if !is_message_valid (
2439
- upstairs_connection,
2440
- upstairs_id,
2441
- session_id,
2442
- resp_tx,
2443
- )
2444
- . await ?
2445
- {
2446
- return Ok ( ( ) ) ;
2447
- }
2448
-
2449
2435
cdt:: submit__el__repair__start!( || job_id. 0 ) ;
2450
2436
// Do both the flush, and then the close
2451
2437
let new_repair = IOop :: ExtentLiveRepair {
@@ -2461,23 +2447,11 @@ impl Downstairs {
2461
2447
Some ( job_id)
2462
2448
}
2463
2449
Message :: ExtentLiveReopen {
2464
- upstairs_id,
2465
- session_id,
2466
2450
job_id,
2467
2451
dependencies,
2468
2452
extent_id,
2453
+ ..
2469
2454
} => {
2470
- if !is_message_valid (
2471
- upstairs_connection,
2472
- upstairs_id,
2473
- session_id,
2474
- resp_tx,
2475
- )
2476
- . await ?
2477
- {
2478
- return Ok ( ( ) ) ;
2479
- }
2480
-
2481
2455
cdt:: submit__el__reopen__start!( || job_id. 0 ) ;
2482
2456
let new_open = IOop :: ExtentLiveReopen {
2483
2457
dependencies,
@@ -2489,21 +2463,10 @@ impl Downstairs {
2489
2463
Some ( job_id)
2490
2464
}
2491
2465
Message :: ExtentLiveNoOp {
2492
- upstairs_id,
2493
- session_id,
2494
2466
job_id,
2495
2467
dependencies,
2468
+ ..
2496
2469
} => {
2497
- if !is_message_valid (
2498
- upstairs_connection,
2499
- upstairs_id,
2500
- session_id,
2501
- resp_tx,
2502
- )
2503
- . await ?
2504
- {
2505
- return Ok ( ( ) ) ;
2506
- }
2507
2470
cdt:: submit__el__noop__start!( || job_id. 0 ) ;
2508
2471
let new_open = IOop :: ExtentLiveNoOp { dependencies } ;
2509
2472
@@ -2551,7 +2514,7 @@ impl Downstairs {
2551
2514
}
2552
2515
} ;
2553
2516
resp_tx. send ( msg) . await ?;
2554
- return Ok ( ( ) ) ;
2517
+ None
2555
2518
}
2556
2519
Message :: ExtentClose {
2557
2520
repair_id,
@@ -2570,7 +2533,7 @@ impl Downstairs {
2570
2533
}
2571
2534
} ;
2572
2535
resp_tx. send ( msg) . await ?;
2573
- return Ok ( ( ) ) ;
2536
+ None
2574
2537
}
2575
2538
Message :: ExtentRepair {
2576
2539
repair_id,
@@ -2604,7 +2567,7 @@ impl Downstairs {
2604
2567
}
2605
2568
} ;
2606
2569
resp_tx. send ( msg) . await ?;
2607
- return Ok ( ( ) ) ;
2570
+ None
2608
2571
}
2609
2572
Message :: ExtentReopen {
2610
2573
repair_id,
@@ -2623,20 +2586,11 @@ impl Downstairs {
2623
2586
}
2624
2587
} ;
2625
2588
resp_tx. send ( msg) . await ?;
2626
- return Ok ( ( ) ) ;
2589
+ None
2627
2590
}
2628
2591
x => bail ! ( "unexpected frame {:?}" , x) ,
2629
2592
} ;
2630
-
2631
- /*
2632
- * If we added work, tell the work task to get busy.
2633
- */
2634
- if let Some ( new_ds_id) = new_ds_id {
2635
- cdt:: work__start!( || new_ds_id. 0 ) ;
2636
- job_channel_tx. send ( ( ) ) . await ?;
2637
- }
2638
-
2639
- Ok ( ( ) )
2593
+ Ok ( r)
2640
2594
}
2641
2595
2642
2596
async fn do_work_for (
0 commit comments