Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(smart_mpc_trajectory_follower): add smart_mpc_trajectory_follower #6805

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 23 additions & 37 deletions launch/tier4_control_launch/launch/control.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,371 +66,358 @@
aeb_param = yaml.safe_load(f)["/**"]["ros__parameters"]
with open(LaunchConfiguration("predicted_path_checker_param_path").perform(context), "r") as f:
predicted_path_checker_param = yaml.safe_load(f)["/**"]["ros__parameters"]
trajectory_follower_mode = LaunchConfiguration("trajectory_follower_mode").perform(context)

controller_component = ComposableNode(
package="trajectory_follower_node",
plugin="autoware::motion::control::trajectory_follower_node::Controller",
name="controller_node_exe",
namespace="trajectory_follower",
remappings=[
("~/input/reference_trajectory", "/planning/scenario_planning/trajectory"),
("~/input/current_odometry", "/localization/kinematic_state"),
("~/input/current_steering", "/vehicle/status/steering_status"),
("~/input/current_accel", "/localization/acceleration"),
("~/input/current_operation_mode", "/system/operation_mode/state"),
("~/output/predicted_trajectory", "lateral/predicted_trajectory"),
("~/output/lateral_diagnostic", "lateral/diagnostic"),
("~/output/slope_angle", "longitudinal/slope_angle"),
("~/output/longitudinal_diagnostic", "longitudinal/diagnostic"),
("~/output/stop_reason", "longitudinal/stop_reason"),
("~/output/control_cmd", "control_cmd"),
],
parameters=[
{
"lateral_controller_mode": LaunchConfiguration("lateral_controller_mode"),
"longitudinal_controller_mode": LaunchConfiguration("longitudinal_controller_mode"),
},
nearest_search_param,
trajectory_follower_node_param,
lon_controller_param,
lat_controller_param,
vehicle_info_param,
],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

# lane departure checker
lane_departure_component = ComposableNode(
package="lane_departure_checker",
plugin="lane_departure_checker::LaneDepartureCheckerNode",
name="lane_departure_checker_node",
namespace="trajectory_follower",
remappings=[
("~/input/odometry", "/localization/kinematic_state"),
("~/input/lanelet_map_bin", "/map/vector_map"),
("~/input/route", "/planning/mission_planning/route"),
("~/input/reference_trajectory", "/planning/scenario_planning/trajectory"),
(
"~/input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
],
parameters=[nearest_search_param, lane_departure_checker_param, vehicle_info_param],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

# shift decider
shift_decider_component = ComposableNode(
package="shift_decider",
plugin="ShiftDecider",
name="shift_decider",
remappings=[
("input/control_cmd", "/control/trajectory_follower/control_cmd"),
("input/state", "/autoware/state"),
("input/current_gear", "/vehicle/status/gear_status"),
("output/gear_cmd", "/control/shift_decider/gear_cmd"),
],
parameters=[
shift_decider_param,
],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

# autonomous emergency braking
autonomous_emergency_braking = ComposableNode(
package="autonomous_emergency_braking",
plugin="autoware::motion::control::autonomous_emergency_braking::AEB",
name="autonomous_emergency_braking",
remappings=[
("~/input/pointcloud", "/perception/obstacle_segmentation/pointcloud"),
("~/input/velocity", "/vehicle/status/velocity_status"),
("~/input/imu", "/sensing/imu/imu_data"),
("~/input/odometry", "/localization/kinematic_state"),
(
"~/input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
],
parameters=[
aeb_param,
],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

autonomous_emergency_braking_loader = LoadComposableNodes(
condition=IfCondition(LaunchConfiguration("enable_autonomous_emergency_braking")),
composable_node_descriptions=[autonomous_emergency_braking],
target_container="/control/control_container",
)

# autonomous emergency braking
predicted_path_checker = ComposableNode(
package="predicted_path_checker",
plugin="autoware::motion::control::predicted_path_checker::PredictedPathCheckerNode",
name="predicted_path_checker",
remappings=[
("~/input/objects", "/perception/object_recognition/objects"),
("~/input/reference_trajectory", "/planning/scenario_planning/trajectory"),
("~/input/current_accel", "/localization/acceleration"),
("~/input/odometry", "/localization/kinematic_state"),
(
"~/input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
],
parameters=[
vehicle_info_param,
predicted_path_checker_param,
],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

predicted_path_checker_loader = LoadComposableNodes(
condition=IfCondition(LaunchConfiguration("enable_predicted_path_checker")),
composable_node_descriptions=[predicted_path_checker],
target_container="/control/control_container",
)

# vehicle cmd gate
vehicle_cmd_gate_component = ComposableNode(
package="vehicle_cmd_gate",
plugin="vehicle_cmd_gate::VehicleCmdGate",
name="vehicle_cmd_gate",
remappings=[
("input/steering", "/vehicle/status/steering_status"),
("input/operation_mode", "/system/operation_mode/state"),
("input/auto/control_cmd", "/control/trajectory_follower/control_cmd"),
("input/auto/turn_indicators_cmd", "/planning/turn_indicators_cmd"),
("input/auto/hazard_lights_cmd", "/planning/hazard_lights_cmd"),
("input/auto/gear_cmd", "/control/shift_decider/gear_cmd"),
("input/external/control_cmd", "/external/selected/control_cmd"),
("input/external/turn_indicators_cmd", "/external/selected/turn_indicators_cmd"),
("input/external/hazard_lights_cmd", "/external/selected/hazard_lights_cmd"),
("input/external/gear_cmd", "/external/selected/gear_cmd"),
("input/external_emergency_stop_heartbeat", "/external/selected/heartbeat"),
("input/gate_mode", "/control/gate_mode_cmd"),
("input/emergency/control_cmd", "/system/emergency/control_cmd"),
("input/emergency/hazard_lights_cmd", "/system/emergency/hazard_lights_cmd"),
("input/emergency/gear_cmd", "/system/emergency/gear_cmd"),
("input/mrm_state", "/system/fail_safe/mrm_state"),
("input/kinematics", "/localization/kinematic_state"),
("input/acceleration", "/localization/acceleration"),
("output/vehicle_cmd_emergency", "/control/command/emergency_cmd"),
("output/control_cmd", "/control/command/control_cmd"),
("output/gear_cmd", "/control/command/gear_cmd"),
("output/turn_indicators_cmd", "/control/command/turn_indicators_cmd"),
("output/hazard_lights_cmd", "/control/command/hazard_lights_cmd"),
("output/gate_mode", "/control/current_gate_mode"),
("output/engage", "/api/autoware/get/engage"),
("output/external_emergency", "/api/autoware/get/emergency"),
("output/operation_mode", "/control/vehicle_cmd_gate/operation_mode"),
("~/service/engage", "/api/autoware/set/engage"),
("~/service/external_emergency", "/api/autoware/set/emergency"),
# TODO(Takagi, Isamu): deprecated
("input/engage", "/autoware/engage"),
("~/service/external_emergency_stop", "~/external_emergency_stop"),
("~/service/clear_external_emergency_stop", "~/clear_external_emergency_stop"),
],
parameters=[
vehicle_cmd_gate_param,
vehicle_info_param,
{
"check_external_emergency_heartbeat": LaunchConfiguration(
"check_external_emergency_heartbeat"
),
},
],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

# operation mode transition manager
operation_mode_transition_manager_component = ComposableNode(
package="operation_mode_transition_manager",
plugin="operation_mode_transition_manager::OperationModeTransitionManager",
name="operation_mode_transition_manager",
remappings=[
# input
("kinematics", "/localization/kinematic_state"),
("steering", "/vehicle/status/steering_status"),
("trajectory", "/planning/scenario_planning/trajectory"),
("control_cmd", "/control/command/control_cmd"),
("trajectory_follower_control_cmd", "/control/trajectory_follower/control_cmd"),
("control_mode_report", "/vehicle/status/control_mode"),
("gate_operation_mode", "/control/vehicle_cmd_gate/operation_mode"),
# output
("is_autonomous_available", "/control/is_autonomous_available"),
("control_mode_request", "/control/control_mode_request"),
],
parameters=[
nearest_search_param,
operation_mode_transition_manager_param,
vehicle_info_param,
],
)

# external cmd selector
external_cmd_selector_loader = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[FindPackageShare("external_cmd_selector"), "/launch/external_cmd_selector.launch.py"]
),
launch_arguments=[
("use_intra_process", LaunchConfiguration("use_intra_process")),
("target_container", "/control/control_container"),
(
"external_cmd_selector_param_path",
LaunchConfiguration("external_cmd_selector_param_path"),
),
],
)

# external cmd converter
external_cmd_converter_loader = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[FindPackageShare("external_cmd_converter"), "/launch/external_cmd_converter.launch.py"]
),
launch_arguments=[
("use_intra_process", LaunchConfiguration("use_intra_process")),
("target_container", "/control/control_container"),
],
)

# obstacle collision checker
obstacle_collision_checker_component = ComposableNode(
package="obstacle_collision_checker",
plugin="obstacle_collision_checker::ObstacleCollisionCheckerNode",
name="obstacle_collision_checker",
remappings=[
("input/lanelet_map_bin", "/map/vector_map"),
("input/obstacle_pointcloud", "/perception/obstacle_segmentation/pointcloud"),
("input/reference_trajectory", "/planning/scenario_planning/trajectory"),
(
"input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
("input/odometry", "/localization/kinematic_state"),
],
parameters=[
obstacle_collision_checker_param,
vehicle_info_param,
],
extra_arguments=[{"use_intra_process_comms": LaunchConfiguration("use_intra_process")}],
)

obstacle_collision_checker_loader = LoadComposableNodes(
condition=IfCondition(LaunchConfiguration("enable_obstacle_collision_checker")),
composable_node_descriptions=[obstacle_collision_checker_component],
target_container="/control/control_container",
)

glog_component = ComposableNode(
package="glog_component",
plugin="GlogComponent",
name="glog_component",
)

# set container to run all required components in the same process
if trajectory_follower_mode == "trajectory_follower_node":
container = ComposableNodeContainer(
name="control_container",
namespace="",
package="rclcpp_components",
executable=LaunchConfiguration("container_executable"),
composable_node_descriptions=[
controller_component,
lane_departure_component,
shift_decider_component,
vehicle_cmd_gate_component,
operation_mode_transition_manager_component,
glog_component,
],
)
control_validator_component = ComposableNode(
package="control_validator",
plugin="control_validator::ControlValidator",
name="control_validator",
remappings=[
("~/input/kinematics", "/localization/kinematic_state"),
("~/input/reference_trajectory", "/planning/scenario_planning/trajectory"),
(
"~/input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
("~/output/validation_status", "~/validation_status"),
],
parameters=[control_validator_param],
)

else:
container = ComposableNodeContainer(
name="control_container",
namespace="",
package="rclcpp_components",
executable=LaunchConfiguration("container_executable"),
composable_node_descriptions=[
lane_departure_component,
shift_decider_component,
vehicle_cmd_gate_component,
operation_mode_transition_manager_component,
glog_component,
],
)
control_validator_component = ComposableNode(
package="control_validator",
plugin="control_validator::ControlValidator",
name="control_validator",
remappings=[
("~/input/kinematics", "/localization/kinematic_state"),
("~/input/reference_trajectory", "/planning/scenario_planning/trajectory"),
(
"~/input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
("~/output/validation_status", "~/validation_status"),
],
parameters=[control_validator_param],
)

control_validator_component = ComposableNode(
package="control_validator",
plugin="control_validator::ControlValidator",
name="control_validator",
remappings=[
("~/input/kinematics", "/localization/kinematic_state"),
("~/input/reference_trajectory", "/planning/scenario_planning/trajectory"),
(
"~/input/predicted_trajectory",
"/control/trajectory_follower/lateral/predicted_trajectory",
),
("~/output/validation_status", "~/validation_status"),
],
parameters=[control_validator_param],
)

group = GroupAction(
[
PushRosNamespace("control"),
container,
external_cmd_selector_loader,
external_cmd_converter_loader,
obstacle_collision_checker_loader,
autonomous_emergency_braking_loader,
predicted_path_checker_loader,
]
)

control_validator_group = GroupAction(
[
PushRosNamespace("control"),
ComposableNodeContainer(
name="control_validator_container",
namespace="",
package="rclcpp_components",
executable=LaunchConfiguration("container_executable"),
composable_node_descriptions=[
control_validator_component,
ComposableNode(
package="glog_component",
plugin="GlogComponent",
name="glog_validator_component",
),
],
),
]
)

smart_mpc_trajectory_follower = Node(
package="smart_mpc_trajectory_follower",
executable="pympc_trajectory_follower.py",
name="pympc_trajectory_follower",
)
if trajectory_follower_mode == "trajectory_follower_node":
return [group, control_validator_group]
elif trajectory_follower_mode == "smart_mpc_trajectory_follower":
return [group, control_validator_group, smart_mpc_trajectory_follower]

Check warning on line 420 in launch/tier4_control_launch/launch/control.launch.py

View check run for this annotation

CodeScene Delta Analysis / CodeScene Cloud Delta Analysis (main)

❌ Getting worse: Large Method

launch_setup increases from 328 to 352 lines of code, threshold = 70. Large functions with many lines of code are generally harder to understand and lower the code health. Avoid adding more lines to this function.


def generate_launch_description():
Expand Down Expand Up @@ -468,7 +455,7 @@

# component
add_launch_arg("use_intra_process", "false", "use ROS 2 component container communication")
add_launch_arg("use_multithread", "false", "use multithread")
add_launch_arg("use_multithread", "true", "use multithread")
add_launch_arg(
"trajectory_follower_mode",
"trajectory_follower_node",
Expand All @@ -480,12 +467,11 @@
condition=UnlessCondition(LaunchConfiguration("use_multithread")),
)

if True:
set_container_mt_executable = SetLaunchConfiguration(
"container_executable",
"component_container_mt",
condition=IfCondition(LaunchConfiguration("use_multithread")),
)
set_container_mt_executable = SetLaunchConfiguration(
"container_executable",
"component_container_mt",
condition=IfCondition(LaunchConfiguration("use_multithread")),
)
return launch.LaunchDescription(
launch_arguments
+ [
Expand Down
Loading