Skip to content

Commit fd0eb21

Browse files
authored
Fix cleanup after failed convert_to_wkw job (#8426)
1 parent 087e287 commit fd0eb21

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

app/controllers/WKRemoteWorkerController.scala

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO,
2828

2929
def requestJobs(key: String): Action[AnyContent] = Action.async { implicit request =>
3030
for {
31-
worker <- workerDAO.findOneByKey(key) ?~> "jobs.worker.notFound"
31+
worker <- workerDAO.findOneByKey(key) ?~> "job.worker.notFound"
3232
_ = workerDAO.updateHeartBeat(worker._id)
3333
_ <- reserveNextJobs(worker, pendingIterationCount = 10)
3434
assignedUnfinishedJobs: List[Job] <- jobDAO.findAllUnfinishedByWorker(worker._id)
@@ -75,19 +75,19 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO,
7575
def updateJobStatus(key: String, id: ObjectId): Action[JobStatus] = Action.async(validateJson[JobStatus]) {
7676
implicit request =>
7777
for {
78-
_ <- workerDAO.findOneByKey(key) ?~> "jobs.worker.notFound"
78+
_ <- workerDAO.findOneByKey(key) ?~> "job.worker.notFound"
7979
jobBeforeChange <- jobDAO.findOne(id)(GlobalAccessContext)
8080
_ <- jobDAO.updateStatus(id, request.body)
81-
jobAfterChange <- jobDAO.findOne(id)(GlobalAccessContext)
81+
jobAfterChange <- jobDAO.findOne(id)(GlobalAccessContext) ?~> "job.notFound"
8282
_ = jobService.trackStatusChange(jobBeforeChange, jobAfterChange)
83-
_ <- jobService.cleanUpIfFailed(jobAfterChange)
83+
_ <- jobService.cleanUpIfFailed(jobAfterChange) ?~> "job.cleanup.failed"
8484
} yield Ok
8585
}
8686

8787
def attachVoxelyticsWorkflow(key: String, id: ObjectId): Action[String] = Action.async(validateJson[String]) {
8888
implicit request =>
8989
for {
90-
_ <- workerDAO.findOneByKey(key) ?~> "jobs.worker.notFound"
90+
_ <- workerDAO.findOneByKey(key) ?~> "job.worker.notFound"
9191
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
9292
organizationId <- jobDAO.organizationIdForJobId(id) ?~> "job.notFound"
9393
workflowHash = request.body
@@ -105,7 +105,7 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO,
105105
Action.async(validateJson[String]) { implicit request =>
106106
implicit val ctx: DBAccessContext = GlobalAccessContext
107107
for {
108-
_ <- workerDAO.findOneByKey(key) ?~> "jobs.worker.notFound"
108+
_ <- workerDAO.findOneByKey(key) ?~> "job.worker.notFound"
109109
organizationId <- jobDAO.organizationIdForJobId(id) ?~> "job.notFound"
110110
dataset <- datasetDAO.findOneByDirectoryNameAndOrganization(request.body, organizationId)
111111
aiInference <- aiInferenceDAO.findOneByJobId(id) ?~> "aiInference.notFound"

app/models/job/JobService.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,12 @@ class JobService @Inject()(wkConf: WkConf,
149149

150150
def cleanUpIfFailed(job: Job): Fox[Unit] =
151151
if (job.state == JobState.FAILURE && job.command == JobCommand.convert_to_wkw) {
152-
logger.info(s"WKW conversion job ${job._id} failed. Deleting dataset from the database, freeing the name...")
152+
logger.info(
153+
s"WKW conversion job ${job._id} failed. Deleting dataset from the database, freeing the directoryName...")
153154
val commandArgs = job.commandArgs.value
154155
for {
155156
datasetDirectoryName <- commandArgs.get("dataset_directory_name").map(_.as[String]).toFox
156-
organizationId <- commandArgs.get("organization_name").map(_.as[String]).toFox
157+
organizationId <- commandArgs.get("organization_id").map(_.as[String]).toFox
157158
dataset <- datasetDAO.findOneByDirectoryNameAndOrganization(datasetDirectoryName, organizationId)(
158159
GlobalAccessContext)
159160
_ <- datasetDAO.deleteDataset(dataset._id)

0 commit comments

Comments
 (0)