From 543bfce18b32ef79a280eb02b46f5898521a5155 Mon Sep 17 00:00:00 2001 From: MihirGore23 Date: Wed, 10 Jul 2024 15:35:49 +0530 Subject: [PATCH 1/6] Made corrections to physical visualtisation to get it running --- .../launcher/launcher_robot_display_view.py | 10 +++++----- .../launcher/launcher_visualization.py | 6 ++++-- manager/manager/launcher/launcher_world.py | 20 +++++++++++++++++-- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/manager/manager/launcher/launcher_robot_display_view.py b/manager/manager/launcher/launcher_robot_display_view.py index bb72abf..d164e10 100644 --- a/manager/manager/launcher/launcher_robot_display_view.py +++ b/manager/manager/launcher/launcher_robot_display_view.py @@ -4,16 +4,16 @@ import time import os import stat - +from typing import List, Any class LauncherRobotDisplayView(ILauncher): display: str - internal_port: str - external_port: str + internal_port: int + external_port: int height: int width: int - running = False - threads = [] + running: bool = False + threads: List[Any] = [] def run(self, callback): DRI_PATH = os.path.join("/dev/dri", os.environ.get("DRI_NAME", "card0")) diff --git a/manager/manager/launcher/launcher_visualization.py b/manager/manager/launcher/launcher_visualization.py index 9f9b38c..8aef2ea 100644 --- a/manager/manager/launcher/launcher_visualization.py +++ b/manager/manager/launcher/launcher_visualization.py @@ -1,6 +1,6 @@ from src.manager.libs.process_utils import get_class, class_from_module -from typing import Optional -from pydantic import BaseModel +from typing import Optional, ClassVar +from pydantic import BaseModel, ConfigDict from src.manager.libs.process_utils import get_class, class_from_module, get_ros_version @@ -82,6 +82,7 @@ ], "physic_rae": [ { + "type": "module", "module": "console", "display": ":1", "external_port": 1108, @@ -101,6 +102,7 @@ class LauncherVisualization(BaseModel): + running: ClassVar[bool] module: str = ".".join(__name__.split(".")[:-1]) visualization: str launchers: Optional[ILauncher] = [] diff --git a/manager/manager/launcher/launcher_world.py b/manager/manager/launcher/launcher_world.py index 8e761cc..05c0350 100644 --- a/manager/manager/launcher/launcher_world.py +++ b/manager/manager/launcher/launcher_world.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, ClassVar from pydantic import BaseModel from src.manager.libs.process_utils import get_class, class_from_module, get_ros_version @@ -48,11 +48,27 @@ } ], }, - "physical": {}, + "physical": {"1": [ + { + "type": "module", + "module": "ros_api", + "parameters": [], + "launch_file": [], + } + ], + "2": [ + { + "type": "module", + "module": "ros2_api", + "parameters": [], + "launch_file": [], + } + ],}, } class LauncherWorld(BaseModel): + running: ClassVar[bool] world: str launch_file_path: str module: str = ".".join(__name__.split(".")[:-1]) From 6c4fd194471fa24d3a753d1ac6c39dbf410f3a6d Mon Sep 17 00:00:00 2001 From: MihirGore23 Date: Tue, 16 Jul 2024 11:40:10 +0530 Subject: [PATCH 2/6] Update --- manager/manager/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager/manager/manager.py b/manager/manager/manager.py index 82f0f8e..e6e803c 100644 --- a/manager/manager/manager.py +++ b/manager/manager/manager.py @@ -424,7 +424,7 @@ def start(self): def signal_handler(sign, frame): print("\nprogram exiting gracefully") - self.running = False + self.running = True if self.gui_server is not None: try: self.gui_server.stop() From 1976e13981b66bb321ae9bd5e842f7afda022332 Mon Sep 17 00:00:00 2001 From: MihirGore23 Date: Mon, 22 Jul 2024 12:47:02 +0530 Subject: [PATCH 3/6] Update manager and launcher_visualisation --- manager/manager/launcher/launcher_visualization.py | 13 ++----------- manager/manager/manager.py | 2 +- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/manager/manager/launcher/launcher_visualization.py b/manager/manager/launcher/launcher_visualization.py index 8aef2ea..901eb47 100644 --- a/manager/manager/launcher/launcher_visualization.py +++ b/manager/manager/launcher/launcher_visualization.py @@ -57,7 +57,7 @@ "type": "module", "width": 1024, "height": 768, - "module": "gazebo_view", + "module": "robot_display_view", "display": ":2", "external_port": 6080, "internal_port": 5900, @@ -87,16 +87,7 @@ "display": ":1", "external_port": 1108, "internal_port": 5901, - }, - { - "type": "module", - "width": 1024, - "height": 768, - "module": "robot_display_view", - "display": ":2", - "external_port": 2303, - "internal_port": 5902, - }, + } ], } diff --git a/manager/manager/manager.py b/manager/manager/manager.py index e6e803c..02b4b09 100644 --- a/manager/manager/manager.py +++ b/manager/manager/manager.py @@ -220,7 +220,7 @@ def on_prepare_visualization(self, event): ) self.visualization_launcher.run() - if visualization_type == "gazebo_rae": + if visualization_type == "physic_rae" or visualization_type == "gazebo_rae": self.gui_server = Server(2303, self.update) self.gui_server.start() LogManager.logger.info("Visualization transition finished") From 574b2bc5de9ca7b9c0ec9c2a2ff0050038b545cf Mon Sep 17 00:00:00 2001 From: MihirGore23 Date: Tue, 30 Jul 2024 15:25:28 +0530 Subject: [PATCH 4/6] Update manager --- manager/manager/manager.py | 97 ++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/manager/manager/manager.py b/manager/manager/manager.py index 02b4b09..551fe66 100644 --- a/manager/manager/manager.py +++ b/manager/manager/manager.py @@ -8,6 +8,8 @@ import psutil import shutil import time +import base64 +import zipfile if "noetic" in str(subprocess.check_output(["bash", "-c", "echo $ROS_DISTRO"])): import rosservice @@ -32,7 +34,6 @@ from src.manager.libs.process_utils import stop_process_and_children from src.manager.manager.lint.linter import Lint - class Manager: states = [ "idle", @@ -172,9 +173,7 @@ def on_connect(self, event): "radi_version": subprocess.check_output( ["bash", "-c", "echo $IMAGE_TAG"] ), - "ros_version": subprocess.check_output( - ["bash", "-c", "echo $ROS_DISTRO"] - ), + "ros_version": self.ros_version, "gpu_avaliable": check_gpu_acceleration(), }, command="introspection", @@ -200,17 +199,33 @@ def on_launch_world(self, event): Note: The method logs the start of the launch transition and the configuration details for debugging and traceability. """ - try: config_dict = event.kwargs.get("data", {}) + try: + if (config_dict["type"] == "zip"): + return self.on_launch_world_zip(config_dict) + except Exception: + pass configuration = ConfigurationManager.validate(config_dict) except ValueError as e: - LogManager.logger.error(f"Configuration validotion failed: {e}") + LogManager.logger.error(f"Configuration validation failed: {e}") self.world_launcher = LauncherWorld(**configuration.model_dump()) self.world_launcher.run() LogManager.logger.info("Launch transition finished") + def on_launch_world_zip(self, data): + print("BT Studio application") + + # Unzip the app + if data["code"].startswith('data:'): + _, _, code = data["code"].partition('base64,') + with open('/workspace/worlds/universe.zip', 'wb') as result: + result.write(base64.b64decode(code)) + zip_ref = zipfile.ZipFile("/workspace/worlds/universe.zip", 'r') + zip_ref.extractall("/workspace/worlds") + zip_ref.close() + def on_prepare_visualization(self, event): LogManager.logger.info("Visualization transition started") @@ -220,7 +235,7 @@ def on_prepare_visualization(self, event): ) self.visualization_launcher.run() - if visualization_type == "physic_rae" or visualization_type == "gazebo_rae": + if visualization_type == "gazebo_rae" or visualization_type == "physic_rae": self.gui_server = Server(2303, self.update) self.gui_server.start() LogManager.logger.info("Visualization transition finished") @@ -257,9 +272,15 @@ def add_frequency_control(self, code): def on_run_application(self, event): - superthin = False + code_path = "/workspace/code/exercise.py" # Extract app config application_configuration = event.kwargs.get("data", {}) + try: + if (application_configuration["type"] == "bt-studio"): + return self.on_run_bt_studio_application(application_configuration) + except Exception: + pass + application_file_path = application_configuration["template"] exercise_id = application_configuration["exercise_id"] code = application_configuration["code"] @@ -271,14 +292,14 @@ def on_run_application(self, event): application_folder = application_file_path + "/ros2_humble/" if not os.path.isfile(application_folder + "exercise.py"): - superthin = True + code_path = "/workspace/code/academy.py" # Make code backwards compatible code = code.replace("from GUI import GUI","import GUI") code = code.replace("from HAL import HAL","import HAL") # Create executable app - errors = self.linter.evaluate_code(code, exercise_id) + errors = self.linter.evaluate_code(code, exercise_id, self.ros_version) if errors == "": code = self.add_frequency_control(code) @@ -287,29 +308,45 @@ def on_run_application(self, event): f.close() shutil.copytree(application_folder, "/workspace/code", dirs_exist_ok=True) - if superthin: - self.application_process = subprocess.Popen( - ["python3", "/workspace/code/academy.py"], - stdout=sys.stdout, - stderr=subprocess.STDOUT, - bufsize=1024, - universal_newlines=True, - ) - else: - self.application_process = subprocess.Popen( - ["python3", "/workspace/code/exercise.py"], - stdout=sys.stdout, - stderr=subprocess.STDOUT, - bufsize=1024, - universal_newlines=True, - ) + self.application_process = subprocess.Popen( + ["python3", code_path], + stdout=sys.stdout, + stderr=subprocess.STDOUT, + bufsize=1024, + universal_newlines=True, + ) self.unpause_sim() else: - print("errors") + with open('/dev/pts/1', 'w') as console: + console.write(errors + "\n\n") + raise Exception(errors) LogManager.logger.info("Run application transition finished") + def on_run_bt_studio_application(self, data): + print("BT Studio application") + + # Unzip the app + if data["code"].startswith('data:'): + _, _, code = data["code"].partition('base64,') + with open('/workspace/code/app.zip', 'wb') as result: + result.write(base64.b64decode(code)) + zip_ref = zipfile.ZipFile("/workspace/code/app.zip", 'r') + zip_ref.extractall("/workspace/code") + zip_ref.close() + + self.application_process = subprocess.Popen( + ["python3", "/workspace/code/execute_docker.py"], + stdout=sys.stdout, + stderr=subprocess.STDOUT, + bufsize=1024, + universal_newlines=True, + ) + self.unpause_sim() + + LogManager.logger.info("Run application transition finished") + def on_terminate_application(self, event): if self.application_process: @@ -363,7 +400,7 @@ def on_disconnect(self, event): python = sys.executable os.execl(python, python, *sys.argv) - def process_messsage(self, message): + def process_message(self, message): if message.command == "gui": self.gui_server.send(message.data) return @@ -424,7 +461,7 @@ def start(self): def signal_handler(sign, frame): print("\nprogram exiting gracefully") - self.running = True + self.running = False if self.gui_server is not None: try: self.gui_server.stop() @@ -467,7 +504,7 @@ def signal_handler(sign, frame): time.sleep(0.1) else: message = self.queue.get() - self.process_messsage(message) + self.process_message(message) except Exception as e: if message is not None: ex = ManagerConsumerMessageException(id=message.id, message=str(e)) From 89562a9358f148dd542d59d4ed9d1dc9290879e7 Mon Sep 17 00:00:00 2001 From: MihirGore23 Date: Tue, 30 Jul 2024 23:13:34 +0530 Subject: [PATCH 5/6] Update --- manager/manager/manager.py | 530 ------------------------------------- 1 file changed, 530 deletions(-) delete mode 100644 manager/manager/manager.py diff --git a/manager/manager/manager.py b/manager/manager/manager.py deleted file mode 100644 index 551fe66..0000000 --- a/manager/manager/manager.py +++ /dev/null @@ -1,530 +0,0 @@ -from __future__ import annotations - -import os -import signal -import subprocess -import sys -import re -import psutil -import shutil -import time -import base64 -import zipfile - -if "noetic" in str(subprocess.check_output(["bash", "-c", "echo $ROS_DISTRO"])): - import rosservice -import traceback -from queue import Queue -from uuid import uuid4 - - -from transitions import Machine - -from src.manager.comms.consumer_message import ManagerConsumerMessageException -from src.manager.comms.new_consumer import ManagerConsumer -from src.manager.libs.process_utils import check_gpu_acceleration, get_class_from_file -from src.manager.libs.launch_world_model import ConfigurationManager -from src.manager.manager.launcher.launcher_world import LauncherWorld -from src.manager.manager.launcher.launcher_visualization import LauncherVisualization -from src.manager.ram_logging.log_manager import LogManager -from src.manager.libs.applications.compatibility.server import Server -from src.manager.manager.application.robotics_python_application_interface import ( - IRoboticsPythonApplication, -) -from src.manager.libs.process_utils import stop_process_and_children -from src.manager.manager.lint.linter import Lint - -class Manager: - states = [ - "idle", - "connected", - "world_ready", - "visualization_ready", - "application_running", - "paused", - ] - - transitions = [ - # Transitions for state idle - { - "trigger": "connect", - "source": "idle", - "dest": "connected", - "before": "on_connect", - }, - # Transitions for state connected - { - "trigger": "launch_world", - "source": "connected", - "dest": "world_ready", - "before": "on_launch_world", - }, - # Transitions for state world ready - { - "trigger": "prepare_visualization", - "source": "world_ready", - "dest": "visualization_ready", - "before": "on_prepare_visualization", - }, - # Transitions for state visualization_ready - { - "trigger": "run_application", - "source": ["visualization_ready", "paused"], - "dest": "application_running", - "before": "on_run_application", - }, - # Transitions for state application_running - { - "trigger": "pause", - "source": "application_running", - "dest": "paused", - "before": "on_pause", - }, - { - "trigger": "resume", - "source": "paused", - "dest": "application_running", - "before": "on_resume", - }, - # Transitions for terminate levels - { - "trigger": "terminate_application", - "source": ["visualization_ready", "application_running", "paused"], - "dest": "visualization_ready", - "before": "on_terminate_application", - }, - { - "trigger": "terminate_visualization", - "source": "visualization_ready", - "dest": "world_ready", - "before": "on_terminate_visualization", - }, - { - "trigger": "terminate_universe", - "source": "world_ready", - "dest": "connected", - "before": "on_terminate_universe", - }, - # Global transitions - { - "trigger": "disconnect", - "source": "*", - "dest": "idle", - "before": "on_disconnect", - }, - ] - - def __init__(self, host: str, port: int): - - self.machine = Machine( - model=self, - states=Manager.states, - transitions=Manager.transitions, - initial="idle", - send_event=True, - after_state_change=self.state_change, - ) - self.ros_version = subprocess.check_output(["bash", "-c", "echo $ROS_DISTRO"]) - self.queue = Queue() - self.consumer = ManagerConsumer(host, port, self.queue) - self.world_launcher = None - self.visualization_launcher = None - self.application_process = None - self.running = True - self.gui_server = None - self.linter = Lint() - - # Creates workspace directories - worlds_dir = "/workspace/worlds" - code_dir = "/workspace/code" - binaries_dir = "/workspace/binaries" - if not os.path.isdir(worlds_dir): - os.makedirs(worlds_dir) - if not os.path.isdir(code_dir): - os.makedirs(code_dir) - if not os.path.isdir(binaries_dir): - os.makedirs(binaries_dir) - - def state_change(self, event): - LogManager.logger.info(f"State changed to {self.state}") - if self.consumer is not None: - self.consumer.send_message({"state": self.state}, command="state-changed") - - def update(self, data): - LogManager.logger.debug(f"Sending update to client") - if self.consumer is not None: - self.consumer.send_message({"update": data}, command="update") - - def on_connect(self, event): - """ - This method is triggered when the application transitions to the 'connected' state. - It sends an introspection message to a consumer with key information. - - Parameters: - event (Event): The event object containing data related to the 'connect' event. - - The message sent to the consumer includes: - - `radi_version`: The current RADI (Robotics Application Docker Image) version. - - `ros_version`: The current ROS (Robot Operating System) distribution version. - - `gpu_avaliable`: Boolean indicating whether GPU acceleration is available. - """ - self.consumer.send_message( - { - "radi_version": subprocess.check_output( - ["bash", "-c", "echo $IMAGE_TAG"] - ), - "ros_version": self.ros_version, - "gpu_avaliable": check_gpu_acceleration(), - }, - command="introspection", - ) - - def on_launch_world(self, event): - """ - Handles the 'launch' event, transitioning the application from 'connected' to 'ready' state. - This method initializes the launch process based on the provided configuration. - - During the launch process, it validates and processes the configuration data received from the event. - It then creates and starts a LauncherWorld instance with the validated configuration. - This setup is crucial for preparing the environment and resources necessary for the application's execution. - - Parameters: - event (Event): The event object containing data related to the 'launch' event. - This data includes configuration information necessary for initializing the launch process. - - Raises: - ValueError: If the configuration data is invalid or incomplete, a ValueError is raised, - indicating the issue with the provided configuration. - - Note: - The method logs the start of the launch transition and the configuration details for debugging and traceability. - """ - try: - config_dict = event.kwargs.get("data", {}) - try: - if (config_dict["type"] == "zip"): - return self.on_launch_world_zip(config_dict) - except Exception: - pass - configuration = ConfigurationManager.validate(config_dict) - except ValueError as e: - LogManager.logger.error(f"Configuration validation failed: {e}") - - self.world_launcher = LauncherWorld(**configuration.model_dump()) - self.world_launcher.run() - LogManager.logger.info("Launch transition finished") - - def on_launch_world_zip(self, data): - print("BT Studio application") - - # Unzip the app - if data["code"].startswith('data:'): - _, _, code = data["code"].partition('base64,') - with open('/workspace/worlds/universe.zip', 'wb') as result: - result.write(base64.b64decode(code)) - zip_ref = zipfile.ZipFile("/workspace/worlds/universe.zip", 'r') - zip_ref.extractall("/workspace/worlds") - zip_ref.close() - - def on_prepare_visualization(self, event): - LogManager.logger.info("Visualization transition started") - - visualization_type = event.kwargs.get("data", {}) - self.visualization_launcher = LauncherVisualization( - visualization=visualization_type - ) - self.visualization_launcher.run() - - if visualization_type == "gazebo_rae" or visualization_type == "physic_rae": - self.gui_server = Server(2303, self.update) - self.gui_server.start() - LogManager.logger.info("Visualization transition finished") - - def add_frequency_control(self, code): - frequency_control_code_imports = """ -import time -from datetime import datetime -ideal_cycle = 20 -""" - code = frequency_control_code_imports + code - infinite_loop = re.search( - r"[^ ]while\s*\(\s*True\s*\)\s*:|[^ ]while\s*True\s*:|[^ ]while\s*1\s*:|[^ ]while\s*\(\s*1\s*\)\s*:", - code, - ) - frequency_control_code_pre = """ - start_time = datetime.now() - """ - code = ( - code[: infinite_loop.end()] - + frequency_control_code_pre - + code[infinite_loop.end() :] - ) - frequency_control_code_post = """ - finish_time = datetime.now() - dt = finish_time - start_time - ms = (dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0 - - if (ms < ideal_cycle): - time.sleep((ideal_cycle - ms) / 1000.0) -""" - code = code + frequency_control_code_post - return code - - def on_run_application(self, event): - - code_path = "/workspace/code/exercise.py" - # Extract app config - application_configuration = event.kwargs.get("data", {}) - try: - if (application_configuration["type"] == "bt-studio"): - return self.on_run_bt_studio_application(application_configuration) - except Exception: - pass - - application_file_path = application_configuration["template"] - exercise_id = application_configuration["exercise_id"] - code = application_configuration["code"] - - # Template version - if "noetic" in str(self.ros_version): - application_folder = application_file_path + "/ros1_noetic/" - else: - application_folder = application_file_path + "/ros2_humble/" - - if not os.path.isfile(application_folder + "exercise.py"): - code_path = "/workspace/code/academy.py" - - # Make code backwards compatible - code = code.replace("from GUI import GUI","import GUI") - code = code.replace("from HAL import HAL","import HAL") - - # Create executable app - errors = self.linter.evaluate_code(code, exercise_id, self.ros_version) - if errors == "": - - code = self.add_frequency_control(code) - f = open("/workspace/code/academy.py", "w") - f.write(code) - f.close() - - shutil.copytree(application_folder, "/workspace/code", dirs_exist_ok=True) - self.application_process = subprocess.Popen( - ["python3", code_path], - stdout=sys.stdout, - stderr=subprocess.STDOUT, - bufsize=1024, - universal_newlines=True, - ) - self.unpause_sim() - else: - with open('/dev/pts/1', 'w') as console: - console.write(errors + "\n\n") - - raise Exception(errors) - - LogManager.logger.info("Run application transition finished") - - def on_run_bt_studio_application(self, data): - print("BT Studio application") - - # Unzip the app - if data["code"].startswith('data:'): - _, _, code = data["code"].partition('base64,') - with open('/workspace/code/app.zip', 'wb') as result: - result.write(base64.b64decode(code)) - zip_ref = zipfile.ZipFile("/workspace/code/app.zip", 'r') - zip_ref.extractall("/workspace/code") - zip_ref.close() - - self.application_process = subprocess.Popen( - ["python3", "/workspace/code/execute_docker.py"], - stdout=sys.stdout, - stderr=subprocess.STDOUT, - bufsize=1024, - universal_newlines=True, - ) - self.unpause_sim() - - LogManager.logger.info("Run application transition finished") - - def on_terminate_application(self, event): - - if self.application_process: - try: - stop_process_and_children(self.application_process) - self.application_process = None - self.pause_sim() - self.reset_sim() - except Exception: - LogManager.logger.exception("No application running") - print(traceback.format_exc()) - - def on_terminate_visualization(self, event): - - self.visualization_launcher.terminate() - self.gui_server.stop() - self.gui_server = None - - def on_terminate_universe(self, event): - - self.world_launcher.terminate() - - def on_disconnect(self, event): - try: - self.consumer.stop() - except Exception as e: - LogManager.logger.exception("Exception stopping consumer") - - if self.application_process: - try: - stop_process_and_children(self.application_process) - self.application_process = None - except Exception as e: - LogManager.logger.exception("Exception stopping application process") - - if self.visualization_launcher: - try: - self.visualization_launcher.terminate() - except Exception as e: - LogManager.logger.exception( - "Exception terminating visualization launcher" - ) - - if self.world_launcher: - try: - self.world_launcher.terminate() - except Exception as e: - LogManager.logger.exception("Exception terminating world launcher") - - # Reiniciar el script - python = sys.executable - os.execl(python, python, *sys.argv) - - def process_message(self, message): - if message.command == "gui": - self.gui_server.send(message.data) - return - - self.trigger(message.command, data=message.data or None) - response = {"message": f"Exercise state changed to {self.state}"} - self.consumer.send_message(message.response(response)) - - def on_pause(self, msg): - proc = psutil.Process(self.application_process.pid) - proc.suspend() - self.pause_sim() - - def on_resume(self, msg): - proc = psutil.Process(self.application_process.pid) - proc.resume() - self.unpause_sim() - - def pause_sim(self): - if "noetic" in str(self.ros_version): - rosservice.call_service("/gazebo/pause_physics", []) - else: - self.call_service("/pause_physics", "std_srvs/srv/Empty") - - def unpause_sim(self): - if "noetic" in str(self.ros_version): - rosservice.call_service("/gazebo/unpause_physics", []) - else: - self.call_service("/unpause_physics", "std_srvs/srv/Empty") - - def reset_sim(self): - if "noetic" in str(self.ros_version): - rosservice.call_service("/gazebo/reset_world", []) - else: - self.call_service("/reset_world", "std_srvs/srv/Empty") - - def call_service(self, service, service_type): - command = f"ros2 service call {service} {service_type}" - subprocess.call( - f"{command}", - shell=True, - stdout=sys.stdout, - stderr=subprocess.STDOUT, - bufsize=1024, - universal_newlines=True, - ) - - def start(self): - """ - Starts the RAM - RAM must be run in main thread to be able to handle signaling other processes, for instance ROS launcher. - """ - LogManager.logger.info( - f"Starting RAM consumer in {self.consumer.server}:{self.consumer.port}" - ) - - self.consumer.start() - - def signal_handler(sign, frame): - print("\nprogram exiting gracefully") - self.running = False - if self.gui_server is not None: - try: - self.gui_server.stop() - except Exception as e: - LogManager.logger.exception("Exception stopping GUI server") - try: - self.consumer.stop() - except Exception as e: - LogManager.logger.exception("Exception stopping consumer") - - if self.application_process: - try: - stop_process_and_children(self.application_process) - self.application_process = None - except Exception as e: - LogManager.logger.exception( - "Exception stopping application process" - ) - - if self.visualization_launcher: - try: - self.visualization_launcher.terminate() - except Exception as e: - LogManager.logger.exception( - "Exception terminating visualization launcher" - ) - - if self.world_launcher: - try: - self.world_launcher.terminate() - except Exception as e: - LogManager.logger.exception("Exception terminating world launcher") - - signal.signal(signal.SIGINT, signal_handler) - - while self.running: - message = None - try: - if self.queue.empty(): - time.sleep(0.1) - else: - message = self.queue.get() - self.process_message(message) - except Exception as e: - if message is not None: - ex = ManagerConsumerMessageException(id=message.id, message=str(e)) - else: - ex = ManagerConsumerMessageException( - id=str(uuid4()), message=str(e) - ) - self.consumer.send_message(ex) - LogManager.logger.error(e, exc_info=True) - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser() - parser.add_argument( - "host", type=str, help="Host to listen to (0.0.0.0 or all hosts)" - ) - parser.add_argument("port", type=int, help="Port to listen to") - args = parser.parse_args() - - RAM = Manager(args.host, args.port) - RAM.start() From 9489f9438cd4ca44a2e93c0ef8578ad5afa2704c Mon Sep 17 00:00:00 2001 From: MihirGore23 Date: Tue, 30 Jul 2024 23:13:58 +0530 Subject: [PATCH 6/6] Update --- manager/manager/manager.py | 530 +++++++++++++++++++++++++++++++++++++ 1 file changed, 530 insertions(+) create mode 100644 manager/manager/manager.py diff --git a/manager/manager/manager.py b/manager/manager/manager.py new file mode 100644 index 0000000..551fe66 --- /dev/null +++ b/manager/manager/manager.py @@ -0,0 +1,530 @@ +from __future__ import annotations + +import os +import signal +import subprocess +import sys +import re +import psutil +import shutil +import time +import base64 +import zipfile + +if "noetic" in str(subprocess.check_output(["bash", "-c", "echo $ROS_DISTRO"])): + import rosservice +import traceback +from queue import Queue +from uuid import uuid4 + + +from transitions import Machine + +from src.manager.comms.consumer_message import ManagerConsumerMessageException +from src.manager.comms.new_consumer import ManagerConsumer +from src.manager.libs.process_utils import check_gpu_acceleration, get_class_from_file +from src.manager.libs.launch_world_model import ConfigurationManager +from src.manager.manager.launcher.launcher_world import LauncherWorld +from src.manager.manager.launcher.launcher_visualization import LauncherVisualization +from src.manager.ram_logging.log_manager import LogManager +from src.manager.libs.applications.compatibility.server import Server +from src.manager.manager.application.robotics_python_application_interface import ( + IRoboticsPythonApplication, +) +from src.manager.libs.process_utils import stop_process_and_children +from src.manager.manager.lint.linter import Lint + +class Manager: + states = [ + "idle", + "connected", + "world_ready", + "visualization_ready", + "application_running", + "paused", + ] + + transitions = [ + # Transitions for state idle + { + "trigger": "connect", + "source": "idle", + "dest": "connected", + "before": "on_connect", + }, + # Transitions for state connected + { + "trigger": "launch_world", + "source": "connected", + "dest": "world_ready", + "before": "on_launch_world", + }, + # Transitions for state world ready + { + "trigger": "prepare_visualization", + "source": "world_ready", + "dest": "visualization_ready", + "before": "on_prepare_visualization", + }, + # Transitions for state visualization_ready + { + "trigger": "run_application", + "source": ["visualization_ready", "paused"], + "dest": "application_running", + "before": "on_run_application", + }, + # Transitions for state application_running + { + "trigger": "pause", + "source": "application_running", + "dest": "paused", + "before": "on_pause", + }, + { + "trigger": "resume", + "source": "paused", + "dest": "application_running", + "before": "on_resume", + }, + # Transitions for terminate levels + { + "trigger": "terminate_application", + "source": ["visualization_ready", "application_running", "paused"], + "dest": "visualization_ready", + "before": "on_terminate_application", + }, + { + "trigger": "terminate_visualization", + "source": "visualization_ready", + "dest": "world_ready", + "before": "on_terminate_visualization", + }, + { + "trigger": "terminate_universe", + "source": "world_ready", + "dest": "connected", + "before": "on_terminate_universe", + }, + # Global transitions + { + "trigger": "disconnect", + "source": "*", + "dest": "idle", + "before": "on_disconnect", + }, + ] + + def __init__(self, host: str, port: int): + + self.machine = Machine( + model=self, + states=Manager.states, + transitions=Manager.transitions, + initial="idle", + send_event=True, + after_state_change=self.state_change, + ) + self.ros_version = subprocess.check_output(["bash", "-c", "echo $ROS_DISTRO"]) + self.queue = Queue() + self.consumer = ManagerConsumer(host, port, self.queue) + self.world_launcher = None + self.visualization_launcher = None + self.application_process = None + self.running = True + self.gui_server = None + self.linter = Lint() + + # Creates workspace directories + worlds_dir = "/workspace/worlds" + code_dir = "/workspace/code" + binaries_dir = "/workspace/binaries" + if not os.path.isdir(worlds_dir): + os.makedirs(worlds_dir) + if not os.path.isdir(code_dir): + os.makedirs(code_dir) + if not os.path.isdir(binaries_dir): + os.makedirs(binaries_dir) + + def state_change(self, event): + LogManager.logger.info(f"State changed to {self.state}") + if self.consumer is not None: + self.consumer.send_message({"state": self.state}, command="state-changed") + + def update(self, data): + LogManager.logger.debug(f"Sending update to client") + if self.consumer is not None: + self.consumer.send_message({"update": data}, command="update") + + def on_connect(self, event): + """ + This method is triggered when the application transitions to the 'connected' state. + It sends an introspection message to a consumer with key information. + + Parameters: + event (Event): The event object containing data related to the 'connect' event. + + The message sent to the consumer includes: + - `radi_version`: The current RADI (Robotics Application Docker Image) version. + - `ros_version`: The current ROS (Robot Operating System) distribution version. + - `gpu_avaliable`: Boolean indicating whether GPU acceleration is available. + """ + self.consumer.send_message( + { + "radi_version": subprocess.check_output( + ["bash", "-c", "echo $IMAGE_TAG"] + ), + "ros_version": self.ros_version, + "gpu_avaliable": check_gpu_acceleration(), + }, + command="introspection", + ) + + def on_launch_world(self, event): + """ + Handles the 'launch' event, transitioning the application from 'connected' to 'ready' state. + This method initializes the launch process based on the provided configuration. + + During the launch process, it validates and processes the configuration data received from the event. + It then creates and starts a LauncherWorld instance with the validated configuration. + This setup is crucial for preparing the environment and resources necessary for the application's execution. + + Parameters: + event (Event): The event object containing data related to the 'launch' event. + This data includes configuration information necessary for initializing the launch process. + + Raises: + ValueError: If the configuration data is invalid or incomplete, a ValueError is raised, + indicating the issue with the provided configuration. + + Note: + The method logs the start of the launch transition and the configuration details for debugging and traceability. + """ + try: + config_dict = event.kwargs.get("data", {}) + try: + if (config_dict["type"] == "zip"): + return self.on_launch_world_zip(config_dict) + except Exception: + pass + configuration = ConfigurationManager.validate(config_dict) + except ValueError as e: + LogManager.logger.error(f"Configuration validation failed: {e}") + + self.world_launcher = LauncherWorld(**configuration.model_dump()) + self.world_launcher.run() + LogManager.logger.info("Launch transition finished") + + def on_launch_world_zip(self, data): + print("BT Studio application") + + # Unzip the app + if data["code"].startswith('data:'): + _, _, code = data["code"].partition('base64,') + with open('/workspace/worlds/universe.zip', 'wb') as result: + result.write(base64.b64decode(code)) + zip_ref = zipfile.ZipFile("/workspace/worlds/universe.zip", 'r') + zip_ref.extractall("/workspace/worlds") + zip_ref.close() + + def on_prepare_visualization(self, event): + LogManager.logger.info("Visualization transition started") + + visualization_type = event.kwargs.get("data", {}) + self.visualization_launcher = LauncherVisualization( + visualization=visualization_type + ) + self.visualization_launcher.run() + + if visualization_type == "gazebo_rae" or visualization_type == "physic_rae": + self.gui_server = Server(2303, self.update) + self.gui_server.start() + LogManager.logger.info("Visualization transition finished") + + def add_frequency_control(self, code): + frequency_control_code_imports = """ +import time +from datetime import datetime +ideal_cycle = 20 +""" + code = frequency_control_code_imports + code + infinite_loop = re.search( + r"[^ ]while\s*\(\s*True\s*\)\s*:|[^ ]while\s*True\s*:|[^ ]while\s*1\s*:|[^ ]while\s*\(\s*1\s*\)\s*:", + code, + ) + frequency_control_code_pre = """ + start_time = datetime.now() + """ + code = ( + code[: infinite_loop.end()] + + frequency_control_code_pre + + code[infinite_loop.end() :] + ) + frequency_control_code_post = """ + finish_time = datetime.now() + dt = finish_time - start_time + ms = (dt.days * 24 * 60 * 60 + dt.seconds) * 1000 + dt.microseconds / 1000.0 + + if (ms < ideal_cycle): + time.sleep((ideal_cycle - ms) / 1000.0) +""" + code = code + frequency_control_code_post + return code + + def on_run_application(self, event): + + code_path = "/workspace/code/exercise.py" + # Extract app config + application_configuration = event.kwargs.get("data", {}) + try: + if (application_configuration["type"] == "bt-studio"): + return self.on_run_bt_studio_application(application_configuration) + except Exception: + pass + + application_file_path = application_configuration["template"] + exercise_id = application_configuration["exercise_id"] + code = application_configuration["code"] + + # Template version + if "noetic" in str(self.ros_version): + application_folder = application_file_path + "/ros1_noetic/" + else: + application_folder = application_file_path + "/ros2_humble/" + + if not os.path.isfile(application_folder + "exercise.py"): + code_path = "/workspace/code/academy.py" + + # Make code backwards compatible + code = code.replace("from GUI import GUI","import GUI") + code = code.replace("from HAL import HAL","import HAL") + + # Create executable app + errors = self.linter.evaluate_code(code, exercise_id, self.ros_version) + if errors == "": + + code = self.add_frequency_control(code) + f = open("/workspace/code/academy.py", "w") + f.write(code) + f.close() + + shutil.copytree(application_folder, "/workspace/code", dirs_exist_ok=True) + self.application_process = subprocess.Popen( + ["python3", code_path], + stdout=sys.stdout, + stderr=subprocess.STDOUT, + bufsize=1024, + universal_newlines=True, + ) + self.unpause_sim() + else: + with open('/dev/pts/1', 'w') as console: + console.write(errors + "\n\n") + + raise Exception(errors) + + LogManager.logger.info("Run application transition finished") + + def on_run_bt_studio_application(self, data): + print("BT Studio application") + + # Unzip the app + if data["code"].startswith('data:'): + _, _, code = data["code"].partition('base64,') + with open('/workspace/code/app.zip', 'wb') as result: + result.write(base64.b64decode(code)) + zip_ref = zipfile.ZipFile("/workspace/code/app.zip", 'r') + zip_ref.extractall("/workspace/code") + zip_ref.close() + + self.application_process = subprocess.Popen( + ["python3", "/workspace/code/execute_docker.py"], + stdout=sys.stdout, + stderr=subprocess.STDOUT, + bufsize=1024, + universal_newlines=True, + ) + self.unpause_sim() + + LogManager.logger.info("Run application transition finished") + + def on_terminate_application(self, event): + + if self.application_process: + try: + stop_process_and_children(self.application_process) + self.application_process = None + self.pause_sim() + self.reset_sim() + except Exception: + LogManager.logger.exception("No application running") + print(traceback.format_exc()) + + def on_terminate_visualization(self, event): + + self.visualization_launcher.terminate() + self.gui_server.stop() + self.gui_server = None + + def on_terminate_universe(self, event): + + self.world_launcher.terminate() + + def on_disconnect(self, event): + try: + self.consumer.stop() + except Exception as e: + LogManager.logger.exception("Exception stopping consumer") + + if self.application_process: + try: + stop_process_and_children(self.application_process) + self.application_process = None + except Exception as e: + LogManager.logger.exception("Exception stopping application process") + + if self.visualization_launcher: + try: + self.visualization_launcher.terminate() + except Exception as e: + LogManager.logger.exception( + "Exception terminating visualization launcher" + ) + + if self.world_launcher: + try: + self.world_launcher.terminate() + except Exception as e: + LogManager.logger.exception("Exception terminating world launcher") + + # Reiniciar el script + python = sys.executable + os.execl(python, python, *sys.argv) + + def process_message(self, message): + if message.command == "gui": + self.gui_server.send(message.data) + return + + self.trigger(message.command, data=message.data or None) + response = {"message": f"Exercise state changed to {self.state}"} + self.consumer.send_message(message.response(response)) + + def on_pause(self, msg): + proc = psutil.Process(self.application_process.pid) + proc.suspend() + self.pause_sim() + + def on_resume(self, msg): + proc = psutil.Process(self.application_process.pid) + proc.resume() + self.unpause_sim() + + def pause_sim(self): + if "noetic" in str(self.ros_version): + rosservice.call_service("/gazebo/pause_physics", []) + else: + self.call_service("/pause_physics", "std_srvs/srv/Empty") + + def unpause_sim(self): + if "noetic" in str(self.ros_version): + rosservice.call_service("/gazebo/unpause_physics", []) + else: + self.call_service("/unpause_physics", "std_srvs/srv/Empty") + + def reset_sim(self): + if "noetic" in str(self.ros_version): + rosservice.call_service("/gazebo/reset_world", []) + else: + self.call_service("/reset_world", "std_srvs/srv/Empty") + + def call_service(self, service, service_type): + command = f"ros2 service call {service} {service_type}" + subprocess.call( + f"{command}", + shell=True, + stdout=sys.stdout, + stderr=subprocess.STDOUT, + bufsize=1024, + universal_newlines=True, + ) + + def start(self): + """ + Starts the RAM + RAM must be run in main thread to be able to handle signaling other processes, for instance ROS launcher. + """ + LogManager.logger.info( + f"Starting RAM consumer in {self.consumer.server}:{self.consumer.port}" + ) + + self.consumer.start() + + def signal_handler(sign, frame): + print("\nprogram exiting gracefully") + self.running = False + if self.gui_server is not None: + try: + self.gui_server.stop() + except Exception as e: + LogManager.logger.exception("Exception stopping GUI server") + try: + self.consumer.stop() + except Exception as e: + LogManager.logger.exception("Exception stopping consumer") + + if self.application_process: + try: + stop_process_and_children(self.application_process) + self.application_process = None + except Exception as e: + LogManager.logger.exception( + "Exception stopping application process" + ) + + if self.visualization_launcher: + try: + self.visualization_launcher.terminate() + except Exception as e: + LogManager.logger.exception( + "Exception terminating visualization launcher" + ) + + if self.world_launcher: + try: + self.world_launcher.terminate() + except Exception as e: + LogManager.logger.exception("Exception terminating world launcher") + + signal.signal(signal.SIGINT, signal_handler) + + while self.running: + message = None + try: + if self.queue.empty(): + time.sleep(0.1) + else: + message = self.queue.get() + self.process_message(message) + except Exception as e: + if message is not None: + ex = ManagerConsumerMessageException(id=message.id, message=str(e)) + else: + ex = ManagerConsumerMessageException( + id=str(uuid4()), message=str(e) + ) + self.consumer.send_message(ex) + LogManager.logger.error(e, exc_info=True) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "host", type=str, help="Host to listen to (0.0.0.0 or all hosts)" + ) + parser.add_argument("port", type=int, help="Port to listen to") + args = parser.parse_args() + + RAM = Manager(args.host, args.port) + RAM.start()