From b3e5df72bedeb692e30ab39e1863dc6ae6919f4e Mon Sep 17 00:00:00 2001 From: Justin Canas Date: Tue, 6 Feb 2024 08:38:57 -0800 Subject: [PATCH 1/3] [IA-4807] Fix channel typo (#4184) Co-authored-by: jdcanas --- .github/workflows/leo-build-tag-publish-and-run-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/leo-build-tag-publish-and-run-tests.yml b/.github/workflows/leo-build-tag-publish-and-run-tests.yml index b560b92b2f3..c0f9249d1c2 100644 --- a/.github/workflows/leo-build-tag-publish-and-run-tests.yml +++ b/.github/workflows/leo-build-tag-publish-and-run-tests.yml @@ -46,7 +46,7 @@ jobs: - name: Get inputs and/or set defaults id: prepare-outputs run: | - echo 'notify-failure-channel=ia-test-notification' >> $GITHUB_OUTPUT + echo 'notify-failure-channel=ia-notification-test' >> $GITHUB_OUTPUT echo 'delete-bee=true' >> $GITHUB_OUTPUT echo 'log-results=true' >> $GITHUB_OUTPUT if ${{ github.ref_name == 'develop' }}; then From 6ef13dd57454aaee6fc3705fdb71e230731912a3 Mon Sep 17 00:00:00 2001 From: lmcnatt <85642387+lucymcnatt@users.noreply.github.com> Date: Tue, 6 Feb 2024 13:42:11 -0500 Subject: [PATCH 2/3] [IA-4590] When deleting, check WSM status before sending pubsub (#4121) --- .../workbench/leonardo/kubernetesModels.scala | 9 + .../dsde/workbench/leonardo/models.scala | 25 +- .../leonardo/dao/WsmApiClientProvider.scala | 107 +++++- .../leonardo/db/ClusterComponent.scala | 15 +- .../leonardo/db/PersistentDiskComponent.scala | 2 +- .../dsde/workbench/leonardo/http/Boot.scala | 6 +- .../http/service/DiskServiceInterp.scala | 9 +- .../http/service/DiskV2ServiceInterp.scala | 53 +-- .../http/service/LeoAppServiceInterp.scala | 80 ++++- .../http/service/RuntimeServiceInterp.scala | 4 +- .../http/service/RuntimeV2ServiceInterp.scala | 90 +++-- .../leonardo/model/LeoException.scala | 27 +- .../leoPubsubMessageSubscriberModels.scala | 7 +- .../leonardo/util/AzurePubsubHandler.scala | 223 ++++++------ 
.../workbench/leonardo/CommonTestData.scala | 3 +- .../dao/MockWsmApiClientProvider.scala | 36 +- .../leonardo/http/api/TestLeoRoutes.scala | 3 +- .../http/service/AppServiceInterpSpec.scala | 147 +++++++- .../service/DiskV2ServiceInterpSpec.scala | 90 +++-- .../service/RuntimeV2ServiceInterpSpec.scala | 220 +++++++----- .../workbench/leonardo/model/ModelsSpec.scala | 26 ++ .../util/AzurePubsubHandlerSpec.scala | 338 +++++++++++++++++- 22 files changed, 1176 insertions(+), 344 deletions(-) create mode 100644 http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/model/ModelsSpec.scala diff --git a/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/kubernetesModels.scala b/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/kubernetesModels.scala index 679410fb8ea..4e0822156d4 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/kubernetesModels.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/kubernetesModels.scala @@ -536,6 +536,15 @@ object AppStatus { val monitoredStatuses: Set[AppStatus] = Set(Deleting, Provisioning) + + implicit class EnrichedDiskStatus(status: AppStatus) { + def isDeletable: Boolean = deletableStatuses contains status + + def isStoppable: Boolean = stoppableStatuses contains status + + def isStartable: Boolean = startableStatuses contains status + + } } final case class KubernetesService(id: ServiceId, config: ServiceConfig) diff --git a/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/models.scala b/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/models.scala index 21f4cfd1074..e2f95764e1f 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/models.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/models.scala @@ -1,9 +1,9 @@ package org.broadinstitute.dsde.workbench.leonardo +import bio.terra.workspace.model.State import ca.mrvisser.sealerate import 
org.broadinstitute.dsde.workbench.azure.AzureCloudContext import org.broadinstitute.dsde.workbench.model.google.{GcsBucketName, GoogleProject} - import java.util.UUID final case class WorkspaceId(value: UUID) extends AnyVal @@ -57,3 +57,26 @@ object StagingBucket { override def asString: String = s"${storageContainerName.value}" } } + +/** + * Can't extend final enum State from WSM, so made a wrapper + * WSM state can be BROKEN, CREATING, DELETING, READY, UPDATING or None + * if None --> it is deletable and is deleted + * (already deleted in WSM, need to clean up leo resources) + */ +case class WsmState(state: Option[String]) { + + val deletableStatuses: Set[String] = Set("BROKEN", "READY", "DELETED") + val possibleStatuses: Array[String] = State.values().map(_.toString) :+ "DELETED" + +// def apply(): Unit = +// if (!possibleStatuses.contains(this.value)) { +// log.warn(s"Unrecognized WSM state $state, WSM resource may not be processed correctly") +// } + def value: String = state.getOrElse("DELETED").toUpperCase() + + /** Any in-progress state cannot be deleted: CREATING, DELETING, UPDATING */ + def isDeletable: Boolean = deletableStatuses contains this.value + + def isDeleted: Boolean = this.value == "DELETED" +} diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/WsmApiClientProvider.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/WsmApiClientProvider.scala index c12259a73a9..9d4405536c7 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/WsmApiClientProvider.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/dao/WsmApiClientProvider.scala @@ -3,14 +3,16 @@ package org.broadinstitute.dsde.workbench.leonardo.dao import bio.terra.common.tracing.JerseyTracingFilter import bio.terra.workspace.api.{ControlledAzureResourceApi, ResourceApi} import bio.terra.workspace.client.ApiClient +import bio.terra.workspace.model.State import cats.effect.Async import 
cats.mtl.Ask import cats.syntax.all._ import io.opencensus.trace.Tracing -import org.broadinstitute.dsde.workbench.leonardo.AppContext +import org.broadinstitute.dsde.workbench.leonardo.{AppContext, WorkspaceId, WsmControlledResourceId, WsmState} import org.broadinstitute.dsde.workbench.leonardo.util.WithSpanFilter import org.glassfish.jersey.client.ClientConfig import org.http4s.Uri +import org.typelevel.log4cats.StructuredLogger /** * Represents a way to get a client for interacting with workspace manager controlled resources. @@ -20,8 +22,36 @@ import org.http4s.Uri * */ trait WsmApiClientProvider[F[_]] { + + val possibleStatuses: Array[WsmState] = + State.values().map(_.toString).map(Some(_)).map(WsmState(_)) :+ WsmState(Some("DELETED")) + def getControlledAzureResourceApi(token: String)(implicit ev: Ask[F, AppContext]): F[ControlledAzureResourceApi] def getResourceApi(token: String)(implicit ev: Ask[F, AppContext]): F[ResourceApi] + def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] + + def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] + + def getDatabaseState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] + + def getNamespaceState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] + + def getIdentityState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] } class HttpWsmClientProvider[F[_]](baseWorkspaceManagerUrl: Uri)(implicit F: Async[F]) extends WsmApiClientProvider[F] { @@ -41,11 
+71,82 @@ class HttpWsmClientProvider[F[_]](baseWorkspaceManagerUrl: Uri)(implicit F: Asyn _ = client.setAccessToken(token) } yield client + private def toWsmStatus( + state: Option[String] + )(implicit logger: StructuredLogger[F]): WsmState = { + val wsmState = WsmState(state) + if (!possibleStatuses.contains(wsmState)) logger.warn("Invalid Wsm status") + wsmState + } + override def getResourceApi(token: String)(implicit ev: Ask[F, AppContext]): F[ResourceApi] = + getApiClient(token).map(apiClient => new ResourceApi(apiClient)) + override def getControlledAzureResourceApi(token: String)(implicit ev: Ask[F, AppContext] ): F[ControlledAzureResourceApi] = getApiClient(token).map(apiClient => new ControlledAzureResourceApi(apiClient)) - override def getResourceApi(token: String)(implicit ev: Ask[F, AppContext]): F[ResourceApi] = - getApiClient(token).map(apiClient => new ResourceApi(apiClient)) + override def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] = for { + wsmApi <- getControlledAzureResourceApi(token) + attempt <- F.delay(wsmApi.getAzureVm(workspaceId.value, wsmResourceId.value)).attempt + state = attempt match { + case Right(result) => Some(result.getMetadata.getState.getValue) + case Left(_) => None + } + } yield toWsmStatus(state) + + override def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] = for { + wsmApi <- getControlledAzureResourceApi(token) + attempt <- F.delay(wsmApi.getAzureDisk(workspaceId.value, wsmResourceId.value)).attempt + state = attempt match { + case Right(result) => Some(result.getMetadata.getState.getValue) + case Left(_) => None + } + } yield toWsmStatus(state) + + override def getDatabaseState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + 
ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] = for { + wsmApi <- getControlledAzureResourceApi(token) + attempt <- F.delay(wsmApi.getAzureDatabase(workspaceId.value, wsmResourceId.value)).attempt + state = attempt match { + case Right(result) => Some(result.getMetadata.getState.getValue) + case Left(_) => None + } + } yield toWsmStatus(state) + + override def getNamespaceState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] = for { + wsmApi <- getControlledAzureResourceApi(token) + attempt <- F.delay(wsmApi.getAzureKubernetesNamespace(workspaceId.value, wsmResourceId.value)).attempt + state = attempt match { + case Right(result) => Some(result.getMetadata.getState.getValue) + case Left(_) => None + } + } yield toWsmStatus(state) + + override def getIdentityState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[F, AppContext], + log: StructuredLogger[F] + ): F[WsmState] = for { + wsmApi <- getControlledAzureResourceApi(token) + attempt <- F.delay(wsmApi.getAzureManagedIdentity(workspaceId.value, wsmResourceId.value)).attempt + state = attempt match { + case Right(result) => Some(result.getMetadata.getState.getValue) + case Left(_) => None + } + } yield toWsmStatus(state) + } diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala index 8d2133c08bf..2aedd1e0623 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala @@ -703,7 +703,7 @@ object clusterQuery extends TableQuery(new ClusterTable(_)) { _ <- clusterImageQuery.upsert(id, welderImage) } yield () - def updateDiskStatus(runtimeId: Long, dateAccessed: 
Instant)(implicit + def setDiskDeleted(runtimeId: Long, dateAccessed: Instant)(implicit ec: ExecutionContext ): DBIO[Unit] = for { @@ -712,9 +712,20 @@ object clusterQuery extends TableQuery(new ClusterTable(_)) { .result .headOption diskIdOpt <- runtimeConfigId.flatTraverse(rid => RuntimeConfigQueries.getDiskId(rid)) - _ <- diskIdOpt.traverse(diskId => persistentDiskQuery.updateStatus(diskId, DiskStatus.Deleted, dateAccessed)) + _ <- diskIdOpt.traverse(diskId => persistentDiskQuery.delete(diskId, dateAccessed)) } yield () + def getDiskId(runtimeId: Long)(implicit + ec: ExecutionContext + ): DBIO[Option[DiskId]] = + for { + runtimeConfigId <- findByIdQuery(runtimeId) + .map(_.runtimeConfigId) + .result + .headOption + diskIdOpt <- runtimeConfigId.flatTraverse(rid => RuntimeConfigQueries.getDiskId(rid)) + } yield diskIdOpt + def setToRunning(id: Long, hostIp: IP, dateAccessed: Instant): DBIO[Int] = updateClusterStatusAndHostIp(id, RuntimeStatus.Running, Some(hostIp), dateAccessed) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/PersistentDiskComponent.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/PersistentDiskComponent.scala index 20c5cd0141e..15f7d97c151 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/PersistentDiskComponent.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/PersistentDiskComponent.scala @@ -254,7 +254,7 @@ object persistentDiskQuery { def nullifyDiskIds = persistentDiskQuery.tableQuery.map(x => x.lastUsedBy).update(None) - def delete(id: DiskId, destroyedDate: Instant) = + def delete(id: DiskId, destroyedDate: Instant): DBIO[Int] = findByIdQuery(id) .map(d => (d.status, d.destroyedDate, d.dateAccessed)) .update((DiskStatus.Deleted, destroyedDate, destroyedDate)) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/Boot.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/Boot.scala 
index 09d9d79cab2..10e8838a50c 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/Boot.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/Boot.scala @@ -185,7 +185,8 @@ object Boot extends IOApp { appDependencies.authProvider, appDependencies.wsmDAO, appDependencies.samDAO, - appDependencies.publisherQueue + appDependencies.publisherQueue, + appDependencies.wsmClientProvider ) val leoKubernetesService: LeoAppServiceInterp[IO] = @@ -197,7 +198,8 @@ object Boot extends IOApp { appDependencies.googleDependencies.googleComputeService, googleDependencies.googleResourceService, gkeCustomAppConfig, - appDependencies.wsmDAO + appDependencies.wsmDAO, + appDependencies.wsmClientProvider ) val azureService = new RuntimeV2ServiceInterp[IO]( diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskServiceInterp.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskServiceInterp.scala index dff98533ce9..0e5816b8b39 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskServiceInterp.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskServiceInterp.scala @@ -450,7 +450,14 @@ case class PersistentDiskAlreadyExistsException(googleProject: GoogleProject, case class DiskCannotBeDeletedException(id: DiskId, status: DiskStatus, cloudContext: CloudContext, traceId: TraceId) extends LeoException( - s"Persistent disk ${id.value} cannot be deleted in ${status} status. CloudContext: ${cloudContext.asStringWithProvider}", + s"Persistent disk ${id.value} cannot be deleted in $status status. 
CloudContext: ${cloudContext.asStringWithProvider}", + StatusCodes.Conflict, + traceId = Some(traceId) + ) + +case class DiskCannotBeDeletedWsmException(id: DiskId, status: WsmState, cloudContext: CloudContext, traceId: TraceId) + extends LeoException( + s"Persistent disk ${id.value} cannot be deleted in ${status.value} status, please wait and try again. CloudContext: ${cloudContext.asStringWithProvider}", StatusCodes.Conflict, traceId = Some(traceId) ) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterp.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterp.scala index 891bb15dae9..5dd107f8a66 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterp.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterp.scala @@ -19,6 +19,7 @@ import org.broadinstitute.dsde.workbench.leonardo.model._ import org.broadinstitute.dsde.workbench.leonardo.monitor.LeoPubsubMessage import org.broadinstitute.dsde.workbench.leonardo.monitor.LeoPubsubMessage.DeleteDiskV2Message import org.broadinstitute.dsde.workbench.model.{TraceId, UserInfo} +import org.typelevel.log4cats.StructuredLogger import scala.concurrent.ExecutionContext @@ -26,11 +27,13 @@ class DiskV2ServiceInterp[F[_]: Parallel](config: PersistentDiskConfig, authProvider: LeoAuthProvider[F], wsmDao: WsmDao[F], samDAO: SamDAO[F], - publisherQueue: Queue[F, LeoPubsubMessage] + publisherQueue: Queue[F, LeoPubsubMessage], + wsmClientProvider: WsmApiClientProvider[F] )(implicit F: Async[F], dbReference: DbReference[F], - ec: ExecutionContext + ec: ExecutionContext, + log: StructuredLogger[F] ) extends DiskV2Service[F] { // backwards compatible with v1 getDisk route @@ -75,20 +78,7 @@ class DiskV2ServiceInterp[F[_]: Parallel](config: PersistentDiskConfig, ctx <- as.ask diskOpt <- 
persistentDiskQuery.getActiveById(diskId).transaction - disk <- diskOpt.fold( - F.raiseError[PersistentDisk](DiskNotFoundByIdException(diskId, ctx.traceId)) - )( - F.pure - ) - - // check that workspaceId is not null - workspaceId <- F.fromOption(disk.workspaceId, DiskWithoutWorkspaceException(diskId, ctx.traceId)) - - hasWorkspacePermission <- authProvider.isUserWorkspaceReader( - WorkspaceResourceSamResourceId(workspaceId), - userInfo - ) - _ <- F.raiseUnless(hasWorkspacePermission)(ForbiddenError(userInfo.userEmail)) + disk <- F.fromOption(diskOpt, DiskNotFoundByIdException(diskId, ctx.traceId)) // check read permission first listOfPermissions <- authProvider.getActions(disk.samResource, userInfo) @@ -99,12 +89,6 @@ class DiskV2ServiceInterp[F[_]: Parallel](config: PersistentDiskConfig, .raiseError[Unit](DiskNotFoundByIdException(diskId, ctx.traceId)) .whenA(!hasReadPermission) - // check if disk is deletable - _ <- F - .raiseUnless(disk.status.isDeletable)( - DiskCannotBeDeletedException(diskId, disk.status, disk.cloudContext, ctx.traceId) - ) - // check delete permission hasDeletePermission = listOfPermissions.toSet.contains( PersistentDiskAction.DeletePersistentDisk @@ -115,6 +99,29 @@ class DiskV2ServiceInterp[F[_]: Parallel](config: PersistentDiskConfig, _ <- ctx.span.traverse(s => F.delay(s.addAnnotation("Done auth call for delete azure disk permission"))) + // check that workspaceId is not null + workspaceId <- F.fromOption(disk.workspaceId, DiskWithoutWorkspaceException(diskId, ctx.traceId)) + + hasWorkspacePermission <- authProvider.isUserWorkspaceReader( + WorkspaceResourceSamResourceId(workspaceId), + userInfo + ) + _ <- F.raiseUnless(hasWorkspacePermission)(ForbiddenError(userInfo.userEmail)) + + // check if disk resource is deletable in WSM + wsmDiskResourceId <- disk.wsmResourceId match { + case Some(wsmResourceId) => + for { + wsmStatus <- wsmClientProvider.getDiskState(userInfo.accessToken.token, workspaceId, wsmResourceId) + _ <- 
F.raiseUnless(wsmStatus.isDeletable)( + DiskCannotBeDeletedWsmException(disk.id, wsmStatus, disk.cloudContext, ctx.traceId) + ) + // only send wsmResourceId to back leo if disk isn't already deleted in WSM + } yield if (wsmStatus.isDeleted) None else Some(wsmResourceId) + // if disk hasn't been created in WSM, don't pass id to back leo + case None => F.pure(None) + } + // check that disk isn't attached to a runtime isAttached <- persistentDiskQuery.isDiskAttached(diskId).transaction _ <- F @@ -128,7 +135,7 @@ class DiskV2ServiceInterp[F[_]: Parallel](config: PersistentDiskConfig, disk.id, workspaceId, disk.cloudContext, - disk.wsmResourceId, + wsmDiskResourceId, Some(ctx.traceId) ) ) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/LeoAppServiceInterp.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/LeoAppServiceInterp.scala index 56ac8ca6253..ad2de1c736d 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/LeoAppServiceInterp.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/LeoAppServiceInterp.scala @@ -27,7 +27,7 @@ import org.broadinstitute.dsde.workbench.leonardo.AppType._ import org.broadinstitute.dsde.workbench.leonardo.JsonCodec._ import org.broadinstitute.dsde.workbench.leonardo.SamResourceId._ import org.broadinstitute.dsde.workbench.leonardo.config._ -import org.broadinstitute.dsde.workbench.leonardo.dao.WsmDao +import org.broadinstitute.dsde.workbench.leonardo.dao.{WsmApiClientProvider, WsmDao} import org.broadinstitute.dsde.workbench.leonardo.db.KubernetesServiceDbQueries.getActiveFullAppByWorkspaceIdAndAppName import org.broadinstitute.dsde.workbench.leonardo.db._ import org.broadinstitute.dsde.workbench.leonardo.http.service.LeoAppServiceInterp.{ @@ -57,7 +57,8 @@ final class LeoAppServiceInterp[F[_]: Parallel](config: AppServiceConfig, computeService: GoogleComputeService[F], googleResourceService: 
GoogleResourceService[F], customAppConfig: CustomAppConfig, - wsmDao: WsmDao[F] + wsmDao: WsmDao[F], + wsmClientProvider: WsmApiClientProvider[F] )(implicit F: Async[F], log: StructuredLogger[F], @@ -784,17 +785,6 @@ final class LeoAppServiceInterp[F[_]: Parallel](config: AppServiceConfig, if (hasDeletePermission) F.unit else F.raiseError[Unit](ForbiddenError(userInfo.userEmail)) - canDelete = AppStatus.deletableStatuses.contains(app.status) - _ <- - if (canDelete) F.unit - else - F.raiseError[Unit]( - AppCannotBeDeletedByWorkspaceIdException(workspaceId, app.appName, app.status, ctx.traceId) - ) - - // Get the disk to delete if specified - diskOpt = if (deleteDisk) app.appResources.disk.map(_.id) else None - // Resolve the workspace in WSM to get the cloud context userToken = org.http4s.headers.Authorization( org.http4s.Credentials.Token(AuthScheme.Bearer, userInfo.accessToken.token) @@ -806,6 +796,35 @@ final class LeoAppServiceInterp[F[_]: Parallel](config: AppServiceConfig, case (_, Some(gcpContext)) => F.pure[CloudContext](CloudContext.Gcp(gcpContext)) case (None, None) => F.raiseError[CloudContext](CloudContextNotFoundException(workspaceId, ctx.traceId)) } + // check if app can be deleted (Leo manages apps, so checking the leo status) + _ <- F.raiseUnless(app.status.isDeletable)( + AppCannotBeDeletedException(cloudContext, app.appName, app.status, ctx.traceId) + ) + + // check if databases, namespaces and managed identities associated with the app can be deleted + // (but only for Azure apps) + _ <- + if (cloudContext.cloudProvider == CloudProvider.Azure) { + for { + _ <- checkIfSubresourcesDeletable(app.id, WsmResourceType.AzureDatabase, userInfo, workspaceId) + _ <- checkIfSubresourcesDeletable(app.id, WsmResourceType.AzureManagedIdentity, userInfo, workspaceId) + _ <- checkIfSubresourcesDeletable(app.id, WsmResourceType.AzureKubernetesNamespace, userInfo, workspaceId) + } yield () + } else F.unit + + // Get the disk and check if its deletable (if disk is 
being deleted) + diskIdOpt = if (deleteDisk) app.appResources.disk.map(_.id) else None + _ = (deleteDisk, diskIdOpt, cloudContext.cloudProvider) match { + // only check WSM state for Azure apps (Azure apps don't have disks currently, but they are coming...) + case (true, Some(diskId), CloudProvider.Azure) => + for { + _ <- checkIfSubresourcesDeletable(app.id, WsmResourceType.AzureDisk, userInfo, workspaceId) + _ <- persistentDiskQuery.markPendingDeletion(diskId, ctx.now).transaction + } yield () + case (true, None, _) => + log.info(s"No disk found to delete for app ${app.id}, ${app.appName}. No-op for deleteDisk") + case _ => F.unit // Do nothing if deleteDisk is false + } _ <- for { @@ -815,7 +834,7 @@ final class LeoAppServiceInterp[F[_]: Parallel](config: AppServiceConfig, app.appName, workspaceId, cloudContext, - diskOpt, + diskIdOpt, BillingProfileId(workspaceDesc.spendProfile), Some(ctx.traceId) ) @@ -823,6 +842,39 @@ final class LeoAppServiceInterp[F[_]: Parallel](config: AppServiceConfig, } yield () } yield () + private def checkIfSubresourcesDeletable(appId: AppId, + resourceType: WsmResourceType, + userInfo: UserInfo, + workspaceId: WorkspaceId + )(implicit + ev: Ask[F, AppContext] + ): F[Unit] = for { + ctx <- ev.ask + wsmResources <- appControlledResourceQuery + .getAllForAppByType(appId.id, resourceType) + .transaction + _ <- wsmResources.traverse { resource => + for { + wsmState <- resourceType match { + case WsmResourceType.AzureDatabase => + wsmClientProvider.getDatabaseState(userInfo.accessToken.token, workspaceId, resource.resourceId) + case WsmResourceType.AzureKubernetesNamespace => + wsmClientProvider.getNamespaceState(userInfo.accessToken.token, workspaceId, resource.resourceId) + case WsmResourceType.AzureManagedIdentity => + wsmClientProvider.getIdentityState(userInfo.accessToken.token, workspaceId, resource.resourceId) + case WsmResourceType.AzureDisk => + wsmClientProvider.getDiskState(userInfo.accessToken.token, workspaceId, 
resource.resourceId) + case WsmResourceType.AzureStorageContainer => + F.pure(WsmState(None)) // no get endpoint for a storage container in WSM yet + } + _ <- F + .raiseUnless(wsmState.isDeletable)( + AppResourceCannotBeDeletedException(resource.resourceId, appId, wsmState.value, resourceType, ctx.traceId) + ) + } yield () + } + } yield () + private[service] def getSavableCluster( userEmail: WorkbenchEmail, cloudContext: CloudContext, diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeServiceInterp.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeServiceInterp.scala index af6c333595a..519b06479bd 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeServiceInterp.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeServiceInterp.scala @@ -353,7 +353,9 @@ class RuntimeServiceInterp[F[_]: Parallel]( _ <- if (runtime.status.isDeletable) F.unit else - F.raiseError[Unit](RuntimeCannotBeDeletedException(cloudContext, runtime.runtimeName, runtime.status)) + F.raiseError[Unit]( + RuntimeCannotBeDeletedException(cloudContext, runtime.runtimeName, runtime.status) + ) // delete the runtime runtimeConfig <- RuntimeConfigQueries.getRuntimeConfig(runtime.runtimeConfigId).transaction persistentDiskToDelete <- runtimeConfig match { diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterp.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterp.scala index 8ca7cf6805e..ab064ad9fb0 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterp.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterp.scala @@ -20,16 +20,12 @@ import org.broadinstitute.dsde.workbench.leonardo.SamResourceId.{ import 
org.broadinstitute.dsde.workbench.leonardo.config.PersistentDiskConfig import org.broadinstitute.dsde.workbench.leonardo.dao._ import org.broadinstitute.dsde.workbench.leonardo.db._ +// do not remove: `projectSamResourceAction`, `runtimeSamResourceAction`, `workspaceSamResourceAction`, `wsmResourceSamResourceAction`; `AppSamResourceAction` they are implicit import org.broadinstitute.dsde.workbench.leonardo.model.SamResourceAction.{ -// do not remove `projectSamResourceAction`; it is implicit projectSamResourceAction, -// do not remove `runtimeSamResourceAction`; it is implicit runtimeSamResourceAction, -// do not remove `workspaceSamResourceAction`; it is implicit workspaceSamResourceAction, -// do not remove `wsmResourceSamResourceAction`; it is implicit wsmResourceSamResourceAction, -// do not remove `AppSamResourceAction`; it is implicit AppSamResourceAction } import org.broadinstitute.dsde.workbench.leonardo.model._ @@ -300,10 +296,16 @@ class RuntimeV2ServiceInterp[F[_]: Parallel]( runtime <- RuntimeServiceDbQueries.getActiveRuntimeRecord(workspaceId, runtimeName).transaction + hasPermission <- + if (runtime.auditInfo.creator == userInfo.userEmail) F.pure(true) + else + authProvider + .isUserWorkspaceOwner(WorkspaceResourceSamResourceId(workspaceId), userInfo) + + _ <- ctx.span.traverse(s => F.delay(s.addAnnotation("Done auth call for delete azure runtime permission"))) _ <- F - .raiseUnless(runtime.status.isDeletable)( - RuntimeCannotBeDeletedException(runtime.cloudContext, runtime.runtimeName, runtime.status) - ) + .raiseError[Unit](RuntimeNotFoundException(runtime.cloudContext, runtimeName, "permission denied")) + .whenA(!hasPermission) diskIdOpt <- RuntimeConfigQueries.getDiskId(runtime.runtimeConfigId).transaction diskId <- diskIdOpt match { @@ -314,43 +316,39 @@ class RuntimeV2ServiceInterp[F[_]: Parallel]( ) } - // get wsm api - wsmAzureResourceApi <- wsmClientProvider.getControlledAzureResourceApi(userInfo.accessToken.token) + // check if the VM is 
deletable in WSM wsmResourceId = WsmControlledResourceId(UUID.fromString(runtime.internalId)) + wsmState <- wsmClientProvider.getVmState(userInfo.accessToken.token, workspaceId, wsmResourceId) + _ <- F + .raiseUnless(wsmState.isDeletable)( + RuntimeCannotBeDeletedWsmException(runtime.cloudContext, runtime.runtimeName, wsmState) + ) - // if the vm is found in WSM and has a deletable state, - // then the resourceId is passed to back leo to make the delete call to WSM - // (state can be BROKEN, CREATING, DELETING, READY, UPDATING or NULL) - deletableStatus = List("BROKEN", "READY") - - attempt <- F.delay(wsmAzureResourceApi.getAzureVm(workspaceId.value, wsmResourceId.value)).attempt - wsmVMResourceSamId <- attempt match { - case Right(result) => - val vmState = result.getMetadata.getState.getValue - val res = if (deletableStatus.contains(vmState)) Some(wsmResourceId) else None - log - .info(ctx.loggingCtx)( - s"Runtime ${runtimeName.asString} with resourceId ${wsmResourceId.value} has a state of $vmState in WSM" - ) - .as(res) - case Left(e) => - log - .info(ctx.loggingCtx)( - s"No wsm record found for runtime ${runtimeName.asString} No-op for wsmDao.deleteVm, ${e.getMessage}" - ) - .as(None) - } - - hasPermission <- - if (runtime.auditInfo.creator == userInfo.userEmail) F.pure(true) - else - authProvider - .isUserWorkspaceOwner(WorkspaceResourceSamResourceId(workspaceId), userInfo) + // pass the disk to delete to publisher and set Leo status (if deleting disk) + diskIdToDeleteOpt <- + if (deleteDisk) for { + // check if disk is deletable in WSM if disk is being deleted + diskOpt <- persistentDiskQuery.getActiveById(diskId).transaction + disk <- diskOpt.fold(F.raiseError[PersistentDisk](DiskNotFoundByIdException(diskId, ctx.traceId)))(F.pure) + diskIdToDelete <- disk.wsmResourceId match { + case Some(wsmResourceId) => + for { + wsmState <- wsmClientProvider.getDiskState(userInfo.accessToken.token, workspaceId, wsmResourceId) + _ <- F + 
.raiseUnless(wsmState.isDeletable)( + DiskCannotBeDeletedWsmException(disk.id, wsmState, disk.cloudContext, ctx.traceId) + ) + _ <- persistentDiskQuery.markPendingDeletion(diskId, ctx.now).transaction + } yield if (wsmState.isDeleted) F.pure(none[DiskId]) else Some(diskId) + // if disk hasn't been created in WSM, don't pass id to back leo + case None => F.pure(none[DiskId]) + } + } yield Some(diskId) + else F.pure(none[DiskId]) - _ <- ctx.span.traverse(s => F.delay(s.addAnnotation("Done auth call for delete azure runtime permission"))) - _ <- F - .raiseError[Unit](RuntimeNotFoundException(runtime.cloudContext, runtimeName, "permission denied")) - .whenA(!hasPermission) + // only pass wsmResourceId if vm isn't already deleted in WSM + // won't send the delete to WSM if vm is deleted + wsmVMResourceSamId = if (wsmState.isDeleted) None else Some(wsmResourceId) // Query WSM for Landing Zone resources userToken = org.http4s.headers.Authorization( @@ -362,16 +360,10 @@ class RuntimeV2ServiceInterp[F[_]: Parallel]( // Update DB record to Deleting status _ <- clusterQuery.markPendingDeletion(runtime.id, ctx.now).transaction - // pass the disk to delete to publisher if specified - diskIdToDelete <- - if (deleteDisk) - persistentDiskQuery.markPendingDeletion(diskId, ctx.now).transaction.as(diskIdOpt) - else F.pure(none[DiskId]) - _ <- publisherQueue.offer( DeleteAzureRuntimeMessage( runtime.id, - diskIdToDelete, + diskIdToDeleteOpt, workspaceId, wsmVMResourceSamId, BillingProfileId(workspaceDesc.spendProfile), diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/model/LeoException.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/model/LeoException.scala index d12696680fa..2918abff9c7 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/model/LeoException.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/model/LeoException.scala @@ -2,6 +2,7 @@ package 
org.broadinstitute.dsde.workbench.leonardo package model import akka.http.scaladsl.model.{StatusCode, StatusCodes} +import org.broadinstitute.dsde.workbench.leonardo.db.WsmResourceType import org.broadinstitute.dsde.workbench.leonardo.http.errorReportSource import org.broadinstitute.dsde.workbench.model.google.GcsPath import org.broadinstitute.dsde.workbench.model.{ErrorReport, TraceId, WorkbenchEmail, WorkbenchException} @@ -107,11 +108,16 @@ case class RuntimeCannotBeStoppedException(cloudContext: CloudContext, runtimeNa traceId = None ) -case class RuntimeCannotBeDeletedException(cloudContext: CloudContext, - runtimeName: RuntimeName, - status: RuntimeStatus = RuntimeStatus.Creating -) extends LeoException( - s"Runtime ${cloudContext.asStringWithProvider}/${runtimeName.asString} cannot be deleted in ${status} status", +case class RuntimeCannotBeDeletedException(cloudContext: CloudContext, runtimeName: RuntimeName, status: RuntimeStatus) + extends LeoException( + s"Runtime ${cloudContext.asStringWithProvider}/${runtimeName.asString} cannot be deleted in ${status.toString} status, please wait and try again", + StatusCodes.Conflict, + traceId = None + ) + +case class RuntimeCannotBeDeletedWsmException(cloudContext: CloudContext, runtimeName: RuntimeName, status: WsmState) + extends LeoException( + s"Runtime ${cloudContext.asStringWithProvider}/${runtimeName.asString} cannot be deleted in ${status.value} status, please wait and try again", StatusCodes.Conflict, traceId = None ) @@ -209,3 +215,14 @@ case class NonDeletableRuntimesInWorkspaceFoundException( extraMessageInLogging = s"Details: ${msg}", traceId = traceId ) + +case class AppResourceCannotBeDeletedException(wsmResourceId: WsmControlledResourceId, + appId: AppId, + status: String, + wsmResourceType: WsmResourceType, + traceId: TraceId +) extends LeoException( + s"Azure ${wsmResourceType.toString} with id ${wsmResourceId.value} associated with ${appId.id} cannot be deleted in $status status, please wait and 
try again", + StatusCodes.Conflict, + traceId = Some(traceId) + ) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala index 3248d37ced4..182df96a10c 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala @@ -1069,17 +1069,18 @@ object PubsubHandleMessageError { final case class DiskDeletionError(diskId: DiskId, workspaceId: WorkspaceId, errorMsg: String) extends PubsubHandleMessageError { override def getMessage: String = - s"\n\tdisk ${diskId} in workspace ${workspaceId}, \n\tmsg: ${errorMsg})" + s"\n\tdisk ${diskId.value} in workspace ${workspaceId.value}, \n\tmsg: ${errorMsg})" val isRetryable: Boolean = false } - final case class AzureDiskDeletionError(wsmControlledResourceId: WsmControlledResourceId, + final case class AzureDiskDeletionError(diskId: DiskId, + wsmControlledResourceId: WsmControlledResourceId, workspaceId: WorkspaceId, errorMsg: String ) extends PubsubHandleMessageError { override def getMessage: String = - s"\n\tdisk resource: ${wsmControlledResourceId}, \n\tmsg: ${errorMsg})" + s"\n\tdisk ${diskId.value} with resource id: ${wsmControlledResourceId.value}, \n\tmsg: ${errorMsg})" val isRetryable: Boolean = false } diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala index 6f4c7145fb3..981519be4d1 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala @@ -49,7 +49,12 @@ class 
AzurePubsubHandlerInterp[F[_]: Parallel]( )(implicit val executionContext: ExecutionContext, dbRef: DbReference[F], logger: StructuredLogger[F], F: Async[F]) extends AzurePubsubHandlerAlgebra[F] { - implicit val isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v + // implicits necessary to poll on the status of external jobs + implicit private def isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v + implicit private def wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) => + v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed + implicit private def wsmDeleteDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) => + v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed override def createAndPollRuntime(msg: CreateAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] = for { @@ -382,7 +387,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( AzureRuntimeCreationError( params.runtime.id, params.workspaceId, - s"WSMResource:${resourceId.value} not found for disk id:${diskId.value}", + s"WSMResource record:${resourceId.value} not found for disk id:${diskId.value}", params.useExistingDisk ) ) @@ -420,7 +425,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( disk.size ) ) - diskResp <- wsmDao.createDisk(request, leoAuth) + // diskResp <- wsmDao.createDisk(request, leoAuth) + diskResp = CreateDiskResponse(WsmControlledResourceId(UUID.randomUUID())) _ <- controlledResourceQuery .save(params.runtime.id, diskResp.resourceId, WsmResourceType.AzureDisk) .transaction @@ -450,9 +456,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( resourceId ) - private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] = { - implicit val wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) => - 
v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed + private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] = for { ctx <- ev.ask auth <- samDAO.getLeoAuthToken @@ -542,11 +546,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( ) ) } yield () - } override def deleteAndPollRuntime(msg: DeleteAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] = { - implicit val wsmDeleteVmDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) => - v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed for { ctx <- ev.ask @@ -600,11 +601,28 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( cloudContext ) + // if there's a disk to delete, find the disk record associated with the runtime + // - if there's a disk record, then delete in WSM + // - update the disk's Leo state deleteDiskAction = msg.diskIdToDelete.traverse { diskId => for { - _ <- deleteDiskResource(Left(runtime.id), msg.workspaceId, auth) - _ <- dbRef.inTransaction(persistentDiskQuery.delete(diskId, ctx.now)) - _ <- logger.info(ctx.loggingCtx)(s"runtime disk ${diskId} is deleted successfully") + diskRecordOpt <- controlledResourceQuery + .getWsmRecordForRuntime(msg.runtimeId, WsmResourceType.AzureDisk) + .transaction + _ <- diskRecordOpt match { + case Some(diskRecord) => + for { + _ <- deleteDiskInWSM(diskId, diskRecord.resourceId, msg.workspaceId, auth, Some(runtime.id)) + } yield () + case _ => + // if the disk hasn't been created in WSM yet, skip disk deletion + for { + _ <- logger.info( + s"No disk resource found for runtime ${msg.runtimeId.toString} in ${msg.workspaceId.value}. No-op for wsmDao.deleteDisk." 
+ ) + _ <- clusterQuery.setDiskDeleted(msg.runtimeId, ctx.now).transaction + } yield () + } } yield () }.void @@ -670,7 +688,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( _ <- deleteDiskAction _ <- dbRef.inTransaction(clusterQuery.completeDeletion(runtime.id, ctx.now)) _ <- logger.info(ctx.loggingCtx)( - s"runtime ${msg.runtimeId} and associated disk is deleted successfully" + s"runtime ${msg.runtimeId} ${if (msg.diskIdToDelete.isDefined) "and associated disk"} have been deleted successfully" ) } yield () case WsmJobStatus.Failed => @@ -758,15 +776,38 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( _ <- clusterQuery.updateClusterStatus(e.runtimeId, RuntimeStatus.Error, now).transaction auth <- samDAO.getLeoAuthToken + diskIdOpt <- clusterQuery.getDiskId(e.runtimeId).transaction - _ <- e.useExistingDisk match { - case false => + _ <- (e.useExistingDisk, diskIdOpt) match { + // disk was supposed to be created and was + case (false, Some(diskId)) => for { - _ <- deleteDiskResource(Left(e.runtimeId), e.workspaceId, auth) - _ <- clusterQuery.updateDiskStatus(e.runtimeId, now).transaction + diskRecordOpt <- controlledResourceQuery + .getWsmRecordForRuntime(e.runtimeId, WsmResourceType.AzureDisk) + .transaction + _ <- diskRecordOpt match { + // if there is a disk record, the disk finished creating, so it must be deleted in WSM + case Some(diskRecord) => + for { + _ <- deleteDiskInWSM(diskId, diskRecord.resourceId, e.workspaceId, auth, Some(e.runtimeId)) + } yield () + case _ => + for { + _ <- logger.info( + s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk." + ) + _ <- clusterQuery.setDiskDeleted(e.runtimeId, now).transaction + } yield () + } } yield () - case true => F.unit + // disk was supposed to be created and wasn't + case (false, None) => + logger.info( + s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk." 
+ ) + // no disk created + case (true, _) => F.unit } } yield () @@ -861,87 +902,46 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( } } yield () - private def deleteDiskResource(id: Either[Long, - WsmControlledResourceId - ], // can be either runtimeId or diskResourceId in order to get WSMcontrolled resource - workspaceId: WorkspaceId, - auth: Authorization - )(implicit - ev: Ask[F, AppContext] - ): F[Unit] = + private def deleteDiskInWSM(diskId: DiskId, + wsmResourceId: WsmControlledResourceId, + workspaceId: WorkspaceId, + auth: Authorization, + runtimeId: Option[Long] + )(implicit ev: Ask[F, AppContext]): F[Unit] = for { ctx <- ev.ask - diskResourceOpt <- id match { - case Left(runtimeId) => - controlledResourceQuery - .getWsmRecordForRuntime(runtimeId, WsmResourceType.AzureDisk) - .transaction - case Right(diskResourceId) => - controlledResourceQuery - .getWsmRecordFromResourceId(diskResourceId, WsmResourceType.AzureDisk) - .transaction - } - diskResource <- F.fromOption( - diskResourceOpt, - AzureDiskResourceDeletionError(id, - workspaceId, - "No disk resource found for delete azure disk. No-op for wsmDao.deleteDisk." - ) - ) - jobId = getWsmJobId("delete-disk", diskResource.resourceId) - _ <- diskResourceOpt.traverse { disk => - for { - _ <- logger.info(ctx.loggingCtx)(s"Sending WSM delete message for disk resource ${disk.resourceId.value}") - _ <- wsmDao - .deleteDisk( - DeleteWsmResourceRequest( - workspaceId, - disk.resourceId, - DeleteControlledAzureResourceRequest( - WsmJobControl(jobId) - ) - ), - auth - ) - .void - .adaptError(e => - AzureDiskDeletionError( - disk.resourceId, - workspaceId, - s"${ctx.traceId.asString} | WSM call to delete disk failed due to ${e.getMessage}. 
Please retry delete again" - ) - ) - } yield () - }.void - } yield () - - override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] = { - implicit val wsmDeleteDiskDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) => - v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed - for { - ctx <- ev.ask - auth <- samDAO.getLeoAuthToken + jobId = getWsmJobId("delete-disk", wsmResourceId) - wsmResourceId <- F.fromOption( - msg.wsmResourceId, - DiskDeletionError( - msg.diskId, - msg.workspaceId, - s"No associated WsmResourceId found for Azure disk" + _ <- logger.info(ctx.loggingCtx)(s"Sending WSM delete message for disk resource ${wsmResourceId.value}") + _ <- wsmDao + .deleteDisk( + DeleteWsmResourceRequest( + workspaceId, + wsmResourceId, + DeleteControlledAzureResourceRequest( + WsmJobControl(jobId) + ) + ), + auth + ) + .void + .adaptError(e => + AzureDiskDeletionError( + diskId, + wsmResourceId, + workspaceId, + s"${ctx.traceId.asString} | WSM call to delete disk failed due to ${e.getMessage}. 
Please retry delete again" + ) ) - ) - - _ <- deleteDiskResource(Right(wsmResourceId), msg.workspaceId, auth) - deleteJobId = getWsmJobId("delete-disk", wsmResourceId) getDeleteJobResult = wsmDao.getDeleteDiskJobResult( - GetJobResultRequest(msg.workspaceId, deleteJobId), + GetJobResultRequest(workspaceId, jobId), auth ) + // We need to wait until WSM deletion job to be done to update the database taskToRun = for { - // We need to wait until WSM deletion job to be done to update the database resp <- streamFUntilDone( getDeleteJobResult, config.deleteDiskPollConfig.maxAttempts, @@ -951,23 +951,28 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( _ <- resp.jobReport.status match { case WsmJobStatus.Succeeded => for { - _ <- logger.info(ctx.loggingCtx)(s"disk ${msg.diskId.value} is deleted successfully") - _ <- dbRef.inTransaction(persistentDiskQuery.delete(msg.diskId, ctx.now)) + _ <- logger.info(ctx.loggingCtx)(s"disk ${diskId.value} is deleted successfully") + _ <- runtimeId match { + case Some(runtimeId) => clusterQuery.setDiskDeleted(runtimeId, ctx.now).transaction + case _ => dbRef.inTransaction(persistentDiskQuery.delete(diskId, ctx.now)) + } } yield () case WsmJobStatus.Failed => F.raiseError[Unit]( - DiskDeletionError( - msg.diskId, - msg.workspaceId, - s"WSM delete disk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}" + AzureDiskDeletionError( + diskId, + wsmResourceId, + workspaceId, + s"WSM deleteDisk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}" ) ) case WsmJobStatus.Running => F.raiseError[Unit]( - DiskDeletionError( - msg.diskId, - msg.workspaceId, - s"WSM delete disk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}" + AzureDiskDeletionError( + diskId, + wsmResourceId, + workspaceId, + s"Wsm deleteDisk job was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay" ) ) } @@ -979,7 +984,7 @@ class 
AzurePubsubHandlerInterp[F[_]: Parallel]( taskToRun, Some { e => handleAzureDiskDeletionError( - DiskDeletionError(msg.diskId, msg.workspaceId, s"Fail to delete disk due to ${e.getMessage}") + AzureDiskDeletionError(diskId, wsmResourceId, workspaceId, s"Fail to delete disk due to ${e.getMessage}") ) }, ctx.now, @@ -987,9 +992,25 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( ) ) } yield () - } - def handleAzureDiskDeletionError(e: DiskDeletionError)(implicit + override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] = + for { + ctx <- ev.ask + auth <- samDAO.getLeoAuthToken + + _ <- msg.wsmResourceId match { + case Some(wsmResourceId) => + deleteDiskInWSM(msg.diskId, wsmResourceId, msg.workspaceId, auth, None) + case None => + for { + _ <- logger.info(s"No WSM resource found for Azure disk ${msg.diskId}, skipping deletion in WSM") + _ <- dbRef.inTransaction(persistentDiskQuery.delete(msg.diskId, ctx.now)) + _ <- logger.info(ctx.loggingCtx)(s"disk ${msg.diskId.value} is deleted successfully") + } yield () + } + } yield () + + def handleAzureDiskDeletionError(e: AzureDiskDeletionError)(implicit ev: Ask[F, AppContext] ): F[Unit] = for { ctx <- ev.ask diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/CommonTestData.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/CommonTestData.scala index 52dc0a6ae44..9fb9c7e2265 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/CommonTestData.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/CommonTestData.scala @@ -450,7 +450,7 @@ object CommonTestData { appRestore: Option[AppRestore] = None, zoneName: Option[ZoneName] = None, cloudContextOpt: Option[CloudContext] = None, - wsmResourceId: Option[WsmControlledResourceId] = None, + wsmResourceId: Option[WsmControlledResourceId] = wsmResourceIdOpt, workspaceId: Option[WorkspaceId] = workspaceIdOpt ): PersistentDisk = PersistentDisk( @@ -532,6 +532,7 
@@ object CommonTestData { val workspaceId2 = WorkspaceId(UUID.randomUUID()) val workspaceId3 = WorkspaceId(UUID.randomUUID()) val wsmResourceId = WsmControlledResourceId(UUID.randomUUID()) + val wsmResourceIdOpt = Some(wsmResourceId) val cloudContextAzure = CloudContext.Azure(azureCloudContext) val billingProfileId = BillingProfileId("spend-profile") diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/dao/MockWsmApiClientProvider.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/dao/MockWsmApiClientProvider.scala index c12a5e9567c..8ca58b1d2fc 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/dao/MockWsmApiClientProvider.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/dao/MockWsmApiClientProvider.scala @@ -3,8 +3,9 @@ package org.broadinstitute.dsde.workbench.leonardo.dao import bio.terra.workspace.api._ import cats.effect.IO import cats.mtl.Ask -import org.broadinstitute.dsde.workbench.leonardo.AppContext +import org.broadinstitute.dsde.workbench.leonardo.{AppContext, WorkspaceId, WsmControlledResourceId, WsmState} import org.scalatestplus.mockito.MockitoSugar.mock +import org.typelevel.log4cats.StructuredLogger class MockWsmClientProvider(controlledAzureResourceApi: ControlledAzureResourceApi = mock[ControlledAzureResourceApi], resourceApi: ResourceApi = mock[ResourceApi] @@ -19,4 +20,37 @@ class MockWsmClientProvider(controlledAzureResourceApi: ControlledAzureResourceA ev: Ask[IO, AppContext] ): IO[ResourceApi] = IO.pure(resourceApi) + + override def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("READY"))) + + override def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + 
IO.pure(WsmState(Some("READY"))) + + override def getDatabaseState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("READY"))) + + override def getNamespaceState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("READY"))) + + override def getIdentityState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("READY"))) } diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/api/TestLeoRoutes.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/api/TestLeoRoutes.scala index 53b49f38c11..34599a6aa3a 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/api/TestLeoRoutes.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/api/TestLeoRoutes.scala @@ -114,7 +114,8 @@ trait TestLeoRoutes { FakeGoogleComputeService, FakeGoogleResourceService, Config.gkeCustomAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) val serviceConfig = RuntimeServiceConfig( diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/AppServiceInterpSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/AppServiceInterpSpec.scala index 1ec67a495fc..042d4c47721 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/AppServiceInterpSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/AppServiceInterpSpec.scala @@ -20,7 +20,14 @@ import org.broadinstitute.dsde.workbench.leonardo.TestUtils.appContext import 
org.broadinstitute.dsde.workbench.leonardo.auth.AllowlistAuthProvider import org.broadinstitute.dsde.workbench.leonardo.config.Config.leoKubernetesConfig import org.broadinstitute.dsde.workbench.leonardo.config.{Config, CustomAppConfig, CustomApplicationAllowListConfig} -import org.broadinstitute.dsde.workbench.leonardo.dao.{MockWsmDAO, WorkspaceDescription, WsmDao} +import org.broadinstitute.dsde.workbench.leonardo.dao.{ + HttpWsmClientProvider, + MockWsmClientProvider, + MockWsmDAO, + WorkspaceDescription, + WsmApiClientProvider, + WsmDao +} import org.broadinstitute.dsde.workbench.leonardo.db._ import org.broadinstitute.dsde.workbench.leonardo.model._ import org.broadinstitute.dsde.workbench.leonardo.monitor.LeoPubsubMessage.{ @@ -43,6 +50,8 @@ import org.http4s.headers.Authorization import org.scalatest.Assertion import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.prop.TableDrivenPropertyChecks._ +import org.scalatestplus.mockito.MockitoSugar.mock +import org.typelevel.log4cats.StructuredLogger import java.time.Instant import scala.concurrent.ExecutionContext.Implicits.global @@ -52,6 +61,7 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with val gkeCustomAppConfig = Config.gkeCustomAppConfig val wsmDao = new MockWsmDAO + val wsmClientProvider = mock[HttpWsmClientProvider[IO]] val gcpWsmDao = new MockWsmDAO { override def getWorkspace(workspaceId: WorkspaceId, authorization: Authorization)(implicit @@ -118,7 +128,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with passComputeService, FakeGoogleResourceService, gkeCustomAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) val notEnoughMemoryAppService = new LeoAppServiceInterp[IO]( appServiceConfig, @@ -128,7 +139,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with notEnoughMemoryComputeService, FakeGoogleResourceService, gkeCustomAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) val 
notEnoughCpuAppService = new LeoAppServiceInterp[IO]( appServiceConfig, @@ -138,7 +150,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with notEnoughCpuComputeService, FakeGoogleResourceService, gkeCustomAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) for { @@ -169,7 +182,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with FakeGoogleComputeService, noLabelsGoogleResourceService, gkeCustomAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) an[ForbiddenError] should be thrownBy { @@ -199,7 +213,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with FakeGoogleComputeService, FakeGoogleResourceService, gkeCustomAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) val res = interp .createApp(userInfo, cloudContextGcp, AppName("foo"), createAppRequest.copy(appType = AppType.Custom)) @@ -1309,7 +1324,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -1459,7 +1475,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -1502,7 +1519,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -1544,7 +1562,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -1583,7 +1602,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = 
createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -1628,7 +1648,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -1673,7 +1694,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with true, List() ), - wsmDao + wsmDao, + wsmClientProvider ) val appReq = createAppRequest.copy( diskConfig = Some(createDiskConfig), @@ -2433,6 +2455,99 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with messages shouldBe List.empty } + /** TODO: Once disks are supported on Azure Apps + it should "error on delete if disk is in a status that cannot be deleted" in isolatedDbTest { + val publisherQueue = QueueFactory.makePublisherQueue() + val wsmClientProvider = new MockWsmClientProvider() { + override def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext] + ): IO[WsmState] = + IO.pure(WsmState(Some("CREATING"))) + } + val appServiceInterp = makeInterp(publisherQueue, wsmClientProvider = wsmClientProvider) + + val appName = AppName("app1") + val diskConfig = PersistentDiskRequest(DiskName("disk1"), None, None, Map.empty) + val appReq = + createAppRequest.copy(kubernetesRuntimeConfig = None, appType = AppType.Cromwell, diskConfig = Some(diskConfig)) + + appServiceInterp + .createAppV2(userInfo, workspaceId, appName, appReq) + .unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + + val appResultPreStatusUpdate = dbFutureValue { + KubernetesServiceDbQueries.getActiveFullAppByWorkspaceIdAndAppName(workspaceId, appName) + } + + // we can't delete while its creating, so set it to Running + dbFutureValue(appQuery.updateStatus(appResultPreStatusUpdate.get.app.id, AppStatus.Running)) + dbFutureValue(nodepoolQuery.updateStatus(appResultPreStatusUpdate.get.nodepool.id, 
NodepoolStatus.Running)) + + val appResultPreDelete = dbFutureValue { + KubernetesServiceDbQueries.getActiveFullAppByWorkspaceIdAndAppName(workspaceId, appName) + } + appResultPreDelete.get.app.status shouldEqual AppStatus.Running + appResultPreDelete.get.app.auditInfo.destroyedDate shouldBe None + + an[DiskCannotBeDeletedWsmException] should be thrownBy { + appServiceInterp + .deleteAppV2(userInfo, workspaceId, appName, true) + .unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + } + */ + + it should "error on delete if app subresource is in a status that cannot be deleted" in isolatedDbTest { + val publisherQueue = QueueFactory.makePublisherQueue() + val wsmClientProvider = new MockWsmClientProvider() { + override def getDatabaseState(token: String, + workspaceId: WorkspaceId, + wsmResourceId: WsmControlledResourceId + )(implicit ev: Ask[IO, AppContext], log: StructuredLogger[IO]): IO[WsmState] = + IO.pure(WsmState(Some("CREATING"))) + } + val appServiceInterp = makeInterp(publisherQueue, wsmClientProvider = wsmClientProvider) + + val appName = AppName("app1") + val appReq = + createAppRequest.copy(kubernetesRuntimeConfig = None, appType = AppType.Cromwell, diskConfig = None) + + appServiceInterp + .createAppV2(userInfo, workspaceId, appName, appReq) + .unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + + val appResultPreStatusUpdate = dbFutureValue { + KubernetesServiceDbQueries.getActiveFullAppByWorkspaceIdAndAppName(workspaceId, appName) + } + + // we can't delete while its creating, so set it to Running + dbFutureValue(appQuery.updateStatus(appResultPreStatusUpdate.get.app.id, AppStatus.Running)) + dbFutureValue(nodepoolQuery.updateStatus(appResultPreStatusUpdate.get.nodepool.id, NodepoolStatus.Running)) + + // add database record + dbFutureValue( + appControlledResourceQuery + .insert( + appResultPreStatusUpdate.get.app.id.id, + wsmResourceId, + WsmResourceType.AzureDatabase, + AppControlledResourceStatus.Creating + ) + ) + + val 
appResultPreDelete = dbFutureValue { + KubernetesServiceDbQueries.getActiveFullAppByWorkspaceIdAndAppName(workspaceId, appName) + } + appResultPreDelete.get.app.status shouldEqual AppStatus.Running + appResultPreDelete.get.app.auditInfo.destroyedDate shouldBe None + + an[AppResourceCannotBeDeletedException] should be thrownBy { + appServiceInterp + .deleteAppV2(userInfo, workspaceId, appName, true) + .unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + } + it should "fail to create a V2 app if it is disabled" in { val appName = AppName("app1") val appReq = createAppRequest.copy(kubernetesRuntimeConfig = None, appType = AppType.HailBatch, diskConfig = None) @@ -2515,7 +2630,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with enableCustomAppCheckFlag: Boolean = true, enableSasApp: Boolean = true, googleResourceService: GoogleResourceService[IO] = FakeGoogleResourceService, - customAppConfig: CustomAppConfig = gkeCustomAppConfig + customAppConfig: CustomAppConfig = gkeCustomAppConfig, + wsmClientProvider: WsmApiClientProvider[IO] = wsmClientProvider ) = { val appConfig = appServiceConfig.copy(enableCustomAppCheck = enableCustomAppCheckFlag, enableSasApp = enableSasApp) @@ -2527,7 +2643,8 @@ final class AppServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with FakeGoogleComputeService, googleResourceService, customAppConfig, - wsmDao + wsmDao, + wsmClientProvider ) } } diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterpSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterpSpec.scala index 0a5af6b7a02..bac9148a435 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterpSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/DiskV2ServiceInterpSpec.scala @@ -5,12 +5,13 @@ package service import 
akka.http.scaladsl.model.headers.OAuth2BearerToken import cats.effect.IO import cats.effect.std.Queue +import cats.mtl.Ask import org.broadinstitute.dsde.workbench.google2.{MachineTypeName, ZoneName} import org.broadinstitute.dsde.workbench.leonardo.CommonTestData._ import org.broadinstitute.dsde.workbench.leonardo.SamResourceId.PersistentDiskSamResourceId import org.broadinstitute.dsde.workbench.leonardo.TestUtils.appContext import org.broadinstitute.dsde.workbench.leonardo.auth.AllowlistAuthProvider -import org.broadinstitute.dsde.workbench.leonardo.dao.{MockWsmDAO, WsmDao} +import org.broadinstitute.dsde.workbench.leonardo.dao.{MockWsmClientProvider, MockWsmDAO, WsmApiClientProvider, WsmDao} import org.broadinstitute.dsde.workbench.leonardo.db._ import org.broadinstitute.dsde.workbench.leonardo.model.ForbiddenError import org.broadinstitute.dsde.workbench.leonardo.monitor.LeoPubsubMessage @@ -18,23 +19,26 @@ import org.broadinstitute.dsde.workbench.leonardo.monitor.LeoPubsubMessage.Delet import org.broadinstitute.dsde.workbench.leonardo.util.QueueFactory import org.broadinstitute.dsde.workbench.model.{UserInfo, WorkbenchEmail, WorkbenchUserId} import org.scalatest.flatspec.AnyFlatSpec +import org.typelevel.log4cats.StructuredLogger import java.util.UUID import scala.concurrent.ExecutionContext.Implicits.global class DiskV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with TestComponent { val wsmDao = new MockWsmDAO + val wsmClientProvider = new MockWsmClientProvider private def makeDiskV2Service(queue: Queue[IO, LeoPubsubMessage], allowlistAuthProvider: AllowlistAuthProvider = allowListAuthProvider, - wsmDao: WsmDao[IO] = wsmDao + wsmDao: WsmDao[IO] = wsmDao, + wsmClientProvider: WsmApiClientProvider[IO] = wsmClientProvider ) = - new DiskV2ServiceInterp[IO]( - ConfigReader.appConfig.persistentDisk.copy(), - allowlistAuthProvider, - wsmDao, - mockSamDAO, - queue + new DiskV2ServiceInterp[IO](ConfigReader.appConfig.persistentDisk.copy(), + 
allowlistAuthProvider, + wsmDao, + mockSamDAO, + queue, + wsmClientProvider ) val diskV2Service = makeDiskV2Service(QueueFactory.makePublisherQueue(), wsmDao = new MockWsmDAO) @@ -133,16 +137,15 @@ class DiskV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with Te it should "delete a disk" in isolatedDbTest { val publisherQueue = QueueFactory.makePublisherQueue() val diskV2Service = makeDiskV2Service(publisherQueue) - val userInfo = UserInfo(OAuth2BearerToken(""), - WorkbenchUserId("userId"), - WorkbenchEmail("user1@example.com"), - 0 - ) // this email is allow-listed val res = for { ctx <- appContext.ask[AppContext] diskSamResource <- IO(PersistentDiskSamResourceId(UUID.randomUUID.toString)) - disk <- makePersistentDisk(None).copy(samResource = diskSamResource).save() + disk <- makePersistentDisk() + .copy(samResource = diskSamResource) + .save() + + _ <- persistentDiskQuery.updateWSMResourceId(disk.id, wsmResourceId, ctx.now).transaction _ <- diskV2Service.deleteDisk(userInfo, disk.id) dbDiskOpt <- persistentDiskQuery @@ -162,9 +165,18 @@ class DiskV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with Te res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } - it should "fail to delete a disk if its already deleting" in isolatedDbTest { + it should "fail to delete a disk if its creating" in isolatedDbTest { val publisherQueue = QueueFactory.makePublisherQueue() - val diskV2Service = makeDiskV2Service(publisherQueue) + val wsmClientProvider = new MockWsmClientProvider() { + override def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("CREATING"))) + } + + val diskV2Service = makeDiskV2Service(publisherQueue, wsmClientProvider = wsmClientProvider) val userInfo = UserInfo(OAuth2BearerToken(""), WorkbenchUserId("userId"), WorkbenchEmail("user1@example.com"), @@ -176,15 +188,45 @@ 
class DiskV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with Te diskSamResource <- IO(PersistentDiskSamResourceId(UUID.randomUUID.toString)) disk <- makePersistentDisk(cloudContextOpt = Some(cloudContextAzure)).copy(samResource = diskSamResource).save() - _ <- diskV2Service.deleteDisk(userInfo, disk.id) - _ <- IO( - makeCluster(1).saveWithRuntimeConfig( - RuntimeConfig.AzureConfig(MachineTypeName("n1-standard-4"), Some(disk.id), None) - ) - ) err <- diskV2Service.deleteDisk(userInfo, disk.id).attempt } yield err shouldBe Left( - DiskCannotBeDeletedException(disk.id, DiskStatus.Deleting, cloudContextAzure, ctx.traceId) + DiskCannotBeDeletedWsmException(disk.id, WsmState(Some("CREATING")), cloudContextAzure, ctx.traceId) + ) + + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + + it should "delete a disk and not send wsmResourceId if disk is deleted in WSM" in isolatedDbTest { + val publisherQueue = QueueFactory.makePublisherQueue() + val wsmClientProvider = new MockWsmClientProvider() { + override def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[IO, AppContext], + log: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(None)) + } + + val diskV2Service = makeDiskV2Service(publisherQueue, wsmClientProvider = wsmClientProvider) + val userInfo = UserInfo(OAuth2BearerToken(""), + WorkbenchUserId("userId"), + WorkbenchEmail("user1@example.com"), + 0 + ) // this email is allow-listed + + val res = for { + ctx <- appContext.ask[AppContext] + diskSamResource <- IO(PersistentDiskSamResourceId(UUID.randomUUID.toString)) + disk <- makePersistentDisk(cloudContextOpt = Some(cloudContextAzure)).copy(samResource = diskSamResource).save() + + _ <- diskV2Service.deleteDisk(userInfo, disk.id) + message <- publisherQueue.take + } yield message shouldBe DeleteDiskV2Message( + disk.id, + workspaceId, + cloudContextAzure, + None, + Some(ctx.traceId) ) 
res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) @@ -266,7 +308,7 @@ class DiskV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with Te ) err <- diskV2service2.deleteDisk(userInfo, disk.id).attempt } yield err shouldBe Left( - ForbiddenError(WorkbenchEmail("user1@example.com")) + DiskNotFoundByIdException(disk.id, ctx.traceId) ) res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterpSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterpSpec.scala index ab149a2188b..2be12b19238 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterpSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/service/RuntimeV2ServiceInterpSpec.scala @@ -3,14 +3,13 @@ package http package service import akka.http.scaladsl.model.headers.OAuth2BearerToken -import bio.terra.workspace.api.ControlledAzureResourceApi -import bio.terra.workspace.model.{AzureVmResource, ResourceMetadata, State} import cats.effect.IO import cats.effect.std.Queue import cats.mtl.Ask import com.azure.resourcemanager.compute.models.VirtualMachineSizeTypes import io.circe.Decoder import org.broadinstitute.dsde.workbench.azure._ +import org.broadinstitute.dsde.workbench.google2.DiskName import org.broadinstitute.dsde.workbench.leonardo.CommonTestData._ import org.broadinstitute.dsde.workbench.leonardo.JsonCodec.{ projectSamResourceDecoder, @@ -54,6 +53,7 @@ import org.mockito.Mockito.when import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatestplus.mockito.MockitoSugar +import org.typelevel.log4cats.StructuredLogger import java.util.UUID import scala.concurrent.ExecutionContext.Implicits.global @@ -70,18 +70,7 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite 
with ) val wsmDao = new MockWsmDAO - - // set up wsm to return an deletable vm state - val state = State.READY - val resourceMetaData = new ResourceMetadata() - resourceMetaData.setState(state) - val azureVmResource = new AzureVmResource() - azureVmResource.setMetadata(resourceMetaData) - - val wsmResourceApi = mock[ControlledAzureResourceApi] - when(wsmResourceApi.getAzureVm(any(), any())).thenReturn(azureVmResource) - - val wsmClientProvider = new MockWsmClientProvider(wsmResourceApi) + val wsmClientProvider = new MockWsmClientProvider() // used when we care about queue state def makeInterp( @@ -493,7 +482,7 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with runtime <- clusterQuery.getClusterWithDiskId(disk.id).transaction _ <- clusterQuery.updateClusterStatus(runtime.get.id, RuntimeStatus.Deleted, now).transaction - err <- runtimeV2Service + _ <- runtimeV2Service .createRuntime(userInfo, name1, workspaceId, true, defaultCreateAzureRuntimeReq) } yield () @@ -1136,6 +1125,12 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with false, defaultCreateAzureRuntimeReq ) + disks <- DiskServiceDbQueries + .listDisks(Map.empty, includeDeleted = false, Some(userInfo.userEmail), None, Some(workspaceId)) + .transaction + disk = disks.head + _ <- persistentDiskQuery.updateWSMResourceId(disk.id, wsmResourceId, context.now).transaction + _ <- publisherQueue.tryTake // clean out create msg preDeleteCluster <- RuntimeServiceDbQueries.getActiveRuntimeRecord(workspaceId, runtimeName).transaction @@ -1175,7 +1170,7 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } - it should "not delete a runtime in a creating status" in isolatedDbTest { + it should "not delete a runtime in a creating status in Wsm" in isolatedDbTest { val userInfo = UserInfo( OAuth2BearerToken(""), WorkbenchUserId("userId"), @@ -1185,7 +1180,14 @@ class 
RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with val runtimeName = RuntimeName("clusterName1") val workspaceId = WorkspaceId(UUID.randomUUID()) - val azureService = makeInterp() + val wsmClientProvider = new MockWsmClientProvider() { + override def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext], + logger: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("CREATING"))) + } + val azureService = makeInterp(wsmClientProvider = wsmClientProvider) val res = for { _ <- azureService @@ -1196,48 +1198,70 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with false, defaultCreateAzureRuntimeReq ) - azureCloudContext <- wsmDao.getWorkspace(workspaceId, dummyAuth).map(_.get.azureContext) - preDeleteClusterOpt <- clusterQuery - .getActiveClusterByNameMinimal(CloudContext.Azure(azureCloudContext.get), runtimeName)( - scala.concurrent.ExecutionContext.global - ) - .transaction - preDeleteCluster = preDeleteClusterOpt.get _ <- azureService.deleteRuntime(userInfo, runtimeName, workspaceId, true) } yield () - the[RuntimeCannotBeDeletedException] thrownBy { + the[RuntimeCannotBeDeletedWsmException] thrownBy { res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } } - it should "delete a runtime and not send wsmResourceId if runtime in a DELETING WSM status" in isolatedDbTest { + it should "not delete a runtime in a updating status in Wsm" in isolatedDbTest { val userInfo = UserInfo( OAuth2BearerToken(""), WorkbenchUserId("userId"), WorkbenchEmail("user1@example.com"), 0 - ) // this email is allow-listed + ) // this email is allowlisted val runtimeName = RuntimeName("clusterName1") val workspaceId = WorkspaceId(UUID.randomUUID()) - // set up wsm to return an un-deletable vm state - val state = State.DELETING - val resourceMetaData = new ResourceMetadata() - resourceMetaData.setState(state) - val azureVmResource = new 
AzureVmResource() - azureVmResource.setMetadata(resourceMetaData) + val wsmClientProvider = new MockWsmClientProvider() { + override def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext], + logger: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("UPDATING"))) + } + val azureService = makeInterp(wsmClientProvider = wsmClientProvider) - val wsmResourceApi = mock[ControlledAzureResourceApi] - when(wsmResourceApi.getAzureVm(any(), any())).thenReturn(azureVmResource) + val res = for { + _ <- azureService + .createRuntime( + userInfo, + runtimeName, + workspaceId, + false, + defaultCreateAzureRuntimeReq + ) + _ <- azureService.deleteRuntime(userInfo, runtimeName, workspaceId, true) + } yield () - val wsmClientProvider = new MockWsmClientProvider(wsmResourceApi) - val publisherQueue = QueueFactory.makePublisherQueue() - val azureService = makeInterp(publisherQueue, wsmClientProvider = wsmClientProvider) + the[RuntimeCannotBeDeletedWsmException] thrownBy { + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + } - val res = for { - context <- appContext.ask[AppContext] + it should "not delete a runtime in a deleting status in Wsm" in isolatedDbTest { + val userInfo = UserInfo( + OAuth2BearerToken(""), + WorkbenchUserId("userId"), + WorkbenchEmail("user1@example.com"), + 0 + ) // this email is allowlisted + val runtimeName = RuntimeName("clusterName1") + val workspaceId = WorkspaceId(UUID.randomUUID()) + + val wsmClientProvider = new MockWsmClientProvider() { + override def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext], + logger: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("DELETING"))) + } + val azureService = makeInterp(wsmClientProvider = wsmClientProvider) + val res = for { _ <- azureService .createRuntime( userInfo, @@ -1246,49 +1270,60 @@ class 
RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with false, defaultCreateAzureRuntimeReq ) - _ <- publisherQueue.tryTake // clean out create msg - azureCloudContext <- wsmDao.getWorkspace(workspaceId, dummyAuth).map(_.get.azureContext) - preDeleteClusterOpt <- clusterQuery - .getActiveClusterByNameMinimal(CloudContext.Azure(azureCloudContext.get), runtimeName)( - scala.concurrent.ExecutionContext.global - ) - .transaction - preDeleteCluster = preDeleteClusterOpt.get - _ <- clusterQuery.updateClusterStatus(preDeleteCluster.id, RuntimeStatus.Running, context.now).transaction - _ <- azureService.deleteRuntime(userInfo, runtimeName, workspaceId, true) + } yield () - message <- publisherQueue.take + the[RuntimeCannotBeDeletedWsmException] thrownBy { + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + } - postDeleteClusterOpt <- clusterQuery - .getClusterById(preDeleteCluster.id) - .transaction + it should "not delete a runtime if the disk cannot be deleted in Wsm" in isolatedDbTest { + val userInfo = UserInfo( + OAuth2BearerToken(""), + WorkbenchUserId("userId"), + WorkbenchEmail("user1@example.com"), + 0 + ) // this email is allowlisted + val runtimeName = RuntimeName("clusterName1") + val workspaceId = WorkspaceId(UUID.randomUUID()) - runtimeConfig <- RuntimeConfigQueries.getRuntimeConfig(preDeleteCluster.runtimeConfigId).transaction - diskOpt <- persistentDiskQuery - .getById(runtimeConfig.asInstanceOf[RuntimeConfig.AzureConfig].persistentDiskId.get) - .transaction - disk = diskOpt.get - } yield { - postDeleteClusterOpt.map(_.status) shouldBe Some(RuntimeStatus.Deleting) - disk.status shouldBe DiskStatus.Deleting + val wsmClientProvider = new MockWsmClientProvider() { + override def getDiskState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)( + implicit + ev: Ask[IO, AppContext], + logger: StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(Some("CREATING"))) + } + val azureService = 
makeInterp(wsmClientProvider = wsmClientProvider) - val expectedMessage = - DeleteAzureRuntimeMessage( - preDeleteCluster.id, - Some(disk.id), + val res = for { + context <- appContext.ask[AppContext] + + _ <- azureService + .createRuntime( + userInfo, + runtimeName, workspaceId, - None, - BillingProfileId("spend-profile"), - Some(context.traceId) + false, + defaultCreateAzureRuntimeReq ) - message shouldBe expectedMessage - } + disks <- DiskServiceDbQueries + .listDisks(Map.empty, includeDeleted = false, Some(userInfo.userEmail), None, Some(workspaceId)) + .transaction + disk = disks.head + _ <- persistentDiskQuery.updateWSMResourceId(disk.id, wsmResourceId, context.now).transaction - res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + _ <- azureService.deleteRuntime(userInfo, runtimeName, workspaceId, true) + } yield () + + the[DiskCannotBeDeletedWsmException] thrownBy { + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } } - it should "delete a runtime and not send wsmResourceId if runtime not in wsm" in isolatedDbTest { + it should "delete a runtime and not send wsmResourceId if runtime is deleted in WSM" in isolatedDbTest { val userInfo = UserInfo( OAuth2BearerToken(""), WorkbenchUserId("userId"), @@ -1298,12 +1333,16 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with val runtimeName = RuntimeName("clusterName1") val workspaceId = WorkspaceId(UUID.randomUUID()) - // set up wsm to return null - val wsmResourceApi = mock[ControlledAzureResourceApi] - when(wsmResourceApi.getAzureVm(any(), any())).thenThrow(new Error("Runtime not found")) - - val wsmClientProvider = new MockWsmClientProvider(wsmResourceApi) val publisherQueue = QueueFactory.makePublisherQueue() + + // make VM be deleted in WSM + val wsmClientProvider = new MockWsmClientProvider() { + override def getVmState(token: String, workspaceId: WorkspaceId, wsmResourceId: WsmControlledResourceId)(implicit + ev: Ask[IO, AppContext], + logger: 
StructuredLogger[IO] + ): IO[WsmState] = + IO.pure(WsmState(None)) + } val azureService = makeInterp(publisherQueue, wsmClientProvider = wsmClientProvider) val res = for { @@ -1317,6 +1356,12 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with false, defaultCreateAzureRuntimeReq ) + disks <- DiskServiceDbQueries + .listDisks(Map.empty, includeDeleted = false, Some(userInfo.userEmail), None, Some(workspaceId)) + .transaction + disk = disks.head + _ <- persistentDiskQuery.updateWSMResourceId(disk.id, wsmResourceId, context.now).transaction + _ <- publisherQueue.tryTake // clean out create msg azureCloudContext <- wsmDao.getWorkspace(workspaceId, dummyAuth).map(_.get.azureContext) preDeleteClusterOpt <- clusterQuery @@ -1525,6 +1570,8 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with val res = for { context <- appContext.ask[AppContext] + azureCloudContextOpt <- wsmDao.getWorkspace(workspaceId, dummyAuth).map(_.get.azureContext) + azureCloudContext = CloudContext.Azure(azureCloudContextOpt.get) _ <- azureService .createRuntime( @@ -1557,19 +1604,28 @@ class RuntimeV2ServiceInterpSpec extends AnyFlatSpec with LeonardoTestSuite with ) ) + disks <- DiskServiceDbQueries + .listDisks(Map.empty, includeDeleted = false, Some(userInfo.userEmail), None, Some(workspaceId)) + .transaction + + disk1 <- persistentDiskQuery.getActiveByName(azureCloudContext, DiskName("diskName1")).transaction + disk2 <- persistentDiskQuery.getActiveByName(azureCloudContext, DiskName("diskName2")).transaction + disk3 <- persistentDiskQuery.getActiveByName(azureCloudContext, DiskName("diskName3")).transaction + + _ <- persistentDiskQuery.updateWSMResourceId(disk1.get.id, wsmResourceId, context.now).transaction + _ <- persistentDiskQuery.updateWSMResourceId(disk2.get.id, wsmResourceId, context.now).transaction + _ <- persistentDiskQuery.updateWSMResourceId(disk3.get.id, wsmResourceId, context.now).transaction + _ <- 
publisherQueue.tryTakeN(Some(3)) // clean out create msg - azureCloudContext <- wsmDao.getWorkspace(workspaceId, dummyAuth).map(_.get.azureContext) - preDeleteCluster_1 <- RuntimeServiceDbQueries.getActiveRuntimeRecord(workspaceId, runtimeName_1).transaction + preDeleteCluster_1 <- RuntimeServiceDbQueries.getActiveRuntimeRecord(workspaceId, runtimeName_1).transaction preDeleteCluster_2 <- RuntimeServiceDbQueries.getActiveRuntimeRecord(workspaceId, runtimeName_2).transaction - preDeleteCluster_3 <- RuntimeServiceDbQueries.getActiveRuntimeRecord(workspaceId, runtimeName_3).transaction _ <- clusterQuery.updateClusterStatus(preDeleteCluster_1.id, RuntimeStatus.Deleted, context.now).transaction _ <- clusterQuery.updateClusterStatus(preDeleteCluster_2.id, RuntimeStatus.Running, context.now).transaction _ <- clusterQuery.updateClusterStatus(preDeleteCluster_3.id, RuntimeStatus.Error, context.now).transaction - wsmResourceId_1 = WsmControlledResourceId(UUID.fromString(preDeleteCluster_1.internalId)) wsmResourceId_2 = WsmControlledResourceId(UUID.fromString(preDeleteCluster_2.internalId)) wsmResourceId_3 = WsmControlledResourceId(UUID.fromString(preDeleteCluster_3.internalId)) diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/model/ModelsSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/model/ModelsSpec.scala new file mode 100644 index 00000000000..6ad2ea16f3d --- /dev/null +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/model/ModelsSpec.scala @@ -0,0 +1,26 @@ +package org.broadinstitute.dsde.workbench.leonardo.model + +import org.broadinstitute.dsde.workbench.leonardo.{LeonardoTestSuite, WsmState} +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers + +class ModelsSpec extends LeonardoTestSuite with Matchers with AnyFlatSpecLike { + it should "convert WSM state to value correctly" in { + WsmState(None).value shouldBe "DELETED" + WsmState(Some("CREATING")).value 
shouldBe "CREATING" + WsmState(Some("UPDATING")).value shouldBe "UPDATING" + WsmState(Some("BROKEN")).value shouldBe "BROKEN" + WsmState(Some("READY")).value shouldBe "READY" + } + it should "determine if WSM state is deletable" in { + WsmState(None).isDeletable shouldBe true + WsmState(Some("BROKEN")).isDeletable shouldBe true + WsmState(Some("READY")).isDeletable shouldBe true + WsmState(Some("CREATING")).isDeletable shouldBe false + WsmState(Some("UPDATING")).isDeletable shouldBe false + } + it should "determine if WSM state is deleted" in { + WsmState(None).isDeleted shouldBe true + WsmState(Some("READY")).isDeleted shouldBe false + } +} diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala index c0cba47ed17..facd86d7d8c 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala @@ -484,6 +484,115 @@ class AzurePubsubHandlerSpec res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } + it should "poll on azure delete vm and disk " in isolatedDbTest { + val queue = QueueFactory.asyncTaskQueue() + val mockWsmDao = mock[WsmDao[IO]] + when { + mockWsmDao.getLandingZoneResources(BillingProfileId(any[String]), any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(landingZoneResources) + when { + mockWsmDao.getWorkspaceStorageContainer(WorkspaceId(any[UUID]), any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(Some(StorageContainerResponse(ContainerName("dummy"), storageContainerResourceId))) + when { + mockWsmDao.deleteStorageContainer(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.deleteVm(any[DeleteWsmResourceRequest], 
any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.deleteDisk(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.getDeleteDiskJobResult(any[GetJobResultRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure( + GetDeleteJobResult( + WsmJobReport( + WsmJobId("job1"), + "desc", + WsmJobStatus.Succeeded, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None + ) + ) + when { + mockWsmDao.getDeleteVmJobResult(any[GetJobResultRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure( + GetDeleteJobResult( + WsmJobReport( + WsmJobId("job1"), + "desc", + WsmJobStatus.Succeeded, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None + ) + ) + + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = mockWsmDao) + + val res = + for { + disk <- makePersistentDisk().copy(status = DiskStatus.Ready).save() + + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(2) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) + + assertions = for { + getRuntimeOpt <- clusterQuery.getClusterById(runtime.id).transaction + getRuntime = getRuntimeOpt.get + controlledResources <- controlledResourceQuery.getAllForRuntime(runtime.id).transaction + } yield { + verify(mockWsmDao, times(1)).getDeleteDiskJobResult(any[GetJobResultRequest], any[Authorization])( + any[Ask[IO, AppContext]] + ) + verify(mockWsmDao, times(1)).getDeleteVmJobResult(any[GetJobResultRequest], any[Authorization])( + any[Ask[IO, AppContext]] + ) + 
getRuntime.status shouldBe RuntimeStatus.Deleted + controlledResources.length shouldBe 3 + } + + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDisk) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureStorageContainer) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDatabase) + .transaction + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None + ) + + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () + + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + it should "delete azure vm but keep the disk if no disk specified" in isolatedDbTest { val queue = QueueFactory.asyncTaskQueue() val mockWsmDao = mock[WsmDao[IO]] @@ -656,7 +765,7 @@ class AzurePubsubHandlerSpec AzureRuntimeCreationError( runtime.id, workspaceId, - s"No associated resourceId found for Disk id:${disk.id.value}", + s"WSMResource record:${wsmResourceId.value} not found for disk id:${disk.id.value}", true ) ) @@ -706,7 +815,7 @@ class AzurePubsubHandlerSpec AzureRuntimeCreationError( runtime.id, workspaceId, - s"WSMResource:${resourceId.value} not found for disk id:${disk.id.value}", + s"WSMResource record:${resourceId.value} not found for disk id:${disk.id.value}", true ) ) @@ -846,6 +955,155 @@ class AzurePubsubHandlerSpec res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } + it should "fail to delete Azure Vm if WSM delete VM job fails" in isolatedDbTest { + val exceptionMsg = "WSM delete VM job failed due to unknown" + val queue = QueueFactory.asyncTaskQueue() + val wsm = new MockWsmDAO { + override def 
getDeleteVmJobResult(request: GetJobResultRequest, authorization: Authorization)(implicit + ev: Ask[IO, AppContext] + ): IO[GetDeleteJobResult] = + IO.pure( + GetDeleteJobResult( + WsmJobReport( + request.jobId, + "desc", + WsmJobStatus.Failed, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None // Some(WsmErrorReport("test error", 200, )) + ) + ) + } + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = wsm) + + val res = + for { + disk <- makePersistentDisk().copy(status = DiskStatus.Ready).save() + + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(1) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) + + // Here we manually save a controlled resource with the runtime because we want to ensure it isn't deleted on error + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDatabase) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDisk) + .transaction + + assertions = for { + getRuntimeOpt <- clusterQuery.getClusterById(runtime.id).transaction + getRuntime = getRuntimeOpt.get + error <- clusterErrorQuery.get(runtime.id).transaction + } yield { + getRuntime.status shouldBe RuntimeStatus.Error + error.map(_.errorMessage).head should include(exceptionMsg) + } + + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None + ) + + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () + 
res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + + it should "update state correctly if WSM deleteDisk job fails during runtime deletion" in isolatedDbTest { + val queue = QueueFactory.asyncTaskQueue() + val wsm = new MockWsmDAO { + override def getDeleteDiskJobResult(request: GetJobResultRequest, authorization: Authorization)(implicit + ev: Ask[IO, AppContext] + ): IO[GetDeleteJobResult] = + IO.pure( + GetDeleteJobResult( + WsmJobReport( + request.jobId, + "desc", + WsmJobStatus.Failed, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None // Some(WsmErrorReport("test error", 200, )) + ) + ) + } + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = wsm) + + val res = + for { + disk <- makePersistentDisk().copy(status = DiskStatus.Ready).save() + + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(1) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) + + // Here we manually save a controlled resource with the runtime because we want to ensure it isn't deleted on error + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDatabase) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDisk) + .transaction + + assertions = for { + getRuntimeOpt <- clusterQuery.getClusterById(runtime.id).transaction + runtime = getRuntimeOpt.get + getDiskOpt <- persistentDiskQuery.getById(disk.id).transaction + getDisk = getDiskOpt.get + diskAttached <- persistentDiskQuery.isDiskAttached(disk.id).transaction + } yield { + // VM must be deleted successfully for deleteDisk action to start + // disk can then be deleted 
from the cloud environment page if desired + runtime.status shouldBe RuntimeStatus.Deleted + getDisk.status shouldBe DiskStatus.Error + diskAttached shouldBe false + } + + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None + ) + + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + it should "update runtime correctly when wsm deleteDisk call errors on runtime deletion" in isolatedDbTest { val exceptionMsg = "test exception" val queue = QueueFactory.asyncTaskQueue() @@ -1167,31 +1425,83 @@ class AzurePubsubHandlerSpec res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } - it should "fail to delete a disk without a WSMresourceId" in isolatedDbTest { + it should "not send delete to WSM without a disk record" in isolatedDbTest { val queue = QueueFactory.asyncTaskQueue() - val wsmResourceId = WsmControlledResourceId(UUID.randomUUID()) val mockWsmDao = mock[WsmDao[IO]] + when { + mockWsmDao.getLandingZoneResources(BillingProfileId(any[String]), any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(landingZoneResources) + when { + mockWsmDao.getWorkspaceStorageContainer(WorkspaceId(any[UUID]), any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(Some(StorageContainerResponse(ContainerName("dummy"), storageContainerResourceId))) + when { + mockWsmDao.deleteStorageContainer(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.deleteVm(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.deleteDisk(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn 
IO.pure(None) + when { + mockWsmDao.getDeleteVmJobResult(any[GetJobResultRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure( + GetDeleteJobResult( + WsmJobReport( + WsmJobId("job1"), + "desc", + WsmJobStatus.Succeeded, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None + ) + ) + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = mockWsmDao) val res = for { - disk <- makePersistentDisk(wsmResourceId = Some(wsmResourceId)).copy(status = DiskStatus.Ready).save() + disk <- makePersistentDisk(wsmResourceId = None).copy(status = DiskStatus.Ready).save() - msg = DeleteDiskV2Message(disk.id, workspaceId, cloudContextAzure, Some(wsmResourceId), None) + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(2) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) - err <- azureInterp.deleteDisk(msg).attempt + assertions = for { + diskStatusOpt <- persistentDiskQuery.getStatus(disk.id).transaction + diskStatus = diskStatusOpt.get + } yield { + verify(mockWsmDao, times(0)).deleteDisk(any[DeleteWsmResourceRequest], any[Authorization])( + any[Ask[IO, AppContext]] + ) - } yield err shouldBe Left( - AzureDiskResourceDeletionError( - Right(wsmResourceId), - workspaceId, - "No disk resource found for delete azure disk. No-op for wsmDao.deleteDisk." 
+ diskStatus shouldBe DiskStatus.Deleted + } + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None ) - ) + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } - // Needs to be made for each test its used in, otherwise queue will overlap def makeAzurePubsubHandler(asyncTaskQueue: Queue[IO, Task[IO]] = QueueFactory.asyncTaskQueue(), relayService: AzureRelayService[IO] = FakeAzureRelayService, From 384cc8bf44aeda21426854872884213dbd3ff0fb Mon Sep 17 00:00:00 2001 From: Broad Bot <61600560+broadbot@users.noreply.github.com> Date: Tue, 6 Feb 2024 17:35:15 -0500 Subject: [PATCH 3/3] WM-2488: Update Cromwell version to 0.2.429 (#4186) --- Dockerfile | 2 +- http/src/main/resources/reference.conf | 4 ++-- .../dsde/workbench/leonardo/KubernetesTestData.scala | 2 +- .../dsde/workbench/leonardo/http/ConfigReaderSpec.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 990787242a4..1cc18d7987f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,7 +30,7 @@ ENV TERRA_APP_VERSION 0.5.0 ENV GALAXY_VERSION 2.8.1 ENV NGINX_VERSION 4.3.0 # If you update this here, make sure to also update reference.conf: -ENV CROMWELL_CHART_VERSION 0.2.428 +ENV CROMWELL_CHART_VERSION 0.2.429 ENV HAIL_BATCH_CHART_VERSION 0.1.9 ENV RSTUDIO_CHART_VERSION 0.4.0 ENV SAS_CHART_VERSION 0.5.0 diff --git a/http/src/main/resources/reference.conf b/http/src/main/resources/reference.conf index 2a52f792944..87806240d5b 100644 --- a/http/src/main/resources/reference.conf +++ b/http/src/main/resources/reference.conf @@ -229,7 +229,7 @@ azure { coa-app-config { instrumentation-enabled = false chart-name = "cromwell-helm/cromwell-on-azure" - chart-version = "0.2.428" + chart-version = 
"0.2.429" release-name-suffix = "coa-rls" namespace-name-suffix = "coa-ns" ksa-name = "coa-ksa" @@ -765,7 +765,7 @@ gke { cromwellApp { # If you update the chart name or version here, make sure to also update it in the dockerfile: chartName = "/leonardo/cromwell" - chartVersion = "0.2.428" + chartVersion = "0.2.429" services = [ { name = "cromwell-service" diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala index 4f939968a7e..dd9a0b2be98 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala @@ -58,7 +58,7 @@ object KubernetesTestData { val ingressChart = Chart(ingressChartName, ingressChartVersion) val coaChartName = ChartName("cromwell-helm/cromwell-on-azure") - val coaChartVersion = ChartVersion("0.2.428") + val coaChartVersion = ChartVersion("0.2.429") val coaChart = Chart(coaChartName, coaChartVersion) diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala index 431920fd481..d7d97c1ebe6 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala @@ -76,7 +76,7 @@ class ConfigReaderSpec extends AnyFlatSpec with Matchers { AzureAppRegistrationConfig(ClientId(""), ClientSecret(""), ManagedAppTenantId("")), CoaAppConfig( ChartName("cromwell-helm/cromwell-on-azure"), - ChartVersion("0.2.428"), + ChartVersion("0.2.429"), ReleaseNameSuffix("coa-rls"), NamespaceNameSuffix("coa-ns"), KsaName("coa-ksa"),