From 493abbc0033a48e3ed1a72f39f03daaaec9faf77 Mon Sep 17 00:00:00 2001 From: Eric Rollins Date: Fri, 26 Jan 2024 13:29:33 -0500 Subject: [PATCH 1/6] RW-11654 Enable pending pacts (#4144) --- .../workbench/leonardo/provider/LeoProvider.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/provider/LeoProvider.scala b/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/provider/LeoProvider.scala index 64c9b6c6719..f20fd4b87aa 100644 --- a/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/provider/LeoProvider.scala +++ b/pact4s/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/provider/LeoProvider.scala @@ -137,10 +137,13 @@ class LeoProvider extends AnyFlatSpec with BeforeAndAfterAll with PactVerifier { } val provider: ProviderInfoBuilder = - ProviderInfoBuilder(name = "leonardo", - PactSource - .PactBrokerWithSelectors(pactBrokerUrl) - .withAuth(BasicAuth(pactBrokerUser, pactBrokerPass)) + ProviderInfoBuilder( + name = "leonardo", + PactSource + .PactBrokerWithSelectors(pactBrokerUrl) + .withAuth(BasicAuth(pactBrokerUser, pactBrokerPass)) + .withPendingPacts(true) + .withProviderTags(ProviderTags("develop")) ) .withStateManagementFunction( providerStatesHandler From 2874e4d65f8540f57dc9117fdf805728d09247e3 Mon Sep 17 00:00:00 2001 From: David An Date: Tue, 30 Jan 2024 09:13:29 -0500 Subject: [PATCH 2/6] AJ-1550: bump listener image and chart (#4154) --- http/src/main/resources/reference.conf | 4 ++-- .../dsde/workbench/leonardo/http/ConfigReaderSpec.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/http/src/main/resources/reference.conf b/http/src/main/resources/reference.conf index ae402a14faf..350b2d212d2 100644 --- a/http/src/main/resources/reference.conf +++ b/http/src/main/resources/reference.conf @@ -214,7 +214,7 @@ azure { minor-version-auto-upgrade = true, file-uris = ["https://raw.githubusercontent.com/DataBiosphere/leonardo/52aab3b7f252667f73b23682062ab3e0d9d533b9/http/src/main/resources/init-resources/azure_vm_init_script.sh"] } - listener-image = "terradevacrpublic.azurecr.io/terra-azure-relay-listeners:bd50dbc" + listener-image = "terradevacrpublic.azurecr.io/terra-azure-relay-listeners:ae65380" } } @@ -412,7 +412,7 @@ azure { listener-chart-config { chart-name = "terra-helm/listener" - chart-version = "0.2.0" + chart-version = "0.3.0" } } diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala index 93905093196..538767cef80 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala @@ -68,7 +68,7 @@ class ConfigReaderSpec extends AnyFlatSpec with Matchers { "https://raw.githubusercontent.com/DataBiosphere/leonardo/52aab3b7f252667f73b23682062ab3e0d9d533b9/http/src/main/resources/init-resources/azure_vm_init_script.sh" ) ), - "terradevacrpublic.azurecr.io/terra-azure-relay-listeners:bd50dbc", + "terradevacrpublic.azurecr.io/terra-azure-relay-listeners:ae65380", VMCredential(username = "username", password = "password") ) ), @@ -220,7 +220,7 @@ class ConfigReaderSpec extends AnyFlatSpec with Matchers { ), List(AppType.Wds, AppType.WorkflowsApp), TdrConfig("https://jade.datarepo-dev.broadinstitute.org"), - 
ListenerChartConfig(ChartName("terra-helm/listener"), ChartVersion("0.2.0")) + ListenerChartConfig(ChartName("terra-helm/listener"), ChartVersion("0.3.0")) ), OidcAuthConfig( Uri.unsafeFromString("https://fake"), From ce2158933efbc22ee328e9b4a91e04747fd18e46 Mon Sep 17 00:00:00 2001 From: Yonghao Yu Date: Tue, 30 Jan 2024 12:03:19 -0500 Subject: [PATCH 3/6] [RW-11775][risk=no] upgrade sas and rstudio to increase proxy size limit (#4153) Co-authored-by: Qi Wang --- Dockerfile | 4 ++-- http/src/main/resources/reference.conf | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 98e875fd975..2bdaf2824af 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,8 +32,8 @@ ENV NGINX_VERSION 4.3.0 # If you update this here, make sure to also update reference.conf: ENV CROMWELL_CHART_VERSION 0.2.422 ENV HAIL_BATCH_CHART_VERSION 0.1.9 -ENV RSTUDIO_CHART_VERSION 0.3.0 -ENV SAS_CHART_VERSION 0.3.0 +ENV RSTUDIO_CHART_VERSION 0.4.0 +ENV SAS_CHART_VERSION 0.5.0 RUN mkdir /leonardo COPY ./leonardo*.jar /leonardo diff --git a/http/src/main/resources/reference.conf b/http/src/main/resources/reference.conf index 350b2d212d2..e7a08058d14 100644 --- a/http/src/main/resources/reference.conf +++ b/http/src/main/resources/reference.conf @@ -786,8 +786,8 @@ gke { # This is really just a prefix of the chart name. Full chart name for AOU app will be # this chartName in config file appended with allowedChartName from createAppRequest chartName = "/leonardo/" - rstudioChartVersion = "0.3.0" - sasChartVersion = "0.3.0" + rstudioChartVersion = "0.4.0" + sasChartVersion = "0.5.0" services = [ { From 0975c0db93b5a39cc2375de26767cc4c9bcdcda6 Mon Sep 17 00:00:00 2001 From: Justin Canas Date: Tue, 30 Jan 2024 13:41:04 -0800 Subject: [PATCH 4/6] [IA-4786] Readme change and notification change (#4160) Co-authored-by: jdcanas --- .github/{README.md => workflow-README.md} | 0 .github/workflows/azure_automation_test.yml | 5 +++-- 2 files changed, 3 insertions(+), 2 deletions(-) rename .github/{README.md => workflow-README.md} (100%) diff --git a/.github/README.md b/.github/workflow-README.md similarity index 100% rename from .github/README.md rename to .github/workflow-README.md diff --git a/.github/workflows/azure_automation_test.yml b/.github/workflows/azure_automation_test.yml index 424c051d1ab..99e3df7d062 100644 --- a/.github/workflows/azure_automation_test.yml +++ b/.github/workflows/azure_automation_test.yml @@ -301,8 +301,9 @@ jobs: - name: Notify slack uses: slackapi/slack-github-action@v1.23.0 with: - # Channel is for #ia-notification-test - channel-id: 'C03ATF4QXEV' + # C03ATF4QXEV is for #ia-notification-test + # C53JYBV9A is for #dsde-qa + channel-id: 'C03ATF4QXEV,C53JYBV9A' slack-message: "Azure E2E Tests FAILED, automation-branch: ${{ needs.init-github-context.outputs.automation-branch }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" env: SLACK_BOT_TOKEN: ${{ secrets.SLACKBOT_TOKEN }} From 2d11d206ee33f2ed991c7418965b33981f5d1915 Mon Sep 17 00:00:00 2001 From: Broad Bot <61600560+broadbot@users.noreply.github.com> Date: Wed, 31 Jan 2024 18:39:55 -0500 Subject: [PATCH 5/6] WX-767: Update Cromwell version to 0.2.426 (#4171) --- Dockerfile | 2 +- http/src/main/resources/reference.conf | 4 ++-- .../dsde/workbench/leonardo/KubernetesTestData.scala | 2 +- .../dsde/workbench/leonardo/http/ConfigReaderSpec.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2bdaf2824af..5de33f143f9 100644 --- 
a/Dockerfile +++ b/Dockerfile @@ -30,7 +30,7 @@ ENV TERRA_APP_VERSION 0.5.0 ENV GALAXY_VERSION 2.8.1 ENV NGINX_VERSION 4.3.0 # If you update this here, make sure to also update reference.conf: -ENV CROMWELL_CHART_VERSION 0.2.422 +ENV CROMWELL_CHART_VERSION 0.2.426 ENV HAIL_BATCH_CHART_VERSION 0.1.9 ENV RSTUDIO_CHART_VERSION 0.4.0 ENV SAS_CHART_VERSION 0.5.0 diff --git a/http/src/main/resources/reference.conf b/http/src/main/resources/reference.conf index e7a08058d14..0f6c073fb98 100644 --- a/http/src/main/resources/reference.conf +++ b/http/src/main/resources/reference.conf @@ -229,7 +229,7 @@ azure { coa-app-config { instrumentation-enabled = false chart-name = "cromwell-helm/cromwell-on-azure" - chart-version = "0.2.422" + chart-version = "0.2.426" release-name-suffix = "coa-rls" namespace-name-suffix = "coa-ns" ksa-name = "coa-ksa" @@ -765,7 +765,7 @@ gke { cromwellApp { # If you update the chart name or version here, make sure to also update it in the dockerfile: chartName = "/leonardo/cromwell" - chartVersion = "0.2.422" + chartVersion = "0.2.426" services = [ { name = "cromwell-service" diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala index 8e9d683af6b..6bb941f41b9 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/KubernetesTestData.scala @@ -58,7 +58,7 @@ object KubernetesTestData { val ingressChart = Chart(ingressChartName, ingressChartVersion) val coaChartName = ChartName("cromwell-helm/cromwell-on-azure") - val coaChartVersion = ChartVersion("0.2.422") + val coaChartVersion = ChartVersion("0.2.426") val coaChart = Chart(coaChartName, coaChartVersion) diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala index 538767cef80..1e2aec260e6 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/http/ConfigReaderSpec.scala @@ -76,7 +76,7 @@ class ConfigReaderSpec extends AnyFlatSpec with Matchers { AzureAppRegistrationConfig(ClientId(""), ClientSecret(""), ManagedAppTenantId("")), CoaAppConfig( ChartName("cromwell-helm/cromwell-on-azure"), - ChartVersion("0.2.422"), + ChartVersion("0.2.426"), ReleaseNameSuffix("coa-rls"), NamespaceNameSuffix("coa-ns"), KsaName("coa-ksa"), From 7e2df46ef5294eed43730cd7d109f421ed3fde46 Mon Sep 17 00:00:00 2001 From: lmcnatt <85642387+lucymcnatt@users.noreply.github.com> Date: Thu, 1 Feb 2024 10:53:11 -0500 Subject: [PATCH 6/6] [IA-4634] Add polling when deleting disk in WSM (#4152) --- .../leonardo/db/ClusterComponent.scala | 11 + .../leoPubsubMessageSubscriberModels.scala | 7 +- .../leonardo/util/AzurePubsubHandler.scala | 176 ++++++------ .../util/AzurePubsubHandlerSpec.scala | 261 ++++++++++++++++++ 4 files changed, 367 insertions(+), 88 deletions(-) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala index b233d24783a..2aedd1e0623 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala +++ 
b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/db/ClusterComponent.scala @@ -715,6 +715,17 @@ object clusterQuery extends TableQuery(new ClusterTable(_)) { _ <- diskIdOpt.traverse(diskId => persistentDiskQuery.delete(diskId, dateAccessed)) } yield () + def getDiskId(runtimeId: Long)(implicit + ec: ExecutionContext + ): DBIO[Option[DiskId]] = + for { + runtimeConfigId <- findByIdQuery(runtimeId) + .map(_.runtimeConfigId) + .result + .headOption + diskIdOpt <- runtimeConfigId.flatTraverse(rid => RuntimeConfigQueries.getDiskId(rid)) + } yield diskIdOpt + def setToRunning(id: Long, hostIp: IP, dateAccessed: Instant): DBIO[Int] = updateClusterStatusAndHostIp(id, RuntimeStatus.Running, Some(hostIp), dateAccessed) diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala index 3248d37ced4..182df96a10c 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/monitor/leoPubsubMessageSubscriberModels.scala @@ -1069,17 +1069,18 @@ object PubsubHandleMessageError { final case class DiskDeletionError(diskId: DiskId, workspaceId: WorkspaceId, errorMsg: String) extends PubsubHandleMessageError { override def getMessage: String = - s"\n\tdisk ${diskId} in workspace ${workspaceId}, \n\tmsg: ${errorMsg})" + s"\n\tdisk ${diskId.value} in workspace ${workspaceId.value}, \n\tmsg: ${errorMsg})" val isRetryable: Boolean = false } - final case class AzureDiskDeletionError(wsmControlledResourceId: WsmControlledResourceId, + final case class AzureDiskDeletionError(diskId: DiskId, + wsmControlledResourceId: WsmControlledResourceId, workspaceId: WorkspaceId, errorMsg: String ) extends PubsubHandleMessageError { override def getMessage: String = - s"\n\tdisk resource: ${wsmControlledResourceId}, \n\tmsg: ${errorMsg})" + s"\n\tdisk ${diskId.value} with resource id: ${wsmControlledResourceId.value}, \n\tmsg: ${errorMsg})" val isRetryable: Boolean = false } diff --git a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala index 5811d442785..5815a3c6ec3 100644 --- a/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala +++ b/http/src/main/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandler.scala @@ -49,7 +49,12 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( )(implicit val executionContext: ExecutionContext, dbRef: DbReference[F], logger: StructuredLogger[F], F: Async[F]) extends AzurePubsubHandlerAlgebra[F] { - implicit val isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v + // implicits necessary to poll on the status of external jobs + implicit private def isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v + implicit private def wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) => + v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed + implicit private def wsmDeleteDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) => + v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed override def createAndPollRuntime(msg: 
CreateAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] = for { @@ -450,9 +455,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( resourceId ) - private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] = { - implicit val wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) => - v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed + private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] = for { ctx <- ev.ask auth <- samDAO.getLeoAuthToken @@ -542,11 +545,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( ) ) } yield () - } override def deleteAndPollRuntime(msg: DeleteAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] = { - implicit val wsmDeleteVmDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) => - v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed for { ctx <- ev.ask @@ -611,8 +611,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( _ <- diskRecordOpt match { case Some(diskRecord) => for { - _ <- sendDeleteDisktoWSM(diskRecord.resourceId, msg.workspaceId, auth) - _ <- logger.info(ctx.loggingCtx)(s"runtime disk ${diskId.value} is deleted successfully") + _ <- deleteDiskInWSM(diskId, diskRecord.resourceId, msg.workspaceId, auth, Some(runtime.id)) } yield () case _ => // if the disk hasn't been created in WSM yet, skip disk deletion @@ -625,7 +624,6 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( }.void // Delete the VM in WSM - // TODO: Add polling on disk deletion _ <- msg.wsmResourceId.fold( for { // Error'd runtimes might not have a WSM resourceId and therefore no WsmJobStatus. @@ -687,7 +685,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( _ <- deleteDiskAction _ <- dbRef.inTransaction(clusterQuery.completeDeletion(runtime.id, ctx.now)) _ <- logger.info(ctx.loggingCtx)( - s"runtime ${msg.runtimeId} and associated disk is deleted successfully" + s"runtime ${msg.runtimeId} ${if (msg.diskIdToDelete.isDefined) "and associated disk"} have been deleted successfully" ) } yield () case WsmJobStatus.Failed => @@ -775,23 +773,22 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( _ <- clusterQuery.updateClusterStatus(e.runtimeId, RuntimeStatus.Error, now).transaction auth <- samDAO.getLeoAuthToken + diskIdOpt <- clusterQuery.getDiskId(e.runtimeId).transaction - _ <- e.useExistingDisk match { - case false => + _ <- (e.useExistingDisk, diskIdOpt) match { + // disk was supposed to be created and was + case (false, Some(diskId)) => for { diskRecordOpt <- controlledResourceQuery .getWsmRecordForRuntime(e.runtimeId, WsmResourceType.AzureDisk) .transaction _ <- diskRecordOpt match { + // if there is a disk record, the disk finished creating, so it must be deleted in WSM case Some(diskRecord) => for { - // TODO: Add polling on disk deletion - _ <- sendDeleteDisktoWSM(diskRecord.resourceId, e.workspaceId, auth) - _ <- clusterQuery.setDiskDeleted(e.runtimeId, now).transaction - _ <- logger.info(ctx.loggingCtx)(s"disk for runtime ${e.runtimeId} is deleted successfully") + _ <- deleteDiskInWSM(diskId, diskRecord.resourceId, e.workspaceId, auth, Some(e.runtimeId)) } yield () case _ => - // if the disk hasn't been created in WSM yet, skip disk deletion for { _ <- logger.info( s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk." 
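Context for the hunks below: deleteDiskInWSM now polls wsmDao.getDeleteDiskJobResult via streamFUntilDone, bounded by deleteDiskPollConfig.maxAttempts and interval, and treats the job as done once the DoneCheckable instance declared at the top of this class sees WsmJobStatus.Succeeded or Failed. The short Scala sketch that follows illustrates that bounded-polling shape in isolation; it is not part of the patch, and JobResult, checkJob, and pollUntilDone are hypothetical stand-ins rather than the real WSM types or the workbench-libs streamFUntilDone helper.

import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration.FiniteDuration

object WsmPollingSketch {
  // Hypothetical stand-in for GetDeleteJobResult: only the terminal statuses matter for polling.
  final case class JobResult(status: String)
  private def isDone(r: JobResult): Boolean =
    r.status == "Succeeded" || r.status == "Failed"

  // Re-run `checkJob` once per `interval`, at most `maxAttempts` times, stopping as soon as the
  // job reaches a terminal status; the last emitted element is the final job report.
  def pollUntilDone(checkJob: IO[JobResult], maxAttempts: Int, interval: FiniteDuration): IO[JobResult] =
    Stream
      .fixedDelay[IO](interval)     // tick once per `interval`
      .evalMap(_ => checkJob)       // re-query the job report on each tick
      .take(maxAttempts.toLong)     // give up after `maxAttempts` polls
      .takeThrough(r => !isDone(r)) // keep polling until Succeeded or Failed, inclusive
      .compile
      .lastOrError                  // surface the final report; callers branch on its status
}

In the patch itself, the Succeeded branch then deletes (or marks deleted) the Leonardo disk record, while a Failed report, or a still-Running report after the last attempt, raises AzureDiskDeletionError.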
@@ -801,7 +798,13 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( } } yield () - case true => F.unit + // disk was supposed to be created and wasn't + case (false, None) => + logger.info( + s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk." + ) + // no disk created + case (true, _) => F.unit } } yield () @@ -896,14 +899,15 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( } } yield () - private def sendDeleteDisktoWSM(wsmResourceId: WsmControlledResourceId, - workspaceId: WorkspaceId, - auth: Authorization - )(implicit - ev: Ask[F, AppContext] - ): F[WsmJobId] = + private def deleteDiskInWSM(diskId: DiskId, + wsmResourceId: WsmControlledResourceId, + workspaceId: WorkspaceId, + auth: Authorization, + runtimeId: Option[Long] + )(implicit ev: Ask[F, AppContext]): F[Unit] = for { ctx <- ev.ask + jobId = getWsmJobId("delete-disk", wsmResourceId) _ <- logger.info(ctx.loggingCtx)(s"Sending WSM delete message for disk resource ${wsmResourceId.value}") @@ -921,76 +925,79 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( .void .adaptError(e => AzureDiskDeletionError( + diskId, wsmResourceId, workspaceId, s"${ctx.traceId.asString} | WSM call to delete disk failed due to ${e.getMessage}. Please retry delete again" ) ) - } yield jobId - override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] = { - implicit val wsmDeleteDiskDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) => - v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed - for { - ctx <- ev.ask - auth <- samDAO.getLeoAuthToken + getDeleteJobResult = wsmDao.getDeleteDiskJobResult( + GetJobResultRequest(workspaceId, jobId), + auth + ) - _ <- msg.wsmResourceId match { - case Some(wsmResourceId) => - for { - jobId <- sendDeleteDisktoWSM(wsmResourceId, msg.workspaceId, auth) - getDeleteJobResult = wsmDao.getDeleteDiskJobResult( - GetJobResultRequest(msg.workspaceId, jobId), - auth - ) + // We need to wait until WSM deletion job to be done to update the database + taskToRun = for { + resp <- streamFUntilDone( + getDeleteJobResult, + config.deleteDiskPollConfig.maxAttempts, + config.deleteDiskPollConfig.interval + ).compile.lastOrError - // We need to wait until WSM deletion job to be done to update the database - taskToRun = for { - resp <- streamFUntilDone( - getDeleteJobResult, - config.deleteDiskPollConfig.maxAttempts, - config.deleteDiskPollConfig.interval - ).compile.lastOrError - - _ <- resp.jobReport.status match { - case WsmJobStatus.Succeeded => - for { - _ <- logger.info(ctx.loggingCtx)(s"disk ${msg.diskId.value} is deleted successfully") - _ <- dbRef.inTransaction(persistentDiskQuery.delete(msg.diskId, ctx.now)) - } yield () - case WsmJobStatus.Failed => - F.raiseError[Unit]( - DiskDeletionError( - msg.diskId, - msg.workspaceId, - s"WSM delete disk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}" - ) - ) - case WsmJobStatus.Running => - F.raiseError[Unit]( - DiskDeletionError( - msg.diskId, - msg.workspaceId, - s"WSM delete VM disk was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay" - ) - ) + _ <- resp.jobReport.status match { + case WsmJobStatus.Succeeded => + for { + _ <- logger.info(ctx.loggingCtx)(s"disk ${diskId.value} is deleted successfully") + _ <- runtimeId match { + case Some(runtimeId) => clusterQuery.setDiskDeleted(runtimeId, ctx.now).transaction + case 
_ => dbRef.inTransaction(persistentDiskQuery.delete(diskId, ctx.now)) } } yield () - - _ <- asyncTasks.offer( - Task( - ctx.traceId, - taskToRun, - Some { e => - handleAzureDiskDeletionError( - DiskDeletionError(msg.diskId, msg.workspaceId, s"Fail to delete disk due to ${e.getMessage}") - ) - }, - ctx.now, - "deleteDiskV2" + case WsmJobStatus.Failed => + F.raiseError[Unit]( + AzureDiskDeletionError( + diskId, + wsmResourceId, + workspaceId, + s"WSM deleteDisk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}" ) ) - } yield () + case WsmJobStatus.Running => + F.raiseError[Unit]( + AzureDiskDeletionError( + diskId, + wsmResourceId, + workspaceId, + s"Wsm deleteDisk job was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay" + ) + ) + } + } yield () + + _ <- asyncTasks.offer( + Task( + ctx.traceId, + taskToRun, + Some { e => + handleAzureDiskDeletionError( + AzureDiskDeletionError(diskId, wsmResourceId, workspaceId, s"Fail to delete disk due to ${e.getMessage}") + ) + }, + ctx.now, + "deleteDiskV2" + ) + ) + } yield () + + override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] = + for { + ctx <- ev.ask + auth <- samDAO.getLeoAuthToken + + _ <- msg.wsmResourceId match { + case Some(wsmResourceId) => + deleteDiskInWSM(msg.diskId, wsmResourceId, msg.workspaceId, auth, None) case None => for { _ <- logger.info(s"No WSM resource found for Azure disk ${msg.diskId}, skipping deletion in WSM") @@ -999,9 +1006,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel]( } yield () } } yield () - } - def handleAzureDiskDeletionError(e: DiskDeletionError)(implicit + def handleAzureDiskDeletionError(e: AzureDiskDeletionError)(implicit ev: Ask[F, AppContext] ): F[Unit] = for { ctx <- ev.ask diff --git a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala index 54b2b053398..b1345a8f774 100644 --- a/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala +++ b/http/src/test/scala/org/broadinstitute/dsde/workbench/leonardo/util/AzurePubsubHandlerSpec.scala @@ -484,6 +484,118 @@ class AzurePubsubHandlerSpec res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } + it should "poll on azure delete vm and disk " in isolatedDbTest { + val queue = QueueFactory.asyncTaskQueue() + val mockWsmDao = mock[WsmDao[IO]] + when { + mockWsmDao.getLandingZoneResources(BillingProfileId(any[String]), any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(landingZoneResources) + when { + mockWsmDao.getWorkspaceStorageContainer(WorkspaceId(any[UUID]), any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(Some(StorageContainerResponse(ContainerName("dummy"), storageContainerResourceId))) + when { + mockWsmDao.deleteStorageContainer(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.deleteVm(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.deleteDisk(any[DeleteWsmResourceRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure(None) + when { + mockWsmDao.getDeleteDiskJobResult(any[GetJobResultRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure( + GetDeleteJobResult( + WsmJobReport( + 
WsmJobId("job1"), + "desc", + WsmJobStatus.Succeeded, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None + ) + ) + when { + mockWsmDao.getDeleteVmJobResult(any[GetJobResultRequest], any[Authorization])(any[Ask[IO, AppContext]]) + } thenReturn IO.pure( + GetDeleteJobResult( + WsmJobReport( + WsmJobId("job1"), + "desc", + WsmJobStatus.Succeeded, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None + ) + ) + + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = mockWsmDao) + + val res = + for { + disk <- makePersistentDisk().copy(status = DiskStatus.Ready).save() + + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(2) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) + + assertions = for { + getRuntimeOpt <- clusterQuery.getClusterById(runtime.id).transaction + getRuntime = getRuntimeOpt.get + getDiskOpt <- persistentDiskQuery.getById(disk.id).transaction + getDisk = getDiskOpt.get + controlledResources <- controlledResourceQuery.getAllForRuntime(runtime.id).transaction + } yield { + verify(mockWsmDao, times(1)).getDeleteDiskJobResult(any[GetJobResultRequest], any[Authorization])( + any[Ask[IO, AppContext]] + ) + verify(mockWsmDao, times(1)).getDeleteVmJobResult(any[GetJobResultRequest], any[Authorization])( + any[Ask[IO, AppContext]] + ) + getRuntime.status shouldBe RuntimeStatus.Deleted + getDisk.status shouldBe DiskStatus.Deleted + controlledResources.length shouldBe 3 + } + + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDisk) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureStorageContainer) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDatabase) + .transaction + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None + ) + + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () + + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + it should "delete azure vm but keep the disk if no disk specified" in isolatedDbTest { val queue = QueueFactory.asyncTaskQueue() val mockWsmDao = mock[WsmDao[IO]] @@ -846,6 +958,155 @@ class AzurePubsubHandlerSpec res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) } + it should "fail to delete Azure Vm if WSM delete VM job fails" in isolatedDbTest { + val exceptionMsg = "WSM delete VM job failed due to unknown" + val queue = QueueFactory.asyncTaskQueue() + val wsm = new MockWsmDAO { + override def getDeleteVmJobResult(request: GetJobResultRequest, authorization: Authorization)(implicit + ev: Ask[IO, AppContext] + ): IO[GetDeleteJobResult] = + IO.pure( + GetDeleteJobResult( + WsmJobReport( + request.jobId, + "desc", + WsmJobStatus.Failed, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + 
None // Some(WsmErrorReport("test error", 200, )) + ) + ) + } + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = wsm) + + val res = + for { + disk <- makePersistentDisk().copy(status = DiskStatus.Ready).save() + + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(1) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) + + // Here we manually save a controlled resource with the runtime because we want too ensure it isn't deleted on error + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDatabase) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDisk) + .transaction + + assertions = for { + getRuntimeOpt <- clusterQuery.getClusterById(runtime.id).transaction + getRuntime = getRuntimeOpt.get + error <- clusterErrorQuery.get(runtime.id).transaction + } yield { + getRuntime.status shouldBe RuntimeStatus.Error + error.map(_.errorMessage).head should include(exceptionMsg) + } + + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None + ) + + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () + + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + + it should "update state correctly if WSM deleteDisk job fails during runtime deletion" in isolatedDbTest { + val queue = QueueFactory.asyncTaskQueue() + val wsm = new MockWsmDAO { + override def getDeleteDiskJobResult(request: GetJobResultRequest, authorization: Authorization)(implicit + ev: Ask[IO, AppContext] + ): IO[GetDeleteJobResult] = + IO.pure( + GetDeleteJobResult( + WsmJobReport( + request.jobId, + "desc", + WsmJobStatus.Failed, + 200, + ZonedDateTime.parse("2022-03-18T15:02:29.264756Z"), + Some(ZonedDateTime.parse("2022-03-18T15:02:29.264756Z")), + "resultUrl" + ), + None // Some(WsmErrorReport("test error", 200, )) + ) + ) + } + val azureInterp = makeAzurePubsubHandler(asyncTaskQueue = queue, wsmDAO = wsm) + + val res = + for { + disk <- makePersistentDisk().copy(status = DiskStatus.Ready).save() + + azureRuntimeConfig = RuntimeConfig.AzureConfig(MachineTypeName(VirtualMachineSizeTypes.STANDARD_A1.toString), + Some(disk.id), + None + ) + runtime = makeCluster(1) + .copy( + status = RuntimeStatus.Running, + cloudContext = CloudContext.Azure(azureCloudContext) + ) + .saveWithRuntimeConfig(azureRuntimeConfig) + + // Here we manually save a controlled resource with the runtime because we want too ensure it isn't deleted on error + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDatabase) + .transaction + _ <- controlledResourceQuery + .save(runtime.id, WsmControlledResourceId(UUID.randomUUID()), WsmResourceType.AzureDisk) + .transaction + + assertions = for { + getRuntimeOpt <- clusterQuery.getClusterById(runtime.id).transaction + runtime = getRuntimeOpt.get + getDiskOpt <- persistentDiskQuery.getById(disk.id).transaction + getDisk = getDiskOpt.get + diskAttached <- persistentDiskQuery.isDiskAttached(disk.id).transaction + } yield { + // VM must be deleted successfully for 
deleteDisk action to start + // disk can then be deleted from the cloud environment page if desired + runtime.status shouldBe RuntimeStatus.Deleted + getDisk.status shouldBe DiskStatus.Error + diskAttached shouldBe false + } + + msg = DeleteAzureRuntimeMessage(runtime.id, + Some(disk.id), + workspaceId, + Some(wsmResourceId), + billingProfileId, + None + ) + + asyncTaskProcessor = AsyncTaskProcessor(AsyncTaskProcessor.Config(10, 10), queue) + _ <- azureInterp.deleteAndPollRuntime(msg) + + _ <- withInfiniteStream(asyncTaskProcessor.process, assertions) + } yield () + res.unsafeRunSync()(cats.effect.unsafe.IORuntime.global) + } + it should "update runtime correctly when wsm deleteDisk call errors on runtime deletion" in isolatedDbTest { val exceptionMsg = "test exception" val queue = QueueFactory.asyncTaskQueue()