Fix a few async-related bugs with archive jobs #38

Merged · 7 commits · Feb 20, 2025
Changes from all commits
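For context on the recurring pattern in these commits: Django's ORM is synchronous and cannot be called directly from async code, so Channels provides `database_sync_to_async` to run ORM work in a worker thread. Decorating `instance_from_pbs_job` itself made it awaitable-only; removing the decorator and wrapping it at each call site keeps the classmethod usable from both sync and async contexts. A minimal sketch of the call-site style, assuming a hypothetical `Job` model (not this repo's code):

```python
from channels.db import database_sync_to_async

from myapp.models import Job  # hypothetical Django model, illustration only


def fetch_job(job_id):
    # Plain synchronous ORM access; callable as-is from sync code.
    return Job.objects.get(pk=job_id)


async def handle(job_id):
    # Wrap at the call site: the query runs in a worker thread,
    # so the event loop is never blocked.
    return await database_sync_to_async(fetch_job)(job_id)
```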
37 changes: 20 additions & 17 deletions uit_plus_job/models.py
@@ -216,7 +216,6 @@ def _safe_delete(self, using, keep_parents):
         super().delete(using, keep_parents)
 
     @classmethod
-    @database_sync_to_async
     def instance_from_pbs_job(cls, job, user):
         script = job.script
         instance = cls(
@@ -316,15 +315,14 @@ def parse_pbs_directive(directive_str):
         m = re.match(r"PbsDirective\(directive='(.*?)', options='(.*?)'\)", directive_str)
         return PbsDirective(*m.groups())
 
-    @property
-    def archive_dir(self):
+    async def get_archive_dir(self):
         """Get the job archive directory from the HPC.
 
         Returns:
             str: Archive Directory
         """
         if self._archive_dir is None:
-            archive_home = self.get_environment_variable("ARCHIVE_HOME")
+            archive_home = await self.get_environment_variable("ARCHIVE_HOME")
             self._archive_dir = posixpath.join(archive_home, self.remote_workspace_suffix)
         return self._archive_dir
 
@@ -433,9 +431,9 @@ def working_dir(self):
         """
         return self.pbs_job.working_dir
 
-    def is_job_archived(self):
+    async def is_job_archived(self):
         archive_filename = f"job_{self.remote_workspace_id}.run_files.tar.gz"
-        archive_files = self.client.list_dir(self.archive_dir).get("files", [])
+        archive_files = self.client.list_dir(await self.get_archive_dir()).get("files", [])
         return archive_filename in [file["name"] for file in archive_files]
 
     @_ensure_connected
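A note on the two hunks above: a `@property` getter must be synchronous, so `archive_dir` could not `await` the remote `ARCHIVE_HOME` lookup; converting it to an explicit `async def get_archive_dir()` is the standard fix. A runnable toy example of the shape (classes here are stand-ins, not the repo's):

```python
import asyncio


class Job:
    async def _lookup_archive_home(self):
        await asyncio.sleep(0)  # stands in for a remote HPC query
        return "/archive/user"

    # A property getter cannot contain 'await', but an async method can
    # both await other coroutines and be awaited by callers.
    async def get_archive_dir(self):
        return await self._lookup_archive_home()


asyncio.run(Job().get_archive_dir())
```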
@@ -464,7 +462,7 @@ async def get_logs(self):
             },
         }
 
-    def get_environment_variable(self, variable):
+    async def get_environment_variable(self, variable):
         """Get the value of an environment variable from the HPC.
 
         Args:
@@ -473,7 +471,7 @@
         Returns:
             str: value of environment variable.
         """
-        return self.client.env.get(variable)
+        return await self.client.env.get_environmental_variable(variable)
 
     @_ensure_connected
     async def execute(self, *args, **kwargs):
@@ -605,6 +603,7 @@ async def _update_status(self):
         thread.start()
         await self._safe_save()
 
+    @database_sync_to_async
     def set_archived_status(self, value):
         archived_job_id = self.extended_properties.get("archived_job_id")
         if archived_job_id:
@@ -744,13 +743,13 @@ async def clean(self, archive=False, remote=True):
             tg.create_task(asyncio.to_thread(shutil.rmtree, self.workspace, True))
 
         if remote:
-            # Remove remote locations
             if archive:
-                cmd = f"archive rm -rf {self.archive_dir} || true"
+                path = await self.get_archive_dir()
+                cmd = f"archive rm -rf {path} || true"
                 tg.create_task(self.client.call(command=cmd, working_dir="/"))
                 self.set_archived_status(False)
                 log.info(f"Executing command '{cmd}' on {self.system}")
             else:
+                # Remove remote locations
                 for path in (self.working_dir, self.home_dir):
                     cmd = f"rm -rf {path} || true"
                     tg.create_task(self.client.call(command=cmd, working_dir="/"))
@@ -796,7 +795,7 @@ async def _archive(self, *args, **kwargs):
         )
         pbs_script.execution_block = (
             f"tar -czf {archive_filename} *\n"
-            f"archive put -p -C {self.archive_dir} {archive_filename}\n"
+            f"archive put -p -C {await self.get_archive_dir()} {archive_filename}\n"
             f"rm {archive_filename}\n"
         )
 
@@ -806,7 +805,7 @@
         job._remote_workspace_id = self._remote_workspace_id
 
         job.description = f"Archive job: {self.name} ({self.job_id})"
-        job_model = await self.instance_from_pbs_job(job, self.user)
+        job_model = await database_sync_to_async(self.instance_from_pbs_job)(job, self.user)
         # Put job id in extended properties
         save_script_attrs = [
             "name",
@@ -851,7 +850,7 @@ async def restore(self):
 
         # Create transfer script
         self.execution_block = (
-            f"archive get -p -C {self.archive_dir} {archive_filename}\n"
+            f"archive get -p -C {await self.get_archive_dir()} {archive_filename}\n"
             f"tar -xzf {archive_filename}\n"
             f"rm {archive_filename}\n"
         )
@@ -861,7 +860,11 @@ async def restore(self):
         await self.resubmit()
 
         # Check database for archived job
-        job_id = self.extended_properties.get("archived_job_id")
+        await self.update_job_after_restore(self.extended_properties.get("archived_job_id"))
+
+    @database_sync_to_async
+    def update_job_after_restore(self, job_id):
+        """After restoring from the archive, recreate job in the main jobs_table if it does not already exist"""
         if job_id is not None:
             try:
                 self.__class__.objects.get(job_id=job_id)
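Moving the database lookup out of the `async restore()` coroutine into a separate synchronous method decorated with `@database_sync_to_async` keeps blocking calls like `self.__class__.objects.get(...)` off the event loop. A sketch of that refactor shape, with an assumed `ArchiveRecord` model (illustrative only):

```python
from channels.db import database_sync_to_async

from myapp.models import ArchiveRecord  # hypothetical model, illustration only


class RestoreMixin:
    async def restore(self):
        # ...async transfer work happens here...
        await self._recreate_record(job_id=42)

    @database_sync_to_async
    def _recreate_record(self, job_id):
        # The decorator runs this body in a worker thread, so the
        # blocking ORM call cannot stall the event loop.
        return ArchiveRecord.objects.get_or_create(job_id=job_id)
```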
@@ -879,8 +882,8 @@
             pbs_job._remote_workspace = self._remote_workspace
             pbs_job._remote_workspace_id = self._remote_workspace_id
             pbs_job._job_id = job_id
-            restored = await self.instance_from_pbs_job(pbs_job, self.user)
-            restored.status = "Complete"
+            restored = self.instance_from_pbs_job(pbs_job, self.user)
+            restored._status = "COM"
             restored.save()
 
2 changes: 1 addition & 1 deletion uit_plus_job/submit_stage.py
@@ -716,7 +716,7 @@ def transfer_output_files(self):
 
     async def submit(self, custom_logs=None):
         self.job.script = self.pbs_script  # update script to ensure it reflects any UI updates
-        job = await UitPlusJob.instance_from_pbs_job(self.job, self.tethys_user)
+        job = await database_sync_to_async(UitPlusJob.instance_from_pbs_job)(self.job, self.tethys_user)
         job.custom_logs = custom_logs or self.custom_logs
         job.transfer_output_files = self.transfer_output_files
         await job.execute()