diff --git a/PathTracking/lqr_speed_steer_control/lqr.py b/PathTracking/lqr_speed_steer_control/lqr.py new file mode 100644 index 0000000000..1f2cefec66 --- /dev/null +++ b/PathTracking/lqr_speed_steer_control/lqr.py @@ -0,0 +1,610 @@ +''' +https://atsushisakai.github.io/PythonRobotics/modules/6_path_tracking/lqr_speed_and_steering_control/lqr_speed_and_steering_control.html +''' + +import math +import sys +import numpy as np +import scipy.linalg as la +import pathlib +sys.path.append(str(pathlib.Path(__file__).parent.parent.parent)) + +np.set_printoptions(precision=3) +np.set_printoptions(suppress=True) +np.set_printoptions(formatter={'float': '{: 0.3f}'.format}) + +class bcolors: + # https://godoc.org/github.com/whitedevops/colors + DEFAULT = "\033[39m" + BLACK = "\033[30m" + RED = "\033[31m" + GREEN = "\033[32m" + YELLOW = "\033[33m" + BLUE = "\033[34m" + MAGENTA = "\033[35m" + CYAN = "\033[36m" + LGRAY = "\033[37m" + DARKGRAY = "\033[90m" + FAIL = "\033[91m" + OKGREEN = '\033[92m' + WARNING = '\033[93m' + OKBLUE = '\033[94m' + HEADER = '\033[95m' + LIGHTCYAN = '\033[96m' + WHITE = "\033[97m" + + ENDC = '\033[0m' + BOLD = '\033[1m' + DIM = "\033[2m" + UNDERLINE = '\033[4m' + BLINK = "\033[5m" + REVERSE = "\033[7m" + HIDDEN = "\033[8m" + + BG_DEFAULT = "\033[49m" + BG_BLACK = "\033[40m" + BG_RED = "\033[41m" + BG_GREEN = "\033[42m" + BG_YELLOW = "\033[43m" + BG_BLUE = "\033[44m" + BG_MAGENTA = "\033[45m" + BG_CYAN = "\033[46m" + BG_GRAY = "\033[47m" + BG_DKGRAY = "\033[100m" + BG_LRED = "\033[101m" + BG_LGREEN = "\033[102m" + BG_LYELLOW = "\033[103m" + BG_LBLUE = "\033[104m" + BG_LMAGENTA = "\033[105m" + BG_LCYAN = "\033[106m" + BG_WHITE = "\033[107m" + +def print_color_str(s, clr): + print(clr + s + bcolors.ENDC) + +def angle_mod(x, zero_2_2pi=False, degree=False): + """ + Angle modulo operation + Default angle modulo range is [-pi, pi) + + Parameters + ---------- + x : float or array_like + A angle or an array of angles. This array is flattened for + the calculation. When an angle is provided, a float angle is returned. + zero_2_2pi : bool, optional + Change angle modulo range to [0, 2pi) + Default is False. + degree : bool, optional + If True, then the given angles are assumed to be in degrees. + Default is False. + + Returns + ------- + ret : float or ndarray + an angle or an array of modulated angle. + + Examples + -------- + >>> angle_mod(-4.0) + 2.28318531 + + >>> angle_mod([-4.0]) + np.array(2.28318531) + + >>> angle_mod([-150.0, 190.0, 350], degree=True) + array([-150., -170., -10.]) + + >>> angle_mod(-60.0, zero_2_2pi=True, degree=True) + array([300.]) + + """ + if isinstance(x, float): + is_float = True + else: + is_float = False + + x = np.asarray(x).flatten() + if degree: + x = np.deg2rad(x) + + if zero_2_2pi: + mod_angle = x % (2 * np.pi) + else: + mod_angle = (x + np.pi) % (2 * np.pi) - np.pi + + if degree: + mod_angle = np.rad2deg(mod_angle) + + if is_float: + return mod_angle.item() + else: + return mod_angle + +def pi_2_pi(angle): + return angle_mod(angle) + +def modulo(angle): + # // reduce the angle + angle = angle % 360 + + # // force it to be the positive remainder, so that 0 <= angle < 360 + angle = (angle + 360) % 360 + + # // force into the minimum absolute value residue class, so that -180 < angle <= 180 + if (angle > 180): + angle -= 360 + + return angle + +def modulo_rad(rad): + x = modulo(np.rad2deg(rad)) + return np.deg2rad(x) + +class State: + def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0): + self.x = x + self.y = y + self.yaw = yaw + self.v = v + + self.last_yaw = None + + self.e = 0.0 + self.e_th = 0.0 + +def solve_dare(A, B, Q, R): + """ + solve a discrete time_Algebraic Riccati equation (DARE) + """ + x = Q + x_next = Q + max_iter = 150 + eps = 0.01 + + for i in range(max_iter): + x_next = A.T @ x @ A - A.T @ x @ B @ \ + la.inv(R + B.T @ x @ B) @ B.T @ x @ A + Q + if (abs(x_next - x)).max() < eps: + break + x = x_next + + return x_next + +def dlqr(A, B, Q, R): + """Solve the discrete time lqr controller. + x[k+1] = A x[k] + B u[k] + cost = sum x[k].T*Q*x[k] + u[k].T*R*u[k] + # ref Bertsekas, p.151 + """ + + try: + + # first, try to solve the ricatti equation + X = solve_dare(A, B, Q, R) + + # compute the LQR gain + K = la.inv(B.T @ X @ B + R) @ (B.T @ X @ A) + + eig_result = la.eig(A - B @ K) + except Exception as e: + import ipdb; ipdb.set_trace() + + return K, X, eig_result[0] + +def smallest_diff(a, b): + # candidates = np.array([ + # a - b, + # a - (b + 2*np.pi), + # a - (b - 2*np.pi) + # ]) + # candidates2 = np.abs(candidates) + # return candidates[np.argmin(candidates2)] + + # a = (a + np.pi) % 2*np.pi - np.pi + # return a + + # return math.atan2(np.sin(a-b), np.cos(a-b)) + + diff = ( np.rad2deg(a) - np.rad2deg(b) + 180 ) % 360 - 180 + a = diff + 360 if diff < -180 else diff + return np.deg2rad(a) + +def calc_nearest_index(state, cx, cy, cyaw, a, b, debug=False): + dx = [state.x - icx for icx in cx] + dy = [state.y - icy for icy in cy] + + d = [idx ** 2 + idy ** 2 for (idx, idy) in zip(dx, dy)] + + mind = min(d) + + ind = d.index(mind) + + mind = math.sqrt(mind) + + dxl = cx[ind] - state.x + dyl = cy[ind] - state.y + + # angle = pi_2_pi(cyaw[ind] - math.atan2(dyl, dxl)) + angle = smallest_diff(cyaw[ind], math.atan2(dyl, dxl)) + if angle < 0: + mind *= -1 + + if debug: + print("calc_nearest_index", ind, cyaw[ind]) + + return ind, mind + +def calc_nearest_index2(state, cx, cy, cyaw, a, b, debug=False): + ind = min(int(b // a), len(cx) - 1) + + mind = math.sqrt( + (state.x - cx[ind]) ** 2 + (state.y - cy[ind]) ** 2) + print("ind", ind) + print("mind", mind) + + dxl = cx[ind] - state.x + dyl = cy[ind] - state.y + + angle = pi_2_pi(cyaw[ind] - math.atan2(dyl, dxl)) + if angle < 0: + mind *= -1 + + return ind, mind + +def interpolate(arr, factor): + res = [] + i = 0 + while i < len(arr) - 1: + res.extend( + np.linspace(arr[i], arr[i+1], factor, endpoint=False) + ) + i += 1 + res.append(arr[-1]) + return res + +def do_repeat(arr, factor): + res = [] + i = 0 + while i < len(arr) - 1: + res.extend( + np.repeat(arr[i+1], factor) + ) + i += 1 + res.append(arr[-1]) + return res + +def best_rotation_candidate(a, b): + offsets = [ + 0.0, + 2*np.pi, + -2*np.pi + ] + + b_candidates = [ + b, + b + 2*np.pi, + b - 2*np.pi + ] + + candidates = np.array([ + a - b, + a - (b + 2*np.pi), + a - (b - 2*np.pi) + ]) + candidates2 = np.abs(candidates) + + return b_candidates[np.argmin(candidates2)], np.min(candidates2), offsets[np.argmin(candidates2)] + +def rotation_smooth(rads, debug=False): + if len(rads) == 0: + return rads + + modulated = [rads[0]] + i = 1 + offset_stack = 0.0 + + while i < len(rads): + if debug: + print("offset_stack", offset_stack) + print("modulated[-1]", modulated[-1]) + + cand = rads[i] + offset_stack + smoothed_rad, dist, offset = best_rotation_candidate( + modulated[i-1], cand) + modulated.append(smoothed_rad) + + if debug: + print("rads[i]", rads[i], cand) + print("offset", offset) + print("smoothed_rad", smoothed_rad) + print("") + + ''' + dist2 = modulo_rad(rads[i] - modulated[i-1]) + + # print( + # "HEY!!!! %.3f vs %.3f = %.3f, %.3f @ %.3f" % ( + # modulated[i-1], rads[i], + # dist, dist2, + # smoothed_rad + # ) + # ) + + if np.abs(dist2 - np.pi / 2) < np.abs(dist2 - np.pi): + # print("genuine turn") + # modulated.append(rads[i]) + modulated.append(smoothed_rad) + else: + # print("wraparound") + modulated.append(smoothed_rad) + ''' + + i += 1 + + offset_stack += offset + + return modulated + +def find_max_index_less_than(arr, num): + """ + Finds the index of the maximum value in a NumPy array that is less than a specified number. + + Args: + arr: The NumPy array to search. + num: The upper bound for the values to consider. + + Returns: + The index of the maximum value less than num, or None if no such value exists. + """ + valid_indices = np.where(arr < num)[0] + + if valid_indices.size == 0: + return None + + max_index_within_limit = valid_indices[np.argmax(arr[valid_indices])] + return max_index_within_limit + +def calc_nearest_index3(state, cx, cy, cyaw, a, b, debug=False): + ind = find_max_index_less_than(a, b) + if ind is None: + return 0, 0.0 + + if debug: + print("ind", ind) + + mind = math.sqrt( + (state.x - cx[ind]) ** 2 + (state.y - cy[ind]) ** 2) + + dxl = cx[ind] - state.x + dyl = cy[ind] - state.y + + angle = pi_2_pi(cyaw[ind] - math.atan2(dyl, dxl)) + + if debug: + print("closest", cx[ind], cy[ind]) + print("current", state.x, state.y) + print("angle to closest", math.atan2(dyl, dxl)) + print("goal yaw", cyaw[ind]) + print("angle", angle) + + # angle = smallest_diff(cyaw[ind], math.atan2(dyl, dxl)) + if angle < 0: + mind *= -1 + + return ind, mind + +def calc_nearest_index_hybrid13(state, cx, cy, cyaw, a, b, debug=False): + ind1, e1 = calc_nearest_index( + state, cx, cy, cyaw, a, b, debug=debug) + + ind2, e2 = calc_nearest_index3( + state, cx, cy, cyaw, a, b, debug=debug) + + # import ipdb; ipdb.set_trace() + + # ind1 might be AHEAD of ind2 + # because the closest item is farther from start + # than the travel + if (ind1 > ind2): + if debug: + print("ahead (%d, %.3f) vs (%d, %.3f), state=(%.3f, %.3f)" % (ind1, e1, ind2, e2, state.x, state.y)) + return ind1, e1 + else: + if debug: + print("behind (%d, %.3f) vs (%d, %.3f), state=(%.3f, %.3f)" % (ind1, e1, ind2, e2, state.x, state.y)) + + return ind2, e2 + +def lqr_speed_steering_control( + state, + pe, pth_e, dt, + cx, cy, cyaw, + # ck, + # sp, + Q, R, L, tv, totalt, + distance_traveled, cumsums, debug=True): + if debug: + print("t: %.3f" % (totalt)) + + ind, e = calc_nearest_index_hybrid13( + state, cx, cy, cyaw, cumsums, distance_traveled, debug=debug) + + best_dist_estimate = cumsums[ind] + + v = state.v + # print("state.v", v) + # th_e = pi_2_pi(state.yaw - cyaw[ind]) + + expected_yaw = cyaw[ind] + + # th_e = smallest_diff( + # state.yaw, + # expected_yaw) + th_e = modulo_rad(state.yaw - expected_yaw) + + if debug: + print("state.yaw", state.yaw) + print("expected_yaw", expected_yaw) + print("th_e", th_e) + + # th_e = (state.yaw - cyaw[ind]) % 2*np.pi + # th_e = (th_e + 2*np.pi) % 2*np.pi + + # th_e = modulo_rad(state.yaw - cyaw[ind]) + + # A = [1.0, dt, 0.0, 0.0, 0.0 + # 0.0, 0.0, v, 0.0, 0.0] + # 0.0, 0.0, 1.0, dt, 0.0] + # 0.0, 0.0, 0.0, 0.0, 0.0] + # 0.0, 0.0, 0.0, 0.0, 1.0] + A = np.zeros((5, 5)) + A[0, 0] = 1.0 + A[0, 1] = dt + A[1, 2] = v + A[2, 2] = 1.0 + A[2, 3] = dt + A[4, 4] = 1.0 + + # B = [0.0, 0.0 + # 0.0, 0.0 + # 0.0, 0.0 + # v/L, 0.0 + # 0.0, dt] + B = np.zeros((5, 2)) + B[3, 0] = v / L + B[4, 1] = dt + + # print("Q", Q) + + K, _, _ = dlqr(A, B, Q, R) + + # state vector + # x = [e, dot_e, th_e, dot_th_e, delta_v] + # e: lateral distance to the path + # dot_e: derivative of e + # th_e: angle difference to the path + # dot_th_e: derivative of th_e + # delta_v: difference between current speed and target speed + x = np.zeros((5, 1)) + x[0, 0] = e + + # x[1, 0] = (e - pe) / dt + ''' + when the calc_nearest_index3 sign changes, this can blow up + it is reasonable we only care to control the MAGNITUDE of this + the signage maybe does not matter (?) + x[1, 0] = np.abs(e - pe) / dt + + i.e. + ind 297 + closest 0.3305452202422401 -2.08874005450175 + current 0.7554259032823595 -1.945941151618235 + angle to closest -2.817361559602424 + goal yaw -2.8094245446288317 + angle 0.007937014973592227 + x [[ 0.448] + [ 17.859] + [ 0.454] + [-0.171] + [-0.000]] + controller: 21.85: ((-2.355, -2.809) (0.448, -0.445) (0.454, 0.463 = -0.171) -2.809, -2.812 -> 0.050 + delta 0.018, -1.601 -> -1.583 + yaw update -2.355, (-1.583, 2.239) -> -0.116 + state.v 0.19977933240264226 + ''' + # x[1, 0] = np.abs(e - pe) / dt + if debug: + print("e: %.3f, pe %.3f => %.3f" % (e, pe, x[1, 0])) + if (np.sign(e) != np.sign(pe)): + if debug: + print_color_str("SIGN CHANGE!", bcolors.WARNING) + normalized_pe = np.sign(e) * np.abs(pe) + x[1, 0] = np.abs(e - normalized_pe) / dt + + x[2, 0] = th_e + dot_th_e = modulo_rad(th_e - pth_e) / dt + x[3, 0] = dot_th_e + + # print("v!", v) + # print("tv!") + x[4, 0] = v - tv + + # print("K", K) + # print("x", x) + + # input vector + # u = [delta, accel] + # delta: steering angle + # accel: acceleration + ustar = -K @ x + + # calc steering input + # k = ck[ind] + + k = 0.0 + if state.last_yaw is not None: + k = smallest_diff(cyaw[ind], state.last_yaw) / dt + + if debug: + print("x {}".format(x)) + + print("controller: %.2f: ((%.3f, %.3f) (%.3f, %.3f) (%.3f, %.3f = %.3f) %.3f, %.3f -> %.3f" % ( + totalt, + state.yaw, expected_yaw, + e, pe, + th_e, pth_e, dot_th_e, + cyaw[ind], state.last_yaw, k)) + state.last_yaw = cyaw[ind] + + ff = math.atan2(L * k, 1) # feedforward steering angle + # ff = 0 + fb = modulo_rad(ustar[0, 0]) # feedback steering angle + delta = modulo_rad(ff + fb) + + if np.abs(fb) > np.pi / 4: + if debug: + print_color_str("delta %.3f, %.3f -> %.3f" % ( + ff, + fb, + delta), bcolors.BG_LCYAN) + else: + if debug: + print("delta %.3f, %.3f -> %.3f" % ( + ff, + fb, + delta)) + + # calc accel input + accel = ustar[1, 0] + + # print("ustar", ustar) + + return delta, ind, e, th_e, accel, expected_yaw, fb, best_dist_estimate, ind + +# LQR parameters + +# state vector +# x = [e, dot_e, th_e, dot_th_e, delta_v] +# e: lateral distance to the path +# dot_e: derivative of e +# th_e: angle difference to the path +# dot_th_e: derivative of th_e +# delta_v: difference between current speed and target speed + +lqr_Q = np.eye(5) +# lqr_Q[0, 0] = 0.5 +# lqr_Q[0, 0] = 1.0 +lqr_Q[1, 1] = 0.0 +# lqr_Q[2, 2] = 0.5 +lqr_Q[3, 3] = 0.1 +lqr_Q[4, 4] = 0.1 # speed de-prioritized vs position + +# lqr_Q[2, 2] = 5e-3 + +lqr_R = np.eye(2) +# lqr_R[0, 0] = 1.0 +# lqr_R[1, 1] = 0.5 +dt = 0.05 # time tick[s] +L = 0.36 # Wheel base of the vehicle [m] +max_steer = np.deg2rad(180.0) # maximum steering angle[rad] diff --git a/PathTracking/lqr_speed_steer_control/lqr_speed_steer_control.py b/PathTracking/lqr_speed_steer_control/lqr_speed_steer_control.py deleted file mode 100644 index d85fa98a84..0000000000 --- a/PathTracking/lqr_speed_steer_control/lqr_speed_steer_control.py +++ /dev/null @@ -1,321 +0,0 @@ -""" - -Path tracking simulation with LQR speed and steering control - -author Atsushi Sakai (@Atsushi_twi) - -""" -import math -import sys -import matplotlib.pyplot as plt -import numpy as np -import scipy.linalg as la -import pathlib -from utils.angle import angle_mod -sys.path.append(str(pathlib.Path(__file__).parent.parent.parent)) - -from PathPlanning.CubicSpline import cubic_spline_planner - -# === Parameters ===== - -# LQR parameter -lqr_Q = np.eye(5) -lqr_R = np.eye(2) -dt = 0.1 # time tick[s] -L = 0.5 # Wheel base of the vehicle [m] -max_steer = np.deg2rad(45.0) # maximum steering angle[rad] - -show_animation = True - - -class State: - - def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0): - self.x = x - self.y = y - self.yaw = yaw - self.v = v - - -def update(state, a, delta): - - if delta >= max_steer: - delta = max_steer - if delta <= - max_steer: - delta = - max_steer - - state.x = state.x + state.v * math.cos(state.yaw) * dt - state.y = state.y + state.v * math.sin(state.yaw) * dt - state.yaw = state.yaw + state.v / L * math.tan(delta) * dt - state.v = state.v + a * dt - - return state - - -def pi_2_pi(angle): - return angle_mod(angle) - - -def solve_dare(A, B, Q, R): - """ - solve a discrete time_Algebraic Riccati equation (DARE) - """ - x = Q - x_next = Q - max_iter = 150 - eps = 0.01 - - for i in range(max_iter): - x_next = A.T @ x @ A - A.T @ x @ B @ \ - la.inv(R + B.T @ x @ B) @ B.T @ x @ A + Q - if (abs(x_next - x)).max() < eps: - break - x = x_next - - return x_next - - -def dlqr(A, B, Q, R): - """Solve the discrete time lqr controller. - x[k+1] = A x[k] + B u[k] - cost = sum x[k].T*Q*x[k] + u[k].T*R*u[k] - # ref Bertsekas, p.151 - """ - - # first, try to solve the ricatti equation - X = solve_dare(A, B, Q, R) - - # compute the LQR gain - K = la.inv(B.T @ X @ B + R) @ (B.T @ X @ A) - - eig_result = la.eig(A - B @ K) - - return K, X, eig_result[0] - - -def lqr_speed_steering_control(state, cx, cy, cyaw, ck, pe, pth_e, sp, Q, R): - ind, e = calc_nearest_index(state, cx, cy, cyaw) - - tv = sp[ind] - - k = ck[ind] - v = state.v - th_e = pi_2_pi(state.yaw - cyaw[ind]) - - # A = [1.0, dt, 0.0, 0.0, 0.0 - # 0.0, 0.0, v, 0.0, 0.0] - # 0.0, 0.0, 1.0, dt, 0.0] - # 0.0, 0.0, 0.0, 0.0, 0.0] - # 0.0, 0.0, 0.0, 0.0, 1.0] - A = np.zeros((5, 5)) - A[0, 0] = 1.0 - A[0, 1] = dt - A[1, 2] = v - A[2, 2] = 1.0 - A[2, 3] = dt - A[4, 4] = 1.0 - - # B = [0.0, 0.0 - # 0.0, 0.0 - # 0.0, 0.0 - # v/L, 0.0 - # 0.0, dt] - B = np.zeros((5, 2)) - B[3, 0] = v / L - B[4, 1] = dt - - K, _, _ = dlqr(A, B, Q, R) - - # state vector - # x = [e, dot_e, th_e, dot_th_e, delta_v] - # e: lateral distance to the path - # dot_e: derivative of e - # th_e: angle difference to the path - # dot_th_e: derivative of th_e - # delta_v: difference between current speed and target speed - x = np.zeros((5, 1)) - x[0, 0] = e - x[1, 0] = (e - pe) / dt - x[2, 0] = th_e - x[3, 0] = (th_e - pth_e) / dt - x[4, 0] = v - tv - - # input vector - # u = [delta, accel] - # delta: steering angle - # accel: acceleration - ustar = -K @ x - - # calc steering input - ff = math.atan2(L * k, 1) # feedforward steering angle - fb = pi_2_pi(ustar[0, 0]) # feedback steering angle - delta = ff + fb - - # calc accel input - accel = ustar[1, 0] - - return delta, ind, e, th_e, accel - - -def calc_nearest_index(state, cx, cy, cyaw): - dx = [state.x - icx for icx in cx] - dy = [state.y - icy for icy in cy] - - d = [idx ** 2 + idy ** 2 for (idx, idy) in zip(dx, dy)] - - mind = min(d) - - ind = d.index(mind) - - mind = math.sqrt(mind) - - dxl = cx[ind] - state.x - dyl = cy[ind] - state.y - - angle = pi_2_pi(cyaw[ind] - math.atan2(dyl, dxl)) - if angle < 0: - mind *= -1 - - return ind, mind - - -def do_simulation(cx, cy, cyaw, ck, speed_profile, goal): - T = 500.0 # max simulation time - goal_dis = 0.3 - stop_speed = 0.05 - - state = State(x=-0.0, y=-0.0, yaw=0.0, v=0.0) - - time = 0.0 - x = [state.x] - y = [state.y] - yaw = [state.yaw] - v = [state.v] - t = [0.0] - - e, e_th = 0.0, 0.0 - - while T >= time: - dl, target_ind, e, e_th, ai = lqr_speed_steering_control( - state, cx, cy, cyaw, ck, e, e_th, speed_profile, lqr_Q, lqr_R) - - state = update(state, ai, dl) - - if abs(state.v) <= stop_speed: - target_ind += 1 - - time = time + dt - - # check goal - dx = state.x - goal[0] - dy = state.y - goal[1] - if math.hypot(dx, dy) <= goal_dis: - print("Goal") - break - - x.append(state.x) - y.append(state.y) - yaw.append(state.yaw) - v.append(state.v) - t.append(time) - - if target_ind % 1 == 0 and show_animation: - plt.cla() - # for stopping simulation with the esc key. - plt.gcf().canvas.mpl_connect( - 'key_release_event', - lambda event: [exit(0) if event.key == 'escape' else None]) - plt.plot(cx, cy, "-r", label="course") - plt.plot(x, y, "ob", label="trajectory") - plt.plot(cx[target_ind], cy[target_ind], "xg", label="target") - plt.axis("equal") - plt.grid(True) - plt.title("speed[km/h]:" + str(round(state.v * 3.6, 2)) - + ",target index:" + str(target_ind)) - plt.pause(0.0001) - - return t, x, y, yaw, v - - -def calc_speed_profile(cyaw, target_speed): - speed_profile = [target_speed] * len(cyaw) - - direction = 1.0 - - # Set stop point - for i in range(len(cyaw) - 1): - dyaw = abs(cyaw[i + 1] - cyaw[i]) - switch = math.pi / 4.0 <= dyaw < math.pi / 2.0 - - if switch: - direction *= -1 - - if direction != 1.0: - speed_profile[i] = - target_speed - else: - speed_profile[i] = target_speed - - if switch: - speed_profile[i] = 0.0 - - # speed down - for i in range(40): - speed_profile[-i] = target_speed / (50 - i) - if speed_profile[-i] <= 1.0 / 3.6: - speed_profile[-i] = 1.0 / 3.6 - - return speed_profile - - -def main(): - print("LQR steering control tracking start!!") - ax = [0.0, 6.0, 12.5, 10.0, 17.5, 20.0, 25.0] - ay = [0.0, -3.0, -5.0, 6.5, 3.0, 0.0, 0.0] - goal = [ax[-1], ay[-1]] - - cx, cy, cyaw, ck, s = cubic_spline_planner.calc_spline_course( - ax, ay, ds=0.1) - target_speed = 10.0 / 3.6 # simulation parameter km/h -> m/s - - sp = calc_speed_profile(cyaw, target_speed) - - t, x, y, yaw, v = do_simulation(cx, cy, cyaw, ck, sp, goal) - - if show_animation: # pragma: no cover - plt.close() - plt.subplots(1) - plt.plot(ax, ay, "xb", label="waypoints") - plt.plot(cx, cy, "-r", label="target course") - plt.plot(x, y, "-g", label="tracking") - plt.grid(True) - plt.axis("equal") - plt.xlabel("x[m]") - plt.ylabel("y[m]") - plt.legend() - plt.subplots(1) - - plt.plot(t, np.array(v)*3.6, label="speed") - plt.grid(True) - plt.xlabel("Time [sec]") - plt.ylabel("Speed [m/s]") - plt.legend() - - plt.subplots(1) - plt.plot(s, [np.rad2deg(iyaw) for iyaw in cyaw], "-r", label="yaw") - plt.grid(True) - plt.legend() - plt.xlabel("line length[m]") - plt.ylabel("yaw angle[deg]") - - plt.subplots(1) - plt.plot(s, ck, "-r", label="curvature") - plt.grid(True) - plt.legend() - plt.xlabel("line length[m]") - plt.ylabel("curvature [1/m]") - - plt.show() - - -if __name__ == '__main__': - main() diff --git a/PathTracking/lqr_speed_steer_control/sim.py b/PathTracking/lqr_speed_steer_control/sim.py new file mode 100755 index 0000000000..8ecb69168d --- /dev/null +++ b/PathTracking/lqr_speed_steer_control/sim.py @@ -0,0 +1,756 @@ +#! /usr/bin/env python3 + +""" + +Path tracking simulation with LQR speed and steering control + +author Atsushi Sakai (@Atsushi_twi) + +./sim.py /home/charlieyan1/Dev/jim/pymap2d/sdf_path_0000210.npy +./sim.py /home/charlieyan1/Dev/jim/pymap2d/sdf_path_0000280.npy + +""" +from lqr import * + +import matplotlib.pyplot as plt + +# from PathPlanning.CubicSpline import cubic_spline_planner + +import argparse + +# from scipy.spatial.transform import Rotation as R +# from scipy.spatial.transform import Slerp + +# from spliner import * + +def two_d_make_x_y_theta_hom(x, y, theta): + hom = np.eye(3) + + theta = theta % (2 * np.pi) + # 2019-08-02 parentheses!!! + + hom[0, 0] = np.cos(theta) + hom[0, 1] = -np.sin(theta) + hom[1, 0] = np.sin(theta) + hom[1, 1] = np.cos(theta) + + hom[0, 2] = x + hom[1, 2] = y + return hom + +def plot_line(ax, a, b, mode, color, linewidth, alpha=0.2): + if mode == 3: + ax.plot( + [a[0], b[0]], + [a[1], b[1]], + [a[2], b[2]], + color=color, + linewidth=linewidth, + alpha=alpha) + elif mode == 2: + ax.plot( + [a[0], b[0]], + [a[1], b[1]], + color=color, + linewidth=linewidth, + alpha=alpha) + +def plot_gnomon(ax, g, mode=3, length=0.1, linewidth=5, c=None, offset = 0.0): + ''' + mode is dimension of 'canvas' + ''' + if (mode == 3): + o = g.dot(np.array([offset, offset, 0.0, 1.0])) + elif (mode == 2): + o = g.dot(np.array([offset, offset, 1.0])) + + if (mode == 3): + x = g.dot(np.array([length*1 + offset, offset, 0.0, 1.0])) + if c is not None: + plot_line(ax, o, x, mode, c, linewidth) + else: + plot_line(ax, o, x, mode, 'r', linewidth) + elif (mode == 2): + x = g.dot(np.array([length*1 + offset, offset, 1.0])) + if c is not None: + plot_line(ax, o, x, mode, c, linewidth) + else: + plot_line(ax, o, x, mode, 'r', linewidth) + + if (mode == 3): + y = g.dot(np.array([offset, length*2 + offset, 0.0, 1.0])) + if c is not None: + plot_line(ax, o, y, mode, c, linewidth) + else: + plot_line(ax, o, y, mode, 'g', linewidth) + elif (mode == 2): + y = g.dot(np.array([offset, length*2 + offset, 1.0])) + if c is not None: + plot_line(ax, o, y, mode, c, linewidth) + else: + plot_line(ax, o, y, mode, 'g', linewidth) + + if (mode == 3): + z = g.dot(np.array([offset, 0.0, length*3 + offset, 1.0])) + if c is not None: + plot_line(ax, o, z, mode, c, linewidth) + else: + plot_line(ax, o, z, mode, 'b', linewidth) + +def update(L, state, a, delta): + if delta >= max_steer: + delta = max_steer + if delta <= - max_steer: + delta = - max_steer + + state.x = state.x + state.v * math.cos(state.yaw) * dt + state.y = state.y + state.v * math.sin(state.yaw) * dt + state.yaw = state.yaw + state.v / L * math.tan(delta) * dt + state.v = state.v + a * dt + + return state + +def do_simulation( + state, + cx, cy, cyaw, + # ck, speed_profile, + goal, + T, + tv, + args, + full_dist): + goal_dis = 0.3 + stop_speed = 0.05 + + x = [state.x] + y = [state.y] + yaw = [state.yaw] + v = [state.v] + v_msg = [0.0] + w_msg = [0.0] + ais = [0.0] + t = [0.0] + travel = [0.0] + + debug1 = [0.0] + debug2 = [0.0] + debug3 = [0.0] + debug4 = [0.0] + debug5 = [0.0] + debug6 = [0.0] + + distances = [] # len(cx) - 1 + cumsums = [0.0] # len(cx) + i = 0 + pt = np.array([cx[i], cy[i]]) + while i < len(cx) - 1: + new_pt = np.array([cx[i+1], cy[i+1]]) + dist = np.linalg.norm(new_pt - pt, ord=2) + + distances.append(dist) + cumsums.append(cumsums[-1] + dist) + + i += 1 + pt = new_pt + cumsums = np.array(cumsums) + + time = 0.0 + distance_traveled = 0.0 + best_dist_estimate = 0.0 + best_idx = 0 + + if args.dist <= 0.0: + terminate_dist = cumsums[-2] + # not -1, because at -1, the nearest_idx might be the path start + else: + # T = min(T, full_T) + terminate_dist = args.dist + + print("terminate_dist", terminate_dist) + + # import ipdb; ipdb.set_trace() + + total_e = 0.0 + ticks = 0 + + while terminate_dist > best_dist_estimate: + + # while best_idx != len(cyaw) - 2: + # tv = min(0.2, time * 1) # ramp up to tv + + dl, target_ind, state.e, state.e_th, ai, expected_yaw, fb, best_dist_estimate, best_idx =\ + lqr_speed_steering_control( + state, state.e, state.e_th, + dt, + cx, cy, cyaw, + # ck, + # speed_profile, + lqr_Q, lqr_R, L, tv, time, + distance_traveled, cumsums, debug=args.debug) + + # print("best_dist_estimate", + # best_dist_estimate, + # best_idx, + # len(cyaw) - 1, + # terminate_dist, + # full_dist) + + distance_traveled += state.v * dt + + dyaw = state.v / L * math.tan(dl) * dt + + old_yaw = state.yaw + + # print("ai", ai) + + state = update(L, state, ai, dl) + + # print("after state.v", state.v) + + if args.debug: + if np.abs(dyaw) > np.pi / 4: + print_color_str("yaw update %.3f, (%.3f, %.3f) -> %.3f" % ( + old_yaw, dl, dyaw, state.yaw), bcolors.BG_RED) + else: + print("yaw update %.3f, (%.3f, %.3f) -> %.3f" % ( + old_yaw, dl, dyaw, state.yaw)) + + if abs(state.v) <= stop_speed: + target_ind += 1 + + time = time + dt + + # check goal + dx = state.x - goal[0] + dy = state.y - goal[1] + # if math.hypot(dx, dy) <= goal_dis: + # print("Goal") + # break + + v_msg_i = v_msg[-1] + ai * dt + w_msg_i = dyaw + + v_msg.append(v_msg_i) + w_msg.append(w_msg_i) + ais.append(ai) + x.append(state.x) + y.append(state.y) + yaw.append(state.yaw) + v.append(state.v) + t.append(time) + travel.append(distance_traveled) + + debug1.append(state.e_th) + debug2.append(expected_yaw) + debug3.append(fb) + debug4.append(dyaw) + debug5.append(dl) + debug6.append(state.e) + + if args.debug: + print("state.v", state.v) + + # if target_ind % 1 == 0 and show_animation: + # plt.cla() + # # for stopping simulation with the esc key. + # plt.gcf().canvas.mpl_connect( + # 'key_release_event', + # lambda event: [exit(0) if event.key == 'escape' else None]) + # plt.plot(cx, cy, "-r", label="course") + # plt.plot(x, y, "ob", label="trajectory") + # plt.plot(cx[target_ind], cy[target_ind], "xg", label="target") + # plt.axis("equal") + # plt.grid(True) + # plt.title("speed[km/h]:" + str(round(state.v * 3.6, 2)) + # + ",target index:" + str(target_ind)) + # plt.pause(0.0001) + + total_e += np.abs(state.e) + ticks += 1 + + if args.debug: + print("") + + # import ipdb; ipdb.set_trace() + + if args.debug: + print("total_e", total_e) + print("ticks", ticks) + rmse = np.sqrt(total_e / ticks) + + return t, x, y, yaw, v, v_msg, w_msg, ais, travel, debug1, debug2, debug3, debug4, debug5, debug6, rmse, terminate_dist + + +def calc_speed_profile(cyaw, target_speed): + speed_profile = [target_speed] * len(cyaw) + + direction = 1.0 + + # Set stop point + for i in range(len(cyaw) - 1): + dyaw = abs(cyaw[i + 1] - cyaw[i]) + switch = math.pi / 4.0 <= dyaw < math.pi / 2.0 + + if switch: + direction *= -1 + + if direction != 1.0: + speed_profile[i] = - target_speed + else: + speed_profile[i] = target_speed + + if switch: + speed_profile[i] = 0.0 + + # speed down + # for i in range(40): + # speed_profile[-i] = target_speed / (50 - i) + # if speed_profile[-i] <= 1.0 / 3.6: + # speed_profile[-i] = 1.0 / 3.6 + + return speed_profile + +def two_d_rvec_vec_from_matrix_2d(m): + x = m[0, 2] + y = m[1, 2] + + atan2 = np.arctan2(m[1, 0], m[0, 0]) + # SUPER #COOL #IMPORTANT #port + # this is how we get the original theta 2019-08-02 + # https://stackoverflow.com/a/32549077 + # deal with quadrant logic + + ''' + theta_0 = math.acos(m[0, 0]) + theta_1 = math.asin(m[1, 0]) + theta_2 = math.asin(-1.0 * m[0, 1]) + theta_3 = math.acos(m[1, 1]) + # TODO(someone) finish this up, + # do not always all match, sometimes negative + thetas = [theta_0, theta_1, theta_2, theta_3] + mode_thetas = Util.modes(thetas) + if len(mode_thetas) > 0: + # print("more than one theta found", mode_thetas) + pass + ''' + + return [x, y, atan2], None + +def slerp(p0, p1, t): + omega = np.arccos(np.dot(p0/np.linalg.norm(p0), p1/np.linalg.norm(p1))) + so = np.sin(omega) + return np.sin((1.0-t)*omega) / so * p0 + np.sin(t*omega)/so * p1 + +def slerp3(arr, xys, factor): + res = [] + i = 0 + while i < len(arr) - 1: + diff = arr[i+1] - arr[i] + + if (np.abs(diff) < 1e-8): + res.extend( + np.linspace(arr[i], arr[i+1], factor, endpoint=False) + ) + i += 1 + continue + + # import ipdb; ipdb.set_trace() + distance = np.linalg.norm(xys[i+1] - xys[i], ord=2) + + interp = [] + + min_t = np.abs(diff) / 0.5 + min_d = min_t * 0.2 # target_v + + # print("min_d", min_d) + + # # sinusoidal interpolation + # for j in range(factor): + # k = j / float(factor) # will never hit 1.0 + # a = 1-np.cos(np.pi*k) + # v = arr[i] + diff * a + # interp.append(v) + + distance_traveled = 0.0 + delta = np.abs(distance) / factor + # print("DELTA", delta) + # print("min_d", min_d) + for j in range(factor): + distance_traveled += delta + + if (distance_traveled + min_d >= distance): + alpha = (distance_traveled - (distance - min_d)) / min_d + # print("ALPHA", alpha) + interp.append(arr[i] + alpha * diff) + else: + interp.append(arr[i]) + + + # if j < factor * 0.9: + # interp.append(arr[i]) + + # else: + # v = arr[i+1] + # interp.append(v) + + res.extend( + interp + # np.linspace(arr[i], arr[i+1], factor, endpoint=False) + ) + i += 1 + + res.append(arr[-1]) + return res + +def main(): + parser = argparse.ArgumentParser( + description='') + parser.add_argument('file', + type=str, help='file', default="/home/charlieyan1/Dev/jim/pymap2d/sdf_path_0000220.npy") + parser.add_argument('--dist', + type=float, default=-1.0) + parser.add_argument('--headless', + help="headless, default=False", + action='https://github.com/store_true') + parser.add_argument('--debug', + help="headless, default=False", + action='https://github.com/store_true') + args = parser.parse_args() + + # print("arg.headless", args.headless) + + payload = np.load( + # '/home/charlieyan1/Dev/jim/pymap2d/sdf_path_0000001.npy', + # '/home/charlieyan1/Dev/jim/pymap2d/sdf_path_0000055.npy', + # '/home/charlieyan1/Dev/jim/pymap2d/sdf_path_0000240.npy', + args.file, + allow_pickle=True).tolist() + all_homs = payload['path'] + xythetas = np.array( + [two_d_rvec_vec_from_matrix_2d(x)[0] for x in all_homs]) + + ax = xythetas[:, 0] + ay = xythetas[:, 1] + + # this is #important, the key is to set the yaw + # for a wpt to align with the next path segment + # ayaw = xythetas[:, 2] + # this, in combination with slerp3 + ayaw = list(xythetas[1:, 2]) + ayaw.append(xythetas[-1, 2]) # note: maintain alignment on last waypoint + + # import ipdb; ipdb.set_trace() + + r2 = 0.5 + full_dist = r2 * max(1, len(ax) - 1) + + ########################## + + # square = 2.0 + # full_dist = square * 4 + + ''' + # CW + ax = np.array([ + ax[0], + ax[0], + ax[0] + square, + ax[0] + square, + ax[0] + ]) + ay = np.array([ + ay[0], + ay[0] + square, + ay[0] + square, + ay[0], + ay[0] + ]) + + ayaw = np.array([ + np.pi / 2, + 0.0, + -np.pi / 2, + np.pi, + np.pi, # note: maintain alignment on last waypoint + ]) + ''' + + ''' + # CCW + ax = np.array([ + ax[0], + + ax[0] + square, + + ax[0] + square, + + ax[0], + + ax[0] + ]) + ay = np.array([ + ay[0], + + ay[0], + + ay[0] + square, + + ay[0] + square, + + ay[0] + ]) + + ayaw = np.array([ + 0.0, + np.pi / 2, + np.pi, + -np.pi / 2, + -np.pi / 2, # note: maintain alignment on last waypoint + ]) + ''' + + ########################## + + # cx = ax + # cy = ay + # cyaw = ayaw + + ########################## + + # bx, by, byaw, bk, s = cubic_spline_planner.calc_spline_course( + # ax, ay, ds=0.1) + # cx = bx + # cy = by + # cyaw = byaw + + ########################## + + # import ipdb; ipdb.set_trace() + + ########################## + + cx = interpolate(ax, 40) + cy = interpolate(ay, 40) + + ########################## + + # # INTERPOATION ROTAIONS SUCKS! + # cyaw = [(x + np.pi) % (2 * np.pi) - np.pi for x in ayaw] + # cyaw = rotation_smooth(cyaw) + # # this is key + # # interpolating between -3.14 and 3.14 through 0 is spatially wrong + # cyaw = interpolate(cyaw, 40) + + # INTERPOATION ROTAIONS SUCKS! + cyaw = [(x + np.pi) % (2 * np.pi) - np.pi for x in ayaw] + cyaw = rotation_smooth(cyaw, args.debug) + + # import ipdb; ipdb.set_trace() + + # this is key + # interpolating between -3.14 and 3.14 through 0 is spatially wrong + xys = [np.array([ax[i], ay[i]]) for i in range(len(ax))] + + cyaw = slerp3(cyaw, xys, 40) + # cyaw = do_repeat(cyaw, 40) + + if args.debug: + print("len(cx)", len(cx)) + print("len(cy)", len(cy)) + print("len(cyaw)", len(cyaw)) + print("################################################") + + ########################## + + # key_times = [x * 40* dt for x in range(len(ax))] + # key_rots = R.from_euler('z', ayaw, degrees=False) + # slerp = Slerp(key_times, key_rots) + # sample_times = np.linspace(0.0, max(key_times), len(cx)) + # interp_rots = slerp(sample_times) + # cyaw = interp_rots.as_euler('xyz', degrees=False)[:, -1] + # print(len(cyaw)) + + ########################## + + # cx, cy, cyaw, ck, s = cubic_spline_planner.calc_spline_course( + # ax, ay, ds=1.0) + + ########################## + + # velocities = [[0.0, 0.0]] + # i = 0 + # while i < len(ax) - 1: + # dx = ax[i+1] - ax[i] + # dy = ay[i+1] - ay[i] + # velocities.append([dx, dy]) + # i += 1 + # # velocities.append([0.0, 0.0]) + + # # import ipdb; ipdb.set_trace() + + # ctrl_pts = [CtrlPt({ + # 0: np.array([ax[i], ay[i], 0.0]), + # # 1: np.array([velocities[i][0], velocities[i][1], 0.0]) + # }) for i in range(len(ax))] + # spliner = Spliner() # create a new spliner on each call + # s_array, s_data = spliner.process(0, 8, ctrl_pts) + + # order = s_data['order'] + # num_dof = s_data['num_dof'] + # time, state = reformat_deriv_major( + # order, + # num_dof, + # s_array, + # s_data) + # pos_order = state[0] + # cx = pos_order[:, 0] + # cy = pos_order[:, 1] + + # plt.plot(ax, ay, "xb", label="waypoints") + # plt.plot(cx, cy, "-r", label="target course") + # # plt.plot(x, y, "-g", label="tracking") + # plt.scatter(ax, ay) + # plt.grid(True) + # plt.axis("equal") + # plt.xlabel("x[m]") + # plt.ylabel("y[m]") + # plt.legend() + # plt.show() + + # import ipdb; ipdb.set_trace() + + ########################## + + # print("LQR steering control tracking start!!") + # ax = [0.0, 6.0, 12.5, 10.0, 17.5, 20.0, 25.0] + # ay = [0.0, -3.0, -5.0, 6.5, 3.0, 0.0, 0.0] + goal = [cx[-1], cy[-1]] + + # target_speed = 10.0 / 3.6 # simulation parameter km/h -> m/s + # sp = calc_speed_profile(cyaw, target_speed) + + state = State( + x=cx[0], + y=cy[0], + yaw=cyaw[0], + v=0.0) + + tv = 0.2 + + t, x, y, yaw, v, v_msg, w_msg, ais, distance_traveled, debug1, debug2, debug3, debug4, debug5, debug6, rmse, terminate_dist = do_simulation( + state, + cx, cy, cyaw, + # ck, sp, + goal, + args.dist, + tv, + args, + full_dist) + + print("RMSE!!! %.3f" % (rmse)) + + if not args.headless: # pragma: no cover + plt.close() + + fig, a = plt.subplots(1) + plt.plot(ax, ay, "xb", label="waypoints") + plt.plot(cx, cy, "-r", label="target course") + plt.plot(x, y, "-g", label="tracking") + plt.scatter(ax, ay) + + for i in range(len(ax)): + hom = two_d_make_x_y_theta_hom(ax[i], ay[i], ayaw[i]) + plot_gnomon(a, hom, mode=2, length=0.1, linewidth=2, c='b') + + for i in range(len(cx)): + # print("xyyaw", cx[i], cy[i], cyaw[i]) + hom = two_d_make_x_y_theta_hom(cx[i], cy[i], cyaw[i]) + plot_gnomon(a, hom, mode=2, length=0.05, linewidth=1) + + plt.grid(True) + plt.axis("equal") + plt.xlabel("x[m]") + plt.ylabel("y[m]") + plt.legend() + + plt.title("%s: RMSE: %.3f, dist: %.3f" % ( + args.file, + rmse, + terminate_dist)) + + fig, axs = plt.subplots(2, sharex=True) + + axs[0].plot(t, v, label="speed") + axs[0].plot(t, v_msg, c='r', label="v_msg") + axs[0].plot(t, w_msg, c='g', label="w_msg") + axs[0].plot(t, ais, c='b', label="ais") + # axs[0].plot(t, debug1, c='m', label="debug1") + # axs[0].plot(t, yaw, c='k', label="yaw") + + # plt.plot(t, distance_traveled, c='b', label="distance_traveled") + axs[0].grid(True) + # axs[0].xlabel("Time [sec]") + # axs[0].ylabel("Speed [m/s]") + axs[0].legend() + + axs[1].plot( + # [dt * i for i in range(len(cyaw))], + # # [np.rad2deg(iyaw) for iyaw in cyaw], + # cyaw, + t, + debug2, + "-r", + label="expected yaw") + axs[1].plot( + # list(range(len(cyaw))), + # yaw[:len(cyaw)], + t, + yaw, + c='k', + label="yaw") + axs[1].plot( + # list(range(len(cyaw))), + # yaw[:len(cyaw)], + t, + debug1, + c='b', + label="th_e") + axs[1].plot( + # list(range(len(cyaw))), + # yaw[:len(cyaw)], + t, + debug3, + c='g', + label="fb") + axs[1].plot( + # list(range(len(cyaw))), + # yaw[:len(cyaw)], + t, + debug4, + c='c', + label="dyaw") + axs[1].plot( + # list(range(len(cyaw))), + # yaw[:len(cyaw)], + t, + debug5, + c='y', + label="dl") + axs[1].plot( + # list(range(len(cyaw))), + # yaw[:len(cyaw)], + t, + debug6, + '*c', + label="state.e") + + # plt.plot(list(range(len(cyaw))), debug1, "-r", label="debug1") + axs[1].grid(True) + axs[1].legend() + # axs[1].xlabel("line length[m]") + # axs[1].ylabel("yaw angle[deg]") + + plt.show() + +if __name__ == '__main__': + main()