From 8236c78d72fb20c8e04e6e43f4667f52dde59533 Mon Sep 17 00:00:00 2001 From: Raffaello Bonghi Date: Sat, 18 Jan 2025 17:51:10 +0000 Subject: [PATCH] Add docker Isaac ROS developer control --- src/nanosaur/main.py | 4 ++ src/nanosaur/workspace.py | 77 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/src/nanosaur/main.py b/src/nanosaur/main.py index 78fe80e..464121c 100644 --- a/src/nanosaur/main.py +++ b/src/nanosaur/main.py @@ -97,6 +97,10 @@ def parser_workspace_menu(subparsers: argparse._SubParsersAction) -> argparse.Ar parser_workspace_update.add_argument( '--all-platforms', '--all', action='store_true', help="Clean all workspaces") parser_workspace_update.set_defaults(func=workspace.update) + # Add workspace perception subcommand + parser_workspace_perception = workspace_subparsers.add_parser( + 'perception', help="Start the Isaac ROS docker container") + parser_workspace_perception.set_defaults(func=workspace.run_dev_script) return parser_workspace diff --git a/src/nanosaur/workspace.py b/src/nanosaur/workspace.py index ae67973..953ec37 100644 --- a/src/nanosaur/workspace.py +++ b/src/nanosaur/workspace.py @@ -29,8 +29,11 @@ import requests import yaml import subprocess +import pty +import select +import termios from nanosaur.prompt_colors import TerminalFormatter -from nanosaur.utilities import Params, create_nanosaur_home, require_sudo_password, conditional_sudo_password +from nanosaur.utilities import Params, get_nanosaur_home, create_nanosaur_home, require_sudo_password, conditional_sudo_password ros2_distro = 'humble' ros2_sources = f'/opt/ros/{ros2_distro}/setup.bash' @@ -43,6 +46,78 @@ } } +def run_dev_script(platform, params: Params, args): + + perception_path = get_workspace_path(params, params['perception_ws_name']) + isaac_ros_common_path = os.path.join(perception_path, 'src', 'isaac_ros_common') + # Get the path to the Isaac ROS common package + os.chdir(isaac_ros_common_path) + print(f"Changed directory to: {isaac_ros_common_path}") + + # Save terminal settings to restore later + original_termios = termios.tcgetattr(sys.stdin) + + # Switch terminal to raw mode for better control over input + def set_raw_mode(): + tty = sys.stdin.fileno() + new_termios = termios.tcgetattr(tty) + new_termios[3] = new_termios[3] & ~termios.ICANON & ~termios.ECHO # Disable canonical mode and echo + termios.tcsetattr(tty, termios.TCSANOW, new_termios) + + # Restore the terminal settings + def restore_terminal(): + termios.tcsetattr(sys.stdin, termios.TCSANOW, original_termios) + + # Open a pseudo-terminal + master_fd, slave_fd = pty.openpty() + + nanosaur_home_path = get_nanosaur_home(params['nanosaur_home']) + + # Start the subprocess with the slave side of the pseudo-terminal as its stdio + process = subprocess.Popen( + ["./scripts/run_dev.sh", "-d", nanosaur_home_path], + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + universal_newlines=True + ) + + try: + set_raw_mode() + while True: + # Use select to handle input/output + rlist, _, _ = select.select([sys.stdin, master_fd], [], []) + + if sys.stdin in rlist: # Check for input from the user + user_input = os.read(sys.stdin.fileno(), 1024) # Read input from the terminal + + # Check for CTRL-D + if b'\x04' in user_input: # CTRL-D (End of Transmission) + os.write(master_fd, b'\x04') # Send CTRL-D to the subprocess + break # Exit the loop after sending CTRL-D + + os.write(master_fd, user_input) # Write input to the subprocess + + if master_fd in rlist: # Check for output from the subprocess + output = os.read(master_fd, 1024).decode() # Read from the subprocess + if output: + sys.stdout.write(output) # Write it to the terminal + sys.stdout.flush() + + # If the subprocess has terminated, exit the loop + if process.poll() is not None: + break + + # Ensure the subprocess is terminated (in case it didn't handle CTRL-D) + if process.poll() is None: + process.terminate() + process.wait() + finally: + restore_terminal() # Restore terminal settings + os.close(master_fd) + os.close(slave_fd) + + print(TerminalFormatter.color_text("Dev script finished", color='green')) def clean_workspace(nanosaur_ws_name, password): """