10
10
from driving_log_replayer_cli .core .result import convert_all
11
11
from driving_log_replayer_cli .core .result import display_all
12
12
from driving_log_replayer_cli .core .scenario import Datasets
13
+ from driving_log_replayer_cli .core .scenario import get_dry_run_scenario_path
13
14
from driving_log_replayer_cli .core .scenario import load_scenario
14
15
from driving_log_replayer_cli .core .scenario import Scenario
15
16
from driving_log_replayer_cli .simulation .update import update_annotationless_scenario_condition
@@ -48,19 +49,20 @@ def run(
48
49
continue
49
50
scenario = load_scenario (scenario_file )
50
51
launch_cmd = None
52
+ launch_arg_dict = args_to_dict (launch_args )
51
53
if scenario .Evaluation ["UseCaseName" ] in USE_T4_DATASET :
52
54
launch_cmd = cmd_use_t4_dataset (
53
55
scenario_file ,
54
56
output_case ,
55
57
config .autoware_path ,
56
- launch_args ,
58
+ launch_arg_dict ,
57
59
)
58
60
else :
59
61
launch_cmd = cmd_use_bag_only (
60
62
scenario_file ,
61
63
output_case ,
62
64
config .autoware_path ,
63
- launch_args ,
65
+ launch_arg_dict ,
64
66
)
65
67
66
68
if launch_cmd is None :
@@ -93,6 +95,54 @@ def run(
93
95
display_all (output_dir_by_time )
94
96
95
97
98
+ def dry_run (
99
+ config : Config ,
100
+ use_case : str ,
101
+ launch_args : list [str ],
102
+ ) -> None :
103
+ output_dir_by_time = create_output_dir_by_time (config .output_directory )
104
+ output_case = output_dir_by_time .joinpath ("dry_run" )
105
+ scenario_file = get_dry_run_scenario_path (use_case )
106
+ scenario = load_scenario (scenario_file )
107
+ launch_cmd = None
108
+ launch_arg_dict = args_to_dict (launch_args )
109
+ if scenario .Evaluation ["UseCaseName" ] in USE_T4_DATASET :
110
+ launch_cmd = cmd_use_t4_dataset (
111
+ scenario_file ,
112
+ output_case ,
113
+ config .autoware_path ,
114
+ launch_arg_dict ,
115
+ )
116
+ else :
117
+ launch_cmd = cmd_use_bag_only (
118
+ scenario_file ,
119
+ output_case ,
120
+ config .autoware_path ,
121
+ launch_arg_dict ,
122
+ )
123
+
124
+ if launch_cmd is None :
125
+ return
126
+
127
+ # create dir for output bag and result.jsonl
128
+ output_case .mkdir (exist_ok = True )
129
+
130
+ # save command as bash script
131
+ run_script = output_case .joinpath ("run.bash" )
132
+ with run_script .open ("w" ) as f :
133
+ f .write (launch_cmd )
134
+
135
+ # run simulation
136
+ cmd = ["/bin/bash" , run_script .as_posix ()]
137
+ try :
138
+ run_with_log (cmd , output_case .joinpath ("console.log" ))
139
+ except KeyboardInterrupt :
140
+ termcolor .cprint ("Simulation execution canceled by Ctrl+C" , "red" )
141
+ # convert result file and display result
142
+ convert_all (output_dir_by_time )
143
+ display_all (output_dir_by_time )
144
+
145
+
96
146
def create_output_dir_by_time (base_path : Path ) -> Path :
97
147
output_dir_by_time = base_path .joinpath (
98
148
datetime .datetime .now ().strftime ("%Y-%m%d-%H%M%S" ) # noqa
@@ -117,22 +167,29 @@ def get_scenario_file(dataset_path: Path) -> Path | None:
117
167
return scenario_file_path
118
168
119
169
120
- def extract_arg (launch_args : list [str ]) -> str :
121
- extract_launch_arg = ""
170
+ def args_to_dict (launch_args : list [str ]) -> dict [ str , str ] :
171
+ launch_arg_dict = {}
122
172
for l_arg in launch_args :
123
- if len (l_arg .split (":=" )) != 2 : # noqa
173
+ try :
174
+ key , value = l_arg .split (":=" )
175
+ launch_arg_dict [key ] = value
176
+ except ValueError :
124
177
# invalid argument
125
178
termcolor .cprint (
126
179
f"{ l_arg } is ignored because it is invalid" ,
127
180
"red" ,
128
181
)
129
- else : # noqa
130
- # Enclose in single quotes to avoid being treated as special characters by shell
131
- if "{" in l_arg or "[" in l_arg :
132
- extract_launch_arg += f" '{ l_arg } '"
133
- else :
134
- extract_launch_arg += f" { l_arg } "
135
- return extract_launch_arg
182
+ return launch_arg_dict
183
+
184
+
185
+ def launch_dict_to_str (launch_arg_dict : dict ) -> str :
186
+ rtn_str = ""
187
+ for k , v in launch_arg_dict .items ():
188
+ if isinstance (v , str ) and ("{" in v or "[" in v ):
189
+ rtn_str += f" '{ k } :={ v } '"
190
+ else :
191
+ rtn_str += f" { k } :={ v } "
192
+ return rtn_str
136
193
137
194
138
195
def clean_up_cmd () -> str :
@@ -155,30 +212,40 @@ def cmd_use_bag_only(
155
212
scenario_path : Path ,
156
213
output_path : Path ,
157
214
autoware_path : Path ,
158
- launch_args : str ,
215
+ launch_args_dict : dict [ str , str ] ,
159
216
) -> str | None :
160
217
scenario : Scenario = load_scenario (scenario_path )
161
- launch_command = f"source { autoware_path .joinpath ('install' , 'setup.bash' ).as_posix ()} \n "
162
- launch_command += f"ros2 launch driving_log_replayer { scenario .Evaluation ['UseCaseName' ]} .launch.py map_path:={ scenario .LocalMapPath } vehicle_model:={ scenario .VehicleModel } sensor_model:={ scenario .SensorModel } vehicle_id:={ scenario .VehicleId } "
163
- launch_command += f" scenario_path:={ scenario_path .as_posix ()} result_json_path:={ output_path .joinpath ('result.json' ).as_posix ()} input_bag:={ scenario_path .parent .joinpath ('input_bag' ).as_posix ()} result_bag_path:={ output_path .joinpath ('result_bag' ).as_posix ()} "
218
+ launch_command = f"source { autoware_path .joinpath ('install' , 'setup.bash' )} \n "
219
+ launch_command += (
220
+ f"ros2 launch driving_log_replayer { scenario .Evaluation ['UseCaseName' ]} .launch.py"
221
+ )
222
+ launch_arg_dict_scenario = {
223
+ "map_path" : scenario .LocalMapPath ,
224
+ "vehicle_model" : scenario .VehicleModel ,
225
+ "sensor_model" : scenario .SensorModel ,
226
+ "vehicle_id" : scenario .VehicleId ,
227
+ "scenario_path" : scenario_path ,
228
+ "result_json_path" : output_path .joinpath ("result.json" ),
229
+ "input_bag" : scenario_path .parent .joinpath ("input_bag" ),
230
+ "result_bag_path" : output_path .joinpath ("result_bag" ),
231
+ }
164
232
launch_localization = scenario .Evaluation .get ("LaunchLocalization" )
165
233
if launch_localization is not None :
166
- launch_command += f" localization:={ launch_localization } "
167
- launch_command += extract_arg (launch_args )
234
+ launch_arg_dict_scenario ["localization" ] = launch_localization
235
+ launch_arg_dict_scenario .update (launch_args_dict )
236
+ launch_command += launch_dict_to_str (launch_arg_dict_scenario ) + "\n "
168
237
return launch_command + clean_up_cmd ()
169
238
170
239
171
240
def cmd_use_t4_dataset (
172
241
scenario_path : Path ,
173
242
output_path : Path ,
174
243
autoware_path : Path ,
175
- launch_args : str ,
244
+ launch_args_dict : dict [ str , str ] ,
176
245
) -> str | None :
177
246
dataset_path = scenario_path .parent
178
247
scenario : Scenario = load_scenario (scenario_path )
179
- launch_command_for_all_dataset = (
180
- f"source { autoware_path .joinpath ('install' , 'setup.bash' ).as_posix ()} \n "
181
- )
248
+ launch_command_for_all_dataset = f"source { autoware_path .joinpath ('install' , 'setup.bash' )} \n "
182
249
t4_dataset_base_path = dataset_path .joinpath ("t4_dataset" )
183
250
try :
184
251
t4_datasets : Datasets = Datasets (Datasets = scenario .Evaluation ["Datasets" ])
@@ -202,12 +269,25 @@ def cmd_use_t4_dataset(
202
269
)
203
270
continue
204
271
205
- launch_command = f"ros2 launch driving_log_replayer { scenario .Evaluation ['UseCaseName' ]} .launch.py map_path:={ map_path } vehicle_model:={ scenario .VehicleModel } sensor_model:={ scenario .SensorModel } vehicle_id:={ vehicle_id } "
206
- launch_command += f" scenario_path:={ scenario_path .as_posix ()} result_json_path:={ output_dir_per_dataset .joinpath ('result.json' ).as_posix ()} input_bag:={ t4_dataset_path .joinpath ('input_bag' ).as_posix ()} result_bag_path:={ output_dir_per_dataset .joinpath ('result_bag' ).as_posix ()} "
207
- launch_command += f" t4_dataset_path:={ t4_dataset_path } result_archive_path:={ output_dir_per_dataset .joinpath ('result_archive' ).as_posix ()} "
272
+ launch_command = (
273
+ f"ros2 launch driving_log_replayer { scenario .Evaluation ['UseCaseName' ]} .launch.py"
274
+ )
275
+ launch_arg_dict_dataset = {
276
+ "map_path" : map_path ,
277
+ "vehicle_model" : scenario .VehicleModel ,
278
+ "sensor_model" : scenario .SensorModel ,
279
+ "vehicle_id" : vehicle_id ,
280
+ "scenario_path" : scenario_path ,
281
+ "result_json_path" : output_dir_per_dataset .joinpath ("result.json" ),
282
+ "input_bag" : t4_dataset_path .joinpath ("input_bag" ),
283
+ "result_bag_path" : output_dir_per_dataset .joinpath ("result_bag" ),
284
+ "t4_dataset_path" : t4_dataset_path ,
285
+ "result_archive_path" : output_dir_per_dataset .joinpath ("result_archive" ),
286
+ }
208
287
if t4_dataset [key ].LaunchSensing is not None :
209
- launch_command += f" sensing:={ t4_dataset [key ].LaunchSensing } "
210
- launch_command += extract_arg (launch_args )
288
+ launch_arg_dict_dataset ["sensing" ] = t4_dataset [key ].LaunchSensing
289
+ launch_arg_dict_dataset .update (launch_args_dict )
290
+ launch_command += launch_dict_to_str (launch_arg_dict_dataset ) + "\n "
211
291
launch_command += clean_up_cmd ()
212
292
launch_command_for_all_dataset += launch_command
213
293
if is_database_evaluation :
@@ -218,6 +298,8 @@ def cmd_use_t4_dataset(
218
298
"driving_log_replayer" ,
219
299
"perception_database_result.py" ,
220
300
)
221
- database_result_command = f"python3 { database_result_script_path .as_posix ()} -s { scenario_path } -r { output_path } \n "
301
+ database_result_command = (
302
+ f"python3 { database_result_script_path } -s { scenario_path } -r { output_path } \n "
303
+ )
222
304
launch_command_for_all_dataset += database_result_command
223
305
return launch_command_for_all_dataset
0 commit comments