From a567d7e0f060702c843ae3aa82bf9c39e46b75a4 Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Fri, 21 Mar 2025 17:17:23 +0000
Subject: [PATCH] Clean up volume usage records created by sagas

Adding a check that the volume Nexus is about to hard delete has no
associated volume resource usage records revealed areas where volumes
with read-only resources were not being cleaned up correctly. Fix that
here!

Note that leaking these usage records does not cause a functional
problem, but it's not good to leak them. I'll have to write a
background task that cleans these up.
---
 nexus/db-queries/src/db/datastore/volume.rs   |  37 +-
 .../region_snapshot_replacement_start.rs      |  18 +
 .../sagas/region_snapshot_replacement_step.rs | 417 ++++++++++++++++++
 nexus/src/app/sagas/snapshot_create.rs        |   4 +-
 4 files changed, 474 insertions(+), 2 deletions(-)

diff --git a/nexus/db-queries/src/db/datastore/volume.rs b/nexus/db-queries/src/db/datastore/volume.rs
index 008d7fdeb9f..b84da3aad17 100644
--- a/nexus/db-queries/src/db/datastore/volume.rs
+++ b/nexus/db-queries/src/db/datastore/volume.rs
@@ -467,9 +467,18 @@ impl DataStore {
     ) -> DeleteResult {
         use db::schema::volume::dsl;
 
+        let conn = self.pool_connection_unauthorized().await?;
+
+        // If running integration tests, assert that the volume we're about to
+        // hard delete has no volume usage records.
+        #[cfg(any(test, feature = "testing"))]
+        Self::validate_volume_has_no_usage_records(&conn, volume_id)
+            .await
+            .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
+
         diesel::delete(dsl::volume)
             .filter(dsl::id.eq(to_db_typed_uuid(volume_id)))
-            .execute_async(&*self.pool_connection_unauthorized().await?)
+            .execute_async(&*conn)
             .await
             .map(|_| ())
             .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
@@ -4061,6 +4070,32 @@ impl DataStore {
 
         Ok(())
     }
+
+    async fn validate_volume_has_no_usage_records(
+        conn: &async_bb8_diesel::Connection<DbConnection>,
+        volume_id: VolumeUuid,
+    ) -> Result<(), diesel::result::Error> {
+        use db::schema::volume_resource_usage::dsl;
+
+        let matching_usage_records: Vec<VolumeResourceUsage> =
+            dsl::volume_resource_usage
+                .filter(dsl::volume_id.eq(to_db_typed_uuid(volume_id)))
+                .select(VolumeResourceUsageRecord::as_select())
+                .get_results_async(conn)
+                .await?
+                .into_iter()
+                .map(|r| r.try_into().unwrap())
+                .collect();
+
+        if !matching_usage_records.is_empty() {
+            return Err(Self::volume_invariant_violated(format!(
+                "volume {volume_id} has matching usage records: {:?}",
+                matching_usage_records,
+            )));
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/nexus/src/app/sagas/region_snapshot_replacement_start.rs b/nexus/src/app/sagas/region_snapshot_replacement_start.rs
index d9e8a0349ba..ca2b7a7e8fa 100644
--- a/nexus/src/app/sagas/region_snapshot_replacement_start.rs
+++ b/nexus/src/app/sagas/region_snapshot_replacement_start.rs
@@ -911,6 +911,15 @@ async fn rsrss_new_region_volume_create_undo(
 
     let new_region_volume_id =
         sagactx.lookup::<VolumeUuid>("new_region_volume_id")?;
+
+    if osagactx.datastore().volume_get(new_region_volume_id).await?.is_some() {
+        // All the knowledge to unwind the resources created by this saga is in
+        // this saga, but use soft delete in order to keep volume resource usage
+        // records consistent (they would have been added in the volume create).
+        // Make sure to only call this if the volume still exists.
+        osagactx.datastore().soft_delete_volume(new_region_volume_id).await?;
+    }
+
     osagactx.datastore().volume_hard_delete(new_region_volume_id).await?;
 
     Ok(())
@@ -1008,6 +1017,15 @@ async fn rsrss_create_fake_volume_undo(
 
     // Delete the fake volume.
     let new_volume_id = sagactx.lookup::<VolumeUuid>("new_volume_id")?;
+
+    if osagactx.datastore().volume_get(new_volume_id).await?.is_some() {
+        // All the knowledge to unwind the resources created by this saga is in
+        // this saga, but use soft delete in order to keep volume resource usage
+        // records consistent (they would have been added in the volume create).
+        // Make sure to only call this if the volume still exists.
+        osagactx.datastore().soft_delete_volume(new_volume_id).await?;
+    }
+
     osagactx.datastore().volume_hard_delete(new_volume_id).await?;
 
     Ok(())
diff --git a/nexus/src/app/sagas/region_snapshot_replacement_step.rs b/nexus/src/app/sagas/region_snapshot_replacement_step.rs
index 5a3fdaed93e..18a10f26993 100644
--- a/nexus/src/app/sagas/region_snapshot_replacement_step.rs
+++ b/nexus/src/app/sagas/region_snapshot_replacement_step.rs
@@ -311,6 +311,15 @@ async fn rssrs_create_fake_volume_undo(
 
     // Delete the fake volume.
     let new_volume_id = sagactx.lookup::<VolumeUuid>("new_volume_id")?;
+
+    if osagactx.datastore().volume_get(new_volume_id).await?.is_some() {
+        // All the knowledge to unwind the resources created by this saga is in
+        // this saga, but use soft delete in order to keep volume resource usage
+        // records consistent (they would have been added in the volume create).
+        // Make sure to only call this if the volume still exists.
+        osagactx.datastore().soft_delete_volume(new_volume_id).await?;
+    }
+
     osagactx.datastore().volume_hard_delete(new_volume_id).await?;
 
     Ok(())
@@ -625,3 +634,411 @@ async fn rsrss_update_request_record(
 
     Ok(())
 }
+
+#[cfg(test)]
+pub(crate) mod test {
+    use crate::{
+        app::RegionAllocationStrategy, app::db::DataStore,
+        app::db::datastore::region_snapshot_replacement::InsertStepResult,
+        app::db::lookup::LookupPath, app::saga::create_saga_dag,
+        app::sagas::region_snapshot_replacement_garbage_collect,
+        app::sagas::region_snapshot_replacement_garbage_collect::*,
+        app::sagas::region_snapshot_replacement_start,
+        app::sagas::region_snapshot_replacement_start::*,
+        app::sagas::region_snapshot_replacement_step,
+        app::sagas::region_snapshot_replacement_step::*,
+        app::sagas::test_helpers::test_opctx,
+    };
+    use nexus_db_model::RegionSnapshotReplacement;
+    use nexus_db_model::RegionSnapshotReplacementState;
+    use nexus_db_model::RegionSnapshotReplacementStep;
+    use nexus_db_model::RegionSnapshotReplacementStepState;
+    use nexus_db_model::Volume;
+    use nexus_db_queries::authn::saga::Serialized;
+    use nexus_db_queries::context::OpContext;
+    use nexus_test_utils::resource_helpers::DiskTest;
+    use nexus_test_utils::resource_helpers::create_disk;
+    use nexus_test_utils::resource_helpers::create_disk_from_snapshot;
+    use nexus_test_utils::resource_helpers::create_project;
+    use nexus_test_utils::resource_helpers::create_snapshot;
+    use nexus_test_utils_macros::nexus_test;
+    use nexus_types::identity::Asset;
+    use sled_agent_client::VolumeConstructionRequest;
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<crate::Server>;
+
+    const DISK_NAME: &str = "my-disk";
+    const DISK_FROM_SNAPSHOT_NAME: &str = "my-disk-from-snap";
+    const SNAPSHOT_NAME: &str = "my-snap";
+    const PROJECT_NAME: &str = "springfield-squidport";
+
+    /// Create four zpools, a disk, and a snapshot of that disk
+    async fn prepare_for_test(
+        cptestctx: &ControlPlaneTestContext,
+    ) -> PrepareResult {
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let mut disk_test = DiskTest::new(cptestctx).await;
+        disk_test.add_zpool_with_dataset(cptestctx.first_sled_id()).await;
+
+        let _project_id =
+            create_project(&client, PROJECT_NAME).await.identity.id;
+
+        // Create a disk
+        let disk = create_disk(&client, PROJECT_NAME, DISK_NAME).await;
+
+        let disk_id = disk.identity.id;
+        let (.., db_disk) = LookupPath::new(&opctx, &datastore)
+            .disk_id(disk_id)
+            .fetch()
+            .await
+            .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id));
+
+        // Create a snapshot
+        let snapshot =
+            create_snapshot(&client, PROJECT_NAME, DISK_NAME, SNAPSHOT_NAME)
+                .await;
+
+        let snapshot_id = snapshot.identity.id;
+
+        // Create a disk from that snapshot
+        let disk_from_snapshot = create_disk_from_snapshot(
+            &client,
+            PROJECT_NAME,
+            DISK_FROM_SNAPSHOT_NAME,
+            snapshot_id,
+        )
+        .await;
+
+        let (.., db_disk_from_snapshot) = LookupPath::new(&opctx, &datastore)
+            .disk_id(disk_from_snapshot.identity.id)
+            .fetch()
+            .await
+            .unwrap_or_else(|_| panic!("test disk {:?} should exist", disk_id));
+
+        // Replace one of the snapshot's targets
+        let disk_allocated_regions =
+            datastore.get_allocated_regions(db_disk.volume_id()).await.unwrap();
+
+        let region: &nexus_db_model::Region = &disk_allocated_regions[0].1;
+
+        let region_snapshot = datastore
+            .region_snapshot_get(region.dataset_id(), region.id(), snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Manually insert the region snapshot replacement request
+        let request =
+            RegionSnapshotReplacement::for_region_snapshot(&region_snapshot);
+
+        datastore
+            .insert_region_snapshot_replacement_request(&opctx, request.clone())
+            .await
+            .unwrap();
+
+        // Run the region snapshot replacement start saga
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStart>(
+            region_snapshot_replacement_start::Params {
+                serialized_authn: Serialized::for_opctx(&opctx),
+                request: request.clone(),
+                allocation_strategy: RegionAllocationStrategy::Random {
+                    seed: None,
+                },
+            },
+        )
+        .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        // Actually run the saga
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_request_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementState::ReplacementDone
+        );
+        assert!(result.new_region_id.is_some());
+        assert!(result.operating_saga_id.is_none());
+
+        // Next step of region snapshot replacement: calling the garbage collect
+        // saga to move the request into the Running state
+
+        let dag =
+            create_saga_dag::<SagaRegionSnapshotReplacementGarbageCollect>(
+                region_snapshot_replacement_garbage_collect::Params {
+                    serialized_authn: Serialized::for_opctx(&opctx),
+                    old_snapshot_volume_id: result
+                        .old_snapshot_volume_id()
+                        .unwrap(),
+                    request: result,
+                },
+            )
+            .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_request_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementState::Running
+        );
+
+        // Manually insert the region snapshot replacement step
+
+        let InsertStepResult::Inserted { step_id } = datastore
+            .create_region_snapshot_replacement_step(
+                &opctx,
+                request.id,
+                db_disk_from_snapshot.volume_id(),
+            )
+            .await
+            .unwrap()
+        else {
+            panic!("InsertStepResult::Inserted not returned");
+        };
+
+        let step = datastore
+            .get_region_snapshot_replacement_step_by_id(&opctx, step_id)
+            .await
+            .unwrap();
+
+        PrepareResult { step, db_disk_from_snapshot }
+    }
+
+    struct PrepareResult {
+        step: RegionSnapshotReplacementStep,
+        db_disk_from_snapshot: nexus_db_model::Disk,
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_region_snapshot_replacement_step_saga(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, .. } = prepare_for_test(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        // Run the region snapshot replacement step saga
+
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStep>(
+            region_snapshot_replacement_step::Params {
+                serialized_authn: Serialized::for_opctx(&opctx),
+                request: step.clone(),
+            },
+        )
+        .unwrap();
+
+        let runnable_saga = nexus.sagas.saga_prepare(dag).await.unwrap();
+
+        runnable_saga.run_to_completion().await.unwrap();
+
+        // Validate the state transition
+        let result = datastore
+            .get_region_snapshot_replacement_step_by_id(&opctx, step.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            result.replacement_state,
+            RegionSnapshotReplacementStepState::Complete
+        );
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, db_disk_from_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let log = &cptestctx.logctx.log;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let affected_volume_original = datastore
+            .volume_get(db_disk_from_snapshot.volume_id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        verify_clean_slate(&cptestctx, &step, &affected_volume_original).await;
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind::<
+            SagaRegionSnapshotReplacementStep,
+            _,
+            _,
+        >(
+            nexus,
+            || Box::pin(async { new_test_params(&opctx, &step) }),
+            || {
+                Box::pin(async {
+                    verify_clean_slate(
+                        &cptestctx,
+                        &step,
+                        &affected_volume_original,
+                    )
+                    .await;
+                })
+            },
+            log,
+        )
+        .await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, db_disk_from_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let log = &cptestctx.logctx.log;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let affected_volume_original = datastore
+            .volume_get(db_disk_from_snapshot.volume_id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        verify_clean_slate(&cptestctx, &step, &affected_volume_original).await;
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind_idempotently::<
+            SagaRegionSnapshotReplacementStep,
+            _,
+            _
+        >(
+            nexus,
+            || Box::pin(async { new_test_params(&opctx, &step) }),
+            || Box::pin(async {
+                verify_clean_slate(
+                    &cptestctx,
+                    &step,
+                    &affected_volume_original,
+                ).await;
+            }),
+            log
+        ).await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_actions_succeed_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        let PrepareResult { step, db_disk_from_snapshot } =
+            prepare_for_test(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let affected_volume_original = datastore
+            .volume_get(db_disk_from_snapshot.volume_id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        verify_clean_slate(&cptestctx, &step, &affected_volume_original).await;
+
+        // Build the saga DAG with the provided test parameters
+        let params = new_test_params(&opctx, &step);
+        let dag = create_saga_dag::<SagaRegionSnapshotReplacementStep>(params)
+            .unwrap();
+        crate::app::sagas::test_helpers::actions_succeed_idempotently(
+            nexus, dag,
+        )
+        .await;
+    }
+
+    // helpers
+
+    fn new_test_params(
+        opctx: &OpContext,
+        request: &RegionSnapshotReplacementStep,
+    ) -> region_snapshot_replacement_step::Params {
+        region_snapshot_replacement_step::Params {
+            serialized_authn: Serialized::for_opctx(opctx),
+            request: request.clone(),
+        }
+    }
+
+    pub(crate) async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        request: &RegionSnapshotReplacementStep,
+        affected_volume_original: &Volume,
+    ) {
+        let datastore = cptestctx.server.server_context().nexus.datastore();
+
+        crate::app::sagas::test_helpers::assert_no_failed_undo_steps(
+            &cptestctx.logctx.log,
+            datastore,
+        )
+        .await;
+
+        assert_region_snapshot_replacement_step_untouched(
+            cptestctx, &datastore, &request,
+        )
+        .await;
+
+        assert_volume_untouched(&datastore, &affected_volume_original).await;
+    }
+
+    async fn assert_region_snapshot_replacement_step_untouched(
+        cptestctx: &ControlPlaneTestContext,
+        datastore: &DataStore,
+        request: &RegionSnapshotReplacementStep,
+    ) {
+        let opctx = test_opctx(cptestctx);
+        let db_request = datastore
+            .get_region_snapshot_replacement_step_by_id(&opctx, request.id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            db_request.replacement_state,
+            RegionSnapshotReplacementStepState::Requested
+        );
+    }
+
+    async fn assert_volume_untouched(
+        datastore: &DataStore,
+        affected_volume_original: &Volume,
+    ) {
+        let affected_volume = datastore
+            .volume_get(affected_volume_original.id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        let actual: VolumeConstructionRequest =
+            serde_json::from_str(&affected_volume.data()).unwrap();
+
+        let expected: VolumeConstructionRequest =
+            serde_json::from_str(&affected_volume_original.data()).unwrap();
+
+        assert_eq!(actual, expected);
+    }
+}
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs
index fd185b97402..7ab61655437 100644
--- a/nexus/src/app/sagas/snapshot_create.rs
+++ b/nexus/src/app/sagas/snapshot_create.rs
@@ -629,7 +629,9 @@ async fn ssc_create_destination_volume_record_undo(
     // resources. It's safe here to perform a volume hard delete without
     // decreasing the crucible resource count because the destination volume is
     // guaranteed to never have read only resources that require that
-    // accounting.
+    // accounting. This is the same reason that volume resource usage records
+    // are not created for the destination volume (they're only created for
+    // read-only resources!)
 
     info!(log, "hard deleting volume {}", destination_volume_id);
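
The three saga undo changes in this patch all follow the same pattern: if the saga-created volume still exists, soft delete it first (the step that removes the volume_resource_usage accounting for any read-only resources the volume references), then hard delete the volume record. Below is a minimal sketch of that pattern pulled out of the undo nodes, using only the datastore calls that appear in the diff (volume_get, soft_delete_volume, volume_hard_delete); the helper name undo_saga_created_volume is illustrative and not part of the patch, and error handling is collapsed into the anyhow::Error that saga undo nodes already return.

    use anyhow::Result;
    use omicron_uuid_kinds::VolumeUuid;

    use crate::app::db::DataStore;

    // Illustrative only: the soft-delete-then-hard-delete unwind pattern used
    // by the undo nodes in this patch; not a function that exists in the tree.
    async fn undo_saga_created_volume(
        datastore: &DataStore,
        volume_id: VolumeUuid,
    ) -> Result<()> {
        // Soft delete first, and only if the volume record still exists: this
        // is what keeps volume_resource_usage records consistent for any
        // read-only resources the volume references.
        if datastore.volume_get(volume_id).await?.is_some() {
            datastore.soft_delete_volume(volume_id).await?;
        }

        // Then remove the volume record itself. With the usage accounting
        // already settled, the test-only check added to volume_hard_delete
        // passes.
        datastore.volume_hard_delete(volume_id).await?;

        Ok(())
    }

The hard-delete check itself is gated on cfg(any(test, feature = "testing")), so only test builds pay for the extra volume_resource_usage query.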