Skip to content

Commit

Permalink
rebuilding postprocessing script ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
arik-shurygin committed Oct 22, 2024
1 parent 52d0e6b commit 29ab545
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 64 deletions.
130 changes: 103 additions & 27 deletions src/mechanistic_azure/azure_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from azure.core.paging import ItemPaged
from cfa_azure.clients import AzureClient

from resp_ode.utils import find_files, sort_filenames_by_suffix


class AzureExperimentLauncher:
def __init__(
Expand Down Expand Up @@ -277,51 +275,129 @@ def launch_states(self, depend_on_task_ids: list[str] = None) -> list[str]:
task_ids += task_id
return task_ids

def _find_postprocess_file_docker(
self, postprocess_folder_name, postprocess_filename
) -> str:
"""given a postprocess script filename validates its existance
in the local experiment, returns the file's path on the docker
environment
Parameters
----------
postprocessing_folder_name: str
name of the folder in which postprocessing scripts are held,
postprocess_filename : str
filename of the postprocess script. Able to
correctly parse paths as well as raw filenames.
Returns
-------
str
path to the postprocess script as executed on the docker machine
(they are uploaded to the input mount)
Raises
------
FileNotFoundError
if `postprocess_filename` does not exist in the experiment directory
nor the `postprocess_folder_name`, raises.
"""
if os.path.exists(
os.path.join(self.experiment_path_local, postprocess_filename)
):
postprocess_docker_path = os.path.join(
self.experiment_path_docker, postprocess_filename
)
# if user does not prepend the folder name catch that error
elif os.path.exists(
os.path.join(
self.experiment_path_local,
postprocess_folder_name,
postprocess_filename,
)
):
postprocess_docker_path = os.path.join(
self.experiment_path_docker,
postprocess_folder_name,
postprocess_filename,
)
else:
raise FileNotFoundError(
"Unable to find postprocessing script %s in your "
"experiment folder, make sure you are specifying "
"paths relative to the experiment folder"
% postprocess_filename
)
return postprocess_docker_path

def launch_postprocess(
    self,
    execution_order: list[str | list[str]],
    depend_on_task_ids: list[str],
    postprocess_folder_name: str = "postprocessing_scripts",
) -> list[str]:
    """Launches postprocessing scripts identified by `execution_order`
    in the order they are passed in the list. List elements are
    treated as bundles of postprocess scripts that are able to be run
    simultaneously. Script filenames given to `execution_order` must
    be found in either the experiment folder itself or inside
    `postprocess_folder_name` within the experiment folder.

    Example
    ----------
    ```
    main_tasks = self.launch_states()
    #launch 3 postprocess tasks to execute simultaneously
    ex_order = [("1.py", "2.py", "3.py")]
    #launch 3 postprocess tasks to execute sequentially (note no list nesting)
    ex_order = ["1.py", "2.py", "3.py"]
    ex_order = [("1.py"), ("2.py"), ("3.py")]
    #launch 3 postprocess tasks, 1 and 2 to execute together, 3 last
    ex_order = [("1.py", "2.py"), "3.py"]
    postprocess = self.launch_postprocess(execution_order = ex_order, depend_on_task_ids = main_tasks)
    ```

    Parameters
    ----------
    execution_order: list[str | list[str]]
        list of paths to each postprocess script, lists of scripts imply
        simultaneous execution while ordering of the top level list
        implies order of execution.
    depend_on_task_ids : list[str]
        list of task ids on which postprocessing scripts depend on
        finishing to start themselves.
    postprocess_folder_name: str, optional
        name of the folder in which postprocessing scripts are held,
        by default "postprocessing_scripts"

    Returns
    -------
    list[str]
        list of each postprocess task_id launched

    Raises
    ------
    FileNotFoundError
        if a script in `execution_order` cannot be located in the
        experiment folder nor `postprocess_folder_name`.
    """
    # postprocess_task_ids added one execution bundle at a time,
    # so within-bundle tasks run simultaneously but bundles run sequentially
    postprocess_task_ids = []
    for execution in execution_order:
        # convert str to list[str] even if only single element
        execution_bundle: list[str] = (
            [execution] if isinstance(execution, str) else execution
        )
        execution_bundle_ids = []
        for postprocess_filename in execution_bundle:
            # translate local path to its docker path,
            # may raise FileNotFoundError if unable to locate the file
            postprocess_docker_path = self._find_postprocess_file_docker(
                postprocess_folder_name, postprocess_filename
            )
            # postprocess_task_ids contains all previous execution_bundles'
            # task_ids, but not any task_ids from the current bundle,
            # so bundle members never depend on each other
            task_id = self.azure_client.add_task(
                job_id=self.job_id,
                docker_cmd="python %s -j %s"
                % (postprocess_docker_path, self.job_id),
                depends_on=depend_on_task_ids + postprocess_task_ids,
            )
            execution_bundle_ids += task_id
        # bundle completed, add those task ids to the running list
        postprocess_task_ids += execution_bundle_ids

    return postprocess_task_ids

Expand Down
29 changes: 27 additions & 2 deletions src/mechanistic_azure/experiment_launcher_azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
"""

import argparse
import os
from itertools import groupby

from .azure_utilities import AzureExperimentLauncher
from azure_utilities import AzureExperimentLauncher

# specify job ID, cant already exist

Expand Down Expand Up @@ -33,6 +35,7 @@
args = parser.parse_args()
experiment_names: list[str] = args.experiment_name
job_id: str = args.job_id
postprocess_execution_order = []
# upload dockerfile used
launcher = AzureExperimentLauncher(
experiment_names[0],
Expand All @@ -41,6 +44,27 @@
experiment_directory=EXPERIMENTS_DIRECTORY,
docker_image_name=DOCKER_IMAGE_TAG,
)
# Build the postprocess execution order from the experiment's
# "postprocessing_scripts" folder, if it exists: scripts sharing the same
# numeric filename prefix form one bundle (run together); bundles run in
# ascending prefix order.
pp_scripts_path = os.path.join(
    launcher.experiment_path_local, "postprocessing_scripts"
)
if os.path.exists(pp_scripts_path):
    # only plain files; subdirectories are ignored
    postprocess_script_filenames = [
        f
        for f in os.listdir(pp_scripts_path)
        if os.path.isfile(os.path.join(pp_scripts_path, f))
    ]
    # Sort the list based on the numeric prefix of each filename
    # NOTE(review): assumes every filename starts with "<number>_"; a
    # non-conforming file makes int() raise ValueError — confirm the
    # naming convention is enforced upstream.
    sorted_list = sorted(
        postprocess_script_filenames, key=lambda x: int(x.split("_")[0])
    )

    # Group the filenames based on their shared numbers
    # (groupby requires sorted input — satisfied by the sort above)
    postprocess_execution_order = [
        list(group)
        for _, group in groupby(sorted_list, lambda x: x.split("_")[0])
    ]


launcher.set_resource_pool(pool_name="scenarios_4cpu_pool")
all_tasks_run = []
# all experiments will be placed under the same jobid,
Expand All @@ -52,7 +76,8 @@
)
state_task_ids = launcher.launch_states(depend_on_task_ids=all_tasks_run)
postprocessing_tasks = launcher.launch_postprocess(
depend_on_task_ids=state_task_ids
execution_order=postprocess_execution_order,
depend_on_task_ids=state_task_ids,
)
all_tasks_run += state_task_ids + postprocessing_tasks
launcher.azure_client.monitor_job(job_id)
35 changes: 0 additions & 35 deletions src/resp_ode/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1898,38 +1898,3 @@ def find_files(
postprocess_files = glob.glob(pattern, recursive=recursive)

return [os.path.basename(file) for file in postprocess_files]


def sort_filenames_by_suffix(filenames) -> list[str]:
    """Given a list of filenames, sorts them by the _1/2/3 suffix,
    handles a no suffix case as first element.
    An example ordering would be:
    `[file.py, file_0.py, file_5.py, file_11.py, file_new_15.py]`

    Parameters
    ----------
    filenames : list[str]
        list of filenames, ending with _int suffixes.

    Returns
    ----------
    same filenames list but sorted by suffix order.
    """

    def _suffix_order(fname):
        # locate the final '_'; no underscore means no suffix -> first
        underscore_at = fname.rfind("_")
        if underscore_at == -1:
            return 0
        # text between the last '_' and the last '.' is the candidate suffix
        candidate = fname[underscore_at + 1 : fname.rfind(".")]
        # non-numeric suffixes sort alongside the no-suffix case
        return int(candidate) if candidate.isdigit() else 0

    return sorted(filenames, key=_suffix_order)

0 comments on commit 29ab545

Please sign in to comment.