Skip to content

Commit 6b12017

Browse files
authored
Make the Guest -> Upstairs queue fully async (#1086)
The upstairs `struct Guest` is currently used in at least two different tasks: - The guest uses the `trait BlockIO` to push requests into a queue (protected by a `tokio::sync::Mutex`). There can be multiple tasks sharing an `Arc<Guest>` here, e.g. 8x in Propolis. - The `struct Upstairs` pulls requests from that queue in the `up_main` event loop This PR splits the `Guest` into two separate `structs`, which are **owned** (not shared) by their respective tasks: - `Guest` (same name as before) implements `BlockIO` and is used by the guest task to send new jobs - `GuestIoHandle` (new!) is used by the `struct Upstairs` and receives those jobs The two halves communicate via a `tokio::sync::mpsc` queue, removing the previous combination of `tokio::sync::Mutex<VecDeque<BlockReq>> + tokio::sync::Notify` It's still possible to share a `Guest` by wrapping it in an `Arc`, just like before. As a side effect of removing the `tokio::sync::Mutex`, many functions become synchronous; don't be alarmed by the LOC in this PR, since they're mostly mechanical. I don't see any performance changes here (alas), but I think it's still worthwhile: it simplifies our code and helps set up a future consolidation of backpressure logic.
1 parent eb96d24 commit 6b12017

File tree

14 files changed

+994
-1070
lines changed

14 files changed

+994
-1070
lines changed

crudd/src/main.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -571,10 +571,10 @@ async fn main() -> Result<()> {
571571
};
572572

573573
// TODO: volumes?
574-
let guest = Arc::new(Guest::new(None));
574+
let (guest, io) = Guest::new(None);
575+
let guest = Arc::new(guest);
575576

576-
let _join_handle =
577-
up_main(crucible_opts, opt.gen, None, guest.clone(), None)?;
577+
let _join_handle = up_main(crucible_opts, opt.gen, None, io, None)?;
578578
eprintln!("Crucible runtime is spawned");
579579

580580
// IO time

crutest/src/main.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,8 @@ async fn main() -> Result<()> {
668668
* We create this here instead of inside up_main() so we can use
669669
* the methods provided by guest to interact with Crucible.
670670
*/
671-
let guest = Arc::new(Guest::new(None));
671+
let (guest, io) = Guest::new(None);
672+
let guest = Arc::new(guest);
672673

673674
let pr;
674675
if opt.metrics {
@@ -698,8 +699,7 @@ async fn main() -> Result<()> {
698699
pr = None;
699700
}
700701

701-
let _join_handle =
702-
up_main(crucible_opts, opt.gen, None, guest.clone(), pr)?;
702+
let _join_handle = up_main(crucible_opts, opt.gen, None, io, pr)?;
703703
println!("Crucible runtime is spawned");
704704

705705
if let Workload::CliServer { listen, port } = opt.workload {

hammer/src/main.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2023 Oxide Computer Company
22

33
use std::net::SocketAddr;
4-
use std::sync::Arc;
54

65
use anyhow::{bail, Result};
76
use clap::Parser;
@@ -130,11 +129,10 @@ async fn main() -> Result<()> {
130129
* We create this here instead of inside up_main() so we can use
131130
* the methods provided by guest to interact with Crucible.
132131
*/
133-
let guest = Arc::new(Guest::new(None));
132+
let (guest, io) = Guest::new(None);
134133

135134
let gen: u64 = i as u64 + opt.gen;
136-
let _join_handle =
137-
up_main(crucible_opts.clone(), gen, None, guest.clone(), None)?;
135+
let _join_handle = up_main(crucible_opts.clone(), gen, None, io, None)?;
138136
println!("Crucible runtime is spawned");
139137

140138
cpfs.push(crucible::CruciblePseudoFile::from(guest)?);

integration_tests/src/lib.rs

+90-39
Original file line numberDiff line numberDiff line change
@@ -3058,10 +3058,9 @@ mod test {
30583058
let tds = TestDownstairsSet::small(false).await?;
30593059
let opts = tds.opts();
30603060

3061-
let guest = Arc::new(Guest::new(None));
3062-
let gc = guest.clone();
3061+
let (guest, io) = Guest::new(None);
30633062

3064-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3063+
let _join_handle = up_main(opts, 1, None, io, None)?;
30653064

30663065
guest.activate().await?;
30673066
guest.query_work_queue().await?;
@@ -3093,6 +3092,70 @@ mod test {
30933092
Ok(())
30943093
}
30953094

3095+
/// Drop the guest right away and confirm that the worker thread stops
3096+
#[tokio::test]
3097+
async fn integration_test_guest_drop_early() -> Result<()> {
3098+
// Spin off three downstairs, build our Crucible struct.
3099+
let tds = TestDownstairsSet::small(false).await?;
3100+
let opts = tds.opts();
3101+
3102+
let (guest, io) = Guest::new(None);
3103+
3104+
let join_handle = up_main(opts, 1, None, io, None)?;
3105+
3106+
drop(guest);
3107+
3108+
let r = join_handle.await;
3109+
assert!(r.is_ok());
3110+
Ok(())
3111+
}
3112+
3113+
/// Same as `integration_test_guest_downstairs`, but dropping the Guest at
3114+
/// the end and confirming that the worker thread stops
3115+
#[tokio::test]
3116+
async fn integration_test_guest_drop() -> Result<()> {
3117+
const BLOCK_SIZE: usize = 512;
3118+
// Spin off three downstairs, build our Crucible struct.
3119+
let tds = TestDownstairsSet::small(false).await?;
3120+
let opts = tds.opts();
3121+
3122+
let (guest, io) = Guest::new(None);
3123+
3124+
let join_handle = up_main(opts, 1, None, io, None)?;
3125+
3126+
guest.activate().await?;
3127+
guest.query_work_queue().await?;
3128+
3129+
// Verify contents are zero on init
3130+
let mut buffer = Buffer::new(10, BLOCK_SIZE);
3131+
guest
3132+
.read(Block::new(0, BLOCK_SIZE.trailing_zeros()), &mut buffer)
3133+
.await?;
3134+
3135+
assert_eq!(vec![0x00_u8; BLOCK_SIZE * 10], buffer.to_vec());
3136+
3137+
// Write data in
3138+
guest
3139+
.write(
3140+
Block::new(0, BLOCK_SIZE.trailing_zeros()),
3141+
Bytes::from(vec![0x55; BLOCK_SIZE * 10]),
3142+
)
3143+
.await?;
3144+
3145+
// Read parent, verify contents
3146+
let mut buffer = Buffer::new(10, BLOCK_SIZE);
3147+
guest
3148+
.read(Block::new(0, BLOCK_SIZE.trailing_zeros()), &mut buffer)
3149+
.await?;
3150+
3151+
assert_eq!(vec![0x55_u8; BLOCK_SIZE * 10], buffer.to_vec());
3152+
drop(guest);
3153+
3154+
let r = join_handle.await;
3155+
assert!(r.is_ok());
3156+
Ok(())
3157+
}
3158+
30963159
#[tokio::test]
30973160
async fn integration_test_guest_zero_length_io() -> Result<()> {
30983161
// Test the guest layer with a write and read of zero length
@@ -3102,10 +3165,9 @@ mod test {
31023165
let tds = TestDownstairsSet::small(false).await?;
31033166
let opts = tds.opts();
31043167

3105-
let guest = Arc::new(Guest::new(None));
3106-
let gc = guest.clone();
3168+
let (guest, io) = Guest::new(None);
31073169

3108-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3170+
let _join_handle = up_main(opts, 1, None, io, None)?;
31093171

31103172
guest.activate().await?;
31113173
guest.query_work_queue().await?;
@@ -3138,9 +3200,8 @@ mod test {
31383200
let opts = tds.opts();
31393201

31403202
let log = csl();
3141-
let guest = Arc::new(Guest::new(Some(log.clone())));
3142-
let gc = guest.clone();
3143-
let _jh = up_main(opts, 1, None, gc, None)?;
3203+
let (guest, io) = Guest::new(Some(log.clone()));
3204+
let _jh = up_main(opts, 1, None, io, None)?;
31443205

31453206
guest.activate().await?;
31463207

@@ -3212,11 +3273,10 @@ mod test {
32123273
let tds = TestDownstairsSet::small(true).await?;
32133274
let opts = tds.opts();
32143275

3215-
let guest = Arc::new(Guest::new(None));
3216-
let gc = guest.clone();
3276+
let (guest, io) = Guest::new(None);
32173277

32183278
// Read-only Upstairs should return errors if writes are attempted.
3219-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3279+
let _join_handle = up_main(opts, 1, None, io, None)?;
32203280

32213281
guest.activate().await?;
32223282

@@ -3246,9 +3306,8 @@ mod test {
32463306
let opts = tds.opts();
32473307

32483308
let log = csl();
3249-
let guest = Arc::new(Guest::new(Some(log.clone())));
3250-
let gc = guest.clone();
3251-
let _jh = up_main(opts, 1, None, gc, None)?;
3309+
let (guest, io) = Guest::new(Some(log.clone()));
3310+
let _jh = up_main(opts, 1, None, io, None)?;
32523311

32533312
guest.activate().await?;
32543313

@@ -3293,10 +3352,9 @@ mod test {
32933352
let tds = TestDownstairsSet::small(false).await?;
32943353
let opts = tds.opts();
32953354

3296-
let guest = Arc::new(Guest::new(None));
3297-
let gc = guest.clone();
3355+
let (guest, io) = Guest::new(None);
32983356

3299-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3357+
let _join_handle = up_main(opts, 1, None, io, None)?;
33003358

33013359
guest.activate().await?;
33023360
guest.query_work_queue().await?;
@@ -3366,10 +3424,9 @@ mod test {
33663424
let tds = TestDownstairsSet::small(false).await?;
33673425
let opts = tds.opts();
33683426

3369-
let guest = Arc::new(Guest::new(None));
3370-
let gc = guest.clone();
3427+
let (guest, io) = Guest::new(None);
33713428

3372-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3429+
let _join_handle = up_main(opts, 1, None, io, None)?;
33733430

33743431
guest.activate().await?;
33753432
guest.query_work_queue().await?;
@@ -3424,10 +3481,9 @@ mod test {
34243481
let tds = TestDownstairsSet::small(false).await?;
34253482
let opts = tds.opts();
34263483

3427-
let guest = Arc::new(Guest::new(None));
3428-
let gc = guest.clone();
3484+
let (guest, io) = Guest::new(None);
34293485

3430-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3486+
let _join_handle = up_main(opts, 1, None, io, None)?;
34313487

34323488
guest.activate().await?;
34333489
guest.query_work_queue().await?;
@@ -3483,10 +3539,9 @@ mod test {
34833539
let tds = TestDownstairsSet::small(false).await?;
34843540
let opts = tds.opts();
34853541

3486-
let guest = Arc::new(Guest::new(None));
3487-
let gc = guest.clone();
3542+
let (guest, io) = Guest::new(None);
34883543

3489-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3544+
let _join_handle = up_main(opts, 1, None, io, None)?;
34903545

34913546
guest.activate().await?;
34923547
guest.query_work_queue().await?;
@@ -3537,10 +3592,9 @@ mod test {
35373592
let tds = TestDownstairsSet::small(false).await?;
35383593
let opts = tds.opts();
35393594

3540-
let guest = Arc::new(Guest::new(None));
3541-
let gc = guest.clone();
3595+
let (guest, io) = Guest::new(None);
35423596

3543-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3597+
let _join_handle = up_main(opts, 1, None, io, None)?;
35443598

35453599
guest.activate().await?;
35463600
guest.query_work_queue().await?;
@@ -3591,10 +3645,9 @@ mod test {
35913645
let tds = TestDownstairsSet::small(false).await?;
35923646
let opts = tds.opts();
35933647

3594-
let guest = Arc::new(Guest::new(None));
3595-
let gc = guest.clone();
3648+
let (guest, io) = Guest::new(None);
35963649

3597-
let _join_handle = up_main(opts, 1, None, gc, None)?;
3650+
let _join_handle = up_main(opts, 1, None, io, None)?;
35983651

35993652
guest.activate().await?;
36003653
guest.query_work_queue().await?;
@@ -3643,10 +3696,9 @@ mod test {
36433696
let tds = TestDownstairsSet::small(false).await.unwrap();
36443697
let opts = tds.opts();
36453698

3646-
let guest = Arc::new(Guest::new(None));
3647-
let gc = guest.clone();
3699+
let (guest, io) = Guest::new(None);
36483700

3649-
let _join_handle = up_main(opts, 1, None, gc, None).unwrap();
3701+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
36503702

36513703
guest.activate().await.unwrap();
36523704

@@ -3679,10 +3731,9 @@ mod test {
36793731
let tds = TestDownstairsSet::small(false).await.unwrap();
36803732
let opts = tds.opts();
36813733

3682-
let guest = Arc::new(Guest::new(None));
3683-
let gc = guest.clone();
3734+
let (guest, io) = Guest::new(None);
36843735

3685-
let _join_handle = up_main(opts, 1, None, gc, None).unwrap();
3736+
let _join_handle = up_main(opts, 1, None, io, None).unwrap();
36863737

36873738
guest.activate().await.unwrap();
36883739

measure_iops/src/main.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,19 @@ async fn main() -> Result<()> {
101101
std::process::exit(1);
102102
}));
103103

104-
let mut guest = Guest::new(None);
104+
let (guest, mut io) = Guest::new(None);
105+
let guest = Arc::new(guest);
105106

106107
if let Some(iop_limit) = opt.iop_limit {
107-
guest.set_iop_limit(16 * 1024 * 1024, iop_limit);
108+
io.set_iop_limit(16 * 1024 * 1024, iop_limit);
108109
}
109110

110111
if let Some(bw_limit) = opt.bw_limit_in_bytes {
111-
guest.set_bw_limit(bw_limit);
112+
io.set_bw_limit(bw_limit);
112113
}
113114

114115
let guest = Arc::new(guest);
115-
let _join_handle =
116-
up_main(crucible_opts, opt.gen, None, guest.clone(), None)?;
116+
let _join_handle = up_main(crucible_opts, opt.gen, None, io, None)?;
117117
println!("Crucible runtime is spawned");
118118

119119
guest.activate().await?;

nbd_server/src/main.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
// Copyright 2021 Oxide Computer Company
22
use std::net::SocketAddr;
3-
use std::sync::Arc;
43

54
use anyhow::{bail, Result};
65
use clap::Parser;
@@ -91,10 +90,9 @@ async fn main() -> Result<()> {
9190
* We create this here instead of inside up_main() so we can use
9291
* the methods provided by guest to interact with Crucible.
9392
*/
94-
let guest = Arc::new(Guest::new(None));
93+
let (guest, io) = Guest::new(None);
9594

96-
let _join_handle =
97-
up_main(crucible_opts, opt.gen, None, guest.clone(), None)?;
95+
let _join_handle = up_main(crucible_opts, opt.gen, None, io, None)?;
9896
println!("Crucible runtime is spawned");
9997

10098
// NBD server

upstairs/src/client.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -2311,7 +2311,12 @@ async fn client_run(
23112311
.await;
23122312

23132313
warn!(log, "client task is sending Done({r:?})");
2314-
tx.send(ClientResponse::Done(r)).await.unwrap();
2314+
if tx.send(ClientResponse::Done(r)).await.is_err() {
2315+
warn!(
2316+
log,
2317+
"client task could not reply to main task; shutting down?"
2318+
);
2319+
}
23152320
while let Some(v) = rx.recv().await {
23162321
warn!(log, "exiting client task is ignoring message {v:?}");
23172322
}

upstairs/src/dummy_downstairs_tests.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,8 @@ pub(crate) mod protocol_test {
522522

523523
// Configure our guest without queue backpressure, to speed up tests
524524
// which require triggering a timeout
525-
let mut g = Guest::new(Some(log.clone()));
526-
g.backpressure_config.queue_max_delay = Duration::ZERO;
525+
let (g, mut io) = Guest::new(Some(log.clone()));
526+
io.backpressure_config.queue_max_delay = Duration::ZERO;
527527
let guest = Arc::new(g);
528528

529529
let crucible_opts = CrucibleOpts {
@@ -536,7 +536,7 @@ pub(crate) mod protocol_test {
536536
};
537537

538538
let join_handle =
539-
up_main(crucible_opts, 1, None, guest.clone(), None).unwrap();
539+
up_main(crucible_opts, 1, None, io, None).unwrap();
540540

541541
let mut handles: Vec<JoinHandle<Result<()>>> = vec![];
542542

0 commit comments

Comments
 (0)