From fafc277703b229788d0da6081bd9ef88832bc39d Mon Sep 17 00:00:00 2001 From: Joseph VanPelt Date: Fri, 20 Dec 2024 14:48:03 -0500 Subject: [PATCH 1/4] add new script to output the belief file csv --- .../global_step_prediction/belief_file.py | 94 +++++++++++ .../get_bbn_belief_file.py | 154 ++++++++++++++++++ 2 files changed, 248 insertions(+) create mode 100644 angel_system/global_step_prediction/belief_file.py create mode 100644 angel_system/global_step_prediction/get_bbn_belief_file.py diff --git a/angel_system/global_step_prediction/belief_file.py b/angel_system/global_step_prediction/belief_file.py new file mode 100644 index 000000000..5a1b70b6e --- /dev/null +++ b/angel_system/global_step_prediction/belief_file.py @@ -0,0 +1,94 @@ +""" +This class is meant to create a CSV file in the format that the BBN system expects. +""" + +# Max number of seconds in the first time difference between when +# the node started and when the first update came in. +# if larger, a correction will be made to make up for start up time +START_UP_TIME_MAX = 0.6 + +class BState: + """ + Enum for the state of the belief + """ + def __init__(self): + self.current = "current" + self.unobserved = "unobserved" + self.done = "done" + + +class BeliefFile: + def __init__(self, filename: str, skill: str, labels: list, start_time: int): + self.filename = filename + self.skill = skill + self.labels = labels + self.running_state = {} # keeps track of the steps + # initialize the running states + for label in labels: + self.running_state[label] = BState().unobserved + # set the first step to current + # NOTE: the example files given had this set to current + # from the very beginnning - an assumption we are making here, too + self.running_state[1.0] = BState().current + # this will be used to calculate the current time in the video + self.start_time = start_time + + # initialize the file - in case we need to overwrite it + with open(self.filename, 'w') as f: + f.write("") + + # flag for handling how long it takes to start up the video + self.first_time_diff = True + + def _add_row_to_file(self, row: str) -> None: + # append the row to the file + with open(self.filename, 'a') as f: + f.write(row) + + def _add_rows(self, conf_array: list, ctime: float) -> None: + """ + Add multiple rows to the file based on the labels + """ + # , , , , + row = self.skill + + # add the rows + for step in self.labels: + _row = row + f",{step},{self.running_state[step]}," + _row = _row + f"{conf_array[int(step)]},{ctime}\n" # _row = _row + f"{conf_array[int(step)]:0.8f},{ctime:0.8f}\n" + self._add_row_to_file(_row) + + def final_step_done(self) -> None: + """ + This method is called when the final step is done. + """ + # set the final step + self.running_state[self.labels[-1]] = BState().done + + def update_values(self, current_step: float, conf_array: list, current_time: int) -> None: + """ + When you provide an update, this method will update internal state + and trigger a write to the file. + """ + curr_time = float(current_time - self.start_time) * 1e-9 # get seconds from nano + + # correction of the starting time if we notice that the first + # time difference is too large + if self.first_time_diff and curr_time > START_UP_TIME_MAX: + self.first_time_diff = False + self.start_time = current_time # save this for the next update + # assume 0 for now + curr_time = 0.0 + + # check the states and see if they changed + if current_step > 0 and self.running_state[current_step] != BState().current: + # set the current step + self.running_state[current_step] = BState().current + + # see if the previous state was current - that means we change it to done + prev_step = current_step - 1.0 + if prev_step > 0 and self.running_state[prev_step] == BState().current: + self.running_state[prev_step] = BState().done + + # write the rows to the file + self._add_rows(conf_array, curr_time) diff --git a/angel_system/global_step_prediction/get_bbn_belief_file.py b/angel_system/global_step_prediction/get_bbn_belief_file.py new file mode 100644 index 000000000..e6ed4c4fd --- /dev/null +++ b/angel_system/global_step_prediction/get_bbn_belief_file.py @@ -0,0 +1,154 @@ +""" +This script will take a kwcoco file that was output from the TCN node (for example) +and output the belief file that is used by the BBN eval_kit. The belief file is a CSV. +""" +from pathlib import Path + +import click +import kwcoco +import numpy as np +import yaml + +from angel_system.global_step_prediction.belief_file import BeliefFile +from angel_system.global_step_prediction.global_step_predictor import ( + GlobalStepPredictor, +) + +# TODO: make these options in the future? +threshold_multiplier_weak = 1.0 +threshold_frame_count = 3 +threshold_frame_count_weak = 8 +deactivate_thresh_frame_count = 8 + +def get_belief_file( + coco_ds: kwcoco.CocoDataset, + medical_task="r18", + code_dir=Path("."), + out_file=Path("./belief_file.csv"), + model_file=Path("./model_files/task_monitor/global_step_predictor_act_avgs_R18.npy"), +) -> None: + """ + Run the inference and create the belief file. + """ + + # path to the medical activity labels + act_path = code_dir / "config/activity_labels/medical" / f"{medical_task}.yaml" + + # load the steps from the activity config file + with open(act_path, "r") as stream: + config = yaml.safe_load(stream) + labels = [] + for lbl in config["labels"]: + id = float(lbl["id"]) # using float based on the belief file format + if id > 0: # skip the background label - not used in belief format + labels.append(id) + print(f"Labels: {labels}") + + start_time = 0 # start of the video + + # setup the belief file + print(f"setting up output: {out_file}") + belief = BeliefFile(out_file, medical_task.upper(), labels, start_time) + + # setup the global step predictor + gsp = GlobalStepPredictor( + threshold_multiplier_weak=threshold_multiplier_weak, + threshold_frame_count=threshold_frame_count, + threshold_frame_count_weak=threshold_frame_count_weak, + deactivate_thresh_frame_count=deactivate_thresh_frame_count, + recipe_types=[f"{medical_task}"], + activity_config_fpath=act_path.as_posix(), + recipe_config_dict={ + f"{medical_task}": code_dir + / "config/tasks/medical" + / f"{medical_task}.yaml" + }, + ) + # load the model + gsp.get_average_TP_activations_from_file(model_file) + + all_vid_ids = np.unique(np.asarray(coco_ds.images().lookup("video_id"))) + for vid_id in all_vid_ids: + print(f"vid_id {vid_id}===========================") + + image_ids = coco_ds.index.vidid_to_gids[vid_id] + annots_images = coco_ds.subset(gids=image_ids, copy=True) + + # All N activity confs x each video frame + activity_confs = annots_images.annots().get("prob") + + # get the frame_index from the images + ftimes = annots_images.images().lookup("frame_index") + #print(ftimes) + + step_mode = "granular" + for i, conf_array in enumerate(activity_confs): + current_time = ftimes[i] # get the time from the image's frame_index + + if current_time > 0: # skip any 0 index frames + tracker_dict_list = gsp.process_new_confidences(np.array([conf_array])) + for task in tracker_dict_list: + current_step_id = task[f"current_{step_mode}_step"] + + # If we are on the last step and it is not active, mark it as done + if ( + current_step_id == task[f"total_num_{step_mode}_steps"] - 1 + and not task["active"] + ): + belief.final_step_done() + + print(f"Updating based on: {current_time}") + belief.update_values(current_step_id, conf_array, current_time) + + print(f"finished writing belief file: {out_file}") + + +@click.command(context_settings={"help_option_names": ["-h", "--help"]}) +@click.argument( + "medical_task", + type=str, +) +@click.argument( + "coco_file", + type=click.Path( + exists=True, dir_okay=False, readable=True, resolve_path=True, path_type=Path + ), + default="./stuff/r18_bench1_activity_predictions.kwcoco", +) +@click.option( + "--code_dir", + type=click.Path( + exists=True, file_okay=False, readable=True, resolve_path=True, path_type=Path + ), + default=".", + help="The path to the code directory", +) +@click.option( + "--out_file", + type=click.Path(readable=True, resolve_path=True, path_type=Path), + default="./belief_file.csv", + help="The path to where to save the output file", +) +def run_expirement( + medical_task: str, + coco_file: Path, + code_dir: Path, + out_file: Path, +) -> None: + """ + Creates the belief file. + """ + + print(f"Running medical task: {medical_task}") + print(f"coco_file = {coco_file}") + + get_belief_file( + kwcoco.CocoDataset(coco_file), + medical_task=medical_task, + code_dir=code_dir, + out_file=out_file, + ) + + +if __name__ == "__main__": + run_expirement() From 0dd80f5461612ea1649ddceabe25e6daa3ba9cc1 Mon Sep 17 00:00:00 2001 From: Joseph VanPelt Date: Fri, 20 Dec 2024 14:51:14 -0500 Subject: [PATCH 2/4] black formatting --- .../global_step_prediction/belief_file.py | 18 +++++++++++++----- .../get_bbn_belief_file.py | 7 +++++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/angel_system/global_step_prediction/belief_file.py b/angel_system/global_step_prediction/belief_file.py index 5a1b70b6e..b1ac4f450 100644 --- a/angel_system/global_step_prediction/belief_file.py +++ b/angel_system/global_step_prediction/belief_file.py @@ -7,10 +7,12 @@ # if larger, a correction will be made to make up for start up time START_UP_TIME_MAX = 0.6 + class BState: """ Enum for the state of the belief """ + def __init__(self): self.current = "current" self.unobserved = "unobserved" @@ -34,7 +36,7 @@ def __init__(self, filename: str, skill: str, labels: list, start_time: int): self.start_time = start_time # initialize the file - in case we need to overwrite it - with open(self.filename, 'w') as f: + with open(self.filename, "w") as f: f.write("") # flag for handling how long it takes to start up the video @@ -42,7 +44,7 @@ def __init__(self, filename: str, skill: str, labels: list, start_time: int): def _add_row_to_file(self, row: str) -> None: # append the row to the file - with open(self.filename, 'a') as f: + with open(self.filename, "a") as f: f.write(row) def _add_rows(self, conf_array: list, ctime: float) -> None: @@ -55,7 +57,9 @@ def _add_rows(self, conf_array: list, ctime: float) -> None: # add the rows for step in self.labels: _row = row + f",{step},{self.running_state[step]}," - _row = _row + f"{conf_array[int(step)]},{ctime}\n" # _row = _row + f"{conf_array[int(step)]:0.8f},{ctime:0.8f}\n" + _row = ( + _row + f"{conf_array[int(step)]},{ctime}\n" + ) # _row = _row + f"{conf_array[int(step)]:0.8f},{ctime:0.8f}\n" self._add_row_to_file(_row) def final_step_done(self) -> None: @@ -65,12 +69,16 @@ def final_step_done(self) -> None: # set the final step self.running_state[self.labels[-1]] = BState().done - def update_values(self, current_step: float, conf_array: list, current_time: int) -> None: + def update_values( + self, current_step: float, conf_array: list, current_time: int + ) -> None: """ When you provide an update, this method will update internal state and trigger a write to the file. """ - curr_time = float(current_time - self.start_time) * 1e-9 # get seconds from nano + curr_time = ( + float(current_time - self.start_time) * 1e-9 + ) # get seconds from nano # correction of the starting time if we notice that the first # time difference is too large diff --git a/angel_system/global_step_prediction/get_bbn_belief_file.py b/angel_system/global_step_prediction/get_bbn_belief_file.py index e6ed4c4fd..11a843a2f 100644 --- a/angel_system/global_step_prediction/get_bbn_belief_file.py +++ b/angel_system/global_step_prediction/get_bbn_belief_file.py @@ -20,12 +20,15 @@ threshold_frame_count_weak = 8 deactivate_thresh_frame_count = 8 + def get_belief_file( coco_ds: kwcoco.CocoDataset, medical_task="r18", code_dir=Path("."), out_file=Path("./belief_file.csv"), - model_file=Path("./model_files/task_monitor/global_step_predictor_act_avgs_R18.npy"), + model_file=Path( + "./model_files/task_monitor/global_step_predictor_act_avgs_R18.npy" + ), ) -> None: """ Run the inference and create the belief file. @@ -79,7 +82,7 @@ def get_belief_file( # get the frame_index from the images ftimes = annots_images.images().lookup("frame_index") - #print(ftimes) + # print(ftimes) step_mode = "granular" for i, conf_array in enumerate(activity_confs): From 5012bd3a76b6d8112af02c034e8c771b0bc829d7 Mon Sep 17 00:00:00 2001 From: Joseph VanPelt Date: Thu, 26 Dec 2024 11:31:38 -0500 Subject: [PATCH 3/4] add gsp to the latency tracker --- .../angel_system_nodes/latency_tracker.py | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py b/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py index 04609cf7b..dffab7507 100644 --- a/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py +++ b/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py @@ -8,7 +8,7 @@ from angel_utils.conversion import time_to_float, time_to_int from builtin_interfaces.msg import Time -from angel_msgs.msg import ObjectDetection2dSet, HandJointPosesUpdate, ActivityDetection +from angel_msgs.msg import ObjectDetection2dSet, HandJointPosesUpdate, ActivityDetection, TaskUpdate from angel_utils import ( declare_and_get_parameters, RateTracker, # DYNAMIC_TYPE @@ -44,6 +44,7 @@ def __init__(self): # If we should enable additional logging to the info level # about when we receive and process data. ("enable_time_trace_logging", False), + ("gsp_topic", "TaskUpdates"), ], ) self._image_md_topic = param_values["image_md_topic"] @@ -51,6 +52,7 @@ def __init__(self): self._pose_topic = param_values["pose_topic"] self._act_topic = param_values["activity_topic"] self._latency_topic = param_values["latency_topic"] + self._gsp_topic = param_values["gsp_topic"] self._enable_trace_logging = param_values["enable_time_trace_logging"] @@ -64,12 +66,14 @@ def __init__(self): self._cur_pose_msg_lock = Lock() self._cur_act_msg_lock = Lock() self._image_lookup_lock = Lock() + self._task_msg_lock = Lock() self._rate_tracker = RateTracker() self._det = None self._pose = None self._act = None + self._task = None self._image_lookup = {} ########################################## @@ -103,6 +107,13 @@ def __init__(self): 1, callback_group=MutuallyExclusiveCallbackGroup(), ) + self._task_subscriber = self.create_subscription( + TaskUpdate, + self._gsp_topic, + self.task_callback, + 1, + callback_group=MutuallyExclusiveCallbackGroup(), + ) self._latency_publisher = self.create_publisher( ros2_string, @@ -188,19 +199,31 @@ def rt_loop(self): if img_time is not None: act_lat_end = act_time - time_to_float(img_time) + task_lat = None + if self._task: + with self._task_msg_lock: + task_msg = self._task + task_time = time_to_float(task_msg.header.stamp) + img_time = self.get_msg_time_from_source(task_msg.latest_sensor_input_time) + if img_time is not None: + task_lat = task_time - time_to_float(img_time) + # save the info to the message data = { "detection": det_lat, "pose:": pose_lat, "activity_start": act_lat_start, "activity_end": act_lat_end, + "task": task_lat, } det_str = f"{det_lat:.3f}" if det_lat else "NA" pose_str = f"{pose_lat:.3f}" if pose_lat else "NA" acts_str = f"{act_lat_start:.3f}" if act_lat_start else "NA" acte_str = f"{act_lat_end:.3f}" if act_lat_end else "NA" + task_str = f"{task_lat:.3f}" if task_lat else "NA" log.info( f"Detection: {det_str}, Pose: {pose_str}, Activity.start: {acts_str}, Activity.end: {acte_str}" + f", Task monitor: {task_str}" ) msg.data = json.dumps(data, indent=0) @@ -233,6 +256,10 @@ def act_callback(self, msg: ActivityDetection) -> None: with self._cur_act_msg_lock: self._act = msg + def task_callback(self, msg: TaskUpdate) -> None: + with self._task_msg_lock: + self._task = msg + def get_msg_time_from_source(self, source_stamp: Time) -> Time: with self._image_lookup_lock: return self._image_lookup.get(time_to_int(source_stamp)) From 3e99e353e18ac264efd8d5161a578c580b91799b Mon Sep 17 00:00:00 2001 From: Joseph VanPelt Date: Thu, 26 Dec 2024 11:58:21 -0500 Subject: [PATCH 4/4] black formatting changes --- .../angel_system_nodes/latency_tracker.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py b/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py index dffab7507..2e790975e 100644 --- a/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py +++ b/ros/angel_system_nodes/angel_system_nodes/latency_tracker.py @@ -8,7 +8,12 @@ from angel_utils.conversion import time_to_float, time_to_int from builtin_interfaces.msg import Time -from angel_msgs.msg import ObjectDetection2dSet, HandJointPosesUpdate, ActivityDetection, TaskUpdate +from angel_msgs.msg import ( + ObjectDetection2dSet, + HandJointPosesUpdate, + ActivityDetection, + TaskUpdate, +) from angel_utils import ( declare_and_get_parameters, RateTracker, # DYNAMIC_TYPE @@ -204,7 +209,9 @@ def rt_loop(self): with self._task_msg_lock: task_msg = self._task task_time = time_to_float(task_msg.header.stamp) - img_time = self.get_msg_time_from_source(task_msg.latest_sensor_input_time) + img_time = self.get_msg_time_from_source( + task_msg.latest_sensor_input_time + ) if img_time is not None: task_lat = task_time - time_to_float(img_time)