diff --git a/grid_with_obstacles_Dijkstra.py b/grid_with_obstacles_Dijkstra.py new file mode 100644 index 0000000..00fe4c0 --- /dev/null +++ b/grid_with_obstacles_Dijkstra.py @@ -0,0 +1,169 @@ +import matplotlib.patches as patches +import numpy as np +import matplotlib.pyplot as plt +import networkx as nx +import matplotlib.animation as animation +import heapq +import random + +# graph creation +rows, cols = 17, 17 +grid = np.zeros((rows, cols)) +G = nx.grid_2d_graph(rows, cols) + +# initial and target nodes +start = (0, 0) +target = (12, 12) + +# obstacles creation +obstacle_ratio = 0.2 +num_obstacles = int(rows * cols * obstacle_ratio) +all_positions = [(r, c) for r in range(rows) for c in range(cols) if (r, c) not in [start, target]] +obstacles = random.sample(all_positions, num_obstacles) +for obs in obstacles: + grid[obs] = 1 +#removing them form the graph +for obs in obstacles: + if obs in G: + G.remove_node(obs) + +# Function of Dijkstra algorithm. Shown in the animation +def dijkstra_stepwise(G, start, target): + # Initialisation + distances = {node: float('inf') for node in G.nodes()} + distances[start] = 0 + previous_nodes = {node: None for node in G.nodes()} + evaluated_nodes = [] # List of evaluated nodes + path_to_current = [] # History of the path to the current node + + # priority queue on the non-evaluated nodes + priority_queue = [(0, start)] # (distance, node) + heapq.heapify(priority_queue) + + while priority_queue: + current_distance, current_node = heapq.heappop(priority_queue) + + if current_node not in evaluated_nodes: + evaluated_nodes.append(current_node) + + # Reconstruction of the path to the current node + temp_path = [] + node = current_node + while node is not None: + temp_path.append(node) + node = previous_nodes[node] + temp_path.reverse() + + path_to_current.append(temp_path) + + if current_node == target: + break + + # Evaluation of the current nodes neighbors + for neighbor in G.neighbors(current_node): + if neighbor not in evaluated_nodes: + new_distance = current_distance + 1 + if new_distance < distances[neighbor]: + distances[neighbor] = new_distance + previous_nodes[neighbor] = current_node + heapq.heappush(priority_queue, (new_distance, neighbor)) + + return evaluated_nodes, path_to_current + +# Retrieving the results of Dijkstra's algorithm +evaluated_nodes, path_history = dijkstra_stepwise(G, start, target) + +# Creation of the figure +fig, ax = plt.subplots(figsize=(6, 6)) +ax.imshow(grid, cmap="Greys", origin="upper") # The origin is set on the top left corner + +# Display of the grid +ax.set_xticks(np.arange(-0.5, cols, 1), minor=True) +ax.set_yticks(np.arange(-0.5, rows, 1), minor=True) +ax.grid(True, which="minor", color="black", linewidth=0.5) +ax.tick_params(which="both", bottom=False, left=False, labelbottom=False, labelleft=False) + +# Display of the start and target nodes +start_rect = patches.Rectangle((start[1] - 0.5, start[0] - 0.5), 1, 1, facecolor="green", alpha=0.8) +target_rect = patches.Rectangle((target[1] - 0.5, target[0] - 0.5), 1, 1, facecolor="yellow", alpha=0.8) +ax.add_patch(start_rect) +ax.add_patch(target_rect) + +# Initiate the plots +evaluated_patches = [] +path_patches = [] + +# List in use for the confetti animationn +confetti_patches = [] +confetti_velocities = [] + +# Update function for the animation +def update(frame): + # Diplays the evaluated nodes in blue + if frame < len(evaluated_nodes): + node = evaluated_nodes[frame] + if node != start: + rect = patches.Rectangle((node[1] - 0.5, node[0] - 0.5), 1, 1, facecolor="blue", alpha=0.6) + else: + rect = patches.Rectangle((node[1] - 0.5, node[0] - 0.5), 1, 1, facecolor="green", alpha=0.6) + ax.add_patch(rect) + evaluated_patches.append(rect) + + + # Delete previous path + if frame < len(path_history): + for patch in path_patches: + patch.remove() + path_patches.clear() + + # Displays the current shortest path + if frame < len(path_history): + for node in path_history[frame]: + if node != start: + rect = patches.Rectangle((node[1] - 0.5, node[0] - 0.5), 1, 1, facecolor="red", alpha=0.8) + else: + rect = patches.Rectangle((node[1] - 0.5, node[0] - 0.5), 1, 1, facecolor="green", alpha=0.8) + ax.add_patch(rect) + path_patches.append(rect) + + + # Getting rid of the previous shortest paths + if frame == len(path_history) - 1: + for patch in path_patches: + patch.set_facecolor("red") + path_patches[-1].set_facecolor("cyan") # When the target is reached, becomes cyan + + # confetti effect when we reach the target node + if frame == len(path_history) - 1: + # creation of particules around the final node + confetti_patches.clear() + confetti_velocities.clear() + for _ in range(50): # 50 particules + offset_x = random.uniform(-0.5, 0.5) + offset_y = random.uniform(-0.5, 0.5) + color = np.random.rand(3,) # Random color + confetti_rect = patches.Circle( + (target[1] + offset_x, target[0] + offset_y), + radius=0.1, facecolor=color, alpha=0.7 + ) + ax.add_patch(confetti_rect) + confetti_patches.append(confetti_rect) + + # Define velocity of the particales + velocity = [random.uniform(-0.1, 0.1), random.uniform(-0.1, 0.1)] # Mouvement dans toutes les directions + confetti_velocities.append(velocity) + + # Animation of the particales + for i, patch in enumerate(confetti_patches): + new_center = ( + patch.center[0] + confetti_velocities[i][0], # X movement + patch.center[1] + confetti_velocities[i][1] # Y movement + ) + patch.set_center(new_center) + + return evaluated_patches + path_patches + confetti_patches + +# creation of the animation +ani = animation.FuncAnimation(fig, update, frames=len(evaluated_nodes) + 20, interval=100, blit=False, repeat=False) + +plt.show() \ No newline at end of file diff --git a/quactography/classical/io.py b/quactography/classical/io.py new file mode 100644 index 0000000..8a57773 --- /dev/null +++ b/quactography/classical/io.py @@ -0,0 +1,104 @@ +import numpy as np +import json +import networkx as nx + + +def save_graph(G, output_base, copies=1): + """ + Save a graph to .npz files as arrays of nodes and edges. + + Parameters + ---------- + G : networkx.Graph + The graph to be saved. + output_base : str + The base name for output files (e.g., 'graph' will become 'graph_0.npz'). + copies : int, optional + The number of copies to save. Default is 1. + + Returns + ------- + None + """ + for i in range(copies): + output_file = f"{output_base}_{i}.npz" + nodes = list(G.nodes()) + edges = list(G.edges()) + + nodes_array = np.array(nodes) + edges_array = np.array(edges) + + np.savez(output_file, nodes=nodes_array, edges=edges_array) + print(f"Copie {i + 1}/{copies} saved as '{output_file}'.") + + +def load_the_graph(file_path): + """ + Load a graph from a .json or .npz file. + + Parameters + ---------- + file_path : str + Path to the graph file. Supported formats: .json, .npz. + + Returns + ------- + networkx.Graph + The loaded graph. + + Raises + ------ + ValueError + If the file format is not supported. + """ + if file_path.endswith(".json"): + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + G = nx.Graph() + for node in data["nodes"]: + G.add_node(tuple(node)) + + for edge in data["edges"]: + node1, node2 = tuple(map(tuple, edge[:2])) + weight = edge[2] if len(edge) > 2 else 1 + G.add_edge(node1, node2, weight=weight) + + return G + + if file_path.endswith(".npz"): + data = np.load(file_path, allow_pickle=True) + adjacency_matrix = data["adjacency_matrix"] + raw_nodes = data["node_indices"] + + node_indices = [ + tuple(node) if isinstance(node, (list, np.ndarray, tuple)) else (node,) + for node in raw_nodes + ] + node_indices = [(n[0], 0) if len(n) == 1 else n for n in node_indices] + + print( + f"Loaded adjacency matrix of shape {adjacency_matrix.shape} " + f"with {len(node_indices)} nodes." + ) + + G = nx.Graph() + for node in node_indices: + G.add_node(node) + + for i in range(len(adjacency_matrix)): + for j in range(i + 1, len(adjacency_matrix[i])): + if adjacency_matrix[i, j] > 0: + G.add_edge( + node_indices[i], node_indices[j], + weight=adjacency_matrix[i, j] + ) + + print(f"Nodes in G: {list(G.nodes())}") + print( + f"Graph charged with {G.number_of_nodes()} nodes and " + f"{G.number_of_edges()} edges." + ) + return G + + raise ValueError("Unsupported file format. Use either .json or .npz.") diff --git a/quactography/classical/utils/Astar.py b/quactography/classical/utils/Astar.py new file mode 100644 index 0000000..7b1afc7 --- /dev/null +++ b/quactography/classical/utils/Astar.py @@ -0,0 +1,90 @@ +import time +import heapq +from travel_related import heuristic, get_neighbors_diagonal + + +def astar_stepwise(G, start, target, diagonal_mode="nondiagonal"): + """ + Perform the A* pathfinding algorithm in a stepwise manner on a given graph. + + Parameters + ---------- + G : networkx.Graph + The input graph where each node is typically a tuple (e.g., (x, y)). + Edges may have a 'weight' attribute indicating traversal cost + (default is 1). + start : hashable + The starting node for the pathfinding algorithm. + target : hashable + The target (goal) node to reach. + diagonal_mode : str, optional + Neighbor retrieval mode. Must be either: + - "nondiagonal": only 4-directional neighbors (up, down, left, right) + - "diagonal": include diagonal neighbors (8 directions total) + Default is "nondiagonal". + + Returns + ------- + evaluated_nodes : list + The list of nodes in the order they were evaluated by the algorithm. + path_to_current : list of list + A list containing the reconstructed path from the start node to each + node evaluated so far, in evaluation order. + current_f_score : float + The final f-score (g + h) associated with the target node if found, + or with the last evaluated node if the target was not reached. + """ + start_time = time.time() + + g_scores = {node: float('inf') for node in G.nodes()} + g_scores[start] = 0 + + f_scores = {node: float('inf') for node in G.nodes()} + f_scores[start] = heuristic(start, target) + + previous_nodes = {node: None for node in G.nodes()} + evaluated_nodes = [] + path_to_current = [] + priority_queue = [(f_scores[start], start)] + heapq.heapify(priority_queue) + + while priority_queue: + current_f_score, current_node = heapq.heappop(priority_queue) + + if current_node not in evaluated_nodes: + evaluated_nodes.append(current_node) + + temp_path = [] + node = current_node + while node is not None: + temp_path.append(node) + node = previous_nodes[node] + temp_path.reverse() + path_to_current.append(temp_path) + + if current_node == target: + break + + if diagonal_mode == "diagonal": + neighbors = list(get_neighbors_diagonal(current_node, G)) + else: + neighbors = list(G.neighbors(current_node)) + + for neighbor in neighbors: + edge_weight = G[current_node][neighbor].get("weight", 1) + tentative_g_score = g_scores[current_node] + edge_weight + if tentative_g_score < g_scores[neighbor]: + previous_nodes[neighbor] = current_node + g_scores[neighbor] = tentative_g_score + f_scores[neighbor] = tentative_g_score + + heuristic(neighbor, target) + heapq.heappush( + priority_queue, + (f_scores[neighbor], neighbor) + ) + + end_time = time.time() + execution_time = end_time - start_time + print(f"Execution time of A* : {execution_time:.4f} seconds") + + return evaluated_nodes, path_to_current, current_f_score diff --git a/quactography/classical/utils/Dijkstra.py b/quactography/classical/utils/Dijkstra.py new file mode 100644 index 0000000..afb2f9e --- /dev/null +++ b/quactography/classical/utils/Dijkstra.py @@ -0,0 +1,89 @@ +import time +import heapq +from travel_related import get_neighbors_diagonal + + +def dijkstra_stepwise(G, start, target, diagonal_mode="nondiagonal"): + """ + Perform Dijkstra's algorithm in a stepwise manner on a given graph. + + Parameters + ---------- + G : networkx.Graph + The input graph where each node is typically a tuple (e.g., (x, y)). + Edges may have a 'weight' attribute indicating traversal cost + (default is 1). + start : hashable + The starting node for the pathfinding algorithm. + target : hashable + The target node to reach. + diagonal_mode : str, optional + Neighbor retrieval mode. Must be either: + - "nondiagonal": only 4-directional neighbors (up, down, left, right) + - "diagonal": include diagonal neighbors (8 directions total) + Default is "nondiagonal". + + Returns + ------- + evaluated_nodes : list + The list of nodes in the order they were evaluated by the algorithm. + path_to_current : list of list + A list containing the reconstructed path from the start node to each + node evaluated so far, in evaluation order. + current_distance : float + The final distance to the target node if found, or to the last + evaluated node otherwise. Returns None if no path is found. + """ + start_time = time.time() + + distances = {node: float('inf') for node in G.nodes()} + distances[start] = 0 + previous_nodes = {node: None for node in G.nodes()} + evaluated_nodes = [] + path_to_current = [] + priority_queue = [(0, start)] + heapq.heapify(priority_queue) + + while priority_queue: + current_distance, current_node = heapq.heappop(priority_queue) + + if current_node not in evaluated_nodes: + evaluated_nodes.append(current_node) + + temp_path = [] + node = current_node + while node is not None: + temp_path.append(node) + if node in previous_nodes: + node = previous_nodes[node] + else: + break + temp_path.reverse() + path_to_current.append(temp_path) + + if current_node == target: + break + + if diagonal_mode == "diagonal": + neighbors = list(get_neighbors_diagonal(current_node, G)) + else: + neighbors = list(G.neighbors(current_node)) + + for neighbor in neighbors: + if neighbor not in evaluated_nodes: + edge_weight = G[current_node][neighbor].get("weight", 1) + new_distance = current_distance + edge_weight + if new_distance < distances[neighbor]: + distances[neighbor] = new_distance + previous_nodes[neighbor] = current_node + heapq.heappush(priority_queue, (new_distance, neighbor)) + + if distances[target] == float('inf'): + print("No path found.") + return None, None, None + + end_time = time.time() + execution_time = end_time - start_time + print(f"Execution time of Dijkstra: {execution_time:.4f} seconds") + + return evaluated_nodes, path_to_current, current_distance diff --git a/quactography/classical/utils/random_grid_generator.py b/quactography/classical/utils/random_grid_generator.py new file mode 100644 index 0000000..77ccbf1 --- /dev/null +++ b/quactography/classical/utils/random_grid_generator.py @@ -0,0 +1,54 @@ +import numpy as np +import random +import networkx as nx + +def generate_grid(size, obstacle_mode="ratio", obstacle_ratio=0.2, obstacle_number=20): + """ + Generate a random 2D grid and its corresponding NetworkX graph. + + Parameters + ---------- + size : int + Size of the grid (the grid will be of shape `size x size`). + Must be a positive integer. + obstacle_mode : str + Strategy used to place obstacles in the grid. Options are: + - "ratio": place a proportion of obstacles based on `obstacle_ratio` + - "number": place a fixed number of obstacles based on `obstacle_number` + obstacle_ratio : float + Used only if `obstacle_mode` is "ratio". Defines the proportion of cells + to be turned into obstacles. Must be a float between 0 and 1. + obstacle_number : int + Used only if `obstacle_mode` is "number". Defines the exact number of obstacles + to place in the grid. Must be a positive integer. + + Returns + ------- + grid : np.ndarray + A 2D NumPy array of shape `(size, size)` representing the grid, where `1` + denotes an obstacle and `0` a free cell. + G : networkx.Graph + A 2D grid graph where each node is a tuple `(x, y)`. Edges connect 4-neighboring + nodes (up, down, left, right). Nodes corresponding to obstacles are removed. + """ + n = size + grid = np.zeros((n, n)) + G = nx.grid_2d_graph(n, n) + + obstacles = set() + + if obstacle_mode == "ratio": + num_obstacles = int(n * n * obstacle_ratio) + else: + num_obstacles = obstacle_number + + while len(obstacles) < num_obstacles: + x, y = random.randint(0, n - 1), random.randint(0, n - 1) + if (x, y) != (0, 0) and (x, y) != (n - 1, n - 1): + obstacles.add((x, y)) + grid[x, y] = 1 # Add an obstacle + if (x, y) in G: + G.remove_node((x, y)) + + return grid, G + diff --git a/quactography/classical/utils/travel_related.py b/quactography/classical/utils/travel_related.py new file mode 100644 index 0000000..94587ca --- /dev/null +++ b/quactography/classical/utils/travel_related.py @@ -0,0 +1,45 @@ +def get_neighbors_diagonal(node, G): + """ + Yield all 8-directional neighbors of a node that exist in the graph G. + + Parameters + ---------- + node : tuple + The current node position, typically a coordinate (x, y). + G : networkx.Graph + The graph in which neighbors should be checked. + + Yields + ------ + neighbor : tuple + A valid neighboring node in G that is adjacent to `node` in any of + the 8 possible directions (N, S, E, W, and diagonals). + """ + directions = [ + (0, 1), (1, 0), (0, -1), (-1, 0), + (1, 1), (-1, -1), (1, -1), (-1, 1) + ] + for dx, dy in directions: + neighbor = (node[0] + dx, node[1] + dy) + if neighbor in G.nodes(): + yield neighbor + + +def heuristic(current, target): + """ + Compute the Chebyshev distance between two points on a grid. + + Parameters + ---------- + current : tuple + The current node position (x, y). + target : tuple + The target node position (x, y). + + Returns + ------- + int + The Chebyshev distance between `current` and `target`. + This is appropriate for 8-directional movement. + """ + return max(abs(current[0] - target[0]), abs(current[1] - target[1])) diff --git a/scripts/classical_shortest_path_finder.py b/scripts/classical_shortest_path_finder.py new file mode 100644 index 0000000..d671532 --- /dev/null +++ b/scripts/classical_shortest_path_finder.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Find the shortest path between 2 points +in a graph using Dijkstra or A* algorithm. +Supports graphs loaded from JSON or NPZ files, +and optionally allows diagonal movement. +""" + +import argparse +import os +import sys +from quactography.classical.utils.random_Dijkstra import dijkstra_stepwise +from quactography.classical.utils.random_Astar import astar_stepwise, heuristic +from quactography.classical.io import load_the_graph + + +def _build_arg_parser(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + "--in_graph", type=str, required=True, + help="Path to the input graph file (.json or .npz)" + ) + parser.add_argument( + "--shortestpath", choices=['Dijkstra', 'A*'], default='Dijkstra', + help="Shortest path algorithm to use: 'Dijkstra' or 'A*'" + ) + parser.add_argument( + "--start", type=str, required=True, + help="Start node, e.g. '3,4'" + ) + parser.add_argument( + "--target", type=str, required=True, + help="Target node, e.g. '7,8'" + ) + parser.add_argument( + "--diagonal_mode", choices=['diagonal', 'nondiagonal'], + default='nondiagonal', + help="Allow diagonal movement or not" + ) + return parser + + +def parse_node(node_str): + try: + parts = node_str.strip().split(',') + return tuple(int(p) for p in parts if p.strip() != '') + except ValueError as e: + raise ValueError( + f"Invalid node format: '{node_str}' (expected format: x,y)" + ) from e + + +def main(): + parser = _build_arg_parser() + args = parser.parse_args() + + if not os.path.isfile(args.in_graph): + print(f"Error: File '{args.in_graph}' not found.") + sys.exit(1) + + try: + start = parse_node(args.start) + target = parse_node(args.target) + except ValueError as e: + print(f"Error parsing node: {e}") + sys.exit(1) + + G = load_the_graph(args.in_graph) + + if start not in G.nodes(): + print(f"Start node {start} not in graph.") + print(f"Available nodes: {list(G.nodes())[:5]}...") + sys.exit(1) + + if target not in G.nodes(): + print(f"Target node {target} not in graph.") + print(f"Available nodes: {list(G.nodes())[:5]}...") + sys.exit(1) + + print( + f"Finding shortest path from {start} to {target} using {args.shortestpath}..." + ) + + if args.shortestpath == "Dijkstra": + evaluated_nodes, path_history, path_cost = dijkstra_stepwise( + G, start, target, args.diagonal_mode + ) + else: + evaluated_nodes, path_history, path_cost = astar_stepwise( + G, start, target, args.diagonal_mode + ) + + if path_history is None: + print("No path found.") + sys.exit(0) + + shortest_path = [tuple(int(x) for x in n) for n in path_history[-1]] + + print("\nShortest path:") + print(" → ".join(map(str, shortest_path))) + print(f"Path cost: {path_cost:.2f}") + print(f"Nodes evaluated: {len(evaluated_nodes)}") + + +if __name__ == "__main__": + main() diff --git a/scripts/generate_random_grids.py b/scripts/generate_random_grids.py new file mode 100644 index 0000000..d1e5533 --- /dev/null +++ b/scripts/generate_random_grids.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Generate random grids/graphs with obstacles. +This script can optionally suppress the output +display of the grid and graph nodes. +The generated graphs are saved as .npz files. +""" + +import argparse +from my_research.utils.grid_dijkstra import generer_grille, save_graph + + +def _build_arg_parser(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawTextHelpFormatter + ) + + parser.add_argument( + '--size', type=int, default=10, + help="Size of the grid (grid will be of shape size x size)." + ) + + parser.add_argument( + '--obstacles', type=str, default='ratio:0.2', + help="Obstacle settings: 'ratio:' or 'number:'" + ) + + parser.add_argument( + '--output', type=str, required=True, + help="Output format: 'filename.npz;'. " + "This will generate files like 'filename_0.npz', etc." + ) + + parser.add_argument( + '--save_only', action='store_true', + help="If set, suppress grid and node outputs." + ) + + return parser + + +def parse_obstacle_mode(obstacle_str): + mode, value_str = obstacle_str.split(':') + value = float(value_str) if mode == 'ratio' else int(value_str) + + if mode == 'ratio' and not (0 <= value <= 1): + raise ValueError( + f"The obstacle ratio must be between 0 and 1 (received: {value})") + + return mode, value + + +def main(): + parser = _build_arg_parser() + args = parser.parse_args() + + mode, value = parse_obstacle_mode(args.obstacles) + + file, number = args.output.split(';') + number = int(number) + + grid, G = ( + generer_grille(args.size, mode, value) + if mode == "ratio" + else generer_grille(args.size, mode, value, value) + ) + + if not args.save_only: + print(f"{number} graphs saved as '{file}_X.npz'.") + print("Grille générée :") + print(grid) + print("Graph nodes:", list(G.nodes)) + + for i in range(number): + save_graph(G, f"{file}_{i}.npz") + + +if __name__ == "__main__": + main()