Commit 2d7e499

Simplify the do_work_task loop (#1150)
- Use a `VecDeque` to remove the inner / outer loops
- Clarify that the `is_lossy` conditional helps exercise reordering / dependency tracking
- Use the `let Some(v) = v else { continue; };` pattern to reduce rightward indentation creep
- Move the logic from `check_message_for_abort` next to the code that actually handles errors + replying
1 parent 2ee617c commit 2d7e499
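To illustrate the shape this commit moves to, here is a minimal, self-contained sketch (not the Crucible code itself): a single `VecDeque` drained with `while let Some(..) = new_work.pop_front()`, the `let ... else { continue; }` pattern keeping the body at one indentation level, and `push_back` sending a job to the back of the queue. The `in_progress` free function below is a hypothetical stand-in for the real method of the same name; note that in the sketch a not-yet-ready job is re-queued, whereas in the actual loop only lossy-skipped or failed jobs are pushed back.

```rust
use std::collections::VecDeque;

// Hypothetical stand-ins for illustration; the real loop works on JobId
// values and the shared Downstairs state behind a Mutex.
type JobId = u64;

fn in_progress(id: JobId, done: &[JobId]) -> Option<JobId> {
    // Pretend each job depends on the previous one being finished.
    if id == 1 || done.contains(&(id - 1)) {
        Some(id)
    } else {
        None
    }
}

fn main() {
    // One VecDeque replaces the old nested `while !new_work.is_empty()` /
    // `for new_id in new_work.drain(..)` loops and the `repeat_work` vector.
    let mut new_work: VecDeque<JobId> = VecDeque::from([2, 1, 4, 3]);
    let mut done: Vec<JobId> = Vec::new();

    while let Some(new_id) = new_work.pop_front() {
        // `let ... else { continue; }` keeps the happy path at one
        // indentation level instead of adding another nested `if let`.
        let Some(job_id) = in_progress(new_id, &done) else {
            // In this sketch a not-yet-ready job goes to the back of the
            // queue so it is retried after its dependency completes.
            new_work.push_back(new_id);
            continue;
        };

        println!("processing job {job_id}");
        done.push(job_id);
    }
}
```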

File tree

1 file changed: downstairs/src/lib.rs (+85, -103 lines)
@@ -4,7 +4,7 @@
 
 use futures::lock::{Mutex, MutexGuard};
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::fs::File;
 use std::io::{Read, Write};
@@ -1001,11 +1001,11 @@ async fn do_work_task(
          * Build ourselves a list of all the jobs on the work hashmap that
          * are New or DepWait.
          */
-        let mut new_work: Vec<JobId> = {
+        let mut new_work: VecDeque<JobId> = {
             if let Ok(new_work) =
                 ads.lock().await.new_work(upstairs_connection).await
             {
-                new_work
+                new_work.into_iter().collect()
             } else {
                 // This means we couldn't unblock jobs for this UUID
                 continue;
@@ -1018,118 +1018,100 @@
          * sorted before it is returned so this function iterates through jobs
          * in order.
          */
-        while !new_work.is_empty() {
-            let mut repeat_work = Vec::with_capacity(new_work.len());
-
-            for new_id in new_work.drain(..) {
-                if is_lossy && random() && random() {
-                    // Skip a job that needs to be done. Sometimes
-                    info!(ads.lock().await.log, "[lossy] skipping {}", new_id);
-                    repeat_work.push(new_id);
-                    continue;
-                }
+        while let Some(new_id) = new_work.pop_front() {
+            if is_lossy && random() && random() {
+                // Skip a job that needs to be done, moving it to the back of
+                // the list. This exercises job dependency tracking in the face
+                // of arbitrary reordering.
+                info!(ads.lock().await.log, "[lossy] skipping {}", new_id);
+                new_work.push_back(new_id);
+                continue;
+            }
 
-                /*
-                 * If this job is still new, take it and go to work. The
-                 * in_progress method will only return a job if all
-                 * dependencies are met.
-                 */
-                let job_id = ads
-                    .lock()
-                    .await
-                    .in_progress(upstairs_connection, new_id)
-                    .await?;
+            /*
+             * If this job is still new, take it and go to work. The
+             * in_progress method will only return a job if all
+             * dependencies are met.
+             */
+            let job_id = ads
+                .lock()
+                .await
+                .in_progress(upstairs_connection, new_id)
+                .await?;
 
-                if let Some(job_id) = job_id {
-                    cdt::work__process!(|| job_id.0);
-                    let m = ads
-                        .lock()
-                        .await
-                        .do_work(upstairs_connection, job_id)
-                        .await?;
+            // If the job's dependencies aren't met, then keep going
+            let Some(job_id) = job_id else {
+                continue;
+            };
 
-                    // If this is a repair job, and that repair failed, we
-                    // can do no more work on this downstairs and should
-                    // force everything to come down before more work arrives.
-                    //
-                    // However, we can respond to the upstairs with the failed
-                    // result, and let the upstairs take action that will
-                    // allow it to abort the repair and continue working in
-                    // some degraded state.
-                    let mut abort_needed = false;
-                    if let Some(m) = m {
-                        abort_needed = check_message_for_abort(&m);
-
-                        if let Some(error) = m.err() {
-                            resp_tx
-                                .send(Message::ErrorReport {
-                                    upstairs_id: upstairs_connection
-                                        .upstairs_id,
-                                    session_id: upstairs_connection.session_id,
-                                    job_id: new_id,
-                                    error: error.clone(),
-                                })
-                                .await?;
-
-                            // If the job errored, do not consider it completed.
-                            // Retry it.
-                            repeat_work.push(new_id);
-                        } else {
-                            // The job completed successfully, so inform the
-                            // Upstairs
-
-                            ads.lock().await.complete_work_stat(
-                                upstairs_connection,
-                                &m,
-                                job_id,
-                            )?;
-
-                            // Notify the upstairs before completing work, which
-                            // consumes the message (so we'll check whether it's
-                            // a FlushAck beforehand)
-                            let is_flush =
-                                matches!(m, Message::FlushAck { .. });
-                            resp_tx.send(m).await?;
-
-                            ads.lock()
-                                .await
-                                .complete_work_inner(
-                                    upstairs_connection,
-                                    job_id,
-                                    is_flush,
-                                )
-                                .await?;
+            cdt::work__process!(|| job_id.0);
+            let m = ads
+                .lock()
+                .await
+                .do_work(upstairs_connection, job_id)
+                .await?;
 
-                            cdt::work__done!(|| job_id.0);
-                        }
-                    }
+            // If a different downstairs was promoted, then `do_work` returns
+            // `None` and we ignore the job.
+            let Some(m) = m else {
+                continue;
+            };
 
-                    // Now, if the message requires an abort, we handle
-                    // that now by exiting this task with error.
-                    if abort_needed {
-                        bail!("Repair has failed, exiting task");
-                    }
+            if let Some(error) = m.err() {
+                resp_tx
+                    .send(Message::ErrorReport {
+                        upstairs_id: upstairs_connection.upstairs_id,
+                        session_id: upstairs_connection.session_id,
+                        job_id: new_id,
+                        error: error.clone(),
+                    })
+                    .await?;
+
+                // If the job errored, do not consider it completed.
+                // Retry it.
+                new_work.push_back(new_id);
+
+                // If this is a repair job, and that repair failed, we
+                // can do no more work on this downstairs and should
+                // force everything to come down before more work arrives.
+                //
+                // We have replied to the Upstairs above, which lets the
+                // upstairs take action to abort the repair and continue
+                // working in some degraded state.
+                //
+                // If you change this, change how the Upstairs processes
+                // ErrorReports!
+                if matches!(m, Message::ExtentLiveRepairAckId { .. }) {
+                    bail!("Repair has failed, exiting task");
                 }
-            }
+            } else {
+                // The job completed successfully, so inform the
+                // Upstairs
 
-            new_work = repeat_work;
-        }
-    }
+                ads.lock().await.complete_work_stat(
+                    upstairs_connection,
+                    &m,
+                    job_id,
+                )?;
 
-    // None means the channel is closed
-    Ok(())
-}
+                // Notify the upstairs before completing work, which
+                // consumes the message (so we'll check whether it's
+                // a FlushAck beforehand)
+                let is_flush = matches!(m, Message::FlushAck { .. });
+                resp_tx.send(m).await?;
+
+                ads.lock()
+                    .await
+                    .complete_work_inner(upstairs_connection, job_id, is_flush)
+                    .await?;
 
-// Check and see if this message is A LiveRepair, and if it has failed. If you
-// change this, change how the Upstairs processes ErrorReports!
-fn check_message_for_abort(m: &Message) -> bool {
-    if let Message::ExtentLiveRepairAckId { result, .. } = m {
-        if result.is_err() {
-            return true;
+                cdt::work__done!(|| job_id.0);
+            }
         }
     }
 
-    false
+    // None means the channel is closed
+    Ok(())
 }
 
 async fn proc_stream(
