Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update study configuration during execution #445

Merged
merged 16 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions docs/Maestro/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ maestro [OPTIONS] COMMAND [ARGS]...
- [*cancel*](#cancel): Cancel all running jobs.
- [*run*](#run): Launch a study based on a specification
- [*status*](#status): Check the status of a running study.
- [*update*](#update): Update a running study

### **cancel**

Expand Down Expand Up @@ -106,6 +107,88 @@ maestro status [OPTIONS] DIRECTORY [DIRECTORY ...]
| `--disable-pager` | boolean | Turn off the pager for the status display. See [Status Pager](monitoring.md#status-pager) for more information on this option. | `False` |


### **update**

Update the config of a running study. Currently limited to three settings: throttle, restart limit (rlimit), and sleep. Explicitly set each argument via keyword args, interactively set for each study, or a mix of the two. Supports updating multiple studies at once.

!!! note

This command will drop a hidden file in your study workspace '.study.update.lock' which conductor reads asynchronously and removes upon successful reading. Applying this command to a finished study will currently leave this file in your workspace. Similarly, this file will also not be cleaned up if conductor crashes before reading.

**Usage:**

``` console
maestro update [-h] [--rlimit RLIMIT] [--sleep SLEEP] [--throttle THROTTLE] DIRECTORY [DIRECTORY ...]
```

**Options:**

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `-h`, `--help` | boolean | Show this help message and exit. | `False` |
| `--rlimit` | integer | Update maximum number of restarts when steps specify a restart command (0 denotes no limit) | None |
| `--sleep` | integer | Update the time (in seconds) that the manager (conductor) will wait between job status checks. | None |
| `--throttle` | integer | Update the maximum number of inflight jobs allowed to execute simultaneously (0 denotes no throttling). | None |


#### **Examples**

**Update a single study configuration value for a single study:**

``` console title="Change single config value for a single study"
maestro update --rlimit 4 /path/to/my/timestamped/study/workspace/
```

**Update multiple study configuration values for a single study:**

``` console title="Change multiple config values for a single study"
maestro update --rlimit 4 --throttle 2 /path/to/my/timestamped/study/workspace/
```
**Update single study configuration value for multiple studies:**

``` console title="Single config value, two studies"
maestro update --rlimit 4 --rlimit 2 /path/to/my/timestamped/study/workspace_1/ /path/to/my/timestamped/study/workspace_2/
```

**Update multiple study configuration values for multiple studies:**

``` console title="Multiple config values, two studies"
maestro update --rlimit 4 --rlimit 2 /path/to/my/timestamped/study/workspace_1/ /path/to/my/timestamped/study/workspace_2/
```

**Interactively update study configuration for one study:**

<!-- termynal -->
```
$ maestro update ./sample_output/hello_world_restart/hello_bye_world_20241119-173122
Updating study at '/path/to/sample_output/hello_world_restart/hello_bye_world_20241119-173122'
Choose study config to update, or done/quit to finish/abort
[rlimit/throttle/sleep/done/quit]
> rlimit
Enter new restart limit [Integer, 0 = unlimited]
> 4
Choose study config to update, or quit
[rlimit/throttle/sleep/done/quit]
> sleep
Enter new sleep duration for Conductor [Integer, seconds]
> 30
Choose study config to update, or quit
[rlimit/throttle/sleep/done/quit]
> quit
Discarding updates to 'sample_output/hello_world_restart/hello_bye_world_20241119-173122/'
$ maestro update ./sample_output/hello_world_restart/hello_bye_world_20241119-173122
Updating study at '/path/to/sample_output/hello_world_restart/hello_bye_world_20241119-173122'
Choose study config to update, or done/quit to finish/abort
[rlimit/throttle/sleep/done/quit]
> rlimit
Enter new restart limit [Integer, 0 = unlimited]
> 4
Choose study config to update, or quit
[rlimit/throttle/sleep/done/quit]
> done
Writing updated study config to 'sample_output/hello_world_restart/hello_bye_world_20241119-173122/.study.update.lock'
```

## **conductor**

A application for checking and managing and ExecutionDAG within an executing study.
Expand Down
77 changes: 77 additions & 0 deletions maestrowf/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import inspect
import logging
import os
import shutil
import sys
from time import sleep
import dill
Expand Down Expand Up @@ -140,6 +141,7 @@ class Conductor:

_pkl_extension = ".study.pkl"
_cancel_lock = ".cancel.lock"
_study_update = ".study.update.lock"
_batch_info = "batch.info"

def __init__(self, study):
Expand Down Expand Up @@ -289,6 +291,60 @@ def mark_cancelled(cls, output_path):
with open(lock_path, 'a'):
os.utime(lock_path, None)

@classmethod
def update_study_exec(cls, output_path, updated_config):
"""
Mark the study rooted at 'out_path'.

:param out_path: A string containing the patht to a study root.
:returns: A dictionary containing the status of the study.
"""
study_update_path = make_safe_path(output_path, cls._study_update)
# NOTE: should we timestamp these, or append separate timestamped docs
# to the yaml?
print(f"Writing updated study config to '{study_update_path}'")
with open(study_update_path, 'wb') as study_update:
study_update.write(yaml.dump(updated_config).encode("utf-8"))

@classmethod
def load_updated_study_exec(cls, output_path):
"""
Load the updated study config for the study rooted in 'out_path'.

:param out_path: A string containing the path to a study root.
:returns: A dict containing the updated config for the study.
"""
study_update_path = os.path.join(output_path, cls._study_update)

if not os.path.exists(study_update_path):
# No update record found
return {}

with open(study_update_path, 'r') as data:
try:
updated_study_config = yaml.load(data, yaml.FullLoader)
except AttributeError:
LOGGER.warning(
"*** PyYAML is using an unsafe version with a known "
"load vulnerability. Please upgrade your installation "
"to a more recent version! ***")
updated_study_config = yaml.load(data)

if updated_study_config:
LOGGER.debug("Successfully read updated study config; removing record at %s",
study_update_path)
os.remove(study_update_path)

return updated_study_config

@property
def sleep_time(self):
return self._sleep_time

@sleep_time.setter
def sleep_time(self, new_sleep_time):
self._sleep_time = new_sleep_time

def initialize(self, batch_info, sleeptime=60):
"""
Initializes the Conductor instance based on the stored study.
Expand Down Expand Up @@ -318,6 +374,7 @@ def monitor_study(self):

# Set some fixed variables that monitor will use.
cancel_lock_path = make_safe_path(self.output_path, self._cancel_lock)
study_update_path = make_safe_path(self.output_path, self._study_update)
dag = self._exec_dag
pkl_path = \
os.path.join(self._pkl_path, "{}.pkl".format(self._study.name))
Expand Down Expand Up @@ -346,6 +403,26 @@ def monitor_study(self):
LOGGER.error("Failed to acquire cancellation lock.")
pass

if os.path.exists(study_update_path):
updated_study_config = self.load_updated_study_exec(self.output_path)

if "throttle" in updated_study_config and updated_study_config["throttle"]:
LOGGER.info("Updating throttle from %d to %d",
dag._submission_throttle, # NOTE: make a property?
updated_study_config["throttle"])
dag.update_throttle(updated_study_config["throttle"])

if "rlimit" in updated_study_config and updated_study_config["rlimit"]:
LOGGER.info("Updating restart limit to %d",
updated_study_config["rlimit"])
dag.update_rlimit(updated_study_config["rlimit"])

if "sleep" in updated_study_config and updated_study_config["sleep"]:
LOGGER.info("Updating conductor sleep time from %s to %s",
str(self.sleep_time),
str(updated_study_config["sleep"]))
self.sleep_time = updated_study_config["sleep"]

LOGGER.info("Checking DAG status at %s", str(datetime.now()))
# Execute steps that are ready
# Receives StudyStatus enum
Expand Down
29 changes: 29 additions & 0 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ def params(self):
self._params = {}
return self._params

@property
def restart_limit(self):
return self._restart_limit

@restart_limit.setter
def restart_limit(self, new_restart_limit):
self._restart_limit = new_restart_limit

def setup_workspace(self):
"""Initialize the record's workspace."""
create_parentdir(self.workspace.value)
Expand Down Expand Up @@ -399,6 +407,27 @@ def _check_tmp_dir(self):
if self._tmp_dir and not os.path.exists(self._tmp_dir):
self._tmp_dir = tempfile.mkdtemp()

def update_throttle(self, new_submission_throttle):
self._submission_throttle = new_submission_throttle

def update_rlimit(self, new_restart_limit):
# subtree, _ = self.bfs_subtree(SOURCE)

# update_subtree = [key for key in subtree
# if key != '_source']

for key in self.values.keys():
if key == SOURCE:
continue

# Get the step record to update
step_record = self.values[key]
LOGGER.debug("Updating restart limit from %d to %d for step '%s'",
step_record.restart_limit,
new_restart_limit,
key)
step_record.restart_limit = new_restart_limit

def add_step(self, name, step, workspace, restart_limit, params=None):
"""
Add a StepRecord to the ExecutionGraph.
Expand Down
Loading
Loading