Skip to content

Commit 862cd98

Browse files
authored
Fix write reordering bug (#1448)
If we have _any_ block operations in the deferred queue (e.g. because we're encrypting a large write in the thread pool), then every block operation must go through the queue to preserve ordering. Unfortunately, we were checking the **wrong** `DeferredQueue` when checking whether to defer writes. This means that a large (deferred) write followed by a short (non-deferred) write could be reordered so that the short write happens first. This is probably the root cause of the CI failures that we saw in #1445: fast-ack means that the Guest is able to send the short write with less delay, making it more likely to hit this race condition.
1 parent 5233258 commit 862cd98

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

upstairs/src/deferred.rs

+6
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ impl<T> DeferredQueue<T> {
9393
pub fn is_empty(&self) -> bool {
9494
self.empty
9595
}
96+
97+
/// Returns the number of futures in the queue
98+
#[allow(dead_code)] // only used in unit tests
99+
pub fn len(&self) -> usize {
100+
self.stream.len()
101+
}
96102
}
97103

98104
////////////////////////////////////////////////////////////////////////////////

upstairs/src/upstairs.rs

+38-1
Original file line numberDiff line numberDiff line change
@@ -1391,7 +1391,7 @@ impl Upstairs {
13911391
if let Some(w) =
13921392
self.compute_deferred_write(offset, data, res, is_write_unwritten)
13931393
{
1394-
let should_defer = !self.deferred_msgs.is_empty()
1394+
let should_defer = !self.deferred_ops.is_empty()
13951395
|| w.data.len() > MIN_DEFER_SIZE_BYTES as usize;
13961396
if should_defer {
13971397
let tx = self.deferred_ops.push_oneshot();
@@ -4205,4 +4205,41 @@ pub(crate) mod test {
42054205
assert!(!r.contains("HashMismatch"));
42064206
assert!(r.contains("read hash mismatch"));
42074207
}
4208+
4209+
#[test]
4210+
fn write_defer() {
4211+
let mut up = make_upstairs();
4212+
up.force_active().unwrap();
4213+
set_all_active(&mut up.downstairs);
4214+
4215+
const NODEFER_SIZE: usize = MIN_DEFER_SIZE_BYTES as usize - 512;
4216+
const DEFER_SIZE: usize = MIN_DEFER_SIZE_BYTES as usize * 2;
4217+
4218+
// Submit a short write, which should not be deferred
4219+
let mut data = BytesMut::new();
4220+
data.extend_from_slice(vec![1; NODEFER_SIZE].as_slice());
4221+
let offset = BlockIndex(7);
4222+
let (_res, done) = BlockOpWaiter::pair();
4223+
up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done }));
4224+
assert_eq!(up.deferred_ops.len(), 0);
4225+
4226+
// Submit a long write, which should be deferred
4227+
let mut data = BytesMut::new();
4228+
data.extend_from_slice(vec![2; DEFER_SIZE].as_slice());
4229+
let offset = BlockIndex(7);
4230+
let (_res, done) = BlockOpWaiter::pair();
4231+
up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done }));
4232+
assert_eq!(up.deferred_ops.len(), 1);
4233+
assert_eq!(up.deferred_msgs.len(), 0);
4234+
4235+
// Submit a short write, which would normally not be deferred, but
4236+
// there's already a deferred job in the queue
4237+
let mut data = BytesMut::new();
4238+
data.extend_from_slice(vec![3; NODEFER_SIZE].as_slice());
4239+
let offset = BlockIndex(7);
4240+
let (_res, done) = BlockOpWaiter::pair();
4241+
up.apply(UpstairsAction::Guest(BlockOp::Write { offset, data, done }));
4242+
assert_eq!(up.deferred_ops.len(), 2);
4243+
assert_eq!(up.deferred_msgs.len(), 0);
4244+
}
42084245
}

0 commit comments

Comments (0)