Skip to content

Commit 4d6ae20

Browse files
committed
Move blocking flush to a worker thread
1 parent dc3610a commit 4d6ae20

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

downstairs/src/region.rs

+34-21
Original file line numberDiff line numberDiff line change
@@ -869,30 +869,43 @@ impl Region {
869869
// - Do the work in the rayon thread pool instead of using tokio tasks
870870
// - Carefully walk self.extents.as_mut_slice() to mutably borrow
871871
// multiple at the same time.
872-
873-
let mut slice_start = 0;
874-
let mut slice = self.extents.as_mut_slice();
875872
let mut results = vec![Ok(()); dirty_extents.len()];
876-
rayon::scope(|s| {
873+
if matches!(
874+
tokio::runtime::Handle::current().runtime_flavor(),
875+
tokio::runtime::RuntimeFlavor::MultiThread
876+
) {
877+
let mut slice_start = 0;
878+
let mut slice = self.extents.as_mut_slice();
879+
tokio::task::block_in_place(|| {
880+
rayon::scope(|s| {
881+
for (eid, r) in dirty_extents.iter().zip(results.iter_mut())
882+
{
883+
let next = eid - slice_start;
884+
slice = &mut slice[next..];
885+
let (extent, rest) = slice.split_first_mut().unwrap();
886+
let ExtentState::Opened(extent) = extent else {
887+
panic!("can't flush closed extent");
888+
};
889+
slice = rest;
890+
slice_start += next + 1;
891+
s.spawn(|_| {
892+
*r = extent.flush(
893+
flush_number,
894+
gen_number,
895+
job_id,
896+
&self.log,
897+
)
898+
});
899+
}
900+
})
901+
});
902+
} else {
903+
let log = self.log.clone();
877904
for (eid, r) in dirty_extents.iter().zip(results.iter_mut()) {
878-
let next = eid - slice_start;
879-
slice = &mut slice[next..];
880-
let (extent, rest) = slice.split_first_mut().unwrap();
881-
let ExtentState::Opened(extent) = extent else {
882-
panic!("can't flush closed extent");
883-
};
884-
slice = rest;
885-
slice_start += next + 1;
886-
s.spawn(|_| {
887-
*r = extent.flush(
888-
flush_number,
889-
gen_number,
890-
job_id,
891-
&self.log,
892-
)
893-
});
905+
let extent = self.get_opened_extent_mut(*eid);
906+
*r = extent.flush(flush_number, gen_number, job_id, &log);
894907
}
895-
});
908+
}
896909

897910
cdt::os__flush__done!(|| job_id.0);
898911

0 commit comments

Comments
 (0)