Skip to content

Commit bdcc633

Browse files
authored
Single-task refactoring (#1058)
This PR implements the plan from [RFD 444](https://rfd.shared.oxide.computer/rfd/444#_counter_proposal_one_big_task) of consolidating all data ownership into a single async task. It is a squashed commit, but the original commit chain is preserved in the `one-task-refactoring` tag.

The details are in the RFD, but in short, state is only mutated in one place (instead of being mutated from multiple tasks). Having a single task own the `struct Upstairs` means that we don't need to think about locks or cross-task notifications. This architectural simplification lets us delete almost 3KLOC while maintaining the same functionality.

The new architecture is event-driven: `Upstairs::select` returns an event, and `Upstairs::apply` applies that event. We split into two functions for ease of unit testing; many tests can drive the `Upstairs` by calling `Upstairs::apply` on a synthetic sequence of events, to put it into a particular state.

Data is stored in a hierarchical set of data structures:

- 1× `Upstairs` has our high-level state and guest-level info. It contains...
  - 1× `Downstairs`, which tracks downstairs jobs. It contains...
    - 3× `DownstairsClient`, which contain per-client state.

Unlike before, each of these data structures is in its own module, meaning its internal details are private (by default). Tests that need internal details are often moved into that specific module (e.g. from `test.rs` -> `downstairs.rs`). This isn't an absolute; in many cases, fields have to be made `pub(crate)` for ease of testing or cross-layer modification.

I see a 20-30% performance improvement for 1M and 4M random-write (`fio`) benchmarks, compared to `main`. This is likely because the expensive work of serialization (`FramedWrite`) has moved into a dedicated per-client IO task, leaving the main task free to keep working.
1 parent 4fe7be7 commit bdcc633

15 files changed

+19355
-22378
lines changed

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ repair-client = { path = "./repair-client" }
130130
[profile.dev]
131131
panic = 'abort'
132132

133+
[profile.release]
134+
panic = 'abort'
135+
133136
# Using the workspace-hack via this patch directive means that it only applies
134137
# while building within this workspace. If another workspace imports a crate
135138
# from here via a git dependency, it will not have the workspace-hack applied

tools/test_fail_live_repair.sh

+58-14
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ function ctrl_c() {
2727
kill "$crutest_pid"
2828
wait "$crutest_pid"
2929
fi
30+
exit 1
3031
}
3132

3233
loop_log=/tmp/test_fail_live_repair_summary.log
@@ -53,6 +54,24 @@ if pgrep -fl -U "$(id -u)" "$cds"; then
5354
exit 1
5455
fi
5556

57+
loops=20
58+
59+
usage () {
60+
echo "Usage: $0 [-l #]" >&2
61+
echo " -l loops Number of test loops to perform (default 20)" >&2
62+
}
63+
64+
while getopts 'l:' opt; do
65+
case "$opt" in
66+
l) loops=$OPTARG
67+
;;
68+
*) echo "Invalid option"
69+
usage
70+
exit 1
71+
;;
72+
esac
73+
done
74+
5675
echo "" > ${loop_log}
5776
echo "" > ${test_log}
5877
echo "" > ${dsc_test_log}
@@ -93,17 +112,18 @@ if ! "$crucible_test" fill "${args[@]}" -q -g "$gen"\
93112
--retry-activate >> "$test_log" 2>&1 ; then
94113
echo Failed on initial verify seed, check "$test_log"
95114
${dsc} cmd shutdown
115+
exit 1
96116
fi
97117
(( gen += 1 ))
98118

99-
for i in {1..20}
100-
do
119+
count=1
120+
while [[ $count -le $loops ]]; do
101121
SECONDS=0
102122
choice=$((RANDOM % 3))
103123

104-
# The state of our chosen downstairs is based on an offset
124+
# Clear the log on each loop
105125
echo "" > "$test_log"
106-
echo "New loop starts now $(date) faulting: $choice" >> "$test_log"
126+
echo "New loop starts now $(date) faulting: $choice" | tee -a "$test_log"
107127
# Start sending IO.
108128
"$crucible_test" generic "${args[@]}" --continuous \
109129
-q -g "$gen" --verify-out "$verify_file" \
@@ -113,9 +133,25 @@ do
113133
crutest_pid=$!
114134
sleep 5
115135

116-
curl -X POST http://127.0.0.1:7777/downstairs/fault/"${choice}"
136+
${dsc} cmd stop -c "$choice" >> "$dsc_test_log" 2>&1 &
137+
# Wait for our downstairs to fault
138+
echo Wait for our downstairs to fault | tee -a "$test_log"
139+
choice_state="undefined"
140+
while [[ "$choice_state" != "faulted" ]]; do
141+
sleep 3
142+
if [[ $choice -eq 0 ]]; then
143+
choice_state=$(curl -s http://127.0.0.1:7777/info | awk -F\" '{print $8}')
144+
elif [[ $choice -eq 1 ]]; then
145+
choice_state=$(curl -s http://127.0.0.1:7777/info | awk -F\" '{print $10}')
146+
else
147+
choice_state=$(curl -s http://127.0.0.1:7777/info | awk -F\" '{print $12}')
148+
fi
149+
done
150+
151+
${dsc} cmd start -c "$choice" >> "$dsc_test_log" 2>&1 &
117152

118153
# Wait for our downstairs to begin live_repair
154+
echo Wait for our downstairs to begin live_repair | tee -a "$test_log"
119155
choice_state="undefined"
120156
while [[ "$choice_state" != "live_repair" ]]; do
121157
sleep 3
@@ -128,19 +164,26 @@ do
128164
fi
129165
done
130166

131-
# Let the repair do some repairing.
132-
sleep 5
167+
# Give the live repair between 5 and 25 seconds to start repairing.
168+
rand_sleep=$((RANDOM % 20))
169+
((rand_sleep += 5))
170+
sleep $rand_sleep
171+
echo "After $rand_sleep seconds, Stop $choice again" | tee -a "$test_log"
172+
${dsc} cmd stop -c "$choice" >> "$dsc_test_log" 2>&1 &
133173

134-
# Now fault the downstairs again.
135-
curl -X POST http://127.0.0.1:7777/downstairs/fault/"${choice}"
174+
sleep 2
175+
echo "Start $choice for a second time" | tee -a "$test_log"
176+
${dsc} cmd start -c "$choice" >> "$dsc_test_log" 2>&1 &
136177

137178
# Now wait for all downstairs to be active
179+
echo Now wait for all downstairs to be active | tee -a "$test_log"
138180
all_state=$(curl -s http://127.0.0.1:7777/info | awk -F\" '{print $8","$10","$12}')
139181
while [[ "${all_state}" != "active,active,active" ]]; do
140182
sleep 5
141183
all_state=$(curl -s http://127.0.0.1:7777/info | awk -F\" '{print $8","$10","$12}')
142184
done
143185

186+
echo All downstairs active, now stop IO test and wait for it to finish | tee -a "$test_log"
144187
kill -SIGUSR1 $crutest_pid
145188
wait $crutest_pid
146189
result=$?
@@ -150,7 +193,7 @@ do
150193
else
151194
(( err += 1 ))
152195
duration=$SECONDS
153-
printf "[%03d] Error $result after %d:%02d\n" "$i" \
196+
printf "[%03d] Error $result after %d:%02d\n" "$count" \
154197
$((duration / 60)) $((duration % 60)) | tee -a ${loop_log}
155198
mv "$test_log" "$test_log".lastfail
156199
break
@@ -166,7 +209,7 @@ do
166209
(( dropshot += 1 ))
167210
else
168211
mv "$test_log" "$test_log".lastfail
169-
echo "verify failed on loop $i"
212+
echo "verify failed on loop $count"
170213
(( err += 1 ))
171214
break
172215
fi
@@ -179,12 +222,13 @@ do
179222
ave=$(( total / pass_total ))
180223
printf \
181224
"[%03d][%d] %d:%02d ds_err:%d ave:%d:%02d total:%d:%02d last_run:%d\n" \
182-
"$i" "$choice" \
225+
"$count" "$choice" \
183226
$((duration / 60)) $((duration % 60)) \
184227
"$dropshot" \
185228
$((ave / 60)) $((ave % 60)) $((total / 60)) $((total % 60)) \
186229
"$duration" | tee -a ${loop_log}
187230

231+
(( count += 1 ))
188232
done
189233

190234
# Stop dsc.
@@ -194,9 +238,9 @@ wait ${dsc_pid}
194238
echo "Final results:" | tee -a ${loop_log}
195239
printf \
196240
"[%03d] %d:%02d ave:%d:%02d total:%d:%02d errors:%d last_run_seconds:%d\n" \
197-
"$i" $((duration / 60)) $((duration % 60)) \
241+
"$count" $((duration / 60)) $((duration % 60)) \
198242
$((ave / 60)) $((ave % 60)) $((total / 60)) $((total % 60)) \
199243
"$err" $duration | tee -a ${loop_log}
200-
echo "$(date) Test ends with $err" >> "$test_log"
244+
echo "$(date) Test ends with $err" | tee -a "$test_log"
201245
exit "$err"
202246

upstairs/src/active_jobs.rs

+5-90
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,15 @@
33
use crucible_protocol::JobId;
44

55
use crate::{
6-
AckStatus, DownstairsIO, ExtentRepairIDs, IOop, ImpactedAddr,
7-
ImpactedBlocks,
6+
DownstairsIO, ExtentRepairIDs, IOop, ImpactedAddr, ImpactedBlocks,
87
};
98
use std::collections::{BTreeMap, BTreeSet};
109

11-
/// `ActiveJobs` tracks active jobs by ID
10+
/// `ActiveJobs` tracks active jobs (and associated metadata) by job ID
1211
///
1312
/// It exposes an API that roughly matches a `BTreeMap<JobId, DownstairsIO>`,
1413
/// but leaves open the possibility for further optimization.
1514
///
16-
/// Notably, there is no way to directly modify a `DownstairsIO` contained in
17-
/// `ActiveJobs`. Bulk modification can be done with `for_each`, and individual
18-
/// modification can be done with `get_mut`, which returns a
19-
/// `DownstairsIOHandle` instead of a raw `&mut DownstairsIO`. All of this
20-
/// means that we can keep extra metadata in sync, e.g. a list of all ackable
21-
/// jobs.
22-
///
2315
/// The `ActiveJobs` structure also includes a data structure ([`BlockMap`])
2416
/// which accelerates dependency tracking: it tracks the most recent blocking
2517
/// (write / flush / etc) and non-blocking (read) jobs on a per-block basis,
@@ -29,7 +21,6 @@ use std::collections::{BTreeMap, BTreeSet};
2921
#[derive(Debug, Default)]
3022
pub(crate) struct ActiveJobs {
3123
jobs: BTreeMap<JobId, DownstairsIO>,
32-
ackable: BTreeSet<JobId>,
3324
block_to_active: BlockMap,
3425
}
3526

@@ -46,10 +37,8 @@ impl ActiveJobs {
4637

4738
/// Looks up a job by ID, returning a mutable reference
4839
#[inline]
49-
pub fn get_mut(&mut self, job_id: &JobId) -> Option<DownstairsIOHandle> {
50-
self.jobs
51-
.get_mut(job_id)
52-
.map(|job| DownstairsIOHandle::new(job, &mut self.ackable))
40+
pub fn get_mut(&mut self, job_id: &JobId) -> Option<&mut DownstairsIO> {
41+
self.jobs.get_mut(job_id)
5342
}
5443

5544
/// Returns the total number of active jobs
@@ -68,8 +57,7 @@ impl ActiveJobs {
6857
#[inline]
6958
pub fn for_each<F: FnMut(&JobId, &mut DownstairsIO)>(&mut self, mut f: F) {
7059
for (job_id, job) in self.jobs.iter_mut() {
71-
let handle = DownstairsIOHandle::new(job, &mut self.ackable);
72-
f(job_id, handle.job);
60+
f(job_id, job);
7361
}
7462
}
7563

@@ -209,10 +197,6 @@ impl ActiveJobs {
209197
dep
210198
}
211199

212-
pub fn ackable_work(&self) -> BTreeSet<JobId> {
213-
self.ackable.clone()
214-
}
215-
216200
#[cfg(test)]
217201
pub fn get_extents_for(&self, job: JobId) -> ImpactedBlocks {
218202
*self.block_to_active.job_to_range.get(&job).unwrap()
@@ -230,75 +214,6 @@ impl<'a> IntoIterator for &'a ActiveJobs {
230214

231215
////////////////////////////////////////////////////////////////////////////////
232216

233-
/// Handle for a `DownstairsIO` that keeps secondary data in sync
234-
///
235-
/// Many parts of the code want to modify a `DownstairsIO` by directly poking
236-
/// its fields. This makes it hard to keep secondary data in sync, e.g.
237-
/// maintaining a separate list of all ackable IOs.
238-
pub(crate) struct DownstairsIOHandle<'a> {
239-
pub job: &'a mut DownstairsIO,
240-
initial_status: AckStatus,
241-
ackable: &'a mut BTreeSet<JobId>,
242-
}
243-
244-
impl<'a> std::fmt::Debug for DownstairsIOHandle<'a> {
245-
fn fmt(
246-
&self,
247-
f: &mut std::fmt::Formatter<'_>,
248-
) -> Result<(), std::fmt::Error> {
249-
self.job.fmt(f)
250-
}
251-
}
252-
253-
impl<'a> DownstairsIOHandle<'a> {
254-
fn new(
255-
job: &'a mut DownstairsIO,
256-
ackable: &'a mut BTreeSet<JobId>,
257-
) -> Self {
258-
let initial_status = job.ack_status;
259-
Self {
260-
job,
261-
initial_status,
262-
ackable,
263-
}
264-
}
265-
266-
pub fn job(&mut self) -> &mut DownstairsIO {
267-
self.job
268-
}
269-
}
270-
271-
impl<'a> std::ops::Drop for DownstairsIOHandle<'a> {
272-
fn drop(&mut self) {
273-
match (self.initial_status, self.job.ack_status) {
274-
(AckStatus::NotAcked, AckStatus::AckReady) => {
275-
let prev = self.ackable.insert(self.job.ds_id);
276-
assert!(prev);
277-
}
278-
(AckStatus::AckReady, AckStatus::Acked | AckStatus::NotAcked) => {
279-
let prev = self.ackable.remove(&self.job.ds_id);
280-
assert!(prev);
281-
}
282-
// None transitions
283-
(AckStatus::AckReady, AckStatus::AckReady)
284-
| (AckStatus::Acked, AckStatus::Acked)
285-
| (AckStatus::NotAcked, AckStatus::NotAcked) => (),
286-
287-
// Invalid transitions!
288-
(AckStatus::NotAcked, AckStatus::Acked)
289-
| (AckStatus::Acked, AckStatus::NotAcked)
290-
| (AckStatus::Acked, AckStatus::AckReady) => {
291-
panic!(
292-
"invalid transition: {:?} => {:?}",
293-
self.initial_status, self.job.ack_status
294-
)
295-
}
296-
}
297-
}
298-
}
299-
300-
////////////////////////////////////////////////////////////////////////////////
301-
302217
/// Acceleration data structure to quickly look up dependencies
303218
#[derive(Debug, Default)]
304219
struct BlockMap {

upstairs/src/block_req.rs

+5
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ impl BlockReq {
4242
// XXX this eats the result!
4343
let _ = self.sender.send(r);
4444
}
45+
46+
/// Consume this BlockReq and return the inner oneshot sender
47+
pub fn take_sender(self) -> oneshot::Sender<Result<(), CrucibleError>> {
48+
self.sender
49+
}
4550
}
4651

4752
/**

0 commit comments

Comments
 (0)