[IA-4634] Add polling when deleting disk in WSM (#4152)
lucymcnatt authored Feb 1, 2024 · 1 parent e6374ab · commit 7e2df46
Showing 4 changed files with 367 additions and 88 deletions.
@@ -715,6 +715,17 @@ object clusterQuery extends TableQuery(new ClusterTable(_)) {
_ <- diskIdOpt.traverse(diskId => persistentDiskQuery.delete(diskId, dateAccessed))
} yield ()

+   def getDiskId(runtimeId: Long)(implicit
+     ec: ExecutionContext
+   ): DBIO[Option[DiskId]] =
+     for {
+       runtimeConfigId <- findByIdQuery(runtimeId)
+         .map(_.runtimeConfigId)
+         .result
+         .headOption
+       diskIdOpt <- runtimeConfigId.flatTraverse(rid => RuntimeConfigQueries.getDiskId(rid))
+     } yield diskIdOpt

def setToRunning(id: Long, hostIp: IP, dateAccessed: Instant): DBIO[Int] =
updateClusterStatusAndHostIp(id, RuntimeStatus.Running, Some(hostIp), dateAccessed)

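For readers unfamiliar with cats' `flatTraverse`, this is all the new `getDiskId` does with the nested `Option`: a minimal unrolled sketch (not part of the commit), assuming `RuntimeConfigQueries.getDiskId` returns `DBIO[Option[DiskId]]` as the code above implies.

// Equivalent lookup with the flatTraverse call unrolled: match on the
// optional config id and fall back to a None result when the row is missing.
def getDiskIdExplicit(runtimeId: Long)(implicit ec: ExecutionContext): DBIO[Option[DiskId]] =
  findByIdQuery(runtimeId)
    .map(_.runtimeConfigId)
    .result
    .headOption
    .flatMap {
      case Some(rid) => RuntimeConfigQueries.getDiskId(rid) // config found: look up its disk
      case None      => DBIO.successful(None)               // no runtime row: no disk
    }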
@@ -1069,17 +1069,18 @@ object PubsubHandleMessageError {
final case class DiskDeletionError(diskId: DiskId, workspaceId: WorkspaceId, errorMsg: String)
extends PubsubHandleMessageError {
override def getMessage: String =
s"\n\tdisk ${diskId} in workspace ${workspaceId}, \n\tmsg: ${errorMsg})"
s"\n\tdisk ${diskId.value} in workspace ${workspaceId.value}, \n\tmsg: ${errorMsg})"

val isRetryable: Boolean = false
}

- final case class AzureDiskDeletionError(wsmControlledResourceId: WsmControlledResourceId,
+ final case class AzureDiskDeletionError(diskId: DiskId,
+                                         wsmControlledResourceId: WsmControlledResourceId,
workspaceId: WorkspaceId,
errorMsg: String
) extends PubsubHandleMessageError {
override def getMessage: String =
s"\n\tdisk resource: ${wsmControlledResourceId}, \n\tmsg: ${errorMsg})"
s"\n\tdisk ${diskId.value} with resource id: ${wsmControlledResourceId.value}, \n\tmsg: ${errorMsg})"

val isRetryable: Boolean = false
}
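Both error cases override `getMessage` and mark themselves non-retryable. For orientation, a rough sketch of the shape the enclosing `PubsubHandleMessageError` trait must have, inferred from these overrides (the real definition lives earlier in the file; this is an assumption, not a copy):

// Inferred shape only, reconstructed from the overrides above.
sealed trait PubsubHandleMessageError extends Exception {
  def getMessage: String   // detail surfaced in logs and error reports
  def isRetryable: Boolean // whether the subscriber should retry the message
}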
@@ -49,7 +49,12 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
)(implicit val executionContext: ExecutionContext, dbRef: DbReference[F], logger: StructuredLogger[F], F: Async[F])
extends AzurePubsubHandlerAlgebra[F] {

- implicit val isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v
+ // implicits necessary to poll on the status of external jobs
+ implicit private def isJupyterUpDoneCheckable: DoneCheckable[Boolean] = (v: Boolean) => v
+ implicit private def wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) =>
+   v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
+ implicit private def wsmDeleteDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) =>
+   v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed

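A `DoneCheckable[A]` simply decides when a polled `A` is terminal; here a create or delete job is done once WSM reports `Succeeded` or `Failed`. A self-contained sketch of the idea (the real `DoneCheckable` and `streamFUntilDone` come from workbench-libs; this blocking model is an assumption for illustration, not the library source):

trait DoneCheckable[A] { def isDone(a: A): Boolean }

// Re-evaluate `poll` up to maxAttempts times, stopping early on a terminal
// result -- a blocking stand-in for the fs2-based streamFUntilDone.
@scala.annotation.tailrec
def pollUntilDone[A](poll: () => A, maxAttempts: Int, intervalMs: Long)(implicit
  dc: DoneCheckable[A]
): A = {
  val a = poll()
  if (dc.isDone(a) || maxAttempts <= 1) a
  else { Thread.sleep(intervalMs); pollUntilDone(poll, maxAttempts - 1, intervalMs) }
}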
override def createAndPollRuntime(msg: CreateAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
@@ -450,9 +455,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
resourceId
)

- private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] = {
-   implicit val wsmCreateVmDoneCheckable: DoneCheckable[GetCreateVmJobResult] = (v: GetCreateVmJobResult) =>
-     v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
+ private def monitorCreateRuntime(params: PollRuntimeParams)(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
ctx <- ev.ask
auth <- samDAO.getLeoAuthToken
@@ -542,11 +545,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
)
)
} yield ()
- }

override def deleteAndPollRuntime(msg: DeleteAzureRuntimeMessage)(implicit ev: Ask[F, AppContext]): F[Unit] = {
-   implicit val wsmDeleteVmDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) =>
-     v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
for {
ctx <- ev.ask

@@ -611,8 +611,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
_ <- diskRecordOpt match {
case Some(diskRecord) =>
for {
-           _ <- sendDeleteDisktoWSM(diskRecord.resourceId, msg.workspaceId, auth)
-           _ <- logger.info(ctx.loggingCtx)(s"runtime disk ${diskId.value} is deleted successfully")
+           _ <- deleteDiskInWSM(diskId, diskRecord.resourceId, msg.workspaceId, auth, Some(runtime.id))
} yield ()
case _ =>
// if the disk hasn't been created in WSM yet, skip disk deletion
@@ -625,7 +624,6 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
}.void

// Delete the VM in WSM
-     // TODO: Add polling on disk deletion
_ <- msg.wsmResourceId.fold(
for {
// Error'd runtimes might not have a WSM resourceId and therefore no WsmJobStatus.
@@ -687,7 +685,7 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
_ <- deleteDiskAction
_ <- dbRef.inTransaction(clusterQuery.completeDeletion(runtime.id, ctx.now))
_ <- logger.info(ctx.loggingCtx)(
s"runtime ${msg.runtimeId} and associated disk is deleted successfully"
s"runtime ${msg.runtimeId} ${if (msg.diskIdToDelete.isDefined) "and associated disk"} have been deleted successfully"
)
} yield ()
case WsmJobStatus.Failed =>
@@ -775,23 +773,22 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
_ <- clusterQuery.updateClusterStatus(e.runtimeId, RuntimeStatus.Error, now).transaction

auth <- samDAO.getLeoAuthToken
+       diskIdOpt <- clusterQuery.getDiskId(e.runtimeId).transaction

-       _ <- e.useExistingDisk match {
-         case false =>
+       _ <- (e.useExistingDisk, diskIdOpt) match {
+         // disk was supposed to be created and was
+         case (false, Some(diskId)) =>
for {
diskRecordOpt <- controlledResourceQuery
.getWsmRecordForRuntime(e.runtimeId, WsmResourceType.AzureDisk)
.transaction
_ <- diskRecordOpt match {
// if there is a disk record, the disk finished creating, so it must be deleted in WSM
case Some(diskRecord) =>
for {
-               // TODO: Add polling on disk deletion
-               _ <- sendDeleteDisktoWSM(diskRecord.resourceId, e.workspaceId, auth)
-               _ <- clusterQuery.setDiskDeleted(e.runtimeId, now).transaction
-               _ <- logger.info(ctx.loggingCtx)(s"disk for runtime ${e.runtimeId} is deleted successfully")
+               _ <- deleteDiskInWSM(diskId, diskRecord.resourceId, e.workspaceId, auth, Some(e.runtimeId))
} yield ()
case _ =>
// if the disk hasn't been created in WSM yet, skip disk deletion
for {
_ <- logger.info(
s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk."
@@ -801,7 +798,13 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
}
} yield ()

-       case true => F.unit

+       // disk was supposed to be created and wasn't
+       case (false, None) =>
+         logger.info(
+           s"No disk resource found for runtime ${e.runtimeId.toString} in ${e.workspaceId.value}. No-op for wsmDao.deleteDisk."
+         )
+       // no disk created
+       case (true, _) => F.unit
}
} yield ()
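The tuple match above is a small decision table; condensed into a sketch (illustrative return strings, not the real effects):

// What each (useExistingDisk, diskIdOpt) combination means for cleanup:
def cleanupPlan(useExistingDisk: Boolean, diskIdOpt: Option[DiskId]): String =
  (useExistingDisk, diskIdOpt) match {
    case (false, Some(id)) => s"disk ${id.value} was created for this runtime: delete it in WSM"
    case (false, None)     => "creation failed before any disk existed: log and no-op"
    case (true, _)         => "disk belongs to the user: leave it alone"
  }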

@@ -896,14 +899,15 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
}
} yield ()

- private def sendDeleteDisktoWSM(wsmResourceId: WsmControlledResourceId,
-                                 workspaceId: WorkspaceId,
-                                 auth: Authorization
- )(implicit
-   ev: Ask[F, AppContext]
- ): F[WsmJobId] =
+ private def deleteDiskInWSM(diskId: DiskId,
+                             wsmResourceId: WsmControlledResourceId,
+                             workspaceId: WorkspaceId,
+                             auth: Authorization,
+                             runtimeId: Option[Long]
+ )(implicit ev: Ask[F, AppContext]): F[Unit] =
for {
ctx <- ev.ask

jobId = getWsmJobId("delete-disk", wsmResourceId)

_ <- logger.info(ctx.loggingCtx)(s"Sending WSM delete message for disk resource ${wsmResourceId.value}")
@@ -921,76 +925,79 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
.void
.adaptError(e =>
AzureDiskDeletionError(
+         diskId,
wsmResourceId,
workspaceId,
s"${ctx.traceId.asString} | WSM call to delete disk failed due to ${e.getMessage}. Please retry delete again"
)
)
-     } yield jobId

-   override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] = {
-     implicit val wsmDeleteDiskDoneCheckable: DoneCheckable[GetDeleteJobResult] = (v: GetDeleteJobResult) =>
-       v.jobReport.status.equals(WsmJobStatus.Succeeded) || v.jobReport.status == WsmJobStatus.Failed
-     for {
-       ctx <- ev.ask
-       auth <- samDAO.getLeoAuthToken
+       getDeleteJobResult = wsmDao.getDeleteDiskJobResult(
+         GetJobResultRequest(workspaceId, jobId),
+         auth
+       )

-       _ <- msg.wsmResourceId match {
-         case Some(wsmResourceId) =>
-           for {
-             jobId <- sendDeleteDisktoWSM(wsmResourceId, msg.workspaceId, auth)
-             getDeleteJobResult = wsmDao.getDeleteDiskJobResult(
-               GetJobResultRequest(msg.workspaceId, jobId),
-               auth
-             )
-             // We need to wait until WSM deletion job to be done to update the database
-             taskToRun = for {
-               resp <- streamFUntilDone(
-                 getDeleteJobResult,
-                 config.deleteDiskPollConfig.maxAttempts,
-                 config.deleteDiskPollConfig.interval
-               ).compile.lastOrError

+       // We need to wait until the WSM deletion job is done before updating the database
+       taskToRun = for {
+         resp <- streamFUntilDone(
+           getDeleteJobResult,
+           config.deleteDiskPollConfig.maxAttempts,
+           config.deleteDiskPollConfig.interval
+         ).compile.lastOrError
-             _ <- resp.jobReport.status match {
-               case WsmJobStatus.Succeeded =>
-                 for {
-                   _ <- logger.info(ctx.loggingCtx)(s"disk ${msg.diskId.value} is deleted successfully")
-                   _ <- dbRef.inTransaction(persistentDiskQuery.delete(msg.diskId, ctx.now))
-                 } yield ()
-               case WsmJobStatus.Failed =>
-                 F.raiseError[Unit](
-                   DiskDeletionError(
-                     msg.diskId,
-                     msg.workspaceId,
-                     s"WSM delete disk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}"
-                   )
-                 )
-               case WsmJobStatus.Running =>
-                 F.raiseError[Unit](
-                   DiskDeletionError(
-                     msg.diskId,
-                     msg.workspaceId,
-                     s"WSM delete VM disk was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay"
-                   )
-                 )
+         _ <- resp.jobReport.status match {
+           case WsmJobStatus.Succeeded =>
+             for {
+               _ <- logger.info(ctx.loggingCtx)(s"disk ${diskId.value} is deleted successfully")
+               _ <- runtimeId match {
+                 case Some(runtimeId) => clusterQuery.setDiskDeleted(runtimeId, ctx.now).transaction
+                 case _               => dbRef.inTransaction(persistentDiskQuery.delete(diskId, ctx.now))
+               }
+             } yield ()

-             _ <- asyncTasks.offer(
-               Task(
-                 ctx.traceId,
-                 taskToRun,
-                 Some { e =>
-                   handleAzureDiskDeletionError(
-                     DiskDeletionError(msg.diskId, msg.workspaceId, s"Fail to delete disk due to ${e.getMessage}")
-                   )
-                 },
-                 ctx.now,
-                 "deleteDiskV2"
+           case WsmJobStatus.Failed =>
+             F.raiseError[Unit](
+               AzureDiskDeletionError(
+                 diskId,
+                 wsmResourceId,
+                 workspaceId,
+                 s"WSM deleteDisk job failed due to ${resp.errorReport.map(_.message).getOrElse("unknown")}"
+               )
+             )
-             } yield ()
+           case WsmJobStatus.Running =>
+             F.raiseError[Unit](
+               AzureDiskDeletionError(
+                 diskId,
+                 wsmResourceId,
+                 workspaceId,
+                 s"WSM deleteDisk job was not completed within ${config.deleteDiskPollConfig.maxAttempts} attempts with ${config.deleteDiskPollConfig.interval} delay"
+               )
+             )
}
} yield ()

+       _ <- asyncTasks.offer(
+         Task(
+           ctx.traceId,
+           taskToRun,
+           Some { e =>
+             handleAzureDiskDeletionError(
+               AzureDiskDeletionError(diskId, wsmResourceId, workspaceId, s"Fail to delete disk due to ${e.getMessage}")
+             )
+           },
+           ctx.now,
+           "deleteDiskV2"
+         )
+       )
+     } yield ()

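The polling work itself runs off the message handler by being queued via `asyncTasks.offer(Task(...))`. A minimal sketch of that queued-task shape, inferred from the call site above (field names and the `String` trace id are assumptions; the real `Task` is defined elsewhere in Leonardo):

import java.time.Instant

// Assumed shape only, mirroring the five arguments passed to Task(...) above.
final case class BackgroundTask[F[_]](
  traceId: String,                       // ctx.traceId in the call above
  op: F[Unit],                           // taskToRun: the WSM polling loop
  onError: Option[Throwable => F[Unit]], // e.g. handleAzureDiskDeletionError
  enqueuedAt: Instant,                   // ctx.now
  name: String                           // "deleteDiskV2"
)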
+   override def deleteDisk(msg: DeleteDiskV2Message)(implicit ev: Ask[F, AppContext]): F[Unit] =
+     for {
+       ctx <- ev.ask
+       auth <- samDAO.getLeoAuthToken

+       _ <- msg.wsmResourceId match {
+         case Some(wsmResourceId) =>
+           deleteDiskInWSM(msg.diskId, wsmResourceId, msg.workspaceId, auth, None)
case None =>
for {
_ <- logger.info(s"No WSM resource found for Azure disk ${msg.diskId}, skipping deletion in WSM")
@@ -999,9 +1006,8 @@ class AzurePubsubHandlerInterp[F[_]: Parallel](
} yield ()
}
} yield ()
- }

- def handleAzureDiskDeletionError(e: DiskDeletionError)(implicit
+ def handleAzureDiskDeletionError(e: AzureDiskDeletionError)(implicit
ev: Ask[F, AppContext]
): F[Unit] = for {
ctx <- ev.ask
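Taken together, every Azure disk-deletion path now funnels through the same polled WSM helper. The three call sites, condensed from the hunks above:

// 1. Deleting a runtime with an attached disk (deleteAndPollRuntime path):
deleteDiskInWSM(diskId, diskRecord.resourceId, msg.workspaceId, auth, Some(runtime.id))

// 2. Cleaning up after a failed runtime creation (handleRuntimeCreationError path):
deleteDiskInWSM(diskId, diskRecord.resourceId, e.workspaceId, auth, Some(e.runtimeId))

// 3. Deleting a standalone disk (DeleteDiskV2Message path):
deleteDiskInWSM(msg.diskId, wsmResourceId, msg.workspaceId, auth, None)

Passing a runtime id makes the success path mark the runtime's disk deleted via clusterQuery.setDiskDeleted, while passing None deletes the disk row itself via persistentDiskQuery.delete.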
