|
7 | 7 | from pathlib import Path
|
8 | 8 | import casadi as ca
|
9 | 9 | import cyecca.lie as lie
|
10 |
| -from cyecca.lie.group_so3 import SO3Quat, SO3EulerB321 |
11 |
| -from cyecca.lie.group_se23 import SE23Quat, se23, SE23LieGroupElement, SE23LieAlgebraElement |
| 10 | +from cyecca.lie.group_so3 import SO3Quat, SO3EulerB321, so3 |
| 11 | +from cyecca.lie.group_se23 import ( |
| 12 | + SE23Quat, |
| 13 | + se23, |
| 14 | + SE23LieGroupElement, |
| 15 | + SE23LieAlgebraElement, |
| 16 | +) |
12 | 17 | from cyecca.symbolic import SERIES
|
13 | 18 |
|
14 | 19 | print('python: ', sys.executable)
|
|
33 | 38 | z_integral_max = 0 # 5.0
|
34 | 39 | ki_z = 0.05 # velocity z integral gain
|
35 | 40 |
|
| 41 | +# estimator params |
| 42 | +att_w_acc =0.2 |
| 43 | +att_w_gyro_bias = 0.1 |
| 44 | +param_att_w_mag = 0.2 |
| 45 | + |
36 | 46 | def derive_control_allocation(
|
37 | 47 | ):
|
38 | 48 | """
|
@@ -457,6 +467,125 @@ def derive_strapdown_ins_propagation():
|
457 | 467 | }
|
458 | 468 | return eqs
|
459 | 469 |
|
| 470 | +def derive_attitude_estimator(): |
| 471 | + # Define Casadi variables |
| 472 | + q0 = ca.SX.sym("q", 4) |
| 473 | + q = SO3Quat.elem(param=q0) |
| 474 | + mag = ca.SX.sym("mag", 3) |
| 475 | + mag_decl = ca.SX.sym("mag_decl", 1) |
| 476 | + gyro = ca.SX.sym("gyro", 3) |
| 477 | + accel = ca.SX.sym("accel", 3) |
| 478 | + dt = ca.SX.sym("dt", 1) |
| 479 | + |
| 480 | + # Convert magnetometer to quat |
| 481 | + mag1 = SO3Quat.elem(ca.vertcat(0, mag)) |
| 482 | + |
| 483 | + # correction angular velocity vector |
| 484 | + correction = ca.SX.zeros(3, 1) |
| 485 | + |
| 486 | + # Convert vector to world frame and extract xy component |
| 487 | + spin_rate = ca.norm_2(gyro) |
| 488 | + mag_earth = (q.inverse() * mag1 * q).param[1:] |
| 489 | + |
| 490 | + mag_err = ( |
| 491 | + ca.fmod(ca.atan2(mag_earth[1], mag_earth[0]) - mag_decl + ca.pi, 2 * ca.pi) |
| 492 | + - ca.pi |
| 493 | + ) |
| 494 | + |
| 495 | + # Change gain if spin rate is large |
| 496 | + fifty_dps = 0.873 |
| 497 | + gain_mult = ca.if_else(spin_rate > fifty_dps, ca.fmin(spin_rate / fifty_dps, 10), 1) |
| 498 | + |
| 499 | + # Move magnetometer correction in body frame |
| 500 | + correction += ( |
| 501 | + (q.inverse() * SO3Quat.elem(ca.vertcat(0, 0, 0, mag_err)) * q).param[1:] |
| 502 | + * param_att_w_mag |
| 503 | + * gain_mult |
| 504 | + ) |
| 505 | + |
| 506 | + # Correction from accelerometer |
| 507 | + accel_norm_sq = ca.norm_2(accel) ** 2 |
| 508 | + |
| 509 | + # Correct accelerometer only if g between |
| 510 | + higher_lim_check = ca.if_else(accel_norm_sq < ((g * 1.1) ** 2), 1, 0) |
| 511 | + lower_lim_check = ca.if_else(accel_norm_sq > ((g * 0.9) ** 2), 1, 0) |
| 512 | + |
| 513 | + # Correct gravity as z |
| 514 | + correction += ( |
| 515 | + lower_lim_check |
| 516 | + * higher_lim_check |
| 517 | + * ca.cross(np.array([[0], [0], [-1]]), accel / ca.norm_2(accel)) |
| 518 | + * att_w_acc |
| 519 | + ) |
| 520 | + |
| 521 | + ## TODO add gyro bias stuff |
| 522 | + |
| 523 | + # Add gyro to correction |
| 524 | + correction += gyro |
| 525 | + |
| 526 | + # Make the correction |
| 527 | + q1 = q * so3.elem(correction * dt).exp(SO3Quat) |
| 528 | + |
| 529 | + # Return estimator |
| 530 | + f_att_estimator = ca.Function( |
| 531 | + "attitude_estimator", |
| 532 | + [q0, mag, mag_decl, gyro, accel, dt], |
| 533 | + [q1.param], |
| 534 | + ["q", "mag", "mag_decl", "gyro", "accel", "dt"], |
| 535 | + ["q1"], |
| 536 | + ) |
| 537 | + |
| 538 | + return {"attitude_estimator": f_att_estimator} |
| 539 | + |
| 540 | +def derive_position_correction(): |
| 541 | + ## Initilaizing measurments |
| 542 | + z = ca.SX.sym("gps", 3) |
| 543 | + dt = ca.SX.sym("dt", 1) |
| 544 | + P = ca.SX.sym("P", 6, 6) |
| 545 | + |
| 546 | + # Initialize state |
| 547 | + est_x = ca.SX.sym("est", 10) # [x,y,z,u,v,w,q0,q1,q2,q3] |
| 548 | + x0 = est_x[0:6] # [x,y,z,u,v,w] |
| 549 | + |
| 550 | + # Define the state transition matrix (A) |
| 551 | + A = ca.SX.eye(6) |
| 552 | + A[0:3, 3:6] = np.eye(3) * dt # The velocity elements multiply by dt |
| 553 | + |
| 554 | + ## TODO: may need to pass Q and R throught the casadi function |
| 555 | + Q = np.eye(6) * 1e-5 # Process noise (uncertainty in system model) |
| 556 | + R = np.eye(3) * 1e-2 # Measurement noise (uncertainty in sensors) |
| 557 | + |
| 558 | + # Measurement matrix |
| 559 | + H = ca.horzcat(ca.SX.eye(3), ca.SX.zeros(3, 3)) |
| 560 | + |
| 561 | + # extrapolate uncertainty |
| 562 | + P_s = A @ P @ A.T + Q |
| 563 | + |
| 564 | + ## Measurment Update |
| 565 | + ## vel is a basic integral given acceleration values. need to figure out how to get v0 |
| 566 | + y = H @ P_s @ H.T + R |
| 567 | + |
| 568 | + # Update Kalman Gain |
| 569 | + K = P_s @ H.T @ ca.inv(y) |
| 570 | + |
| 571 | + # Update estimate w/ measurment |
| 572 | + x_new = x0 + K @ (z - H @ x0) |
| 573 | + |
| 574 | + # Update the measurement uncertainty |
| 575 | + P_new = (np.eye(6) - (K @ H)) @ P_s |
| 576 | + |
| 577 | + # Return to have attitude updated |
| 578 | + x_new = ca.vertcat(x_new, ca.SX.zeros(4)) |
| 579 | + |
| 580 | + f_pos_estimator = ca.Function( |
| 581 | + "position_correction", |
| 582 | + [est_x, z, dt, P], |
| 583 | + [x_new, P_new], |
| 584 | + ["est_x", "gps", "dt", "P"], |
| 585 | + ["x_new", "P_new"], |
| 586 | + ) |
| 587 | + return {"position_correction": f_pos_estimator} |
| 588 | + |
460 | 589 |
|
461 | 590 | def generate_code(eqs: dict, filename, dest_dir: str, **kwargs):
|
462 | 591 | """
|
@@ -499,6 +628,8 @@ def generate_code(eqs: dict, filename, dest_dir: str, **kwargs):
|
499 | 628 | eqs.update(derive_joy_auto_level())
|
500 | 629 | eqs.update(derive_strapdown_ins_propagation())
|
501 | 630 | eqs.update(derive_control_allocation())
|
| 631 | + eqs.update(derive_attitude_estimator()) |
| 632 | + eqs.update(derive_position_correction()) |
502 | 633 |
|
503 | 634 | for name, eq in eqs.items():
|
504 | 635 | print('eq: ', name)
|
|
0 commit comments