From c63abbf877e493e606baac614a84f74c6b3121f5 Mon Sep 17 00:00:00 2001 From: Davide Iafrate Date: Mon, 30 Sep 2024 11:38:16 +0000 Subject: [PATCH 1/2] Ported DuckieDrone UKF 2D and 7D --- .../state_estimator_ukf_2d_node.py | 382 ------------------ .../config/state_estimator_node.yaml | 6 +- .../launch/state_estimator_node.launch | 4 + .../state_estimator_ukf_2d_node.py | 382 ------------------ .../state_estimator_ukf_7d_node.py | 207 ---------- .../state_estimator/src/scripts/__init__.py | 4 +- .../src/scripts/state_estimator_ukf_2d.py | 245 +++++++++++ .../src/scripts/state_estimator_ukf_7d.py | 332 +++++++++++++++ .../src/state_estimator_node.py | 13 +- 9 files changed, 597 insertions(+), 978 deletions(-) delete mode 100755 packages/robots/duckiedrone/state_estimator/_scripts/StateEstimators/state_estimator_ukf_2d_node.py delete mode 100755 packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_2d_node.py delete mode 100755 packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_7d_node.py create mode 100644 packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_2d.py create mode 100644 packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_7d.py diff --git a/packages/robots/duckiedrone/state_estimator/_scripts/StateEstimators/state_estimator_ukf_2d_node.py b/packages/robots/duckiedrone/state_estimator/_scripts/StateEstimators/state_estimator_ukf_2d_node.py deleted file mode 100755 index 0ffd530f..00000000 --- a/packages/robots/duckiedrone/state_estimator/_scripts/StateEstimators/state_estimator_ukf_2d_node.py +++ /dev/null @@ -1,382 +0,0 @@ -#!/usr/bin/env python3 - -# ROS imports -import rospy -from sensor_msgs.msg import Imu, Range -from nav_msgs.msg import Odometry - -# UKF imports -# The matplotlib imports and the matplotlib.use('Pdf') line make it so that the -# UKF code that imports matplotlib does not complain. Essentially, the -# use('Pdf') call allows a plot to be created without a window (allows it to run -# through ssh) -import matplotlib -matplotlib.use('Pdf') -from filterpy.kalman import UnscentedKalmanFilter -from filterpy.kalman import MerweScaledSigmaPoints -from filterpy.common.discretization import Q_discrete_white_noise - -# Other imports -import numpy as np -import argparse -import os - - -class UKFStateEstimator2D(object): - """ - Class that estimates the state of the drone using an Unscented Kalman Filter - (UKF) applied to raw sensor data. The filter only tracks the quadcopter's - motion along one spatial dimension: the global frame z-axis. In this - simplified case, we assume that the drone's body frame orientation is - aligned with the world frame (i.e., no roll, pitch, or yaw), and that it is - only offset along the z-axis. It is called a 2D UKF because we track two - state variables in the state vector: z position and z velocity. - """ - - def __init__(self, loop_hz, altitude_throttled=False, imu_throttled=False): - # self.ready_to_filter is False until we get initial measurements in - # order to be able to initialize the filter's state vector x and - # covariance matrix P. - self.ready_to_filter = False - self.printed_filter_start_notice = False - self.got_altitude = False - self.loop_hz = loop_hz - - self.altitude_topic_str = 'altitude_node' - self.imu_topic_str = 'imu' - throttle_suffix = '_throttle' - - if altitude_throttled: - self.altitude_topic_str += throttle_suffix - if imu_throttled: - self.imu_topic_str += throttle_suffix - - self.in_callback = False - - self.initialize_ukf() - - # The last time that we received an input and formed a prediction with - # the state transition function - self.last_state_transition_time = None - - # Time in seconds between consecutive state transitions, dictated by - # when the inputs come in - self.dt = None - - # Initialize the last control input as 0 m/s^2 along the z-axis - self.last_control_input = np.array([0.0]) - - self.initialize_ros() - - def initialize_ros(self): - """ - Initialize ROS-related objects, e.g., the node, subscribers, etc. - """ - self.node_name = os.path.splitext(os.path.basename(__file__))[0] - print('Initializing {} node...'.format(self.node_name)) - rospy.init_node(self.node_name) - - # Subscribe to topics to which the drone publishes in order to get raw - # data from sensors, which we can then filter - rospy.Subscriber(self.imu_topic_str, Imu, self.imu_data_callback) - rospy.Subscriber(self.altitude_topic_str, Range, self.altitude_data_callback) - - # Create the publisher to publish state estimates - self.state_pub = rospy.Publisher('state_estimator_node/ukf_2d', Odometry, queue_size=1, - tcp_nodelay=False) - - def initialize_ukf(self): - """ - Initialize the parameters of the Unscented Kalman Filter (UKF) that is - used to estimate the state of the drone. - """ - - # Number of state variables being tracked - self.state_vector_dim = 2 - # The state vector consists of the following column vector. - # Note that FilterPy initializes the state vector with zeros. - # [[z], - # [z_vel]] - - # Number of measurement variables that the drone receives - self.measurement_vector_dim = 1 - # The measurement variables consist of the following vector: - # [[slant_range]] - - # Function to generate sigma points for the UKF - # TODO: Modify these sigma point parameters appropriately. Currently - # just educated guesses - sigma_points = MerweScaledSigmaPoints(n=self.state_vector_dim, - alpha=0.1, - beta=2.0, - kappa=(3.0-self.state_vector_dim)) - # Create the UKF object - # Note that dt will get updated dynamically as sensor data comes in, - # as will the measurement function, since measurements come in at - # distinct rates. Setting compute_log_likelihood=False saves some - # computation. - self.ukf = UnscentedKalmanFilter(dim_x=self.state_vector_dim, - dim_z=self.measurement_vector_dim, - dt=1.0, - hx=self.measurement_function, - fx=self.state_transition_function, - points=sigma_points, - compute_log_likelihood=False) - self.initialize_ukf_matrices() - - def initialize_ukf_matrices(self): - """ - Initialize the covariance matrices of the UKF - """ - # Initialize state covariance matrix P: - # TODO: Tune these initial values appropriately. Currently these are - # just guesses - self.ukf.P = np.diag([0.1, 0.2]) - - # Initialize the process noise covariance matrix Q: - # TODO: Tune appropriately. Currently just a guess - self.ukf.Q = np.diag([0.01, 1.0])*0.005 - # self.ukf.Q = Q_discrete_white_noise(self.state_vector_dim) - - # Initialize the measurement covariance matrix R - # IR slant range variance (m^2), determined experimentally in a static - # setup with mean range around 0.335 m: - self.ukf.R = np.array([2.2221e-05]) - - def update_input_time(self, msg): - """ - Update the time at which we have received the most recent input, based - on the timestamp in the header of a ROS message - - msg : a ROS message that includes a header with a timestamp that - indicates the time at which the respective input was originally - recorded - """ - self.last_time_secs = msg.header.stamp.secs - self.last_time_nsecs = msg.header.stamp.nsecs - new_time = self.last_time_secs + self.last_time_nsecs*1e-9 - # Compute the time interval since the last state transition / input - self.dt = new_time - self.last_state_transition_time - # Assert that the time step be non-negative. Negative time steps seem to - # be able to arise when more than one input node is running (i.e., both - # IR and flight controller nodes). This might result in small - # discrepencies between different nodes' delays from timestamping to - # publishing. Another solution might be to call rospy.Time.now() right - # before those nodes publish, or on the receiving end in this state - # estimator node - if self.dt < 0: - self.dt = 0 - # Set the current time at which we just received an input - # to be the last input time - self.last_state_transition_time = new_time - - def initialize_input_time(self, msg): - """ - Initialize the input time (self.last_state_transition_time) based on the - timestamp in the header of a ROS message. This is called before we start - filtering in order to attain an initial time value, which enables us to - then compute a time interval self.dt by calling self.update_input_time() - - msg : a ROS message that includes a header with a timestamp - """ - self.last_time_secs = msg.header.stamp.secs - self.last_time_nsecs = msg.header.stamp.nsecs - self.last_state_transition_time = (self.last_time_secs + - self.last_time_nsecs*1e-9) - - def ukf_predict(self): - """ - Compute the prior for the UKF based on the current state, a control - input, and a time step. - """ - # self.ukf.Q = Q_discrete_white_noise(self.state_vector_dim, dt=self.dt, var=1.0) - # self.ukf.Q = np.diag([0.01, 1.0])*0.05*self.dt - self.ukf.predict(dt=self.dt, u=self.last_control_input) - - def print_notice_if_first(self): - if not self.printed_filter_start_notice: - print('Starting filter') - self.printed_filter_start_notice = True - - def imu_data_callback(self, data): - """ - Handle the receipt of an Imu message. Only take the linear acceleration - along the z-axis. - """ - if self.in_callback: - return - self.in_callback = True - self.last_control_input = np.array([data.linear_acceleration.z]) - if abs(data.linear_acceleration.z) < 0.3: - # Adaptive filtering. Lower the process noise if acceleration is low - self.ukf.Q = np.diag([0.01, 1.0])*0.0005 - else: - self.ukf.Q = np.diag([0.01, 1.0])*0.005 - self.in_callback = False - - def altitude_data_callback(self, data): - """ - Handle the receipt of a Range message from the IR sensor. - """ - if self.in_callback: - return - self.in_callback = True - self.last_measurement_vector = np.array([data.range]) - if self.ready_to_filter: - self.update_input_time(data) - else: - self.initialize_input_time(data) - # Got a raw slant range reading, so update the initial state - # vector of the UKF - self.ukf.x[0] = data.range - self.ukf.x[1] = 0.0 # initialize velocity as 0 m/s - # Update the state covariance matrix to reflect estimated - # measurement error. Variance of the measurement -> variance of - # the corresponding state variable - self.ukf.P[0, 0] = self.ukf.R[0] - self.got_altitude = True - self.check_if_ready_to_filter() - self.in_callback = False - - def check_if_ready_to_filter(self): - self.ready_to_filter = self.got_altitude - - def publish_current_state(self): - """ - Publish the current state estimate and covariance from the UKF. This is - a State message containing: - - Header - - PoseWithCovariance - - TwistWithCovariance - Note that a lot of these ROS message fields will be left empty, as the - 2D UKF does not track information on the entire state space of the - drone. - """ - state_msg = Odometry() - state_msg.header.stamp.secs = self.last_time_secs - state_msg.header.stamp.nsecs = self.last_time_nsecs - state_msg.header.frame_id = 'global' - - # Get the current state estimate from self.ukf.x - state_msg.pose.pose.position.z = self.ukf.x[0] - state_msg.twist.twist.linear.z = self.ukf.x[1] - - # Fill the rest of the message with NaN - state_msg.pose.pose.position.x = np.nan - state_msg.pose.pose.position.y = np.nan - state_msg.pose.pose.orientation.x = np.nan - state_msg.pose.pose.orientation.y = np.nan - state_msg.pose.pose.orientation.z = np.nan - state_msg.pose.pose.orientation.w = np.nan - state_msg.twist.twist.linear.x = np.nan - state_msg.twist.twist.linear.y = np.nan - state_msg.twist.twist.angular.x = np.nan - state_msg.twist.twist.angular.y = np.nan - state_msg.twist.twist.angular.z = np.nan - - # Prepare covariance matrices - # 36-element array, in a row-major order, according to ROS msg docs - pose_cov_mat = np.full((36,), np.nan) - twist_cov_mat = np.full((36,), np.nan) - pose_cov_mat[14] = self.ukf.P[0, 0] # z variance - twist_cov_mat[14] = self.ukf.P[1, 1] # z velocity variance - - # Add covariances to message - state_msg.pose.covariance = pose_cov_mat - state_msg.twist.covariance = twist_cov_mat - - self.state_pub.publish(state_msg) - - def state_transition_function(self, x, dt, u): - """ - The state transition function to compute the prior in the prediction - step, propagating the state to the next time step. - - x : current state. A NumPy array - dt : time step. A float - u : control input. A NumPy array - """ - # State transition matrix F - F = np.array([[1, dt], - [0, 1]]) - # Integrate control input acceleration to get a change in velocity - change_from_control_input = np.array([0, - u[0]*dt]) - # change_from_control_input = np.array([0.5*u[0]*(dt**2.0), - # u[0]*dt]) - x_output = np.dot(F, x) + change_from_control_input - return x_output - - def measurement_function(self, x): - """ - Transform the state x into measurement space. In this simple model, we - assume that the range measurement corresponds exactly to position along - the z-axis, as we are assuming there is no pitch and roll. - - x : current state. A NumPy array - """ - # Measurement update matrix H - H = np.array([[1, 0]]) - hx_output = np.dot(H, x) - return hx_output - - def start_loop(self): - """ - Begin the UKF's loop of predicting and updating. Publish a state - estimate at the end of each loop. - """ - rate = rospy.Rate(self.loop_hz) - while not rospy.is_shutdown(): - if self.ready_to_filter: - self.print_notice_if_first() - self.ukf_predict() - - # Now that a prediction has been formed to bring the current - # prior state estimate to the same point in time as the - # measurement, perform a measurement update with the most recent - # IR range reading - self.ukf.update(self.last_measurement_vector) - self.publish_current_state() - rate.sleep() - - -def check_positive_float_duration(val): - """ - Function to check that the --loop_hz command-line argument is a positive - float. - """ - value = float(val) - if value <= 0.0: - raise argparse.ArgumentTypeError('Loop Hz must be positive') - return value - -def main(): - parser = argparse.ArgumentParser(description=('Estimate the drone\'s state ' - 'with a UKF in one spatial dimension')) - # Arguments to determine if the throttle command is being used. E.g.: - # rosrun topic_tools throttle messages /pidrone/altitude 40.0 - parser.add_argument('--altitude_throttled', action='store_true', - help=('Use throttled altitude topic altitude_throttle')) - parser.add_argument('--imu_throttled', action='store_true', - help=('Use throttled IMU topic imu_throttle')) - parser.add_argument('--loop_hz', '-hz', default=30.0, - type=check_positive_float_duration, - help=('Frequency at which to run the predict-update ' - 'loop of the UKF (default: 30)')) - # NOTE: The throttle flags are deprecated in favor of the loop Hz flag. - # Using throttled data streams while also running the UKF on a set - # loop can degrade the estimates. - args = parser.parse_args() - se = UKFStateEstimator2D(loop_hz=args.loop_hz, - altitude_throttled=args.altitude_throttled, - imu_throttled=args.imu_throttled) - try: - se.start_loop() - finally: - # Upon termination of this script, print out a helpful message - print('{} node terminating.'.format(se.node_name)) - print('Most recent state vector:') - print(se.ukf.x) - -if __name__ == '__main__': - main() diff --git a/packages/robots/duckiedrone/state_estimator/config/state_estimator_node.yaml b/packages/robots/duckiedrone/state_estimator/config/state_estimator_node.yaml index a900cd6f..00613126 100644 --- a/packages/robots/duckiedrone/state_estimator/config/state_estimator_node.yaml +++ b/packages/robots/duckiedrone/state_estimator/config/state_estimator_node.yaml @@ -1,10 +1,10 @@ # Configuration for the StateEstimatorNode # Primary state estimation method -primary: 'ema' # Default: 'ukf7d' -# Choices: ['ema', 'ukf2d', 'ukf7d', 'ukf12d', 'mocap', 'simulator'] +primary: 'ukf7d' # Default: 'ukf7d' +# Choices: ['ema', 'ukf2d', 'ukf7d', 'ukf12d'] -# Other state estimation methods to run alongside the primary one +# Other state estimation methods to run alongside the primary one (DEPRECATED, DO NOT USE) others: - 'ema' - 'mocap' diff --git a/packages/robots/duckiedrone/state_estimator/launch/state_estimator_node.launch b/packages/robots/duckiedrone/state_estimator/launch/state_estimator_node.launch index f678c4dd..38efe10a 100644 --- a/packages/robots/duckiedrone/state_estimator/launch/state_estimator_node.launch +++ b/packages/robots/duckiedrone/state_estimator/launch/state_estimator_node.launch @@ -5,6 +5,10 @@ watchtower, duckiedrone or traffic_light"/> + + + + diff --git a/packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_2d_node.py b/packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_2d_node.py deleted file mode 100755 index 0ffd530f..00000000 --- a/packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_2d_node.py +++ /dev/null @@ -1,382 +0,0 @@ -#!/usr/bin/env python3 - -# ROS imports -import rospy -from sensor_msgs.msg import Imu, Range -from nav_msgs.msg import Odometry - -# UKF imports -# The matplotlib imports and the matplotlib.use('Pdf') line make it so that the -# UKF code that imports matplotlib does not complain. Essentially, the -# use('Pdf') call allows a plot to be created without a window (allows it to run -# through ssh) -import matplotlib -matplotlib.use('Pdf') -from filterpy.kalman import UnscentedKalmanFilter -from filterpy.kalman import MerweScaledSigmaPoints -from filterpy.common.discretization import Q_discrete_white_noise - -# Other imports -import numpy as np -import argparse -import os - - -class UKFStateEstimator2D(object): - """ - Class that estimates the state of the drone using an Unscented Kalman Filter - (UKF) applied to raw sensor data. The filter only tracks the quadcopter's - motion along one spatial dimension: the global frame z-axis. In this - simplified case, we assume that the drone's body frame orientation is - aligned with the world frame (i.e., no roll, pitch, or yaw), and that it is - only offset along the z-axis. It is called a 2D UKF because we track two - state variables in the state vector: z position and z velocity. - """ - - def __init__(self, loop_hz, altitude_throttled=False, imu_throttled=False): - # self.ready_to_filter is False until we get initial measurements in - # order to be able to initialize the filter's state vector x and - # covariance matrix P. - self.ready_to_filter = False - self.printed_filter_start_notice = False - self.got_altitude = False - self.loop_hz = loop_hz - - self.altitude_topic_str = 'altitude_node' - self.imu_topic_str = 'imu' - throttle_suffix = '_throttle' - - if altitude_throttled: - self.altitude_topic_str += throttle_suffix - if imu_throttled: - self.imu_topic_str += throttle_suffix - - self.in_callback = False - - self.initialize_ukf() - - # The last time that we received an input and formed a prediction with - # the state transition function - self.last_state_transition_time = None - - # Time in seconds between consecutive state transitions, dictated by - # when the inputs come in - self.dt = None - - # Initialize the last control input as 0 m/s^2 along the z-axis - self.last_control_input = np.array([0.0]) - - self.initialize_ros() - - def initialize_ros(self): - """ - Initialize ROS-related objects, e.g., the node, subscribers, etc. - """ - self.node_name = os.path.splitext(os.path.basename(__file__))[0] - print('Initializing {} node...'.format(self.node_name)) - rospy.init_node(self.node_name) - - # Subscribe to topics to which the drone publishes in order to get raw - # data from sensors, which we can then filter - rospy.Subscriber(self.imu_topic_str, Imu, self.imu_data_callback) - rospy.Subscriber(self.altitude_topic_str, Range, self.altitude_data_callback) - - # Create the publisher to publish state estimates - self.state_pub = rospy.Publisher('state_estimator_node/ukf_2d', Odometry, queue_size=1, - tcp_nodelay=False) - - def initialize_ukf(self): - """ - Initialize the parameters of the Unscented Kalman Filter (UKF) that is - used to estimate the state of the drone. - """ - - # Number of state variables being tracked - self.state_vector_dim = 2 - # The state vector consists of the following column vector. - # Note that FilterPy initializes the state vector with zeros. - # [[z], - # [z_vel]] - - # Number of measurement variables that the drone receives - self.measurement_vector_dim = 1 - # The measurement variables consist of the following vector: - # [[slant_range]] - - # Function to generate sigma points for the UKF - # TODO: Modify these sigma point parameters appropriately. Currently - # just educated guesses - sigma_points = MerweScaledSigmaPoints(n=self.state_vector_dim, - alpha=0.1, - beta=2.0, - kappa=(3.0-self.state_vector_dim)) - # Create the UKF object - # Note that dt will get updated dynamically as sensor data comes in, - # as will the measurement function, since measurements come in at - # distinct rates. Setting compute_log_likelihood=False saves some - # computation. - self.ukf = UnscentedKalmanFilter(dim_x=self.state_vector_dim, - dim_z=self.measurement_vector_dim, - dt=1.0, - hx=self.measurement_function, - fx=self.state_transition_function, - points=sigma_points, - compute_log_likelihood=False) - self.initialize_ukf_matrices() - - def initialize_ukf_matrices(self): - """ - Initialize the covariance matrices of the UKF - """ - # Initialize state covariance matrix P: - # TODO: Tune these initial values appropriately. Currently these are - # just guesses - self.ukf.P = np.diag([0.1, 0.2]) - - # Initialize the process noise covariance matrix Q: - # TODO: Tune appropriately. Currently just a guess - self.ukf.Q = np.diag([0.01, 1.0])*0.005 - # self.ukf.Q = Q_discrete_white_noise(self.state_vector_dim) - - # Initialize the measurement covariance matrix R - # IR slant range variance (m^2), determined experimentally in a static - # setup with mean range around 0.335 m: - self.ukf.R = np.array([2.2221e-05]) - - def update_input_time(self, msg): - """ - Update the time at which we have received the most recent input, based - on the timestamp in the header of a ROS message - - msg : a ROS message that includes a header with a timestamp that - indicates the time at which the respective input was originally - recorded - """ - self.last_time_secs = msg.header.stamp.secs - self.last_time_nsecs = msg.header.stamp.nsecs - new_time = self.last_time_secs + self.last_time_nsecs*1e-9 - # Compute the time interval since the last state transition / input - self.dt = new_time - self.last_state_transition_time - # Assert that the time step be non-negative. Negative time steps seem to - # be able to arise when more than one input node is running (i.e., both - # IR and flight controller nodes). This might result in small - # discrepencies between different nodes' delays from timestamping to - # publishing. Another solution might be to call rospy.Time.now() right - # before those nodes publish, or on the receiving end in this state - # estimator node - if self.dt < 0: - self.dt = 0 - # Set the current time at which we just received an input - # to be the last input time - self.last_state_transition_time = new_time - - def initialize_input_time(self, msg): - """ - Initialize the input time (self.last_state_transition_time) based on the - timestamp in the header of a ROS message. This is called before we start - filtering in order to attain an initial time value, which enables us to - then compute a time interval self.dt by calling self.update_input_time() - - msg : a ROS message that includes a header with a timestamp - """ - self.last_time_secs = msg.header.stamp.secs - self.last_time_nsecs = msg.header.stamp.nsecs - self.last_state_transition_time = (self.last_time_secs + - self.last_time_nsecs*1e-9) - - def ukf_predict(self): - """ - Compute the prior for the UKF based on the current state, a control - input, and a time step. - """ - # self.ukf.Q = Q_discrete_white_noise(self.state_vector_dim, dt=self.dt, var=1.0) - # self.ukf.Q = np.diag([0.01, 1.0])*0.05*self.dt - self.ukf.predict(dt=self.dt, u=self.last_control_input) - - def print_notice_if_first(self): - if not self.printed_filter_start_notice: - print('Starting filter') - self.printed_filter_start_notice = True - - def imu_data_callback(self, data): - """ - Handle the receipt of an Imu message. Only take the linear acceleration - along the z-axis. - """ - if self.in_callback: - return - self.in_callback = True - self.last_control_input = np.array([data.linear_acceleration.z]) - if abs(data.linear_acceleration.z) < 0.3: - # Adaptive filtering. Lower the process noise if acceleration is low - self.ukf.Q = np.diag([0.01, 1.0])*0.0005 - else: - self.ukf.Q = np.diag([0.01, 1.0])*0.005 - self.in_callback = False - - def altitude_data_callback(self, data): - """ - Handle the receipt of a Range message from the IR sensor. - """ - if self.in_callback: - return - self.in_callback = True - self.last_measurement_vector = np.array([data.range]) - if self.ready_to_filter: - self.update_input_time(data) - else: - self.initialize_input_time(data) - # Got a raw slant range reading, so update the initial state - # vector of the UKF - self.ukf.x[0] = data.range - self.ukf.x[1] = 0.0 # initialize velocity as 0 m/s - # Update the state covariance matrix to reflect estimated - # measurement error. Variance of the measurement -> variance of - # the corresponding state variable - self.ukf.P[0, 0] = self.ukf.R[0] - self.got_altitude = True - self.check_if_ready_to_filter() - self.in_callback = False - - def check_if_ready_to_filter(self): - self.ready_to_filter = self.got_altitude - - def publish_current_state(self): - """ - Publish the current state estimate and covariance from the UKF. This is - a State message containing: - - Header - - PoseWithCovariance - - TwistWithCovariance - Note that a lot of these ROS message fields will be left empty, as the - 2D UKF does not track information on the entire state space of the - drone. - """ - state_msg = Odometry() - state_msg.header.stamp.secs = self.last_time_secs - state_msg.header.stamp.nsecs = self.last_time_nsecs - state_msg.header.frame_id = 'global' - - # Get the current state estimate from self.ukf.x - state_msg.pose.pose.position.z = self.ukf.x[0] - state_msg.twist.twist.linear.z = self.ukf.x[1] - - # Fill the rest of the message with NaN - state_msg.pose.pose.position.x = np.nan - state_msg.pose.pose.position.y = np.nan - state_msg.pose.pose.orientation.x = np.nan - state_msg.pose.pose.orientation.y = np.nan - state_msg.pose.pose.orientation.z = np.nan - state_msg.pose.pose.orientation.w = np.nan - state_msg.twist.twist.linear.x = np.nan - state_msg.twist.twist.linear.y = np.nan - state_msg.twist.twist.angular.x = np.nan - state_msg.twist.twist.angular.y = np.nan - state_msg.twist.twist.angular.z = np.nan - - # Prepare covariance matrices - # 36-element array, in a row-major order, according to ROS msg docs - pose_cov_mat = np.full((36,), np.nan) - twist_cov_mat = np.full((36,), np.nan) - pose_cov_mat[14] = self.ukf.P[0, 0] # z variance - twist_cov_mat[14] = self.ukf.P[1, 1] # z velocity variance - - # Add covariances to message - state_msg.pose.covariance = pose_cov_mat - state_msg.twist.covariance = twist_cov_mat - - self.state_pub.publish(state_msg) - - def state_transition_function(self, x, dt, u): - """ - The state transition function to compute the prior in the prediction - step, propagating the state to the next time step. - - x : current state. A NumPy array - dt : time step. A float - u : control input. A NumPy array - """ - # State transition matrix F - F = np.array([[1, dt], - [0, 1]]) - # Integrate control input acceleration to get a change in velocity - change_from_control_input = np.array([0, - u[0]*dt]) - # change_from_control_input = np.array([0.5*u[0]*(dt**2.0), - # u[0]*dt]) - x_output = np.dot(F, x) + change_from_control_input - return x_output - - def measurement_function(self, x): - """ - Transform the state x into measurement space. In this simple model, we - assume that the range measurement corresponds exactly to position along - the z-axis, as we are assuming there is no pitch and roll. - - x : current state. A NumPy array - """ - # Measurement update matrix H - H = np.array([[1, 0]]) - hx_output = np.dot(H, x) - return hx_output - - def start_loop(self): - """ - Begin the UKF's loop of predicting and updating. Publish a state - estimate at the end of each loop. - """ - rate = rospy.Rate(self.loop_hz) - while not rospy.is_shutdown(): - if self.ready_to_filter: - self.print_notice_if_first() - self.ukf_predict() - - # Now that a prediction has been formed to bring the current - # prior state estimate to the same point in time as the - # measurement, perform a measurement update with the most recent - # IR range reading - self.ukf.update(self.last_measurement_vector) - self.publish_current_state() - rate.sleep() - - -def check_positive_float_duration(val): - """ - Function to check that the --loop_hz command-line argument is a positive - float. - """ - value = float(val) - if value <= 0.0: - raise argparse.ArgumentTypeError('Loop Hz must be positive') - return value - -def main(): - parser = argparse.ArgumentParser(description=('Estimate the drone\'s state ' - 'with a UKF in one spatial dimension')) - # Arguments to determine if the throttle command is being used. E.g.: - # rosrun topic_tools throttle messages /pidrone/altitude 40.0 - parser.add_argument('--altitude_throttled', action='store_true', - help=('Use throttled altitude topic altitude_throttle')) - parser.add_argument('--imu_throttled', action='store_true', - help=('Use throttled IMU topic imu_throttle')) - parser.add_argument('--loop_hz', '-hz', default=30.0, - type=check_positive_float_duration, - help=('Frequency at which to run the predict-update ' - 'loop of the UKF (default: 30)')) - # NOTE: The throttle flags are deprecated in favor of the loop Hz flag. - # Using throttled data streams while also running the UKF on a set - # loop can degrade the estimates. - args = parser.parse_args() - se = UKFStateEstimator2D(loop_hz=args.loop_hz, - altitude_throttled=args.altitude_throttled, - imu_throttled=args.imu_throttled) - try: - se.start_loop() - finally: - # Upon termination of this script, print out a helpful message - print('{} node terminating.'.format(se.node_name)) - print('Most recent state vector:') - print(se.ukf.x) - -if __name__ == '__main__': - main() diff --git a/packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_7d_node.py b/packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_7d_node.py deleted file mode 100755 index a4483214..00000000 --- a/packages/robots/duckiedrone/state_estimator/src/scripts/StateEstimators_old/state_estimator_ukf_7d_node.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python3 - -# ROS imports -# UKF imports -# The matplotlib imports and the matplotlib.use('Pdf') line make it so that the -# UKF code that imports matplotlib does not complain. Essentially, the -# use('Pdf') call allows a plot to be created without a window (allows it to run -# through ssh) -import rospy -import tf - -# Other imports -import numpy as np -import tf.transformations -from filterpy.kalman import MerweScaledSigmaPoints, UnscentedKalmanFilter -from geometry_msgs.msg import PoseStamped, TwistStamped -from sensor_msgs.msg import Imu, Range -from ..state_estimator_abs import StateEstimatorAbs - - -class UKFStateEstimator7D(StateEstimatorAbs): - """ - UKF-based state estimator for a 7D state space using various sensor inputs. - """ - - def __init__(self, loop_hz=30.0, altitude_throttled=False, imu_throttled=False, - optical_flow_throttled=False, camera_pose_throttled=False): - # Call the parent constructor - super().__init__(loop_hz, state_topic='state_estimator_node/ukf_7d') - - self.ready_to_filter = False - self.got_altitude = False - self.got_imu = False - self.angular_velocity = None - - # Initialize topic names with optional throttling - self.altitude_topic_str = 'altitude_node' + ('_throttle' if altitude_throttled else '') - self.imu_topic_str = 'imu' + ('_throttle' if imu_throttled else '') - self.optical_flow_topic_str = 'camera_node/twist' + ('_throttle' if optical_flow_throttled else '') - self.camera_pose_topic_str = 'camera_node/pose' + ('_throttle' if camera_pose_throttled else '') - - self.in_callback = False - self.initialize_estimator() - - # ROS Subscribers - self._imu_sub = rospy.Subscriber(self.imu_topic_str, Imu, self.imu_data_callback, queue_size=1) - self._altitude_sub = rospy.Subscriber(self.altitude_topic_str, Range, self.altitude_data_callback, queue_size=1) - self._of_sub = rospy.Subscriber(self.optical_flow_topic_str, TwistStamped, self.optical_flow_data_callback) - self._cam_pose_sub = rospy.Subscriber(self.camera_pose_topic_str, PoseStamped, self.camera_pose_data_callback) - - def initialize_estimator(self): - """ Initialize the UKF estimator parameters. """ - self.state_vector_dim = 7 - self.measurement_vector_dim = 6 - - sigma_points = MerweScaledSigmaPoints(n=self.state_vector_dim, - alpha=0.1, beta=2.0, - kappa=(3.0 - self.state_vector_dim)) - - self.ukf = UnscentedKalmanFilter(dim_x=self.state_vector_dim, - dim_z=self.measurement_vector_dim, - dt=1.0, - hx=self.measurement_function, - fx=self.state_transition_function, - points=sigma_points) - - self.initialize_ukf_matrices() - - def initialize_ukf_matrices(self): - """ Initialize the covariance matrices for the UKF. """ - self.ukf.P = np.diag([0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.0005]) - self.ukf.Q = np.diag([0.01, 0.01, 0.01, 1.0, 1.0, 1.0, 0.1]) * 0.005 - self.measurement_cov_altitude = np.array([2.2221e-05]) - self.measurement_cov_optical_flow = np.diag([0.01, 0.01]) - self.measurement_cov_camera_pose = np.diag([0.0025, 0.0025, 0.0003]) - self.ukf.R = np.diag([2.2221e-05, 0.0025, 0.0025, 0.01, 0.01, 0.0003]) - - def process_pose(self, pose_data : PoseStamped): - '''Process incoming pose data.''' - _, _, yaw = tf.transformations.euler_from_quaternion([ - pose_data.pose.orientation.x, pose_data.pose.orientation.y, - pose_data.pose.orientation.z, pose_data.pose.orientation.w]) - self.last_measurement_vector[1] = pose_data.pose.position.x - self.last_measurement_vector[2] = pose_data.pose.position.y - self.last_measurement_vector[5] = yaw - - def process_twist(self, twist_data : TwistStamped): - '''Process incoming twist data.''' - self.last_measurement_vector[3] = twist_data.twist.linear.x - self.last_measurement_vector[4] = twist_data.twist.linear.y - - def process_range(self, range_data : Range): - '''Process incoming range data.''' - self.last_measurement_vector[0] = range_data.range - - def compute_prior(self): - """ Predict the state using the UKF. """ - self.ukf.predict(dt=self.dt, u=self.last_control_input) - self.ukf.update(self.last_measurement_vector) - - def imu_data_callback(self, data): - """ Callback for IMU data. """ - if self.in_callback: - return - self.in_callback = True - - self.last_control_input = np.array([data.linear_acceleration.x, - data.linear_acceleration.y, - data.linear_acceleration.z]) - self.angular_velocity = data.angular_velocity - - if not self.ready_to_filter: - self.initialize_input_time(data) - self.got_imu = True - self.ready_to_filter = self.check_if_ready_to_filter() - - self.in_callback = False - - def altitude_data_callback(self, data): - """ Callback for altitude data. """ - if self.in_callback: - return - self.in_callback = True - - if self.ready_to_filter: - self.update_input_time(data) - else: - self.initialize_input_time(data) - self.ukf.x[2] = data.range - self.ukf.x[5] = 0.0 - self.ukf.P[2, 2] = self.measurement_cov_altitude[0] - self.got_altitude = True - self.ready_to_filter = self.check_if_ready_to_filter() - - self.in_callback = False - - def optical_flow_data_callback(self, data): - """ Callback for optical flow data. """ - if self.in_callback: - return - self.in_callback = True - - if self.ready_to_filter: - self.update_input_time(data) - else: - self.initialize_input_time(data) - self.ukf.x[3] = data.twist.linear.x - self.ukf.x[4] = data.twist.linear.y - self.ukf.P[3, 3] = self.measurement_cov_optical_flow[0, 0] - self.ukf.P[4, 4] = self.measurement_cov_optical_flow[1, 1] - self.ready_to_filter = self.check_if_ready_to_filter() - - self.in_callback = False - - def camera_pose_data_callback(self, data): - """ Callback for camera pose data. """ - if self.in_callback: - return - self.in_callback = True - - _, _, yaw = tf.transformations.euler_from_quaternion([ - data.pose.orientation.x, data.pose.orientation.y, - data.pose.orientation.z, data.pose.orientation.w]) - - if self.ready_to_filter: - self.update_input_time(data) - else: - self.initialize_input_time(data) - self.ukf.x[1] = data.pose.position.x - self.ukf.x[2] = data.pose.position.y - self.ukf.x[5] = yaw - self.ready_to_filter = self.check_if_ready_to_filter() - - self.in_callback = False - - def check_if_ready_to_filter(self): - """ Check if the estimator is ready to filter. """ - return self.got_altitude and self.got_imu - - def state_transition_function(self, state, dt): - """ Define the state transition function for the UKF. """ - new_state = state.copy() - new_state[0] += state[3] * dt - new_state[1] += state[4] * dt - new_state[2] += state[5] * dt - return new_state - - def measurement_function(self, state): - """ Define the measurement function for the UKF. """ - measurement = np.zeros(self.measurement_vector_dim) - measurement[0] = state[2] - measurement[1] = state[0] - measurement[2] = state[1] - measurement[3] = state[3] - measurement[4] = state[4] - measurement[5] = state[5] - return measurement - -def check_positive_float_duration(val): - """ - Function to check that the --loop_hz command-line argument is a positive - float. - """ - value = float(val) - if value <= 0.0: - raise argparse.ArgumentTypeError('Loop Hz must be positive') - return value \ No newline at end of file diff --git a/packages/robots/duckiedrone/state_estimator/src/scripts/__init__.py b/packages/robots/duckiedrone/state_estimator/src/scripts/__init__.py index 0b36b894..4b1e9612 100644 --- a/packages/robots/duckiedrone/state_estimator/src/scripts/__init__.py +++ b/packages/robots/duckiedrone/state_estimator/src/scripts/__init__.py @@ -1,3 +1,5 @@ from .state_estimator_abs import StateEstimatorAbs from .state_estimator_ema import StateEstimatorEMA -from .state_estimator_ukf_12d import StateEstimatorUKF12D \ No newline at end of file +from .state_estimator_ukf_12d import StateEstimatorUKF12D +from .state_estimator_ukf_7d import StateEstimatorUKF7D +from .state_estimator_ukf_2d import StateEstimatorUKF2D \ No newline at end of file diff --git a/packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_2d.py b/packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_2d.py new file mode 100644 index 00000000..7eeaec67 --- /dev/null +++ b/packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_2d.py @@ -0,0 +1,245 @@ +from geometry_msgs.msg import PoseStamped, TwistStamped +import numpy as np + +from nav_msgs.msg import Odometry +from sensor_msgs.msg import Imu, Range + +from filterpy.kalman import MerweScaledSigmaPoints, UnscentedKalmanFilter + +from .state_estimator_abs import StateEstimatorAbs + + +class StateEstimatorUKF2D(StateEstimatorAbs): + """ + Class that estimates the state of the drone using an Unscented Kalman Filter + (UKF) applied to raw sensor data. The filter only tracks the quadcopter's + motion along one spatial dimension: the global frame z-axis. In this + simplified case, we assume that the drone's body frame orientation is + aligned with the world frame (i.e., no roll, pitch, or yaw), and that it is + only offset along the z-axis. It is called a 2D UKF because we track two + state variables in the state vector: z position and z velocity. + """ + + def __init__(self): + # Call the parent constructor + super().__init__() + + # Initialize the estimator + self.initialize_estimator() + + # Initialize flags and variables + self.ready_to_filter = False + self.printed_filter_start_notice = False + self.got_altitude = False + self.got_imu = False + self.in_callback = False + + # Time management + self.last_state_transition_time = None + self.dt = None + + # Last control input + self.last_control_input = np.array([0.0]) + + def initialize_estimator(self): + """ + Initialize the parameters of the Unscented Kalman Filter (UKF) that is + used to estimate the state of the drone. + """ + # Number of state variables being tracked + self.state_vector_dim = 2 + + # Number of measurement variables that the drone receives + self.measurement_vector_dim = 1 + + # Function to generate sigma points for the UKF + sigma_points = MerweScaledSigmaPoints( + n=self.state_vector_dim, + alpha=0.1, + beta=2.0, + kappa=(3.0 - self.state_vector_dim), + ) + + # Create the UKF object + self.ukf = UnscentedKalmanFilter( + dim_x=self.state_vector_dim, + dim_z=self.measurement_vector_dim, + dt=1.0, + hx=self.measurement_function, + fx=self.state_transition_function, + points=sigma_points, + ) + self.initialize_ukf_matrices() + + def initialize_ukf_matrices(self): + """ + Initialize the covariance matrices of the UKF. + """ + # Initialize state covariance matrix P + self.ukf.P = np.diag([0.1, 0.2]) + + # Initialize the process noise covariance matrix Q + self.ukf.Q = np.diag([0.01, 1.0]) * 0.005 + + # Initialize the measurement covariance matrix R + self.measurement_cov_altitude = np.array([2.2221e-05]) + + def process_imu(self, imu_data: Imu): + """ + Process IMU data and perform prediction step. + """ + if self.in_callback: + return + self.in_callback = True + + self.last_control_input = np.array([imu_data.linear_acceleration.z]) + + # Adjust process noise based on acceleration magnitude + if abs(imu_data.linear_acceleration.z) < 0.3: + # Lower process noise if acceleration is low + self.ukf.Q = np.diag([0.01, 1.0]) * 0.0005 + else: + self.ukf.Q = np.diag([0.01, 1.0]) * 0.005 + + if self.ready_to_filter: + self.update_input_time(imu_data) + self.compute_prior() + self.update_current_state() + else: + self.initialize_input_time(imu_data) + self.got_imu = True + self.check_if_ready_to_filter() + + self.in_callback = False + + def process_range(self, range_data: Range): + """ + Process altitude data and perform prediction and update steps. + """ + if self.in_callback: + return + self.in_callback = True + + if self.ready_to_filter: + self.update_input_time(range_data) + self.compute_prior() + + measurement_z = np.array([range_data.range]) + + self.ukf.update( + measurement_z, + hx=self.measurement_function_altitude, + R=self.measurement_cov_altitude, + ) + self.update_current_state() + else: + self.initialize_input_time(range_data) + self.ukf.x[0] = range_data.range # z position + self.ukf.x[1] = 0.0 # z velocity initialized to zero + self.ukf.P[0, 0] = self.measurement_cov_altitude[0] + self.got_altitude = True + self.check_if_ready_to_filter() + + self.in_callback = False + + def process_pose(self, pose_data: PoseStamped): + return super().process_pose(pose_data) + + def process_twist(self, twist_data: TwistStamped): + return super().process_twist(twist_data) + + def compute_prior(self): + """ + Compute the prior for the UKF based on the current state, control input, and time step. + """ + self.ukf.predict(dt=self.dt, u=self.last_control_input) + + def update_current_state(self): + """ + Publish the current state estimate and covariance from the UKF. + """ + state_msg = Odometry() + state_msg.header.stamp.secs = self.last_time_secs + state_msg.header.stamp.nsecs = self.last_time_nsecs + state_msg.header.frame_id = "global" + + # Get the current state estimate from self.ukf.x + state_msg.pose.pose.position.z = self.ukf.x[0] + state_msg.twist.twist.linear.z = self.ukf.x[1] + + # Fill the rest of the message with NaN + state_msg.pose.pose.position.x = np.nan + state_msg.pose.pose.position.y = np.nan + state_msg.pose.pose.orientation.x = np.nan + state_msg.pose.pose.orientation.y = np.nan + state_msg.pose.pose.orientation.z = np.nan + state_msg.pose.pose.orientation.w = np.nan + state_msg.twist.twist.linear.x = np.nan + state_msg.twist.twist.linear.y = np.nan + state_msg.twist.twist.angular.x = np.nan + state_msg.twist.twist.angular.y = np.nan + state_msg.twist.twist.angular.z = np.nan + + # Prepare covariance matrices + pose_cov_mat = np.full((36,), np.nan) + twist_cov_mat = np.full((36,), np.nan) + pose_cov_mat[14] = self.ukf.P[0, 0] # z variance + twist_cov_mat[14] = self.ukf.P[1, 1] # z velocity variance + + # Add covariances to message + state_msg.pose.covariance = pose_cov_mat.tolist() + state_msg.twist.covariance = twist_cov_mat.tolist() + + self.state = state_msg + + def initialize_input_time(self, msg): + """ + Initialize the input time based on the timestamp in the header of a ROS message. + """ + self.last_time_secs = msg.header.stamp.secs + self.last_time_nsecs = msg.header.stamp.nsecs + self.last_state_transition_time = self.last_time_secs + self.last_time_nsecs * 1e-9 + + def update_input_time(self, msg): + """ + Update the time at which we have received the most recent input, based on the timestamp. + """ + self.last_time_secs = msg.header.stamp.secs + self.last_time_nsecs = msg.header.stamp.nsecs + new_time = self.last_time_secs + self.last_time_nsecs * 1e-9 + self.dt = new_time - self.last_state_transition_time + if self.dt < 0: + self.dt = 0 + self.last_state_transition_time = new_time + + def check_if_ready_to_filter(self): + """ + Check if the estimator is ready to start filtering. + """ + self.ready_to_filter = self.got_altitude and self.got_imu + + def state_transition_function(self, x, dt, u): + """ + The state transition function to compute the prior in the prediction step, + propagating the state to the next time step. + """ + # State transition matrix F + F = np.array([[1, dt], [0, 1]]) + + # Change from control input (acceleration) + change_from_control_input = np.array([0, u[0] * dt]) + + x_output = np.dot(F, x) + change_from_control_input + return x_output + + def measurement_function(self, x): + """ + Default measurement function (not used in this implementation). + """ + pass + + def measurement_function_altitude(self, x): + """ + Measurement function for altitude data. + """ + return np.array([x[0]]) diff --git a/packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_7d.py b/packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_7d.py new file mode 100644 index 00000000..0b172554 --- /dev/null +++ b/packages/robots/duckiedrone/state_estimator/src/scripts/state_estimator_ukf_7d.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python3 + +import tf + +import numpy as np +import tf.transformations +from filterpy.kalman import MerweScaledSigmaPoints, UnscentedKalmanFilter +from geometry_msgs.msg import TwistStamped +from sensor_msgs.msg import Imu +from nav_msgs.msg import Odometry +from math import pi + +from .state_estimator_abs import StateEstimatorAbs + + +class StateEstimatorUKF7D(StateEstimatorAbs): + """ + UKF-based state estimator for a 7D state space using various sensor inputs. + """ + + def __init__( + self, + altitude_throttled=False, + imu_throttled=False, + optical_flow_throttled=False, + camera_pose_throttled=False, + ): + # Call the parent constructor + super().__init__() + + # Initialize the estimator + self.initialize_estimator() + + # Initialize flags and variables + self.ready_to_filter = False + self.printed_filter_start_notice = False + self.got_imu = False + self.got_altitude = False + self.got_optical_flow = False + self.got_camera_pose = False + self.in_callback = False + + # Time management + self.last_state_transition_time = None + self.dt = None + + # Last control input + self.last_control_input = np.array([0.0, 0.0, 0.0]) + + + def initialize_estimator(self): + """Initialize the UKF estimator parameters.""" + self.state_vector_dim = 7 + self.measurement_vector_dim = 6 + + sigma_points = MerweScaledSigmaPoints( + n=self.state_vector_dim, + alpha=0.1, + beta=2.0, + kappa=(3.0 - self.state_vector_dim), + ) + + self.ukf = UnscentedKalmanFilter( + dim_x=self.state_vector_dim, + dim_z=self.measurement_vector_dim, + dt=1.0, + hx=self.measurement_function, + fx=self.state_transition_function, + points=sigma_points, + residual_x=self.residual_x_account_for_angles, + ) + + self.initialize_ukf_matrices() + + def initialize_ukf_matrices(self): + """Initialize the covariance matrices for the UKF.""" + self.ukf.P = np.diag([0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.0005]) + self.ukf.Q = np.diag([0.01, 0.01, 0.01, 1.0, 1.0, 1.0, 0.1]) * 0.005 + + # Measurement covariance matrices + self.measurement_cov_altitude = np.array([2.2221e-05]) + self.measurement_cov_optical_flow = np.diag([0.01, 0.01]) + self.measurement_cov_camera_pose = np.diag([0.0025, 0.0025, 0.0003]) + + def process_imu(self, imu_data: Imu): + """Process IMU data and perform prediction and update steps.""" + if self.in_callback: + return + self.in_callback = True + + self.last_control_input = np.array( + [ + imu_data.linear_acceleration.x, + imu_data.linear_acceleration.y, + imu_data.linear_acceleration.z, + ] + ) + # Assuming we might use angular velocities in future + # self.angular_velocity = imu_data.angular_velocity + + if self.ready_to_filter: + self.update_input_time(imu_data) + self.compute_prior() + + # No measurement update from IMU in this simplified example + self.update_current_state() + else: + self.initialize_input_time(imu_data) + self.got_imu = True + self.check_if_ready_to_filter() + + self.in_callback = False + + def process_range(self, range_data): + """Process altitude data and perform prediction and update steps.""" + if self.in_callback: + return + self.in_callback = True + + if self.ready_to_filter: + self.update_input_time(range_data) + self.compute_prior() + + measurement_z = np.array([range_data.range]) + + self.ukf.update( + measurement_z, + hx=self.measurement_function_altitude, + R=self.measurement_cov_altitude, + ) + self.update_current_state() + else: + self.initialize_input_time(range_data) + self.ukf.x[2] = range_data.range # Altitude (z position) + self.ukf.P[2, 2] = self.measurement_cov_altitude[0] + self.got_altitude = True + self.check_if_ready_to_filter() + + self.in_callback = False + + def process_twist(self, twist_data: TwistStamped): + """Process optical flow data and perform prediction and update steps.""" + if self.in_callback: + return + self.in_callback = True + + if self.ready_to_filter: + self.update_input_time(twist_data) + self.compute_prior() + + measurement_z = np.array( + [twist_data.twist.linear.x, twist_data.twist.linear.y] + ) + + self.ukf.update( + measurement_z, + hx=self.measurement_function_optical_flow, + R=self.measurement_cov_optical_flow, + ) + self.update_current_state() + else: + self.initialize_input_time(twist_data) + self.ukf.x[3] = twist_data.twist.linear.x # x velocity + self.ukf.x[4] = twist_data.twist.linear.y # y velocity + self.ukf.P[3, 3] = self.measurement_cov_optical_flow[0, 0] + self.ukf.P[4, 4] = self.measurement_cov_optical_flow[1, 1] + self.got_optical_flow = True + self.check_if_ready_to_filter() + + self.in_callback = False + + def process_pose(self, pose_data): + """Process camera pose data and perform prediction and update steps.""" + if self.in_callback: + return + self.in_callback = True + + _, _, yaw = tf.transformations.euler_from_quaternion( + [ + pose_data.pose.orientation.x, + pose_data.pose.orientation.y, + pose_data.pose.orientation.z, + pose_data.pose.orientation.w, + ] + ) + + if self.ready_to_filter: + self.update_input_time(pose_data) + self.compute_prior() + + measurement_z = np.array( + [ + pose_data.pose.position.x, + pose_data.pose.position.y, + yaw, + ] + ) + + self.ukf.update( + measurement_z, + hx=self.measurement_function_camera_pose, + R=self.measurement_cov_camera_pose, + ) + self.update_current_state() + else: + self.initialize_input_time(pose_data) + self.ukf.x[0] = pose_data.pose.position.x + self.ukf.x[1] = pose_data.pose.position.y + self.ukf.x[5] = yaw + self.ukf.P[0, 0] = self.measurement_cov_camera_pose[0, 0] + self.ukf.P[1, 1] = self.measurement_cov_camera_pose[1, 1] + self.ukf.P[5, 5] = self.measurement_cov_camera_pose[2, 2] + self.got_camera_pose = True + self.check_if_ready_to_filter() + + self.in_callback = False + + def compute_prior(self): + """Predict the state using the UKF.""" + self.ukf.predict(dt=self.dt, u=self.last_control_input) + + def update_current_state(self): + """Publish the current state estimate and covariance from the UKF.""" + state_msg = Odometry() + state_msg.header.stamp.secs = self.last_time_secs + state_msg.header.stamp.nsecs = self.last_time_nsecs + state_msg.header.frame_id = "global" + + # Get the current state estimate from self.ukf.x + state_msg.pose.pose.position.x = self.ukf.x[0] + state_msg.pose.pose.position.y = self.ukf.x[1] + state_msg.pose.pose.position.z = self.ukf.x[2] + state_msg.twist.twist.linear.x = self.ukf.x[3] + state_msg.twist.twist.linear.y = self.ukf.x[4] + state_msg.twist.twist.linear.z = self.ukf.x[5] + + quaternion = tf.transformations.quaternion_from_euler(0, 0, self.ukf.x[5]) + state_msg.pose.pose.orientation.x = quaternion[0] + state_msg.pose.pose.orientation.y = quaternion[1] + state_msg.pose.pose.orientation.z = quaternion[2] + state_msg.pose.pose.orientation.w = quaternion[3] + + # Extract covariances + P = self.ukf.P + # Pose covariance + pose_covariance = np.zeros(36) + pose_covariance[0] = P[0, 0] # x + pose_covariance[7] = P[1, 1] # y + pose_covariance[14] = P[2, 2] # z + pose_covariance[21] = 0.0 # Roll variance (not estimated) + pose_covariance[28] = 0.0 # Pitch variance (not estimated) + pose_covariance[35] = P[5, 5] # Yaw + + state_msg.pose.covariance = pose_covariance.tolist() + + # Twist covariance + twist_covariance = np.zeros(36) + twist_covariance[0] = P[3, 3] # x_vel + twist_covariance[7] = P[4, 4] # y_vel + twist_covariance[14] = P[5, 5] # z_vel + + state_msg.twist.covariance = twist_covariance.tolist() + + self.state = state_msg + + def initialize_input_time(self, msg): + """ + Initialize the input time based on the timestamp in the header of a ROS message. + """ + self.last_time_secs = msg.header.stamp.secs + self.last_time_nsecs = msg.header.stamp.nsecs + self.last_state_transition_time = self.last_time_secs + self.last_time_nsecs * 1e-9 + + def update_input_time(self, msg): + """ + Update the time at which we have received the most recent input, based on the timestamp. + """ + self.last_time_secs = msg.header.stamp.secs + self.last_time_nsecs = msg.header.stamp.nsecs + new_time = self.last_time_secs + self.last_time_nsecs * 1e-9 + self.dt = new_time - self.last_state_transition_time + self.last_state_transition_time = new_time + + def check_if_ready_to_filter(self): + """Check if the estimator is ready to start filtering.""" + self.ready_to_filter = self.got_imu and self.got_altitude + + def state_transition_function(self, x, dt, u): + """Define the state transition function for the UKF.""" + F = np.eye(self.state_vector_dim) + F[0, 3] = dt + F[1, 4] = dt + F[2, 5] = dt + + # Compute the change from the control input (accelerations) + accelerations = u + velocities = accelerations * dt + change_from_control_input = np.array( + [0, 0, 0, velocities[0], velocities[1], velocities[2], 0] + ) + x_output = np.dot(F, x) + change_from_control_input + x_output[5] = x_output[5] % (2 * pi) # Ensure yaw stays within [0, 2*pi] + return x_output + + def residual_x_account_for_angles(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: + """Compute the residual for the state vector x, accounting for angles.""" + output_residual = a - b + # Adjust yaw angle residual + yaw_residual = output_residual[5] + yaw_residual = (yaw_residual + pi) % (2 * pi) - pi + output_residual[5] = yaw_residual + return output_residual + + def measurement_function(self, x): + """Default measurement function (not used in this implementation).""" + pass + + def measurement_function_altitude(self, x): + """Measurement function for altitude data.""" + return np.array([x[2]]) + + def measurement_function_optical_flow(self, x): + """Measurement function for optical flow data.""" + return np.array([x[3], x[4]]) + + def measurement_function_camera_pose(self, x): + """Measurement function for camera pose data.""" + return np.array([x[0], x[1], x[5]]) + + def measurement_function_complete(self, x): + """Measurement function when all measurements are combined.""" + return np.array([x[2], x[0], x[1], x[3], x[4], x[5]]) diff --git a/packages/robots/duckiedrone/state_estimator/src/state_estimator_node.py b/packages/robots/duckiedrone/state_estimator/src/state_estimator_node.py index 635a7c90..b21ffe18 100755 --- a/packages/robots/duckiedrone/state_estimator/src/state_estimator_node.py +++ b/packages/robots/duckiedrone/state_estimator/src/state_estimator_node.py @@ -9,7 +9,14 @@ from sensor_msgs.msg import Imu, Range from geometry_msgs.msg import PoseStamped, TwistStamped -from scripts import StateEstimatorEMA, StateEstimatorUKF12D, StateEstimatorAbs +from scripts import ( + StateEstimatorEMA, + StateEstimatorUKF12D, + StateEstimatorUKF7D, + StateEstimatorUKF2D, + StateEstimatorAbs, +) + class StateEstimatorNode(DTROS): """ @@ -117,8 +124,8 @@ def twist_cb(self, msg : TwistStamped): def get_state_estimator(self) -> StateEstimatorAbs: filters_name_to_class = { 'ema': StateEstimatorEMA, - 'ukf2d': NotImplementedError, - 'ukf7d': NotImplementedError, + 'ukf2d': StateEstimatorUKF2D, + 'ukf7d': StateEstimatorUKF7D, 'ukf12d': StateEstimatorUKF12D, 'mocap': NotImplementedError, 'simulator': NotImplementedError From 508b3b0ed36fadde27da5e2505139d1cefadabcc Mon Sep 17 00:00:00 2001 From: Davide Iafrate Date: Mon, 30 Sep 2024 13:46:36 +0000 Subject: [PATCH 2/2] Refactor OpticalFlowNode to publish TwistStamped instead of Odometry --- .../optical_flow/src/optical_flow_node.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/robots/duckiedrone/optical_flow/src/optical_flow_node.py b/packages/robots/duckiedrone/optical_flow/src/optical_flow_node.py index c219b56f..7a9e01e1 100755 --- a/packages/robots/duckiedrone/optical_flow/src/optical_flow_node.py +++ b/packages/robots/duckiedrone/optical_flow/src/optical_flow_node.py @@ -14,8 +14,8 @@ from duckietown.dtros import DTParam, DTROS, NodeType, ParamType from duckietown_msgs.msg import Segment, SegmentList from geometry_msgs.msg import Point as PointMsg +from geometry_msgs.msg import TwistStamped -from nav_msgs.msg import Odometry from sensor_msgs.msg import CompressedImage, Range from opencv_apps.msg import FlowArrayStamped, Flow @@ -59,7 +59,7 @@ def __init__(self, node_name): self.pub_debug_image = rospy.Publisher( "~debug/image/compressed", CompressedImage, queue_size=1 ) - self.pub_odometry = rospy.Publisher("~visual_odometry", Odometry, queue_size=1) + self.pub_odometry = rospy.Publisher("~visual_odometry", TwistStamped, queue_size=1) self.pub_motion_vectors = rospy.Publisher( "~lineseglist_out", SegmentList, queue_size=1 ) @@ -161,15 +161,14 @@ def cb_projected_motion_vectors(self, msg: SegmentList): self.logdebug(f"Computed velocity vector [m/s]: {velocity}") # Publish the optical flow vector as odometry - odometry_msg = Odometry() - odometry_msg.header.stamp = now + velocity_msg = TwistStamped() + velocity_msg.header.stamp = now # TODO: change this to the correct frame - odometry_msg.child_frame_id = "base_link" - odometry_msg.twist.twist.linear.x = velocity[0] - odometry_msg.twist.twist.linear.y = velocity[1] + velocity_msg.twist.linear.x = velocity[0] + velocity_msg.twist.linear.y = velocity[1] - self.pub_odometry.publish(odometry_msg) + self.pub_odometry.publish(velocity_msg) # self.last_stamp = now