Skip to content

Commit dc3610a

Browse files
committed
Back to parallel flushing
1 parent 9d5ca85 commit dc3610a

File tree

3 files changed

+39
-10
lines changed

3 files changed

+39
-10
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

downstairs/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ opentelemetry.workspace = true
3232
oximeter-producer.workspace = true
3333
oximeter.workspace = true
3434
rand.workspace = true
35+
rayon.workspace = true
3536
repair-client.workspace = true
3637
reqwest.workspace = true
3738
ringbuffer.workspace = true

downstairs/src/region.rs

+37-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2023 Oxide Computer Company
22
use std::cmp::Ordering;
3-
use std::collections::{HashMap, HashSet};
3+
use std::collections::{BTreeSet, HashMap, HashSet};
44
use std::convert::TryInto;
55
use std::fmt::Debug;
66
use std::fs::{rename, File, OpenOptions};
@@ -844,7 +844,7 @@ impl Region {
844844

845845
// Select extents we're going to flush, while respecting the
846846
// extent_limit if one was provided.
847-
let dirty_extents: Vec<usize> = match extent_limit {
847+
let dirty_extents: BTreeSet<usize> = match extent_limit {
848848
None => self.dirty_extents.iter().copied().collect(),
849849
Some(el) => {
850850
if el > self.def.extent_count().try_into().unwrap() {
@@ -859,13 +859,40 @@ impl Region {
859859
}
860860
};
861861

862-
// TODO: parallelism?
863-
let mut results = vec![];
864-
for eid in &dirty_extents {
865-
let log = self.log.clone();
866-
let extent = self.get_opened_extent_mut(*eid);
867-
results.push(extent.flush(flush_number, gen_number, job_id, &log));
868-
}
862+
// This is a bit sneaky: we want to perform each flush in a separate
863+
// task for *parallelism*, but can't call `self.get_opened_extent_mut`
864+
// multiple times. In addition, we can't use the tokio thread pool to
865+
// spawn a task, because that requires a 'static lifetime, which we
866+
// can't get from a borrowed Extent.
867+
//
868+
// We'll combine two tricks to work around the issue:
869+
// - Do the work in the rayon thread pool instead of using tokio tasks
870+
// - Carefully walk self.extents.as_mut_slice() to mutably borrow
871+
// multiple at the same time.
872+
873+
let mut slice_start = 0;
874+
let mut slice = self.extents.as_mut_slice();
875+
let mut results = vec![Ok(()); dirty_extents.len()];
876+
rayon::scope(|s| {
877+
for (eid, r) in dirty_extents.iter().zip(results.iter_mut()) {
878+
let next = eid - slice_start;
879+
slice = &mut slice[next..];
880+
let (extent, rest) = slice.split_first_mut().unwrap();
881+
let ExtentState::Opened(extent) = extent else {
882+
panic!("can't flush closed extent");
883+
};
884+
slice = rest;
885+
slice_start += next + 1;
886+
s.spawn(|_| {
887+
*r = extent.flush(
888+
flush_number,
889+
gen_number,
890+
job_id,
891+
&self.log,
892+
)
893+
});
894+
}
895+
});
869896

870897
cdt::os__flush__done!(|| job_id.0);
871898

@@ -1932,7 +1959,7 @@ pub(crate) mod test {
19321959
let mut file = OpenOptions::new()
19331960
.read(true)
19341961
.write(true)
1935-
.open(&extent_file)?;
1962+
.open(extent_file)?;
19361963
extent_inner_raw::RawInner::import(
19371964
&mut file,
19381965
&ddef,

0 commit comments

Comments
 (0)