Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

belief file csv output #483

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions angel_system/global_step_prediction/belief_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""
This class is meant to create a CSV file in the format that the BBN system expects.
"""

# Max number of seconds in the first time difference between when
# the node started and when the first update came in.
# if larger, a correction will be made to make up for start up time
START_UP_TIME_MAX = 0.6


class BState:
"""
Enum for the state of the belief
"""

def __init__(self):
self.current = "current"
self.unobserved = "unobserved"
self.done = "done"


class BeliefFile:
def __init__(self, filename: str, skill: str, labels: list, start_time: int):
self.filename = filename
self.skill = skill
self.labels = labels
self.running_state = {} # keeps track of the steps
# initialize the running states
for label in labels:
self.running_state[label] = BState().unobserved
# set the first step to current
# NOTE: the example files given had this set to current
# from the very beginnning - an assumption we are making here, too
self.running_state[1.0] = BState().current
# this will be used to calculate the current time in the video
self.start_time = start_time

# initialize the file - in case we need to overwrite it
with open(self.filename, "w") as f:
f.write("")

# flag for handling how long it takes to start up the video
self.first_time_diff = True

def _add_row_to_file(self, row: str) -> None:
# append the row to the file
with open(self.filename, "a") as f:
f.write(row)

def _add_rows(self, conf_array: list, ctime: float) -> None:
"""
Add multiple rows to the file based on the labels
"""
# <skill>, <step_num>, <state>, <confidence>, <timestep>
row = self.skill

# add the rows
for step in self.labels:
_row = row + f",{step},{self.running_state[step]},"
_row = (
_row + f"{conf_array[int(step)]},{ctime}\n"
) # _row = _row + f"{conf_array[int(step)]:0.8f},{ctime:0.8f}\n"
self._add_row_to_file(_row)

def final_step_done(self) -> None:
"""
This method is called when the final step is done.
"""
# set the final step
self.running_state[self.labels[-1]] = BState().done

def update_values(
self, current_step: float, conf_array: list, current_time: int
) -> None:
"""
When you provide an update, this method will update internal state
and trigger a write to the file.
"""
curr_time = (
float(current_time - self.start_time) * 1e-9
) # get seconds from nano

# correction of the starting time if we notice that the first
# time difference is too large
if self.first_time_diff and curr_time > START_UP_TIME_MAX:
self.first_time_diff = False
self.start_time = current_time # save this for the next update
# assume 0 for now
curr_time = 0.0

# check the states and see if they changed
if current_step > 0 and self.running_state[current_step] != BState().current:
# set the current step
self.running_state[current_step] = BState().current

# see if the previous state was current - that means we change it to done
prev_step = current_step - 1.0
if prev_step > 0 and self.running_state[prev_step] == BState().current:
self.running_state[prev_step] = BState().done

# write the rows to the file
self._add_rows(conf_array, curr_time)
157 changes: 157 additions & 0 deletions angel_system/global_step_prediction/get_bbn_belief_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
This script will take a kwcoco file that was output from the TCN node (for example)
and output the belief file that is used by the BBN eval_kit. The belief file is a CSV.
"""
from pathlib import Path

import click
import kwcoco
import numpy as np
import yaml

from angel_system.global_step_prediction.belief_file import BeliefFile
from angel_system.global_step_prediction.global_step_predictor import (
GlobalStepPredictor,
)

# TODO: make these options in the future?
threshold_multiplier_weak = 1.0
threshold_frame_count = 3
threshold_frame_count_weak = 8
deactivate_thresh_frame_count = 8


def get_belief_file(
coco_ds: kwcoco.CocoDataset,
medical_task="r18",
code_dir=Path("."),
out_file=Path("./belief_file.csv"),
model_file=Path(
"./model_files/task_monitor/global_step_predictor_act_avgs_R18.npy"
),
) -> None:
"""
Run the inference and create the belief file.
"""

# path to the medical activity labels
act_path = code_dir / "config/activity_labels/medical" / f"{medical_task}.yaml"

# load the steps from the activity config file
with open(act_path, "r") as stream:
config = yaml.safe_load(stream)
labels = []
for lbl in config["labels"]:
id = float(lbl["id"]) # using float based on the belief file format
if id > 0: # skip the background label - not used in belief format
labels.append(id)
print(f"Labels: {labels}")

start_time = 0 # start of the video

# setup the belief file
print(f"setting up output: {out_file}")
belief = BeliefFile(out_file, medical_task.upper(), labels, start_time)

# setup the global step predictor
gsp = GlobalStepPredictor(
threshold_multiplier_weak=threshold_multiplier_weak,
threshold_frame_count=threshold_frame_count,
threshold_frame_count_weak=threshold_frame_count_weak,
deactivate_thresh_frame_count=deactivate_thresh_frame_count,
recipe_types=[f"{medical_task}"],
activity_config_fpath=act_path.as_posix(),
recipe_config_dict={
f"{medical_task}": code_dir
/ "config/tasks/medical"
/ f"{medical_task}.yaml"
},
)
# load the model
gsp.get_average_TP_activations_from_file(model_file)

all_vid_ids = np.unique(np.asarray(coco_ds.images().lookup("video_id")))
for vid_id in all_vid_ids:
print(f"vid_id {vid_id}===========================")

image_ids = coco_ds.index.vidid_to_gids[vid_id]
annots_images = coco_ds.subset(gids=image_ids, copy=True)

# All N activity confs x each video frame
activity_confs = annots_images.annots().get("prob")

# get the frame_index from the images
ftimes = annots_images.images().lookup("frame_index")
# print(ftimes)

step_mode = "granular"
for i, conf_array in enumerate(activity_confs):
current_time = ftimes[i] # get the time from the image's frame_index

if current_time > 0: # skip any 0 index frames
tracker_dict_list = gsp.process_new_confidences(np.array([conf_array]))
for task in tracker_dict_list:
current_step_id = task[f"current_{step_mode}_step"]

# If we are on the last step and it is not active, mark it as done
if (
current_step_id == task[f"total_num_{step_mode}_steps"] - 1
and not task["active"]
):
belief.final_step_done()

print(f"Updating based on: {current_time}")
belief.update_values(current_step_id, conf_array, current_time)

print(f"finished writing belief file: {out_file}")


@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.argument(
"medical_task",
type=str,
)
@click.argument(
"coco_file",
type=click.Path(
exists=True, dir_okay=False, readable=True, resolve_path=True, path_type=Path
),
default="./stuff/r18_bench1_activity_predictions.kwcoco",
)
@click.option(
"--code_dir",
type=click.Path(
exists=True, file_okay=False, readable=True, resolve_path=True, path_type=Path
),
default=".",
help="The path to the code directory",
)
@click.option(
"--out_file",
type=click.Path(readable=True, resolve_path=True, path_type=Path),
default="./belief_file.csv",
help="The path to where to save the output file",
)
def run_expirement(
medical_task: str,
coco_file: Path,
code_dir: Path,
out_file: Path,
) -> None:
"""
Creates the belief file.
"""

print(f"Running medical task: {medical_task}")
print(f"coco_file = {coco_file}")

get_belief_file(
kwcoco.CocoDataset(coco_file),
medical_task=medical_task,
code_dir=code_dir,
out_file=out_file,
)


if __name__ == "__main__":
run_expirement()
36 changes: 35 additions & 1 deletion ros/angel_system_nodes/angel_system_nodes/latency_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
from angel_utils.conversion import time_to_float, time_to_int
from builtin_interfaces.msg import Time

from angel_msgs.msg import ObjectDetection2dSet, HandJointPosesUpdate, ActivityDetection
from angel_msgs.msg import (
ObjectDetection2dSet,
HandJointPosesUpdate,
ActivityDetection,
TaskUpdate,
)
from angel_utils import (
declare_and_get_parameters,
RateTracker, # DYNAMIC_TYPE
Expand Down Expand Up @@ -44,13 +49,15 @@ def __init__(self):
# If we should enable additional logging to the info level
# about when we receive and process data.
("enable_time_trace_logging", False),
("gsp_topic", "TaskUpdates"),
],
)
self._image_md_topic = param_values["image_md_topic"]
self._det_topic = param_values["det_topic"]
self._pose_topic = param_values["pose_topic"]
self._act_topic = param_values["activity_topic"]
self._latency_topic = param_values["latency_topic"]
self._gsp_topic = param_values["gsp_topic"]

self._enable_trace_logging = param_values["enable_time_trace_logging"]

Expand All @@ -64,12 +71,14 @@ def __init__(self):
self._cur_pose_msg_lock = Lock()
self._cur_act_msg_lock = Lock()
self._image_lookup_lock = Lock()
self._task_msg_lock = Lock()

self._rate_tracker = RateTracker()

self._det = None
self._pose = None
self._act = None
self._task = None
self._image_lookup = {}

##########################################
Expand Down Expand Up @@ -103,6 +112,13 @@ def __init__(self):
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)
self._task_subscriber = self.create_subscription(
TaskUpdate,
self._gsp_topic,
self.task_callback,
1,
callback_group=MutuallyExclusiveCallbackGroup(),
)

self._latency_publisher = self.create_publisher(
ros2_string,
Expand Down Expand Up @@ -188,19 +204,33 @@ def rt_loop(self):
if img_time is not None:
act_lat_end = act_time - time_to_float(img_time)

task_lat = None
if self._task:
with self._task_msg_lock:
task_msg = self._task
task_time = time_to_float(task_msg.header.stamp)
img_time = self.get_msg_time_from_source(
task_msg.latest_sensor_input_time
)
if img_time is not None:
task_lat = task_time - time_to_float(img_time)

# save the info to the message
data = {
"detection": det_lat,
"pose:": pose_lat,
"activity_start": act_lat_start,
"activity_end": act_lat_end,
"task": task_lat,
}
det_str = f"{det_lat:.3f}" if det_lat else "NA"
pose_str = f"{pose_lat:.3f}" if pose_lat else "NA"
acts_str = f"{act_lat_start:.3f}" if act_lat_start else "NA"
acte_str = f"{act_lat_end:.3f}" if act_lat_end else "NA"
task_str = f"{task_lat:.3f}" if task_lat else "NA"
log.info(
f"Detection: {det_str}, Pose: {pose_str}, Activity.start: {acts_str}, Activity.end: {acte_str}"
f", Task monitor: {task_str}"
)
msg.data = json.dumps(data, indent=0)

Expand Down Expand Up @@ -233,6 +263,10 @@ def act_callback(self, msg: ActivityDetection) -> None:
with self._cur_act_msg_lock:
self._act = msg

def task_callback(self, msg: TaskUpdate) -> None:
with self._task_msg_lock:
self._task = msg

def get_msg_time_from_source(self, source_stamp: Time) -> Time:
with self._image_lookup_lock:
return self._image_lookup.get(time_to_int(source_stamp))
Expand Down