diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f9ba2d1..2111d8e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,10 @@ Changelog ========= +latest +---------------- +* Add --hide-acyclic flag. + 2.2 (2025-12-12) ---------------- diff --git a/README.rst b/README.rst index d2ec6b5..0e9a8b3 100644 --- a/README.rst +++ b/README.rst @@ -64,6 +64,7 @@ There is currently only one command. --show-cycle-breakers Identify a set of dependencies that, if removed, would make the graph acyclic, and display them as dashed lines. + --hide-acyclic Hide submodules that are not part of a cycle. --help Show this message and exit. Draw a graph of the dependencies within any installed Python package or subpackage. @@ -119,4 +120,18 @@ within ``django.db.utils``. Here you can see that two of the dependencies are shown as a dashed line. If these dependencies were to be removed, the graph would be acyclic. To decide on the cycle breakers, Impulse uses the -`nominate_cycle_breakers method provided by Grimp `_. \ No newline at end of file +`nominate_cycle_breakers method provided by Grimp `_. + +**Example with hide acyclic** + +.. code-block:: text + + impulse drawgraph django.db --hide-acyclic + +.. image:: https://raw.githubusercontent.com/seddonym/impulse/master/docs/_static/images/django.db.hide-acyclic.png + :align: center + :alt: Graph of django.db package with acyclic submodules hidden. + +This hides all submodules that do not form part of any cycle, making it easier to focus on circular +dependencies in larger packages. Combine this with ``--show-cycle-breakers`` for a condensed view +of cycles and how they can be broken. diff --git a/docs/_static/images/django.db.hide-acyclic.png b/docs/_static/images/django.db.hide-acyclic.png new file mode 100644 index 0000000..046bc4c Binary files /dev/null and b/docs/_static/images/django.db.hide-acyclic.png differ diff --git a/justfile b/justfile index 1764d38..c0f561a 100644 --- a/justfile +++ b/justfile @@ -14,6 +14,7 @@ test: @uv run --with=google-cloud-audit-log impulse drawgraph google.cloud.audit @uv run impulse drawgraph grimp --show-import-totals @uv run --with=django impulse drawgraph django.db --show-cycle-breakers + @uv run --with=django impulse drawgraph django.db --hide-acyclic # Run tests under all supported Python versions. diff --git a/src/impulse/application/use_cases.py b/src/impulse/application/use_cases.py index bf79454..ae8e85f 100644 --- a/src/impulse/application/use_cases.py +++ b/src/impulse/application/use_cases.py @@ -1,14 +1,15 @@ from collections.abc import Set from collections.abc import Callable -import itertools + import grimp -from impulse import ports, dotfile +from impulse import ports, dotfile, graph def draw_graph( module_name: str, show_import_totals: bool, show_cycle_breakers: bool, + hide_acyclic: bool, sys_path: list[str], current_directory: str, get_top_level_package: Callable[[str], str], @@ -21,6 +22,7 @@ def draw_graph( module_name: the package or subpackage name of any importable Python package. show_import_totals: whether to label the arrows with the total number of imports they represent. show_cycle_breakers: marks a set of dependencies that, if removed, would make the graph acyclic. + hide_acyclic: whether to hide submodules that are not part of a cycle. sys_path: the sys.path list (or a test double). current_directory: the current working directory. get_top_level_package: the function to retrieve the top level package name. This will usually be the first part @@ -35,23 +37,46 @@ def draw_graph( top_level_package = get_top_level_package(module_name) grimp_graph = build_graph(top_level_package) - dot = _build_dot(grimp_graph, module_name, show_import_totals, show_cycle_breakers) + dot = _build_dot( + grimp_graph, + module_name, + show_import_totals, + show_cycle_breakers, + hide_acyclic, + ) viewer.view(dot) class _DotGraphBuildStrategy: - def build(self, module_name: str, grimp_graph: grimp.ImportGraph) -> dotfile.DotGraph: + def build( + self, + module_name: str, + grimp_graph: grimp.ImportGraph, + hide_acyclic: bool, + ) -> dotfile.DotGraph: children = grimp_graph.find_children(module_name) self.prepare_graph(grimp_graph, children) + presentation_graph = graph.DirectedGraphWithoutLoops.from_adjacency_condition( + vertices=children, + is_adjacent=lambda upstream, downstream: self.has_edge( + grimp_graph, upstream, downstream + ), + ) + + if hide_acyclic: + presentation_graph = presentation_graph.remove_acyclic_vertices() + dot = dotfile.DotGraph(title=module_name, concentrate=self.should_concentrate()) - for child in children: - dot.add_node(child) - for upstream, downstream in itertools.permutations(children, r=2): - if edge := self.build_edge(grimp_graph, upstream, downstream): - dot.add_edge(edge) + + for vertex in presentation_graph.vertices: + dot.add_node(vertex) + + for upstream, downstream in presentation_graph.iter_edges(): + dot_edge = self.build_edge(grimp_graph, upstream, downstream) + dot.add_edge(dot_edge) return dot @@ -61,9 +86,12 @@ def should_concentrate(self) -> bool: def prepare_graph(self, grimp_graph: grimp.ImportGraph, children: Set[str]) -> None: pass + def has_edge(self, grimp_graph: grimp.ImportGraph, upstream: str, downstream: str) -> bool: + raise NotImplementedError + def build_edge( self, grimp_graph: grimp.ImportGraph, upstream: str, downstream: str - ) -> dotfile.Edge | None: + ) -> dotfile.Edge: raise NotImplementedError @@ -74,12 +102,13 @@ def prepare_graph(self, grimp_graph: grimp.ImportGraph, children: Set[str]) -> N for child in children: grimp_graph.squash_module(child) + def has_edge(self, grimp_graph: grimp.ImportGraph, upstream: str, downstream: str) -> bool: + return grimp_graph.direct_import_exists(importer=downstream, imported=upstream) + def build_edge( self, grimp_graph: grimp.ImportGraph, upstream: str, downstream: str - ) -> dotfile.Edge | None: - if grimp_graph.direct_import_exists(importer=downstream, imported=upstream): - return dotfile.Edge(source=downstream, destination=upstream) - return None + ) -> dotfile.Edge: + return dotfile.Edge(source=downstream, destination=upstream) class _ImportExpressionBuildStrategy(_DotGraphBuildStrategy): @@ -128,31 +157,32 @@ def _get_self_or_ancestor(candidate: str, ancestors: Set[str]) -> str | None: return ancestor return None + def has_edge(self, grimp_graph: grimp.ImportGraph, upstream: str, downstream: str) -> bool: + return grimp_graph.direct_import_exists( + importer=downstream, imported=upstream, as_packages=True + ) + def build_edge( self, grimp_graph: grimp.ImportGraph, upstream: str, downstream: str - ) -> dotfile.Edge | None: - if grimp_graph.direct_import_exists( - importer=downstream, imported=upstream, as_packages=True - ): - if self.show_import_totals: - number_of_imports = self._count_imports_between_packages( - grimp_graph, importer=downstream, imported=upstream - ) - label = str(number_of_imports) - else: - label = "" - - if self.show_cycle_breakers: - assert self.cycle_breakers is not None - is_cycle_breaker = (downstream, upstream) in self.cycle_breakers - emphasized = is_cycle_breaker - else: - emphasized = False - - return dotfile.Edge( - source=downstream, destination=upstream, label=label, emphasized=emphasized + ) -> dotfile.Edge: + if self.show_import_totals: + number_of_imports = self._count_imports_between_packages( + grimp_graph, importer=downstream, imported=upstream ) - return None + label = str(number_of_imports) + else: + label = "" + + if self.show_cycle_breakers: + assert self.cycle_breakers is not None + is_cycle_breaker = (downstream, upstream) in self.cycle_breakers + emphasized = is_cycle_breaker + else: + emphasized = False + + return dotfile.Edge( + source=downstream, destination=upstream, label=label, emphasized=emphasized + ) @staticmethod def _count_imports_between_packages( @@ -183,6 +213,7 @@ def _build_dot( module_name: str, show_import_totals: bool, show_cycle_breakers: bool, + hide_acyclic: bool, ) -> dotfile.DotGraph: strategy: _DotGraphBuildStrategy if show_import_totals or show_cycle_breakers: @@ -194,4 +225,4 @@ def _build_dot( else: strategy = _ModuleSquashingBuildStrategy() - return strategy.build(module_name, grimp_graph) + return strategy.build(module_name, grimp_graph, hide_acyclic) diff --git a/src/impulse/cli.py b/src/impulse/cli.py index fee13f9..186abdc 100644 --- a/src/impulse/cli.py +++ b/src/impulse/cli.py @@ -27,12 +27,23 @@ def main(): "and display them as dashed lines." ), ) +@click.option( + "--hide-acyclic", + is_flag=True, + help="Hide submodules that are not part of a cycle.", +) @click.argument("module_name", type=str) -def drawgraph(module_name: str, show_import_totals: bool, show_cycle_breakers: bool) -> None: +def drawgraph( + module_name: str, + show_import_totals: bool, + show_cycle_breakers: bool, + hide_acyclic: bool, +) -> None: use_cases.draw_graph( module_name=module_name, show_import_totals=show_import_totals, show_cycle_breakers=show_cycle_breakers, + hide_acyclic=hide_acyclic, sys_path=sys.path, current_directory=os.getcwd(), get_top_level_package=adapters.get_top_level_package, diff --git a/src/impulse/graph.py b/src/impulse/graph.py new file mode 100644 index 0000000..7e5c64b --- /dev/null +++ b/src/impulse/graph.py @@ -0,0 +1,104 @@ +import dataclasses +import itertools +from typing import Callable, Generic, Iterator, Mapping, TypeVar + +V = TypeVar("V") # type for graph vertices + + +@dataclasses.dataclass(frozen=True) +class DirectedGraphWithoutLoops(Generic[V]): + vertices: frozenset[V] + _adjacency_map: Mapping[V, set[V]] + + @classmethod + def from_adjacency_condition(cls, vertices: set[V], is_adjacent: Callable[[V, V], bool]): + adjacency_map = { + from_vertex: { + to_vertex + for to_vertex in vertices + if from_vertex != to_vertex and is_adjacent(from_vertex, to_vertex) + } + for from_vertex in vertices + } + return cls(vertices=frozenset(vertices), _adjacency_map=adjacency_map) + + def iter_edges(self) -> Iterator[tuple[V, V]]: + for from_vertex in self._adjacency_map: + for to_vertex in self._adjacency_map[from_vertex]: + yield from_vertex, to_vertex + + def remove_vertices(self, vertices_to_remove: set[V]) -> "DirectedGraphWithoutLoops[V]": + new_vertices = frozenset(self.vertices - vertices_to_remove) + return self.__class__( + vertices=new_vertices, + _adjacency_map={ + from_vertex: self._adjacency_map[from_vertex] - vertices_to_remove + for from_vertex in new_vertices + }, + ) + + def find_acyclic_vertices(self) -> set[V]: + """ + Find all vertices that are not part of any cycle. + + This function uses Tarjan's strongly connected components algorithm. The + algorithm performs a single depth first search of the graph and groups + vertices into strongly connected components (SCCs), where each vertex in + an SCC is reachable from every other vertex in the same SCC. + + Under the assumption that the graph contains no self loops, a vertex is + not part of a cycle if and only if the SCC it is part of contains no + other vertices. + + Returns: + A set of vertices that are not part of any cycle. + """ + # Vertices are assigned indices in the order they are encountered + index_generator: Iterator[int] = itertools.count() + index_map: dict[V, int] = {} + lowest_reachable_index: dict[V, int] = {} + + active_stack: list[V] = [] + # Mirror of active_stack for O(1) membership checks + active_stack_set: set[V] = set() + + acyclic_vertices: set[V] = set() + + def visit(v: V) -> None: + index = next(index_generator) + + index_map[v] = index + lowest_reachable_index[v] = index + + active_stack.append(v) + active_stack_set.add(v) + + for w in self._adjacency_map.get(v, {}): + if w not in index_map: + visit(w) + lowest_reachable_index[v] = min( + lowest_reachable_index[v], lowest_reachable_index[w] + ) + elif w in active_stack_set: + lowest_reachable_index[v] = min(lowest_reachable_index[v], index_map[w]) + + if lowest_reachable_index[v] == index_map[v]: + scc: list[V] = [] + while True: + w = active_stack.pop() + active_stack_set.remove(w) + scc.append(w) + if w == v: + break + + if len(scc) == 1: + acyclic_vertices.update(scc) + + for vertex in self.vertices: + if vertex not in index_map: + visit(vertex) + + return acyclic_vertices + + def remove_acyclic_vertices(self) -> "DirectedGraphWithoutLoops[V]": + return self.remove_vertices(self.find_acyclic_vertices()) diff --git a/tests/unit/application/test_use_cases.py b/tests/unit/application/test_use_cases.py index 95badd0..550810f 100644 --- a/tests/unit/application/test_use_cases.py +++ b/tests/unit/application/test_use_cases.py @@ -75,6 +75,7 @@ def test_draw_graph(self): SOME_MODULE, show_import_totals=False, show_cycle_breakers=False, + hide_acyclic=False, sys_path=sys_path, current_directory=current_directory, get_top_level_package=fake_get_top_level_package_non_namespace, @@ -120,6 +121,7 @@ def asserting_build_graph(top_level_package: str) -> grimp.ImportGraph: "some.namespace.foo.blue", show_import_totals=False, show_cycle_breakers=False, + hide_acyclic=False, sys_path=[], current_directory="/cwd", get_top_level_package=get_top_level_package, @@ -134,6 +136,7 @@ def test_draw_graph_show_import_totals(self): SOME_MODULE, show_import_totals=True, show_cycle_breakers=False, + hide_acyclic=False, sys_path=[], current_directory="/cwd", get_top_level_package=fake_get_top_level_package_non_namespace, @@ -156,6 +159,7 @@ def test_draw_graph_show_cycle_breakers(self): SOME_MODULE, show_import_totals=False, show_cycle_breakers=True, + hide_acyclic=False, sys_path=[], current_directory="/cwd", get_top_level_package=fake_get_top_level_package_non_namespace, @@ -179,3 +183,28 @@ def test_draw_graph_show_cycle_breakers(self): ), Edge("mypackage.foo.red", "mypackage.foo.blue", emphasized=True), } + + def test_draw_graph_hide_acyclic(self): + viewer = SpyGraphViewer() + + use_cases.draw_graph( + SOME_MODULE, + show_import_totals=False, + show_cycle_breakers=False, + hide_acyclic=True, + sys_path=[], + current_directory="/cwd", + get_top_level_package=fake_get_top_level_package_non_namespace, + build_graph=build_fake_graph, + viewer=viewer, + ) + + # The only cycle in the test graph is formed by blue and red. + assert viewer.called_with_dot.nodes == { + "mypackage.foo.blue", + "mypackage.foo.red", + } + assert viewer.called_with_dot.edges == { + Edge("mypackage.foo.blue", "mypackage.foo.red"), + Edge("mypackage.foo.red", "mypackage.foo.blue"), + } diff --git a/tests/unit/test_graph.py b/tests/unit/test_graph.py new file mode 100644 index 0000000..32c3396 --- /dev/null +++ b/tests/unit/test_graph.py @@ -0,0 +1,180 @@ +import pytest +from impulse import graph + + +class TestDirectedGraphWithoutLoops: + def test_from_adjacency(self): + vertices = {"A", "B", "C"} + + def is_adjacent(from_vertex, to_vertex): + return from_vertex < to_vertex + + test_graph = graph.DirectedGraphWithoutLoops.from_adjacency_condition( + vertices, is_adjacent + ) + + # Self-loops should be excluded + assert test_graph._adjacency_map == {"A": {"B", "C"}, "B": {"C"}, "C": set()} + + def test_from_adjacency_for_complete_graph(self): + vertices = {"A", "B", "C"} + + def is_adjacent(from_vertex, to_vertex): + return True + + test_graph = graph.DirectedGraphWithoutLoops.from_adjacency_condition( + vertices, is_adjacent + ) + + assert test_graph._adjacency_map == { + "A": {"B", "C"}, + "B": {"A", "C"}, + "C": {"A", "B"}, + } + + def test_from_adjacency_for_empty_graph(self): + vertices = {"A", "B", "C"} + + def is_adjacent(from_vertex, to_vertex): + return False + + test_graph = graph.DirectedGraphWithoutLoops.from_adjacency_condition( + vertices, is_adjacent + ) + + assert test_graph._adjacency_map == {"A": set(), "B": set(), "C": set()} + + def test_from_adjacency_empty_vertices(self): + vertices = set() + + def is_adjacent(from_vertex, to_vertex): + return True + + test_graph = graph.DirectedGraphWithoutLoops.from_adjacency_condition( + vertices, is_adjacent + ) + + assert test_graph.vertices == set() + assert test_graph._adjacency_map == {} + + def test_iter_edges(self): + vertices = {"A", "B", "C"} + edges = { + "A": {"B", "C"}, + "B": {"C"}, + "C": set(), + } + g = graph.DirectedGraphWithoutLoops(vertices, edges) + + result = set(g.iter_edges()) + expected = {("A", "B"), ("A", "C"), ("B", "C")} + assert result == expected + + @pytest.mark.parametrize( + ("vertices_to_remove", "expected_vertices", "expected_adjacency_map"), + [ + (set(), {"A", "B", "C"}, {"A": {"B"}, "B": {"C"}, "C": set()}), + ({"C"}, {"A", "B"}, {"A": {"B"}, "B": set()}), + ({"B"}, {"A", "C"}, {"A": set(), "C": set()}), + ({"A", "B", "C"}, set(), {}), + ], + ) + def test_remove_vertices( + self, + vertices_to_remove: set[str], + expected_vertices: set[str], + expected_adjacency_map: dict[str, set[str]], + ): + vertices = {"A", "B", "C"} + adjacency_map = {"A": {"B"}, "B": {"C"}, "C": set()} + test_graph = graph.DirectedGraphWithoutLoops(frozenset(vertices), adjacency_map) + + result_graph = test_graph.remove_vertices(vertices_to_remove) + + assert result_graph.vertices == expected_vertices + assert result_graph._adjacency_map == expected_adjacency_map + + def test_find_acyclic_vertices_empty_graph(self): + g = graph.DirectedGraphWithoutLoops(set(), {}) + assert g.find_acyclic_vertices() == set() + + def test_find_acyclic_vertices_simple_acyclic(self): + vertices = {"A", "B", "C"} + adjacency_map = {"A": {"B", "C"}, "B": {"C"}, "C": set()} + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + assert g.find_acyclic_vertices() == {"A", "B", "C"} + + def test_find_acyclic_vertices_simple_cycle(self): + vertices = {"A", "B", "C"} + adjacency_map = {"A": {"B"}, "B": {"C"}, "C": {"A"}} + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + assert g.find_acyclic_vertices() == set() + + def test_find_acyclic_vertices_with_branches(self): + vertices = {"A", "B", "C", "D", "E", "F"} + adjacency_map = { + "A": {"B", "C"}, # A branches to B and C + "B": {"D"}, # B -> D -> E -> B (cycle) + "C": {"F"}, # C -> F (acyclic branch) + "D": {"E"}, + "E": {"B"}, + "F": set(), + } + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + assert g.find_acyclic_vertices() == {"A", "C", "F"} + + def test_find_acyclic_vertices_isolated_vertices(self): + """Isolated vertices (no edges) are acyclic.""" + vertices = {"A", "B", "C"} + adjacency_map = {"A": set(), "B": set(), "C": set()} + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + assert g.find_acyclic_vertices() == {"A", "B", "C"} + + def test_find_acyclic_vertices_multiple_cycles(self): + """Multiple disconnected cycles.""" + vertices = {"A", "B", "C", "D", "E", "F"} + adjacency_map = { + "A": {"B"}, # A -> B -> A (cycle 1) + "B": {"A"}, + "C": set(), # C is isolated (acyclic) + "D": {"E"}, # D -> E -> F -> D (cycle 2) + "E": {"F"}, + "F": {"D"}, + } + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + assert g.find_acyclic_vertices() == {"C"} + + def test_find_acyclic_vertices_mixed_graph(self): + """Graph with cycles, acyclic chains, and isolated vertices.""" + vertices = {"A", "B", "C", "D", "E", "F", "G"} + adjacency_map = { + "A": {"B"}, # A -> B -> C (acyclic chain) + "B": {"C"}, + "C": set(), + "D": {"E"}, # D -> E -> D (cycle) + "E": {"D"}, + "F": set(), # F isolated + "G": {"A"}, # G -> A (connects to acyclic chain) + } + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + assert g.find_acyclic_vertices() == {"A", "B", "C", "F", "G"} + + def test_remove_acyclic_vertices_returns_only_cycles(self): + """remove_acyclic_vertices should return a graph with only cyclic vertices.""" + vertices = {"A", "B", "C", "D", "E"} + adjacency_map = { + "A": {"B"}, # A -> B (acyclic) + "B": set(), + "C": {"D"}, # C -> D -> E -> C (cycle) + "D": {"E"}, + "E": {"C"}, + } + g = graph.DirectedGraphWithoutLoops(vertices, adjacency_map) + result = g.remove_acyclic_vertices() + + assert result.vertices == {"C", "D", "E"} + assert result._adjacency_map == { + "C": {"D"}, + "D": {"E"}, + "E": {"C"}, + }