From e577915ee6ae87b6d89fd8bab5f2e28d98ad4fab Mon Sep 17 00:00:00 2001 From: Raffaello Bonghi Date: Sat, 18 Jan 2025 16:49:03 +0000 Subject: [PATCH] Improve script with multiple workspaces --- src/nanosaur/main.py | 50 ++++--- src/nanosaur/utilities.py | 41 ++++-- src/nanosaur/workspace.py | 290 +++++++++++++++++++++++++------------- 3 files changed, 258 insertions(+), 123 deletions(-) diff --git a/src/nanosaur/main.py b/src/nanosaur/main.py index 3e11570..8fc9fcb 100644 --- a/src/nanosaur/main.py +++ b/src/nanosaur/main.py @@ -35,14 +35,18 @@ from nanosaur import workspace from nanosaur import simulation from nanosaur import robot -from nanosaur.workspace import get_workspace_path +from nanosaur.utilities import require_sudo_password from nanosaur.prompt_colors import TerminalFormatter -NANOSAUR_CONFIG_FILE = 'nanosaur.yaml' +NANOSAUR_CONFIG_FILE_NAME = 'nanosaur.yaml' +NANOSAUR_HOME_NAME = 'nanosaur' # Define default parameters DEFAULT_PARAMS = { - 'nanosaur_workspace_name': 'nanosaur_ws', + 'nanosaur_home': NANOSAUR_HOME_NAME, + 'robot_ws_name': 'robot_ws', + 'simulation_ws_name': 'simulation_ws', + 'perception_ws_name': 'perception_ws', 'nanosaur_branch': 'nanosaur2', } @@ -60,14 +64,11 @@ def info(platform, params: Params, args): for key, value in platform.items(): print(f" {key}: {value}") - -def install(platform, params: Params, args): - device_type = "robot" if platform['Machine'] == 'jetson' else "desktop" - print(TerminalFormatter.color_text(f"Installing Nanosaur for {device_type}...", color='green')) - if device_type == 'desktop': - simulation.simulation_install(platform, params, args) - elif device_type == 'robot': - print(TerminalFormatter.color_text("Robot installation not supported yet.", color='red')) +def install(platform, params: Params, args, password=None): + if args.developer: + workspace.create_developer_workspace(platform, params, args) + else: + print(TerminalFormatter.color_text("Not implemented yet", color='red')) def parser_workspace_menu(subparsers: argparse._SubParsersAction) -> argparse.ArgumentParser: @@ -78,14 +79,24 @@ def parser_workspace_menu(subparsers: argparse._SubParsersAction) -> argparse.Ar # Add workspace clean subcommand parser_workspace_clean = workspace_subparsers.add_parser( 'clean', help="Clean the workspace") + parser_workspace_clean.add_argument( + 'workspace', type=str, nargs='?', help="Specify the workspace to clean") parser_workspace_clean.add_argument( '--force', action='store_true', help="Force the workspace clean") + parser_workspace_clean.add_argument( + '--all-platforms', '--all', action='store_true', help="Clean all workspaces") + parser_workspace_clean.add_argument( + '--perception', action='store_true', help="Clean the perception workspace") parser_workspace_clean.set_defaults(func=workspace.clean) # Add workspace update subcommand parser_workspace_update = workspace_subparsers.add_parser( 'update', help="Update the workspace") + parser_workspace_update.add_argument( + 'workspace', type=str, nargs='?', help="Specify the workspace to clean") parser_workspace_update.add_argument( '--force', action='store_true', help="Force the update") + parser_workspace_update.add_argument( + '--all-platforms', '--all', action='store_true', help="Clean all workspaces") parser_workspace_update.set_defaults(func=workspace.update) return parser_workspace @@ -133,8 +144,7 @@ def parser_swarm_menu(subparsers: argparse._SubParsersAction, params: Params) -> def main(): # Load the parameters - user_home_dir = os.path.expanduser("~") - params = Params.load(DEFAULT_PARAMS, params_file=f'{user_home_dir}/{NANOSAUR_CONFIG_FILE}') + params = Params.load(DEFAULT_PARAMS, home_folder=NANOSAUR_HOME_NAME, params_file_name=NANOSAUR_CONFIG_FILE_NAME) # Extract device information with jtop try: @@ -160,24 +170,26 @@ def main(): parser_info.set_defaults(func=info) # Subcommand: install (hidden if workspace already exists) - if get_workspace_path(params['nanosaur_workspace_name']) is None: + #if get_workspace_path(params['nanosaur_workspace_name']) is None: + if 'developer_mode' not in params and not params['developer_mode']: parser_install = subparsers.add_parser('install', help="Install the Nanosaur workspace") else: parser_install = subparsers.add_parser('install') # Add simulation install subcommand - parser_install.add_argument('--developer', action='store_true', help="Install developer workspace") + parser_install.add_argument('--developer', '--dev', action='store_true', help="Install developer workspace") parser_install.add_argument('--force', action='store_true', help="Force the update") + parser_install.add_argument('--all-platforms', action='store_true', help="Install for all platforms") parser_install.set_defaults(func=install) # Subcommand: workspace (with a sub-menu for workspace operations) - if get_workspace_path(params['nanosaur_workspace_name']) is not None: + if 'developer_mode' in params and params['developer_mode']: # Add workspace subcommand parser_workspace = parser_workspace_menu(subparsers) # Subcommand: simulation (with a sub-menu for simulation types) - if device_type == 'desktop' and get_workspace_path(params['nanosaur_workspace_name']) is not None: - # Add simulation subcommand - parser_simulation = parser_simulation_menu(subparsers, params) + #if device_type == 'desktop' and get_workspace_path(params['nanosaur_workspace_name']) is not None: + # # Add simulation subcommand + parser_simulation = parser_simulation_menu(subparsers, params) # Subcommand: robot (with a sub-menu for robot operations) robot_data = robot.RobotList.get_robot(params) diff --git a/src/nanosaur/utilities.py b/src/nanosaur/utilities.py index 4f20bbc..86f0fdc 100644 --- a/src/nanosaur/utilities.py +++ b/src/nanosaur/utilities.py @@ -30,11 +30,11 @@ import getpass from nanosaur.prompt_colors import TerminalFormatter - class Params: @classmethod - def load(cls, default_params, params_file=None): + def load(cls, default_params, home_folder, params_file_name): + params_file = Params.get_params_file(home_folder, params_file_name) # Load parameters from YAML file if it exists if os.path.exists(params_file): with open(params_file, 'r') as file: @@ -42,12 +42,13 @@ def load(cls, default_params, params_file=None): else: params_dict = default_params - return cls(params_dict, params_file) + return cls(params_dict, home_folder, params_file) - def __init__(self, params_dict, params_file=None): + def __init__(self, params_dict, home_folder, params_file_name): self._params_dict = params_dict self._default_params = copy.deepcopy(params_dict) - self.params_file = params_file + self.home_folder = home_folder + self.params_file_name = params_file_name for key, value in params_dict.items(): setattr(self, key, value) @@ -73,11 +74,20 @@ def __repr__(self): return str(self._params_dict) def save(self): - if self.params_file and self._params_dict != self._default_params: - print(TerminalFormatter.color_text(f"Saving parameters to {self.params_file}", color='yellow')) - with open(self.params_file, 'w') as file: + params_file = Params.get_params_file(self.home_folder, self.params_file_name) + # Save the parameters to the file if they are different from the default + if params_file and self._params_dict != self._default_params: + # Get the current nanosaur's home directory + create_nanosaur_home(self.home_folder) + # Save the parameters to the file + print(TerminalFormatter.color_text(f"Saving parameters to {self.params_file_name}", color='yellow')) + with open(params_file, 'w') as file: yaml.dump(self._params_dict, file) + @staticmethod + def get_params_file(home_folder, params_file_name): + return os.path.join(get_nanosaur_home(home_folder), params_file_name) + def get(self, key, default=None): return getattr(self, key, default) @@ -90,6 +100,21 @@ def set(self, key, value): def items(self): return self._params_dict.items() +def create_nanosaur_home(nanosaur_home): + # Get the current user's home directory + user_home_dir = os.path.expanduser("~") + # Create the full path for the workspace folder in the user's home directory + nanosaur_home_path = os.path.join(user_home_dir, nanosaur_home) + # Check if folder exists, if not, create it + if not os.path.exists(nanosaur_home_path): + os.makedirs(nanosaur_home_path) + print(TerminalFormatter.color_text(f"Folder '{nanosaur_home_path}' created.", color='green')) + return nanosaur_home_path + +def get_nanosaur_home(nanosaur_home): + # Get the current user's home directory + user_home_dir = os.path.expanduser("~") + return os.path.join(user_home_dir, nanosaur_home) def require_sudo(func): def wrapper(*args, **kwargs): diff --git a/src/nanosaur/workspace.py b/src/nanosaur/workspace.py index 4fc9efd..8530e9a 100644 --- a/src/nanosaur/workspace.py +++ b/src/nanosaur/workspace.py @@ -27,13 +27,22 @@ import sys import pexpect import requests +import yaml import subprocess from nanosaur.prompt_colors import TerminalFormatter -from nanosaur.utilities import Params, require_sudo_password, conditional_sudo_password +from nanosaur.utilities import Params, create_nanosaur_home, require_sudo_password, conditional_sudo_password ros2_distro = 'humble' ros2_sources = f'/opt/ros/{ros2_distro}/setup.bash' +# Default colcon settings +COLCON_DEFAULTS = { + 'build': { + 'base-paths': ['src', '../shared_src'], + 'symlink-install': True, + } +} + def clean_workspace(nanosaur_ws_name, password): """ @@ -55,61 +64,53 @@ def clean_workspace(nanosaur_ws_name, password): # Check if the workspace folder exists if os.path.exists(workspace_path) and os.path.isdir(workspace_path): - print(TerminalFormatter.color_text(f"Workspace '{workspace_path}' exists. Cleaning build, install and log folders", color='yellow')) - result = False - try: - child = pexpect.spawn(f"sudo rm -rf {workspace_path}/build {workspace_path}/install {workspace_path}/log", encoding='utf-8', timeout=None) - # Wait for password prompt with timeout - index = child.expect( - ['password for', pexpect.EOF, pexpect.TIMEOUT], timeout=30) - if index == 0: - child.logfile = None # Disable logging to hide password - child.sendline(password) - child.logfile = sys.stdout # Re-enable logging - # Wait for completion - child.expect(pexpect.EOF, timeout=300) - result = True - elif index == 1: # Command finished without password prompt - print("Command finished without asking for a password.") - result = True - elif index == 2: # Timeout - print(TerminalFormatter.color_text("Error: Sudo prompt timed out. Please try again.", color='red')) - result = False - - # Stream all command output to the terminal in real time - child.logfile = sys.stdout - - except pexpect.ExceptionPexpect as e: - print(TerminalFormatter.color_text(f"Error running rm {str(e)}", color='red')) + subfolders = ['build', 'install', 'log'] + subfolders_exist = all(os.path.exists(os.path.join(workspace_path, subfolder)) for subfolder in subfolders) + + if subfolders_exist: + print(TerminalFormatter.color_text(f"Workspace '{workspace_path}' and subfolders exist. Cleaning build, install and log folders", color='yellow')) result = False - finally: - # Ensure the process is closed - if child.isalive(): - child.close() - print(TerminalFormatter.color_text(f"Workspace '{workspace_path}' cleaned up.", color='green')) - return result + try: + child = pexpect.spawn(f"sudo rm -rf {workspace_path}/build {workspace_path}/install {workspace_path}/log", encoding='utf-8', timeout=None) + # Wait for password prompt with timeout + index = child.expect( + ['password for', pexpect.EOF, pexpect.TIMEOUT], timeout=30) + if index == 0: + child.logfile = None # Disable logging to hide password + child.sendline(password) + child.logfile = sys.stdout # Re-enable logging + # Wait for completion + child.expect(pexpect.EOF, timeout=300) + result = True + elif index == 1: # Command finished without password prompt + print("Command finished without asking for a password.") + result = True + elif index == 2: # Timeout + print(TerminalFormatter.color_text("Error: Sudo prompt timed out. Please try again.", color='red')) + result = False + + # Stream all command output to the terminal in real time + child.logfile = sys.stdout + + except pexpect.ExceptionPexpect as e: + print(TerminalFormatter.color_text(f"Error running rm {str(e)}", color='red')) + result = False + finally: + # Ensure the process is closed + if child.isalive(): + child.close() + print(TerminalFormatter.color_text(f"Workspace '{workspace_path}' cleaned up.", color='green')) + return result else: print(TerminalFormatter.color_text(f"Folder '{workspace_path}' does not exist.", color='yellow')) return False -def get_workspace_path(nanosaur_ws_name): - """ - Checks if a workspace folder exists in the user's home directory. - :param folder_name: The name of the workspace folder to check. - :return: The full path to the workspace if it exists, or None if it doesn't. - """ - # Check if the script is running with sudo - if os.geteuid() == 0: - # Get the original user's home directory - user_home_dir = os.path.expanduser(f"~{os.getenv('SUDO_USER')}") - else: - # Get the current user's home directory - user_home_dir = os.path.expanduser("~") - - # Create the full path for the workspace folder in the user's home - # directory - workspace_path = os.path.join(user_home_dir, nanosaur_ws_name) +def get_workspace_path(params: Params, ws_name): + # Create the Nanosaur home folder + nanosaur_home_path = create_nanosaur_home(params['nanosaur_home']) + # Create the full path for the workspace folder in the user's home directory + workspace_path = os.path.join(nanosaur_home_path, ws_name) # Check if the workspace folder exists if os.path.exists(workspace_path) and os.path.isdir(workspace_path): @@ -118,28 +119,102 @@ def get_workspace_path(nanosaur_ws_name): return None -def create_workspace(nanosaur_ws_name): - # Check if the script is running with sudo - if os.geteuid() == 0: - # Get the original user's home directory - user_home_dir = os.path.expanduser(f"~{os.getenv('SUDO_USER')}") +def create_workspace(nanosaur_home_path, ws_name): + ws_name_path = os.path.join(nanosaur_home_path, ws_name) + ws_name_path_src = os.path.join(ws_name_path, "src") + # Check if folder exists, if not, create it + if not os.path.exists(ws_name_path_src): + os.makedirs(ws_name_path_src) + print(TerminalFormatter.color_text(f"Workspace '{ws_name}' created in {nanosaur_home_path}.", color='green')) else: - # Get the current user's home directory - user_home_dir = os.path.expanduser("~") - - # Create the full path for the workspace folder in the user's home - # directory - workspace_path = os.path.join(user_home_dir, nanosaur_ws_name) - workspace_path_src = os.path.join(workspace_path, "src") + print(TerminalFormatter.color_text(f"Workspace '{ws_name}' already exists.", color='yellow')) + # Save the default colcon settings + with open(f"{ws_name_path}/colcon_defaults.yaml", 'w') as file: + yaml.dump(COLCON_DEFAULTS, file) + return ws_name_path + + +def build_workspace(branch, workspace_path, rosinstall_name, password, skip_rosdep=False, skip_build=False): + # Download rosinstall for this device + url = f"https://raw.githubusercontent.com/rnanosaur/nanosaur/{branch}/nanosaur/rosinstall/{rosinstall_name}.rosinstall" + rosinstall_path = download_rosinstall(url, workspace_path, f"{rosinstall_name}.rosinstall") + if rosinstall_path is not None: + print(TerminalFormatter.color_text(f"- Fill {rosinstall_name} from {rosinstall_name}.rosinstall", bold=True)) + else: + print(TerminalFormatter.color_text(f"Failed to download {rosinstall_name}.rosinstall Exiting...", color='red')) + return False + # Import workspace + print(TerminalFormatter.color_text(f"- Import workspace from {rosinstall_name}.rosinstall", bold=True)) + # run vcs import to sync the workspace + vcs_status = run_vcs_import(workspace_path, rosinstall_path) + if not vcs_status: + print(TerminalFormatter.color_text("Failed to import workspace", color='red')) + return False + # rosdep workspace + if not skip_rosdep: + print(TerminalFormatter.color_text(f"- Install all dependencies on workspace {workspace_path}", bold=True)) + if not run_rosdep(workspace_path, password): + print(TerminalFormatter.color_text("Failed to install dependencies", color='red')) + return False + # Build environment + if not skip_build: + print(TerminalFormatter.color_text(f"- Build workspace {workspace_path}", bold=True)) + if not run_colcon_build(workspace_path): + print(TerminalFormatter.color_text("Failed to build workspace", color='red')) + return False + # All fine + return True +@require_sudo_password +def create_developer_workspace(platform, params: Params, args, password=None): + # determine the device type + device_type = "robot" if platform['Machine'] == 'jetson' else "desktop" + # Get the Nanosaur home folder and branch + nanosaur_home = params['nanosaur_home'] + branch = params['nanosaur_branch'] + # Create the Nanosaur home folder + nanosaur_home_path = create_nanosaur_home(nanosaur_home) + + # Create the shared source folder + nanosaur_shared_src = os.path.join(nanosaur_home_path, "shared_src") # Check if folder exists, if not, create it - if not os.path.exists(workspace_path_src): - os.makedirs(workspace_path_src) - print(TerminalFormatter.color_text(f"Folder '{workspace_path_src}' created.", color='green')) + if not os.path.exists(nanosaur_shared_src): + os.makedirs(nanosaur_shared_src) + print(TerminalFormatter.color_text(f"Shared src folder created in {nanosaur_home_path}.", color='green')) + # Download rosinstall for this device + url = f"https://raw.githubusercontent.com/rnanosaur/nanosaur/{branch}/nanosaur/rosinstall/shared.rosinstall" + rosinstall_path = download_rosinstall(url, nanosaur_shared_src, "shared.rosinstall") + if rosinstall_path is not None: + print(TerminalFormatter.color_text("- Fill shared src from shared.rosinstall", bold=True)) else: - print(TerminalFormatter.color_text(f"Folder '{workspace_path_src}' already exists.", color='yellow')) - - return workspace_path + print(TerminalFormatter.color_text("Failed to download rosinstall file. Exiting...", color='red')) + return False + # Import workspace + print(TerminalFormatter.color_text("- Import workspace from shared.rosinstall", bold=True)) + # run vcs import to sync the workspace + vcs_status = run_vcs_import(nanosaur_home_path, rosinstall_path, src_folder="shared_src") + if not vcs_status: + print(TerminalFormatter.color_text("Failed to import workspace", color='red')) + return False + + # Make the robot workspace + if device_type == "robot" or args.all_platforms: + # Make the robot workspace + ws_name_path = create_workspace(nanosaur_home_path, params['robot_ws_name']) + if not build_workspace(branch, ws_name_path, device_type, password): + return False + # Make the simulation workspace + if device_type == "desktop" or args.all_platforms: + # Make the simulation workspace + ws_name_path = create_workspace(nanosaur_home_path, params['simulation_ws_name']) + if not build_workspace(branch, ws_name_path, device_type, password): + return False + + # Make the perception workspace + ws_name_path = create_workspace(nanosaur_home_path, params['perception_ws_name']) + build_workspace(branch, ws_name_path, 'perception', password, skip_rosdep=True, skip_build=True) + # Set params in developer mode + params['developer_mode'] = True def download_rosinstall(url, folder_path, file_name): @@ -163,14 +238,14 @@ def download_rosinstall(url, folder_path, file_name): return file_path else: print(TerminalFormatter.color_text(f"Failed to download file. Status code: {response.status_code}", color='red')) - return + return None -def run_vcs_import(workspace_path, rosinstall_path): +def run_vcs_import(workspace_path, rosinstall_path, src_folder="src"): try: # Run the command and stream the output live process = subprocess.Popen( - f"vcs import {workspace_path}/src < {rosinstall_path}", + f"vcs import {workspace_path}/{src_folder} < {rosinstall_path}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE @@ -277,34 +352,57 @@ def run_colcon_build(folder_path): @require_sudo_password def clean(platform, params: Params, args, password=None): - device_type = "robot" if platform['Machine'] == 'jetson' else "desktop" - print(TerminalFormatter.color_text(f"Nanosaur cleaning on {device_type}", bold=True)) - # Check if the workspace exists - workspace_path = get_workspace_path(params['nanosaur_workspace_name']) - if workspace_path is None: - print(TerminalFormatter.color_text(f"There are no {params['nanosaur_workspace_name']} in this device!", color='red')) - return False - # Clean workspace - clean_workspace(workspace_path, password) + if args.workspace is not None: + workspace = args.workspace + else: + workspace = "robot" if platform['Machine'] == 'jetson' else "desktop" + + if workspace == 'robot' or args.all_platforms: + robot_ws_name = params['robot_ws_name'] + robot_ws_path = get_workspace_path(params, robot_ws_name) + print(TerminalFormatter.color_text(f"- Clean robot workspace {robot_ws_name}", bold=True)) + # Clean workspace + clean_workspace(robot_ws_path, password) + + if workspace == 'desktop' or args.all_platforms: + simulation_ws_name = params['simulation_ws_name'] + simulation_ws_path = get_workspace_path(params, simulation_ws_name) + print(TerminalFormatter.color_text(f"- Clean simulation workspace {simulation_ws_name}", bold=True)) + # Clean workspace + clean_workspace(simulation_ws_path, password) + + if args.perception or args.all_platforms: + perception_ws_name = params['perception_ws_name'] + perception_ws_path = get_workspace_path(params, perception_ws_name) + print(TerminalFormatter.color_text(f"- Clean perception workspace {perception_ws_name}", bold=True)) + # Clean workspace + clean_workspace(perception_ws_path, password) + return True @conditional_sudo_password def update(platform, params: Params, args, password=None): - device_type = "robot" if platform['Machine'] == 'jetson' else "desktop" - print(TerminalFormatter.color_text(f"Nanosaur updating on {device_type}", bold=True)) - # Check if the workspace exists - workspace_path = get_workspace_path(params['nanosaur_workspace_name']) - if workspace_path is None: - print(TerminalFormatter.color_text(f"There are no {params['nanosaur_workspace_name']} in this device!", color='red')) - return False - # Clean workspace if force - if args.force: - print(TerminalFormatter.color_text("- Force update", bold=True)) - # Check if the workspace exists - clean_workspace(workspace_path, password) - # Build environment - print(TerminalFormatter.color_text(f"- Build workspace {workspace_path}", bold=True)) - if not run_colcon_build(workspace_path): - return False + if args.workspace is not None: + workspace = args.workspace + else: + workspace = "robot" if platform['Machine'] == 'jetson' else "desktop" + + if workspace == 'robot' or args.all_platforms: + robot_ws_name = params['robot_ws_name'] + robot_ws_path = get_workspace_path(params, robot_ws_name) + print(TerminalFormatter.color_text(f"- Update robot workspace {robot_ws_name}", bold=True)) + # Build environment + if not run_colcon_build(robot_ws_path): + return False + + if workspace == 'desktop' or args.all_platforms: + simulation_ws_name = params['simulation_ws_name'] + simulation_ws_path = get_workspace_path(params, simulation_ws_name) + print(TerminalFormatter.color_text(f"- Update simulation workspace {simulation_ws_name}", bold=True)) + # Build environment + if not run_colcon_build(simulation_ws_path): + return False + + return True # EOF