From df0b4512c1df5656c50fa40248936281be1d0509 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Mon, 24 Feb 2025 22:46:18 +0000 Subject: [PATCH 01/11] server: allow VCR replacement during block backend start Modify the state driver's VM startup procedure to allow the driver to process Crucible volume configuration changes while block backends are being activated. This fixes a livelock that occurs when starting a VM with a Crucible VCR that points to an unavailable downstairs: the unavailable downstairs prevents Crucible activation from proceeding; Nexus sends a corrected VCR that, if applied, would allow the upstairs to activate; but the state driver never applies the new VCR because it's blocked trying to activate using the broken VCR. Modify the PHD VCR replacement smoke test so that it checks this behavior. Add an affordance to PHD Crucible disks that allows a test to specify that the disk's generated VCRs should contain an invalid downstairs IP. Start a VM with a disk configured this way, then replace the broken VCR with a corrected VCR and verify that the VM boots normally. --- bin/propolis-server/src/lib/vm/objects.rs | 73 ++----- .../src/lib/vm/request_queue.rs | 33 ++- .../src/lib/vm/state_driver.rs | 188 ++++++++++++++++-- phd-tests/framework/src/disk/crucible.rs | 57 +++++- phd-tests/framework/src/lib.rs | 11 +- phd-tests/framework/src/test_vm/config.rs | 8 +- phd-tests/framework/src/test_vm/mod.rs | 6 +- phd-tests/framework/src/test_vm/spec.rs | 38 +++- phd-tests/tests/src/crucible/smoke.rs | 18 +- 9 files changed, 323 insertions(+), 109 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/objects.rs b/bin/propolis-server/src/lib/vm/objects.rs index f328f6ad3..fc762e580 100644 --- a/bin/propolis-server/src/lib/vm/objects.rs +++ b/bin/propolis-server/src/lib/vm/objects.rs @@ -23,9 +23,7 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use crate::{serial::Serial, spec::Spec, vcpu_tasks::VcpuTaskController}; -use super::{ - state_driver::VmStartReason, BlockBackendMap, CrucibleBackendMap, DeviceMap, -}; +use super::{BlockBackendMap, CrucibleBackendMap, DeviceMap}; /// A collection of components that make up a Propolis VM instance. pub(crate) struct VmObjects { @@ -189,6 +187,14 @@ impl VmObjectsLocked { &self.ps2ctrl } + pub(crate) fn device_map(&self) -> &DeviceMap { + &self.devices + } + + pub(crate) fn block_backend_map(&self) -> &BlockBackendMap { + &self.block_backends + } + /// Iterates over all of the lifecycle trait objects in this VM and calls /// `func` on each one. pub(crate) fn for_each_device( @@ -244,33 +250,6 @@ impl VmObjectsLocked { self.machine.reinitialize().unwrap(); } - /// Starts a VM's devices and allows all of its vCPU tasks to run. - /// - /// This function may be called either after initializing a new VM from - /// scratch or after an inbound live migration. In the latter case, this - /// routine assumes that the caller initialized and activated the VM's vCPUs - /// prior to importing state from the migration source. - pub(super) async fn start( - &mut self, - reason: VmStartReason, - ) -> anyhow::Result<()> { - match reason { - VmStartReason::ExplicitRequest => { - self.reset_vcpus(); - } - VmStartReason::MigratedIn => { - self.resume_kernel_vm(); - } - } - - let result = self.start_devices().await; - if result.is_ok() { - self.vcpu_tasks.resume_all(); - } - - result - } - /// Pauses this VM's devices and its kernel VMM. pub(crate) async fn pause(&mut self) { // Order matters here: the Propolis lifecycle trait's pause function @@ -291,6 +270,16 @@ impl VmObjectsLocked { self.vcpu_tasks.resume_all(); } + /// Resumes this VM's vCPU tasks. + /// + /// This is intended for use in VM startup sequences where the state driver + /// needs fine-grained control over the order in which devices and vCPUs + /// start. When pausing and resuming a VM that's already been started, use + /// [`Self::pause`] and [`Self::resume`] instead. + pub(crate) async fn resume_vcpus(&mut self) { + self.vcpu_tasks.resume_all(); + } + /// Stops the VM's vCPU tasks and devices. pub(super) async fn halt(&mut self) { self.vcpu_tasks.exit_all(); @@ -324,30 +313,6 @@ impl VmObjectsLocked { self.vcpu_tasks.resume_all(); } - /// Starts all of a VM's devices and allows its block backends to process - /// requests from their devices. - async fn start_devices(&self) -> anyhow::Result<()> { - self.for_each_device_fallible(|name, dev| { - info!(self.log, "sending startup complete to {}", name); - let res = dev.start(); - if let Err(e) = &res { - error!(self.log, "startup failed for {}: {:?}", name, e); - } - res - })?; - - for (name, backend) in self.block_backends.iter() { - info!(self.log, "starting block backend {}", name); - let res = backend.start().await; - if let Err(e) = &res { - error!(self.log, "startup failed for {}: {:?}", name, e); - return res; - } - } - - Ok(()) - } - /// Pauses all of a VM's devices. async fn pause_devices(&self) { // Take care not to wedge the runtime with any device pause diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 30d26f59c..2c78c4348 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -210,9 +210,7 @@ impl ExternalRequestQueue { reboot: RequestDisposition::Deny( RequestDeniedReason::InstanceNotActive, ), - mutate: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), + mutate: RequestDisposition::Enqueue, stop: RequestDisposition::Enqueue, }, log, @@ -295,7 +293,7 @@ impl ExternalRequestQueue { start: Disposition::Ignore, migrate_as_source: Disposition::Deny(reason), reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), + mutate: Disposition::Enqueue, stop: self.allowed.stop, } } @@ -577,18 +575,27 @@ mod test { } #[tokio::test] - async fn mutation_requires_running_and_not_migrating_out() { + async fn mutation_requires_not_migrating_out() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); - // Mutating a VM before it has started is not allowed. - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + // Mutating a VM before it has started is allowed. + assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); + assert!(matches!( + queue.pop_front(), + Some(ExternalRequest::ReconfigureCrucibleVolume { .. }) + )); - // Merely dequeuing the start request doesn't allow mutation; the VM - // actually has to be running. + // Mutating a VM is also allowed while it is starting. assert!(queue.try_queue(ExternalRequest::Start).is_ok()); assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); + assert!(matches!( + queue.pop_front(), + Some(ExternalRequest::ReconfigureCrucibleVolume { .. }) + )); + + // And it's allowed once the VM has started running. queue.notify_instance_state_change(InstanceStateChange::StartedRunning); assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); assert!(matches!( @@ -610,10 +617,14 @@ mod test { } #[tokio::test] - async fn mutation_disallowed_after_stop() { + async fn mutation_disallowed_after_stop_requested() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); + assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + queue.notify_instance_state_change(InstanceStateChange::Stopped); assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); } diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index e3ba3a4c8..7afc5a0e4 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -90,6 +90,7 @@ //! [`ensure`]: crate::vm::ensure use std::{ + collections::VecDeque, sync::{Arc, Mutex}, time::Duration, }; @@ -109,7 +110,7 @@ use crate::{ destination::DestinationProtocol, source::SourceProtocol, MigrateRole, }, spec::StorageBackend, - vm::state_publisher::ExternalStateUpdate, + vm::{state_publisher::ExternalStateUpdate, BlockBackendMap}, }; use super::{ @@ -150,6 +151,10 @@ struct InputQueueInner { /// State change requests from the external API. external_requests: request_queue::ExternalRequestQueue, + /// State change requests from the external API that were previously read + /// but not handled immediately. + buffered_external: VecDeque, + /// State change requests from the VM's components. These take precedence /// over external state change requests. guest_events: super::guest_event::GuestEventQueue, @@ -162,6 +167,7 @@ impl InputQueueInner { log, auto_start, ), guest_events: super::guest_event::GuestEventQueue::default(), + buffered_external: Default::default(), } } } @@ -211,6 +217,8 @@ impl InputQueue { let mut guard = self.inner.lock().unwrap(); if let Some(guest_event) = guard.guest_events.pop_front() { return InputQueueEvent::GuestEvent(guest_event); + } else if let Some(req) = guard.buffered_external.pop_front() { + return InputQueueEvent::ExternalRequest(req); } else if let Some(req) = guard.external_requests.pop_front() { return InputQueueEvent::ExternalRequest(req); } @@ -227,6 +235,13 @@ impl InputQueue { } } + /// Pushes an external request to the end of the buffered external request + /// list. + fn buffer_external_request(&self, req: ExternalRequest) { + let mut guard = self.inner.lock().unwrap(); + guard.buffered_external.push_back(req); + } + /// Notifies the external request queue that the instance's state has /// changed so that it can change the dispositions for new state change /// requests. @@ -471,10 +486,9 @@ impl StateDriver { info!(self.log, "state driver launched"); let final_state = if migrated_in { - if self.start_vm(VmStartReason::MigratedIn).await.is_ok() { - self.event_loop().await - } else { - InstanceState::Failed + match self.start_vm(VmStartReason::MigratedIn).await { + Ok(()) => self.event_loop().await, + Err(_) => InstanceState::Failed, } } else { self.event_loop().await @@ -520,20 +534,162 @@ impl StateDriver { ) -> anyhow::Result<()> { info!(self.log, "starting instance"; "reason" => ?start_reason); - let start_result = - self.objects.lock_exclusive().await.start(start_reason).await; - match &start_result { - Ok(()) => { - self.publish_steady_state(InstanceState::Running); + // The start sequence is arranged so that calls to block backends can be + // interleaved with processing of requests from the external request + // queue. This allows Nexus to reconfigure Crucible volumes while they + // are being activated, which is necessary to unwedge activations that + // are targeting a downstairs that became invalid between the time Nexus + // passed its address to a VM and the time the VM actually tried to + // connect to it. + // + // Before getting into any of that, handle the synchronous portions of + // VM startup. First, ensure that the kernel VM and all its associated + // devices are in the correct initial states. + let objects = self.objects.lock_shared().await; + match start_reason { + // If this VM is a migration target, migration will have properly + // initialized the vCPUs, but will have left the kernel VM paused. + // Resume it here before asking any in-kernel components to start. + VmStartReason::MigratedIn => objects.resume_kernel_vm(), + + // If this VM is starting from scratch, its kernel VM is active, but + // its vCPUs have not been initialized yet. + VmStartReason::ExplicitRequest => objects.reset_vcpus(), + } + + // Send synchronous start commands to all devices. + for (name, dev) in objects.device_map() { + info!(self.log, "sending start request to {}", name); + let res = dev.start(); + if let Err(e) = &res { + error!(self.log, "startup failed for {}: {:?}", name, e); + return res; } - Err(e) => { - error!(&self.log, "failed to start devices"; - "error" => ?e); - self.publish_steady_state(InstanceState::Failed); + } + + // Next, prepare to start block backends. This is done by capturing the + // current block backend set and creating a future that issues all the + // start requests. + // + // For this to work, the set of block backends to be started must not + // change while the VM is starting. This is guaranteed because all such + // requests to hotplug a block backend will be dispatched to the VM's + // request queue; if any such requests are seen below, they can simply + // be buffered and handled after the rest of the VM has started. + async fn start_block_backends( + log: slog::Logger, + backends: BlockBackendMap, + ) -> anyhow::Result<()> { + for (name, backend) in backends { + info!(log, "starting block backend {}", name); + let res = backend.start().await; + if let Err(e) = &res { + error!(log, "startup failed for {}: {:?}", name, e); + return res; + } } + + Ok(()) } - start_result + let block_backends = objects.block_backend_map().clone(); + let block_backend_fut = + start_block_backends(self.log.clone(), block_backends); + tokio::pin!(block_backend_fut); + + // Drop the VM object lock before proceeding to allow other API calls + // that simply read the VM to make progress. Again, note that the set of + // objects being started still can't change, not because the lock is + // held, but because the only entity that can change them is the current + // task, which can decide whether and how to buffer incoming requests. + drop(objects); + + loop { + let event = tokio::select! { + // If the VM successfully starts, return immediately and let + // the caller process any events that may happen to be on the + // queue. + biased; + + res = &mut block_backend_fut => { + if res.is_ok() { + let objects = &self.objects; + objects.lock_exclusive().await.resume_vcpus().await; + self.publish_steady_state(InstanceState::Running); + info!(&self.log, "VM successfully started"); + } + + return res; + } + + dequeued = self.input_queue.wait_for_next_event() => { + dequeued + } + }; + + // The VM's vCPUs haven't been started yet, so there should be no + // way for the VM to produce any guest events to handle. + let InputQueueEvent::ExternalRequest(req) = event else { + unreachable!("can't get guest events before the VM starts"); + }; + + // Handle requests to reconfigure one of the existing Crucible + // volumes inline, but buffer other requests so that they can be + // handled after the VM has finished starting. + // + // Buffering some requests and servicing others can theoretically + // change the order in which those requests are retired. That's not + // a problem here because the request queue will stop accepting new + // VCR change requests once a request to stop has been queued. + match req { + ExternalRequest::Stop => { + info!( + &self.log, + "got request to stop while still starting" + ); + + // Buffer this request to stop so that it can be processed + // as part of the main event loop once the VM finishes + // starting. + // + // It is possible, at least in theory, to drop the block + // backend startup future (or cooperatively cancel it) and + // then pause/halt the VM immediately. This requires block + // backends' startup operations to be cancel-safe (in the + // sense that it must be possible to pause and halt a + // backend whose startup sequence was interrupted). Since + // block backends aren't currently required to guarantee + // this, just buffer the stop request. + self.input_queue.buffer_external_request(req); + } + ExternalRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + } => { + let _ = result_tx.send( + self.reconfigure_crucible_volume( + &backend_id, + new_vcr_json, + ) + .await, + ); + } + // The request queue is expected to reject (or at least silently + // ignore) requests to migrate or reboot an instance that hasn't + // reported that it's fully started. Similarly, requests to + // start a VM that's already starting are expected to be ignored + // for idempotency. + r @ ExternalRequest::Start + | r @ ExternalRequest::MigrateAsSource { .. } + | r @ ExternalRequest::Reboot => { + unreachable!( + "external request {r:?} shouldn't be queued while \ + starting" + ); + } + } + } } async fn handle_guest_event( @@ -583,7 +739,7 @@ impl StateDriver { match request { ExternalRequest::Start => { match self.start_vm(VmStartReason::ExplicitRequest).await { - Ok(_) => HandleEventOutcome::Continue, + Ok(()) => HandleEventOutcome::Continue, Err(_) => HandleEventOutcome::Exit { final_state: InstanceState::Failed, }, diff --git a/phd-tests/framework/src/disk/crucible.rs b/phd-tests/framework/src/disk/crucible.rs index 5ac2f2747..6ed05e938 100644 --- a/phd-tests/framework/src/disk/crucible.rs +++ b/phd-tests/framework/src/disk/crucible.rs @@ -5,7 +5,7 @@ //! Abstractions for Crucible-backed disks. use std::{ - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, path::{Path, PathBuf}, process::Stdio, sync::Mutex, @@ -47,10 +47,24 @@ impl Drop for DataDirectory { #[derive(Debug)] struct Downstairs { process_handle: std::process::Child, + + /// The address on which this downstairs is serving its API. address: SocketAddr, + + /// The address to insert as a connection target when constructing a VCR + /// that refers to this downstairs. If `None`, the downstairs's API address + /// is used instead. + vcr_address_override: Option, + data_dir: DataDirectory, } +impl Downstairs { + fn vcr_address(&self) -> SocketAddr { + self.vcr_address_override.unwrap_or(self.address) + } +} + impl Drop for Downstairs { fn drop(&mut self) { info!(?self, "Stopping Crucible downstairs process"); @@ -107,6 +121,39 @@ impl CrucibleDisk { pub fn set_generation(&self, generation: u64) { self.inner.lock().unwrap().generation = generation; } + + /// Changes this disk's downstairs configuration so that the returned IP + /// address of the first downstairs is an IPv6 black hole instead of its + /// actual address. This will prevent VMs from activating this disk until + /// the VCR is replaced with one bearing the correct IP address. + pub fn enable_vcr_black_hole(&self) { + info!(disk = self.device_name.as_str(), "enabling vcr black hole"); + + // 100::/64 is the IPv6 discard prefix (per RFC 6666). + let address = SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0x100, 0, 0, 0, 0, 0, 0, 0), + 9000, + 0, + 0, + )); + + let mut inner = self.inner.lock().unwrap(); + + // Crucible rejects VCR replacement requests that change more than one + // downstairs address, so just invalidate the first downstairs address. + // This ensures that if the black hole is disabled, subsequent VCRs are + // valid replacements for the ones produced while the black hole was + // enabled. + inner.downstairs_instances[0].vcr_address_override = Some(address); + } + + /// Ensures that this disk's downstairs configuration will return the + /// correct addresses for all its downstairs instances. + pub fn disable_vcr_black_hole(&self) { + info!(disk = self.device_name.as_str(), "disabling vcr black hole"); + let mut inner = self.inner.lock().unwrap(); + inner.downstairs_instances[0].vcr_address_override = None; + } } impl super::DiskConfig for CrucibleDisk { @@ -302,6 +349,7 @@ impl Inner { stderr, )?, address: SocketAddr::V4(addr), + vcr_address_override: None, data_dir: dir, }; @@ -338,8 +386,11 @@ impl Inner { } fn vcr(&self, disk_id: Uuid) -> VolumeConstructionRequest { - let downstairs_addrs = - self.downstairs_instances.iter().map(|ds| ds.address).collect(); + let downstairs_addrs = self + .downstairs_instances + .iter() + .map(|ds| ds.vcr_address()) + .collect(); VolumeConstructionRequest::Volume { id: disk_id, diff --git a/phd-tests/framework/src/lib.rs b/phd-tests/framework/src/lib.rs index 327b1804b..35ead0df1 100644 --- a/phd-tests/framework/src/lib.rs +++ b/phd-tests/framework/src/lib.rs @@ -290,15 +290,8 @@ impl Framework { vm: &TestVm, environment: Option<&EnvironmentSpec>, ) -> anyhow::Result { - let mut vm_spec = - VmSpec { vm_name: vm_name.to_owned(), ..vm.vm_spec() }; - - // Reconcile any differences between the generation numbers in the VM - // objects' instance spec and the associated Crucible disk handles. - // This may be needed because a test can call `set_generation` on a disk - // handle to change its active generation number mid-test, and this - // won't automatically be reflected in the VM's instance spec. - vm_spec.refresh_crucible_backends(); + let mut vm_spec = vm.vm_spec().clone(); + vm_spec.set_vm_name(vm_name.to_owned()); // Create new metadata for an instance based on this predecessor. It // should have the same project and silo IDs, but the sled identifiers diff --git a/phd-tests/framework/src/test_vm/config.rs b/phd-tests/framework/src/test_vm/config.rs index 06a0c7c2d..2bf54d43c 100644 --- a/phd-tests/framework/src/test_vm/config.rs +++ b/phd-tests/framework/src/test_vm/config.rs @@ -387,14 +387,14 @@ impl<'dr> VmConfig<'dr> { sled_serial: sled_id.to_string(), }; - Ok(VmSpec { - vm_name: vm_name.clone(), - instance_spec: spec, + Ok(VmSpec::new( + vm_name.clone(), + spec, disk_handles, guest_os_kind, bootrom_path, metadata, - }) + )) } } diff --git a/phd-tests/framework/src/test_vm/mod.rs b/phd-tests/framework/src/test_vm/mod.rs index 88fcfb2d9..ea480a54a 100644 --- a/phd-tests/framework/src/test_vm/mod.rs +++ b/phd-tests/framework/src/test_vm/mod.rs @@ -292,7 +292,7 @@ impl TestVm { let init = match migrate { None => InstanceInitializationMethod::Spec { - spec: self.spec.instance_spec.clone(), + spec: self.spec.instance_spec(), }, Some(info) => InstanceInitializationMethod::MigrationTarget { migration_id: info.migration_id, @@ -489,7 +489,7 @@ impl TestVm { let timeout_duration = match Into::::into(timeout) { MigrationTimeout::Explicit(val) => val, MigrationTimeout::InferFromMemorySize => { - let mem_mib = self.spec.instance_spec.board.memory_mb; + let mem_mib = self.spec.instance_spec().board.memory_mb; std::time::Duration::from_secs( (MIGRATION_SECS_PER_GUEST_GIB * mem_mib) / 1024, ) @@ -573,7 +573,7 @@ impl TestVm { fn generate_replacement_components(&self) -> ReplacementComponents { let mut map = ReplacementComponents::new(); - for (id, comp) in &self.spec.instance_spec.components { + for (id, comp) in &self.spec.instance_spec().components { match comp { ComponentV0::MigrationFailureInjector(inj) => { map.insert( diff --git a/phd-tests/framework/src/test_vm/spec.rs b/phd-tests/framework/src/test_vm/spec.rs index 970e4424d..d0ef3f8c5 100644 --- a/phd-tests/framework/src/test_vm/spec.rs +++ b/phd-tests/framework/src/test_vm/spec.rs @@ -18,7 +18,7 @@ pub struct VmSpec { pub vm_name: String, /// The instance spec to pass to the VM when starting the guest. - pub instance_spec: InstanceSpecV0, + base_instance_spec: InstanceSpecV0, /// A set of handles to disk files that the VM's disk backends refer to. pub disk_handles: Vec>, @@ -43,9 +43,37 @@ impl VmSpec { .find(|disk| disk.device_name().as_str() == name) } + pub(crate) fn new( + vm_name: String, + instance_spec: InstanceSpecV0, + disk_handles: Vec>, + guest_os_kind: GuestOsKind, + bootrom_path: Utf8PathBuf, + metadata: InstanceMetadata, + ) -> Self { + Self { + vm_name, + base_instance_spec: instance_spec, + disk_handles, + guest_os_kind, + bootrom_path, + metadata, + } + } + + pub(crate) fn set_vm_name(&mut self, name: String) { + self.vm_name = name + } + + pub(crate) fn instance_spec(&self) -> InstanceSpecV0 { + let mut spec = self.base_instance_spec.clone(); + self.set_crucible_backends(&mut spec); + spec + } + /// Update the Crucible backend specs in the instance spec to match the /// current backend specs given by this specification's disk handles. - pub(crate) fn refresh_crucible_backends(&mut self) { + fn set_crucible_backends(&self, spec: &mut InstanceSpecV0) { for disk in &self.disk_handles { let disk = if let Some(disk) = disk.as_crucible() { disk @@ -57,11 +85,9 @@ impl VmSpec { let backend_name = disk.device_name().clone().into_backend_name().into_string(); if let Some(ComponentV0::CrucibleStorageBackend(_)) = - self.instance_spec.components.get(&backend_name) + spec.components.get(&backend_name) { - self.instance_spec - .components - .insert(backend_name, backend_spec); + spec.components.insert(backend_name, backend_spec); } } } diff --git a/phd-tests/tests/src/crucible/smoke.rs b/phd-tests/tests/src/crucible/smoke.rs index fd34e34b8..984723742 100644 --- a/phd-tests/tests/src/crucible/smoke.rs +++ b/phd-tests/tests/src/crucible/smoke.rs @@ -88,8 +88,9 @@ async fn shutdown_persistence_test(ctx: &Framework) { } #[phd_testcase] -async fn vcr_replace_test(ctx: &Framework) { - let mut config = ctx.vm_config_builder("crucible_vcr_replace_test"); +async fn vcr_replace_during_start_test(ctx: &Framework) { + let mut config = + ctx.vm_config_builder("crucible_vcr_replace_during_start_test"); // Create a blank data disk on which to perform VCR replacement. This is // necessary because Crucible doesn't permit VCR replacements for volumes @@ -111,11 +112,22 @@ async fn vcr_replace_test(ctx: &Framework) { let disk_hdl = spec.get_disk_by_device_name(DATA_DISK_NAME).cloned().unwrap(); let disk = disk_hdl.as_crucible().unwrap(); + disk.enable_vcr_black_hole(); let mut vm = ctx.spawn_vm_with_spec(spec, None).await?; vm.launch().await?; - vm.wait_to_boot().await?; + // The VM is expected not to reach the Running state. Unfortunately, there's + // no great way to test that this is never going to happen; as a best-effort + // alternative, wait for a short while and assert that the VM doesn't reach + // Running in the timeout interval. + vm.wait_for_state(InstanceState::Running, Duration::from_secs(5)) + .await + .unwrap_err(); + + disk.disable_vcr_black_hole(); disk.set_generation(2); vm.replace_crucible_vcr(disk).await?; + + vm.wait_to_boot().await?; } From e794e68c3f538d189fa9cbbe08dabb8478f14623 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Wed, 26 Feb 2025 00:16:17 +0000 Subject: [PATCH 02/11] PR feedback & comment cleanup --- bin/propolis-server/src/lib/vm/objects.rs | 6 +++--- .../src/lib/vm/state_driver.rs | 19 ++++++++++++------- phd-tests/tests/src/crucible/smoke.rs | 14 +++++++++++++- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/objects.rs b/bin/propolis-server/src/lib/vm/objects.rs index fc762e580..0992a9f4a 100644 --- a/bin/propolis-server/src/lib/vm/objects.rs +++ b/bin/propolis-server/src/lib/vm/objects.rs @@ -267,7 +267,7 @@ impl VmObjectsLocked { // that all devices resume before any vCPUs do. self.resume_kernel_vm(); self.resume_devices(); - self.vcpu_tasks.resume_all(); + self.resume_vcpus(); } /// Resumes this VM's vCPU tasks. @@ -276,7 +276,7 @@ impl VmObjectsLocked { /// needs fine-grained control over the order in which devices and vCPUs /// start. When pausing and resuming a VM that's already been started, use /// [`Self::pause`] and [`Self::resume`] instead. - pub(crate) async fn resume_vcpus(&mut self) { + pub(crate) fn resume_vcpus(&mut self) { self.vcpu_tasks.resume_all(); } @@ -310,7 +310,7 @@ impl VmObjectsLocked { // Resume devices so they're ready to do more work, then resume // vCPUs. self.resume_devices(); - self.vcpu_tasks.resume_all(); + self.resume_vcpus(); } /// Pauses all of a VM's devices. diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index 7afc5a0e4..0c2e179b0 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -536,11 +536,12 @@ impl StateDriver { // The start sequence is arranged so that calls to block backends can be // interleaved with processing of requests from the external request - // queue. This allows Nexus to reconfigure Crucible volumes while they - // are being activated, which is necessary to unwedge activations that - // are targeting a downstairs that became invalid between the time Nexus - // passed its address to a VM and the time the VM actually tried to - // connect to it. + // queue. This allows Nexus to reconfigure Crucible backends while they + // are being activated, which can be necessary if the VM's original + // specification specifies a Crucible downstairs server that is offline + // or unavailable. (Downstairs instances can disappear at any time, + // e.g. due to sled failure, so these configurations aren't necessarily + // client errors.) // // Before getting into any of that, handle the synchronous portions of // VM startup. First, ensure that the kernel VM and all its associated @@ -584,7 +585,11 @@ impl StateDriver { info!(log, "starting block backend {}", name); let res = backend.start().await; if let Err(e) = &res { - error!(log, "startup failed for {}: {:?}", name, e); + error!( + log, + "startup failed for block backend {}: {:?}", name, e + ); + return res; } } @@ -614,7 +619,7 @@ impl StateDriver { res = &mut block_backend_fut => { if res.is_ok() { let objects = &self.objects; - objects.lock_exclusive().await.resume_vcpus().await; + objects.lock_exclusive().await.resume_vcpus(); self.publish_steady_state(InstanceState::Running); info!(&self.log, "VM successfully started"); } diff --git a/phd-tests/tests/src/crucible/smoke.rs b/phd-tests/tests/src/crucible/smoke.rs index 984723742..eab416ece 100644 --- a/phd-tests/tests/src/crucible/smoke.rs +++ b/phd-tests/tests/src/crucible/smoke.rs @@ -108,12 +108,16 @@ async fn vcr_replace_during_start_test(ctx: &Framework) { 5, ); + // Configure the disk so that when the VM starts, it will have an invalid + // downstairs address. let spec = config.vm_spec(ctx).await?; let disk_hdl = spec.get_disk_by_device_name(DATA_DISK_NAME).cloned().unwrap(); let disk = disk_hdl.as_crucible().unwrap(); disk.enable_vcr_black_hole(); + // Try to start the VM, but don't wait for it to boot; it should get stuck + // while activating using an invalid downstairs address. let mut vm = ctx.spawn_vm_with_spec(spec, None).await?; vm.launch().await?; @@ -125,9 +129,17 @@ async fn vcr_replace_during_start_test(ctx: &Framework) { .await .unwrap_err(); + // Fix the disk's downstairs address and send a replacement request. This + // should be processed and should allow the VM to boot. disk.disable_vcr_black_hole(); disk.set_generation(2); vm.replace_crucible_vcr(disk).await?; - vm.wait_to_boot().await?; + + assert_eq!(vm.get().await?.instance.state, InstanceState::Running); + + // VCR replacements should continue to be accepted now that the instance is + // running. + disk.set_generation(3); + vm.replace_crucible_vcr(disk).await?; } From 35c4db9cfd841254b55c5d0b9b29ccf8fc2f8be0 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Mon, 3 Mar 2025 19:44:28 +0000 Subject: [PATCH 03/11] server: remove resolved TODO comment (thanks James) --- bin/propolis-server/src/lib/vm/request_queue.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 2c78c4348..08e2e41a6 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -74,10 +74,6 @@ pub enum ExternalRequest { /// Attempts to update the volume construction request for the supplied /// Crucible volume. - /// - /// TODO: Due to https://github.com/oxidecomputer/crucible/issues/871, this - /// is only allowed once the VM is started and the volume has activated, but - /// it should be allowed even before the VM has started. ReconfigureCrucibleVolume { /// The ID of the Crucible backend in the VM's Crucible backend map. backend_id: SpecKey, From bfb8b07896d08a6758bf2d74538393b48743d8ab Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Mon, 3 Mar 2025 19:49:39 +0000 Subject: [PATCH 04/11] server: don't spin out on stop requests while starting Add a PHD test case for this. This also revealed a bug in the request queue: if a VM is stuck waiting to activate a Crucible volume, and a stop request arrives, the VCR replacement that might have fixed it will be rejected, because the queue rejects replacement requests that arrive after a stop request is queued. Fix this by keeping the same VCR replacement request disposition when a stop request is received. --- .../src/lib/vm/request_queue.rs | 37 +++++--- .../src/lib/vm/state_driver.rs | 87 +++++++++++++------ phd-tests/tests/src/crucible/smoke.rs | 4 + phd-tests/tests/src/server_state_machine.rs | 57 ++++++++++++ 4 files changed, 146 insertions(+), 39 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 08e2e41a6..c6fced60c 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -166,7 +166,7 @@ struct AllowedRequests { start: RequestDisposition, migrate_as_source: RequestDisposition, reboot: RequestDisposition, - mutate: RequestDisposition, + reconfigure_crucible_volume: RequestDisposition, stop: RequestDisposition, } @@ -206,7 +206,7 @@ impl ExternalRequestQueue { reboot: RequestDisposition::Deny( RequestDeniedReason::InstanceNotActive, ), - mutate: RequestDisposition::Enqueue, + reconfigure_crucible_volume: RequestDisposition::Enqueue, stop: RequestDisposition::Enqueue, }, log, @@ -237,7 +237,7 @@ impl ExternalRequestQueue { } ExternalRequest::Reboot => self.allowed.reboot, ExternalRequest::ReconfigureCrucibleVolume { .. } => { - self.allowed.mutate + self.allowed.reconfigure_crucible_volume } // Requests to stop always succeed. Note that a request to stop a VM @@ -289,7 +289,7 @@ impl ExternalRequestQueue { start: Disposition::Ignore, migrate_as_source: Disposition::Deny(reason), reboot: Disposition::Deny(reason), - mutate: Disposition::Enqueue, + reconfigure_crucible_volume: Disposition::Enqueue, stop: self.allowed.stop, } } @@ -310,7 +310,7 @@ impl ExternalRequestQueue { reboot: Disposition::Deny( DenyReason::InvalidRequestForMigrationSource, ), - mutate: Disposition::Deny( + reconfigure_crucible_volume: Disposition::Deny( DenyReason::InvalidRequestForMigrationSource, ), stop: self.allowed.stop, @@ -328,15 +328,30 @@ impl ExternalRequestQueue { AllowedRequests { reboot: Disposition::Ignore, ..self.allowed } } - // Requests to stop the instance block other requests from being - // queued. Additional requests to stop are ignored for idempotency. + // Once the instance is asked to stop, further requests to change + // its state are ignored. + // + // Requests to change Crucible volume configuration can still be + // queued if they were previously alloewd. This allows the state + // driver to accept VCR mutations that are needed to allow an + // activation to complete even if the instance is slated to stop + // immediately after starting. + // + // Note that it is possible for a VCR change request to be enqueued + // and then not processed before the state driver stops reading from + // the queue. If this happens, the request will be marked as failed + // when the request queue containing it is dropped. (This can also + // happen if a guest asks to halt a VM while it has a replacement + // request on its queue.) ChangeReason::ApiRequest(ExternalRequest::Stop) => { let reason = DenyReason::HaltPending; AllowedRequests { start: Disposition::Deny(reason), migrate_as_source: Disposition::Deny(reason), reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), + reconfigure_crucible_volume: self + .allowed + .reconfigure_crucible_volume, stop: Disposition::Ignore, } } @@ -354,7 +369,7 @@ impl ExternalRequestQueue { start: self.allowed.start, migrate_as_source: Disposition::Enqueue, reboot: Disposition::Enqueue, - mutate: Disposition::Enqueue, + reconfigure_crucible_volume: Disposition::Enqueue, stop: self.allowed.stop, } } @@ -384,7 +399,7 @@ impl ExternalRequestQueue { start: Disposition::Deny(reason), migrate_as_source: Disposition::Deny(reason), reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), + reconfigure_crucible_volume: Disposition::Deny(reason), stop: Disposition::Ignore, } } @@ -394,7 +409,7 @@ impl ExternalRequestQueue { start: Disposition::Deny(reason), migrate_as_source: Disposition::Deny(reason), reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), + reconfigure_crucible_volume: Disposition::Deny(reason), stop: self.allowed.stop, } } diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index 0c2e179b0..856f71739 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -153,7 +153,7 @@ struct InputQueueInner { /// State change requests from the external API that were previously read /// but not handled immediately. - buffered_external: VecDeque, + self_request: VecDeque, /// State change requests from the VM's components. These take precedence /// over external state change requests. @@ -167,7 +167,7 @@ impl InputQueueInner { log, auto_start, ), guest_events: super::guest_event::GuestEventQueue::default(), - buffered_external: Default::default(), + self_request: Default::default(), } } } @@ -198,12 +198,20 @@ impl InputQueue { } } - /// Waits for an event to arrive on the input queue and returns it for - /// processing. + /// Waits for a new event to arrive on one of the queue's sub-queues and + /// dispatches it for processing. /// - /// External requests and guest events are stored in separate queues. If - /// both queues have events when this routine is called, the guest event - /// queue takes precedence. + /// The sub-queues, listed here in priority order, are: + /// + /// - Guest events: These are signals raised from the VM's vCPUs and + /// devices (e.g. a request to reboot or halt the VM arising from a vCPU + /// asserting a virtual chipset signal). + /// - Self-requests: The state driver may buffer external requests for + /// later processing by pushing them to the self-request queue. See + /// [`Self::push_self_request`]. + /// - External requests: These are state change requests received via the + /// server API. See [`super::request_queue`] for more details about how + /// these requests are queued. /// /// # Synchronization /// @@ -217,7 +225,7 @@ impl InputQueue { let mut guard = self.inner.lock().unwrap(); if let Some(guest_event) = guard.guest_events.pop_front() { return InputQueueEvent::GuestEvent(guest_event); - } else if let Some(req) = guard.buffered_external.pop_front() { + } else if let Some(req) = guard.self_request.pop_front() { return InputQueueEvent::ExternalRequest(req); } else if let Some(req) = guard.external_requests.pop_front() { return InputQueueEvent::ExternalRequest(req); @@ -235,11 +243,17 @@ impl InputQueue { } } - /// Pushes an external request to the end of the buffered external request - /// list. - fn buffer_external_request(&self, req: ExternalRequest) { + /// Pushes a self-requested state change request to this queue. + /// + /// This routine may only be called from the state driver task. + fn push_self_request(&self, req: ExternalRequest) { let mut guard = self.inner.lock().unwrap(); - guard.buffered_external.push_back(req); + guard.self_request.push_back(req); + + // Since this routine is only called from the state driver task, the + // driver is by definition not waiting for a new event at this point, so + // it's not necessary to signal the notify here (the next call to + // dequeue an event will always pick this event up). } /// Notifies the external request queue that the instance's state has @@ -534,6 +548,14 @@ impl StateDriver { ) -> anyhow::Result<()> { info!(self.log, "starting instance"; "reason" => ?start_reason); + // Tell listeners that the VM's components are now starting up and not + // merely being created (but keep the VM in the Migrating state if it's + // being started pursuant to a migration in). + if let VmStartReason::ExplicitRequest = start_reason { + self.external_state + .update(ExternalStateUpdate::Instance(InstanceState::Starting)); + } + // The start sequence is arranged so that calls to block backends can be // interleaved with processing of requests from the external request // queue. This allows Nexus to reconfigure Crucible backends while they @@ -609,6 +631,11 @@ impl StateDriver { // task, which can decide whether and how to buffer incoming requests. drop(objects); + // Keep track of whether the external queue produced a request to stop + // the VM while it was being started. If such a request is seen, send a + // self-request to stop just before returning so that the VM will stop + // immediately. + let mut stopped_while_starting = false; loop { let event = tokio::select! { // If the VM successfully starts, return immediately and let @@ -621,6 +648,16 @@ impl StateDriver { let objects = &self.objects; objects.lock_exclusive().await.resume_vcpus(); self.publish_steady_state(InstanceState::Running); + + // If a stop request was seen while in the startup + // sequence, self-request a stop for the main event + // loop to handle. + if stopped_while_starting { + self.input_queue.push_self_request( + ExternalRequest::Stop + ); + } + info!(&self.log, "VM successfully started"); } @@ -632,10 +669,12 @@ impl StateDriver { } }; - // The VM's vCPUs haven't been started yet, so there should be no - // way for the VM to produce any guest events to handle. + // The VM's vCPUs only start when the block backend startup future + // resolves and is selected above. If control reached that point, + // that branch wasn't selected, so the vCPUs should still be paused, + // which means the dequeued event should not be a guest event. let InputQueueEvent::ExternalRequest(req) = event else { - unreachable!("can't get guest events before the VM starts"); + unreachable!("can't get guest events before vCPUs start"); }; // Handle requests to reconfigure one of the existing Crucible @@ -653,19 +692,11 @@ impl StateDriver { "got request to stop while still starting" ); - // Buffer this request to stop so that it can be processed - // as part of the main event loop once the VM finishes - // starting. - // - // It is possible, at least in theory, to drop the block - // backend startup future (or cooperatively cancel it) and - // then pause/halt the VM immediately. This requires block - // backends' startup operations to be cancel-safe (in the - // sense that it must be possible to pause and halt a - // backend whose startup sequence was interrupted). Since - // block backends aren't currently required to guarantee - // this, just buffer the stop request. - self.input_queue.buffer_external_request(req); + // Remember that the VM should stop once it has started. + // It's not safe to issue a self-request here because the + // next loop iteration will simply pop the self-request back + // off the queue and reach this path once more. + stopped_while_starting = true; } ExternalRequest::ReconfigureCrucibleVolume { backend_id, diff --git a/phd-tests/tests/src/crucible/smoke.rs b/phd-tests/tests/src/crucible/smoke.rs index eab416ece..a69ac2f5b 100644 --- a/phd-tests/tests/src/crucible/smoke.rs +++ b/phd-tests/tests/src/crucible/smoke.rs @@ -89,6 +89,10 @@ async fn shutdown_persistence_test(ctx: &Framework) { #[phd_testcase] async fn vcr_replace_during_start_test(ctx: &Framework) { + if !ctx.crucible_enabled() { + phd_skip!("Crucible backends not enabled (no downstairs path)"); + } + let mut config = ctx.vm_config_builder("crucible_vcr_replace_during_start_test"); diff --git a/phd-tests/tests/src/server_state_machine.rs b/phd-tests/tests/src/server_state_machine.rs index dba7d3cd6..376afb728 100644 --- a/phd-tests/tests/src/server_state_machine.rs +++ b/phd-tests/tests/src/server_state_machine.rs @@ -6,6 +6,10 @@ use std::time::Duration; +use phd_framework::{ + disk::{BlockSize, DiskSource}, + test_vm::{DiskBackend, DiskInterface}, +}; use phd_testcase::*; use propolis_client::types::InstanceState; @@ -89,3 +93,56 @@ async fn instance_reset_requires_running_test(ctx: &Framework) { vm.launch().await?; vm.wait_for_state(InstanceState::Running, Duration::from_secs(60)).await?; } + +#[phd_testcase] +async fn stop_while_blocked_on_start_test(ctx: &Framework) { + // This test uses a Crucible disk backend to cause VM startup to block. + if !ctx.crucible_enabled() { + phd_skip!("test requires Crucible support"); + } + + let mut config = ctx.vm_config_builder("stop_while_blocked_on_start_test"); + + // Create a VM that blocks while starting by attaching a Crucible data disk + // to it and enabling the black hole address in its volume construction + // request. The invalid address will keep Crucible from activating and so + // will block the VM from fully starting. + const DATA_DISK_NAME: &str = "vcr-replacement-target"; + config.data_disk( + DATA_DISK_NAME, + DiskSource::Blank(1024 * 1024 * 1024), + DiskInterface::Nvme, + DiskBackend::Crucible { + min_disk_size_gib: 1, + block_size: BlockSize::Bytes512, + }, + 5, + ); + + let spec = config.vm_spec(ctx).await?; + let disk_hdl = + spec.get_disk_by_device_name(DATA_DISK_NAME).cloned().unwrap(); + let disk = disk_hdl.as_crucible().unwrap(); + disk.enable_vcr_black_hole(); + + // Launch the VM and wait for it to advertise that its components are + // starting. + let mut vm = ctx.spawn_vm_with_spec(spec, None).await?; + vm.launch().await?; + vm.wait_for_state(InstanceState::Starting, Duration::from_secs(15)) + .await + .unwrap(); + + // Send a stop request. This should enqueue successfully, but the VM won't + // stop right away because it's still starting. + vm.stop().await?; + + // Unblock Crucible startup by fixing the broken disk's VCR. + disk.disable_vcr_black_hole(); + disk.set_generation(2); + vm.replace_crucible_vcr(disk).await?; + + // Eventually the instance should shut down. + vm.wait_for_state(InstanceState::Destroyed, Duration::from_secs(60)) + .await?; +} From 65f862f05f544ad40bffd1fabf6b9822829323f8 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 4 Mar 2025 01:29:33 +0000 Subject: [PATCH 05/11] phd: tidy up call to map --- phd-tests/framework/src/disk/crucible.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/phd-tests/framework/src/disk/crucible.rs b/phd-tests/framework/src/disk/crucible.rs index 6ed05e938..a4553990a 100644 --- a/phd-tests/framework/src/disk/crucible.rs +++ b/phd-tests/framework/src/disk/crucible.rs @@ -389,7 +389,7 @@ impl Inner { let downstairs_addrs = self .downstairs_instances .iter() - .map(|ds| ds.vcr_address()) + .map(Downstairs::vcr_address) .collect(); VolumeConstructionRequest::Volume { From b6427f7d8fe8894f0743d53615bfad673b5ca820 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 4 Mar 2025 01:30:23 +0000 Subject: [PATCH 06/11] server: fix/refine request queue tests --- .../src/lib/vm/request_queue.rs | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index c6fced60c..03c355691 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -628,15 +628,40 @@ mod test { } #[tokio::test] - async fn mutation_disallowed_after_stop_requested() { + async fn mutation_allowed_after_stop_requested() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); queue.notify_instance_state_change(InstanceStateChange::StartedRunning); assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); + } + #[tokio::test] + async fn mutation_disallowed_after_stopped() { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + queue.notify_instance_state_change(InstanceStateChange::StartedRunning); queue.notify_instance_state_change(InstanceStateChange::Stopped); assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); } + + #[tokio::test] + async fn vcr_requests_canceled_when_queue_drops() { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let req = ExternalRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }; + + assert!(queue.try_queue(req).is_ok()); + drop(queue); + let err = rx.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, hyper::StatusCode::GONE); + } } From e66e5deefd4871a11436c90fa97de4859ea62cd5 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 4 Mar 2025 20:22:17 +0000 Subject: [PATCH 07/11] server: improve comments --- bin/propolis-server/src/lib/vm/request_queue.rs | 2 +- bin/propolis-server/src/lib/vm/state_driver.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 03c355691..66f9224c9 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -332,7 +332,7 @@ impl ExternalRequestQueue { // its state are ignored. // // Requests to change Crucible volume configuration can still be - // queued if they were previously alloewd. This allows the state + // queued if they were previously allowed. This allows the state // driver to accept VCR mutations that are needed to allow an // activation to complete even if the instance is slated to stop // immediately after starting. diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index 856f71739..a91484e21 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -644,14 +644,18 @@ impl StateDriver { biased; res = &mut block_backend_fut => { + // If the VM started up successfully, publish that it is + // running and queue up any external requests that were + // deferred while startup was ongoing. + // + // If startup failed, just return the error without changing + // any state or processing any additional requests. The + // caller will move the instance to the appropriate terminal + // state and clean up the VM as needed. if res.is_ok() { let objects = &self.objects; objects.lock_exclusive().await.resume_vcpus(); self.publish_steady_state(InstanceState::Running); - - // If a stop request was seen while in the startup - // sequence, self-request a stop for the main event - // loop to handle. if stopped_while_starting { self.input_queue.push_self_request( ExternalRequest::Stop From 2412bbdb6e284458a350cf4c92c82f9eb84b4e30 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 4 Mar 2025 20:29:21 +0000 Subject: [PATCH 08/11] server: extend API request queue's memory of prior requests Today the server's API request queue decides whether to accept or reject incoming requests by storing a fixed disposition for each kind of request and adjusting those dispositions as new requests are queued or as a VM's state changes. This is not always enough information to maintain the proper dispositions. Consider the following example: 1. A VM begins to start. Reboot requests are denied in this state (since a VM that hasn't booted yet can't be rebooted). 2. A caller queues a request to stop the VM after it starts. All subsequent reboot requests should now be denied because the VM will halt before they can be serviced. 3. The VM successfully starts. The correct reboot disposition is still "deny," because there's a pending stop request, but there's no way to discern this from the prior reboot disposition (it was also "deny" before the stop request was queued!). To address this sort of problem, and to pave the way for better handling of Crucible VCR replacements during instance start, refactor the queue as follows: - Remember what kinds of outstanding requests have not been processed and dispose of requests based on this state and the queue's notion of the instance's overall state. - Break external API requests into "state change" and "component change" requests. This will allow them to be dequeued independently when the state driver wants to handle component changes without changing the order of state change requests. - Tweak the language the state driver uses to communicate with the queue: instead of saying "the VM is now in state X," the driver says "I handled (or failed to handle) a request of type Y." Tests: cargo test, PHD. --- bin/propolis-server/src/lib/vm/active.rs | 15 +- .../src/lib/vm/request_queue.rs | 836 +++++++++++------- .../src/lib/vm/state_driver.rs | 147 +-- 3 files changed, 608 insertions(+), 390 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/active.rs b/bin/propolis-server/src/lib/vm/active.rs index 8a812bf27..1da8f7ea4 100644 --- a/bin/propolis-server/src/lib/vm/active.rs +++ b/bin/propolis-server/src/lib/vm/active.rs @@ -63,9 +63,9 @@ impl ActiveVm { self.state_driver_queue .queue_external_request(match requested { - InstanceStateRequested::Run => ExternalRequest::Start, - InstanceStateRequested::Stop => ExternalRequest::Stop, - InstanceStateRequested::Reboot => ExternalRequest::Reboot, + InstanceStateRequested::Run => ExternalRequest::start(), + InstanceStateRequested::Stop => ExternalRequest::stop(), + InstanceStateRequested::Reboot => ExternalRequest::reboot(), }) .map_err(Into::into) } @@ -79,10 +79,7 @@ impl ActiveVm { websock: dropshot::WebsocketConnection, ) -> Result<(), VmError> { Ok(self.state_driver_queue.queue_external_request( - ExternalRequest::MigrateAsSource { - migration_id, - websock: websock.into(), - }, + ExternalRequest::migrate_as_source(migration_id, websock), )?) } @@ -107,11 +104,11 @@ impl ActiveVm { ) -> Result<(), VmError> { self.state_driver_queue .queue_external_request( - ExternalRequest::ReconfigureCrucibleVolume { + ExternalRequest::reconfigure_crucible_volume( backend_id, new_vcr_json, result_tx, - }, + ), ) .map_err(Into::into) } diff --git a/bin/propolis-server/src/lib/vm/request_queue.rs b/bin/propolis-server/src/lib/vm/request_queue.rs index 30d26f59c..1d53e67d1 100644 --- a/bin/propolis-server/src/lib/vm/request_queue.rs +++ b/bin/propolis-server/src/lib/vm/request_queue.rs @@ -2,29 +2,22 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Handles requests to change a Propolis server's state via the external API. +//! Handles requests to change a Propolis server's state or component +//! configuration via the external API. //! -//! An instance accepts or rejects requests to change state based on a -//! combination of its current state and its knowledge of the requests it has -//! previously queued but not processed yet. The latter knowledge is used to -//! reject requests that will never be fulfilled (because they're preceded by an -//! action that will forbid them; consider rebooting after stopping) or that may -//! need be to redirected to a migration target. +//! The queue accepts or rejects requests based on a combination of its current +//! state and its knowledge of requests that it has previously queued but that +//! have not yet been processed. The latter knowledge is used to reject requests +//! that will never be fulfilled (because a prior request preempts them) or that +//! may need to be redirected to a migration target. //! -//! The queue maintains a disposition for each kind of request that can be sent -//! to it, which allows that request to be enqueued, denied, or silently ignored -//! (for idempotency purposes). These dispositions can change as new requests -//! are queued. The queue also provides callbacks to the VM state driver that -//! allow the driver to advise the queue of state changes that further affect -//! what requests should be accepted. -//! -//! Users who want to share a queue must wrap it in the synchronization objects -//! of their choice. +//! The queue contains no synchronization of its own. Users who want to share a +//! queue between multiple threads must wrap it in a synchronization object. use std::collections::VecDeque; use propolis_api_types::instance_spec::SpecKey; -use slog::{debug, info, Logger}; +use slog::{info, Logger}; use thiserror::Error; use uuid::Uuid; @@ -53,9 +46,8 @@ impl WebsocketConnection { } } -/// An external request made of a VM controller via the server API. Handled by -/// the controller's state driver thread. -pub enum ExternalRequest { +/// A request to change a VM's runtime state. +pub(crate) enum StateChangeRequest { /// Asks the state worker to start a brand-new VM (i.e. not one initialized /// by live migration, which implicitly starts the VM). Start, @@ -71,7 +63,29 @@ pub enum ExternalRequest { /// Halts the VM. Note that this is not a graceful shutdown and does not /// coordinate with guest software. Stop, +} + +impl std::fmt::Debug for StateChangeRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Start => write!(f, "Start"), + Self::MigrateAsSource { migration_id, .. } => f + .debug_struct("MigrateAsSource") + .field("migration_id", migration_id) + .finish(), + Self::Reboot => write!(f, "Reboot"), + Self::Stop => write!(f, "Stop"), + } + } +} +/// A request to reconfigure a VM's components. +/// +/// NOTE: Successfully queuing a component change request does not guarantee +/// that the request will be processed, because it may be preempted by a VM +/// state change. If this happens the request will fail and notify the +/// submitter using whatever channel is appropriate for the request's type. +pub enum ComponentChangeRequest { /// Attempts to update the volume construction request for the supplied /// Crucible volume. /// @@ -91,16 +105,9 @@ pub enum ExternalRequest { }, } -impl std::fmt::Debug for ExternalRequest { +impl std::fmt::Debug for ComponentChangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Start => write!(f, "Start"), - Self::MigrateAsSource { migration_id, .. } => f - .debug_struct("MigrateAsSource") - .field("migration_id", migration_id) - .finish(), - Self::Reboot => write!(f, "Reboot"), - Self::Stop => write!(f, "Stop"), Self::ReconfigureCrucibleVolume { backend_id, .. } => f .debug_struct("ReconfigureCrucibleVolume") .field("backend_id", backend_id) @@ -109,10 +116,64 @@ impl std::fmt::Debug for ExternalRequest { } } +/// An external request made of a VM controller via the server API. Handled by +/// the controller's state driver thread. +#[derive(Debug)] +pub(crate) enum ExternalRequest { + /// A request to change the VM's runtime state. + State(StateChangeRequest), + + /// A request to reconfigure one of the VM's components. + Component(ComponentChangeRequest), +} + +impl ExternalRequest { + /// Constructs a VM start request. + pub fn start() -> Self { + Self::State(StateChangeRequest::Start) + } + + /// Constructs a VM stop request. + pub fn stop() -> Self { + Self::State(StateChangeRequest::Stop) + } + + /// Constructs a VM reboot request. + pub fn reboot() -> Self { + Self::State(StateChangeRequest::Reboot) + } + + /// Constructs a request to migrate a VM to another Propolis instance, using + /// `ws_conn` as the websocket connection to the migration target. + pub fn migrate_as_source( + migration_id: Uuid, + ws_conn: dropshot::WebsocketConnection, + ) -> Self { + Self::State(StateChangeRequest::MigrateAsSource { + migration_id, + websock: WebsocketConnection(Some(ws_conn)), + }) + } + + /// Constructs a request to update a Crucible volume's construction request. + /// The result of this request will be sent to the supplied `result_tx`. + pub fn reconfigure_crucible_volume( + backend_id: SpecKey, + new_vcr_json: String, + result_tx: super::CrucibleReplaceResultTx, + ) -> Self { + Self::Component(ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + }) + } +} + /// A set of reasons why a request to queue an external state transition can /// fail. #[derive(Copy, Clone, Debug, Error)] -pub enum RequestDeniedReason { +pub(crate) enum RequestDeniedReason { #[error("Operation requires an active instance")] InstanceNotActive, @@ -123,35 +184,24 @@ pub enum RequestDeniedReason { AlreadyMigrationSource, #[error("Operation cannot be performed on a migration source")] - InvalidRequestForMigrationSource, + InvalidForMigrationSource, #[error("Instance is preparing to stop")] HaltPending, - #[error("Instance failed to start or halted due to a failure")] - InstanceFailed, -} + #[error("Instance has migrated out and is being torn down")] + MigratedOut, -/// The set of instance state changes that should change the dispositions of -/// future requests to the queue. -#[derive(Copy, Clone, Debug)] -pub enum InstanceStateChange { - StartedRunning, - Rebooted, - Stopped, - Failed, -} + #[error("Instance has already halted")] + Halted, -/// A reason for a change in the queue's request dispositions. -#[derive(Debug)] -enum DispositionChangeReason<'a> { - ApiRequest(&'a ExternalRequest), - StateChange(InstanceStateChange), + #[error("Instance failed to start or halted due to a failure")] + InstanceFailed, } /// The possible methods of handling a request to queue a state change. #[derive(Copy, Clone, Debug)] -enum RequestDisposition { +enum Disposition { /// Put the state change on the queue. Enqueue, @@ -164,27 +214,83 @@ enum RequestDisposition { Deny(RequestDeniedReason), } -/// The current disposition for each kind of incoming request. +/// A kind of request that can be popped from the queue and then completed. #[derive(Copy, Clone, Debug)] -struct AllowedRequests { - start: RequestDisposition, - migrate_as_source: RequestDisposition, - reboot: RequestDisposition, - mutate: RequestDisposition, - stop: RequestDisposition, +pub(super) enum CompletedRequest { + Start { succeeded: bool }, + Reboot, + MigrationOut { succeeded: bool }, + Stop, +} + +/// The queue's internal notion of the VM's runtime state. +#[derive(Copy, Clone, Debug)] +enum QueueState { + /// The instance has not started yet and no one has asked it to start. + NotStarted, + + /// The instance is not running yet, but the state driver will eventually + /// try to start it. + StartPending, + + /// The instance has successfully started. + Running, + + /// The instance has stopped due to a migration out. + MigratedOut, + + /// The instance has shut down. + Stopped, + + /// The instance failed to start. + Failed, +} + +impl QueueState { + /// If `self` is a state in which new change requests should be denied + /// unconditionally, returns a `Some` containing an appropriate + /// [`RequestDeniedReason`]; returns `None` otherwise. + fn deny_reason(&self) -> Option { + match self { + Self::MigratedOut => Some(RequestDeniedReason::MigratedOut), + Self::Stopped => Some(RequestDeniedReason::Halted), + Self::Failed => Some(RequestDeniedReason::InstanceFailed), + _ => None, + } + } } -/// A queue for external requests to change an instance's state. #[derive(Debug)] -pub struct ExternalRequestQueue { - queue: VecDeque, - allowed: AllowedRequests, +pub(super) struct ExternalRequestQueue { + /// The queue of unprocessed state change requests. + state_queue: VecDeque, + + /// The queue of unprocessed component change requests. + component_queue: VecDeque, + + /// The "effective" (for purposes of deciding how to dispose of requests) + /// state of the instance associated with this queue. + state: QueueState, + + /// True if this queue has enqueued a reboot request that has not been + /// completed by the state driver. + awaiting_reboot: bool, + + /// True if this queue has enqueued a stop request that has not been + /// completed by the state driver. + awaiting_migration_out: bool, + + /// True if this queue has enqueued a request to migrate out that has not + /// been completed by the state driver. + awaiting_stop: bool, + + /// The queue's logger. log: Logger, } /// Indicates whether this queue's creator will start the relevant instance /// without waiting for a Start request from the queue. -pub enum InstanceAutoStart { +pub(super) enum InstanceAutoStart { Yes, No, } @@ -192,64 +298,155 @@ pub enum InstanceAutoStart { impl ExternalRequestQueue { /// Creates a new queue that logs to the supplied logger. pub fn new(log: Logger, auto_start: InstanceAutoStart) -> Self { - // If the queue is being created for an instance that will start - // automatically (e.g. due to a migration in), set the request - // disposition for future start requests to Ignore for idempotency. - let start = match auto_start { - InstanceAutoStart::Yes => RequestDisposition::Ignore, - InstanceAutoStart::No => RequestDisposition::Enqueue, + let instance_state = match auto_start { + InstanceAutoStart::Yes => QueueState::StartPending, + InstanceAutoStart::No => QueueState::NotStarted, }; Self { - queue: VecDeque::new(), - allowed: AllowedRequests { - start, - migrate_as_source: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - reboot: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - mutate: RequestDisposition::Deny( - RequestDeniedReason::InstanceNotActive, - ), - stop: RequestDisposition::Enqueue, - }, + state_queue: Default::default(), + component_queue: Default::default(), + + state: instance_state, + awaiting_reboot: false, + awaiting_migration_out: false, + awaiting_stop: false, log, } } - /// Pops the request at the front of the queue. + /// Pops the next request off of the queue. If the queue contains both state + /// change and component change requests, the next state change request is + /// popped first (even if it arrived later in time than the next component + /// change request). pub fn pop_front(&mut self) -> Option { - self.queue.pop_front() + if let Some(state_change) = self.state_queue.pop_front() { + Some(ExternalRequest::State(state_change)) + } else { + self.component_queue.pop_front().map(ExternalRequest::Component) + } } /// Indicates whether the queue is empty. #[cfg(test)] pub fn is_empty(&self) -> bool { - self.queue.is_empty() + self.state_queue.is_empty() && self.component_queue.is_empty() } - /// Asks to place the supplied request on the queue. If the request is - /// enqueued, updates the dispositions to use for future requests. + /// Attempts to replace the supplied `request` on the queue, returning `Ok` + /// if the request was accepted and an `Err` otherwise. In the latter case, + /// the error contains a [`RequestDeniedReason`] that describes why the + /// request was rejected. pub fn try_queue( &mut self, request: ExternalRequest, ) -> Result<(), RequestDeniedReason> { - let disposition = match request { - ExternalRequest::Start => self.allowed.start, - ExternalRequest::MigrateAsSource { .. } => { - self.allowed.migrate_as_source - } - ExternalRequest::Reboot => self.allowed.reboot, - ExternalRequest::ReconfigureCrucibleVolume { .. } => { - self.allowed.mutate + // If the queue is in a terminal state, deny the request straightaway + // (unless it's a stop request, which can be ignored for idempotency). + let disposition = if let Some(reason) = self.state.deny_reason() { + if matches!( + request, + ExternalRequest::State(StateChangeRequest::Stop) + ) { + Disposition::Ignore + } else { + Disposition::Deny(reason) } + } else { + // The instance hasn't stopped yet, so consider this request in + // light of its current state and the other as-yet unprocessed + // requests from the queue. + // + // In general, try to make state change requests idempotent by + // ignoring new requests when a request of the appropriate kind is + // already on the queue, and deny requests to reach a state that is + // precluded by an earlier state change request. + match request { + // Interpret start requests as requests to reach the Running + // state. + ExternalRequest::State(StateChangeRequest::Start) => { + if self.awaiting_stop { + Disposition::Deny(RequestDeniedReason::HaltPending) + } else if let QueueState::NotStarted = self.state { + Disposition::Enqueue + } else { + Disposition::Ignore + } + } + + // Only allow one attempt to migrate out at a time (if it works + // the VM can't migrate out again), and only allow migration out + // after an instance begins to run. + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. }, + ) => { + if self.awaiting_migration_out { + Disposition::Deny( + RequestDeniedReason::AlreadyMigrationSource, + ) + } else if self.awaiting_stop { + Disposition::Deny(RequestDeniedReason::HaltPending) + } else if matches!(self.state, QueueState::NotStarted) { + Disposition::Deny( + RequestDeniedReason::InstanceNotActive, + ) + } else { + Disposition::Enqueue + } + } + + // Treat reboot requests as a request to take a VM that has + // already started, reset its state, and resume the VM. If the + // VM migrates out first, this request needs to be directed to + // the target, so reject it here to allow the caller to wait for + // the migration to resolve. + ExternalRequest::State(StateChangeRequest::Reboot) => { + if self.awaiting_migration_out { + Disposition::Deny( + RequestDeniedReason::InvalidForMigrationSource, + ) + } else if self.awaiting_stop { + Disposition::Deny(RequestDeniedReason::HaltPending) + } else if matches!(self.state, QueueState::NotStarted) { + Disposition::Deny( + RequestDeniedReason::InstanceNotActive, + ) + } else if matches!(self.state, QueueState::StartPending) { + Disposition::Deny(RequestDeniedReason::StartInProgress) + } else if self.awaiting_reboot { + Disposition::Ignore + } else { + Disposition::Enqueue + } + } + + // As with reboots, deny requests to stop a VM that might + // migrate out first, since the request may need to be directed + // to the migration target. + ExternalRequest::State(StateChangeRequest::Stop) => { + if self.awaiting_migration_out { + Disposition::Deny( + RequestDeniedReason::InvalidForMigrationSource, + ) + } else if self.awaiting_stop { + Disposition::Ignore + } else { + Disposition::Enqueue + } + } - // Requests to stop always succeed. Note that a request to stop a VM - // that hasn't started should still be queued to the state worker so - // that the worker can exit and drop its references to the instance. - ExternalRequest::Stop => self.allowed.stop, + // Always enqueue component change requests, even if the VM has + // a pending request to stop or migrate out. This allows the + // state driver to process these requests during a state change, + // which may be necessary to complete that state change. If the + // change request is canceled by a later state transition, the + // queue can use the request data to notify the requestor. + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + .. + }, + ) => Disposition::Enqueue, + } }; info!(&self.log, "Queuing external request"; @@ -257,155 +454,90 @@ impl ExternalRequestQueue { "disposition" => ?disposition); match disposition { - RequestDisposition::Enqueue => {} - RequestDisposition::Ignore => return Ok(()), - RequestDisposition::Deny(reason) => return Err(reason), - }; - - self.allowed = self.get_new_dispositions( - DispositionChangeReason::ApiRequest(&request), - ); - self.queue.push_back(request); - Ok(()) - } - - /// Notifies the queue that the instance's state has changed and that its - /// disposition should be updated accordingly. - pub fn notify_instance_state_change(&mut self, state: InstanceStateChange) { - self.allowed = self - .get_new_dispositions(DispositionChangeReason::StateChange(state)); - } - - /// Computes a new set of queue dispositions given the current state of the - /// queue and the event that is changing those dispositions. - fn get_new_dispositions( - &self, - reason: DispositionChangeReason, - ) -> AllowedRequests { - debug!(self.log, "Computing new queue dispositions"; - "reason" => ?reason); - - use DispositionChangeReason as ChangeReason; - use RequestDeniedReason as DenyReason; - use RequestDisposition as Disposition; - match reason { - ChangeReason::ApiRequest(ExternalRequest::Start) => { - let reason = DenyReason::StartInProgress; - AllowedRequests { - start: Disposition::Ignore, - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: self.allowed.stop, + Disposition::Ignore => Ok(()), + Disposition::Deny(reason) => Err(reason), + Disposition::Enqueue => { + match request { + ExternalRequest::State(StateChangeRequest::Start) => { + assert!(matches!(self.state, QueueState::NotStarted)); + self.state = QueueState::StartPending; + } + ExternalRequest::State( + StateChangeRequest::MigrateAsSource { .. }, + ) => { + assert!(!self.awaiting_migration_out); + self.awaiting_migration_out = true; + } + ExternalRequest::State(StateChangeRequest::Reboot) => { + assert!(!self.awaiting_reboot); + self.awaiting_reboot = true; + } + ExternalRequest::State(StateChangeRequest::Stop) => { + assert!(!self.awaiting_stop); + self.awaiting_stop = true; + } + ExternalRequest::Component(_) => {} } - } - ChangeReason::ApiRequest(ExternalRequest::MigrateAsSource { - .. - }) => { - assert!( - matches!(self.allowed.start, Disposition::Ignore), - "{:?}", - self.allowed - ); - - AllowedRequests { - start: self.allowed.start, - migrate_as_source: Disposition::Deny( - DenyReason::AlreadyMigrationSource, - ), - reboot: Disposition::Deny( - DenyReason::InvalidRequestForMigrationSource, - ), - mutate: Disposition::Deny( - DenyReason::InvalidRequestForMigrationSource, - ), - stop: self.allowed.stop, + + match request { + ExternalRequest::State(s) => self.state_queue.push_back(s), + ExternalRequest::Component(c) => { + self.component_queue.push_back(c) + } } - } - // Requests to reboot prevent additional reboot requests from being - // queued, but do not affect other operations. - ChangeReason::ApiRequest(ExternalRequest::Reboot) => { - assert!( - matches!(self.allowed.start, Disposition::Ignore), - "{:?}", - self.allowed - ); - AllowedRequests { reboot: Disposition::Ignore, ..self.allowed } + Ok(()) } + } + } - // Requests to stop the instance block other requests from being - // queued. Additional requests to stop are ignored for idempotency. - ChangeReason::ApiRequest(ExternalRequest::Stop) => { - let reason = DenyReason::HaltPending; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: Disposition::Ignore, - } - } + /// Notifies this queue that the caller has finished processing a + /// previously-dequeued request, allowing the queue to adjust its + /// dispositions in response. + pub(super) fn notify_request_completed(&mut self, req: CompletedRequest) { + info!( + &self.log, + "queue notified of request completion"; + "request" => ?req + ); - // Requests to mutate VM configuration don't move the VM state - // machine and don't change any request dispositions. - ChangeReason::ApiRequest( - ExternalRequest::ReconfigureCrucibleVolume { .. }, - ) => self.allowed, - - // When an instance begins running, requests to migrate out of it or - // to reboot it become valid. - ChangeReason::StateChange(InstanceStateChange::StartedRunning) => { - AllowedRequests { - start: self.allowed.start, - migrate_as_source: Disposition::Enqueue, - reboot: Disposition::Enqueue, - mutate: Disposition::Enqueue, - stop: self.allowed.stop, + match req { + CompletedRequest::Start { succeeded } => { + assert!(matches!(self.state, QueueState::StartPending)); + if succeeded { + self.state = QueueState::Running; + } else { + self.state = QueueState::Failed; } } - - // When an instance finishes rebooting, allow new reboot requests to - // be queued again, unless reboot requests began to be denied in the - // meantime. - ChangeReason::StateChange(InstanceStateChange::Rebooted) => { - let new_reboot = - if let Disposition::Ignore = self.allowed.reboot { - Disposition::Enqueue - } else { - self.allowed.reboot - }; - - AllowedRequests { reboot: new_reboot, ..self.allowed } + CompletedRequest::Reboot => { + assert!(matches!(self.state, QueueState::Running)); + assert!(self.awaiting_reboot); + self.awaiting_reboot = false; } - - // When an instance stops or fails, requests to do anything other - // than stop it are denied with an appropriate deny reason. Note - // that an instance may stop or fail due to guest activity, so the - // previous dispositions for migrate and reboot requests may not be - // "deny". - ChangeReason::StateChange(InstanceStateChange::Stopped) => { - let reason = DenyReason::InstanceNotActive; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: Disposition::Ignore, + CompletedRequest::MigrationOut { succeeded } => { + assert!(matches!(self.state, QueueState::Running)); + assert!(self.awaiting_migration_out); + self.awaiting_migration_out = false; + if succeeded { + self.state = QueueState::MigratedOut; } } - ChangeReason::StateChange(InstanceStateChange::Failed) => { - let reason = DenyReason::InstanceFailed; - AllowedRequests { - start: Disposition::Deny(reason), - migrate_as_source: Disposition::Deny(reason), - reboot: Disposition::Deny(reason), - mutate: Disposition::Deny(reason), - stop: self.allowed.stop, - } + CompletedRequest::Stop => { + assert!(self.awaiting_stop); + self.awaiting_stop = false; + self.state = QueueState::Stopped; } } } + + /// Notifies this queue that the instance has stopped. This routine is meant + /// to be used in cases where an instance stops for reasons other than an + /// external request (e.g., a guest-requested chipset-driven shutdown). + pub(super) fn notify_stopped(&mut self) { + info!(&self.log, "queue notified that VM has stopped"); + self.state = QueueState::Stopped; + } } // It's possible for an external request queue to be dropped with outstanding @@ -414,11 +546,22 @@ impl ExternalRequestQueue { // queue that the VM is gone. impl Drop for ExternalRequestQueue { fn drop(&mut self) { - for req in self.queue.drain(..) { + // No special handling is needed for the state change queue: + // + // - Requests to start, reboot, and stop are handled asynchronously + // (calls to change the instance's state return as soon as they're + // queued). + // - Requests to migrate out contain a connection to the migration + // target; dropping this connection tells the target the source is + // gone. + // + // Drain the component change request queue and send messages to + // requestors telling them that their requests have been canceled. + for req in self.component_queue.drain(..) { match req { // Crucible VCR change requestors wait for their requests to be // retired. - ExternalRequest::ReconfigureCrucibleVolume { + ComponentChangeRequest::ReconfigureCrucibleVolume { result_tx, .. } => { @@ -431,18 +574,6 @@ impl Drop for ExternalRequestQueue { hyper::StatusCode::GONE, ))); } - - // Requests to start, reboot, and stop are handled - // asynchronously (calls to change the instance's state return - // as soon as they're queued). - ExternalRequest::Start - | ExternalRequest::Reboot - | ExternalRequest::Stop => {} - - // Dropping a request to migrate out drops the embedded - // connection to the migration target, thus notifying it that - // the source is gone. - ExternalRequest::MigrateAsSource { .. } => {} } } } @@ -459,48 +590,89 @@ mod test { } fn make_migrate_as_source_request() -> ExternalRequest { - ExternalRequest::MigrateAsSource { + ExternalRequest::State(StateChangeRequest::MigrateAsSource { migration_id: Uuid::new_v4(), websock: WebsocketConnection(None), - } + }) } fn make_reconfigure_crucible_request() -> ExternalRequest { let (tx, _rx) = tokio::sync::oneshot::channel(); - ExternalRequest::ReconfigureCrucibleVolume { - backend_id: SpecKey::Uuid(Uuid::new_v4()), - new_vcr_json: "".to_string(), - result_tx: tx, + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }, + ) + } + + impl ExternalRequest { + fn assert_start(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Start)), + "expected start request, got {self:?}" + ); + } + + fn assert_stop(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Stop)), + "expected stop request, got {self:?}" + ); + } + + fn assert_reboot(&self) { + assert!( + matches!(self, Self::State(StateChangeRequest::Reboot)), + "expected reboot request, got {self:?}" + ); + } + + fn assert_migrate_as_source(&self) { + assert!( + matches!( + self, + Self::State(StateChangeRequest::MigrateAsSource { .. }) + ), + "expected migrate as source request, got {self:?}" + ); } } - #[tokio::test] - async fn start_requests_become_idempotent_after_first_request() { + #[test] + fn start_requests_become_idempotent_after_first_request() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); // The first request to start should succeed. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); // The second one should too, but only for idempotency: the queue should // then have only one start request on it. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); - assert!(queue.pop_front().is_none()); + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); + queue.pop_front().unwrap().assert_start(); + assert!(queue.is_empty()); // Start requests continue to be ignored even after the instance starts // to run. - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(queue.pop_front().is_none()); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); + + assert!(queue.try_queue(ExternalRequest::start()).is_ok()); + assert!(queue.is_empty()); } - #[tokio::test] - async fn migrate_as_source_is_not_idempotent() { + #[test] + fn migrate_as_source_is_not_idempotent() { // Simulate a running instance. let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); // Requests to migrate out should be allowed. assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); @@ -517,104 +689,122 @@ mod test { // If migration fails, the instance resumes running, and then another // request to migrate out should be allowed. - assert!(matches!( - queue.pop_front(), - Some(ExternalRequest::MigrateAsSource { .. }) - )); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + queue.pop_front().unwrap().assert_migrate_as_source(); + queue.notify_request_completed(CompletedRequest::MigrationOut { + succeeded: false, + }); + assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); // A successful migration stops the instance, which forecloses on future // requests to migrate out. queue.pop_front(); - queue.notify_instance_state_change(InstanceStateChange::Stopped); + queue.notify_request_completed(CompletedRequest::MigrationOut { + succeeded: true, + }); + assert!(queue.try_queue(make_migrate_as_source_request()).is_err()); } - #[tokio::test] - async fn stop_requests_enqueue_after_vm_failure() { + #[test] + fn stop_requests_are_idempotent() { let mut queue = - ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); - queue.notify_instance_state_change(InstanceStateChange::Failed); + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); - assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Stop))); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + queue.pop_front().unwrap().assert_stop(); + assert!(queue.is_empty()); } - #[tokio::test] - async fn reboot_requests_are_idempotent_except_when_stopping() { + #[test] + fn stop_requests_ignored_after_vm_failure() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: false, + }); + + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.is_empty()); + } + + #[test] + fn reboot_requests_are_idempotent_except_when_stopping() { + let mut queue = + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); // Once the instance is started, reboot requests should be allowed, but // after the first, subsequent requests should be dropped for // idempotency. assert!(queue.is_empty()); for _ in 0..5 { - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); } - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); + queue.pop_front().unwrap().assert_reboot(); assert!(queue.is_empty()); // Once the instance has rebooted, new requests can be queued. - queue.notify_instance_state_change(InstanceStateChange::Rebooted); - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); - assert!(!queue.is_empty()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); - queue.notify_instance_state_change(InstanceStateChange::Rebooted); + queue.notify_request_completed(CompletedRequest::Reboot); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed(CompletedRequest::Reboot); // If a request to reboot is queued, and then a request to stop is // queued, new requests to reboot should always fail, even after the // instance finishes rebooting. - assert!(queue.try_queue(ExternalRequest::Reboot).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_ok()); assert!(!queue.is_empty()); - assert!(queue.try_queue(ExternalRequest::Stop).is_ok()); - assert!(queue.try_queue(ExternalRequest::Reboot).is_err()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Reboot))); - queue.notify_instance_state_change(InstanceStateChange::Rebooted); - assert!(queue.try_queue(ExternalRequest::Reboot).is_err()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + assert!(queue.try_queue(ExternalRequest::reboot()).is_err()); + queue.pop_front().unwrap().assert_reboot(); + queue.notify_request_completed(CompletedRequest::Reboot); + assert!(queue.try_queue(ExternalRequest::reboot()).is_err()); } - #[tokio::test] - async fn mutation_requires_running_and_not_migrating_out() { + #[test] + fn mutation_disallowed_after_stopped() { let mut queue = - ExternalRequestQueue::new(test_logger(), InstanceAutoStart::No); + ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - // Mutating a VM before it has started is not allowed. - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); - // Merely dequeuing the start request doesn't allow mutation; the VM - // actually has to be running. - assert!(queue.try_queue(ExternalRequest::Start).is_ok()); - assert!(matches!(queue.pop_front(), Some(ExternalRequest::Start))); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); - assert!(matches!( - queue.pop_front(), - Some(ExternalRequest::ReconfigureCrucibleVolume { .. }) - )); - - // Successfully requesting migration out should block new mutation - // requests (they should wait for the migration to resolve and then go - // to the target). - assert!(queue.try_queue(make_migrate_as_source_request()).is_ok()); + assert!(queue.try_queue(ExternalRequest::stop()).is_ok()); + queue.notify_request_completed(CompletedRequest::Stop); assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); - - // But if the VM resumes (due to a failed migration out) these requests - // should succeed again. - assert!(queue.pop_front().is_some()); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_ok()); } #[tokio::test] - async fn mutation_disallowed_after_stop() { + async fn vcr_requests_canceled_when_queue_drops() { let mut queue = ExternalRequestQueue::new(test_logger(), InstanceAutoStart::Yes); - queue.notify_instance_state_change(InstanceStateChange::StartedRunning); - queue.notify_instance_state_change(InstanceStateChange::Stopped); - assert!(queue.try_queue(make_reconfigure_crucible_request()).is_err()); + + queue.notify_request_completed(CompletedRequest::Start { + succeeded: true, + }); + + let (tx, rx) = tokio::sync::oneshot::channel(); + let req = ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id: SpecKey::Uuid(Uuid::new_v4()), + new_vcr_json: "".to_string(), + result_tx: tx, + }, + ); + + assert!(queue.try_queue(req).is_ok()); + drop(queue); + let err = rx.await.unwrap().unwrap_err(); + assert_eq!(err.status_code, hyper::StatusCode::GONE); } } diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index e3ba3a4c8..a37967668 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -119,7 +119,10 @@ use super::{ }, guest_event::{self, GuestEvent}, objects::VmObjects, - request_queue::{self, ExternalRequest, InstanceAutoStart}, + request_queue::{ + self, CompletedRequest, ComponentChangeRequest, ExternalRequest, + InstanceAutoStart, StateChangeRequest, + }, state_publisher::{MigrationStateUpdate, StatePublisher}, InstanceEnsureResponseTx, }; @@ -227,15 +230,19 @@ impl InputQueue { } } - /// Notifies the external request queue that the instance's state has - /// changed so that it can change the dispositions for new state change - /// requests. - fn notify_instance_state_change( - &self, - state: request_queue::InstanceStateChange, - ) { + /// Notifies the external request queue that the state driver has completed + /// a request from that queue. + fn notify_request_completed(&self, state: CompletedRequest) { + let mut guard = self.inner.lock().unwrap(); + guard.external_requests.notify_request_completed(state); + } + + /// Notifies the external request queue that the instance has stopped. This + /// is used to stop the queue when the instance stops without a request from + /// the API (e.g. because the guest requested a chipset-driven shutdown). + fn notify_stopped(&self) { let mut guard = self.inner.lock().unwrap(); - guard.external_requests.notify_instance_state_change(state); + guard.external_requests.notify_stopped(); } /// Submits an external state change request to the queue. @@ -522,14 +529,23 @@ impl StateDriver { let start_result = self.objects.lock_exclusive().await.start(start_reason).await; + + self.input_queue.notify_request_completed(CompletedRequest::Start { + succeeded: start_result.is_ok(), + }); + match &start_result { Ok(()) => { - self.publish_steady_state(InstanceState::Running); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); } Err(e) => { error!(&self.log, "failed to start devices"; "error" => ?e); - self.publish_steady_state(InstanceState::Failed); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Failed, + )); } } @@ -544,6 +560,11 @@ impl StateDriver { GuestEvent::VcpuSuspendHalt(_when) => { info!(self.log, "Halting due to VM suspend event",); self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } @@ -564,6 +585,11 @@ impl StateDriver { GuestEvent::ChipsetHalt => { info!(self.log, "Halting due to chipset-driven halt"); self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } @@ -581,7 +607,7 @@ impl StateDriver { request: ExternalRequest, ) -> HandleEventOutcome { match request { - ExternalRequest::Start => { + ExternalRequest::State(StateChangeRequest::Start) => { match self.start_vm(VmStartReason::ExplicitRequest).await { Ok(_) => HandleEventOutcome::Continue, Err(_) => HandleEventOutcome::Exit { @@ -589,31 +615,50 @@ impl StateDriver { }, } } - ExternalRequest::MigrateAsSource { migration_id, websock } => { - self.migrate_as_source(migration_id, websock.into_inner()) - .await; - - // The callee either queues its own stop request (on a - // successful migration out) or resumes the VM (on a failed - // migration out). Either way, the main loop can just proceed to - // process the queue as normal. - HandleEventOutcome::Continue + ExternalRequest::State(StateChangeRequest::MigrateAsSource { + migration_id, + websock, + }) => { + if self + .migrate_as_source(migration_id, websock.into_inner()) + .await + .is_ok() + { + self.do_halt().await; + HandleEventOutcome::Exit { + final_state: InstanceState::Destroyed, + } + } else { + HandleEventOutcome::Continue + } } - ExternalRequest::Reboot => { + ExternalRequest::State(StateChangeRequest::Reboot) => { self.do_reboot().await; + self.input_queue + .notify_request_completed(CompletedRequest::Reboot); + HandleEventOutcome::Continue } - ExternalRequest::Stop => { + ExternalRequest::State(StateChangeRequest::Stop) => { self.do_halt().await; + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue + .notify_request_completed(CompletedRequest::Stop); + HandleEventOutcome::Exit { final_state: InstanceState::Destroyed, } } - ExternalRequest::ReconfigureCrucibleVolume { - backend_id, - new_vcr_json, - result_tx, - } => { + ExternalRequest::Component( + ComponentChangeRequest::ReconfigureCrucibleVolume { + backend_id, + new_vcr_json, + result_tx, + }, + ) => { let _ = result_tx.send( self.reconfigure_crucible_volume(&backend_id, new_vcr_json) .await, @@ -633,9 +678,6 @@ impl StateDriver { // Notify other consumers that the instance successfully rebooted and is // now back to Running. - self.input_queue.notify_instance_state_change( - request_queue::InstanceStateChange::Rebooted, - ); self.external_state .update(ExternalStateUpdate::Instance(InstanceState::Running)); } @@ -658,34 +700,13 @@ impl StateDriver { guard.halt().await; } - - self.publish_steady_state(InstanceState::Stopped); - } - - fn publish_steady_state(&mut self, state: InstanceState) { - let change = match state { - InstanceState::Running => { - request_queue::InstanceStateChange::StartedRunning - } - InstanceState::Stopped => { - request_queue::InstanceStateChange::Stopped - } - InstanceState::Failed => request_queue::InstanceStateChange::Failed, - _ => panic!( - "Called publish_steady_state on non-terminal state {:?}", - state - ), - }; - - self.input_queue.notify_instance_state_change(change); - self.external_state.update(ExternalStateUpdate::Instance(state)); } async fn migrate_as_source( &mut self, migration_id: Uuid, websock: dropshot::WebsocketConnection, - ) { + ) -> Result<(), ()> { let conn = tokio_tungstenite::WebSocketStream::from_raw_socket( websock.into_inner(), tokio_tungstenite::tungstenite::protocol::Role::Server, @@ -712,7 +733,7 @@ impl StateDriver { }, )); - return; + return Err(()); } }; @@ -740,15 +761,25 @@ impl StateDriver { // On a successful migration out, the protocol promises to leave // the VM objects in a paused state, so don't pause them again. self.paused = true; - self.input_queue - .queue_external_request(ExternalRequest::Stop) - .expect("can always queue a request to stop"); + self.input_queue.notify_request_completed( + CompletedRequest::MigrationOut { succeeded: true }, + ); + + Ok(()) } Err(e) => { info!(self.log, "migration out failed, resuming"; "error" => ?e); - self.publish_steady_state(InstanceState::Running); + self.input_queue.notify_request_completed( + CompletedRequest::MigrationOut { succeeded: false }, + ); + + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); + + Err(()) } } } From 0fa8e37a1ee27650bfdcdf5be27208ea67fab941 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Fri, 7 Mar 2025 00:03:20 +0000 Subject: [PATCH 09/11] server: abort startup if a stop request is received while awaiting --- .../src/lib/vm/state_driver.rs | 161 ++++++++---------- lib/propolis/src/block/mod.rs | 16 ++ phd-tests/tests/src/server_state_machine.rs | 11 +- 3 files changed, 93 insertions(+), 95 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index 43a98f4f3..e0d4533c8 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -90,7 +90,6 @@ //! [`ensure`]: crate::vm::ensure use std::{ - collections::VecDeque, sync::{Arc, Mutex}, time::Duration, }; @@ -142,6 +141,12 @@ pub(super) enum VmStartReason { ExplicitRequest, } +enum VmStartOutcome { + Succeeded, + Failed, + Aborted, +} + /// A kind of event the state driver can handle. #[derive(Debug)] enum InputQueueEvent { @@ -154,10 +159,6 @@ struct InputQueueInner { /// State change requests from the external API. external_requests: request_queue::ExternalRequestQueue, - /// State change requests from the external API that were previously read - /// but not handled immediately. - self_request: VecDeque, - /// State change requests from the VM's components. These take precedence /// over external state change requests. guest_events: super::guest_event::GuestEventQueue, @@ -170,7 +171,6 @@ impl InputQueueInner { log, auto_start, ), guest_events: super::guest_event::GuestEventQueue::default(), - self_request: Default::default(), } } } @@ -209,9 +209,6 @@ impl InputQueue { /// - Guest events: These are signals raised from the VM's vCPUs and /// devices (e.g. a request to reboot or halt the VM arising from a vCPU /// asserting a virtual chipset signal). - /// - Self-requests: The state driver may buffer external requests for - /// later processing by pushing them to the self-request queue. See - /// [`Self::push_self_request`]. /// - External requests: These are state change requests received via the /// server API. See [`super::request_queue`] for more details about how /// these requests are queued. @@ -228,8 +225,6 @@ impl InputQueue { let mut guard = self.inner.lock().unwrap(); if let Some(guest_event) = guard.guest_events.pop_front() { return InputQueueEvent::GuestEvent(guest_event); - } else if let Some(req) = guard.self_request.pop_front() { - return InputQueueEvent::ExternalRequest(req); } else if let Some(req) = guard.external_requests.pop_front() { return InputQueueEvent::ExternalRequest(req); } @@ -246,19 +241,6 @@ impl InputQueue { } } - /// Pushes a self-requested state change request to this queue. - /// - /// This routine may only be called from the state driver task. - fn push_self_request(&self, req: ExternalRequest) { - let mut guard = self.inner.lock().unwrap(); - guard.self_request.push_back(req); - - // Since this routine is only called from the state driver task, the - // driver is by definition not waiting for a new event at this point, so - // it's not necessary to signal the notify here (the next call to - // dequeue an event will always pick this event up). - } - /// Notifies the external request queue that the state driver has completed /// a request from that queue. fn notify_request_completed(&self, state: CompletedRequest) { @@ -508,8 +490,9 @@ impl StateDriver { let final_state = if migrated_in { match self.start_vm(VmStartReason::MigratedIn).await { - Ok(()) => self.event_loop().await, - Err(_) => InstanceState::Failed, + VmStartOutcome::Succeeded => self.event_loop().await, + VmStartOutcome::Failed => InstanceState::Failed, + VmStartOutcome::Aborted => InstanceState::Destroyed, } } else { self.event_loop().await @@ -552,7 +535,7 @@ impl StateDriver { async fn start_vm( &mut self, start_reason: VmStartReason, - ) -> anyhow::Result<()> { + ) -> VmStartOutcome { info!(self.log, "starting instance"; "reason" => ?start_reason); // Tell listeners that the VM's components are now starting up and not @@ -591,9 +574,9 @@ impl StateDriver { for (name, dev) in objects.device_map() { info!(self.log, "sending start request to {}", name); let res = dev.start(); - if let Err(e) = &res { + if let Err(e) = res { error!(self.log, "startup failed for {}: {:?}", name, e); - return res; + return VmStartOutcome::Failed; } } @@ -642,70 +625,63 @@ impl StateDriver { // the VM while it was being started. If such a request is seen, send a // self-request to stop just before returning so that the VM will stop // immediately. - let mut stopped_while_starting = false; + enum Selection { + BackendFuture(anyhow::Result<()>), + Event(InputQueueEvent), + } loop { - let event = tokio::select! { + let selection = tokio::select! { // If the VM successfully starts, return immediately and let // the caller process any events that may happen to be on the // queue. biased; res = &mut block_backend_fut => { - // If the VM started up successfully, publish that it is - // running and queue up any external requests that were - // deferred while startup was ongoing. - // - // If startup failed, just return the error without changing - // any state or processing any additional requests. The - // caller will move the instance to the appropriate terminal - // state and clean up the VM as needed. - if res.is_ok() { - let objects = &self.objects; - objects.lock_exclusive().await.resume_vcpus(); - self.external_state - .update(ExternalStateUpdate::Instance(InstanceState::Running)); - - self.input_queue.notify_request_completed( - CompletedRequest::Start { succeeded: true }, - ); - - if stopped_while_starting { - self.input_queue.push_self_request( - ExternalRequest::stop() - ); - } - - info!(&self.log, "VM successfully started"); - } else { - self.input_queue.notify_request_completed( - CompletedRequest::Start { succeeded: false }, - ); - } - - return res; + Selection::BackendFuture(res) } - dequeued = self.input_queue.wait_for_next_event() => { - dequeued + event = self.input_queue.wait_for_next_event() => { + Selection::Event(event) } }; - // The VM's vCPUs only start when the block backend startup future - // resolves and is selected above. If control reached that point, - // that branch wasn't selected, so the vCPUs should still be paused, - // which means the dequeued event should not be a guest event. - let InputQueueEvent::ExternalRequest(req) = event else { - unreachable!("can't get guest events before vCPUs start"); + let req: ExternalRequest = match selection { + Selection::BackendFuture(Ok(())) => { + let objects = &self.objects; + objects.lock_exclusive().await.resume_vcpus(); + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Running, + )); + + self.input_queue.notify_request_completed( + CompletedRequest::Start { succeeded: true }, + ); + + info!(&self.log, "VM successfully started"); + return VmStartOutcome::Succeeded; + } + + Selection::BackendFuture(Err(e)) => { + info!(&self.log, "VM startup failed: {e}"); + self.input_queue.notify_request_completed( + CompletedRequest::Start { succeeded: false }, + ); + + return VmStartOutcome::Failed; + } + + // The VM's vCPUs only start when the block backend startup + // future resolves and is selected above. If control reached + // that point, that branch wasn't selected, so the vCPUs should + // still be paused, which means the dequeued event should not be + // a guest event. + Selection::Event(InputQueueEvent::GuestEvent(_)) => { + unreachable!("can't get guest events before vCPUs start") + } + + Selection::Event(InputQueueEvent::ExternalRequest(req)) => req, }; - // Handle requests to reconfigure one of the existing Crucible - // volumes inline, but buffer other requests so that they can be - // handled after the VM has finished starting. - // - // Buffering some requests and servicing others can theoretically - // change the order in which those requests are retired. That's not - // a problem here because the request queue will stop accepting new - // VCR change requests once a request to stop has been queued. match req { ExternalRequest::State(StateChangeRequest::Stop) => { info!( @@ -713,11 +689,21 @@ impl StateDriver { "got request to stop while still starting" ); - // Remember that the VM should stop once it has started. - // It's not safe to issue a self-request here because the - // next loop iteration will simply pop the self-request back - // off the queue and reach this path once more. - stopped_while_starting = true; + // Don't send any pause/halt notifications here, since + // (depending on what async work was in flight when this + // notification was received) there may be a + // partially-started component that is not prepared to be + // paused and halted. Instead, simply move the VM to + // Stopped, return an "aborted" status, and let the caller + // arrange to drop all the VM's components. (Note that no + // vCPUs have started yet, so no guest work is in flight at + // this point.) + self.external_state.update(ExternalStateUpdate::Instance( + InstanceState::Stopped, + )); + + self.input_queue.notify_stopped(); + return VmStartOutcome::Aborted; } ExternalRequest::Component( ComponentChangeRequest::ReconfigureCrucibleVolume { @@ -810,10 +796,13 @@ impl StateDriver { match request { ExternalRequest::State(StateChangeRequest::Start) => { match self.start_vm(VmStartReason::ExplicitRequest).await { - Ok(()) => HandleEventOutcome::Continue, - Err(_) => HandleEventOutcome::Exit { + VmStartOutcome::Succeeded => HandleEventOutcome::Continue, + VmStartOutcome::Failed => HandleEventOutcome::Exit { final_state: InstanceState::Failed, }, + VmStartOutcome::Aborted => HandleEventOutcome::Exit { + final_state: InstanceState::Destroyed, + }, } } ExternalRequest::State(StateChangeRequest::MigrateAsSource { diff --git a/lib/propolis/src/block/mod.rs b/lib/propolis/src/block/mod.rs index cd210ed25..45a590ec1 100644 --- a/lib/propolis/src/block/mod.rs +++ b/lib/propolis/src/block/mod.rs @@ -251,6 +251,16 @@ pub trait Backend: Send + Sync + 'static { /// /// Spawning of any tasks required to do such request processing can be done /// as part of this start-up. + /// + /// This operation will be invoked only once per backend (when its VM + /// starts). Block backends are not explicitly resumed during VM lifecycle + /// events; instead, their corresponding devices will stop issuing new + /// requests while paused and resume issuing them when they are resumed. + /// + /// WARNING: The caller may abort VM startup and cancel the future created + /// by this routine. In this case the caller may not call [`stop`] prior to + /// dropping the backend. This routine is, however, guaranteed to be called + /// before the VM's vCPUs are started. async fn start(&self) -> anyhow::Result<()>; /// Stop attempting to process new [Request]s from [Device] (if attached) @@ -260,6 +270,12 @@ pub trait Backend: Send + Sync + 'static { /// /// If any tasks were spawned as part of [Backend::start()], they should be /// brought to rest as part of this call. + /// + /// This operation will be invoked only once per backend (when its VM + /// stops). Block backends are not explicitly paused during VM lifecycle + /// events; instead, their corresponding devices will stop issuing new + /// requests when they are told to pause (and will only report they are + /// fully paused when all their in-flight requests have completed). async fn stop(&self) -> (); /// Attempt to detach from associated [Device] diff --git a/phd-tests/tests/src/server_state_machine.rs b/phd-tests/tests/src/server_state_machine.rs index 376afb728..c15a59641 100644 --- a/phd-tests/tests/src/server_state_machine.rs +++ b/phd-tests/tests/src/server_state_machine.rs @@ -133,16 +133,9 @@ async fn stop_while_blocked_on_start_test(ctx: &Framework) { .await .unwrap(); - // Send a stop request. This should enqueue successfully, but the VM won't - // stop right away because it's still starting. + // Send a stop request. This should enqueue successfully, and the VM should + // shut down even though activation is blocked. vm.stop().await?; - - // Unblock Crucible startup by fixing the broken disk's VCR. - disk.disable_vcr_black_hole(); - disk.set_generation(2); - vm.replace_crucible_vcr(disk).await?; - - // Eventually the instance should shut down. vm.wait_for_state(InstanceState::Destroyed, Duration::from_secs(60)) .await?; } From 08359a526734701e9512d44b7225868ef8aac58b Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Fri, 7 Mar 2025 17:38:40 +0000 Subject: [PATCH 10/11] cli: accept non-uuid disk IDs in vcr replace command --- bin/propolis-cli/src/main.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/propolis-cli/src/main.rs b/bin/propolis-cli/src/main.rs index 4e04bb366..d27f9a0cb 100644 --- a/bin/propolis-cli/src/main.rs +++ b/bin/propolis-cli/src/main.rs @@ -155,8 +155,8 @@ enum Command { /// Call the VolumeConstructionRequest replace endpoint Vcr { /// Uuid for the disk - #[clap(short = 'u', action)] - uuid: Uuid, + #[clap(short = 'd', action)] + disk_id: String, /// File with a JSON InstanceVcrReplace struct #[clap(long, action)] @@ -510,7 +510,7 @@ async fn new_instance( async fn replace_vcr( client: &Client, - id: Uuid, + id: String, vcr_replace: InstanceVcrReplace, ) -> anyhow::Result<()> { // Try to call the endpoint @@ -941,9 +941,9 @@ async fn main() -> anyhow::Result<()> { } Command::Monitor => monitor(addr).await?, Command::InjectNmi => inject_nmi(&client).await?, - Command::Vcr { uuid, vcr_replace } => { + Command::Vcr { disk_id, vcr_replace } => { let replace: InstanceVcrReplace = parse_json_file(&vcr_replace)?; - replace_vcr(&client, uuid, replace).await? + replace_vcr(&client, disk_id, replace).await? } } From c6d4553932a432a1dd3a947abd067b1bb620e89f Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Tue, 11 Mar 2025 21:03:32 +0000 Subject: [PATCH 11/11] PR feedback --- .../src/lib/vm/state_driver.rs | 63 ++++++++++++++----- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index e0d4533c8..22354578f 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -141,12 +141,28 @@ pub(super) enum VmStartReason { ExplicitRequest, } +/// The outcome of a request to start a VM. enum VmStartOutcome { Succeeded, Failed, Aborted, } +impl VmStartOutcome { + /// If this start outcome implies that the state driver should return + /// immediately and allow the VM to be torn down, this routine returns + /// `Some(state)` where `state` is the final VM state to return from the + /// driver. If the driver should continue running the VM, this routine + /// returns `None`. + fn final_vm_state(&self) -> Option { + match self { + Self::Succeeded => None, + Self::Failed => Some(InstanceState::Failed), + Self::Aborted => Some(InstanceState::Destroyed), + } + } +} + /// A kind of event the state driver can handle. #[derive(Debug)] enum InputQueueEvent { @@ -489,10 +505,16 @@ impl StateDriver { info!(self.log, "state driver launched"); let final_state = if migrated_in { - match self.start_vm(VmStartReason::MigratedIn).await { - VmStartOutcome::Succeeded => self.event_loop().await, - VmStartOutcome::Failed => InstanceState::Failed, - VmStartOutcome::Aborted => InstanceState::Destroyed, + // If the final state is known merely from the attempt to start the + // VM, return it immediately; otherwise, run the event loop and wait + // for it to return the final state. + match self + .start_vm(VmStartReason::MigratedIn) + .await + .final_vm_state() + { + None => self.event_loop().await, + Some(s) => s, } } else { self.event_loop().await @@ -575,7 +597,12 @@ impl StateDriver { info!(self.log, "sending start request to {}", name); let res = dev.start(); if let Err(e) = res { - error!(self.log, "startup failed for {}: {:?}", name, e); + error!( + self.log, "device start() returned an error"; + "device" => %name, + "error" => %e + ); + return VmStartOutcome::Failed; } } @@ -599,7 +626,9 @@ impl StateDriver { if let Err(e) = &res { error!( log, - "startup failed for block backend {}: {:?}", name, e + "block backend start() returned an error"; + "backend" => %name, + "error" => %e ); return res; @@ -712,6 +741,9 @@ impl StateDriver { result_tx, }, ) => { + // The API caller who requested this operation can hang up + // and drop the receiver. This isn't fatal; just keep + // starting the VM if it happens. let _ = result_tx.send( self.reconfigure_crucible_volume( &backend_id, @@ -795,14 +827,17 @@ impl StateDriver { ) -> HandleEventOutcome { match request { ExternalRequest::State(StateChangeRequest::Start) => { - match self.start_vm(VmStartReason::ExplicitRequest).await { - VmStartOutcome::Succeeded => HandleEventOutcome::Continue, - VmStartOutcome::Failed => HandleEventOutcome::Exit { - final_state: InstanceState::Failed, - }, - VmStartOutcome::Aborted => HandleEventOutcome::Exit { - final_state: InstanceState::Destroyed, - }, + // If this start attempt produces a terminal VM state, return it + // to the driver and indicate that the driver should exit. + match self + .start_vm(VmStartReason::ExplicitRequest) + .await + .final_vm_state() + { + None => HandleEventOutcome::Continue, + Some(final_state) => { + HandleEventOutcome::Exit { final_state } + } } } ExternalRequest::State(StateChangeRequest::MigrateAsSource {