diff --git a/src/archml/views/placement.py b/src/archml/views/placement.py new file mode 100644 index 0000000..f2b7d39 --- /dev/null +++ b/src/archml/views/placement.py @@ -0,0 +1,629 @@ +# Copyright 2026 ArchML Contributors +# SPDX-License-Identifier: Apache-2.0 + +"""Placement algorithm for ArchML visualization diagrams. + +Implements a Sugiyama-style hierarchical layout algorithm that produces a +backend-independent :class:`LayoutPlan` describing where to position every +element in a :class:`~archml.views.topology.VizDiagram`. + +Algorithm overview +------------------ +The algorithm operates in four phases, mirroring the classical Sugiyama +framework used by Graphviz *dot*: + +1. **Layer assignment** — Internal child nodes are partitioned into horizontal + columns using the *longest-path* algorithm. Nodes that only initiate + requests (edge sources, i.e. nodes whose ``requires`` ports appear as edge + source ports) are placed in the leftmost column; nodes that only respond + (edge sinks) are placed in the rightmost column. + +2. **Crossing minimisation** — Nodes within each column are ordered to reduce + the number of edge crossings using the *barycenter heuristic* with multiple + alternating forward and backward sweep passes. + +3. **Peripheral placement** — Nodes outside the root boundary (terminal nodes + and external actors) are classified as *left* or *right* peripherals based + on their role in the edge graph, and stacked vertically beside the boundary. + +4. **Coordinate assignment** — Abstract float coordinates are assigned to every + node and boundary. Port anchors follow the ArchML convention: ``requires`` + ports are anchored to the **left edge** of their node (incoming connections), + ``provides`` ports to the **right edge** (outgoing connections). Edge routes + are straight lines between the port anchors. + +Output +------ +The resulting :class:`LayoutPlan` is fully independent of any rendering +backend (SVG, Dash, etc.). All coordinates are expressed in abstract *layout +units*. Renderers multiply these by their chosen scale factor to obtain pixel +or viewport coordinates. +""" + +from __future__ import annotations + +from collections import defaultdict, deque +from dataclasses import dataclass, field + +from archml.views.topology import VizBoundary, VizDiagram, VizNode, VizPort + +# ############### +# Public Interface +# ############### + + +@dataclass +class LayoutConfig: + """Configuration parameters for the placement algorithm. + + All dimensions are in abstract layout units. Renderers scale them to + actual pixel or viewport sizes. + + Attributes: + node_width: Default width of every internal child node. + node_height: Default height of every internal child node. + layer_gap: Horizontal gap between adjacent node columns inside the + root boundary. + node_gap: Vertical gap between nodes stacked in the same column. + peripheral_gap: Horizontal gap between peripheral nodes and the root + boundary edge. + boundary_padding: Padding between the root boundary edge and the + nearest child node on each side. + peripheral_node_width: Width of terminal and external peripheral nodes. + peripheral_node_height: Height of terminal and external peripheral nodes. + """ + + node_width: float = 120.0 + node_height: float = 60.0 + layer_gap: float = 80.0 + node_gap: float = 40.0 + peripheral_gap: float = 80.0 + boundary_padding: float = 40.0 + peripheral_node_width: float = 100.0 + peripheral_node_height: float = 50.0 + + +@dataclass +class PortAnchor: + """The exact point where an edge attaches to a port. + + Attributes: + port_id: Stable identifier of the :class:`~archml.views.topology.VizPort`. + x: Horizontal position of the attachment point (layout units). + y: Vertical position of the attachment point (layout units). + """ + + port_id: str + x: float + y: float + + +@dataclass +class NodeLayout: + """Position and size of a :class:`~archml.views.topology.VizNode`. + + The origin ``(x, y)`` is the **top-left corner** of the node rectangle. + The node spans ``[x, x + width] × [y, y + height]``. + + Attributes: + node_id: Stable identifier matching :attr:`VizNode.id`. + x: Left edge (layout units). + y: Top edge (layout units). + width: Horizontal extent (layout units). + height: Vertical extent (layout units). + """ + + node_id: str + x: float + y: float + width: float + height: float + + +@dataclass +class BoundaryLayout: + """Position and size of a :class:`~archml.views.topology.VizBoundary`. + + Attributes: + boundary_id: Stable identifier matching :attr:`VizBoundary.id`. + x: Left edge (layout units). + y: Top edge (layout units). + width: Horizontal extent (layout units). + height: Vertical extent (layout units). + """ + + boundary_id: str + x: float + y: float + width: float + height: float + + +@dataclass +class EdgeRoute: + """Polyline route for a :class:`~archml.views.topology.VizEdge`. + + The route is expressed as an ordered list of ``(x, y)`` waypoints. The + first waypoint coincides with the source port anchor; the last with the + target port anchor. Additional interior waypoints may be added by more + sophisticated routing passes; the base implementation uses straight lines + (two waypoints only). + + Attributes: + edge_id: Stable identifier matching :attr:`VizEdge.id`. + waypoints: Ordered ``(x, y)`` coordinate pairs (layout units). + """ + + edge_id: str + waypoints: list[tuple[float, float]] = field(default_factory=list) + + +@dataclass +class LayoutPlan: + """Complete backend-independent layout plan for a :class:`~archml.views.topology.VizDiagram`. + + All positions and sizes are in abstract layout units. + + Attributes: + diagram_id: Matches :attr:`VizDiagram.id`. + total_width: Bounding-box width of the entire diagram (layout units). + total_height: Bounding-box height of the entire diagram (layout units). + nodes: Mapping from node ID to its :class:`NodeLayout`. Covers all + internal child nodes and all peripheral nodes. + boundaries: Mapping from boundary ID to its :class:`BoundaryLayout`. + Currently contains the root boundary; nested boundaries will be + added in future versions. + port_anchors: Mapping from port ID to its :class:`PortAnchor`. + Covers ports on every node and boundary in the diagram. + edge_routes: Mapping from edge ID to its :class:`EdgeRoute`. Only + edges whose both port anchors are resolved are included. + """ + + diagram_id: str + total_width: float + total_height: float + nodes: dict[str, NodeLayout] = field(default_factory=dict) + boundaries: dict[str, BoundaryLayout] = field(default_factory=dict) + port_anchors: dict[str, PortAnchor] = field(default_factory=dict) + edge_routes: dict[str, EdgeRoute] = field(default_factory=dict) + + +def compute_layout( + diagram: VizDiagram, + *, + config: LayoutConfig | None = None, +) -> LayoutPlan: + """Compute a :class:`LayoutPlan` for *diagram*. + + Applies the Sugiyama-style hierarchical layout algorithm described in this + module's docstring. The returned plan is fully independent of any rendering + backend. + + Args: + diagram: The topology to lay out, as produced by + :func:`~archml.views.topology.build_viz_diagram`. + config: Optional layout configuration. Defaults to + :class:`LayoutConfig` with standard values if omitted. + + Returns: + A :class:`LayoutPlan` covering all nodes, the root boundary, all port + anchors, and all resolvable edge routes. + """ + return _Layouter(diagram, config or LayoutConfig()).run() + + +# ################ +# Implementation +# ################ + + +class _Layouter: + """Stateful helper that runs the full Sugiyama layout pipeline.""" + + def __init__(self, diagram: VizDiagram, cfg: LayoutConfig) -> None: + self._diagram = diagram + self._cfg = cfg + + def run(self) -> LayoutPlan: # noqa: PLR0914 – intentionally wide orchestrator + diagram = self._diagram + cfg = self._cfg + + # --- Step 0: build port → node mapping --- + port_to_node = _build_port_to_node(diagram) + + # --- Step 1: collect internal child nodes (VizNode only) --- + child_nodes: list[VizNode] = [n for n in diagram.root.children if isinstance(n, VizNode)] + child_ids = {n.id for n in child_nodes} + child_by_id = {n.id: n for n in child_nodes} + + # Build directed edge graph between child nodes. + internal_edges: list[tuple[str, str]] = [] + for edge in diagram.edges: + src = port_to_node.get(edge.source_port_id) + tgt = port_to_node.get(edge.target_port_id) + if src in child_ids and tgt in child_ids and src != tgt: + internal_edges.append((src, tgt)) + + # --- Step 2: layer assignment (longest-path) --- + child_id_list = [n.id for n in child_nodes] + raw_layers = _longest_path_layers(child_id_list, internal_edges) + + num_layers = max(raw_layers.values(), default=-1) + 1 if raw_layers else 0 + layer_groups: dict[int, list[str]] = defaultdict(list) + for node_id, layer in raw_layers.items(): + layer_groups[layer].append(node_id) + ordered_layers: list[list[str]] = [layer_groups.get(i, []) for i in range(num_layers)] + + # --- Step 3: crossing minimisation (barycenter heuristic) --- + ordered_layers = _minimise_crossings(ordered_layers, internal_edges) + + # --- Step 4: classify peripheral nodes --- + peripheral_left, peripheral_right = _classify_peripherals(diagram.peripheral_nodes, diagram.edges, port_to_node) + + # --- Step 5: compute geometry --- + max_per_layer = max((len(la) for la in ordered_layers), default=0) + + inner_w = num_layers * cfg.node_width + max(0, num_layers - 1) * cfg.layer_gap + inner_h = max_per_layer * cfg.node_height + max(0, max_per_layer - 1) * cfg.node_gap + boundary_w = inner_w + 2 * cfg.boundary_padding + boundary_h = inner_h + 2 * cfg.boundary_padding + + left_h = _stack_height(len(peripheral_left), cfg.peripheral_node_height, cfg.node_gap) + right_h = _stack_height(len(peripheral_right), cfg.peripheral_node_height, cfg.node_gap) + + total_h = max(boundary_h, left_h, right_h) + + left_zone_w = cfg.peripheral_node_width if peripheral_left else 0.0 + right_zone_w = cfg.peripheral_node_width if peripheral_right else 0.0 + left_gap = cfg.peripheral_gap if peripheral_left else 0.0 + right_gap = cfg.peripheral_gap if peripheral_right else 0.0 + + boundary_x = left_zone_w + left_gap + boundary_y = (total_h - boundary_h) / 2.0 + total_w = boundary_x + boundary_w + right_gap + right_zone_w + + # --- Step 6: assign node positions --- + node_layouts: dict[str, NodeLayout] = {} + + for layer_idx, layer_node_ids in enumerate(ordered_layers): + col_x = boundary_x + cfg.boundary_padding + layer_idx * (cfg.node_width + cfg.layer_gap) + col_h = _stack_height(len(layer_node_ids), cfg.node_height, cfg.node_gap) + col_start_y = boundary_y + cfg.boundary_padding + (inner_h - col_h) / 2.0 + for row, node_id in enumerate(layer_node_ids): + node_layouts[node_id] = NodeLayout( + node_id=node_id, + x=col_x, + y=col_start_y + row * (cfg.node_height + cfg.node_gap), + width=cfg.node_width, + height=cfg.node_height, + ) + + left_start_y = (total_h - left_h) / 2.0 + for i, node in enumerate(peripheral_left): + node_layouts[node.id] = NodeLayout( + node_id=node.id, + x=0.0, + y=left_start_y + i * (cfg.peripheral_node_height + cfg.node_gap), + width=cfg.peripheral_node_width, + height=cfg.peripheral_node_height, + ) + + right_x = boundary_x + boundary_w + right_gap + right_start_y = (total_h - right_h) / 2.0 + for i, node in enumerate(peripheral_right): + node_layouts[node.id] = NodeLayout( + node_id=node.id, + x=right_x, + y=right_start_y + i * (cfg.peripheral_node_height + cfg.node_gap), + width=cfg.peripheral_node_width, + height=cfg.peripheral_node_height, + ) + + # --- Step 7: boundary layout --- + boundary_layouts: dict[str, BoundaryLayout] = { + diagram.root.id: BoundaryLayout( + boundary_id=diagram.root.id, + x=boundary_x, + y=boundary_y, + width=boundary_w, + height=boundary_h, + ) + } + + # --- Step 8: port anchors --- + port_anchors: dict[str, PortAnchor] = {} + + root_bl = boundary_layouts[diagram.root.id] + _add_boundary_anchors(diagram.root, root_bl, port_anchors) + + all_viz_nodes: dict[str, VizNode] = { + **child_by_id, + **{n.id: n for n in diagram.peripheral_nodes}, + } + for node_id, nl in node_layouts.items(): + viz_node = all_viz_nodes.get(node_id) + if viz_node is not None: + _add_node_anchors(viz_node, nl, port_anchors) + + # --- Step 9: edge routes (straight-line) --- + edge_routes: dict[str, EdgeRoute] = {} + for edge in diagram.edges: + src_anc = port_anchors.get(edge.source_port_id) + tgt_anc = port_anchors.get(edge.target_port_id) + if src_anc is not None and tgt_anc is not None: + edge_routes[edge.id] = EdgeRoute( + edge_id=edge.id, + waypoints=[(src_anc.x, src_anc.y), (tgt_anc.x, tgt_anc.y)], + ) + + return LayoutPlan( + diagram_id=diagram.id, + total_width=total_w, + total_height=total_h, + nodes=node_layouts, + boundaries=boundary_layouts, + port_anchors=port_anchors, + edge_routes=edge_routes, + ) + + +# -------- graph helpers -------- + + +def _build_port_to_node(diagram: VizDiagram) -> dict[str, str]: + """Return a mapping ``port_id → node_or_boundary_id`` for the whole diagram.""" + result: dict[str, str] = {} + for p in diagram.root.ports: + result[p.id] = diagram.root.id + _collect_boundary_port_to_node(diagram.root, result) + for node in diagram.peripheral_nodes: + for p in node.ports: + result[p.id] = node.id + return result + + +def _collect_boundary_port_to_node(boundary: VizBoundary, out: dict[str, str]) -> None: + """Recursively map ports of *boundary*'s children into *out*.""" + for child in boundary.children: + if isinstance(child, VizNode): + for p in child.ports: + out[p.id] = child.id + else: + for p in child.ports: + out[p.id] = child.id + _collect_boundary_port_to_node(child, out) + + +def _classify_peripherals( + peripheral_nodes: list[VizNode], + edges: list, + port_to_node: dict[str, str], +) -> tuple[list[VizNode], list[VizNode]]: + """Classify peripheral nodes as left (source) or right (sink). + + A peripheral node is *left* if it acts as an edge source (its ports appear + as ``source_port_id`` in edges, meaning it initiates requests rightward + into the boundary). It is *right* if it acts as an edge target (its ports + appear as ``target_port_id``, meaning it receives requests from the + boundary). Ties and unconnected nodes fall back to port direction. + """ + source_port_ids = {e.source_port_id for e in edges} + target_port_ids = {e.target_port_id for e in edges} + + left: list[VizNode] = [] + right: list[VizNode] = [] + + for node in peripheral_nodes: + node_port_ids = {p.id for p in node.ports} + is_src = bool(node_port_ids & source_port_ids) + is_tgt = bool(node_port_ids & target_port_ids) + + if is_src and not is_tgt: + left.append(node) + elif is_tgt and not is_src: + right.append(node) + elif is_src and is_tgt: + # Mixed: use majority port direction as tiebreaker. + req_count = sum(1 for p in node.ports if p.direction == "requires") + prov_count = len(node.ports) - req_count + if req_count >= prov_count: + left.append(node) + else: + right.append(node) + else: + # No edges: terminal kind is authoritative. + has_req = any(p.direction == "requires" for p in node.ports) + if has_req: + left.append(node) + else: + right.append(node) + + return left, right + + +# -------- layer assignment -------- + + +def _longest_path_layers( + node_ids: list[str], + edges: list[tuple[str, str]], +) -> dict[str, int]: + """Assign a layer index to each node using the longest-path algorithm. + + Nodes with no incoming edges (sources) are assigned layer 0. Each + subsequent node is placed one layer after the deepest of its predecessors. + This produces the minimum-height layering for a DAG. + + Cycles (which should not appear in valid ArchML models) are handled + gracefully: any node that cannot be reached from a source is assigned + layer 0. + + Args: + node_ids: All node identifiers to be layered. + edges: Directed edges as ``(source_id, target_id)`` pairs. Only + edges whose both endpoints are in *node_ids* are considered. + + Returns: + Mapping from node ID to its layer index (0 = leftmost). + """ + node_set = set(node_ids) + successors: dict[str, list[str]] = defaultdict(list) + in_degree: dict[str, int] = {n: 0 for n in node_ids} + + for src, tgt in edges: + if src in node_set and tgt in node_set and src != tgt: + successors[src].append(tgt) + in_degree[tgt] += 1 + + # Kahn's topological sort, tracking longest path simultaneously. + layers: dict[str, int] = {} + queue: deque[str] = deque() + for n in node_ids: + if in_degree[n] == 0: + layers[n] = 0 + queue.append(n) + + if not queue: + # All nodes are in cycles – assign every node to layer 0. + return {n: 0 for n in node_ids} + + while queue: + curr = queue.popleft() + curr_layer = layers[curr] + for succ in successors[curr]: + candidate = curr_layer + 1 + if succ not in layers or layers[succ] < candidate: + layers[succ] = candidate + in_degree[succ] -= 1 + if in_degree[succ] == 0: + queue.append(succ) + + # Nodes unreachable from sources (in cycles) get layer 0. + for n in node_ids: + if n not in layers: + layers[n] = 0 + + return layers + + +# -------- crossing minimisation -------- + + +def _minimise_crossings( + layers: list[list[str]], + edges: list[tuple[str, str]], + num_passes: int = 4, +) -> list[list[str]]: + """Reduce edge crossings using the barycenter heuristic. + + Performs *num_passes* alternating forward and backward sweeps. In each + forward sweep the nodes in layer *i* are reordered by the average position + of their predecessors in layer *i − 1*. Backward sweeps do the same in + the opposite direction using successors. + + Args: + layers: Current column-by-column node ordering (modified in place). + edges: Directed edges as ``(source_id, target_id)`` pairs. + num_passes: Number of alternating sweep passes (default 4). + + Returns: + New layer ordering with reduced crossings. + """ + if len(layers) <= 1: + return layers + + successors: dict[str, list[str]] = defaultdict(list) + predecessors: dict[str, list[str]] = defaultdict(list) + for src, tgt in edges: + successors[src].append(tgt) + predecessors[tgt].append(src) + + result = [list(la) for la in layers] + + for pass_num in range(num_passes): + if pass_num % 2 == 0: # forward: order layer i by predecessors in i-1 + for i in range(1, len(result)): + fixed = {node: pos for pos, node in enumerate(result[i - 1])} + result[i] = _barycenter_sort(result[i], predecessors, fixed) + else: # backward: order layer i by successors in i+1 + for i in range(len(result) - 2, -1, -1): + fixed = {node: pos for pos, node in enumerate(result[i + 1])} + result[i] = _barycenter_sort(result[i], successors, fixed) + + return result + + +def _barycenter_sort( + nodes: list[str], + neighbors: dict[str, list[str]], + positions: dict[str, int], +) -> list[str]: + """Sort *nodes* by the average position of their neighbours in the fixed layer. + + Nodes with no neighbours in the fixed layer keep their relative order + (they are sorted to the end, preserving stability via Python's stable sort). + """ + + def _key(node_id: str) -> float: + nbrs = [positions[nb] for nb in neighbors[node_id] if nb in positions] + return sum(nbrs) / len(nbrs) if nbrs else float(len(positions)) + + return sorted(nodes, key=_key) + + +# -------- coordinate helpers -------- + + +def _stack_height(count: int, item_h: float, gap: float) -> float: + """Total height of *count* items stacked with *gap* between them.""" + if count <= 0: + return 0.0 + return count * item_h + max(0, count - 1) * gap + + +def _add_node_anchors( + node: VizNode, + layout: NodeLayout, + out: dict[str, PortAnchor], +) -> None: + """Compute and record port anchors for a :class:`VizNode`. + + ``requires`` ports are anchored to the **left edge** (x = layout.x). + ``provides`` ports are anchored to the **right edge** (x = layout.x + width). + Multiple ports on the same side are spaced evenly along the vertical axis. + """ + req = [p for p in node.ports if p.direction == "requires"] + prov = [p for p in node.ports if p.direction == "provides"] + _anchor_ports_on_edge(req, layout.x, layout.y, layout.height, out) + _anchor_ports_on_edge(prov, layout.x + layout.width, layout.y, layout.height, out) + + +def _add_boundary_anchors( + boundary: VizBoundary, + layout: BoundaryLayout, + out: dict[str, PortAnchor], +) -> None: + """Compute and record port anchors for a :class:`VizBoundary`. + + Follows the same left/right convention as :func:`_add_node_anchors`. + """ + req = [p for p in boundary.ports if p.direction == "requires"] + prov = [p for p in boundary.ports if p.direction == "provides"] + _anchor_ports_on_edge(req, layout.x, layout.y, layout.height, out) + _anchor_ports_on_edge(prov, layout.x + layout.width, layout.y, layout.height, out) + + +def _anchor_ports_on_edge( + ports: list[VizPort], + edge_x: float, + top_y: float, + height: float, + out: dict[str, PortAnchor], +) -> None: + """Place *ports* evenly along a vertical edge at x = *edge_x*.""" + n = len(ports) + if n == 0: + return + for i, port in enumerate(ports): + y = top_y + (i + 1) * height / (n + 1) + out[port.id] = PortAnchor(port_id=port.id, x=edge_x, y=y) diff --git a/tests/views/test_placement.py b/tests/views/test_placement.py new file mode 100644 index 0000000..a3eecd8 --- /dev/null +++ b/tests/views/test_placement.py @@ -0,0 +1,748 @@ +# Copyright 2026 ArchML Contributors +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the Sugiyama-style placement algorithm.""" + +import pytest + +from archml.model.entities import Component, Connection, ConnectionEndpoint, InterfaceRef, System +from archml.views.placement import ( + LayoutConfig, + LayoutPlan, + compute_layout, +) +from archml.views.topology import ( + VizBoundary, + VizDiagram, + VizEdge, + VizNode, + VizPort, + build_viz_diagram, +) + +# ############### +# Helpers +# ############### + + +def _iref(name: str, version: str | None = None) -> InterfaceRef: + return InterfaceRef(name=name, version=version) + + +def _conn(source: str, target: str, interface: str) -> Connection: + return Connection( + source=ConnectionEndpoint(entity=source), + target=ConnectionEndpoint(entity=target), + interface=InterfaceRef(name=interface), + ) + + +def _port(node_id: str, direction: str, name: str) -> VizPort: + dir_tag = "req" if direction == "requires" else "prov" + return VizPort( + id=f"{node_id}.{dir_tag}.{name}", + node_id=node_id, + interface_name=name, + interface_version=None, + direction=direction, # type: ignore[arg-type] + ) + + +def _node(node_id: str, ports: list[VizPort] | None = None) -> VizNode: + return VizNode( + id=node_id, + label=node_id, + title=None, + kind="component", + entity_path=node_id, + ports=ports or [], + ) + + +def _edge(src_port: str, tgt_port: str) -> VizEdge: + return VizEdge( + id=f"edge.{src_port}--{tgt_port}", + source_port_id=src_port, + target_port_id=tgt_port, + label="Iface", + interface_name="Iface", + ) + + +def _simple_diagram( + children: list[VizNode], + peripheral_nodes: list[VizNode] | None = None, + edges: list[VizEdge] | None = None, +) -> VizDiagram: + """Build a minimal VizDiagram from explicit child/peripheral/edge lists.""" + root = VizBoundary( + id="Root", + label="Root", + title=None, + kind="system", + entity_path="Root", + children=children, + ) + return VizDiagram( + id="diagram.Root", + title="Root", + description=None, + root=root, + peripheral_nodes=peripheral_nodes or [], + edges=edges or [], + ) + + +# ############### +# LayoutConfig +# ############### + + +def test_layout_config_defaults() -> None: + """Default config has sensible non-zero values.""" + cfg = LayoutConfig() + assert cfg.node_width > 0 + assert cfg.node_height > 0 + assert cfg.layer_gap > 0 + assert cfg.node_gap > 0 + assert cfg.peripheral_gap > 0 + assert cfg.boundary_padding > 0 + + +def test_compute_layout_uses_custom_config() -> None: + """A custom LayoutConfig is applied to node dimensions.""" + comp = Component(name="Sys", components=[Component(name="A")]) + diagram = build_viz_diagram(comp) + cfg = LayoutConfig(node_width=200.0, node_height=80.0) + plan = compute_layout(diagram, config=cfg) + nl = next(iter(plan.nodes.values())) + assert nl.width == 200.0 + assert nl.height == 80.0 + + +# ############### +# Empty / minimal diagrams +# ############### + + +def test_empty_diagram_returns_plan() -> None: + """A diagram with no children and no peripherals still produces a plan.""" + comp = Component(name="Empty") + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + assert isinstance(plan, LayoutPlan) + assert plan.diagram_id == diagram.id + + +def test_empty_diagram_has_root_boundary() -> None: + """Root boundary is always present in the plan.""" + comp = Component(name="Empty") + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + assert diagram.root.id in plan.boundaries + + +def test_empty_diagram_total_dimensions_positive() -> None: + """Total dimensions are positive even for an empty diagram.""" + comp = Component(name="Empty") + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + assert plan.total_width > 0 + assert plan.total_height > 0 + + +def test_single_child_node_has_layout() -> None: + """A single child node receives a NodeLayout entry.""" + comp = Component(name="Sys", components=[Component(name="A")]) + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + assert len(plan.nodes) == 1 + + +# ############### +# Layer assignment +# ############### + + +def test_two_connected_nodes_in_different_layers() -> None: + """Nodes connected by an edge are placed in different layers (different x).""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + plan = compute_layout(diagram) + assert plan.nodes["A"].x != plan.nodes["B"].x + + +def test_source_node_left_of_sink_node() -> None: + """The requesting node (source) is placed to the left of the provider (target).""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + plan = compute_layout(diagram) + assert plan.nodes["A"].x < plan.nodes["B"].x + + +def test_three_node_chain_left_to_right_order() -> None: + """A → B → C chain: A is leftmost, C is rightmost.""" + a_req = _port("A", "requires", "AB") + b_prov_ab = _port("B", "provides", "AB") + b_req = _port("B", "requires", "BC") + c_prov = _port("C", "provides", "BC") + a = _node("A", [a_req]) + b = _node("B", [b_prov_ab, b_req]) + c = _node("C", [c_prov]) + edges = [_edge(a_req.id, b_prov_ab.id), _edge(b_req.id, c_prov.id)] + diagram = _simple_diagram([a, b, c], edges=edges) + plan = compute_layout(diagram) + assert plan.nodes["A"].x < plan.nodes["B"].x < plan.nodes["C"].x + + +def test_three_node_chain_three_distinct_x_positions() -> None: + """Each node in a 3-layer chain occupies its own distinct x column.""" + a_req = _port("A", "requires", "AB") + b_prov_ab = _port("B", "provides", "AB") + b_req = _port("B", "requires", "BC") + c_prov = _port("C", "provides", "BC") + a = _node("A", [a_req]) + b = _node("B", [b_prov_ab, b_req]) + c = _node("C", [c_prov]) + edges = [_edge(a_req.id, b_prov_ab.id), _edge(b_req.id, c_prov.id)] + diagram = _simple_diagram([a, b, c], edges=edges) + plan = compute_layout(diagram) + xs = {plan.nodes["A"].x, plan.nodes["B"].x, plan.nodes["C"].x} + assert len(xs) == 3 + + +def test_disconnected_nodes_assigned_layer_zero() -> None: + """Nodes with no edges all land in the same (leftmost) column.""" + a = _node("A") + b = _node("B") + diagram = _simple_diagram([a, b]) + plan = compute_layout(diagram) + assert plan.nodes["A"].x == plan.nodes["B"].x + + +def test_diamond_graph_max_layer_correct() -> None: + """A → B, A → C, B → D, C → D: D must be in layer 2 (rightmost).""" + a_req_b = _port("A", "requires", "AB") + a_req_c = _port("A", "requires", "AC") + b_prov = _port("B", "provides", "AB") + b_req = _port("B", "requires", "BD") + c_prov = _port("C", "provides", "AC") + c_req = _port("C", "requires", "CD") + d_prov_b = _port("D", "provides", "BD") + d_prov_c = _port("D", "provides", "CD") + a = _node("A", [a_req_b, a_req_c]) + b = _node("B", [b_prov, b_req]) + c = _node("C", [c_prov, c_req]) + d = _node("D", [d_prov_b, d_prov_c]) + edges = [ + _edge(a_req_b.id, b_prov.id), + _edge(a_req_c.id, c_prov.id), + _edge(b_req.id, d_prov_b.id), + _edge(c_req.id, d_prov_c.id), + ] + diagram = _simple_diagram([a, b, c, d], edges=edges) + plan = compute_layout(diagram) + assert plan.nodes["A"].x < plan.nodes["B"].x + assert plan.nodes["A"].x < plan.nodes["C"].x + assert plan.nodes["B"].x < plan.nodes["D"].x + assert plan.nodes["C"].x < plan.nodes["D"].x + + +# ############### +# Crossing minimisation +# ############### + + +def test_crossing_minimisation_does_not_change_layer_x() -> None: + """Crossing minimisation only changes y-order, not x-columns.""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + cfg = LayoutConfig() + plan = compute_layout(diagram, config=cfg) + # A and B must still be in different columns. + assert plan.nodes["A"].x != plan.nodes["B"].x + + +def test_nodes_within_same_layer_have_different_y() -> None: + """Two disconnected nodes in the same layer (same x) have distinct y values.""" + a = _node("A") + b = _node("B") + diagram = _simple_diagram([a, b]) + plan = compute_layout(diagram) + assert plan.nodes["A"].y != plan.nodes["B"].y + + +# ############### +# Peripheral node classification +# ############### + + +def test_requires_terminal_placed_left_of_boundary() -> None: + """A 'requires' terminal node (LEFT peripheral) is left of the root boundary.""" + comp = Component( + name="Sys", + requires=[_iref("InIface")], + components=[Component(name="A")], + ) + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + boundary_x = plan.boundaries[diagram.root.id].x + terminal = next(n for n in diagram.peripheral_nodes if n.kind == "terminal") + assert plan.nodes[terminal.id].x < boundary_x + + +def test_provides_terminal_placed_right_of_boundary() -> None: + """A 'provides' terminal node (RIGHT peripheral) is right of the root boundary.""" + comp = Component( + name="Sys", + provides=[_iref("OutIface")], + components=[Component(name="A")], + ) + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + bl = plan.boundaries[diagram.root.id] + boundary_right = bl.x + bl.width + terminal = next(n for n in diagram.peripheral_nodes if n.kind == "terminal") + assert plan.nodes[terminal.id].x >= boundary_right + + +def test_external_source_placed_left_of_boundary() -> None: + """An external node that sources edges (has requires ports) goes LEFT.""" + ext_req = _port("Ext", "requires", "Iface") + a_prov = _port("A", "provides", "Iface") + ext = _node("Ext", [ext_req]) + a = _node("A", [a_prov]) + edge = _edge(ext_req.id, a_prov.id) + diagram = _simple_diagram([a], peripheral_nodes=[ext], edges=[edge]) + plan = compute_layout(diagram) + boundary_x = plan.boundaries["Root"].x + assert plan.nodes["Ext"].x < boundary_x + + +def test_external_sink_placed_right_of_boundary() -> None: + """An external node that targets edges (has provides ports) goes RIGHT.""" + a_req = _port("A", "requires", "Iface") + ext_prov = _port("Ext", "provides", "Iface") + a = _node("A", [a_req]) + ext = _node("Ext", [ext_prov]) + edge = _edge(a_req.id, ext_prov.id) + diagram = _simple_diagram([a], peripheral_nodes=[ext], edges=[edge]) + plan = compute_layout(diagram) + bl = plan.boundaries["Root"] + boundary_right = bl.x + bl.width + assert plan.nodes["Ext"].x >= boundary_right + + +def test_unconnected_peripheral_with_requires_port_goes_left() -> None: + """Peripheral with only a requires port and no edges is placed LEFT.""" + req = _port("P", "requires", "X") + p = _node("P", [req]) + a = _node("A") + diagram = _simple_diagram([a], peripheral_nodes=[p]) + plan = compute_layout(diagram) + boundary_x = plan.boundaries["Root"].x + assert plan.nodes["P"].x < boundary_x + + +def test_unconnected_peripheral_with_provides_port_goes_right() -> None: + """Peripheral with only a provides port and no edges is placed RIGHT.""" + prov = _port("P", "provides", "X") + p = _node("P", [prov]) + a = _node("A") + diagram = _simple_diagram([a], peripheral_nodes=[p]) + plan = compute_layout(diagram) + bl = plan.boundaries["Root"] + boundary_right = bl.x + bl.width + assert plan.nodes["P"].x >= boundary_right + + +# ############### +# Port anchors +# ############### + + +def test_requires_port_anchored_to_left_edge() -> None: + """A requires port anchor has x equal to the node's left edge.""" + req = _port("A", "requires", "Iface") + a = _node("A", [req]) + diagram = _simple_diagram([a]) + plan = compute_layout(diagram) + nl = plan.nodes["A"] + anchor = plan.port_anchors[req.id] + assert anchor.x == pytest.approx(nl.x) + + +def test_provides_port_anchored_to_right_edge() -> None: + """A provides port anchor has x equal to the node's right edge.""" + prov = _port("A", "provides", "Iface") + a = _node("A", [prov]) + diagram = _simple_diagram([a]) + plan = compute_layout(diagram) + nl = plan.nodes["A"] + anchor = plan.port_anchors[prov.id] + assert anchor.x == pytest.approx(nl.x + nl.width) + + +def test_single_port_vertically_centred_on_node() -> None: + """A single port on a side is anchored at y = node.y + height/2.""" + req = _port("A", "requires", "Iface") + a = _node("A", [req]) + diagram = _simple_diagram([a]) + plan = compute_layout(diagram) + nl = plan.nodes["A"] + anchor = plan.port_anchors[req.id] + assert anchor.y == pytest.approx(nl.y + nl.height / 2) + + +def test_two_requires_ports_have_different_y() -> None: + """Two requires ports on the same node have distinct y anchors.""" + p1 = _port("A", "requires", "Iface1") + p2 = _port("A", "requires", "Iface2") + a = _node("A", [p1, p2]) + diagram = _simple_diagram([a]) + plan = compute_layout(diagram) + assert plan.port_anchors[p1.id].y != plan.port_anchors[p2.id].y + + +def test_multiple_ports_evenly_spaced_within_node_height() -> None: + """Three requires ports are anchored within the node's vertical extent.""" + ports = [_port("A", "requires", f"I{i}") for i in range(3)] + a = _node("A", ports) + diagram = _simple_diagram([a]) + plan = compute_layout(diagram) + nl = plan.nodes["A"] + for p in ports: + anc = plan.port_anchors[p.id] + assert nl.y < anc.y < nl.y + nl.height + + +def test_provides_and_requires_ports_on_opposite_sides() -> None: + """Requires and provides ports on the same node are anchored to opposite x edges.""" + req = _port("A", "requires", "In") + prov = _port("A", "provides", "Out") + a = _node("A", [req, prov]) + diagram = _simple_diagram([a]) + plan = compute_layout(diagram) + nl = plan.nodes["A"] + assert plan.port_anchors[req.id].x == pytest.approx(nl.x) + assert plan.port_anchors[prov.id].x == pytest.approx(nl.x + nl.width) + + +def test_root_boundary_requires_port_on_left_edge() -> None: + """Root boundary requires port is anchored to the left edge of the boundary.""" + comp = Component(name="Sys", requires=[_iref("InIface")]) + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + bl = plan.boundaries[diagram.root.id] + req_port = next(p for p in diagram.root.ports if p.direction == "requires") + anchor = plan.port_anchors[req_port.id] + assert anchor.x == pytest.approx(bl.x) + + +def test_root_boundary_provides_port_on_right_edge() -> None: + """Root boundary provides port is anchored to the right edge of the boundary.""" + comp = Component(name="Sys", provides=[_iref("OutIface")]) + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + bl = plan.boundaries[diagram.root.id] + prov_port = next(p for p in diagram.root.ports if p.direction == "provides") + anchor = plan.port_anchors[prov_port.id] + assert anchor.x == pytest.approx(bl.x + bl.width) + + +# ############### +# Edge routes +# ############### + + +def test_edge_route_has_two_waypoints() -> None: + """A straight-line edge route has exactly two waypoints.""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + plan = compute_layout(diagram) + assert edge.id in plan.edge_routes + assert len(plan.edge_routes[edge.id].waypoints) == 2 + + +def test_edge_route_starts_at_source_port_anchor() -> None: + """First waypoint of an edge route matches the source port anchor.""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + plan = compute_layout(diagram) + src_anc = plan.port_anchors[a_req.id] + first_wp = plan.edge_routes[edge.id].waypoints[0] + assert first_wp == pytest.approx((src_anc.x, src_anc.y)) + + +def test_edge_route_ends_at_target_port_anchor() -> None: + """Last waypoint of an edge route matches the target port anchor.""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + plan = compute_layout(diagram) + tgt_anc = plan.port_anchors[b_prov.id] + last_wp = plan.edge_routes[edge.id].waypoints[-1] + assert last_wp == pytest.approx((tgt_anc.x, tgt_anc.y)) + + +def test_edge_with_unresolvable_port_not_in_routes() -> None: + """Edges whose port anchors cannot be resolved are omitted from edge_routes.""" + # Build a diagram then inject a fake edge with unknown port IDs. + comp = Component(name="Sys") + diagram = build_viz_diagram(comp) + # Manually inject an unresolvable edge. + fake_edge = VizEdge( + id="edge.ghost.req--ghost.prov", + source_port_id="ghost.req.Unknown", + target_port_id="ghost.prov.Unknown", + label="Unknown", + interface_name="Unknown", + ) + diagram.edges.append(fake_edge) + plan = compute_layout(diagram) + assert fake_edge.id not in plan.edge_routes + + +# ############### +# Boundary sizing +# ############### + + +def test_boundary_contains_all_child_nodes() -> None: + """Every child node lies within the root boundary rectangle.""" + comp = Component( + name="Sys", + components=[Component(name="A"), Component(name="B"), Component(name="C")], + ) + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + bl = plan.boundaries[diagram.root.id] + for nl in plan.nodes.values(): + if nl.node_id in {c.id for c in diagram.root.children}: + assert nl.x >= bl.x + assert nl.y >= bl.y + assert nl.x + nl.width <= bl.x + bl.width + assert nl.y + nl.height <= bl.y + bl.height + + +def test_boundary_padding_applied() -> None: + """The boundary is wider than its single child by at least 2× boundary_padding.""" + comp = Component(name="Sys", components=[Component(name="A")]) + diagram = build_viz_diagram(comp) + cfg = LayoutConfig(boundary_padding=50.0) + plan = compute_layout(diagram, config=cfg) + bl = plan.boundaries[diagram.root.id] + assert bl.width >= cfg.node_width + 2 * cfg.boundary_padding + assert bl.height >= cfg.node_height + 2 * cfg.boundary_padding + + +def test_layer_gap_increases_boundary_width() -> None: + """A larger layer_gap produces a wider root boundary for multi-layer diagrams.""" + a_req = _port("A", "requires", "Iface") + b_prov = _port("B", "provides", "Iface") + a = _node("A", [a_req]) + b = _node("B", [b_prov]) + edge = _edge(a_req.id, b_prov.id) + diagram = _simple_diagram([a, b], edges=[edge]) + + plan_small = compute_layout(diagram, config=LayoutConfig(layer_gap=20.0)) + plan_large = compute_layout(diagram, config=LayoutConfig(layer_gap=200.0)) + assert plan_large.boundaries["Root"].width > plan_small.boundaries["Root"].width + + +# ############### +# Total diagram dimensions +# ############### + + +def test_total_width_includes_left_peripheral_zone() -> None: + """Adding a left peripheral increases total_width.""" + a = _node("A") + diagram_no_peri = _simple_diagram([a]) + plan_no = compute_layout(diagram_no_peri) + + req = _port("P", "requires", "X") + p = _node("P", [req]) + diagram_with = _simple_diagram([a], peripheral_nodes=[p]) + plan_with = compute_layout(diagram_with) + assert plan_with.total_width > plan_no.total_width + + +def test_total_width_includes_right_peripheral_zone() -> None: + """Adding a right peripheral increases total_width.""" + a = _node("A") + diagram_no_peri = _simple_diagram([a]) + plan_no = compute_layout(diagram_no_peri) + + prov = _port("P", "provides", "X") + p = _node("P", [prov]) + diagram_with = _simple_diagram([a], peripheral_nodes=[p]) + plan_with = compute_layout(diagram_with) + assert plan_with.total_width > plan_no.total_width + + +def test_peripheral_nodes_outside_boundary_in_total_width() -> None: + """The total width is at least as wide as boundary + both peripheral zones.""" + comp = Component( + name="Sys", + requires=[_iref("In")], + provides=[_iref("Out")], + components=[Component(name="A")], + ) + diagram = build_viz_diagram(comp) + cfg = LayoutConfig() + plan = compute_layout(diagram, config=cfg) + bl = plan.boundaries[diagram.root.id] + # Both terminal zones must be accounted for. + assert plan.total_width >= bl.x + bl.width + cfg.peripheral_gap + cfg.peripheral_node_width + + +# ############### +# Integration: build_viz_diagram → compute_layout +# ############### + + +def test_ecommerce_system_produces_complete_plan() -> None: + """Full integration: ecommerce system with multiple components and connections.""" + sys = System( + name="ECommerce", + components=[ + Component( + name="OrderService", + requires=[_iref("PaymentRequest")], + provides=[_iref("OrderRequest")], + ), + Component( + name="PaymentService", + provides=[_iref("PaymentRequest")], + ), + Component( + name="NotificationService", + requires=[_iref("OrderRequest")], + ), + ], + connections=[ + _conn("OrderService", "PaymentService", "PaymentRequest"), + _conn("NotificationService", "OrderService", "OrderRequest"), + ], + ) + diagram = build_viz_diagram(sys) + plan = compute_layout(diagram) + + # All three children have layouts. + child_ids = {c.id for c in diagram.root.children} + assert child_ids <= plan.nodes.keys() + + # Root boundary is present. + assert diagram.root.id in plan.boundaries + + # All edges are routed. + assert len(plan.edge_routes) == len(diagram.edges) + + +def test_ecommerce_order_service_left_of_payment_service() -> None: + """OrderService (requirer) is to the left of PaymentService (provider).""" + sys = System( + name="ECommerce", + components=[ + Component( + name="OrderService", + requires=[_iref("PaymentRequest")], + ), + Component( + name="PaymentService", + provides=[_iref("PaymentRequest")], + ), + ], + connections=[_conn("OrderService", "PaymentService", "PaymentRequest")], + ) + diagram = build_viz_diagram(sys) + plan = compute_layout(diagram) + order_id = next(c.id for c in diagram.root.children if c.label == "OrderService") + payment_id = next(c.id for c in diagram.root.children if c.label == "PaymentService") + assert plan.nodes[order_id].x < plan.nodes[payment_id].x + + +def test_external_actor_resolved_and_positioned() -> None: + """An external actor resolved via external_entities receives a layout entry.""" + stripe = Component(name="Stripe", is_external=True, provides=[_iref("PaymentGateway")]) + sys = System( + name="ECommerce", + components=[ + Component(name="OrderService", requires=[_iref("PaymentGateway")]), + ], + connections=[_conn("OrderService", "Stripe", "PaymentGateway")], + ) + diagram = build_viz_diagram(sys, external_entities={"Stripe": stripe}) + plan = compute_layout(diagram) + stripe_node = next(n for n in diagram.peripheral_nodes if n.label == "Stripe") + assert stripe_node.id in plan.nodes + + +def test_external_actor_right_of_boundary_when_it_provides() -> None: + """An external provider (target of edges) is placed right of the boundary.""" + stripe = Component(name="Stripe", is_external=True, provides=[_iref("PaymentGateway")]) + sys = System( + name="ECommerce", + components=[ + Component(name="OrderService", requires=[_iref("PaymentGateway")]), + ], + connections=[_conn("OrderService", "Stripe", "PaymentGateway")], + ) + diagram = build_viz_diagram(sys, external_entities={"Stripe": stripe}) + plan = compute_layout(diagram) + bl = plan.boundaries[diagram.root.id] + stripe_node = next(n for n in diagram.peripheral_nodes if n.label == "Stripe") + assert plan.nodes[stripe_node.id].x >= bl.x + bl.width + + +def test_all_ports_in_diagram_have_anchors() -> None: + """Every port in a topology diagram has a corresponding PortAnchor.""" + sys = System( + name="ECommerce", + requires=[_iref("ClientRequest")], + provides=[_iref("ClientResponse")], + components=[ + Component(name="A", requires=[_iref("BService")]), + Component(name="B", provides=[_iref("BService")]), + ], + connections=[_conn("A", "B", "BService")], + ) + diagram = build_viz_diagram(sys) + plan = compute_layout(diagram) + from archml.views.topology import collect_all_ports + + all_ports = collect_all_ports(diagram) + for port_id in all_ports: + assert port_id in plan.port_anchors, f"Missing anchor for port {port_id}" + + +def test_plan_diagram_id_matches_viz_diagram_id() -> None: + """plan.diagram_id matches the VizDiagram's id.""" + comp = Component(name="Sys") + diagram = build_viz_diagram(comp) + plan = compute_layout(diagram) + assert plan.diagram_id == diagram.id