From 9cecb3861652cacf6509e357ddd70799a5283d88 Mon Sep 17 00:00:00 2001 From: timepointai Date: Tue, 17 Mar 2026 22:26:06 -0600 Subject: [PATCH] =?UTF-8?q?feat:=20autonomous=20graph=20expansion=20?= =?UTF-8?q?=E2=80=94=20adaptive=20frontier=20+=20least-connected=20fallbac?= =?UTF-8?q?k?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expander now self-sustains instead of permanently stalling when all nodes exceed degree threshold. Three-tier frontier selection: base threshold (3), widened thresholds (6, 12), then least-connected node fallback. New nodes inherit parent_layer + 1 instead of hardcoded layer=1. --- app/core/graph.py | 19 +++++++++++++++ app/workers/expander.py | 54 ++++++++++++++++++++++++++++++----------- 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/app/core/graph.py b/app/core/graph.py index f232c7c..a05dfe3 100644 --- a/app/core/graph.py +++ b/app/core/graph.py @@ -505,6 +505,25 @@ async def get_frontier_nodes(self, threshold: int = 3, limit: int = 0) -> list[s rows = await conn.fetch(query, *params) return [row["id"] for row in rows] + async def get_least_connected_nodes(self, limit: int = 1) -> list[dict]: + """Return nodes sorted by degree ascending — always finds candidates.""" + query = """ + SELECT n.id, coalesce(ec.cnt, 0) AS deg + FROM nodes n + LEFT JOIN ( + SELECT id, count(*) AS cnt FROM ( + SELECT source AS id FROM edges + UNION ALL + SELECT target AS id FROM edges + ) sub GROUP BY id + ) ec ON ec.id = n.id + ORDER BY coalesce(ec.cnt, 0) ASC, random() + LIMIT $1 + """ + async with self.pool.acquire() as conn: + rows = await conn.fetch(query, limit) + return [{"id": row["id"], "degree": row["deg"]} for row in rows] + async def degree(self, node_id: str) -> int: async with self.pool.acquire() as conn: count = await conn.fetchval( diff --git a/app/workers/expander.py b/app/workers/expander.py index a4b7fc1..d390b75 100644 --- a/app/workers/expander.py +++ b/app/workers/expander.py @@ -137,13 +137,36 @@ async def _expand_cycle(self): ) return - # Get frontier nodes for concurrent expansion - frontier = await self.gm.get_frontier_nodes( - threshold=3, limit=self.concurrency - ) + # Adaptive frontier: try base threshold, then widen, then fall back to + # least-connected node. This makes expansion autonomous — the graph + # never permanently stalls. + frontier: list[str] = [] + base_threshold = 3 + for threshold in (base_threshold, base_threshold * 2, base_threshold * 4): + frontier = await self.gm.get_frontier_nodes( + threshold=threshold, limit=self.concurrency + ) + if frontier: + if threshold > base_threshold: + logger.info( + "Frontier widened to degree<%d (%d candidates)", + threshold, len(frontier), + ) + break + if not frontier: - logger.info("No frontier nodes to expand") - return + # All nodes are highly connected — pick the least-connected as + # seed for the next expansion wave. + least = await self.gm.get_least_connected_nodes(limit=self.concurrency) + if least: + frontier = [n["id"] for n in least] + logger.info( + "Autonomous re-seed: expanding least-connected node (degree=%d): %s", + least[0]["degree"], frontier[0], + ) + else: + logger.info("Graph is empty, nothing to expand") + return if self.concurrency == 1: await self._expand_node(frontier[0]) @@ -160,7 +183,10 @@ async def _expand_node(self, node_id: str): if not node: return - logger.info("Expanding from node: %s", node_id) + parent_layer = node.get("layer", 0) if isinstance(node, dict) else 0 + child_layer = parent_layer + 1 + + logger.info("Expanding from node: %s (layer %d -> %d)", node_id, parent_layer, child_layer) related, cost = await self._generate_related(node) await self.budget.record(cost) @@ -172,12 +198,12 @@ async def _expand_node(self, node_id: str): added = 0 for event in related: - ok = await self._add_event(event, source_node_id=node_id) + ok = await self._add_event(event, source_node_id=node_id, layer=child_layer) if ok: added += 1 logger.info( - "Expansion complete: added %d events from %s", added, node_id + "Expansion complete: added %d events from %s (layer %d)", added, node_id, child_layer ) async def _generate_related(self, node: dict) -> tuple[list[dict], float]: @@ -230,10 +256,10 @@ async def _generate_related(self, node: dict) -> tuple[list[dict], float]: return json.loads(text), cost - async def _add_event(self, event: dict, source_node_id: str) -> bool: + async def _add_event(self, event: dict, source_node_id: str, layer: int = 1) -> bool: if self.jm: return await self._add_via_flash(event, source_node_id) - return await self._add_direct(event, source_node_id) + return await self._add_direct(event, source_node_id, layer=layer) async def _add_via_flash(self, event: dict, source_node_id: str) -> bool: name = event.get("name", "") @@ -275,8 +301,8 @@ async def _add_via_flash(self, event: dict, source_node_id: str) -> bool: logger.error("Flash render error for %s: %s", query, e) return False - async def _add_direct(self, event: dict, source_node_id: str) -> bool: - """Fallback: add node directly without Flash (layer 1).""" + async def _add_direct(self, event: dict, source_node_id: str, layer: int = 1) -> bool: + """Fallback: add node directly without Flash.""" from app.core.url import build_path, MONTH_TO_NUM month_str = str(event.get("month", "january")).lower() @@ -312,7 +338,7 @@ async def _add_direct(self, event: dict, source_node_id: str) -> bool: region=event.get("region", "unknown"), city=event.get("city", "unknown"), slug=path.split("/")[-1], - layer=1, + layer=layer, visibility="public", created_by="expander", source_type="expander",