From c35d02cd5ca46c5585ca0de9119c6be8bd164ae8 Mon Sep 17 00:00:00 2001 From: rajithkrishnegowda <134698520+rajithkrishnegowda@users.noreply.github.com> Date: Mon, 13 Jan 2025 13:55:35 +0530 Subject: [PATCH 1/9] split matrix and add task runner api (#1259) * split matrix and add task runner api * update taskrunner.yml * check ruff version * split matrix and add task runner api * update taskrunner.yml * check ruff version * test * test * update * cleanup --- .github/workflows/lint.yml | 2 +- .github/workflows/taskrunner.yml | 18 ++++-------------- .github/workflows/ubuntu.yml | 5 ++++- .github/workflows/windows.yml | 6 +++++- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 36d7fdc41e..afa5ea6a2f 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,6 +25,6 @@ jobs: - name: Install linters run: | python -m pip install --upgrade pip - pip install -r linters-requirements.txt + pip install -r linters-requirements.txt - name: Lint with OpenFL-specific rules run: bash scripts/lint.sh diff --git a/.github/workflows/taskrunner.yml b/.github/workflows/taskrunner.yml index a9093be4c1..d003ad8e1c 100644 --- a/.github/workflows/taskrunner.yml +++ b/.github/workflows/taskrunner.yml @@ -17,11 +17,7 @@ env: jobs: build: if: github.event.pull_request.draft == false - strategy: - matrix: - os: ['ubuntu-latest', 'windows-latest'] - python-version: ["3.10", "3.11", "3.12"] - runs-on: ${{ matrix.os }} + runs-on: ubuntu-latest timeout-minutes: 15 steps: @@ -29,17 +25,11 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: ${{ matrix.python-version }} + python-version: "3.10" - name: Install dependencies ubuntu - if: matrix.os == 'ubuntu-latest' run: | python -m pip install --upgrade pip pip install . - - name: Install dependencies windows - if: matrix.os == 'windows-latest' + - name: Task Runner API run: | - python -m pip install --upgrade pip - pip install . - - name: Test TaskRunner API - run: | - python -m tests.github.test_hello_federation --template keras_cnn_mnist --fed_workspace aggregator --col1 col1 --col2 col2 --rounds-to-train 3 --save-model output_model + python -m tests.github.test_hello_federation --template torch_cnn_mnist --fed_workspace aggregator --col1 collaborator1 --col2 collaborator2 --rounds-to-train 3 --save-model output_model \ No newline at end of file diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index c968e85f11..617c004751 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -13,6 +13,9 @@ env: jobs: pytest-coverage: # from pytest_coverage.yml + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12"] runs-on: ubuntu-latest timeout-minutes: 15 @@ -21,7 +24,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.10" + python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 341b93b7f1..5f3ffa1220 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -13,14 +13,18 @@ env: jobs: pytest-coverage: # from pytest_coverage.yml + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12"] runs-on: windows-latest timeout-minutes: 15 + steps: - uses: actions/checkout@v3 - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.10" + python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip From db9c0bdbf5d3736de2d434cd2dc272764ebec0b2 Mon Sep 17 00:00:00 2001 From: refai06 <149057514+refai06@users.noreply.github.com> Date: Mon, 13 Jan 2025 17:42:25 +0530 Subject: [PATCH 2/9] Streamline FederatedRuntime Workspace by removing Unnecessary Files (#1262) * Clean created dir-workspace Signed-off-by: refai06 * Update cleanup dir-workspace Signed-off-by: refai06 * Update docstring Signed-off-by: refai06 * Improve code-structure Signed-off-by: refai06 --------- Signed-off-by: refai06 --- .../workflow/workspace_export/export.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/openfl/experimental/workflow/workspace_export/export.py b/openfl/experimental/workflow/workspace_export/export.py index 3975e83e44..1dd9e847c4 100644 --- a/openfl/experimental/workflow/workspace_export/export.py +++ b/openfl/experimental/workflow/workspace_export/export.py @@ -88,7 +88,6 @@ def __init__(self, notebook_path: str, output_workspace: str) -> None: f"{export_filename}.py", ) ).resolve() - print_tree(self.created_workspace_path, level=2) # Generated python script name without .py extension self.script_name = self.script_path.name.split(".")[0].strip() @@ -290,6 +289,8 @@ def export_federated(cls, notebook_path: str, output_workspace: str) -> Tuple[st instance = cls(notebook_path, output_workspace) instance.generate_requirements() instance.generate_plan_yaml() + instance._clean_generated_workspace() + print_tree(output_workspace, level=2) return instance.generate_experiment_archive() @classmethod @@ -304,6 +305,7 @@ def export(cls, notebook_path: str, output_workspace: str) -> None: instance.generate_requirements() instance.generate_plan_yaml() instance.generate_data_yaml() + print_tree(output_workspace, level=2) def generate_experiment_archive(self) -> Tuple[str, str]: """ @@ -357,6 +359,20 @@ def generate_requirements(self) -> None: if i not in line_nos: f.write(line) + def _clean_generated_workspace(self) -> None: + """ + Remove cols.yaml and data.yaml from the generated workspace + as these are not needed in FederatedRuntime (Director based workflow) + + """ + cols_file = self.output_workspace_path.joinpath("plan", "cols.yaml") + data_file = self.output_workspace_path.joinpath("plan", "data.yaml") + + if cols_file.exists(): + cols_file.unlink() + if data_file.exists(): + data_file.unlink() + def generate_plan_yaml(self) -> None: """ Generates plan.yaml From 486794707abb0e7cbdb2ff2e0534450e5998a4a0 Mon Sep 17 00:00:00 2001 From: Kush Agrawal Date: Mon, 13 Jan 2025 17:47:19 +0530 Subject: [PATCH 3/9] Update numpy regression sample (#1256) Signed-off-by: Agrawal, Kush --- .../workflow/105_Numpy_Linear_Regression_Workflow.ipynb | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/openfl-tutorials/experimental/workflow/105_Numpy_Linear_Regression_Workflow.ipynb b/openfl-tutorials/experimental/workflow/105_Numpy_Linear_Regression_Workflow.ipynb index c076b87e12..ada59a995f 100644 --- a/openfl-tutorials/experimental/workflow/105_Numpy_Linear_Regression_Workflow.ipynb +++ b/openfl-tutorials/experimental/workflow/105_Numpy_Linear_Regression_Workflow.ipynb @@ -42,10 +42,6 @@ "metadata": {}, "outputs": [], "source": [ - "# Below code will display the print statement output on screen as well\n", - "import sys\n", - "sys.stdout = open('/dev/stdout', 'w')\n", - "\n", "!pip install git+https://github.com/securefederatedai/openfl.git\n", "!pip install -r workflow_interface_requirements.txt\n", "!pip install matplotlib\n", @@ -308,7 +304,7 @@ " self.current_round += 1\n", " if self.current_round < self.rounds:\n", " self.next(self.aggregated_model_validation,\n", - " foreach='collaborators', exclude=['private'])\n", + " foreach='collaborators')\n", " else:\n", " self.next(self.end)\n", "\n", From 33004f668214f3c7f1a418c3a986c8eb6da4cc92 Mon Sep 17 00:00:00 2001 From: Eran Lerer Date: Tue, 14 Jan 2025 19:34:03 +0200 Subject: [PATCH 4/9] Add persistent db module and recovery logic (#1229) * Add persistent db module and recovery logic Signed-off-by: Lerer, Eran * Address code review comments Signed-off-by: Lerer, Eran * Adding persist_checkpoint flag to the plan Signed-off-by: Lerer, Eran * Handling next round model tensors Signed-off-by: Lerer, Eran * Changing peristed path to be saved along with the proto model files, for Gramine as well Signed-off-by: Lerer, Eran --------- Signed-off-by: Lerer, Eran --- docs/about/features_index/taskrunner.rst | 5 +- .../workspace/plan/defaults/aggregator.yaml | 2 + openfl/component/aggregator/aggregator.py | 172 +++++++-- openfl/databases/__init__.py | 1 + openfl/databases/persistent_db.py | 365 ++++++++++++++++++ openfl/databases/tensor_db.py | 33 ++ 6 files changed, 553 insertions(+), 25 deletions(-) create mode 100644 openfl/databases/persistent_db.py diff --git a/docs/about/features_index/taskrunner.rst b/docs/about/features_index/taskrunner.rst index 2097c72f32..f8730f463b 100644 --- a/docs/about/features_index/taskrunner.rst +++ b/docs/about/features_index/taskrunner.rst @@ -44,8 +44,9 @@ Configurable Settings - :code:`best_state_path`: (str:path) Defines the weight protobuf file path that will be saved to for the highest accuracy model during the experiment. - :code:`last_state_path`: (str:path) Defines the weight protobuf file path that will be saved to during the last round completed in each experiment. - :code:`rounds_to_train`: (int) Specifies the number of rounds in a federation. A federated learning round is defined as one complete iteration when the collaborators train the model and send the updated model weights back to the aggregator to form a new global model. Within a round, collaborators can train the model for multiple iterations called epochs. - - :code:`write_logs`: (boolean) Metric logging callback feature. By default, logging is done through `tensorboard `_ but users can also use custom metric logging function for each task. - + - :code:`write_logs`: (boolean) Metric logging callback feature. By default, logging is done through `tensorboard `_ but users can also use custom metric logging function for each task. + - :code:`persist_checkpoint`: (boolean) Specifies whether to enable the storage of a persistent checkpoint in non-volatile storage for recovery purposes. When enabled, the aggregator will restore its state to what it was prior to the restart, ensuring continuity after a restart. + - :code:`persistent_db_path`: (str:path) Defines the persisted database path. - :class:`Collaborator ` `openfl.component.Collaborator `_ diff --git a/openfl-workspace/workspace/plan/defaults/aggregator.yaml b/openfl-workspace/workspace/plan/defaults/aggregator.yaml index 43d923b996..d4204e2b0c 100644 --- a/openfl-workspace/workspace/plan/defaults/aggregator.yaml +++ b/openfl-workspace/workspace/plan/defaults/aggregator.yaml @@ -1,3 +1,5 @@ template : openfl.component.Aggregator settings : db_store_rounds : 2 + persist_checkpoint: True + persistent_db_path: save/tensor.db diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index eaac9fa6a0..9964f331d4 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -12,10 +12,11 @@ import openfl.callbacks as callbacks_module from openfl.component.straggler_handling_functions import CutoffTimeBasedStragglerHandling -from openfl.databases import TensorDB +from openfl.databases import PersistentTensorDB, TensorDB from openfl.interface.aggregation_functions import WeightedAverage from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import base_pb2, utils +from openfl.protocols.base_pb2 import NamedTensor from openfl.utilities import TaskResultKey, TensorKey, change_tags logger = logging.getLogger(__name__) @@ -82,6 +83,8 @@ def __init__( log_memory_usage=False, write_logs=False, callbacks: Optional[List] = None, + persist_checkpoint=True, + persistent_db_path=None, ): """Initializes the Aggregator. @@ -110,6 +113,7 @@ def __init__( callbacks: List of callbacks to be used during the experiment. """ self.round_number = 0 + self.next_model_round_number = 0 if single_col_cert_common_name: logger.warning( @@ -137,6 +141,16 @@ def __init__( self.quit_job_sent_to = [] self.tensor_db = TensorDB() + if persist_checkpoint: + persistent_db_path = persistent_db_path or "tensor.db" + logger.info( + "Persistent checkpoint is enabled, setting persistent db at path %s", + persistent_db_path, + ) + self.persistent_db = PersistentTensorDB(persistent_db_path) + else: + logger.info("Persistent checkpoint is disabled") + self.persistent_db = None # FIXME: I think next line generates an error on the second round # if it is set to 1 for the aggregator. self.db_store_rounds = db_store_rounds @@ -154,8 +168,25 @@ def __init__( # TODO: Remove. Used in deprecated interactive and native APIs self.best_tensor_dict: dict = {} self.last_tensor_dict: dict = {} + # these enable getting all tensors for a task + self.collaborator_tasks_results = {} # {TaskResultKey: list of TensorKeys} + self.collaborator_task_weight = {} # {TaskResultKey: data_size} - if initial_tensor_dict: + # maintain a list of collaborators that have completed task and + # reported results in a given round + self.collaborators_done = [] + # Initialize a lock for thread safety + self.lock = Lock() + self.use_delta_updates = use_delta_updates + + self.model = None # Initialize the model attribute to None + if self.persistent_db and self._recover(): + logger.info("recovered state of aggregator") + + # The model is built by recovery if at least one round has finished + if self.model: + logger.info("Model was loaded by recovery") + elif initial_tensor_dict: self._load_initial_tensors_from_dict(initial_tensor_dict) self.model = utils.construct_model_proto( tensor_dict=initial_tensor_dict, @@ -168,20 +199,6 @@ def __init__( self.collaborator_tensor_results = {} # {TensorKey: nparray}} - # these enable getting all tensors for a task - self.collaborator_tasks_results = {} # {TaskResultKey: list of TensorKeys} - - self.collaborator_task_weight = {} # {TaskResultKey: data_size} - - # maintain a list of collaborators that have completed task and - # reported results in a given round - self.collaborators_done = [] - - # Initialize a lock for thread safety - self.lock = Lock() - - self.use_delta_updates = use_delta_updates - # Callbacks self.callbacks = callbacks_module.CallbackList( callbacks, @@ -195,6 +212,79 @@ def __init__( self.callbacks.on_experiment_begin() self.callbacks.on_round_begin(self.round_number) + def _recover(self): + """Populates the aggregator state to the state it was prior a restart""" + recovered = False + # load tensors persistent DB + tensor_key_dict = self.persistent_db.load_tensors( + self.persistent_db.get_tensors_table_name() + ) + if len(tensor_key_dict) > 0: + logger.info(f"Recovering {len(tensor_key_dict)} model tensors") + recovered = True + self.tensor_db.cache_tensor(tensor_key_dict) + committed_round_number, self.best_model_score = ( + self.persistent_db.get_round_and_best_score() + ) + logger.info("Recovery - Setting model proto") + to_proto_tensor_dict = {} + for tk in tensor_key_dict: + tk_name, _, _, _, _ = tk + to_proto_tensor_dict[tk_name] = tensor_key_dict[tk] + self.model = utils.construct_model_proto( + to_proto_tensor_dict, committed_round_number, self.compression_pipeline + ) + # round number is the current round which is still in process + # i.e. committed_round_number + 1 + self.round_number = committed_round_number + 1 + logger.info( + "Recovery - loaded round number %s and best score %s", + self.round_number, + self.best_model_score, + ) + + next_round_tensor_key_dict = self.persistent_db.load_tensors( + self.persistent_db.get_next_round_tensors_table_name() + ) + if len(next_round_tensor_key_dict) > 0: + logger.info(f"Recovering {len(next_round_tensor_key_dict)} next round model tensors") + recovered = True + self.tensor_db.cache_tensor(next_round_tensor_key_dict) + + logger.debug("Recovery - this is the tensor_db after recovery: %s", self.tensor_db) + + if self.persistent_db.is_task_table_empty(): + logger.debug("task table is empty") + return recovered + + logger.info("Recovery - Replaying saved task results") + task_id = 1 + while True: + task_result = self.persistent_db.get_task_result_by_id(task_id) + if not task_result: + break + recovered = True + collaborator_name = task_result["collaborator_name"] + round_number = task_result["round_number"] + task_name = task_result["task_name"] + data_size = task_result["data_size"] + serialized_tensors = task_result["named_tensors"] + named_tensors = [ + NamedTensor.FromString(serialized_tensor) + for serialized_tensor in serialized_tensors + ] + logger.info( + "Recovery - Replaying task results %s %s %s", + collaborator_name, + round_number, + task_name, + ) + self.process_task_results( + collaborator_name, round_number, task_name, data_size, named_tensors + ) + task_id += 1 + return recovered + def _load_initial_tensors(self): """Load all of the tensors required to begin federated learning. @@ -255,9 +345,12 @@ def _save_model(self, round_number, file_path): for k, v in og_tensor_dict.items() ] tensor_dict = {} + tensor_tuple_dict = {} for tk in tensor_keys: tk_name, _, _, _, _ = tk - tensor_dict[tk_name] = self.tensor_db.get_tensor_from_cache(tk) + tensor_value = self.tensor_db.get_tensor_from_cache(tk) + tensor_dict[tk_name] = tensor_value + tensor_tuple_dict[tk] = tensor_value if tensor_dict[tk_name] is None: logger.info( "Cannot save model for round %s. Continuing...", @@ -267,6 +360,19 @@ def _save_model(self, round_number, file_path): if file_path == self.best_state_path: self.best_tensor_dict = tensor_dict if file_path == self.last_state_path: + # Transaction to persist/delete all data needed to increment the round + if self.persistent_db: + if self.next_model_round_number > 0: + next_round_tensors = self.tensor_db.get_tensors_by_round_and_tags( + self.next_model_round_number, ("model",) + ) + self.persistent_db.finalize_round( + tensor_tuple_dict, next_round_tensors, self.round_number, self.best_model_score + ) + logger.info( + "Persist model and clean task result for round %s", + round_number, + ) self.last_tensor_dict = tensor_dict self.model = utils.construct_model_proto( tensor_dict, round_number, self.compression_pipeline @@ -606,6 +712,31 @@ def send_local_task_results( Returns: None """ + # Save task and its metadata for recovery + serialized_tensors = [tensor.SerializeToString() for tensor in named_tensors] + if self.persistent_db: + self.persistent_db.save_task_results( + collaborator_name, round_number, task_name, data_size, serialized_tensors + ) + logger.debug( + f"Persisting task results {task_name} from {collaborator_name} round {round_number}" + ) + logger.info( + f"Collaborator {collaborator_name} is sending task results " + f"for {task_name}, round {round_number}" + ) + self.process_task_results( + collaborator_name, round_number, task_name, data_size, named_tensors + ) + + def process_task_results( + self, + collaborator_name, + round_number, + task_name, + data_size, + named_tensors, + ): if self._time_to_quit() or collaborator_name in self.stragglers: logger.warning( f"STRAGGLER: Collaborator {collaborator_name} is reporting results " @@ -620,11 +751,6 @@ def send_local_task_results( ) return - logger.info( - f"Collaborator {collaborator_name} is sending task results " - f"for {task_name}, round {round_number}" - ) - task_key = TaskResultKey(task_name, collaborator_name, round_number) # we mustn't have results already @@ -864,7 +990,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result new_model_report, ("model",), ) - + self.next_model_round_number = new_model_round_number # Finally, cache the updated model tensor self.tensor_db.cache_tensor({final_model_tk: new_model_nparray}) diff --git a/openfl/databases/__init__.py b/openfl/databases/__init__.py index 849fcde7c9..0e64082d5f 100644 --- a/openfl/databases/__init__.py +++ b/openfl/databases/__init__.py @@ -2,4 +2,5 @@ # SPDX-License-Identifier: Apache-2.0 +from openfl.databases.persistent_db import PersistentTensorDB from openfl.databases.tensor_db import TensorDB diff --git a/openfl/databases/persistent_db.py b/openfl/databases/persistent_db.py new file mode 100644 index 0000000000..7fe0c6463f --- /dev/null +++ b/openfl/databases/persistent_db.py @@ -0,0 +1,365 @@ +import json +import logging +import pickle +import sqlite3 +from threading import Lock +from typing import Dict, Optional + +import numpy as np + +from openfl.utilities import TensorKey + +logger = logging.getLogger(__name__) + +__all__ = ["PersistentTensorDB"] + + +class PersistentTensorDB: + """ + The PersistentTensorDB class implements a database + for storing tensors and metadata using SQLite. + + Attributes: + conn: The SQLite connection object. + cursor: The SQLite cursor object. + lock: A threading Lock object used to ensure thread-safe operations. + """ + + TENSORS_TABLE = "tensors" + NEXT_ROUND_TENSORS_TABLE = "next_round_tensors" + TASK_RESULT_TABLE = "task_results" + KEY_VALUE_TABLE = "key_value_store" + + def __init__(self, db_path) -> None: + """Initializes a new instance of the PersistentTensorDB class.""" + + logger.info("Initializing persistent db at %s", db_path) + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self.lock = Lock() + + cursor = self.conn.cursor() + self._create_model_tensors_table(cursor, PersistentTensorDB.TENSORS_TABLE) + self._create_model_tensors_table(cursor, PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE) + self._create_task_results_table(cursor) + self._create_key_value_store(cursor) + self.conn.commit() + + def _create_model_tensors_table(self, cursor, table_name) -> None: + """Create the database table for storing tensors if it does not exist.""" + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tensor_name TEXT NOT NULL, + origin TEXT NOT NULL, + round INTEGER NOT NULL, + report INTEGER NOT NULL, + tags TEXT, + nparray BLOB NOT NULL + ) + """ + cursor.execute(query) + + def _create_task_results_table(self, cursor) -> None: + """Creates a table for storing task results.""" + query = f""" + CREATE TABLE IF NOT EXISTS {PersistentTensorDB.TASK_RESULT_TABLE} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + collaborator_name TEXT NOT NULL, + round_number INTEGER NOT NULL, + task_name TEXT NOT NULL, + data_size INTEGER NOT NULL, + named_tensors BLOB NOT NULL + ) + """ + cursor.execute(query) + + def _create_key_value_store(self, cursor) -> None: + """Create a key-value store table for storing additional metadata.""" + query = f""" + CREATE TABLE IF NOT EXISTS {PersistentTensorDB.KEY_VALUE_TABLE} ( + key TEXT PRIMARY KEY, + value REAL NOT NULL + ) + """ + cursor.execute(query) + + def save_task_results( + self, + collaborator_name: str, + round_number: int, + task_name: str, + data_size: int, + named_tensors, + ): + """ + Saves task results to the task_results table. + + Args: + collaborator_name (str): Collaborator name. + round_number (int): Round number. + task_name (str): Task name. + data_size (int): Data size. + named_tensors(List): list of binary representation of tensors. + """ + serialized_blob = pickle.dumps(named_tensors) + + # Insert into the database + insert_query = f""" + INSERT INTO {PersistentTensorDB.TASK_RESULT_TABLE} + (collaborator_name, round_number, task_name, data_size, named_tensors) + VALUES (?, ?, ?, ?, ?); + """ + with self.lock: + cursor = self.conn.cursor() + cursor.execute( + insert_query, + (collaborator_name, round_number, task_name, data_size, serialized_blob), + ) + self.conn.commit() + + def get_task_result_by_id(self, task_result_id: int): + """ + Retrieve a task result by its ID. + + Args: + task_result_id (int): The ID of the task result to retrieve. + + Returns: + A dictionary containing the task result details, or None if not found. + """ + with self.lock: + cursor = self.conn.cursor() + query = f""" + SELECT collaborator_name, round_number, task_name, data_size, named_tensors + FROM {PersistentTensorDB.TASK_RESULT_TABLE} + WHERE id = ? + """ + cursor.execute(query, (task_result_id,)) + result = cursor.fetchone() + if result: + collaborator_name, round_number, task_name, data_size, serialized_blob = result + serialized_tensors = pickle.loads(serialized_blob) + return { + "collaborator_name": collaborator_name, + "round_number": round_number, + "task_name": task_name, + "data_size": data_size, + "named_tensors": serialized_tensors, + } + return None + + def _serialize_array(self, array: np.ndarray) -> bytes: + """Serialize a NumPy array into bytes for storing in SQLite. + note: using pickle since in some cases the array is actually a scalar. + """ + return pickle.dumps(array) + + def _deserialize_array(self, blob: bytes, dtype: Optional[np.dtype] = None) -> np.ndarray: + """Deserialize bytes from SQLite into a NumPy array.""" + try: + return pickle.loads(blob) + except Exception as e: + raise ValueError(f"Failed to deserialize array: {e}") + + def __repr__(self) -> str: + """Returns a string representation of the PersistentTensorDB.""" + with self.lock: + cursor = self.conn.cursor() + cursor.execute("SELECT tensor_name, origin, round, report, tags FROM tensors") + rows = cursor.fetchall() + return f"PersistentTensorDB contents:\n{rows}" + + def finalize_round( + self, + tensor_key_dict: Dict[TensorKey, np.ndarray], + next_round_tensor_key_dict: Dict[TensorKey, np.ndarray], + round_number: int, + best_score: float, + ): + """Finalize a training round by saving tensors, preparing for the next round, + and updating metadata in the database. + + This function performs the following steps as a single transaction: + 1. Persist the tensors of the current round into the database. + 2. Persist the tensors for the next training round into the database. + 3. Reinitialize the task results table to prepare for new tasks. + 4. Update the round number and best score in the key-value store. + + If any step fails, the transaction is rolled back to ensure data integrity. + + Args: + tensor_key_dict (Dict[TensorKey, np.ndarray]): + A dictionary mapping tensor keys to their corresponding + NumPy arrays for the current round. + next_round_tensor_key_dict (Dict[TensorKey, np.ndarray]): + A dictionary mapping tensor keys to their corresponding + NumPy arrays for the next round. + round_number (int): + The current training round number. + best_score (float): + The best score achieved during the current round. + + Raises: + RuntimeError: If an error occurs during the transaction, the transaction is rolled back, + and a RuntimeError is raised with the details of the failure. + """ + with self.lock: + try: + # Begin transaction + cursor = self.conn.cursor() + cursor.execute("BEGIN TRANSACTION") + self._persist_tensors(cursor, PersistentTensorDB.TENSORS_TABLE, tensor_key_dict) + self._persist_next_round_tensors(cursor, next_round_tensor_key_dict) + self._init_task_results_table(cursor) + self._save_round_and_best_score(cursor, round_number, best_score) + # Commit transaction + self.conn.commit() + logger.info( + f"Committed model for round {round_number}, saved {len(tensor_key_dict)}" + f" model tensors and {len(next_round_tensor_key_dict)}" + f" next round model tensors with best_score {best_score}" + ) + except Exception as e: + # Rollback transaction in case of an error + self.conn.rollback() + raise RuntimeError(f"Failed to finalize round: {e}") + + def _persist_tensors( + self, cursor, table_name, tensor_key_dict: Dict[TensorKey, np.ndarray] + ) -> None: + """Insert a dictionary of tensors into the SQLite as part of transaction""" + for tensor_key, nparray in tensor_key_dict.items(): + tensor_name, origin, fl_round, report, tags = tensor_key + serialized_array = self._serialize_array(nparray) + serialized_tags = json.dumps(tags) + query = f""" + INSERT INTO {table_name} (tensor_name, origin, round, report, tags, nparray) + VALUES (?, ?, ?, ?, ?, ?) + """ + cursor.execute( + query, + (tensor_name, origin, fl_round, int(report), serialized_tags, serialized_array), + ) + + def _persist_next_round_tensors( + self, cursor, tensor_key_dict: Dict[TensorKey, np.ndarray] + ) -> None: + """Persisting the last round next_round tensors.""" + drop_table_query = f"DROP TABLE IF EXISTS {PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE}" + cursor.execute(drop_table_query) + self._create_model_tensors_table(cursor, PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE) + self._persist_tensors(cursor, PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE, tensor_key_dict) + + def _init_task_results_table(self, cursor): + """ + Creates a table for storing task results. Drops the table first if it already exists. + """ + drop_table_query = "DROP TABLE IF EXISTS task_results" + cursor.execute(drop_table_query) + self._create_task_results_table(cursor) + + def _save_round_and_best_score(self, cursor, round_number: int, best_score: float) -> None: + """Save the round number and best score as key-value pairs in the database.""" + # Create a table with key-value structure where values can be integer or float + # Insert or update the round_number + cursor.execute( + """ + INSERT OR REPLACE INTO key_value_store (key, value) + VALUES (?, ?) + """, + ("round_number", float(round_number)), + ) + + # Insert or update the best_score + cursor.execute( + """ + INSERT OR REPLACE INTO key_value_store (key, value) + VALUES (?, ?) + """, + ("best_score", float(best_score)), + ) + + def get_tensors_table_name(self) -> str: + return PersistentTensorDB.TENSORS_TABLE + + def get_next_round_tensors_table_name(self) -> str: + return PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE + + def load_tensors(self, tensor_table) -> Dict[TensorKey, np.ndarray]: + """Load all tensors from the SQLite database and return them as a dictionary.""" + tensor_dict = {} + with self.lock: + cursor = self.conn.cursor() + query = f"SELECT tensor_name, origin, round, report, tags, nparray FROM {tensor_table}" + cursor.execute(query) + rows = cursor.fetchall() + for row in rows: + tensor_name, origin, fl_round, report, tags, nparray = row + # Deserialize the JSON string back to a Python list + deserialized_tags = tuple(json.loads(tags)) + tensor_key = TensorKey(tensor_name, origin, fl_round, report, deserialized_tags) + tensor_dict[tensor_key] = self._deserialize_array(nparray) + return tensor_dict + + def get_round_and_best_score(self) -> tuple[int, float]: + """Retrieve the round number and best score from the database.""" + with self.lock: + cursor = self.conn.cursor() + # Fetch the round_number + cursor.execute( + """ + SELECT value FROM key_value_store WHERE key = ? + """, + ("round_number",), + ) + round_number = cursor.fetchone() + if round_number is None: + round_number = -1 + else: + round_number = int(round_number[0]) # Cast to int + + # Fetch the best_score + cursor.execute( + """ + SELECT value FROM key_value_store WHERE key = ? + """, + ("best_score",), + ) + best_score = cursor.fetchone() + if best_score is None: + best_score = 0 + else: + best_score = float(best_score[0]) # Cast to float + return round_number, best_score + + def clean_up(self, remove_older_than: int = 1) -> None: + """Remove old entries from the database.""" + if remove_older_than < 0: + return + with self.lock: + cursor = self.conn.cursor() + query = f"SELECT MAX(round) FROM {PersistentTensorDB.TENSORS_TABLE}" + cursor.execute(query) + current_round = cursor.fetchone()[0] + if current_round is None: + return + cursor.execute( + """ + DELETE FROM tensors + WHERE round <= ? AND report = 0 + """, + (current_round - remove_older_than,), + ) + self.conn.commit() + + def close(self) -> None: + """Close the SQLite database connection.""" + self.conn.close() + + def is_task_table_empty(self) -> bool: + """Check if the task table is empty.""" + with self.lock: + cursor = self.conn.cursor() + cursor.execute("SELECT COUNT(*) FROM task_results") + count = cursor.fetchone()[0] + return count == 0 diff --git a/openfl/databases/tensor_db.py b/openfl/databases/tensor_db.py index 1b9d5ea132..5f9ffe78c6 100644 --- a/openfl/databases/tensor_db.py +++ b/openfl/databases/tensor_db.py @@ -151,6 +151,39 @@ def get_tensor_from_cache(self, tensor_key: TensorKey) -> Optional[np.ndarray]: return None return np.array(df["nparray"].iloc[0]) + def get_tensors_by_round_and_tags(self, fl_round: int, tags: tuple) -> dict: + """Retrieve all tensors that match the specified round and tags. + + Args: + fl_round (int): The round number to filter tensors. + tags (tuple): The tags to filter tensors. + + Returns: + dict: A dictionary where the keys are TensorKey objects and the values are numpy arrays. + """ + # Filter the DataFrame based on the round and tags + df = self.tensor_db[ + (self.tensor_db["round"] == fl_round) & (self.tensor_db["tags"] == tags) + ] + + # Check if any tensors match the criteria + if len(df) == 0: + return {} + + # Construct a dictionary mapping TensorKey to np.ndarray + tensor_dict = {} + for _, row in df.iterrows(): + tensor_key = TensorKey( + tensor_name=row["tensor_name"], + origin=row["origin"], + round_number=row["round"], + report=row["report"], + tags=row["tags"], + ) + tensor_dict[tensor_key] = np.array(row["nparray"]) + + return tensor_dict + def get_aggregated_tensor( self, tensor_key: TensorKey, From fdad4fbbbbc84105c728656b2f918157b9268bfa Mon Sep 17 00:00:00 2001 From: Snehal Das Date: Tue, 14 Jan 2025 23:17:39 +0530 Subject: [PATCH 5/9] Correcting calls to get_sleep_time() (#1266) --- openfl/component/aggregator/aggregator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 9964f331d4..6a2da516cc 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -472,7 +472,7 @@ def get_tasks(self, collaborator_name): # if no tasks, tell the collaborator to sleep if len(tasks) == 0: tasks = None - sleep_time = self._get_sleep_time() + sleep_time = Aggregator._get_sleep_time() return tasks, self.round_number, sleep_time, time_to_quit @@ -502,7 +502,7 @@ def get_tasks(self, collaborator_name): # been completed if len(tasks) == 0: tasks = None - sleep_time = self._get_sleep_time() + sleep_time = Aggregator._get_sleep_time() return tasks, self.round_number, sleep_time, time_to_quit From 337560905554e8505b9018b295af3dba900e4b74 Mon Sep 17 00:00:00 2001 From: Eran Lerer Date: Wed, 15 Jan 2025 18:38:46 +0200 Subject: [PATCH 6/9] Recover missing state after collaborator restart (#1268) Signed-off-by: Lerer, Eran --- openfl/component/collaborator/collaborator.py | 24 +++++++++++++++---- openfl/federated/task/runner_keras.py | 11 ++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index d4fd380998..c90be90f22 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -382,15 +382,17 @@ def get_data_for_tensorkey(self, tensor_key): return nparray prior_round -= 1 logger.info(f"Cannot find any prior version of tensor {tensor_name} locally...") - logger.debug( - "Unable to get tensor from local store..." "attempting to retrieve from client" - ) # Determine whether there are additional compression related # dependencies. # Typically, dependencies are only relevant to model layers tensor_dependencies = self.tensor_codec.find_dependencies( tensor_key, self.delta_updates ) + logger.debug( + "Unable to get tensor from local store..." + "attempting to retrieve from client len tensor_dependencies" + f" tensor_key {tensor_key}" + ) if len(tensor_dependencies) > 0: # Resolve dependencies # tensor_dependencies[0] corresponds to the prior version @@ -411,10 +413,10 @@ def get_data_for_tensorkey(self, tensor_key): self.tensor_db.cache_tensor({new_model_tk: nparray}) else: logger.info( - "Count not find previous model layer." + "Could not find previous model layer." "Fetching latest layer from aggregator" ) - # The original model tensor should be fetched from client + # The original model tensor should be fetched from aggregator nparray = self.get_aggregated_tensor_from_aggregator( tensor_key, require_lossless=True ) @@ -423,6 +425,18 @@ def get_data_for_tensorkey(self, tensor_key): nparray = self.get_aggregated_tensor_from_aggregator( tensor_key, require_lossless=True ) + else: + # we should try fetching the tensor from aggregator + tensor_name, origin, round_number, report, tags = tensor_key + tags = (self.collaborator_name,) + tags + tensor_key = (tensor_name, origin, round_number, report, tags) + logger.info( + "Could not find previous model layer." + f"Fetching latest layer from aggregator {tensor_key}" + ) + nparray = self.get_aggregated_tensor_from_aggregator( + tensor_key, require_lossless=True + ) else: logger.debug("Found tensor %s in local TensorDB", tensor_key) diff --git a/openfl/federated/task/runner_keras.py b/openfl/federated/task/runner_keras.py index e2dd069f72..c7803cb0eb 100644 --- a/openfl/federated/task/runner_keras.py +++ b/openfl/federated/task/runner_keras.py @@ -182,7 +182,16 @@ def train_(self, batch_generator, metrics: list = None, **kwargs): # initialization (build_model). # If metrics are added (i.e. not a subset of what was originally # defined) then the model must be recompiled. - results = self.model.get_metrics_result() + try: + results = self.model.get_metrics_result() + except ValueError: + if "batch_size" in kwargs: + batch_size = kwargs["batch_size"] + else: + batch_size = 1 + # evaluation needed before metrics can be resolved + self.model.evaluate(self.data_loader.get_valid_loader(batch_size), verbose=1) + results = self.model.get_metrics_result() # TODO if there are new metrics in the flplan that were not included # in the originally From a110bae00b3a103e9289a6e3c5faa9d8dae772b6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 16 Jan 2025 12:05:02 +0530 Subject: [PATCH 7/9] Bump pytest-asyncio from 0.25.1 to 0.25.2 (#1254) Bumps [pytest-asyncio](https://github.com/pytest-dev/pytest-asyncio) from 0.25.1 to 0.25.2. - [Release notes](https://github.com/pytest-dev/pytest-asyncio/releases) - [Commits](https://github.com/pytest-dev/pytest-asyncio/compare/v0.25.1...v0.25.2) --- updated-dependencies: - dependency-name: pytest-asyncio dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- test-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-requirements.txt b/test-requirements.txt index c17ddf1364..c047afcbdf 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -2,7 +2,7 @@ docker lxml==5.3.0 paramiko pytest==8.3.4 -pytest-asyncio==0.25.1 +pytest-asyncio==0.25.2 pytest-mock==3.14.0 defusedxml==0.7.1 matplotlib==3.10.0 From 9658c0352d072e812d48bdd6ce3636b933b3571a Mon Sep 17 00:00:00 2001 From: Karan Shah Date: Thu, 16 Jan 2025 14:25:26 +0530 Subject: [PATCH 8/9] Bump ruff, apply formatting updates (#1272) Signed-off-by: MasterSkepticista --- linters-requirements.txt | 2 +- openfl/component/aggregator/aggregator.py | 15 +++++++-------- .../assigner/random_grouped_assigner.py | 6 +++--- .../assigner/static_grouped_assigner.py | 3 +-- openfl/component/collaborator/collaborator.py | 11 ++++------- openfl/component/director/experiment.py | 2 +- openfl/component/envoy/envoy.py | 2 +- openfl/databases/tensor_db.py | 6 +++--- .../workflow/component/aggregator/aggregator.py | 4 +--- .../component/collaborator/collaborator.py | 2 +- .../workflow/federated/plan/plan.py | 6 ++---- .../workflow/interface/cli/aggregator.py | 6 +++--- .../workflow/interface/cli/collaborator.py | 10 +++++----- .../experimental/workflow/interface/cli/plan.py | 3 +-- .../workflow/interface/cli/workspace.py | 2 +- .../workflow/runtime/federated_runtime.py | 6 +++--- .../workflow/workspace_export/export.py | 6 ++---- openfl/federated/plan/plan.py | 9 +++------ openfl/federated/task/runner_gandlf.py | 2 +- openfl/federated/task/runner_pt.py | 2 +- openfl/federated/task/runner_xgb.py | 6 +++--- openfl/interface/aggregator.py | 6 +++--- openfl/interface/cli.py | 5 ++--- openfl/interface/collaborator.py | 10 +++++----- openfl/interface/envoy.py | 4 ++-- openfl/interface/interactive_api/experiment.py | 17 ++++++++--------- openfl/interface/plan.py | 4 ++-- openfl/interface/workspace.py | 7 ++----- openfl/native/fastestimator.py | 2 +- openfl/pipelines/tensor_codec.py | 9 ++++----- .../frameworks_adapters/pytorch_adapter.py | 2 +- openfl/utilities/fed_timer.py | 8 +++----- openfl/utilities/split.py | 2 +- openfl/utilities/workspace.py | 6 ++---- 34 files changed, 84 insertions(+), 109 deletions(-) diff --git a/linters-requirements.txt b/linters-requirements.txt index 735903f6b0..5c91de2fd6 100644 --- a/linters-requirements.txt +++ b/linters-requirements.txt @@ -1,2 +1,2 @@ pre-commit -ruff==0.8.1 \ No newline at end of file +ruff==0.9.1 \ No newline at end of file diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 6a2da516cc..61d253bcff 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -855,9 +855,9 @@ def _process_named_tensor(self, named_tensor, collaborator_name): tuple(named_tensor.tags), ) tensor_name, origin, round_number, report, tags = tensor_key - assert ( - "compressed" in tags or "lossy_compressed" in tags - ), f"Named tensor {tensor_key} is not compressed" + assert "compressed" in tags or "lossy_compressed" in tags, ( + f"Named tensor {tensor_key} is not compressed" + ) if "compressed" in tags: dec_tk, decompressed_nparray = self.tensor_codec.decompress( tensor_key, @@ -1039,9 +1039,9 @@ def _compute_validation_related_task_metrics(self, task_name) -> dict: metrics = {} for tensor_key in self.collaborator_tasks_results[task_key]: tensor_name, origin, round_number, report, tags = tensor_key - assert ( - collaborators_for_task[0] in tags - ), f"Tensor {tensor_key} in task {task_name} has not been processed correctly" + assert collaborators_for_task[0] in tags, ( + f"Tensor {tensor_key} in task {task_name} has not been processed correctly" + ) # Strip the collaborator label, and lookup aggregated tensor new_tags = change_tags(tags, remove_field=collaborators_for_task[0]) agg_tensor_key = TensorKey(tensor_name, origin, round_number, report, new_tags) @@ -1073,8 +1073,7 @@ def _compute_validation_related_task_metrics(self, task_name) -> dict: # Compare the accuracy of the model, potentially save it if self.best_model_score is None or self.best_model_score < agg_results: logger.info( - f"Round {round_number}: saved the best " - f"model with score {agg_results:f}" + f"Round {round_number}: saved the best model with score {agg_results:f}" ) self.best_model_score = agg_results self._save_model(round_number, self.best_state_path) diff --git a/openfl/component/assigner/random_grouped_assigner.py b/openfl/component/assigner/random_grouped_assigner.py index 7a1e20123c..dea00022a4 100644 --- a/openfl/component/assigner/random_grouped_assigner.py +++ b/openfl/component/assigner/random_grouped_assigner.py @@ -56,9 +56,9 @@ def define_task_assignments(self): Returns: None """ - assert ( - np.abs(1.0 - np.sum([group["percentage"] for group in self.task_groups])) < 0.01 - ), "Task group percentages must sum to 100%" + assert np.abs(1.0 - np.sum([group["percentage"] for group in self.task_groups])) < 0.01, ( + "Task group percentages must sum to 100%" + ) # Start by finding all of the tasks in all specified groups self.all_tasks_in_groups = list( diff --git a/openfl/component/assigner/static_grouped_assigner.py b/openfl/component/assigner/static_grouped_assigner.py index 5ccff16c67..fcb5a59034 100644 --- a/openfl/component/assigner/static_grouped_assigner.py +++ b/openfl/component/assigner/static_grouped_assigner.py @@ -62,8 +62,7 @@ def define_task_assignments(self): unique_authorized_cols = set(self.authorized_cols) assert cols_amount == authorized_cols_amount and unique_cols == unique_authorized_cols, ( - f"Collaborators in each group must be distinct: " - f"{unique_cols}, {unique_authorized_cols}" + f"Collaborators in each group must be distinct: {unique_cols}, {unique_authorized_cols}" ) # Start by finding all of the tasks in all specified groups diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index c90be90f22..4a5a78329a 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -141,7 +141,7 @@ def __init__( if hasattr(DevicePolicy, device_assignment_policy): self.device_assignment_policy = DevicePolicy[device_assignment_policy] else: - logger.error("Unknown device_assignment_policy: " f"{device_assignment_policy.name}.") + logger.error(f"Unknown device_assignment_policy: {device_assignment_policy.name}.") raise NotImplementedError( f"Unknown device_assignment_policy: {device_assignment_policy}." ) @@ -216,8 +216,7 @@ def run_simulation(self): for task in tasks: self.do_task(task, round_number) logger.info( - f"All tasks completed on {self.collaborator_name} " - f"for round {round_number}..." + f"All tasks completed on {self.collaborator_name} for round {round_number}..." ) break @@ -376,8 +375,7 @@ def get_data_for_tensorkey(self, tensor_key): ) if nparray is not None: logger.debug( - f"Found tensor {tensor_name} in local TensorDB " - f"for round {prior_round}" + f"Found tensor {tensor_name} in local TensorDB for round {prior_round}" ) return nparray prior_round -= 1 @@ -413,8 +411,7 @@ def get_data_for_tensorkey(self, tensor_key): self.tensor_db.cache_tensor({new_model_tk: nparray}) else: logger.info( - "Could not find previous model layer." - "Fetching latest layer from aggregator" + "Could not find previous model layer.Fetching latest layer from aggregator" ) # The original model tensor should be fetched from aggregator nparray = self.get_aggregated_tensor_from_aggregator( diff --git a/openfl/component/director/experiment.py b/openfl/component/director/experiment.py index e2b56cf877..2e40f43247 100644 --- a/openfl/component/director/experiment.py +++ b/openfl/component/director/experiment.py @@ -106,7 +106,7 @@ async def start( """ self.status = Status.IN_PROGRESS try: - logger.info(f"New experiment {self.name} for " f"collaborators {self.collaborators}") + logger.info(f"New experiment {self.name} for collaborators {self.collaborators}") with ExperimentWorkspace( experiment_name=self.name, diff --git a/openfl/component/envoy/envoy.py b/openfl/component/envoy/envoy.py index c37ba29a7d..f98b7e21ba 100644 --- a/openfl/component/envoy/envoy.py +++ b/openfl/component/envoy/envoy.py @@ -237,7 +237,7 @@ def _get_cuda_device_info(self): ) except Exception as exc: logger.exception( - f"Failed to get cuda device info: {exc}. " f"Check your cuda device monitor plugin." + f"Failed to get cuda device info: {exc}. Check your cuda device monitor plugin." ) return cuda_devices_info diff --git a/openfl/databases/tensor_db.py b/openfl/databases/tensor_db.py index 5f9ffe78c6..f64f4d783d 100644 --- a/openfl/databases/tensor_db.py +++ b/openfl/databases/tensor_db.py @@ -213,9 +213,9 @@ def get_aggregated_tensor( None: if not all values are present. """ if len(collaborator_weight_dict) != 0: - assert ( - np.abs(1.0 - sum(collaborator_weight_dict.values())) < 0.01 - ), f"Collaborator weights do not sum to 1.0: {collaborator_weight_dict}" + assert np.abs(1.0 - sum(collaborator_weight_dict.values())) < 0.01, ( + f"Collaborator weights do not sum to 1.0: {collaborator_weight_dict}" + ) collaborator_names = collaborator_weight_dict.keys() agg_tensor_dict = {} diff --git a/openfl/experimental/workflow/component/aggregator/aggregator.py b/openfl/experimental/workflow/component/aggregator/aggregator.py index 568c3246fa..1e818b528a 100644 --- a/openfl/experimental/workflow/component/aggregator/aggregator.py +++ b/openfl/experimental/workflow/component/aggregator/aggregator.py @@ -461,9 +461,7 @@ def send_task_results( f" for the wrong round: {round_number}. Ignoring..." ) else: - logger.info( - f"Collaborator {collab_name} sent task results" f" for round {round_number}." - ) + logger.info(f"Collaborator {collab_name} sent task results for round {round_number}.") # Unpickle the clone (FLSpec object) clone = dill.loads(clone_bytes) # Update the clone in clones_dict dictionary diff --git a/openfl/experimental/workflow/component/collaborator/collaborator.py b/openfl/experimental/workflow/component/collaborator/collaborator.py index 0cbb1de069..b5d112c8da 100644 --- a/openfl/experimental/workflow/component/collaborator/collaborator.py +++ b/openfl/experimental/workflow/component/collaborator/collaborator.py @@ -161,7 +161,7 @@ def send_task_results(self, next_step: str, clone: Any) -> None: None """ self.logger.info( - f"Round {self.round_number}," f" collaborator {self.name} is sending results..." + f"Round {self.round_number}, collaborator {self.name} is sending results..." ) self.client.send_task_results(self.name, self.round_number, next_step, dill.dumps(clone)) diff --git a/openfl/experimental/workflow/federated/plan/plan.py b/openfl/experimental/workflow/federated/plan/plan.py index 5e81a91a9d..dc03c2d0a9 100644 --- a/openfl/experimental/workflow/federated/plan/plan.py +++ b/openfl/experimental/workflow/federated/plan/plan.py @@ -169,8 +169,7 @@ def parse( except Exception: Plan.logger.exception( - f"Parsing Federated Learning Plan : " - f"[red]FAILURE[/] : [blue]{plan_config_path}[/].", + f"Parsing Federated Learning Plan : [red]FAILURE[/] : [blue]{plan_config_path}[/].", extra={"markup": True}, ) raise @@ -235,8 +234,7 @@ def import_(template) -> object: class_name = splitext(template)[1].strip(".") module_path = splitext(template)[0] Plan.logger.info( - f"Importing [red]🡆[/] Object [red]{class_name}[/] " - f"from [red]{module_path}[/] Module.", + f"Importing [red]🡆[/] Object [red]{class_name}[/] from [red]{module_path}[/] Module.", extra={"markup": True}, ) module = import_module(module_path) diff --git a/openfl/experimental/workflow/interface/cli/aggregator.py b/openfl/experimental/workflow/interface/cli/aggregator.py index b51b78f480..5db5d30212 100644 --- a/openfl/experimental/workflow/interface/cli/aggregator.py +++ b/openfl/experimental/workflow/interface/cli/aggregator.py @@ -99,7 +99,7 @@ def start_(plan, authorized_cols, secure): "--fqdn", required=False, type=click_types.FQDN, - help=f"The fully qualified domain name of" f" aggregator node [{getfqdn_env()}]", + help=f"The fully qualified domain name of aggregator node [{getfqdn_env()}]", default=getfqdn_env(), ) def _generate_cert_request(fqdn): @@ -118,8 +118,8 @@ def generate_cert_request(fqdn): echo( f"Creating AGGREGATOR certificate key pair with following settings: " - f'CN={style(common_name, fg="red")},' - f' SAN={style(subject_alternative_name, fg="red")}' + f"CN={style(common_name, fg='red')}," + f" SAN={style(subject_alternative_name, fg='red')}" ) server_private_key, server_csr = generate_csr(common_name, server=True) diff --git a/openfl/experimental/workflow/interface/cli/collaborator.py b/openfl/experimental/workflow/interface/cli/collaborator.py index fe0cb32940..57e282faa4 100644 --- a/openfl/experimental/workflow/interface/cli/collaborator.py +++ b/openfl/experimental/workflow/interface/cli/collaborator.py @@ -126,8 +126,8 @@ def generate_cert_request(collaborator_name, silent, skip_package): echo( f"Creating COLLABORATOR certificate key pair with following settings: " - f'CN={style(common_name, fg="red")},' - f' SAN={style(subject_alternative_name, fg="red")}' + f"CN={style(common_name, fg='red')}," + f" SAN={style(subject_alternative_name, fg='red')}" ) client_private_key, client_csr = generate_csr(common_name, server=False) @@ -164,7 +164,7 @@ def generate_cert_request(collaborator_name, silent, skip_package): make_archive(archive_name, archive_type, tmp_dir) rmtree(tmp_dir) - echo(f"Archive {archive_file_name} with certificate signing" f" request created") + echo(f"Archive {archive_file_name} with certificate signing request created") echo( "This file should be sent to the certificate authority" " (typically hosted by the aggregator) for signing" @@ -233,14 +233,14 @@ def register_collaborator(file_name): "-r", "--request-pkg", type=ClickPath(exists=True), - help="The archive containing the certificate signing" " request (*.zip) for a collaborator", + help="The archive containing the certificate signing request (*.zip) for a collaborator", ) @option( "-i", "--import", "import_", type=ClickPath(exists=True), - help="Import the archive containing the collaborator's" " certificate (signed by the CA)", + help="Import the archive containing the collaborator's certificate (signed by the CA)", ) def certify_(collaborator_name, silent, request_pkg, import_): """Certify the collaborator.""" diff --git a/openfl/experimental/workflow/interface/cli/plan.py b/openfl/experimental/workflow/interface/cli/plan.py index 9f40db67de..5c30955398 100644 --- a/openfl/experimental/workflow/interface/cli/plan.py +++ b/openfl/experimental/workflow/interface/cli/plan.py @@ -85,8 +85,7 @@ def initialize(context, plan_config, cols_config, data_config, aggregator_addres plan_origin["network"]["settings"]["agg_addr"] = aggregator_address or getfqdn_env() logger.warn( - f"Patching Aggregator Addr in Plan" - f" 🠆 {plan_origin['network']['settings']['agg_addr']}" + f"Patching Aggregator Addr in Plan 🠆 {plan_origin['network']['settings']['agg_addr']}" ) Plan.dump(plan_config, plan_origin) diff --git a/openfl/experimental/workflow/interface/cli/workspace.py b/openfl/experimental/workflow/interface/cli/workspace.py index 1d65d0dde0..fb38786b46 100644 --- a/openfl/experimental/workflow/interface/cli/workspace.py +++ b/openfl/experimental/workflow/interface/cli/workspace.py @@ -258,7 +258,7 @@ def export_(pip_install_options: Tuple[str]): if confirm("Create a default '.workspace' file?"): copy2(WORKSPACE / "workspace" / ".workspace", tmp_dir) else: - echo("To proceed, you must have a '.workspace' " "file in the current directory.") + echo("To proceed, you must have a '.workspace' file in the current directory.") raise # Create Zip archive of directory diff --git a/openfl/experimental/workflow/runtime/federated_runtime.py b/openfl/experimental/workflow/runtime/federated_runtime.py index 604f13ce88..b484860a8d 100644 --- a/openfl/experimental/workflow/runtime/federated_runtime.py +++ b/openfl/experimental/workflow/runtime/federated_runtime.py @@ -232,9 +232,9 @@ def stream_experiment_stdout(self, experiment_name) -> None: print(f"Getting standard output for experiment: {experiment_name}...") for stdout_message_dict in self._dir_client.stream_experiment_stdout(experiment_name): print( - f'Origin: {stdout_message_dict["stdout_origin"]}, ' - f'Task: {stdout_message_dict["task_name"]}' - f'\n{stdout_message_dict["stdout_value"]}' + f"Origin: {stdout_message_dict['stdout_origin']}, " + f"Task: {stdout_message_dict['task_name']}" + f"\n{stdout_message_dict['stdout_value']}" ) def __repr__(self) -> str: diff --git a/openfl/experimental/workflow/workspace_export/export.py b/openfl/experimental/workflow/workspace_export/export.py index 1dd9e847c4..809be19bd5 100644 --- a/openfl/experimental/workflow/workspace_export/export.py +++ b/openfl/experimental/workflow/workspace_export/export.py @@ -519,8 +519,7 @@ def generate_data_yaml(self) -> None: # noqa: C901 runtime_created = True if not runtime_collab_created: f.write( - f"\nruntime_collaborators = " - f"{runtime_name}._LocalRuntime__collaborators" + f"\nruntime_collaborators = {runtime_name}._LocalRuntime__collaborators" ) runtime_collab_created = True f.write( @@ -528,8 +527,7 @@ def generate_data_yaml(self) -> None: # noqa: C901 f"runtime_collaborators['{collab_name}'].private_attributes" ) data[collab_name] = { - "private_attributes": f"src." - f"{self.script_name}.{collab_name}_private_attributes" + "private_attributes": f"src.{self.script_name}.{collab_name}_private_attributes" } self.__write_yaml(data_yaml, data) diff --git a/openfl/federated/plan/plan.py b/openfl/federated/plan/plan.py index 69ff36c19c..13d446e145 100644 --- a/openfl/federated/plan/plan.py +++ b/openfl/federated/plan/plan.py @@ -163,8 +163,7 @@ def parse( # noqa: C901 if gandlf_config_path is not None: Plan.logger.info( - f"Importing GaNDLF Config into plan " - f"from file [red]{gandlf_config_path}[/].", + f"Importing GaNDLF Config into plan from file [red]{gandlf_config_path}[/].", extra={"markup": True}, ) @@ -201,8 +200,7 @@ def parse( # noqa: C901 except Exception: Plan.logger.exception( - f"Parsing Federated Learning Plan : " - f"[red]FAILURE[/] : [blue]{plan_config_path}[/].", + f"Parsing Federated Learning Plan : [red]FAILURE[/] : [blue]{plan_config_path}[/].", extra={"markup": True}, ) raise @@ -248,8 +246,7 @@ def import_(template): class_name = splitext(template)[1].strip(".") module_path = splitext(template)[0] Plan.logger.info( - f"Importing [red]🡆[/] Object [red]{class_name}[/] " - f"from [red]{module_path}[/] Module.", + f"Importing [red]🡆[/] Object [red]{class_name}[/] from [red]{module_path}[/] Module.", extra={"markup": True}, ) module = import_module(module_path) diff --git a/openfl/federated/task/runner_gandlf.py b/openfl/federated/task/runner_gandlf.py index b352d01439..e8662ce1da 100644 --- a/openfl/federated/task/runner_gandlf.py +++ b/openfl/federated/task/runner_gandlf.py @@ -738,7 +738,7 @@ def to_cpu_numpy(state): # When restoring, we currently assume all values are tensors. if not pt.is_tensor(v): raise ValueError( - "We do not currently support non-tensors " "coming from model.state_dict()" + "We do not currently support non-tensors coming from model.state_dict()" ) # get as a numpy array, making sure is on cpu state[k] = v.cpu().numpy() diff --git a/openfl/federated/task/runner_pt.py b/openfl/federated/task/runner_pt.py index 30b56a4120..9b1a8021f4 100644 --- a/openfl/federated/task/runner_pt.py +++ b/openfl/federated/task/runner_pt.py @@ -714,7 +714,7 @@ def to_cpu_numpy(state): # When restoring, we currently assume all values are tensors. if not torch.is_tensor(v): raise ValueError( - "We do not currently support non-tensors " "coming from model.state_dict()" + "We do not currently support non-tensors coming from model.state_dict()" ) # get as a numpy array, making sure is on cpu state[k] = v.cpu().numpy() diff --git a/openfl/federated/task/runner_xgb.py b/openfl/federated/task/runner_xgb.py index a5f5101b2e..0b6de32d18 100644 --- a/openfl/federated/task/runner_xgb.py +++ b/openfl/federated/task/runner_xgb.py @@ -29,9 +29,9 @@ def check_precision_loss(logger, converted_data, original_data): reconstructed_json = reconstructed_bytes.decode("utf-8") reconstructed_data = json.loads(reconstructed_json) - assert type(original_data) is type( - reconstructed_data - ), "Reconstructed datatype does not match original." + assert type(original_data) is type(reconstructed_data), ( + "Reconstructed datatype does not match original." + ) # Compare the original and reconstructed data if original_data != reconstructed_data: diff --git a/openfl/interface/aggregator.py b/openfl/interface/aggregator.py index 80ce56e32e..930dcd43be 100644 --- a/openfl/interface/aggregator.py +++ b/openfl/interface/aggregator.py @@ -77,7 +77,7 @@ def start_(plan, authorized_cols): "--fqdn", required=False, type=click_types.FQDN, - help=f"The fully qualified domain name of" f" aggregator node [{getfqdn_env()}]", + help=f"The fully qualified domain name of aggregator node [{getfqdn_env()}]", default=getfqdn_env(), ) def _generate_cert_request(fqdn): @@ -101,8 +101,8 @@ def generate_cert_request(fqdn): echo( f"Creating AGGREGATOR certificate key pair with following settings: " - f'CN={style(common_name, fg="red")},' - f' SAN={style(subject_alternative_name, fg="red")}' + f"CN={style(common_name, fg='red')}," + f" SAN={style(subject_alternative_name, fg='red')}" ) server_private_key, server_csr = generate_csr(common_name, server=True) diff --git a/openfl/interface/cli.py b/openfl/interface/cli.py index 314cfebf3a..75cbbac803 100755 --- a/openfl/interface/cli.py +++ b/openfl/interface/cli.py @@ -148,13 +148,12 @@ def format_help(self, ctx, formatter): help_str = cmd.get_short_help_str() if level == 0: formatter.write( - f'\n{style(name, fg="blue", bold=True):<30}' - f" {style(help_str, bold=True)}" + "\n" + f"\n{style(name, fg='blue', bold=True):<30} {style(help_str, bold=True)}" + "\n" ) formatter.write("─" * 80 + "\n") if level == 1: formatter.write( - f' {style("*", fg="green")}' f' {style(name, fg="cyan"):<21} {help_str}' + "\n" + f" {style('*', fg='green')} {style(name, fg='cyan'):<21} {help_str}" + "\n" ) diff --git a/openfl/interface/collaborator.py b/openfl/interface/collaborator.py index 862ae4db84..81b76b68f8 100644 --- a/openfl/interface/collaborator.py +++ b/openfl/interface/collaborator.py @@ -208,8 +208,8 @@ def generate_cert_request(collaborator_name, silent, skip_package): echo( f"Creating COLLABORATOR certificate key pair with following settings: " - f'CN={style(common_name, fg="red")},' - f' SAN={style(subject_alternative_name, fg="red")}' + f"CN={style(common_name, fg='red')}," + f" SAN={style(subject_alternative_name, fg='red')}" ) client_private_key, client_csr = generate_csr(common_name, server=False) @@ -246,7 +246,7 @@ def generate_cert_request(collaborator_name, silent, skip_package): make_archive(archive_name, archive_type, tmp_dir) rmtree(tmp_dir) - echo(f"Archive {archive_file_name} with certificate signing" f" request created") + echo(f"Archive {archive_file_name} with certificate signing request created") echo( "This file should be sent to the certificate authority" " (typically hosted by the aggregator) for signing" @@ -322,14 +322,14 @@ def register_collaborator(file_name): "-r", "--request-pkg", type=ClickPath(exists=True), - help="The archive containing the certificate signing" " request (*.zip) for a collaborator", + help="The archive containing the certificate signing request (*.zip) for a collaborator", ) @option( "-i", "--import", "import_", type=ClickPath(exists=True), - help="Import the archive containing the collaborator's" " certificate (signed by the CA)", + help="Import the archive containing the collaborator's certificate (signed by the CA)", ) def certify_(collaborator_name, silent, request_pkg, import_): """Certify the collaborator.""" diff --git a/openfl/interface/envoy.py b/openfl/interface/envoy.py index 4e35391bba..0fd5eb0cd7 100644 --- a/openfl/interface/envoy.py +++ b/openfl/interface/envoy.py @@ -147,7 +147,7 @@ def start_( for plugin_name, plugin_settings in optional_plugins_section.items(): template = plugin_settings.get("template") if not template: - raise Exception("You should put a template" f"for plugin {plugin_name}") + raise Exception(f"You should put a templatefor plugin {plugin_name}") module_path, _, class_name = template.rpartition(".") plugin_params = plugin_settings.get("params", {}) @@ -221,7 +221,7 @@ def shard_descriptor_from_config(shard_config: dict): """ template = shard_config.get("template") if not template: - raise Exception("You should define a shard " "descriptor template in the envoy config") + raise Exception("You should define a shard descriptor template in the envoy config") class_name = template.split(".")[-1] module_path = ".".join(template.split(".")[:-1]) params = shard_config.get("params", {}) diff --git a/openfl/interface/interactive_api/experiment.py b/openfl/interface/interactive_api/experiment.py index d8096df8ad..cc3ffae3b2 100644 --- a/openfl/interface/interactive_api/experiment.py +++ b/openfl/interface/interactive_api/experiment.py @@ -115,7 +115,7 @@ def _assert_experiment_submitted(self): """Assure experiment is sent to director and accepted.""" if not self.experiment_submitted: self.logger.error("The experiment was not submitted to a Director service.") - self.logger.error("Report the experiment first: " "use the Experiment.start() method.") + self.logger.error("Report the experiment first: use the Experiment.start() method.") return False return True @@ -192,10 +192,10 @@ def stream_metrics(self, tensorboard_logs: bool = True) -> None: return for metric_message_dict in self.federation.dir_client.stream_metrics(self.experiment_name): self.logger.metric( - f'Round {metric_message_dict["round"]}, ' - f'collaborator {metric_message_dict["metric_origin"]} ' - f'{metric_message_dict["task_name"]} result ' - f'{metric_message_dict["metric_name"]}:\t{metric_message_dict["metric_value"]:f}' + f"Round {metric_message_dict['round']}, " + f"collaborator {metric_message_dict['metric_origin']} " + f"{metric_message_dict['task_name']} result " + f"{metric_message_dict['metric_name']}:\t{metric_message_dict['metric_value']:f}" ) if tensorboard_logs: @@ -209,7 +209,7 @@ def write_tensorboard_metric(self, metric: dict) -> None: self.summary_writer = SummaryWriter(f"./logs/{self.experiment_name}", flush_secs=5) self.summary_writer.add_scalar( - f'{metric["metric_origin"]}/{metric["task_name"]}/{metric["metric_name"]}', + f"{metric['metric_origin']}/{metric['task_name']}/{metric['metric_name']}", metric["metric_value"], metric["round"], ) @@ -393,8 +393,7 @@ def define_task_assigner(self, task_keeper, rounds_to_train): # noqa: C901 if not is_train_task_exist and rounds_to_train != 1: # Since we have only validation tasks, we do not have to train it multiple times raise Exception( - "Variable rounds_to_train must be equal 1, " - "because only validation tasks were given" + "Variable rounds_to_train must be equal 1, because only validation tasks were given" ) if is_train_task_exist and self.is_validate_task_exist: @@ -816,7 +815,7 @@ def set_aggregation_function(self, aggregation_function: AggregationFunction): def decorator_with_args(training_method): if not isinstance(aggregation_function, AggregationFunction): raise Exception( - "aggregation_function must implement " "AggregationFunction interface." + "aggregation_function must implement AggregationFunction interface." ) self.aggregation_functions[training_method.__name__] = aggregation_function return training_method diff --git a/openfl/interface/plan.py b/openfl/interface/plan.py index 93d09c02a5..503693e581 100644 --- a/openfl/interface/plan.py +++ b/openfl/interface/plan.py @@ -169,7 +169,7 @@ def initialize( # This is needed to bypass data being locally available if input_shape is not None: logger.info( - "Attempting to generate initial model weights with" f" custom input shape {input_shape}" + f"Attempting to generate initial model weights with custom input shape {input_shape}" ) data_loader = get_dataloader(plan, prefer_minimal=True, input_shape=input_shape) @@ -237,7 +237,7 @@ def freeze_plan(plan_config): init_state_path = plan.config["aggregator"]["settings"]["init_state_path"] if not Path(init_state_path).exists(): - logger.info("Plan has not been initialized! Run 'fx plan" " initialize' before proceeding") + logger.info("Plan has not been initialized! Run 'fx plan initialize' before proceeding") return Plan.dump(Path(plan_config), plan.config, freeze=True) diff --git a/openfl/interface/workspace.py b/openfl/interface/workspace.py index d3cb1713c5..dce6eb9dae 100644 --- a/openfl/interface/workspace.py +++ b/openfl/interface/workspace.py @@ -363,7 +363,7 @@ def export_() -> str: if not os.path.isfile(_ws_identifier_file): openfl_ws_identifier_file = os.path.join(WORKSPACE, "workspace", _ws_identifier_file) logging.warning( - f"`{_ws_identifier_file}` is missing, " f"copying {openfl_ws_identifier_file} as-is." + f"`{_ws_identifier_file}` is missing, copying {openfl_ws_identifier_file} as-is." ) shutil.copy2(openfl_ws_identifier_file, tmp_dir) shutil.copy2(_ws_identifier_file, tmp_dir) @@ -428,10 +428,7 @@ def dockerize_(context, save: bool, rebuild: bool, enclave_key: str, revision: s # Build OpenFL base image. logging.info("Building OpenFL Base image") base_image_build_cmd = ( - "DOCKER_BUILDKIT=1 docker build {options} " - "-t {image_name} " - "-f {dockerfile} " - "{build_context}" + "DOCKER_BUILDKIT=1 docker build {options} -t {image_name} -f {dockerfile} {build_context}" ).format( options=options, image_name="openfl", diff --git a/openfl/native/fastestimator.py b/openfl/native/fastestimator.py index a95db185d0..d91b1ebc01 100644 --- a/openfl/native/fastestimator.py +++ b/openfl/native/fastestimator.py @@ -82,7 +82,7 @@ def fit(self): # noqa: C901 tensor_dict=tensor_dict, round_number=0, tensor_pipe=tensor_pipe ) - self.logger.info(f"Creating Initial Weights File" f" 🠆 {init_state_path}") + self.logger.info(f"Creating Initial Weights File 🠆 {init_state_path}") utils.dump_proto(model_proto=model_snap, fpath=init_state_path) diff --git a/openfl/pipelines/tensor_codec.py b/openfl/pipelines/tensor_codec.py index be08bbbc1c..15edde0965 100644 --- a/openfl/pipelines/tensor_codec.py +++ b/openfl/pipelines/tensor_codec.py @@ -114,9 +114,9 @@ def decompress( tensor_name, origin, round_number, report, tags = tensor_key assert len(transformer_metadata) > 0, "metadata must be included for decompression" - assert ("compressed" in tags) or ( - "lossy_compressed" in tags - ), "Cannot decompress an uncompressed tensor" + assert ("compressed" in tags) or ("lossy_compressed" in tags), ( + "Cannot decompress an uncompressed tensor" + ) if require_lossless: assert "compressed" in tags, "Cannot losslessly decompress lossy tensor" @@ -169,8 +169,7 @@ def generate_delta(tensor_key, nparray, base_model_nparray): f"layer shape of ({base_model_nparray.shape})" ) assert "model" not in tags, ( - "The tensorkey should be provided " - "from the layer with new weights, not the base model" + "The tensorkey should be provided from the layer with new weights, not the base model" ) new_tags = change_tags(tags, add_field="delta") delta_tensor_key = TensorKey(tensor_name, origin, round_number, report, new_tags) diff --git a/openfl/plugins/frameworks_adapters/pytorch_adapter.py b/openfl/plugins/frameworks_adapters/pytorch_adapter.py index 60a6db54f4..21ff26c7fb 100644 --- a/openfl/plugins/frameworks_adapters/pytorch_adapter.py +++ b/openfl/plugins/frameworks_adapters/pytorch_adapter.py @@ -271,7 +271,7 @@ def to_cpu_numpy(state): # When restoring, we currently assume all values are tensors. if not pt.is_tensor(v): raise ValueError( - "We do not currently support non-tensors " "coming from model.state_dict()" + "We do not currently support non-tensors coming from model.state_dict()" ) # get as a numpy array, making sure is on cpu state[k] = v.cpu().numpy() diff --git a/openfl/utilities/fed_timer.py b/openfl/utilities/fed_timer.py index 3d8770ec24..320fff5a02 100644 --- a/openfl/utilities/fed_timer.py +++ b/openfl/utilities/fed_timer.py @@ -116,8 +116,7 @@ async def async_execute(self): await asyncio.wait_for(task, timeout=self._max_timeout) except asyncio.TimeoutError: raise asyncio.TimeoutError( - f"Timeout after {self._max_timeout} second(s), " - f"Exception method: ({self._fn_name})" + f"Timeout after {self._max_timeout} second(s), Exception method: ({self._fn_name})" ) except Exception: raise Exception(f"Generic Exception: {self._fn_name}") @@ -151,8 +150,7 @@ def sync_execute(self): # exception. if task.is_alive(): raise TimeoutError( - f"Timeout after {self._max_timeout} second(s), " - f"Exception method: ({self._fn_name})" + f"Timeout after {self._max_timeout} second(s), Exception method: ({self._fn_name})" ) return task.result() @@ -276,6 +274,6 @@ def wrapper(self, func, *args, **kwargs): logger.info(f"({self.task._fn_name}) Elapsed Time: {time.perf_counter() - start}") except Exception as e: logger.exception( - f"An exception of type {type(e).__name__} occurred. " f"Arguments:\n{e.args[0]!r}" + f"An exception of type {type(e).__name__} occurred. Arguments:\n{e.args[0]!r}" ) os._exit(status=os.EX_TEMPFAIL) diff --git a/openfl/utilities/split.py b/openfl/utilities/split.py index ee2e4654ac..be66934fe8 100644 --- a/openfl/utilities/split.py +++ b/openfl/utilities/split.py @@ -92,7 +92,7 @@ def split_tensor_dict_for_holdouts( holdout_tensors[tensor_name] = tensors_to_send.pop(tensor_name) except KeyError: logger.warning( - f"tried to remove tensor: {tensor_name} not present " f"in the tensor dict" + f"tried to remove tensor: {tensor_name} not present in the tensor dict" ) continue diff --git a/openfl/utilities/workspace.py b/openfl/utilities/workspace.py index e19b03cf16..15e7a3a339 100644 --- a/openfl/utilities/workspace.py +++ b/openfl/utilities/workspace.py @@ -116,12 +116,10 @@ def __exit__(self, exc_type, exc_value, traceback): if self.remove_archive: logger.debug( - "Exiting from the workspace context manager" - f" for {self.experiment_name} experiment" + f"Exiting from the workspace context manager for {self.experiment_name} experiment" ) logger.debug( - "Exiting from the workspace context manager" - f" for {self.experiment_name} experiment" + f"Exiting from the workspace context manager for {self.experiment_name} experiment" ) logger.debug("Archive still exists: %s", self.data_file_path.exists()) self.data_file_path.unlink(missing_ok=False) From b8e2c703a4d7e5d0c5fcbbee70c64cb7d373be20 Mon Sep 17 00:00:00 2001 From: Noopur Date: Thu, 16 Jan 2025 14:59:44 +0530 Subject: [PATCH 9/9] FederatedRuntime Workflow for CI Pipeline - 301 Watermarking notebook run (#1267) * FederatedRuntime Workflow for CI Pipeline - 301 Watermarking notebook run Signed-off-by: noopur * Display output on screen Signed-off-by: noopur * 5 Rounds Signed-off-by: noopur * Removed extra ) bracket Signed-off-by: noopur * Timeout of 30 min due to 5 rounds Signed-off-by: noopur * End the loop after all rounds Signed-off-by: noopur * Review comments incor Signed-off-by: noopur * Retry the envoy fetch Signed-off-by: noopur * Added invalid code just to verify negative scenario Signed-off-by: noopur * Added invalid code just to verify negative scenario Signed-off-by: noopur * 20m job timeout Signed-off-by: noopur * Revert invalid code and stdout notebook run Signed-off-by: noopur * Use markdown with stdout Signed-off-by: noopur * Induced error for testing Signed-off-by: noopur * Reverted error, added 10s sleep in github fetch logic Signed-off-by: noopur * Code format check Signed-off-by: noopur * pytest for the notebook Signed-off-by: noopur * Pip install ipython ipykernel Signed-off-by: noopur * Minor changes Signed-off-by: noopur * Test summary step corrected for wf_functional_e2e workflow Signed-off-by: noopur * 3 rounds instead of 5 Signed-off-by: noopur * Job name change Signed-off-by: noopur * Review comments incorp Signed-off-by: noopur * Increased timeout to 30m for CI pipeline jobs Signed-off-by: noopur * Increased timeout to 30m for CI pipeline jobs Signed-off-by: noopur --------- Signed-off-by: noopur --- .github/actions/tr_post_test_run/action.yml | 2 +- .github/workflows/federated_runtime.yml | 66 ++++++ .github/workflows/straggler-handling.yml | 2 +- .github/workflows/task_runner_basic_e2e.yml | 8 +- .github/workflows/taskrunner.yml | 4 +- .github/workflows/wf_functional_e2e.yml | 6 +- .../workflow_interface_101_mnist.yml | 2 +- .../Bangalore/Bangalore_config.yaml | 0 .../Bangalore/private_attributes.py | 0 .../Bangalore/requirements.txt | 0 .../Bangalore/start_envoy.sh | 0 .../Chandler/Chandler_config.yaml | 0 .../Chandler/private_attributes.py | 0 .../Chandler/requirements.txt | 0 .../Chandler/start_envoy.sh | 0 .../README.md | 0 .../director/director_config.yaml | 0 .../director/private_attributes.py | 0 .../director/start_director.sh | 0 .../workspace/MNIST_Watermarking.ipynb | 33 ++- .../workflow/runtime/federated_runtime.py | 13 +- test-requirements.txt | 3 +- tests/end_to_end/pytest.ini | 1 + .../test_suites/wf_federated_runtime_tests.py | 94 +++++++++ tests/end_to_end/utils/exceptions.py | 15 ++ tests/end_to_end/utils/federation_helper.py | 188 +++++++++++++++++- tests/end_to_end/utils/summary_helper.py | 71 ++++++- tests/end_to_end/utils/wf_common_fixtures.py | 11 - 28 files changed, 479 insertions(+), 40 deletions(-) create mode 100644 .github/workflows/federated_runtime.yml rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Bangalore/Bangalore_config.yaml (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Bangalore/private_attributes.py (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Bangalore/requirements.txt (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Bangalore/start_envoy.sh (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Chandler/Chandler_config.yaml (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Chandler/private_attributes.py (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Chandler/requirements.txt (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/Chandler/start_envoy.sh (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/README.md (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/director/director_config.yaml (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/director/private_attributes.py (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/director/start_director.sh (100%) rename openfl-tutorials/experimental/workflow/FederatedRuntime/{301_MNIST_Watermaking => 301_MNIST_Watermarking}/workspace/MNIST_Watermarking.ipynb (95%) create mode 100644 tests/end_to_end/test_suites/wf_federated_runtime_tests.py diff --git a/.github/actions/tr_post_test_run/action.yml b/.github/actions/tr_post_test_run/action.yml index 04444af4d9..f5c8849283 100644 --- a/.github/actions/tr_post_test_run/action.yml +++ b/.github/actions/tr_post_test_run/action.yml @@ -16,7 +16,7 @@ runs: if: ${{ always() }} run: | export PYTHONPATH="$PYTHONPATH:." - python tests/end_to_end/utils/summary_helper.py + python tests/end_to_end/utils/summary_helper.py --func_name "print_task_runner_score" echo "Test summary printed" shell: bash diff --git a/.github/workflows/federated_runtime.yml b/.github/workflows/federated_runtime.yml new file mode 100644 index 0000000000..717b96e490 --- /dev/null +++ b/.github/workflows/federated_runtime.yml @@ -0,0 +1,66 @@ +#--------------------------------------------------------------------------- +# Workflow to run 301_MNIST_Watermarking notebook +# Authors - Noopur, Payal Chaurasiya +#--------------------------------------------------------------------------- +name: Federated Runtime 301 MNIST Watermarking + +on: + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + + workflow_dispatch: + +permissions: + contents: read + +jobs: + test_federated_runtime_301_watermarking_notebook: + if: github.event.pull_request.draft == false + runs-on: ubuntu-22.04 + timeout-minutes: 20 + steps: + - name: Checkout OpenFL repository + uses: actions/checkout@v4.1.1 + with: + fetch-depth: 2 # needed for detecting changes + submodules: "true" + token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Python + uses: actions/setup-python@v3 + with: + python-version: "3.10" + + - name: Install dependencies # Without this step, fx command will not work + id: install_dependencies + run: | + python -m pip install --upgrade pip ipython ipykernel + pip install . + pip install -r test-requirements.txt + + - name: Run Federated Runtime 301 MNIST Watermarking via pytest + id: run_tests + run: | + python -m pytest -s tests/end_to_end/test_suites/wf_federated_runtime_tests.py -k test_federated_runtime_301_watermarking + echo "Federated Runtime 301 MNIST Watermarking test run completed" + + - name: Print test summary + id: print_test_summary + if: ${{ always() }} + run: | + export PYTHONPATH="$PYTHONPATH:." + python tests/end_to_end/utils/summary_helper.py --func_name "print_federated_runtime_score" + echo "Test summary printed" + + - name: Tar files + if: ${{ always() }} # collect artifacts regardless of failures + run: | + tar -cvf notebook_301.tar --exclude="__pycache__" $HOME/results --ignore-failed-read + echo "TAR file created" + + - name: Upload Artifacts + uses: actions/upload-artifact@v4 + if: ${{ always() }} # collect artifacts regardless of failures + with: + name: federated_runtime_301_watermarking_${{ github.run_id }} + path: notebook_301.tar diff --git a/.github/workflows/straggler-handling.yml b/.github/workflows/straggler-handling.yml index 450caf8e8a..64a2f07153 100644 --- a/.github/workflows/straggler-handling.yml +++ b/.github/workflows/straggler-handling.yml @@ -21,7 +21,7 @@ jobs: matrix: os: ['ubuntu-latest', 'windows-latest'] runs-on: ${{ matrix.os }} - timeout-minutes: 15 + timeout-minutes: 30 steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/task_runner_basic_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml index b50eedd526..4c4aaa12d7 100644 --- a/.github/workflows/task_runner_basic_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -31,7 +31,7 @@ jobs: test_with_tls: name: tr_tls runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # There are open issues for some of the models, so excluding them for now: @@ -74,7 +74,7 @@ jobs: test_with_non_tls: name: tr_non_tls runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 @@ -117,7 +117,7 @@ jobs: test_with_no_client_auth: name: tr_no_client_auth runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 @@ -160,7 +160,7 @@ jobs: test_memory_logs: name: tr_tls_memory_logs runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 strategy: matrix: # Testing non TLS scenario only for torch_cnn_mnist model and python 3.10 diff --git a/.github/workflows/taskrunner.yml b/.github/workflows/taskrunner.yml index d003ad8e1c..088ee60c64 100644 --- a/.github/workflows/taskrunner.yml +++ b/.github/workflows/taskrunner.yml @@ -18,8 +18,8 @@ jobs: build: if: github.event.pull_request.draft == false runs-on: ubuntu-latest - timeout-minutes: 15 - + timeout-minutes: 30 + steps: - uses: actions/checkout@v3 - name: Set up Python diff --git a/.github/workflows/wf_functional_e2e.yml b/.github/workflows/wf_functional_e2e.yml index 923aa73bae..1831949299 100644 --- a/.github/workflows/wf_functional_e2e.yml +++ b/.github/workflows/wf_functional_e2e.yml @@ -29,9 +29,9 @@ env: NUM_COLLABORATORS: ${{ github.event.inputs.num_collaborators || '2' }} jobs: - test_wf_func: + test_wf_functional_local_runtime: if: github.event.pull_request.draft == false - name: wf_func + name: wf_functional_local_runtime runs-on: ubuntu-22.04 timeout-minutes: 15 strategy: @@ -74,7 +74,7 @@ jobs: if: ${{ always() }} run: | export PYTHONPATH="$PYTHONPATH:." - python tests/end_to_end/utils/summary_helper.py + python tests/end_to_end/utils/summary_helper.py --func_name "print_local_runtime_score" echo "Test summary printed" - name: Create Tar (exclude cert and data folders) diff --git a/.github/workflows/workflow_interface_101_mnist.yml b/.github/workflows/workflow_interface_101_mnist.yml index 57e1dae46e..1182a74361 100644 --- a/.github/workflows/workflow_interface_101_mnist.yml +++ b/.github/workflows/workflow_interface_101_mnist.yml @@ -17,7 +17,7 @@ jobs: run_notebook: if: github.event.pull_request.draft == false runs-on: ubuntu-22.04 - timeout-minutes: 15 + timeout-minutes: 30 steps: - name: Checkout OpenFL repository uses: actions/checkout@v4.1.1 diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/Bangalore_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/Bangalore_config.yaml similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/Bangalore_config.yaml rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/Bangalore_config.yaml diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/private_attributes.py similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/private_attributes.py rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/private_attributes.py diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/requirements.txt b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/requirements.txt similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/requirements.txt rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/requirements.txt diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/start_envoy.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/start_envoy.sh similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Bangalore/start_envoy.sh rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Bangalore/start_envoy.sh diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/Chandler_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/Chandler_config.yaml similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/Chandler_config.yaml rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/Chandler_config.yaml diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/private_attributes.py similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/private_attributes.py rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/private_attributes.py diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/requirements.txt b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/requirements.txt similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/requirements.txt rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/requirements.txt diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/start_envoy.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/start_envoy.sh similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/Chandler/start_envoy.sh rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/Chandler/start_envoy.sh diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/README.md b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/README.md similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/README.md rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/README.md diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/director_config.yaml b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/director_config.yaml similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/director_config.yaml rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/director_config.yaml diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/private_attributes.py b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/private_attributes.py similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/private_attributes.py rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/private_attributes.py diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/start_director.sh b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/start_director.sh similarity index 100% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/director/start_director.sh rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/director/start_director.sh diff --git a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/workspace/MNIST_Watermarking.ipynb b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/workspace/MNIST_Watermarking.ipynb similarity index 95% rename from openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/workspace/MNIST_Watermarking.ipynb rename to openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/workspace/MNIST_Watermarking.ipynb index 040fb2cb26..0ee4c67681 100644 --- a/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermaking/workspace/MNIST_Watermarking.ipynb +++ b/openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking/workspace/MNIST_Watermarking.ipynb @@ -39,7 +39,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "d79eacbd", "metadata": {}, "outputs": [], @@ -66,7 +66,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "f7475cba", "metadata": {}, "outputs": [], @@ -95,7 +95,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "id": "9bd8ac2d", "metadata": {}, "outputs": [], @@ -193,7 +193,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "id": "89cf4866", "metadata": {}, "outputs": [], @@ -245,7 +245,7 @@ " watermark_pretrain_optimizer=None,\n", " watermark_retrain_optimizer=None,\n", " round_number=0,\n", - " n_rounds=1,\n", + " n_rounds=3,\n", " **kwargs,\n", " ):\n", " super().__init__(**kwargs)\n", @@ -425,7 +425,20 @@ " + f\" Acc: {self.watermark_retrain_validation_score:<.6f}\")\n", " retrain_round += 1\n", "\n", - " self.next(self.end)\n", + " self.next(self.internal_loop)\n", + " \n", + " @aggregator\n", + " def internal_loop(self):\n", + " \"\"\"\n", + " Internal loop to continue the Federated Learning process.\n", + " \"\"\"\n", + " if self.round_number == self.n_rounds - 1:\n", + " print(f\"\\nCompleted training for all {self.n_rounds} round(s)\")\n", + " self.next(self.end)\n", + " else:\n", + " self.round_number += 1\n", + " print(f\"\\nCompleted round: {self.round_number}\")\n", + " self.next(self.aggregated_model_validation, foreach='collaborators')\n", "\n", " @aggregator\n", " def end(self):\n", @@ -449,7 +462,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "1715a373", "metadata": {}, "outputs": [], @@ -468,7 +481,7 @@ "federated_runtime = FederatedRuntime(\n", " collaborators=authorized_collaborators,\n", " director=director_info, \n", - " notebook_path='./MNIST_Watermarking.ipynb'\n", + " notebook_path='./MNIST_Watermarking.ipynb',\n", ")" ] }, @@ -552,7 +565,7 @@ ], "metadata": { "kernelspec": { - "display_name": "fed_run", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -566,7 +579,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.15" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/openfl/experimental/workflow/runtime/federated_runtime.py b/openfl/experimental/workflow/runtime/federated_runtime.py index b484860a8d..efa90e2a24 100644 --- a/openfl/experimental/workflow/runtime/federated_runtime.py +++ b/openfl/experimental/workflow/runtime/federated_runtime.py @@ -193,8 +193,12 @@ def get_flow_state(self) -> Tuple[bool, Any]: return status, flow_object - def get_envoys(self) -> None: - """Prints the status of Envoys in a formatted way.""" + def get_envoys(self) -> List[str]: + """ + Prints the status of Envoys in a formatted way. + Returns: + online_envoys (List[str]): List of online envoys. + """ # Fetch envoy data envoys = self._dir_client.get_envoys() DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" @@ -204,6 +208,7 @@ def get_envoys(self) -> None: headers = ["Name", "Online", "Last Updated", "Experiment Running", "Experiment Name"] # Prepare the table rows rows = [] + online_envoys = [] for envoy in envoys.envoy_infos: rows.append( [ @@ -214,11 +219,15 @@ def get_envoys(self) -> None: envoy.experiment_name if envoy.experiment_name else "None", ] ) + if envoy.is_online: + online_envoys.append(envoy.envoy_name) + # Use tabulate to format the table result = tabulate(rows, headers=headers, tablefmt="grid") # Display the current timestamp print(f"Status of Envoys connected to Federation at: {now}\n") print(result) + return online_envoys def stream_experiment_stdout(self, experiment_name) -> None: """Stream experiment stdout. diff --git a/test-requirements.txt b/test-requirements.txt index c047afcbdf..b07ea268b9 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,4 +6,5 @@ pytest-asyncio==0.25.2 pytest-mock==3.14.0 defusedxml==0.7.1 matplotlib==3.10.0 -fpdf==1.7.2 \ No newline at end of file +fpdf==1.7.2 +papermill==2.6.0 diff --git a/tests/end_to_end/pytest.ini b/tests/end_to_end/pytest.ini index ed865c99c6..e26b1337c6 100644 --- a/tests/end_to_end/pytest.ini +++ b/tests/end_to_end/pytest.ini @@ -8,5 +8,6 @@ markers = log_memory_usage: mark a test as a log memory usage test. task_runner_basic: mark a test as a task runner basic test. task_runner_dockerized_ws: mark a test as a task runner dockerized workspace test. + federated_runtime_301_watermarking: mark a test as a federated runtime 301 watermarking test. asyncio_mode=auto asyncio_default_fixture_loop_scope="function" diff --git a/tests/end_to_end/test_suites/wf_federated_runtime_tests.py b/tests/end_to_end/test_suites/wf_federated_runtime_tests.py new file mode 100644 index 0000000000..18d9f681b2 --- /dev/null +++ b/tests/end_to_end/test_suites/wf_federated_runtime_tests.py @@ -0,0 +1,94 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import pytest +import logging +import os +import time +import concurrent.futures + +import tests.end_to_end.utils.federation_helper as fh + +log = logging.getLogger(__name__) + + +@pytest.mark.federated_runtime_301_watermarking +def test_federated_runtime_301_watermarking(request): + """ + Test federated runtime without TLS. + Args: + request (Fixture): Pytest fixture + """ + envoys = ["Bangalore", "Chandler"] + workspace_path = os.path.join( + os.getcwd(), + "openfl-tutorials/experimental/workflow/FederatedRuntime/301_MNIST_Watermarking", + ) + # Activate the experimental feature + cmd = f"fx experimental activate" + error_msg = "Failed to activate the experimental feature" + return_code, output, error = fh.run_command( + cmd, + workspace_path=workspace_path, + error_msg=error_msg, + return_error=True, + ) + + if error: + # Check if the experimental feature is already activated + if [err for err in error if "No such command 'activate'" in err]: + log.info("Experimental feature already activated. Ignore the error.") + else: + log.error(f"{error_msg}: {error}") + raise Exception(error) + + log.info(f"Activated the experimental feature.") + + # Create result log files for the director and envoys + result_path, participant_res_files = fh.create_federated_runtime_participant_res_files( + request.config.results_dir, envoys + ) + + # Start the director + fh.start_director(workspace_path, participant_res_files["director"]) + + # Start envoys Bangalore and Chandler and connect them to the director + executor = concurrent.futures.ThreadPoolExecutor() + results = [ + executor.submit( + fh.start_envoy, + envoy_name=envoy, + workspace_path=workspace_path, + res_file=participant_res_files[envoy.lower()], + ) + for envoy in envoys + ] + assert all([f.result() for f in results]), "Failed to start one or more envoys" + + # Based on the pattern, the envoys take time to connect to the director + # Hence, adding a sleep of 10 seconds anyways. + time.sleep(10) + nb_workspace_path = os.path.join(workspace_path, "workspace") + notebook_path = nb_workspace_path + "/" + "MNIST_Watermarking.ipynb" + + assert fh.check_envoys_director_conn_federated_runtime( + notebook_path=notebook_path, expected_envoys=envoys + ), "Envoys are not connected to the director" + + # IMP - Notebook 301 Watermarking has hard coded notebook path set, hence changing the directory + # This might not be true for all notebooks, thus keeping it as a separate step + os.chdir(nb_workspace_path) + + assert fh.run_notebook( + notebook_path=notebook_path, + output_notebook_path=result_path + "/" + "MNIST_Watermarking_output.ipynb" + ), "Notebook run failed" + + # Change the directory back to the original directory + os.chdir(os.getcwd()) + + assert fh.verify_federated_runtime_experiment_completion( + participant_res_files + ), "Experiment failed" + + log.info("Experiment completed successfully") diff --git a/tests/end_to_end/utils/exceptions.py b/tests/end_to_end/utils/exceptions.py index 4cccce0e5f..31fa596ac0 100644 --- a/tests/end_to_end/utils/exceptions.py +++ b/tests/end_to_end/utils/exceptions.py @@ -71,3 +71,18 @@ class WorkspaceLoadException(Exception): class ReferenceFlowException(Exception): """Exception for reference flow""" pass + + +class NotebookRunException(Exception): + """Exception for notebook run""" + pass + + +class EnvoyStartException(Exception): + """Exception for envoy start""" + pass + + +class DirectorStartException(Exception): + """Exception for director start""" + pass diff --git a/tests/end_to_end/utils/federation_helper.py b/tests/end_to_end/utils/federation_helper.py index 698e580179..50910c4f2e 100644 --- a/tests/end_to_end/utils/federation_helper.py +++ b/tests/end_to_end/utils/federation_helper.py @@ -7,6 +7,7 @@ import os import json import re +import papermill as pm from pathlib import Path import tests.end_to_end.utils.constants as constants @@ -16,6 +17,7 @@ from tests.end_to_end.models import collaborator as col_model log = logging.getLogger(__name__) +home_dir = Path().home() def setup_pki_for_collaborators(collaborators, model_owner, local_bind_path): @@ -542,6 +544,7 @@ def run_command( bg_file=None, print_output=False, with_docker=False, + return_error=False, ): """ Run the command @@ -553,6 +556,7 @@ def run_command( bg_file (str): Background file (with path) print_output (bool): Print the output with_docker (bool): Flag specific to dockerized workspace scenario. Default is False. + return_error (bool): Return error message Returns: tuple: Return code, output and error """ @@ -591,7 +595,7 @@ def run_command( ) else: return_code, output, error = ssh.run_command(command) - if return_code != 0: + if return_code != 0 and not return_error: log.error(f"{error_msg}: {error}") raise Exception(f"{error_msg}: {error}") @@ -752,3 +756,185 @@ def start_docker_containers_for_dws( raise ex.DockerException( f"Failed to start {participant.name} docker environment: {e}" ) + + +def start_director(workspace_path, dir_res_file): + """ + Start the director. + Args: + workspace_path (str): Workspace path + dir_res_file (str): Director result file + Returns: + bool: True if successful, else False + """ + try: + error_msg = "Failed to start the director" + return_code, output, error = run_command( + "./start_director.sh", + error_msg=error_msg, + workspace_path=os.path.join(workspace_path, "director"), + run_in_background=True, + bg_file=dir_res_file, + ) + log.debug(f"Director start: Return code: {return_code}, Output: {output}, Error: {error}") + log.info( + "Waiting for 30s for the director to start. With no retry mechanism in place, " + "envoys will fail immediately if the director is not ready." + ) + time.sleep(30) + except ex.DirectorStartException as e: + raise e + return True + + +def start_envoy(envoy_name, workspace_path, res_file): + """ + Start given envoy. + Args: + envoy_name (str): Name of the envoy. For e.g. Bangalore, Chandler (case sensitive) + workspace_path (str): Workspace path + res_file (str): Result file to track the logs. + Returns: + bool: True if successful, else False + """ + try: + error_msg = f"Failed to start {envoy_name} envoy" + return_code, output, error = run_command( + f"./start_envoy.sh {envoy_name} {envoy_name}_config.yaml", + error_msg=error_msg, + workspace_path=os.path.join(workspace_path, envoy_name), + run_in_background=True, + bg_file=res_file, + ) + log.debug(f"{envoy_name} start: Return code: {return_code}, Output: {output}, Error: {error}") + except ex.EnvoyStartException as e: + raise e + return True + + +def create_federated_runtime_participant_res_files(results_dir, envoys, model_name="301_mnist_watermarking"): + """ + Create result log files for the director and envoys. + Args: + results_dir (str): Results directory + envoys (list): List of envoys + model_name (str): Model name + Returns: + tuple: Result path and participant result files (including director) + """ + participant_res_files = {} + result_path = os.path.join( + home_dir, results_dir, model_name + ) + os.makedirs(result_path, exist_ok=True) + + for participant in envoys + ["director"]: + res_file = os.path.join(result_path, f"{participant.lower()}.log") + participant_res_files[participant.lower()] = res_file + # Create the file + open(res_file, 'w').close() + + + return result_path, participant_res_files + + +def check_envoys_director_conn_federated_runtime( + notebook_path, expected_envoys, director_node_fqdn="localhost", director_port=50050 +): + """ + Function to check if the envoys are connected to the director for Federated Runtime notebooks. + Args: + notebook_path (str): Path to the notebook + expected_envoys (list): List of expected envoys + director_node_fqdn (str): Director node FQDN + director_port (int): Director port + Returns: + bool: True if all the envoys are connected to the director, else False + """ + from openfl.experimental.workflow.runtime import FederatedRuntime + + # Number of retries and delay between retries in seconds + MAX_RETRIES = RETRY_DELAY = 5 + + federated_runtime = FederatedRuntime( + collaborators=expected_envoys, + director={ + "director_node_fqdn": director_node_fqdn, + "director_port": director_port, + }, + notebook_path=notebook_path, + ) + # Retry logic + for attempt in range(MAX_RETRIES): + actual_envoys = federated_runtime.get_envoys() + if all( + sorted(expected_envoys) == sorted(actual_envoys) + for expected_envoys, actual_envoys in [(expected_envoys, actual_envoys)] + ): + log.info("All the envoys are connected to the director") + return True + else: + log.warning( + f"Attempt {attempt + 1}/{MAX_RETRIES}: Not all envoys are connected. Retrying in {RETRY_DELAY} seconds..." + ) + time.sleep(RETRY_DELAY) + + return False + + +def run_notebook(notebook_path, output_notebook_path): + """ + Function to run the notebook. + Args: + notebook_path (str): Path to the notebook + participant_res_files (dict): Dictionary containing participant names and their result log files + Returns: + bool: True if successful, else False + """ + try: + log.info(f"Running the notebook: {notebook_path} with output notebook path: {output_notebook_path}") + output = pm.execute_notebook( + input_path=notebook_path, + output_path=output_notebook_path, + request_save_on_cell_execute=True, + autosave_cell_every=5, # autosave every 5 seconds + log_output=True, + ) + except pm.exceptions.PapermillExecutionError as e: + log.error(f"PapermillExecutionError: {e}") + raise e + + except ex.NotebookRunException as e: + log.error(f"Failed to run the notebook: {e}") + raise e + return True + + +def verify_federated_runtime_experiment_completion(participant_res_files): + """ + Verify the completion of the experiment using the participant logs. + Args: + participant_res_files (dict): Dictionary containing participant names and their result log files + Returns: + bool: True if successful, else False + """ + # Check participant logs for successful completion + for name, result_file in participant_res_files.items(): + # Do not open file here as it will be opened in the loop below + # Also it takes time for the federation run to start and write the logs + with open(result_file, "r") as file: + lines = [line.strip() for line in file.readlines()] + last_7_lines = list(filter(str.rstrip, lines))[-7:] + if ( + name == "director" + and [1 for content in last_7_lines if "Experiment FederatedFlow_MNIST_Watermarking was finished successfully" in content] + ): + log.debug(f"Process completed for {name}") + continue + elif name != "director" and [1 for content in last_7_lines if "End of Federation reached." in content]: + log.debug(f"Process completed for {name}") + continue + else: + log.error(f"Process failed for {name}") + return False + return True diff --git a/tests/end_to_end/utils/summary_helper.py b/tests/end_to_end/utils/summary_helper.py index cfdbc17a9e..0ed8aa0a3a 100644 --- a/tests/end_to_end/utils/summary_helper.py +++ b/tests/end_to_end/utils/summary_helper.py @@ -1,9 +1,11 @@ -# Copyright 2020-2023 Intel Corporation +# Copyright 2020-2025 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import argparse from defusedxml.ElementTree import parse as defused_parse from lxml import etree import os +import re from pathlib import Path import tests.end_to_end.utils.constants as constants @@ -100,7 +102,7 @@ def get_testcase_result(): return database_list -def main(): +def print_task_runner_score(): """ Main function to get the test case results and aggregator logs And write the results to GitHub step summary @@ -166,5 +168,68 @@ def main(): ) +def print_federated_runtime_score(): + summary_file = os.getenv("GITHUB_STEP_SUMMARY") + + search_string = "Aggregated model validation score" + + last_occurrence = aggregated_model_score = None + + # Assumption - result directory is present in the home directory + dir_res_file = os.path.join( + result_path, + "301_mnist_watermarking", + "director.log", + ) + + # Open and read the log file + with open(dir_res_file, "r") as file: + for line in file: + if search_string in line: + last_occurrence = line + + # Extract the value from the last occurrence + if last_occurrence: + match = re.search( + r"Aggregated model validation score = (\d+\.\d+)", last_occurrence + ) + if match: + aggregated_model_score = match.group(1) + print(f"Last Aggregated model validation score: {aggregated_model_score}") + else: + print("No valid score found in the last occurrence.") + else: + print(f"No occurrences of '{search_string}' found in the log file.") + + # Write the results to GitHub step summary file + # This file is created at runtime by the GitHub action, thus we cannot verify its existence beforehand + with open(summary_file, "a") as fh: + # DO NOT change the print statements + print("| Aggregated model validation score |", file=fh) + print("| ------------- |", file=fh) + print(f"| {aggregated_model_score} |", file=fh) + + +def fetch_args(): + """ + Function to fetch the commandline arguments. + Returns: + Parsed arguments + """ + # Initialize the parser and add arguments + parser = argparse.ArgumentParser() + parser.add_argument( + "--func_name", required=True, default="", type=str, help="Name of function to be called" + ) + args = parser.parse_args() + return args + + if __name__ == "__main__": - main() + # Fetch input arguments + args = fetch_args() + func_name = args.func_name + if func_name in ["print_task_runner_score", "print_local_runtime_score"]: + print_task_runner_score() + elif func_name == "print_federated_runtime_score": + print_federated_runtime_score() diff --git a/tests/end_to_end/utils/wf_common_fixtures.py b/tests/end_to_end/utils/wf_common_fixtures.py index 44e6629990..2243ec5ccd 100644 --- a/tests/end_to_end/utils/wf_common_fixtures.py +++ b/tests/end_to_end/utils/wf_common_fixtures.py @@ -142,14 +142,3 @@ def fx_local_federated_workflow_prvt_attr(request): collaborators=collaborators_list, runtime=local_runtime, ) - - -@pytest.fixture(scope="function") -def fx_federated_runtime(request): - request.config.test_env = "workflow_federation_runtime" - - envoys = ["Portland, Seattle, Chandler, Bangalore"] - return workflow_federated_runtime_fixture( - director="director", - envoys=envoys - )