Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions app/core/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,25 @@ async def get_frontier_nodes(self, threshold: int = 3, limit: int = 0) -> list[s
rows = await conn.fetch(query, *params)
return [row["id"] for row in rows]

async def get_least_connected_nodes(self, limit: int = 1) -> list[dict]:
"""Return nodes sorted by degree ascending — always finds candidates."""
query = """
SELECT n.id, coalesce(ec.cnt, 0) AS deg
FROM nodes n
LEFT JOIN (
SELECT id, count(*) AS cnt FROM (
SELECT source AS id FROM edges
UNION ALL
SELECT target AS id FROM edges
) sub GROUP BY id
) ec ON ec.id = n.id
ORDER BY coalesce(ec.cnt, 0) ASC, random()
LIMIT $1
"""
async with self.pool.acquire() as conn:
rows = await conn.fetch(query, limit)
return [{"id": row["id"], "degree": row["deg"]} for row in rows]

async def degree(self, node_id: str) -> int:
async with self.pool.acquire() as conn:
count = await conn.fetchval(
Expand Down
54 changes: 40 additions & 14 deletions app/workers/expander.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,36 @@ async def _expand_cycle(self):
)
return

# Get frontier nodes for concurrent expansion
frontier = await self.gm.get_frontier_nodes(
threshold=3, limit=self.concurrency
)
# Adaptive frontier: try base threshold, then widen, then fall back to
# least-connected node. This makes expansion autonomous — the graph
# never permanently stalls.
frontier: list[str] = []
base_threshold = 3
for threshold in (base_threshold, base_threshold * 2, base_threshold * 4):
frontier = await self.gm.get_frontier_nodes(
threshold=threshold, limit=self.concurrency
)
if frontier:
if threshold > base_threshold:
logger.info(
"Frontier widened to degree<%d (%d candidates)",
threshold, len(frontier),
)
break

if not frontier:
logger.info("No frontier nodes to expand")
return
# All nodes are highly connected — pick the least-connected as
# seed for the next expansion wave.
least = await self.gm.get_least_connected_nodes(limit=self.concurrency)
if least:
frontier = [n["id"] for n in least]
logger.info(
"Autonomous re-seed: expanding least-connected node (degree=%d): %s",
least[0]["degree"], frontier[0],
)
else:
logger.info("Graph is empty, nothing to expand")
return

if self.concurrency == 1:
await self._expand_node(frontier[0])
Expand All @@ -160,7 +183,10 @@ async def _expand_node(self, node_id: str):
if not node:
return

logger.info("Expanding from node: %s", node_id)
parent_layer = node.get("layer", 0) if isinstance(node, dict) else 0
child_layer = parent_layer + 1

logger.info("Expanding from node: %s (layer %d -> %d)", node_id, parent_layer, child_layer)
related, cost = await self._generate_related(node)
await self.budget.record(cost)

Expand All @@ -172,12 +198,12 @@ async def _expand_node(self, node_id: str):

added = 0
for event in related:
ok = await self._add_event(event, source_node_id=node_id)
ok = await self._add_event(event, source_node_id=node_id, layer=child_layer)
if ok:
added += 1

logger.info(
"Expansion complete: added %d events from %s", added, node_id
"Expansion complete: added %d events from %s (layer %d)", added, node_id, child_layer
)

async def _generate_related(self, node: dict) -> tuple[list[dict], float]:
Expand Down Expand Up @@ -230,10 +256,10 @@ async def _generate_related(self, node: dict) -> tuple[list[dict], float]:

return json.loads(text), cost

async def _add_event(self, event: dict, source_node_id: str) -> bool:
async def _add_event(self, event: dict, source_node_id: str, layer: int = 1) -> bool:
if self.jm:
return await self._add_via_flash(event, source_node_id)
return await self._add_direct(event, source_node_id)
return await self._add_direct(event, source_node_id, layer=layer)

async def _add_via_flash(self, event: dict, source_node_id: str) -> bool:
name = event.get("name", "")
Expand Down Expand Up @@ -275,8 +301,8 @@ async def _add_via_flash(self, event: dict, source_node_id: str) -> bool:
logger.error("Flash render error for %s: %s", query, e)
return False

async def _add_direct(self, event: dict, source_node_id: str) -> bool:
"""Fallback: add node directly without Flash (layer 1)."""
async def _add_direct(self, event: dict, source_node_id: str, layer: int = 1) -> bool:
"""Fallback: add node directly without Flash."""
from app.core.url import build_path, MONTH_TO_NUM

month_str = str(event.get("month", "january")).lower()
Expand Down Expand Up @@ -312,7 +338,7 @@ async def _add_direct(self, event: dict, source_node_id: str) -> bool:
region=event.get("region", "unknown"),
city=event.get("city", "unknown"),
slug=path.split("/")[-1],
layer=1,
layer=layer,
visibility="public",
created_by="expander",
source_type="expander",
Expand Down