Add distributed mesh inference for multi-device compute sharing #1
Jackson57279 wants to merge 8 commits into master
Conversation
Features:
- Auto-discovery using mDNS + IP scanning (parallel)
- mTLS transport with auto-generated Ed25519 certificates
- Leader election (oldest node wins)
- Route strategy: least-loaded node selection
- Split strategy: layer-wise model distribution for large models
- Real-time dashboard with OpenWebUI-inspired design
- WebSocket-based chat interface with streaming
- CLI 'up' command for easy mesh joining

Architecture:
- MeshDiscovery: mDNS + IP scanner for peer finding
- MeshTransport: async mTLS connections
- MeshCoordinator: leader election, state sync, job scheduling
- DistributedInferenceEngine: transparent routing/splitting
- MeshDashboard: FastAPI + Socket.IO + vanilla JS frontend

Security:
- Automatic TLS certificate generation per node
- Certificate pinning for trusted peers
- 30-day certificate rotation

Usage:
miniforge up                     # Join mesh with auto-discovery
miniforge up --ip 192.168.1.100  # Connect to specific peer
Host/Worker Topology:
- Host: stores all models, streams to workers on demand
- Worker: no local models required, downloads from host automatically
- ModelRegistry: tracks model locations across the mesh
- Weight streaming: chunk-based model transfer (16 MB chunks)
- Auto-download: workers fetch models from the host on first use

CLI Changes:
- --mode host|worker|auto: choose node mode
- --host-ip: specify host IP for workers
- --model-path: register a local model in host mode

Usage:
# Host (has models)
miniforge up --mode host --model-path /path/to/model.gguf
# Worker (no models, auto-downloads)
miniforge up --mode worker --host-ip 192.168.1.100

Benefits:
- Models are downloaded only once (on the host)
- Workers join with zero storage requirement
- Automatic caching on workers for performance
@cubic-dev-ai review

@coderabbitai review

✅ Actions performed: review triggered.
📝 Walkthrough: A new distributed inference mesh system is introduced via pyproject.toml dependency updates, CLI integration, and a comprehensive set of new mesh modules.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI
    participant Coordinator
    participant Discovery
    participant Transport
    participant RemoteNode
    participant LocalEngine
    User->>CLI: mesh up --mode host
    CLI->>Coordinator: initialize()
    Coordinator->>Discovery: start()
    Discovery-->>Coordinator: on_peer_discovered()
    Coordinator->>Transport: start()
    Transport->>Transport: listen for connections
    Note over Discovery: Scan /24 subnet & register mDNS service
    Discovery-->>RemoteNode: broadcast node presence
    RemoteNode->>Transport: connect_to_peer()
    Transport-->>Transport: handshake & add connection
    Transport->>Coordinator: on_connection_event("connected")
    Coordinator->>Coordinator: run leader_election_loop()
    Coordinator-->>CLI: is_leader = true
    User->>CLI: generate("What is AI?")
    CLI->>Coordinator: schedule_inference(job, strategy="auto")
    Coordinator->>Coordinator: _select_least_loaded_node()
    Coordinator-->>CLI: assigned_node = RemoteNode.id
    CLI->>Transport: get_connection(remote_node_id)
    CLI->>Transport: send("inference_request", payload)
    Transport->>RemoteNode: transmit via MeshConnection
    RemoteNode->>LocalEngine: generate(prompt)
    LocalEngine-->>RemoteNode: result
    RemoteNode->>Transport: send("inference_response", result)
    Transport-->>CLI: receive response
    Coordinator->>Coordinator: update job status = "completed"
    CLI-->>User: display result
```

```mermaid
sequenceDiagram
    participant Worker as Worker Node
    participant Coordinator
    participant Transport
    participant Host as Host Node
    participant LocalEngine
    Worker->>Coordinator: initialize(is_host=false)
    Host->>Coordinator: initialize(is_host=true)
    Note over Host: register_local_model("model_id", path)
    Worker->>Worker: generate(prompt, model_id)
    Worker->>Worker: check ModelRegistry for model_id
    Worker-->>Worker: model not found, need download
    Worker->>Transport: connect_to_peer(Host)
    Transport-->>Host: connection established
    Worker->>Host: send("model_request", model_id)
    Host->>LocalEngine: registry.stream_weights(model_id)
    LocalEngine-->>Host: chunk iterator
    Host->>Transport: send("model_chunk", bytes)
    Transport->>Worker: receive chunk
    Worker->>Worker: cache layer to disk
    loop Until complete
        Host->>Host: stream next chunk
        Host->>Transport: send("model_chunk", bytes)
        Transport->>Worker: dispatch to handler
        Worker->>Worker: cache layer
    end
    Host->>Transport: send("model_chunk", final_signal)
    Transport->>Worker: complete event
    Worker->>Worker: registry.download_model() finishes
    Worker->>LocalEngine: generate(prompt)
    LocalEngine-->>Worker: result
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 5 passed checks (5 passed)
- Reorder route registration (static before template routes)
- Add Jinja2 context processors for the url() function
- Add error handling for template rendering with fallback HTML
- Update static file URLs to use the Jinja2 url() function

- Move index.html to the templates/ directory
- Add a named static route with append_version=True
- Properly configure Jinja2 with FileSystemLoader
- Static files now served from static/, templates from templates/

- Provides a non-Jinja2 version for direct file access
- Useful for testing static files without running the server
- Falls back gracefully if template rendering fails

- Use /static/style.css and /static/app.js directly
- Avoids the dependency on the Jinja2 url() function
- More reliable for initial rendering
Actionable comments posted: 15
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
🟠 Major comments (8)
src/miniforge/mesh/cli.py-118-134 (1)
118-134: ⚠️ Potential issue | 🟠 Major

`--host-ip` and `--ip` flags are effectively no-ops.

- Lines 118-120: `--host-ip` is only printed; `host_node_id` is never populated from it (it stays `None` through to L174), so worker mode can't actually target a specific host.
- Lines 122-134: when `--ip` is passed, a `PeerInfo` is constructed and then dropped — there is no call to discovery/transport/coordinator to initiate the connection. The log message "Connecting to peer …" is misleading; nothing is connected.

Either wire these into the discovery/transport layer (e.g., `await discovery.add_manual_peer(peer)` or `await transport.connect(peer)`) or remove the flags until the plumbing exists, so users don't rely on advertised-but-broken behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/cli.py` around lines 118 - 134, the --host-ip and --ip flags are no-ops: when args.host_ip is provided you should set the host_node_id or otherwise populate the host target used later (instead of only printing), and when args.ip is provided you must actually hand the constructed PeerInfo to the discovery/transport layer instead of dropping it. Fix by assigning the host target variable used later (e.g., set host_node_id or host_peer based on args.host_ip) and then call the appropriate async API with the created PeerInfo (e.g., await discovery.add_manual_peer(peer) or await transport.connect(peer) or forward it to coordinator.connect_peer) so the CLI actually initiates a connection; ensure the code paths using host_node_id and PeerInfo now receive those values.

src/miniforge/mesh/transport.py-350-371 (1)
350-371: ⚠️ Potential issue | 🟠 Major

Heartbeats advertise bogus resource values.

`ram_available=self.ram_gb` reports total RAM as available, and `cpu_percent=0.0` is constant. `MeshCoordinator._select_least_loaded_node` sorts by exactly these two fields, so every node appears equally idle with full RAM, and "least loaded" routing is a no-op. The `psutil` package is already standard for this (`psutil.virtual_memory().available / 1e9`, `psutil.cpu_percent(interval=None)`). Worth populating before merging, since the routing heuristic is one of the PR's advertised features.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/transport.py` around lines 350 - 371, in _heartbeat_loop replace the bogus resource fields with real metrics: import psutil and send ram_available as psutil.virtual_memory().available / 1e9 and cpu_percent as psutil.cpu_percent(interval=None); ensure values are floats and catch/handle psutil exceptions (fallback to self.ram_gb and 0.0) so the payload still contains ram_available and cpu_percent for MeshCoordinator._select_least_loaded_node to use; add the psutil import at top of the module and keep the existing exception logging behavior.
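A minimal sketch of the fix the comment asks for. The helper name is hypothetical; it assumes `psutil` is available and falls back to the node's configured RAM (the current hardcoded behavior) otherwise:

```python
def current_metrics(fallback_ram_gb: float) -> tuple[float, float]:
    """Return (ram_available_gb, cpu_percent) for heartbeat payloads.

    Falls back to the configured RAM and an idle CPU reading if psutil
    is unavailable or fails, mirroring today's hardcoded values.
    """
    try:
        import psutil
        ram_gb = psutil.virtual_memory().available / 1e9
        cpu = psutil.cpu_percent(interval=None)  # non-blocking sample
        return float(ram_gb), float(cpu)
    except Exception:
        return fallback_ram_gb, 0.0
```

With real values in the heartbeat payload, the sort key in `_select_least_loaded_node` would actually differentiate nodes.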
src/miniforge/mesh/coordinator.py-105-130 (1)

105-130: ⚠️ Potential issue | 🟠 Major

`all_nodes` fabricates a stale self-entry on every call.

Each property access builds a brand-new `NodeState(ip="127.0.0.1", port=0, ram_available=0.0, cpu_percent=0.0, ...)`. Consequences:

- `total_resources["available_ram_gb"]` excludes this node's actual available RAM (always adds 0 for self).
- `is_leader` on the synthetic self is current, but `last_heartbeat` defaults to `time.time()` at construction, which accidentally makes self always look "fresh" to any caller that relies on it.
- `dashboard.api_nodes` exposes `ip=127.0.0.1`, `port=0` for this node.
- `_select_least_loaded_node` sorts including this synthetic self with `cpu_percent=0.0`, biasing routing toward self.

Track self as a real `NodeState` in `_nodes` (or a dedicated `self._self_state`) and update `ram_available`, `cpu_percent`, `ip`, `port` in the heartbeat/sync loop.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/coordinator.py` around lines 105 - 130, the all_nodes property is constructing a synthetic self NodeState (in all_nodes) causing stale/wrong values (ip="127.0.0.1", port=0, ram_available=0, cpu_percent=0) which biases total_resources and selection logic; instead stop fabricating self on every call and keep a real mutable self state in the coordinator (either store the coordinator's own NodeState in the existing _nodes mapping keyed by self.node_id or introduce a dedicated attribute self._self_state), ensure heartbeat/sync code updates its ram_available, cpu_percent, ip and port, then change all_nodes to return list(self._nodes.values()) (or include self._self_state) so total_resources, _select_least_loaded_node and dashboard.api_nodes use the up-to-date self information.

src/miniforge/mesh/registry.py-131-170 (1)
131-170: ⚠️ Potential issue | 🟠 Major

`stream_weights` blocks the event loop and has no error-path completion signal.

`open(path, "rb")` + `f.read(CHUNK_SIZE)` are synchronous calls in an `async def`. On a multi-GB model each 16 MiB read plus the synchronous `chunk_callback` (which in host_worker.py schedules `asyncio.create_task`) blocks every other coroutine on the host — heartbeats, peer handshakes, and incoming inference requests all stall during a weight transfer. Consider wrapping reads with `asyncio.to_thread(...)` or using `aiofiles`, and make `chunk_callback` awaitable so the host can apply backpressure per chunk.

Also: if `f.read` or the callback raises mid-stream, the caller never sends a terminating `model_chunk` with `complete=True` / error, so the worker blocks until its 300 s timeout. A `finally` that forwards a terminal frame (or a documented contract that the caller must do so) is worth adding.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/registry.py` around lines 131 - 170, stream_weights is doing blocking file I/O and synchronous callbacks inside an async def which stalls the event loop and also lacks a guaranteed terminal signal on errors; modify stream_weights to perform file reads off the event loop (use asyncio.to_thread or aiofiles when reading Path(model.model_path)) and change the chunk_callback contract to be awaitable (await chunk_callback(chunk) instead of calling it directly) so backpressure can be applied (check caller in host_worker.py to await created tasks or accept async callback), and wrap the read/send loop in try/except/finally so a terminal model_chunk frame (complete=True or an error indicator) is always sent even if an exception occurs.
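One way to combine the two suggestions, sketched with stand-in names (the `send` callback and the frame dict are assumptions, not the PR's actual wire format): read chunks via `asyncio.to_thread`, await the consumer per chunk for backpressure, and guarantee a terminal frame in `finally`:

```python
import asyncio
from pathlib import Path

CHUNK_SIZE = 16 * 1024 * 1024  # 16 MiB, matching the PR's chunk size

async def stream_model(path: Path, send, model_id: str,
                       chunk_size: int = CHUNK_SIZE) -> None:
    """Stream a model file chunk-by-chunk without blocking the event loop."""
    ok = True
    f = await asyncio.to_thread(open, path, "rb")
    try:
        while True:
            chunk = await asyncio.to_thread(f.read, chunk_size)
            if not chunk:
                break
            # Awaiting the async callback applies per-chunk backpressure.
            await send({"type": "model_chunk", "model_id": model_id,
                        "data": chunk, "complete": False})
    except Exception:
        ok = False
        raise
    finally:
        await asyncio.to_thread(f.close)
        # Always emit a terminal frame so workers never hang on error paths.
        await send({"type": "model_chunk", "model_id": model_id,
                    "data": b"", "complete": True, "ok": ok})
```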
src/miniforge/mesh/coordinator.py-369-393 (1)

369-393: ⚠️ Potential issue | 🟠 Major

Leader election compares incompatible timestamps.

`candidates` mixes `self._started_at` (when this process booted) with each remote node's `last_heartbeat` (when we last heard from it). Picking `min` by that key doesn't select the oldest node — it selects whichever value happens to be smaller between "my boot time" and "last time a peer contacted me". On a freshly joined node, `last_heartbeat` from a long-running peer will be much newer than `self._started_at`, so this node will (incorrectly) elect itself.

You need the peer's own `started_at`, not ours. Add a `started_at` field to `NodeState`, populate it from the handshake / heartbeat payload, and use it in the key. Also, leader election here is purely local (no consensus / message exchange), so split-brain is likely — each node will locally elect whoever it thinks is oldest. Consider broadcasting the elected leader and treating divergent opinions deterministically.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/coordinator.py` around lines 369 - 393, the election incorrectly mixes this process's boot time (self._started_at) with peers' last_heartbeat; update NodeState to include a started_at timestamp, populate that started_at from the handshake/heartbeat payload handling code (where remote node info is parsed), and in _run_election use each node.started_at (not last_heartbeat) when building candidates so the min key compares actual boot times (e.g., candidates = [(self.node_id, self._started_at)] + [(node.node_id, node.started_at) for node in self._nodes.values()]); after computing winner in _run_election, broadcast the elected leader to peers (or call the existing heartbeat/send API) and implement handling of incoming leader announcements to set _leader_id/_is_leader deterministically to reduce split-brain.
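The suggested comparison can be sketched as follows, with a hypothetical `started_at` field carried in each peer's handshake payload: the oldest boot time wins, and `node_id` breaks ties deterministically.

```python
from dataclasses import dataclass

@dataclass
class PeerState:
    node_id: str
    started_at: float  # the peer's own boot time, as reported in its handshake

def elect_leader(self_id: str, self_started_at: float,
                 peers: list[PeerState]) -> str:
    """Oldest node (smallest started_at) wins; ties break on node_id."""
    candidates = [(self_started_at, self_id)] + [
        (p.started_at, p.node_id) for p in peers
    ]
    return min(candidates)[1]
```

Every node that evaluates this over the same set of (started_at, node_id) pairs reaches the same answer, which is what exchanging the payloads buys; without that exchange, split-brain remains possible.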
src/miniforge/mesh/dashboard.py-215-253 (1)

215-253: ⚠️ Potential issue | 🟠 Major

Manual connect bypasses public API and pollutes node table.

Two concerns:

- `self.coordinator._connect_to_peer(peer)` reaches into a private method; the coordinator should expose a public `connect_to(ip, port)` (or similar) that does peer resolution, logging, and state updates atomically.
- `PeerInfo(node_id="unknown", ...)` is persisted into `MeshCoordinator._nodes` keyed by `"unknown"` (see `coordinator._connect_to_peer`). A second manual connect will overwrite the first, and leader election / routing will see spurious `node_id="unknown"` entries. Prefer deferring the `NodeState` insert until the real `node_id` is learned from the handshake (`MINIFORGE_OK|<node_id>|...`).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/dashboard.py` around lines 215 - 253, api_connect currently constructs a PeerInfo with node_id="unknown" and calls the private coordinator method _connect_to_peer, which both leaks into a private API and inserts a bogus NodeState keyed by "unknown"; instead, add a public coordinator method (e.g., connect_to(ip, port) or connect_peer) that handles peer resolution, connection handshake, logging and only inserts/updates MeshCoordinator._nodes after the real node_id is learned from the handshake, and change api_connect to call that new public method with ip and port (removing creation of PeerInfo and any direct access to _connect_to_peer); ensure the handshake path parses the MINIFORGE_OK response to get the canonical node_id before creating NodeState so repeated manual connects do not overwrite or pollute the node table.

src/miniforge/mesh/discovery.py-174-212 (1)
174-212: ⚠️ Potential issue | 🟠 Major

`_scan_network` builds and discards a coroutine list, then duplicates the work.

Lines 187-195 populate `tasks = [...]` with coroutines from `self._check_host(...)` that are never awaited (`RuntimeWarning: coroutine was never awaited`, plus no limiting). The real scan then runs again via `check_with_limit` in lines 207-212, duplicating the host iteration and doing the filtering twice.

🧹 Proposed fix

```diff
-    # Create scan tasks for all hosts
-    tasks = []
-    hosts = list(network.hosts())[:50]  # Limit to first 50 hosts
-
-    for host in hosts:
-        host_str = str(host)
-        if host_str == local_ip:
-            continue
-        for port in ports_to_check:
-            tasks.append(self._check_host(host_str, port))
-
-    if not tasks:
-        return
-
-    # Run checks with semaphore to limit concurrency
+    hosts = list(network.hosts())[:50]
     semaphore = asyncio.Semaphore(20)

     async def check_with_limit(host: str, port: int) -> None:
         async with semaphore:
             await self._check_host(host, port)

     await asyncio.gather(*[
-        check_with_limit(str(h), p)
-        for h in hosts[:50]
+        check_with_limit(str(h), p)
+        for h in hosts
         for p in ports_to_check
         if str(h) != local_ip
     ], return_exceptions=True)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/discovery.py` around lines 174 - 212, _scan_network is creating coroutines into tasks via self._check_host(...) and never awaiting them (causing RuntimeWarning) and then re-iterating hosts to run check_with_limit, duplicating work; fix by removing the unused tasks list and constructing the awaited work once using the concurrency limiter (semaphore) — e.g. build the gather call from check_with_limit(...) for each host/port (skipping local_ip) and await that with return_exceptions=True, or alternatively create tasks that wrap calls with the semaphore and await asyncio.gather on that tasks list; ensure you do not call self._check_host(...) directly without awaiting and keep semaphore/check_with_limit usage consistent.

src/miniforge/mesh/transport.py-1-16 (1)
1-16: ⚠️ Potential issue | 🟠 Major

Wire SSL context through asyncio calls or remove mTLS claims from module docstring.

The file header advertises "Async mTLS transport" and `MeshSecurity` is instantiated, but `start_server()` and `open_connection()` omit the `ssl` parameter. The `create_tls_context()` method exists in `MeshSecurity` but is never invoked. Connections remain cleartext, allowing any LAN host to intercept or inject `inference_request`/`model_request` messages. Pass `ssl=self.security.create_tls_context(server_mode=True)` to `start_server()` and `ssl=self.security.create_tls_context(server_mode=False)` to `open_connection()`, or update the docstring to reflect actual TCP-only behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/transport.py` around lines 1 - 16, The module claims "Async mTLS transport" but never wires the TLS context into asyncio calls; ensure MeshSecurity.create_tls_context is used and passed as the ssl argument to asyncio start_server and open_connection: call self.security.create_tls_context(server_mode=True) when invoking start_server() (for server-side sockets) and self.security.create_tls_context(server_mode=False) when invoking open_connection() (for client-side sockets), or if you intentionally want plaintext remove/update the mTLS claim in the module docstring and delete the unused MeshSecurity.create_tls_context usage to avoid misleading documentation; locate references to MeshSecurity, start_server(), open_connection(), and create_tls_context() in transport.py to implement the change.
🟡 Minor comments (9)
src/miniforge/mesh/cli.py-260-266 (1)
260-266: ⚠️ Potential issue | 🟡 Minor

`--mode auto` is not handled by `is_host = args.mode == "host"`.

`args.mode` accepts `host | worker | auto` (default `auto`), but `is_host = args.mode == "host"` at line 59 means `auto` silently collapses to worker-mode semantics. Either handle `auto` explicitly (e.g., elect via mesh state after discovery) or restrict `choices` to `host|worker` until `auto` is implemented. Combined with `host_node_id` being `None`, an `auto` node effectively becomes a worker with no host target.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/cli.py` around lines 260 - 266, the CLI accepts mode "auto" but the code determines host/worker via is_host = args.mode == "host", causing "auto" to behave as worker; update the handling of args.mode in the CLI/startup flow: either (A) implement explicit "auto" election logic after discovery (use mesh state/peer discovery to set is_host and populate host_node_id) and ensure functions that rely on is_host, host_node_id, and any Host-specific setup (e.g., model storage initialization) use the elected values, or (B) constrain the CLI choices to ["host","worker"] until auto is implemented by changing the choices for up_parser and removing/avoiding the "auto" default; refer to the symbols args.mode, is_host, and host_node_id to locate and update the decision logic so "auto" is not silently treated as worker.

src/miniforge/mesh/static/app.js-65-90 (1)
65-90: ⚠️ Potential issue | 🟡 Minor

`window.NODE_ID` is never set — self-row highlighting is dead code.

Lines 67 and 69 compare `node.node_id === window.NODE_ID`, but no code (this file, index.html, or dashboard.py) ever assigns `window.NODE_ID`. It is permanently `undefined`, so every node is treated as "not self": the `node-row self` class and the `(You)` suffix never get applied once `updateNodesTable()` replaces the initial server-rendered row.

Fix by injecting the node id from the template (e.g., in index.html add `<script>window.NODE_ID = "{{ node_id }}";</script>` before app.js and pass `node_id` in the dashboard render context).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/static/app.js` around lines 65 - 90, the client-side self-highlighting is broken because updateNodesTable() checks window.NODE_ID (used in comparisons like node.node_id === window.NODE_ID and class "node-row self" / name "(You)"), but window.NODE_ID is never set; fix it by injecting the current node id into the page before app.js loads (e.g., add a script in the HTML template that assigns window.NODE_ID = "{{ node_id }}"), and ensure the server-side dashboard render (dashboard.py) includes node_id in the template context so updateNodesTable() can correctly mark the self row and append "(You)".

src/miniforge/cli.py-167-173 (1)
167-173: ⚠️ Potential issue | 🟡 Minor

Silently swallowing `ImportError` will mask real bugs in mesh submodules.
meshsubpackage's__init__.pyeagerly imports 8 submodules (discovery,transport,coordinator,engine,security,dashboard,registry,host_worker). ANameError, typo, or unrelated import failure inside any of them surfaces here asImportErrorand is silently discarded — making it indistinguishable from "mesh extras not installed". For reference,mesh/cli.pycurrently has anOptionalundefined-name bug that would be completely hidden by this handler.At minimum, log the exception so operators have a signal when something non-obvious is wrong:
🔧 Suggested change
```diff
 # Mesh command (distributed inference)
 try:
     from miniforge.mesh.cli import add_mesh_subparser
     add_mesh_subparser(subparsers)
-except ImportError:
-    # Mesh dependencies not installed
-    pass
+except ImportError as e:
+    # Mesh extras not installed, or a mesh submodule failed to import.
+    # Log at debug so real breakage isn't silent, but don't block the CLI.
+    import logging
+    logging.getLogger(__name__).debug("Mesh CLI not available: %s", e)
```
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/cli.py` around lines 167 - 173, the current try/except around "from miniforge.mesh.cli import add_mesh_subparser" silently swallows ImportError and can hide real errors in mesh submodules; update the block to catch ImportError, log the full exception (use logging.exception or the existing processLogger) with a clear message mentioning add_mesh_subparser/mesh.cli, and only suppress the exception when it clearly indicates missing optional extras (e.g., check "No module named" in the exception message); otherwise re-raise so real bugs (like the undefined Optional in mesh/cli.py) are visible.

src/miniforge/mesh/static/app.js-101-128 (1)
101-128: ⚠️ Potential issue | 🟡 Minor

Concurrent sends during streaming corrupt the assistant message.

If the user presses Enter a second time while a stream is still in flight, `currentStreamMessage` is overwritten with a new placeholder. Any remaining `chat_chunk` events from the first stream then append to the second message's DOM node, and the first message is orphaned mid-response. `chat_complete` for the first stream will also prematurely unset `currentStreamMessage`, cutting off the second.

At minimum, disable the send button / early-return while a stream is in progress:
🔧 Suggested change
```diff
 function sendMessage() {
     const input = document.getElementById('chat-input');
     const message = input.value.trim();
     if (!message) return;
+    if (currentStreamMessage) return;  // don't start a new stream mid-flight

     // Add user message to chat
     addUserMessage(message);
     input.value = '';
```

A more robust fix would be to track a per-stream id on the server events and match chunks to their placeholder, but a simple guard is sufficient for the dashboard's intended use.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/static/app.js` around lines 101 - 128, the sendMessage function allows a new send while a streaming response is active, causing currentStreamMessage to be overwritten and interleaving chunks; add a guard at the start of sendMessage that prevents a new send when a stream is in-progress (e.g., check if currentStreamMessage is non-null or maintain an isStreaming flag), disable the send button / Enter handling while streaming, and only create a new placeholder via addAssistantMessage and set currentStreamMessage when not already streaming; also ensure chat_complete clears the same flag/currentStreamMessage so subsequent sends are allowed.

src/miniforge/mesh/static/index.html-34-71 (1)
34-71: ⚠️ Potential issue | 🟡 Minor

Hardcoded resource values will show incorrect data on first paint.

Values like `1`, `28 GB`, `8 cores` are baked into the initial HTML regardless of the actual node configuration (users can pass `--ram`/`--cores`). For a node with 64 GB / 16 cores, the dashboard will briefly lie on every load until the first `/api/status` round-trip (and up to 5 s between refreshes).

Either pass real values through Jinja (you already render `node_name`/`is_leader` server-side) or initialize these fields to a neutral placeholder (—) so users don't see wrong data.

🔧 Suggested change
```diff
-<div class="resource-item">
-    <span class="resource-label">Nodes</span>
-    <span id="total-nodes" class="resource-value">1</span>
-</div>
-<div class="resource-item">
-    <span class="resource-label">Total RAM</span>
-    <span id="total-ram" class="resource-value">28 GB</span>
-</div>
-<div class="resource-item">
-    <span class="resource-label">Available RAM</span>
-    <span id="available-ram" class="resource-value">28 GB</span>
-</div>
-<div class="resource-item">
-    <span class="resource-label">CPU Cores</span>
-    <span id="total-cpus" class="resource-value">8</span>
-</div>
+<div class="resource-item">
+    <span class="resource-label">Nodes</span>
+    <span id="total-nodes" class="resource-value">—</span>
+</div>
+<div class="resource-item">
+    <span class="resource-label">Total RAM</span>
+    <span id="total-ram" class="resource-value">—</span>
+</div>
+<div class="resource-item">
+    <span class="resource-label">Available RAM</span>
+    <span id="available-ram" class="resource-value">—</span>
+</div>
+<div class="resource-item">
+    <span class="resource-label">CPU Cores</span>
+    <span id="total-cpus" class="resource-value">—</span>
+</div>
```
/api/nodesfetch byupdateNodesTable(), so it only shows during the initial flash and is also stale.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/static/index.html` around lines 34 - 71, the initial HTML hardcodes resource values and a self node row that flash stale data; replace those static values with server-rendered placeholders (use the existing Jinja variables like node_name and is_leader) or neutral placeholders (e.g. "—") for the elements with IDs total-nodes, total-ram, available-ram, total-cpus and the self row (class node-row self) so the first paint doesn't show incorrect resources; ensure the template injects real values when available or neutral placeholders and keep updateNodesTable() unchanged (it will still replace the self-row on the first /api/nodes fetch).

src/miniforge/mesh/security.py-96-110 (1)
96-110: ⚠️ Potential issue | 🟡 Minor

Replace deprecated `datetime.utcnow()` with a timezone-aware alternative.

Lines 96, 133, and 134 use `datetime.utcnow()`, which emits `DeprecationWarning` in Python 3.12 and is slated for removal. Replace with `datetime.now(timezone.utc)` and use the `not_valid_after_utc` property on certificates. The `cryptography` library ≥ 42 provides these UTC-aware alternatives (your pyproject.toml pins `cryptography>=42.0.0`).

🔧 Suggested change
```diff
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 ...
-        if cert.not_valid_after > datetime.utcnow() + timedelta(days=1):
+        if cert.not_valid_after_utc > datetime.now(timezone.utc) + timedelta(days=1):
 ...
-            .not_valid_before(datetime.utcnow() - timedelta(days=1))
-            .not_valid_after(datetime.utcnow() + timedelta(days=self.CERT_VALIDITY_DAYS))
+            .not_valid_before(datetime.now(timezone.utc) - timedelta(days=1))
+            .not_valid_after(datetime.now(timezone.utc) + timedelta(days=self.CERT_VALIDITY_DAYS))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/security.py` around lines 96 - 110, replace use of the deprecated naive timestamp check using datetime.utcnow(): in the certificate-loading block (where cert.not_valid_after is compared to datetime.utcnow() + timedelta(days=1)) switch to timezone-aware values by importing timezone and using datetime.now(timezone.utc) and, where available, use the certificate's UTC-aware property cert.not_valid_after_utc from the cryptography API; update the comparison to use cert.not_valid_after_utc (or convert cert.not_valid_after to UTC if that property isn't present) + timedelta(days=1), leaving the rest of the logic (calling self._generate_certificate(), writing cert.public_bytes(serialization.Encoding.PEM) to cert_path, and logging via logger) unchanged.

src/miniforge/mesh/coordinator.py-186-238 (1)
186-238: ⚠️ Potential issue | 🟡 Minor

`strategy="local"` and `strategy="remote"` fall through silently.

The docstring documents four strategies but the body only handles `auto`, `route`, and `split`. Passing `"local"` or `"remote"` (`DistributedInferenceEngine.generate` itself calls `schedule_inference` with `strategy="route"`, so this is specifically about callers passing the values from the docstring) leaves `job.status = "pending"` and `job.assigned_node = None`, and the method still returns the job — callers will wait forever in `_wait_for_job_completion`.

Either remove the invalid values from the docstring, or add explicit branches (and raise for unknown strategies to mirror `DistributedInferenceEngine.generate`'s `ValueError`).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/coordinator.py` around lines 186 - 238, the schedule_inference method documents "local" and "remote" strategies but never handles them, causing jobs to remain pending; update schedule_inference to explicitly handle "local" (assign to current node or force local execution) and "remote" (route to a remote node via _select_least_loaded_node or similar) and set job.assigned_node, job.status="running", and job.started_at accordingly, and add an else that raises a ValueError for unknown strategies to match DistributedInferenceEngine.generate; reference schedule_inference, _select_least_loaded_node, _estimate_model_size, DistributedInferenceEngine.generate, and ensure _notify_state_change is still called before returning.
`316-319`: ⚠️ Potential issue | 🟡 Minor

`remove_service` is a no-op, so mDNS peers are never pruned.

When a peer goes offline, the mDNS browser emits `remove_service`, but we do nothing — the entry lingers in `_peers` until a separate caller invokes `remove_stale_peers()`. Implement a real removal that resolves the node to its `PeerInfo` (either keep a `name → key` map when handling `add_service`, or look up by stored `node_name`) and calls `_notify(peer, "removed")` plus `self._peers.pop(...)`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/discovery.py` around lines 316 - 319: `remove_service` currently does nothing, so peers never get pruned; implement it to resolve the service name to the corresponding `PeerInfo` and remove it from the in-memory registry. Either maintain a name→key map when handling `add_service` (update the map in `add_service` and use it here) or search `self._peers` for an entry whose `PeerInfo.node_name` matches the provided name, then call `self._notify(peer, "removed")` and pop that key from `self._peers`; ensure any auxiliary map is cleaned up, and mirror the same removal logic used by `remove_stale_peers`.

src/miniforge/mesh/registry.py (1)
`235-244`: ⚠️ Potential issue | 🟡 Minor

Truncated SHA-256 defeats its purpose.

`return h.hexdigest()[:16]` keeps only 64 bits. If the goal is integrity (detecting silent corruption on download), this is fine for accidents but trivially forgeable and reduces collision resistance enormously. Since the PR frames the mesh as security-conscious (mTLS, cert pinning), return the full digest and compare via constant-time comparison on the worker after `download_model` completes. Storing/transmitting the full 64 hex chars is negligible cost.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/registry.py` around lines 235 - 244: the `_calculate_checksum` function currently truncates the SHA-256 hex to 16 chars (64 bits); change it to return the full SHA-256 hex digest (all 64 hex chars) instead of `h.hexdigest()[:16]`, and ensure downstream verification uses a constant-time comparison when validating after download (e.g., in the worker code that runs after `download_model` completes) so integrity checks transmit/store the full digest and compare securely.
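The full-digest-plus-constant-time pattern this comment asks for can be sketched as below; `calculate_checksum` and `verify_checksum` are hypothetical stand-ins for the registry's `_calculate_checksum` and the worker-side check, not the project's actual code:

```python
import hashlib
import hmac
import os
import tempfile


def calculate_checksum(path: str) -> str:
    """Full 64-char SHA-256 hex digest, not truncated to 16 chars."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        # Read in 1 MiB blocks so large model files don't load into RAM.
        for block in iter(lambda: f.read(1 << 20), b""):
            h.update(block)
    return h.hexdigest()


def verify_checksum(path: str, expected: str) -> bool:
    """Constant-time comparison, so the check cannot be timing-probed."""
    return hmac.compare_digest(calculate_checksum(path), expected)


# Demo: a worker verifying a downloaded file against the host's digest.
fd, path = tempfile.mkstemp()
os.write(fd, b"model weights")
os.close(fd)
expected = hashlib.sha256(b"model weights").hexdigest()
print(len(expected), verify_checksum(path, expected))  # → 64 True
os.remove(path)
```

`hmac.compare_digest` is the stdlib's constant-time comparator and works directly on the hex strings.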
🧹 Nitpick comments (9)
src/miniforge/mesh/security.py (2)
`176-183`: `ssl` should be imported at module level (also referenced in the return annotation).

Line 176 uses `"ssl.SSLContext"` as a stringified return type annotation, but `ssl` is imported inside the function body at line 178. The string annotation works at runtime, but any tool doing `get_type_hints()` (FastAPI/pydantic/etc., or mypy in some modes) will fail to resolve it. Move the import to the top of the module — it's a standard-library module, there's no reason to defer it.

🔧 Suggested change
```diff
 import logging
 import os
+import ssl
 from datetime import datetime, timedelta
 ...
-    def create_tls_context(self, server_mode: bool = False) -> "ssl.SSLContext":
+    def create_tls_context(self, server_mode: bool = False) -> ssl.SSLContext:
         """Create SSL context with mTLS."""
-        import ssl
-
         context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER if server_mode else ssl.PROTOCOL_TLS_CLIENT)
```
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/security.py` around lines 176 - 183: move the local import of `ssl` out of `create_tls_context` and import it at module level so `get_type_hints` can resolve the return annotation; update the function signature to use a real annotation (`ssl.SSLContext`) instead of the stringified `"ssl.SSLContext"`, remove the inner import statement in `create_tls_context`, and keep the function body using `ssl.PROTOCOL_TLS_SERVER`/`CLIENT`, `ssl.TLSVersion`, and `ssl.CERT_REQUIRED` as before (refer to the `create_tls_context` function and the `ssl` symbol).
`135-144`: Nit: avoid the `__import__` + walrus dance; import `ipaddress` normally.

The inline `ipaddress:=__import__("ipaddress").ip_address("127.0.0.1")` at line 138 is hard to read and leaks an `ipaddress` binding into the enclosing method scope (which then shadows the `ipaddress` module name for the rest of the function). Just import at the top of the file:

🔧 Suggested change
```diff
 import logging
 import os
+import ipaddress
 from datetime import datetime, timedelta
 ...
-            x509.IPAddress(ipaddress:=__import__("ipaddress").ip_address("127.0.0.1")),
+            x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
```
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/security.py` around lines 135 - 144: replace the inline `__import__` + walrus expression used to build the IPAddress in the `x509.SubjectAlternativeName` block with a normal module import: add `import ipaddress` at the top of the module and call `ipaddress.ip_address("127.0.0.1")` in the SubjectAlternativeName list (i.e., where `x509.IPAddress(...)` is constructed) so you avoid leaking a temporary binding and improve readability; keep the `x509.SubjectAlternativeName(...)`, `x509.DNSName("localhost")`, and `.sign(key, hashes.SHA256())` code unchanged.

src/miniforge/mesh/__init__.py (1)
`3-10`: Eager re-exports couple every `miniforge.mesh` consumer to all 8 submodules.

Any consumer that imports anything from `miniforge.mesh` (including `miniforge.mesh.cli`, which you import in the top-level CLI) pays the cost of loading all heavy mesh deps (cryptography, msgpack, zeroconf, aiohttp, aiohttp-jinja2, python-socketio, aiodns) and surfaces any import-time failure in any submodule as an `ImportError` at the package root.

This also makes the `except ImportError` in `src/miniforge/cli.py` much broader than intended — a `NameError`/`SyntaxError` in e.g. `host_worker.py` is indistinguishable from "user didn't install the `mesh` extra".

Consider either keeping `__init__.py` free of side-effect imports and letting callers import from submodules directly, or using `__getattr__`-based lazy loading (PEP 562). Given this is new code, the former is simplest:

🔧 Suggested change (minimal)
-"""Miniforge Mesh - Distributed inference across multiple devices.""" - -from miniforge.mesh.discovery import MeshDiscovery, PeerInfo -from miniforge.mesh.transport import MeshTransport, MeshConnection -from miniforge.mesh.coordinator import MeshCoordinator, NodeState, InferenceJob -from miniforge.mesh.engine import DistributedInferenceEngine -from miniforge.mesh.security import MeshSecurity -from miniforge.mesh.dashboard import MeshDashboard -from miniforge.mesh.registry import ModelRegistry, ModelInfo -from miniforge.mesh.host_worker import HostWorkerEngine - -__all__ = [ - "MeshDiscovery", - "PeerInfo", +"""Miniforge Mesh - Distributed inference across multiple devices.""" + +__all__ = [ + "MeshDiscovery", + "PeerInfo", ... ] + +def __getattr__(name: str): + import importlib + mapping = { + "MeshDiscovery": "miniforge.mesh.discovery", + "PeerInfo": "miniforge.mesh.discovery", + "MeshTransport": "miniforge.mesh.transport", + "MeshConnection": "miniforge.mesh.transport", + "MeshCoordinator": "miniforge.mesh.coordinator", + "NodeState": "miniforge.mesh.coordinator", + "InferenceJob": "miniforge.mesh.coordinator", + "DistributedInferenceEngine": "miniforge.mesh.engine", + "MeshSecurity": "miniforge.mesh.security", + "MeshDashboard": "miniforge.mesh.dashboard", + "ModelRegistry": "miniforge.mesh.registry", + "ModelInfo": "miniforge.mesh.registry", + "HostWorkerEngine": "miniforge.mesh.host_worker", + } + if name in mapping: + return getattr(importlib.import_module(mapping[name]), name) + raise AttributeError(name)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/__init__.py` around lines 3 - 10: the package `__init__.py` currently eagerly imports and re-exports `MeshDiscovery`, `MeshTransport`, `MeshCoordinator`, `DistributedInferenceEngine`, `MeshSecurity`, `MeshDashboard`, `ModelRegistry`, `HostWorkerEngine`, etc., causing heavy dependency loads and masking import errors. Remove these top-level imports and stop re-exporting submodule symbols from `miniforge.mesh` so consumers import directly from their specific submodules (e.g., `miniforge.mesh.discovery.MeshDiscovery`, `miniforge.mesh.transport.MeshTransport`, `miniforge.mesh.coordinator.MeshCoordinator`/`NodeState`/`InferenceJob`, `miniforge.mesh.engine.DistributedInferenceEngine`, `miniforge.mesh.security.MeshSecurity`, `miniforge.mesh.dashboard.MeshDashboard`, `miniforge.mesh.registry.ModelRegistry`/`ModelInfo`, `miniforge.mesh.host_worker.HostWorkerEngine`), or alternatively implement a PEP 562 `__getattr__` in `__init__.py` to lazily import those names on demand so importing `miniforge.mesh` does not load all heavy deps; convert top-level import sites (like the CLI) to import required symbols from the specific submodule instead.

src/miniforge/mesh/cli.py (2)
`154-175`: Duplicated branch logic; the only difference is `is_host`/`host_node_id`.

Both branches import `HostWorkerEngine` and construct it with identical arguments except `is_host` and `host_node_id` (which is `None` in both cases anyway — see the `--host-ip` comment above). Collapse to a single construction:

🔧 Suggested refactor
```diff
-    # Initialize engine based on mode
-    if is_host:
-        # Host uses distributed engine with registry
-        from miniforge.mesh.host_worker import HostWorkerEngine
-        engine = HostWorkerEngine(
-            local_engine=local_engine,
-            coordinator=coordinator,
-            registry=registry,
-            node_id=node_id,
-            is_host=True,
-        )
-    else:
-        # Worker uses host/worker engine
-        from miniforge.mesh.host_worker import HostWorkerEngine
-        engine = HostWorkerEngine(
-            local_engine=local_engine,
-            coordinator=coordinator,
-            registry=registry,
-            node_id=node_id,
-            is_host=False,
-            host_node_id=host_node_id,
-        )
+    from miniforge.mesh.host_worker import HostWorkerEngine
+    engine = HostWorkerEngine(
+        local_engine=local_engine,
+        coordinator=coordinator,
+        registry=registry,
+        node_id=node_id,
+        is_host=is_host,
+        host_node_id=host_node_id,
+    )
```
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/cli.py` around lines 154 - 175: the `HostWorkerEngine` construction is duplicated; instead, import `HostWorkerEngine` once and collapse the two branches into a single construction that passes the variables `is_host` and `host_node_id` directly (e.g., `engine = HostWorkerEngine(local_engine=local_engine, coordinator=coordinator, registry=registry, node_id=node_id, is_host=is_host, host_node_id=host_node_id)`), removing the branch-specific duplicate code and keeping a single assignment to `engine`.
`68-68`: Nit: f-string without placeholders.

Ruff F541. Either drop the `f` prefix here (and similarly on any other constant f-strings) or embed a real placeholder.

```diff
-    print(f"🔥 Miniforge Mesh Starting...")
+    print("🔥 Miniforge Mesh Starting...")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/cli.py` at line 68: the print call using a constant f-string triggers Ruff F541; in the statement that currently reads `print(f"🔥 Miniforge Mesh Starting...")`, remove the unnecessary `f` prefix (change to `print("🔥 Miniforge Mesh Starting...")`) or replace it with a real placeholder if interpolation is intended; also scan for any other constant f-strings in this module (e.g., similar startup/log prints) and apply the same change.

pyproject.toml (1)
`47-56`: Minor: redundant/inconsistent dependency declarations.

- `aiohttp` is already a core dependency at line 27 (`aiohttp>=3.9`); re-listing `aiohttp>=3.9.0` here under `mesh` is redundant (and the `.0` suffix makes them look mismatched even though they're equivalent).
- Separately, the `llama-cpp` extra at lines 39-41 pins `llama-cpp-python>=0.3.5` while the core dep at line 35 requires `>=0.3.20`. The stricter core pin wins, but the extra should either match or be removed to avoid confusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pyproject.toml` around lines 47 - 56: the mesh extra currently re-declares aiohttp, and the llama-cpp extra has an inconsistent version compared to the core deps. Remove the redundant aiohttp entry from the mesh extras (or change it to exactly match the core spec `aiohttp>=3.9`) and update the llama-cpp extra's dependency to match the core requirement (`llama-cpp-python>=0.3.20`), or remove the duplicate from the extra so all declarations are consistent (look for the `mesh` extras block and the extra that references `llama-cpp-python` to apply the change).

src/miniforge/mesh/registry.py (1)
`172-206`: `download_model` registers an incomplete file if the stream errors midway.

If `request_layer_stream` raises after some bytes have been written, the partial `output_path` is left on disk and `register_local_model` is never called — but on the next boot, if any caller ever inspects `cache_dir` by filename, the truncated file is sitting there. Safer pattern: download to `output_path.with_suffix(".gguf.part")`, `os.replace(tmp, output_path)` only on success, and remove the partial on exception. Also consider verifying the downloaded file against the host-provided `ModelInfo.checksum` before registering.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/registry.py` around lines 172 - 206: the `download_model` implementation can leave a truncated file and fail to register if the stream errors mid-download. Change `download_model` to stream into a temporary path (e.g., `output_path.with_suffix(".gguf.part")`), write chunks to that temp file, and on full successful completion replace the temp with `os.replace(temp, output_path)` then call `register_local_model(model_id, str(output_path))`; on any exception ensure the temp file is removed (cleanup) so no partial files remain. Additionally, if `ModelInfo.checksum` (or equivalent metadata) is available from the source node, compute and verify the checksum of the final file before replacing/registering, and fail/clean up on mismatch.

src/miniforge/mesh/dashboard.py (1)
`267-278`: Unbounded, silently-swallowed broadcast tasks.

`asyncio.create_task(self._broadcast_status())` drops the reference, so exceptions are only visible via the task's own `except Exception: logger.debug(...)` and the task can be garbage-collected mid-flight (RUF006). More importantly, coordinator state changes can fire rapidly (peer add/update/remove, heartbeats), producing a burst of overlapping broadcasts. Consider coalescing (e.g. an `asyncio.Event` set here and a single long-running broadcaster task draining it), and keep a strong reference to any task you do spawn.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/dashboard.py` around lines 267 - 278: `_on_state_change` currently fires a detached `asyncio.create_task(self._broadcast_status())` which can be garbage-collected, swallow exceptions, and produce overlapping bursts. Change this to start and keep a strong reference to a single long-lived broadcaster task (e.g., `self._broadcaster_task`) and use an `asyncio.Event` (e.g., `self._broadcast_event`) that `_on_state_change` sets instead of creating tasks; implement a long-running method (e.g., `_broadcaster_loop`) that awaits the event, clears it, coalesces rapid sets (optionally with `asyncio.wait_for`/`sleep` debounce), calls the existing `_broadcast_status()` safely inside try/except and logs errors, and ensure the task is created once (on init/start) and cancelled on shutdown.

src/miniforge/mesh/engine.py (1)
`236-247`: Busy-wait + unbounded connection fan-out.

`_wait_for_job_completion` polls `job.status` every 100 ms; an `asyncio.Event` set from the coordinator's `_handle_inference_response` would be both cheaper and race-free. Similarly, `_get_connection_to_node` needs to look up by `node_id`; the `for conn in connections.values(): return conn` pattern guarantees it returns the wrong connection for any mesh with ≥2 peers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/engine.py` around lines 236 - 247: `_wait_for_job_completion` busy-waits on `job.status`; change it to await an `asyncio.Event` tied to the `InferenceJob` (e.g., add an event attribute on `InferenceJob` or a `job_id → Event` map) and have the coordinator set that event from `_handle_inference_response` when the job completes or fails, then await the event in `_wait_for_job_completion` to avoid polling and races. Also fix `_get_connection_to_node` to look up the connection by `node_id` in `coordinator.transport.connections` (use the `node_id` key or a `node_id → MeshConnection` mapping) instead of returning the first entry; return `None` if no matching `MeshConnection` is found.
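The Event-based wait can be sketched like this; `InferenceJob`, `wait_for_job`, and `handle_inference_response` here are simplified stand-ins for the coordinator/engine types named above, under the assumption that the response handler can reach the job object:

```python
import asyncio


class InferenceJob:
    """Minimal stand-in for the coordinator's job object, carrying a
    completion Event the response handler can set."""

    def __init__(self):
        self.status = "pending"
        self.result = None
        self.done = asyncio.Event()


async def wait_for_job(job, timeout=120.0):
    """Await completion instead of polling job.status every 100 ms."""
    try:
        await asyncio.wait_for(job.done.wait(), timeout)
    except asyncio.TimeoutError:
        return "[Error: Inference timeout]"
    return job.result or ""


async def main():
    job = InferenceJob()

    async def handle_inference_response():
        # Stand-in for the coordinator's _handle_inference_response.
        await asyncio.sleep(0.01)
        job.status, job.result = "completed", "hello"
        job.done.set()

    responder = asyncio.create_task(handle_inference_response())
    print(await wait_for_job(job, timeout=1.0))  # → hello
    await responder


asyncio.run(main())
```

The waiter wakes exactly when the handler sets the event, and the timeout branch preserves the existing error-string behavior.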
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 51cad341-8413-4bcb-ba71-260098357bbc
📒 Files selected for processing (15)
- pyproject.toml
- src/miniforge/cli.py
- src/miniforge/mesh/__init__.py
- src/miniforge/mesh/cli.py
- src/miniforge/mesh/coordinator.py
- src/miniforge/mesh/dashboard.py
- src/miniforge/mesh/discovery.py
- src/miniforge/mesh/engine.py
- src/miniforge/mesh/host_worker.py
- src/miniforge/mesh/registry.py
- src/miniforge/mesh/security.py
- src/miniforge/mesh/static/app.js
- src/miniforge/mesh/static/index.html
- src/miniforge/mesh/static/style.css
- src/miniforge/mesh/transport.py
```python
# Determine mode
is_host = args.mode == "host"
host_node_id: Optional[str] = None
```
Critical: `Optional` is not imported — `mesh up` will crash with `NameError`.

Line 60 uses `Optional[str]`, but `typing.Optional` is never imported in this module. Any invocation of `mesh up` raises `NameError: name 'Optional' is not defined` at module/function load, and — compounded by the `except ImportError: pass` handler in `src/miniforge/cli.py` — the mesh subcommand will simply disappear from the CLI without any visible error.
🔧 Suggested fix
```diff
 import argparse
 import asyncio
 import logging
 import sys
 from pathlib import Path
+from typing import Optional
```

Or, since `from __future__ import annotations` is already in effect (line 3), you can use `str | None` directly without importing `Optional`:

```diff
-    host_node_id: Optional[str] = None
+    host_node_id: str | None = None
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 # Determine mode
 is_host = args.mode == "host"
-host_node_id: Optional[str] = None
+host_node_id: str | None = None
```
🧰 Tools
🪛 Ruff (0.15.10)
[error] 60-60: Undefined name Optional
(F821)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/cli.py` around lines 58 - 60, The code uses Optional in
the declaration of host_node_id but Optional is not imported, causing a
NameError; fix by either importing Optional from typing at the top of the module
or replace the annotation with a native union type (e.g., str | None) since from
__future__ import annotations is enabled; update the declaration of host_node_id
(the symbol to change) and add the typing import if you choose that route.
```python
async def _handle_inference_request(self, payload: Dict[str, Any]) -> None:
    """Handle incoming inference request from peer."""
    job_id = payload.get("job_id")
    prompt = payload.get("prompt")
    model_id = payload.get("model_id", "minimax")
    max_tokens = payload.get("max_tokens", 512)

    logger.info(f"Received inference request {job_id} from peer")

    # TODO: Execute inference using local engine
    # For now, just acknowledge
```
Remote inference is never actually executed — always times out.
This handler is registered on every incoming connection for "inference_request" (line 304), but its body is a TODO. DistributedInferenceEngine._execute_remote sends "inference_request" and then polls job.status for up to 120 s, but since no peer ever sends "inference_response", every remote job fails with [Error: Inference timeout]. This also makes the remote / split / multi‑node auto strategies entirely non‑functional end‑to‑end.
Note that DistributedInferenceEngine.handle_remote_request already implements the correct response flow. You need to route incoming "inference_request" messages to that method (likely by injecting the engine into the coordinator or having the engine register its own handler when constructed). The current wiring also drops the source MeshConnection on the floor — the handler signature in MeshConnection._dispatch only receives payload, so you can't reply. Consider extending the handler signature to include the connection, or look up the originating connection by node_id from the payload.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/coordinator.py` around lines 313 - 323, The incoming
"inference_request" handler _handle_inference_request currently does nothing,
causing remote jobs to time out; fix it by routing the payload to the
DistributedInferenceEngine's handle_remote_request so peers can reply: either
inject a DistributedInferenceEngine instance into the coordinator and call
engine.handle_remote_request(payload, conn) from _handle_inference_request, or
let DistributedInferenceEngine register its own handler when constructed; ensure
MeshConnection._dispatch (or the dispatch registration) provides the originating
MeshConnection (or else include the sender node_id in payload and lookup the
MeshConnection by node_id) so handle_remote_request can send back the
"inference_response" to the correct peer.
```python
try:
    if stream:
        # Stream response
        response_stream = await self.engine.generate(
            prompt=prompt,
            stream=True,
        )
        async for chunk in response_stream:
            await self.sio.emit("chat_chunk", {
                "chunk": chunk,
            }, room=sid)
        await self.sio.emit("chat_complete", {}, room=sid)
    else:
        # Non-streaming
        response = await self.engine.generate(
            prompt=prompt,
            stream=False,
        )
        await self.sio.emit("chat_response", {
            "response": response,
        }, room=sid)

except Exception as e:
    logger.error(f"Chat error: {e}")
    await self.sio.emit("chat_error", {
        "error": str(e),
    }, room=sid)
```
Streaming chat path will raise TypeError.
await self.engine.generate(..., stream=True) awaits the outer DistributedInferenceEngine.generate coroutine, which in its local branch does return self.local_engine.generate(..., stream=True) without awaiting — so response_stream is a bare coroutine, and async for chunk in response_stream fails. This is the primary consumer of engine.generate(stream=True), so the chat UI is non‑functional until the engine bug is fixed. See the related comment in src/miniforge/mesh/engine.py (lines 77‑96).
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 115-115: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/dashboard.py` around lines 93 - 119, The streaming path
currently returns a bare coroutine from DistributedInferenceEngine.generate
which causes TypeError when the caller (dashboard.chat handler) does "async for"
over the result; fix DistributedInferenceEngine.generate in
src/miniforge/mesh/engine.py so that when stream=True it becomes an async
generator that delegates to the local engine by iterating and yielding chunks
(e.g., "async for chunk in self.local_engine.generate(...): yield chunk")
instead of returning the coroutine; keep the non-streaming path behavior
unchanged so callers like the dashboard (self.engine.generate with stream=True)
receive an actual async iterable.
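The async-generator delegation the prompt describes can be sketched as follows; `local_generate` is a hypothetical stand-in for the local engine's streaming API:

```python
import asyncio


async def local_generate(prompt):
    """Hypothetical stand-in for the local engine's streaming generate."""
    for token in ["Hello", " ", "world"]:
        await asyncio.sleep(0)  # simulate async token production
        yield token


async def generate_stream(prompt):
    """Delegate by iterating and re-yielding. Because this function contains
    a yield, it is an async generator: callers can `async for` over it
    directly, instead of receiving a bare un-awaited coroutine the way
    `return self.local_engine.generate(...)` in a plain coroutine would."""
    async for chunk in local_generate(prompt):
        yield chunk


async def main():
    chunks = []
    async for chunk in generate_stream("hi"):
        chunks.append(chunk)
    print("".join(chunks))  # → Hello world


asyncio.run(main())
```

The non-streaming path can stay a regular coroutine returning a string; only the `stream=True` branch needs the generator shape.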
```python
def _notify(self, peer: PeerInfo, event: str) -> None:
    """Notify all callbacks of peer event."""
    for cb in self._callbacks:
        try:
            cb(peer, event)
        except Exception as e:
            logger.warning(f"Discovery callback error: {e}")
```
Async peer callbacks are silently dropped — this breaks the whole mesh.
cb(peer, event) invokes the callback synchronously. MeshCoordinator._on_peer_event (src/miniforge/mesh/coordinator.py:271) is an async def, so calling it here just produces an un‑awaited coroutine; the coordinator never sees added/removed events, never calls _connect_to_peer, and the node table stays empty. The mesh will never form peer connections beyond whatever _discovery_connect_loop does via discovery.peers polling — and even that only helps once a peer is present.
🐛 Proposed fix
```diff
     def _notify(self, peer: PeerInfo, event: str) -> None:
         """Notify all callbacks of peer event."""
         for cb in self._callbacks:
             try:
-                cb(peer, event)
+                result = cb(peer, event)
+                if asyncio.iscoroutine(result):
+                    asyncio.create_task(result)
             except Exception as e:
                 logger.warning(f"Discovery callback error: {e}")
```

(Keep a strong ref to the task if you want to avoid RUF006.) Apply the same treatment in `MeshServiceListener._handle_service` and in `MeshTransport._notify` / `MeshCoordinator._notify_state_change` if any of their callbacks are async.
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 85-85: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/discovery.py` around lines 80 - 86, The discovery._notify
is calling callbacks synchronously which drops async callbacks (like
MeshCoordinator._on_peer_event); change _notify to detect if cb(...) returns a
coroutine (or use asyncio.iscoroutinefunction on cb), and if so schedule it with
asyncio.create_task and keep a strong reference (e.g., add task to a local
self._tasks set and remove it in a done callback) so it runs and exceptions are
logged; keep the existing try/except for sync callbacks. Apply the same pattern
to MeshServiceListener._handle_service, MeshTransport._notify, and
MeshCoordinator._notify_state_change wherever callbacks might be async,
scheduling coroutines as tasks and ensuring exceptions are observed.
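The scheduling pattern can be sketched with a minimal `Notifier` (a hypothetical stand-in for `MeshDiscovery`): coroutine results are turned into tasks held in a set, so they actually run, are not garbage-collected mid-flight, and their exceptions are observed.

```python
import asyncio

seen = []


async def on_peer(peer, event):
    """An async callback, like MeshCoordinator._on_peer_event."""
    seen.append((peer, event))


class Notifier:
    """Dispatch events to sync or async callbacks."""

    def __init__(self):
        self._callbacks = []
        self._tasks = set()  # strong references keep tasks alive

    def notify(self, peer, event):
        for cb in self._callbacks:
            try:
                result = cb(peer, event)
                if asyncio.iscoroutine(result):
                    task = asyncio.create_task(result)
                    self._tasks.add(task)
                    # Drop the reference once the task finishes.
                    task.add_done_callback(self._tasks.discard)
            except Exception as e:
                print(f"callback error: {e}")


async def main():
    n = Notifier()
    n._callbacks.append(on_peer)
    n.notify("node-a", "added")
    await asyncio.sleep(0.01)  # let the scheduled task run
    print(seen)  # → [('node-a', 'added')]


asyncio.run(main())
```

`notify` must be called while an event loop is running for `asyncio.create_task` to work; a discovery thread without a loop would need `loop.call_soon_threadsafe` or `asyncio.run_coroutine_threadsafe` instead.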
```python
async def _check_host(self, host: str, port: int) -> None:
    """Check if a host:port is a mesh node."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=1.0
        )

        # Send handshake probe
        probe = b"MINIFORGE_PROBE\n"
        writer.write(probe)
        await writer.drain()

        # Read response
        response = await asyncio.wait_for(reader.read(1024), timeout=1.0)
        writer.close()
        await writer.wait_closed()

        if b"MINIFORGE_NODE" in response:
            # Parse node info from response
            # Format: MINIFORGE_NODE|node_id|node_name|ram|cpu
            parts = response.decode().strip().split("|")
            if len(parts) >= 5:
                _, node_id, node_name, ram_gb, cpu_cores = parts[:5]
                peer = PeerInfo(
                    ip=host,
                    port=port,
                    node_id=node_id,
                    node_name=node_name,
                    ram_gb=float(ram_gb),
                    cpu_cores=int(cpu_cores),
                    last_seen=time.time(),
                )
                await self._add_or_update_peer(peer)

    except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
        pass  # Host not available or not a mesh node
    except Exception as e:
        logger.debug(f"Host check error for {host}:{port}: {e}")
```
IP-scan probe protocol does not match the transport server.
_check_host sends b"MINIFORGE_PROBE\n" and expects a response containing MINIFORGE_NODE. But MeshTransport._handle_incoming (src/miniforge/mesh/transport.py:302-319) reads the first line as a handshake and rejects anything not starting with MINIFORGE_NODE| with MINIFORGE_REJECT|Invalid handshake\n. Result: IP scanning can never identify a peer, so on networks where mDNS is blocked, auto‑discovery silently fails (the PR’s main selling point).
Either define a distinct probe path in the server (answer probes without consuming the connection's handshake slot) or reuse the real handshake here. A quick fix: issue the full MINIFORGE_NODE|... handshake as the probe, parse the server's MINIFORGE_OK|... response, then close.
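The reply-parsing half of the "reuse the real handshake" quick fix can be sketched like this; the field layout after `MINIFORGE_OK|` is an assumption (the review only shows the rejection message), so treat it as illustrative:

```python
def parse_handshake_response(line: bytes):
    """Parse the server's reply to a full MINIFORGE_NODE|... handshake probe.

    Returns the pipe-separated fields after MINIFORGE_OK, or None when the
    server rejects (e.g. b"MINIFORGE_REJECT|Invalid handshake\n").
    The field layout is hypothetical, not the project's exact wire format.
    """
    text = line.decode(errors="replace").strip()
    if not text.startswith("MINIFORGE_OK|"):
        return None
    return text.split("|")[1:]


print(parse_handshake_response(b"MINIFORGE_OK|node-123|worker-1\n"))
# → ['node-123', 'worker-1']
print(parse_handshake_response(b"MINIFORGE_REJECT|Invalid handshake\n"))
# → None
```

With this shape, `_check_host` would send the same `MINIFORGE_NODE|...` line the transport server already accepts, parse the `MINIFORGE_OK` fields into a `PeerInfo`, and close, so probing and handshaking share one protocol.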
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 251-251: Do not catch blind exception: Exception
(BLE001)
```python
# Stream the model
chunk_count = 0

def send_chunk(chunk: bytes) -> None:
    nonlocal chunk_count
    # Fire-and-forget chunk send
    asyncio.create_task(conn.send("model_chunk", {
        "model_id": model_id,
        "chunk": chunk,
        "chunk_index": chunk_count,
        "complete": False,
    }))
    chunk_count += 1

try:
    await self.registry.stream_weights(model_id, [], send_chunk)

    # Send completion
    await conn.send("model_chunk", {
        "model_id": model_id,
        "chunk": b"",
        "chunk_index": chunk_count,
        "complete": True,
    })

    logger.info(f"Streamed {model_id} to {requester} in {chunk_count} chunks")

except Exception as e:
    logger.error(f"Failed to stream {model_id}: {e}")
    await conn.send("model_response", {
        "model_id": model_id,
        "error": str(e),
    })
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Confirm CHUNK_SIZE vs the transport receive-side cap.
rg -nP -C2 '\bCHUNK_SIZE\b|10\s*\*\s*1024\s*\*\s*1024' --type=py
# Confirm handle_model_request is (or is not) registered as a transport handler.
rg -nP -C3 '\bmodel_request\b|handle_model_request' --type=py
```

Repository: Zapdev-labs/miniforge
Length of output: 2101

🏁 Script executed:

```shell
# Get the full context around lines 231-263 in host_worker.py
head -n 280 src/miniforge/mesh/host_worker.py | tail -n 100
```

Repository: Zapdev-labs/miniforge
Length of output: 3408

🏁 Script executed:

```shell
# Examine the stream_weights implementation in registry.py
sed -n '150,170p' src/miniforge/mesh/registry.py
```

Repository: Zapdev-labs/miniforge
Length of output: 908

🏁 Script executed:

```shell
# Check if stream_weights accepts async callbacks
rg -A 20 'def stream_weights' src/miniforge/mesh/registry.py
```

Repository: Zapdev-labs/miniforge
Length of output: 761

🏁 Script executed:

```shell
# Check if there's a finally block or proper exception handling for the model streaming
rg -B 5 -A 30 'async def handle_model_request' src/miniforge/mesh/host_worker.py
```

Repository: Zapdev-labs/miniforge
Length of output: 1178
Chunk streaming is broken due to size limit and missing error handling.

Three critical issues:

- `ModelRegistry.CHUNK_SIZE` is 16 MiB, but `MeshConnection._read_loop` drops connections for messages > 10 MiB. The first chunk will be rejected, causing immediate failure.
- `send_chunk` fires `asyncio.create_task(conn.send(...))` without awaiting or keeping references (RUF006). Tasks are scheduled unbounded in a tight file-read loop, causing interleaving and out-of-order delivery.
- When `registry.stream_weights` raises an exception, the code sends `model_response` with an error instead of a terminating `model_chunk` frame. Receivers wait for `model_chunk` events, so they'll time out at 300 s instead of gracefully closing.

Fix: Make `stream_weights` accept an async callback and await each send, reduce `CHUNK_SIZE` below 10 MiB (or raise the transport cap), and ensure all code paths send a proper terminating `model_chunk` with `complete: True` (use `finally`).
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 237-242: Store a reference to the return value of asyncio.create_task
(RUF006)
[warning] 258-258: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/host_worker.py` around lines 231 - 263, The chunk
streaming logic is broken: reduce ModelRegistry.CHUNK_SIZE below the transport
cap (or raise MeshConnection._read_loop limit) so chunks are <=10MiB, change
send_chunk to be async (or make registry.stream_weights accept an async
callback) and await conn.send instead of fire-and-forget asyncio.create_task to
preserve ordering and avoid unbounded tasks, and move the final terminating
model_chunk ({"model_id": model_id, "chunk": b"", "chunk_index": chunk_count,
"complete": True}) into a finally block so that both success and exceptions
always send a complete:true model_chunk (also change the exception path to not
send model_response but rely on the terminating model_chunk for receivers).
```python
def create_tls_context(self, server_mode: bool = False) -> "ssl.SSLContext":
    """Create SSL context with mTLS."""
    import ssl

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER if server_mode else ssl.PROTOCOL_TLS_CLIENT)
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    context.verify_mode = ssl.CERT_REQUIRED

    # Load our certificate and key
    context.load_cert_chain(
        certfile=str(self.certs_dir / "node_cert.pem"),
        keyfile=str(self.certs_dir / "node_key.pem"),
    )

    # In mesh mode, we trust pinned certificates
    # Load pinned certs as CA certs
    if self._pinned_certs:
        # Create temporary CA bundle
        ca_path = self.certs_dir / "pinned_certs.pem"
        with open(ca_path, "wb") as f:
            for fingerprint in self._pinned_certs:
                # This is simplified - in practice you'd store full certs
                pass
        if ca_path.exists():
            context.load_verify_locations(cafile=str(ca_path))
    else:
        # If no pinned certs, disable verification (LAN mode)
        context.verify_mode = ssl.CERT_NONE

    return context
```
Critical: TLS peer verification is effectively disabled; TOFU pinning is a no-op.
Two compounding issues make `create_tls_context` a security hole that contradicts the PR's advertised mTLS / certificate pinning:

- Pin storage loses the certificate. `pin_peer_cert` (L156-165) stores only the SHA-256 hex fingerprint string in `self._pinned_certs`; the actual peer certificate PEM is discarded. When `create_tls_context` tries to build a trust bundle (L192-200), it iterates those fingerprints with a literal `pass`, writes an empty `pinned_certs.pem`, and loads that as the CA bundle. Peer verification will then fail for every genuine peer.
- No-pin path disables verification entirely. When `_pinned_certs` is empty (the default state on every fresh startup, since pins are in-memory only and never persisted), the code sets `context.verify_mode = ssl.CERT_NONE` (L203). Combined with `PROTOCOL_TLS_CLIENT`/`TLS_SERVER` accepting any cert, this means any process on the LAN can impersonate a mesh node and be trusted. This is the opposite of the PR description's "mutual TLS, trust-on-first-use certificate pinning".

Minimum fixes required before this ships:

- Persist pinned certificates as full PEM (not just fingerprints), e.g. `dict[str, bytes]` keyed by fingerprint, serialized to `pinned_certs.pem` on pin.
- In the no-pin case, keep `CERT_REQUIRED` and plug in a custom verification callback (SSLObject-level) that compares the peer leaf fingerprint against the pin set, adding it on first sight if TOFU is desired.
- Never silently downgrade to `CERT_NONE`.
🔧 Sketch of the fix

```diff
-        self._pinned_certs: set[str] = set()
+        self._pinned_certs: dict[str, bytes] = {}  # fingerprint -> PEM
 ...
     def pin_peer_cert(self, cert_pem: bytes) -> str:
         try:
             cert = x509.load_pem_x509_certificate(cert_pem)
             fingerprint = cert.fingerprint(hashes.SHA256()).hex()
-            self._pinned_certs.add(fingerprint)
+            self._pinned_certs[fingerprint] = cert_pem
             return fingerprint[:16].upper()
 ...
-        if self._pinned_certs:
-            ca_path = self.certs_dir / "pinned_certs.pem"
-            with open(ca_path, "wb") as f:
-                for fingerprint in self._pinned_certs:
-                    # This is simplified - in practice you'd store full certs
-                    pass
-            if ca_path.exists():
-                context.load_verify_locations(cafile=str(ca_path))
-        else:
-            context.verify_mode = ssl.CERT_NONE
+        ca_path = self.certs_dir / "pinned_certs.pem"
+        with open(ca_path, "wb") as f:
+            for pem in self._pinned_certs.values():
+                f.write(pem)
+        if ca_path.stat().st_size > 0:
+            context.load_verify_locations(cafile=str(ca_path))
+        # Do NOT fall back to CERT_NONE. If no pins yet, rely on a pinning
+        # verify callback (or refuse connection until the operator approves).
```

Also: with the 30-day cert rotation (`CERT_VALIDITY_DAYS = 30`), the node's fingerprint changes every rotation cycle, invalidating pins held by peers. Worth thinking through the rotation/pin-refresh protocol before this lands.
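A post-handshake fingerprint check for the no-pin case could look roughly like this (a sketch under assumptions: the caller has the completed `SSLSocket`/`SSLObject` in hand, the server side actually requests a client certificate so `getpeercert` returns one, and persisting pins to disk is elided):

```python
import hashlib

def verify_or_pin_peer(ssl_object, pinned: dict, tofu: bool = True) -> bool:
    """Compare the peer leaf certificate fingerprint against the pin set.

    With TOFU enabled, an unknown peer is pinned on first sight; without
    it, the caller must refuse the connection on a False return.
    """
    der = ssl_object.getpeercert(binary_form=True)
    if der is None:
        return False  # peer presented no certificate
    fingerprint = hashlib.sha256(der).hexdigest()
    if fingerprint in pinned:
        return True
    if tofu:
        pinned[fingerprint] = der  # first sight: pin it (persist in real code)
        return True
    return False
```

Run it right after the handshake and tear the connection down on `False`; combined with pin persistence, this gives the trust-on-first-use behavior the PR description advertises without ever touching `CERT_NONE`.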
🧰 Tools
🪛 Ruff (0.15.10)
[error] 176-176: Undefined name ssl
(F821)
[warning] 196-196: Loop control variable fingerprint not used within loop body
Rename unused fingerprint to _fingerprint
(B007)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/security.py` around lines 176 - 205, The
create_tls_context function currently writes empty pinned_certs.pem and falls
back to context.verify_mode = ssl.CERT_NONE, so TLS peer verification is
effectively disabled; fix by changing pin_peer_cert to store full peer cert PEMs
(e.g. self._pinned_certs: dict[fingerprint->bytes]) and persist them to
pinned_certs.pem when pinning, then in create_tls_context build the CA bundle by
writing those PEM bytes into pinned_certs.pem and load it via
context.load_verify_locations; do NOT set context.verify_mode = ssl.CERT_NONE —
leave context.verify_mode = ssl.CERT_REQUIRED and instead implement a
post-handshake verification/TOFU check (using an SSLContext callback or
post-accept/getpeercert fingerprint check) that compares the peer leaf
fingerprint against self._pinned_certs and adds the PEM on first-seen if TOFU is
desired; update references to pinned_certs.pem, self._pinned_certs,
pin_peer_cert, and create_tls_context accordingly and ensure pinned PEMs are
serialized to disk so pins survive restarts.
```python
async def _read_loop(self) -> None:
    """Read and dispatch messages."""
    while self._running:
        try:
            # Read length prefix
            length_data = await self.reader.readexactly(4)
            length = struct.unpack(">I", length_data)[0]

            if length > 10 * 1024 * 1024:  # Max 10MB
                logger.warning(f"Message too large ({length} bytes), dropping connection")
                break

            # Read message data
            data = await self.reader.readexactly(length)
            msg_dict = msgpack.unpackb(data, raw=False)

            msg = MeshMessage(
                msg_type=msg_dict["type"],
                payload=msg_dict["payload"],
                timestamp=msg_dict["timestamp"],
                node_id=msg_dict["node_id"],
            )

            self.last_seen = time.time()
            await self._dispatch(msg)

        except asyncio.IncompleteReadError:
            logger.info(f"Connection closed by {self.node_id}")
            break
        except Exception as e:
            logger.warning(f"Error reading from {self.node_id}: {e}")
            break

    self._running = False
```
|
|
10 MiB per-message cap is below registry chunk size.
`length > 10 * 1024 * 1024` drops the connection, but `ModelRegistry.CHUNK_SIZE` is 16 MiB (src/miniforge/mesh/registry.py:42) and every `model_chunk` message is sent as a single length-prefixed payload. The first model chunk will trip this branch and kill the worker's connection mid-download. Either raise the cap (with care: msgpack decode of large blobs is expensive and creates backpressure issues) or reduce `CHUNK_SIZE` to ~4-8 MiB plus room for framing/payload keys.
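One way to stop the two limits drifting apart is to derive the transport cap from the chunk size in a single shared place (a sketch; the names are illustrative, not the actual miniforge constants):

```python
# Hypothetical shared constants module: both the registry and the
# transport import their limits from here, so they cannot disagree.
CHUNK_SIZE = 8 * 1024 * 1024           # registry chunk size, under the old 10 MiB cap
FRAMING_OVERHEAD = 256 * 1024          # headroom for msgpack keys and chunk metadata
MAX_MESSAGE_SIZE = CHUNK_SIZE + FRAMING_OVERHEAD

def frame_ok(length: int) -> bool:
    """Transport-side check applied to the length prefix."""
    return length <= MAX_MESSAGE_SIZE
```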
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 122-122: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/transport.py` around lines 93 - 127, The per-message max
size check in _read_loop currently rejects payloads > 10*1024*1024 which
conflicts with ModelRegistry.CHUNK_SIZE (16 MiB) so model_chunk messages will be
dropped; update the logic to allow the registry chunk size (or a safe upper
bound) by either increasing the cap to exceed ModelRegistry.CHUNK_SIZE (e.g., >=
ModelRegistry.CHUNK_SIZE + framing overhead) or else reduce
ModelRegistry.CHUNK_SIZE to a value under the cap (4–8 MiB); locate the size
check in _read_loop and ensure the warning/log text still reflects the new
threshold and that any increased cap has appropriate safeguards against
excessive memory use and msgpack decoding backpressure.
```python
async def connect_to_peer(self, ip: str, port: int) -> Optional[MeshConnection]:
    """Connect to a discovered peer."""
    peer_key = f"{ip}:{port}"

    if peer_key in self._connections:
        return self._connections[peer_key]

    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(ip, port),
            timeout=5.0,
        )

        # Send handshake
        handshake = f"{self.PROTOCOL_MAGIC.decode()}|{self.node_id}|{self.node_name}|{self.ram_gb}|{self.cpu_cores}\n"
        writer.write(handshake.encode())
        await writer.drain()

        # Read response
        response = await asyncio.wait_for(reader.readline(), timeout=5.0)
        response_str = response.decode().strip()

        if not response_str.startswith("MINIFORGE_OK|"):
            logger.warning(f"Peer {peer_key} rejected handshake: {response_str}")
            writer.close()
            await writer.wait_closed()
            return None

        # Parse peer info from response
        parts = response_str.split("|")
        peer_id = parts[1] if len(parts) > 1 else "unknown"

        # Create connection
        conn = MeshConnection(reader, writer, self.node_id, self.security)
        conn.on("heartbeat", lambda _: None)  # Update last_seen
        await conn.start()

        self._connections[peer_key] = conn
        self._notify(peer_id, conn, "connected")

        logger.info(f"Connected to peer {peer_id} @ {peer_key}")
        return conn

    except Exception as e:
        logger.debug(f"Failed to connect to {peer_key}: {e}")
        return None

async def _handle_incoming(
    self,
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter
) -> None:
    """Handle incoming connection."""
    peer_addr = writer.get_extra_info("peername")
    logger.debug(f"Incoming connection from {peer_addr}")

    try:
        # Read handshake
        handshake = await asyncio.wait_for(reader.readline(), timeout=5.0)
        handshake_str = handshake.decode().strip()

        if not handshake_str.startswith("MINIFORGE_NODE|"):
            writer.write(b"MINIFORGE_REJECT|Invalid handshake\n")
            await writer.drain()
            writer.close()
            await writer.wait_closed()
            return

        # Parse peer info
        parts = handshake_str.split("|")
        if len(parts) < 5:
            writer.write(b"MINIFORGE_REJECT|Incomplete handshake\n")
            await writer.drain()
            writer.close()
            await writer.wait_closed()
            return

        _, peer_id, peer_name, peer_ram, peer_cpu = parts[:5]
        peer_key = f"{peer_addr[0]}:{peer_addr[1]}"

        # Send OK response
        response = f"MINIFORGE_OK|{self.node_id}|{self.node_name}|{self.ram_gb}|{self.cpu_cores}\n"
        writer.write(response.encode())
        await writer.drain()

        # Create connection
        conn = MeshConnection(reader, writer, self.node_id, self.security)
        await conn.start()

        self._connections[peer_key] = conn
        self._notify(peer_id, conn, "connected")

        logger.info(f"Accepted connection from {peer_name} ({peer_id}) @ {peer_key}")

        # Keep connection alive
        while conn._running:
            await asyncio.sleep(1)

    except asyncio.TimeoutError:
        logger.warning(f"Handshake timeout from {peer_addr}")
    except Exception as e:
        logger.warning(f"Error handling incoming from {peer_addr}: {e}")
    finally:
        if peer_key in self._connections:
            del self._connections[peer_key]
```
Connection identity is inconsistent across inbound/outbound paths.
- Outbound: `peer_key = f"{ip}:{port}"` with `port = mesh_port` (line 246).
- Inbound: `peer_key = f"{peer_addr[0]}:{peer_addr[1]}"`, where `peer_addr[1]` is the ephemeral client port (line 322). So if node A dials node B and B dials A, both keep two different keys for what is actually the same logical peer; `_connections` will hold duplicates.
- `MeshConnection.__init__` is called with `self.node_id` (this node's id, lines 277 and 330), so `conn.node_id` is our id, not the peer's. Consequently, `_cleanup_stale` at line 382 emits `_notify(conn.node_id, conn, "disconnected")` with the local node id, and `MeshCoordinator._on_connection_event` will try to mutate `self._nodes[<our own id>]`, never matching.

Key connections by the peer's node_id (learned from the handshake) instead of ip:port, and store the peer's id on the `MeshConnection` (e.g. `conn.peer_node_id`). The callback in `connect_to_peer` at line 278 (`conn.on("heartbeat", lambda _: None)`) is also dead code: `last_seen` is updated inside `_read_loop` before dispatch, so the lambda has no effect.
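The re-keying could be sketched like this (the `MeshConnection` constructor and table API are simplified assumptions; the point is that both the dictionary key and the stored id come from the handshake):

```python
class MeshConnection:
    def __init__(self, peer_node_id: str) -> None:
        # Store the *peer's* id learned from the handshake, not our own,
        # so disconnect notifications name the right node.
        self.peer_node_id = peer_node_id

class ConnectionTable:
    """Connections keyed by peer node_id: inbound and outbound dials
    to the same logical peer collapse into a single entry."""

    def __init__(self) -> None:
        self._connections: dict = {}

    def register(self, conn: MeshConnection) -> MeshConnection:
        existing = self._connections.get(conn.peer_node_id)
        if existing is not None:
            return existing  # simultaneous dial-out/dial-in: keep the first
        self._connections[conn.peer_node_id] = conn
        return conn

    def drop(self, peer_id: str) -> None:
        self._connections.pop(peer_id, None)
```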
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 287-287: Do not catch blind exception: Exception
(BLE001)
[warning] 321-321: Unpacked variable peer_ram is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
[warning] 321-321: Unpacked variable peer_cpu is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
[warning] 344-344: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/transport.py` around lines 244 - 348, Outbound and inbound
flows use inconsistent keys and store the local node_id on MeshConnection,
causing duplicate entries and wrong notifications; fix connect_to_peer and
_handle_incoming to key self._connections by the peer's node_id learned from the
handshake (not ip:port), set a proper attribute on MeshConnection like
peer_node_id (assign peer_id parsed from the response/handshake before calling
MeshConnection(...)), update uses of peer_key/_connections/_notify to use that
peer_node_id as the canonical key, remove the no-op conn.on("heartbeat", lambda
_: None) call in connect_to_peer, and ensure any cleanup logic such as
_cleanup_stale and _notify expect and use conn.peer_node_id (handle the case
where peer_id is missing by aborting the connection and logging).
```python
async def _handle_incoming(
    self,
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter
) -> None:
    """Handle incoming connection."""
    peer_addr = writer.get_extra_info("peername")
    logger.debug(f"Incoming connection from {peer_addr}")

    try:
        # Read handshake
        handshake = await asyncio.wait_for(reader.readline(), timeout=5.0)
        handshake_str = handshake.decode().strip()

        if not handshake_str.startswith("MINIFORGE_NODE|"):
            writer.write(b"MINIFORGE_REJECT|Invalid handshake\n")
            await writer.drain()
            writer.close()
            await writer.wait_closed()
            return

        # Parse peer info
        parts = handshake_str.split("|")
        if len(parts) < 5:
            writer.write(b"MINIFORGE_REJECT|Incomplete handshake\n")
            await writer.drain()
            writer.close()
            await writer.wait_closed()
            return

        _, peer_id, peer_name, peer_ram, peer_cpu = parts[:5]
        peer_key = f"{peer_addr[0]}:{peer_addr[1]}"

        # Send OK response
        response = f"MINIFORGE_OK|{self.node_id}|{self.node_name}|{self.ram_gb}|{self.cpu_cores}\n"
        writer.write(response.encode())
        await writer.drain()

        # Create connection
        conn = MeshConnection(reader, writer, self.node_id, self.security)
        await conn.start()

        self._connections[peer_key] = conn
        self._notify(peer_id, conn, "connected")

        logger.info(f"Accepted connection from {peer_name} ({peer_id}) @ {peer_key}")

        # Keep connection alive
        while conn._running:
            await asyncio.sleep(1)

    except asyncio.TimeoutError:
        logger.warning(f"Handshake timeout from {peer_addr}")
    except Exception as e:
        logger.warning(f"Error handling incoming from {peer_addr}: {e}")
    finally:
        if peer_key in self._connections:
            del self._connections[peer_key]
```
|
|
`peer_key` can be unbound when `finally` runs: `UnboundLocalError` on any early failure.

`peer_key` is only assigned at line 322, but the `finally` block at lines 346-348 unconditionally does `if peer_key in self._connections`. Any exception thrown before line 322 (handshake timeout, truncated read, `decode()` failure, `len(parts) < 5` early return) leaves `peer_key` unbound and raises `UnboundLocalError` from `finally`, masking the original error; the writer is also never closed on the exception paths.
🐛 Proposed fix

```diff
 async def _handle_incoming(
     self,
     reader: asyncio.StreamReader,
     writer: asyncio.StreamWriter,
 ) -> None:
     """Handle incoming connection."""
     peer_addr = writer.get_extra_info("peername")
+    peer_key: Optional[str] = None
     logger.debug(f"Incoming connection from {peer_addr}")
     try:
         ...
     except asyncio.TimeoutError:
         logger.warning(f"Handshake timeout from {peer_addr}")
     except Exception as e:
         logger.warning(f"Error handling incoming from {peer_addr}: {e}")
     finally:
-        if peer_key in self._connections:
+        if peer_key and peer_key in self._connections:
             del self._connections[peer_key]
+        if not writer.is_closing():
+            writer.close()
```

Also close the writer in the two early-return paths (lines 306-310 and 315-319) to avoid leaks. They currently do close it, good, but the `del self._connections[peer_key]` path in `finally` still needs guarding.
🧰 Tools
🪛 Ruff (0.15.10)
[warning] 321-321: Unpacked variable peer_ram is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
[warning] 321-321: Unpacked variable peer_cpu is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
[warning] 344-344: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/transport.py` around lines 291 - 349, In _handle_incoming
initialize peer_key = None before the try to avoid UnboundLocalError, and in the
finally guard removal with "if peer_key and peer_key in self._connections: del
self._connections[peer_key]"; also ensure the writer is always closed on all
early-return/error paths (verify the two reject branches and any exception paths
call writer.close() and await writer.wait_closed()) so connections/writer
handles are not leaked when parsing/handshake fails or exceptions occur before
peer_key is assigned.
Actionable comments posted: 4
🧹 Nitpick comments (3)
src/miniforge/mesh/dashboard.py (1)

1-1: Docstring is incorrect: this is aiohttp, not FastAPI.

Module docstring says "FastAPI dashboard" but the implementation uses `aiohttp` + `python-socketio` (lines 10-13). Update for accuracy.

```diff
-"""FastAPI dashboard with OpenWebUI-style interface."""
+"""aiohttp dashboard with OpenWebUI-style interface."""
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/dashboard.py` at line 1, The module docstring incorrectly states "FastAPI dashboard"; update the top-level docstring to accurately describe the implementation as an aiohttp-based dashboard using python-socketio (reflecting the use of aiohttp and socketio in this module), e.g., mention "aiohttp + python-socketio OpenWebUI-style interface" so it matches the code that constructs the aiohttp app/socketio handlers.

src/miniforge/mesh/static/fallback.html (1)

100-101: Nit: missing labels on peer IP/port inputs.

Placeholder text is not an accessible name; screen readers will announce the inputs as unlabeled. Consider adding explicit `<label>` elements (or `aria-label`) for `peer-ip` and `peer-port`. Same applies to the `chat-input` textarea (lines 77-81). Applies equally to `templates/index.html` (lines 108-120, 84-88).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/static/fallback.html` around lines 100 - 101, The peer IP and port inputs (elements with ids "peer-ip" and "peer-port") and the chat textarea ("chat-input") lack accessible labels; placeholders are not sufficient for screen readers, so add explicit labels or aria-label attributes for these elements to provide an accessible name. Update the markup around the input elements (peer-ip, peer-port) and the textarea (chat-input) to include either a `<label for="...">` derived from the visible text or add aria-label="Peer IP"/aria-label="Peer Port"/aria-label="Chat message" attributes to the existing elements; ensure the label text is meaningful and that the for attribute matches the element id, and apply the same change to the equivalent inputs in templates/index.html.

src/miniforge/mesh/templates/index.html (1)

34-47: Hardcoded resource placeholders can mislead on first paint.

`28 GB`, `8 cores`, `1 node`, `127.0.0.1` are baked into the initial markup and will briefly be visible before `app.js` hydrates, displaying values that almost certainly don't match the actual host. Prefer neutral placeholders (e.g. `—` / `Loading…`) matching what `fallback.html` already uses, so users aren't shown incorrect numbers if JS is slow/disabled.

Proposed fix

```diff
-                    <span id="total-nodes" class="resource-value">1</span>
+                    <span id="total-nodes" class="resource-value">—</span>
@@
-                    <span id="total-ram" class="resource-value">28 GB</span>
+                    <span id="total-ram" class="resource-value">—</span>
@@
-                    <span id="available-ram" class="resource-value">28 GB</span>
+                    <span id="available-ram" class="resource-value">—</span>
@@
-                    <span id="total-cpus" class="resource-value">8</span>
+                    <span id="total-cpus" class="resource-value">—</span>
@@
-                <tr class="node-row self">
-                    <td>{{ node_name }} (You)</td>
-                    <td>127.0.0.1</td>
-                    <td>28 GB</td>
-                    <td>8 cores</td>
-                    <td><span class="status-badge active">Active</span></td>
-                </tr>
+                <tr class="node-row self">
+                    <td colspan="5" style="text-align: center; padding: 2rem;">
+                        Loading mesh data…
+                    </td>
+                </tr>
```

Also applies to: 62-68

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/miniforge/mesh/templates/index.html` around lines 34 - 47, The template currently hardcodes resource values (e.g. "28 GB", "8", "1") which show before JS hydrates; update the markup for elements with ids total-nodes, total-ram, available-ram, total-cpus (and the similar host/IP placeholders mentioned around lines 62-68) to use neutral placeholders such as "—" or "Loading…" matching fallback.html instead of real numbers, keeping the same element ids and classes so app.js can hydrate them later; ensure no semantic/ARIA changes so hydration logic continues to target total-nodes, total-ram, available-ram, total-cpus (and the host/ip element ids) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/miniforge/mesh/dashboard.py`:
- Line 36: The dashboard currently defaults host: str = "0.0.0.0" and exposes
/api/* and Socket.IO with no auth; change the default host parameter to
"127.0.0.1" (require explicit CLI/flag to allow "0.0.0.0"), and implement an
aiohttp middleware (e.g., auth_middleware) that enforces a minimum auth scheme
(shared token or basic auth or a signed cookie derived from the node Ed25519 key
material) on all /api/* routes (specifically /api/chat and /api/connect) and
integrate the same check into the python-socketio event handlers so socket
events require the token/cookie; ensure the middleware is registered on the
aiohttp app and the socketio connection handler validates auth, and update the
dashboard docs to state the default localhost binding and opt‑in exposure model.
- Around line 318-321: The _on_state_change method currently fires
asyncio.create_task(self._broadcast_status()) and discards the Task (risking
GC/cancel) and may be called off the running loop; fix by adding an
instance-level set self._bg_tasks initialized in __init__ (e.g., self._bg_tasks:
set[asyncio.Task] = set()) and have _on_state_change create the task and store
it (add to set and remove on completion), and ensure task creation is
thread-safe by obtaining the running loop (capture it at start() or call
asyncio.get_running_loop()) and using loop.call_soon_threadsafe to schedule
creation of the Task that runs _broadcast_status, so
_notify_state_change/_on_state_change won’t raise RuntimeError when invoked
off-loop.
- Around line 176-197: The fallback HTML in the exception handler of the
dashboard render uses an unescaped f-string injecting
self.coordinator.node_name, self.coordinator.node_id, self.coordinator.is_leader
and the exception e, introducing XSS; fix by replacing the inline HTML with a
safe static response (preferably return web.FileResponse(...) to serve the
existing static/fallback.html) or, if keeping dynamic values, escape every
interpolated value with html.escape before inserting into the template (apply to
node_name, node_id, is_leader and str(e)) and ensure the exception text is
sanitized/trimmed; update the exception block in the Dashboard render handler in
src/miniforge/mesh/dashboard.py accordingly.
- Around line 266-304: The api_connect handler improperly trusts request JSON
and calls a private coordinator method with a placeholder PeerInfo; validate and
sanitize inputs (ensure "ip" is a valid IP address or CIDR-restricted allowed
host, ensure "port" is an integer in 1-65535) and return 400 on bad input; stop
calling the private method _connect_to_peer — add/use a public
MeshCoordinator.connect_to_peer(peer: PeerInfo) or connect_to_peer_by_addr(ip,
port) and call that from api_connect; and avoid creating a permanent placeholder
node with node_id="unknown" — either defer inserting nodes into
coordinator._nodes until the handshake returns a real node_id or use a temporary
key derived from (ip,port) that does not clobber real node_id entries so manual
connects do not overwrite or pollute aggregated resources.
---
Nitpick comments:
In `@src/miniforge/mesh/dashboard.py`:
- Line 1: The module docstring incorrectly states "FastAPI dashboard"; update
the top-level docstring to accurately describe the implementation as an
aiohttp-based dashboard using python-socketio (reflecting the use of aiohttp and
socketio in this module), e.g., mention "aiohttp + python-socketio
OpenWebUI-style interface" so it matches the code that constructs the aiohttp
app/socketio handlers.
In `@src/miniforge/mesh/static/fallback.html`:
- Around line 100-101: The peer IP and port inputs (elements with ids "peer-ip"
and "peer-port") and the chat textarea ("chat-input") lack accessible labels;
placeholders are not sufficient for screen readers, so add explicit labels or
aria-label attributes for these elements to provide an accessible name. Update
the markup around the input elements (peer-ip, peer-port) and the textarea
(chat-input) to include either a <label for="...">... from the visible text or
add aria-label="Peer IP"/aria-label="Peer Port"/aria-label="Chat message"
attributes to the existing elements; ensure the label text is meaningful and
that the for attribute matches the element id, and apply the same change to the
equivalent inputs in templates/index.html.
In `@src/miniforge/mesh/templates/index.html`:
- Around line 34-47: The template currently hardcodes resource values (e.g. "28
GB", "8", "1") which show before JS hydrates; update the markup for elements
with ids total-nodes, total-ram, available-ram, total-cpus (and the similar
host/IP placeholders mentioned around lines 62-68) to use neutral placeholders
such as "—" or "Loading…" matching fallback.html instead of real numbers,
keeping the same element ids and classes so app.js can hydrate them later;
ensure no semantic/ARIA changes so hydration logic continues to target
total-nodes, total-ram, available-ram, total-cpus (and the host/ip element ids)
unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3c4cb3c5-7ab3-46cb-8ad4-35902f548f05
📒 Files selected for processing (4)
- src/miniforge/mesh/dashboard.py
- src/miniforge/mesh/static/fallback.html
- src/miniforge/mesh/templates/index.html
- src/miniforge/models/registry.py
```python
        self,
        coordinator: MeshCoordinator,
        engine: DistributedInferenceEngine,
        host: str = "0.0.0.0",
```
Default bind to `0.0.0.0` combined with no authentication is a significant exposure.

The dashboard has no auth middleware. Exposing `/api/chat` (arbitrary inference on this node's compute), `/api/connect` (arbitrary outbound peer connections; see separate comment), and the chat Socket.IO to the full network by default turns any LAN-reachable user into an operator of this mesh node.
Recommended:
- Default `host` to `127.0.0.1` (require explicit `--host 0.0.0.0` to expose).
- Add a minimum auth layer (shared token / basic auth / cookie signed with the node's Ed25519 cert material already in the PR) enforced via an aiohttp middleware on all `/api/*` routes and Socket.IO events.
- Document the exposure/opt-in model in the dashboard section of the PR.
```diff
-        host: str = "0.0.0.0",
+        host: str = "127.0.0.1",
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
        host: str = "127.0.0.1",
```
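A minimal shared-token check that the aiohttp middleware and the Socket.IO connect handler could both call might look like this (a sketch: the header name, token provisioning, and the surrounding middleware wiring are assumptions, not existing miniforge code):

```python
import hmac

def is_authorized(headers, expected_token: str) -> bool:
    """Constant-time check of a bearer token from request headers."""
    auth = headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        return False
    presented = auth[len("Bearer "):]
    return hmac.compare_digest(presented.encode(), expected_token.encode())
```

In aiohttp this would sit inside a `@web.middleware` function guarding every `/api/*` route (returning 401 on failure), and the Socket.IO connection handler can apply the same check to the handshake headers before accepting the socket.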
🧰 Tools
🪛 Ruff (0.15.11)
[error] 36-36: Possible binding to all interfaces
(S104)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/dashboard.py` at line 36, The dashboard currently defaults
host: str = "0.0.0.0" and exposes /api/* and Socket.IO with no auth; change the
default host parameter to "127.0.0.1" (require explicit CLI/flag to allow
"0.0.0.0"), and implement an aiohttp middleware (e.g., auth_middleware) that
enforces a minimum auth scheme (shared token or basic auth or a signed cookie
derived from the node Ed25519 key material) on all /api/* routes (specifically
/api/chat and /api/connect) and integrate the same check into the
python-socketio event handlers so socket events require the token/cookie; ensure
the middleware is registered on the aiohttp app and the socketio connection
handler validates auth, and update the dashboard docs to state the default
localhost binding and opt‑in exposure model.
```python
        except Exception as e:
            logger.error(f"Template render error: {e}")
            # Return simple HTML as fallback
            return web.Response(
                text=f"""
<!DOCTYPE html>
<html>
<head>
    <title>Miniforge Mesh - {self.coordinator.node_name}</title>
    <link rel="stylesheet" href="/static/style.css">
</head>
<body>
    <h1>Miniforge Mesh - {self.coordinator.node_name}</h1>
    <p>Node ID: {self.coordinator.node_id}</p>
    <p>Leader: {self.coordinator.is_leader}</p>
    <p style="color: red;">Template error: {e}</p>
    <p><a href="/static/fallback.html">Try static fallback</a></p>
</body>
</html>
""",
                content_type="text/html"
            )
```
Critical: XSS in the template‑error fallback via unescaped f‑string.
node_name, node_id, is_leader, and especially the exception string {e} are interpolated directly into the HTML body. Jinja2 exception messages routinely include snippets of the offending template/context, and the fallback is reached precisely when an attacker‑influenced rendering input caused the failure — so {e} is the most dangerous of the four. Any `<script>` (or `</script>`) fragment in these values executes in the browser. Escape before writing, or (better) render a static HTML page and pass values as data attributes / separate fields updated by JS.
Proposed fix
+ import html as _html
except Exception as e:
logger.error(f"Template render error: {e}")
- # Return simple HTML as fallback
+ # Return simple HTML as fallback (escape all interpolations)
+ node_name = _html.escape(str(self.coordinator.node_name))
+ node_id = _html.escape(str(self.coordinator.node_id))
+ is_leader = _html.escape(str(self.coordinator.is_leader))
+ err = _html.escape(str(e))
return web.Response(
text=f"""
<!DOCTYPE html>
<html>
<head>
- <title>Miniforge Mesh - {self.coordinator.node_name}</title>
+ <title>Miniforge Mesh - {node_name}</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
- <h1>Miniforge Mesh - {self.coordinator.node_name}</h1>
- <p>Node ID: {self.coordinator.node_id}</p>
- <p>Leader: {self.coordinator.is_leader}</p>
- <p style="color: red;">Template error: {e}</p>
+ <h1>Miniforge Mesh - {node_name}</h1>
+ <p>Node ID: {node_id}</p>
+ <p>Leader: {is_leader}</p>
+ <p style="color: red;">Template error: {err}</p>
<p><a href="/static/fallback.html">Try static fallback</a></p>
</body>
</html>
""",
content_type="text/html"
    )

A cleaner long‑term alternative is `web.FileResponse(static_dir / "fallback.html")` and dropping the inline HTML entirely — the static fallback already renders correctly without operator values.
🧰 Tools
🪛 Ruff (0.15.11)
[warning] 176-176: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/dashboard.py` around lines 176 - 197, The fallback HTML in
the exception handler of the dashboard render uses an unescaped f-string
injecting self.coordinator.node_name, self.coordinator.node_id,
self.coordinator.is_leader and the exception e, introducing XSS; fix by
replacing the inline HTML with a safe static response (preferably return
web.FileResponse(...) to serve the existing static/fallback.html) or, if keeping
dynamic values, escape every interpolated value with html.escape before
inserting into the template (apply to node_name, node_id, is_leader and str(e))
and ensure the exception text is sanitized/trimmed; update the exception block
in the Dashboard render handler in src/miniforge/mesh/dashboard.py accordingly.
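The escaping half of the fix is pure stdlib and easy to verify in isolation. A minimal sketch (the helper name and signature are illustrative, not the dashboard's actual API):

```python
import html

def render_fallback(node_name, node_id, is_leader, err) -> str:
    """Build the fallback body with every interpolated value HTML-escaped.

    html.escape neutralizes <, >, &, and quotes, so attacker-influenced
    names or exception text render as inert text instead of markup.
    """
    name = html.escape(str(node_name))
    nid = html.escape(str(node_id))
    leader = html.escape(str(is_leader))
    msg = html.escape(str(err))
    return (
        f"<h1>Miniforge Mesh - {name}</h1>"
        f"<p>Node ID: {nid}</p>"
        f"<p>Leader: {leader}</p>"
        f'<p style="color: red;">Template error: {msg}</p>'
    )
```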
async def api_connect(self, request: web.Request) -> web.Response:
    """Manually connect to a peer."""
    try:
        data = await request.json()
        ip = data.get("ip")
        port = data.get("port", 9999)

        if not ip:
            return web.json_response(
                {"error": "IP address required"},
                status=400,
            )

        # Try to connect
        from miniforge.mesh.discovery import PeerInfo
        peer = PeerInfo(
            ip=ip,
            port=port,
            node_id="unknown",
            node_name="manual",
            ram_gb=0,
            cpu_cores=0,
            last_seen=0,
        )

        # This will trigger connection through discovery callback
        await self.coordinator._connect_to_peer(peer)

        return web.json_response({
            "status": "connecting",
            "endpoint": f"{ip}:{port}",
        })

    except Exception as e:
        logger.error(f"API connect error: {e}")
        return web.json_response(
            {"error": str(e)},
            status=500,
        )
api_connect has three related defects: input validation, private‑method access, and placeholder PeerInfo.
- No input validation. `ip` is used verbatim (it could be any string, including hostnames, link‑local, or internal addresses); `port` is taken directly from JSON with no type/range check. Combined with unauthenticated network exposure, this is an SSRF primitive against the host network (anyone can ask the daemon to open a TLS session to any `ip:port`).
- Private API access. `self.coordinator._connect_to_peer(...)` reaches across the module boundary. Promote a public `connect_to_peer(peer: PeerInfo)` on `MeshCoordinator` (or accept `ip`/`port` there) and call it here.
- Placeholder `PeerInfo`. `node_id="unknown"`, `ram_gb=0`, `cpu_cores=0`, `last_seen=0` get stored in `self._nodes["unknown"]` (see `coordinator.py:_connect_to_peer`), so every subsequent manual connect overwrites the same slot, and aggregated resources / `all_nodes` will include a zero‑resource "unknown" node until the real handshake refreshes it (if it ever does). At minimum, either (a) don't populate `_nodes` until the handshake returns real identity, or (b) key by `(ip, port)` for manual peers until the real `node_id` is known.
Suggested direction
- try:
- data = await request.json()
- ip = data.get("ip")
- port = data.get("port", 9999)
-
- if not ip:
- return web.json_response(
- {"error": "IP address required"},
- status=400,
- )
-
- # Try to connect
- from miniforge.mesh.discovery import PeerInfo
- peer = PeerInfo(
- ip=ip,
- port=port,
- node_id="unknown",
- node_name="manual",
- ram_gb=0,
- cpu_cores=0,
- last_seen=0,
- )
-
- # This will trigger connection through discovery callback
- await self.coordinator._connect_to_peer(peer)
+ try:
+ import ipaddress
+ data = await request.json()
+ ip = data.get("ip")
+ port = data.get("port", 9999)
+
+ if not isinstance(ip, str) or not ip:
+ return web.json_response({"error": "IP address required"}, status=400)
+ try:
+ ipaddress.ip_address(ip)
+ except ValueError:
+ return web.json_response({"error": "Invalid IP"}, status=400)
+ if not isinstance(port, int) or not (1 <= port <= 65535):
+ return web.json_response({"error": "Invalid port"}, status=400)
+
+ # Prefer a public coordinator API that performs the handshake
+ # before recording NodeState.
+      await self.coordinator.connect_to_peer(ip, port)

🧰 Tools
🪛 Ruff (0.15.11)
[warning] 299-299: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/dashboard.py` around lines 266 - 304, The api_connect
handler improperly trusts request JSON and calls a private coordinator method
with a placeholder PeerInfo; validate and sanitize inputs (ensure "ip" is a
valid IP address or CIDR-restricted allowed host, ensure "port" is an integer in
1-65535) and return 400 on bad input; stop calling the private method
_connect_to_peer — add/use a public MeshCoordinator.connect_to_peer(peer:
PeerInfo) or connect_to_peer_by_addr(ip, port) and call that from api_connect;
and avoid creating a permanent placeholder node with node_id="unknown" — either
defer inserting nodes into coordinator._nodes until the handshake returns a real
node_id or use a temporary key derived from (ip,port) that does not clobber real
node_id entries so manual connects do not overwrite or pollute aggregated
resources.
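The validation part of this fix is self-contained and testable on its own. A hedged sketch (the helper name is illustrative; the checks mirror the "Suggested direction" diff above):

```python
import ipaddress

def validate_peer_target(ip, port):
    """Validate an (ip, port) pair from untrusted JSON; raise ValueError if bad.

    Only literal IP addresses pass — hostnames are rejected, which removes
    the DNS-resolution half of the SSRF surface. The bool check matters
    because `True` is an `int` in Python and would otherwise pass as port 1.
    """
    if not isinstance(ip, str) or not ip:
        raise ValueError("IP address required")
    ipaddress.ip_address(ip)  # raises ValueError for hostnames / garbage
    if isinstance(port, bool) or not isinstance(port, int) or not (1 <= port <= 65535):
        raise ValueError("Invalid port")
    return ip, port
```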
def _on_state_change(self) -> None:
    """Broadcast state change to all clients."""
    # Schedule in event loop
    asyncio.create_task(self._broadcast_status())
Hold a reference to the broadcast task, and harden against being called off‑loop.
Two issues here:
- RUF006: a task created with `asyncio.create_task(...)` whose result is discarded can be garbage‑collected mid‑flight, cancelling the broadcast. Keep a reference on the instance.
- `_notify_state_change` in the coordinator is synchronous and could theoretically be invoked from a non‑loop thread during startup/teardown; `asyncio.create_task` would then raise `RuntimeError`, which the coordinator swallows and logs as a callback warning. Using `asyncio.get_running_loop()` (or capturing the loop at `start()`) with `loop.call_soon_threadsafe` is safer.
Proposed fix
- def _on_state_change(self) -> None:
- """Broadcast state change to all clients."""
- # Schedule in event loop
- asyncio.create_task(self._broadcast_status())
+ def _on_state_change(self) -> None:
+ """Broadcast state change to all clients."""
+ try:
+ loop = asyncio.get_running_loop()
+ except RuntimeError:
+ return # no loop yet (e.g. during startup)
+ task = loop.create_task(self._broadcast_status())
+ self._bg_tasks.add(task)
+ task.add_done_callback(self._bg_tasks.discard)And initialize self._bg_tasks: set[asyncio.Task] = set() in __init__.
🧰 Tools
🪛 Ruff (0.15.11)
[warning] 321-321: Store a reference to the return value of asyncio.create_task
(RUF006)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/miniforge/mesh/dashboard.py` around lines 318 - 321, The _on_state_change
method currently fires asyncio.create_task(self._broadcast_status()) and
discards the Task (risking GC/cancel) and may be called off the running loop;
fix by adding an instance-level set self._bg_tasks initialized in __init__
(e.g., self._bg_tasks: set[asyncio.Task] = set()) and have _on_state_change
create the task and store it (add to set and remove on completion), and ensure
task creation is thread-safe by obtaining the running loop (capture it at
start() or call asyncio.get_running_loop()) and using loop.call_soon_threadsafe
to schedule creation of the Task that runs _broadcast_status, so
_notify_state_change/_on_state_change won’t raise RuntimeError when invoked
off-loop.
28 issues found across 17 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/miniforge/mesh/static/app.js">
<violation number="1" location="src/miniforge/mesh/static/app.js:77">
P1: Unescaped peer data is rendered with `innerHTML` in the nodes table, creating an XSS risk.</violation>
</file>
<file name="src/miniforge/mesh/discovery.py">
<violation number="1" location="src/miniforge/mesh/discovery.py:142">
P1: `ServiceInfo(addresses=...)` expects packed network-byte-order addresses (4 bytes via `socket.inet_aton`), but `.encode()` on the IP string produces the UTF-8 representation (e.g. `b'192.168.1.5'`, 11 bytes). This will either crash or register a corrupted address with mDNS. Use `parsed_addresses` for string IPs, or `socket.inet_aton()` for packed bytes.</violation>
<violation number="2" location="src/miniforge/mesh/discovery.py:195">
P2: The coroutines created by `self._check_host(host_str, port)` and appended to `tasks` are never awaited — they are immediately orphaned. The code then creates *new* coroutines via `check_with_limit` in the `asyncio.gather` below. This produces `RuntimeWarning: coroutine '_check_host' was never awaited` for every scanned host. Replace the coroutine instantiation with a simple sentinel (e.g., append a tuple) or remove the first loop entirely and use a host list for the emptiness check.</violation>
<violation number="3" location="src/miniforge/mesh/discovery.py:337">
P1: `asyncio.create_task()` is called from `_handle_service`, but this method is invoked by `ServiceBrowser` from its internal background thread — not from the asyncio event loop thread. This will raise `RuntimeError: no running event loop`. Use `asyncio.run_coroutine_threadsafe(coro, loop)` with a stored event loop reference, or switch from `ServiceBrowser` to `AsyncServiceBrowser`.</violation>
</file>
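The `addresses=` finding in discovery.py is easy to confirm: `.encode()` on a dotted-quad string gives its UTF-8 bytes, while zeroconf expects 4-byte packed network order. A quick stdlib check:

```python
import socket

ip = "192.168.1.5"

# What the PR currently passes: the UTF-8 bytes of the string (wrong shape).
utf8_bytes = ip.encode()       # b'192.168.1.5' -> 11 bytes

# What ServiceInfo(addresses=...) expects: packed network-byte-order address.
packed = socket.inet_aton(ip)  # b'\xc0\xa8\x01\x05' -> 4 bytes
```

Alternatively, zeroconf's `parsed_addresses` parameter accepts the string form directly.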
<file name="src/miniforge/mesh/templates/index.html">
<violation number="1" location="src/miniforge/mesh/templates/index.html:8">
P2: Avoid a hard dependency on the external Socket.IO CDN here; if it is unavailable, the dashboard JavaScript fails before any real-time features can load.</violation>
</file>
<file name="src/miniforge/mesh/coordinator.py">
<violation number="1" location="src/miniforge/mesh/coordinator.py:354">
P1: If the stale-node cleanup in `_state_sync_loop` removes the current leader from `_nodes`, the election loop will never clear `_leader_id` because `self._nodes.get(self._leader_id)` returns `None` and the timeout check is skipped entirely. The mesh gets stuck with a phantom leader and no re-election.</violation>
<violation number="2" location="src/miniforge/mesh/coordinator.py:379">
P1: Leader election compares incompatible timestamps: `_started_at` for self vs. `last_heartbeat` for remote nodes. Since heartbeats update `last_heartbeat` continuously, a healthy remote node appears "younger" over time and a stale/dead node appears "oldest" — the election will prefer dead nodes over live ones. Remote nodes need to advertise their actual start time (e.g., via the heartbeat payload) for the comparison to be meaningful.</violation>
</file>
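The election finding above boils down to: compare advertised start times, never `last_heartbeat`. A minimal sketch of a correct "oldest node wins" election (field names are assumptions — the point is that heartbeats only gate liveness, while start time decides the winner):

```python
import time

def elect_leader(nodes: dict) -> str:
    """Pick the alive node with the earliest advertised start time.

    `nodes` maps node_id -> {"started_at": ..., "last_heartbeat": ...}.
    Stale nodes are filtered out first; among the rest, the oldest
    `started_at` wins. Using last_heartbeat here would instead favor
    dead nodes, since healthy nodes keep refreshing it.
    """
    alive = {
        nid: info for nid, info in nodes.items()
        if time.time() - info["last_heartbeat"] < 30  # liveness gate only
    }
    return min(alive, key=lambda nid: alive[nid]["started_at"])

now = time.time()
nodes = {
    "a": {"started_at": now - 100, "last_heartbeat": now},           # oldest, alive
    "b": {"started_at": now - 50, "last_heartbeat": now},            # younger, alive
    "dead": {"started_at": now - 500, "last_heartbeat": now - 300},  # stale, excluded
}
```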
<file name="src/miniforge/mesh/registry.py">
<violation number="1" location="src/miniforge/mesh/registry.py:151">
P2: `layer_indices` is ignored, so the method silently streams the entire model even when specific layers are requested.</violation>
<violation number="2" location="src/miniforge/mesh/registry.py:159">
P1: Blocking file reads inside this async method can stall the event loop during large model streaming. Use non-blocking/offloaded I/O for chunk reads.</violation>
<violation number="3" location="src/miniforge/mesh/registry.py:198">
P1: Synchronous file writes in this async download loop block the event loop; offload writes to avoid degrading mesh responsiveness.</violation>
</file>
<file name="src/miniforge/mesh/host_worker.py">
<violation number="1" location="src/miniforge/mesh/host_worker.py:98">
P1: Missing `await` on the stream path. `local_engine.generate()` is an async method, so calling it without `await` returns a coroutine object instead of the `AsyncIterator[str]` the caller expects. The non-stream path on line 103 correctly uses `await`.</violation>
</file>
<file name="src/miniforge/mesh/static/fallback.html">
<violation number="1" location="src/miniforge/mesh/static/fallback.html:15">
P3: The fallback header never updates `#node-name`, so it will stay stuck on "Loading...".</violation>
</file>
<file name="src/miniforge/mesh/cli.py">
<violation number="1" location="src/miniforge/mesh/cli.py:60">
P2: `Optional` is referenced without being imported, which breaks strict type checking.</violation>
<violation number="2" location="src/miniforge/mesh/cli.py:119">
P1: `--host-ip`/`--ip` are accepted but never used to connect, so manual peer/host connection does not work despite CLI help text promising it.</violation>
</file>
<file name="src/miniforge/mesh/security.py">
<violation number="1" location="src/miniforge/mesh/security.py:142">
P0: Ed25519 keys do not accept a hash algorithm parameter. Passing `hashes.SHA256()` will raise `ValueError` at runtime, making certificate generation always fail. According to the `cryptography` library docs, the `algorithm` argument must be `None` for Ed25519 keys.</violation>
<violation number="2" location="src/miniforge/mesh/security.py:194">
P0: The pinned-certs CA bundle is never actually written (the loop body is `pass`), so the file is always empty. With `verify_mode = ssl.CERT_REQUIRED`, every mTLS handshake will fail because no CA is trusted. The `else` branch falls back to `CERT_NONE`, disabling verification entirely. Neither code path produces a working mTLS context.
The `_pinned_certs` set only stores fingerprint strings, not the full PEM certificates, so even fixing the loop wouldn't help — the full peer certificate PEM needs to be stored (e.g., in `pin_peer_cert`) to later write a valid CA bundle.</violation>
</file>
<file name="src/miniforge/mesh/dashboard.py">
<violation number="1" location="src/miniforge/mesh/dashboard.py:36">
P1: Binding the dashboard to `0.0.0.0` by default with no authentication middleware exposes `/api/chat` (arbitrary inference), `/api/connect` (outbound peer connections / SSRF), and the Socket.IO endpoint to the entire LAN. Default to `127.0.0.1` and require explicit opt-in (e.g. `--host 0.0.0.0`) to expose externally.</violation>
<violation number="2" location="src/miniforge/mesh/dashboard.py:46">
P1: Socket.IO is configured with `cors_allowed_origins="*"`, allowing any origin to connect to the dashboard event channel.</violation>
<violation number="3" location="src/miniforge/mesh/dashboard.py:191">
P2: The fallback HTML renders raw exception text to clients, leaking internal error details.</violation>
<violation number="4" location="src/miniforge/mesh/dashboard.py:191">
P1: XSS vulnerability in the template-error fallback: `{e}`, `{self.coordinator.node_name}`, `{self.coordinator.node_id}`, and `{self.coordinator.is_leader}` are interpolated directly into HTML via an f-string without escaping. The exception string `{e}` is especially dangerous since it is reached precisely when template rendering fails — which could be triggered by attacker-controlled input. Escape all values with `html.escape()` before interpolation, or serve the existing static `fallback.html` instead.</violation>
<violation number="5" location="src/miniforge/mesh/dashboard.py:270">
P1: SSRF: `ip` from the JSON body is used verbatim with no validation — it could be a hostname, an internal IP, or any arbitrary string. Combined with the unauthenticated endpoint, any LAN user can make the daemon open a TLS connection to arbitrary `ip:port` targets. Validate that `ip` is a proper IP address (e.g., via `ipaddress.ip_address(ip)`) and that `port` is an integer in `1..65535`.
Additionally, `PeerInfo` is created with `node_id="unknown"` and zero resources — every manual connect overwrites the same `"unknown"` slot in `coordinator._nodes`, so only the last manual connection is tracked.</violation>
<violation number="6" location="src/miniforge/mesh/dashboard.py:284">
P2: Manual peer connections all use `node_id="unknown"`, causing node-state collisions and overwrites for multiple manual peers.</violation>
<violation number="7" location="src/miniforge/mesh/dashboard.py:321">
P2: The `asyncio.create_task()` result is discarded, so the task only has a weak reference from the event loop and may be garbage-collected before the broadcast completes (RUF006). Additionally, `_on_state_change` is a synchronous callback that may be invoked when no event loop is running (e.g. during startup/teardown), causing `asyncio.create_task` to raise `RuntimeError`. Store a reference in a `set` on the instance and guard with `asyncio.get_running_loop()`.</violation>
</file>
<file name="src/miniforge/mesh/transport.py">
<violation number="1" location="src/miniforge/mesh/transport.py:212">
P0: The transport claims mTLS but neither the server (`asyncio.start_server`) nor the client (`asyncio.open_connection`) passes an SSL context. `MeshSecurity.create_tls_context()` exists but is never called. All mesh traffic—including inference payloads—is sent as plaintext TCP.
Pass `ssl=self.security.create_tls_context(server_mode=True)` to `start_server` and `ssl=self.security.create_tls_context(server_mode=False)` to `open_connection`.</violation>
<violation number="2" location="src/miniforge/mesh/transport.py:248">
P2: When an incoming connection ends (the `while conn._running` loop exits), the `finally` block removes the entry from `_connections` but never calls `self._notify(peer_id, conn, "disconnected")`. Peer-disconnect callbacks will not fire for non-stale disconnects, so the rest of the mesh won't be notified.</violation>
<violation number="3" location="src/miniforge/mesh/transport.py:297">
P1: `peer_key` is referenced in the `finally` block but is not defined on several code paths (timeout, invalid handshake, incomplete handshake). This will raise `NameError`, crash the handler, and leak the socket.
Initialize `peer_key` before the `try` block.</violation>
</file>
<file name="src/miniforge/mesh/engine.py">
<violation number="1" location="src/miniforge/mesh/engine.py:80">
P1: Missing `await` on `self.local_engine.generate(stream=True)`. Since `generate` is an `async def`, the caller receives a bare coroutine instead of the `AsyncIterator[str]`. Streaming via the local strategy will fail at runtime.</violation>
<violation number="2" location="src/miniforge/mesh/engine.py:241">
P1: `_get_connection_to_node` ignores `node_id` and returns the first connection it finds. In a multi-node mesh, inference requests will be sent to an arbitrary peer rather than the coordinator's chosen target, silently breaking routing.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
    ]),
    critical=False,
)
.sign(key, hashes.SHA256())
P0: Ed25519 keys do not accept a hash algorithm parameter. Passing hashes.SHA256() will raise ValueError at runtime, making certificate generation always fail. According to the cryptography library docs, the algorithm argument must be None for Ed25519 keys.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/security.py, line 142:
<comment>Ed25519 keys do not accept a hash algorithm parameter. Passing `hashes.SHA256()` will raise `ValueError` at runtime, making certificate generation always fail. According to the `cryptography` library docs, the `algorithm` argument must be `None` for Ed25519 keys.</comment>
<file context>
@@ -0,0 +1,205 @@
+ ]),
+ critical=False,
+ )
+ .sign(key, hashes.SHA256())
+ )
+ return cert
</file context>
- .sign(key, hashes.SHA256())
+ .sign(key, None)
# Load pinned certs as CA certs
if self._pinned_certs:
    # Create temporary CA bundle
    ca_path = self.certs_dir / "pinned_certs.pem"
P0: The pinned-certs CA bundle is never actually written (the loop body is pass), so the file is always empty. With verify_mode = ssl.CERT_REQUIRED, every mTLS handshake will fail because no CA is trusted. The else branch falls back to CERT_NONE, disabling verification entirely. Neither code path produces a working mTLS context.
The _pinned_certs set only stores fingerprint strings, not the full PEM certificates, so even fixing the loop wouldn't help — the full peer certificate PEM needs to be stored (e.g., in pin_peer_cert) to later write a valid CA bundle.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/security.py, line 194:
<comment>The pinned-certs CA bundle is never actually written (the loop body is `pass`), so the file is always empty. With `verify_mode = ssl.CERT_REQUIRED`, every mTLS handshake will fail because no CA is trusted. The `else` branch falls back to `CERT_NONE`, disabling verification entirely. Neither code path produces a working mTLS context.
The `_pinned_certs` set only stores fingerprint strings, not the full PEM certificates, so even fixing the loop wouldn't help — the full peer certificate PEM needs to be stored (e.g., in `pin_peer_cert`) to later write a valid CA bundle.</comment>
<file context>
@@ -0,0 +1,205 @@
+ # Load pinned certs as CA certs
+ if self._pinned_certs:
+ # Create temporary CA bundle
+ ca_path = self.certs_dir / "pinned_certs.pem"
+ with open(ca_path, "wb") as f:
+ for fingerprint in self._pinned_certs:
</file context>
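The storage fix the comment asks for — keep the PEM next to the fingerprint so a CA bundle can actually be written — can be sketched with stdlib `hashlib` alone (names are illustrative; real fingerprints would be computed over the certificate's DER encoding):

```python
import hashlib
import tempfile

def fingerprint(cert_der: bytes) -> str:
    """SHA-256 fingerprint in the colon-separated form openssl prints."""
    digest = hashlib.sha256(cert_der).hexdigest().upper()
    return ":".join(digest[i:i + 2] for i in range(0, len(digest), 2))

# Map fingerprint -> full PEM, instead of a set of fingerprint strings,
# so the CA bundle below has real certificate bytes to write.
pinned: dict[str, bytes] = {}

def pin_peer_cert(cert_der: bytes, cert_pem: bytes) -> None:
    pinned[fingerprint(cert_der)] = cert_pem

def write_ca_bundle(path) -> None:
    """Concatenate every pinned PEM into one bundle for load_verify_locations."""
    with open(path, "wb") as f:
        for pem in pinned.values():
            f.write(pem)
```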
self._running = True

# Start server
self._server = await asyncio.start_server(
P0: The transport claims mTLS but neither the server (asyncio.start_server) nor the client (asyncio.open_connection) passes an SSL context. MeshSecurity.create_tls_context() exists but is never called. All mesh traffic—including inference payloads—is sent as plaintext TCP.
Pass ssl=self.security.create_tls_context(server_mode=True) to start_server and ssl=self.security.create_tls_context(server_mode=False) to open_connection.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/transport.py, line 212:
<comment>The transport claims mTLS but neither the server (`asyncio.start_server`) nor the client (`asyncio.open_connection`) passes an SSL context. `MeshSecurity.create_tls_context()` exists but is never called. All mesh traffic—including inference payloads—is sent as plaintext TCP.
Pass `ssl=self.security.create_tls_context(server_mode=True)` to `start_server` and `ssl=self.security.create_tls_context(server_mode=False)` to `open_connection`.</comment>
<file context>
@@ -0,0 +1,384 @@
+ self._running = True
+
+ # Start server
+ self._server = await asyncio.start_server(
+ self._handle_incoming,
+ host="0.0.0.0",
</file context>
if (node.status === 'busy') statusClass = 'busy';
if (node.status === 'offline') statusClass = 'offline';

row.innerHTML = `
P1: Unescaped peer data is rendered with innerHTML in the nodes table, creating an XSS risk.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/static/app.js, line 77:
<comment>Unescaped peer data is rendered with `innerHTML` in the nodes table, creating an XSS risk.</comment>
<file context>
@@ -0,0 +1,255 @@
+ if (node.status === 'busy') statusClass = 'busy';
+ if (node.status === 'offline') statusClass = 'offline';
+
+ row.innerHTML = `
+ <td>${name}${isLeader}</td>
+ <td>${node.ip}:${node.port}</td>
</file context>
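If the dashboard keeps building rows as template strings rather than switching to `textContent`/`createElement`, a small escaping helper closes the hole. A sketch (helper name is illustrative, not code from the PR):

```javascript
// String-building fallback for when markup is assembled via innerHTML.
// Prefer textContent / createElement in browser code where possible.
function escapeHtml(value) {
    return String(value)
        .replace(/&/g, "&amp;")   // must run first, before entities are added
        .replace(/</g, "&lt;")
        .replace(/>/g, "&gt;")
        .replace(/"/g, "&quot;")
        .replace(/'/g, "&#39;");
}

// Peer-supplied name rendered inertly instead of as live markup.
const cell = `<td>${escapeHtml("<script>alert(1)</script>")}</td>`;
```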
    last_seen=time.time(),
)

asyncio.create_task(self.discovery._add_or_update_peer(peer))
P1: asyncio.create_task() is called from _handle_service, but this method is invoked by ServiceBrowser from its internal background thread — not from the asyncio event loop thread. This will raise RuntimeError: no running event loop. Use asyncio.run_coroutine_threadsafe(coro, loop) with a stored event loop reference, or switch from ServiceBrowser to AsyncServiceBrowser.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/discovery.py, line 337:
<comment>`asyncio.create_task()` is called from `_handle_service`, but this method is invoked by `ServiceBrowser` from its internal background thread — not from the asyncio event loop thread. This will raise `RuntimeError: no running event loop`. Use `asyncio.run_coroutine_threadsafe(coro, loop)` with a stored event loop reference, or switch from `ServiceBrowser` to `AsyncServiceBrowser`.</comment>
<file context>
@@ -0,0 +1,339 @@
+ last_seen=time.time(),
+ )
+
+ asyncio.create_task(self.discovery._add_or_update_peer(peer))
+ except Exception as e:
+ logger.warning(f"Error handling mDNS service: {e}")
</file context>
<h1>Miniforge Mesh - {self.coordinator.node_name}</h1>
<p>Node ID: {self.coordinator.node_id}</p>
<p>Leader: {self.coordinator.is_leader}</p>
<p style="color: red;">Template error: {e}</p>
P2: The fallback HTML renders raw exception text to clients, leaking internal error details.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/dashboard.py, line 191:
<comment>The fallback HTML renders raw exception text to clients, leaking internal error details.</comment>
<file context>
@@ -0,0 +1,329 @@
+ <h1>Miniforge Mesh - {self.coordinator.node_name}</h1>
+ <p>Node ID: {self.coordinator.node_id}</p>
+ <p>Leader: {self.coordinator.is_leader}</p>
+ <p style="color: red;">Template error: {e}</p>
+ <p><a href="/static/fallback.html">Try static fallback</a></p>
+</body>
</file context>
- <p style="color: red;">Template error: {e}</p>
+ <p style="color: red;">Template rendering failed.</p>
peer = PeerInfo(
    ip=ip,
    port=port,
    node_id="unknown",
P2: Manual peer connections all use node_id="unknown", causing node-state collisions and overwrites for multiple manual peers.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/dashboard.py, line 284:
<comment>Manual peer connections all use `node_id="unknown"`, causing node-state collisions and overwrites for multiple manual peers.</comment>
<file context>
@@ -0,0 +1,329 @@
+ peer = PeerInfo(
+ ip=ip,
+ port=port,
+ node_id="unknown",
+ node_name="manual",
+ ram_gb=0,
</file context>
- node_id="unknown",
+ node_id=f"manual-{ip}:{port}",
if peer_key in self._connections:
    return self._connections[peer_key]
P2: When an incoming connection ends (the while conn._running loop exits), the finally block removes the entry from _connections but never calls self._notify(peer_id, conn, "disconnected"). Peer-disconnect callbacks will not fire for non-stale disconnects, so the rest of the mesh won't be notified.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/transport.py, line 248:
<comment>When an incoming connection ends (the `while conn._running` loop exits), the `finally` block removes the entry from `_connections` but never calls `self._notify(peer_id, conn, "disconnected")`. Peer-disconnect callbacks will not fire for non-stale disconnects, so the rest of the mesh won't be notified.</comment>
<file context>
@@ -0,0 +1,384 @@
+ """Connect to a discovered peer."""
+ peer_key = f"{ip}:{port}"
+
+ if peer_key in self._connections:
+ return self._connections[peer_key]
+
</file context>
- if peer_key in self._connections:
-     return self._connections[peer_key]
+ if peer_key in self._connections:
+     conn = self._connections.pop(peer_key)
+     self._notify(peer_id, conn, "disconnected")
def _on_state_change(self) -> None:
    """Broadcast state change to all clients."""
    # Schedule in event loop
    asyncio.create_task(self._broadcast_status())
P2: The asyncio.create_task() result is discarded, so the task only has a weak reference from the event loop and may be garbage-collected before the broadcast completes (RUF006). Additionally, _on_state_change is a synchronous callback that may be invoked when no event loop is running (e.g. during startup/teardown), causing asyncio.create_task to raise RuntimeError. Store a reference in a set on the instance and guard with asyncio.get_running_loop().
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/dashboard.py, line 321:
<comment>The `asyncio.create_task()` result is discarded, so the task only has a weak reference from the event loop and may be garbage-collected before the broadcast completes (RUF006). Additionally, `_on_state_change` is a synchronous callback that may be invoked when no event loop is running (e.g. during startup/teardown), causing `asyncio.create_task` to raise `RuntimeError`. Store a reference in a `set` on the instance and guard with `asyncio.get_running_loop()`.</comment>
<file context>
@@ -0,0 +1,329 @@
+ def _on_state_change(self) -> None:
+ """Broadcast state change to all clients."""
+ # Schedule in event loop
+ asyncio.create_task(self._broadcast_status())
+
+ async def _broadcast_status(self) -> None:
</file context>
<header class="header">
    <div class="header-left">
        <h1>🔥 Miniforge Mesh</h1>
        <span class="node-badge" id="node-name">Loading...</span>
P3: The fallback header never updates #node-name, so it will stay stuck on "Loading...".
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/miniforge/mesh/static/fallback.html, line 15:
<comment>The fallback header never updates `#node-name`, so it will stay stuck on "Loading...".</comment>
<file context>
@@ -0,0 +1,109 @@
+ <header class="header">
+ <div class="header-left">
+ <h1>🔥 Miniforge Mesh</h1>
+ <span class="node-badge" id="node-name">Loading...</span>
+ </div>
+ <div class="header-right">
</file context>
Summary
This PR adds a complete distributed inference mesh system that allows multiple miniforge devices to auto-discover each other and share compute resources.
Features
🔍 Auto-Discovery
🔐 Security
🧠 Inference Strategies
🌐 Web Dashboard
🖥️ CLI
Architecture
New Dependencies
Added to under :
Files Added
Usage Example
Device A (192.168.1.100):
Device B (192.168.1.101):
Combined Resources:
Testing
Checklist
Related
Based on deep research into:
Summary by CodeRabbit
Release Notes
`mesh up` CLI command to initialize and manage mesh nodes with automatic peer discovery