Skip to content

Commit fc7a1ad

Browse files
authored
Buffer should destructure to Vec when single-referenced
Until now, Buffer offered no means of extracting the internal `Vec<u8>` when, say, a Volume::read() operation had completed. This made it impossible for Crucible consumers to reuse the `Vec` buffer, forcing an otherwise unnecessary allocation.
1 parent 7c1281b commit fc7a1ad

File tree

9 files changed

+311
-320
lines changed

9 files changed

+311
-320
lines changed

crudd/src/main.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async fn cmd_read<T: BlockIO>(
180180
// So say we have an offset of 5. we're misaligned by 5 bytes, so we
181181
// read 5 bytes we don't need. we skip those 5 bytes then write
182182
// the rest to the output
183-
let bytes = buffer.as_vec().await;
183+
let bytes = buffer.into_vec().unwrap();
184184
output.write_all(
185185
&bytes[offset_misalignment as usize
186186
..(offset_misalignment + alignment_bytes) as usize],
@@ -314,7 +314,7 @@ async fn write_remainder_and_finalize<'a, T: BlockIO>(
314314
crucible.read(uflow_offset, uflow_r_buf.clone()).await?;
315315

316316
// Copy it into w_buf
317-
let r_bytes = uflow_r_buf.as_vec().await;
317+
let r_bytes = uflow_r_buf.into_vec().unwrap();
318318
w_buf[n_read..n_read + uflow_backfill]
319319
.copy_from_slice(&r_bytes[uflow_remainder as usize..]);
320320

@@ -400,7 +400,7 @@ async fn cmd_write<T: BlockIO>(
400400
let offset = Block::new(block_idx, native_block_size.trailing_zeros());
401401
crucible.read(offset, buffer.clone()).await?;
402402

403-
let mut w_vec = buffer.as_vec().await.clone();
403+
let mut w_vec = buffer.into_vec().unwrap();
404404
// Write our data into the buffer
405405
let bytes_read = input.read(
406406
&mut w_vec[offset_misalignment as usize

crutest/src/cli.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -224,13 +224,12 @@ async fn cli_read(
224224
let offset = Block::new(block_index as u64, ri.block_size.trailing_zeros());
225225
let length: usize = size * ri.block_size as usize;
226226

227-
let vec: Vec<u8> = vec![255; length];
228-
let data = crucible::Buffer::from_vec(vec);
227+
let data = crucible::Buffer::from_vec(vec![255; length]);
229228

230229
println!("Read at block {:5}, len:{:7}", offset.value, data.len());
231230
guest.read(offset, data.clone()).await?;
232231

233-
let mut dl = data.as_vec().await.to_vec();
232+
let mut dl = data.into_vec().unwrap();
234233
match validate_vec(
235234
dl.clone(),
236235
block_index,

crutest/src/main.rs

+20-29
Original file line numberDiff line numberDiff line change
@@ -1121,11 +1121,10 @@ async fn verify_volume(
11211121
};
11221122

11231123
let length: usize = next_io_blocks * ri.block_size as usize;
1124-
let vec: Vec<u8> = vec![255; length];
1125-
let data = crucible::Buffer::from_vec(vec);
1124+
let data = crucible::Buffer::from_vec(vec![255; length]);
11261125
guest.read(offset, data.clone()).await?;
11271126

1128-
let dl = data.as_vec().await.to_vec();
1127+
let dl = data.into_vec().unwrap();
11291128
match validate_vec(
11301129
dl,
11311130
block_index,
@@ -1361,11 +1360,10 @@ async fn balloon_workload(
13611360
guest.flush(None).await?;
13621361

13631362
let length: usize = size * ri.block_size as usize;
1364-
let vec: Vec<u8> = vec![255; length];
1365-
let data = crucible::Buffer::from_vec(vec);
1363+
let data = crucible::Buffer::from_vec(vec![255; length]);
13661364
guest.read(offset, data.clone()).await?;
13671365

1368-
let dl = data.as_vec().await.to_vec();
1366+
let dl = data.into_vec().unwrap();
13691367
match validate_vec(
13701368
dl,
13711369
block_index,
@@ -1579,8 +1577,7 @@ async fn generic_workload(
15791577
} else {
15801578
// Read (+ verify)
15811579
let length: usize = size * ri.block_size as usize;
1582-
let vec: Vec<u8> = vec![255; length];
1583-
let data = crucible::Buffer::from_vec(vec);
1580+
let data = crucible::Buffer::from_vec(vec![255; length]);
15841581
if !quiet {
15851582
match wtq {
15861583
WhenToQuit::Count { count } => {
@@ -1605,7 +1602,7 @@ async fn generic_workload(
16051602
}
16061603
guest.read(offset, data.clone()).await?;
16071604

1608-
let dl = data.as_vec().await.to_vec();
1605+
let dl = data.into_vec().unwrap();
16091606
match validate_vec(
16101607
dl,
16111608
block_index,
@@ -2217,13 +2214,12 @@ async fn one_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
22172214
guest.write(offset, data).await?;
22182215

22192216
let length: usize = size * ri.block_size as usize;
2220-
let vec: Vec<u8> = vec![255; length];
2221-
let data = crucible::Buffer::from_vec(vec);
2217+
let data = crucible::Buffer::from_vec(vec![255; length]);
22222218

22232219
println!("Read at block {:5}, len:{:7}", offset.value, data.len());
22242220
guest.read(offset, data.clone()).await?;
22252221

2226-
let dl = data.as_vec().await.to_vec();
2222+
let dl = data.into_vec().unwrap();
22272223
match validate_vec(dl, block_index, &mut ri.write_log, ri.block_size, false)
22282224
{
22292225
ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -2372,11 +2368,10 @@ async fn write_flush_read_workload(
23722368
guest.flush(None).await?;
23732369

23742370
let length: usize = size * ri.block_size as usize;
2375-
let vec: Vec<u8> = vec![255; length];
2376-
let data = crucible::Buffer::from_vec(vec);
2371+
let data = crucible::Buffer::from_vec(vec![255; length]);
23772372
guest.read(offset, data.clone()).await?;
23782373

2379-
let dl = data.as_vec().await.to_vec();
2374+
let dl = data.into_vec().unwrap();
23802375
match validate_vec(
23812376
dl,
23822377
block_index,
@@ -2534,8 +2529,7 @@ async fn repair_workload(
25342529
} else {
25352530
// Read
25362531
let length: usize = size * ri.block_size as usize;
2537-
let vec: Vec<u8> = vec![255; length];
2538-
let data = crucible::Buffer::from_vec(vec);
2532+
let data = crucible::Buffer::from_vec(vec![255; length]);
25392533
println!(
25402534
"{:>0width$}/{:>0width$} Read \
25412535
block {:>bw$} len {:>sw$}",
@@ -2547,7 +2541,7 @@ async fn repair_workload(
25472541
bw = block_width,
25482542
sw = size_width,
25492543
);
2550-
guest.read(offset, data.clone()).await?;
2544+
guest.read(offset, data).await?;
25512545
}
25522546
}
25532547
}
@@ -2615,10 +2609,9 @@ async fn demo_workload(
26152609
} else {
26162610
// Read
26172611
let length: usize = size * ri.block_size as usize;
2618-
let vec: Vec<u8> = vec![255; length];
2619-
let data = crucible::Buffer::from_vec(vec);
2612+
let data = crucible::Buffer::from_vec(vec![255; length]);
26202613

2621-
let future = guest.read(offset, data.clone());
2614+
let future = guest.read(offset, data);
26222615
futureslist.push(future);
26232616
}
26242617
}
@@ -2677,13 +2670,12 @@ async fn span_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
26772670
guest.flush(None).await?;
26782671

26792672
let length: usize = 2 * ri.block_size as usize;
2680-
let vec: Vec<u8> = vec![99; length];
2681-
let data = crucible::Buffer::from_vec(vec);
2673+
let data = crucible::Buffer::from_vec(vec![99; length]);
26822674

26832675
println!("Sending a read spanning two extents");
26842676
guest.read(offset, data.clone()).await?;
26852677

2686-
let dl = data.as_vec().await.to_vec();
2678+
let dl = data.into_vec().unwrap();
26872679
match validate_vec(dl, block_index, &mut ri.write_log, ri.block_size, false)
26882680
{
26892681
ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -2718,11 +2710,10 @@ async fn big_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
27182710
guest.flush(None).await?;
27192711

27202712
let length: usize = ri.block_size as usize;
2721-
let vec: Vec<u8> = vec![255; length];
2722-
let data = crucible::Buffer::from_vec(vec);
2713+
let data = crucible::Buffer::from_vec(vec![255; length]);
27232714
guest.read(offset, data.clone()).await?;
27242715

2725-
let dl = data.as_vec().await.to_vec();
2716+
let dl = data.into_vec().unwrap();
27262717
match validate_vec(
27272718
dl,
27282719
block_index,
@@ -2848,8 +2839,8 @@ async fn dep_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
28482839
let future = guest.write_to_byte_offset(my_offset, data);
28492840
futureslist.push(future);
28502841
} else {
2851-
let vec: Vec<u8> = vec![0; ri.block_size as usize];
2852-
let data = crucible::Buffer::from_vec(vec);
2842+
let data =
2843+
crucible::Buffer::from_vec(vec![0; ri.block_size as usize]);
28532844

28542845
println!(
28552846
"Loop:{} send read {} @ offset:{} len:{}",

pantry/src/pantry.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,7 @@ impl PantryEntry {
298298
.read_from_byte_offset(offset, buffer.clone())
299299
.await?;
300300

301-
let response = buffer.as_vec().await;
302-
Ok(response.clone())
301+
Ok(buffer.into_vec().unwrap())
303302
}
304303

305304
pub async fn scrub(&self) -> Result<(), CrucibleError> {
@@ -341,7 +340,7 @@ impl PantryEntry {
341340
.read_from_byte_offset(start, data.clone())
342341
.await?;
343342

344-
hasher.update(&*data.as_vec().await);
343+
hasher.update(&data.into_vec().unwrap())
345344
}
346345

347346
let digest = hex::encode(hasher.finalize());

upstairs/src/block_req.rs

+25-54
Original file line numberDiff line numberDiff line change
@@ -11,41 +11,34 @@ use tokio::sync::oneshot;
1111
#[derive(Debug)]
1212
pub(crate) struct BlockReq {
1313
pub op: BlockOp,
14-
sender: oneshot::Sender<Result<(), CrucibleError>>,
14+
pub res: BlockRes,
1515
}
1616

17-
impl BlockReq {
18-
pub fn new(
19-
op: BlockOp,
20-
sender: oneshot::Sender<Result<(), CrucibleError>>,
21-
) -> BlockReq {
22-
Self { op, sender }
23-
}
24-
25-
/// Return a copy of the block op
26-
pub fn op(&self) -> BlockOp {
27-
self.op.clone()
28-
}
29-
30-
/// Consume this BlockReq and send Ok to the receiver
17+
#[must_use]
18+
#[derive(Debug)]
19+
pub(crate) struct BlockRes(Option<oneshot::Sender<Result<(), CrucibleError>>>);
20+
impl BlockRes {
21+
/// Consume this BlockRes and send Ok to the receiver
3122
pub fn send_ok(self) {
3223
self.send_result(Ok(()))
3324
}
3425

35-
/// Consume this BlockReq and send an Err to the receiver
26+
/// Consume this BlockRes and send an Err to the receiver
3627
pub fn send_err(self, e: CrucibleError) {
3728
self.send_result(Err(e))
3829
}
3930

40-
/// Consume this BlockReq and send a Result to the receiver
41-
pub fn send_result(self, r: Result<(), CrucibleError>) {
31+
/// Consume this BlockRes and send a Result to the receiver
32+
fn send_result(mut self, r: Result<(), CrucibleError>) {
4233
// XXX this eats the result!
43-
let _ = self.sender.send(r);
34+
let _ = self.0.take().expect("sender was populated").send(r);
4435
}
45-
46-
/// Consume this BlockReq and return the inner oneshot sender
47-
pub fn take_sender(self) -> oneshot::Sender<Result<(), CrucibleError>> {
48-
self.sender
36+
}
37+
impl Drop for BlockRes {
38+
fn drop(&mut self) {
39+
// Dropping a BlockRes without issuing a completion would mean the
40+
// associated waiter would be stuck waiting forever for a result.
41+
assert!(self.0.is_none(), "result should be sent for BlockRes");
4942
}
5043
}
5144

@@ -61,10 +54,10 @@ pub(crate) struct BlockReqWaiter {
6154
}
6255

6356
impl BlockReqWaiter {
64-
pub fn new(
65-
recv: oneshot::Receiver<Result<(), CrucibleError>>,
66-
) -> BlockReqWaiter {
67-
Self { recv }
57+
/// Create associated `BlockReqWaiter`/`BlockRes` pair
58+
pub fn pair() -> (BlockReqWaiter, BlockRes) {
59+
let (send, recv) = oneshot::channel();
60+
(Self { recv }, BlockRes(Some(send)))
6861
}
6962

7063
/// Consume this BlockReqWaiter and wait on the message
@@ -75,7 +68,7 @@ impl BlockReqWaiter {
7568
}
7669
}
7770

78-
#[allow(dead_code)]
71+
#[cfg(test)]
7972
pub fn try_wait(&mut self) -> Option<Result<(), CrucibleError>> {
8073
match self.recv.try_recv() {
8174
Ok(v) => Some(v),
@@ -93,42 +86,20 @@ impl BlockReqWaiter {
9386
mod test {
9487
use super::*;
9588

96-
#[tokio::test]
97-
async fn test_blockreqwaiter_send() {
98-
let (send, recv) = oneshot::channel();
99-
let brw = BlockReqWaiter::new(recv);
100-
101-
send.send(Ok(())).unwrap();
102-
103-
brw.wait().await.unwrap();
104-
}
105-
10689
#[tokio::test]
10790
async fn test_blockreq_and_blockreqwaiter() {
108-
let (send, recv) = oneshot::channel();
109-
110-
let op = BlockOp::Flush {
111-
snapshot_details: None,
112-
};
113-
let br = BlockReq::new(op, send);
114-
let brw = BlockReqWaiter::new(recv);
91+
let (brw, res) = BlockReqWaiter::pair();
11592

116-
br.send_ok();
93+
res.send_ok();
11794

11895
brw.wait().await.unwrap();
11996
}
12097

12198
#[tokio::test]
12299
async fn test_blockreq_and_blockreqwaiter_err() {
123-
let (send, recv) = oneshot::channel();
124-
125-
let op = BlockOp::Flush {
126-
snapshot_details: None,
127-
};
128-
let br = BlockReq::new(op, send);
129-
let brw = BlockReqWaiter::new(recv);
100+
let (brw, res) = BlockReqWaiter::pair();
130101

131-
br.send_err(CrucibleError::UpstairsInactive);
102+
res.send_err(CrucibleError::UpstairsInactive);
132103

133104
assert!(brw.wait().await.is_err());
134105
}

0 commit comments

Comments
 (0)