Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge Components #3275

Merged
merged 16 commits into from
Mar 3, 2025
67 changes: 67 additions & 0 deletions examples/advanced/edge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Running Edge Example with Hierarchical Clients

Follow these steps to run the edge device emulator.

## Provision

Use tree_prov.py to generate a hierarchical NVFlare system with 2 levels and 2 clients at each level:

python nvflare/edge/tree_prov.py -p /tmp/edge_example -d 1 -w 2

This will create a deployment with 2 clients, 4 leaf clients, 2 relays, and 1 server.

The following file needs to be copied to the `local` folder of each leaf client (C11, C12, C21 and C22).

`edge__p_resources.json`:

```
{
"format_version": 2,
"components": [
{
"id": "web_agent",
"path": "nvflare.edge.widgets.web_agent.WebAgent",
"args": {}
},
{
"id": "etd",
"path": "nvflare.edge.widgets.etd.EdgeTaskDispatcher",
"args": {}
}
]
}
```

To start the system, run the following commands in the prepared workspace:
cd /tmp/edge_example/prod_00
./start_all.sh

## Starting Web Proxy

To route devices to different LCPs, routing_proxy is used. It is a simple proxy that routes each request to
an LCP based on a checksum of the device ID. It can be started like this:

python nvflare/edge/web/routing_proxy.py 8000 /tmp/edge_example/lcp_map.json

The lcp_map.json file is generated by tree_prov.py.

## Example Job

`hello_mobile` is a very simple job for testing the edge functions. It only sends one task, "train", and
prints the aggregated results.

The job can be started as usual in NVFlare admin console.

## Run Edge Emulator

The emulator can be used to test all the features of the edge system. It handles the 'train' task by simply doubling every value
in the weights.

To start the emulator, give it an endpoint URL and the number of devices like this:

python nvflare/edge/emulator/run_emulator.py http://localhost:8000 16

The emulator keeps polling the LCP for a job assignment. It runs only one job and then quits.



Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"format_version": 2,
"num_rounds": 3,
"executors": [
{
"tasks": [
"train"
],
"executor": {
"id": "Executor",
"path": "nvflare.app_common.executors.ham.HierarchicalAggregationManager",
"args": {
"learner_id": "learner",
"aggregator_id": "aggregator",
"aggr_timeout": 60,
"min_responses": 2,
"wait_time_after_min_resps_received": 5
}
}
}
],
"components": [
{
"id": "learner",
"path": "nvflare.edge.executors.edge_dispatch_executor.EdgeDispatchExecutor",
"args": {
"wait_time": 60,
"min_devices": 2,
"aggregator_id": "aggregator"
}
},
{
"id": "aggregator",
"path": "nvflare.edge.aggregators.edge_result_accumulator.EdgeResultAccumulator",
"args": {}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"format_version": 2,
"num_rounds": 2,
"workflows": [
{
"id": "edge_controller",
"path": "edge_controller.SimpleEdgeController",
"args": {
"num_rounds": "{num_rounds}",
"initial_weights": [
1.0, 2.0, 3.0, 4.0
]
}
}
]
}
135 changes: 135 additions & 0 deletions examples/advanced/edge/jobs/hello_mobile/app/custom/edge_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any

from nvflare.apis.controller_spec import ClientTask, Task
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.impl.controller import Controller
from nvflare.apis.shareable import Shareable
from nvflare.apis.signal import Signal
from nvflare.app_common.app_constant import AppConstants
from nvflare.app_common.app_event_type import AppEventType
from nvflare.edge.aggregators.edge_result_accumulator import EdgeResultAccumulator
from nvflare.security.logging import secure_format_exception


class SimpleEdgeController(Controller):
    """Simple multi-round workflow that broadcasts a "train" task and sums the results.

    Each round sends the current weights to the clients, feeds every result
    into an ``EdgeResultAccumulator``, and uses the aggregated weights as the
    input of the next round.
    """

    def __init__(self, num_rounds: int, initial_weights: Any):
        """Create the controller.

        Args:
            num_rounds: number of "train" rounds to run.
            initial_weights: weights sent to the clients in the first round.
        """
        super().__init__()
        self.num_rounds = num_rounds
        self.current_round = None
        self.initial_weights = initial_weights
        self.aggregator = None

    def start_controller(self, fl_ctx: FLContext) -> None:
        """Create the aggregator and publish the initial model to the FL context."""
        self.log_info(fl_ctx, "Initializing Simple mobile workflow.")
        self.aggregator = EdgeResultAccumulator()

        # initialize global model
        # NOTE(review): START_ROUND is 1 but control_flow numbers rounds from 0 - confirm intent.
        fl_ctx.set_prop(AppConstants.START_ROUND, 1, private=True, sticky=True)
        fl_ctx.set_prop(AppConstants.NUM_ROUNDS, self.num_rounds, private=True, sticky=False)
        fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, self.initial_weights, private=True, sticky=True)
        self.fire_event(AppEventType.INITIAL_MODEL_LOADED, fl_ctx)

    def stop_controller(self, fl_ctx: FLContext):
        """Log that the workflow is stopping."""
        self.log_info(fl_ctx, "Stopping Simple mobile workflow.")

    def control_flow(self, abort_signal: Signal, fl_ctx: FLContext) -> None:
        """Run ``num_rounds`` rounds of broadcast / collect / aggregate.

        Returns early (without error) whenever ``abort_signal`` is triggered.
        Any exception is logged and escalated through ``system_panic``.

        Args:
            abort_signal: signal checked between steps to stop the workflow.
            fl_ctx: FL context used for properties, events and logging.
        """
        try:
            self.log_info(fl_ctx, "Beginning mobile training phase.")

            fl_ctx.set_prop(AppConstants.NUM_ROUNDS, self.num_rounds, private=True, sticky=False)
            self.fire_event(AppEventType.TRAINING_STARTED, fl_ctx)

            weights = self.initial_weights
            for i in range(self.num_rounds):
                self.current_round = i
                if abort_signal.triggered:
                    return

                self.log_info(fl_ctx, f"Round {self.current_round} started.")
                # Publish the weights actually used this round, not the initial ones.
                fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, weights, private=True, sticky=True)
                fl_ctx.set_prop(AppConstants.CURRENT_ROUND, self.current_round, private=True, sticky=True)
                self.fire_event(AppEventType.ROUND_STARTED, fl_ctx)

                # The accumulator keeps its state across aggregate() calls, so clear
                # it here to keep contributions of earlier rounds out of this round.
                self.aggregator.reset(fl_ctx)

                # Create train_task
                task_data = Shareable()
                task_data["weights"] = weights
                # Tell the clients whether this is the last round.
                task_data["task_done"] = self.current_round >= (self.num_rounds - 1)
                task_data.set_header(AppConstants.CURRENT_ROUND, self.current_round)
                task_data.set_header(AppConstants.NUM_ROUNDS, self.num_rounds)
                task_data.add_cookie(AppConstants.CONTRIBUTION_ROUND, self.current_round)

                train_task = Task(
                    name="train",
                    data=task_data,
                    result_received_cb=self.process_train_result,
                )

                self.broadcast_and_wait(
                    task=train_task,
                    min_responses=1,
                    wait_time_after_min_received=30,
                    fl_ctx=fl_ctx,
                    abort_signal=abort_signal,
                )

                if abort_signal.triggered:
                    return

                self.log_info(fl_ctx, "Start aggregation.")
                self.fire_event(AppEventType.BEFORE_AGGREGATION, fl_ctx)
                aggr_result = self.aggregator.aggregate(fl_ctx)
                weights = aggr_result.get("weights")
                self.log_info(fl_ctx, f"Aggregation result: {aggr_result}")
                fl_ctx.set_prop(AppConstants.AGGREGATION_RESULT, aggr_result, private=True, sticky=False)
                self.fire_event(AppEventType.AFTER_AGGREGATION, fl_ctx)
                self.log_info(fl_ctx, "End aggregation.")

                if abort_signal.triggered:
                    return

            # `weights` holds the last aggregation result, or the initial weights
            # when no round ran, so this is safe even for num_rounds == 0.
            self.log_info(fl_ctx, f"Finished Mobile Training. Final weights: {weights}")
        except Exception as e:
            error_msg = f"Exception in mobile control_flow: {secure_format_exception(e)}"
            self.log_exception(fl_ctx, error_msg)
            self.system_panic(error_msg, fl_ctx)

    def process_train_result(self, client_task: ClientTask, fl_ctx: FLContext) -> None:
        """Handle one client's "train" result.

        A result with a non-OK return code triggers ``system_panic``; otherwise
        the result is passed to the aggregator and the outcome is logged.

        Args:
            client_task: the client task carrying the result and client info.
            fl_ctx: FL context used for logging.
        """
        result = client_task.result
        client_name = client_task.client.name
        rc = result.get_return_code()

        # Raise errors if bad peer context or execution exception.
        if rc and rc != ReturnCode.OK:
            self.system_panic(
                f"Result from {client_name} is bad, error code: {rc}. "
                f"{self.__class__.__name__} exiting at round {self.current_round}.",
                fl_ctx=fl_ctx,
            )
            return

        self.log_info(fl_ctx, f"Weights: {result.get('weights', None)}")

        accepted = self.aggregator.accept(result, fl_ctx)
        accepted_msg = "ACCEPTED" if accepted else "REJECTED"
        self.log_info(
            fl_ctx, f"Contribution from {client_name} {accepted_msg} by the aggregator at round {self.current_round}."
        )
9 changes: 9 additions & 0 deletions examples/advanced/edge/jobs/hello_mobile/meta.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"name": "hello_mobile",
"resource_spec": {},
"deploy_map": {
"app": ["@ALL"]
},
"min_clients": 2,
"edge_method": "cnn"
}
6 changes: 3 additions & 3 deletions nvflare/app_common/executors/ham.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ def _do_execute(self, fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
return make_reply(ReturnCode.EXECUTION_EXCEPTION)

if received == 0:
# nothing received!
self.log_info(fl_ctx, "nothing received - timeout")
return make_reply(ReturnCode.TIMEOUT)
# nothing received! This maybe ok
self.log_warning(fl_ctx, "nothing received - timeout")
# return make_reply(ReturnCode.TIMEOUT)

try:
self.log_info(fl_ctx, "return aggregation result")
Expand Down
50 changes: 50 additions & 0 deletions nvflare/edge/aggregators/edge_result_accumulator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np

from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable
from nvflare.app_common.abstract.aggregator import Aggregator


class EdgeResultAccumulator(Aggregator):
    """Aggregator that keeps an element-wise running sum of submitted weights.

    It also counts how many submissions carried a "weights" entry.
    """

    def __init__(self):
        super().__init__()
        self.weights = None
        self.num_devices = 0

    def accept(self, shareable: Shareable, fl_ctx: FLContext) -> bool:
        """Add the "weights" of ``shareable`` (if any) to the running sum.

        Args:
            shareable: the submitted result; its "weights" entry may be missing.
            fl_ctx: FL context used for logging.

        Returns:
            Always True, even when the shareable carries no weights.
        """
        self.log_info(fl_ctx, f"Accepting: {shareable}")

        incoming = shareable.get("weights")
        if incoming is not None:
            self.num_devices += 1
            if self.weights is None:
                # First contribution: stored as-is.
                self.weights = incoming
            else:
                total = np.add(self.weights, incoming)
                # Arrays are stored as plain lists; other results are kept unchanged.
                self.weights = total.tolist() if isinstance(total, np.ndarray) else total
        return True

    def reset(self, fl_ctx: FLContext):
        """Clear the accumulated weights and the device count."""
        self.weights, self.num_devices = None, 0

    def aggregate(self, fl_ctx: FLContext) -> Shareable:
        """Return the accumulated weights and device count without clearing them."""
        summary = {"weights": self.weights, "num_devices": self.num_devices}
        return Shareable(summary)
Loading
Loading