From 63ac39f5c72625a0b83eb1ffe665ee5bd5d2b966 Mon Sep 17 00:00:00 2001
From: Alan Hanson
Date: Thu, 15 Aug 2024 21:21:55 +0000
Subject: [PATCH 1/4] Add some worker tasks to the agent.

This enables the crucible agent to have more than one (currently up to
5) outstanding region creation operation while still handling creation
and deletion of snapshots (SMF services) and deletion of regions.

Updated the agent-antagonist test to print a little less in some places
and a little more in others.

agent-antagonist now has a new test that just tests downstairs cloning
operations in a loop.
---
 agent-antagonist/src/main.rs | 189 ++++++++++++++++++++++--
 agent/src/datafile.rs        | 207 +++++++++++++++++++++-----
 agent/src/main.rs            | 276 +++++++++++++++++++++--------------
 agent/src/model.rs           |   1 +
 agent/src/server.rs          |  15 +-
 openapi/crucible-agent.json  |  43 ++++++
 6 files changed, 564 insertions(+), 167 deletions(-)

diff --git a/agent-antagonist/src/main.rs b/agent-antagonist/src/main.rs
index 65340bde6..c30fa20b8 100644
--- a/agent-antagonist/src/main.rs
+++ b/agent-antagonist/src/main.rs
@@ -7,7 +7,7 @@ use rand::random;
 use signal_hook::consts::signal::*;
 use signal_hook_tokio::Signals;
 use slog::Logger;
-use slog::{info, warn};
+use slog::{debug, info, warn};
 use std::net::SocketAddr;
 use std::process::Command;
 use std::sync::{
@@ -27,8 +27,11 @@ use crucible_agent_client::{
     about = "Stress tester for the crucible agent"
 )]
 enum Args {
-    /// Run a number of antagonist loop tasks that will do the following:
+    /// Run a number of antagonist loop tasks.
+    /// There are two options for what the test will do.
+    /// All tests operate on a 1GB region.
     ///
+    /// The default test will run this loop:
     /// - Create a 1GB region
     ///
     /// - Randomly:
@@ -40,6 +43,14 @@ enum Args {
     ///
     /// - Delete the region
     ///
+    /// The clone stress test will do this:
+    /// - Create a 1GB region
+    /// - Create a snapshot of that region.
+    /// - Loop on:
+    ///   - Clone the snapshot to a new region.
+    ///   - Delete the region.
+    ///
+    ///
     /// Additionally, one task is spawned that will:
     ///
     /// - Get a list of regions
@@ -51,6 +62,10 @@ enum Args {
         #[clap(short, long)]
         agent: Option<SocketAddr>,
 
+        /// Stress test the downstairs clone of a snapshot operation.
+        #[clap(long, action)]
+        clone_stress: bool,
+
         /// Dataset for the crucible agent - leave blank to autodetect if in the
         /// crucible zone
         #[clap(short, long)]
@@ -69,7 +84,7 @@ fn command(log: &Logger, bin: &'static str, args: &[&str]) -> Result<String> {
     info!(log, "{} {:?} took {:?}", bin, args, elapsed);
 
     if !cmd.status.success() {
-        bail!("zfs list failed!");
+        bail!("command: {} {:?} failed {:?}", bin, args, cmd);
     }
 
     Ok(String::from_utf8(cmd.stdout)?.trim_end().to_string())
@@ -174,6 +189,95 @@ async fn main_thread(
     Ok(())
 }
 
+async fn main_clone_thread(
+    log: Logger,
+    agent: SocketAddr,
+    dataset: String,
+    stop_flag: Arc<AtomicBool>,
+) -> Result<()> {
+    // Create a 1 GB region
+    let region_id = Uuid::new_v4();
+
+    let region_request = CreateRegion {
+        block_size: 512,
+        extent_count: 16,
+        extent_size: 131072,
+        id: RegionId(region_id.to_string()),
+        encrypted: true,
+        cert_pem: None,
+        key_pem: None,
+        root_pem: None,
+        source: None,
+    };
+
+    if let Err(e) = create_a_region(agent, &log, region_request.clone()).await {
+        bail!("Region create {region_id} failed: {e}");
+    }
+
+    let snapshot_id = Uuid::new_v4();
+    info!(log, "Create snapshot {snapshot_id}");
+
+    if let Err(e) =
+        create_a_snapshot(agent, &log, &dataset, region_id, snapshot_id).await
+    {
+        bail!("Snapshot create returned {e}");
+    }
+
+    let mut count = 1;
+    loop {
+        if stop_flag.load(Ordering::SeqCst) {
+            break;
+        }
+        let clone_region_id = Uuid::new_v4();
+        info!(
+            log,
+            "From {region_id}--{snapshot_id} clone:{clone_region_id} at \
+            count:{count}---"
+        );
+        if let Err(e) = clone_a_snapshot(
+            agent,
+            &log,
+            region_id,
+            region_request.clone(),
+            snapshot_id,
+            clone_region_id,
+        )
+        .await
+        {
+            bail!("Snapshot clone returned {e}");
+        }
+
+        info!(log, "Delete clone:{clone_region_id} at count:{count}");
+
+        if let Err(e) = delete_a_region(agent, &log, clone_region_id).await {
+            bail!("Region clone delete {clone_region_id} failed: {e}");
+        }
+        info!(
+            log,
+            "Completed {:5} clones from {region_id}--{snapshot_id}", count
+        );
+        count += 1;
+
+        // If we don't add a little disturbance, all the threads end up
+        // cloning at the same time. This little variation ensures that
+        // the tasks do not stay synced up with each other.
+        if random() && random() {
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+        }
+    }
+
+    if let Err(e) = delete_a_snapshot(agent, &log, region_id, snapshot_id).await
+    {
+        bail!("Snapshot delete returned {e}");
+    }
+
+    // Delete region
+    if let Err(e) = delete_a_region(agent, &log, region_id).await {
+        bail!("Region delete {region_id} failed: {e}");
+    }
+    Ok(())
+}
+
 // Create a region.
 // Loop till it is ready.
 async fn create_a_region(
@@ -181,12 +285,13 @@
     log: &Logger,
     region_request: CreateRegion,
 ) -> Result<()> {
+    info!(log, "creating region {:?}", region_request.id);
+    let mut retry = 1;
     loop {
-        info!(log, "creating region {:?}", region_request.id);
         let client = get_client(&agent);
         let region = match client.region_create(&region_request).await {
             Ok(region) => {
-                info!(log, "creating region {:?} ok", region_request.id,);
+                debug!(log, "creating region {:?} ok", region_request.id,);
                 region
             }
 
@@ -203,9 +308,10 @@ async fn create_a_region(
             RegionState::Requested => {
                 info!(
                     log,
-                    "waiting for region {:?}: state {:?}",
+                    "waiting for region {:?}: state {:?} try:{}",
                     region_request.id,
                     region.state,
+                    retry,
                 );
                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
             }
@@ -226,6 +332,7 @@ async fn create_a_region(
                 );
             }
         }
+        retry += 1;
     }
     Ok(())
 }
@@ -567,6 +674,44 @@ async fn clone_a_snapshot(
         "Use {:?} for source clone {:?}", source_addr, region_request
     );
 
+    // We just created a region, then took a snapshot.  Next we will try to
+    // clone that snapshot.  It's possible we arrive here before the downstairs
+    // (that we are cloning from) has come all the way online.  We loop a few
+    // times and hit a known endpoint on the expected downstairs repair port to
+    // verify that things have come online before trying to clone.  This avoids
+    // us trying to clone too soon.
+    let url = format!("http://{}/region-info", source_addr);
+
+    let mut retry = 0;
+    loop {
+        let res = reqwest::get(url.clone()).await;
+        match res {
+            Ok(resp) => {
+                if resp.status().is_success() {
+                    info!(log, "http to clone {} was successful.", url);
+                    break;
+                } else {
+                    warn!(
+                        log,
+                        "Request {retry} to {} failed with status: {}",
+                        url,
+                        resp.status()
+                    );
+                }
+            }
+            Err(e) => {
+                warn!(log, "Request {retry} to {} failed: {}", url, e);
+            }
+        }
+        retry += 1;
+        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        if retry > 20 {
+            bail!("Failed check to clone endpoint {}", url);
+        } else {
+            warn!(log, "http to clone {} failed, try:{}", url, retry);
+        }
+    }
+
     if let Err(e) = create_a_region(agent, log, region_request.clone()).await {
         bail!("Region clone create failed, returned {e}");
     }
@@ -582,14 +727,15 @@ async fn delete_a_region(
     log: &Logger,
     region_id: Uuid,
 ) -> Result<()> {
+    info!(log, "tombstoning region {:?}", region_id);
+    let mut retry = 1;
     loop {
-        info!(log, "tombstoning region {:?}", region_id);
         let client = get_client(&agent);
         let r = client.region_delete(&RegionId(region_id.to_string())).await;
         drop(client);
         match r {
             Ok(_) => {
-                info!(log, "tombstoning region {:?} ok", region_id);
+                debug!(log, "tombstoning region {:?} ok", region_id);
             }
 
             Err(e) => {
@@ -602,7 +748,7 @@ async fn delete_a_region(
         drop(client);
         let region = match r {
             Ok(region) => {
-                info!(log, "get region {:?} ok", region_id);
+                debug!(log, "get region {:?} ok", region_id);
                 region
             }
 
@@ -615,9 +761,10 @@ async fn delete_a_region(
             RegionState::Tombstoned => {
                 info!(
                     log,
-                    "waiting for region {:?}: state {:?}",
+                    "waiting for region {:?}: state {:?} try:{}",
                     region_id,
                     region.state,
+                    retry,
                 );
                 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
             }
@@ -635,6 +782,7 @@ async fn delete_a_region(
             );
         }
     }
+        retry += 1;
     }
     Ok(())
 }
@@ -724,6 +872,7 @@ async fn main() -> Result<()> {
     match args {
         Args::Run {
             agent,
+            clone_stress,
             dataset,
             tasks,
         } => {
@@ -788,14 +937,28 @@ async fn main() -> Result<()> {
                     let dataset = dataset.clone();
                     let stop_flag_clone = stop_flag.clone();
 
-                    tokio::spawn(async move {
-                        main_thread(log, agent, dataset, stop_flag_clone).await
-                    })
+                    if clone_stress {
+                        tokio::spawn(async move {
+                            main_clone_thread(
+                                log,
+                                agent,
+                                dataset,
+                                stop_flag_clone,
+                            )
+                            .await
+                        })
+                    } else {
+                        tokio::spawn(async move {
+                            main_thread(log, agent, dataset, stop_flag_clone)
+                                .await
+                        })
+                    }
                 })
                 .collect();
 
         // Add another task that grabs all regions, and queries all
         // snapshots for those regions
+        let stop_flag_clone = stop_flag.clone();
         let qlog = log.new(slog::o!("query" => 0));
         jhs.push(tokio::spawn(async move {
 
diff --git a/agent/src/datafile.rs b/agent/src/datafile.rs
index 91b2943a8..fc2812e9d 100644
--- a/agent/src/datafile.rs
+++ b/agent/src/datafile.rs
@@ -2,18 +2,24 @@
 use super::model::*;
 
 use anyhow::{anyhow, bail, Result};
+use chrono::{DateTime, Utc};
 use crucible_common::write_json;
+use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use slog::{crit, error, info, Logger};
-use std::collections::BTreeMap;
+use slog::{crit, error, info, warn, Logger};
+use std::collections::{BTreeMap, HashMap};
 use std::net::SocketAddr;
 use std::path::Path;
 use std::path::PathBuf;
-use std::sync::{Arc, Condvar, Mutex, MutexGuard};
+use std::sync::{mpsc, Arc, Condvar, Mutex};
+use std::thread::JoinHandle;
 
 use crate::snapshot_interface::SnapshotInterface;
 use crate::ZFSDataset;
 
+/// Maximum parallel region creation we allow.
+const MAX_REGION_WORK: usize = 5;
+
 pub struct DataFile {
     log: Logger,
     base_path: PathBuf,
@@ -22,8 +28,28 @@ pub struct DataFile {
     port_min: u16,
     port_max: u16,
     bell: Condvar,
-    inner: Mutex<Inner>,
+    outer: Mutex<Outer>,
     snapshot_interface: Arc<dyn SnapshotInterface>,
+    // When any task is updating SMF, it should obtain this lock first.
+    pub smf_lock: Mutex<bool>,
+}
+
+/// Describes an active region create job the agent is doing.
+struct RegionJob {
+    /// When this job was requested.
+    request_time: DateTime<Utc>,
+    /// The join_handle for the spawned worker thread.
+    join_handle: JoinHandle<Result<()>>,
+    /// When the thread has finished its work, a message will arrive here.
+    done_rx: mpsc::Receiver<bool>,
+}
+
+// This struct covers both the inner regions and snapshots as well as
+// the work queue for the agent.  We put both here so we can protect
+// them in the same mutex.
+struct Outer {
+    inner: Inner,
+    work_queue: HashMap<RegionId, RegionJob>,
+}
 
 #[derive(Serialize, Deserialize, Default)]
@@ -33,6 +59,12 @@ struct Inner {
     running_snapshots: BTreeMap<RegionId, BTreeMap<String, RunningSnapshot>>,
 }
 
+#[derive(Serialize, Deserialize, JsonSchema)]
+pub struct JobInfo {
+    request_time: DateTime<Utc>,
+    region_id: RegionId,
+}
+
 impl DataFile {
     pub fn new(
         log: Logger,
@@ -57,6 +89,9 @@ impl DataFile {
             }
         };
 
+        let work_queue = HashMap::new();
+        let outer = Outer { inner, work_queue };
+
         Ok(DataFile {
             log,
             base_path: base_path.to_path_buf(),
@@ -65,8 +100,9 @@ impl DataFile {
             port_min,
             port_max,
             bell: Condvar::new(),
-            inner: Mutex::new(inner),
+            outer: Mutex::new(outer),
             snapshot_interface,
+            smf_lock: Mutex::new(false),
         })
     }
 
@@ -75,31 +111,64 @@ impl DataFile {
     }
 
     pub fn regions(&self) -> Vec<Region> {
-        self.inner
-            .lock()
-            .unwrap()
-            .regions
-            .values()
-            .cloned()
-            .collect()
+        let outer = self.outer.lock().unwrap();
+        outer.inner.regions.values().cloned().collect()
     }
 
     pub fn running_snapshots(
         &self,
     ) -> BTreeMap<RegionId, BTreeMap<String, RunningSnapshot>> {
-        self.inner.lock().unwrap().running_snapshots.clone()
+        self.outer.lock().unwrap().inner.running_snapshots.clone()
     }
 
     pub fn get(&self, id: &RegionId) -> Option<Region> {
-        self.inner.lock().unwrap().regions.get(id).cloned()
+        self.outer.lock().unwrap().inner.regions.get(id).cloned()
+    }
+
+    // Add the details about a spawned work task to the work queue.
+    pub fn add_work(
+        &self,
+        id: RegionId,
+        join_handle: JoinHandle<Result<()>>,
+        request_time: DateTime<Utc>,
+        done_rx: mpsc::Receiver<bool>,
+    ) {
+        let work_queue = &mut self.outer.lock().unwrap().work_queue;
+        let region_job = RegionJob {
+            request_time,
+            join_handle,
+            done_rx,
+        };
+        work_queue.insert(id, region_job);
+    }
+
+    // Return a Vec of JobInfo about all jobs on the work queue.
+    pub fn get_work_queue(&self) -> Vec<JobInfo> {
+        let work_queue = &mut self.outer.lock().unwrap().work_queue;
+        let mut regions = Vec::new();
+        for (k, v) in work_queue.iter() {
+            let job_info = JobInfo {
+                request_time: v.request_time,
+                region_id: k.clone(),
+            };
+            regions.push(job_info);
+        }
+        regions
+    }
+
+    // When a worker has finished its job, it calls this to signal the
+    // main thread that the work is done.
+    pub fn work_done(&self, done_tx: mpsc::Sender<bool>) {
+        let _ = done_tx.send(true);
+        self.bell.notify_all();
     }
 
    /**
     * Store the database into the JSON file.
     */
-    fn store(&self, inner: MutexGuard<Inner>) {
+    fn store(&self, inner: &Inner) {
        loop {
-            match write_json(&self.conf_path, &*inner, true) {
+            match write_json(&self.conf_path, inner, true) {
                Ok(()) => return,
                Err(e) => {
                    /*
@@ -117,7 +186,7 @@ impl DataFile {
         }
     }
 
-    fn get_free_port(&self, inner: &MutexGuard<Inner>) -> Result<u16> {
+    fn get_free_port(&self, inner: &Inner) -> Result<u16> {
         for port_number in self.port_min..=self.port_max {
             let mut region_uses_port = false;
             let mut running_snapshot_uses_port = false;
@@ -180,7 +249,7 @@ impl DataFile {
         &self,
         create: CreateRegion,
     ) -> Result<Region> {
-        let mut inner = self.inner.lock().unwrap();
+        let inner = &mut self.outer.lock().unwrap().inner;
 
         /*
         * Look for a region with this ID.
@@ -204,7 +273,7 @@ impl DataFile {
         /*
         * Allocate a port number that is not yet in use.
         */
-        let port_number = self.get_free_port(&inner)?;
+        let port_number = self.get_free_port(inner)?;
 
         let read_only = create.source.is_some();
 
@@ -243,7 +312,7 @@ impl DataFile {
         &self,
         request: CreateRunningSnapshotRequest,
     ) -> Result<RunningSnapshot> {
-        let mut inner = self.inner.lock().unwrap();
+        let inner = &mut self.outer.lock().unwrap().inner;
 
        /*
        * Look for an existing running snapshot.
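
The shape introduced above, both maps behind one Mutex with the existing
Condvar bell to wake the dispatcher, is what first_in_states builds on in
the hunks below.  A minimal, self-contained sketch of that pattern, using
hypothetical Demo* names rather than the agent's actual types:

    use std::collections::HashMap;
    use std::sync::{Condvar, Mutex};
    use std::thread::JoinHandle;

    // Hypothetical stand-ins for Inner/RegionJob; not the agent's types.
    struct DemoInner {
        regions: HashMap<String, &'static str>, // region id -> state
    }

    struct DemoOuter {
        inner: DemoInner,
        work_queue: HashMap<String, JoinHandle<()>>,
    }

    struct DemoDataFile {
        outer: Mutex<DemoOuter>,
        bell: Condvar,
    }

    impl DemoDataFile {
        // Block until some region is "requested", is not already being
        // worked on, and there is room on the work queue.
        fn next_job(&self, max_work: usize) -> String {
            let mut outer = self.outer.lock().unwrap();
            loop {
                // Reap workers whose threads have finished (a real
                // implementation would join them, as the patch does).
                outer.work_queue.retain(|_, jh| !jh.is_finished());

                if outer.work_queue.len() < max_work {
                    let ready = outer
                        .inner
                        .regions
                        .iter()
                        .filter(|&(_, state)| *state == "requested")
                        .map(|(id, _)| id.clone())
                        .find(|id| !outer.work_queue.contains_key(id));
                    if let Some(id) = ready {
                        return id;
                    }
                }
                // Nothing eligible: sleep until a worker rings the bell.
                outer = self.bell.wait(outer).unwrap();
            }
        }
    }

    fn main() {
        let df = DemoDataFile {
            outer: Mutex::new(DemoOuter {
                inner: DemoInner {
                    regions: HashMap::from([("r1".to_string(), "requested")]),
                },
                work_queue: HashMap::new(),
            }),
            bell: Condvar::new(),
        };
        // One requested region, empty queue: returns "r1" immediately.
        println!("next job: {}", df.next_job(5));
    }

In the patch itself, work_done() plays the notifier role: the worker sends
on its done channel and then calls bell.notify_all(), which re-runs a
sweep like the one above.
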
@@ -293,7 +362,7 @@ impl DataFile { /* * Allocate a port number that is not yet in use. */ - let port_number = self.get_free_port(&inner)?; + let port_number = self.get_free_port(inner)?; let s = RunningSnapshot { id: request.id.clone(), @@ -330,7 +399,7 @@ impl DataFile { &self, request: DeleteRunningSnapshotRequest, ) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; /* * Look for an existing running snapshot. @@ -426,7 +495,7 @@ impl DataFile { &self, request: DeleteSnapshotRequest, ) -> Result<()> { - let inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; /* * Are we running a read-only downstairs for this snapshot? Fail if so. @@ -518,7 +587,7 @@ impl DataFile { * Mark a particular region as failed to provision. */ pub fn fail(&self, id: &RegionId) { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner.regions.get_mut(id).unwrap(); let nstate = State::Failed; @@ -539,7 +608,7 @@ impl DataFile { * Mark a particular running snapshot as failed to provision. */ pub fn fail_rs(&self, region_id: &RegionId, snapshot_name: &str) { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let rs = inner .running_snapshots @@ -570,7 +639,7 @@ impl DataFile { * Mark a particular region as provisioned. */ pub fn created(&self, id: &RegionId) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner.regions.get_mut(id).unwrap(); let nstate = State::Created; @@ -604,7 +673,7 @@ impl DataFile { region_id: &RegionId, snapshot_name: &str, ) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let rs = inner .running_snapshots @@ -672,7 +741,7 @@ impl DataFile { * used in a saga. */ pub fn destroyed(&self, id: &RegionId) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner.regions.get_mut(id).unwrap(); let nstate = State::Destroyed; @@ -702,7 +771,7 @@ impl DataFile { region_id: &RegionId, snapshot_name: &str, ) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let rs = inner .running_snapshots @@ -731,7 +800,7 @@ impl DataFile { * Nexus has requested that we destroy this particular region. */ pub fn destroy(&self, id: &RegionId) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); + let inner = &mut self.outer.lock().unwrap().inner; let r = inner .regions @@ -772,9 +841,41 @@ impl DataFile { * wait on the condition variable. */ pub fn first_in_states(&self, states: &[State]) -> Resource { - let mut inner = self.inner.lock().unwrap(); + let mut outer = self.outer.lock().unwrap(); loop { + // First check to see if there are completed jobs on the + // region create work queue. If we find any that are done, + // then remove them now. + { + let mut done_jobs = Vec::new(); + for (id, region_job) in outer.work_queue.iter_mut() { + if region_job.join_handle.is_finished() + || region_job.done_rx.try_recv().is_ok() + { + done_jobs.push(id.clone()); + } else { + info!(self.log, "id {:?} still running", id); + } + } + + for done_id in done_jobs { + let region_job = outer.work_queue.remove(&done_id).unwrap(); + // Wait for the thread to wrap up. 
+                    if let Err(e) = region_job.join_handle.join() {
+                        warn!(
+                            self.log,
+                            "Exiting work thread reported: {:?}", e
+                        );
+                    }
+                }
+
+                info!(
+                    self.log,
+                    "region create work queue len is now: {:?}",
+                    outer.work_queue.len()
+                );
+            }
            /*
             * States are provided in priority order.  We check for regions
             * in the first requested state before we check for
@@ -783,20 +884,46 @@ impl DataFile {
             * regions ahead of creating new regions.
             */
            for s in states {
-                for r in inner.regions.values() {
+                for r in outer.inner.regions.values() {
                    if &r.state == s {
-                        return Resource::Region(r.clone());
+                        // If this region ID is on the work queue hashmap, then
+                        // let that work finish before we take any other action
+                        // on it.
+                        if outer.work_queue.contains_key(&r.id) {
+                            continue;
+                        }
+
+                        // We only return regions in Requested state if we
+                        // have not started working on them yet, and we have
+                        // room on the work queue.  Otherwise, they remain
+                        // requested until we can service them.
+                        if r.state == State::Requested {
+                            assert!(!outer.work_queue.contains_key(&r.id));
+                            if outer.work_queue.len() < MAX_REGION_WORK {
+                                info!(self.log, "ID {:?} ready to add", r.id);
+                                return Resource::Region(r.clone());
+                            } else {
+                                info!(self.log, "No room for {:?} on wq", r.id);
+                                continue;
+                            }
+                        } else {
+                            return Resource::Region(r.clone());
+                        }
                    }
                }
 
-                for (rid, r) in &inner.running_snapshots {
+                for (rid, r) in &outer.inner.running_snapshots {
                    for (name, rs) in r {
                        if &rs.state == s {
-                            return Resource::RunningSnapshot(
-                                rid.clone(),
-                                name.clone(),
-                                rs.clone(),
-                            );
+                            if outer.work_queue.contains_key(rid) {
+                                continue;
+                            } else {
+                                return Resource::RunningSnapshot(
+                                    rid.clone(),
+                                    name.clone(),
+                                    rs.clone(),
+                                );
+                            }
                        }
                    }
                }
@@ -806,7 +933,7 @@ impl DataFile {
             * If we did not find any regions in the specified state, sleep
             * on the condvar.
             */
-            inner = self.bell.wait(inner).unwrap();
+            outer = self.bell.wait(outer).unwrap();
        }
    }
 
diff --git a/agent/src/main.rs b/agent/src/main.rs
index 8f3a61855..250c034c9 100644
--- a/agent/src/main.rs
+++ b/agent/src/main.rs
@@ -1,6 +1,8 @@
 // Copyright 2021 Oxide Computer Company
 
+use crate::model::Region;
 use anyhow::{anyhow, bail, Result};
+use chrono::Utc;
 use clap::Parser;
 use dropshot::{ConfigLogging, ConfigLoggingIfExists, ConfigLoggingLevel};
 use slog::{debug, error, info, o, Logger};
@@ -9,7 +11,7 @@ use std::io::Write;
 use std::net::SocketAddr;
 use std::path::{Path, PathBuf};
 use std::process::Command;
-use std::sync::Arc;
+use std::sync::{mpsc, Arc};
 
 const PROG: &str = "crucible-agent";
 const SERVICE: &str = "oxide/crucible/downstairs";
@@ -77,7 +79,7 @@ enum Args {
     },
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ZFSDataset {
     dataset: String,
 }
@@ -158,6 +160,7 @@ impl ZFSDataset {
             cmd.arg("-o").arg(format!("quota={}", quota));
         }
 
+        info!(log, "cmd is: {:?} {:?}", cmd, dataset);
         let res = cmd.arg(&dataset).output()?;
 
         if !res.status.success() {
@@ -352,6 +355,7 @@ fn apply_smf(
     downstairs_prefix: &str,
     snapshot_prefix: &str,
 ) -> Result<()> {
+    let _smf_lock = df.smf_lock.lock().unwrap();
     let scf = crucible_smf::Scf::new()?;
     let scope = scf.scope_local()?;
     let svc = scope
@@ -400,7 +404,7 @@ where
     */
    let expected_downstairs_instances = regions
        .iter()
-        .filter(|r| r.state == State::Created)
+        .filter(|r| r.state == State::Created || r.state == State::Requested)
        .map(|r| format!("{}-{}", downstairs_prefix, r.id.0))
        .collect::<HashSet<_>>();
 
@@ -408,7 +412,9 @@
        .iter()
        .flat_map(|(_, n)| {
            n.iter()
-                .filter(|(_, rs)| rs.state == State::Created)
+                .filter(|(_, rs)| {
+                    rs.state == State::Created || rs.state == State::Requested
+                })
                .map(|(_, rs)| {
                    format!("{}-{}-{}", snapshot_prefix, rs.id.0, rs.name)
                })
@@ -554,7 +560,7 @@ where
                            reconfig = true;
                            info!(
                                log,
-                                "existing {} value {} does not match {}",
+                                "existing {} value {} doesn't match {}",
                                property.name,
                                val.as_string()?,
                                property.val,
@@ -585,9 +591,9 @@ where
            }
        } else {
            /*
-             * No running snapshot means the service has never started. Prod
-             * the restarter by disabling it, then we'll create everything
-             * from scratch.
+             * No running snapshot means the service has never started.
+             * Prod the restarter by disabling it, then we'll create
+             * everything from scratch.
             */
            inst.disable(false)?;
            true
@@ -631,7 +637,7 @@ where
                    info!(log, "ok!");
                }
                crucible_smf::CommitResult::OutOfDate => {
-                    error!(log, "concurrent modification?!");
+                    error!(log, "concurrent modification for: {}", r.id.0);
                }
            }
        } else {
@@ -799,6 +805,10 @@ where
                }
                crucible_smf::CommitResult::OutOfDate => {
                    error!(log, "concurrent modification?!");
+                    panic!(
+                        "concurrent modification for snap: {}",
+                        snapshot.id.0
+                    );
                }
            }
        } else {
@@ -901,8 +911,8 @@ mod test {
        Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!())
    }
 
-    /// Wrap a datafile, mock SMF interface, and mock snapshot interface, in order to test the
-    /// agent's SMF related behaviour.
+    /// Wrap a datafile, mock SMF interface, and mock snapshot interface, in
+    /// order to test the agent's SMF related behaviour.
    pub struct TestSmfHarness {
        log: Logger,
        dir: TempDir,
@@ -1014,8 +1024,8 @@ mod test {
 
    impl Drop for TestSmfHarness {
        fn drop(&mut self) {
-            // If the agent zone bounces, it should read state from the datafile and recreate
-            // everything. Compare here during the drop.
+            // If the agent zone bounces, it should read state from the datafile
+            // and recreate everything. Compare here during the drop.
            let after_bounce_smf_interface = MockSmf::new(SERVICE.to_string());
 
            let mut path_buf = self.dir.path().to_path_buf();
@@ -1031,8 +1041,8 @@ mod test {
            )
            .unwrap();
 
-            // Prune disabled services: a bounced agent zone will lose all these, and the agent
-            // will not recreate them.
+            // Prune disabled services: a bounced agent zone will lose all
+            // these, and the agent will not recreate them.
            self.smf_interface.prune();
 
            assert_eq!(self.smf_interface, after_bounce_smf_interface);
@@ -1585,6 +1595,108 @@ mod test {
    }
 }
 
+/// Do all the steps required of the agent program to create a region.
+/// This is called in a thread and takes care of both actually creating
+/// the region and creating the SMF service for that region.
+///
+/// The function is responsible for updating the internal DataFile structure
+/// with the results (pass or fail) from what it performs here.
+///
+/// When this function is done, signal to the worker thread that state has
+/// changed and it can clean up this job and possibly create more work.
+#[allow(clippy::too_many_arguments)]
+fn agent_region_create(
+    log: Logger,
+    df: Arc<DataFile>,
+    regions_dataset: ZFSDataset,
+    regions_dataset_path: PathBuf,
+    downstairs_program: PathBuf,
+    downstairs_prefix: String,
+    r: Region,
+    snapshot_prefix: String,
+    done_tx: mpsc::Sender<bool>,
+) {
+    let region_id = r.id.clone();
+    info!(log, "spawned task for {:?}", region_id);
+
+    /*
+     * Compute the actual size required for a full region,
+     * then add our metadata overhead to that.
+     */
+    let region_size = r.block_size * r.extent_size * r.extent_count as u64;
+    let reservation = (region_size as f64 * RESERVATION_FACTOR).round() as u64;
+    let quota = region_size * QUOTA_FACTOR;
+
+    info!(
+        log,
+        "Region size:{} reservation:{} quota:{}",
+        region_size,
+        reservation,
+        quota,
+    );
+
+    // If regions need to be created, do that before apply_smf.
+    let region_dataset = match regions_dataset.ensure_child_dataset(
+        &r.id.0,
+        Some(reservation),
+        Some(quota),
+        &log,
+    ) {
+        Ok(region_dataset) => region_dataset,
+        Err(e) => {
+            error!(log, "Dataset {} creation failed: {}", &r.id.0, e,);
+            df.fail(&r.id);
+            df.work_done(done_tx);
+            return;
+        }
+    };
+
+    let dataset_path = match region_dataset.path() {
+        Ok(dataset_path) => dataset_path,
+        Err(e) => {
+            error!(log, "Failed to find path for dataset {}: {}", &r.id.0, e,);
+            df.fail(&r.id);
+            df.work_done(done_tx);
+            return;
+        }
+    };
+
+    // It's important that a region transition to "Created" only after it has
+    // been created as a dataset: after the crucible agent restarts,
+    // `apply_smf` will only start downstairs services for those in "Created".
+    // If the `df.created` is moved to after this function's `apply_smf` call,
+    // and there is a crash before that moved `df.created` is set, then the
+    // agent will not start a downstairs service for this region when rebooted.
+    let res =
+        worker_region_create(&log, &downstairs_program, &r, &dataset_path)
+            .and_then(|_| df.created(&r.id));
+
+    if let Err(e) = res {
+        error!(log, "Region {:?} create failed: {:?}", r.id.0, e);
+        df.fail(&r.id);
+        df.work_done(done_tx);
+        return;
+    }
+
+    info!(log, "Applying SMF actions post create {:?} ...", r.id.0);
+    let result = apply_smf(
+        &log,
+        &df,
+        regions_dataset_path.clone(),
+        &downstairs_prefix,
+        &snapshot_prefix,
+    );
+
+    if let Err(e) = result {
+        error!(log, "SMF application failure: {:?}", e);
+    } else {
+        info!(log, "SMF ok!");
+    }
+
+    info!(log, "Task for {:?} done, send notify", region_id);
+    df.work_done(done_tx);
+}
+
 /**
 * For region with state Tombstoned, destroy the region.
 *
@@ -1633,107 +1745,45 @@ fn worker(
         * then we finish up destroying the region.
         */
        match &r.state {
-            State::Requested => 'requested: {
-                /*
-                 * Compute the actual size required for a full region,
-                 * then add our metadata overhead to that.
-                 */
-                let region_size = r.block_size
-                    * r.extent_size
-                    * r.extent_count as u64;
-                let reservation =
-                    (region_size as f64 * RESERVATION_FACTOR).round()
-                        as u64;
-                let quota = region_size * QUOTA_FACTOR;
-
-                info!(
-                    log,
-                    "Region size:{} reservation:{} quota:{}",
-                    region_size,
-                    reservation,
-                    quota,
-                );
-
-                // If regions need to be created, do that before
-                // apply_smf.
-                let region_dataset = match regions_dataset
-                    .ensure_child_dataset(
-                        &r.id.0,
-                        Some(reservation),
-                        Some(quota),
-                        &log,
-                    ) {
-                    Ok(region_dataset) => region_dataset,
-                    Err(e) => {
-                        error!(
-                            log,
-                            "Dataset {} creation failed: {}",
-                            &r.id.0,
-                            e,
-                        );
-                        df.fail(&r.id);
-                        break 'requested;
-                    }
-                };
-
-                let dataset_path = match region_dataset.path() {
-                    Ok(dataset_path) => dataset_path,
-                    Err(e) => {
-                        error!(
-                            log,
-                            "Failed to find path for dataset {}: {}",
-                            &r.id.0,
-                            e,
-                        );
-                        df.fail(&r.id);
-                        break 'requested;
-                    }
-                };
-
-                // It's important that a region transition to "Created"
-                // only after it has been created as a dataset:
-                // after the crucible agent restarts, `apply_smf` will
-                // only start downstairs services for those in
-                // "Created". If the `df.created` is moved to after this
-                // function's `apply_smf` call, and there is a crash
-                // before that moved `df.created` is set, then the agent
-                // will not start a downstairs service for this region
-                // when rebooted.
-                let res = worker_region_create(
-                    &log,
-                    &downstairs_program,
-                    &r,
-                    &dataset_path,
-                )
-                .and_then(|_| df.created(&r.id));
-
-                if let Err(e) = res {
-                    error!(
-                        log,
-                        "region {:?} create failed: {:?}", r.id.0, e
-                    );
-                    df.fail(&r.id);
-                    break 'requested;
-                }
-
-                info!(log, "applying SMF actions post create...");
-                let result = apply_smf(
-                    &log,
-                    &df,
-                    regions_dataset_path.clone(),
-                    &downstairs_prefix,
-                    &snapshot_prefix,
-                );
-
-                if let Err(e) = result {
-                    error!(log, "SMF application failure: {:?}", e);
-                } else {
-                    info!(log, "SMF ok!");
-                }
+            State::Requested => {
+                // first_in_states has given us a new region to create
+                // that does not already have a job on the work queue, so
+                // now we will create that job and spawn a worker thread
+                // to do the work.
+                let log0 = log.new(o!("component" => "worktask"));
+                let df_c = Arc::clone(&df);
+                let r_c = r.clone();
+                let rd_c = regions_dataset.clone();
+                let rdp_c = regions_dataset_path.clone();
+                let dp_c = downstairs_program.clone();
+                let dpre_c = downstairs_prefix.clone();
+                let sp_c = snapshot_prefix.clone();
+                let request_time = Utc::now();
+
+                info!(log0, "Spawning a region create for {:?}", r.id);
+                // This channel will tell us when the job is done.
+                let (done_tx, done_rx) = mpsc::channel();
+                let job_handle = std::thread::spawn(move || {
+                    agent_region_create(
+                        log0, df_c, rd_c, rdp_c, dp_c, dpre_c, r_c,
+                        sp_c, done_tx,
+                    );
+                    Ok(())
+                });
+                df.add_work(
+                    r.id.clone(),
+                    job_handle,
+                    request_time,
+                    done_rx,
+                );
+                info!(log, "Spawned a region create for {:?}", r.id);
            }
 
            State::Tombstoned => 'tombstoned: {
-                info!(log, "applying SMF actions before removal...");
+                info!(
+                    log,
+                    "applying SMF actions before removal of {:?}", r.id
+                );
                let result = apply_smf(
                    &log,
                    &df,
@@ -1801,7 +1851,9 @@ fn worker(
                 */
                info!(
                    log,
-                    "applying SMF actions for region {} running snapshot {} (state {:?})...",
+                    "applying SMF actions for region {} with {} running \
+                    snapshot {} (state {:?})...",
+                    region_id.0,
                    rs.id.0,
                    rs.name,
                    rs.state,
diff --git a/agent/src/model.rs b/agent/src/model.rs
index 614881efe..c07901568 100644
--- a/agent/src/model.rs
+++ b/agent/src/model.rs
@@ -202,6 +202,7 @@ impl CreateRegion {
     Clone,
     PartialOrd,
     Ord,
+    Hash,
 )]
 pub struct RegionId(pub String);
 
diff --git a/agent/src/server.rs b/agent/src/server.rs
index 63bf68dea..9ef66b821 100644
--- a/agent/src/server.rs
+++ b/agent/src/server.rs
@@ -1,5 +1,5 @@
 // Copyright 2024 Oxide Computer Company
-use super::datafile::DataFile;
+use super::datafile::{DataFile, JobInfo};
 use super::model;
 use anyhow::{anyhow, Result};
 use dropshot::{
@@ -318,12 +318,22 @@ async fn region_delete_running_snapshot(
     match rc.context().delete_running_snapshot_request(request) {
         Ok(_) => Ok(HttpResponseDeleted()),
         Err(e) => Err(HttpError::for_internal_error(format!(
-            "running snapshot create failure: {:?}",
+            "running snapshot delete failure: {:?}",
             e
         ))),
     }
 }
 
+#[endpoint {
+    method = GET,
+    path = "/crucible/0/work",
+}]
+async fn region_get_work_queue(
+    rc: RequestContext<Arc<DataFile>>,
+) -> SResult<HttpResponseOk<Vec<JobInfo>>, HttpError> {
+    Ok(HttpResponseOk(rc.context().get_work_queue()))
+}
+
 pub fn make_api() -> Result<ApiDescription<Arc<DataFile>>> {
     let mut api = dropshot::ApiDescription::new();
 
@@ -339,6 +349,7 @@ pub fn make_api() -> Result<ApiDescription<Arc<DataFile>>> {
     api.register(region_run_snapshot)?;
 
     api.register(region_delete_running_snapshot)?;
+    api.register(region_get_work_queue)?;
 
     Ok(api)
 }
diff --git a/openapi/crucible-agent.json b/openapi/crucible-agent.json
index f46982f1e..74fcd64f7 100644
--- a/openapi/crucible-agent.json
+++ b/openapi/crucible-agent.json
@@ -298,6 +298,33 @@
         }
       }
     }
+    },
+    "/crucible/0/work": {
+      "get": {
+        "operationId": "region_get_work_queue",
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "title": "Array_of_JobInfo",
+                  "type": "array",
+                  "items": {
+                    "$ref": "#/components/schemas/JobInfo"
+                  }
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
     }
   },
   "components": {
@@ -392,6 +419,22 @@
         "snapshots"
       ]
     },
+      "JobInfo": {
+        "type": "object",
+        "properties": {
+          "region_id": {
+            "$ref": "#/components/schemas/RegionId"
+          },
+          "request_time": {
+            "type": "string",
+            "format": "date-time"
+          }
+        },
+        "required": [
+          "region_id",
+          "request_time"
+        ]
+      },
     "Region": {
       "type": "object",
       "properties": {
From 9aaf6e4b99e4d77baa792487d49b74ae5f590028 Mon Sep 17 00:00:00 2001
From: Alan Hanson Date: Fri, 7 Feb 2025 00:17:00 +0000 Subject: [PATCH 2/4] A bit more in the log --- agent/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/src/main.rs b/agent/src/main.rs index 250c034c9..cb2245841 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -1971,7 +1971,7 @@ fn worker_region_create( let cmd = cmd.output()?; if cmd.status.success() { - info!(log, "region files created ok"); + info!(log, "region {:?} created ok", region.id.0); } else { let err = String::from_utf8_lossy(&cmd.stderr); let out = String::from_utf8_lossy(&cmd.stdout); From 3c3e043887eacdf87bdb62ba4444496698040c32 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Fri, 7 Feb 2025 19:25:18 +0000 Subject: [PATCH 3/4] More info in logs A panic to remove later --- agent-antagonist/src/main.rs | 31 ++++++++++++++++++++++++------- downstairs/src/lib.rs | 8 ++++++-- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/agent-antagonist/src/main.rs b/agent-antagonist/src/main.rs index c30fa20b8..993ea5673 100644 --- a/agent-antagonist/src/main.rs +++ b/agent-antagonist/src/main.rs @@ -7,7 +7,7 @@ use rand::random; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use slog::Logger; -use slog::{debug, info, warn}; +use slog::{debug, error, info, warn}; use std::net::SocketAddr; use std::process::Command; use std::sync::{ @@ -308,25 +308,39 @@ async fn create_a_region( RegionState::Requested => { info!( log, - "waiting for region {:?}: state {:?} try:{}", + "waiting for new region {:?}: state {:?} try:{}", region_request.id, region.state, retry, ); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(3)).await; } RegionState::Created => { info!( log, - "region {:?} state {:?}", region_request.id, region.state, + "created region {:?} state {:?}", + region_request.id, region.state, ); break; } _ => { + error!( + log, + "region {:?} invalid state {:?}", + region_request.id, + region.state, + ); + // XXX Remove before merging. + // This is to catch something at the point of failure. + panic!( + "create region {:?} failed {:?}", + region_request.id, + region, + ); bail!( - "region {:?} unknown state {:?}", + "region {:?} invalid state {:?}", region_request.id, region.state, ); @@ -761,7 +775,7 @@ async fn delete_a_region( RegionState::Tombstoned => { info!( log, - "waiting for region {:?}: state {:?} try:{}", + "waiting for deleted region {:?}: state {:?} try:{}", region_id, region.state, retry, @@ -994,7 +1008,10 @@ async fn main() -> Result<()> { // The reason a task ended might be lost in the logging noise, so // print out what every task ended with info!(log, "All tasks have ended"); - info!(log, "Summary: {:?}", result_summary); + info!(log, "Task results"); + for res in result_summary { + info!(log, "{:?}", res); + } } } Ok(()) diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index a5347e056..417b01feb 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -2609,8 +2609,12 @@ impl Downstairs { info!(log, "Connecting to {source} to obtain our extent files."); let url = format!("http://{:?}", source); + // XXX Do we need a new client? 
+ let reqwest_client = reqwest::ClientBuilder::new() + .build() + .unwrap(); let repair_server = - Client::new_with_client(&url, self.reqwest_client.clone()); + Client::new_with_client(&url, reqwest_client.clone()); let source_def = match repair_server.get_region_info().await { Ok(def) => def.into_inner(), @@ -2649,7 +2653,7 @@ impl Downstairs { if let Err(e) = self .region - .repair_extent(self.reqwest_client.clone(), eid, source, true) + .repair_extent(reqwest_client.clone(), eid, source, true) .await { bail!("repair extent {eid} returned: {e}"); From 764da36f6a4255a5834591b69c6e5591e6ce3a04 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Fri, 7 Feb 2025 19:51:46 +0000 Subject: [PATCH 4/4] cargo fmt --- agent-antagonist/src/main.rs | 6 +++--- downstairs/src/lib.rs | 4 +--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/agent-antagonist/src/main.rs b/agent-antagonist/src/main.rs index 993ea5673..613ec6b7f 100644 --- a/agent-antagonist/src/main.rs +++ b/agent-antagonist/src/main.rs @@ -320,7 +320,8 @@ async fn create_a_region( info!( log, "created region {:?} state {:?}", - region_request.id, region.state, + region_request.id, + region.state, ); break; } @@ -336,8 +337,7 @@ async fn create_a_region( // This is to catch something at the point of failure. panic!( "create region {:?} failed {:?}", - region_request.id, - region, + region_request.id, region, ); bail!( "region {:?} invalid state {:?}", diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 417b01feb..40b9ecc79 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -2610,9 +2610,7 @@ impl Downstairs { let url = format!("http://{:?}", source); // XXX Do we need a new client? - let reqwest_client = reqwest::ClientBuilder::new() - .build() - .unwrap(); + let reqwest_client = reqwest::ClientBuilder::new().build().unwrap(); let repair_server = Client::new_with_client(&url, reqwest_client.clone());
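
For reference, the worker handoff that patch 1 threads through worker(),
agent_region_create(), and first_in_states(), a spawned thread carrying an
mpsc sender that is reaped when either the channel fires or the thread
finishes, reduces to the following sketch.  Hypothetical names, std only;
it is not code from the patch:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        // done_tx rides into the worker; the dispatcher keeps done_rx next
        // to the JoinHandle, mirroring the RegionJob bookkeeping above.
        let (done_tx, done_rx) = mpsc::channel::<bool>();

        let join_handle = thread::spawn(move || {
            // ... create dataset, create region files, apply SMF ...
            // Signal completion explicitly; the dispatcher also checks
            // is_finished(), so either signal is enough to reap the job.
            let _ = done_tx.send(true);
        });

        // Dispatcher side: the job is reapable when either signal fires.
        while !join_handle.is_finished() && done_rx.try_recv().is_err() {
            thread::sleep(Duration::from_millis(10));
        }
        join_handle.join().expect("worker panicked");
        println!("job reaped");
    }

Keeping both signals is deliberate in the patch: is_finished() covers a
worker that exited without sending, and the channel covers the window
where the thread has signalled but has not yet finished exiting.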