diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs
index a5347e056..a78c01e3c 100644
--- a/downstairs/src/lib.rs
+++ b/downstairs/src/lib.rs
@@ -673,10 +673,7 @@ pub fn show_work(ds: &mut Downstairs) {
 
     for upstairs_connection in active_upstairs_connections {
         let work = ds.work(upstairs_connection);
-
-        let mut kvec: Vec<JobId> = work.dep_wait.keys().cloned().collect();
-
-        if kvec.is_empty() {
+        if work.pending_jobs.is_empty() {
            info!(ds.log, "Crucible Downstairs work queue: Empty");
         } else {
             info!(ds.log, "Crucible Downstairs work queue:");
@@ -684,10 +681,8 @@ pub fn show_work(ds: &mut Downstairs) {
                 ds.log,
                 "{:8} {:>7} {:>5} {}", " JOB_ID", "IO_TYPE", "STATE", "DEPS"
             );
-            kvec.sort_unstable();
-            for id in kvec.iter() {
-                let dsw = work.dep_wait.get(id).unwrap();
-                let (dsw_type, dep_list) = match &dsw {
+            for job in work.pending_jobs.iter() {
+                let (dsw_type, dep_list) = match &job.io {
                     IOop::Read { dependencies, .. } => ("Read", dependencies),
                     IOop::Write { dependencies, .. } => ("Write", dependencies),
                     IOop::Flush { dependencies, .. } => ("Flush", dependencies),
@@ -713,7 +708,7 @@ pub fn show_work(ds: &mut Downstairs) {
                         ("NoOp", dependencies)
                     }
                 };
-                info!(ds.log, "{:8} {:>7} {:?}", id, dsw_type, dep_list,);
+                info!(ds.log, "{:8} {:>7} {:?}", job.id, dsw_type, dep_list);
             }
         }
 
@@ -1474,39 +1469,26 @@ impl ActiveConnection {
         dss: &mut DsStatOuter,
         region: &mut Region,
     ) -> Result<()> {
-        /*
-         * Build ourselves a list of all the jobs on the work hashmap that
-         * are New or DepWait.
-         */
-        let mut new_work = self.work.new_work();
-
-        /*
-         * We don't have to do jobs in order, but the dependencies are, at
-         * least for now, always going to be in order of job id. `new_work` is
-         * sorted before it is returned so this function iterates through jobs
-         * in order.
-         */
-        while let Some(new_id) = new_work.pop_front() {
+        let mut retry = VecDeque::new();
+        let mut stop_at = None;
+        while let Some((ds_id, work)) = self
+            .work
+            .get_next(&mut stop_at)
+            .or_else(|| retry.pop_front())
+        {
             if flags.lossy && random() && random() {
                 // Skip a job that needs to be done, moving it to the back of
                 // the list. This exercises job dependency tracking in the face
                 // of arbitrary reordering.
-                info!(self.log, "[lossy] skipping {}", new_id);
-                new_work.push_back(new_id);
-                continue;
-            }
-
-            // If this job is still new, take it and go to work. The in_progress
-            // method will only return the work if all dependencies are met.
-            let Some(dw) = self.work.in_progress(new_id) else {
+                info!(self.log, "[lossy] skipping {}", ds_id);
+                retry.push_back((ds_id, work));
                 continue;
-            };
-
-            if let Some(failed) = self
-                .do_work(new_id, dw, flags, reqwest_client, dss, region)
+            } else if let Some((ds_id, work)) = self
+                .do_work(ds_id, work, flags, reqwest_client, dss, region)
                 .await?
             {
-                new_work.push_back(failed);
+                // We failed, add this job to the retry queue
+                retry.push_back((ds_id, work));
             }
         }
         Ok(())
@@ -1515,26 +1497,34 @@ impl ActiveConnection {
     async fn do_work_if_ready(
         &mut self,
         ds_id: JobId,
-        mut job: IOop,
+        job: IOop,
         flags: &DownstairsFlags,
         reqwest_client: &reqwest::Client,
         dss: &mut DsStatOuter,
         region: &mut Region,
     ) -> Result<WorkResult> {
-        if self.work.check_ready(ds_id, &mut job) {
+        let mut p = PendingJob {
+            id: ds_id,
+            io: job,
+            outstanding_deps: None,
+        };
+        if self.work.check_ready(&mut p) {
+            let ds_id = p.id;
+            let job = p.io;
             cdt::work__start!(|| ds_id.0);
+
+            // Keep retrying this job until it succeeds
             let mut prev = self
                 .do_work(ds_id, job, flags, reqwest_client, dss, region)
                 .await?;
-            while let Some(ds_id) = prev.take() {
-                let job = self.work.in_progress(ds_id).unwrap();
+            while let Some((ds_id, job)) = prev.take() {
                 prev = self
                     .do_work(ds_id, job, flags, reqwest_client, dss, region)
                     .await?;
             }
             Ok(WorkResult::Handled)
         } else {
-            self.work.add_work(ds_id, job);
+            self.work.add_pending_job(p);
             Ok(WorkResult::Queued)
         }
     }
@@ -1542,10 +1532,9 @@ impl ActiveConnection {
     /// Do work for the given IO job
     ///
     /// This function will either complete the work (adding it to the completed
-    /// list and returning `None`), fail to complete the work (re-adding it to
-    /// the active list, sending an `ErrorPort`, and returning the `JobId`), or
-    /// fail to communicate when replying to the Upstairs (which returns an
-    /// error).
+    /// list and returning `None`), fail to complete the work (returning the
+    /// `JobId` and `IOop`, and sending an `ErrorReport`), or fail to communicate
+    /// when replying to the Upstairs (which returns an error).
     ///
     /// There's one exception: failure to perform a live-repair task will -- in
     /// addition to sending an `ErrorReport` to the upstairs -- **also** return
@@ -1559,7 +1548,7 @@ impl ActiveConnection {
         reqwest_client: &reqwest::Client,
         dss: &mut DsStatOuter,
         region: &mut Region,
-    ) -> Result<Option<JobId>> {
+    ) -> Result<Option<(JobId, IOop)>> {
         let upstairs_connection = self.upstairs_connection;
 
         cdt::work__process!(|| new_id.0);
@@ -1576,8 +1565,6 @@ impl ActiveConnection {
             })?;
 
             // If the job errored, do not consider it completed.
-            // Retry it.
-            self.work.dep_wait.insert(new_id, job);
 
             // If this is a repair job, and that repair failed, we
             // can do no more work on this downstairs and should
@@ -1592,7 +1579,8 @@ impl ActiveConnection {
             if matches!(m, Message::ExtentLiveRepairAckId { .. }) {
                 bail!("Repair has failed, exiting task");
             }
-            Ok(Some(new_id))
+            // Return the job ID and IOop, so the caller can retry
+            Ok(Some((new_id, job)))
         } else {
             // The job completed successfully, so update our stats
             dss.on_complete(&m);
@@ -1627,7 +1615,7 @@ impl ActiveConnection {
     /// This function is nominally infallible, because any errors are encoded as
     /// an error field in the returned `Message`.
     async fn do_work_inner(
-        &mut self,
+        &self,
         job_id: JobId,
         work: &IOop,
         flags: &DownstairsFlags,
@@ -3086,7 +3074,7 @@ impl Downstairs {
                 self.log,
                 "upstairs {:?} ({id:?}) removed, {} jobs left",
                 upstairs_connection,
-                state.work.jobs(),
+                state.work.pending_jobs.len(),
             );
         } else {
             info!(
@@ -3173,13 +3161,13 @@ impl Downstairs {
         // information, as that will not be valid any longer.
         //
         // TODO: Really work through this error case
-        if state.work.dep_wait.keys().len() > 0 {
+        if !state.work.pending_jobs.is_empty() {
             warn!(
                 self.log,
                 "Crucible Downstairs promoting {:?} to active, \
                 discarding {} jobs",
                 state.upstairs_connection,
-                state.work.dep_wait.keys().len()
+                state.work.pending_jobs.len()
             );
         }
 
@@ -3267,18 +3255,23 @@ impl DownstairsHandle {
     }
 }
 
+#[derive(Debug)]
+struct PendingJob {
+    id: JobId,
+    io: IOop,
+    /// Number of unmet dependencies, once it is known for this job
+    ///
+    /// This is used for logging purposes
+    outstanding_deps: Option<NonZeroUsize>,
+}
+
 /*
  * The structure that tracks downstairs work in progress
  */
 #[derive(Debug)]
 pub struct Work {
-    dep_wait: HashMap<JobId, IOop>,
-
-    /// Map of jobs with outstanding (unmet) dependencies
-    ///
-    /// The value stored in this map is the number of unmet dependencies for the
-    /// given job.
-    outstanding_deps: HashMap<JobId, usize>,
+    /// Jobs which are new or have not met their dependencies
+    pending_jobs: VecDeque<PendingJob>,
 
     /// Track completed jobs since the last flush
     completed: CompletedJobs,
@@ -3289,8 +3282,7 @@ impl Work {
     fn new(last_flush: Option<JobId>, log: Logger) -> Self {
         Work {
-            dep_wait: HashMap::new(),
-            outstanding_deps: HashMap::new(),
+            pending_jobs: VecDeque::new(),
             completed: CompletedJobs::new(last_flush),
             log,
         }
     }
@@ -3300,69 +3292,63 @@ impl Work {
         self.completed.completed()
     }
 
-    fn jobs(&self) -> usize {
-        self.dep_wait.len()
+    /// Pushes a new job to the back of the queue
+    fn add_pending_job(&mut self, job: PendingJob) {
+        self.pending_jobs.push_back(job);
     }
 
-    /// Returns a sorted list of downstairs request IDs that are new or have
-    /// been waiting for other dependencies to finish.
-    fn new_work(&self) -> VecDeque<JobId> {
-        let mut result: VecDeque<_> = self.dep_wait.keys().cloned().collect();
-        result.make_contiguous().sort_unstable();
-
-        result
+    /// Returns a list of pending job IDs
+    #[cfg(test)]
+    fn new_work(&self) -> Vec<JobId> {
+        self.pending_jobs.iter().map(|p| p.id).collect()
     }
 
+    /// Directly add work to the pending jobs list
+    ///
+    /// This is only used in test code; during normal operation, we attempt to
+    /// perform the work and otherwise enqueue it with [`Self::add_pending_job`]
+    #[cfg(test)]
     fn add_work(&mut self, ds_id: JobId, dsw: IOop) {
-        self.dep_wait.insert(ds_id, dsw);
+        self.add_pending_job(PendingJob {
+            id: ds_id,
+            io: dsw,
+            outstanding_deps: None,
+        });
     }
 
     #[cfg(test)]
     fn get_job(&self, ds_id: JobId) -> &IOop {
-        self.dep_wait.get(&ds_id).unwrap()
+        &self.pending_jobs.iter().find(|p| p.id == ds_id).unwrap().io
     }
 
     /// Checks whether the given job is ready
    ///
-    /// Updates `self.outstanding_deps` and prints a warning message the first
+    /// Updates `job.outstanding_deps` and prints a warning message the first
     /// time a job with unmet dependencies is checked.
-    fn check_ready(&mut self, ds_id: JobId, work: &mut IOop) -> bool {
+    fn check_ready(&self, work: &mut PendingJob) -> bool {
         // Before we can do work for this job, we have to check the dep list if
         // there is one and make sure all dependencies are completed.
         //
-        // The Downstairs currently assumes that all jobs previous to the last
-        // flush have completed, hence this early out.
-        //
-        // Currently `work.completed` is cleared out when
-        // `ActiveConnection::do_ready_work` (or `complete` in mod test) is
-        // called with a `FlushAck`, so this early out cannot be removed unless
-        // that is changed too.
-        //
         // XXX Make this better/faster by removing the ones that are met, so
        // next lap we don't have to check again? There may be some debug value
        // to knowing what the dep list was, so consider that before making this
        // faster.
         let num_deps_outstanding = work
+            .io
             .deps()
             .iter()
             .filter(|&dep| !self.completed.is_complete(*dep))
             .count();
 
-        if num_deps_outstanding > 0 {
-            let print = if let Some(existing_outstanding_deps) =
-                self.outstanding_deps.get(&ds_id)
-            {
-                *existing_outstanding_deps != num_deps_outstanding
-            } else {
-                false
-            };
-
-            if print {
+        // If the job has outstanding dependencies, print a log if the count has
+        // changed and return `false` (indicating the job is not ready)
+        if let Some(n) = NonZeroUsize::new(num_deps_outstanding) {
+            if work.outstanding_deps != Some(n) {
                 warn!(
                     self.log,
                     "{} job {} waiting on {} deps",
-                    ds_id,
-                    match &work {
+                    work.id,
+                    match &work.io {
                         IOop::Write { .. } => "Write",
                         IOop::WriteUnwritten { .. } => "WriteUnwritten",
                         IOop::Flush { .. } => "Flush",
@@ -3378,35 +3364,41 @@ impl Work {
                 );
             }
 
-            let _ = self.outstanding_deps.insert(ds_id, num_deps_outstanding);
+            work.outstanding_deps = Some(n);
             false
         } else {
             true
         }
     }
 
-    /// If the job is ready, remove it from the `dep_wait` map and return it
-    ///
-    /// Otherwise, leave the job in the map and return `None`
+    /// Returns the next ready job in the pending work queue.
     ///
-    /// # Panics
-    /// If the job is not present in the `dep_wait` map
+    /// Returns `None` if the pending work queue is empty or all jobs in the
+    /// work queue are not ready. The `stop_at` argument tracks the first
+    /// non-ready job and is used during iteration; once we circle back to that
+    /// job, we have visited all jobs in the queue and can stop iterating.
     #[must_use]
-    fn in_progress(&mut self, ds_id: JobId) -> Option<IOop> {
-        let Some(mut work) = self.dep_wait.remove(&ds_id) else {
-            panic!("called in_progress for invalid job");
-        };
-
-        if self.check_ready(ds_id, &mut work) {
-            // If we previously logged that this job had outstanding deps, then
-            // remove the entry from the map (to avoid space leaks).
-            let _ = self.outstanding_deps.remove(&ds_id);
-            Some(work)
-        } else {
-            // Return the work to the map
-            self.dep_wait.insert(ds_id, work);
-            None
+    fn get_next(
+        &mut self,
+        stop_at: &mut Option<JobId>,
+    ) -> Option<(JobId, IOop)> {
+        while let Some(mut job) = self.pending_jobs.pop_front() {
+            if Some(job.id) == *stop_at {
+                // We have reached the first re-queued job, so we should put it
+                // back in the list and stop iterating
+                self.pending_jobs.push_front(job);
+                break;
+            } else if self.check_ready(&mut job) {
+                return Some((job.id, job.io));
+            } else {
+                // Record the job ID of the first re-queued job
+                if stop_at.is_none() {
+                    *stop_at = Some(job.id);
+                }
+                self.pending_jobs.push_back(job);
+            }
         }
+        None
     }
 }
 
@@ -3724,16 +3716,8 @@ mod test {
     }
 
     fn test_push_next_jobs(work: &mut Work) -> Vec<(JobId, IOop)> {
-        let mut jobs = vec![];
-        let new_work = work.new_work();
-
-        for &new_id in new_work.iter() {
-            if let Some(job) = work.in_progress(new_id) {
-                jobs.push((new_id, job));
-            }
-        }
-
-        jobs
+        let mut stop = None;
+        std::iter::from_fn(|| work.get_next(&mut stop)).collect()
     }
 
     fn test_do_work(work: &mut Work, jobs: Vec<(JobId, IOop)>) {
@@ -3751,8 +3735,6 @@ mod test {
         let mut work = Work::new(None, csl());
         add_work(&mut work, JobId(1000), vec![], false);
 
-        assert_eq!(work.new_work(), vec![JobId(1000)]);
-
         let next_jobs = test_push_next_jobs(&mut work);
         assert_eq!(to_job_ids(&next_jobs), vec![JobId(1000)]);
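
A minimal, self-contained model of the queue rotation that `Work::get_next` introduces above may help reviewers: jobs are popped off the front of a `VecDeque`, the first ready one is returned, blocked ones rotate to the back, and `stop_at` remembers the first re-queued job so the scan ends after one full lap. The `Job` and `Queue` types below are hypothetical stand-ins invented for this sketch (Crucible's real types are `PendingJob`, `Work`, and `CompletedJobs`); it is illustrative only, not code from this diff.

use std::collections::{HashSet, VecDeque};

struct Job {
    id: u64,
    deps: Vec<u64>,
}

struct Queue {
    pending: VecDeque<Job>,
    done: HashSet<u64>,
}

impl Queue {
    /// Pops the next ready job, rotating blocked jobs to the back.
    ///
    /// `stop_at` remembers the first job we re-queued; once it circles back
    /// around to the front, every pending job has been visited and we return
    /// `None` instead of spinning forever on blocked jobs.
    fn get_next(&mut self, stop_at: &mut Option<u64>) -> Option<Job> {
        while let Some(job) = self.pending.pop_front() {
            if Some(job.id) == *stop_at {
                // A full lap is complete; put the job back and stop
                self.pending.push_front(job);
                break;
            } else if job.deps.iter().all(|d| self.done.contains(d)) {
                return Some(job);
            } else {
                // The first blocked job marks where the lap ends
                stop_at.get_or_insert(job.id);
                self.pending.push_back(job);
            }
        }
        None
    }
}

fn main() {
    let mut q = Queue {
        pending: VecDeque::from([
            Job { id: 2, deps: vec![1] }, // blocked: job 1 never completes
            Job { id: 3, deps: vec![] },  // ready immediately
        ]),
        done: HashSet::new(),
    };

    // Mirrors the shape of the loop in `do_ready_work`: keep pulling ready
    // jobs until a full lap of the queue finds nothing runnable.
    let mut stop_at = None;
    while let Some(job) = q.get_next(&mut stop_at) {
        println!("running job {}", job.id);
        q.done.insert(job.id);
    }

    // Job 3 ran; the blocked job 2 is still parked at the front.
    assert_eq!(q.pending.front().map(|j| j.id), Some(2));
}

Because `stop_at` persists for the whole pass, the pass terminates after at most one full lap of the queue: once the first re-queued job reappears at the front, everything pending has been visited, so a permanently blocked job cannot cause a spin. A job whose dependencies are satisfied mid-pass simply stays queued and is picked up on a later pass.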