Merge branch 'IA-4590-check-WSM-status' of https://github.com/DataBiosphere/leonardo into IA-4590-check-WSM-status
lucymcnatt committed Feb 1, 2024
2 parents 2994b3a + 7e2df46 commit 6b89024
Showing 11 changed files with 390 additions and 107 deletions.
File renamed without changes.
5 changes: 3 additions & 2 deletions .github/workflows/azure_automation_test.yml
@@ -301,8 +301,9 @@ jobs:
- name: Notify slack
uses: slackapi/slack-github-action@v1.23.0
with:
# Channel is for #ia-notification-test
channel-id: 'C03ATF4QXEV'
# C03ATF4QXEV is for #ia-notification-test
# C53JYBV9A is for #dsde-qa
channel-id: 'C03ATF4QXEV,C53JYBV9A'
slack-message: "Azure E2E Tests FAILED, automation-branch: ${{ needs.init-github-context.outputs.automation-branch }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACKBOT_TOKEN }}
6 changes: 3 additions & 3 deletions Dockerfile
@@ -30,10 +30,10 @@ ENV TERRA_APP_VERSION 0.5.0
ENV GALAXY_VERSION 2.8.1
ENV NGINX_VERSION 4.3.0
# If you update this here, make sure to also update reference.conf:
ENV CROMWELL_CHART_VERSION 0.2.422
ENV CROMWELL_CHART_VERSION 0.2.426
ENV HAIL_BATCH_CHART_VERSION 0.1.9
ENV RSTUDIO_CHART_VERSION 0.3.0
ENV SAS_CHART_VERSION 0.3.0
ENV RSTUDIO_CHART_VERSION 0.4.0
ENV SAS_CHART_VERSION 0.5.0

RUN mkdir /leonardo
COPY ./leonardo*.jar /leonardo
12 changes: 6 additions & 6 deletions http/src/main/resources/reference.conf
@@ -214,7 +214,7 @@ azure {
minor-version-auto-upgrade = true,
file-uris = ["https://raw.githubusercontent.com/DataBiosphere/leonardo/52aab3b7f252667f73b23682062ab3e0d9d533b9/http/src/main/resources/init-resources/azure_vm_init_script.sh"]
}
listener-image = "terradevacrpublic.azurecr.io/terra-azure-relay-listeners:bd50dbc"
listener-image = "terradevacrpublic.azurecr.io/terra-azure-relay-listeners:ae65380"
}
}

@@ -229,7 +229,7 @@ azure {
coa-app-config {
instrumentation-enabled = false
chart-name = "cromwell-helm/cromwell-on-azure"
chart-version = "0.2.422"
chart-version = "0.2.426"
release-name-suffix = "coa-rls"
namespace-name-suffix = "coa-ns"
ksa-name = "coa-ksa"
@@ -412,7 +412,7 @@ azure {

listener-chart-config {
chart-name = "terra-helm/listener"
chart-version = "0.2.0"
chart-version = "0.3.0"
}
}

@@ -765,7 +765,7 @@ gke {
cromwellApp {
# If you update the chart name or version here, make sure to also update it in the dockerfile:
chartName = "/leonardo/cromwell"
chartVersion = "0.2.422"
chartVersion = "0.2.426"
services = [
{
name = "cromwell-service"
@@ -786,8 +786,8 @@ gke {
# This is really just a prefix of the chart name. Full chart name for AOU app will be
# this chartName in config file appended with allowedChartName from createAppRequest
chartName = "/leonardo/"
rstudioChartVersion = "0.3.0"
sasChartVersion = "0.3.0"
rstudioChartVersion = "0.4.0"
sasChartVersion = "0.5.0"

services = [
{
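The allowedApp comment above notes that chartName is only a prefix, completed with the allowedChartName from the createAppRequest. A minimal sketch of that concatenation (the value "rstudio" and the val names here are illustrative assumptions, not taken from this commit):

// Illustrative sketch of how the full AOU chart name is assembled.
// chartNamePrefix mirrors allowedApp.chartName above; "rstudio" is an assumed allowedChartName.
val chartNamePrefix = "/leonardo/"
val allowedChartName = "rstudio" // assumed value supplied in a createAppRequest
val fullChartName = chartNamePrefix + allowedChartName // "/leonardo/rstudio"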
@@ -715,6 +715,17 @@ object clusterQuery extends TableQuery(new ClusterTable(_)) {
_ <- diskIdOpt.traverse(diskId => persistentDiskQuery.delete(diskId, dateAccessed))
} yield ()

def getDiskId(runtimeId: Long)(implicit
ec: ExecutionContext
): DBIO[Option[DiskId]] =
for {
runtimeConfigId <- findByIdQuery(runtimeId)
.map(_.runtimeConfigId)
.result
.headOption
diskIdOpt <- runtimeConfigId.flatTraverse(rid => RuntimeConfigQueries.getDiskId(rid))
} yield diskIdOpt

def setToRunning(id: Long, hostIp: IP, dateAccessed: Instant): DBIO[Int] =
updateClusterStatusAndHostIp(id, RuntimeStatus.Running, Some(hostIp), dateAccessed)

@@ -1069,17 +1069,18 @@ object PubsubHandleMessageError {
final case class DiskDeletionError(diskId: DiskId, workspaceId: WorkspaceId, errorMsg: String)
extends PubsubHandleMessageError {
override def getMessage: String =
s"\n\tdisk ${diskId} in workspace ${workspaceId}, \n\tmsg: ${errorMsg})"
s"\n\tdisk ${diskId.value} in workspace ${workspaceId.value}, \n\tmsg: ${errorMsg})"

val isRetryable: Boolean = false
}

final case class AzureDiskDeletionError(wsmControlledResourceId: WsmControlledResourceId,
final case class AzureDiskDeletionError(diskId: DiskId,
wsmControlledResourceId: WsmControlledResourceId,
workspaceId: WorkspaceId,
errorMsg: String
) extends PubsubHandleMessageError {
override def getMessage: String =
s"\n\tdisk resource: ${wsmControlledResourceId}, \n\tmsg: ${errorMsg})"
s"\n\tdisk ${diskId.value} with resource id: ${wsmControlledResourceId.value}, \n\tmsg: ${errorMsg})"

val isRetryable: Boolean = false
}
@@ -49,7 +49,12 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
)(implicit val executionContext: ExecutionContext, dbRef: DbReference[F], logger: StructuredLogger[F], F: Async[F])
extends AzurePubsubHandlerAlgebra[F] {

implicit val isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v
// implicits necessary to poll on the status of external jobs
implicit private def isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v
implicit private def wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) =>
v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
implicit private def wsmDeleteDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) =>
v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed

override def createAndPollRuntime(msg: CreateAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
@@ -450,9 +455,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
resourceId
)

private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] = {
implicit val wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) =>
v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
ctx <- ev.ask
auth <- samDAO.getLeoAuthToken
@@ -542,11 +545,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
)
)
} yield ()
}

override def deleteAndPollRuntime(msg: DeleteAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] = {
implicit val wsmDeleteVmDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) =>
v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
for {
ctx <- ev.ask

@@ -611,8 +611,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
_ <- diskRecordOpt match {
case Some(diskRecord) =>
for {
_ <- sendDeleteDisktoWSM(diskRecord.resourceId, msg.workspaceId, auth)
_ <- logger.info(ctx.loggingCtx)(s"runtime disk ${diskId.value} is deleted successfully")
_ <- deleteDiskInWSM(diskId, diskRecord.resourceId, msg.workspaceId, auth, Some(runtime.id))
} yield ()
case _ =>
// if the disk hasn't been created in WSM yet, skip disk deletion
@@ -625,7 +624,6 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
}.void

// Delete the VM in WSM
// TODO: Add polling on disk deletion
_ <- msg.wsmResourceId.fold(
for {
// Error'd runtimes might not have a WSM resourceId and therefore no WsmJobStatus.
@@ -687,7 +685,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
_ <- deleteDiskAction
_ <- dbRef.inTransaction(clusterQuery.completeDeletion(runtime.id, ctx.now))
_ <- logger.info(ctx.loggingCtx)(
s"runtime ${msg.runtimeId} and associated disk is deleted successfully"
s"runtime ${msg.runtimeId}${if (msg.diskIdToDelete.isDefined) " and associated disk" else ""} have been deleted successfully"
)
} yield ()
case WsmJobStatus.Failed =>
@@ -775,23 +773,22 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
_ <- clusterQuery.updateClusterStatus(e.runtimeId, RuntimeStatus.Error, now).transaction

auth <- samDAO.getLeoAuthToken
diskIdOpt <- clusterQuery.getDiskId(e.runtimeId).transaction

_ <- e.useExistingDisk match {
case false =>
_ <- (e.useExistingDisk, diskIdOpt) match {
// disk was supposed to be created and was
case (false, Some(diskId)) =>
for {
diskRecordOpt <- controlledResourceQuery
.getWsmRecordForRuntime(e.runtimeId, WsmResourceType.AzureDisk)
.transaction
_ <- diskRecordOpt match {
// if there is a disk record, the disk finished creating, so it must be deleted in WSM
case Some(diskRecord) =>
for {
// TODO: Add polling on disk deletion
_ <- sendDeleteDisktoWSM(diskRecord.resourceId, e.workspaceId, auth)
_ <- clusterQuery.setDiskDeleted(e.runtimeId, now).transaction
_ <- logger.info(ctx.loggingCtx)(s"disk for runtime ${e.runtimeId} is deleted successfully")
_ <- deleteDiskInWSM(diskId, diskRecord.resourceId, e.workspaceId, auth, Some(e.runtimeId))
} yield ()
case _ =>
// if the disk hasn't been created in WSM yet, skip disk deletion
for {
_ <- logger.info(
s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk."
@@ -801,7 +798,13 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
}
} yield ()

case true => F.unit
// disk was supposed to be created and wasn't
case (false, None) =>
logger.info(
s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk."
)
// no disk created
case (true, _) => F.unit
}
} yield ()

@@ -896,14 +899,15 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
}
} yield ()

private def sendDeleteDisktoWSM(wsmResourceId: WsmControlledResourceId,
workspaceId: WorkspaceId,
auth: Authorization
)(implicit
ev: Ask[F, AppContext]
): F[WsmJobId] =
private def deleteDiskInWSM(diskId: DiskId,
wsmResourceId: WsmControlledResourceId,
workspaceId: WorkspaceId,
auth: Authorization,
runtimeId: Option[Long]
)(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
ctx <- ev.ask

jobId = getWsmJobId("delete-disk", wsmResourceId)

_ <- logger.info(ctx.loggingCtx)(s"Sending WSM delete message for disk resource ${wsmResourceId.value}")
Expand All @@ -921,76 +925,79 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
.void
.adaptError(e =>
AzureDiskDeletionError(
diskId,
wsmResourceId,
workspaceId,
s"${ctx.traceId.asString} | WSM call to delete disk failed due to ${e.getMessage}. Please retry delete again"
)
)
} yield jobId

override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] = {
implicit val wsmDeleteDiskDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) =>
v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
for {
ctx <- ev.ask
auth <- samDAO.getLeoAuthToken
getDeleteJobResult = wsmDao.getDeleteDiskJobResult(
GetJobResultRequest(workspaceId, jobId),
auth
)

_ <- msg.wsmResourceId match {
case Some(wsmResourceId) =>
for {
jobId <- sendDeleteDisktoWSM(wsmResourceId, msg.workspaceId, auth)
getDeleteJobResult = wsmDao.getDeleteDiskJobResult(
GetJobResultRequest(msg.workspaceId, jobId),
auth
)
// We need to wait until WSM deletion job to be done to update the database
taskToRun = for {
resp <- streamFUntilDone(
getDeleteJobResult,
config.deleteDiskPollConfig.maxAttempts,
config.deleteDiskPollConfig.interval
).compile.lastOrError

// We need to wait until WSM deletion job to be done to update the database
taskToRun = for {
resp <- streamFUntilDone(
getDeleteJobResult,
config.deleteDiskPollConfig.maxAttempts,
config.deleteDiskPollConfig.interval
).compile.lastOrError

_ <- resp.jobReport.status match {
case WsmJobStatus.Succeeded =>
for {
_ <- logger.info(ctx.loggingCtx)(s"disk ${msg.diskId.value} is deleted successfully")
_ <- dbRef.inTransaction(persistentDiskQuery.delete(msg.diskId, ctx.now))
} yield ()
case WsmJobStatus.Failed =>
F.raiseError[Unit](
DiskDeletionError(
msg.diskId,
msg.workspaceId,
s"WSM delete disk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}"
)
)
case WsmJobStatus.Running =>
F.raiseError[Unit](
DiskDeletionError(
msg.diskId,
msg.workspaceId,
s"WSM delete VM disk was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay"
)
)
_ <- resp.jobReport.status match {
case WsmJobStatus.Succeeded =>
for {
_ <- logger.info(ctx.loggingCtx)(s"disk ${diskId.value} is deleted successfully")
_ <- runtimeId match {
case Some(runtimeId) => clusterQuery.setDiskDeleted(runtimeId, ctx.now).transaction
case _ => dbRef.inTransaction(persistentDiskQuery.delete(diskId, ctx.now))
}
} yield ()

_ <- asyncTasks.offer(
Task(
ctx.traceId,
taskToRun,
Some { e =>
handleAzureDiskDeletionError(
DiskDeletionError(msg.diskId, msg.workspaceId, s"Fail to delete disk due to ${e.getMessage}")
)
},
ctx.now,
"deleteDiskV2"
case WsmJobStatus.Failed =>
F.raiseError[Unit](
AzureDiskDeletionError(
diskId,
wsmResourceId,
workspaceId,
s"WSM deleteDisk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}"
)
)
} yield ()
case WsmJobStatus.Running =>
F.raiseError[Unit](
AzureDiskDeletionError(
diskId,
wsmResourceId,
workspaceId,
s"Wsm deleteDisk job was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay"
)
)
}
} yield ()

_ <- asyncTasks.offer(
Task(
ctx.traceId,
taskToRun,
Some { e =>
handleAzureDiskDeletionError(
AzureDiskDeletionError(diskId, wsmResourceId, workspaceId, s"Fail to delete disk due to ${e.getMessage}")
)
},
ctx.now,
"deleteDiskV2"
)
)
} yield ()

override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
ctx <- ev.ask
auth <- samDAO.getLeoAuthToken

_ <- msg.wsmResourceId match {
case Some(wsmResourceId) =>
deleteDiskInWSM(msg.diskId, wsmResourceId, msg.workspaceId, auth, None)
case None =>
for {
_ <- logger.info(s"No WSM resource found for Azure disk ${msg.diskId}, skipping deletion in WSM")
Expand All @@ -999,9 +1006,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
} yield ()
}
} yield ()
}

def handleAzureDiskDeletionError(e: DiskDeletionError)(implicit
def handleAzureDiskDeletionError(e: AzureDiskDeletionError)(implicit
ev: Ask[F, AppContext]
): F[Unit] = for {
ctx <- ev.ask