From 6fd3ebd4d355589592928e14e6e60f01fab60ede Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Mon, 5 Feb 2024 16:54:58 -0500 Subject: [PATCH] Use fewer borrows in ExtentInner API --- downstairs/src/extent.rs | 18 +++--- downstairs/src/extent_inner_raw.rs | 93 ++++++++++++--------------- downstairs/src/extent_inner_sqlite.rs | 31 ++++----- downstairs/src/region.rs | 22 +++---- 4 files changed, 72 insertions(+), 92 deletions(-) diff --git a/downstairs/src/extent.rs b/downstairs/src/extent.rs index 345751233..a327cc8af 100644 --- a/downstairs/src/extent.rs +++ b/downstairs/src/extent.rs @@ -45,15 +45,14 @@ pub(crate) trait ExtentInner: Send + Debug { fn read( &mut self, job_id: JobId, - requests: &[&crucible_protocol::ReadRequest], - responses: &mut Vec, + requests: &[crucible_protocol::ReadRequest], iov_max: usize, - ) -> Result<(), CrucibleError>; + ) -> Result, CrucibleError>; fn write( &mut self, job_id: JobId, - writes: &[&crucible_protocol::Write], + writes: &[crucible_protocol::Write], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError>; @@ -519,29 +518,28 @@ impl Extent { pub async fn read( &self, job_id: JobId, - requests: &[&crucible_protocol::ReadRequest], - responses: &mut Vec, - ) -> Result<(), CrucibleError> { + requests: &[crucible_protocol::ReadRequest], + ) -> Result, CrucibleError> { cdt::extent__read__start!(|| { (job_id.0, self.number, requests.len() as u64) }); let mut inner = self.inner.lock().await; - inner.read(job_id, requests, responses, self.iov_max)?; + let responses = inner.read(job_id, requests, self.iov_max)?; cdt::extent__read__done!(|| { (job_id.0, self.number, requests.len() as u64) }); - Ok(()) + Ok(responses) } #[instrument] pub async fn write( &self, job_id: JobId, - writes: &[&crucible_protocol::Write], + writes: &[crucible_protocol::Write], only_write_unwritten: bool, ) -> Result<(), CrucibleError> { if self.read_only { diff --git a/downstairs/src/extent_inner_raw.rs b/downstairs/src/extent_inner_raw.rs index 7f00089ad..ecf17a0a4 100644 --- a/downstairs/src/extent_inner_raw.rs +++ b/downstairs/src/extent_inner_raw.rs @@ -151,7 +151,7 @@ impl ExtentInner for RawInner { fn write( &mut self, job_id: JobId, - writes: &[&crucible_protocol::Write], + writes: &[crucible_protocol::Write], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { @@ -186,10 +186,9 @@ impl ExtentInner for RawInner { fn read( &mut self, job_id: JobId, - requests: &[&crucible_protocol::ReadRequest], - responses: &mut Vec, + requests: &[crucible_protocol::ReadRequest], iov_max: usize, - ) -> Result<(), CrucibleError> { + ) -> Result, CrucibleError> { // This code batches up operations for contiguous regions of // ReadRequests, so we can perform larger read syscalls queries. This // significantly improves read throughput. @@ -199,8 +198,9 @@ impl ExtentInner for RawInner { // request. let mut req_run_start = 0; let block_size = self.extent_size.block_size_in_bytes(); + let mut responses = Vec::with_capacity(requests.len()); while req_run_start < requests.len() { - let first_req = requests[req_run_start]; + let first_req = &requests[req_run_start]; // Starting from the first request in the potential run, we scan // forward until we find a request with a block that isn't @@ -300,7 +300,7 @@ impl ExtentInner for RawInner { req_run_start += n_contiguous_requests; } - Ok(()) + Ok(responses) } fn flush( @@ -831,7 +831,7 @@ impl RawInner { fn write_inner( &self, - writes: &[&crucible_protocol::Write], + writes: &[crucible_protocol::Write], writes_to_skip: &HashSet, iov_max: usize, ) -> Result<(), CrucibleError> { @@ -1001,7 +1001,7 @@ impl RawInner { fn write_without_overlaps( &mut self, job_id: JobId, - writes: &[&crucible_protocol::Write], + writes: &[crucible_protocol::Write], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { @@ -1056,7 +1056,7 @@ impl RawInner { }); let mut write_run_start = 0; while write_run_start < writes.len() { - let first_write = writes[write_run_start]; + let first_write = &writes[write_run_start]; // Starting from the first write in the potential run, we scan // forward until we find a write with a block that isn't @@ -1694,7 +1694,7 @@ mod test { hash, }, }; - inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(10), &[write], false, IOV_MAX_TEST)?; // The context should be in place, though we haven't flushed yet @@ -1713,14 +1713,13 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(20), &[&write], true, IOV_MAX_TEST)?; + inner.write(JobId(20), &[write], true, IOV_MAX_TEST)?; - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(0), }; - inner.read(JobId(21), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(21), &[read], IOV_MAX_TEST)?; // We should not get back our data, because block 0 was written. assert_ne!( @@ -1748,14 +1747,13 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(1), }; - inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?; // We should get back our data! Block 1 was never written. assert_eq!( @@ -1798,28 +1796,28 @@ mod test { offset: Block::new_512(1), ..write.clone() }; - inner.write(JobId(10), &[&write1], false, IOV_MAX_TEST)?; + inner.write(JobId(10), &[write1], false, IOV_MAX_TEST)?; assert_eq!(inner.context_slot_dirty[0], 0b00); assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[1], 0b10); assert_eq!(inner.active_context[1], ContextSlot::B); // The context should be written to block 0, slot B - inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.context_slot_dirty[0], 0b10); assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged // The context should be written to block 0, slot A - inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.context_slot_dirty[0], 0b11); assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[1], 0b10); // unchanged assert_eq!(inner.active_context[1], ContextSlot::B); // unchanged // The context should be written to slot B, forcing a sync - inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(12), &[write], false, IOV_MAX_TEST)?; assert_eq!(inner.context_slot_dirty[0], 0b10); assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[1], 0b00); @@ -1848,7 +1846,7 @@ mod test { }, }; // The context should be written to slot B - inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); @@ -1858,17 +1856,17 @@ mod test { assert_eq!(inner.context_slot_dirty[0], 0b00); // The context should be written to slot A - inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b01); // The context should be written to slot B - inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b11); // The context should be written to slot A, forcing a sync - inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b01); @@ -1895,12 +1893,12 @@ mod test { }, }; // The context should be written to slot B - inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(10), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); // The context should be written to slot A - inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b11); @@ -1910,17 +1908,17 @@ mod test { assert_eq!(inner.context_slot_dirty[0], 0b00); // The context should be written to slot B - inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); // The context should be written to slot A - inner.write(JobId(12), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(12), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::A); assert_eq!(inner.context_slot_dirty[0], 0b11); // The context should be written to slot B, forcing a sync - inner.write(JobId(11), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(11), &[write.clone()], false, IOV_MAX_TEST)?; assert_eq!(inner.active_context[0], ContextSlot::B); assert_eq!(inner.context_slot_dirty[0], 0b10); @@ -1972,14 +1970,13 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(0), }; - inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?; // We should get back our data! Block 1 was never written. assert_eq!( @@ -2016,7 +2013,7 @@ mod test { hash, }, }; - inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; } for i in 0..10 { @@ -2064,8 +2061,7 @@ mod test { writes.push(write); } // This write has toggled every single context slot - let writes_ref: Vec<&_> = writes.iter().collect(); - inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; for i in 0..10 { assert_eq!( inner.active_context[i], @@ -2117,7 +2113,7 @@ mod test { hash, }, }; - inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; } // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2170,8 +2166,7 @@ mod test { // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- // A | B | A | B | A | B | B | B | B | B - let writes_ref: Vec<&_> = writes.iter().collect(); - inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; for i in 0..10 { assert_eq!( inner.active_context[i], @@ -2225,7 +2220,7 @@ mod test { hash, }, }; - inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; } // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2274,9 +2269,8 @@ mod test { }; writes.push(write); } - let writes_ref: Vec<&_> = writes.iter().collect(); - inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; - inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; // This write should toggled every single context slot: // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2334,7 +2328,7 @@ mod test { hash, }, }; - inner.write(JobId(30), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], false, IOV_MAX_TEST)?; } // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2383,9 +2377,8 @@ mod test { }; writes.push(write); } - let writes_ref: Vec<&_> = writes.iter().collect(); - inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; - inner.write(JobId(30), &writes_ref, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; // This write should toggled every single context slot: // 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 // --|---|---|---|---|---|---|---|---|--- @@ -2476,23 +2469,19 @@ mod test { }) .collect(); - let write_refs: Vec<_> = writes.iter().collect(); - assert_eq!(inner.context_slot_dirty[0], 0b00); - inner.write(JobId(30), &write_refs, false, IOV_MAX_TEST)?; + inner.write(JobId(30), &writes, false, IOV_MAX_TEST)?; // The write should be split into four separate calls to // `write_without_overlaps`, triggering one bonus fsync. assert_eq!(inner.context_slot_dirty[0], 0b11); // Block 0 should be 0x03 repeated. - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(0), }; - - inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?; let data = Bytes::from(vec![0x03; 512]); let hash = integrity_hash(&[&data[..]]); diff --git a/downstairs/src/extent_inner_sqlite.rs b/downstairs/src/extent_inner_sqlite.rs index 34056c7b6..d58a305a4 100644 --- a/downstairs/src/extent_inner_sqlite.rs +++ b/downstairs/src/extent_inner_sqlite.rs @@ -159,10 +159,10 @@ impl ExtentInner for SqliteInner { fn read( &mut self, job_id: JobId, - requests: &[&crucible_protocol::ReadRequest], - responses: &mut Vec, + requests: &[crucible_protocol::ReadRequest], iov_max: usize, - ) -> Result<(), CrucibleError> { + ) -> Result, CrucibleError> { + let mut responses = Vec::with_capacity(requests.len()); // This code batches up operations for contiguous regions of // ReadRequests, so we can perform larger read syscalls and sqlite // queries. This significantly improves read throughput. @@ -172,7 +172,7 @@ impl ExtentInner for SqliteInner { // request. let mut req_run_start = 0; while req_run_start < requests.len() { - let first_req = requests[req_run_start]; + let first_req = &requests[req_run_start]; // Starting from the first request in the potential run, we scan // forward until we find a request with a block that isn't @@ -255,13 +255,13 @@ impl ExtentInner for SqliteInner { req_run_start += n_contiguous_requests; } - Ok(()) + Ok(responses) } fn write( &mut self, job_id: JobId, - writes: &[&crucible_protocol::Write], + writes: &[crucible_protocol::Write], only_write_unwritten: bool, iov_max: usize, ) -> Result<(), CrucibleError> { @@ -321,7 +321,7 @@ impl ExtentInner for SqliteInner { }); let mut write_run_start = 0; while write_run_start < writes.len() { - let first_write = writes[write_run_start]; + let first_write = &writes[write_run_start]; // Starting from the first write in the potential run, we scan // forward until we find a write with a block that isn't @@ -1520,7 +1520,7 @@ mod test { hash, }, }; - inner.write(JobId(10), &[&write], false, IOV_MAX_TEST)?; + inner.write(JobId(10), &[write], false, IOV_MAX_TEST)?; // We haven't flushed, but this should leave our context in place. inner.fully_rehash_and_clean_all_stale_contexts(false)?; @@ -1540,14 +1540,13 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(20), &[&write], true, IOV_MAX_TEST)?; + inner.write(JobId(20), &[write], true, IOV_MAX_TEST)?; - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(0), }; - inner.read(JobId(21), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(21), &[read], IOV_MAX_TEST)?; // We should not get back our data, because block 0 was written. assert_ne!( @@ -1575,14 +1574,13 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(1), }; - inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?; // We should get back our data! Block 1 was never written. assert_eq!( @@ -1642,14 +1640,13 @@ mod test { data: data.clone(), block_context, }; - inner.write(JobId(30), &[&write], true, IOV_MAX_TEST)?; + inner.write(JobId(30), &[write], true, IOV_MAX_TEST)?; - let mut resp = Vec::new(); let read = ReadRequest { eid: 0, offset: Block::new_512(0), }; - inner.read(JobId(31), &[&read], &mut resp, IOV_MAX_TEST)?; + let resp = inner.read(JobId(31), &[read], IOV_MAX_TEST)?; // We should get back our data! Block 1 was never written. assert_eq!( diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index b2acbbb8d..76105ca51 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -718,13 +718,13 @@ impl Region { * Batch writes so they can all be sent to the appropriate extent * together. */ - let mut batched_writes: HashMap> = + let mut batched_writes: HashMap> = HashMap::new(); for write in writes { let extent_vec = batched_writes.entry(write.eid as usize).or_default(); - extent_vec.push(write); + extent_vec.push(write.clone()); } if only_write_unwritten { @@ -769,36 +769,32 @@ impl Region { * use a hashmap in the same way that batching writes can. */ let mut eid: Option = None; - let mut batched_reads: Vec<&crucible_protocol::ReadRequest> = - Vec::with_capacity(requests.len()); + let mut batched_reads = Vec::with_capacity(requests.len()); cdt::os__read__start!(|| job_id.0); for request in requests { if let Some(_eid) = eid { if request.eid == _eid { - batched_reads.push(request); + batched_reads.push(request.clone()); } else { let extent = self.get_opened_extent(_eid as usize).await; - extent - .read(job_id, &batched_reads[..], &mut responses) - .await?; + responses + .extend(extent.read(job_id, &batched_reads[..]).await?); eid = Some(request.eid); batched_reads.clear(); - batched_reads.push(request); + batched_reads.push(request.clone()); } } else { eid = Some(request.eid); batched_reads.clear(); - batched_reads.push(request); + batched_reads.push(request.clone()); } } if let Some(_eid) = eid { let extent = self.get_opened_extent(_eid as usize).await; - extent - .read(job_id, &batched_reads[..], &mut responses) - .await?; + responses.extend(extent.read(job_id, &batched_reads[..]).await?); } cdt::os__read__done!(|| job_id.0);