diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs
index 50dd3de19..038d124d2 100644
--- a/downstairs/src/lib.rs
+++ b/downstairs/src/lib.rs
@@ -4,7 +4,7 @@
 use futures::lock::{Mutex, MutexGuard};
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::fs::File;
 use std::io::{Read, Write};
@@ -1001,11 +1001,11 @@ async fn do_work_task(
          * Build ourselves a list of all the jobs on the work hashmap that
          * are New or DepWait.
          */
-        let mut new_work: Vec<JobId> = {
+        let mut new_work: VecDeque<JobId> = {
             if let Ok(new_work) =
                 ads.lock().await.new_work(upstairs_connection).await
             {
-                new_work
+                new_work.into_iter().collect()
             } else {
                 // This means we couldn't unblock jobs for this UUID
                 continue;
             }
         };
@@ -1018,118 +1018,100 @@ async fn do_work_task(
          * sorted before it is returned so this function iterates through jobs
          * in order.
          */
-        while !new_work.is_empty() {
-            let mut repeat_work = Vec::with_capacity(new_work.len());
-
-            for new_id in new_work.drain(..) {
-                if is_lossy && random() && random() {
-                    // Skip a job that needs to be done. Sometimes
-                    info!(ads.lock().await.log, "[lossy] skipping {}", new_id);
-                    repeat_work.push(new_id);
-                    continue;
-                }
+        while let Some(new_id) = new_work.pop_front() {
+            if is_lossy && random() && random() {
+                // Skip a job that needs to be done, moving it to the back of
+                // the list. This exercises job dependency tracking in the face
+                // of arbitrary reordering.
+                info!(ads.lock().await.log, "[lossy] skipping {}", new_id);
+                new_work.push_back(new_id);
+                continue;
+            }
 
-                /*
-                 * If this job is still new, take it and go to work. The
-                 * in_progress method will only return a job if all
-                 * dependencies are met.
-                 */
-                let job_id = ads
-                    .lock()
-                    .await
-                    .in_progress(upstairs_connection, new_id)
-                    .await?;
+            /*
+             * If this job is still new, take it and go to work. The
+             * in_progress method will only return a job if all
+             * dependencies are met.
+             */
+            let job_id = ads
+                .lock()
+                .await
+                .in_progress(upstairs_connection, new_id)
+                .await?;
 
-                if let Some(job_id) = job_id {
-                    cdt::work__process!(|| job_id.0);
-                    let m = ads
-                        .lock()
-                        .await
-                        .do_work(upstairs_connection, job_id)
-                        .await?;
+            // If the job's dependencies aren't met, then keep going
+            let Some(job_id) = job_id else {
+                continue;
+            };
 
-                    // If this is a repair job, and that repair failed, we
-                    // can do no more work on this downstairs and should
-                    // force everything to come down before more work arrives.
-                    //
-                    // However, we can respond to the upstairs with the failed
-                    // result, and let the upstairs take action that will
-                    // allow it to abort the repair and continue working in
-                    // some degraded state.
-                    let mut abort_needed = false;
-                    if let Some(m) = m {
-                        abort_needed = check_message_for_abort(&m);
-
-                        if let Some(error) = m.err() {
-                            resp_tx
-                                .send(Message::ErrorReport {
-                                    upstairs_id: upstairs_connection
-                                        .upstairs_id,
-                                    session_id: upstairs_connection.session_id,
-                                    job_id: new_id,
-                                    error: error.clone(),
-                                })
-                                .await?;
-
-                            // If the job errored, do not consider it completed.
-                            // Retry it.
-                            repeat_work.push(new_id);
-                        } else {
-                            // The job completed successfully, so inform the
-                            // Upstairs
-
-                            ads.lock().await.complete_work_stat(
-                                upstairs_connection,
-                                &m,
-                                job_id,
-                            )?;
-
-                            // Notify the upstairs before completing work, which
-                            // consumes the message (so we'll check whether it's
-                            // a FlushAck beforehand)
-                            let is_flush =
-                                matches!(m, Message::FlushAck { .. });
-                            resp_tx.send(m).await?;
-
-                            ads.lock()
-                                .await
-                                .complete_work_inner(
-                                    upstairs_connection,
-                                    job_id,
-                                    is_flush,
-                                )
-                                .await?;
+            cdt::work__process!(|| job_id.0);
+            let m = ads
+                .lock()
+                .await
+                .do_work(upstairs_connection, job_id)
+                .await?;
 
-                            cdt::work__done!(|| job_id.0);
-                        }
-                    }
+            // If a different downstairs was promoted, then `do_work` returns
+            // `None` and we ignore the job.
+            let Some(m) = m else {
+                continue;
+            };
 
-                    // Now, if the message requires an abort, we handle
-                    // that now by exiting this task with error.
-                    if abort_needed {
-                        bail!("Repair has failed, exiting task");
-                    }
+            if let Some(error) = m.err() {
+                resp_tx
+                    .send(Message::ErrorReport {
+                        upstairs_id: upstairs_connection.upstairs_id,
+                        session_id: upstairs_connection.session_id,
+                        job_id: new_id,
+                        error: error.clone(),
+                    })
+                    .await?;
+
+                // If the job errored, do not consider it completed.
+                // Retry it.
+                new_work.push_back(new_id);
+
+                // If this is a repair job, and that repair failed, we
+                // can do no more work on this downstairs and should
+                // force everything to come down before more work arrives.
+                //
+                // We have replied to the Upstairs above, which lets the
+                // upstairs take action to abort the repair and continue
+                // working in some degraded state.
+                //
+                // If you change this, change how the Upstairs processes
+                // ErrorReports!
+                if matches!(m, Message::ExtentLiveRepairAckId { .. }) {
+                    bail!("Repair has failed, exiting task");
                 }
-            }
+            } else {
+                // The job completed successfully, so inform the
+                // Upstairs
 
-            new_work = repeat_work;
-        }
-    }
+                ads.lock().await.complete_work_stat(
+                    upstairs_connection,
+                    &m,
+                    job_id,
+                )?;
 
-    // None means the channel is closed
-    Ok(())
-}
+                // Notify the upstairs before completing work, which
+                // consumes the message (so we'll check whether it's
+                // a FlushAck beforehand)
+                let is_flush = matches!(m, Message::FlushAck { .. });
+                resp_tx.send(m).await?;
+
+                ads.lock()
+                    .await
+                    .complete_work_inner(upstairs_connection, job_id, is_flush)
+                    .await?;
 
-// Check and see if this message is A LiveRepair, and if it has failed. If you
-// change this, change how the Upstairs processes ErrorReports!
-fn check_message_for_abort(m: &Message) -> bool {
-    if let Message::ExtentLiveRepairAckId { result, .. } = m {
-        if result.is_err() {
-            return true;
+                cdt::work__done!(|| job_id.0);
+            }
         }
     }
-    false
+    // None means the channel is closed
+    Ok(())
 }
 
 async fn proc_stream(
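
The heart of this diff is a data-structure swap: the old loop drained a Vec into a second repeat_work vector and swapped the two on every pass, while the new loop treats new_work as a single rotating VecDeque, with pop_front() taking the next candidate and push_back() re-queueing anything that was skipped or errored. Below is a minimal standalone sketch of that queue discipline, not Crucible code: the try_job() readiness check and the (job, attempt) payload are hypothetical stand-ins for the real in_progress/do_work calls.

use std::collections::VecDeque;

/// Hypothetical readiness check standing in for Crucible's
/// `in_progress`/`do_work` pair: returns false when a job cannot run
/// yet and must be retried later.
fn try_job(job: u64, attempt: u32) -> bool {
    // Pretend every third job needs one extra pass before it is ready.
    job % 3 != 0 || attempt > 0
}

fn main() {
    // (job id, attempts so far), seeded the way `new_work` is collected
    // in the diff.
    let mut work: VecDeque<(u64, u32)> = (0..6).map(|j| (j, 0)).collect();

    // Pop from the front; rotate to the back when a job can't run yet.
    // Blocked jobs cycle behind every runnable job instead of being
    // collected into a separate repeat_work vector on each pass.
    while let Some((job, attempt)) = work.pop_front() {
        if try_job(job, attempt) {
            println!("job {job} done after {attempt} retries");
        } else {
            work.push_back((job, attempt + 1));
        }
    }
}

Rotating blocked jobs to the back makes each step an O(1) pop/push with no per-pass allocation (the old version allocated repeat_work every iteration), and a skipped job is retried only after every other runnable job has had a turn, which is the reordering behavior the lossy test path is meant to exercise.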