@@ -11,6 +11,7 @@ use std::{cmp, io};
11
11
use anyhow:: { bail, Result } ;
12
12
use clap:: Parser ;
13
13
use futures:: stream:: StreamExt ;
14
+ use itertools:: Itertools ;
14
15
use signal_hook:: consts:: signal:: * ;
15
16
use signal_hook_tokio:: Signals ;
16
17
use tokio:: sync:: oneshot;
@@ -102,7 +103,7 @@ pub fn opts() -> Result<Opt> {
102
103
Ok ( opt)
103
104
}
104
105
105
- async fn cmd_read < T : BlockIO > (
106
+ async fn cmd_read < T : BlockIO + std :: marker :: Send + ' static > (
106
107
opt : & Opt ,
107
108
crucible : Arc < T > ,
108
109
mut early_shutdown : oneshot:: Receiver < ( ) > ,
@@ -154,7 +155,6 @@ async fn cmd_read<T: BlockIO>(
154
155
//
155
156
// TODO: this is no longer true after the Upstairs is async: multiple
156
157
// requests can be submitted and awaited at the same time.
157
- let mut buffers = VecDeque :: with_capacity ( opt. pipeline_length ) ;
158
158
let mut futures = VecDeque :: with_capacity ( opt. pipeline_length ) ;
159
159
160
160
// First, align our offset to the underlying blocks with an initial read
@@ -171,16 +171,16 @@ async fn cmd_read<T: BlockIO>(
171
171
cmp:: min ( num_bytes, native_block_size - offset_misalignment) ;
172
172
if offset_misalignment != 0 {
173
173
// Read the full block
174
- let buffer = Buffer :: new ( native_block_size as usize ) ;
174
+ let mut buffer = Buffer :: new ( 1 , native_block_size as usize ) ;
175
175
let block_idx = opt. byte_offset / native_block_size;
176
176
let offset = Block :: new ( block_idx, native_block_size. trailing_zeros ( ) ) ;
177
- crucible. read ( offset, buffer. clone ( ) ) . await ?;
177
+ crucible. read ( offset, & mut buffer) . await ?;
178
178
179
179
// write only (block size - misalignment) bytes
180
180
// For example, with an offset of 5 we are misaligned by 5 bytes, so we
181
181
// read 5 bytes we don't need. We skip those 5 bytes, then write
182
182
// the rest to the output.
183
- let bytes = buffer. into_vec ( ) . unwrap ( ) ;
183
+ let bytes = buffer. into_vec ( ) ;
184
184
output. write_all (
185
185
& bytes[ offset_misalignment as usize
186
186
..( offset_misalignment + alignment_bytes) as usize ] ,
@@ -207,47 +207,50 @@ async fn cmd_read<T: BlockIO>(
207
207
let cmd_count = num_bytes / ( opt. iocmd_block_count * native_block_size) ;
208
208
let remainder = num_bytes % ( opt. iocmd_block_count * native_block_size) ;
209
209
210
- // Issue all of our read commands
211
- for i in 0 ..cmd_count {
212
- // which blocks in the underlying store are we accessing?
213
- let block_idx = block_offset + ( i * opt. iocmd_block_count ) ;
214
- let offset = Block :: new ( block_idx, native_block_size. trailing_zeros ( ) ) ;
210
+ let mut buffers: Vec < Buffer > = ( 0 ..opt. pipeline_length )
211
+ . map ( |_| {
212
+ Buffer :: new (
213
+ opt. iocmd_block_count as usize ,
214
+ native_block_size as usize ,
215
+ )
216
+ } )
217
+ . collect ( ) ;
215
218
216
- // Send the read command with whichever buffer is at the back of the
217
- // queue. We re-use the buffers to avoid lots of allocations
218
- let w_buf =
219
- Buffer :: new ( ( opt. iocmd_block_count * native_block_size) as usize ) ;
220
- let w_future = crucible. read ( offset, w_buf. clone ( ) ) ;
221
- buffers. push_back ( w_buf) ;
222
- futures. push_back ( w_future) ;
223
- total_bytes_read +=
224
- ( opt. iocmd_block_count * native_block_size) as usize ;
219
+ // Issue all of our read commands
220
+ for cmd_range in & ( 0 ..cmd_count as usize ) . chunks ( opt. pipeline_length ) {
221
+ for i in cmd_range {
222
+ // which blocks in the underlying store are we accessing?
223
+ let block_idx = block_offset + ( i as u64 * opt. iocmd_block_count ) ;
224
+ let offset =
225
+ Block :: new ( block_idx, native_block_size. trailing_zeros ( ) ) ;
226
+
227
+ // Send the read command with whichever buffer is at the back of the
228
+ // queue. We re-use the buffers to avoid lots of allocations
229
+ let mut buffer = buffers. pop ( ) . unwrap ( ) ;
230
+ let crucible = crucible. clone ( ) ;
231
+ futures. push_back ( tokio:: spawn ( async move {
232
+ crucible. read ( offset, & mut buffer) . await ?;
233
+ Ok ( buffer)
234
+ } ) ) ;
235
+
236
+ total_bytes_read +=
237
+ ( opt. iocmd_block_count * native_block_size) as usize ;
238
+ }
225
239
226
- // If we have a queue of futures, drain the oldest one to output.
227
- if futures . len ( ) == opt . pipeline_length {
228
- futures . pop_front ( ) . unwrap ( ) . await ?;
229
- let r_buf = buffers . pop_front ( ) . unwrap ( ) ;
230
- output . write_all ( & r_buf. as_vec ( ) . await ) ? ;
240
+ for future in futures. drain ( .. ) {
241
+ let result : Result < Buffer , CrucibleError > = future . await ? ;
242
+ let r_buf = result ?;
243
+ output . write_all ( & r_buf ) ? ;
244
+ buffers . push ( r_buf) ;
231
245
}
232
246
233
247
if early_shutdown. try_recv ( ) . is_ok ( ) {
234
248
eprintln ! ( "shutting down early in response to SIGUSR1" ) ;
235
- join_all ( futures) . await ?;
236
249
return Ok ( total_bytes_read) ;
237
250
}
238
251
}
239
252
240
- // Drain the outstanding commands
241
- if !futures. is_empty ( ) {
242
- crucible:: join_all ( futures) . await ?;
243
-
244
- // drain the buffer to the output file
245
- while !buffers. is_empty ( ) {
246
- // unwrapping is safe because of the length check
247
- let r_buf = buffers. pop_front ( ) . unwrap ( ) ;
248
- output. write_all ( & r_buf. as_vec ( ) . await ) ?;
249
- }
250
- }
253
+ assert ! ( futures. is_empty( ) ) ;
251
254
252
255
// Issue our final read command, if any. This could be interleaved with
253
256
// draining the outstanding commands but it's more complicated and
@@ -256,12 +259,13 @@ async fn cmd_read<T: BlockIO>(
256
259
// let block_remainder = remainder % native_block_size;
257
260
// round up
258
261
let blocks = ( remainder + native_block_size - 1 ) / native_block_size;
259
- let buffer = Buffer :: new ( ( blocks * native_block_size) as usize ) ;
262
+ let mut buffer =
263
+ Buffer :: new ( blocks as usize , native_block_size as usize ) ;
260
264
let block_idx = ( cmd_count * opt. iocmd_block_count ) + block_offset;
261
265
let offset = Block :: new ( block_idx, native_block_size. trailing_zeros ( ) ) ;
262
- crucible. read ( offset, buffer. clone ( ) ) . await ?;
266
+ crucible. read ( offset, & mut buffer) . await ?;
263
267
total_bytes_read += remainder as usize ;
264
- output. write_all ( & buffer. as_vec ( ) . await [ 0 ..remainder as usize ] ) ?;
268
+ output. write_all ( & buffer[ 0 ..remainder as usize ] ) ?;
265
269
}
266
270
267
271
Ok ( total_bytes_read)
@@ -280,7 +284,7 @@ async fn write_remainder_and_finalize<'a, T: BlockIO>(
280
284
offset : Block ,
281
285
n_read : usize ,
282
286
native_block_size : u64 ,
283
- mut futures : VecDeque < crucible :: CrucibleBlockIOFuture < ' a > > ,
287
+ mut futures : VecDeque < CrucibleBlockIOFuture < ' a > > ,
284
288
) -> Result < ( ) > {
285
289
// the input stream ended,
286
290
// - read/mod/write for alignment
@@ -310,11 +314,11 @@ async fn write_remainder_and_finalize<'a, T: BlockIO>(
310
314
offset. value + uflow_blocks,
311
315
native_block_size. trailing_zeros ( ) ,
312
316
) ;
313
- let uflow_r_buf = Buffer :: new ( native_block_size as usize ) ;
314
- crucible. read ( uflow_offset, uflow_r_buf. clone ( ) ) . await ?;
317
+ let mut uflow_r_buf = Buffer :: new ( 1 , native_block_size as usize ) ;
318
+ crucible. read ( uflow_offset, & mut uflow_r_buf) . await ?;
315
319
316
320
// Copy it into w_buf
317
- let r_bytes = uflow_r_buf. into_vec ( ) . unwrap ( ) ;
321
+ let r_bytes = uflow_r_buf. into_vec ( ) ;
318
322
w_buf[ n_read..n_read + uflow_backfill]
319
323
. copy_from_slice ( & r_bytes[ uflow_remainder as usize ..] ) ;
320
324
@@ -323,12 +327,12 @@ async fn write_remainder_and_finalize<'a, T: BlockIO>(
323
327
futures. push_back ( w_future) ;
324
328
}
325
329
326
- // Flush
327
- let flush_future = crucible. flush ( None ) ;
328
- futures. push_back ( flush_future) ;
330
+ // Wait for all the writes first, then flush
331
+ for future in futures {
332
+ future. await ?;
333
+ }
329
334
330
- // Wait for all the writes
331
- join_all ( futures) . await ?;
335
+ crucible. flush ( None ) . await ?;
332
336
333
337
Ok ( ( ) )
334
338
}
@@ -395,12 +399,12 @@ async fn cmd_write<T: BlockIO>(
395
399
// We need to read-modify-write here.
396
400
397
401
// Read the full block
398
- let buffer = Buffer :: new ( native_block_size as usize ) ;
402
+ let mut buffer = Buffer :: new ( 1 , native_block_size as usize ) ;
399
403
let block_idx = opt. byte_offset / native_block_size;
400
404
let offset = Block :: new ( block_idx, native_block_size. trailing_zeros ( ) ) ;
401
- crucible. read ( offset, buffer. clone ( ) ) . await ?;
405
+ crucible. read ( offset, & mut buffer) . await ?;
402
406
403
- let mut w_vec = buffer. into_vec ( ) . unwrap ( ) ;
407
+ let mut w_vec = buffer. into_vec ( ) ;
404
408
// Write our data into the buffer
405
409
let bytes_read = input. read (
406
410
& mut w_vec[ offset_misalignment as usize
@@ -485,7 +489,9 @@ async fn cmd_write<T: BlockIO>(
485
489
486
490
if early_shutdown. try_recv ( ) . is_ok ( ) {
487
491
eprintln ! ( "shutting down early in response to SIGUSR1" ) ;
488
- join_all ( futures) . await ?;
492
+ for future in futures {
493
+ future. await ?;
494
+ }
489
495
crucible. flush ( None ) . await ?;
490
496
return Ok ( total_bytes_written) ;
491
497
}
@@ -523,7 +529,9 @@ async fn cmd_write<T: BlockIO>(
523
529
)
524
530
. await ?;
525
531
} else {
526
- join_all ( futures) . await ?;
532
+ for future in futures {
533
+ future. await ?;
534
+ }
527
535
crucible. flush ( None ) . await ?;
528
536
}
529
537
@@ -536,14 +544,10 @@ async fn handle_signals(
536
544
mut signals : Signals ,
537
545
early_shutdown : oneshot:: Sender < ( ) > ,
538
546
) {
539
- while let Some ( signal) = signals. next ( ) . await {
540
- match signal {
541
- SIGUSR1 => {
542
- early_shutdown. send ( ( ) ) . unwrap ( ) ;
543
- break ;
544
- }
545
- _ => unreachable ! ( ) ,
546
- }
547
+ match signals. next ( ) . await {
548
+ Some ( SIGUSR1 ) => early_shutdown. send ( ( ) ) . unwrap ( ) ,
549
+ Some ( _) => unreachable ! ( ) ,
550
+ None => ( ) , // signal sender is dropped
547
551
}
548
552
}
549
553
@@ -563,10 +567,10 @@ async fn main() -> Result<()> {
563
567
} ;
564
568
565
569
// TODO: volumes?
566
- let guest = Arc :: new ( Guest :: new ( None ) ) ;
570
+ let ( guest, io) = Guest :: new ( None ) ;
571
+ let guest = Arc :: new ( guest) ;
567
572
568
- let _join_handle =
569
- up_main ( crucible_opts, opt. gen , None , guest. clone ( ) , None ) ?;
573
+ let _join_handle = up_main ( crucible_opts, opt. gen , None , io, None ) ?;
570
574
eprintln ! ( "Crucible runtime is spawned" ) ;
571
575
572
576
// IO time
0 commit comments