diff --git a/.github/workflows/Linux_CI.yml b/.github/workflows/Linux_CI.yml index 907d36452d..152a3b0682 100644 --- a/.github/workflows/Linux_CI.yml +++ b/.github/workflows/Linux_CI.yml @@ -17,11 +17,11 @@ jobs: name: Python ${{ matrix.python-version }} CI steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - run: git fetch --prune --unshallow - name: Setup python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.github/workflows/MacOS_CI.yml b/.github/workflows/MacOS_CI.yml index 03db65f43d..ab04dc01dc 100644 --- a/.github/workflows/MacOS_CI.yml +++ b/.github/workflows/MacOS_CI.yml @@ -19,14 +19,14 @@ jobs: python-version: [ '3.13' ] name: Python ${{ matrix.python-version }} CI steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - run: git fetch --prune --unshallow - name: Update bash run: brew install bash - name: Setup python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} diff --git a/.github/workflows/Windows_CI.yml b/.github/workflows/Windows_CI.yml index 5cb19699b2..a4385e595b 100644 --- a/.github/workflows/Windows_CI.yml +++ b/.github/workflows/Windows_CI.yml @@ -19,11 +19,11 @@ jobs: python-version: [ '3.13' ] name: Python ${{ matrix.python-version }} CI steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - run: git fetch --prune --unshallow - name: Setup python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} diff --git a/.github/workflows/circleci-artifacts-redirector.yml b/.github/workflows/circleci-artifacts-redirector.yml index 692942cddd..0e64bab96c 100644 --- a/.github/workflows/circleci-artifacts-redirector.yml +++ b/.github/workflows/circleci-artifacts-redirector.yml @@ -6,7 +6,7 @@ jobs: name: Run CircleCI artifacts redirector!! steps: - name: run-circleci-artifacts-redirector - uses: larsoner/circleci-artifacts-redirector-action@v1.1.0 + uses: larsoner/circleci-artifacts-redirector-action@v1.3.1 with: repo-token: ${{ secrets.GITHUB_TOKEN }} api-token: ${{ secrets.CIRCLECI_TOKEN }} diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 878a4a4435..dcbcdbaa88 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -16,7 +16,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v5 with: # We must fetch at least the immediate parents so that if this is # a pull request then we can checkout the head. @@ -24,7 +24,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v3 + uses: github/codeql-action/init@v4 with: config-file: ./.github/codeql/codeql-config.yml # Override language selection by uncommenting this and choosing your languages @@ -34,7 +34,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v3 + uses: github/codeql-action/autobuild@v4 # ℹ️ Command-line programs to run using the OS shell. 
# 📚 https://git.io/JvXDl @@ -48,4 +48,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v3 + uses: github/codeql-action/analyze@v4 diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml index e08c6106c0..84165b9cd2 100644 --- a/.github/workflows/gh-pages.yml +++ b/.github/workflows/gh-pages.yml @@ -14,7 +14,7 @@ jobs: pages: write steps: - name: Setup python - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 - name: Checkout uses: actions/checkout@master with: diff --git a/.gitignore b/.gitignore index c971b8f9c5..8a0b1257b5 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ _build/ # Distribution / packaging .Python env/ +venv/ build/ develop-eggs/ dist/ diff --git a/Localization/unscented_kalman_filter/unscented_kalman_filter.py b/Localization/unscented_kalman_filter/unscented_kalman_filter.py index 4af748ec71..2119e1eacc 100644 --- a/Localization/unscented_kalman_filter/unscented_kalman_filter.py +++ b/Localization/unscented_kalman_filter/unscented_kalman_filter.py @@ -91,6 +91,29 @@ def observation_model(x): def generate_sigma_points(xEst, PEst, gamma): + """ + Generate sigma points for UKF. + + Sigma points are deterministically sampled points used to capture + the mean and covariance of the state distribution. + + Parameters + ---------- + xEst : numpy.ndarray + Current state estimate (n x 1) + PEst : numpy.ndarray + Current state covariance estimate (n x n) + gamma : float + Scaling parameter sqrt(n + lambda) + + Returns + ------- + sigma : numpy.ndarray + Sigma points (n x 2n+1) + sigma[:, 0] = xEst + sigma[:, 1:n+1] = xEst + gamma * sqrt(P) + sigma[:, n+1:2n+1] = xEst - gamma * sqrt(P) + """ sigma = xEst Psqrt = scipy.linalg.sqrtm(PEst) n = len(xEst[:, 0]) @@ -149,6 +172,35 @@ def calc_pxz(sigma, x, z_sigma, zb, wc): def ukf_estimation(xEst, PEst, z, u, wm, wc, gamma): + """ + Unscented Kalman Filter estimation. + + Performs one iteration of UKF state estimation using the unscented transform. + + Parameters + ---------- + xEst : numpy.ndarray + Current state estimate [x, y, yaw, v]^T (4 x 1) + PEst : numpy.ndarray + Current state covariance estimate (4 x 4) + z : numpy.ndarray + Observation vector [x_obs, y_obs]^T (2 x 1) + u : numpy.ndarray + Control input [velocity, yaw_rate]^T (2 x 1) + wm : numpy.ndarray + Weights for calculating mean (1 x 2n+1) + wc : numpy.ndarray + Weights for calculating covariance (1 x 2n+1) + gamma : float + Sigma point scaling parameter sqrt(n + lambda) + + Returns + ------- + xEst : numpy.ndarray + Updated state estimate (4 x 1) + PEst : numpy.ndarray + Updated state covariance estimate (4 x 4) + """ # Predict sigma = generate_sigma_points(xEst, PEst, gamma) sigma = predict_sigma_motion(sigma, u) @@ -194,6 +246,31 @@ def plot_covariance_ellipse(xEst, PEst): # pragma: no cover def setup_ukf(nx): + """ + Setup UKF parameters and weights. + + Calculates the weights for mean and covariance computation, + and the scaling parameter gamma for sigma point generation. 
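For orientation, one full filter cycle wires the newly documented helpers together roughly as in the sketch below. This is an illustrative fragment, not code from the patch: the import path is an assumption, and the observation z and control u are placeholder values.

import numpy as np
# Assumed repo-root import path for this module:
from Localization.unscented_kalman_filter.unscented_kalman_filter import setup_ukf, ukf_estimation

xEst = np.zeros((4, 1))   # state [x, y, yaw, v]^T
PEst = np.eye(4)          # state covariance
wm, wc, gamma = setup_ukf(4)                  # weights and sigma-point scaling for nx = 4
z = np.array([[1.0], [0.5]])                  # placeholder observation [x_obs, y_obs]^T
u = np.array([[1.0], [0.1]])                  # placeholder control [v, yaw_rate]^T
xEst, PEst = ukf_estimation(xEst, PEst, z, u, wm, wc, gamma)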
+ + Parameters + ---------- + nx : int + Dimension of the state vector + + Returns + ------- + wm : numpy.ndarray + Weights for calculating mean (1 x 2n+1) + wm[0] = lambda / (n + lambda) + wm[i] = 1 / (2(n + lambda)) for i > 0 + wc : numpy.ndarray + Weights for calculating covariance (1 x 2n+1) + wc[0] = lambda / (n + lambda) + (1 - alpha^2 + beta) + wc[i] = 1 / (2(n + lambda)) for i > 0 + gamma : float + Sigma point scaling parameter sqrt(n + lambda) + where lambda = alpha^2 * (n + kappa) - n + """ lamb = ALPHA ** 2 * (nx + KAPPA) - nx # calculate weights wm = [lamb / (lamb + nx)] diff --git a/PathPlanning/DStarLite/d_star_lite.py b/PathPlanning/DStarLite/d_star_lite.py index f099530173..1a44d84fa5 100644 --- a/PathPlanning/DStarLite/d_star_lite.py +++ b/PathPlanning/DStarLite/d_star_lite.py @@ -63,9 +63,7 @@ def __init__(self, ox: list, oy: list): self.y_max = int(abs(max(oy) - self.y_min_world)) self.obstacles = [Node(x - self.x_min_world, y - self.y_min_world) for x, y in zip(ox, oy)] - self.obstacles_xy = np.array( - [[obstacle.x, obstacle.y] for obstacle in self.obstacles] - ) + self.obstacles_xy = {(obstacle.x, obstacle.y) for obstacle in self.obstacles} self.start = Node(0, 0) self.goal = Node(0, 0) self.U = list() # type: ignore @@ -73,7 +71,7 @@ def __init__(self, ox: list, oy: list): self.kold = 0.0 self.rhs = self.create_grid(float("inf")) self.g = self.create_grid(float("inf")) - self.detected_obstacles_xy = np.empty((0, 2)) + self.detected_obstacles_xy: set[tuple[int, int]] = set() self.xy = np.empty((0, 2)) if show_animation: self.detected_obstacles_for_plotting_x = list() # type: ignore @@ -84,18 +82,8 @@ def create_grid(self, val: float): return np.full((self.x_max, self.y_max), val) def is_obstacle(self, node: Node): - x = np.array([node.x]) - y = np.array([node.y]) - obstacle_x_equal = self.obstacles_xy[:, 0] == x - obstacle_y_equal = self.obstacles_xy[:, 1] == y - is_in_obstacles = (obstacle_x_equal & obstacle_y_equal).any() - - is_in_detected_obstacles = False - if self.detected_obstacles_xy.shape[0] > 0: - is_x_equal = self.detected_obstacles_xy[:, 0] == x - is_y_equal = self.detected_obstacles_xy[:, 1] == y - is_in_detected_obstacles = (is_x_equal & is_y_equal).any() - + is_in_obstacles = (node.x, node.y) in self.obstacles_xy + is_in_detected_obstacles = (node.x, node.y) in self.detected_obstacles_xy return is_in_obstacles or is_in_detected_obstacles def c(self, node1: Node, node2: Node): @@ -157,7 +145,7 @@ def initialize(self, start: Node, goal: Node): self.g = self.create_grid(math.inf) self.rhs[self.goal.x][self.goal.y] = 0 self.U.append((self.goal, self.calculate_key(self.goal))) - self.detected_obstacles_xy = np.empty((0, 2)) + self.detected_obstacles_xy = set() def update_vertex(self, u: Node): if not compare_coordinates(u, self.goal): @@ -215,12 +203,7 @@ def detect_changes(self): compare_coordinates(spoofed_obstacle, self.goal): continue changed_vertices.append(spoofed_obstacle) - self.detected_obstacles_xy = np.concatenate( - ( - self.detected_obstacles_xy, - [[spoofed_obstacle.x, spoofed_obstacle.y]] - ) - ) + self.detected_obstacles_xy.add((spoofed_obstacle.x, spoofed_obstacle.y)) if show_animation: self.detected_obstacles_for_plotting_x.append( spoofed_obstacle.x + self.x_min_world) @@ -241,12 +224,7 @@ def detect_changes(self): compare_coordinates(new_obs, self.goal): return changed_vertices changed_vertices.append(Node(x, y)) - self.detected_obstacles_xy = np.concatenate( - ( - self.detected_obstacles_xy, - [[x, y]] - ) - ) + 
self.detected_obstacles_xy.add((x, y)) if show_animation: self.detected_obstacles_for_plotting_x.append(x + self.x_min_world) diff --git a/PathPlanning/ParticleSwarmOptimization/particle_swarm_optimization.py b/PathPlanning/ParticleSwarmOptimization/particle_swarm_optimization.py new file mode 100755 index 0000000000..4d1df5e197 --- /dev/null +++ b/PathPlanning/ParticleSwarmOptimization/particle_swarm_optimization.py @@ -0,0 +1,335 @@ +""" +Particle Swarm Optimization (PSO) Path Planning + +author: Anish (@anishk85) + +See Wikipedia article (https://en.wikipedia.org/wiki/Particle_swarm_optimization) + +References: + - Kennedy, J.; Eberhart, R. (1995). "Particle Swarm Optimization" + - Shi, Y.; Eberhart, R. (1998). "A Modified Particle Swarm Optimizer" + - https://machinelearningmastery.com/a-gentle-introduction-to-particle-swarm-optimization/ + +This implementation uses PSO to find collision-free paths by treating +path planning as an optimization problem where particles explore the +search space to minimize distance to target while avoiding obstacles. +""" +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.animation as animation +import matplotlib.patches as patches +import signal +import sys +# Add show_animation flag for consistency with other planners +show_animation = True + +def signal_handler(sig, frame): + print("\nExiting...") + plt.close("all") + sys.exit(0) + +signal.signal(signal.SIGINT, signal_handler) + +class Particle: + """Represents a single particle in the PSO swarm. + Each particle maintains its current position, velocity, and personal best + position discovered during the search. Particles explore the search space + by updating their velocity based on personal experience (cognitive component) + and swarm knowledge (social component). 
+ Attributes: + search_bounds: List of tuples [(x_min, x_max), (y_min, y_max)] defining search space + max_velocity: Maximum velocity allowed in each dimension (5% of search space range) + position: Current 2D position [x, y] in search space + velocity: Current velocity vector [vx, vy] + personal_best_position: Personal best position found so far + personal_best_value: Fitness value at personal best position + path: List of all positions visited by this particle + """ + + def __init__(self, search_bounds, spawn_bounds): + self.search_bounds = search_bounds + self.max_velocity = np.array([(b[1] - b[0]) * 0.05 for b in search_bounds]) + self.position = np.array([np.random.uniform(b[0], b[1]) for b in spawn_bounds]) + self.velocity = np.random.randn(2) * 0.1 + self.personal_best_position = self.position.copy() + self.personal_best_value = np.inf + self.path = [self.position.copy()] + + def update_velocity(self, gbest_pos, w, c1, c2): + """Update particle velocity using PSO equation: + v = w*v + c1*r1*(personal_best - x) + c2*r2*(gbest - x) + """ + r1 = np.random.rand(2) + r2 = np.random.rand(2) + cognitive = c1 * r1 * (self.personal_best_position - self.position) + social = c2 * r2 * (gbest_pos - self.position) + self.velocity = w * self.velocity + cognitive + social + self.velocity = np.clip(self.velocity, -self.max_velocity, self.max_velocity) + + def update_position(self): + self.position = self.position + self.velocity + # Keep in bounds + for i in range(2): + self.position[i] = np.clip( + self.position[i], self.search_bounds[i][0], self.search_bounds[i][1] + ) + self.path.append(self.position.copy()) + +class PSOSwarm: + + def __init__( + self, n_particles, max_iter, target, search_bounds, spawn_bounds, obstacles + ): + self.n_particles = n_particles + self.max_iter = max_iter + self.target = np.array(target) + self.obstacles = obstacles + self.search_bounds = search_bounds + # PSO parameters + self.w_start = 0.9 # Initial inertia weight + self.w_end = 0.4 # Final inertia weight + self.c1 = 1.5 # Cognitive coefficient + self.c2 = 1.5 # Social coefficient + # Initialize particles + self.particles = [ + Particle(search_bounds, spawn_bounds) for _ in range(n_particles) + ] + self.gbest_position = None + self.gbest_value = np.inf + self.gbest_path = [] + self.iteration = 0 + + def fitness(self, pos): + """Calculate fitness - distance to target + obstacle penalty""" + dist = np.linalg.norm(pos - self.target) + # Obstacle penalty + penalty = 0 + for ox, oy, r in self.obstacles: + obs_dist = np.linalg.norm(pos - np.array([ox, oy])) + if obs_dist < r: + penalty += 1000 # Inside obstacle + elif obs_dist < r + 5: + penalty += 50 / (obs_dist - r + 0.1) # Too close + return dist + penalty + + def check_collision(self, start, end, obstacle): + """Check if path from start to end hits obstacle using line-circle intersection + Args: + start: Starting position (numpy array) + end: Ending position (numpy array) + obstacle: Tuple (ox, oy, r) representing obstacle center and radius + Returns: + bool: True if collision detected, False otherwise + """ + ox, oy, r = obstacle + center = np.array([ox, oy]) + # Vector math for line-circle intersection + d = end - start + f = start - center + a = np.dot(d, d) + # Guard against zero-length steps to prevent ZeroDivisionError + if a < 1e-10: # Near-zero length step + # Check if start point is inside obstacle + return np.linalg.norm(f) <= r + b = 2 * np.dot(f, d) + c = np.dot(f, f) - r * r + discriminant = b * b - 4 * a * c + if discriminant < 0: + return False + # 
Check if intersection on segment + sqrt_discriminant = np.sqrt(discriminant) + t1 = (-b - sqrt_discriminant) / (2 * a) + t2 = (-b + sqrt_discriminant) / (2 * a) + return (0 <= t1 <= 1) or (0 <= t2 <= 1) + + def step(self): + """Run one PSO iteration + Returns: + bool: True if algorithm should continue, False if completed + """ + if self.iteration >= self.max_iter: + return False + # Update inertia weight (linear decay) + w = self.w_start - (self.w_start - self.w_end) * ( + self.iteration / self.max_iter + ) + # Evaluate all particles + for particle in self.particles: + value = self.fitness(particle.position) + # Update personal best + if value < particle.personal_best_value: + particle.personal_best_value = value + particle.personal_best_position = particle.position.copy() + # Update global best + if value < self.gbest_value: + self.gbest_value = value + self.gbest_position = particle.position.copy() + if self.gbest_position is not None: + self.gbest_path.append(self.gbest_position.copy()) + # Update particles + for particle in self.particles: + particle.update_velocity(self.gbest_position, w, self.c1, self.c2) + # Predict next position + next_pos = particle.position + particle.velocity + # Check collision + collision = False + for obs in self.obstacles: + if self.check_collision(particle.position, next_pos, obs): + collision = True + break + if collision: + # Reduce velocity if collision detected + particle.velocity *= 0.2 + particle.update_position() + self.iteration += 1 + if show_animation and self.iteration % 20 == 0: + print( + f"Iteration {self.iteration}/{self.max_iter}, Best: {self.gbest_value:.2f}" + ) + return True + +def main(): + """Run PSO path planning algorithm demonstration. + This function demonstrates PSO-based path planning with the following setup: + - 15 particles exploring a (-50,50) x (-50,50) search space + - Start zone: (-45,-45) to (-35,-35) + - Target: (40, 35) + - 4 circular obstacles with collision avoidance + - Real-time visualization showing particle convergence (if show_animation=True) + - Headless mode support for testing (if show_animation=False) + The algorithm runs for up to 150 iterations, displaying particle movement, + personal/global best positions, and the evolving optimal path. 
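The swarm described above can also be driven without the animation loop; a minimal sketch using only the constructor arguments and the step() contract defined in this file (the import path is assumed):

from PathPlanning.ParticleSwarmOptimization.particle_swarm_optimization import PSOSwarm  # assumed path

swarm = PSOSwarm(
    n_particles=15,
    max_iter=150,
    target=[40, 35],
    search_bounds=[(-50, 50), (-50, 50)],
    spawn_bounds=[(-45, -35), (-45, -35)],
    obstacles=[(10, 15, 8), (-20, 0, 12), (20, -25, 10), (-5, -30, 7)],
)
while swarm.step():   # step() returns False once max_iter is reached
    pass
print(swarm.gbest_value, swarm.gbest_position)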
+ """ + print(__file__ + " start!!") + # Set matplotlib backend for headless environments + if not show_animation: + plt.switch_backend("Agg") # Use non-GUI backend for testing + # Setup parameters + N_PARTICLES = 15 + MAX_ITER = 150 + SEARCH_BOUNDS = [(-50, 50), (-50, 50)] + TARGET = [40, 35] + SPAWN_AREA = [(-45, -35), (-45, -35)] + OBSTACLES = [(10, 15, 8), (-20, 0, 12), (20, -25, 10), (-5, -30, 7)] + swarm = PSOSwarm( + n_particles=N_PARTICLES, + max_iter=MAX_ITER, + target=TARGET, + search_bounds=SEARCH_BOUNDS, + spawn_bounds=SPAWN_AREA, + obstacles=OBSTACLES, + ) + # pragma: no cover + if show_animation: + # Visualization setup + signal.signal(signal.SIGINT, signal_handler) + fig, ax = plt.subplots(figsize=(10, 10)) + ax.set_xlim(SEARCH_BOUNDS[0]) + ax.set_ylim(SEARCH_BOUNDS[1]) + ax.set_title("PSO Path Planning with Collision Avoidance", fontsize=14) + ax.grid(True, alpha=0.3) + # Draw obstacles + for ox, oy, r in OBSTACLES: + circle = patches.Circle((ox, oy), r, color="gray", alpha=0.7) + ax.add_patch(circle) + # Draw spawn area + spawn_rect = patches.Rectangle( + (SPAWN_AREA[0][0], SPAWN_AREA[1][0]), + SPAWN_AREA[0][1] - SPAWN_AREA[0][0], + SPAWN_AREA[1][1] - SPAWN_AREA[1][0], + linewidth=2, + edgecolor="green", + facecolor="green", + alpha=0.2, + label="Start Zone", + ) + ax.add_patch(spawn_rect) + # Draw target + ax.plot(TARGET[0], TARGET[1], "r*", markersize=20, label="Target") + # Initialize plot elements + particles_scatter = ax.scatter( + [], [], c="blue", s=50, alpha=0.6, label="Particles" + ) + gbest_scatter = ax.plot([], [], "yo", markersize=12, label="Best Position")[0] + particle_paths = [ + ax.plot([], [], "b-", lw=0.5, alpha=0.2)[0] for _ in range(N_PARTICLES) + ] + gbest_path_line = ax.plot([], [], "y--", lw=2, alpha=0.8, label="Best Path")[0] + iteration_text = ax.text( + 0.02, + 0.95, + "", + transform=ax.transAxes, + fontsize=12, + verticalalignment="top", + bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.5), + ) + ax.legend(loc="upper right") + def animate(frame): + """Animation function for matplotlib FuncAnimation""" + if not swarm.step(): + return ( + particles_scatter, + gbest_scatter, + gbest_path_line, + iteration_text, + *particle_paths, + ) + # Update particle positions + positions = np.array([p.position for p in swarm.particles]) + particles_scatter.set_offsets(positions) + # Update particle paths + for i, particle in enumerate(swarm.particles): + if len(particle.path) > 1: + path = np.array(particle.path) + particle_paths[i].set_data(path[:, 0], path[:, 1]) + # Update global best + if swarm.gbest_position is not None: + gbest_scatter.set_data( + [swarm.gbest_position[0]], [swarm.gbest_position[1]] + ) + if len(swarm.gbest_path) > 1: + gbest = np.array(swarm.gbest_path) + gbest_path_line.set_data(gbest[:, 0], gbest[:, 1]) + # Update text + iteration_text.set_text( + f"Iteration: {swarm.iteration}/{MAX_ITER}\n" + f"Best Fitness: {swarm.gbest_value:.2f}" + ) + return ( + particles_scatter, + gbest_scatter, + gbest_path_line, + iteration_text, + *particle_paths, + ) + # Create animation and store reference to prevent garbage collection + animation_ref = animation.FuncAnimation( + fig, animate, frames=MAX_ITER, interval=100, blit=True, repeat=False + ) + plt.tight_layout() + plt.show() + # Keep reference to prevent garbage collection + return animation_ref + else: + # Run without animation for testing + print("Running PSO algorithm without animation...") + iteration_count = 0 + while swarm.step(): + iteration_count += 1 + if iteration_count >= 
MAX_ITER: + break + print(f"PSO completed after {iteration_count} iterations") + print(f"Best fitness: {swarm.gbest_value:.2f}") + if swarm.gbest_position is not None: + print( + f"Best position: [{swarm.gbest_position[0]:.2f}, {swarm.gbest_position[1]:.2f}]" + ) + return None +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nProgram interrupted by user") + plt.close("all") + sys.exit(0) \ No newline at end of file diff --git a/PathPlanning/RRT/rrt_with_pathsmoothing.py b/PathPlanning/RRT/rrt_with_pathsmoothing.py index 2ed6a366d1..ac68efe23f 100644 --- a/PathPlanning/RRT/rrt_with_pathsmoothing.py +++ b/PathPlanning/RRT/rrt_with_pathsmoothing.py @@ -51,30 +51,93 @@ def get_target_point(path, targetL): return [x, y, ti] -def line_collision_check(first, second, obstacleList): - # Line Equation - - x1 = first[0] - y1 = first[1] - x2 = second[0] - y2 = second[1] - - try: - a = y2 - y1 - b = -(x2 - x1) - c = y2 * (x2 - x1) - x2 * (y2 - y1) - except ZeroDivisionError: - return False - - for (ox, oy, size) in obstacleList: - d = abs(a * ox + b * oy + c) / (math.hypot(a, b)) - if d <= size: - return False - - return True # OK - - -def path_smoothing(path, max_iter, obstacle_list): +def is_point_collision(x, y, obstacle_list, robot_radius): + """ + Check whether a single point collides with any obstacle. + + This function calculates the Euclidean distance between the given point (x, y) + and each obstacle center. If the distance is less than or equal to the sum of + the obstacle's radius and the robot's radius, a collision is detected. + + Args: + x (float): X-coordinate of the point to check. + y (float): Y-coordinate of the point to check. + obstacle_list (List[Tuple[float, float, float]]): List of obstacles defined as (ox, oy, radius). + robot_radius (float): Radius of the robot, used to inflate the obstacles. + + Returns: + bool: True if the point is in collision with any obstacle, False otherwise. + """ + for (ox, oy, obstacle_radius) in obstacle_list: + d = math.hypot(ox - x, oy - y) + if d <= obstacle_radius + robot_radius: + return True # Collided + return False + + +def line_collision_check(first, second, obstacle_list, robot_radius=0.0, sample_step=0.2): + """ + Check if the line segment between `first` and `second` collides with any obstacle. + Considers the robot_radius by inflating the obstacle size. + + Args: + first (List[float]): Start point of the line [x, y] + second (List[float]): End point of the line [x, y] + obstacle_list (List[Tuple[float, float, float]]): Obstacles as (x, y, radius) + robot_radius (float): Radius of robot + sample_step (float): Distance between sampling points along the segment + + Returns: + bool: True if collision-free, False otherwise + """ + x1, y1 = first[0], first[1] + x2, y2 = second[0], second[1] + + dx = x2 - x1 + dy = y2 - y1 + length = math.hypot(dx, dy) + + if length == 0: + # Degenerate case: point collision check + return not is_point_collision(x1, y1, obstacle_list, robot_radius) + + steps = int(length / sample_step) + 1 # Sampling every sample_step along the segment + + for i in range(steps + 1): + t = i / steps + x = x1 + t * dx + y = y1 + t * dy + + if is_point_collision(x, y, obstacle_list, robot_radius): + return False # Collision found + + return True # Safe + + +def path_smoothing(path, max_iter, obstacle_list, robot_radius=0.0): + """ + Smooths a given path by iteratively replacing segments with shortcut connections, + while ensuring the new segments are collision-free. 
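For a concrete feel of the sampling-based collision check introduced above, a small sketch (the numbers are made up for illustration and the import path is an assumption):

from PathPlanning.RRT.rrt_with_pathsmoothing import line_collision_check  # assumed path

obstacles = [(5.0, 0.0, 1.0)]  # one circular obstacle: center (5, 0), radius 1
print(line_collision_check([0.0, 0.0], [10.0, 0.0], obstacles))                    # False: the segment crosses the circle
print(line_collision_check([0.0, 3.0], [10.0, 3.0], obstacles))                    # True: closest approach is 3 m away
print(line_collision_check([0.0, 1.2], [10.0, 1.2], obstacles, robot_radius=0.5))  # False: inflated radius is 1.5 m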
+ + The algorithm randomly picks two points along the original path and attempts to + connect them with a straight line. If the line does not collide with any obstacles + (considering the robot's radius), the intermediate path points between them are + replaced with the direct connection. + + Args: + path (List[List[float]]): The original path as a list of [x, y] coordinates. + max_iter (int): Number of iterations for smoothing attempts. + obstacle_list (List[Tuple[float, float, float]]): List of obstacles represented as + (x, y, radius). + robot_radius (float, optional): Radius of the robot, used to inflate obstacle size + during collision checking. Defaults to 0.0. + + Returns: + List[List[float]]: The smoothed path as a list of [x, y] coordinates. + + Example: + >>> smoothed = path_smoothing(path, 1000, obstacle_list, robot_radius=0.5) + """ le = get_path_length(path) for i in range(max_iter): @@ -94,7 +157,7 @@ def path_smoothing(path, max_iter, obstacle_list): continue # collision check - if not line_collision_check(first, second, obstacle_list): + if not line_collision_check(first, second, obstacle_list, robot_radius): continue # Create New path @@ -119,14 +182,16 @@ def main(): (3, 10, 2), (7, 5, 2), (9, 5, 2) - ] # [x,y,size] + ] # [x,y,radius] rrt = RRT(start=[0, 0], goal=[6, 10], - rand_area=[-2, 15], obstacle_list=obstacleList) + rand_area=[-2, 15], obstacle_list=obstacleList, + robot_radius=0.3) path = rrt.planning(animation=show_animation) # Path smoothing maxIter = 1000 - smoothedPath = path_smoothing(path, maxIter, obstacleList) + smoothedPath = path_smoothing(path, maxIter, obstacleList, + robot_radius=rrt.robot_radius) # Draw final path if show_animation: diff --git a/PathPlanning/ThetaStar/theta_star.py b/PathPlanning/ThetaStar/theta_star.py new file mode 100644 index 0000000000..73a1448b28 --- /dev/null +++ b/PathPlanning/ThetaStar/theta_star.py @@ -0,0 +1,345 @@ +""" + +Theta* grid planning + +author: Musab Kasbati (@Musab1Blaser) + +See Wikipedia article (https://cdn.aaai.org/AAAI/2007/AAAI07-187.pdf) + +""" + +import math + +import matplotlib.pyplot as plt + +show_animation = True +use_theta_star = True + + +class ThetaStarPlanner: + + def __init__(self, ox, oy, resolution, rr): + """ + Initialize grid map for theta star planning + + ox: x position list of Obstacles [m] + oy: y position list of Obstacles [m] + resolution: grid resolution [m] + rr: robot radius[m] + """ + + self.resolution = resolution + self.rr = rr + self.min_x, self.min_y = 0, 0 + self.max_x, self.max_y = 0, 0 + self.obstacle_map = None + self.x_width, self.y_width = 0, 0 + self.motion = self.get_motion_model() + self.calc_obstacle_map(ox, oy) + + class Node: + def __init__(self, x, y, cost, parent_index): + self.x = x # index of grid + self.y = y # index of grid + self.cost = cost + self.parent_index = parent_index + + def __str__(self): + return str(self.x) + "," + str(self.y) + "," + str( + self.cost) + "," + str(self.parent_index) + + def planning(self, sx, sy, gx, gy): + """ + Theta star path search + + input: + s_x: start x position [m] + s_y: start y position [m] + gx: goal x position [m] + gy: goal y position [m] + + output: + rx: x position list of the final path + ry: y position list of the final path + """ + + start_node = self.Node(self.calc_xy_index(sx, self.min_x), + self.calc_xy_index(sy, self.min_y), 0.0, -1) + goal_node = self.Node(self.calc_xy_index(gx, self.min_x), + self.calc_xy_index(gy, self.min_y), 0.0, -1) + + open_set, closed_set = dict(), dict() + 
open_set[self.calc_grid_index(start_node)] = start_node + + while True: + if len(open_set) == 0: + print("Open set is empty..") + break + + c_id = min( + open_set, + key=lambda o: open_set[o].cost + self.calc_heuristic(goal_node, + open_set[ + o])) + current = open_set[c_id] + + # show graph + if show_animation: # pragma: no cover + x = self.calc_grid_position(current.x, self.min_x) + y = self.calc_grid_position(current.y, self.min_y) + + # Draw an arrow toward the parent + if current.parent_index != -1 and current.parent_index in closed_set: + parent = closed_set[current.parent_index] + px = self.calc_grid_position(parent.x, self.min_x) + py = self.calc_grid_position(parent.y, self.min_y) + + # Vector from current to parent + dx = px - x + dy = py - y + + # scale down vector for visibility + norm = math.hypot(dx, dy) + dx /= norm + dy /= norm + + # Draw a small arrow (scale it down for visibility) + plt.arrow(x, y, dx, dy, + head_width=0.5, head_length=0.5, + fc='c', ec='c', alpha=0.7) + + # For stopping simulation with the esc key + plt.gcf().canvas.mpl_connect( + 'key_release_event', + lambda event: [exit(0) if event.key == 'escape' else None] + ) + + if len(closed_set.keys()) % 10 == 0: + plt.pause(0.001) + + if current.x == goal_node.x and current.y == goal_node.y: + print("Find goal") + goal_node.parent_index = current.parent_index + goal_node.cost = current.cost + break + + # Remove the item from the open set + del open_set[c_id] + + # Add it to the closed set + closed_set[c_id] = current + + # expand_grid search grid based on motion model + for i, _ in enumerate(self.motion): + node = self.Node(current.x + self.motion[i][0], + current.y + self.motion[i][1], + current.cost + self.motion[i][2], c_id) # cost may later be updated by theta star path compression + n_id = self.calc_grid_index(node) + + if not self.verify_node(node): + continue + + if n_id in closed_set: + continue + + # Theta* modification: + if use_theta_star and current.parent_index != -1 and current.parent_index in closed_set: + grandparent = closed_set[current.parent_index] + if self.line_of_sight(grandparent, node): + # If parent(current) has line of sight to neighbor + node.cost = grandparent.cost + math.hypot(node.x - grandparent.x, node.y - grandparent.y) + node.parent_index = current.parent_index # compress path directly to grandparent + + if n_id not in open_set: + open_set[n_id] = node + else: + if open_set[n_id].cost > node.cost: + # This path is the best until now. 
record it + open_set[n_id] = node + + + rx, ry = self.calc_final_path(goal_node, closed_set) + + return rx, ry + + def calc_final_path(self, goal_node, closed_set): + # generate final course + rx, ry = [self.calc_grid_position(goal_node.x, self.min_x)], [ + self.calc_grid_position(goal_node.y, self.min_y)] + parent_index = goal_node.parent_index + while parent_index != -1: + n = closed_set[parent_index] + rx.append(self.calc_grid_position(n.x, self.min_x)) + ry.append(self.calc_grid_position(n.y, self.min_y)) + parent_index = n.parent_index + + return rx, ry + + @staticmethod + def calc_heuristic(n1, n2): + w = 1.0 # weight of heuristic + d = w * math.hypot(n1.x - n2.x, n1.y - n2.y) + return d + + def calc_grid_position(self, index, min_position): + """ + calc grid position + + :param index: + :param min_position: + :return: + """ + pos = index * self.resolution + min_position + return pos + + def calc_xy_index(self, position, min_pos): + return round((position - min_pos) / self.resolution) + + def calc_grid_index(self, node): + return (node.y - self.min_y) * self.x_width + (node.x - self.min_x) + + def line_of_sight(self, node1, node2): + """ + Check if there is a direct line of sight between two nodes. + Uses Bresenham’s line algorithm for grid traversal. + """ + x0 = node1.x + y0 = node1.y + x1 = node2.x + y1 = node2.y + + dx = abs(x1 - x0) + dy = abs(y1 - y0) + sx = 1 if x0 < x1 else -1 + sy = 1 if y0 < y1 else -1 + + err = dx - dy + + while True: + if not self.verify_node(self.Node(x0, y0, 0, -1)): + return False + if x0 == x1 and y0 == y1: + break + e2 = 2 * err + if e2 > -dy: + err -= dy + x0 += sx + if e2 < dx: + err += dx + y0 += sy + return True + + + def verify_node(self, node): + px = self.calc_grid_position(node.x, self.min_x) + py = self.calc_grid_position(node.y, self.min_y) + + if px < self.min_x: + return False + elif py < self.min_y: + return False + elif px >= self.max_x: + return False + elif py >= self.max_y: + return False + + # collision check + if self.obstacle_map[node.x][node.y]: + return False + + return True + + def calc_obstacle_map(self, ox, oy): + + self.min_x = round(min(ox)) + self.min_y = round(min(oy)) + self.max_x = round(max(ox)) + self.max_y = round(max(oy)) + print("min_x:", self.min_x) + print("min_y:", self.min_y) + print("max_x:", self.max_x) + print("max_y:", self.max_y) + + self.x_width = round((self.max_x - self.min_x) / self.resolution) + self.y_width = round((self.max_y - self.min_y) / self.resolution) + print("x_width:", self.x_width) + print("y_width:", self.y_width) + + # obstacle map generation + self.obstacle_map = [[False for _ in range(self.y_width)] + for _ in range(self.x_width)] + for ix in range(self.x_width): + x = self.calc_grid_position(ix, self.min_x) + for iy in range(self.y_width): + y = self.calc_grid_position(iy, self.min_y) + for iox, ioy in zip(ox, oy): + d = math.hypot(iox - x, ioy - y) + if d <= self.rr: + self.obstacle_map[ix][iy] = True + break + + @staticmethod + def get_motion_model(): + # dx, dy, cost + motion = [[1, 0, 1], + [0, 1, 1], + [-1, 0, 1], + [0, -1, 1], + [-1, -1, math.sqrt(2)], + [-1, 1, math.sqrt(2)], + [1, -1, math.sqrt(2)], + [1, 1, math.sqrt(2)]] + + return motion + + +def main(): + print(__file__ + " start!!") + + # start and goal position + sx = 10.0 # [m] + sy = 10.0 # [m] + gx = 50.0 # [m] + gy = 50.0 # [m] + grid_size = 2.0 # [m] + robot_radius = 1.0 # [m] + + # set obstacle positions + ox, oy = [], [] + for i in range(-10, 60): + ox.append(i) + oy.append(-10.0) + for i in range(-10, 60): + 
ox.append(60.0) + oy.append(i) + for i in range(-10, 61): + ox.append(i) + oy.append(60.0) + for i in range(-10, 61): + ox.append(-10.0) + oy.append(i) + for i in range(-10, 40): + ox.append(20.0) + oy.append(i) + for i in range(0, 40): + ox.append(40.0) + oy.append(60.0 - i) + + if show_animation: # pragma: no cover + plt.plot(ox, oy, ".k") + plt.plot(sx, sy, "og") + plt.plot(gx, gy, "xb") + plt.grid(True) + plt.axis("equal") + + theta_star = ThetaStarPlanner(ox, oy, grid_size, robot_radius) + rx, ry = theta_star.planning(sx, sy, gx, gy) + + if show_animation: # pragma: no cover + plt.plot(rx, ry, "-r") + plt.pause(0.001) + plt.show() + + +if __name__ == '__main__': + main() diff --git a/PathPlanning/TimeBasedPathPlanning/BaseClasses.py b/PathPlanning/TimeBasedPathPlanning/BaseClasses.py new file mode 100644 index 0000000000..745cde45fb --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/BaseClasses.py @@ -0,0 +1,49 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + Position, +) +from PathPlanning.TimeBasedPathPlanning.Node import NodePath +import random +import numpy.random as numpy_random + +# Seed randomness for reproducibility +RANDOM_SEED = 50 +random.seed(RANDOM_SEED) +numpy_random.seed(RANDOM_SEED) + +class SingleAgentPlanner(ABC): + """ + Base class for single agent planners + """ + + @staticmethod + @abstractmethod + def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: + pass + +@dataclass +class StartAndGoal: + # Index of this agent + index: int + # Start position of the robot + start: Position + # Goal position of the robot + goal: Position + + def distance_start_to_goal(self) -> float: + return pow(self.goal.x - self.start.x, 2) + pow(self.goal.y - self.start.y, 2) + +class MultiAgentPlanner(ABC): + """ + Base class for multi-agent planners + """ + + @staticmethod + @abstractmethod + def plan(grid: Grid, start_and_goal_positions: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]: + """ + Plan for all agents. 
Returned paths are in order corresponding to the returned list of `StartAndGoal` objects + """ + pass \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py index 2a47023f8c..ccc2989001 100644 --- a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py +++ b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py @@ -7,31 +7,7 @@ import matplotlib.pyplot as plt from enum import Enum from dataclasses import dataclass - -@dataclass(order=True) -class Position: - x: int - y: int - - def as_ndarray(self) -> np.ndarray: - return np.array([self.x, self.y]) - - def __add__(self, other): - if isinstance(other, Position): - return Position(self.x + other.x, self.y + other.y) - raise NotImplementedError( - f"Addition not supported for Position and {type(other)}" - ) - - def __sub__(self, other): - if isinstance(other, Position): - return Position(self.x - other.x, self.y - other.y) - raise NotImplementedError( - f"Subtraction not supported for Position and {type(other)}" - ) - - def __hash__(self): - return hash((self.x, self.y)) +from PathPlanning.TimeBasedPathPlanning.Node import NodePath, Position @dataclass class Interval: @@ -43,6 +19,8 @@ class ObstacleArrangement(Enum): RANDOM = 0 # Obstacles start in a line in y at center of grid and move side-to-side in x ARRANGEMENT1 = 1 + # Static obstacle arrangement + NARROW_CORRIDOR = 2 """ Generates a 2d numpy array with lists for elements. @@ -87,6 +65,8 @@ def __init__( self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles) elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1: self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles) + elif obstacle_arrangement == ObstacleArrangement.NARROW_CORRIDOR: + self.obstacle_paths = self.generate_narrow_corridor_obstacles(num_obstacles) for i, path in enumerate(self.obstacle_paths): obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid @@ -184,6 +164,26 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: obstacle_paths.append(path) return obstacle_paths + + def generate_narrow_corridor_obstacles(self, obs_count: int) -> list[list[Position]]: + obstacle_paths = [] + + for y in range(0, self.grid_size[1]): + if y > obs_count: + break + + if y == self.grid_size[1] // 2: + # Skip the middle row + continue + + obstacle_path = [] + x = self.grid_size[0] // 2 # middle of the grid + for t in range(0, self.time_limit - 1): + obstacle_path.append(Position(x, y)) + + obstacle_paths.append(obstacle_path) + + return obstacle_paths """ Check if the given position is valid at time t @@ -196,11 +196,11 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: bool: True if position/time combination is valid, False otherwise """ def valid_position(self, position: Position, t: int) -> bool: - # Check if new position is in grid + # Check if position is in grid if not self.inside_grid_bounds(position): return False - # Check if new position is not occupied at time t + # Check if position is not occupied at time t return self.reservation_matrix[position.x, position.y, t] == 0 """ @@ -289,9 +289,48 @@ def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]: # both the time step when it is entering the cell, and the time step when it is leaving the cell. 
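A short sketch of exercising the new NARROW_CORRIDOR arrangement described above (grid size, obstacle count and avoid points are illustrative; the constructor keywords are the ones used by the demos in this package):

import numpy as np
from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import (
    Grid, ObstacleArrangement, Position,
)

grid = Grid(
    np.array([21, 21]),
    num_obstacles=10,  # static obstacles stacked in the middle column; the centre row stays open as the corridor
    obstacle_avoid_points=[Position(1, 10), Position(19, 10)],
    obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR,
)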
intervals = [interval for interval in intervals if interval.start_time != interval.end_time] return intervals + + """ + Reserve an agent's path in the grid. Raises an exception if the agent's index is 0, or if a position is + already reserved by a different agent. + """ + def reserve_path(self, node_path: NodePath, agent_index: int): + if agent_index == 0: + raise Exception("Agent index cannot be 0") + + for i, node in enumerate(node_path.path): + reservation_finish_time = node.time + 1 + if i < len(node_path.path) - 1: + reservation_finish_time = node_path.path[i + 1].time -show_animation = True + self.reserve_position(node.position, agent_index, Interval(node.time, reservation_finish_time)) + + """ + Reserve a position for the provided agent during the provided time interval. + Raises an exception if the agent's index is 0, or if the position is already reserved by a different agent during the interval. + """ + def reserve_position(self, position: Position, agent_index: int, interval: Interval): + if agent_index == 0: + raise Exception("Agent index cannot be 0") + + for t in range(interval.start_time, interval.end_time + 1): + current_reserver = self.reservation_matrix[position.x, position.y, t] + if current_reserver not in [0, agent_index]: + raise Exception( + f"Agent {agent_index} tried to reserve a position already reserved by another agent: {position} at time {t}, reserved by {current_reserver}" + ) + self.reservation_matrix[position.x, position.y, t] = agent_index + + """ + Clears the initial reservation for an agent by clearing reservations at its start position with its index for + from time 0 to the time limit. + """ + def clear_initial_reservation(self, position: Position, agent_index: int): + for t in range(self.time_limit): + if self.reservation_matrix[position.x, position.y, t] == agent_index: + self.reservation_matrix[position.x, position.y, t] = 0 +show_animation = True def main(): grid = Grid( diff --git a/PathPlanning/TimeBasedPathPlanning/Node.py b/PathPlanning/TimeBasedPathPlanning/Node.py new file mode 100644 index 0000000000..728eebb676 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/Node.py @@ -0,0 +1,99 @@ +from dataclasses import dataclass +from functools import total_ordering +import numpy as np +from typing import Sequence + +@dataclass(order=True) +class Position: + x: int + y: int + + def as_ndarray(self) -> np.ndarray: + return np.array([self.x, self.y]) + + def __add__(self, other): + if isinstance(other, Position): + return Position(self.x + other.x, self.y + other.y) + raise NotImplementedError( + f"Addition not supported for Position and {type(other)}" + ) + + def __sub__(self, other): + if isinstance(other, Position): + return Position(self.x - other.x, self.y - other.y) + raise NotImplementedError( + f"Subtraction not supported for Position and {type(other)}" + ) + + def __hash__(self): + return hash((self.x, self.y)) + +@dataclass() +# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because +# this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent +# index is just used to track the path found by the algorithm, and has no effect on the quality +# of a node. +@total_ordering +class Node: + position: Position + time: int + heuristic: int + parent_index: int + + """ + This is what is used to drive node expansion. The node with the lowest value is expanded next. 
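The reservation helpers added to Grid above are what the priority-based planner later in this patch builds on; the call pattern looks roughly like this (assumes an existing Grid instance `grid`, a planned NodePath `path`, and illustrative agent index and times):

from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import Interval, Position

# Block cell (5, 5) for agent 2 from t = 0 through t = 3 (inclusive)
grid.reserve_position(Position(5, 5), agent_index=2, interval=Interval(0, 3))

# Reserve every cell along a planned path for agent 2
grid.reserve_path(path, agent_index=2)

# Release the placeholder reservation an agent holds at its start cell before it is planned
grid.clear_initial_reservation(Position(1, 2), agent_index=2)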
+ This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) + """ + def __lt__(self, other: object): + if not isinstance(other, Node): + return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") + return (self.time + self.heuristic) < (other.time + other.heuristic) + + """ + Note: cost and heuristic are not included in eq or hash, since they will always be the same + for a given (position, time) pair. Including either cost or heuristic would be redundant. + """ + def __eq__(self, other: object): + if not isinstance(other, Node): + return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") + return self.position == other.position and self.time == other.time + + def __hash__(self): + return hash((self.position, self.time)) + +class NodePath: + path: Sequence[Node] + positions_at_time: dict[int, Position] + # Number of nodes expanded while finding this path + expanded_node_count: int + + def __init__(self, path: Sequence[Node], expanded_node_count: int): + self.path = path + self.expanded_node_count = expanded_node_count + + self.positions_at_time = {} + for i, node in enumerate(path): + reservation_finish_time = node.time + 1 + if i < len(path) - 1: + reservation_finish_time = path[i + 1].time + + for t in range(node.time, reservation_finish_time): + self.positions_at_time[t] = node.position + + """ + Get the position of the path at a given time + """ + def get_position(self, time: int) -> Position | None: + return self.positions_at_time.get(time) + + """ + Time stamp of the last node in the path + """ + def goal_reached_time(self) -> int: + return self.path[-1].time + + def __repr__(self): + repr_string = "" + for i, node in enumerate(self.path): + repr_string += f"{i}: {node}\n" + return repr_string \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/Plotting.py b/PathPlanning/TimeBasedPathPlanning/Plotting.py new file mode 100644 index 0000000000..7cd1f615d8 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/Plotting.py @@ -0,0 +1,135 @@ +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.backend_bases import KeyEvent +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal +from PathPlanning.TimeBasedPathPlanning.Node import NodePath + +''' +Plot a single agent path. +''' +def PlotNodePath(grid: Grid, start: Position, goal: Position, path: NodePath): + fig = plt.figure(figsize=(10, 7)) + ax = fig.add_subplot( + autoscale_on=False, + xlim=(0, grid.grid_size[0] - 1), + ylim=(0, grid.grid_size[1] - 1), + ) + ax.set_aspect("equal") + ax.grid() + ax.set_xticks(np.arange(0, grid.grid_size[0], 1)) + ax.set_yticks(np.arange(0, grid.grid_size[1], 1)) + + (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") + start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) + (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") + (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") + ax.legend(bbox_to_anchor=(1.05, 1)) + + # for stopping simulation with the esc key. 
+ plt.gcf().canvas.mpl_connect( + "key_release_event", + lambda event: [exit(0) if event.key == "escape" else None] + if isinstance(event, KeyEvent) else None + ) + + for i in range(0, path.goal_reached_time()): + obs_positions = grid.get_obstacle_positions_at_time(i) + obs_points.set_data(obs_positions[0], obs_positions[1]) + path_position = path.get_position(i) + if not path_position: + raise Exception(f"Path position not found for time {i}.") + + path_points.set_data([path_position.x], [path_position.y]) + plt.pause(0.2) + plt.show() + +''' +Plot a series of agent paths. +''' +def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: list[NodePath]): + fig = plt.figure(figsize=(10, 7)) + + ax = fig.add_subplot( + autoscale_on=False, + xlim=(0, grid.grid_size[0] - 1), + ylim=(0, grid.grid_size[1] - 1), + ) + ax.set_aspect("equal") + ax.grid() + ax.set_xticks(np.arange(0, grid.grid_size[0], 1)) + ax.set_yticks(np.arange(0, grid.grid_size[1], 1)) + + # Plot start and goal positions for each agent + colors = [] # generated randomly in loop + markers = ['D', 's', '^', 'o', 'p'] # Different markers for visual distinction + + # Create plots for start and goal positions + start_and_goal_plots = [] + for i, path in enumerate(paths): + marker_idx = i % len(markers) + agent_id = start_and_goals[i].index + start = start_and_goals[i].start + goal = start_and_goals[i].goal + + color = np.random.rand(3,) + colors.append(color) + sg_plot, = ax.plot([], [], markers[marker_idx], c=color, ms=15, + label=f"Agent {agent_id} Start/Goal") + sg_plot.set_data([start.x, goal.x], [start.y, goal.y]) + start_and_goal_plots.append(sg_plot) + + # Plot for obstacles + (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") + + # Create plots for each agent's path + path_plots = [] + for i, path in enumerate(paths): + agent_id = start_and_goals[i].index + path_plot, = ax.plot([], [], "o", c=colors[i], ms=10, + label=f"Agent {agent_id} Path") + path_plots.append(path_plot) + + ax.legend(bbox_to_anchor=(1.05, 1)) + + # For stopping simulation with the esc key + plt.gcf().canvas.mpl_connect( + "key_release_event", + lambda event: [exit(0) if event.key == "escape" else None] + if isinstance(event, KeyEvent) else None + ) + + # Find the maximum time across all paths + max_time = max(path.goal_reached_time() for path in paths) + + # Animation loop + for i in range(0, max_time + 1): + # Update obstacle positions + obs_positions = grid.get_obstacle_positions_at_time(i) + obs_points.set_data(obs_positions[0], obs_positions[1]) + + # Update each agent's position + for (j, path) in enumerate(paths): + path_positions = [] + if i <= path.goal_reached_time(): + res = path.get_position(i) + if not res: + print(path) + print(i) + path_position = path.get_position(i) + if not path_position: + raise Exception(f"Path position not found for time {i}.") + + # Verify position is valid + assert not path_position in obs_positions + assert not path_position in path_positions + path_positions.append(path_position) + + path_plots[j].set_data([path_position.x], [path_position.y]) + + plt.pause(0.2) + + plt.show() \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py b/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py new file mode 100644 index 0000000000..2573965cf6 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py @@ -0,0 +1,95 @@ +""" +Priority Based Planner for multi agent path planning. 
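Top-level usage mirrors the demo main() further down in this file; a condensed sketch with illustrative grid parameters (the import paths are the ones introduced by this patch):

import numpy as np
from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import Grid, ObstacleArrangement, Position
from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal
from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner
from PathPlanning.TimeBasedPathPlanning.PriorityBasedPlanner import PriorityBasedPlanner

agents = [StartAndGoal(i, Position(1, i), Position(19, 19 - i)) for i in range(1, 4)]
grid = Grid(
    np.array([21, 21]),
    num_obstacles=50,
    obstacle_avoid_points=[p for a in agents for p in (a.start, a.goal)],
    obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1,
)
ordered_agents, paths = PriorityBasedPlanner.plan(grid, agents, SafeIntervalPathPlanner)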
+The planner generates an order to plan in, and generates plans for the robots in that order. Each planned +path is reserved in the grid, and all future plans must avoid that path. + +Algorithm outlined in section III of this paper: https://pure.tudelft.nl/ws/portalfiles/portal/67074672/07138650.pdf +""" + +import numpy as np +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + Interval, + ObstacleArrangement, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import MultiAgentPlanner, StartAndGoal +from PathPlanning.TimeBasedPathPlanning.Node import NodePath +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePaths +import time + +class PriorityBasedPlanner(MultiAgentPlanner): + + @staticmethod + def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]: + """ + Generate a path from the start to the goal for each agent in the `start_and_goals` list. + Returns the re-ordered StartAndGoal combinations, and a list of path plans. The order of the plans + corresponds to the order of the `start_and_goals` list. + """ + print(f"Using single-agent planner: {single_agent_planner_class}") + + # Reserve initial positions + for start_and_goal in start_and_goals: + grid.reserve_position(start_and_goal.start, start_and_goal.index, Interval(0, 10)) + + # Plan in descending order of distance from start to goal + start_and_goals = sorted(start_and_goals, + key=lambda item: item.distance_start_to_goal(), + reverse=True) + + paths = [] + for start_and_goal in start_and_goals: + if verbose: + print(f"\nPlanning for agent: {start_and_goal}" ) + + grid.clear_initial_reservation(start_and_goal.start, start_and_goal.index) + path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, verbose) + + if path is None: + print(f"Failed to find path for {start_and_goal}") + return [] + + agent_index = start_and_goal.index + grid.reserve_path(path, agent_index) + paths.append(path) + + return (start_and_goals, paths) + +verbose = False +show_animation = True +def main(): + grid_side_length = 21 + + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 16)] + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + # obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + # obstacle_arrangement=ObstacleArrangement.RANDOM, + ) + + start_time = time.time() + start_and_goals, paths = PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, verbose) + + runtime = time.time() - start_time + print(f"\nPlanning took: {runtime:.5f} seconds") + + if verbose: + print(f"Paths:") + for path in paths: + print(f"{path}\n") + + if not show_animation: + return + + PlotNodePaths(grid, start_and_goals, paths) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/SafeInterval.py b/PathPlanning/TimeBasedPathPlanning/SafeInterval.py index 7fea10c67f..446847ac6d 100644 --- a/PathPlanning/TimeBasedPathPlanning/SafeInterval.py +++ 
b/PathPlanning/TimeBasedPathPlanning/SafeInterval.py @@ -9,7 +9,6 @@ """ import numpy as np -import matplotlib.pyplot as plt from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( Grid, Interval, @@ -17,8 +16,11 @@ Position, empty_2d_array_of_lists, ) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.Node import Node, NodePath +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePath + import heapq -import random from dataclasses import dataclass from functools import total_ordering import time @@ -29,116 +31,50 @@ # index and interval member variables are just used to track the path found by the algorithm, # and has no effect on the quality of a node. @total_ordering -class Node: - position: Position - time: int - heuristic: int - parent_index: int +class SIPPNode(Node): interval: Interval - """ - This is what is used to drive node expansion. The node with the lowest value is expanded next. - This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) - """ - def __lt__(self, other: object): - if not isinstance(other, Node): - return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") - return (self.time + self.heuristic) < (other.time + other.heuristic) - - """ - Equality only cares about position and time. Heuristic and interval will always be the same for a given - (position, time) pairing, so they are not considered in equality. - """ - def __eq__(self, other: object): - if not isinstance(other, Node): - return NotImplemented - return self.position == other.position and self.time == other.time - @dataclass class EntryTimeAndInterval: entry_time: int interval: Interval -class NodePath: - path: list[Node] - positions_at_time: dict[int, Position] = {} - - def __init__(self, path: list[Node]): - self.path = path - for (i, node) in enumerate(path): - if i > 0: - # account for waiting in interval at previous node - prev_node = path[i-1] - for t in range(prev_node.time, node.time): - self.positions_at_time[t] = prev_node.position - - self.positions_at_time[node.time] = node.position - - """ - Get the position of the path at a given time - """ - def get_position(self, time: int) -> Position | None: - return self.positions_at_time.get(time) - - """ - Time stamp of the last node in the path - """ - def goal_reached_time(self) -> int: - return self.path[-1].time - - def __repr__(self): - repr_string = "" - for i, node in enumerate(self.path): - repr_string += f"{i}: {node}\n" - return repr_string - - -class SafeIntervalPathPlanner: - grid: Grid - start: Position - goal: Position - - def __init__(self, grid: Grid, start: Position, goal: Position): - self.grid = grid - self.start = start - self.goal = goal - - # Seed randomness for reproducibility - RANDOM_SEED = 50 - random.seed(RANDOM_SEED) - np.random.seed(RANDOM_SEED) +class SafeIntervalPathPlanner(SingleAgentPlanner): """ Generate a plan given the loaded problem statement. Raises an exception if it fails to find a path. 
Arguments: verbose (bool): set to True to print debug information """ - def plan(self, verbose: bool = False) -> NodePath: + @staticmethod + def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: - safe_intervals = self.grid.get_safe_intervals() + safe_intervals = grid.get_safe_intervals() - open_set: list[Node] = [] - first_node_interval = safe_intervals[self.start.x, self.start.y][0] + open_set: list[SIPPNode] = [] + first_node_interval = safe_intervals[start.x, start.y][0] heapq.heappush( - open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1, first_node_interval) + open_set, SIPPNode(start, 0, SafeIntervalPathPlanner.calculate_heuristic(start, goal), -1, first_node_interval) ) - expanded_list: list[Node] = [] - visited_intervals = empty_2d_array_of_lists(self.grid.grid_size[0], self.grid.grid_size[1]) + expanded_list: list[SIPPNode] = [] + visited_intervals = empty_2d_array_of_lists(grid.grid_size[0], grid.grid_size[1]) while open_set: - expanded_node: Node = heapq.heappop(open_set) + expanded_node: SIPPNode = heapq.heappop(open_set) if verbose: print("Expanded node:", expanded_node) - if expanded_node.time + 1 >= self.grid.time_limit: + if expanded_node.time + 1 >= grid.time_limit: if verbose: print(f"\tSkipping node that is past time limit: {expanded_node}") continue - if expanded_node.position == self.goal: - print(f"Found path to goal after {len(expanded_list)} expansions") + if expanded_node.position == goal: + if verbose: + print(f"Found path to goal after {len(expanded_list)} expansions") + path = [] - path_walker: Node = expanded_node + path_walker: SIPPNode = expanded_node while True: path.append(path_walker) if path_walker.parent_index == -1: @@ -147,14 +83,14 @@ def plan(self, verbose: bool = False) -> NodePath: # reverse path so it goes start -> goal path.reverse() - return NodePath(path) + return NodePath(path, len(expanded_list)) expanded_idx = len(expanded_list) expanded_list.append(expanded_node) entry_time_and_node = EntryTimeAndInterval(expanded_node.time, expanded_node.interval) add_entry_to_visited_intervals_array(entry_time_and_node, visited_intervals, expanded_node) - for child in self.generate_successors(expanded_node, expanded_idx, safe_intervals, visited_intervals): + for child in SafeIntervalPathPlanner.generate_successors(grid, goal, expanded_node, expanded_idx, safe_intervals, visited_intervals): heapq.heappush(open_set, child) raise Exception("No path found") @@ -162,9 +98,10 @@ def plan(self, verbose: bool = False) -> NodePath: """ Generate list of possible successors of the provided `parent_node` that are worth expanding """ + @staticmethod def generate_successors( - self, parent_node: Node, parent_node_idx: int, intervals: np.ndarray, visited_intervals: np.ndarray - ) -> list[Node]: + grid: Grid, goal: Position, parent_node: SIPPNode, parent_node_idx: int, intervals: np.ndarray, visited_intervals: np.ndarray + ) -> list[SIPPNode]: new_nodes = [] diffs = [ Position(0, 0), @@ -175,7 +112,7 @@ def generate_successors( ] for diff in diffs: new_pos = parent_node.position + diff - if not self.grid.inside_grid_bounds(new_pos): + if not grid.inside_grid_bounds(new_pos): continue current_interval = parent_node.interval @@ -203,12 +140,12 @@ def generate_successors( # We know there is a node worth expanding. 
Generate successor at the earliest possible time the # new interval can be entered for possible_t in range(max(parent_node.time + 1, interval.start_time), min(current_interval.end_time, interval.end_time)): - if self.grid.valid_position(new_pos, possible_t): - new_nodes.append(Node( + if grid.valid_position(new_pos, possible_t): + new_nodes.append(SIPPNode( new_pos, # entry is max of interval start and parent node time + 1 (get there as soon as possible) max(interval.start_time, parent_node.time + 1), - self.calculate_heuristic(new_pos), + SafeIntervalPathPlanner.calculate_heuristic(new_pos, goal), parent_node_idx, interval, )) @@ -220,8 +157,9 @@ def generate_successors( """ Calculate the heuristic for a given position - Manhattan distance to the goal """ - def calculate_heuristic(self, position) -> int: - diff = self.goal - position + @staticmethod + def calculate_heuristic(position: Position, goal: Position) -> int: + diff = goal - position return abs(diff.x) + abs(diff.y) @@ -229,7 +167,7 @@ def calculate_heuristic(self, position) -> int: Adds a new entry to the visited intervals array. If the entry is already present, the entry time is updated if the new entry time is better. Otherwise, the entry is added to `visited_intervals` at the position of `expanded_node`. """ -def add_entry_to_visited_intervals_array(entry_time_and_interval: EntryTimeAndInterval, visited_intervals: np.ndarray, expanded_node: Node): +def add_entry_to_visited_intervals_array(entry_time_and_interval: EntryTimeAndInterval, visited_intervals: np.ndarray, expanded_node: SIPPNode): # if entry is present, update entry time if better for existing_entry_and_interval in visited_intervals[expanded_node.position.x, expanded_node.position.y]: if existing_entry_and_interval.interval == entry_time_and_interval.interval: @@ -257,8 +195,7 @@ def main(): # obstacle_arrangement=ObstacleArrangement.RANDOM, ) - planner = SafeIntervalPathPlanner(grid, start, goal) - path = planner.plan(verbose) + path = SafeIntervalPathPlanner.plan(grid, start, goal, verbose) runtime = time.time() - start_time print(f"Planning took: {runtime:.5f} seconds") @@ -268,35 +205,7 @@ def main(): if not show_animation: return - fig = plt.figure(figsize=(10, 7)) - ax = fig.add_subplot( - autoscale_on=False, - xlim=(0, grid.grid_size[0] - 1), - ylim=(0, grid.grid_size[1] - 1), - ) - ax.set_aspect("equal") - ax.grid() - ax.set_xticks(np.arange(0, grid_side_length, 1)) - ax.set_yticks(np.arange(0, grid_side_length, 1)) - - (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") - start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) - (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") - (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") - ax.legend(bbox_to_anchor=(1.05, 1)) - - # for stopping simulation with the esc key. 
- plt.gcf().canvas.mpl_connect( - "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] - ) - - for i in range(0, path.goal_reached_time() + 1): - obs_positions = grid.get_obstacle_positions_at_time(i) - obs_points.set_data(obs_positions[0], obs_positions[1]) - path_position = path.get_position(i) - path_points.set_data([path_position.x], [path_position.y]) - plt.pause(0.2) - plt.show() + PlotNodePath(grid, start, goal, path) if __name__ == "__main__": diff --git a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py index c4e2802d37..b85569f5dc 100644 --- a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py +++ b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py @@ -9,101 +9,25 @@ """ import numpy as np -import matplotlib.pyplot as plt from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( Grid, ObstacleArrangement, Position, ) +from PathPlanning.TimeBasedPathPlanning.Node import Node, NodePath import heapq from collections.abc import Generator -import random -from dataclasses import dataclass -from functools import total_ordering import time +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePath -# Seed randomness for reproducibility -RANDOM_SEED = 50 -random.seed(RANDOM_SEED) -np.random.seed(RANDOM_SEED) - -@dataclass() -# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because -# this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent -# index is just used to track the path found by the algorithm, and has no effect on the quality -# of a node. -@total_ordering -class Node: - position: Position - time: int - heuristic: int - parent_index: int +class SpaceTimeAStar(SingleAgentPlanner): - """ - This is what is used to drive node expansion. The node with the lowest value is expanded next. - This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) - """ - def __lt__(self, other: object): - if not isinstance(other, Node): - return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") - return (self.time + self.heuristic) < (other.time + other.heuristic) - - """ - Note: cost and heuristic are not included in eq or hash, since they will always be the same - for a given (position, time) pair. Including either cost or heuristic would be redundant. 
- """ - def __eq__(self, other: object): - if not isinstance(other, Node): - return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") - return self.position == other.position and self.time == other.time - - def __hash__(self): - return hash((self.position, self.time)) - -class NodePath: - path: list[Node] - positions_at_time: dict[int, Position] = {} - - def __init__(self, path: list[Node]): - self.path = path - for node in path: - self.positions_at_time[node.time] = node.position - - """ - Get the position of the path at a given time - """ - def get_position(self, time: int) -> Position | None: - return self.positions_at_time.get(time) - - """ - Time stamp of the last node in the path - """ - def goal_reached_time(self) -> int: - return self.path[-1].time - - def __repr__(self): - repr_string = "" - for i, node in enumerate(self.path): - repr_string += f"{i}: {node}\n" - return repr_string - - -class SpaceTimeAStar: - grid: Grid - start: Position - goal: Position - # Used to evaluate solutions - expanded_node_count: int = -1 - - def __init__(self, grid: Grid, start: Position, goal: Position): - self.grid = grid - self.start = start - self.goal = goal - - def plan(self, verbose: bool = False) -> NodePath: + @staticmethod + def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: open_set: list[Node] = [] heapq.heappush( - open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1) + open_set, Node(start, 0, SpaceTimeAStar.calculate_heuristic(start, goal), -1) ) expanded_list: list[Node] = [] @@ -113,13 +37,15 @@ def plan(self, verbose: bool = False) -> NodePath: if verbose: print("Expanded node:", expanded_node) - if expanded_node.time + 1 >= self.grid.time_limit: + if expanded_node.time + 1 >= grid.time_limit: if verbose: print(f"\tSkipping node that is past time limit: {expanded_node}") continue - if expanded_node.position == self.goal: - print(f"Found path to goal after {len(expanded_list)} expansions") + if expanded_node.position == goal: + if verbose: + print(f"Found path to goal after {len(expanded_list)} expansions") + path = [] path_walker: Node = expanded_node while True: @@ -130,14 +56,13 @@ def plan(self, verbose: bool = False) -> NodePath: # reverse path so it goes start -> goal path.reverse() - self.expanded_node_count = len(expanded_set) - return NodePath(path) + return NodePath(path, len(expanded_set)) expanded_idx = len(expanded_list) expanded_list.append(expanded_node) expanded_set.add(expanded_node) - for child in self.generate_successors(expanded_node, expanded_idx, verbose, expanded_set): + for child in SpaceTimeAStar.generate_successors(grid, goal, expanded_node, expanded_idx, verbose, expanded_set): heapq.heappush(open_set, child) raise Exception("No path found") @@ -145,8 +70,9 @@ def plan(self, verbose: bool = False) -> NodePath: """ Generate possible successors of the provided `parent_node` """ + @staticmethod def generate_successors( - self, parent_node: Node, parent_node_idx: int, verbose: bool, expanded_set: set[Node] + grid: Grid, goal: Position, parent_node: Node, parent_node_idx: int, verbose: bool, expanded_set: set[Node] ) -> Generator[Node, None, None]: diffs = [ Position(0, 0), @@ -160,20 +86,22 @@ def generate_successors( new_node = Node( new_pos, parent_node.time + 1, - self.calculate_heuristic(new_pos), + SpaceTimeAStar.calculate_heuristic(new_pos, goal), parent_node_idx, ) if new_node in expanded_set: continue - if self.grid.valid_position(new_pos, parent_node.time + 1): + # 
Check if the new node is valid for the next 2 time steps - one step to enter, and another to leave + if all([grid.valid_position(new_pos, parent_node.time + dt) for dt in [1, 2]]): if verbose: print("\tNew successor node: ", new_node) yield new_node - def calculate_heuristic(self, position) -> int: - diff = self.goal - position + @staticmethod + def calculate_heuristic(position: Position, goal: Position) -> int: + diff = goal - position return abs(diff.x) + abs(diff.y) @@ -194,8 +122,7 @@ def main(): obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, ) - planner = SpaceTimeAStar(grid, start, goal) - path = planner.plan(verbose) + path = SpaceTimeAStar.plan(grid, start, goal, verbose) runtime = time.time() - start_time print(f"Planning took: {runtime:.5f} seconds") @@ -206,36 +133,7 @@ def main(): if not show_animation: return - fig = plt.figure(figsize=(10, 7)) - ax = fig.add_subplot( - autoscale_on=False, - xlim=(0, grid.grid_size[0] - 1), - ylim=(0, grid.grid_size[1] - 1), - ) - ax.set_aspect("equal") - ax.grid() - ax.set_xticks(np.arange(0, grid_side_length, 1)) - ax.set_yticks(np.arange(0, grid_side_length, 1)) - - (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") - start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) - (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") - (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") - ax.legend(bbox_to_anchor=(1.05, 1)) - - # for stopping simulation with the esc key. - plt.gcf().canvas.mpl_connect( - "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] - ) - - for i in range(0, path.goal_reached_time()): - obs_positions = grid.get_obstacle_positions_at_time(i) - obs_points.set_data(obs_positions[0], obs_positions[1]) - path_position = path.get_position(i) - path_points.set_data([path_position.x], [path_position.y]) - plt.pause(0.2) - plt.show() - + PlotNodePath(grid, start, goal, path) if __name__ == "__main__": main() diff --git a/PathTracking/model_predictive_speed_and_steer_control/model_predictive_speed_and_steer_control.py b/PathTracking/model_predictive_speed_and_steer_control/model_predictive_speed_and_steer_control.py index eb2d7b6a73..205f37e62f 100644 --- a/PathTracking/model_predictive_speed_and_steer_control/model_predictive_speed_and_steer_control.py +++ b/PathTracking/model_predictive_speed_and_steer_control/model_predictive_speed_and_steer_control.py @@ -318,7 +318,7 @@ def calc_ref_trajectory(state, cx, cy, cyaw, ck, sp, dl, pind): travel = 0.0 - for i in range(T + 1): + for i in range(1, T + 1): travel += abs(state.v) * DT dind = int(round(travel / dl)) diff --git a/PathTracking/pure_pursuit/pure_pursuit.py b/PathTracking/pure_pursuit/pure_pursuit.py index ff995033a9..48453927ab 100644 --- a/PathTracking/pure_pursuit/pure_pursuit.py +++ b/PathTracking/pure_pursuit/pure_pursuit.py @@ -10,6 +10,11 @@ import math import matplotlib.pyplot as plt +import sys +import pathlib +sys.path.append(str(pathlib.Path(__file__).parent.parent.parent)) +from utils.angle import angle_mod + # Parameters k = 0.1 # look forward gain Lfc = 2.0 # [m] look-ahead distance @@ -17,26 +22,37 @@ dt = 0.1 # [s] time tick WB = 2.9 # [m] wheel base of vehicle -show_animation = True +# Vehicle parameters +LENGTH = WB + 1.0 # Vehicle length +WIDTH = 2.0 # Vehicle width +WHEEL_LEN = 0.6 # Wheel length +WHEEL_WIDTH = 0.2 # Wheel width +MAX_STEER = math.pi / 4 # Maximum steering angle [rad] -class State: - def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0): +show_animation = True 
+pause_simulation = False # Flag for pause simulation +is_reverse_mode = False # Flag for reverse driving mode + +class State: + def __init__(self, x=0.0, y=0.0, yaw=0.0, v=0.0, is_reverse=False): self.x = x self.y = y self.yaw = yaw self.v = v - self.rear_x = self.x - ((WB / 2) * math.cos(self.yaw)) - self.rear_y = self.y - ((WB / 2) * math.sin(self.yaw)) + self.direction = -1 if is_reverse else 1 # Direction based on reverse flag + self.rear_x = self.x - self.direction * ((WB / 2) * math.cos(self.yaw)) + self.rear_y = self.y - self.direction * ((WB / 2) * math.sin(self.yaw)) def update(self, a, delta): self.x += self.v * math.cos(self.yaw) * dt self.y += self.v * math.sin(self.yaw) * dt - self.yaw += self.v / WB * math.tan(delta) * dt + self.yaw += self.direction * self.v / WB * math.tan(delta) * dt + self.yaw = angle_mod(self.yaw) self.v += a * dt - self.rear_x = self.x - ((WB / 2) * math.cos(self.yaw)) - self.rear_y = self.y - ((WB / 2) * math.sin(self.yaw)) + self.rear_x = self.x - self.direction * ((WB / 2) * math.cos(self.yaw)) + self.rear_y = self.y - self.direction * ((WB / 2) * math.sin(self.yaw)) def calc_distance(self, point_x, point_y): dx = self.rear_x - point_x @@ -51,6 +67,7 @@ def __init__(self): self.y = [] self.yaw = [] self.v = [] + self.direction = [] self.t = [] def append(self, t, state): @@ -58,6 +75,7 @@ def append(self, t, state): self.y.append(state.y) self.yaw.append(state.yaw) self.v.append(state.v) + self.direction.append(state.direction) self.t.append(t) @@ -117,14 +135,18 @@ def pure_pursuit_steer_control(state, trajectory, pind): if ind < len(trajectory.cx): tx = trajectory.cx[ind] ty = trajectory.cy[ind] - else: # toward goal + else: tx = trajectory.cx[-1] ty = trajectory.cy[-1] ind = len(trajectory.cx) - 1 alpha = math.atan2(ty - state.rear_y, tx - state.rear_x) - state.yaw - delta = math.atan2(2.0 * WB * math.sin(alpha) / Lf, 1.0) + # Reverse steering angle when reversing + delta = state.direction * math.atan2(2.0 * WB * math.sin(alpha) / Lf, 1.0) + + # Limit steering angle to max value + delta = np.clip(delta, -MAX_STEER, MAX_STEER) return delta, ind @@ -142,10 +164,111 @@ def plot_arrow(x, y, yaw, length=1.0, width=0.5, fc="r", ec="k"): fc=fc, ec=ec, head_width=width, head_length=width) plt.plot(x, y) +def plot_vehicle(x, y, yaw, steer=0.0, color='blue', is_reverse=False): + """ + Plot vehicle model with four wheels + Args: + x, y: Vehicle center position + yaw: Vehicle heading angle + steer: Steering angle + color: Vehicle color + is_reverse: Flag for reverse mode + """ + # Adjust heading angle in reverse mode + if is_reverse: + yaw = angle_mod(yaw + math.pi) # Rotate heading by 180 degrees + steer = -steer # Reverse steering direction + + def plot_wheel(x, y, yaw, steer=0.0, color=color): + """Plot single wheel""" + wheel = np.array([ + [-WHEEL_LEN/2, WHEEL_WIDTH/2], + [WHEEL_LEN/2, WHEEL_WIDTH/2], + [WHEEL_LEN/2, -WHEEL_WIDTH/2], + [-WHEEL_LEN/2, -WHEEL_WIDTH/2], + [-WHEEL_LEN/2, WHEEL_WIDTH/2] + ]) + + # Rotate wheel if steering + if steer != 0: + c, s = np.cos(steer), np.sin(steer) + rot_steer = np.array([[c, -s], [s, c]]) + wheel = wheel @ rot_steer.T + + # Apply vehicle heading rotation + c, s = np.cos(yaw), np.sin(yaw) + rot_yaw = np.array([[c, -s], [s, c]]) + wheel = wheel @ rot_yaw.T + + # Translate to position + wheel[:, 0] += x + wheel[:, 1] += y + + # Plot wheel with color + plt.plot(wheel[:, 0], wheel[:, 1], color=color) + + # Calculate vehicle body corners + corners = np.array([ + [-LENGTH/2, WIDTH/2], + [LENGTH/2, WIDTH/2], + [LENGTH/2, 
-WIDTH/2], + [-LENGTH/2, -WIDTH/2], + [-LENGTH/2, WIDTH/2] + ]) + + # Rotation matrix + c, s = np.cos(yaw), np.sin(yaw) + Rot = np.array([[c, -s], [s, c]]) + + # Rotate and translate vehicle body + rotated = corners @ Rot.T + rotated[:, 0] += x + rotated[:, 1] += y + + # Plot vehicle body + plt.plot(rotated[:, 0], rotated[:, 1], color=color) + + # Plot wheels (darker color for front wheels) + front_color = 'darkblue' + rear_color = color + + # Plot four wheels + # Front left + plot_wheel(x + LENGTH/4 * c - WIDTH/2 * s, + y + LENGTH/4 * s + WIDTH/2 * c, + yaw, steer, front_color) + # Front right + plot_wheel(x + LENGTH/4 * c + WIDTH/2 * s, + y + LENGTH/4 * s - WIDTH/2 * c, + yaw, steer, front_color) + # Rear left + plot_wheel(x - LENGTH/4 * c - WIDTH/2 * s, + y - LENGTH/4 * s + WIDTH/2 * c, + yaw, color=rear_color) + # Rear right + plot_wheel(x - LENGTH/4 * c + WIDTH/2 * s, + y - LENGTH/4 * s - WIDTH/2 * c, + yaw, color=rear_color) + + # Add direction arrow + arrow_length = LENGTH/3 + plt.arrow(x, y, + -arrow_length * math.cos(yaw) if is_reverse else arrow_length * math.cos(yaw), + -arrow_length * math.sin(yaw) if is_reverse else arrow_length * math.sin(yaw), + head_width=WIDTH/4, head_length=WIDTH/4, + fc='r', ec='r', alpha=0.5) + +# Keyboard event handler +def on_key(event): + global pause_simulation + if event.key == ' ': # Space key + pause_simulation = not pause_simulation + elif event.key == 'escape': + exit(0) def main(): # target course - cx = np.arange(0, 50, 0.5) + cx = -1 * np.arange(0, 50, 0.5) if is_reverse_mode else np.arange(0, 50, 0.5) cy = [math.sin(ix / 5.0) * ix / 2.0 for ix in cx] target_speed = 10.0 / 3.6 # [m/s] @@ -153,7 +276,7 @@ def main(): T = 100.0 # max simulation time # initial state - state = State(x=-0.0, y=-3.0, yaw=0.0, v=0.0) + state = State(x=-0.0, y=-3.0, yaw=math.pi if is_reverse_mode else 0.0, v=0.0, is_reverse=is_reverse_mode) lastIndex = len(cx) - 1 time = 0.0 @@ -173,22 +296,33 @@ def main(): time += dt states.append(time, state) - if show_animation: # pragma: no cover plt.cla() # for stopping simulation with the esc key. - plt.gcf().canvas.mpl_connect( - 'key_release_event', - lambda event: [exit(0) if event.key == 'escape' else None]) - plot_arrow(state.x, state.y, state.yaw) + plt.gcf().canvas.mpl_connect('key_release_event', on_key) + # Pass is_reverse parameter + plot_vehicle(state.x, state.y, state.yaw, di, is_reverse=is_reverse_mode) plt.plot(cx, cy, "-r", label="course") plt.plot(states.x, states.y, "-b", label="trajectory") plt.plot(cx[target_ind], cy[target_ind], "xg", label="target") plt.axis("equal") plt.grid(True) plt.title("Speed[km/h]:" + str(state.v * 3.6)[:4]) + plt.legend() # Add legend display + + # Add pause state display + if pause_simulation: + plt.text(0.02, 0.95, 'PAUSED', transform=plt.gca().transAxes, + bbox=dict(facecolor='red', alpha=0.5)) + plt.pause(0.001) + # Handle pause state + while pause_simulation: + plt.pause(0.1) # Reduce CPU usage + if not plt.get_fignums(): # Check if window is closed + exit(0) + # Test assert lastIndex >= target_ind, "Cannot goal" diff --git a/README.md b/README.md index 85be274adb..65445fa4ce 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ Python codes and [textbook](https://atsushisakai.github.io/PythonRobotics/index. 
* [D* Lite algorithm](#d-lite-algorithm) * [Potential Field algorithm](#potential-field-algorithm) * [Grid based coverage path planning](#grid-based-coverage-path-planning) + * [Particle Swarm Optimization (PSO)](#particle-swarm-optimization-pso) * [State Lattice Planning](#state-lattice-planning) * [Biased polar sampling](#biased-polar-sampling) * [Lane sampling](#lane-sampling) @@ -356,6 +357,24 @@ This is a 2D grid based coverage path planning simulation. ![PotentialField](https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/GridBasedSweepCPP/animation.gif) +### Particle Swarm Optimization (PSO) + +This is a 2D path planning simulation using the Particle Swarm Optimization algorithm. + +![PSO](https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/ParticleSwarmOptimization/animation.gif) + +PSO is a metaheuristic optimization algorithm inspired by bird flocking behavior. In path planning, particles explore the search space to find collision-free paths while avoiding obstacles. + +The animation shows particles (blue dots) converging towards the optimal path (yellow line) from start (green area) to goal (red star). + +References + +- [Particle swarm optimization - Wikipedia](https://en.wikipedia.org/wiki/Particle_swarm_optimization) + +- [Kennedy, J.; Eberhart, R. (1995). "Particle Swarm Optimization"](https://ieeexplore.ieee.org/document/488968) + + + ## State Lattice Planning This script is a path planning code with state lattice planning. @@ -366,7 +385,7 @@ Reference - [Optimal rough terrain trajectory generation for wheeled mobile robots](https://journals.sagepub.com/doi/pdf/10.1177/0278364906075328) -- [State Space Sampling of Feasible Motions for High-Performance Mobile Robot Navigation in Complex Environments](https://www.frc.ri.cmu.edu/~alonzo/pubs/papers/JFR_08_SS_Sampling.pdf) +- [State Space Sampling of Feasible Motions for High-Performance Mobile Robot Navigation in Complex Environments](https://www.cs.cmu.edu/~alonzo/pubs/papers/JFR_08_SS_Sampling.pdf) ### Biased polar sampling diff --git a/docs/modules/0_getting_started/2_how_to_run_sample_codes_main.rst b/docs/modules/0_getting_started/2_how_to_run_sample_codes_main.rst index b92fc9bde0..33440bb137 100644 --- a/docs/modules/0_getting_started/2_how_to_run_sample_codes_main.rst +++ b/docs/modules/0_getting_started/2_how_to_run_sample_codes_main.rst @@ -46,7 +46,7 @@ using pip : 5. Execute python script in each directory. -For example, to run the sample code of `Extented Kalman Filter` in the +For example, to run the sample code of `Extended Kalman Filter` in the `localization` directory, execute the following command: .. code-block:: diff --git a/docs/modules/0_getting_started/3_how_to_contribute_main.rst b/docs/modules/0_getting_started/3_how_to_contribute_main.rst index 0325aaacae..f975127107 100644 --- a/docs/modules/0_getting_started/3_how_to_contribute_main.rst +++ b/docs/modules/0_getting_started/3_how_to_contribute_main.rst @@ -132,7 +132,7 @@ At first, please fix all CI errors before code review. You can check your PR doc from the CI panel. After the "ci/circleci: build_doc" CI is succeeded, -you can access you PR doc with clicking the [Details] of the "ci/circleci: build_doc artifact" CI. +you can access your PR doc with clicking the [Details] of the "ci/circleci: build_doc artifact" CI. .. 
image:: /_static/img/doc_ci.png @@ -176,7 +176,7 @@ Adding the missed documentations for existing examples is also great contributio If you check the `Python Robotics Docs`_, you can notice that some of the examples only have a simulation gif or short overview descriptions or just TBD., but no detailed algorithm or mathematical description. -These documents needs to be improved. +These documents need to be improved. This doc `how to write doc`_ can be helpful to write documents. diff --git a/docs/modules/10_inverted_pendulum/inverted_pendulum_main.rst b/docs/modules/10_inverted_pendulum/inverted_pendulum_main.rst index 58dc0f2e57..47ae034195 100644 --- a/docs/modules/10_inverted_pendulum/inverted_pendulum_main.rst +++ b/docs/modules/10_inverted_pendulum/inverted_pendulum_main.rst @@ -4,7 +4,7 @@ Inverted Pendulum ------------------ An inverted pendulum on a cart consists of a mass :math:`m` at the top of a pole of length :math:`l` pivoted on a -horizontally moving base as shown in the adjacent. +horizontally moving base as shown in the adjacent figure. The objective of the control system is to balance the inverted pendulum by applying a force to the cart that the pendulum is attached to. @@ -97,7 +97,7 @@ Code Link MPC control ~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The MPC controller minimize this cost function defined as: +The MPC controller minimizes this cost function defined as: .. math:: J = x^T Q x + u^T R u diff --git a/docs/modules/12_appendix/Kalmanfilter_basics_main.rst b/docs/modules/12_appendix/Kalmanfilter_basics_main.rst index a1d99d47ef..1ed5790d52 100644 --- a/docs/modules/12_appendix/Kalmanfilter_basics_main.rst +++ b/docs/modules/12_appendix/Kalmanfilter_basics_main.rst @@ -55,7 +55,7 @@ Variance, Covariance and Correlation Variance ^^^^^^^^ -Variance is the spread of the data. The mean does’nt tell much **about** +Variance is the spread of the data. The mean doesn’t tell much **about** the data. Therefore the variance tells us about the **story** about the data meaning the spread of the data. diff --git a/docs/modules/12_appendix/external_sensors_main.rst b/docs/modules/12_appendix/external_sensors_main.rst index 3597418150..b7caaf2c3a 100644 --- a/docs/modules/12_appendix/external_sensors_main.rst +++ b/docs/modules/12_appendix/external_sensors_main.rst @@ -10,9 +10,7 @@ Therefore, we will provide an overview. Introduction ------------ -In recent years, the application of robotic technology has advanced, -particularly in areas such as autonomous vehicles and disaster response robots. -A crucial element in these technologies is external recognition—the robot's ability to understand its surrounding environment, identify safe zones, and detect moving objects using onboard sensors. Achieving effective external recognition involves various techniques, but equally important is the selection of appropriate sensors. Robots, like the sensors they employ, come in many forms, but external recognition sensors can be broadly categorized into three types. Developing an advanced external recognition system requires a thorough understanding of each sensor's principles and characteristics to determine their optimal application. This article summarizes the principles and features of these sensors for personal study purposes. +In recent years, the application of robotic technology has advanced, particularly in areas such as autonomous vehicles and disaster response robots. 
A crucial element in these technologies is external recognition—the robot's ability to understand its surrounding environment, identify safe zones, and detect moving objects using onboard sensors. Achieving effective external recognition involves various techniques, but equally important is the selection of appropriate sensors. Robots, like the sensors they employ, come in many forms, but external recognition sensors can be broadly categorized into three types. Developing an advanced external recognition system requires a thorough understanding of each sensor's principles and characteristics to determine their optimal application. This article summarizes the principles and features of these sensors for personal study purposes. Laser Sensors ------------- @@ -22,7 +20,7 @@ Laser sensors measure distances by utilizing light, commonly referred to as Ligh Radar Sensors ------------- -TBD +Radar measures distances using radio waves, commonly referred to as Radio Detection and Ranging (RADAR). It operates by transmitting radio signals towards an object and calculating the distance based on the time it takes for the reflected waves to return, using the speed of radio waves as a constant. Monocular Cameras @@ -59,4 +57,9 @@ Ultrasonic sensors are commonly used in indoor robots and some automotive autono References ---------- -TBD +- Wikipedia articles: + + - `Lidar Sensors `_ + - `Radar Sensors `_ + - `Stereo Cameras `_ + - `Ultrasonic Sensors `_ \ No newline at end of file diff --git a/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst b/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst index 22849f7c54..4b0ef82c12 100644 --- a/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst +++ b/docs/modules/13_mission_planning/behavior_tree/behavior_tree_main.rst @@ -2,8 +2,8 @@ Behavior Tree ------------- Behavior Tree is a modular, hierarchical decision model that is widely used in robot control, and game development. -It present some similarities to hierarchical state machines with the key difference that the main building block of a behavior is a task rather than a state. -Behavior Tree have been shown to generalize several other control architectures (https://ieeexplore.ieee.org/document/7790863) +It presents some similarities to hierarchical state machines with the key difference that the main building block of a behavior is a task rather than a state. +Behavior Trees have been shown to generalize several other control architectures (https://ieeexplore.ieee.org/document/7790863) Code Link ~~~~~~~~~~~~~ diff --git a/docs/modules/1_introduction/2_python_for_robotics/python_for_robotics_main.rst b/docs/modules/1_introduction/2_python_for_robotics/python_for_robotics_main.rst index c47c122853..e3f38a55fd 100644 --- a/docs/modules/1_introduction/2_python_for_robotics/python_for_robotics_main.rst +++ b/docs/modules/1_introduction/2_python_for_robotics/python_for_robotics_main.rst @@ -1,7 +1,7 @@ Python for Robotics ---------------------- -A programing language, Python is used for this `PythonRobotics` project +A programming language, Python is used for this `PythonRobotics` project to achieve the purposes of this project described in the :ref:`What is PythonRobotics?`. This section explains the Python itself and features for science computing and robotics. 
diff --git a/docs/modules/1_introduction/3_technologies_for_robotics/technologies_for_robotics_main.rst b/docs/modules/1_introduction/3_technologies_for_robotics/technologies_for_robotics_main.rst index 0ed51e961b..dd3cd1d86c 100644 --- a/docs/modules/1_introduction/3_technologies_for_robotics/technologies_for_robotics_main.rst +++ b/docs/modules/1_introduction/3_technologies_for_robotics/technologies_for_robotics_main.rst @@ -3,7 +3,7 @@ Technologies for Robotics The field of robotics needs wide areas of technologies such as mechanical engineering, electrical engineering, computer science, and artificial intelligence (AI). -This project, `PythonRobotics`, only focus on computer science and artificial intelligence. +This project, `PythonRobotics`, only focuses on computer science and artificial intelligence. The technologies for robotics are categorized as following 3 categories: diff --git a/docs/modules/2_localization/ensamble_kalman_filter_localization_files/ensamble_kalman_filter_localization_main.rst b/docs/modules/2_localization/ensamble_kalman_filter_localization_files/ensamble_kalman_filter_localization_main.rst index 214e645d10..6274859002 100644 --- a/docs/modules/2_localization/ensamble_kalman_filter_localization_files/ensamble_kalman_filter_localization_main.rst +++ b/docs/modules/2_localization/ensamble_kalman_filter_localization_files/ensamble_kalman_filter_localization_main.rst @@ -1,12 +1,204 @@ -Ensamble Kalman Filter Localization +Ensemble Kalman Filter Localization ----------------------------------- .. figure:: https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/Localization/ensamble_kalman_filter/animation.gif -This is a sensor fusion localization with Ensamble Kalman Filter(EnKF). +This is a sensor fusion localization with Ensemble Kalman Filter(EnKF). + +The blue line is true trajectory, the black line is dead reckoning trajectory, +and the red line is estimated trajectory with EnKF. + +The red ellipse is estimated covariance ellipse with EnKF. + +It is assumed that the robot can measure distance and bearing angle from landmarks (RFID). + +These measurements are used for EnKF localization. Code Link ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autofunction:: Localization.ensemble_kalman_filter.ensemble_kalman_filter.enkf_localization + +Ensemble Kalman Filter Algorithm +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The Ensemble Kalman Filter (EnKF) is a Monte Carlo approach to the Kalman filter that +uses an ensemble of state estimates to represent the probability distribution. Unlike +the traditional Kalman filter that propagates the mean and covariance analytically, +EnKF uses a collection of samples (ensemble members) to estimate the state and its uncertainty. + +The EnKF algorithm consists of two main steps: + +=== Predict === + +For each ensemble member :math:`i = 1, ..., N`: + +1. Add random noise to the control input: + + :math:`\mathbf{u}^i = \mathbf{u} + \epsilon_u`, where :math:`\epsilon_u \sim \mathcal{N}(0, \mathbf{R})` + +2. Predict the state: + + :math:`\mathbf{x}^i_{pred} = f(\mathbf{x}^i_t, \mathbf{u}^i)` + +3. Predict the observation: + + :math:`\mathbf{z}^i_{pred} = h(\mathbf{x}^i_{pred}) + \epsilon_z`, where :math:`\epsilon_z \sim \mathcal{N}(0, \mathbf{Q})` + +=== Update === + +1. Calculate ensemble mean for states: + + :math:`\bar{\mathbf{x}} = \frac{1}{N}\sum_{i=1}^N \mathbf{x}^i_{pred}` + +2. Calculate ensemble mean for observations: + + :math:`\bar{\mathbf{z}} = \frac{1}{N}\sum_{i=1}^N \mathbf{z}^i_{pred}` + +3. 
Calculate state deviation: + + :math:`\mathbf{X}' = \mathbf{x}^i_{pred} - \bar{\mathbf{x}}` + +4. Calculate observation deviation: + + :math:`\mathbf{Z}' = \mathbf{z}^i_{pred} - \bar{\mathbf{z}}` + +5. Calculate covariance matrices: + + :math:`\mathbf{U} = \frac{1}{N-1}\mathbf{X}'\mathbf{Z}'^T` + + :math:`\mathbf{V} = \frac{1}{N-1}\mathbf{Z}'\mathbf{Z}'^T` + +6. Calculate Kalman gain: + + :math:`\mathbf{K} = \mathbf{U}\mathbf{V}^{-1}` + +7. Update each ensemble member: + + :math:`\mathbf{x}^i_{t+1} = \mathbf{x}^i_{pred} + \mathbf{K}(\mathbf{z}_{obs} - \mathbf{z}^i_{pred})` + +8. Calculate final state estimate: + + :math:`\mathbf{x}_{est} = \frac{1}{N}\sum_{i=1}^N \mathbf{x}^i_{t+1}` + +9. Calculate covariance estimate: + + :math:`\mathbf{P}_{est} = \frac{1}{N}\sum_{i=1}^N (\mathbf{x}^i_{t+1} - \mathbf{x}_{est})(\mathbf{x}^i_{t+1} - \mathbf{x}_{est})^T` + + +Filter Design +~~~~~~~~~~~~~ + +In this simulation, the robot has a state vector with 4 states at time :math:`t`: + +.. math:: \mathbf{x}_t = [x_t, y_t, \phi_t, v_t]^T + +where: + +- :math:`x, y` are 2D position coordinates +- :math:`\phi` is orientation (yaw angle) +- :math:`v` is velocity + +The filter uses an ensemble of :math:`N=20` particles to represent the state distribution. + +Input Vector +^^^^^^^^^^^^ + +The robot has a velocity sensor and a gyro sensor, so the control input vector at each time step is: + +.. math:: \mathbf{u}_t = [v_t, \omega_t]^T + +where: + +- :math:`v_t` is linear velocity +- :math:`\omega_t` is angular velocity (yaw rate) + +The input vector includes sensor noise modeled by covariance matrix :math:`\mathbf{R}`: + +.. math:: \mathbf{R} = \begin{bmatrix} \sigma_v^2 & 0 \\ 0 & \sigma_\omega^2 \end{bmatrix} + + +Observation Vector +^^^^^^^^^^^^^^^^^^ + +The robot can observe landmarks (RFID tags) with distance and bearing measurements: + +.. math:: \mathbf{z}_t = [d_t, \theta_t, x_{lm}, y_{lm}] + +where: + +- :math:`d_t` is the distance to the landmark +- :math:`\theta_t` is the bearing angle to the landmark +- :math:`x_{lm}, y_{lm}` are the known landmark positions + +The observation noise is modeled by covariance matrix :math:`\mathbf{Q}`: + +.. math:: \mathbf{Q} = \begin{bmatrix} \sigma_d^2 & 0 \\ 0 & \sigma_\theta^2 \end{bmatrix} + + +Motion Model +~~~~~~~~~~~~ + +The robot motion model is: + +.. math:: \dot{x} = v \cos(\phi) + +.. math:: \dot{y} = v \sin(\phi) + +.. math:: \dot{\phi} = \omega + +.. math:: \dot{v} = 0 + +Discretized with time step :math:`\Delta t`, the motion model becomes: + +.. math:: \mathbf{x}_{t+1} = f(\mathbf{x}_t, \mathbf{u}_t) = \mathbf{F}\mathbf{x}_t + \mathbf{B}\mathbf{u}_t + +where: + +:math:`\begin{equation*} \mathbf{F} = \begin{bmatrix} 1 & 0 & 0 & 0\\ 0 & 1 & 0 & 0\\ 0 & 0 & 1 & 0 \\ 0 & 0 & 0 & 0 \\ \end{bmatrix} \end{equation*}` + +:math:`\begin{equation*} \mathbf{B} = \begin{bmatrix} \cos(\phi) \Delta t & 0\\ \sin(\phi) \Delta t & 0\\ 0 & \Delta t\\ 1 & 0\\ \end{bmatrix} \end{equation*}` + +The motion function expands to: + +:math:`\begin{equation*} \begin{bmatrix} x_{t+1} \\ y_{t+1} \\ \phi_{t+1} \\ v_{t+1} \end{bmatrix} = \begin{bmatrix} x_t + v_t\cos(\phi_t)\Delta t \\ y_t + v_t\sin(\phi_t)\Delta t \\ \phi_t + \omega_t \Delta t \\ v_t \end{bmatrix} \end{equation*}` + + +Observation Model +~~~~~~~~~~~~~~~~~ + +For each landmark at position :math:`(x_{lm}, y_{lm})`, the observation model calculates +the expected landmark position in the global frame: + +.. 
math:: + \begin{bmatrix} + x_{lm,obs} \\ + y_{lm,obs} + \end{bmatrix} = + \begin{bmatrix} + x + d \cos(\phi + \theta) \\ + y + d \sin(\phi + \theta) + \end{bmatrix} + +where :math:`d` and :math:`\theta` are the observed distance and bearing to the landmark. + +The observation function projects the robot state to expected landmark positions, +which are then compared with actual landmark positions for the update step. + + +Advantages of EnKF +~~~~~~~~~~~~~~~~~~ + +- **No Jacobian required**: Unlike EKF, EnKF does not need to compute Jacobian matrices, making it easier to implement for nonlinear systems +- **Handles non-Gaussian distributions**: The ensemble representation can capture non-Gaussian features of the state distribution +- **Computationally efficient**: For high-dimensional systems, EnKF can be more efficient than maintaining full covariance matrices +- **Easy to parallelize**: Each ensemble member can be propagated independently + + +Reference +~~~~~~~~~ + +- `Ensemble Kalman filtering `_ +- `PROBABILISTIC ROBOTICS `_ + diff --git a/docs/modules/2_localization/histogram_filter_localization/histogram_filter_localization_main.rst b/docs/modules/2_localization/histogram_filter_localization/histogram_filter_localization_main.rst index 3a175b1316..b5697d2dd9 100644 --- a/docs/modules/2_localization/histogram_filter_localization/histogram_filter_localization_main.rst +++ b/docs/modules/2_localization/histogram_filter_localization/histogram_filter_localization_main.rst @@ -68,7 +68,7 @@ But, the probability is getting uncertain without observations: The `gaussian filter `_ -is used in the simulation for adding noize. +is used in the simulation for adding noise. Step3: Update probability by observation ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/modules/2_localization/particle_filter_localization/particle_filter_localization_main.rst b/docs/modules/2_localization/particle_filter_localization/particle_filter_localization_main.rst index d648d8e080..8bf22a246b 100644 --- a/docs/modules/2_localization/particle_filter_localization/particle_filter_localization_main.rst +++ b/docs/modules/2_localization/particle_filter_localization/particle_filter_localization_main.rst @@ -13,7 +13,7 @@ and the red line is estimated trajectory with PF. It is assumed that the robot can measure a distance from landmarks (RFID). -This measurements are used for PF localization. +These measurements are used for PF localization. Code Link ~~~~~~~~~~~~~ diff --git a/docs/modules/2_localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization_main.rst b/docs/modules/2_localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization_main.rst index a7a5aab61b..01d161b17e 100644 --- a/docs/modules/2_localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization_main.rst +++ b/docs/modules/2_localization/unscented_kalman_filter_localization/unscented_kalman_filter_localization_main.rst @@ -5,7 +5,10 @@ Unscented Kalman Filter localization This is a sensor fusion localization with Unscented Kalman Filter(UKF). -The lines and points are same meaning of the EKF simulation. +The blue line is true trajectory, the black line is dead reckoning trajectory, +the green points are GPS observations, and the red line is estimated trajectory with UKF. + +The red ellipse is estimated covariance ellipse with UKF. Code Link ~~~~~~~~~~~~~ @@ -13,7 +16,242 @@ Code Link .. 
autofunction:: Localization.unscented_kalman_filter.unscented_kalman_filter.ukf_estimation +Unscented Kalman Filter Algorithm +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The Unscented Kalman Filter (UKF) is a nonlinear state estimation technique that uses +the unscented transform to handle nonlinearities. Unlike the Extended Kalman Filter (EKF) +that linearizes the nonlinear functions using Jacobians, UKF uses a deterministic sampling +approach called sigma points to capture the mean and covariance of the state distribution. + +The UKF algorithm consists of two main steps: + +=== Predict === + +1. Generate sigma points around the current state estimate: + + :math:`\chi_0 = \mathbf{x}_{t}` + + :math:`\chi_i = \mathbf{x}_{t} + \gamma\sqrt{\mathbf{P}_t}_i` for :math:`i = 1, ..., n` + + :math:`\chi_i = \mathbf{x}_{t} - \gamma\sqrt{\mathbf{P}_t}_{i-n}` for :math:`i = n+1, ..., 2n` + + where :math:`\gamma = \sqrt{n + \lambda}` and :math:`\lambda = \alpha^2(n + \kappa) - n` + +2. Propagate sigma points through the motion model: + + :math:`\chi^-_i = f(\chi_i, \mathbf{u}_t)` + +3. Calculate predicted state mean: + + :math:`\mathbf{x}^-_{t+1} = \sum_{i=0}^{2n} w^m_i \chi^-_i` + +4. Calculate predicted state covariance: + + :math:`\mathbf{P}^-_{t+1} = \sum_{i=0}^{2n} w^c_i (\chi^-_i - \mathbf{x}^-_{t+1})(\chi^-_i - \mathbf{x}^-_{t+1})^T + \mathbf{Q}` + +=== Update === + +1. Generate sigma points around the predicted state: + + :math:`\chi_i = \text{generate\_sigma\_points}(\mathbf{x}^-_{t+1}, \mathbf{P}^-_{t+1})` + +2. Propagate sigma points through the observation model: + + :math:`\mathcal{Z}_i = h(\chi_i)` + +3. Calculate predicted observation mean: + + :math:`\mathbf{z}^-_{t+1} = \sum_{i=0}^{2n} w^m_i \mathcal{Z}_i` + +4. Calculate innovation covariance: + + :math:`\mathbf{S}_t = \sum_{i=0}^{2n} w^c_i (\mathcal{Z}_i - \mathbf{z}^-_{t+1})(\mathcal{Z}_i - \mathbf{z}^-_{t+1})^T + \mathbf{R}` + +5. Calculate cross-correlation matrix: + + :math:`\mathbf{P}_{xz} = \sum_{i=0}^{2n} w^c_i (\chi_i - \mathbf{x}^-_{t+1})(\mathcal{Z}_i - \mathbf{z}^-_{t+1})^T` + +6. Calculate Kalman gain: + + :math:`\mathbf{K} = \mathbf{P}_{xz}\mathbf{S}_t^{-1}` + +7. Update state estimate: + + :math:`\mathbf{x}_{t+1} = \mathbf{x}^-_{t+1} + \mathbf{K}(\mathbf{z}_t - \mathbf{z}^-_{t+1})` + +8. Update covariance estimate: + + :math:`\mathbf{P}_{t+1} = \mathbf{P}^-_{t+1} - \mathbf{K}\mathbf{S}_t\mathbf{K}^T` + + +Sigma Points and Weights +~~~~~~~~~~~~~~~~~~~~~~~~~ + +The UKF uses deterministic sigma points to represent the state distribution. The weights +for combining sigma points are calculated as: + +**Mean weights:** + +:math:`w^m_0 = \frac{\lambda}{n + \lambda}` + +:math:`w^m_i = \frac{1}{2(n + \lambda)}` for :math:`i = 1, ..., 2n` + +**Covariance weights:** + +:math:`w^c_0 = \frac{\lambda}{n + \lambda} + (1 - \alpha^2 + \beta)` + +:math:`w^c_i = \frac{1}{2(n + \lambda)}` for :math:`i = 1, ..., 2n` + +where: + +- :math:`\alpha` controls the spread of sigma points around the mean (typically :math:`0.001 \leq \alpha \leq 1`) +- :math:`\beta` incorporates prior knowledge of the distribution (:math:`\beta = 2` is optimal for Gaussian distributions) +- :math:`\kappa` is a secondary scaling parameter (typically :math:`\kappa = 0`) +- :math:`n` is the dimension of the state vector + + +Filter Design +~~~~~~~~~~~~~ + +In this simulation, the robot has a state vector with 4 states at time :math:`t`: + +.. 
math:: \mathbf{x}_t = [x_t, y_t, \phi_t, v_t]^T + +where: + +- :math:`x, y` are 2D position coordinates +- :math:`\phi` is orientation (yaw angle) +- :math:`v` is velocity + +In the code, "xEst" means the state vector. + +The covariance matrices are: + +- :math:`\mathbf{P}_t` is the state covariance matrix +- :math:`\mathbf{Q}` is the process noise covariance matrix +- :math:`\mathbf{R}` is the observation noise covariance matrix + +**Process Noise Covariance** :math:`\mathbf{Q}`: + +.. math:: + \mathbf{Q} = \begin{bmatrix} + 0.1^2 & 0 & 0 & 0 \\ + 0 & 0.1^2 & 0 & 0 \\ + 0 & 0 & (0.017)^2 & 0 \\ + 0 & 0 & 0 & 1.0^2 + \end{bmatrix} + +**Observation Noise Covariance** :math:`\mathbf{R}`: + +.. math:: + \mathbf{R} = \begin{bmatrix} + 1.0^2 & 0 \\ + 0 & 1.0^2 + \end{bmatrix} + + +Input Vector +^^^^^^^^^^^^ + +The robot has a velocity sensor and a gyro sensor, so the control input vector at each time step is: + +.. math:: \mathbf{u}_t = [v_t, \omega_t]^T + +where: + +- :math:`v_t` is linear velocity [m/s] +- :math:`\omega_t` is angular velocity (yaw rate) [rad/s] + +The input vector includes sensor noise. + + +Observation Vector +^^^^^^^^^^^^^^^^^^ + +The robot has a GNSS (GPS) sensor that can measure x-y position: + +.. math:: \mathbf{z}_t = [x_t, y_t]^T + +The observation includes GPS noise with covariance :math:`\mathbf{R}`. + + +Motion Model +~~~~~~~~~~~~ + +The robot motion model is: + +.. math:: \dot{x} = v \cos(\phi) + +.. math:: \dot{y} = v \sin(\phi) + +.. math:: \dot{\phi} = \omega + +.. math:: \dot{v} = 0 + +Discretized with time step :math:`\Delta t`, the motion model becomes: + +.. math:: \mathbf{x}_{t+1} = f(\mathbf{x}_t, \mathbf{u}_t) = \mathbf{F}\mathbf{x}_t + \mathbf{B}\mathbf{u}_t + +where: + +:math:`\begin{equation*} \mathbf{F} = \begin{bmatrix} 1 & 0 & 0 & 0\\ 0 & 1 & 0 & 0\\ 0 & 0 & 1 & 0 \\ 0 & 0 & 0 & 0 \\ \end{bmatrix} \end{equation*}` + +:math:`\begin{equation*} \mathbf{B} = \begin{bmatrix} \cos(\phi) \Delta t & 0\\ \sin(\phi) \Delta t & 0\\ 0 & \Delta t\\ 1 & 0\\ \end{bmatrix} \end{equation*}` + +The motion function expands to: + +:math:`\begin{equation*} \begin{bmatrix} x_{t+1} \\ y_{t+1} \\ \phi_{t+1} \\ v_{t+1} \end{bmatrix} = \begin{bmatrix} x_t + v_t\cos(\phi_t)\Delta t \\ y_t + v_t\sin(\phi_t)\Delta t \\ \phi_t + \omega_t \Delta t \\ v_t \end{bmatrix} \end{equation*}` + +:math:`\Delta t = 0.1` [s] is the time interval. + + +Observation Model +~~~~~~~~~~~~~~~~~ + +The robot can get x-y position information from GPS. + +The GPS observation model is: + +.. math:: \mathbf{z}_{t} = h(\mathbf{x}_t) = \mathbf{H} \mathbf{x}_t + +where: + +:math:`\begin{equation*} \mathbf{H} = \begin{bmatrix} 1 & 0 & 0 & 0 \\ 0 & 1 & 0 & 0 \\ \end{bmatrix} \end{equation*}` + +The observation function states that: + +:math:`\begin{equation*} \begin{bmatrix} x_{obs} \\ y_{obs} \end{bmatrix} = h(\mathbf{x}) = \begin{bmatrix} x \\ y \end{bmatrix} \end{equation*}` + + +UKF Parameters +~~~~~~~~~~~~~~ + +The UKF uses three tuning parameters: + +- **ALPHA = 0.001**: Controls the spread of sigma points around the mean. Smaller values result in sigma points closer to the mean. +- **BETA = 2**: Incorporates prior knowledge about the distribution. For Gaussian distributions, the optimal value is 2. +- **KAPPA = 0**: Secondary scaling parameter. Usually set to 0 or 3-n where n is the state dimension. + +These parameters affect the calculation of :math:`\lambda = \alpha^2(n + \kappa) - n`, which determines +the sigma point spread and weights. 
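+
+As a concrete illustration of the formulas above, the following sketch computes
+:math:`\lambda`, :math:`\gamma`, and the mean/covariance weights for an ``nx``-dimensional
+state. It is only a condensed sketch of the calculation performed by the ``setup_ukf``
+helper in the sample code, not the exact implementation:
+
+.. code-block:: python
+
+    import numpy as np
+
+    ALPHA, BETA, KAPPA = 0.001, 2, 0
+
+    def setup_weights(nx):
+        lamb = ALPHA ** 2 * (nx + KAPPA) - nx
+        # weights for the central sigma point (i = 0)
+        wm = [lamb / (lamb + nx)]
+        wc = [lamb / (lamb + nx) + (1.0 - ALPHA ** 2 + BETA)]
+        # the remaining 2 * nx sigma points share a common weight
+        for _ in range(2 * nx):
+            wm.append(1.0 / (2 * (nx + lamb)))
+            wc.append(1.0 / (2 * (nx + lamb)))
+        gamma = np.sqrt(nx + lamb)  # sigma point spread: sqrt(n + lambda)
+        return np.array([wm]), np.array([wc]), gamma
+
+    wm, wc, gamma = setup_weights(4)  # nx = 4 for the state [x, y, yaw, v]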
+ + +Advantages of UKF over EKF +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The Unscented Kalman Filter offers several advantages over the Extended Kalman Filter: + +- **No Jacobian required**: UKF does not need to compute Jacobian matrices, which can be error-prone and computationally expensive for complex nonlinear systems +- **Higher accuracy**: UKF captures the mean and covariance to the second order (third order for Gaussian distributions) while EKF only achieves first-order accuracy +- **Better handling of nonlinearities**: The unscented transform provides a better approximation of the probability distribution after nonlinear transformation +- **Easier implementation**: For highly nonlinear systems, UKF is often easier to implement since it doesn't require analytical derivatives +- **Numerical stability**: UKF can be more numerically stable than EKF for strongly nonlinear systems + + Reference ~~~~~~~~~~~ - `Discriminatively Trained Unscented Kalman Filter for Mobile Robot Localization `_ +- `The Unscented Kalman Filter for Nonlinear Estimation `_ +- `PROBABILISTIC ROBOTICS `_ diff --git a/docs/modules/3_mapping/lidar_to_grid_map_tutorial/lidar_to_grid_map_tutorial_main.rst b/docs/modules/3_mapping/lidar_to_grid_map_tutorial/lidar_to_grid_map_tutorial_main.rst index 29f5878e48..c97d204a82 100644 --- a/docs/modules/3_mapping/lidar_to_grid_map_tutorial/lidar_to_grid_map_tutorial_main.rst +++ b/docs/modules/3_mapping/lidar_to_grid_map_tutorial/lidar_to_grid_map_tutorial_main.rst @@ -19,7 +19,7 @@ unknown (unobserved) areas, which are close to 0.5. .. figure:: grid_map_example.png In order to construct the grid map from the measurement we need to -discretise the values. But, first let’s need to ``import`` some +discretise the values. But, first we need to ``import`` some necessary packages. .. code:: ipython3 diff --git a/docs/modules/4_slam/FastSLAM1/FastSLAM1_main.rst b/docs/modules/4_slam/FastSLAM1/FastSLAM1_main.rst index b6bafa0982..296f1766de 100644 --- a/docs/modules/4_slam/FastSLAM1/FastSLAM1_main.rst +++ b/docs/modules/4_slam/FastSLAM1/FastSLAM1_main.rst @@ -112,7 +112,7 @@ will converge to the correct estimate. MAX_RANGE = 20.0 # maximum observation range M_DIST_TH = 2.0 # Threshold of Mahalanobis distance for data association. STATE_SIZE = 3 # State size [x,y,yaw] - LM_SIZE = 2 # LM srate size [x,y] + LM_SIZE = 2 # LM state size [x,y] N_PARTICLE = 100 # number of particle NTH = N_PARTICLE / 1.5 # Number of particle for re-sampling @@ -409,7 +409,7 @@ probably will die out. 3- Resampling ~~~~~~~~~~~~~ -In the reseampling steps a new set of particles are chosen from the old +In the resampling steps a new set of particles are chosen from the old set. This is done according to the weight of each particle. The figure shows 100 particles distributed uniformly between [-0.5, 0.5] diff --git a/docs/modules/4_slam/ekf_slam/ekf_slam_main.rst b/docs/modules/4_slam/ekf_slam/ekf_slam_main.rst index 3967a7ae4d..05bd636ef5 100644 --- a/docs/modules/4_slam/ekf_slam/ekf_slam_main.rst +++ b/docs/modules/4_slam/ekf_slam/ekf_slam_main.rst @@ -32,7 +32,7 @@ Introduction EKF SLAM models the SLAM problem in a single EKF where the modeled state is both the pose :math:`(x, y, \theta)` and an array of landmarks -:math:`[(x_1, y_1), (x_2, x_y), ... , (x_n, y_n)]` for :math:`n` +:math:`[(x_1, y_1), (x_2, y_2), ... , (x_n, y_n)]` for :math:`n` landmarks. The covariance between each of the positions and landmarks are also tracked. 
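+
+To make the structure of this augmented state concrete, here is a minimal illustrative
+sketch (not code from ``ekf_slam.py``); the ``STATE_SIZE`` and ``LM_SIZE`` names follow
+the convention used by the sample codes in this project:
+
+.. code-block:: python
+
+    import numpy as np
+
+    STATE_SIZE = 3   # robot pose [x, y, yaw]
+    LM_SIZE = 2      # each landmark [x, y]
+    n_landmarks = 2
+
+    # augmented state: the robot pose followed by every landmark position
+    xEst = np.zeros((STATE_SIZE + LM_SIZE * n_landmarks, 1))
+    # a single covariance matrix couples the pose with every landmark
+    PEst = np.eye(STATE_SIZE + LM_SIZE * n_landmarks)
+
+    def landmark_slice(i):
+        # index range of landmark i inside the augmented state
+        start = STATE_SIZE + LM_SIZE * i
+        return slice(start, start + LM_SIZE)
+
+    xEst[landmark_slice(0)] = np.array([[10.0], [-2.0]])  # example landmark position
+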
diff --git a/docs/modules/4_slam/graph_slam/graphSLAM_doc.rst b/docs/modules/4_slam/graph_slam/graphSLAM_doc.rst index 5297604809..3d329ae131 100644 --- a/docs/modules/4_slam/graph_slam/graphSLAM_doc.rst +++ b/docs/modules/4_slam/graph_slam/graphSLAM_doc.rst @@ -28,7 +28,7 @@ available that uses graph-based approaches to perform online estimation or to solve for a subset of the poses. GraphSLAM uses the motion information as well as the observations of the -environment to create least square problem that can be solved using +environment to create a least squares problem that can be solved using standard optimization techniques. Minimal Example @@ -331,7 +331,7 @@ between node 0 and 1 will be created. The equations for the error is as follows: :math:`e_{ij}^y = y_j + d_j sin(\psi_j + \theta_j) - y_i - d_i sin(\psi_i + \theta_i)` -:math:`e_{ij}^x = \psi_j + \theta_j - \psi_i - \theta_i` +:math:`e_{ij}^{\psi} = \psi_j + \theta_j - \psi_i - \theta_i` Where :math:`[x_i, y_i, \psi_i]` is the pose for node :math:`i` and similarly for node :math:`j`, :math:`d` is the measured distance at diff --git a/docs/modules/5_path_planning/dubins_path/dubins_path_main.rst b/docs/modules/5_path_planning/dubins_path/dubins_path_main.rst index 5a3d14464b..74cb757886 100644 --- a/docs/modules/5_path_planning/dubins_path/dubins_path_main.rst +++ b/docs/modules/5_path_planning/dubins_path/dubins_path_main.rst @@ -13,7 +13,7 @@ It can generates a shortest path between two 2D poses (x, y, yaw) with maximum c Generated paths consist of 3 segments of maximum curvature curves or a straight line segment. -Each segment type can is categorized by 3 type: 'Right turn (R)' , 'Left turn (L)', and 'Straight (S).' +Each segment type can be categorized by 3 types: 'Right turn (R)' , 'Left turn (L)', and 'Straight (S).' Possible path will be at least one of these six types: RSR, RSL, LSR, LSL, RLR, LRL. diff --git a/docs/modules/5_path_planning/elastic_bands/elastic_bands_main.rst b/docs/modules/5_path_planning/elastic_bands/elastic_bands_main.rst index d0109d4ec3..e37cbd4e77 100644 --- a/docs/modules/5_path_planning/elastic_bands/elastic_bands_main.rst +++ b/docs/modules/5_path_planning/elastic_bands/elastic_bands_main.rst @@ -67,7 +67,7 @@ Dynamic Path Maintenance .. math:: b_i^{\text{new}} = b_i^{\text{old}} + \alpha (f_c + f_r), - where :math:`\alpha` is a step-size parameter, which often proportional to :math:`\rho(b_i^{\text{old}})` + where :math:`\alpha` is a step-size parameter, which is often proportional to :math:`\rho(b_i^{\text{old}})` 2. **Overlap Enforcement**: - Insert new nodes if adjacent nodes are too far apart diff --git a/docs/modules/5_path_planning/grid_base_search/grid_base_search_main.rst b/docs/modules/5_path_planning/grid_base_search/grid_base_search_main.rst index c4aa6882aa..b7450107b6 100644 --- a/docs/modules/5_path_planning/grid_base_search/grid_base_search_main.rst +++ b/docs/modules/5_path_planning/grid_base_search/grid_base_search_main.rst @@ -82,6 +82,35 @@ Code Link .. autofunction:: PathPlanning.BidirectionalAStar.bidirectional_a_star.BidirectionalAStarPlanner +.. _Theta*-algorithm: + +Theta\* algorithm +~~~~~~~~~~~~~~~~~ + +This is a 2D grid based shortest path planning with Theta star algorithm. + +It offers an optimization over the A star algorithm to generate any-angle paths. 
Unlike A star, which always assigns the current node as the parent of the successor, Theta star checks for a line-of-sight (unblocked path) from an earlier node (typically the parent of the current node) to the successor, and directly assigns it as a parent if visible. This reduces cost by replacing staggered segments with straight lines. + +Checking for line of sight involves verifying that no obstacles block the straight line between two nodes. On a grid, this means identifying all the discrete cells that the line passes through to determine if they are empty. Bresenham’s line algorithm is commonly used for this purpose. Starting from one endpoint, it incrementally steps along one axis, while considering the gradient to determine the position on the other axis. Because it relies only on integer addition and subtraction, it is both efficient and precise for grid-based visibility checks in Theta star. + +As a result, Theta star produces shorter, smoother paths than A star, ideal for ground or aerial robots operating in continuous environments where smoother motion enables higher acceleration and reduced travel time. + +.. image:: https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/ThetaStar/animation.gif + +In the animation, each cyan arrow represents a node pointing to its parent. + +Reference +++++++++++++ + +- `Theta*: Any-Angle Path Planning on Grids `__ +- `Bresenham's line algorithm `__ + +Code Link ++++++++++++++ + +.. autofunction:: PathPlanning.ThetaStar.theta_star.ThetaStarPlanner + + .. _D*-algorithm: D\* algorithm diff --git a/docs/modules/5_path_planning/particleSwarmOptimization/particleSwarmOptimization_main.rst b/docs/modules/5_path_planning/particleSwarmOptimization/particleSwarmOptimization_main.rst new file mode 100644 index 0000000000..b430faec1e --- /dev/null +++ b/docs/modules/5_path_planning/particleSwarmOptimization/particleSwarmOptimization_main.rst @@ -0,0 +1,153 @@ +.. _particle_swarm_optimization: + +Particle Swarm Optimization Path Planning +------------------------------------------ + +This is a 2D path planning simulation using the Particle Swarm Optimization algorithm. + +PSO is a metaheuristic optimization algorithm inspired by the social behavior of bird flocking or fish schooling. In path planning, particles represent potential solutions that explore the search space to find collision-free paths from start to goal. + +.. image:: https://github.com/AtsushiSakai/PythonRoboticsGifs/raw/master/PathPlanning/ParticleSwarmOptimization/animation.gif + +Algorithm Overview +++++++++++++++++++ + +The PSO algorithm maintains a swarm of particles that move through the search space according to simple mathematical rules: + +1. **Initialization**: Particles are randomly distributed near the start position +2. **Evaluation**: Each particle's fitness is calculated based on distance to goal and obstacle penalties +3. **Update**: Particles adjust their velocities based on: + - Personal best position (cognitive component) + - Global best position (social component) + - Current velocity (inertia component) +4. **Movement**: Particles move to new positions and check for collisions +5. **Convergence**: Process repeats until maximum iterations or goal is reached + +Mathematical Foundation ++++++++++++++++++++++++ + +The core PSO velocity update equation is: + +.. 
math:: + + v_{i}(t+1) = w \cdot v_{i}(t) + c_1 \cdot r_1 \cdot (p_{i} - x_{i}(t)) + c_2 \cdot r_2 \cdot (g - x_{i}(t)) + +Where: +- :math:`v_{i}(t)` = velocity of particle i at time t +- :math:`x_{i}(t)` = position of particle i at time t +- :math:`w` = inertia weight (controls exploration vs exploitation) +- :math:`c_1` = cognitive coefficient (attraction to personal best) +- :math:`c_2` = social coefficient (attraction to global best) +- :math:`r_1, r_2` = random numbers in [0,1] +- :math:`p_{i}` = personal best position of particle i +- :math:`g` = global best position + +Position update: + +.. math:: + + x_{i}(t+1) = x_{i}(t) + v_{i}(t+1) + +Fitness Function +++++++++++++++++ + +The fitness function combines distance to target with obstacle penalties: + +.. math:: + + f(x) = ||x - x_{goal}|| + \sum_{j} P_{obs}(x, O_j) + +Where: +- :math:`||x - x_{goal}||` = Euclidean distance to goal +- :math:`P_{obs}(x, O_j)` = penalty for obstacle j +- :math:`O_j` = obstacle j with position and radius + +The obstacle penalty function is defined as: + +.. math:: + + P_{obs}(x, O_j) = \begin{cases} + 1000 & \text{if } ||x - O_j|| < r_j \text{ (inside obstacle)} \\ + \frac{50}{||x - O_j|| - r_j + 0.1} & \text{if } r_j \leq ||x - O_j|| < r_j + R_{influence} \text{ (near obstacle)} \\ + 0 & \text{if } ||x - O_j|| \geq r_j + R_{influence} \text{ (safe distance)} + \end{cases} + +Where: +- :math:`r_j` = radius of obstacle j +- :math:`R_{influence}` = influence radius (typically 5 units) + +Collision Detection ++++++++++++++++++++ + +Line-circle intersection is used to detect collisions between particle paths and circular obstacles: + +.. math:: + + ||P_0 + t \cdot \vec{d} - C|| = r + +Where: +- :math:`P_0` = start point of path segment +- :math:`\vec{d}` = direction vector of path +- :math:`C` = obstacle center +- :math:`r` = obstacle radius +- :math:`t \in [0,1]` = parameter along line segment + +Algorithm Parameters +++++++++++++++++++++ + +Key parameters affecting performance: + +- **Number of particles** (n_particles): More particles = better exploration but slower +- **Maximum iterations** (max_iter): More iterations = better convergence but slower +- **Inertia weight** (w): High = exploration, Low = exploitation +- **Cognitive coefficient** (c1): Attraction to personal best +- **Social coefficient** (c2): Attraction to global best + +Typical values: +- n_particles: 20-50 +- max_iter: 100-300 +- w: 0.9 → 0.4 (linearly decreasing) +- c1, c2: 1.5-2.0 + +Advantages +++++++++++ + +- **Global optimization**: Can escape local minima unlike gradient-based methods +- **No derivatives needed**: Works with non-differentiable fitness landscapes +- **Parallel exploration**: Multiple particles search simultaneously +- **Simple implementation**: Few parameters and straightforward logic +- **Flexible**: Easily adaptable to different environments and constraints + +Disadvantages ++++++++++++++ + +- **Stochastic**: Results may vary between runs +- **Parameter sensitive**: Performance heavily depends on parameter tuning +- **No optimality guarantee**: Metaheuristic without convergence proof +- **Computational cost**: Requires many fitness evaluations +- **Prone to stagnation**: Premature convergence where the entire swarm can get trapped in a local minimum if exploration is insufficient + +Code Link ++++++++++ + +.. autofunction:: PathPlanning.ParticleSwarmOptimization.particle_swarm_optimization.main + +Usage Example ++++++++++++++ + +.. 
code-block:: python + + import matplotlib.pyplot as plt + from PathPlanning.ParticleSwarmOptimization.particle_swarm_optimization import main + + # Run PSO path planning with visualization + main() + +References +++++++++++ + +- `Particle swarm optimization - Wikipedia `__ +- Kennedy, J.; Eberhart, R. (1995). "Particle Swarm Optimization". Proceedings of IEEE International Conference on Neural Networks. IV. pp. 1942–1948. +- Shi, Y.; Eberhart, R. (1998). "A Modified Particle Swarm Optimizer". IEEE International Conference on Evolutionary Computation. +- `A Gentle Introduction to Particle Swarm Optimization `__ +- Clerc, M.; Kennedy, J. (2002). "The particle swarm - explosion, stability, and convergence in a multidimensional complex space". IEEE Transactions on Evolutionary Computation. 6 (1): 58–73. \ No newline at end of file diff --git a/docs/modules/5_path_planning/path_planning_main.rst b/docs/modules/5_path_planning/path_planning_main.rst index 0c84a19c22..5132760dc5 100644 --- a/docs/modules/5_path_planning/path_planning_main.rst +++ b/docs/modules/5_path_planning/path_planning_main.rst @@ -19,6 +19,7 @@ Path planning is the ability of a robot to search feasible and efficient path to visibility_road_map_planner/visibility_road_map_planner vrm_planner/vrm_planner rrt/rrt + particleSwarmOptimization/particleSwarmOptimization cubic_spline/cubic_spline bspline_path/bspline_path catmull_rom_spline/catmull_rom_spline diff --git a/docs/modules/5_path_planning/reeds_shepp_path/reeds_shepp_path_main.rst b/docs/modules/5_path_planning/reeds_shepp_path/reeds_shepp_path_main.rst index 4dd54d7c97..bedcfc33d8 100644 --- a/docs/modules/5_path_planning/reeds_shepp_path/reeds_shepp_path_main.rst +++ b/docs/modules/5_path_planning/reeds_shepp_path/reeds_shepp_path_main.rst @@ -17,7 +17,7 @@ Here is an overview of mathematical derivations of formulae for individual path In all the derivations below, radius of curvature of the vehicle is assumed to be of unit length and start pose is considered to be at origin. (*In code we are removing the offset due to start position and normalising the lengths before passing the values to these functions.*) -Also, (t, u, v) respresent the measure of each motion requried. Thus, in case of a turning maneuver, they represent the angle inscribed at the centre of turning circle and in case of straight maneuver, they represent the distance to be travelled. +Also, (t, u, v) represent the measure of each motion required. Thus, in case of a turning maneuver, they represent the angle inscribed at the centre of turning circle and in case of straight maneuver, they represent the distance to be travelled. 1. **Left-Straight-Left** diff --git a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst index 9517e95b31..a44dd20a97 100644 --- a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst +++ b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst @@ -81,9 +81,28 @@ Code Link ^^^^^^^^^^^^^ .. autoclass:: PathPlanning.TimeBasedPathPlanning.SafeInterval.SafeIntervalPathPlanner +Multi-Agent Path Planning +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Priority Based Planning +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The planner generates an order to plan in, and generates plans for the robots in that order. Each planned path is reserved in the grid, and all future plans must avoid that path. 
The robots are planned for in descending order of distance from start to goal. + +This is a sub-optimal algorithm because no replanning happens once paths are found. It does not guarantee the shortest path is found for any particular robot, nor that the final set of paths minimizes total time or total movement. + +Static Obstacles: + +.. image:: https://raw.githubusercontent.com/AtsushiSakai/PythonRoboticsGifs/refs/heads/master/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner/priority_planner2.gif + +Dynamic Obstacles: + +.. image:: https://raw.githubusercontent.com/AtsushiSakai/PythonRoboticsGifs/refs/heads/master/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner/priority_planner.gif + References ~~~~~~~~~~~ - `Cooperative Pathfinding `__ - `SIPP: Safe Interval Path Planning for Dynamic Environments `__ +- `Prioritized Planning Algorithms for Trajectory Coordination of Multiple Mobile Robots `__ \ No newline at end of file diff --git a/docs/modules/5_path_planning/vrm_planner/vrm_planner_main.rst b/docs/modules/5_path_planning/vrm_planner/vrm_planner_main.rst index a9a41aab74..46b86123f3 100644 --- a/docs/modules/5_path_planning/vrm_planner/vrm_planner_main.rst +++ b/docs/modules/5_path_planning/vrm_planner/vrm_planner_main.rst @@ -9,7 +9,7 @@ In the animation, blue points are Voronoi points, Cyan crosses mean searched points with Dijkstra method, -The red line is the final path of Vornoi Road-Map. +The red line is the final path of Voronoi Road-Map. Code Link ~~~~~~~~~~~~~~~ diff --git a/docs/modules/6_path_tracking/model_predictive_speed_and_steering_control/model_predictive_speed_and_steering_control_main.rst b/docs/modules/6_path_tracking/model_predictive_speed_and_steering_control/model_predictive_speed_and_steering_control_main.rst index 76a6819a46..cc5a6812ca 100644 --- a/docs/modules/6_path_tracking/model_predictive_speed_and_steering_control/model_predictive_speed_and_steering_control_main.rst +++ b/docs/modules/6_path_tracking/model_predictive_speed_and_steering_control/model_predictive_speed_and_steering_control_main.rst @@ -34,17 +34,17 @@ Input vector is: .. math:: u = [a, \delta] -a: accellation, δ: steering angle +a: acceleration, δ: steering angle -The MPC cotroller minimize this cost function for path tracking: +The MPC controller minimizes this cost function for path tracking: .. math:: min\ Q_f(z_{T,ref}-z_{T})^2+Q\Sigma({z_{t,ref}-z_{t}})^2+R\Sigma{u_t}^2+R_d\Sigma({u_{t+1}-u_{t}})^2 -z_ref come from target path and speed. +z_ref comes from the target path and speed. subject to: -- Linearlied vehicle model +- Linearized vehicle model .. math:: z_{t+1}=Az_t+Bu+C diff --git a/docs/modules/6_path_tracking/move_to_a_pose/move_to_a_pose_main.rst b/docs/modules/6_path_tracking/move_to_a_pose/move_to_a_pose_main.rst index 19bb0e4c14..b3d23ab2be 100644 --- a/docs/modules/6_path_tracking/move_to_a_pose/move_to_a_pose_main.rst +++ b/docs/modules/6_path_tracking/move_to_a_pose/move_to_a_pose_main.rst @@ -32,7 +32,7 @@ Constructor PathFinderController(Kp_rho, Kp_alpha, Kp_beta) -Constructs an instantiate of the PathFinderController for navigating a 3-DOF wheeled robot on a 2D plane. +Constructs an instance of the PathFinderController for navigating a 3-DOF wheeled robot on a 2D plane. 
Parameters: diff --git a/docs/modules/7_arm_navigation/n_joint_arm_to_point_control_main.rst b/docs/modules/7_arm_navigation/n_joint_arm_to_point_control_main.rst index 56900acde1..6c1db7d2a5 100644 --- a/docs/modules/7_arm_navigation/n_joint_arm_to_point_control_main.rst +++ b/docs/modules/7_arm_navigation/n_joint_arm_to_point_control_main.rst @@ -3,7 +3,7 @@ N joint arm to point control N joint arm to a point control simulation. -This is a interactive simulation. +This is an interactive simulation. You can set the goal position of the end effector with left-click on the plotting area. diff --git a/docs/modules/7_arm_navigation/planar_two_link_ik_main.rst b/docs/modules/7_arm_navigation/planar_two_link_ik_main.rst index 5b2712eb48..8d3b709e47 100644 --- a/docs/modules/7_arm_navigation/planar_two_link_ik_main.rst +++ b/docs/modules/7_arm_navigation/planar_two_link_ik_main.rst @@ -7,7 +7,7 @@ Two joint arm to point control This is two joint arm to a point control simulation. -This is a interactive simulation. +This is an interactive simulation. You can set the goal position of the end effector with left-click on the plotting area. diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 0c0719cca5..59056ed336 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,9 +1,9 @@ -numpy == 2.2.4 -scipy == 1.15.2 -matplotlib == 3.10.1 -cvxpy == 1.6.5 +numpy == 2.3.4 +scipy == 1.16.3 +matplotlib == 3.10.7 +cvxpy == 1.7.3 ecos == 2.0.14 -pytest == 8.4.0 # For unit test -pytest-xdist == 3.7.0 # For unit test -mypy == 1.16.0 # For unit test -ruff == 0.11.12 # For unit test +pytest == 9.0.0 # For unit test +pytest-xdist == 3.8.0 # For unit test +mypy == 1.18.2 # For unit test +ruff == 0.14.4 # For unit test diff --git a/tests/test_particle_swarm_optimization.py b/tests/test_particle_swarm_optimization.py new file mode 100755 index 0000000000..95720022f6 --- /dev/null +++ b/tests/test_particle_swarm_optimization.py @@ -0,0 +1,11 @@ +import conftest # Add root path to sys.path +from PathPlanning.ParticleSwarmOptimization import particle_swarm_optimization as m + + +def test_1(): + m.show_animation = False + m.main() + + +if __name__ == '__main__': + conftest.run_this_test(__file__) \ No newline at end of file diff --git a/tests/test_priority_based_planner.py b/tests/test_priority_based_planner.py new file mode 100644 index 0000000000..e2ff602d88 --- /dev/null +++ b/tests/test_priority_based_planner.py @@ -0,0 +1,39 @@ +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + NodePath, + ObstacleArrangement, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal +from PathPlanning.TimeBasedPathPlanning import PriorityBasedPlanner as m +from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +import numpy as np +import conftest + + +def test_1(): + grid_side_length = 21 + + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 16)] + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + m.show_animation = False + + start_and_goals: list[StartAndGoal] + paths: list[NodePath] + start_and_goals, paths = m.PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, False) + + # All paths 
should start at the specified position and reach the goal + for i, start_and_goal in enumerate(start_and_goals): + assert paths[i].path[0].position == start_and_goal.start + assert paths[i].path[-1].position == start_and_goal.goal + +if __name__ == "__main__": + conftest.run_this_test(__file__) diff --git a/tests/test_pure_pursuit.py b/tests/test_pure_pursuit.py index 0e0b83bf6c..89c1829514 100644 --- a/tests/test_pure_pursuit.py +++ b/tests/test_pure_pursuit.py @@ -6,6 +6,10 @@ def test1(): m.show_animation = False m.main() +def test_backward(): + m.show_animation = False + m.is_reverse_mode = True + m.main() if __name__ == '__main__': conftest.run_this_test(__file__) diff --git a/tests/test_rrt_with_pathsmoothing_radius.py b/tests/test_rrt_with_pathsmoothing_radius.py new file mode 100644 index 0000000000..a1159255b5 --- /dev/null +++ b/tests/test_rrt_with_pathsmoothing_radius.py @@ -0,0 +1,48 @@ +import conftest +import math + +from PathPlanning.RRT import rrt_with_pathsmoothing as rrt_module + +def test_smoothed_path_safety(): + # Define test environment + obstacle_list = [ + (5, 5, 1.0), + (3, 6, 2.0), + (3, 8, 2.0), + (3, 10, 2.0), + (7, 5, 2.0), + (9, 5, 2.0) + ] + robot_radius = 0.5 + + # Disable animation for testing + rrt_module.show_animation = False + + # Create RRT planner + rrt = rrt_module.RRT( + start=[0, 0], + goal=[6, 10], + rand_area=[-2, 15], + obstacle_list=obstacle_list, + robot_radius=robot_radius + ) + + # Run RRT + path = rrt.planning(animation=False) + + # Smooth the path + smoothed = rrt_module.path_smoothing(path, max_iter=1000, + obstacle_list=obstacle_list, + robot_radius=robot_radius) + + # Check if all points on the smoothed path are safely distant from obstacles + for x, y in smoothed: + for ox, oy, obs_radius in obstacle_list: + d = math.hypot(x - ox, y - oy) + min_safe_dist = obs_radius + robot_radius + assert d > min_safe_dist, \ + f"Point ({x:.2f}, {y:.2f}) too close to obstacle at ({ox}, {oy})" + + +if __name__ == '__main__': + conftest.run_this_test(__file__) diff --git a/tests/test_safe_interval_path_planner.py b/tests/test_safe_interval_path_planner.py index f1fbf90d2a..bbcd4e427c 100644 --- a/tests/test_safe_interval_path_planner.py +++ b/tests/test_safe_interval_path_planner.py @@ -18,9 +18,7 @@ def test_1(): ) m.show_animation = False - planner = m.SafeIntervalPathPlanner(grid, start, goal) - - path = planner.plan(False) + path = m.SafeIntervalPathPlanner.plan(grid, start, goal) # path should have 31 entries assert len(path.path) == 31 diff --git a/tests/test_space_time_astar.py b/tests/test_space_time_astar.py index 390c7732dc..1194c61d2e 100644 --- a/tests/test_space_time_astar.py +++ b/tests/test_space_time_astar.py @@ -18,9 +18,7 @@ def test_1(): ) m.show_animation = False - planner = m.SpaceTimeAStar(grid, start, goal) - - path = planner.plan(False) + path = m.SpaceTimeAStar.plan(grid, start, goal) # path should have 28 entries assert len(path.path) == 31 @@ -28,7 +26,7 @@ def test_1(): # path should end at the goal assert path.path[-1].position == goal - assert planner.expanded_node_count < 1000 + assert path.expanded_node_count < 1000 if __name__ == "__main__": conftest.run_this_test(__file__) diff --git a/tests/test_theta_star.py b/tests/test_theta_star.py new file mode 100644 index 0000000000..10ceae44e3 --- /dev/null +++ b/tests/test_theta_star.py @@ -0,0 +1,11 @@ +import conftest +from PathPlanning.ThetaStar import theta_star as m + + +def test_1(): + m.show_animation = False + m.main() + + +if __name__ == '__main__': + 
conftest.run_this_test(__file__) \ No newline at end of file diff --git a/users_comments.md b/users_comments.md index 30dc2d2d85..c814e74c4b 100644 --- a/users_comments.md +++ b/users_comments.md @@ -101,7 +101,7 @@ Thank you! --- -Dear AtsushiSakai,
Thanks a lot for the wonderful collection of projects.It was really useful. I really appreciate your time in maintaing and building this repository.It will be really great if I get a chance to meet you in person sometime.I got really inspired looking at your work.
All the very best for all your future endeavors!
Thanks once again,
+Dear AtsushiSakai,
Thanks a lot for the wonderful collection of projects. It was really useful. I really appreciate your time in maintaining and building this repository. It will be really great if I get a chance to meet you in person sometime. I got really inspired looking at your work.
All the very best for all your future endeavors!
Thanks once again,
---Ezhil Bharathi @@ -163,7 +163,7 @@ so kind of you can you make videos of it. --- -Dear AtshshiSakai,
You are very wise and smart that you created this library for students wanting to learn Probabilistic Robotics and actually perform robot control.
Hats off to you and your work.
I am also reading your book titled : Python Robotics +Dear AtsushiSakai,
You are very wise and smart that you created this library for students wanting to learn Probabilistic Robotics and actually perform robot control.
Hats off to you and your work.
I am also reading your book titled: Python Robotics --Himanshu @@ -263,7 +263,7 @@ Hey Atsushi
We are working on a multiagent simulation game and this project --- -Thanks a lot for this amazing set of very useful librarires! +Thanks a lot for this amazing set of very useful libraries! --Razvan Viorel Mihai @@ -275,7 +275,7 @@ Dear Atsushi Sakai,

This is probably one of the best open-source roboti --- -hanks frnd, for ur contibusion +Thanks friend, for your contribution —--