diff --git a/.gitignore b/.gitignore index 0d20b64..73ec5e1 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -*.pyc +*.pyc + +.vscode \ No newline at end of file diff --git a/nova_rover_demos/pathfinding/a_star.py b/nova_rover_demos/pathfinding/a_star.py index e2b2138..f961ec6 100644 --- a/nova_rover_demos/pathfinding/a_star.py +++ b/nova_rover_demos/pathfinding/a_star.py @@ -1,3 +1,7 @@ +import numpy as np +from itertools import product +from math import sqrt, inf +from pathfinding.heuristic import euclidean_cost import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") @@ -6,10 +10,6 @@ except: raise -from pathfinding.heuristic import euclidean_cost -from math import sqrt, inf -from itertools import product -import numpy as np def reconstruct_path_to_destination(prev, end): """ @@ -25,6 +25,8 @@ def reconstruct_path_to_destination(prev, end): return path # A* Search + + def get_successors(node, grid): """ The neighbors of a cell (node) in the grid are the 8-surrounding cells. @@ -35,7 +37,7 @@ def get_successors(node, grid): n_rows = len(grid) n_cols = len(grid[0]) - for dx, dy in product([-1,0,1],[-1,0,1]): + for dx, dy in product([-1, 0, 1], [-1, 0, 1]): # skip the current node itself if (dx == 0 and dy == 0): continue @@ -49,17 +51,20 @@ def get_successors(node, grid): # put infinite penalty on successors that would take us off the edge of the grid cost = inf - successors.append( ((x, y), cost) ) + successors.append(((x, y), cost)) return successors -def node_with_min_fscore(open_set, f_cost): # open_set is a set (of cell) and f_cost is a dict (with cells as keys) + +# open_set is a set (of cell) and f_cost is a dict (with cells as keys) +def node_with_min_fscore(open_set, f_cost): """ Find the cell in open set with the smallest f score. """ f_cost_open = dict([a for a in f_cost.items() if a[0] in open_set]) return min(f_cost_open, key=f_cost_open.get) + def a_star_search(grid, start, end, heuristic_cost=euclidean_cost): """ Implementation of A Star over a 2D grid. Returns a list of waypoints @@ -108,7 +113,7 @@ def a_star_search(grid, start, end, heuristic_cost=euclidean_cost): if neighbor in closed_set: continue - curr_g_score = g_cost[curr] + cost + curr_g_score = g_cost[curr] + cost # add neighbor to newly discovered nodes if neighbor not in open_set: open_set.add(neighbor) @@ -122,4 +127,4 @@ def a_star_search(grid, start, end, heuristic_cost=euclidean_cost): f_cost[neighbor] = g_cost[neighbor] + heuristic_cost(neighbor, end) # if we get to this point, it's not possible to reach the end destination - return [] \ No newline at end of file + return [] diff --git a/nova_rover_demos/pathfinding/comparison_platform/README.md b/nova_rover_demos/pathfinding/comparison_platform/README.md new file mode 100644 index 0000000..a3d2439 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/README.md @@ -0,0 +1,103 @@ +# Path Finding Algorithm Comparison Platform + +## Introduction +This is software platform aimed at comparing the relative performance of various path planning/route finding algorithms. This software tool benchmarks the processing time, peak memory usage and distance of the path found and presents them in a visual manner. We have designed our path finding software in such a way so easily a range of different algorithms can be added to the platform and their performance can be benchmarked. + +Currently in the repository there are three classes of pathfinding algorithms are present as default: +1. A* and variants (Heuristic based family) +2. Pledge (Wall follower family) +3. RRT and variants (Probabilistic family) + + +## Usage + +Our goal is to create a scalable easy to use function. To use the comparison platform you simply need to: + +1. Import your algorithm(s) into the comparison platform +2. Select it inside the `run.py` file + +### 1. Importing your Algorithm(s) + +**Step 1:** You can import the file or folder of the algorithm inside the lib folder. + +Lib folder + +**Step 2:** Ensure you have fixed any relative imports inside your files. This step is only applicable if you have multiple files are they're importing functions/classes from each other. + +Relative import + +Because our a-star variant file imports a class from a-star and both are under the lib folder, we import in the manner shown in the image above. + +**Step 3:** Write a wrapper class (If applicable). Currently the comparison platform is designed in a way that all functions accept 3 arguments in the order: +1. List of obstacles (A list of (x, y) tuples) +2. A starting coordinate (A (x, y) format tuple) +3. A goal coordinate (A (x, y) format tuple) + +If your algorithm accepts other out formats, you can call the main algorithm with all the necessary arguments inside the wrapper function. + +Wrapper class + +We have put together a very [short guide](wrapper.md) on writing the wrapper functions and you can find [here](wrapper.md). It has a template code for you to get started as well. + +### 2. Configure `run.py` + +**Step 4:** Import algorithm or wrapper function in the `run.py` file. + + +Run.py file + +Importing the files should be done as the following (Remember to do relative importing): + +Importing algorithms + + +**Step 5:** List your algorithm / wrapper function in the algorithm list. + +Listing algorithms + +**Step 6:** Run `run.py` file. After completing the easy steps above simply run the following command: + +```python +python run.py +``` + +You can also select the number of times the functions will be tested. For example: + +```python +python run.py 10 +``` + +**Step 7 (Optional):** Configure how dense you want the environment to be. + +Map density + + +### Check algorithm is working correctly + +Even if no errors are thrown during runtime, you should check the results folder. In the results folder a visual map (PDF file) with the planned route should be automatically generated with the name of your algorithm/wrapper function. Please review this figure and see if everything is okay. + +You can also check the CSV generated inside the results folder to cross check things. + + +## Results + +The main power of this comparison platform lies within the extensive results it generated. The comparison platform gives you visualization of the following key performance factors: +- Runtime +- Memory Comparison +- Average path length + + +Upon running the program you should see a couple of figures which give you a visual comparison of the algorithms key metrics. + + +Result sample 1 Result sample 2 + + +Furthermore inside the results folder you can also view the map of the environment and the path generated by each of the algorithms in separate PDF file. Sample: + +Map sample + + +You have also have access to the raw numbers inside a CSV file inside the results folder. + +CSV sample \ No newline at end of file diff --git a/nova_rover_demos/pathfinding/comparison_platform/README.md.txt b/nova_rover_demos/pathfinding/comparison_platform/README.md.txt deleted file mode 100644 index e69de29..0000000 diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/algorithm-list.png b/nova_rover_demos/pathfinding/comparison_platform/img/algorithm-list.png new file mode 100644 index 0000000..a758184 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/algorithm-list.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/csv-sample.png b/nova_rover_demos/pathfinding/comparison_platform/img/csv-sample.png new file mode 100644 index 0000000..29d9532 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/csv-sample.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/lib-folder.png b/nova_rover_demos/pathfinding/comparison_platform/img/lib-folder.png new file mode 100644 index 0000000..7d74ecd Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/lib-folder.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/map-density.png b/nova_rover_demos/pathfinding/comparison_platform/img/map-density.png new file mode 100644 index 0000000..b9b3ac4 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/map-density.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/map-sample.png b/nova_rover_demos/pathfinding/comparison_platform/img/map-sample.png new file mode 100644 index 0000000..4b97eee Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/map-sample.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/relative-import.png b/nova_rover_demos/pathfinding/comparison_platform/img/relative-import.png new file mode 100644 index 0000000..fd3063b Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/relative-import.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/results-fig-1.png b/nova_rover_demos/pathfinding/comparison_platform/img/results-fig-1.png new file mode 100644 index 0000000..f433316 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/results-fig-1.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/results-fig-2.png b/nova_rover_demos/pathfinding/comparison_platform/img/results-fig-2.png new file mode 100644 index 0000000..44d3ba8 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/results-fig-2.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/results-folder.png b/nova_rover_demos/pathfinding/comparison_platform/img/results-folder.png new file mode 100644 index 0000000..e6c3863 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/results-folder.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/run-file.png b/nova_rover_demos/pathfinding/comparison_platform/img/run-file.png new file mode 100644 index 0000000..53c7da6 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/run-file.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/runpy-import.png b/nova_rover_demos/pathfinding/comparison_platform/img/runpy-import.png new file mode 100644 index 0000000..ab51da6 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/runpy-import.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/img/wrapper-demo.png b/nova_rover_demos/pathfinding/comparison_platform/img/wrapper-demo.png new file mode 100644 index 0000000..51b3e5d Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/img/wrapper-demo.png differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/a_star.py b/nova_rover_demos/pathfinding/comparison_platform/lib/a_star.py new file mode 100644 index 0000000..fb44a56 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/a_star.py @@ -0,0 +1,169 @@ +''' +This file is provided for additional testing of the different algorithms +''' + +import time +import random +import math +import heapq +from collections import defaultdict + +# Wrapper class with added functionalities + + +class PriorityQueue: + def __init__(self): + self.elements = [] + + def isEmpty(self): + return len(self.elements) == 0 + + def put(self, item, priority): + heapq.heappush(self.elements, (priority, item)) + + def get(self): + # Return only the item + return heapq.heappop(self.elements)[1] + + +# Function to reconstruct path from start to goal using the source dictionary +def reconstruct_path(came_from, start, goal): + current = goal + path = [] + + while current != start: + path.append(current) + current = came_from[current] + + # Add the start to the path - optional + path.append(start) + # reverse the path to get from start to goal + path.reverse() + return path + +# Combine two paths and avoid happing overlapping nodes + + +def join_paths(path_1, path_2): + path_2.reverse() + if(path_1[-1] == path_2[0]): + return path_1 + path_2[1:] + + +# The heuristic used by the a-star algorithm +# This is used to calculate the distance of current node form the goal too +def manhattan_heuristic(a, b): + (x1, y1) = a + (x2, y2) = b + + return abs(x1 - x2) + abs(y1 - y2) + +# Diagonal heuristic which considers the diagonal value of nodes + + +def diagonal_heuristic(a, b): + (x1, y1) = a + (x2, y2) = b + + return max(abs(x1 - x2), abs(y1 - y2)) + +# Taking the euclidian distance as heuristics + + +def euclidian_heuristic(a, b): + (x1, y1) = a + (x2, y2) = b + + return math.sqrt((x1 - x2)**2 + (y1 - y2)**2) + +# Implementation of the a-star search algorithm + + +def a_star_search(graph, start, goal): + # Priority Queue track progression of nodes + frontier = PriorityQueue() + frontier.put(start, 0) + + # Dictionary to track origin of a node + came_from = {} + # Dictionary to track the cost to move to a particular node + cost_so_far = {} + + # Add starting node into the dictionaries + came_from[start] = None + cost_so_far[start] = 0 + + # While the Priority queue is not empty + while not frontier.isEmpty(): + # Get the top of queue + current = frontier.get() + + # check if we have reached destination + if current == goal: + break + + # Loop through neighbors of current node and process them + for next in graph.neighbors(current): + # Calculate the new cost to travel to neighboring node + # The new cost is cost of travelling to current node plus the cost of + # travelling from current node to the neighbor + new_cost = cost_so_far[current] + graph.cost(current, next) + + # Check if this node hasn't been reached before or we have a new cheaper path + if next not in cost_so_far or new_cost < cost_so_far[next]: + cost_so_far[next] = new_cost + # Set the priority of the neighbor using the heuristic + # We are taking distance to goal in consideration through heuristics + priority = new_cost + manhattan_heuristic(goal, next) + frontier.put(next, priority) + came_from[next] = current + + # Return the cost and source dictionary + # return came_from, cost_so_far + return reconstruct_path(came_from, start, goal) + + +# A wrapper function around the main star search function +def a_star(oc_grid, start, end): + diagram = OpenGrid() + diagram.add_walls(oc_grid) + try: + path = a_star_search(diagram, start, end) + maze_solved = True + except: + path = start + maze_solved = False + + return path, maze_solved + + +# A class which handles the grid world for A-star +class OpenGrid: + def __init__(self): + self.walls = defaultdict(int) + self.weights = {} + + # Check if current location is blocked for not + def passable(self, id): + return False if self.walls[id] == 1 else True + + # Check the neighbors of the current grid + def neighbors(self, id): + (x, y) = id + result = [(x+1, y), (x, y-1), (x-1, y), (x, y+1)] + # Just for aesthetics + if(x + y) % 2 == 0: + result.reverse() + # Check if the neighbors are not blocked + result = list(filter(self.passable, result)) + + return result + + # Method to get cost to travel from weights, else default value of 1 + def cost(self, from_node, to_node): + return self.weights.get(to_node, 1) + + # Helper method to add walls to the dictionary + def add_walls(self, walls): + for wall in walls: + self.walls[wall] = 1 diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/a_star_variants.py b/nova_rover_demos/pathfinding/comparison_platform/lib/a_star_variants.py new file mode 100644 index 0000000..8ab01f8 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/a_star_variants.py @@ -0,0 +1,259 @@ +from lib.a_star import * +from collections import defaultdict + +# Wrapper function + + +def bidirectional_a_star(oc_grid, start, end): + # Instantiate appropriate attributes if necessary + diagram = OpenGrid() + diagram.add_walls(oc_grid) + try: + # Call the main algorithm + path = bidirectional_a_star_core(diagram, start, end) + maze_solved = True + except: + path = start + maze_solved = False + + # Return a list of tuples as path and + # a boolean indicating maze solved or not + return path, maze_solved + + +# Wrapper function + + +def weighted_a_star(oc_grid, start, end): + diagram = OpenGrid() + diagram.add_walls(oc_grid) + try: + path = weighted_a_star_core(diagram, start, end) + maze_solved = True + except: + path = start + maze_solved = False + + return path, maze_solved + +# Wrapper function + + +def dynamic_weighted_a_star(oc_grid, start, end): + diagram = OpenGrid() + diagram.add_walls(oc_grid) + try: + path = dynamic_weighted_a_star_core(diagram, start, end) + maze_solved = True + except: + path = start + maze_solved = False + + return path, maze_solved + + +# Implementation of bidirectional a-start +# This algorithm will start looking for a path from both the start and the goal +def bidirectional_a_star_core(graph, start, goal): + # Priority queues to hold the progression of nodes + # This will hold nodes starting from the 'start' node + frontier_1 = PriorityQueue() + frontier_1.put(start, 0) + # This will node nodes starting from the 'goal' node + frontier_2 = PriorityQueue() + frontier_2.put(goal, 0) + + # Dictionaries to hold the cost of travel and origin of nodes + # For the start node + came_from_1 = {} + cost_so_far_1 = {} + + came_from_1[start] = None + cost_so_far_1[start] = 0 + + # For the goal node + came_from_2 = {} + cost_so_far_2 = {} + + came_from_2[goal] = None + cost_so_far_2[goal] = 0 + + # Run the loop until both of the + while not frontier_1.isEmpty() or frontier_2.isEmpty(): + # Get the top from each of the queues + current_1 = frontier_1.get() + current_2 = frontier_2.get() + + # Check if we have found a full path or not + if current_1 == goal or current_2 == start: + break + # We have found two overlapping node + if current_1 in cost_so_far_2 and current_2 in cost_so_far_1: + path_1 = reconstruct_path(came_from_1, start, current_1) + path_2 = reconstruct_path(came_from_2, goal, current_2) + combined_path = join_paths(path_1, path_2) + break + # Only one of the nodes are are overlapping + elif (current_1 in cost_so_far_2): + path_1 = reconstruct_path(came_from_1, start, current_1) + path_2 = reconstruct_path(came_from_2, goal, current_1) + combined_path = join_paths(path_1, path_2) + break + + elif (current_2 in cost_so_far_1): + path_1 = reconstruct_path(came_from_1, start, current_2) + path_2 = reconstruct_path(came_from_2, goal, current_2) + combined_path = join_paths(path_1, path_2) + break + + # A detailed description of the steps can be found in the implementation of + # a_star_search function + # Process the nodes from the start side + for next in graph.neighbors(current_1): + new_cost = cost_so_far_1[current_1] + graph.cost(current_1, next) + + if next not in cost_so_far_1 or new_cost < cost_so_far_1[next]: + cost_so_far_1[next] = new_cost + priority = new_cost + manhattan_heuristic(goal, next) + frontier_1.put(next, priority) + came_from_1[next] = current_1 + + # Process the nodes from the goal side + for next in graph.neighbors(current_2): + new_cost = cost_so_far_2[current_2] + graph.cost(current_2, next) + + if next not in cost_so_far_2 or new_cost < cost_so_far_2[next]: + cost_so_far_2[next] = new_cost + priority = new_cost + manhattan_heuristic(start, next) + frontier_2.put(next, priority) + came_from_2[next] = current_2 + + return combined_path + + +# Weighted A-star +''' + Weighting sacrifices solution optimality to speed up the search. The larger the weight, + the more greedy the search. + Weighted A* does not provide the optimal path. Rather speeds up the process. z + + The movement cost used for this is: + f = g + w2 * h + Here, + g = g-cost -> Distance from start + h = h-cost -> Distance from goal + w2 = (1 + e) where e > 0 = How much we consider the heuristic; + For this algorithm + e = 4. So w2 = 5 => Theoretically return us a solution 5 times faster +''' + + +def weighted_a_star_core(graph, start, goal): + # The weight which we will prioritise the goal + WEIGHT = 5 + # Priority Queue track progression of nodes + frontier = PriorityQueue() + frontier.put(start, 0) + + # Dictionary to track origin of a node + came_from = {} + # Dictionary to track the cost to move to a particular node + cost_so_far = {} + + # Add starting node into the dictionaries + came_from[start] = None + cost_so_far[start] = 0 + + # While the Priority queue is not empty + while not frontier.isEmpty(): + # Get the top of queue + current = frontier.get() + + # check if we have reached destination + if current == goal: + break + + # Loop through neighbors of current node and process them + for next in graph.neighbors(current): + # Calculate the new cost to travel to neighboring node + # The new cost is cost of travelling to current node plus the cost of + # travelling from current node to the neighbor + new_cost = cost_so_far[current] + graph.cost(current, next) + + # Check if this node hasn't been reached before or we have a new cheaper path + if next not in cost_so_far or new_cost < cost_so_far[next]: + cost_so_far[next] = new_cost + # Set the priority of the neighbor using the heuristic + # We are taking distance to goal in consideration through heuristics + priority = new_cost + manhattan_heuristic(goal, next) * WEIGHT + frontier.put(next, priority) + came_from[next] = current + + # Return the cost and source dictionary + # return came_from, cost_so_far + return reconstruct_path(came_from, start, goal) + + +# Dynamically weighted A* +def dynamic_weighted_a_star_core(graph, start, goal, node_threshold, epsilon=2): + # The weight which we will prioritise the goal + weight = 1 + # Priority Queue track progression of nodes + frontier = PriorityQueue() + frontier.put(start, 0) + + # Dictionary to track origin of a node + came_from = {} + # Dictionary to track the cost to move to a particular node + cost_so_far = {} + + # Add starting node into the dictionaries + came_from[start] = None + cost_so_far[start] = 0 + + # Counter to track the depth + count = 0 + depth = 1 + + # While the Priority queue is not empty + while not frontier.isEmpty(): + # Get the top of queue + current = frontier.get() + count += 1 + + # Adjust depth of the search + # If we have searched all nodes in that level increase depth + if count == 4 ** depth: + depth += 1 + + # check if we have reached destination + if current == goal: + break + + # Dynamically calculate the weight + if(depth <= node_threshold): + # Dynamic weighting + weight = 1 - (depth / node_threshold) + else: + weight = 0 + + # Loop through neighbors of current node and process them + for next in graph.neighbors(current): + # Calculate the new cost to travel to neighboring node + # The new cost is cost of travelling to current node plus the cost of + # travelling from current node to the neighbor + new_cost = cost_so_far[current] + graph.cost(current, next) + + # Check if this node hasn't been reached before or we have a new cheaper path + if next not in cost_so_far or new_cost < cost_so_far[next]: + cost_so_far[next] = new_cost + # Set the priority of the neighbor using the heuristic + # We are taking distance to goal in consideration through heuristics + priority = new_cost + \ + manhattan_heuristic(goal, next) * (1 + weight * epsilon) + frontier.put(next, priority) + came_from[next] = current + + # Return the cost and source dictionary + # return came_from, cost_so_far + return reconstruct_path(came_from, start, goal) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/modified_pledge.py b/nova_rover_demos/pathfinding/comparison_platform/lib/modified_pledge.py new file mode 100644 index 0000000..dda7a63 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/modified_pledge.py @@ -0,0 +1,214 @@ +import numpy as np + + +def modified_pledge(occupancy_grid, start_position, end_position): + + + maze = Labyrinth(occupancy_grid, start_position, end_position) + rover = Mouse(maze) + while rover.has_reached_exit() == False: + rover.move() + return rover.path, not(rover.infinite_loop) + + +class Mouse: + + def __init__(self, labyrinth): + self.__labyrinth = labyrinth + self.position = labyrinth.start_pos + + self.direction = self.direction_to_goal() #Number between 0-3 + + self.wall_following = False + self.wall_following_direction = 'Left' + self.sum_turns = 0 + + self.path = [self.position] + self.path_dict = {} + self.path_dict[self.position] = 1 + + self.infinite_loop = False + + def direction_to_goal(self): + x,y = np.array(self.__labyrinth.goal) - np.array(self.position) + theta = np.arctan(y/x) * 180/np.pi + ''' + Account for ambiguity from arctan: specify direction around unit circle + ''' + if x < 0: + if y >= 0: + theta += 180 + else: + theta -= 180 + directions = np.array([0, 90, 180, -90]) + idx = np.abs(directions - theta).argmin() + return idx + + def has_reached_exit(self): + if self.check_loop(): #Checks for loops to end the code] + self.infinite_loop = True + out = True + else: + out = tuple(self.position) == self.__labyrinth.goal + return out + + def turn_right(self): + return (self.direction - 1)%4 + + def turn_left(self): + return (self.direction + 1)%4 + + def next_right(self): + dir = self.direction + for j in range(1,3+1): + if (dir - j)%4 in self.__labyrinth.get_allowed_directions(self.position): + out = (dir - j)%4 + break + return out, -j + + def next_left(self): + dir = self.direction + for j in range(1,3+1): + if (dir + j)%4 in self.__labyrinth.get_allowed_directions(self.position): + out = (dir + j)%4 + break + return out, j + + def continue_straight(self): + return self.direction, 0 + + def step(self, turn_direction): + return tuple( np.array(self.position) + np.array(self.__labyrinth.movements[turn_direction]) ) + + def check_loop(self): + ''' + Returns 'True' if a single cell is visited for a third time (Less visits might be sufficient to detect loop) + ''' + return max(self.path_dict.values()) >= 15 + + def move(self): + if self.has_reached_exit() == False: + ''' + Logic for moving directly towards goal + ''' + if self.wall_following == False: + if self.direction_to_goal() in self.__labyrinth.get_allowed_directions(self.position): + self.position = tuple(self.position + self.__labyrinth.movements[self.direction_to_goal()]) + self.direction = self.direction_to_goal() + self.path.append(tuple(self.position)) + else: + self.wall_following = True #If a wall is encountered, begin wall following + + if self.path.count(self.position) > 1: #Turn right if location has been visited before + self.wall_following_direction = 'Right' + ''' + Begin wall following phase by first aligning mouse perpendicular to wall + ''' + self.direction, num_turns = self.next_right() + self.sum_turns += num_turns + + else: + self.wall_following_direction = 'Left' + ''' + Begin wall following phase by first aligning mouse perpendicular to wall + ''' + self.direction, num_turns = self.next_left() + self.sum_turns += num_turns + else: + ''' + Logic for Wall Following + ''' + if self.wall_following_direction == 'Left': + if self.turn_right() in self.__labyrinth.get_allowed_directions(self.position): + ''' + Turn Right if Possible + ''' + self.direction, num_turns = self.next_right() + self.sum_turns += num_turns + self.position = tuple(self.step(self.direction)) + + elif self.direction in self.__labyrinth.get_allowed_directions(self.position): + ''' + Continue Straight if no right turn is available + ''' + self.position = tuple(self.step(self.direction)) + + else: + ''' + Take the next left if no other option available + ''' + self.direction, num_turns = self.next_left() + self.sum_turns += num_turns + self.position = tuple(self.step(self.direction)) + + elif self.wall_following_direction == 'Right': + if self.turn_left() in self.__labyrinth.get_allowed_directions(self.position): + ''' + Turn Right if Possible + ''' + self.direction, num_turns = self.next_left() + self.sum_turns += num_turns + self.position = tuple(self.step(self.direction)) + + elif self.direction in self.__labyrinth.get_allowed_directions(self.position): + ''' + Continue Straight if no right turn is available + ''' + self.position = tuple(self.step(self.direction)) + + else: + ''' + Take the next right if no other option available + ''' + self.direction, num_turns = self.next_right() + self.sum_turns += num_turns + self.position = tuple(self.step(self.direction)) + + self.path.append(tuple(self.position)) #Update ordered path for output + ''' + Update the path dictionary for loop detection + ''' + try: self.path_dict[tuple(self.position)] += 1 + except: self.path_dict[tuple(self.position)] = 1 + + if self.sum_turns == 0 or abs(self.sum_turns) >= 5: + self.wall_following = False + self.sum_terms = 0 + +class Labyrinth: + + def __init__(self, walls, start, end): + self.walls = walls + self.walls_dict = self.read_walls_to_dict() + + self.start_pos = tuple(start) + self.goal = tuple(end) + + self.directions = [0, 1, 2, 3] + self.movements = np.array([ [1,0], [0,1], [-1,0], [0,-1] ]) + + ''' + Directions Structure: + | Right = 0 | up = 1 | left = 2 | down = 3 | + ''' + def get_allowed_directions(self, position): + mouse_position = np.array(position) + + allowed_directions = [] + for direction, movement in zip(self.directions, self.movements): + if not self.is_wall(mouse_position + movement): + allowed_directions.append(direction) + return allowed_directions + + def is_wall(self, position): + out = False + if tuple(position) in self.walls_dict: + out = True + return out + #Do this with a set of tuples or dictionary - constant time lookup + + def read_walls_to_dict(self): + walls_dict = {} + for i in self.walls: + walls_dict[i] = 0 + return walls_dict diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/heuristics.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/heuristics.py new file mode 100644 index 0000000..2c12a2b --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/heuristics.py @@ -0,0 +1,40 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +from lib.rrt_star.utilities.geometry import dist_between_points + + +def cost_to_go(a: tuple, b: tuple) -> float: + """ + :param a: current location + :param b: next location + :return: estimated segment_cost-to-go from a to b + """ + return dist_between_points(a, b) + + +def path_cost(E, a, b): + """ + Cost of the unique path from x_init to x + :param E: edges, in form of E[child] = parent + :param a: initial location + :param b: goal location + :return: segment_cost of unique path from x_init to x + """ + cost = 0 + while not b == a: + p = E[b] + cost += dist_between_points(b, p) + b = p + + return cost + + +def segment_cost(a, b): + """ + Cost function of the line between x_near and x_new + :param a: start of line + :param b: end of line + :return: segment_cost function between a and b + """ + return dist_between_points(a, b) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt.py new file mode 100644 index 0000000..0334720 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt.py @@ -0,0 +1,40 @@ +from lib.rrt_star.rrt.rrt_base import RRTBase + + +class RRT(RRTBase): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01): + """ + Template RRT planner + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc) + + def rrt_search(self): + """ + Create and return a Rapidly-exploring Random Tree, keeps expanding until can connect to goal + https://en.wikipedia.org/wiki/Rapidly-exploring_random_tree + :return: list representation of path, dict representing edges of tree in form E[child] = parent + """ + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + while True: + for q in self.Q: # iterate over different edge lengths until solution found or time out + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + + if x_new is None: + continue + + # connect shortest valid edge + self.connect_to_point(0, x_nearest, x_new) + + solution = self.check_solution() + if solution[0]: + return solution[1] diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_base.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_base.py new file mode 100644 index 0000000..2d6a449 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_base.py @@ -0,0 +1,178 @@ +import random + +import numpy as np + +from lib.rrt_star.rrt.tree import Tree +from lib.rrt_star.utilities.geometry import steer + + +class RRTBase(object): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01): + """ + Template RRT planner + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + """ + self.X = X + self.samples_taken = 0 + self.max_samples = max_samples + self.Q = Q + self.r = r + self.prc = prc + self.x_init = x_init + self.x_goal = x_goal + self.trees = [] # list of all trees + self.add_tree() # add initial tree + + def add_tree(self): + """ + Create an empty tree and add to trees + """ + self.trees.append(Tree(self.X)) + + def add_vertex(self, tree, v): + """ + Add vertex to corresponding tree + :param tree: int, tree to which to add vertex + :param v: tuple, vertex to add + """ + self.trees[tree].V.insert(0, v + v, v) + self.trees[tree].V_count += 1 # increment number of vertices in tree + self.samples_taken += 1 # increment number of samples taken + + def add_edge(self, tree, child, parent): + """ + Add edge to corresponding tree + :param tree: int, tree to which to add vertex + :param child: tuple, child vertex + :param parent: tuple, parent vertex + """ + self.trees[tree].E[child] = parent + + def nearby(self, tree, x, n): + """ + Return nearby vertices + :param tree: int, tree being searched + :param x: tuple, vertex around which searching + :param n: int, max number of neighbors to return + :return: list of nearby vertices + """ + return self.trees[tree].V.nearest(x, num_results=n, objects="raw") + + def get_nearest(self, tree, x): + """ + Return vertex nearest to x + :param tree: int, tree being searched + :param x: tuple, vertex around which searching + :return: tuple, nearest vertex to x + """ + return next(self.nearby(tree, x, 1)) + + def new_and_near(self, tree, q): + """ + Return a new steered vertex and the vertex in tree that is nearest + :param tree: int, tree being searched + :param q: length of edge when steering + :return: vertex, new steered vertex, vertex, nearest vertex in tree to new vertex + """ + x_rand = self.X.sample_free() + x_nearest = self.get_nearest(tree, x_rand) + x_new = self.bound_point(steer(x_nearest, x_rand, q[0])) + # check if new point is in X_free and not already in V + if not self.trees[0].V.count(x_new) == 0 or not self.X.obstacle_free(x_new): + return None, None + self.samples_taken += 1 + return x_new, x_nearest + + def connect_to_point(self, tree, x_a, x_b): + """ + Connect vertex x_a in tree to vertex x_b + :param tree: int, tree to which to add edge + :param x_a: tuple, vertex + :param x_b: tuple, vertex + :return: bool, True if able to add edge, False if prohibited by an obstacle + """ + if self.trees[tree].V.count(x_b) == 0 and self.X.collision_free(x_a, x_b, self.r): + self.add_vertex(tree, x_b) + self.add_edge(tree, x_b, x_a) + return True + return False + + def can_connect_to_goal(self, tree): + """ + Check if the goal can be connected to the graph + :param tree: rtree of all Vertices + :return: True if can be added, False otherwise + """ + x_nearest = self.get_nearest(tree, self.x_goal) + if self.x_goal in self.trees[tree].E and x_nearest in self.trees[tree].E[self.x_goal]: + # tree is already connected to goal using nearest vertex + return True + # check if obstacle-free + if self.X.collision_free(x_nearest, self.x_goal, self.r): + return True + return False + + def get_path(self): + """ + Return path through tree from start to goal + :return: path if possible, None otherwise + """ + if self.can_connect_to_goal(0): + print("Can connect to goal") + self.connect_to_goal(0) + return self.reconstruct_path(0, self.x_init, self.x_goal) + print("Could not connect to goal") + return None + + def connect_to_goal(self, tree): + """ + Connect x_goal to graph + (does not check if this should be possible, for that use: can_connect_to_goal) + :param tree: rtree of all Vertices + """ + x_nearest = self.get_nearest(tree, self.x_goal) + self.trees[tree].E[self.x_goal] = x_nearest + + def reconstruct_path(self, tree, x_init, x_goal): + """ + Reconstruct path from start to goal + :param tree: int, tree in which to find path + :param x_init: tuple, starting vertex + :param x_goal: tuple, ending vertex + :return: sequence of vertices from start to goal + """ + path = [x_goal] + current = x_goal + if x_init == x_goal: + return path + while not self.trees[tree].E[current] == x_init: + path.append(self.trees[tree].E[current]) + current = self.trees[tree].E[current] + path.append(x_init) + path.reverse() + return path + + def check_solution(self): + # probabilistically check if solution found + if self.prc and random.random() < self.prc: + print("Checking if can connect to goal at", + str(self.samples_taken), "samples") + path = self.get_path() + if path is not None: + return True, path + # check if can connect to goal after generating max_samples + if self.samples_taken >= self.max_samples: + return True, self.get_path() + return False, None + + def bound_point(self, point): + # if point is out-of-bounds, set to bound + point = np.maximum(point, self.X.dimension_lengths[:, 0]) + point = np.minimum(point, self.X.dimension_lengths[:, 1]) + return tuple(point) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_connect.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_connect.py new file mode 100644 index 0000000..f08fe1b --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_connect.py @@ -0,0 +1,85 @@ +import enum + +import numpy as np + +from lib.rrt_star.rrt.rrt_base import RRTBase +from lib.rrt_star.utilities.geometry import steer + + +class Status(enum.Enum): + FAILED = 1 + TRAPPED = 2 + ADVANCED = 3 + REACHED = 4 + + +class RRTConnect(RRTBase): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01): + """ + Template RRTConnect planner + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc) + self.swapped = False + + def swap_trees(self): + """ + Swap trees only + """ + # swap trees + self.trees[0], self.trees[1] = self.trees[1], self.trees[0] + self.swapped = not self.swapped + + def unswap(self): + """ + Check if trees have been swapped and unswap + """ + if self.swapped: + self.swap_trees() + + def extend(self, tree, x_rand): + x_nearest = self.get_nearest(tree, x_rand) + x_new = steer(x_nearest, x_rand, self.Q[0]) + if self.connect_to_point(tree, x_nearest, x_new): + if np.abs(np.sum(np.array(x_new) - np.array(x_rand))) < 1e-2: + return x_new, Status.REACHED + return x_new, Status.ADVANCED + return x_new, Status.TRAPPED + + def connect(self, tree, x): + S = Status.ADVANCED + while S == Status.ADVANCED: + x_new, S = self.extend(tree, x) + return x_new, S + + def rrt_connect(self): + """ + RRTConnect + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + self.add_tree() + self.add_vertex(1, self.x_goal) + self.add_edge(1, self.x_goal, None) + while self.samples_taken < self.max_samples: + x_rand = self.X.sample_free() + x_new, status = self.extend(0, x_rand) + if status != Status.TRAPPED: + x_new, connect_status = self.connect(1, x_new) + if connect_status == Status.REACHED: + self.unswap() + first_part = self.reconstruct_path( + 0, self.x_init, self.get_nearest(0, x_new)) + second_part = self.reconstruct_path( + 1, self.x_goal, self.get_nearest(1, x_new)) + second_part.reverse() + return first_part + second_part + self.swap_trees() + self.samples_taken += 1 diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star.py new file mode 100644 index 0000000..f76bbb3 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star.py @@ -0,0 +1,114 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. +from operator import itemgetter + +from lib.rrt_star.rrt.heuristics import cost_to_go +from lib.rrt_star.rrt.heuristics import segment_cost, path_cost +from lib.rrt_star.rrt.rrt import RRT + + +class RRTStar(RRT): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01, rewire_count=None): + """ + RRT* Search + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + :param rewire_count: number of nearby vertices to rewire + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc) + self.rewire_count = rewire_count if rewire_count is not None else 0 + self.c_best = float('inf') # length of best solution thus far + + def get_nearby_vertices(self, tree, x_init, x_new): + """ + Get nearby vertices to new vertex and their associated path costs from the root of tree + as if new vertex is connected to each one separately. + + :param tree: tree in which to search + :param x_init: starting vertex used to calculate path cost + :param x_new: vertex around which to find nearby vertices + :return: list of nearby vertices and their costs, sorted in ascending order by cost + """ + X_near = self.nearby(tree, x_new, self.current_rewire_count(tree)) + L_near = [(path_cost(self.trees[tree].E, x_init, x_near) + segment_cost(x_near, x_new), x_near) for + x_near in X_near] + # noinspection PyTypeChecker + L_near.sort(key=itemgetter(0)) + + return L_near + + def rewire(self, tree, x_new, L_near): + """ + Rewire tree to shorten edges if possible + Only rewires vertices according to rewire count + :param tree: int, tree to rewire + :param x_new: tuple, newly added vertex + :param L_near: list of nearby vertices used to rewire + :return: + """ + for c_near, x_near in L_near: + curr_cost = path_cost(self.trees[tree].E, self.x_init, x_near) + tent_cost = path_cost( + self.trees[tree].E, self.x_init, x_new) + segment_cost(x_new, x_near) + if tent_cost < curr_cost and self.X.collision_free(x_near, x_new, self.r): + self.trees[tree].E[x_near] = x_new + + def connect_shortest_valid(self, tree, x_new, L_near): + """ + Connect to nearest vertex that has an unobstructed path + :param tree: int, tree being added to + :param x_new: tuple, vertex being added + :param L_near: list of nearby vertices + """ + # check nearby vertices for total cost and connect shortest valid edge + for c_near, x_near in L_near: + if c_near + cost_to_go(x_near, self.x_goal) < self.c_best and self.connect_to_point(tree, x_near, x_new): + break + + def current_rewire_count(self, tree): + """ + Return rewire count + :param tree: tree being rewired + :return: rewire count + """ + # if no rewire count specified, set rewire count to be all vertices + if self.rewire_count is None: + return self.trees[tree].V_count + + # max valid rewire count + return min(self.trees[tree].V_count, self.rewire_count) + + def rrt_star(self): + """ + Based on algorithm found in: Incremental Sampling-based Algorithms for Optimal Motion Planning + http://roboticsproceedings.org/rss06/p34.pdf + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + while True: + for q in self.Q: # iterate over different edge lengths + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + if x_new is None: + continue + + # get nearby vertices and cost-to-come + L_near = self.get_nearby_vertices(0, self.x_init, x_new) + + # check nearby vertices for total cost and connect shortest valid edge + self.connect_shortest_valid(0, x_new, L_near) + + if x_new in self.trees[0].E: + # rewire tree + self.rewire(0, x_new, L_near) + + solution = self.check_solution() + if solution[0]: + return solution[1] diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star_bid.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star_bid.py new file mode 100644 index 0000000..3f12ead --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star_bid.py @@ -0,0 +1,129 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +import random + +from lib.rrt_star.rrt.heuristics import path_cost +from lib.rrt_star.rrt.rrt_star import RRTStar + + +class RRTStarBidirectional(RRTStar): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01, rewire_count=None): + """ + Bidirectional RRT* Search + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + :param rewire_count: number of nearby vertices to rewire + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc, rewire_count) + self.sigma_best = None # best solution thus far + self.swapped = False + + def connect_trees(self, a, b, x_new, L_near): + """ + Check nearby vertices for total cost and connect shortest valid edge if possible + This results in both trees being connected + :param a: first tree to connect + :param b: second tree to connect + :param x_new: new vertex to add + :param L_near: nearby vertices + """ + for c_near, x_near in L_near: + c_tent = c_near + path_cost(self.trees[a].E, self.x_init, x_new) + if c_tent < self.c_best and self.X.collision_free(x_near, x_new, self.r): + self.trees[b].V_count += 1 + self.trees[b].E[x_new] = x_near + self.c_best = c_tent + sigma_a = self.reconstruct_path(a, self.x_init, x_new) + sigma_b = self.reconstruct_path(b, self.x_goal, x_new) + del sigma_b[-1] + sigma_b.reverse() + self.sigma_best = sigma_a + sigma_b + + break + + def swap_trees(self): + """ + Swap trees and start/goal + """ + # swap trees + self.trees[0], self.trees[1] = self.trees[1], self.trees[0] + # swap start/goal + self.x_init, self.x_goal = self.x_goal, self.x_init + self.swapped = not self.swapped + + def unswap(self): + """ + Check if trees have been swapped and unswap + Reverse path if needed to correspond with swapped trees + """ + if self.swapped: + self.swap_trees() + + if self.sigma_best is not None and self.sigma_best[0] is not self.x_init: + self.sigma_best.reverse() + + def rrt_star_bidirectional(self): + """ + Bidirectional RRT* + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + # tree a + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + # tree b + self.add_tree() + self.add_vertex(1, self.x_goal) + self.add_edge(1, self.x_goal, None) + + while True: + for q in self.Q: # iterate over different edge lengths + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + if x_new is None: + continue + + # get nearby vertices and cost-to-come + L_near = self.get_nearby_vertices(0, self.x_init, x_new) + + # check nearby vertices for total cost and connect shortest valid edge + self.connect_shortest_valid(0, x_new, L_near) + + if x_new in self.trees[0].E: + # rewire tree + self.rewire(0, x_new, L_near) + + # nearby vertices from opposite tree and cost-to-come + L_near = self.get_nearby_vertices( + 1, self.x_goal, x_new) + + self.connect_trees(0, 1, x_new, L_near) + + if self.prc and random.random() < self.prc: # probabilistically check if solution found + print("Checking if can connect to goal at", + str(self.samples_taken), "samples") + if self.sigma_best is not None: + print("Can connect to goal") + self.unswap() + + return self.sigma_best + + if self.samples_taken >= self.max_samples: + self.unswap() + + if self.sigma_best is not None: + print("Can connect to goal") + + return self.sigma_best + else: + print("Could not connect to goal") + + return self.sigma_best + + self.swap_trees() diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star_bid_h.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star_bid_h.py new file mode 100644 index 0000000..cf542b6 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/rrt_star_bid_h.py @@ -0,0 +1,126 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. +import random + +from lib.rrt_star.rrt.rrt_star_bid import RRTStarBidirectional +from lib.rrt_star.utilities.geometry import dist_between_points, pairwise + + +class RRTStarBidirectionalHeuristic(RRTStarBidirectional): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01, + rewire_count: int = None, conditional_rewire: bool = False): + """ + Bidirectional RRT* Search + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + :param rewire_count: number of nearby vertices to rewire + :param conditional_rewire: if True, set rewire count to 1 until solution found, + then set to specified rewire count (ensure runtime complexity guarantees) + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc, + 1 if conditional_rewire else rewire_count) + self.original_rewire_count = rewire_count + + def rrt_star_bid_h(self): + """ + Bidirectional RRT* using added heuristics + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + # tree a + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + # tree b + self.add_tree() + self.add_vertex(1, self.x_goal) + self.add_edge(1, self.x_goal, None) + + while True: + for q in self.Q: # iterate over different edge lengths + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + if x_new is None: + continue + + # get nearby vertices and cost-to-come + L_near = self.get_nearby_vertices(0, self.x_init, x_new) + + # check nearby vertices for total cost and connect shortest valid edge + self.connect_shortest_valid(0, x_new, L_near) + + if x_new in self.trees[0].E: + # rewire tree + self.rewire(0, x_new, L_near) + + # nearby vertices from opposite tree and cost-to-come + L_near = self.get_nearby_vertices( + 1, self.x_goal, x_new) + + self.connect_trees(0, 1, x_new, L_near) + self.rewire_count = self.original_rewire_count + + self.lazy_shortening() + + if self.prc and random.random() < self.prc: # probabilistically check if solution found + print("Checking if can connect to goal at", + str(self.samples_taken), "samples") + if self.sigma_best is not None: + print("Can connect to goal") + self.unswap() + + return self.sigma_best + + if self.samples_taken >= self.max_samples: + self.unswap() + + if self.sigma_best is not None: + print("Can connect to goal") + + return self.sigma_best + else: + print("Could not connect to goal") + + return self.sigma_best + + self.swap_trees() + + def lazy_shortening(self): + """ + Lazily attempt to shorten current best path + """ + if self.sigma_best is not None and len(self.sigma_best) > 2: + a, b = 0, 0 + while not abs(a - b) > 1: + a, b = random.sample(range(0, len(self.sigma_best)), 2) + + a, b = min(a, b), max(a, b) + v_a, v_b = tuple(self.sigma_best[a]), tuple(self.sigma_best[b]) + + if self.X.collision_free(v_a, v_b, self.r): + # create new edge connecting vertices + if v_a in self.trees[0].E and v_b in self.reconstruct_path(0, self.x_init, v_a): + self.trees[0].E[v_a] = v_b + elif v_a in self.trees[1].E and v_b in self.reconstruct_path(1, self.x_goal, v_a): + self.trees[1].E[v_a] = v_b + elif v_b in self.trees[0].E and v_a in self.reconstruct_path(0, self.x_init, v_b): + self.trees[0].E[v_b] = v_a + elif v_b in self.trees[1].E and v_a in self.reconstruct_path(1, self.x_goal, v_b): + self.trees[1].E[v_b] = v_a + elif v_a in self.trees[0].E: + self.trees[0].E[v_b] = v_a + else: + self.trees[1].E[v_b] = v_a + + # update best path + # remove cost of removed edges + self.c_best -= sum(dist_between_points(i, j) + for i, j in pairwise(self.sigma_best[a:b + 1])) + # add cost of new edge + self.c_best += dist_between_points( + self.sigma_best[a], self.sigma_best[b]) + self.sigma_best = self.sigma_best[:a + 1] + self.sigma_best[b:] diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/tree.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/tree.py new file mode 100644 index 0000000..7ac0bbb --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt/tree.py @@ -0,0 +1,14 @@ +from rtree import index + + +class Tree(object): + def __init__(self, X): + """ + Tree representation + :param X: Search Space + """ + p = index.Property() + p.dimension = X.dimensions + self.V = index.Index(interleaved=True, properties=p) # vertices in an rtree + self.V_count = 0 + self.E = {} # edges in form E[child] = parent diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt_star_2d.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt_star_2d.py new file mode 100644 index 0000000..fde3eda --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/rrt_star_2d.py @@ -0,0 +1,73 @@ + +import numpy as np + +from lib.rrt_star.rrt.rrt_star import RRTStar +from lib.rrt_star.search_space.search_space import SearchSpace +#from pathfinding.rrtStarCombo.utilities.plotting import Plot + + +# wrapper function +def rrt_star_2d_search(obstacle_list, start, end): + # instantiate attributes + Obstacles = process_obstacles(obstacle_list) + + # dimensions of Search Space + X_dimensions = np.array([(0, 150), (0, 150)]) + + # length of tree edges + Q = np.array([(8, 4)]) + + # res is taken from fastslam + r = 1 + + # max number of samples to take before timing out + max_samples = 1024 + + # optional, number of nearby branches to rewire + rewire_count = 500 + + # probability of checking for a connection to goal + prc = 0.1 + + # create Search Space + X = SearchSpace(X_dimensions, Obstacles) + + # create rrt_search + rrt = RRTStar(X, Q, start, end, max_samples, r, prc, rewire_count) + + try: + # call the main algorithm + # it should return the path as a list or coordinates + path = rrt.rrt_star() + maze_solved = True + except: + path = start + maze_solved = False + + # Return a list of tuples as path and + # a boolean indicating maze solved or not + return path, maze_solved + + +""" +Obstacles = ([(x, y, x+res, y+res) #res is taken from fast slam +x_init = (0, 0) # starting location +x_goal = (100, 100) # goal location + +plot = Plot("rrt_star_2d") +plot.plot_tree(X, rrt.trees) +if path is not None: + plot.plot_path(X, path) +plot.plot_obstacles(X, Obstacles) +plot.plot_start(X, x_init) +plot.plot_goal(X, x_goal) +plot.draw(auto_open=True) +""" + + +def process_obstacles(obstacle_list): + result = [] + for coor in obstacle_list: + result.append((*(coor), *(coor))) + + return np.array(result) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/search_space/search_space.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/search_space/search_space.py new file mode 100644 index 0000000..cd92c47 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/search_space/search_space.py @@ -0,0 +1,80 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +import numpy as np +from rtree import index + +from lib.rrt_star.utilities.geometry import es_points_along_line +from lib.rrt_star.utilities.obstacle_generation import obstacle_generator + + +class SearchSpace(object): + def __init__(self, dimension_lengths, O=None): + """ + Initialize Search Space + :param dimension_lengths: range of each dimension + :param O: list of obstacles + """ + # sanity check + if len(dimension_lengths) < 2: + raise Exception("Must have at least 2 dimensions") + self.dimensions = len(dimension_lengths) # number of dimensions + # sanity checks + if any(len(i) != 2 for i in dimension_lengths): + raise Exception("Dimensions can only have a start and end") + if any(i[0] >= i[1] for i in dimension_lengths): + raise Exception("Dimension start must be less than dimension end") + self.dimension_lengths = dimension_lengths # length of each dimension + p = index.Property() + p.dimension = self.dimensions + if O is None: + self.obs = index.Index(interleaved=True, properties=p) + else: + # r-tree representation of obstacles + # sanity check + if any(len(o) / 2 != len(dimension_lengths) for o in O): + raise Exception("Obstacle has incorrect dimension definition") + # if any(o[i] >= o[int(i + len(o) / 2)] for o in O for i in range(int(len(o) / 2))): + # raise Exception( + # "Obstacle start must be less than obstacle end") + self.obs = index.Index(obstacle_generator( + O), interleaved=True, properties=p) + + def obstacle_free(self, x): + """ + Check if a location resides inside of an obstacle + :param x: location to check + :return: True if not inside an obstacle, False otherwise + """ + return self.obs.count(x) == 0 + + def sample_free(self): + """ + Sample a location within X_free + :return: random location within X_free + """ + while True: # sample until not inside of an obstacle + x = self.sample() + if self.obstacle_free(x): + return x + + def collision_free(self, start, end, r): + """ + Check if a line segment intersects an obstacle + :param start: starting point of line + :param end: ending point of line + :param r: resolution of points to sample along edge when checking for collisions + :return: True if line segment does not intersect an obstacle, False otherwise + """ + points = es_points_along_line(start, end, r) + coll_free = all(map(self.obstacle_free, points)) + return coll_free + + def sample(self): + """ + Return a random location within X + :return: random location within X (not necessarily X_free) + """ + x = np.random.uniform( + self.dimension_lengths[:, 0], self.dimension_lengths[:, 1]) + return tuple(x) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/geometry.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/geometry.py new file mode 100644 index 0000000..e78b07f --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/geometry.py @@ -0,0 +1,60 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +from itertools import tee + +import numpy as np + + +def dist_between_points(a, b): + """ + Return the Euclidean distance between two points + :param a: first point + :param b: second point + :return: Euclidean distance between a and b + """ + distance = np.linalg.norm(np.array(b) - np.array(a)) + return distance + + +def pairwise(iterable): + """ + Pairwise iteration over iterable + :param iterable: iterable + :return: s -> (s0,s1), (s1,s2), (s2, s3), ... + """ + a, b = tee(iterable) + next(b, None) + return zip(a, b) + + +def es_points_along_line(start, end, r): + """ + Equally-spaced points along a line defined by start, end, with resolution r + :param start: starting point + :param end: ending point + :param r: maximum distance between points + :return: yields points along line from start to end, separated by distance r + """ + d = dist_between_points(start, end) + n_points = int(np.ceil(d / r)) + if n_points > 1: + step = d / (n_points - 1) + for i in range(n_points): + next_point = steer(start, end, i * step) + yield next_point + + +def steer(start, goal, d): + """ + Return a point in the direction of the goal, that is distance away from start + :param start: start location + :param goal: goal location + :param d: distance away from start + :return: point in the direction of the goal, distance away from start + """ + start, end = np.array(start), np.array(goal) + v = end - start + u = v / (np.sqrt(np.sum(v ** 2))) + steered_point = start + u * d + return tuple(steered_point) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/obstacle_generation.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/obstacle_generation.py new file mode 100644 index 0000000..b6f851a --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/obstacle_generation.py @@ -0,0 +1,60 @@ +import random +import uuid + +import numpy as np + + +def generate_random_obstacles(X, start, end, n): + """ + Generates n random obstacles without disrupting world connectivity. + It also respects start and end points so that they don't lie inside of an obstacle. + """ + # Note: Current implementation only supports hyperrectangles. + i = 0 + obstacles = [] + while i < n: + center = np.empty(len(X.dimension_lengths), np.float) + scollision = True + fcollision = True + edge_lengths = [] + for j in range(X.dimensions): + # None of the sides of a hyperrectangle can be higher than 0.1 of the total span + # in that particular X.dimensions + max_edge_length = (X.dimension_lengths[j][1] - X.dimension_lengths[j][0]) / 10.0 + # None of the sides of a hyperrectangle can be higher than 0.01 of the total span + # in that particular X.dimensions + min_edge_length = (X.dimension_lengths[j][1] - X.dimension_lengths[j][0]) / 100.0 + edge_length = random.uniform(min_edge_length, max_edge_length) + center[j] = random.uniform(X.dimension_lengths[j][0] + edge_length, + X.dimension_lengths[j][1] - edge_length) + edge_lengths.append(edge_length) + + if abs(start[j] - center[j]) > edge_length: + scollision = False + if abs(end[j] - center[j]) > edge_length: + fcollision = False + + # Check if any part of the obstacle is inside of another obstacle. + min_corner = np.empty(X.dimensions, np.float) + max_corner = np.empty(X.dimensions, np.float) + for j in range(X.dimensions): + min_corner[j] = center[j] - edge_lengths[j] + max_corner[j] = center[j] + edge_lengths[j] + obstacle = np.append(min_corner, max_corner) + # Check newly generated obstacle intersects any former ones. Also respect start and end points + if len(list(X.obs.intersection(obstacle))) > 0 or scollision or fcollision: + continue + i += 1 + obstacles.append(obstacle) + X.obs.add(uuid.uuid4(), tuple(obstacle), tuple(obstacle)) + + return obstacles + + +def obstacle_generator(obstacles): + """ + Add obstacles to r-tree + :param obstacles: list of obstacles + """ + for obstacle in obstacles: + yield (uuid.uuid4(), obstacle, obstacle) diff --git a/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/plotting.py b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/plotting.py new file mode 100644 index 0000000..f3fffef --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/lib/rrt_star/utilities/plotting.py @@ -0,0 +1,231 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +import plotly as py +from plotly import graph_objs as go + +colors = ['darkblue', 'teal'] + + +class Plot(object): + def __init__(self, filename): + """ + Create a plot + :param filename: filename + """ + self.filename = filename + ".html" + self.data = [] + self.layout = {'title': 'Plot', + 'showlegend': False + } + + self.fig = {'data': self.data, + 'layout': self.layout} + + def plot_tree(self, X, trees): + """ + Plot tree + :param X: Search Space + :param trees: list of trees + """ + if X.dimensions == 2: # plot in 2D + self.plot_tree_2d(trees) + elif X.dimensions == 3: # plot in 3D + self.plot_tree_3d(trees) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_tree_2d(self, trees): + """ + Plot 2D trees + :param trees: trees to plot + """ + for i, tree in enumerate(trees): + for start, end in tree.E.items(): + if end is not None: + trace = go.Scatter( + x=[start[0], end[0]], + y=[start[1], end[1]], + line=dict( + color=colors[i] + ), + mode="lines" + ) + self.data.append(trace) + + def plot_tree_3d(self, trees): + """ + Plot 3D trees + :param trees: trees to plot + """ + for i, tree in enumerate(trees): + for start, end in tree.E.items(): + if end is not None: + trace = go.Scatter3d( + x=[start[0], end[0]], + y=[start[1], end[1]], + z=[start[2], end[2]], + line=dict( + color=colors[i] + ), + mode="lines" + ) + self.data.append(trace) + + def plot_obstacles(self, X, O): + """ + Plot obstacles + :param X: Search Space + :param O: list of obstacles + """ + if X.dimensions == 2: # plot in 2D + self.layout['shapes'] = [] + for O_i in O: + # noinspection PyUnresolvedReferences + self.layout['shapes'].append( + { + 'type': 'rect', + 'x0': O_i[0], + 'y0': O_i[1], + 'x1': O_i[2], + 'y1': O_i[3], + 'line': { + 'color': 'purple', + 'width': 4, + }, + 'fillcolor': 'purple', + 'opacity': 0.70 + }, + ) + elif X.dimensions == 3: # plot in 3D + for O_i in O: + obs = go.Mesh3d( + x=[O_i[0], O_i[0], O_i[3], O_i[3], O_i[0], O_i[0], O_i[3], O_i[3]], + y=[O_i[1], O_i[4], O_i[4], O_i[1], O_i[1], O_i[4], O_i[4], O_i[1]], + z=[O_i[2], O_i[2], O_i[2], O_i[2], O_i[5], O_i[5], O_i[5], O_i[5]], + i=[7, 0, 0, 0, 4, 4, 6, 6, 4, 0, 3, 2], + j=[3, 4, 1, 2, 5, 6, 5, 2, 0, 1, 6, 3], + k=[0, 7, 2, 3, 6, 7, 1, 1, 5, 5, 7, 6], + color='purple', + opacity=0.70 + ) + self.data.append(obs) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_path(self, X, path): + """ + Plot path through Search Space + :param X: Search Space + :param path: path through space given as a sequence of points + """ + if X.dimensions == 2: # plot in 2D + x, y = [], [] + for i in path: + x.append(i[0]) + y.append(i[1]) + trace = go.Scatter( + x=x, + y=y, + line=dict( + color="red", + width=4 + ), + mode="lines" + ) + + self.data.append(trace) + elif X.dimensions == 3: # plot in 3D + x, y, z = [], [], [] + for i in path: + x.append(i[0]) + y.append(i[1]) + z.append(i[2]) + trace = go.Scatter3d( + x=x, + y=y, + z=z, + line=dict( + color="red", + width=4 + ), + mode="lines" + ) + + self.data.append(trace) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_start(self, X, x_init): + """ + Plot starting point + :param X: Search Space + :param x_init: starting location + """ + if X.dimensions == 2: # plot in 2D + trace = go.Scatter( + x=[x_init[0]], + y=[x_init[1]], + line=dict( + color="orange", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + elif X.dimensions == 3: # plot in 3D + trace = go.Scatter3d( + x=[x_init[0]], + y=[x_init[1]], + z=[x_init[2]], + line=dict( + color="orange", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_goal(self, X, x_goal): + """ + Plot goal point + :param X: Search Space + :param x_goal: goal location + """ + if X.dimensions == 2: # plot in 2D + trace = go.Scatter( + x=[x_goal[0]], + y=[x_goal[1]], + line=dict( + color="green", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + elif X.dimensions == 3: # plot in 3D + trace = go.Scatter3d( + x=[x_goal[0]], + y=[x_goal[1]], + z=[x_goal[2]], + line=dict( + color="green", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def draw(self, auto_open=True): + """ + Render the plot to a file + """ + py.offline.plot(self.fig, filename=self.filename, auto_open=auto_open) diff --git a/nova_rover_demos/pathfinding/comparison_platform/results/a_star.pdf b/nova_rover_demos/pathfinding/comparison_platform/results/a_star.pdf new file mode 100644 index 0000000..4b3e2a3 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/results/a_star.pdf differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/results/benchmark.csv b/nova_rover_demos/pathfinding/comparison_platform/results/benchmark.csv new file mode 100644 index 0000000..1bfa59d --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/results/benchmark.csv @@ -0,0 +1,5 @@ +Algorithm,Mean Runtime,Std Deviation,Peak Memory Usage,Avg. Path +modified_pledge,36.167542139689125,2.391340146035558,124104,298.0 +a_star,87.76553471883138,9.33659105469145,1092816,224.0 +bidirectional_a_star,18.201440572738647,2.7570686991386473,228868,224.0 +weighted_a_star,6.618521430275657,0.8068808778901773,183294,228.0 diff --git a/nova_rover_demos/pathfinding/comparison_platform/results/bidirectional_a_star.pdf b/nova_rover_demos/pathfinding/comparison_platform/results/bidirectional_a_star.pdf new file mode 100644 index 0000000..593c8b6 Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/results/bidirectional_a_star.pdf differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/results/modified_pledge.pdf b/nova_rover_demos/pathfinding/comparison_platform/results/modified_pledge.pdf new file mode 100644 index 0000000..3b3be8a Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/results/modified_pledge.pdf differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/results/weighted_a_star.pdf b/nova_rover_demos/pathfinding/comparison_platform/results/weighted_a_star.pdf new file mode 100644 index 0000000..e99c26f Binary files /dev/null and b/nova_rover_demos/pathfinding/comparison_platform/results/weighted_a_star.pdf differ diff --git a/nova_rover_demos/pathfinding/comparison_platform/run.py b/nova_rover_demos/pathfinding/comparison_platform/run.py new file mode 100644 index 0000000..78b0c8f --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/run.py @@ -0,0 +1,45 @@ +from src.platform import * +import warnings +''' +Place Algorithm Imports Here +''' + +############## IMPORT YOUR ALGORITHM(S) HERE ########################### +from lib.modified_pledge import * +from lib.a_star import * +from lib.a_star_variants import * +from lib.rrt_star.rrt_star_2d import * + +######################################################################### +''' +Place the function call for each algorithm to be compared into the list named 'algorithm list'. + +Functions should be of the form * My_Function(occupancy_grid, start_position, end_position) + > occupancy_grid: list of tuples (any length) of coordinates containing walls + > start_position: length 2 tuple containing x and y starting coordinates + > end_position: length 2 tuple containing x and y coordinates of the goal coordinates + +''' + +################## List the algorithms you want to compare here ############################## +algorithm_list = [modified_pledge, a_star, + bidirectional_a_star, weighted_a_star, rrt_star_2d_search] + + +############################################################################################## + + +# Density keyword describes the density of the environment to be traversed. +# Can be 'heavy', 'medium', 'light' or 'sparse' +density = 'light' + + +###################################################################### +''' +Below this point no input is required from the user :) +''' +###################################################################### + +warnings.simplefilter("ignore") + +compare_algorithms(algorithm_list, density) diff --git a/nova_rover_demos/pathfinding/comparison_platform/run.py.txt b/nova_rover_demos/pathfinding/comparison_platform/run.py.txt deleted file mode 100644 index e69de29..0000000 diff --git a/nova_rover_demos/pathfinding/comparison_platform/sandbox.py b/nova_rover_demos/pathfinding/comparison_platform/sandbox.py new file mode 100644 index 0000000..b7fd74b --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/sandbox.py @@ -0,0 +1,7 @@ +from lib.rrt_star.rrt_star_2d import * +from src.maze.random_maze import * + +args = [*random_maze(150, 150, 'heavy')] + + +print(rrt_star_2d_search(*args)) diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/benchmark.py b/nova_rover_demos/pathfinding/comparison_platform/src/benchmark.py new file mode 100644 index 0000000..0d281d4 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/src/benchmark.py @@ -0,0 +1,82 @@ +# Importing necessary files +import math +import time +import random +import statistics +import tracemalloc +import sys +import csv +''' + The core function which handles the benchmarking of different algorithms + + @param functions - A tuple containing the functions we want to compare + @param args - A tuple containing the arguments we want to pass into each function +''' + + +def benchmarker(functions, args): + + # Determining the number of iterations to be made + iterations = 100 if len(sys.argv) < 2 else int(sys.argv[1]) + # Dictionary to hold the runtime of the comparing functions + times = {f.__name__: [] for f in functions} + # Dictionary to hold memory + peak_memory = {f.__name__: 0 for f in functions} + # A dictionary to keep track of total path lenth + avg_path = {f.__name__: 0 for f in functions} + # Loading the arguments to proper functions + ''' + args = [ [...], [....], [...] ] + ''' + argument_dict = {} + for i in range(len(functions)): + argument_dict[functions[i].__name__] = args + + # Running each function randomly around 3000 times + for i in range(iterations): + for _ in range(len(functions)): + # Choose a function randomly from the list and load its arguments + func = random.choice(functions) + #func_args = argument_dict[func.__name__] + # Time its execution start tracing memory allocation + t0 = time.time() + tracemalloc.start() + # Run the functions with the arguments + # func(*func_args) + path, status = func(*args) + # Stop memory tracing + peak = tracemalloc.get_traced_memory()[1] + tracemalloc.stop() + # Stop timer + t1 = time.time() + times[func.__name__].append((t1-t0)*1000) + peak_memory[func.__name__] = peak \ + if peak > peak_memory[func.__name__] else peak_memory[func.__name__] + + avg_path[func.__name__] = len(path) + avg_path[func.__name__] + + return times, peak_memory, avg_path + + +''' + A function which outputs the benchmark statistics into a CSV file + + @param time_stats - Time related statistics + @param memory_stats - Memory usage + @param path_stats - Contains path lengths +''' + + +def print_results(time_stats, memory_stats, path_stats): + + with open('results/benchmark.csv', mode='w') as csv_file: + field_names = ['Algorithm', 'Mean Runtime', + 'Std Deviation', 'Peak Memory Usage', 'Avg. Path'] + writer = csv.DictWriter(csv_file, fieldnames=field_names) + + writer.writeheader() + for name, number in time_stats.items(): + writer.writerow({'Algorithm': name, 'Mean Runtime': statistics.mean(number), + 'Std Deviation': statistics.stdev(number), 'Peak Memory Usage': memory_stats[name], + 'Avg. Path': (path_stats[name] / len(number)) + }) diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/maze/maze_demo.py b/nova_rover_demos/pathfinding/comparison_platform/src/maze/maze_demo.py new file mode 100644 index 0000000..c8b3701 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/src/maze/maze_demo.py @@ -0,0 +1,31 @@ +import numpy as np + + +from random_maze import * + +xdim = 200 +ydim = 200 + +density = ['sparse', 'light', 'medium', 'heavy'] + +subplot_ref = [ (0,0), (0,1), (1,0), (1,1)] +fig, axs = plt.subplots(2, 2) + + + +for i in range(len(density)): + walls, start, end = random_maze(xdim, ydim, density[i]) + maze = np.zeros([xdim, ydim]) + + for cell in walls: + maze[cell] = 1 + axs[subplot_ref[i]].imshow(maze) + axs[subplot_ref[i]].set_title(f'{density[i]}') + +# walls, start, end = random_maze(xdim, ydim, 'super heavy') +# maze = np.zeros([xdim, ydim]) + +# for cell in walls: +# maze[cell] = 1 +# plt.imshow(maze) +plt.show() diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/maze/random_maze.py b/nova_rover_demos/pathfinding/comparison_platform/src/maze/random_maze.py new file mode 100644 index 0000000..8a9e1e0 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/src/maze/random_maze.py @@ -0,0 +1,52 @@ +import numpy as np +import random + +import matplotlib.pyplot as plt + +def random_obstacle(x, y, max_size): + obstacle_size = random.randint(5,max_size) + obstacle_occupancy = [(x,y)] + for i in range(obstacle_size): + x += random.choice([1, 0, -1]) + y += random.choice([1, 0, -1]) + obstacle_occupancy.append((x,y)) + return obstacle_occupancy + + + +def random_maze(x_dimension, y_dimension, density): + oc_grid = [] + if density == 'heavy': + num_obstacles = int( np.sqrt(x_dimension * y_dimension) ) + max_obstacle_size = int( np.sqrt(x_dimension * y_dimension) ) + elif density == 'medium': + num_obstacles = int( 0.75 * np.sqrt(x_dimension * y_dimension) ) + max_obstacle_size = int( np.sqrt(x_dimension * y_dimension) ) + elif density == 'light': + num_obstacles = int( 0.5 * np.sqrt(x_dimension * y_dimension) ) + max_obstacle_size = int( np.sqrt(x_dimension * y_dimension) ) + elif density == 'sparse': + num_obstacles = int( 0.25 * np.sqrt(x_dimension * y_dimension) ) + max_obstacle_size = int( np.sqrt(x_dimension * y_dimension) ) + + start = (0,0) + end = (x_dimension - 1, y_dimension - 1) + + for i in range(num_obstacles): + x = random.randint(1, x_dimension - 2) + y = random.randint(1, y_dimension - 2) + for i in random_obstacle(x,y, max_obstacle_size): + if 0 <= i[0] <= x_dimension - 2 and 0 <= i[1] <= y_dimension - 2: + oc_grid.append(i) + ''' + Start and End positions are either in the corner or centre of the edge of the maze + Start and End are always on opposite edges of the maze + ''' + waypoints = [0, int(x_dimension/2), x_dimension - 1] + start = (random.choice(waypoints), 0) + if start[0] != int(x_dimension/2): #Prevent the maze from generating start and end coordinates along the edge of the maze + del waypoints[waypoints.index(start[0])] + + end = (random.choice(waypoints), y_dimension - 1) + + return oc_grid, start, end diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/platform.py b/nova_rover_demos/pathfinding/comparison_platform/src/platform.py new file mode 100644 index 0000000..ee379d2 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/src/platform.py @@ -0,0 +1,42 @@ +import time +import random +import numpy as np + +from src.maze.random_maze import * +from src.benchmark import * +from src.visuals import * + +''' +Syntax: random_maze(x_dimension, y_dimension, density) + > Density can be 'heavy', 'medium', 'light', or 'sparse' +''' + +''' + The core function which combines different components. + It serves as a platform for the data to be passed around between different + sub-modules. + + @param algorithm_list - List of algorithms it needs to compare + @param density - The density of obstacles in the environment +''' + + +def compare_algorithms(algorithm_list, density): + + maze_x_dimension, maze_y_dimension = 150, 150 + + # Generate the arguments to the benchmarking function + # args output > occupancy_grid, start_coordinates, end_coordinates + args = [*random_maze(maze_x_dimension, maze_y_dimension, density)] + + # Run the benchmarking on the selected arguments + time_stats, memory_stats, path_stats = benchmarker(algorithm_list, args) + + # Save the results in the results folder + plot_diagram(algorithm_list, args, maze_x_dimension, maze_y_dimension) + + # Print the results in a log file + print_results(time_stats, memory_stats, path_stats) + + # Draw visuals based on our benchmarking + visualiser(time_stats, memory_stats, path_stats) diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/platform.py.txt b/nova_rover_demos/pathfinding/comparison_platform/src/platform.py.txt deleted file mode 100644 index e69de29..0000000 diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/visuals.py b/nova_rover_demos/pathfinding/comparison_platform/src/visuals.py new file mode 100644 index 0000000..0df8fa9 --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/src/visuals.py @@ -0,0 +1,100 @@ +import statistics +import matplotlib.pyplot as plt +import numpy as np + + +# The function responsible for displaying the plots in the screen +def visualiser(time_stats, memory_stats, path_stats): + + # Converting to appropriate data + func_names = [] + performance = [] + error = [] + peak_memory = [] + avg_path = [] + + for name, number in time_stats.items(): + func_names.append(name) + performance.append(statistics.mean(number)) + error.append(statistics.stdev(number)) + peak_memory.append(memory_stats[name]) + avg_path.append(path_stats[name] / len(number)) + + y_pos = np.arange(len(func_names)) + + # Plotting the runtime performance + fig1 = plt.figure(figsize=(10, 10)) + ax1 = fig1.add_subplot(211) + + ax1.barh(y_pos, performance, xerr=error, align='center', + color='green', ecolor='black') + ax1.set_yticks(y_pos) + ax1.set_yticklabels(func_names) + # Read labels top to bottom + ax1.invert_yaxis() + # Labels + ax1.set_xscale('log') + ax1.set_xlabel('Mean Runtime (ms)') + ax1.set_title('Runtime Comparison') + + # Plotting path visuals + ax_path = fig1.add_subplot(212) + ax_path.barh(y_pos, avg_path, align='center', + color='purple', ecolor='black') + # Setting y-axis labels + ax_path.set_yticks(y_pos) + ax_path.set_yticklabels(func_names) + # Adding x-axis labels + # Read labels top to bottom + ax_path.invert_yaxis() + ax_path.set_xlabel('Path Length') + ax_path.set_title('Distance Travelled') + + # Adding some padding between layouts + fig1.tight_layout(pad=4.0) + + # Plotting the memory performance + fig2 = plt.figure(figsize=(10, 10)) + ax2 = fig2.add_subplot() + + ax2.barh(y_pos, peak_memory, align='center') + ax2.set_yticks(y_pos) + ax2.set_yticklabels(func_names) + # Read labels top to bottom + ax2.invert_yaxis() + # Labels + ax2.set_xlabel('Peak Memory Use (KB)') + ax2.set_title('Memory Usage Comparison') + + fig2.tight_layout() + + # Show the plot + plt.show() + + +# A function to draw the grid with path found by each of the algorithms +def plot_diagram(functions, args, maze_x, maze_y): + + # Loop through all the algorithms + for func in functions: + path, status = func(*args) + + # Creating an identify matrix of given dimensions + grid = np.ones([maze_x, maze_y]) + + # Populate different kinds of grids + for i in args[0]: + grid[i] = 0 + + for j in path: + grid[j] = 2 + + grid[path[0]] = 3 + grid[path[-1]] = 4 + + # Create a figure and save it + plt.imshow(grid.T) + plt.colorbar() + filename = "results/" + func.__name__ + ".pdf" + plt.savefig(filename) + plt.close() diff --git a/nova_rover_demos/pathfinding/comparison_platform/src/visuals.py.txt b/nova_rover_demos/pathfinding/comparison_platform/src/visuals.py.txt deleted file mode 100644 index e69de29..0000000 diff --git a/nova_rover_demos/pathfinding/comparison_platform/wrapper.md b/nova_rover_demos/pathfinding/comparison_platform/wrapper.md new file mode 100644 index 0000000..310adea --- /dev/null +++ b/nova_rover_demos/pathfinding/comparison_platform/wrapper.md @@ -0,0 +1,30 @@ +# Wrapper function guidelines + +Wrapper function helps us run the comparison platform in a systematic way. You **do not** need a wrapper function if you algorithm has both the following characteristics: +- Accepts only list of obstacles, start coordinate & end coordinate as function parameters +- Returns the path as a list of tuples and a boolean indicating path was found or not + +If not then you can either tweak your existing function or put it inside a wrapper function. Here is a template for the wrapper function: + +```python +def WRAPPER_FUNCTION(obstacle_list, start, end): + + # Instantiate appropriate attributes if necessary + + + try: + # Call the main algorithm + # It should return the path as a list of coordinates (tuples) + path = YOUR_ALGORITHM_CALL + maze_solved = True + except: + path = start + maze_solved = False + + # Return a list of tuples as path and + # a boolean indicating maze solved or not + return path, maze_solved + +``` + +You should create this wrapper inside the **lib** folder and afterwards pass its name in `run.py` file. \ No newline at end of file diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/heuristics.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/heuristics.py new file mode 100644 index 0000000..7e5e1d1 --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/heuristics.py @@ -0,0 +1,40 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +from src.utilities.geometry import dist_between_points + + +def cost_to_go(a: tuple, b: tuple) -> float: + """ + :param a: current location + :param b: next location + :return: estimated segment_cost-to-go from a to b + """ + return dist_between_points(a, b) + + +def path_cost(E, a, b): + """ + Cost of the unique path from x_init to x + :param E: edges, in form of E[child] = parent + :param a: initial location + :param b: goal location + :return: segment_cost of unique path from x_init to x + """ + cost = 0 + while not b == a: + p = E[b] + cost += dist_between_points(b, p) + b = p + + return cost + + +def segment_cost(a, b): + """ + Cost function of the line between x_near and x_new + :param a: start of line + :param b: end of line + :return: segment_cost function between a and b + """ + return dist_between_points(a, b) diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt.py new file mode 100644 index 0000000..5d05601 --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt.py @@ -0,0 +1,40 @@ +from src.rrt.rrt_base import RRTBase + + +class RRT(RRTBase): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01): + """ + Template RRT planner + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc) + + def rrt_search(self): + """ + Create and return a Rapidly-exploring Random Tree, keeps expanding until can connect to goal + https://en.wikipedia.org/wiki/Rapidly-exploring_random_tree + :return: list representation of path, dict representing edges of tree in form E[child] = parent + """ + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + while True: + for q in self.Q: # iterate over different edge lengths until solution found or time out + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + + if x_new is None: + continue + + # connect shortest valid edge + self.connect_to_point(0, x_nearest, x_new) + + solution = self.check_solution() + if solution[0]: + return solution[1] diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_base.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_base.py new file mode 100644 index 0000000..e5c95a0 --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_base.py @@ -0,0 +1,176 @@ +import random + +import numpy as np + +from src.rrt.tree import Tree +from src.utilities.geometry import steer + + +class RRTBase(object): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01): + """ + Template RRT planner + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + """ + self.X = X + self.samples_taken = 0 + self.max_samples = max_samples + self.Q = Q + self.r = r + self.prc = prc + self.x_init = x_init + self.x_goal = x_goal + self.trees = [] # list of all trees + self.add_tree() # add initial tree + + def add_tree(self): + """ + Create an empty tree and add to trees + """ + self.trees.append(Tree(self.X)) + + def add_vertex(self, tree, v): + """ + Add vertex to corresponding tree + :param tree: int, tree to which to add vertex + :param v: tuple, vertex to add + """ + self.trees[tree].V.insert(0, v + v, v) + self.trees[tree].V_count += 1 # increment number of vertices in tree + self.samples_taken += 1 # increment number of samples taken + + def add_edge(self, tree, child, parent): + """ + Add edge to corresponding tree + :param tree: int, tree to which to add vertex + :param child: tuple, child vertex + :param parent: tuple, parent vertex + """ + self.trees[tree].E[child] = parent + + def nearby(self, tree, x, n): + """ + Return nearby vertices + :param tree: int, tree being searched + :param x: tuple, vertex around which searching + :param n: int, max number of neighbors to return + :return: list of nearby vertices + """ + return self.trees[tree].V.nearest(x, num_results=n, objects="raw") + + def get_nearest(self, tree, x): + """ + Return vertex nearest to x + :param tree: int, tree being searched + :param x: tuple, vertex around which searching + :return: tuple, nearest vertex to x + """ + return next(self.nearby(tree, x, 1)) + + def new_and_near(self, tree, q): + """ + Return a new steered vertex and the vertex in tree that is nearest + :param tree: int, tree being searched + :param q: length of edge when steering + :return: vertex, new steered vertex, vertex, nearest vertex in tree to new vertex + """ + x_rand = self.X.sample_free() + x_nearest = self.get_nearest(tree, x_rand) + x_new = self.bound_point(steer(x_nearest, x_rand, q[0])) + # check if new point is in X_free and not already in V + if not self.trees[0].V.count(x_new) == 0 or not self.X.obstacle_free(x_new): + return None, None + self.samples_taken += 1 + return x_new, x_nearest + + def connect_to_point(self, tree, x_a, x_b): + """ + Connect vertex x_a in tree to vertex x_b + :param tree: int, tree to which to add edge + :param x_a: tuple, vertex + :param x_b: tuple, vertex + :return: bool, True if able to add edge, False if prohibited by an obstacle + """ + if self.trees[tree].V.count(x_b) == 0 and self.X.collision_free(x_a, x_b, self.r): + self.add_vertex(tree, x_b) + self.add_edge(tree, x_b, x_a) + return True + return False + + def can_connect_to_goal(self, tree): + """ + Check if the goal can be connected to the graph + :param tree: rtree of all Vertices + :return: True if can be added, False otherwise + """ + x_nearest = self.get_nearest(tree, self.x_goal) + if self.x_goal in self.trees[tree].E and x_nearest in self.trees[tree].E[self.x_goal]: + # tree is already connected to goal using nearest vertex + return True + if self.X.collision_free(x_nearest, self.x_goal, self.r): # check if obstacle-free + return True + return False + + def get_path(self): + """ + Return path through tree from start to goal + :return: path if possible, None otherwise + """ + if self.can_connect_to_goal(0): + print("Can connect to goal") + self.connect_to_goal(0) + return self.reconstruct_path(0, self.x_init, self.x_goal) + print("Could not connect to goal") + return None + + def connect_to_goal(self, tree): + """ + Connect x_goal to graph + (does not check if this should be possible, for that use: can_connect_to_goal) + :param tree: rtree of all Vertices + """ + x_nearest = self.get_nearest(tree, self.x_goal) + self.trees[tree].E[self.x_goal] = x_nearest + + def reconstruct_path(self, tree, x_init, x_goal): + """ + Reconstruct path from start to goal + :param tree: int, tree in which to find path + :param x_init: tuple, starting vertex + :param x_goal: tuple, ending vertex + :return: sequence of vertices from start to goal + """ + path = [x_goal] + current = x_goal + if x_init == x_goal: + return path + while not self.trees[tree].E[current] == x_init: + path.append(self.trees[tree].E[current]) + current = self.trees[tree].E[current] + path.append(x_init) + path.reverse() + return path + + def check_solution(self): + # probabilistically check if solution found + if self.prc and random.random() < self.prc: + print("Checking if can connect to goal at", str(self.samples_taken), "samples") + path = self.get_path() + if path is not None: + return True, path + # check if can connect to goal after generating max_samples + if self.samples_taken >= self.max_samples: + return True, self.get_path() + return False, None + + def bound_point(self, point): + # if point is out-of-bounds, set to bound + point = np.maximum(point, self.X.dimension_lengths[:, 0]) + point = np.minimum(point, self.X.dimension_lengths[:, 1]) + return tuple(point) diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_connect.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_connect.py new file mode 100644 index 0000000..767bd4a --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_connect.py @@ -0,0 +1,83 @@ +import enum + +import numpy as np + +from src.rrt.rrt_base import RRTBase +from src.utilities.geometry import steer + + +class Status(enum.Enum): + FAILED = 1 + TRAPPED = 2 + ADVANCED = 3 + REACHED = 4 + + +class RRTConnect(RRTBase): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01): + """ + Template RRTConnect planner + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc) + self.swapped = False + + def swap_trees(self): + """ + Swap trees only + """ + # swap trees + self.trees[0], self.trees[1] = self.trees[1], self.trees[0] + self.swapped = not self.swapped + + def unswap(self): + """ + Check if trees have been swapped and unswap + """ + if self.swapped: + self.swap_trees() + + def extend(self, tree, x_rand): + x_nearest = self.get_nearest(tree, x_rand) + x_new = steer(x_nearest, x_rand, self.Q[0]) + if self.connect_to_point(tree, x_nearest, x_new): + if np.abs(np.sum(np.array(x_new) - np.array(x_rand))) < 1e-2: + return x_new, Status.REACHED + return x_new, Status.ADVANCED + return x_new, Status.TRAPPED + + def connect(self, tree, x): + S = Status.ADVANCED + while S == Status.ADVANCED: + x_new, S = self.extend(tree, x) + return x_new, S + + def rrt_connect(self): + """ + RRTConnect + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + self.add_tree() + self.add_vertex(1, self.x_goal) + self.add_edge(1, self.x_goal, None) + while self.samples_taken < self.max_samples: + x_rand = self.X.sample_free() + x_new, status = self.extend(0, x_rand) + if status != Status.TRAPPED: + x_new, connect_status = self.connect(1, x_new) + if connect_status == Status.REACHED: + self.unswap() + first_part = self.reconstruct_path(0, self.x_init, self.get_nearest(0, x_new)) + second_part = self.reconstruct_path(1, self.x_goal, self.get_nearest(1, x_new)) + second_part.reverse() + return first_part + second_part + self.swap_trees() + self.samples_taken += 1 diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star.py new file mode 100644 index 0000000..dc81c7c --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star.py @@ -0,0 +1,113 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. +from operator import itemgetter + +from src.rrt.heuristics import cost_to_go +from src.rrt.heuristics import segment_cost, path_cost +from src.rrt.rrt import RRT + + +class RRTStar(RRT): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01, rewire_count=None): + """ + RRT* Search + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + :param rewire_count: number of nearby vertices to rewire + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc) + self.rewire_count = rewire_count if rewire_count is not None else 0 + self.c_best = float('inf') # length of best solution thus far + + def get_nearby_vertices(self, tree, x_init, x_new): + """ + Get nearby vertices to new vertex and their associated path costs from the root of tree + as if new vertex is connected to each one separately. + + :param tree: tree in which to search + :param x_init: starting vertex used to calculate path cost + :param x_new: vertex around which to find nearby vertices + :return: list of nearby vertices and their costs, sorted in ascending order by cost + """ + X_near = self.nearby(tree, x_new, self.current_rewire_count(tree)) + L_near = [(path_cost(self.trees[tree].E, x_init, x_near) + segment_cost(x_near, x_new), x_near) for + x_near in X_near] + # noinspection PyTypeChecker + L_near.sort(key=itemgetter(0)) + + return L_near + + def rewire(self, tree, x_new, L_near): + """ + Rewire tree to shorten edges if possible + Only rewires vertices according to rewire count + :param tree: int, tree to rewire + :param x_new: tuple, newly added vertex + :param L_near: list of nearby vertices used to rewire + :return: + """ + for c_near, x_near in L_near: + curr_cost = path_cost(self.trees[tree].E, self.x_init, x_near) + tent_cost = path_cost(self.trees[tree].E, self.x_init, x_new) + segment_cost(x_new, x_near) + if tent_cost < curr_cost and self.X.collision_free(x_near, x_new, self.r): + self.trees[tree].E[x_near] = x_new + + def connect_shortest_valid(self, tree, x_new, L_near): + """ + Connect to nearest vertex that has an unobstructed path + :param tree: int, tree being added to + :param x_new: tuple, vertex being added + :param L_near: list of nearby vertices + """ + # check nearby vertices for total cost and connect shortest valid edge + for c_near, x_near in L_near: + if c_near + cost_to_go(x_near, self.x_goal) < self.c_best and self.connect_to_point(tree, x_near, x_new): + break + + def current_rewire_count(self, tree): + """ + Return rewire count + :param tree: tree being rewired + :return: rewire count + """ + # if no rewire count specified, set rewire count to be all vertices + if self.rewire_count is None: + return self.trees[tree].V_count + + # max valid rewire count + return min(self.trees[tree].V_count, self.rewire_count) + + def rrt_star(self): + """ + Based on algorithm found in: Incremental Sampling-based Algorithms for Optimal Motion Planning + http://roboticsproceedings.org/rss06/p34.pdf + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + while True: + for q in self.Q: # iterate over different edge lengths + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + if x_new is None: + continue + + # get nearby vertices and cost-to-come + L_near = self.get_nearby_vertices(0, self.x_init, x_new) + + # check nearby vertices for total cost and connect shortest valid edge + self.connect_shortest_valid(0, x_new, L_near) + + if x_new in self.trees[0].E: + # rewire tree + self.rewire(0, x_new, L_near) + + solution = self.check_solution() + if solution[0]: + return solution[1] diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star_bid.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star_bid.py new file mode 100644 index 0000000..56e52fe --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star_bid.py @@ -0,0 +1,127 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +import random + +from src.rrt.heuristics import path_cost +from src.rrt.rrt_star import RRTStar + + +class RRTStarBidirectional(RRTStar): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01, rewire_count=None): + """ + Bidirectional RRT* Search + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + :param rewire_count: number of nearby vertices to rewire + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc, rewire_count) + self.sigma_best = None # best solution thus far + self.swapped = False + + def connect_trees(self, a, b, x_new, L_near): + """ + Check nearby vertices for total cost and connect shortest valid edge if possible + This results in both trees being connected + :param a: first tree to connect + :param b: second tree to connect + :param x_new: new vertex to add + :param L_near: nearby vertices + """ + for c_near, x_near in L_near: + c_tent = c_near + path_cost(self.trees[a].E, self.x_init, x_new) + if c_tent < self.c_best and self.X.collision_free(x_near, x_new, self.r): + self.trees[b].V_count += 1 + self.trees[b].E[x_new] = x_near + self.c_best = c_tent + sigma_a = self.reconstruct_path(a, self.x_init, x_new) + sigma_b = self.reconstruct_path(b, self.x_goal, x_new) + del sigma_b[-1] + sigma_b.reverse() + self.sigma_best = sigma_a + sigma_b + + break + + def swap_trees(self): + """ + Swap trees and start/goal + """ + # swap trees + self.trees[0], self.trees[1] = self.trees[1], self.trees[0] + # swap start/goal + self.x_init, self.x_goal = self.x_goal, self.x_init + self.swapped = not self.swapped + + def unswap(self): + """ + Check if trees have been swapped and unswap + Reverse path if needed to correspond with swapped trees + """ + if self.swapped: + self.swap_trees() + + if self.sigma_best is not None and self.sigma_best[0] is not self.x_init: + self.sigma_best.reverse() + + def rrt_star_bidirectional(self): + """ + Bidirectional RRT* + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + # tree a + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + # tree b + self.add_tree() + self.add_vertex(1, self.x_goal) + self.add_edge(1, self.x_goal, None) + + while True: + for q in self.Q: # iterate over different edge lengths + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + if x_new is None: + continue + + # get nearby vertices and cost-to-come + L_near = self.get_nearby_vertices(0, self.x_init, x_new) + + # check nearby vertices for total cost and connect shortest valid edge + self.connect_shortest_valid(0, x_new, L_near) + + if x_new in self.trees[0].E: + # rewire tree + self.rewire(0, x_new, L_near) + + # nearby vertices from opposite tree and cost-to-come + L_near = self.get_nearby_vertices(1, self.x_goal, x_new) + + self.connect_trees(0, 1, x_new, L_near) + + if self.prc and random.random() < self.prc: # probabilistically check if solution found + print("Checking if can connect to goal at", str(self.samples_taken), "samples") + if self.sigma_best is not None: + print("Can connect to goal") + self.unswap() + + return self.sigma_best + + if self.samples_taken >= self.max_samples: + self.unswap() + + if self.sigma_best is not None: + print("Can connect to goal") + + return self.sigma_best + else: + print("Could not connect to goal") + + return self.sigma_best + + self.swap_trees() diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star_bid_h.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star_bid_h.py new file mode 100644 index 0000000..5ebf54b --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/rrt_star_bid_h.py @@ -0,0 +1,122 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. +import random + +from src.rrt.rrt_star_bid import RRTStarBidirectional +from src.utilities.geometry import dist_between_points, pairwise + + +class RRTStarBidirectionalHeuristic(RRTStarBidirectional): + def __init__(self, X, Q, x_init, x_goal, max_samples, r, prc=0.01, + rewire_count: int = None, conditional_rewire: bool = False): + """ + Bidirectional RRT* Search + :param X: Search Space + :param Q: list of lengths of edges added to tree + :param x_init: tuple, initial location + :param x_goal: tuple, goal location + :param max_samples: max number of samples to take + :param r: resolution of points to sample along edge when checking for collisions + :param prc: probability of checking whether there is a solution + :param rewire_count: number of nearby vertices to rewire + :param conditional_rewire: if True, set rewire count to 1 until solution found, + then set to specified rewire count (ensure runtime complexity guarantees) + """ + super().__init__(X, Q, x_init, x_goal, max_samples, r, prc, + 1 if conditional_rewire else rewire_count) + self.original_rewire_count = rewire_count + + def rrt_star_bid_h(self): + """ + Bidirectional RRT* using added heuristics + :return: set of Vertices; Edges in form: vertex: [neighbor_1, neighbor_2, ...] + """ + # tree a + self.add_vertex(0, self.x_init) + self.add_edge(0, self.x_init, None) + + # tree b + self.add_tree() + self.add_vertex(1, self.x_goal) + self.add_edge(1, self.x_goal, None) + + while True: + for q in self.Q: # iterate over different edge lengths + for i in range(q[1]): # iterate over number of edges of given length to add + x_new, x_nearest = self.new_and_near(0, q) + if x_new is None: + continue + + # get nearby vertices and cost-to-come + L_near = self.get_nearby_vertices(0, self.x_init, x_new) + + # check nearby vertices for total cost and connect shortest valid edge + self.connect_shortest_valid(0, x_new, L_near) + + if x_new in self.trees[0].E: + # rewire tree + self.rewire(0, x_new, L_near) + + # nearby vertices from opposite tree and cost-to-come + L_near = self.get_nearby_vertices(1, self.x_goal, x_new) + + self.connect_trees(0, 1, x_new, L_near) + self.rewire_count = self.original_rewire_count + + self.lazy_shortening() + + if self.prc and random.random() < self.prc: # probabilistically check if solution found + print("Checking if can connect to goal at", str(self.samples_taken), "samples") + if self.sigma_best is not None: + print("Can connect to goal") + self.unswap() + + return self.sigma_best + + if self.samples_taken >= self.max_samples: + self.unswap() + + if self.sigma_best is not None: + print("Can connect to goal") + + return self.sigma_best + else: + print("Could not connect to goal") + + return self.sigma_best + + self.swap_trees() + + def lazy_shortening(self): + """ + Lazily attempt to shorten current best path + """ + if self.sigma_best is not None and len(self.sigma_best) > 2: + a, b = 0, 0 + while not abs(a - b) > 1: + a, b = random.sample(range(0, len(self.sigma_best)), 2) + + a, b = min(a, b), max(a, b) + v_a, v_b = tuple(self.sigma_best[a]), tuple(self.sigma_best[b]) + + if self.X.collision_free(v_a, v_b, self.r): + # create new edge connecting vertices + if v_a in self.trees[0].E and v_b in self.reconstruct_path(0, self.x_init, v_a): + self.trees[0].E[v_a] = v_b + elif v_a in self.trees[1].E and v_b in self.reconstruct_path(1, self.x_goal, v_a): + self.trees[1].E[v_a] = v_b + elif v_b in self.trees[0].E and v_a in self.reconstruct_path(0, self.x_init, v_b): + self.trees[0].E[v_b] = v_a + elif v_b in self.trees[1].E and v_a in self.reconstruct_path(1, self.x_goal, v_b): + self.trees[1].E[v_b] = v_a + elif v_a in self.trees[0].E: + self.trees[0].E[v_b] = v_a + else: + self.trees[1].E[v_b] = v_a + + # update best path + # remove cost of removed edges + self.c_best -= sum(dist_between_points(i, j) for i, j in pairwise(self.sigma_best[a:b + 1])) + # add cost of new edge + self.c_best += dist_between_points(self.sigma_best[a], self.sigma_best[b]) + self.sigma_best = self.sigma_best[:a + 1] + self.sigma_best[b:] diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/rrt/tree.py b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/tree.py new file mode 100644 index 0000000..7ac0bbb --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/rrt/tree.py @@ -0,0 +1,14 @@ +from rtree import index + + +class Tree(object): + def __init__(self, X): + """ + Tree representation + :param X: Search Space + """ + p = index.Property() + p.dimension = X.dimensions + self.V = index.Index(interleaved=True, properties=p) # vertices in an rtree + self.V_count = 0 + self.E = {} # edges in form E[child] = parent diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/search_space/search_space.py b/nova_rover_demos/pathfinding/rrtStarCombo/search_space/search_space.py new file mode 100644 index 0000000..726b98b --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/search_space/search_space.py @@ -0,0 +1,77 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +import numpy as np +from rtree import index + +from src.utilities.geometry import es_points_along_line +from src.utilities.obstacle_generation import obstacle_generator + + +class SearchSpace(object): + def __init__(self, dimension_lengths, O=None): + """ + Initialize Search Space + :param dimension_lengths: range of each dimension + :param O: list of obstacles + """ + # sanity check + if len(dimension_lengths) < 2: + raise Exception("Must have at least 2 dimensions") + self.dimensions = len(dimension_lengths) # number of dimensions + # sanity checks + if any(len(i) != 2 for i in dimension_lengths): + raise Exception("Dimensions can only have a start and end") + if any(i[0] >= i[1] for i in dimension_lengths): + raise Exception("Dimension start must be less than dimension end") + self.dimension_lengths = dimension_lengths # length of each dimension + p = index.Property() + p.dimension = self.dimensions + if O is None: + self.obs = index.Index(interleaved=True, properties=p) + else: + # r-tree representation of obstacles + # sanity check + if any(len(o) / 2 != len(dimension_lengths) for o in O): + raise Exception("Obstacle has incorrect dimension definition") + if any(o[i] >= o[int(i + len(o) / 2)] for o in O for i in range(int(len(o) / 2))): + raise Exception("Obstacle start must be less than obstacle end") + self.obs = index.Index(obstacle_generator(O), interleaved=True, properties=p) + + def obstacle_free(self, x): + """ + Check if a location resides inside of an obstacle + :param x: location to check + :return: True if not inside an obstacle, False otherwise + """ + return self.obs.count(x) == 0 + + def sample_free(self): + """ + Sample a location within X_free + :return: random location within X_free + """ + while True: # sample until not inside of an obstacle + x = self.sample() + if self.obstacle_free(x): + return x + + def collision_free(self, start, end, r): + """ + Check if a line segment intersects an obstacle + :param start: starting point of line + :param end: ending point of line + :param r: resolution of points to sample along edge when checking for collisions + :return: True if line segment does not intersect an obstacle, False otherwise + """ + points = es_points_along_line(start, end, r) + coll_free = all(map(self.obstacle_free, points)) + return coll_free + + def sample(self): + """ + Return a random location within X + :return: random location within X (not necessarily X_free) + """ + x = np.random.uniform(self.dimension_lengths[:, 0], self.dimension_lengths[:, 1]) + return tuple(x) diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/utilities/geometry.py b/nova_rover_demos/pathfinding/rrtStarCombo/utilities/geometry.py new file mode 100644 index 0000000..e78b07f --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/utilities/geometry.py @@ -0,0 +1,60 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +from itertools import tee + +import numpy as np + + +def dist_between_points(a, b): + """ + Return the Euclidean distance between two points + :param a: first point + :param b: second point + :return: Euclidean distance between a and b + """ + distance = np.linalg.norm(np.array(b) - np.array(a)) + return distance + + +def pairwise(iterable): + """ + Pairwise iteration over iterable + :param iterable: iterable + :return: s -> (s0,s1), (s1,s2), (s2, s3), ... + """ + a, b = tee(iterable) + next(b, None) + return zip(a, b) + + +def es_points_along_line(start, end, r): + """ + Equally-spaced points along a line defined by start, end, with resolution r + :param start: starting point + :param end: ending point + :param r: maximum distance between points + :return: yields points along line from start to end, separated by distance r + """ + d = dist_between_points(start, end) + n_points = int(np.ceil(d / r)) + if n_points > 1: + step = d / (n_points - 1) + for i in range(n_points): + next_point = steer(start, end, i * step) + yield next_point + + +def steer(start, goal, d): + """ + Return a point in the direction of the goal, that is distance away from start + :param start: start location + :param goal: goal location + :param d: distance away from start + :return: point in the direction of the goal, distance away from start + """ + start, end = np.array(start), np.array(goal) + v = end - start + u = v / (np.sqrt(np.sum(v ** 2))) + steered_point = start + u * d + return tuple(steered_point) diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/utilities/obstacle_generation.py b/nova_rover_demos/pathfinding/rrtStarCombo/utilities/obstacle_generation.py new file mode 100644 index 0000000..b6f851a --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/utilities/obstacle_generation.py @@ -0,0 +1,60 @@ +import random +import uuid + +import numpy as np + + +def generate_random_obstacles(X, start, end, n): + """ + Generates n random obstacles without disrupting world connectivity. + It also respects start and end points so that they don't lie inside of an obstacle. + """ + # Note: Current implementation only supports hyperrectangles. + i = 0 + obstacles = [] + while i < n: + center = np.empty(len(X.dimension_lengths), np.float) + scollision = True + fcollision = True + edge_lengths = [] + for j in range(X.dimensions): + # None of the sides of a hyperrectangle can be higher than 0.1 of the total span + # in that particular X.dimensions + max_edge_length = (X.dimension_lengths[j][1] - X.dimension_lengths[j][0]) / 10.0 + # None of the sides of a hyperrectangle can be higher than 0.01 of the total span + # in that particular X.dimensions + min_edge_length = (X.dimension_lengths[j][1] - X.dimension_lengths[j][0]) / 100.0 + edge_length = random.uniform(min_edge_length, max_edge_length) + center[j] = random.uniform(X.dimension_lengths[j][0] + edge_length, + X.dimension_lengths[j][1] - edge_length) + edge_lengths.append(edge_length) + + if abs(start[j] - center[j]) > edge_length: + scollision = False + if abs(end[j] - center[j]) > edge_length: + fcollision = False + + # Check if any part of the obstacle is inside of another obstacle. + min_corner = np.empty(X.dimensions, np.float) + max_corner = np.empty(X.dimensions, np.float) + for j in range(X.dimensions): + min_corner[j] = center[j] - edge_lengths[j] + max_corner[j] = center[j] + edge_lengths[j] + obstacle = np.append(min_corner, max_corner) + # Check newly generated obstacle intersects any former ones. Also respect start and end points + if len(list(X.obs.intersection(obstacle))) > 0 or scollision or fcollision: + continue + i += 1 + obstacles.append(obstacle) + X.obs.add(uuid.uuid4(), tuple(obstacle), tuple(obstacle)) + + return obstacles + + +def obstacle_generator(obstacles): + """ + Add obstacles to r-tree + :param obstacles: list of obstacles + """ + for obstacle in obstacles: + yield (uuid.uuid4(), obstacle, obstacle) diff --git a/nova_rover_demos/pathfinding/rrtStarCombo/utilities/plotting.py b/nova_rover_demos/pathfinding/rrtStarCombo/utilities/plotting.py new file mode 100644 index 0000000..f3fffef --- /dev/null +++ b/nova_rover_demos/pathfinding/rrtStarCombo/utilities/plotting.py @@ -0,0 +1,231 @@ +# This file is subject to the terms and conditions defined in +# file 'LICENSE', which is part of this source code package. + +import plotly as py +from plotly import graph_objs as go + +colors = ['darkblue', 'teal'] + + +class Plot(object): + def __init__(self, filename): + """ + Create a plot + :param filename: filename + """ + self.filename = filename + ".html" + self.data = [] + self.layout = {'title': 'Plot', + 'showlegend': False + } + + self.fig = {'data': self.data, + 'layout': self.layout} + + def plot_tree(self, X, trees): + """ + Plot tree + :param X: Search Space + :param trees: list of trees + """ + if X.dimensions == 2: # plot in 2D + self.plot_tree_2d(trees) + elif X.dimensions == 3: # plot in 3D + self.plot_tree_3d(trees) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_tree_2d(self, trees): + """ + Plot 2D trees + :param trees: trees to plot + """ + for i, tree in enumerate(trees): + for start, end in tree.E.items(): + if end is not None: + trace = go.Scatter( + x=[start[0], end[0]], + y=[start[1], end[1]], + line=dict( + color=colors[i] + ), + mode="lines" + ) + self.data.append(trace) + + def plot_tree_3d(self, trees): + """ + Plot 3D trees + :param trees: trees to plot + """ + for i, tree in enumerate(trees): + for start, end in tree.E.items(): + if end is not None: + trace = go.Scatter3d( + x=[start[0], end[0]], + y=[start[1], end[1]], + z=[start[2], end[2]], + line=dict( + color=colors[i] + ), + mode="lines" + ) + self.data.append(trace) + + def plot_obstacles(self, X, O): + """ + Plot obstacles + :param X: Search Space + :param O: list of obstacles + """ + if X.dimensions == 2: # plot in 2D + self.layout['shapes'] = [] + for O_i in O: + # noinspection PyUnresolvedReferences + self.layout['shapes'].append( + { + 'type': 'rect', + 'x0': O_i[0], + 'y0': O_i[1], + 'x1': O_i[2], + 'y1': O_i[3], + 'line': { + 'color': 'purple', + 'width': 4, + }, + 'fillcolor': 'purple', + 'opacity': 0.70 + }, + ) + elif X.dimensions == 3: # plot in 3D + for O_i in O: + obs = go.Mesh3d( + x=[O_i[0], O_i[0], O_i[3], O_i[3], O_i[0], O_i[0], O_i[3], O_i[3]], + y=[O_i[1], O_i[4], O_i[4], O_i[1], O_i[1], O_i[4], O_i[4], O_i[1]], + z=[O_i[2], O_i[2], O_i[2], O_i[2], O_i[5], O_i[5], O_i[5], O_i[5]], + i=[7, 0, 0, 0, 4, 4, 6, 6, 4, 0, 3, 2], + j=[3, 4, 1, 2, 5, 6, 5, 2, 0, 1, 6, 3], + k=[0, 7, 2, 3, 6, 7, 1, 1, 5, 5, 7, 6], + color='purple', + opacity=0.70 + ) + self.data.append(obs) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_path(self, X, path): + """ + Plot path through Search Space + :param X: Search Space + :param path: path through space given as a sequence of points + """ + if X.dimensions == 2: # plot in 2D + x, y = [], [] + for i in path: + x.append(i[0]) + y.append(i[1]) + trace = go.Scatter( + x=x, + y=y, + line=dict( + color="red", + width=4 + ), + mode="lines" + ) + + self.data.append(trace) + elif X.dimensions == 3: # plot in 3D + x, y, z = [], [], [] + for i in path: + x.append(i[0]) + y.append(i[1]) + z.append(i[2]) + trace = go.Scatter3d( + x=x, + y=y, + z=z, + line=dict( + color="red", + width=4 + ), + mode="lines" + ) + + self.data.append(trace) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_start(self, X, x_init): + """ + Plot starting point + :param X: Search Space + :param x_init: starting location + """ + if X.dimensions == 2: # plot in 2D + trace = go.Scatter( + x=[x_init[0]], + y=[x_init[1]], + line=dict( + color="orange", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + elif X.dimensions == 3: # plot in 3D + trace = go.Scatter3d( + x=[x_init[0]], + y=[x_init[1]], + z=[x_init[2]], + line=dict( + color="orange", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def plot_goal(self, X, x_goal): + """ + Plot goal point + :param X: Search Space + :param x_goal: goal location + """ + if X.dimensions == 2: # plot in 2D + trace = go.Scatter( + x=[x_goal[0]], + y=[x_goal[1]], + line=dict( + color="green", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + elif X.dimensions == 3: # plot in 3D + trace = go.Scatter3d( + x=[x_goal[0]], + y=[x_goal[1]], + z=[x_goal[2]], + line=dict( + color="green", + width=10 + ), + mode="markers" + ) + + self.data.append(trace) + else: # can't plot in higher dimensions + print("Cannot plot in > 3 dimensions") + + def draw(self, auto_open=True): + """ + Render the plot to a file + """ + py.offline.plot(self.fig, filename=self.filename, auto_open=auto_open)