diff --git a/PathPlanning/TimeBasedPathPlanning/BaseClasses.py b/PathPlanning/TimeBasedPathPlanning/BaseClasses.py new file mode 100644 index 0000000000..745cde45fb --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/BaseClasses.py @@ -0,0 +1,49 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + Position, +) +from PathPlanning.TimeBasedPathPlanning.Node import NodePath +import random +import numpy.random as numpy_random + +# Seed randomness for reproducibility +RANDOM_SEED = 50 +random.seed(RANDOM_SEED) +numpy_random.seed(RANDOM_SEED) + +class SingleAgentPlanner(ABC): + """ + Base class for single agent planners + """ + + @staticmethod + @abstractmethod + def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: + pass + +@dataclass +class StartAndGoal: + # Index of this agent + index: int + # Start position of the robot + start: Position + # Goal position of the robot + goal: Position + + def distance_start_to_goal(self) -> float: + return pow(self.goal.x - self.start.x, 2) + pow(self.goal.y - self.start.y, 2) + +class MultiAgentPlanner(ABC): + """ + Base class for multi-agent planners + """ + + @staticmethod + @abstractmethod + def plan(grid: Grid, start_and_goal_positions: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]: + """ + Plan for all agents. Returned paths are in order corresponding to the returned list of `StartAndGoal` objects + """ + pass \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py index 2a47023f8c..ccc2989001 100644 --- a/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py +++ b/PathPlanning/TimeBasedPathPlanning/GridWithDynamicObstacles.py @@ -7,31 +7,7 @@ import matplotlib.pyplot as plt from enum import Enum from dataclasses import dataclass - -@dataclass(order=True) -class Position: - x: int - y: int - - def as_ndarray(self) -> np.ndarray: - return np.array([self.x, self.y]) - - def __add__(self, other): - if isinstance(other, Position): - return Position(self.x + other.x, self.y + other.y) - raise NotImplementedError( - f"Addition not supported for Position and {type(other)}" - ) - - def __sub__(self, other): - if isinstance(other, Position): - return Position(self.x - other.x, self.y - other.y) - raise NotImplementedError( - f"Subtraction not supported for Position and {type(other)}" - ) - - def __hash__(self): - return hash((self.x, self.y)) +from PathPlanning.TimeBasedPathPlanning.Node import NodePath, Position @dataclass class Interval: @@ -43,6 +19,8 @@ class ObstacleArrangement(Enum): RANDOM = 0 # Obstacles start in a line in y at center of grid and move side-to-side in x ARRANGEMENT1 = 1 + # Static obstacle arrangement + NARROW_CORRIDOR = 2 """ Generates a 2d numpy array with lists for elements. @@ -87,6 +65,8 @@ def __init__( self.obstacle_paths = self.generate_dynamic_obstacles(num_obstacles) elif obstacle_arrangement == ObstacleArrangement.ARRANGEMENT1: self.obstacle_paths = self.obstacle_arrangement_1(num_obstacles) + elif obstacle_arrangement == ObstacleArrangement.NARROW_CORRIDOR: + self.obstacle_paths = self.generate_narrow_corridor_obstacles(num_obstacles) for i, path in enumerate(self.obstacle_paths): obs_idx = i + 1 # avoid using 0 - that indicates free space in the grid @@ -184,6 +164,26 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: obstacle_paths.append(path) return obstacle_paths + + def generate_narrow_corridor_obstacles(self, obs_count: int) -> list[list[Position]]: + obstacle_paths = [] + + for y in range(0, self.grid_size[1]): + if y > obs_count: + break + + if y == self.grid_size[1] // 2: + # Skip the middle row + continue + + obstacle_path = [] + x = self.grid_size[0] // 2 # middle of the grid + for t in range(0, self.time_limit - 1): + obstacle_path.append(Position(x, y)) + + obstacle_paths.append(obstacle_path) + + return obstacle_paths """ Check if the given position is valid at time t @@ -196,11 +196,11 @@ def obstacle_arrangement_1(self, obs_count: int) -> list[list[Position]]: bool: True if position/time combination is valid, False otherwise """ def valid_position(self, position: Position, t: int) -> bool: - # Check if new position is in grid + # Check if position is in grid if not self.inside_grid_bounds(position): return False - # Check if new position is not occupied at time t + # Check if position is not occupied at time t return self.reservation_matrix[position.x, position.y, t] == 0 """ @@ -289,9 +289,48 @@ def get_safe_intervals_at_cell(self, cell: Position) -> list[Interval]: # both the time step when it is entering the cell, and the time step when it is leaving the cell. intervals = [interval for interval in intervals if interval.start_time != interval.end_time] return intervals + + """ + Reserve an agent's path in the grid. Raises an exception if the agent's index is 0, or if a position is + already reserved by a different agent. + """ + def reserve_path(self, node_path: NodePath, agent_index: int): + if agent_index == 0: + raise Exception("Agent index cannot be 0") + + for i, node in enumerate(node_path.path): + reservation_finish_time = node.time + 1 + if i < len(node_path.path) - 1: + reservation_finish_time = node_path.path[i + 1].time -show_animation = True + self.reserve_position(node.position, agent_index, Interval(node.time, reservation_finish_time)) + + """ + Reserve a position for the provided agent during the provided time interval. + Raises an exception if the agent's index is 0, or if the position is already reserved by a different agent during the interval. + """ + def reserve_position(self, position: Position, agent_index: int, interval: Interval): + if agent_index == 0: + raise Exception("Agent index cannot be 0") + + for t in range(interval.start_time, interval.end_time + 1): + current_reserver = self.reservation_matrix[position.x, position.y, t] + if current_reserver not in [0, agent_index]: + raise Exception( + f"Agent {agent_index} tried to reserve a position already reserved by another agent: {position} at time {t}, reserved by {current_reserver}" + ) + self.reservation_matrix[position.x, position.y, t] = agent_index + + """ + Clears the initial reservation for an agent by clearing reservations at its start position with its index for + from time 0 to the time limit. + """ + def clear_initial_reservation(self, position: Position, agent_index: int): + for t in range(self.time_limit): + if self.reservation_matrix[position.x, position.y, t] == agent_index: + self.reservation_matrix[position.x, position.y, t] = 0 +show_animation = True def main(): grid = Grid( diff --git a/PathPlanning/TimeBasedPathPlanning/Node.py b/PathPlanning/TimeBasedPathPlanning/Node.py new file mode 100644 index 0000000000..728eebb676 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/Node.py @@ -0,0 +1,99 @@ +from dataclasses import dataclass +from functools import total_ordering +import numpy as np +from typing import Sequence + +@dataclass(order=True) +class Position: + x: int + y: int + + def as_ndarray(self) -> np.ndarray: + return np.array([self.x, self.y]) + + def __add__(self, other): + if isinstance(other, Position): + return Position(self.x + other.x, self.y + other.y) + raise NotImplementedError( + f"Addition not supported for Position and {type(other)}" + ) + + def __sub__(self, other): + if isinstance(other, Position): + return Position(self.x - other.x, self.y - other.y) + raise NotImplementedError( + f"Subtraction not supported for Position and {type(other)}" + ) + + def __hash__(self): + return hash((self.x, self.y)) + +@dataclass() +# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because +# this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent +# index is just used to track the path found by the algorithm, and has no effect on the quality +# of a node. +@total_ordering +class Node: + position: Position + time: int + heuristic: int + parent_index: int + + """ + This is what is used to drive node expansion. The node with the lowest value is expanded next. + This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) + """ + def __lt__(self, other: object): + if not isinstance(other, Node): + return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") + return (self.time + self.heuristic) < (other.time + other.heuristic) + + """ + Note: cost and heuristic are not included in eq or hash, since they will always be the same + for a given (position, time) pair. Including either cost or heuristic would be redundant. + """ + def __eq__(self, other: object): + if not isinstance(other, Node): + return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") + return self.position == other.position and self.time == other.time + + def __hash__(self): + return hash((self.position, self.time)) + +class NodePath: + path: Sequence[Node] + positions_at_time: dict[int, Position] + # Number of nodes expanded while finding this path + expanded_node_count: int + + def __init__(self, path: Sequence[Node], expanded_node_count: int): + self.path = path + self.expanded_node_count = expanded_node_count + + self.positions_at_time = {} + for i, node in enumerate(path): + reservation_finish_time = node.time + 1 + if i < len(path) - 1: + reservation_finish_time = path[i + 1].time + + for t in range(node.time, reservation_finish_time): + self.positions_at_time[t] = node.position + + """ + Get the position of the path at a given time + """ + def get_position(self, time: int) -> Position | None: + return self.positions_at_time.get(time) + + """ + Time stamp of the last node in the path + """ + def goal_reached_time(self) -> int: + return self.path[-1].time + + def __repr__(self): + repr_string = "" + for i, node in enumerate(self.path): + repr_string += f"{i}: {node}\n" + return repr_string \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/Plotting.py b/PathPlanning/TimeBasedPathPlanning/Plotting.py new file mode 100644 index 0000000000..7cd1f615d8 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/Plotting.py @@ -0,0 +1,135 @@ +import numpy as np +import matplotlib.pyplot as plt +from matplotlib.backend_bases import KeyEvent +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal +from PathPlanning.TimeBasedPathPlanning.Node import NodePath + +''' +Plot a single agent path. +''' +def PlotNodePath(grid: Grid, start: Position, goal: Position, path: NodePath): + fig = plt.figure(figsize=(10, 7)) + ax = fig.add_subplot( + autoscale_on=False, + xlim=(0, grid.grid_size[0] - 1), + ylim=(0, grid.grid_size[1] - 1), + ) + ax.set_aspect("equal") + ax.grid() + ax.set_xticks(np.arange(0, grid.grid_size[0], 1)) + ax.set_yticks(np.arange(0, grid.grid_size[1], 1)) + + (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") + start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) + (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") + (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") + ax.legend(bbox_to_anchor=(1.05, 1)) + + # for stopping simulation with the esc key. + plt.gcf().canvas.mpl_connect( + "key_release_event", + lambda event: [exit(0) if event.key == "escape" else None] + if isinstance(event, KeyEvent) else None + ) + + for i in range(0, path.goal_reached_time()): + obs_positions = grid.get_obstacle_positions_at_time(i) + obs_points.set_data(obs_positions[0], obs_positions[1]) + path_position = path.get_position(i) + if not path_position: + raise Exception(f"Path position not found for time {i}.") + + path_points.set_data([path_position.x], [path_position.y]) + plt.pause(0.2) + plt.show() + +''' +Plot a series of agent paths. +''' +def PlotNodePaths(grid: Grid, start_and_goals: list[StartAndGoal], paths: list[NodePath]): + fig = plt.figure(figsize=(10, 7)) + + ax = fig.add_subplot( + autoscale_on=False, + xlim=(0, grid.grid_size[0] - 1), + ylim=(0, grid.grid_size[1] - 1), + ) + ax.set_aspect("equal") + ax.grid() + ax.set_xticks(np.arange(0, grid.grid_size[0], 1)) + ax.set_yticks(np.arange(0, grid.grid_size[1], 1)) + + # Plot start and goal positions for each agent + colors = [] # generated randomly in loop + markers = ['D', 's', '^', 'o', 'p'] # Different markers for visual distinction + + # Create plots for start and goal positions + start_and_goal_plots = [] + for i, path in enumerate(paths): + marker_idx = i % len(markers) + agent_id = start_and_goals[i].index + start = start_and_goals[i].start + goal = start_and_goals[i].goal + + color = np.random.rand(3,) + colors.append(color) + sg_plot, = ax.plot([], [], markers[marker_idx], c=color, ms=15, + label=f"Agent {agent_id} Start/Goal") + sg_plot.set_data([start.x, goal.x], [start.y, goal.y]) + start_and_goal_plots.append(sg_plot) + + # Plot for obstacles + (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") + + # Create plots for each agent's path + path_plots = [] + for i, path in enumerate(paths): + agent_id = start_and_goals[i].index + path_plot, = ax.plot([], [], "o", c=colors[i], ms=10, + label=f"Agent {agent_id} Path") + path_plots.append(path_plot) + + ax.legend(bbox_to_anchor=(1.05, 1)) + + # For stopping simulation with the esc key + plt.gcf().canvas.mpl_connect( + "key_release_event", + lambda event: [exit(0) if event.key == "escape" else None] + if isinstance(event, KeyEvent) else None + ) + + # Find the maximum time across all paths + max_time = max(path.goal_reached_time() for path in paths) + + # Animation loop + for i in range(0, max_time + 1): + # Update obstacle positions + obs_positions = grid.get_obstacle_positions_at_time(i) + obs_points.set_data(obs_positions[0], obs_positions[1]) + + # Update each agent's position + for (j, path) in enumerate(paths): + path_positions = [] + if i <= path.goal_reached_time(): + res = path.get_position(i) + if not res: + print(path) + print(i) + path_position = path.get_position(i) + if not path_position: + raise Exception(f"Path position not found for time {i}.") + + # Verify position is valid + assert not path_position in obs_positions + assert not path_position in path_positions + path_positions.append(path_position) + + path_plots[j].set_data([path_position.x], [path_position.y]) + + plt.pause(0.2) + + plt.show() \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py b/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py new file mode 100644 index 0000000000..2573965cf6 --- /dev/null +++ b/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner.py @@ -0,0 +1,95 @@ +""" +Priority Based Planner for multi agent path planning. +The planner generates an order to plan in, and generates plans for the robots in that order. Each planned +path is reserved in the grid, and all future plans must avoid that path. + +Algorithm outlined in section III of this paper: https://pure.tudelft.nl/ws/portalfiles/portal/67074672/07138650.pdf +""" + +import numpy as np +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + Interval, + ObstacleArrangement, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import MultiAgentPlanner, StartAndGoal +from PathPlanning.TimeBasedPathPlanning.Node import NodePath +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePaths +import time + +class PriorityBasedPlanner(MultiAgentPlanner): + + @staticmethod + def plan(grid: Grid, start_and_goals: list[StartAndGoal], single_agent_planner_class: SingleAgentPlanner, verbose: bool = False) -> tuple[list[StartAndGoal], list[NodePath]]: + """ + Generate a path from the start to the goal for each agent in the `start_and_goals` list. + Returns the re-ordered StartAndGoal combinations, and a list of path plans. The order of the plans + corresponds to the order of the `start_and_goals` list. + """ + print(f"Using single-agent planner: {single_agent_planner_class}") + + # Reserve initial positions + for start_and_goal in start_and_goals: + grid.reserve_position(start_and_goal.start, start_and_goal.index, Interval(0, 10)) + + # Plan in descending order of distance from start to goal + start_and_goals = sorted(start_and_goals, + key=lambda item: item.distance_start_to_goal(), + reverse=True) + + paths = [] + for start_and_goal in start_and_goals: + if verbose: + print(f"\nPlanning for agent: {start_and_goal}" ) + + grid.clear_initial_reservation(start_and_goal.start, start_and_goal.index) + path = single_agent_planner_class.plan(grid, start_and_goal.start, start_and_goal.goal, verbose) + + if path is None: + print(f"Failed to find path for {start_and_goal}") + return [] + + agent_index = start_and_goal.index + grid.reserve_path(path, agent_index) + paths.append(path) + + return (start_and_goals, paths) + +verbose = False +show_animation = True +def main(): + grid_side_length = 21 + + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 16)] + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + # obstacle_arrangement=ObstacleArrangement.NARROW_CORRIDOR, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + # obstacle_arrangement=ObstacleArrangement.RANDOM, + ) + + start_time = time.time() + start_and_goals, paths = PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, verbose) + + runtime = time.time() - start_time + print(f"\nPlanning took: {runtime:.5f} seconds") + + if verbose: + print(f"Paths:") + for path in paths: + print(f"{path}\n") + + if not show_animation: + return + + PlotNodePaths(grid, start_and_goals, paths) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/PathPlanning/TimeBasedPathPlanning/SafeInterval.py b/PathPlanning/TimeBasedPathPlanning/SafeInterval.py index 7fea10c67f..446847ac6d 100644 --- a/PathPlanning/TimeBasedPathPlanning/SafeInterval.py +++ b/PathPlanning/TimeBasedPathPlanning/SafeInterval.py @@ -9,7 +9,6 @@ """ import numpy as np -import matplotlib.pyplot as plt from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( Grid, Interval, @@ -17,8 +16,11 @@ Position, empty_2d_array_of_lists, ) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.Node import Node, NodePath +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePath + import heapq -import random from dataclasses import dataclass from functools import total_ordering import time @@ -29,116 +31,50 @@ # index and interval member variables are just used to track the path found by the algorithm, # and has no effect on the quality of a node. @total_ordering -class Node: - position: Position - time: int - heuristic: int - parent_index: int +class SIPPNode(Node): interval: Interval - """ - This is what is used to drive node expansion. The node with the lowest value is expanded next. - This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) - """ - def __lt__(self, other: object): - if not isinstance(other, Node): - return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") - return (self.time + self.heuristic) < (other.time + other.heuristic) - - """ - Equality only cares about position and time. Heuristic and interval will always be the same for a given - (position, time) pairing, so they are not considered in equality. - """ - def __eq__(self, other: object): - if not isinstance(other, Node): - return NotImplemented - return self.position == other.position and self.time == other.time - @dataclass class EntryTimeAndInterval: entry_time: int interval: Interval -class NodePath: - path: list[Node] - positions_at_time: dict[int, Position] = {} - - def __init__(self, path: list[Node]): - self.path = path - for (i, node) in enumerate(path): - if i > 0: - # account for waiting in interval at previous node - prev_node = path[i-1] - for t in range(prev_node.time, node.time): - self.positions_at_time[t] = prev_node.position - - self.positions_at_time[node.time] = node.position - - """ - Get the position of the path at a given time - """ - def get_position(self, time: int) -> Position | None: - return self.positions_at_time.get(time) - - """ - Time stamp of the last node in the path - """ - def goal_reached_time(self) -> int: - return self.path[-1].time - - def __repr__(self): - repr_string = "" - for i, node in enumerate(self.path): - repr_string += f"{i}: {node}\n" - return repr_string - - -class SafeIntervalPathPlanner: - grid: Grid - start: Position - goal: Position - - def __init__(self, grid: Grid, start: Position, goal: Position): - self.grid = grid - self.start = start - self.goal = goal - - # Seed randomness for reproducibility - RANDOM_SEED = 50 - random.seed(RANDOM_SEED) - np.random.seed(RANDOM_SEED) +class SafeIntervalPathPlanner(SingleAgentPlanner): """ Generate a plan given the loaded problem statement. Raises an exception if it fails to find a path. Arguments: verbose (bool): set to True to print debug information """ - def plan(self, verbose: bool = False) -> NodePath: + @staticmethod + def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: - safe_intervals = self.grid.get_safe_intervals() + safe_intervals = grid.get_safe_intervals() - open_set: list[Node] = [] - first_node_interval = safe_intervals[self.start.x, self.start.y][0] + open_set: list[SIPPNode] = [] + first_node_interval = safe_intervals[start.x, start.y][0] heapq.heappush( - open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1, first_node_interval) + open_set, SIPPNode(start, 0, SafeIntervalPathPlanner.calculate_heuristic(start, goal), -1, first_node_interval) ) - expanded_list: list[Node] = [] - visited_intervals = empty_2d_array_of_lists(self.grid.grid_size[0], self.grid.grid_size[1]) + expanded_list: list[SIPPNode] = [] + visited_intervals = empty_2d_array_of_lists(grid.grid_size[0], grid.grid_size[1]) while open_set: - expanded_node: Node = heapq.heappop(open_set) + expanded_node: SIPPNode = heapq.heappop(open_set) if verbose: print("Expanded node:", expanded_node) - if expanded_node.time + 1 >= self.grid.time_limit: + if expanded_node.time + 1 >= grid.time_limit: if verbose: print(f"\tSkipping node that is past time limit: {expanded_node}") continue - if expanded_node.position == self.goal: - print(f"Found path to goal after {len(expanded_list)} expansions") + if expanded_node.position == goal: + if verbose: + print(f"Found path to goal after {len(expanded_list)} expansions") + path = [] - path_walker: Node = expanded_node + path_walker: SIPPNode = expanded_node while True: path.append(path_walker) if path_walker.parent_index == -1: @@ -147,14 +83,14 @@ def plan(self, verbose: bool = False) -> NodePath: # reverse path so it goes start -> goal path.reverse() - return NodePath(path) + return NodePath(path, len(expanded_list)) expanded_idx = len(expanded_list) expanded_list.append(expanded_node) entry_time_and_node = EntryTimeAndInterval(expanded_node.time, expanded_node.interval) add_entry_to_visited_intervals_array(entry_time_and_node, visited_intervals, expanded_node) - for child in self.generate_successors(expanded_node, expanded_idx, safe_intervals, visited_intervals): + for child in SafeIntervalPathPlanner.generate_successors(grid, goal, expanded_node, expanded_idx, safe_intervals, visited_intervals): heapq.heappush(open_set, child) raise Exception("No path found") @@ -162,9 +98,10 @@ def plan(self, verbose: bool = False) -> NodePath: """ Generate list of possible successors of the provided `parent_node` that are worth expanding """ + @staticmethod def generate_successors( - self, parent_node: Node, parent_node_idx: int, intervals: np.ndarray, visited_intervals: np.ndarray - ) -> list[Node]: + grid: Grid, goal: Position, parent_node: SIPPNode, parent_node_idx: int, intervals: np.ndarray, visited_intervals: np.ndarray + ) -> list[SIPPNode]: new_nodes = [] diffs = [ Position(0, 0), @@ -175,7 +112,7 @@ def generate_successors( ] for diff in diffs: new_pos = parent_node.position + diff - if not self.grid.inside_grid_bounds(new_pos): + if not grid.inside_grid_bounds(new_pos): continue current_interval = parent_node.interval @@ -203,12 +140,12 @@ def generate_successors( # We know there is a node worth expanding. Generate successor at the earliest possible time the # new interval can be entered for possible_t in range(max(parent_node.time + 1, interval.start_time), min(current_interval.end_time, interval.end_time)): - if self.grid.valid_position(new_pos, possible_t): - new_nodes.append(Node( + if grid.valid_position(new_pos, possible_t): + new_nodes.append(SIPPNode( new_pos, # entry is max of interval start and parent node time + 1 (get there as soon as possible) max(interval.start_time, parent_node.time + 1), - self.calculate_heuristic(new_pos), + SafeIntervalPathPlanner.calculate_heuristic(new_pos, goal), parent_node_idx, interval, )) @@ -220,8 +157,9 @@ def generate_successors( """ Calculate the heuristic for a given position - Manhattan distance to the goal """ - def calculate_heuristic(self, position) -> int: - diff = self.goal - position + @staticmethod + def calculate_heuristic(position: Position, goal: Position) -> int: + diff = goal - position return abs(diff.x) + abs(diff.y) @@ -229,7 +167,7 @@ def calculate_heuristic(self, position) -> int: Adds a new entry to the visited intervals array. If the entry is already present, the entry time is updated if the new entry time is better. Otherwise, the entry is added to `visited_intervals` at the position of `expanded_node`. """ -def add_entry_to_visited_intervals_array(entry_time_and_interval: EntryTimeAndInterval, visited_intervals: np.ndarray, expanded_node: Node): +def add_entry_to_visited_intervals_array(entry_time_and_interval: EntryTimeAndInterval, visited_intervals: np.ndarray, expanded_node: SIPPNode): # if entry is present, update entry time if better for existing_entry_and_interval in visited_intervals[expanded_node.position.x, expanded_node.position.y]: if existing_entry_and_interval.interval == entry_time_and_interval.interval: @@ -257,8 +195,7 @@ def main(): # obstacle_arrangement=ObstacleArrangement.RANDOM, ) - planner = SafeIntervalPathPlanner(grid, start, goal) - path = planner.plan(verbose) + path = SafeIntervalPathPlanner.plan(grid, start, goal, verbose) runtime = time.time() - start_time print(f"Planning took: {runtime:.5f} seconds") @@ -268,35 +205,7 @@ def main(): if not show_animation: return - fig = plt.figure(figsize=(10, 7)) - ax = fig.add_subplot( - autoscale_on=False, - xlim=(0, grid.grid_size[0] - 1), - ylim=(0, grid.grid_size[1] - 1), - ) - ax.set_aspect("equal") - ax.grid() - ax.set_xticks(np.arange(0, grid_side_length, 1)) - ax.set_yticks(np.arange(0, grid_side_length, 1)) - - (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") - start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) - (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") - (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") - ax.legend(bbox_to_anchor=(1.05, 1)) - - # for stopping simulation with the esc key. - plt.gcf().canvas.mpl_connect( - "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] - ) - - for i in range(0, path.goal_reached_time() + 1): - obs_positions = grid.get_obstacle_positions_at_time(i) - obs_points.set_data(obs_positions[0], obs_positions[1]) - path_position = path.get_position(i) - path_points.set_data([path_position.x], [path_position.y]) - plt.pause(0.2) - plt.show() + PlotNodePath(grid, start, goal, path) if __name__ == "__main__": diff --git a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py index c4e2802d37..b85569f5dc 100644 --- a/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py +++ b/PathPlanning/TimeBasedPathPlanning/SpaceTimeAStar.py @@ -9,101 +9,25 @@ """ import numpy as np -import matplotlib.pyplot as plt from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( Grid, ObstacleArrangement, Position, ) +from PathPlanning.TimeBasedPathPlanning.Node import Node, NodePath import heapq from collections.abc import Generator -import random -from dataclasses import dataclass -from functools import total_ordering import time +from PathPlanning.TimeBasedPathPlanning.BaseClasses import SingleAgentPlanner +from PathPlanning.TimeBasedPathPlanning.Plotting import PlotNodePath -# Seed randomness for reproducibility -RANDOM_SEED = 50 -random.seed(RANDOM_SEED) -np.random.seed(RANDOM_SEED) - -@dataclass() -# Note: Total_ordering is used instead of adding `order=True` to the @dataclass decorator because -# this class needs to override the __lt__ and __eq__ methods to ignore parent_index. Parent -# index is just used to track the path found by the algorithm, and has no effect on the quality -# of a node. -@total_ordering -class Node: - position: Position - time: int - heuristic: int - parent_index: int +class SpaceTimeAStar(SingleAgentPlanner): - """ - This is what is used to drive node expansion. The node with the lowest value is expanded next. - This comparison prioritizes the node with the lowest cost-to-come (self.time) + cost-to-go (self.heuristic) - """ - def __lt__(self, other: object): - if not isinstance(other, Node): - return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") - return (self.time + self.heuristic) < (other.time + other.heuristic) - - """ - Note: cost and heuristic are not included in eq or hash, since they will always be the same - for a given (position, time) pair. Including either cost or heuristic would be redundant. - """ - def __eq__(self, other: object): - if not isinstance(other, Node): - return NotImplementedError(f"Cannot compare Node with object of type: {type(other)}") - return self.position == other.position and self.time == other.time - - def __hash__(self): - return hash((self.position, self.time)) - -class NodePath: - path: list[Node] - positions_at_time: dict[int, Position] = {} - - def __init__(self, path: list[Node]): - self.path = path - for node in path: - self.positions_at_time[node.time] = node.position - - """ - Get the position of the path at a given time - """ - def get_position(self, time: int) -> Position | None: - return self.positions_at_time.get(time) - - """ - Time stamp of the last node in the path - """ - def goal_reached_time(self) -> int: - return self.path[-1].time - - def __repr__(self): - repr_string = "" - for i, node in enumerate(self.path): - repr_string += f"{i}: {node}\n" - return repr_string - - -class SpaceTimeAStar: - grid: Grid - start: Position - goal: Position - # Used to evaluate solutions - expanded_node_count: int = -1 - - def __init__(self, grid: Grid, start: Position, goal: Position): - self.grid = grid - self.start = start - self.goal = goal - - def plan(self, verbose: bool = False) -> NodePath: + @staticmethod + def plan(grid: Grid, start: Position, goal: Position, verbose: bool = False) -> NodePath: open_set: list[Node] = [] heapq.heappush( - open_set, Node(self.start, 0, self.calculate_heuristic(self.start), -1) + open_set, Node(start, 0, SpaceTimeAStar.calculate_heuristic(start, goal), -1) ) expanded_list: list[Node] = [] @@ -113,13 +37,15 @@ def plan(self, verbose: bool = False) -> NodePath: if verbose: print("Expanded node:", expanded_node) - if expanded_node.time + 1 >= self.grid.time_limit: + if expanded_node.time + 1 >= grid.time_limit: if verbose: print(f"\tSkipping node that is past time limit: {expanded_node}") continue - if expanded_node.position == self.goal: - print(f"Found path to goal after {len(expanded_list)} expansions") + if expanded_node.position == goal: + if verbose: + print(f"Found path to goal after {len(expanded_list)} expansions") + path = [] path_walker: Node = expanded_node while True: @@ -130,14 +56,13 @@ def plan(self, verbose: bool = False) -> NodePath: # reverse path so it goes start -> goal path.reverse() - self.expanded_node_count = len(expanded_set) - return NodePath(path) + return NodePath(path, len(expanded_set)) expanded_idx = len(expanded_list) expanded_list.append(expanded_node) expanded_set.add(expanded_node) - for child in self.generate_successors(expanded_node, expanded_idx, verbose, expanded_set): + for child in SpaceTimeAStar.generate_successors(grid, goal, expanded_node, expanded_idx, verbose, expanded_set): heapq.heappush(open_set, child) raise Exception("No path found") @@ -145,8 +70,9 @@ def plan(self, verbose: bool = False) -> NodePath: """ Generate possible successors of the provided `parent_node` """ + @staticmethod def generate_successors( - self, parent_node: Node, parent_node_idx: int, verbose: bool, expanded_set: set[Node] + grid: Grid, goal: Position, parent_node: Node, parent_node_idx: int, verbose: bool, expanded_set: set[Node] ) -> Generator[Node, None, None]: diffs = [ Position(0, 0), @@ -160,20 +86,22 @@ def generate_successors( new_node = Node( new_pos, parent_node.time + 1, - self.calculate_heuristic(new_pos), + SpaceTimeAStar.calculate_heuristic(new_pos, goal), parent_node_idx, ) if new_node in expanded_set: continue - if self.grid.valid_position(new_pos, parent_node.time + 1): + # Check if the new node is valid for the next 2 time steps - one step to enter, and another to leave + if all([grid.valid_position(new_pos, parent_node.time + dt) for dt in [1, 2]]): if verbose: print("\tNew successor node: ", new_node) yield new_node - def calculate_heuristic(self, position) -> int: - diff = self.goal - position + @staticmethod + def calculate_heuristic(position: Position, goal: Position) -> int: + diff = goal - position return abs(diff.x) + abs(diff.y) @@ -194,8 +122,7 @@ def main(): obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, ) - planner = SpaceTimeAStar(grid, start, goal) - path = planner.plan(verbose) + path = SpaceTimeAStar.plan(grid, start, goal, verbose) runtime = time.time() - start_time print(f"Planning took: {runtime:.5f} seconds") @@ -206,36 +133,7 @@ def main(): if not show_animation: return - fig = plt.figure(figsize=(10, 7)) - ax = fig.add_subplot( - autoscale_on=False, - xlim=(0, grid.grid_size[0] - 1), - ylim=(0, grid.grid_size[1] - 1), - ) - ax.set_aspect("equal") - ax.grid() - ax.set_xticks(np.arange(0, grid_side_length, 1)) - ax.set_yticks(np.arange(0, grid_side_length, 1)) - - (start_and_goal,) = ax.plot([], [], "mD", ms=15, label="Start and Goal") - start_and_goal.set_data([start.x, goal.x], [start.y, goal.y]) - (obs_points,) = ax.plot([], [], "ro", ms=15, label="Obstacles") - (path_points,) = ax.plot([], [], "bo", ms=10, label="Path Found") - ax.legend(bbox_to_anchor=(1.05, 1)) - - # for stopping simulation with the esc key. - plt.gcf().canvas.mpl_connect( - "key_release_event", lambda event: [exit(0) if event.key == "escape" else None] - ) - - for i in range(0, path.goal_reached_time()): - obs_positions = grid.get_obstacle_positions_at_time(i) - obs_points.set_data(obs_positions[0], obs_positions[1]) - path_position = path.get_position(i) - path_points.set_data([path_position.x], [path_position.y]) - plt.pause(0.2) - plt.show() - + PlotNodePath(grid, start, goal, path) if __name__ == "__main__": main() diff --git a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst index 9517e95b31..a44dd20a97 100644 --- a/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst +++ b/docs/modules/5_path_planning/time_based_grid_search/time_based_grid_search_main.rst @@ -81,9 +81,28 @@ Code Link ^^^^^^^^^^^^^ .. autoclass:: PathPlanning.TimeBasedPathPlanning.SafeInterval.SafeIntervalPathPlanner +Multi-Agent Path Planning +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Priority Based Planning +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The planner generates an order to plan in, and generates plans for the robots in that order. Each planned path is reserved in the grid, and all future plans must avoid that path. The robots are planned for in descending order of distance from start to goal. + +This is a sub-optimal algorithm because no replanning happens once paths are found. It does not guarantee the shortest path is found for any particular robot, nor that the final set of paths found contains the lowest possible amount of time or movement. + +Static Obstacles: + +.. image:: https://raw.githubusercontent.com/AtsushiSakai/PythonRoboticsGifs/refs/heads/master/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner/priority_planner2.gif + +Dynamic Obstacles: + +.. image:: https://raw.githubusercontent.com/AtsushiSakai/PythonRoboticsGifs/refs/heads/master/PathPlanning/TimeBasedPathPlanning/PriorityBasedPlanner/priority_planner.gif + References ~~~~~~~~~~~ - `Cooperative Pathfinding `__ - `SIPP: Safe Interval Path Planning for Dynamic Environments `__ +- `Prioritized Planning Algorithms for Trajectory Coordination of Multiple Mobile Robots `__ \ No newline at end of file diff --git a/tests/test_priority_based_planner.py b/tests/test_priority_based_planner.py new file mode 100644 index 0000000000..e2ff602d88 --- /dev/null +++ b/tests/test_priority_based_planner.py @@ -0,0 +1,39 @@ +from PathPlanning.TimeBasedPathPlanning.GridWithDynamicObstacles import ( + Grid, + NodePath, + ObstacleArrangement, + Position, +) +from PathPlanning.TimeBasedPathPlanning.BaseClasses import StartAndGoal +from PathPlanning.TimeBasedPathPlanning import PriorityBasedPlanner as m +from PathPlanning.TimeBasedPathPlanning.SafeInterval import SafeIntervalPathPlanner +import numpy as np +import conftest + + +def test_1(): + grid_side_length = 21 + + start_and_goals = [StartAndGoal(i, Position(1, i), Position(19, 19-i)) for i in range(1, 16)] + obstacle_avoid_points = [pos for item in start_and_goals for pos in (item.start, item.goal)] + + grid = Grid( + np.array([grid_side_length, grid_side_length]), + num_obstacles=250, + obstacle_avoid_points=obstacle_avoid_points, + obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1, + ) + + m.show_animation = False + + start_and_goals: list[StartAndGoal] + paths: list[NodePath] + start_and_goals, paths = m.PriorityBasedPlanner.plan(grid, start_and_goals, SafeIntervalPathPlanner, False) + + # All paths should start at the specified position and reach the goal + for i, start_and_goal in enumerate(start_and_goals): + assert paths[i].path[0].position == start_and_goal.start + assert paths[i].path[-1].position == start_and_goal.goal + +if __name__ == "__main__": + conftest.run_this_test(__file__) diff --git a/tests/test_safe_interval_path_planner.py b/tests/test_safe_interval_path_planner.py index f1fbf90d2a..bbcd4e427c 100644 --- a/tests/test_safe_interval_path_planner.py +++ b/tests/test_safe_interval_path_planner.py @@ -18,9 +18,7 @@ def test_1(): ) m.show_animation = False - planner = m.SafeIntervalPathPlanner(grid, start, goal) - - path = planner.plan(False) + path = m.SafeIntervalPathPlanner.plan(grid, start, goal) # path should have 31 entries assert len(path.path) == 31 diff --git a/tests/test_space_time_astar.py b/tests/test_space_time_astar.py index 390c7732dc..1194c61d2e 100644 --- a/tests/test_space_time_astar.py +++ b/tests/test_space_time_astar.py @@ -18,9 +18,7 @@ def test_1(): ) m.show_animation = False - planner = m.SpaceTimeAStar(grid, start, goal) - - path = planner.plan(False) + path = m.SpaceTimeAStar.plan(grid, start, goal) # path should have 28 entries assert len(path.path) == 31 @@ -28,7 +26,7 @@ def test_1(): # path should end at the goal assert path.path[-1].position == goal - assert planner.expanded_node_count < 1000 + assert path.expanded_node_count < 1000 if __name__ == "__main__": conftest.run_this_test(__file__)