From 3cd89c8566a34f72524de86ce18e397e898cf60d Mon Sep 17 00:00:00 2001 From: Raffaello Bonghi Date: Wed, 22 Jan 2025 12:47:07 +0000 Subject: [PATCH] Major improvements using inquirer and rearrange --- requirements.txt | 1 + src/nanosaur/main.py | 175 ++++------------- src/nanosaur/robot.py | 338 ++++++++++++++++++--------------- src/nanosaur/ros.py | 306 ++++++++++++++++++++++++++++++ src/nanosaur/simulation.py | 73 +++---- src/nanosaur/swarm.py | 97 ++++++++-- src/nanosaur/utilities.py | 5 +- src/nanosaur/workspace.py | 379 ++++++++----------------------------- 8 files changed, 728 insertions(+), 646 deletions(-) create mode 100644 src/nanosaur/ros.py diff --git a/requirements.txt b/requirements.txt index 4afee62..50718a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +inquirer argcomplete pyyaml pexpect diff --git a/src/nanosaur/main.py b/src/nanosaur/main.py index 6461da9..2c93375 100644 --- a/src/nanosaur/main.py +++ b/src/nanosaur/main.py @@ -32,20 +32,22 @@ from nanosaur import __version__ from nanosaur.utilities import Params, get_nanosaur_home from nanosaur import workspace -from nanosaur import simulation -from nanosaur import robot -from nanosaur import swarm +from nanosaur.robot import parser_robot_menu +from nanosaur.simulation import parser_simulation_menu +from nanosaur.swarm import parser_swarm_menu from nanosaur.prompt_colors import TerminalFormatter -from nanosaur.utilities import CAMERA_CHOICES, LIDAR_CHOICES +from nanosaur.utilities import RobotList +import inquirer NANOSAUR_CONFIG_FILE_NAME = 'nanosaur.yaml' -NANOSAUR_HOME_NAME = 'nanosaur' +NANOSAUR_HOME_NAME = 'nanosaur_test' # Define default parameters DEFAULT_PARAMS = { 'nanosaur_branch': 'nanosaur2', 'nanosaur_home': NANOSAUR_HOME_NAME, 'nanosaur_raw_github_repo': 'https://raw.githubusercontent.com/rnanosaur/nanosaur', + 'ws_developer_name': 'ros_ws', 'ws_perception_name': 'perception_ws', 'ws_robot_name': 'robot_ws', 'ws_simulation_name': 'simulation_ws', @@ -59,9 +61,9 @@ def info(platform, params: Params, args): print(TerminalFormatter.color_text(f"Mode: {params['mode']}", bg_color='red', bold=True)) print() - robot_list = robot.RobotList.load(params) + robot_list = RobotList.load(params) # Print current robot configuration - robot_data = robot.RobotList.get_robot(params) + robot_data = RobotList.get_robot(params) robot_data.verbose() # Print other robots if they exist if len(robot_list.robots) > 1 or args.verbose: @@ -96,139 +98,32 @@ def info(platform, params: Params, args): print(f" {TerminalFormatter.color_text('Nanosaur config file:', bold=True)} {config_file_path}") -def install(platform, params: Params, args, password=None): +def install_old(platform, params: Params, args, password=None): if args.developer: workspace.create_developer_workspace(platform, params, args) else: print(TerminalFormatter.color_text("Not implemented yet", color='red')) -def parser_workspace_menu(subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser: - parser_workspace = subparsers.add_parser( - 'workspace', aliases=["ws"], help="Manage the Nanosaur workspace") - workspace_subparsers = parser_workspace.add_subparsers( - dest='workspace_type', help="Workspace types") - # Add workspace clean subcommand - - def add_workspace_subcommand(name, help_text, func): - parser = workspace_subparsers.add_parser(name, help=help_text) - parser.add_argument('workspace', type=str, nargs='?', help="Specify the workspace to clean") - parser.add_argument('--force', action='store_true', help="Force the workspace clean") - parser.add_argument('--all-platforms', '--all', action='store_true', help="Clean all workspaces") - parser.set_defaults(func=func) - return parser - # Add workspace clean subcommand - add_workspace_subcommand('clean', "Clean the workspace", workspace.clean) - add_workspace_subcommand('update', "Update the workspace", workspace.update) - add_workspace_subcommand('deploy', "Deploy workspace to docker image", workspace.deploy) - # Add workspace perception subcommand - parser_workspace_perception = workspace_subparsers.add_parser( - 'perception', help="Start the Isaac ROS docker container") - parser_workspace_perception.set_defaults(func=workspace.run_dev_script) - return parser_workspace - - -def parser_simulation_menu(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: - # Get the simulation tool from the parameters - simulation_type = params.get('simulation_tool', "NOT SELECTED") - # Add simulation subcommand - parser_simulation = subparsers.add_parser( - 'simulation', aliases=["sim"], help=f"Work with simulation tools [{simulation_type}]") - simulation_subparsers = parser_simulation.add_subparsers( - dest='simulation_type', help="Simulation types") - - # Add simulation start subcommand - parser_simulation_start = simulation_subparsers.add_parser( - 'start', help="Start the selected simulation") - parser_simulation_start.set_defaults(func=simulation.simulation_start) - - # Add simulation set subcommand - parser_simulation_set = simulation_subparsers.add_parser( - 'set', help="Select the simulator you want to use") - parser_simulation_set.set_defaults(func=simulation.simulation_set) - return parser_simulation - - -def add_robot_config_subcommands(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: - robot_data = robot.RobotList.get_robot(params) - parser_robot_config = subparsers.add_parser('config', help=f"Configure the robot settings [{robot_data.name}]") - config_subparsers = parser_robot_config.add_subparsers(dest='config_type', help="Configuration options") - # Add robot name subcommand - parser_robot_name = config_subparsers.add_parser('name', help=f"Change the robot name [{robot_data.name}]") - parser_robot_name.add_argument('name', type=str, nargs='?', help="New name for the robot (default: nanosaur)") - parser_robot_name.set_defaults(func=robot.robot_set_name) - # Add robot domain id subcommand - parser_robot_domain_id = config_subparsers.add_parser('domain_id', help=f"Change the robot domain ID [{robot_data.domain_id}]") - parser_robot_domain_id.add_argument('domain_id', type=int, nargs='?', help="New domain ID for the robot (default: 0)") - parser_robot_domain_id.set_defaults(func=robot.robot_set_domain_id) - # Add robot camera subcommand - parser_robot_camera = config_subparsers.add_parser('camera', help=f"Change the robot camera type [{robot_data.camera_type}]") - parser_robot_camera.add_argument('--new-camera', type=str, help=f"Specify the new camera type (options: {', '.join(CAMERA_CHOICES)})") - parser_robot_camera.set_defaults(func=robot.robot_set_camera) - # Add robot lidar subcommand - parser_robot_lidar = config_subparsers.add_parser('lidar', help=f"Change the robot lidar type [{robot_data.lidar_type}]") - parser_robot_lidar.add_argument('--new-lidar', type=str, choices=LIDAR_CHOICES, help=f"Specify the new lidar type (options: {', '.join(LIDAR_CHOICES)})") - parser_robot_lidar.set_defaults(func=robot.robot_set_lidar) - # Add robot engines subcommand - parser_robot_engines = config_subparsers.add_parser('engines', help=f"Configure the robot engines [{', '.join(robot_data.engines)}]") - parser_robot_engines.add_argument('--new-engine', type=str, help="Specify the new engine configuration") - parser_robot_engines.set_defaults(func=robot.robot_configure_engines) - # Add robot reset subcommand - parser_robot_reset = config_subparsers.add_parser('reset', help="Restore the robot configuration to default") - parser_robot_reset.set_defaults(func=robot.robot_reset) - - return parser_robot_config - - -def parser_robot_menu(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: - robot_data = robot.RobotList.get_robot(params) - parser_robot = subparsers.add_parser('robot', help=f"Manage the Nanosaur robot [{robot_data.name}]") - robot_subparsers = parser_robot.add_subparsers(dest='robot_type', help="Robot operations") - # Add robot display subcommand - parser_robot_display = robot_subparsers.add_parser('display', help="Show the robot") - parser_robot_display.set_defaults(func=robot.robot_display) - # Add robot drive subcommand - parser_robot_drive = robot_subparsers.add_parser('drive', help="Control the robot") - parser_robot_drive.set_defaults(func=robot.control_keyboard) - # Add robot start subcommand - parser_robot_start = robot_subparsers.add_parser('start', help="Activate the robot") - parser_robot_start.add_argument( - '--container', action='store_true', help="Run within a container") - parser_robot_start.add_argument( - '--build', action='store_true', help="Rebuild docker before starting") - parser_robot_start.set_defaults(func=robot.robot_start) - # Add robot stop subcommand - parser_robot_stop = robot_subparsers.add_parser('stop', help="Deactivate the robot") - parser_robot_stop.set_defaults(func=robot.robot_stop) - - # Add robot config subcommand - parser_config = add_robot_config_subcommands(robot_subparsers, params) - - return parser_robot, parser_config - - -def parser_swarm_menu(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: - # Get the robot index from the parameters - idx_swarm = params.get('robot_idx', 0) - # Subcommand: swarm (with a sub-menu for swarm operations) - parser_swarm = subparsers.add_parser('swarm', help="Manage swarm Nanosaur robots") - swarm_subparsers = parser_swarm.add_subparsers(dest='swarm_type', help="Robot operations") - # Add robot status subcommand - parser_robot_new = swarm_subparsers.add_parser('new', help="Get a new robot to control") - parser_robot_new.add_argument('name', type=str, help="New robot name") - parser_robot_new.set_defaults(func=swarm.robot_new) - # Add robot set subcommand - parser_robot_set = swarm_subparsers.add_parser('set', help=f"Set which robot to control [{idx_swarm}]") - parser_robot_set.add_argument('robot_name', type=str, nargs='?', help="Name of the robot to control") - parser_robot_set.set_defaults(func=swarm.robot_idx_set) - # Add robot remove subcommand - parser_robot_remove = swarm_subparsers.add_parser('remove', help="Remove a robot from the swarm") - parser_robot_remove.add_argument('robot_name', type=str, nargs='?', help="Name of the robot to remove") - parser_robot_remove.set_defaults(func=swarm.robot_remove) - # Add robot list subcommand - parser_robot_list = swarm_subparsers.add_parser('list', help="List all robots in the swarm") - parser_robot_list.set_defaults(func=swarm.robot_list) - return parser_swarm + +def install(platform, params: Params, args, password=None): + questions = [ + inquirer.List( + 'choice', + message="What would you like to install?", + choices=['Developer Workspace', 'Simulation Tools', 'Robot Configuration'], + ), + ] + + answers = inquirer.prompt(questions) + + if answers['choice'] == 'Developer Workspace': + workspace.create_developer_workspace(platform, params, args) + elif answers['choice'] == 'Simulation Tools': + print(TerminalFormatter.color_text("Simulation Tools installation is not implemented yet", color='red')) + elif answers['choice'] == 'Robot Configuration': + print(TerminalFormatter.color_text("Robot Configuration installation is not implemented yet", color='red')) + def main(): @@ -260,10 +155,10 @@ def main(): parser_info.set_defaults(func=info) # Subcommand: install (hidden if workspace already exists) - if 'mode' in params and params['mode'] == 'developer': - parser_install = subparsers.add_parser('install', help="Install the Nanosaur workspace") - else: - parser_install = subparsers.add_parser('install') + #if 'mode' in params and params['mode'] in ['developer', 'maintainer']: + parser_install = subparsers.add_parser('install', help=f"Install nanosaur on your {device_type}") + #else: + # parser_install = subparsers.add_parser('install') # Add simulation install subcommand parser_install.add_argument('--developer', '--dev', action='store_true', help="Install developer workspace") parser_install.add_argument('--force', action='store_true', help="Force the update") @@ -271,9 +166,9 @@ def main(): parser_install.set_defaults(func=install) # Subcommand: workspace (with a sub-menu for workspace operations) - if 'mode' in params and params['mode'] == 'developer': + if workspace.get_workspaces_path(params): # Add workspace subcommand - parser_workspace = parser_workspace_menu(subparsers) + parser_workspace = workspace.parser_workspace_menu(subparsers) # Subcommand: simulation (with a sub-menu for simulation types) if device_type == 'desktop': diff --git a/src/nanosaur/robot.py b/src/nanosaur/robot.py index 7914cac..3e9187d 100644 --- a/src/nanosaur/robot.py +++ b/src/nanosaur/robot.py @@ -23,6 +23,9 @@ # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import inquirer +from inquirer.themes import GreenPassion +import argparse import subprocess from nanosaur import workspace from nanosaur import docker @@ -32,6 +35,62 @@ from nanosaur.utilities import ENGINES_CHOICES, CAMERA_CHOICES, LIDAR_CHOICES +def add_robot_config_subcommands(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: + robot_data = RobotList.get_robot(params) + parser_robot_config = subparsers.add_parser('config', help=f"Configure the robot settings [{robot_data.name}]") + config_subparsers = parser_robot_config.add_subparsers(dest='config_type', help="Configuration options") + # Add robot name subcommand + parser_robot_name = config_subparsers.add_parser('name', help=f"Change the robot name [{robot_data.name}]") + parser_robot_name.set_defaults(func=robot_set_name) + # Add robot domain id subcommand + parser_robot_domain_id = config_subparsers.add_parser('domain_id', help=f"Change the robot domain ID [{robot_data.domain_id}]") + parser_robot_domain_id.set_defaults(func=robot_set_domain_id) + # Add robot camera subcommand + parser_robot_camera = config_subparsers.add_parser('camera', help=f"Change the robot camera type [{robot_data.camera_type}]") + parser_robot_camera.add_argument('--new', type=str, help=f"Specify the new camera type (options: {', '.join(CAMERA_CHOICES)})") + parser_robot_camera.set_defaults(func=robot_set_camera) + # Add robot lidar subcommand + parser_robot_lidar = config_subparsers.add_parser('lidar', help=f"Change the robot lidar type [{robot_data.lidar_type}]") + parser_robot_lidar.add_argument('--new', type=str, choices=LIDAR_CHOICES, help=f"Specify the new lidar type (options: {', '.join(LIDAR_CHOICES)})") + parser_robot_lidar.set_defaults(func=robot_set_lidar) + # Add robot engines subcommand + parser_robot_engines = config_subparsers.add_parser('engines', help=f"Configure the robot engines [{', '.join(robot_data.engines)}]") + parser_robot_engines.add_argument('--new', type=str, help="Specify the new engine configuration") + parser_robot_engines.set_defaults(func=robot_configure_engines) + # Add robot reset subcommand + parser_robot_reset = config_subparsers.add_parser('reset', help="Restore the robot configuration to default") + parser_robot_reset.set_defaults(func=robot_reset) + + return parser_robot_config + + +def parser_robot_menu(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: + robot_data = RobotList.get_robot(params) + parser_robot = subparsers.add_parser('robot', help=f"Manage the Nanosaur robot [{robot_data.name}]") + robot_subparsers = parser_robot.add_subparsers(dest='robot_type', help="Robot operations") + # Add robot display subcommand + parser_robot_display = robot_subparsers.add_parser('display', help="Show the robot") + parser_robot_display.set_defaults(func=robot_display) + # Add robot drive subcommand + parser_robot_drive = robot_subparsers.add_parser('drive', help="Control the robot") + parser_robot_drive.set_defaults(func=control_keyboard) + # Add robot start subcommand + parser_robot_start = robot_subparsers.add_parser('start', help="Activate the robot") + parser_robot_start.add_argument( + '--container', action='store_true', help="Run within a container") + parser_robot_start.add_argument( + '--build', action='store_true', help="Rebuild docker before starting") + parser_robot_start.set_defaults(func=robot_start) + # Add robot stop subcommand + parser_robot_stop = robot_subparsers.add_parser('stop', help="Deactivate the robot") + parser_robot_stop.set_defaults(func=robot_stop) + + # Add robot config subcommand + parser_config = add_robot_config_subcommands(robot_subparsers, params) + + return parser_robot, parser_config + + def robot_start(platform, params: Params, args): device_type = "robot" if platform['Machine'] == 'jetson' else "desktop" # Check the device type @@ -62,195 +121,174 @@ def robot_stop(platform, params: Params, args): def robot_set_name(platform, params: Params, args): """Configure the robot name.""" - # Check if the robot name is provided robot = RobotList.get_robot(params) - if not args.name: - print(f"Current robot name: {robot.name}") + + def validate_name(_, x): + if not x.isalnum(): + raise inquirer.errors.ValidationError("", reason="Name must contain only letters and numbers") return True - # Update the robot name - robot.name = args.name - RobotList.update_robot(params, robot) - print(TerminalFormatter.color_text(f"Robot name set to: {robot.name}", color='green')) - return True + + question = [ + inquirer.Text( + 'name', + message="Enter the new robot name", + default=robot.name, + validate=validate_name + ) + ] + + answers = inquirer.prompt(question, theme=GreenPassion()) + if answers is None: + return False + if not answers['name']: + print(TerminalFormatter.color_text("No name provided", color='red')) + return False + new_name = answers['name'] + if new_name != robot.name: + robot.name = new_name + RobotList.update_robot(params, robot) + print(TerminalFormatter.color_text(f"Robot name set to: {robot.name}", color='green')) + else: + print(TerminalFormatter.color_text(f"Robot name {new_name} is already set", color='yellow')) def robot_set_domain_id(platform, params: Params, args): """Configure the domain ID.""" - # Check if the domain ID is provided robot = RobotList.get_robot(params) - if not args.domain_id: - print(f"Current robot domain_id: {robot.domain_id}") + + def validate_domain_id(_, x): + if not x.isdigit(): + raise inquirer.errors.ValidationError("", reason="Domain ID must be a number") return True - # Update the domain ID - robot.domain_id = args.domain_id - RobotList.update_robot(params, robot) - print(TerminalFormatter.color_text(f"Domain ID set to: {robot.domain_id}", color='green')) - return True + question = [ + inquirer.Text( + 'domain_id', + message="Enter the new domain ID", + default=str(robot.domain_id), + validate=validate_domain_id + ) + ] + + answers = inquirer.prompt(question, theme=GreenPassion()) + if answers is None: + return False + new_domain_id = int(answers['domain_id']) + if new_domain_id != robot.domain_id: + robot.domain_id = new_domain_id + RobotList.update_robot(params, robot) + print(TerminalFormatter.color_text(f"Domain ID set to: {robot.domain_id}", color='green')) + else: + print(TerminalFormatter.color_text(f"Domain ID {new_domain_id} is already set", color='yellow')) def robot_set_camera(platform, params: Params, args): """Configure the camera.""" - def print_options(robot): - all_cameras = sorted(CAMERA_CHOICES + [robot.camera_type] if robot.camera_type not in CAMERA_CHOICES else CAMERA_CHOICES) - options = [] - for i, camera in enumerate(all_cameras): - if camera == robot.camera_type: - if camera: - options.append(TerminalFormatter.color_text(f"{i + 1}. Camera {camera} (selected)", color='green')) - else: - options.append(TerminalFormatter.color_text(f"{i + 1}. No camera (selected)", color='green')) - else: - if camera: - options.append(f"{i + 1}. Select camera {camera}") - else: - options.append(f"{i + 1}. Select no camera") - options.append(f"{len(all_cameras) + 1}. Exit") - for option in options: - print(option) - - def select_camera(robot, camera): - robot.camera_type = camera - RobotList.update_robot(params, robot) - robot = RobotList.get_robot(params) - if args.new_camera is not None: - if args.new_camera != robot.camera_type: - robot.camera_type = args.new_camera + + all_cameras = sorted(set(CAMERA_CHOICES + [robot.camera_type])) + + if args.new is not None: + if args.new not in all_cameras: + robot.camera_type = args.new RobotList.update_robot(params, robot) - print(TerminalFormatter.color_text(f"New camera {args.new_camera} selected", color='green')) + print(TerminalFormatter.color_text(f"New camera {args.new} selected", color='green')) else: - print(TerminalFormatter.color_text(f"Camera {args.new_camera} is already selected", color='yellow')) + print(TerminalFormatter.color_text(f"Camera {args.new} is already exist", color='yellow')) return True - try: - while True: - robot = RobotList.get_robot(params) - print_options(robot) - choice = input("Select an option: ") - all_cameras = sorted(CAMERA_CHOICES + [robot.camera_type] if robot.camera_type not in CAMERA_CHOICES else CAMERA_CHOICES) - if choice.isdigit() and 1 <= int(choice) <= len(all_cameras): - select_camera(robot, all_cameras[int(choice) - 1]) - if robot.camera_type: - print(TerminalFormatter.color_text(f"Camera set to: {robot.camera_type}", color='green')) - else: - print(TerminalFormatter.color_text("No camera selected", color='green')) - break - elif choice == str(len(all_cameras) + 1): - print(TerminalFormatter.color_text("Exiting camera selection", color='yellow')) - break - else: - print(TerminalFormatter.color_text("Invalid option, please try again", color='red')) - except KeyboardInterrupt: - print(TerminalFormatter.color_text("Process interrupted by user", color='yellow')) + options = [ + inquirer.List( + 'camera', + message="Select a camera", + choices=[camera or 'No camera' for camera in all_cameras], + default=robot.camera_type + ) + ] + + answers = inquirer.prompt(options, theme=GreenPassion()) + if answers is None: return False + selected_camera = answers['camera'].replace(" (selected)", "") + + selected_camera = '' if selected_camera == 'No camera' else selected_camera + if selected_camera != robot.camera_type: + robot.camera_type = selected_camera + RobotList.update_robot(params, robot) + print(TerminalFormatter.color_text(f"Camera set to: {robot.camera_type or 'No camera'}", color='green')) + else: + print(TerminalFormatter.color_text(f"Camera {selected_camera or 'No camera'} is already selected", color='yellow')) def robot_set_lidar(platform, params: Params, args): """Configure the lidar.""" - def print_options(robot): - all_lidars = sorted(LIDAR_CHOICES + [robot.lidar_type] if robot.lidar_type not in LIDAR_CHOICES else LIDAR_CHOICES) - options = [] - for i, lidar in enumerate(all_lidars): - if lidar == robot.lidar_type: - if lidar: - options.append(TerminalFormatter.color_text(f"{i + 1}. Lidar {lidar} (selected)", color='green')) - else: - options.append(TerminalFormatter.color_text(f"{i + 1}. No lidar (selected)", color='green')) - else: - if lidar: - options.append(f"{i + 1}. Select lidar {lidar}") - else: - options.append(f"{i + 1}. Select no lidar") - options.append(f"{len(all_lidars) + 1}. Exit") - for option in options: - print(option) - - def select_lidar(robot, lidar): - robot.lidar_type = lidar - RobotList.update_robot(params, robot) - robot = RobotList.get_robot(params) - if args.new_lidar is not None: - if args.new_lidar != robot.lidar_type: - robot.lidar_type = args.new_lidar + + all_lidars = sorted(set(LIDAR_CHOICES + [robot.lidar_type])) + + if args.new is not None: + if args.new not in all_lidars: + robot.lidar_type = args.new RobotList.update_robot(params, robot) - print(TerminalFormatter.color_text(f"New lidar {args.new_lidar} selected", color='green')) + print(TerminalFormatter.color_text(f"New lidar {args.new} selected", color='green')) else: - print(TerminalFormatter.color_text(f"Lidar {args.new_lidar} is already selected", color='yellow')) + print(TerminalFormatter.color_text(f"Lidar {args.new} is already exist", color='yellow')) return True - try: - while True: - robot = RobotList.get_robot(params) - print_options(robot) - choice = input("Select an option: ") - all_lidars = sorted(LIDAR_CHOICES + [robot.lidar_type] if robot.lidar_type not in LIDAR_CHOICES else LIDAR_CHOICES) - if choice.isdigit() and 1 <= int(choice) <= len(all_lidars): - select_lidar(robot, all_lidars[int(choice) - 1]) - if robot.lidar_type: - print(TerminalFormatter.color_text(f"Lidar set to: {robot.lidar_type}", color='green')) - else: - print(TerminalFormatter.color_text("No lidar selected", color='green')) - break - elif choice == str(len(all_lidars) + 1): - print(TerminalFormatter.color_text("Exiting lidar selection", color='yellow')) - break - else: - print(TerminalFormatter.color_text("Invalid option, please try again", color='red')) - except KeyboardInterrupt: - print(TerminalFormatter.color_text("Process interrupted by user", color='yellow')) + options = [ + inquirer.List( + 'lidar', + message="Select a lidar", + choices=[lidar or 'No lidar' for lidar in all_lidars], + default=robot.lidar_type + ) + ] + + answers = inquirer.prompt(options, theme=GreenPassion()) + if answers is None: return False + selected_lidar = answers['lidar'].replace(" (selected)", "") + + selected_lidar = '' if selected_lidar == 'No lidar' else selected_lidar + if selected_lidar != robot.lidar_type: + robot.lidar_type = selected_lidar + RobotList.update_robot(params, robot) + print(TerminalFormatter.color_text(f"Lidar set to: {robot.lidar_type or 'No lidar'}", color='green')) + else: + print(TerminalFormatter.color_text(f"Lidar {selected_lidar or 'No lidar'} is already selected", color='yellow')) def robot_configure_engines(platform, params: Params, args): """Configure the robot engines.""" - def print_options(robot): - all_engines = sorted(list(set(robot.engines + ENGINES_CHOICES))) - options = [] - for i, engine in enumerate(all_engines): - if engine in robot.engines: - options.append(TerminalFormatter.color_text(f"{i + 1}. Engine {engine} (enabled)", color='green')) - else: - options.append(f"{i + 1}. Enable engine {engine}") - options.append(f"{len(all_engines) + 1}. Exit") - for option in options: - print(option) - - def toggle_engine(robot, engine): - if not robot.engines: - robot.engines = [] - if engine in robot.engines: - robot.engines.remove(engine) - else: - robot.engines.append(engine) - RobotList.update_robot(params, robot) - robot = RobotList.get_robot(params) - if args.new_engine is not None: - if args.new_engine not in robot.engines: - robot.engines.append(args.new_engine) + + if args.new is not None: + if args.new not in robot.engines: + robot.engines.append(args.new) RobotList.update_robot(params, robot) - print(TerminalFormatter.color_text(f"New engine {args.new_engine} added", color='green')) + print(TerminalFormatter.color_text(f"New engine {args.new} added", color='green')) else: - print(TerminalFormatter.color_text(f"Engine {args.new_engine} is already enabled", color='yellow')) + print(TerminalFormatter.color_text(f"Engine {args.new} is already enabled", color='yellow')) return True - try: - while True: - robot = RobotList.get_robot(params) - print_options(robot) - choice = input("Select an option: ") - all_engines = sorted(list(set(robot.engines + ENGINES_CHOICES))) - if choice.isdigit() and 1 <= int(choice) <= len(all_engines): - toggle_engine(robot, all_engines[int(choice) - 1]) - elif choice == str(len(all_engines) + 1): - break - else: - print(TerminalFormatter.color_text("Invalid option, please try again", color='red')) - except KeyboardInterrupt: - print(TerminalFormatter.color_text("Process interrupted by user", color='yellow')) + engine_choices = [ + inquirer.Checkbox( + 'engines', + message="Select engines to enable/disable", + choices=list(sorted(list(set(robot.engines + ENGINES_CHOICES)))), + default=robot.engines, + ) + ] + + answers = inquirer.prompt(engine_choices, theme=GreenPassion()) + if answers is None: return False + robot.engines = answers['engines'] + RobotList.update_robot(params, robot) + if robot.engines: + print(TerminalFormatter.color_text(f"Engines updated: {', '.join(robot.engines)}", color='green')) + else: + print(TerminalFormatter.color_text("No engines selected", color='yellow')) def robot_reset(platform, params: Params, args): diff --git a/src/nanosaur/ros.py b/src/nanosaur/ros.py new file mode 100644 index 0000000..111822d --- /dev/null +++ b/src/nanosaur/ros.py @@ -0,0 +1,306 @@ +# Copyright (C) 2025, Raffaello Bonghi +# All rights reserved +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND +# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +# BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import pexpect +import requests +import yaml +import subprocess +import pty +import select +import termios +import tty +import signal +from nanosaur.prompt_colors import TerminalFormatter +from nanosaur.utilities import Params, get_nanosaur_home, create_nanosaur_home, require_sudo_password, conditional_sudo_password + +ros2_distro = 'humble' +ros2_sources = f'/opt/ros/{ros2_distro}/setup.bash' + + +ISAAC_ROS_DISTRO_SUFFIX = "ros2_humble" +NANOSAUR_DOCKERFILE_SUFFIX = "nanosaur" + + +def run_dev_script(platform, params: Params, args): + + perception_path = get_workspace_path(params, params['ws_perception_name']) + isaac_ros_common_path = os.path.join(perception_path, 'src', 'isaac_ros_common') + # Get the path to the Isaac ROS common package + os.chdir(isaac_ros_common_path) + print(f"Changed directory to: {isaac_ros_common_path}") + + nanosaur_home_path = get_nanosaur_home(params['nanosaur_home']) + + # Path to the script you want to run + command = "./scripts/run_dev.sh" + + # Build the command arguments + args = ["-d", nanosaur_home_path] + + # Optional: Commands to run automatically after the script starts + auto_commands = [f"cd {params['ws_perception_name']}"] + + # Save the original terminal settings + original_termios = termios.tcgetattr(sys.stdin) + + def set_raw_mode(): + """Set terminal to raw mode to handle special characters.""" + tty.setraw(sys.stdin.fileno()) + + def restore_terminal(): + """Restore original terminal settings.""" + termios.tcsetattr(sys.stdin, termios.TCSANOW, original_termios) + + def handle_signal(signum, frame): + """Forward terminal signals to the subprocess.""" + os.kill(child_pid, signum) + + # Open a pseudo-terminal + master_fd, slave_fd = pty.openpty() + + # Prepare the command and arguments + cmd = [command] + (args or []) + + # Fork the process + child_pid = os.fork() + if child_pid == 0: # Child process + os.close(master_fd) # Close master in the child process + os.dup2(slave_fd, sys.stdin.fileno()) # Use slave as stdin + os.dup2(slave_fd, sys.stdout.fileno()) # Use slave as stdout + os.dup2(slave_fd, sys.stderr.fileno()) # Use slave as stderr + os.execvp(cmd[0], cmd) # Execute the command + else: # Parent process + os.close(slave_fd) # Close slave in the parent process + + try: + set_raw_mode() + + # Forward terminal signals to the subprocess + for sig in (signal.SIGINT, signal.SIGTSTP, signal.SIGQUIT): + signal.signal(sig, handle_signal) + + # Automatically send commands if specified + if auto_commands: + for command in auto_commands: + os.write(master_fd, (command + '\n').encode()) + + while True: + # Wait for input from the user or output from the subprocess + rlist, _, _ = select.select([sys.stdin, master_fd], [], []) + + if sys.stdin in rlist: # User input + user_input = os.read(sys.stdin.fileno(), 1024) + os.write(master_fd, user_input) # Forward input to the subprocess + + if master_fd in rlist: # Subprocess output + output = os.read(master_fd, 1024) + if not output: # If the subprocess exits, stop the loop + break + # Filter and render subprocess output to avoid cursor resets + filtered_output = output.replace(b"\x1b[2J", b"") # Remove clear screen sequences + sys.stdout.buffer.write(filtered_output) + sys.stdout.buffer.flush() + finally: + restore_terminal() # Restore the original terminal settings + os.close(master_fd) + + print(TerminalFormatter.color_text("Dev script finished", color='green')) + + +def download_rosinstall(url, folder_path, file_name) -> str: + # Create the full file path + file_path = os.path.join(folder_path, file_name) + + # Check if the file already exists + if os.path.exists(file_path): + print(TerminalFormatter.color_text(f"File '{file_name}' already exists in '{folder_path}'. Skip download", color='yellow')) + return file_path # Cancel download + + # Send a request to download the file + response = requests.get(url) + + if response.status_code == 200: + # Save the file in the workspace folder + file_path = os.path.join(folder_path, file_name) + with open(file_path, 'wb') as file: + file.write(response.content) + print(TerminalFormatter.color_text(f"File '{file_name}' downloaded successfully to '{folder_path}'.", color='green')) + return file_path + else: + print(TerminalFormatter.color_text(f"Failed to download file. Status code: {response.status_code}", color='red')) + return None + + +def run_vcs_import(workspace_path, rosinstall_path, src_folder="src") -> bool: + try: + # Run the command and stream the output live + process = subprocess.Popen( + f"vcs import {workspace_path}/{src_folder} < {rosinstall_path}", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + # Stream output live + for line in process.stdout: + print(line.decode('utf-8'), end="") # Print stdout line-by-line + + # Wait for the process to finish + process.wait() + + # Stream any errors + for line in process.stderr: + print(TerminalFormatter.color_text(line.decode('utf-8'), color='red'), end="") # Print stderr (errors) in red + + # Check the exit status of the command + if process.returncode != 0: + print(TerminalFormatter.color_text(process.returncode, color='red')) + else: + print(TerminalFormatter.color_text("Command completed successfully", color='green')) + + return process.returncode == 0 + + except Exception as e: + print(f"An error occurred while running the command: {e}") + return False + + +def run_rosdep(folder_path, password) -> bool: + if password is None: + print(TerminalFormatter.color_text("Error: No password provided.", color='red')) + return False + result = False + try: + child = pexpect.spawn(f"bash -c 'source {ros2_sources} && rosdep install --from-paths {folder_path}/src --ignore-src -r -y'", encoding='utf-8', timeout=None) + # Stream all command output to the terminal in real time + child.logfile = sys.stdout + # Wait for password prompt with timeout + index = child.expect( + ['password for', pexpect.EOF, pexpect.TIMEOUT], timeout=30) + if index == 0: + child.logfile = None # Disable logging to hide password + child.sendline(password) + child.logfile = sys.stdout # Re-enable logging + # Wait for completion + child.expect(pexpect.EOF, timeout=300) + result = True + elif index == 1: # Command finished without password prompt + print("Command finished without asking for a password.") + result = True + elif index == 2: # Timeout + print(TerminalFormatter.color_text("Error: Sudo prompt timed out. Please try again.", color='red')) + result = False + except pexpect.ExceptionPexpect as e: + print(TerminalFormatter.color_text(f"Error running rosdep: {str(e)}", color='red')) + result = False + finally: + # Ensure the process is closed + if child.isalive(): + child.close() + return result + + +def run_colcon_build(folder_path) -> bool: + + # Move to the folder_path and run the colcon build command + try: + os.chdir(folder_path) + print(f"Changed directory to: {folder_path}") + + # Run the command and stream the output live + process = subprocess.Popen( + f"source {ros2_sources} && colcon build --symlink-install --merge-install", + shell=True, + executable="/bin/bash", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + # Stream output live + for line in process.stdout: + print(line.decode('utf-8'), end="") # Print stdout line-by-line + + # Wait for the process to finish + process.wait() + + # Stream any errors + for line in process.stderr: + print(TerminalFormatter.color_text(line.decode('utf-8'), color='red'), end="") # Print stderr (errors) in red + + # Check the exit status of the command + if process.returncode != 0: + print(TerminalFormatter.color_text(process.returncode, color='red')) + else: + print(TerminalFormatter.color_text("Command completed successfully", color='green')) + + return process.returncode == 0 + + except Exception as e: + print(f"An error occurred while running the colcon build command: {e}") + return False + + +def deploy_docker_perception(params: Params, perception_ws_path: str) -> bool: + image_name = "nanosaur-perception" + nanosaur_perception_path = os.path.join(perception_ws_path, 'src', 'nanosaur_perception') + + src_folders = [ + os.path.join(get_nanosaur_home(params['nanosaur_home']), 'shared_src'), + os.path.join(perception_ws_path, 'src') + ] + + try: + os.chdir(nanosaur_perception_path) + print(f"Changed directory to: {nanosaur_perception_path}") + + ws_dir_list = '--ws-src ' + ' --ws-src '.join(src_folders) + command = f"scripts/docker_build.sh {ws_dir_list} --image-name {image_name}" + + process = subprocess.Popen( + command, + shell=True, + executable="/bin/bash", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + + for line in process.stdout: + print(line.decode('utf-8', errors='replace'), end="") + + process.wait() + + if process.returncode != 0: + print(TerminalFormatter.color_text(process.returncode, color='red')) + return False + else: + print(TerminalFormatter.color_text("Command completed successfully", color='green')) + return True + + except Exception as e: + print(f"An error occurred while running the command: {e}") + return False +# EOF diff --git a/src/nanosaur/simulation.py b/src/nanosaur/simulation.py index 81e6eb0..d24e974 100644 --- a/src/nanosaur/simulation.py +++ b/src/nanosaur/simulation.py @@ -23,11 +23,15 @@ # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import inquirer +from inquirer.themes import GreenPassion +import argparse import subprocess from nanosaur import workspace from nanosaur.prompt_colors import TerminalFormatter from nanosaur.utilities import Params, RobotList + # Dictionary of simulation tools and their commands simulation_tools = { "Isaac Sim": { @@ -40,6 +44,26 @@ } } +def parser_simulation_menu(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: + # Get the simulation tool from the parameters + simulation_type = params.get('simulation_tool', "NOT SELECTED") + # Add simulation subcommand + parser_simulation = subparsers.add_parser( + 'simulation', aliases=["sim"], help=f"Work with simulation tools [{simulation_type}]") + simulation_subparsers = parser_simulation.add_subparsers( + dest='simulation_type', help="Simulation types") + + # Add simulation start subcommand + parser_simulation_start = simulation_subparsers.add_parser( + 'start', help="Start the selected simulation") + parser_simulation_start.set_defaults(func=simulation_start) + + # Add simulation set subcommand + parser_simulation_set = simulation_subparsers.add_parser( + 'set', help="Select the simulator you want to use") + parser_simulation_set.set_defaults(func=simulation_set) + return parser_simulation + def start_robot_simulation(params): nanosaur_ws_path = workspace.get_workspace_path(params, params['ws_simulation_name']) @@ -135,38 +159,21 @@ def simulation_start(platform, params: Params, args): def simulation_set(platform, params: Params, args): """Set the simulation tools.""" - while True: - print("Set the simulation tools:") - # List of simulation environments - sim_options = list(simulation_tools.keys()) - - # Print options from list - for i, value in enumerate(sim_options, 1): - if 'simulation_tool' in params and params['simulation_tool'] == value: - print(f"{i}. {TerminalFormatter.color_text(value, color='green')} [Current]") - else: - print(f"{i}. {value}") - exit_option = len(sim_options) + 1 - print(f"{exit_option}. Exit") - - try: - choice = input("Enter your choice: ") - except KeyboardInterrupt: - print(TerminalFormatter.color_text("Exiting...", color='yellow')) - return False - - if choice.isdigit(): - choice_num = int(choice) - if choice_num == exit_option: - # print(TerminalFormatter.color_text("Exiting...", color='yellow')) - break - elif 1 <= choice_num <= len(sim_options): - params['simulation_tool'] = sim_options[choice_num - 1] - print(TerminalFormatter.color_text(f"Selected {sim_options[choice_num - 1]}", color='green')) - break - else: - print(TerminalFormatter.color_text(f"Invalid choice. Please enter a number between 1 and {exit_option}.", color='red')) - else: - print(TerminalFormatter.color_text("Invalid choice. Please enter a number.", color='red')) + + questions = [ + inquirer.List( + 'simulation_tool', + message="Set the simulation tools:", + choices=list(simulation_tools.keys()), + default=params.get('simulation_tool', None) + ) + ] + # Ask the user to select a simulation tool + answers = inquirer.prompt(questions, theme=GreenPassion()) + if answers is None: + return False + # Save the selected simulation tool + params['simulation_tool'] = answers['simulation_tool'] + print(TerminalFormatter.color_text(f"Selected {answers['simulation_tool']}", color='green')) return True # EOF diff --git a/src/nanosaur/swarm.py b/src/nanosaur/swarm.py index 8db8560..4f0d59b 100644 --- a/src/nanosaur/swarm.py +++ b/src/nanosaur/swarm.py @@ -23,15 +23,56 @@ # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +import inquirer +from inquirer.themes import GreenPassion +import argparse from nanosaur.prompt_colors import TerminalFormatter from nanosaur.utilities import Params, RobotList, Robot +def parser_swarm_menu(subparsers: argparse._SubParsersAction, params: Params) -> argparse.ArgumentParser: + # Get the robot index from the parameters + idx_swarm = params.get('robot_idx', 0) + current_robot_name = RobotList.load(params)._get_robot_by_idx(idx_swarm).name + # Subcommand: swarm (with a sub-menu for swarm operations) + parser_swarm = subparsers.add_parser('swarm', help="Manage swarm Nanosaur robots") + swarm_subparsers = parser_swarm.add_subparsers(dest='swarm_type', help="Robot operations") + # Add robot status subcommand + parser_robot_new = swarm_subparsers.add_parser('new', help="Get a new robot to control") + parser_robot_new.add_argument('name', type=str, nargs='?', help="New robot name") + parser_robot_new.set_defaults(func=robot_new) + # Add robot set subcommand + parser_robot_set = swarm_subparsers.add_parser('set', help=f"Set which robot to control [{current_robot_name}]") + parser_robot_set.add_argument('robot_name', type=str, nargs='?', help="Name of the robot to control") + parser_robot_set.set_defaults(func=robot_idx_set) + # Add robot remove subcommand + parser_robot_remove = swarm_subparsers.add_parser('remove', help="Remove a robot from the swarm") + parser_robot_remove.add_argument('robot_name', type=str, nargs='?', help="Name of the robot to remove") + parser_robot_remove.set_defaults(func=robot_remove) + # Add robot list subcommand + parser_robot_list = swarm_subparsers.add_parser('list', help="List all robots in the swarm") + parser_robot_list.set_defaults(func=robot_list) + return parser_swarm + + def robot_new(platform, params: Params, args): """Add a new robot configuration.""" - # Create a new robot configuration - robot = Robot(name=args.name) + + def validate_name(_, x): + if not x.isalnum(): + raise inquirer.errors.ValidationError("", reason="Name must contain only letters and numbers") + return True + + questions = [ + inquirer.Text( + 'name', + message="Enter the new robot name", + default=args.name, + validate=validate_name, + ) + ] + answers = inquirer.prompt(questions, theme=GreenPassion()) + robot = Robot(name=answers['name']) if RobotList.add_robot(params, robot): print(TerminalFormatter.color_text("New robot configuration added", color='green')) return True @@ -39,17 +80,31 @@ def robot_new(platform, params: Params, args): def robot_idx_set(platform, params: Params, args): - """Set the robot index.""" + """Set the robot configuration.""" + # Load the robot list + robots = RobotList.load(params) + + default = robots._get_robot_by_idx(params.get('robot_idx', 0)) if args.robot_name is not None: - idx = RobotList.get_idx_by_name(params, args.robot_name) - if idx is not None: - params['robot_idx'] = idx - print(TerminalFormatter.color_text(f"Robot index set to: {idx}", color='green')) - return True - print(TerminalFormatter.color_text("Robot not found", color='red')) - else: - robot = RobotList.load(params)._get_robot_by_idx(params.get('robot_idx', 0)) - print(f"Current robot index: {params.get('robot_idx', 0)} name: {robot.name}") + default = robots._get_robot_by_name(args.robot_name) + + options = [ + inquirer.List( + 'robot', + message="Select robot in list", + choices=robots.to_list(), + default=default + ) + ] + # Get the selected robot + answers = inquirer.prompt(options, theme=GreenPassion()) + if answers is None: + return False + # Get the selected robot and its index + robot = answers['robot'] + idx = RobotList.get_idx_by_name(params, robot.name) + print(f"Selected robot: {robot.name} with index {idx}") + params['robot_idx'] = idx def robot_remove(platform, params: Params, args): @@ -59,13 +114,21 @@ def robot_remove(platform, params: Params, args): args.robot_name = robot.name formatted_robot_name = TerminalFormatter.color_text(args.robot_name, color='green', bold=True) - confirmation = input(f"Confirm {TerminalFormatter.color_text('remove', color='red', bold=True)} config for {formatted_robot_name}? (yes/no): ") - if confirmation.lower() == 'yes': + questions = [ + inquirer.Confirm( + 'confirm', + message=f"Confirm {TerminalFormatter.color_text('remove', color='red', bold=True)} config for {formatted_robot_name}?", + default=False + ) + ] + answers = inquirer.prompt(questions, theme=GreenPassion()) + if answers is None: + return False + # Remove the robot configuration + if answers['confirm']: RobotList.remove_robot(params, params.get('robot_idx', 0)) print(TerminalFormatter.color_text("Robot configuration removed", color='green')) return True - else: - print(TerminalFormatter.color_text("Robot configuration removal canceled", color='yellow')) def robot_list(platform, params: Params, args): diff --git a/src/nanosaur/utilities.py b/src/nanosaur/utilities.py index 992f162..0ccf95a 100644 --- a/src/nanosaur/utilities.py +++ b/src/nanosaur/utilities.py @@ -41,7 +41,7 @@ CAMERA_CHOICES = ['', 'realsense', 'zed'] LIDAR_CHOICES = ['', 'LD06', 'rplidar'] -ENGINES_CHOICES = ['vlslam', 'nvblox', 'apriltag'] +ENGINES_CHOICES = ['vslam', 'nvblox', 'apriltag'] class Robot: @@ -201,6 +201,9 @@ def __repr__(self): def to_dict(self) -> list: return [robot.to_dict() for robot in self.robots] + def to_list(self) -> list: + return self.robots + def print_all_robots(self, robot_idx=None): if robot_idx is not None: print(TerminalFormatter.color_text(f"All robots: (selected: {robot_idx})", bold=True)) diff --git a/src/nanosaur/workspace.py b/src/nanosaur/workspace.py index bd53464..3198536 100644 --- a/src/nanosaur/workspace.py +++ b/src/nanosaur/workspace.py @@ -23,22 +23,16 @@ # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + import os import sys import pexpect -import requests import yaml -import subprocess -import pty -import select -import termios -import tty -import signal +import argparse from nanosaur.prompt_colors import TerminalFormatter +from nanosaur import ros from nanosaur.utilities import Params, get_nanosaur_home, create_nanosaur_home, require_sudo_password, conditional_sudo_password -ros2_distro = 'humble' -ros2_sources = f'/opt/ros/{ros2_distro}/setup.bash' # Default colcon settings COLCON_DEFAULTS = { @@ -48,94 +42,69 @@ } } -ISAAC_ROS_DISTRO_SUFFIX = "ros2_humble" -NANOSAUR_DOCKERFILE_SUFFIX = "nanosaur" - - -def run_dev_script(platform, params: Params, args): - - perception_path = get_workspace_path(params, params['ws_perception_name']) - isaac_ros_common_path = os.path.join(perception_path, 'src', 'isaac_ros_common') - # Get the path to the Isaac ROS common package - os.chdir(isaac_ros_common_path) - print(f"Changed directory to: {isaac_ros_common_path}") +def parser_workspace_menu(subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser: + parser_workspace = subparsers.add_parser( + 'workspace', aliases=["ws"], help="Manage the Nanosaur workspace") + workspace_subparsers = parser_workspace.add_subparsers( + dest='workspace_type', help="Workspace types") + # Add workspace clean subcommand + + def add_workspace_subcommand(name, help_text, func): + parser = workspace_subparsers.add_parser(name, help=help_text) + parser.add_argument('workspace', type=str, nargs='?', help="Specify the workspace to clean") + parser.add_argument('--force', action='store_true', help="Force the workspace clean") + parser.add_argument('--all-platforms', '--all', action='store_true', help="Clean all workspaces") + parser.set_defaults(func=func) + return parser + # Add workspace clean subcommand + add_workspace_subcommand('clean', "Clean the workspace", clean) + add_workspace_subcommand('update', "Update the workspace", update) + add_workspace_subcommand('deploy', "Deploy workspace to docker image", deploy) + # Add workspace perception subcommand + parser_workspace_perception = workspace_subparsers.add_parser( + 'perception', help="Start the Isaac ROS docker container") + parser_workspace_perception.set_defaults(func=ros.run_dev_script) + return parser_workspace + + + +def get_workspaces_path(params: Params) -> bool: nanosaur_home_path = get_nanosaur_home(params['nanosaur_home']) + # Add all workspaces that exist in the Nanosaur home folder + return [ + workspace for workspace in [ + 'ws_developer_name', 'ws_robot_name', 'ws_simulation_name', 'ws_perception_name' + ] if os.path.exists(os.path.join(nanosaur_home_path, params[workspace])) + ] - # Path to the script you want to run - command = "./scripts/run_dev.sh" - - # Build the command arguments - args = ["-d", nanosaur_home_path] - - # Optional: Commands to run automatically after the script starts - auto_commands = [f"cd {params['ws_perception_name']}"] - - # Save the original terminal settings - original_termios = termios.tcgetattr(sys.stdin) - - def set_raw_mode(): - """Set terminal to raw mode to handle special characters.""" - tty.setraw(sys.stdin.fileno()) - - def restore_terminal(): - """Restore original terminal settings.""" - termios.tcsetattr(sys.stdin, termios.TCSANOW, original_termios) - - def handle_signal(signum, frame): - """Forward terminal signals to the subprocess.""" - os.kill(child_pid, signum) - - # Open a pseudo-terminal - master_fd, slave_fd = pty.openpty() - - # Prepare the command and arguments - cmd = [command] + (args or []) - - # Fork the process - child_pid = os.fork() - if child_pid == 0: # Child process - os.close(master_fd) # Close master in the child process - os.dup2(slave_fd, sys.stdin.fileno()) # Use slave as stdin - os.dup2(slave_fd, sys.stdout.fileno()) # Use slave as stdout - os.dup2(slave_fd, sys.stderr.fileno()) # Use slave as stderr - os.execvp(cmd[0], cmd) # Execute the command - else: # Parent process - os.close(slave_fd) # Close slave in the parent process - - try: - set_raw_mode() - - # Forward terminal signals to the subprocess - for sig in (signal.SIGINT, signal.SIGTSTP, signal.SIGQUIT): - signal.signal(sig, handle_signal) - - # Automatically send commands if specified - if auto_commands: - for command in auto_commands: - os.write(master_fd, (command + '\n').encode()) - while True: - # Wait for input from the user or output from the subprocess - rlist, _, _ = select.select([sys.stdin, master_fd], [], []) +def get_workspace_path(params: Params, ws_name) -> str: + # Create the Nanosaur home folder + nanosaur_home_path = create_nanosaur_home(params['nanosaur_home']) + # Create the full path for the workspace folder in the user's home directory + workspace_path = os.path.join(nanosaur_home_path, ws_name) - if sys.stdin in rlist: # User input - user_input = os.read(sys.stdin.fileno(), 1024) - os.write(master_fd, user_input) # Forward input to the subprocess + # Check if the workspace folder exists + if os.path.exists(workspace_path) and os.path.isdir(workspace_path): + return workspace_path + else: + return None - if master_fd in rlist: # Subprocess output - output = os.read(master_fd, 1024) - if not output: # If the subprocess exits, stop the loop - break - # Filter and render subprocess output to avoid cursor resets - filtered_output = output.replace(b"\x1b[2J", b"") # Remove clear screen sequences - sys.stdout.buffer.write(filtered_output) - sys.stdout.buffer.flush() - finally: - restore_terminal() # Restore the original terminal settings - os.close(master_fd) - print(TerminalFormatter.color_text("Dev script finished", color='green')) +def create_workspace(nanosaur_home_path, ws_name) -> str: + ws_name_path = os.path.join(nanosaur_home_path, ws_name) + ws_name_path_src = os.path.join(ws_name_path, "src") + # Check if folder exists, if not, create it + if not os.path.exists(ws_name_path_src): + os.makedirs(ws_name_path_src) + print(TerminalFormatter.color_text(f"Workspace '{ws_name}' created in {nanosaur_home_path}.", color='green')) + else: + print(TerminalFormatter.color_text(f"Workspace '{ws_name}' already exists.", color='yellow')) + # Save the default colcon settings + with open(f"{ws_name_path}/colcon_defaults.yaml", 'w') as file: + yaml.dump(COLCON_DEFAULTS, file) + return ws_name_path def clean_workspace(nanosaur_ws_name, password) -> bool: @@ -195,38 +164,13 @@ def clean_workspace(nanosaur_ws_name, password) -> bool: return False -def get_workspace_path(params: Params, ws_name) -> str: - # Create the Nanosaur home folder - nanosaur_home_path = create_nanosaur_home(params['nanosaur_home']) - # Create the full path for the workspace folder in the user's home directory - workspace_path = os.path.join(nanosaur_home_path, ws_name) - - # Check if the workspace folder exists - if os.path.exists(workspace_path) and os.path.isdir(workspace_path): - return workspace_path - else: - return None - -def create_workspace(nanosaur_home_path, ws_name) -> str: - ws_name_path = os.path.join(nanosaur_home_path, ws_name) - ws_name_path_src = os.path.join(ws_name_path, "src") - # Check if folder exists, if not, create it - if not os.path.exists(ws_name_path_src): - os.makedirs(ws_name_path_src) - print(TerminalFormatter.color_text(f"Workspace '{ws_name}' created in {nanosaur_home_path}.", color='green')) - else: - print(TerminalFormatter.color_text(f"Workspace '{ws_name}' already exists.", color='yellow')) - # Save the default colcon settings - with open(f"{ws_name_path}/colcon_defaults.yaml", 'w') as file: - yaml.dump(COLCON_DEFAULTS, file) - return ws_name_path def build_workspace(nanosaur_raw_github_repo, branch, workspace_path, rosinstall_name, password, skip_rosdep=False, skip_build=False) -> bool: # Download rosinstall for this device url = f"{nanosaur_raw_github_repo}/{branch}/nanosaur/rosinstall/{rosinstall_name}.rosinstall" - rosinstall_path = download_rosinstall(url, workspace_path, f"{rosinstall_name}.rosinstall") + rosinstall_path = ros.download_rosinstall(url, workspace_path, f"{rosinstall_name}.rosinstall") if rosinstall_path is not None: print(TerminalFormatter.color_text(f"- Fill {rosinstall_name} from {rosinstall_name}.rosinstall", bold=True)) else: @@ -235,28 +179,27 @@ def build_workspace(nanosaur_raw_github_repo, branch, workspace_path, rosinstall # Import workspace print(TerminalFormatter.color_text(f"- Import workspace from {rosinstall_name}.rosinstall", bold=True)) # run vcs import to sync the workspace - vcs_status = run_vcs_import(workspace_path, rosinstall_path) + vcs_status = ros.run_vcs_import(workspace_path, rosinstall_path) if not vcs_status: print(TerminalFormatter.color_text("Failed to import workspace", color='red')) return False # rosdep workspace if not skip_rosdep: print(TerminalFormatter.color_text(f"- Install all dependencies on workspace {workspace_path}", bold=True)) - if not run_rosdep(workspace_path, password): + if not ros.run_rosdep(workspace_path, password): print(TerminalFormatter.color_text("Failed to install dependencies", color='red')) return False # Build environment if not skip_build: print(TerminalFormatter.color_text(f"- Build workspace {workspace_path}", bold=True)) - if not run_colcon_build(workspace_path): + if not ros.run_colcon_build(workspace_path): print(TerminalFormatter.color_text("Failed to build workspace", color='red')) return False # All fine return True - @require_sudo_password -def create_developer_workspace(platform, params: Params, args, password=None): +def create_maintainer_workspace(platform, params: Params, args, password=None): # determine the device type device_type = "robot" if platform['Machine'] == 'jetson' else "desktop" # Get the Nanosaur home folder and branch @@ -274,7 +217,7 @@ def create_developer_workspace(platform, params: Params, args, password=None): print(TerminalFormatter.color_text(f"Shared src folder created in {nanosaur_home_path}.", color='green')) # Download rosinstall for this device url = f"{nanosaur_raw_github_repo}/{branch}/nanosaur/rosinstall/shared.rosinstall" - rosinstall_path = download_rosinstall(url, nanosaur_shared_src, "shared.rosinstall") + rosinstall_path = ros.download_rosinstall(url, nanosaur_shared_src, "shared.rosinstall") if rosinstall_path is not None: print(TerminalFormatter.color_text("- Fill shared src from shared.rosinstall", bold=True)) else: @@ -283,7 +226,7 @@ def create_developer_workspace(platform, params: Params, args, password=None): # Import workspace print(TerminalFormatter.color_text("- Import workspace from shared.rosinstall", bold=True)) # run vcs import to sync the workspace - vcs_status = run_vcs_import(nanosaur_home_path, rosinstall_path, src_folder="shared_src") + vcs_status = ros.run_vcs_import(nanosaur_home_path, rosinstall_path, src_folder="shared_src") if not vcs_status: print(TerminalFormatter.color_text("Failed to import workspace", color='red')) return False @@ -304,141 +247,8 @@ def create_developer_workspace(platform, params: Params, args, password=None): # Make the perception workspace ws_name_path = create_workspace(nanosaur_home_path, params['ws_perception_name']) build_workspace(branch, ws_name_path, 'perception', password, skip_rosdep=True, skip_build=True) - # Set params in developer mode - params['mode'] = 'developer' - - -def download_rosinstall(url, folder_path, file_name) -> str: - # Create the full file path - file_path = os.path.join(folder_path, file_name) - - # Check if the file already exists - if os.path.exists(file_path): - print(TerminalFormatter.color_text(f"File '{file_name}' already exists in '{folder_path}'. Skip download", color='yellow')) - return file_path # Cancel download - - # Send a request to download the file - response = requests.get(url) - - if response.status_code == 200: - # Save the file in the workspace folder - file_path = os.path.join(folder_path, file_name) - with open(file_path, 'wb') as file: - file.write(response.content) - print(TerminalFormatter.color_text(f"File '{file_name}' downloaded successfully to '{folder_path}'.", color='green')) - return file_path - else: - print(TerminalFormatter.color_text(f"Failed to download file. Status code: {response.status_code}", color='red')) - return None - - -def run_vcs_import(workspace_path, rosinstall_path, src_folder="src") -> bool: - try: - # Run the command and stream the output live - process = subprocess.Popen( - f"vcs import {workspace_path}/{src_folder} < {rosinstall_path}", - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - - # Stream output live - for line in process.stdout: - print(line.decode('utf-8'), end="") # Print stdout line-by-line - - # Wait for the process to finish - process.wait() - - # Stream any errors - for line in process.stderr: - print(TerminalFormatter.color_text(line.decode('utf-8'), color='red'), end="") # Print stderr (errors) in red - - # Check the exit status of the command - if process.returncode != 0: - print(TerminalFormatter.color_text(process.returncode, color='red')) - else: - print(TerminalFormatter.color_text("Command completed successfully", color='green')) - - return process.returncode == 0 - - except Exception as e: - print(f"An error occurred while running the command: {e}") - return False - - -def run_rosdep(folder_path, password) -> bool: - if password is None: - print(TerminalFormatter.color_text("Error: No password provided.", color='red')) - return False - result = False - try: - child = pexpect.spawn(f"bash -c 'source {ros2_sources} && rosdep install --from-paths {folder_path}/src --ignore-src -r -y'", encoding='utf-8', timeout=None) - # Stream all command output to the terminal in real time - child.logfile = sys.stdout - # Wait for password prompt with timeout - index = child.expect( - ['password for', pexpect.EOF, pexpect.TIMEOUT], timeout=30) - if index == 0: - child.logfile = None # Disable logging to hide password - child.sendline(password) - child.logfile = sys.stdout # Re-enable logging - # Wait for completion - child.expect(pexpect.EOF, timeout=300) - result = True - elif index == 1: # Command finished without password prompt - print("Command finished without asking for a password.") - result = True - elif index == 2: # Timeout - print(TerminalFormatter.color_text("Error: Sudo prompt timed out. Please try again.", color='red')) - result = False - except pexpect.ExceptionPexpect as e: - print(TerminalFormatter.color_text(f"Error running rosdep: {str(e)}", color='red')) - result = False - finally: - # Ensure the process is closed - if child.isalive(): - child.close() - return result - - -def run_colcon_build(folder_path) -> bool: - - # Move to the folder_path and run the colcon build command - try: - os.chdir(folder_path) - print(f"Changed directory to: {folder_path}") - - # Run the command and stream the output live - process = subprocess.Popen( - f"source {ros2_sources} && colcon build --symlink-install --merge-install", - shell=True, - executable="/bin/bash", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - # Stream output live - for line in process.stdout: - print(line.decode('utf-8'), end="") # Print stdout line-by-line - - # Wait for the process to finish - process.wait() - - # Stream any errors - for line in process.stderr: - print(TerminalFormatter.color_text(line.decode('utf-8'), color='red'), end="") # Print stderr (errors) in red - - # Check the exit status of the command - if process.returncode != 0: - print(TerminalFormatter.color_text(process.returncode, color='red')) - else: - print(TerminalFormatter.color_text("Command completed successfully", color='green')) - - return process.returncode == 0 - - except Exception as e: - print(f"An error occurred while running the colcon build command: {e}") - return False + # Set params in maintainer mode + params['mode'] = 'maintainer' @require_sudo_password @@ -490,12 +300,12 @@ def update_workspace(params, workspace_type, workspace_name_key, skip_build=Fals rosinstall_path = os.path.join(workspace_path, f"{workspace_type}.rosinstall") if os.path.exists(rosinstall_path): print(TerminalFormatter.color_text(f"Found rosinstall file: {rosinstall_path}", bold=True)) - if not run_vcs_import(workspace_path, rosinstall_path): + if not ros.run_vcs_import(workspace_path, rosinstall_path): return False # rosdep workspace if not skip_build: print(TerminalFormatter.color_text(f"- Update {workspace_name} workspace", bold=True)) - if not run_colcon_build(workspace_path): + if not ros.run_colcon_build(workspace_path): return False return True @@ -505,7 +315,7 @@ def update_workspace(params, workspace_type, workspace_name_key, skip_build=Fals rosinstall_path = os.path.join(shared_src_path, "shared.rosinstall") if os.path.exists(rosinstall_path): print(TerminalFormatter.color_text(f"Found rosinstall file: {rosinstall_path}", bold=True)) - if not run_vcs_import(nanosaur_home_path, rosinstall_path, src_folder="shared_src"): + if not ros.run_vcs_import(nanosaur_home_path, rosinstall_path, src_folder="shared_src"): return False # Update the robot workspace @@ -522,49 +332,6 @@ def update_workspace(params, workspace_type, workspace_name_key, skip_build=Fals return True -def deploy_docker_perception(params: Params): - image_name = "nanosaur-perception" - perception_ws_name = params['ws_perception_name'] - perception_ws_path = get_workspace_path(params, perception_ws_name) - nanosaur_perception_path = os.path.join(perception_ws_path, 'src', 'nanosaur_perception') - - src_folders = [ - os.path.join(get_nanosaur_home(params['nanosaur_home']), 'shared_src'), - os.path.join(perception_ws_path, 'src') - ] - - try: - os.chdir(nanosaur_perception_path) - print(f"Changed directory to: {nanosaur_perception_path}") - - ws_dir_list = '--ws-src ' + ' --ws-src '.join(src_folders) - command = f"scripts/docker_build.sh {ws_dir_list} --image-name {image_name}" - - process = subprocess.Popen( - command, - shell=True, - executable="/bin/bash", - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - ) - - for line in process.stdout: - print(line.decode('utf-8', errors='replace'), end="") - - process.wait() - - if process.returncode != 0: - print(TerminalFormatter.color_text(process.returncode, color='red')) - return False - else: - print(TerminalFormatter.color_text("Command completed successfully", color='green')) - return True - - except Exception as e: - print(f"An error occurred while running the command: {e}") - return False - - def deploy(platform, params: Params, args, password=None): if args.workspace is not None: workspace = args.workspace @@ -579,7 +346,9 @@ def deploy(platform, params: Params, args, password=None): # Call the function within the deploy function if workspace == 'perception' or args.all_platforms: - deploy_docker_perception(params) + perception_ws_name = params['ws_perception_name'] + perception_ws_path = get_workspace_path(params, perception_ws_name) + ros.deploy_docker_perception(params, perception_ws_path) return True # EOF