From be7004c709d942c41e543e09c0e4e4ac541c91ec Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 6 Dec 2023 13:54:54 +0000 Subject: [PATCH] fileinstall: run coros in the background if loop already running * Allow Rose file installation to be called by code which already has an event loop running by scheduling coroutines to run in the background (i.e. schedule but don't await). * The calling code can list these tasks using `asyncio.all_tasks()` and await them as appropriate. * Addresses https://github.com/cylc/cylc-rose/issues/274 --- metomi/rose/config_processors/fileinstall.py | 4 +++ metomi/rose/job_runner.py | 36 ++++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/metomi/rose/config_processors/fileinstall.py b/metomi/rose/config_processors/fileinstall.py index 937ed7464..24a839731 100644 --- a/metomi/rose/config_processors/fileinstall.py +++ b/metomi/rose/config_processors/fileinstall.py @@ -17,6 +17,7 @@ """Process "file:*" sections in node of a metomi.rose.config_tree.ConfigTree. """ +from contextlib import suppress from fnmatch import fnmatch from glob import glob from io import BytesIO @@ -106,6 +107,9 @@ def process( finally: if cwd != os.getcwd(): self.manager.fs_util.chdir(cwd) + if loc_dao.conn: + with suppress(Exception): + loc_dao.conn.close() def _process(self, conf_tree, nodes, loc_dao, **kwargs): """Helper for self.process.""" diff --git a/metomi/rose/job_runner.py b/metomi/rose/job_runner.py index 6c8d1610a..86247a817 100644 --- a/metomi/rose/job_runner.py +++ b/metomi/rose/job_runner.py @@ -17,9 +17,17 @@ """A multiprocessing runner of jobs with dependencies.""" import asyncio + from metomi.rose.reporter import Event +# set containing references to "background" coroutines that are not referenced +# from any code (i.e. 
are not directly awaited), adding them to this set +# avoids the potential for garbage collection to delete them whilst they are +# running +_BACKGROUND_TASKS = set() + + class JobEvent(Event): """Event raised when a job completes.""" @@ -175,19 +183,33 @@ def run(self, job_manager, *args, concurrency=6): The maximum number of jobs to run concurrently. """ - running = [] loop = asyncio.get_event_loop() loop.set_exception_handler(self.job_processor.handle_event) - loop.run_until_complete( - asyncio.gather( - self._run_jobs(running, job_manager, args, concurrency), - self._post_process_jobs(running, job_manager, args), - ) - ) + coro = self._run(job_manager, *args, concurrency=concurrency) + try: + # event loop is not running (e.g. rose CLI use) + loop.run_until_complete(coro) + except RuntimeError: + # event loop is already running (e.g. cylc CLI use) + # WARNING: this starts the file installation running, but it + # doesn't wait for it to finish, that's your problem :( + task = loop.create_task(coro) + # reference this task from a global variable to prevent it from + # being garbage collected + _BACKGROUND_TASKS.add(task) + # tidy up afterwards + task.add_done_callback(_BACKGROUND_TASKS.discard) dead_jobs = job_manager.get_dead_jobs() if dead_jobs: raise JobRunnerNotCompletedError(dead_jobs) + async def _run(self, job_manager, *args, concurrency=6): + running = [] + await asyncio.gather( + self._run_jobs(running, job_manager, args, concurrency), + self._post_process_jobs(running, job_manager, args), + ) + async def _run_jobs(self, running, job_manager, args, concurrency): """Run pending jobs subject to the concurrency limit.