Simplify the do_work_task loop #1150

Merged (1 commit, Feb 7, 2024)
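The PR description isn't captured here, but the diff below makes two related changes to the do_work_task loop: the work list becomes a VecDeque that jobs are popped from the front of and, when skipped (lossy mode) or failed, pushed onto the back of, replacing the old Vec plus repeat_work pattern that needed an outer while loop for repeat passes; and the nested if-let blocks are flattened into let-else early continues. Below is a minimal sketch of the resulting loop shape, with the Downstairs locking, DTrace probes, and Upstairs messaging elided; try_start_job and run_job are hypothetical stand-ins for the real in_progress and do_work paths, and JobId here is a simplified placeholder type.

use std::collections::VecDeque;

// Hypothetical stand-in for the crate's JobId type.
#[derive(Clone, Copy, Debug, PartialEq)]
struct JobId(u64);

// A condensed sketch of the loop shape after this PR: one queue, one pass,
// with skipped or failed jobs pushed onto the back of the same queue instead
// of being collected into a separate `repeat_work` vector for another pass.
fn drain_new_work(
    mut new_work: VecDeque<JobId>,
    mut try_start_job: impl FnMut(JobId) -> Option<JobId>, // cf. in_progress
    mut run_job: impl FnMut(JobId) -> Result<(), String>,  // cf. do_work + resp_tx
) {
    while let Some(new_id) = new_work.pop_front() {
        // Dependencies not met: in the real code in_progress returns None and
        // the job stays in the Downstairs work map; here we simply skip it.
        let Some(job_id) = try_start_job(new_id) else {
            continue;
        };

        if run_job(job_id).is_err() {
            // On error the job is not considered completed; retry it by
            // moving it to the back of the queue.
            new_work.push_back(new_id);
        }
    }
}

fn main() {
    let queue: VecDeque<JobId> = (0..3).map(JobId).collect();
    let mut failed_once = false;
    drain_new_work(
        queue,
        |id| Some(id), // every job is ready to start
        |id| {
            // Fail JobId(1) once to exercise the retry path.
            if id == JobId(1) && !failed_once {
                failed_once = true;
                Err("transient error".to_string())
            } else {
                Ok(())
            }
        },
    );
}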
188 changes: 85 additions & 103 deletions downstairs/src/lib.rs
@@ -4,7 +4,7 @@

use futures::lock::{Mutex, MutexGuard};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::fs::File;
use std::io::{Read, Write};
@@ -1001,11 +1001,11 @@ async fn do_work_task(
* Build ourselves a list of all the jobs on the work hashmap that
* are New or DepWait.
*/
let mut new_work: Vec<JobId> = {
let mut new_work: VecDeque<JobId> = {
if let Ok(new_work) =
ads.lock().await.new_work(upstairs_connection).await
{
new_work
new_work.into_iter().collect()
} else {
// This means we couldn't unblock jobs for this UUID
continue;
@@ -1018,118 +1018,100 @@
* sorted before it is returned so this function iterates through jobs
* in order.
*/
while !new_work.is_empty() {
let mut repeat_work = Vec::with_capacity(new_work.len());

for new_id in new_work.drain(..) {
if is_lossy && random() && random() {
// Skip a job that needs to be done. Sometimes
info!(ads.lock().await.log, "[lossy] skipping {}", new_id);
repeat_work.push(new_id);
continue;
}
while let Some(new_id) = new_work.pop_front() {
if is_lossy && random() && random() {
// Skip a job that needs to be done, moving it to the back of
// the list. This exercises job dependency tracking in the face
// of arbitrary reordering.
info!(ads.lock().await.log, "[lossy] skipping {}", new_id);
new_work.push_back(new_id);
continue;
}

/*
* If this job is still new, take it and go to work. The
* in_progress method will only return a job if all
* dependencies are met.
*/
let job_id = ads
.lock()
.await
.in_progress(upstairs_connection, new_id)
.await?;
/*
* If this job is still new, take it and go to work. The
* in_progress method will only return a job if all
* dependencies are met.
*/
let job_id = ads
.lock()
.await
.in_progress(upstairs_connection, new_id)
.await?;

if let Some(job_id) = job_id {
cdt::work__process!(|| job_id.0);
let m = ads
.lock()
.await
.do_work(upstairs_connection, job_id)
.await?;
// If the job's dependencies aren't met, then keep going
let Some(job_id) = job_id else {
continue;
};

// If this is a repair job, and that repair failed, we
// can do no more work on this downstairs and should
// force everything to come down before more work arrives.
//
// However, we can respond to the upstairs with the failed
// result, and let the upstairs take action that will
// allow it to abort the repair and continue working in
// some degraded state.
let mut abort_needed = false;
if let Some(m) = m {
abort_needed = check_message_for_abort(&m);

if let Some(error) = m.err() {
resp_tx
.send(Message::ErrorReport {
upstairs_id: upstairs_connection
.upstairs_id,
session_id: upstairs_connection.session_id,
job_id: new_id,
error: error.clone(),
})
.await?;

// If the job errored, do not consider it completed.
// Retry it.
repeat_work.push(new_id);
} else {
// The job completed successfully, so inform the
// Upstairs

ads.lock().await.complete_work_stat(
upstairs_connection,
&m,
job_id,
)?;

// Notify the upstairs before completing work, which
// consumes the message (so we'll check whether it's
// a FlushAck beforehand)
let is_flush =
matches!(m, Message::FlushAck { .. });
resp_tx.send(m).await?;

ads.lock()
.await
.complete_work_inner(
upstairs_connection,
job_id,
is_flush,
)
.await?;
cdt::work__process!(|| job_id.0);
Contributor commented:

I thought maybe this probe was in a bad spot, as we could hit this, then fall out of the loop if some other downstairs was promoted at line 1057. But if that really is the case, then the whole accounting is meaningless, as we should be shutting this task down when the other upstairs takes over.

Also, this issue was here before you refactored.

let m = ads
.lock()
.await
.do_work(upstairs_connection, job_id)
.await?;

cdt::work__done!(|| job_id.0);
}
}
// If a different downstairs was promoted, then `do_work` returns
// `None` and we ignore the job.
let Some(m) = m else {
continue;
};

// Now, if the message requires an abort, we handle
// that now by exiting this task with error.
if abort_needed {
bail!("Repair has failed, exiting task");
}
if let Some(error) = m.err() {
resp_tx
.send(Message::ErrorReport {
upstairs_id: upstairs_connection.upstairs_id,
session_id: upstairs_connection.session_id,
job_id: new_id,
error: error.clone(),
})
.await?;

// If the job errored, do not consider it completed.
// Retry it.
new_work.push_back(new_id);

// If this is a repair job, and that repair failed, we
// can do no more work on this downstairs and should
// force everything to come down before more work arrives.
//
// We have replied to the Upstairs above, which lets the
// upstairs take action to abort the repair and continue
// working in some degraded state.
//
// If you change this, change how the Upstairs processes
// ErrorReports!
if matches!(m, Message::ExtentLiveRepairAckId { .. }) {
bail!("Repair has failed, exiting task");
}
}
} else {
// The job completed successfully, so inform the
// Upstairs

new_work = repeat_work;
}
}
ads.lock().await.complete_work_stat(
upstairs_connection,
&m,
job_id,
)?;

// None means the channel is closed
Ok(())
}
// Notify the upstairs before completing work, which
// consumes the message (so we'll check whether it's
// a FlushAck beforehand)
let is_flush = matches!(m, Message::FlushAck { .. });
resp_tx.send(m).await?;

ads.lock()
.await
.complete_work_inner(upstairs_connection, job_id, is_flush)
.await?;

// Check and see if this message is A LiveRepair, and if it has failed. If you
// change this, change how the Upstairs processes ErrorReports!
fn check_message_for_abort(m: &Message) -> bool {
if let Message::ExtentLiveRepairAckId { result, .. } = m {
if result.is_err() {
return true;
cdt::work__done!(|| job_id.0);
}
}
}

false
// None means the channel is closed
Ok(())
}

async fn proc_stream(
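As an aside on the inline review comment above about cdt::work__process!: the concern is that the probe fires before do_work, while work__done only fires after a successful pass, so any early exit in between leaves the two counts out of step. Below is a tiny illustration of that shape, using plain counters as stand-ins for the USDT probes; the counters and the do_work signature here are assumptions made for illustration, not crucible's actual code.

// Plain counters standing in for the work__process / work__done probes.
#[derive(Default)]
struct ProbeCounts {
    process: u64,
    done: u64,
}

// Returns None when "some other downstairs was promoted", mirroring the
// early bail-out the reviewer points at; the signature is invented for
// this illustration.
fn do_work(job: u64, promoted_away: bool) -> Option<u64> {
    if promoted_away {
        None
    } else {
        Some(job)
    }
}

fn main() {
    let mut probes = ProbeCounts::default();
    for job in 0..3u64 {
        probes.process += 1; // work__process fires here
        let Some(_result) = do_work(job, job == 1) else {
            // Early continue: work__done never fires for this job, so the
            // process/done accounting no longer matches.
            continue;
        };
        probes.done += 1; // work__done fires here
    }
    assert_eq!(probes.process, 3);
    assert_eq!(probes.done, 2);
    println!("process={} done={}", probes.process, probes.done);
}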