Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use fewer borrows in ExtentInner API #1147

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions downstairs/src/extent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ pub(crate) trait ExtentInner: Send + Debug {
fn read(
&mut self,
job_id: JobId,
requests: &[&crucible_protocol::ReadRequest],
responses: &mut Vec<crucible_protocol::ReadResponse>,
requests: &[crucible_protocol::ReadRequest],
iov_max: usize,
) -> Result<(), CrucibleError>;
) -> Result<Vec<crucible_protocol::ReadResponse>, CrucibleError>;

fn write(
&mut self,
job_id: JobId,
writes: &[&crucible_protocol::Write],
writes: &[crucible_protocol::Write],
only_write_unwritten: bool,
iov_max: usize,
) -> Result<(), CrucibleError>;
Expand Down Expand Up @@ -519,29 +518,28 @@ impl Extent {
pub async fn read(
&self,
job_id: JobId,
requests: &[&crucible_protocol::ReadRequest],
responses: &mut Vec<crucible_protocol::ReadResponse>,
) -> Result<(), CrucibleError> {
requests: &[crucible_protocol::ReadRequest],
) -> Result<Vec<crucible_protocol::ReadResponse>, CrucibleError> {
cdt::extent__read__start!(|| {
(job_id.0, self.number, requests.len() as u64)
});

let mut inner = self.inner.lock().await;

inner.read(job_id, requests, responses, self.iov_max)?;
let responses = inner.read(job_id, requests, self.iov_max)?;

cdt::extent__read__done!(|| {
(job_id.0, self.number, requests.len() as u64)
});

Ok(())
Ok(responses)
}

#[instrument]
pub async fn write(
&self,
job_id: JobId,
writes: &[&crucible_protocol::Write],
writes: &[crucible_protocol::Write],
only_write_unwritten: bool,
) -> Result<(), CrucibleError> {
if self.read_only {
Expand Down
93 changes: 41 additions & 52 deletions downstairs/src/extent_inner_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl ExtentInner for RawInner {
fn write(
&mut self,
job_id: JobId,
writes: &[&crucible_protocol::Write],
writes: &[crucible_protocol::Write],
only_write_unwritten: bool,
iov_max: usize,
) -> Result<(), CrucibleError> {
Expand Down Expand Up @@ -186,10 +186,9 @@ impl ExtentInner for RawInner {
fn read(
&mut self,
job_id: JobId,
requests: &[&crucible_protocol::ReadRequest],
responses: &mut Vec<crucible_protocol::ReadResponse>,
requests: &[crucible_protocol::ReadRequest],
iov_max: usize,
) -> Result<(), CrucibleError> {
) -> Result<Vec<crucible_protocol::ReadResponse>, CrucibleError> {
// This code batches up operations for contiguous regions of
// ReadRequests, so we can perform larger read syscalls queries. This
// significantly improves read throughput.
Expand All @@ -199,8 +198,9 @@ impl ExtentInner for RawInner {
// request.
let mut req_run_start = 0;
let block_size = self.extent_size.block_size_in_bytes();
let mut responses = Vec::with_capacity(requests.len());
while req_run_start < requests.len() {
let first_req = requests[req_run_start];
let first_req = &requests[req_run_start];

// Starting from the first request in the potential run, we scan
// forward until we find a request with a block that isn't
Expand Down Expand Up @@ -300,7 +300,7 @@ impl ExtentInner for RawInner {
req_run_start += n_contiguous_requests;
}

Ok(())
Ok(responses)
}

fn flush(
Expand Down Expand Up @@ -831,7 +831,7 @@ impl RawInner {

fn write_inner(
&self,
writes: &[&crucible_protocol::Write],
writes: &[crucible_protocol::Write],
writes_to_skip: &HashSet<u64>,
iov_max: usize,
) -> Result<(), CrucibleError> {
Expand Down Expand Up @@ -1001,7 +1001,7 @@ impl RawInner {
fn write_without_overlaps(
&mut self,
job_id: JobId,
writes: &[&crucible_protocol::Write],
writes: &[crucible_protocol::Write],
only_write_unwritten: bool,
iov_max: usize,
) -> Result<(), CrucibleError> {
Expand Down Expand Up @@ -1056,7 +1056,7 @@ impl RawInner {
});
let mut write_run_start = 0;
while write_run_start < writes.len() {
let first_write = writes[write_run_start];
let first_write = &writes[write_run_start];

// Starting from the first write in the potential run, we scan
// forward until we find a write with a block that isn't
Expand Down Expand Up @@ -1694,7 +1694,7 @@ mod test {
hash,
},
};
inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(10), &[write], false, IOV_MAX_TEST)?;

// The context should be in place, though we haven't flushed yet

Expand All @@ -1713,14 +1713,13 @@ mod test {
data: data.clone(),
block_context,
};
inner.write(JobId(20), &[&write], true, IOV_MAX_TEST)?;
inner.write(JobId(20), &[write], true, IOV_MAX_TEST)?;

let mut resp = Vec::new();
let read = ReadRequest {
eid: 0,
offset: Block::new_512(0),
};
inner.read(JobId(21), &[&read], &mut resp, IOV_MAX_TEST)?;
let resp = inner.read(JobId(21), &[read], IOV_MAX_TEST)?;

// We should not get back our data, because block 0 was written.
assert_ne!(
Expand Down Expand Up @@ -1748,14 +1747,13 @@ mod test {
data: data.clone(),
block_context,
};
inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?;
inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?;

let mut resp = Vec::new();
let read = ReadRequest {
eid: 0,
offset: Block::new_512(1),
};
inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?;
let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?;

// We should get back our data! Block 1 was never written.
assert_eq!(
Expand Down Expand Up @@ -1798,28 +1796,28 @@ mod test {
offset: Block::new_512(1),
..write.clone()
};
inner.write(JobId(10), &[&write1], false, IOV_MAX_TEST)?;
inner.write(JobId(10), &[write1], false, IOV_MAX_TEST)?;
assert_eq!(inner.context_slot_dirty[0], 0b00);
assert_eq!(inner.active_context[0], ContextSlot::A);
assert_eq!(inner.context_slot_dirty[1], 0b10);
assert_eq!(inner.active_context[1], ContextSlot::B);

// The context should be written to block 0, slot B
inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.context_slot_dirty[0], 0b10);
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged
assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged

// The context should be written to block 0, slot A
inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.context_slot_dirty[0], 0b11);
assert_eq!(inner.active_context[0], ContextSlot::A);
assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged
assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged

// The context should be written to slot B, forcing a sync
inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(12), &[write], false, IOV_MAX_TEST)?;
assert_eq!(inner.context_slot_dirty[0], 0b10);
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[1], 0b00);
Expand Down Expand Up @@ -1848,7 +1846,7 @@ mod test {
},
};
// The context should be written to slot B
inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[0], 0b10);

Expand All @@ -1858,17 +1856,17 @@ mod test {
assert_eq!(inner.context_slot_dirty[0], 0b00);

// The context should be written to slot A
inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::A);
assert_eq!(inner.context_slot_dirty[0], 0b01);

// The context should be written to slot B
inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[0], 0b11);

// The context should be written to slot A, forcing a sync
inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::A);
assert_eq!(inner.context_slot_dirty[0], 0b01);

Expand All @@ -1895,12 +1893,12 @@ mod test {
},
};
// The context should be written to slot B
inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[0], 0b10);

// The context should be written to slot A
inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::A);
assert_eq!(inner.context_slot_dirty[0], 0b11);

Expand All @@ -1910,17 +1908,17 @@ mod test {
assert_eq!(inner.context_slot_dirty[0], 0b00);

// The context should be written to slot B
inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[0], 0b10);

// The context should be written to slot A
inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::A);
assert_eq!(inner.context_slot_dirty[0], 0b11);

// The context should be written to slot B, forcing a sync
inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?;
assert_eq!(inner.active_context[0], ContextSlot::B);
assert_eq!(inner.context_slot_dirty[0], 0b10);

Expand Down Expand Up @@ -1972,14 +1970,13 @@ mod test {
data: data.clone(),
block_context,
};
inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?;
inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?;

let mut resp = Vec::new();
let read = ReadRequest {
eid: 0,
offset: Block::new_512(0),
};
inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?;
let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?;

// We should get back our data! Block 1 was never written.
assert_eq!(
Expand Down Expand Up @@ -2016,7 +2013,7 @@ mod test {
hash,
},
};
inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?;
}

for i in 0..10 {
Expand Down Expand Up @@ -2064,8 +2061,7 @@ mod test {
writes.push(write);
}
// This write has toggled every single context slot
let writes_ref: Vec<&_> = writes.iter().collect();
inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;
for i in 0..10 {
assert_eq!(
inner.active_context[i],
Expand Down Expand Up @@ -2117,7 +2113,7 @@ mod test {
hash,
},
};
inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?;
}
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
// --|---|---|---|---|---|---|---|---|---
Expand Down Expand Up @@ -2170,8 +2166,7 @@ mod test {
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
// --|---|---|---|---|---|---|---|---|---
// A | B | A | B | A | B | B | B | B | B
let writes_ref: Vec<&_> = writes.iter().collect();
inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;
for i in 0..10 {
assert_eq!(
inner.active_context[i],
Expand Down Expand Up @@ -2225,7 +2220,7 @@ mod test {
hash,
},
};
inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?;
}
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
// --|---|---|---|---|---|---|---|---|---
Expand Down Expand Up @@ -2274,9 +2269,8 @@ mod test {
};
writes.push(write);
}
let writes_ref: Vec<&_> = writes.iter().collect();
inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;
// This write should toggled every single context slot:
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
// --|---|---|---|---|---|---|---|---|---
Expand Down Expand Up @@ -2334,7 +2328,7 @@ mod test {
hash,
},
};
inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?;
inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?;
}
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
// --|---|---|---|---|---|---|---|---|---
Expand Down Expand Up @@ -2383,9 +2377,8 @@ mod test {
};
writes.push(write);
}
let writes_ref: Vec<&_> = writes.iter().collect();
inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;
// This write should toggled every single context slot:
// 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9
// --|---|---|---|---|---|---|---|---|---
Expand Down Expand Up @@ -2476,23 +2469,19 @@ mod test {
})
.collect();

let write_refs: Vec<_> = writes.iter().collect();

assert_eq!(inner.context_slot_dirty[0], 0b00);
inner.write(JobId(30), &write_refs, false, IOV_MAX_TEST)?;
inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?;

// The write should be split into four separate calls to
// `write_without_overlaps`, triggering one bonus fsync.
assert_eq!(inner.context_slot_dirty[0], 0b11);

// Block 0 should be 0x03 repeated.
let mut resp = Vec::new();
let read = ReadRequest {
eid: 0,
offset: Block::new_512(0),
};

inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?;
let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?;

let data = Bytes::from(vec![0x03; 512]);
let hash = integrity_hash(&[&data[..]]);
Expand Down
Loading
Loading