From a20c91c50d5b85a2d31d23e774083c1c442952f5 Mon Sep 17 00:00:00 2001 From: ben birch Date: Wed, 25 Feb 2026 13:14:30 +0000 Subject: [PATCH] Add verbose error handling updates --- board.py | 111 ++++++++++++++++++++------------ bus_router.py | 97 ++++++++++++++++++++++++++-- module.py | 3 +- run.py | 160 ++++++++++++++++++++++++++++++++++++++++++---- server.py | 44 ++++++++++--- server_packets.py | 1 + 6 files changed, 351 insertions(+), 65 deletions(-) diff --git a/board.py b/board.py index cc7e4419..d039b0c5 100644 --- a/board.py +++ b/board.py @@ -89,11 +89,12 @@ def _add_modules_from_loader(self) -> None: # Get module properties name = module_data.get('name') version = module_data.get('version') + module_id = module_data.get('id') position = (module_data['position']['x'], module_data['position']['y']) rotation = module_data.get('rotation') # Create Module object - module = Module(name=name, version=version, position=position, rotation=rotation) + module = Module(name=name, version=version, position=position, rotation=rotation, module_id=module_id) self.modules.append(module) def _add_layers_from_loader(self) -> None: @@ -295,9 +296,12 @@ def _validate_zones_and_modules(self) -> None: if not self.zones: print("🟠 No zones to validate") return + + # Recompute warnings fresh for this validation pass + self.position_warnings = [] - # 1. Check if any module zones overlap - print("🔵 Validating module zone overlaps") + # 1. Check if any module zones overlap or are too close + print("🔵 Validating module zone spacing") modules_with_zones = [module for module in self.modules if hasattr(module, 'zone') and module.zone] for i, module1 in enumerate(modules_with_zones): @@ -306,12 +310,30 @@ def _validate_zones_and_modules(self) -> None: if i >= j: continue - if self._do_zones_overlap(module1.zone, module2.zone): - warning = f"🔴 WARNING: Zone overlap detected between modules '{module1.name}' and '{module2.name}'" - print(warning) + overlap_without_margin = self._do_zones_overlap(module1.zone, module2.zone, margin=0.0) + if overlap_without_margin: + warning = ( + "MODULE_OVERLAPPING_OTHER_MODULE " + f"moduleIds=[{module1.module_id},{module2.module_id}] " + f"moduleIdShort=[{module1.module_id[:4] if module1.module_id else 'unkn'},{module2.module_id[:4] if module2.module_id else 'unkn'}] " + f"moduleNames=[{module1.name},{module2.name}]" + ) + print(f"🔴 {warning}") + self.position_warnings.append(warning) + continue + + overlap_with_margin = self._do_zones_overlap(module1.zone, module2.zone) + if overlap_with_margin: + warning = ( + "MODULE_TOO_CLOSE_TO_OTHER_MODULE " + f"moduleIds=[{module1.module_id},{module2.module_id}] " + f"moduleIdShort=[{module1.module_id[:4] if module1.module_id else 'unkn'},{module2.module_id[:4] if module2.module_id else 'unkn'}] " + f"moduleNames=[{module1.name},{module2.name}]" + ) + print(f"🔴 {warning}") self.position_warnings.append(warning) - # 2. Check if any module zone extends beyond the board size + # 2. Check if any module zone extends beyond board or is too close to board edge print("🔵 Validating module zones within board boundaries") # Calculate the board boundaries xmin = self.origin_x - self.width / 2 @@ -323,45 +345,46 @@ def _validate_zones_and_modules(self) -> None: # Extract corner points from module's zone bl, tl, tr, br = module.zone - # Check if any corner is outside the board boundaries - if (bl[0] < xmin or bl[1] < ymin or - tl[0] < xmin or tl[1] > ymax or - tr[0] > xmax or tr[1] > ymax or - br[0] > xmax or br[1] < ymin): - - warning = f"🔴 Module '{module.name}' zone extends beyond board boundaries" - print(warning) + zone_x_min = min(bl[0], tl[0], tr[0], br[0]) + zone_x_max = max(bl[0], tl[0], tr[0], br[0]) + zone_y_min = min(bl[1], tl[1], tr[1], br[1]) + zone_y_max = max(bl[1], tl[1], tr[1], br[1]) + + # Overhanging board edge + if (zone_x_min < xmin or zone_y_min < ymin or zone_x_max > xmax or zone_y_max > ymax): + warning = ( + "MODULE_OVERHANGING_BOARD_EDGE " + f"moduleId={module.module_id} " + f"moduleIdShort={module.module_id[:4] if module.module_id else 'unkn'} " + f"moduleName={module.name}" + ) + print(f"🔴 {warning}") + self.position_warnings.append(warning) + continue + + edge_clearance = min( + zone_x_min - xmin, + xmax - zone_x_max, + zone_y_min - ymin, + ymax - zone_y_max, + ) + + if self.module_margin > 0 and edge_clearance < self.module_margin: + warning = ( + "MODULE_TOO_CLOSE_TO_BOARD_EDGE " + f"moduleId={module.module_id} " + f"moduleIdShort={module.module_id[:4] if module.module_id else 'unkn'} " + f"moduleName={module.name}" + ) + print(f"🔴 {warning}") self.position_warnings.append(warning) - - # 3. Check if any module zones overlap with remaining zones in self.zones - print("🔵 Validating module zones against other zones") - all_zones = self.zones.get_data() - - for module in modules_with_zones: - module_zone = module.zone - - for zone in all_zones: - # Skip if this is the module's own zone - # Calculate zone center for comparison - bl_z, _, tr_z, _ = zone - center_x = (bl_z[0] + tr_z[0]) / 2 - center_y = (bl_z[1] + tr_z[1]) / 2 - - # Skip if this is the module's own zone - if module.position.x == center_x and module.position.y == center_y: - continue - - if self._do_zones_overlap(module_zone, zone): - warning = f"🔴 Module '{module.name}' zone overlaps with a zone at position {(zone[0], zone[2])}" - print(warning) - self.position_warnings.append(warning) if not self.position_warnings: print("🟢 All zones and modules validated successfully!") else: print(f"🔴 Found {len(self.position_warnings)} validation issues") - def _do_zones_overlap(self, zone1: List[Tuple[float, float]], zone2: List[Tuple[float, float]]) -> bool: + def _do_zones_overlap(self, zone1: List[Tuple[float, float]], zone2: List[Tuple[float, float]], margin: Optional[float] = None) -> bool: """ Check if two zones overlap, considering a module margin. @@ -377,7 +400,8 @@ def _do_zones_overlap(self, zone1: List[Tuple[float, float]], zone2: List[Tuple[ bl2, _, tr2, _ = zone2 # Add margin to the bounding boxes - margin = self.module_margin + if margin is None: + margin = self.module_margin # Create bounding boxes with margin [min_x, min_y, max_x, max_y] box1 = [bl1[0] - margin, bl1[1] - margin, tr1[0] + margin, tr1[1] + margin] @@ -600,6 +624,13 @@ def get_module_name_from_position(self, position: Tuple[float, float]) -> Option if module.position.x == position[0] and module.position.y == position[1]: return module.name return None + + def get_module_from_position(self, position: Tuple[float, float], epsilon: float = 1e-6) -> Optional[Module]: + """Get the module object at a specific position.""" + for module in self.modules: + if abs(module.position.x - position[0]) <= epsilon and abs(module.position.y - position[1]) <= epsilon: + return module + return None def add_sockets(self, sockets: Sockets) -> None: """Add sockets to the board diff --git a/bus_router.py b/bus_router.py index 1bf179d3..fa6922ae 100644 --- a/bus_router.py +++ b/bus_router.py @@ -1,4 +1,5 @@ import numpy as np +import math from collections import defaultdict from typing import Dict, List, Tuple @@ -601,6 +602,76 @@ def _group_sockets(self, sockets_data, zones_data): print(f"Ordered zone center: {zone_center}, Groups: {groups}") return ordered_socket_groups + + def _get_module_id(self, module) -> str: + return module.module_id if module and getattr(module, "module_id", None) else "unknown" + + def _get_module_short_id(self, module_id: str) -> str: + return module_id[:4] if module_id != "unknown" else "unkn" + + def _zone_bbox(self, zone: Tuple[Tuple[float, float], Tuple[float, float], Tuple[float, float], Tuple[float, float]]) -> Tuple[float, float, float, float]: + bl, _, tr, _ = zone + return bl[0], bl[1], tr[0], tr[1] + + def _zone_clearance(self, zone_a, zone_b) -> float: + ax_min, ay_min, ax_max, ay_max = self._zone_bbox(zone_a) + bx_min, by_min, bx_max, by_max = self._zone_bbox(zone_b) + + margin = self.board.module_margin + ax_min -= margin + ay_min -= margin + ax_max += margin + ay_max += margin + + bx_min -= margin + by_min -= margin + bx_max += margin + by_max += margin + + dx = max(ax_min - bx_max, bx_min - ax_max, 0.0) + dy = max(ay_min - by_max, by_min - ay_max, 0.0) + return math.hypot(dx, dy) + + def _find_too_close_module_pair(self, module) -> tuple[str, str, str, float]: + if module is None: + return "unknown", "unkn", "unknown", -1.0 + + best_other = None + best_clearance = float("inf") + best_center_distance = float("inf") + + for other in self.board.modules: + if other is module: + continue + + if getattr(module, "zone", None) and getattr(other, "zone", None): + clearance = self._zone_clearance(module.zone, other.zone) + else: + clearance = float("inf") + + center_distance = math.dist((module.position.x, module.position.y), (other.position.x, other.position.y)) + + if clearance < best_clearance or (clearance == best_clearance and center_distance < best_center_distance): + best_other = other + best_clearance = clearance + best_center_distance = center_distance + + if best_other is None: + return "unknown", "unkn", "unknown", -1.0 + + other_id = self._get_module_id(best_other) + return other_id, self._get_module_short_id(other_id), best_other.name if best_other.name else "unknown", best_clearance + + def _build_pair_issue(self, module_id: str, module_name: str, too_close_id: str, too_close_name: str, clearance_mm: float) -> str: + module_short = self._get_module_short_id(module_id) + too_close_short = self._get_module_short_id(too_close_id) + issue_code = "MODULE_OVERLAPPING_OTHER_MODULE" if clearance_mm <= 0 else "MODULE_TOO_CLOSE_TO_OTHER_MODULE" + return ( + f"{issue_code} " + f"moduleIds=[{module_id},{too_close_id}] " + f"moduleIdShort=[{module_short},{too_close_short}] " + f"moduleNames=[{module_name if module_name else 'unknown'},{too_close_name}]" + ) def route(self) -> None: try: @@ -634,11 +705,15 @@ def route(self) -> None: for zone_center, socket_groups in grouped_sockets.items(): - module_name = self.board.get_module_name_from_position(zone_center) + module = self.board.get_module_from_position(zone_center) + module_name = module.name if module else self.board.get_module_name_from_position(zone_center) + module_id = module.module_id if module and module.module_id else "unknown" # For each group of sockets for group_idx, socket_group in enumerate(socket_groups): i = 0 + state_visit_counts = defaultdict(int) + max_state_revisits = 4 if self.debugger: self.debugger.log_event(f"Starting group {group_idx}: {len(socket_group)} sockets") @@ -646,6 +721,16 @@ def route(self) -> None: # Process all sockets in the group print(f"Socket group length: {len(socket_group)}") while i < len(socket_group): + group_signature = tuple( + (entry[0], float(entry[1][0]), float(entry[1][1])) for entry in socket_group + ) + state_key = (i, group_signature) + state_visit_counts[state_key] += 1 + + if state_visit_counts[state_key] > max_state_revisits: + too_close_id, too_close_short, too_close_name, too_close_clearance = self._find_too_close_module_pair(module) + raise RuntimeError(self._build_pair_issue(module_id, module_name, too_close_id, too_close_name, too_close_clearance)) + socket = socket_group[i] net_name, socket_pos = socket @@ -694,13 +779,15 @@ def route(self) -> None: timeout = 7 if current_time - last_write_time > timeout: print(f"🔴 Abandoned job (ID: {thread_context.job_id}) due to expired keepalive ({timeout} seconds)") - sys.exit() # Exit the thread + too_close_id, too_close_short, too_close_name, too_close_clearance = self._find_too_close_module_pair(module) + raise RuntimeError(self._build_pair_issue(module_id, module_name, too_close_id, too_close_name, too_close_clearance)) # Also abandon the job if there's more than 150 images in the routing_imgs folder routing_imgs_folder = thread_context.job_folder / "routing_imgs" if routing_imgs_folder.exists() and len(list(routing_imgs_folder.glob("*.png"))) > 150: print(f"🔴 Abandoned job (ID: {thread_context.job_id}) due to too many routing attempts (>150)") - sys.exit() # Exit the thread + too_close_id, too_close_short, too_close_name, too_close_clearance = self._find_too_close_module_pair(module) + raise RuntimeError(self._build_pair_issue(module_id, module_name, too_close_id, too_close_name, too_close_clearance)) path = self._route_socket_to_bus(self.base_grid, socket_pos, bus_point, net_name) @@ -742,8 +829,8 @@ def route(self) -> None: if i == 0: i += 1 socket_count += 1 - # continue - raise Exception(f" socket at {socket_pos} in group {group_idx} (first socket, cannot backtrack)") + too_close_id, too_close_short, too_close_name, too_close_clearance = self._find_too_close_module_pair(module) + raise Exception(self._build_pair_issue(module_id, module_name, too_close_id, too_close_name, too_close_clearance)) # Otherwise, we can backtrack print(f"🟠 Backtracking in group {group_idx} at socket {i}") diff --git a/module.py b/module.py index 42982187..9779bf46 100644 --- a/module.py +++ b/module.py @@ -16,9 +16,10 @@ def __repr__(self) -> str: return f"Position(x={self.x}, y={self.y})" class Module: - def __init__(self, name: str, version: str, position: Tuple[float, float], rotation: float): + def __init__(self, name: str, version: str, position: Tuple[float, float], rotation: float, module_id: Optional[str] = None): self.name = name self.version = version + self.module_id = module_id # HACK: Manually inverting the y-coordinate here self.position = Position(position[0], -position[1]) self.rotation = rotation diff --git a/run.py b/run.py index 2f8c87b9..435d3ee0 100644 --- a/run.py +++ b/run.py @@ -1,5 +1,7 @@ from process import merge_layers import os +import json +import re from gerbersockets import Sockets, Zones from loader import Loader @@ -10,12 +12,123 @@ from process import merge_stacks, compress_directory from consolidate import consolidate_component_files -import warnings import firmware from pathlib import Path import thread_context + +ALLOWED_ISSUE_PREFIXES = ( + "MODULE_TOO_CLOSE_TO_BOARD_EDGE", + "MODULE_TOO_CLOSE_TO_OTHER_MODULE", + "MODULE_OVERLAPPING_OTHER_MODULE", + "MODULE_OVERHANGING_BOARD_EDGE", +) + + +def _issues_file_path() -> Path: + return Path(thread_context.job_folder) / "issues.json" + + +def _read_issue_payload() -> dict: + payload = {"issues": []} + issues_file = _issues_file_path() + if not issues_file.exists(): + return payload + + try: + with open(issues_file, "r") as file: + content = json.load(file) + if isinstance(content, dict): + payload["issues"] = content.get("issues", []) if isinstance(content.get("issues", []), list) else [] + except Exception: + pass + + return payload + + +def _append_issue(message: str) -> None: + if not message: + return + + if not message.startswith(ALLOWED_ISSUE_PREFIXES): + return + + if ("moduleId=" not in message and + "moduleIds=" not in message and + "moduleIdsAll=" not in message): + board = getattr(thread_context, "board", None) + if board is not None and getattr(board, "modules", None): + ids = _all_module_ids(board) + short_ids = _all_module_ids_short(board) + message = ( + f"{message} " + f"moduleIdsAll=[{','.join(ids)}] " + f"moduleIdsAllShort=[{','.join(short_ids)}]" + ) + + payload = _read_issue_payload() + if message not in payload["issues"]: + payload["issues"].append(message) + + with open(_issues_file_path(), "w") as file: + json.dump(payload, file) + + +def _record_failure(message: str) -> None: + if not message: + return + + error_file = Path(thread_context.job_folder) / "error.txt" + with open(error_file, "w") as file: + file.write(message) + + _append_issue(message) + + +def _normalize_position_warning(warning: str) -> str: + return warning.strip() + + +def _sync_position_warnings(board: Board) -> None: + for warning in board.position_warnings: + message = _normalize_position_warning(warning) + _append_issue(message) + + +def _read_router_error() -> str: + error_file = Path(thread_context.job_folder) / "error.txt" + if not error_file.exists(): + return "" + try: + with open(error_file, "r") as file: + return file.read().strip() + except Exception: + return "" + + +def _all_module_ids(board: Board) -> list[str]: + module_ids: list[str] = [] + for module in board.modules: + module_id = module.module_id if getattr(module, "module_id", None) else "unknown" + module_ids.append(module_id) + return module_ids + + +def _all_module_ids_short(board: Board) -> list[str]: + return [module_id[:4] if module_id != "unknown" else "unkn" for module_id in _all_module_ids(board)] + + +def _issue_with_all_modules(code: str, board: Board, extra_fields: str = "") -> str: + ids = _all_module_ids(board) + short_ids = _all_module_ids_short(board) + base = ( + f"{code} " + f"moduleIdsAll=[{','.join(ids)}] " + f"moduleIdsAllShort=[{','.join(short_ids)}]" + ) + return f"{base} {extra_fields}".strip() + # Make sure to run `source venv/bin/activate` first! def run(job_id: str, job_folder: Path) -> dict: @@ -34,6 +147,10 @@ def run(job_id: str, job_folder: Path) -> dict: thread_context.job_id = job_id thread_context.job_folder = Path(job_folder) + # Initialize job-level user-facing issue tracking + with open(_issues_file_path(), "w") as file: + json.dump({"issues": []}, file) + # TODO: Change the rest of the code to reflect these decisions if (not hasattr(thread_context, "job_folder")): raise RuntimeError("run() must be called from within a thread") @@ -60,6 +177,7 @@ def run(job_id: str, job_folder: Path) -> dict: if gerbersockets_layer is None: print("🔴 No GerberSockets layer found in any module") + _record_failure(_issue_with_all_modules("ROUTING_LAYER_MISSING_GERBERSOCKETS", board)) return {"failed": True} print("🟢 Merged", loader.gerbersockets_layer_name, "layers") @@ -68,6 +186,7 @@ def run(job_id: str, job_folder: Path) -> dict: sockets = Sockets(loader, gerbersockets_layer) if sockets.get_socket_count() == 0: print("🔴 No sockets found") + _record_failure(_issue_with_all_modules("ROUTING_NO_SOCKETS_FOUND", board)) return {"failed": True} else: board.add_sockets(sockets) @@ -81,9 +200,11 @@ def run(job_id: str, job_folder: Path) -> dict: zones = Zones(loader, gerbersockets_layer) if zones.get_zone_count() == 0: print("🔴 No keep-out zones found, and added them to the board") + _record_failure(_issue_with_all_modules("PLACEMENT_NO_KEEP_OUT_ZONES", board)) return {"failed": True} else: board.add_zones(zones) + _sync_position_warnings(board) print( "🟢 Found", zones.get_zone_count(), @@ -101,9 +222,9 @@ def run(job_id: str, job_folder: Path) -> dict: print(f" {', '.join(str_nets)}") # Generate JSON containing module/net mappings needed for MCU programming - json = board.get_programming_json() + programming_json = board.get_programming_json() with open(thread_context.job_folder / "firmware.json", "w") as json_file: - json_file.write(json) + json_file.write(programming_json) print("🟢 Generated MCU programming firmware JSON file") # TODO: for now it will be hardcoded, but would be good to identify the track/buses layers programatically @@ -115,11 +236,21 @@ def run(job_id: str, job_folder: Path) -> dict: board, tracks_layer=top_layer, buses_layer=bottom_layer, side="left" ) left_router.route() + left_route_error = _read_router_error() + if left_route_error: + _append_issue(left_route_error) + return {"failed": True} right_router = BusRouter( board, tracks_layer=bottom_layer, buses_layer=top_layer, side="right" ) right_router.route() + right_route_error = _read_router_error() + if right_route_error: + _append_issue(right_route_error) + return {"failed": True} + + _sync_position_warnings(board) # Save final front.svg / back.svg try: @@ -130,14 +261,12 @@ def run(job_id: str, job_folder: Path) -> dict: print(f"🔴 Error saving final SVGs: {e}") else: print("🔴 Could not find both top and bottom layers for routing") + _record_failure(_issue_with_all_modules("ROUTING_LAYERS_MISSING_TOP_OR_BOTTOM", board)) return {"failed": True} - # Suppress warnings from Gerbonara during generation - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - generate(board) - merge_stacks(board.modules, board.name) - consolidate_component_files(board.modules, board.name) + generate(board) + merge_stacks(board.modules, board.name) + consolidate_component_files(board.modules, board.name) # Count only sockets that were assigned to modules (ignore unassigned sockets) module_nets = board.get_module_nets() @@ -147,7 +276,15 @@ def run(job_id: str, job_folder: Path) -> dict: if (all - connected) == 0: print(f"🟢 All {connected} GerberSockets routed successfully") else: - print(f"🔴 GerberSockets routing incomplete for {all - connected} socket. {connected}/{all} completed") + failed_count = all - connected + print(f"🔴 GerberSockets routing incomplete for {failed_count} socket. {connected}/{all} completed") + _record_failure( + _issue_with_all_modules( + "ROUTING_UNCONNECTED_SOCKETS", + board, + f"failedSockets={failed_count} connectedSockets={connected} totalSockets={all}", + ) + ) return {"failed": True} # Generate the firmware files for microbit/RP2040 module to flash all Jacdac-based SMT32 virtual modules @@ -155,8 +292,9 @@ def run(job_id: str, job_folder: Path) -> dict: firmware.run() print("🟢 Generated firmware files") compress_directory(thread_context.job_folder / "output") - except Exception as e: + except BaseException as e: print("🔴 Failed to generate firmware files:", e) + _append_issue(_issue_with_all_modules("FIRMWARE_GENERATION_FAILED", board, f"error={str(e)}")) compress_directory(thread_context.job_folder / "output") # Write to a text fail indicating zip ready diff --git a/server.py b/server.py index 389d4f53..9f22c198 100644 --- a/server.py +++ b/server.py @@ -49,6 +49,25 @@ def generate_id(length=8): # Was using import uuid, but this is simpler chars = string.ascii_letters + string.digits # A-Z, a-z, 0-9 return ''.join(random.choices(chars, k=length)) + +def read_job_issues(job_id: str) -> list: + issues_file = job_folder_base / job_id / "issues.json" + issues: list[str] = [] + + if not issues_file.exists(): + return issues + + try: + with open(issues_file, 'r') as file: + data = json.load(file) + if isinstance(data, dict): + loaded = data.get("issues", []) + issues = loaded if isinstance(loaded, list) else [] + except Exception as e: + print(f"🔴 Error reading issues file {issues_file}: {e}") + + return issues + # TODO: Implement these new endpoints properly @app.route('/routingStart', methods=['POST']) def routing_start(): @@ -177,15 +196,20 @@ def _read_b64(path): if routing_failed: - response: RoutingProgressResponse = { - "endpoint": "routingProgress", - "error": { - "message": error_message, - "failedModuleIds": [], # TODO: Implement - "succeededModuleIds": [], # TODO: Implement + issue_list = read_job_issues(job_id) + if error_message and error_message not in issue_list: + issue_list.append(error_message) + + response: RoutingProgressResponse = { + "endpoint": "routingProgress", + "issues": issue_list, + "error": { + "message": error_message, + "failedModuleIds": [], # TODO: Implement + "succeededModuleIds": [], # TODO: Implement + } } - } - return jsonify(response), 200 + return jsonify(response), 200 else: response: RoutingProgressResponse = { "endpoint": "routingProgress", @@ -198,6 +222,10 @@ def _read_b64(path): # TODO: Implement bus width left and right } } + + if finished_success: + response["issues"] = read_job_issues(job_id) + return jsonify(response), 200 diff --git a/server_packets.py b/server_packets.py index 525a6d63..22b0fd00 100644 --- a/server_packets.py +++ b/server_packets.py @@ -61,6 +61,7 @@ class RoutingProgressResponse(TypedDict): endpoint: Literal["routingProgress"] error: NotRequired[RoutingProgressResponseError] result: NotRequired[RoutingProgressResponseResult] + issues: NotRequired[List[str]] class PCBArtifactResponseResult(TypedDict):