Commit
Merge branch 'main' into dependabot/pip/research/fed-bpt/transformers-4.48.0
ZiyueXu77 authored Feb 24, 2025
2 parents f718871 + 8a9d67f commit 10f5b68
Showing 252 changed files with 14,200 additions and 2,413 deletions.
27 changes: 21 additions & 6 deletions docs/user_guide/configurations/logging_configuration.rst
@@ -20,7 +20,8 @@ Logging Configuration and Features
Default Logging Configuration
=============================

The default logging configuration json file **log_config.json.default** is divided into 3 main sections: formatters, handlers, and loggers.
The default logging configuration json file (**log_config.json.default**, ``default``) is divided into 3 main sections: formatters, handlers, and loggers.
This file can be found at :github_nvflare_link:`log_config.json <nvflare/fuel/utils/log_config.json>`.
See the `configuration dictionary schema <https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema>`_ for more details.

.. code-block:: json
@@ -255,7 +256,7 @@ The following log file handlers are pre-configured:
- jsonFileHandler with jsonFormatter to write json formatted logs to ``log.json``
- FLFileHandler with baseFormatter and FLFilter to write fl training and custom logs to ``log_fl.txt``
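
For illustration, each handler entry follows the standard Python ``logging.config``
dictionary schema. Below is a minimal sketch of a file handler; the handler class and
keys are assumptions for illustration, not the exact shipped defaults:

.. code-block:: json

   {
     "handlers": {
       "jsonFileHandler": {
         "class": "logging.handlers.RotatingFileHandler",
         "formatter": "jsonFormatter",
         "filename": "log.json"
       }
     }
   }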


.. _loggers:
Loggers
=======

@@ -288,7 +289,7 @@ Modifying Logging Configurations
Simulator log configuration
===========================

Users can specify a log configuration file in the simulator command with the ``-l`` simulator argument:
Users can specify a log configuration in the simulator command with the ``-l`` simulator argument:

.. code-block:: shell
@@ -301,6 +302,13 @@ Or using the ``log_config`` argument of the Job API simulator run:
job.simulator_run("/tmp/nvflare/hello-numpy-sag", log_config="log_config.json")
The log config argument can be one of the following (a short sketch follows the list):

- path to a log configuration json file (``/path/to/my_log_config.json``)
- preconfigured log mode (``default``, ``concise``, ``verbose``)
- log level name or number (``debug``, ``info``, ``warning``, ``error``, ``critical``, ``30``)
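
For instance, with the Job API the same run can be pointed at a preconfigured mode
or a log level instead of a file path (a sketch reusing the job object from above):

.. code-block:: python

   # Sketch: mode and level variants of the log_config argument
   job.simulator_run("/tmp/nvflare/hello-numpy-sag", log_config="concise")
   job.simulator_run("/tmp/nvflare/hello-numpy-sag", log_config="debug")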


POC log configurations
======================
If you search the POC workspace, you will find the following:
@@ -342,16 +350,23 @@ We also recommend using the :ref:`Dynamic Logging Configuration Commands <dynami
Dynamic Logging Configuration Commands
**************************************

We provide two admin commands to enable users to dynamically configure the site or job level logging.
When running the FLARE system (POC mode or production mode), there are two sets of logs: the site logs and job logs.
The current site log configuration will be used for the site logs as well as the log config of any new job started on that site.
In order to access the generated logs in the workspaces, refer to :ref:`access_server_workspace` and :ref:`client_workspace`.

We provide two admin commands to enable users to dynamically configure the site or job level logging when running the FLARE system.
Note that these commands take effect until the next reconfiguration, or for as long as the corresponding site or job is running.
However, these commands do not overwrite the log configuration file in the workspace; that file can be reloaded using ``reload``.

- **target**: ``server``, ``client <clients>...``, or ``all``
- **config**: log configuration

- path to a json log configuration file (``/path/to/my_log_config.json``)
- predefined log mode (``default``, ``concise``, ``verbose``)
- log level name/number (``debug``, ``INFO``, ``30``)
- read the current log configuration file (``reload``)
- read the current log configuration file from the workspace (``reload``)

To configure the target site logging (does not affect jobs):
To configure the target site logging (does not affect currently running jobs):

.. code-block:: shell
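
   # Sketch, assuming the configure_site_log admin command; check your
   # FLARE version's documentation for the exact command names and syntax.
   configure_site_log server debug
   configure_site_log site-1 reload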
587 changes: 10 additions & 577 deletions examples/advanced/distributed_optimization/1-consensus/tutorial.ipynb

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions examples/advanced/distributed_optimization/README.md
@@ -20,7 +20,7 @@ In this repo we provide the following examples:
## Implementation walkthrough
Let's now walk through how to use NVFlare to implement custom peer-to-peer (P2P) algorithms, opening the road to easily implement custom distributed optimization and swarm learning workflows.
Specifically, we'll delve into using some lower-level NVFlare APIs to create controllers and executors, which serve as the backbone for orchestrating communication and computation across different nodes (clients) in a distributed setup.
As an example, we'll demonstrate how to implement a consensus algorithm using these components and we'll show it in action in the next notebook.
As an example, we'll demonstrate how to implement a consensus algorithm using these components.

As mentioned, the final implementation is in the `nvflare.app_opt.p2p` module; we'll refer to the specific files throughout the notebook.

@@ -35,9 +35,9 @@ NVFlare natively supports various communication and orchestration patterns, incl

To implement custom P2P/distributed optimization algorithms, we'll delve into its lower-level APIs to build a framework that facilitates building P2P algorithms. In particular, we'll use

- [Controllers](https://nvflare.readthedocs.io/en/2.5/apidocs/nvflare.apis.impl.controller.html#module-nvflare.apis.impl.controller): Server-side components that manage job execution and orchestrate tasks.
- [Executors](https://nvflare.readthedocs.io/en/2.5/apidocs/nvflare.apis.executor.html#module-nvflare.apis.executor): Client-side components that perform computations and handle tasks received from the controller.
- [Messages via aux channels](https://nvflare.readthedocs.io/en/2.5/apidocs/nvflare.private.aux_runner.html#nvflare.private.aux_runner.AuxRunner.send_aux_request): Custom messages that enable direct communication between clients.
- [Controllers](https://nvflare.readthedocs.io/en/main/apidocs/nvflare.apis.impl.controller.html#module-nvflare.apis.impl.controller): Server-side components that manage job execution and orchestrate tasks.
- [Executors](https://nvflare.readthedocs.io/en/main/apidocs/nvflare.apis.executor.html#module-nvflare.apis.executor): Client-side components that perform computations and handle tasks received from the controller.
- [Messages via aux channels](https://nvflare.readthedocs.io/en/main/apidocs/nvflare.private.aux_runner.html#nvflare.private.aux_runner.AuxRunner.send_aux_request): Custom messages that enable direct communication between clients (sketched below).
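
As a rough sketch of how the third building block can be used (the call shape is assumed from the linked `send_aux_request` docs; the topic and target names are illustrative, not part of the module):

```python
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable


def share_value_with_neighbor(value: float, fl_ctx: FLContext):
    # Wrap the local value in a Shareable, NVFlare's dict-like message container
    request = Shareable()
    request["value"] = value

    # Send it directly to a neighbor client over an aux channel
    engine = fl_ctx.get_engine()
    replies = engine.send_aux_request(
        targets=["site-2"],        # illustrative neighbor name
        topic="p2p.value_share",   # custom topic both peers register a handler for
        request=request,
        timeout=10.0,
        fl_ctx=fl_ctx,
    )
    return replies
```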

#### What to expect
We'll start by creating a way to easily define and share configurations across the network. Then we'll implement a base controller and executor, serving as the backbone for implementing arbitrary P2P algorithms. Finally, we'll build upon the base executor to implement a specific algorithm, the Consensus algorithm.
@@ -298,6 +298,7 @@ from collections import defaultdict

from nvflare.apis.dxo import DXO, DataKind
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import ReservedKey
from nvflare.apis.signal import Signal


@@ -343,7 +344,7 @@ class SyncAlgorithmExecutor(Executor):
def _handle_neighbor_value(
self, topic: str, request: Shareable, fl_ctx: FLContext
) -> Shareable:
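        # ReservedKey.IDENTITY_NAME is the reserved key ("__identity_name__")
        # for the sending client's identity in the peer properties: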
sender = request.get_peer_props()["__identity_name__"]
sender = request.get_peer_prop(key=ReservedKey.IDENTITY_NAME, default=None)
data = from_shareable(request).data
iteration = data["iteration"]

9 changes: 8 additions & 1 deletion examples/advanced/sklearn-kmeans/README.md
@@ -115,9 +115,16 @@ Below is a sample config for site-1, saved to `/tmp/nvflare/workspace/jobs/kmean
}
```

As an alternative to using the Learner + Executor approach above, we can also use [ClientAPI](https://github.com/NVIDIA/NVFlare/blob/main/docs/programming_guide/execution_api_type/client_api.rst)
to run the federated training:
```commandline
python kmeans_job_clientapi.py --num_clients 3 --split_mode uniform --workspace_dir "/tmp/nvflare/workspace/works/kmeans_clientapi" --job_dir "/tmp/nvflare/workspace/jobs/kmeans_clientapi"
```


The resulting curve for `homogeneity_score` is shown below:

![minibatch curve](./figs/minibatch.png)

It can be visualized using
```commandline
tensorboard --logdir /tmp/nvflare/workspace/works/kmeans/sklearn_kmeans_uniform_3_clients
```
Binary file modified examples/advanced/sklearn-kmeans/figs/minibatch.png
93 changes: 2 additions & 91 deletions examples/advanced/sklearn-kmeans/kmeans_job.py
@@ -14,12 +14,10 @@

import argparse
import os
from enum import Enum
from typing import List

import numpy as np
from src.kmeans_assembler import KMeansAssembler
from src.kmeans_learner import KMeansLearner
from utils.split_data import split_data

from nvflare import FedJob
from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator
@@ -29,92 +27,6 @@
from nvflare.app_opt.sklearn.sklearn_executor import SKLearnExecutor


class SplitMethod(Enum):
UNIFORM = "uniform"
LINEAR = "linear"
SQUARE = "square"
EXPONENTIAL = "exponential"


def get_split_ratios(site_num: int, split_method: SplitMethod):
if split_method == SplitMethod.UNIFORM:
ratio_vec = np.ones(site_num)
elif split_method == SplitMethod.LINEAR:
ratio_vec = np.linspace(1, site_num, num=site_num)
elif split_method == SplitMethod.SQUARE:
ratio_vec = np.square(np.linspace(1, site_num, num=site_num))
elif split_method == SplitMethod.EXPONENTIAL:
ratio_vec = np.exp(np.linspace(1, site_num, num=site_num))
else:
raise ValueError(f"Split method {split_method.name} not implemented!")

return ratio_vec


def split_num_proportion(n, site_num, split_method: SplitMethod) -> List[int]:
split = []
ratio_vec = get_split_ratios(site_num, split_method)
total = sum(ratio_vec)
left = n
for site in range(site_num - 1):
x = int(n * ratio_vec[site] / total)
left = left - x
split.append(x)
split.append(left)
return split


def assign_data_index_to_sites(
data_size: int,
valid_fraction: float,
num_sites: int,
split_method: SplitMethod = SplitMethod.UNIFORM,
) -> dict:
if valid_fraction > 1.0:
raise ValueError("validation percent should be less than or equal to 100% of the total data")
elif valid_fraction < 1.0:
valid_size = int(round(data_size * valid_fraction, 0))
train_size = data_size - valid_size
else:
valid_size = data_size
train_size = data_size

site_sizes = split_num_proportion(train_size, num_sites, split_method)
split_data_indices = {
"valid": {"start": 0, "end": valid_size},
}
for site in range(num_sites):
site_id = site + 1
if valid_fraction < 1.0:
idx_start = valid_size + sum(site_sizes[:site])
idx_end = valid_size + sum(site_sizes[: site + 1])
else:
idx_start = sum(site_sizes[:site])
idx_end = sum(site_sizes[: site + 1])
split_data_indices[site_id] = {"start": idx_start, "end": idx_end}

return split_data_indices


def get_file_line_count(input_path: str) -> int:
count = 0
with open(input_path, "r") as fp:
for i, _ in enumerate(fp):
count += 1
return count


def split_data(
data_path: str,
num_clients: int,
valid_frac: float,
split_method: SplitMethod = SplitMethod.UNIFORM,
):
size_total_file = get_file_line_count(data_path)
site_indices = assign_data_index_to_sites(size_total_file, valid_frac, num_clients, split_method)
return site_indices


def define_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -133,7 +45,7 @@ def define_parser():
"--data_path",
type=str,
default="/tmp/nvflare/dataset/sklearn_iris.csv",
help="work directory, default to '/tmp/nvflare/dataset/sklearn_iris.csv'",
help="data path, default to '/tmp/nvflare/dataset/sklearn_iris.csv'",
)
parser.add_argument(
"--num_clients",
@@ -209,7 +121,6 @@ def main():
data_path,
num_clients,
valid_frac,
SplitMethod(split_mode),
)

for i in range(1, num_clients + 1):
