Update study configuration during execution (#445)
Add the ability to change some of a study's configuration during execution, including:
   * rlimit (restart limit)
   * throttle
   * sleep

Implementation works like cancel, taking paths to executing study workspaces. Configuration parameters can be updated either via an interactive prompt for each study workspace given, or using explicit/repeated keyword arguments for each configuration parameter.
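The pairing of repeated keyword arguments with study workspaces can be sketched roughly as below. This is an illustration only, not Maestro's actual parser: `build_parser` and `pair_updates` are hypothetical helpers, and the rule that a single value is applied to every workspace is an assumption made for the example.

```python
import argparse

FLAGS = ("rlimit", "sleep", "throttle")  # the three settings named above


def build_parser():
    """Hypothetical parser mirroring the documented flags."""
    parser = argparse.ArgumentParser(prog="update-sketch")
    for flag in FLAGS:
        # action="append" collects every repeat of a flag into a list
        parser.add_argument(f"--{flag}", type=int, action="append")
    parser.add_argument("directory", nargs="+")
    return parser


def pair_updates(args):
    """Map each workspace to the config values that apply to it."""
    updates = {path: {} for path in args.directory}
    for flag in FLAGS:
        values = getattr(args, flag)
        if values is None:
            continue  # flag not given: leave this setting untouched
        if len(values) == 1:
            # Assumption: a single value applies to every workspace
            values = values * len(args.directory)
        if len(values) != len(args.directory):
            raise ValueError(
                f"--{flag}: expected 1 or {len(args.directory)} values")
        for path, value in zip(args.directory, values):
            updates[path][flag] = value
    return updates


args = build_parser().parse_args(
    ["--rlimit", "4", "--rlimit", "2", "--sleep", "30", "study_1", "study_2"])
updates = pair_updates(args)
print(updates)
```

Here the two `--rlimit` values are matched to the workspaces in order, while the single `--sleep` value is shared by both.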

Current Limitations:
   * rlimit is a per step configuration, but it is only set globally in current implementation
   * If a study is no longer running, updating rlimit will not yet resume the study to account for any increases

Adds termynal to the docs to improve readability of interactive CLI commands and their outputs.
jwhite242 authored Dec 12, 2024
1 parent 524b6c4 commit 0802410
Showing 8 changed files with 543 additions and 159 deletions.
83 changes: 83 additions & 0 deletions docs/Maestro/cli.md
@@ -38,6 +38,7 @@ maestro [OPTIONS] COMMAND [ARGS]...
- [*cancel*](#cancel): Cancel all running jobs.
- [*run*](#run): Launch a study based on a specification
- [*status*](#status): Check the status of a running study.
- [*update*](#update): Update a running study

### **cancel**

@@ -106,6 +107,88 @@ maestro status [OPTIONS] DIRECTORY [DIRECTORY ...]
| `--disable-pager` | boolean | Turn off the pager for the status display. See [Status Pager](monitoring.md#status-pager) for more information on this option. | `False` |


### **update**

Update the configuration of a running study. Currently limited to three settings: throttle, restart limit (rlimit), and sleep. Set each value explicitly via keyword arguments, interactively for each study, or with a mix of the two. Multiple studies can be updated at once.

!!! note

    This command writes a hidden file, `.study.update.lock`, to your study workspace. Conductor reads this file asynchronously and removes it after a successful read. Applying this command to a finished study currently leaves the file in your workspace. Likewise, the file is not cleaned up if conductor crashes before reading it.
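Leftover update records can be located with a short script such as the one below. It is a sketch under the assumption that the file name matches the note above; `find_update_locks` is a hypothetical helper, not part of Maestro, and it cannot tell whether a conductor is still running and about to read the file.

```python
import tempfile
from pathlib import Path

LOCK_NAME = ".study.update.lock"  # file name taken from the note above


def find_update_locks(root):
    """Return every update record found under `root`, sorted by path."""
    return sorted(Path(root).rglob(LOCK_NAME))


# Demonstration on a throwaway directory tree
with tempfile.TemporaryDirectory() as root:
    workspace = Path(root) / "study_20241119-173122"
    workspace.mkdir()
    (workspace / LOCK_NAME).write_text("rlimit: 4\n")
    found = [str(p.relative_to(root)) for p in find_update_locks(root)]

print(found)
```

Only delete a record found this way once you have confirmed that the study is no longer running.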

**Usage:**

``` console
maestro update [-h] [--rlimit RLIMIT] [--sleep SLEEP] [--throttle THROTTLE] DIRECTORY [DIRECTORY ...]
```

**Options:**

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `-h`, `--help` | boolean | Show this help message and exit. | `False` |
| `--rlimit` | integer | Update the maximum number of restarts when steps specify a restart command (0 denotes no limit). | None |
| `--sleep` | integer | Update the time (in seconds) that the manager (conductor) will wait between job status checks. | None |
| `--throttle` | integer | Update the maximum number of inflight jobs allowed to execute simultaneously (0 denotes no throttling). | None |


#### **Examples**

**Update a single study configuration value for a single study:**

``` console title="Change single config value for a single study"
maestro update --rlimit 4 /path/to/my/timestamped/study/workspace/
```

**Update multiple study configuration values for a single study:**

``` console title="Change multiple config values for a single study"
maestro update --rlimit 4 --throttle 2 /path/to/my/timestamped/study/workspace/
```

**Update a single study configuration value for multiple studies:**

``` console title="Single config value, two studies"
maestro update --rlimit 4 --rlimit 2 /path/to/my/timestamped/study/workspace_1/ /path/to/my/timestamped/study/workspace_2/
```

**Update multiple study configuration values for multiple studies:**

``` console title="Multiple config values, two studies"
maestro update --rlimit 4 --rlimit 2 --throttle 2 --throttle 3 /path/to/my/timestamped/study/workspace_1/ /path/to/my/timestamped/study/workspace_2/
```

**Interactively update study configuration for one study:**

<!-- termynal -->
```
$ maestro update ./sample_output/hello_world_restart/hello_bye_world_20241119-173122
Updating study at '/path/to/sample_output/hello_world_restart/hello_bye_world_20241119-173122'
Choose study config to update, or done/quit to finish/abort
[rlimit/throttle/sleep/done/quit]
> rlimit
Enter new restart limit [Integer, 0 = unlimited]
> 4
Choose study config to update, or quit
[rlimit/throttle/sleep/done/quit]
> sleep
Enter new sleep duration for Conductor [Integer, seconds]
> 30
Choose study config to update, or quit
[rlimit/throttle/sleep/done/quit]
> quit
Discarding updates to 'sample_output/hello_world_restart/hello_bye_world_20241119-173122/'
$ maestro update ./sample_output/hello_world_restart/hello_bye_world_20241119-173122
Updating study at '/path/to/sample_output/hello_world_restart/hello_bye_world_20241119-173122'
Choose study config to update, or done/quit to finish/abort
[rlimit/throttle/sleep/done/quit]
> rlimit
Enter new restart limit [Integer, 0 = unlimited]
> 4
Choose study config to update, or quit
[rlimit/throttle/sleep/done/quit]
> done
Writing updated study config to 'sample_output/hello_world_restart/hello_bye_world_20241119-173122/.study.update.lock'
```

## **conductor**

An application for checking and managing an ExecutionDAG within an executing study.
77 changes: 77 additions & 0 deletions maestrowf/conductor.py
@@ -34,6 +34,7 @@
import inspect
import logging
import os
import shutil
import sys
from time import sleep
import dill
@@ -140,6 +141,7 @@ class Conductor:

    _pkl_extension = ".study.pkl"
    _cancel_lock = ".cancel.lock"
    _study_update = ".study.update.lock"
    _batch_info = "batch.info"

    def __init__(self, study):
@@ -289,6 +291,60 @@ def mark_cancelled(cls, output_path):
        with open(lock_path, 'a'):
            os.utime(lock_path, None)

    @classmethod
    def update_study_exec(cls, output_path, updated_config):
        """
        Write an updated config for the study rooted at 'output_path'.
        :param output_path: A string containing the path to a study root.
        :param updated_config: A dict containing the config values to update.
        """
        study_update_path = make_safe_path(output_path, cls._study_update)
        # NOTE: should we timestamp these, or append separate timestamped docs
        # to the yaml?
        print(f"Writing updated study config to '{study_update_path}'")
        with open(study_update_path, 'wb') as study_update:
            study_update.write(yaml.dump(updated_config).encode("utf-8"))

    @classmethod
    def load_updated_study_exec(cls, output_path):
        """
        Load the updated study config for the study rooted in 'output_path'.
        :param output_path: A string containing the path to a study root.
        :returns: A dict containing the updated config for the study.
        """
        study_update_path = os.path.join(output_path, cls._study_update)

        if not os.path.exists(study_update_path):
            # No update record found
            return {}

        with open(study_update_path, 'r') as data:
            try:
                updated_study_config = yaml.load(data, yaml.FullLoader)
            except AttributeError:
                LOGGER.warning(
                    "*** PyYAML is using an unsafe version with a known "
                    "load vulnerability. Please upgrade your installation "
                    "to a more recent version! ***")
                updated_study_config = yaml.load(data)

        if updated_study_config:
            LOGGER.debug("Successfully read updated study config; removing record at %s",
                         study_update_path)
            os.remove(study_update_path)

        return updated_study_config
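The write-then-consume handoff implemented by these two class methods can be sketched in isolation as follows. This is a simplified stand-in, not the commit's code: it swaps the standard-library `json` module in for PyYAML so it runs without third-party packages, `write_update` and `consume_update` are hypothetical names, and, unlike the diff, it removes the record even when it is empty and always returns a dict.

```python
import json
import os
import tempfile

UPDATE_FILE = ".study.update.lock"  # name taken from the diff above


def write_update(workspace, config):
    """Writer side: drop the requested changes into the workspace."""
    with open(os.path.join(workspace, UPDATE_FILE), "w") as handle:
        json.dump(config, handle)  # stand-in for yaml.dump in the commit


def consume_update(workspace):
    """Reader side: return pending changes once, then remove the record."""
    path = os.path.join(workspace, UPDATE_FILE)
    if not os.path.exists(path):
        return {}  # no update record found
    with open(path) as handle:
        config = json.load(handle)
    os.remove(path)
    # Always hand back a dict so callers can safely use `in`
    return config or {}


with tempfile.TemporaryDirectory() as workspace:
    write_update(workspace, {"rlimit": 4, "sleep": 30})
    first = consume_update(workspace)   # picks up the pending changes
    second = consume_update(workspace)  # record is gone, nothing to apply

print(first)
print(second)
```

Returning a dict in every case is a deliberate choice in this sketch, since `yaml.load` returns `None` for an empty file and a caller that tests `"throttle" in ...` would then fail.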

    @property
    def sleep_time(self):
        return self._sleep_time

    @sleep_time.setter
    def sleep_time(self, new_sleep_time):
        self._sleep_time = new_sleep_time

    def initialize(self, batch_info, sleeptime=60):
        """
        Initializes the Conductor instance based on the stored study.
@@ -318,6 +374,7 @@ def monitor_study(self):

        # Set some fixed variables that monitor will use.
        cancel_lock_path = make_safe_path(self.output_path, self._cancel_lock)
        study_update_path = make_safe_path(self.output_path, self._study_update)
        dag = self._exec_dag
        pkl_path = \
            os.path.join(self._pkl_path, "{}.pkl".format(self._study.name))
@@ -346,6 +403,26 @@ def monitor_study(self):
                    LOGGER.error("Failed to acquire cancellation lock.")
                    pass

            if os.path.exists(study_update_path):
                updated_study_config = self.load_updated_study_exec(self.output_path)

                if "throttle" in updated_study_config and updated_study_config["throttle"]:
                    LOGGER.info("Updating throttle from %d to %d",
                                dag._submission_throttle,  # NOTE: make a property?
                                updated_study_config["throttle"])
                    dag.update_throttle(updated_study_config["throttle"])

                if "rlimit" in updated_study_config and updated_study_config["rlimit"]:
                    LOGGER.info("Updating restart limit to %d",
                                updated_study_config["rlimit"])
                    dag.update_rlimit(updated_study_config["rlimit"])

                if "sleep" in updated_study_config and updated_study_config["sleep"]:
                    LOGGER.info("Updating conductor sleep time from %s to %s",
                                str(self.sleep_time),
                                str(updated_study_config["sleep"]))
                    self.sleep_time = updated_study_config["sleep"]
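The checks above rely on truthiness, so as written a value of `0` would be skipped, even though the CLI documentation describes `0` as "no limit" for `--rlimit` and "no throttling" for `--throttle`. Whether that is intended is not stated in the commit. The sketch below contrasts the truthiness filter with an explicit `is not None` test. `pending_changes` is a hypothetical helper for illustration only.

```python
KEYS = ("throttle", "rlimit", "sleep")


def pending_changes(updated_config):
    """Return the settings that were explicitly provided, zeros included."""
    config = updated_config or {}  # tolerate None from an empty record
    return {key: config[key] for key in KEYS if config.get(key) is not None}


sample = {"throttle": 0, "rlimit": 4, "sleep": None}

# Truthiness filter, as in the diff: the zero throttle is dropped
truthy = {key: sample[key] for key in KEYS if key in sample and sample[key]}

# Explicit None check: the zero throttle survives, the unset sleep does not
explicit = pending_changes(sample)

print(truthy)
print(explicit)
```

With the explicit check, an unset value is still ignored, but a request to disable throttling or lift the restart limit is passed through.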

            LOGGER.info("Checking DAG status at %s", str(datetime.now()))
            # Execute steps that are ready
            # Receives StudyStatus enum
29 changes: 29 additions & 0 deletions maestrowf/datastructures/core/executiongraph.py
@@ -84,6 +84,14 @@ def params(self):
            self._params = {}
        return self._params

    @property
    def restart_limit(self):
        return self._restart_limit

    @restart_limit.setter
    def restart_limit(self, new_restart_limit):
        self._restart_limit = new_restart_limit

    def setup_workspace(self):
        """Initialize the record's workspace."""
        create_parentdir(self.workspace.value)
@@ -399,6 +407,27 @@ def _check_tmp_dir(self):
        if self._tmp_dir and not os.path.exists(self._tmp_dir):
            self._tmp_dir = tempfile.mkdtemp()

    def update_throttle(self, new_submission_throttle):
        self._submission_throttle = new_submission_throttle

    def update_rlimit(self, new_restart_limit):
        # subtree, _ = self.bfs_subtree(SOURCE)

        # update_subtree = [key for key in subtree
        #                   if key != '_source']

        for key in self.values.keys():
            if key == SOURCE:
                continue

            # Get the step record to update
            step_record = self.values[key]
            LOGGER.debug("Updating restart limit from %d to %d for step '%s'",
                         step_record.restart_limit,
                         new_restart_limit,
                         key)
            step_record.restart_limit = new_restart_limit
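The commit message lists the global-only restart limit as a current limitation. One way a per-step override could be layered on top of the loop above is sketched below. Nothing here is part of the commit: `Record` is a hypothetical stand-in for the step record, the `per_step` argument is an invented extension, and the `"_source"` key is taken from the commented-out code in the diff.

```python
from dataclasses import dataclass

SOURCE = "_source"  # root key, as named in the commented-out code above


@dataclass
class Record:
    """Hypothetical stand-in for a step record; only the field used here."""
    restart_limit: int = 1


def update_rlimit(records, new_limit, per_step=None):
    """Apply `new_limit` to every step, except where `per_step` overrides it."""
    per_step = per_step or {}
    for key, record in records.items():
        if key == SOURCE:
            continue  # skipped, as in the diff
        record.restart_limit = per_step.get(key, new_limit)


records = {SOURCE: Record(), "hello": Record(), "bye": Record()}
update_rlimit(records, 4, per_step={"bye": 2})
limits = {key: record.restart_limit for key, record in records.items()}
print(limits)
```

Calling `update_rlimit(records, 4)` without `per_step` reproduces the global behavior shown in the diff.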

    def add_step(self, name, step, workspace, restart_limit, params=None):
        """
        Add a StepRecord to the ExecutionGraph.