feat: concurrent multi-tool daemon support #23
don-petry wants to merge 2 commits into Joaolfelicio:main
Conversation
Adds Dependabot security-only config, auto-merge workflow, and dependency audit CI.
Support running multiple provider pipelines (gemini, claude, copilot) concurrently in a single daemon instance using asyncio.gather().

- Add --tools flag (comma-separated) with --tool kept as backward-compat alias
- Refactor run_daemon into run_daemon_multi with concurrent task pipelines
- Add SharedDeduplication for cross-provider rule deduplication
- Add create_provider/create_evaluator factory functions
- Update Dashboard to show per-tool status rows
- Shared MCP bridge client across all providers

Closes Joaolfelicio#12

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Adds concurrent multi-provider daemon execution so a single Context-Scribe process can monitor multiple tools (Gemini/Claude/Copilot) simultaneously, with a shared dashboard and cross-provider deduplication.
Changes:
- Introduces `--tools` (comma-separated) alongside the backward-compatible `--tool`, plus parsing/validation helpers and provider/evaluator factories.
- Refactors daemon execution into per-tool async pipelines and updates the `Dashboard` to show per-tool status/history.
- Adds shared rule deduplication and expands test coverage; introduces dependency audit + Dependabot automation workflows.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| `context_scribe/main.py` | Multi-tool CLI/daemon support, shared dashboard updates, shared dedup, and pipeline refactor. |
| `tests/test_multi_tool.py` | New tests for `--tools`, shared dedup, and pipeline duplicate-skipping behavior. |
| `tests/test_main.py` | Updates dashboard tests + adds tests for parsing and factory helpers. |
| `tests/test_dashboard.py` | Updates/extends dashboard tests for multi-tool status/history. |
| `tests/test_daemon.py` | Updates daemon tests to use factories + adds multi-tool daemon test. |
| `tests/test_daemon_errors.py` | Updates error-path tests and adds CLI validation tests for `--tools`. |
| `tests/test_copilot_daemon.py` | Updates to patch new factories/bootstrap dispatcher. |
| `tests/test_claude_daemon.py` | Updates to patch new factories/bootstrap dispatcher. |
| `.github/workflows/dependency-audit.yml` | Adds automated dependency vulnerability auditing across detected ecosystems. |
| `.github/workflows/dependabot-automerge.yml` | Adds auto-approval/auto-merge for eligible Dependabot PRs. |
| `.github/dependabot.yml` | Configures Dependabot update policy for pip and GitHub Actions. |
```python
if rule_output:
    # Cross-provider deduplication
    if dedup.is_duplicate(rule_output):
        dashboard.set_tool_status(tool, "🔍 Watching log stream... (skipped duplicate)")
        continue

    dest_proj = "global" if rule_output.scope == "GLOBAL" else interaction.project_name
    dest_file = "global_rules.md" if rule_output.scope == "GLOBAL" else "rules.md"
    dest_path = f"{dest_proj}/{dest_file}"

    # Deduplicate content lines
    lines = rule_output.content.splitlines()
    seen_lines: Set[str] = set()
    unique_lines: List[str] = []
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("-") and stripped in seen_lines:
            continue
        unique_lines.append(line)
        if stripped.startswith("-"):
            seen_lines.add(stripped)

    deduped_content = "\n".join(unique_lines).strip()

    dashboard.set_tool_status(tool, f"📝 Committing: {dest_path}")
    live.update(dashboard.generate_layout())
    await mcp_client.save_rule(deduped_content, dest_proj, dest_file)

    dedup.mark_committed(rule_output)
```
SharedDeduplication is used in a check-then-act pattern (is_duplicate() then save_rule() then mark_committed()). With multiple pipelines, two tools can pass is_duplicate() before either calls mark_committed(), causing the same rule to be saved twice. Consider adding an atomic API (e.g., mark_if_new / reserve) under the lock and using it before committing (and ideally releasing/rolling back on save failure).
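One way to close this race is a single method that checks and records under one lock. The sketch below assumes a `SharedDeduplication` keyed on scope + content, as this review describes; the real class internals are not shown in the diff, so the field and method names here (`mark_if_new`, `rollback`) are illustrative.

```python
import threading


class SharedDeduplication:
    """Sketch: atomic check-and-mark, avoiding the check-then-act race."""

    def __init__(self):
        self._lock = threading.Lock()
        self._committed = set()

    def _key(self, scope, content):
        # Mirrors the scope + stripped-content keying described in review.
        return (scope, content.strip())

    def mark_if_new(self, scope, content):
        """Atomically reserve a rule; returns True exactly once per key."""
        key = self._key(scope, content)
        with self._lock:
            if key in self._committed:
                return False
            self._committed.add(key)
            return True

    def rollback(self, scope, content):
        """Release a reservation if the subsequent save fails."""
        with self._lock:
            self._committed.discard(self._key(scope, content))
```

The pipeline would then call `mark_if_new()` *before* `save_rule()`, skip when it returns False, and `rollback()` if the save raises.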
```python
"""Thread-safe shared deduplication across multiple provider pipelines.

Tracks rule descriptions that have already been committed so that
the same rule extracted by one provider is not written again when
another provider encounters the same user interaction.
"""
```
SharedDeduplication’s docstring says it tracks “rule descriptions”, but the key is built from scope + content. Either update the docstring to match the actual behavior or change the key to reflect what you intend to deduplicate.
```python
# Deduplicate content lines
lines = rule_output.content.splitlines()
seen_lines: Set[str] = set()
unique_lines: List[str] = []
for line in lines:
    stripped = line.strip()
    if stripped.startswith("-") and stripped in seen_lines:
        continue
    unique_lines.append(line)
    if stripped.startswith("-"):
        seen_lines.add(stripped)

deduped_content = "\n".join(unique_lines).strip()

dashboard.set_tool_status(tool, f"📝 Committing: {dest_path}")
live.update(dashboard.generate_layout())
await mcp_client.save_rule(deduped_content, dest_proj, dest_file)

dedup.mark_committed(rule_output)
dashboard.add_history(dest_path, rule_output.description, tool=tool)
```
Deduplication is keyed on rule_output.content.strip(), but the content actually saved is deduped_content after line-level filtering. If deduped_content differs (or if whitespace differs across providers), subsequent duplicates may not be detected. Consider keying/marking based on the exact committed content (and scope), or normalizing content consistently before both the duplicate check and mark_committed().
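A minimal way to keep the check and the commit keyed on the same bytes is to compute the deduped content first and derive the key from exactly what will be written. The helper names below are illustrative, not part of this PR; the line-filtering logic mirrors the pipeline code shown above.

```python
def normalize_rule_content(content: str) -> str:
    """Drop duplicate '-' bullet lines and trailing whitespace, mirroring
    the pipeline's line-level dedup, so the key matches what is saved."""
    seen: set[str] = set()
    unique: list[str] = []
    for line in content.splitlines():
        stripped = line.strip()
        if stripped.startswith("-") and stripped in seen:
            continue
        unique.append(line)
        if stripped.startswith("-"):
            seen.add(stripped)
    return "\n".join(unique).strip()


def dedup_key(scope: str, content: str) -> tuple[str, str]:
    # Key on the normalized form that will actually be committed.
    return (scope, normalize_rule_content(content))
```

With this, two providers producing whitespace- or bullet-order-equivalent rules collapse to the same key, and the duplicate check and `mark_committed()` cannot drift apart.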
```python
while True:
    live.update(dashboard.generate_layout())
    interaction = await loop.run_in_executor(None, next, watch_iter)
    if interaction is None:
        continue
```
_run_tool_pipeline calls next(watch_iter) in an executor without handling StopIteration. If a provider’s watch() generator ends (or throws), the task will error out and may bring down the whole daemon via asyncio.gather(). Consider catching StopIteration (and other provider errors) and either restarting watch() or exiting the pipeline cleanly with a status update.
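One wrinkle: a `StopIteration` raised inside an executor future is converted by asyncio into a `RuntimeError` when awaited, so catching it after the fact is awkward. Passing a sentinel default to `next()` sidesteps this entirely. A sketch (the helper name `read_next` is illustrative):

```python
import asyncio

_DONE = object()  # sentinel: watcher generator exhausted


async def read_next(loop, watch_iter):
    """Fetch the next interaction without letting StopIteration escape
    the executor. Returns None when the generator is exhausted, so the
    pipeline can restart watch() or exit cleanly."""
    item = await loop.run_in_executor(None, next, watch_iter, _DONE)
    return None if item is _DONE else item
```

The pipeline loop can then `break` (or rebuild `watch_iter` from `provider.watch()`) when `read_next()` returns None, instead of crashing the whole `asyncio.gather()`.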
```python
    Designed to be used as a concurrent coroutine alongside other tools.
    """
    loop = asyncio.get_event_loop()
```
Inside an async function, prefer asyncio.get_running_loop() over asyncio.get_event_loop(). In newer Python versions, get_event_loop() has different semantics and can raise if no loop is set; using get_running_loop() is the robust option here.
```diff
-    loop = asyncio.get_event_loop()
+    loop = asyncio.get_running_loop()
```
```python
# Shared MCP bridge client
mcp_client = MemoryBankClient(bank_path=bank_path)

try:
    await mcp_client.connect()
except Exception:
    console.print("[bold red]Fatal Error: Could not connect to the Memory Bank MCP server.[/bold red]")
    os._exit(1)
```
run_daemon_multi shares a single MemoryBankClient across concurrent tool pipelines, but MemoryBankClient has no internal synchronization around session.call_tool() (stdio transport). Concurrent read_rules/save_rule calls can interleave and corrupt requests/responses or cause undefined behavior. Consider adding an asyncio.Lock inside MemoryBankClient (or in run_daemon_multi) to serialize MCP calls, or create one client per tool if the MCP server/client supports parallel sessions.
```python
        # Launch a concurrent pipeline for each tool
        tasks = []
        for tool in tools:
            task = asyncio.create_task(
                _run_tool_pipeline(
                    tool=tool,
                    provider=providers[tool],
                    evaluator=evaluators[tool],
                    mcp_client=mcp_client,
                    dashboard=db,
                    dedup=dedup,
                    live=live,
                )
            )
            tasks.append(task)

        # Wait until interrupted
        await asyncio.gather(*tasks)
    except (KeyboardInterrupt, asyncio.CancelledError):
        for t in tasks:
            t.cancel()
        db.status = "🛑 Stopping..."
        live.update(db.generate_layout())
    finally:
        await mcp_client.close()
```
await asyncio.gather(*tasks) will propagate the first exception from any pipeline and skip the KeyboardInterrupt/CancelledError handler, potentially leaving other pipelines running while the MCP client is closed in finally. Consider using TaskGroup (Py3.11+) or gather(return_exceptions=True) + explicit cancellation/handling so a single-tool failure doesn’t put the daemon into an inconsistent state.
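A sketch of the pre-3.11 option: wait until the first pipeline fails, cancel the survivors, and only then fall through to cleanup. The `supervise` helper name is illustrative; on Python 3.11+ `asyncio.TaskGroup` gives the same cancel-siblings-on-failure guarantee natively.

```python
import asyncio


async def supervise(tasks):
    """Wait for all tasks, or for the first exception; on a failure,
    cancel and drain the survivors before returning the errors, so
    cleanup (e.g. mcp_client.close()) runs with no pipeline still live."""
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for t in pending:
        t.cancel()
    if pending:
        # Consume CancelledError from the cancelled survivors.
        await asyncio.gather(*pending, return_exceptions=True)
    return [t.exception() for t in done if not t.cancelled() and t.exception()]
```

`run_daemon_multi` could replace `await asyncio.gather(*tasks)` with `errors = await supervise(tasks)` and surface any collected errors on the dashboard instead of letting one tool's crash strand the others.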
```python
def worker():
    if not dedup.is_duplicate(rule):
        dedup.mark_committed(rule)
        results.append("committed")
    else:
        results.append("skipped")

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Exactly one thread should have committed
assert results.count("committed") >= 1
assert dedup.is_duplicate(rule) is True
```
The thread-safety test’s comment says “Exactly one thread should have committed”, but the assertion only checks >= 1. Also, the worker does is_duplicate() and mark_committed() as two separate calls, so multiple threads can legitimately commit due to the race between the calls. If the intended guarantee is “commit once”, consider adding/using an atomic API (e.g., mark_if_new()) and assert results.count("committed") == 1.
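With an atomic API the strict assertion becomes testable. A self-contained sketch, where `AtomicDedup` is a hypothetical stand-in (not in this PR) with the suggested `mark_if_new()` shape:

```python
import threading


class AtomicDedup:
    """Minimal stand-in with the atomic API the review suggests."""

    def __init__(self):
        self._lock = threading.Lock()
        self._seen = set()

    def mark_if_new(self, key):
        with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True


def run_race(dedup, key, n_threads=10):
    """Race n threads at one key; list.append is GIL-safe here."""
    results = []

    def worker():
        results.append("committed" if dedup.mark_if_new(key) else "skipped")

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Because check and mark happen under one lock, the test can assert `results.count("committed") == 1`, matching the comment's stated intent.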
Closing in favor of PR #22, which implements the same feature cleanly on upstream/main. PR #22 already addresses the key concerns raised here (error handling, bounded queue, watcher lifecycle). The SharedDeduplication concept from this PR is valuable and could be added as a follow-up enhancement to PR #22 after merge.
Summary
- Add `--tools gemini,claude,copilot` flag to run multiple providers concurrently in a single daemon instance using `asyncio.gather()`
- Add `SharedDeduplication` class for cross-provider rule deduplication so the same rule extracted by one provider is not re-committed by another
- Add `create_provider()`/`create_evaluator()` factory functions and `bootstrap_tool()` dispatcher
- Update `Dashboard` to show per-tool status rows and tool labels in history
- Keep `--tool` as backward-compatible single-tool alias; default remains `gemini`

Test plan

- `parse_tools` validates comma-separated input, rejects invalid tools, deduplicates
- `SharedDeduplication` thread-safety test with 10 concurrent threads
- `run_daemon_multi` test verifies two tools run concurrently with shared MCP client
- `save_rule` is skipped for already-committed rules
- CLI tests cover `--tools`, `--tool`, the default, and the mutual-exclusion error
- `run_daemon(tool, bank_path)` still works (delegates to `run_daemon_multi`)

Closes #12
🤖 Generated with Claude Code