diff --git a/exp/example_azure_experiment/postprocess_states_0.py b/exp/example_azure_experiment/postprocessing_scripts/0_combine_state_timeseries_csv.py
similarity index 100%
rename from exp/example_azure_experiment/postprocess_states_0.py
rename to exp/example_azure_experiment/postprocessing_scripts/0_combine_state_timeseries_csv.py
diff --git a/src/mechanistic_azure/azure_utilities.py b/src/mechanistic_azure/azure_utilities.py
index 7e425501..7f82833f 100644
--- a/src/mechanistic_azure/azure_utilities.py
+++ b/src/mechanistic_azure/azure_utilities.py
@@ -4,8 +4,6 @@
 from azure.core.paging import ItemPaged
 from cfa_azure.clients import AzureClient
 
-from resp_ode.utils import find_files, sort_filenames_by_suffix
-
 
 class AzureExperimentLauncher:
     def __init__(
@@ -277,51 +275,129 @@ def launch_states(self, depend_on_task_ids: list[str] = None) -> list[str]:
             task_ids += task_id
         return task_ids
 
+    def _find_postprocess_file_docker(
+        self, postprocess_folder_name, postprocess_filename
+    ) -> str:
+        """Given a postprocess script filename, validates its existence
+        in the local experiment and returns the file's path in the docker
+        environment.
+
+        Parameters
+        ----------
+        postprocess_folder_name : str
+            name of the folder in which postprocessing scripts are held
+        postprocess_filename : str
+            filename of the postprocess script; both paths and
+            raw filenames are parsed correctly.
+
+        Returns
+        -------
+        str
+            path to the postprocess script as executed on the docker machine
+            (they are uploaded to the input mount)
+
+        Raises
+        ------
+        FileNotFoundError
+            if `postprocess_filename` exists in neither the experiment
+            directory nor `postprocess_folder_name`.
+        """
+        if os.path.exists(
+            os.path.join(self.experiment_path_local, postprocess_filename)
+        ):
+            postprocess_docker_path = os.path.join(
+                self.experiment_path_docker, postprocess_filename
+            )
+        # if the user does not prepend the folder name, catch that error
+        elif os.path.exists(
+            os.path.join(
+                self.experiment_path_local,
+                postprocess_folder_name,
+                postprocess_filename,
+            )
+        ):
+            postprocess_docker_path = os.path.join(
+                self.experiment_path_docker,
+                postprocess_folder_name,
+                postprocess_filename,
+            )
+        else:
+            raise FileNotFoundError(
+                "Unable to find postprocessing script %s in your "
+                "experiment folder; make sure you are specifying "
+                "paths relative to the experiment folder"
+                % postprocess_filename
+            )
+        return postprocess_docker_path
+
     def launch_postprocess(
         self,
+        execution_order: list[str | list[str]],
         depend_on_task_ids: list[str],
+        postprocess_folder_name: str = "postprocessing_scripts",
     ) -> list[str]:
-        """Launches postprocessing scripts from within `experiment_path_local` identified by
-        the postprocess_states tag and an optional suffix.
-        All postprocessing scripts depend on completion of tasks within `depend_on_task_ids`
-        along with any postprocessing scripts with lower suffix.
-        Eg. `postprocess_states_2.py` depends on `postprocess_states_1.py` to finish first.
+        """Launches postprocessing scripts identified by `execution_order`
+        in the order they are passed in the list. List elements are
+        treated as bundles of postprocess scripts that are able to be run
+        simultaneously. Script filenames given to `execution_order` must
+        be found in either the experiment directory or `postprocess_folder_name`.
+
+        Examples
+        --------
+        ```
+        main_tasks = self.launch_states()
+        # launch 3 postprocess tasks to execute simultaneously
+        ex_order = [("1.py", "2.py", "3.py")]
+        # launch 3 postprocess tasks to execute sequentially (note no list nesting)
+        ex_order = ["1.py", "2.py", "3.py"]
+        ex_order = [("1.py"), ("2.py"), ("3.py")]
+        # launch 3 postprocess tasks, 1 and 2 to execute together, 3 last
+        ex_order = [("1.py", "2.py"), "3.py"]
+        postprocess = self.launch_postprocess(execution_order=ex_order, depend_on_task_ids=main_tasks)
+        ```
 
         Parameters
         ----------
+        execution_order : list[str | list[str]]
+            list of paths to each postprocess script; nested lists of scripts
+            imply simultaneous execution, while ordering of the top-level
+            list implies order of execution.
         depend_on_task_ids : list[str]
             list of task ids on which postprocessing scripts depend on
             finishing to start themselves
+        postprocess_folder_name : str, optional
+            name of the folder in which postprocessing scripts are held,
+            by default "postprocessing_scripts"
 
         Returns
         -------
         list[str]
             list of each postprocess task_id launched
         """
-        # get all paths to postprocess scripts on this machine
-        postprocess_scripts = find_files(
-            self.experiment_path_local, filename_contains="postprocess_states"
-        )
-        # lets sort postprocess scripts in order of their suffix
-        postprocess_scripts = sort_filenames_by_suffix(postprocess_scripts)
-        # [] means no postprocessing scripts in this experiment
+        # postprocess_task_ids is extended one execution bundle at a time,
+        # so within-bundle tasks run simultaneously but bundles run sequentially
        postprocess_task_ids = []
-        if postprocess_scripts:
-            # translate paths to docker paths
-            postprocess_scripts_docker = [
-                os.path.join(self.experiment_path_docker, filename)
-                for filename in postprocess_scripts
-            ]
-            for postprocess_script in postprocess_scripts_docker:
-                # depends_on flag requires postprocessing scripts to await completion of all previously run tasks
-                # this means postprocess_states.py requires all states to finish
-                # postprocessing scripts will require all earlier postprocessing scripts to finish before starting as well.
-                postprocess_task_id = self.azure_client.add_task(
+        for execution in execution_order:
+            # convert str to list[str] even if only a single element
+            execution_bundle: list[str] = (
+                [execution] if isinstance(execution, str) else execution
+            )
+            execution_bundle_ids = []
+            for postprocess_filename in execution_bundle:
+                # translate paths to docker paths
+                # may raise FileNotFoundError if unable to locate the file
+                postprocess_docker_path = self._find_postprocess_file_docker(
+                    postprocess_folder_name, postprocess_filename
+                )
+                # postprocess_task_ids contains all previous execution bundles'
+                # task_ids, but not any task_ids from the current bundle
+                task_id = self.azure_client.add_task(
                     job_id=self.job_id,
                     docker_cmd="python %s -j %s"
-                    % (postprocess_script, self.job_id),
+                    % (postprocess_docker_path, self.job_id),
                     depends_on=depend_on_task_ids + postprocess_task_ids,
                 )
-                postprocess_task_ids += postprocess_task_id
+                execution_bundle_ids += task_id
+            # bundle completed, add those task ids to the running list
+            postprocess_task_ids += execution_bundle_ids
         return postprocess_task_ids
diff --git a/src/mechanistic_azure/experiment_launcher_azure.py b/src/mechanistic_azure/experiment_launcher_azure.py
index 76029c0c..98fea9b8 100644
--- a/src/mechanistic_azure/experiment_launcher_azure.py
+++ b/src/mechanistic_azure/experiment_launcher_azure.py
@@ -4,8 +4,10 @@
 """
 
 import argparse
+import os
+from itertools import groupby
 
-from .azure_utilities import AzureExperimentLauncher
+from azure_utilities import AzureExperimentLauncher
 
 # specify job ID, cant already exist
 
@@ -33,6 +35,7 @@
 args = parser.parse_args()
 experiment_names: list[str] = args.experiment_name
 job_id: str = args.job_id
+postprocess_execution_order = []
 # upload dockerfile used
 launcher = AzureExperimentLauncher(
     experiment_names[0],
@@ -41,6 +44,27 @@
     experiment_directory=EXPERIMENTS_DIRECTORY,
     docker_image_name=DOCKER_IMAGE_TAG,
 )
+pp_scripts_path = os.path.join(
+    launcher.experiment_path_local, "postprocessing_scripts"
+)
+if os.path.exists(pp_scripts_path):
+    postprocess_script_filenames = [
+        f
+        for f in os.listdir(pp_scripts_path)
+        if os.path.isfile(os.path.join(pp_scripts_path, f))
+    ]
+    # Sort the list based on the numeric prefix of each filename
+    sorted_list = sorted(
+        postprocess_script_filenames, key=lambda x: int(x.split("_")[0])
+    )
+
+    # Group the filenames based on their shared numeric prefix
+    postprocess_execution_order = [
+        list(group)
+        for _, group in groupby(sorted_list, lambda x: x.split("_")[0])
+    ]
+
+
 launcher.set_resource_pool(pool_name="scenarios_4cpu_pool")
 all_tasks_run = []
 # all experiments will be placed under the same jobid,
@@ -52,7 +76,8 @@
 )
 state_task_ids = launcher.launch_states(depend_on_task_ids=all_tasks_run)
 postprocessing_tasks = launcher.launch_postprocess(
-    depend_on_task_ids=state_task_ids
+    execution_order=postprocess_execution_order,
+    depend_on_task_ids=state_task_ids,
 )
 all_tasks_run += state_task_ids + postprocessing_tasks
 launcher.azure_client.monitor_job(job_id)
diff --git a/src/resp_ode/utils.py b/src/resp_ode/utils.py
index 4597d80c..2589dbbe 100644
--- a/src/resp_ode/utils.py
+++ b/src/resp_ode/utils.py
@@ -1898,38 +1898,3 @@ def find_files(
     postprocess_files = glob.glob(pattern, recursive=recursive)
 
     return [os.path.basename(file) for file in postprocess_files]
-
-
-def sort_filenames_by_suffix(filenames) -> list[str]:
-    """Given a list of filenames, sorts them by the _1/2/3 suffix
-    handles a no suffix case as first element.
-    An example ordering would be:
-    `[file.py, file_0.py, file_5.py, file_11.py, file_new_15.py]`
-
-    Parameters
-    ----------
-    filenames : list[str]
-        list of filenames, ending with _int suffixes.
-
-    Returns
-    ----------
-        same filenames list but sorted by suffix order.
-    """
-
-    def extract_number(filename):
-        # Find the last occurrence of '_'
-        last_underscore_index = filename.rfind("_")
-
-        # Check if '_' exists in the filename
-        if last_underscore_index != -1:
-            start = last_underscore_index + 1
-            end = filename.rfind(".")
-            number_str = filename[start:end]
-
-            # Check if the extracted substring is a valid integer
-            if number_str.isdigit():
-                return int(number_str)
-
-        return 0
-
-    return sorted(filenames, key=extract_number)
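For context, the grouping logic added to `experiment_launcher_azure.py` can be exercised on its own: filenames are sorted by the integer value of their numeric prefix, then consecutive filenames sharing a prefix become one bundle that `launch_postprocess` runs simultaneously. A minimal standalone sketch, with hypothetical filenames:

```
from itertools import groupby

# hypothetical contents of an experiment's postprocessing_scripts/ folder
filenames = [
    "2_cleanup.py",
    "0_combine_state_timeseries_csv.py",
    "1_plot_timeseries.py",
    "1_summarize_fits.py",
]
# sort by the integer value of the numeric prefix so "10_" sorts after "2_"
sorted_list = sorted(filenames, key=lambda x: int(x.split("_")[0]))
# group consecutive filenames sharing a prefix into one simultaneous bundle
execution_order = [
    list(group) for _, group in groupby(sorted_list, lambda x: x.split("_")[0])
]
print(execution_order)
# [['0_combine_state_timeseries_csv.py'],
#  ['1_plot_timeseries.py', '1_summarize_fits.py'],
#  ['2_cleanup.py']]
```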
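Similarly, the dependency chaining inside `launch_postprocess` can be illustrated without Azure: each task waits on the state tasks plus every task from earlier bundles, but never on siblings within its own bundle. A simplified sketch with `add_task` stubbed out (`simulate_dependencies` is a hypothetical helper, not part of the codebase):

```
def simulate_dependencies(execution_order, depend_on_task_ids):
    # mirrors launch_postprocess: returns {script: task ids it waits on}
    dependencies = {}
    postprocess_task_ids = []
    for execution in execution_order:
        # normalize a bare string into a single-element bundle
        bundle = [execution] if isinstance(execution, str) else execution
        bundle_ids = []
        for script in bundle:
            # waits on state tasks + every earlier bundle, never on
            # siblings within the current bundle
            dependencies[script] = depend_on_task_ids + postprocess_task_ids
            bundle_ids.append(script)  # stand-in for a real azure task id
        postprocess_task_ids += bundle_ids
    return dependencies


deps = simulate_dependencies(
    [["0_combine.py"], ["1_plot.py", "1_summarize.py"]],
    depend_on_task_ids=["state_AL", "state_AK"],
)
print(deps["1_summarize.py"])
# ['state_AL', 'state_AK', '0_combine.py']
```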