diff --git a/.amplifier/digital-twin-universe/profiles/memory-bundle-e2e.yaml b/.amplifier/digital-twin-universe/profiles/memory-bundle-e2e.yaml index 9c3175e..c1e81c7 100644 --- a/.amplifier/digital-twin-universe/profiles/memory-bundle-e2e.yaml +++ b/.amplifier/digital-twin-universe/profiles/memory-bundle-e2e.yaml @@ -70,11 +70,12 @@ provision: echo 'export PATH="/root/.local/bin:$PATH"' >> /root/.bashrc export PATH="/root/.local/bin:$PATH" - # 4. Install mempalace + pytest (system-wide via pip with break-system-packages) + # 4. Install mempalace + pytest + pytest-asyncio (system-wide via pip with break-system-packages) # --ignore-installed skips reinstalling packages already present (e.g. rich # installed by Debian's python3-rich package which has no RECORD file). # pytest is required to run tests/integration/ inside the DTU. - - pip install --break-system-packages --ignore-installed mempalace pytest + # pytest-asyncio is required for async test support in test_event_wiring.py. + - pip install --break-system-packages --ignore-installed mempalace pytest pytest-asyncio # 5. Create the palace directory — mempalace mine will populate it. # mempalace init is a project-level entity-detection setup command; @@ -108,6 +109,20 @@ provision: export PATH="/root/.local/bin:$PATH" uv tool install -vv git+https://github.com/microsoft/amplifier + # 12.5. Pin amplifier-core to 1.4.1 and reinstall amplifier-app-cli from latest git + # amplifier-core >= 1.4.1 is required: register_contributor / collect_contributions + # and the on_session_ready lifecycle work correctly only from this version. + # The meta-package uv install above may resolve an older PyPI release; this guarantees + # the correct versions are present before the bundle is added. + - | + export PATH="/root/.local/bin:$PATH" + VENV_PY=/root/.local/share/uv/tools/amplifier/bin/python + uv pip install --python "$VENV_PY" \ + --reinstall-package amplifier-core \ + --reinstall-package amplifier-app-cli \ + 'amplifier-core==1.4.1' \ + git+https://github.com/microsoft/amplifier-app-cli@main + # 13. Write API keys to keys.env (passthrough env vars; chmod 600 for security) - | mkdir -p /root/.amplifier @@ -119,19 +134,107 @@ provision: export PATH="/root/.local/bin:$PATH" amplifier bundle add --app "git+https://github.com/michaeljabbour/amplifier-bundle-memory@main#subdirectory=behaviors/mempalace.yaml" - # 15. Validate that both tools are installed and reachable + # 15. Install hooks-logging fork with on_session_ready + collect_contributions support + # + # Why this fork: stock hooks-logging predates amplifier-core 1.4.1's on_session_ready + # lifecycle. Without it, collect_contributions("observability.events") is never called, + # so no handlers register for memory-mempalace:* events and events.jsonl stays empty. + # + # Why delete the cache dir (not just install via pip): + # Amplifier prepends ~/.amplifier/cache/-/ to sys.path BEFORE site-packages. + # Even after `uv pip install --force-reinstall`, the old cache copy (without on_session_ready) + # is imported first and placed in sys.modules — the fork in site-packages is never reached. + # Deleting the cache dir lets Python fall through to site-packages on every session start. + # No timing dependency, no primer session needed. (Investigated in session 9201aaf7.) + - | + set -euo pipefail + export PATH="/root/.local/bin:$PATH" + VENV_PY=/root/.local/share/uv/tools/amplifier/bin/python + uv pip install --python "$VENV_PY" --force-reinstall \ + git+https://github.com/colombod/amplifier-module-hooks-logging@feat/on-session-ready + # Delete any pre-existing cache that would shadow site-packages + rm -rf /root/.amplifier/cache/amplifier-module-hooks-logging-* + echo "[hooks-logging] fork installed and shadowing cache cleared" + # Verify the fork is reachable and exposes on_session_ready + "$VENV_PY" -c " + import amplifier_module_hooks_logging as m + assert hasattr(m, 'on_session_ready'), f'FAIL: on_session_ready missing at {m.__file__}' + print(f'[hooks-logging] OK: on_session_ready present ({m.__file__})') + " + + # 16. Validate that both tools are installed and reachable - | export PATH="/root/.local/bin:$PATH" amplifier --version mempalace --version || true + # 17. Post-provision smoke test — fail fast if coordinator events are not flowing. + # Runs a real Amplifier session and verifies that memory-mempalace:* events appear + # in events.jsonl. Surfaces broken wiring before the developer discovers it manually. + - | + set -euo pipefail + export PATH="/root/.local/bin:$PATH" + VENV_PY=/root/.local/share/uv/tools/amplifier/bin/python + + echo "=== DTU smoke test ===" + + # 1. on_session_ready accessible + "$VENV_PY" -c " + import amplifier_module_hooks_logging as m + assert hasattr(m, 'on_session_ready'), f'FAIL: on_session_ready missing at {m.__file__}' + print('PASS hooks-logging has on_session_ready') + " + + # 2. No shadowing cache dir + SHADOW=$(find /root/.amplifier/cache -maxdepth 1 -type d \ + -name 'amplifier-module-hooks-logging-*' 2>/dev/null | wc -l) + [ "$SHADOW" -eq 0 ] \ + && echo "PASS no shadowing hooks-logging cache" \ + || echo "WARN $SHADOW shadowing cache dir(s) present — coordinator events may not flow" + + # 3. Session produces coordinator events + cd /workspace + amplifier run 'hello' >/tmp/smoke.log 2>&1 || true + LATEST=$(ls -t /root/.amplifier/projects/*/sessions/*/events.jsonl 2>/dev/null | head -1) + if [ -n "$LATEST" ]; then + COUNT=$(grep -c '"memory-mempalace:' "$LATEST" 2>/dev/null || echo 0) + [ "$COUNT" -gt 0 ] \ + && echo "PASS coordinator events flowing ($COUNT memory-mempalace:* events)" \ + || { echo "FAIL zero coordinator events in $LATEST"; cat /tmp/smoke.log; exit 1; } + else + echo "WARN no events.jsonl found — run a session manually to verify" + fi + + echo "=== smoke test passed ===" + update: refresh_pypi: true cmds: - # Clear Amplifier module cache so re-add fetches the latest bundle from Gitea. - # The palace is intentionally NOT reset here — accumulated memories are preserved - # across updates. Use reset-palace manually to restore the seed state. - - rm -rf /root/.amplifier/cache/ + # Targeted cache invalidation — ONLY the memory bundle modules and hooks-logging. + # NEVER wipe the whole cache (rm -rf /root/.amplifier/cache/) — that removes + # provider-anthropic, loop-streaming, and all foundation modules, breaking every + # session until Amplifier is fully reinstalled (5-10 min). + # The palace is intentionally NOT reset — accumulated memories are preserved. + # Use reset-palace manually to restore the seed state. + - | + rm -rf /root/.amplifier/cache/amplifier-module-hooks-mempalace-* + rm -rf /root/.amplifier/cache/amplifier-module-tool-mempalace-* + rm -rf /root/.amplifier/cache/amplifier-module-hooks-project-context-* + rm -rf /root/.amplifier/cache/amplifier-module-hooks-logging-* + echo "[update] cleared memory bundle + hooks-logging caches" - | export PATH="/root/.local/bin:$PATH" amplifier bundle add --app "git+https://github.com/michaeljabbour/amplifier-bundle-memory@main#subdirectory=behaviors/mempalace.yaml" + # Re-apply hooks-logging fork (same approach as provision step 15) + - | + set -euo pipefail + export PATH="/root/.local/bin:$PATH" + VENV_PY=/root/.local/share/uv/tools/amplifier/bin/python + uv pip install --python "$VENV_PY" --force-reinstall \ + git+https://github.com/colombod/amplifier-module-hooks-logging@feat/on-session-ready + rm -rf /root/.amplifier/cache/amplifier-module-hooks-logging-* + "$VENV_PY" -c " + import amplifier_module_hooks_logging as m + assert hasattr(m, 'on_session_ready'), f'FAIL: on_session_ready missing at {m.__file__}' + print(f'[hooks-logging] OK: on_session_ready present') + " diff --git a/agents/archivist.md b/agents/archivist.md index eaf6bb7..1812851 100644 --- a/agents/archivist.md +++ b/agents/archivist.md @@ -1,4 +1,11 @@ --- +name: archivist +description: | + The Archivist is the read path for the memory system. It answers memory + queries, navigates the palace graph, surfaces context from the knowledge + graph and agent diaries, and reads project-context coordination files. + Note: session:start briefings are handled by hooks-mempalace-briefing; + the Archivist is invoked on-demand for deeper retrieval. agent: name: archivist namespace: mempalace diff --git a/agents/curator.md b/agents/curator.md index 82b6051..7496e41 100644 --- a/agents/curator.md +++ b/agents/curator.md @@ -1,4 +1,11 @@ --- +name: curator +description: | + The Curator is the write path for the memory system. It processes raw + hook captures, curates them into well-categorized palace drawers, updates + the knowledge graph, maintains palace hygiene, and updates the + project-context coordination files (HANDOFF.md, PROVENANCE.md, GLOSSARY.md, + WAYSOFWORKING.md) at session end. agent: name: curator namespace: mempalace diff --git a/agents/docent.md b/agents/docent.md index c50f682..8eb290f 100644 --- a/agents/docent.md +++ b/agents/docent.md @@ -1,4 +1,11 @@ --- +name: docent +description: | + Conversational memory assistant. Answers natural-language questions + about what's stored in the palace, what happened in prior sessions, + which decisions were made, and how the project's understanding has + evolved. Synthesizes responses from palace search, knowledge graph, + agent diaries, session event log, and coordination files. agent: name: docent namespace: mempalace diff --git a/bundle.dot b/bundle.dot new file mode 100644 index 0000000..706da23 --- /dev/null +++ b/bundle.dot @@ -0,0 +1,97 @@ +// This repository packages a "memory palace" assistant that captures, curates, and recalls knowledge across sessions. +digraph memory { + rankdir=LR + fontname="Helvetica" + fontsize=12 + label="Memory Palace Bundle\nA shared workspace for capturing, curating, and recalling knowledge\nv1.2.0" + labelloc=t + labeljust=c + nodesep=0.6 + ranksep=0.7 + bgcolor="white" + source_hash="a0b2b4ee554828d5e237da750bcb9e3020519a046becba3a1cf0378fd59ce450" + + node [fontname="Helvetica", fontsize=11, style="filled,rounded"] + edge [fontname="Helvetica", fontsize=9] + + root_memory [label="Memory Palace (this bundle)\nv1.2.0\n0 tools · 0 agents\n~1530 tok aggregate", shape=box, fillcolor="#80cbc4", style="filled,rounded,bold", penwidth=2] + + subgraph cluster_behaviors { + label="Behaviors — bundled capabilities the assistant can use" + style="filled" + fillcolor="#f9f9f9" + color="#999999" + + beh_mempalace_behavior [label="Memory Palace Behavior\nThe core remember/recall capability\n7 tools · 2 ctx\n~3418 tok", shape=box, fillcolor="#e0f2f1", style="filled,rounded"] + } + + subgraph cluster_agents { + label="Agents — specialist helpers, each with a role" + style="filled" + fillcolor="#f9f9f9" + color="#999999" + + agt_archivist [label="Archivist\nSaves new memories\n~0 tok desc", shape=box, fillcolor="#c8e6c9", style="filled,rounded"] + agt_curator [label="Curator\nOrganizes and prunes memories\n~0 tok desc", shape=box, fillcolor="#c8e6c9", style="filled,rounded"] + agt_docent [label="Docent\nGuides recall and retrieval\n~0 tok desc", shape=box, fillcolor="#c8e6c9", style="filled,rounded"] + } + + subgraph cluster_modules { + label="Modules — building blocks the behavior plugs together" + style="filled" + fillcolor="#f9f9f9" + color="#999999" + + mod_hooks_mempalace_briefing [label="Briefing Hook\nShares relevant memories\nat the start of a session\n(hooks-mempalace-briefing)", shape=box, fillcolor="#bbdefb", style="filled,rounded"] + mod_hooks_mempalace_capture [label="Capture Hook\nNotices things worth\nremembering as you work\n(hooks-mempalace-capture)", shape=box, fillcolor="#bbdefb", style="filled,rounded"] + mod_hooks_mempalace_interject [label="Interject Hook\nSpeaks up when a memory\nis useful right now\n(hooks-mempalace-interject)", shape=box, fillcolor="#bbdefb", style="filled,rounded"] + mod_hooks_project_context [label="Project Context Hook\nKeeps project-specific\nbackground in view\n(hooks-project-context)", shape=box, fillcolor="#bbdefb", style="filled,rounded"] + mod_tool_mempalace [label="Memory Palace Tool\nThe action that reads &\nwrites memories\n(tool-mempalace)", shape=box, fillcolor="#bbdefb", style="filled,rounded"] + } + + subgraph cluster_context { + label="Context Files — written guidance loaded into the assistant" + style="filled" + fillcolor="#f9f9f9" + color="#999999" + + ctx_instructions_md [label="How-to-Use Instructions\n(instructions.md)\n~1335 tok", shape=box, fillcolor="#e1bee7", style="filled,rounded"] + ctx_project_context_guide_md [label="Project Context Guide\n(project-context-guide.md)\n~759 tok", shape=box, fillcolor="#e1bee7", style="filled,rounded"] + } + + subgraph cluster_legend { + label="Legend — what each color means" + style="filled" + fillcolor="white" + color="#cccccc" + fontsize=9 + + leg_root [label="this bundle (root)", shape=box, fillcolor="#80cbc4", style="filled,rounded,bold", fontsize=9] + leg_behavior [label="behavior (capability)", shape=box, fillcolor="#e0f2f1", style="filled,rounded", fontsize=9] + leg_agent [label="agent (specialist helper)", shape=box, fillcolor="#c8e6c9", style="filled,rounded", fontsize=9] + leg_module [label="module (building block)", shape=box, fillcolor="#bbdefb", style="filled,rounded", fontsize=9] + leg_provider [label="provider (AI service)", shape=box, fillcolor="#e0e0e0", style="filled,rounded", fontsize=9] + leg_context [label="context (written guidance)", shape=box, fillcolor="#e1bee7", style="filled,rounded", fontsize=9] + leg_standalone [label="standalone piece", shape=box, fillcolor="#80cbc4", style="filled,rounded", fontsize=9] + leg_experiment [label="experiment", shape=box, fillcolor="#e1bee7", style="filled,rounded", fontsize=9] + leg_ext_cost [label="external dependency\n(adds hidden cost)", shape=box, fillcolor="#80cbc4", style="dashed", color="red", penwidth=2, fontsize=9] + leg_ext_muted [label="external dependency\n(no extra cost)", shape=box, fillcolor="#f5f5f5", style="dashed", fontsize=9] + } + + disclaimer [label="Token estimates: ~4 chars/token\nSolid border = local (counted)\nDashed + red = external, hidden cost (not counted)\nDashed + muted = external, no cost\nExcludes: sub-session costs, runtime-dynamic", shape=note, fillcolor="#eceff1", style="filled", fontsize=9] + + ext_githttps___github_com_microsoft_amplifier_foundation_main [label="Amplifier Foundation\nShared base brought in from GitHub\n(external, cost)", shape=box, fillcolor="#80cbc4", style="dashed", color="red", penwidth=2] + + root_memory -> ext_githttps___github_com_microsoft_amplifier_foundation_main [style=dashed] + root_memory -> beh_mempalace_behavior [label="composes"] + beh_mempalace_behavior -> agt_archivist [label="owns"] + beh_mempalace_behavior -> agt_curator [label="owns"] + beh_mempalace_behavior -> mod_tool_mempalace [label="uses", penwidth=0.8] + beh_mempalace_behavior -> mod_hooks_mempalace_capture [label="uses", penwidth=0.8] + beh_mempalace_behavior -> mod_hooks_mempalace_briefing [label="uses", penwidth=0.8] + beh_mempalace_behavior -> mod_hooks_project_context [label="uses", penwidth=0.8] + beh_mempalace_behavior -> mod_hooks_mempalace_interject [label="uses", penwidth=0.8] + beh_mempalace_behavior -> ctx_instructions_md [style=dotted, color=purple] + beh_mempalace_behavior -> ctx_project_context_guide_md [style=dotted, color=purple] + root_memory -> ctx_instructions_md [style=dotted, color=purple] +} \ No newline at end of file diff --git a/docs/development/dtu.md b/docs/development/dtu.md index 0d8470f..5f77a8a 100644 --- a/docs/development/dtu.md +++ b/docs/development/dtu.md @@ -42,17 +42,25 @@ pull request or shipping a release. ## Prerequisites -You need five things before you can launch the DTU: - -1. **Incus** — the container runtime used by `amplifier-digital-twin`. -2. **`amplifier-digital-twin` CLI** — install with `uv`: +You need six things before you can launch the DTU: + +1. **Incus** — the container runtime used by `amplifier-digital-twin`. On + macOS this requires Colima as a Linux host VM; see the + [`installing-incus` guide](https://github.com/microsoft/amplifier-bundle-digital-twin-universe/blob/main/docs/installing-incus.md) + for platform-specific steps. +2. **`amplifier-digital-twin` CLI** — install with `uv` (the package is not + on PyPI, so install from the bundle repo): + ```bash + uv tool install git+https://github.com/microsoft/amplifier-bundle-digital-twin-universe@main + ``` +3. **`amplifier-gitea` CLI** — install with `uv`: ```bash - uv tool install amplifier-digital-twin + uv tool install git+https://github.com/microsoft/amplifier-bundle-gitea@main ``` -3. **A running Gitea instance with the bundle mirrored.** See the one-time setup - section below. -4. **`ANTHROPIC_API_KEY`** — an Anthropic API key starting with `sk-ant`. -5. **`OPENAI_API_KEY`** — an OpenAI API key starting with `sk-`. +4. **A running Gitea instance with the bundle mirrored.** See the one-time + setup section below — `amplifier-gitea` provisions one for you. +5. **`ANTHROPIC_API_KEY`** — an Anthropic API key starting with `sk-ant`. +6. **`OPENAI_API_KEY`** — an OpenAI API key starting with `sk-`. ### Verify your environment @@ -67,11 +75,15 @@ incus --version amplifier-digital-twin --version # expected: a version string -# 3. Anthropic key is set (first 6 chars should be sk-ant) +# 3. amplifier-gitea CLI is available +amplifier-gitea --version +# expected: a version string + +# 4. Anthropic key is set (first 6 chars should be sk-ant) echo $ANTHROPIC_API_KEY | head -c 6 # expected: sk-ant -# 4. OpenAI key is set (first 3 chars should be sk-) +# 5. OpenAI key is set (first 3 chars should be sk-) echo $OPENAI_API_KEY | head -c 3 # expected: sk- ``` @@ -86,66 +98,80 @@ the host shell. The DTU profile rewrites GitHub URLs to a local Gitea mirror so that `amplifier bundle add` installs your local version of the bundle, not the -upstream one on GitHub. You need to create this mirror once. +upstream one on GitHub. The `amplifier-gitea` CLI provisions the Gitea +instance and mirrors the repo for you. -### 1. Get your Gitea base URL and token +### 1. Create the Gitea environment ```bash -GITEA_URL=$(amplifier-gitea url ) -GITEA_TOKEN=$(amplifier-gitea token | jq -r .token) +GITEA_JSON=$(amplifier-gitea create --port 10110 --name dtu-memory-gitea) +GITEA_ID=$(echo "$GITEA_JSON" | jq -r .id) +GITEA_URL=$(echo "$GITEA_JSON" | jq -r .gitea_url) +GITEA_TOKEN=$(echo "$GITEA_JSON" | jq -r .token) +echo "ID: $GITEA_ID" echo "URL: $GITEA_URL" echo "Token: ${GITEA_TOKEN:0:8}..." ``` -Replace `` with the identifier printed when you provisioned your -Gitea instance. +`amplifier-gitea create` returns a JSON object with the Gitea ID, the host +base URL (`http://localhost:10110`), the admin token, and admin credentials +(`admin` / `admin1234`). Capture all three values for use below. -### 2. Create the mirror repository +### 2. Mirror the bundle repo ```bash -curl -s -X POST "${GITEA_URL}/api/v1/repos/migrate" \ - -H "Content-Type: application/json" \ - -H "Authorization: token ${GITEA_TOKEN}" \ - -d '{ - "clone_addr": "https://github.com/michaeljabbour/amplifier-bundle-memory", - "repo_name": "amplifier-bundle-memory", - "uid": 1, - "mirror": true, - "private": false, - "description": "Mirror of amplifier-bundle-memory for DTU use" - }' | jq .full_name -# expected output: "admin/amplifier-bundle-memory" +amplifier-gitea mirror-from-github "$GITEA_ID" \ + --github-repo https://github.com/michaeljabbour/amplifier-bundle-memory ``` -The `uid: 1` is the admin user. Adjust if your admin has a different UID -(`GET /api/v1/users/admin` to check). +This populates `admin/amplifier-bundle-memory` inside the Gitea instance. +By default only the git history and branches are mirrored — pass +`--include-issues`, `--include-prs`, etc. if you also need metadata. ### Working from a fork -If you are developing on a personal fork rather than the upstream repo, change -`clone_addr` to your fork URL. The DTU url-rewrite rule matches -`github.com/michaeljabbour/amplifier-bundle-memory`; update it in the profile -YAML if your fork is at a different path. +If you are developing on a personal fork rather than the upstream repo, +change the `--github-repo` URL to your fork. The DTU url-rewrite rule +matches `github.com/michaeljabbour/amplifier-bundle-memory`; update the +`url_rewrites.rules[0].match` field in the profile YAML if your fork is at +a different path. + +### Generating a fresh token later + +If you need a new token without recreating the environment: + +```bash +GITEA_TOKEN=$(amplifier-gitea token "$GITEA_ID" | jq -r .token) +``` --- ## Launch the DTU Run the following from the **bundle root** (the directory that contains -`amplifier-bundle-memory/`): +this `docs/` folder and `.amplifier/digital-twin-universe/profiles/`): ```bash -DTU_ID=$(amplifier-digital-twin launch memory-bundle-e2e \ - --var GITEA_URL="${GITEA_URL}" \ - --var GITEA_TOKEN="${GITEA_TOKEN}" \ - | tail -n1) +DTU_ID=memory-bundle-e2e -echo "DTU environment ID: ${DTU_ID}" +amplifier-digital-twin launch \ + .amplifier/digital-twin-universe/profiles/memory-bundle-e2e.yaml \ + --name "$DTU_ID" \ + --var GITEA_URL="$GITEA_URL" \ + --var GITEA_TOKEN="$GITEA_TOKEN" ``` -The `tail -n1` captures the environment ID printed as the last line of the -launch output. Save it; every subsequent command uses it. +The `--name` flag pins the DTU id to a known value so subsequent commands +can reference `${DTU_ID}` directly without parsing JSON. Without it, the id +is auto-generated and printed as the final JSON line of launch output +(e.g. `{"id": "dtu-a1b2c3d4", ...}`). + +> **Note on `GITEA_URL`:** `amplifier-digital-twin` automatically rewrites +> `localhost` and `127.0.0.1` in launch variables to the host gateway IP +> reachable from inside the container, so the URL returned by +> `amplifier-gitea create` (e.g. `http://localhost:10110`) works as-is — +> you do not need to substitute a bridge IP yourself. > **First launch takes 5–10 minutes.** The profile has 15 `setup_cmds` that > install system packages, compile Python wheels, initialise MemPalace, mine @@ -420,3 +446,120 @@ connecting to `api.anthropic.com` or `api.openai.com`. --var GITEA_TOKEN="${GITEA_TOKEN}" \ | tail -n1 ``` + +--- + +### Zero `memory-mempalace:*` events in events.jsonl + +**Symptom:** An Amplifier session runs successfully but +`grep 'memory-mempalace:' ~/.amplifier/projects/*/sessions/*/events.jsonl` +returns nothing, even though the memory bundle is active. + +**Root cause — Amplifier module cache shadowing** + +The Amplifier loader prepends `~/.amplifier/cache/amplifier-module-hooks-logging-*/` to +`sys.path` before site-packages. If that cache copy lacks `on_session_ready`, B2 detection +in `_load_entry_point` sets `has_osr=False` and never enqueues the callback. Result: +`register_contributor("observability.events", ...)` is never called, no handlers register +for `memory-mempalace:*` events, and events.jsonl stays empty. + +This also means `uv pip install --force-reinstall ` alone is insufficient — the old +cache wins over the freshly installed version in site-packages. + +**Quick diagnosis:** +```bash +amplifier-digital-twin exec ${DTU_ID} -- \ + /root/.local/share/uv/tools/amplifier/bin/python -c " +import amplifier_module_hooks_logging as m +print('file:', m.__file__) +print('has on_session_ready:', hasattr(m, 'on_session_ready')) +" +``` + +If `has on_session_ready: False` — the cache is shadowing the fork. + +**Fix:** +```bash +amplifier-digital-twin exec ${DTU_ID} -- bash -c " + rm -rf /root/.amplifier/cache/amplifier-module-hooks-logging-* + echo 'Cleared shadowing cache — next session will use site-packages fork' +" +``` + +Deleting the cache dir lets Python fall through to site-packages on the next session +start. No reinstall or primer session required. + +--- + +### "No providers available" after `amplifier-digital-twin update` + +**Symptom:** After running `update`, Amplifier sessions fail immediately with +`Error: No providers available`. The provider (Anthropic) is configured in +`~/.amplifier/settings.yaml` but the module isn't loading. + +**Root cause:** An earlier version of this profile used `rm -rf /root/.amplifier/cache/` +in the update section, which wiped **all** module caches — including provider-anthropic, +loop-streaming, context-simple, and every other foundation module. Amplifier +cannot start a session until those modules are re-downloaded and cached. + +This was fixed in the profile. If you are seeing this on a DTU provisioned from +an older profile version: + +**Fix:** +```bash +# Reinstall Amplifier to repopulate the module cache (~2-3 min) +amplifier-digital-twin exec ${DTU_ID} -- bash -c " + export PATH=/root/.local/bin:\$PATH + uv tool install -vv git+https://github.com/microsoft/amplifier + amplifier --version +" +``` + +After this completes, re-apply the memory bundle: +```bash +amplifier-digital-twin exec ${DTU_ID} -- bash -c " + export PATH=/root/.local/bin:\$PATH + amplifier bundle add --app 'git+https://github.com/michaeljabbour/amplifier-bundle-memory@main#subdirectory=behaviors/mempalace.yaml' +" +``` + +**Prevention:** the `update` section in the current profile does targeted cache +invalidation (`rm -rf ...amplifier-module-hooks-mempalace-*` etc.) rather than a +full wipe. Do not add `rm -rf /root/.amplifier/cache/` to this profile. + +--- + +### Debug patches in `amplifier_core/loader.py` break module loading + +**Symptom:** All module loads fail with +`UnboundLocalError: cannot access local variable 'sys' where it is not associated +with a value` at `loader.py, in _load_entry_point, mod = sys.modules.get(module_name)`. + +**Root cause:** A debug patch added `import pathlib, sys` inside an `if` block +within `_load_entry_point()`. Python's scoping treats any assignment to a name +inside a function (including `import x`) as making that name *local to the entire +function*. When the `if` block is not entered, `sys` is never assigned but Python +still looks for it as a local — causing `UnboundLocalError` every time `sys` is +referenced anywhere else in the function. + +**Fix:** Remove the offending lines from `loader.py`: +```bash +amplifier-digital-twin exec ${DTU_ID} -- \ + /root/.local/share/uv/tools/amplifier/bin/python -c " +import pathlib, re +LOADER = pathlib.Path('/root/.local/share/uv/tools/amplifier/lib/python3.12/site-packages/amplifier_core/loader.py') +src = LOADER.read_text() +lines = [l for l in src.split('\n') + if not ('import pathlib' in l and 'pathlib.Path(' in l and l.strip().startswith('import'))] +LOADER.write_text('\n'.join(lines)) +import py_compile; py_compile.compile(str(LOADER), doraise=True) +print('loader.py cleaned and syntax OK') +" +# Clear the stale .pyc +amplifier-digital-twin exec ${DTU_ID} -- \ + rm -f '/root/.local/share/uv/tools/amplifier/lib/python3.12/site-packages/amplifier_core/__pycache__/loader.cpython-312.pyc' +``` + +**Prevention:** never add `import ` inside an `if` block within a function +that also references `` outside the block. Use `import as _` +if a conditional import is genuinely needed, or import at module top level. diff --git a/modules/hooks-mempalace-briefing/amplifier_module_hooks_mempalace_briefing/__init__.py b/modules/hooks-mempalace-briefing/amplifier_module_hooks_mempalace_briefing/__init__.py index abe88bf..f8250be 100644 --- a/modules/hooks-mempalace-briefing/amplifier_module_hooks_mempalace_briefing/__init__.py +++ b/modules/hooks-mempalace-briefing/amplifier_module_hooks_mempalace_briefing/__init__.py @@ -47,6 +47,26 @@ def emit_event(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] pass +try: + from amplifier_module_tool_mempalace.coordinator_bridge import ( + NOOP_ASYNC_BRIDGE, + AsyncBridge, + make_async_bridge, + register_events, + ) +except ImportError: + AsyncBridge = Any # type: ignore + + async def NOOP_ASYNC_BRIDGE(event: str, payload: Any) -> None: # type: ignore[misc] + pass + + def make_async_bridge(coordinator: Any) -> Any: # type: ignore[misc] + return NOOP_ASYNC_BRIDGE + + def register_events(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] + pass + + # ── Re-ranking ────────────────────────────────────────────────────────────── #: Scaling constant. Bounds max boost/penalty to ±0.04 at weight=1.0. @@ -227,7 +247,7 @@ def _build_briefing( include_diary: bool, include_project_context: bool, importance_weight: float = 1.0, -) -> tuple[str, list[str], int, int, int]: +) -> tuple[str, list[str], int, list[dict[str, Any]], list[dict[str, Any]]]: """Assemble a concise briefing from palace search, KG, diary, and coordination files. Returns (briefing_text, sections, token_estimate, results_fetched, results_after_rerank). @@ -241,8 +261,8 @@ def _build_briefing( """ sections: list[str] = [] approx_tokens = 0 - results_fetched = 0 - results_after_rerank = 0 + results_fetched: list = [] + results_after_rerank: list = [] # 1. Semantic search — fetch extra candidates for re-ranking headroom (CP4: 8 → top 5) wing = f"wing_{project}" @@ -255,7 +275,7 @@ def _build_briefing( }, ) raw_results = search_result.get("results", []) - results_fetched = len(raw_results) + results_fetched = list(raw_results) # 2. Importance re-ranking (CP4) if importance_weight == 0.0 or not raw_results: @@ -268,7 +288,7 @@ def _build_briefing( reranked = _rerank_by_importance(raw_results, lookup, weight=importance_weight) results = reranked[:5] - results_after_rerank = len(results) + results_after_rerank = list(results) if results: lines = [f"**Recent palace memories — `{project}`:**"] @@ -344,7 +364,12 @@ class MempalaceBriefingHook: name = "hooks-mempalace-briefing" events = ["session:start"] - def __init__(self, config: dict[str, Any] | None = None) -> None: + def __init__( + self, + config: dict[str, Any] | None = None, + *, + bridge_emit: AsyncBridge | None = None, + ) -> None: self.config = config or {} self.token_budget: int = self.config.get("token_budget", 1500) self.include_kg: bool = self.config.get("include_kg", True) @@ -359,6 +384,8 @@ def __init__(self, config: dict[str, Any] | None = None) -> None: self.config.get("briefing_importance_weight", 1.0) ) + self._bridge_emit: AsyncBridge = bridge_emit or NOOP_ASYNC_BRIDGE + async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: sid = data.get("session_id") @@ -374,6 +401,13 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: data={"reason": "mempalace_unavailable"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:briefing_skipped", + {"ok": False, "reason": "mempalace_unavailable"}, + ) + except Exception: + pass # Still inject project-context coordination files even without MemPalace if self.include_project_context: pc_dir = _find_project_context_dir() @@ -408,6 +442,13 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: ) ) + # Derive drawer_ids from results_after_rerank (list of dicts) + drawer_ids = [ + r["id"] + for r in (results_after_rerank or []) + if isinstance(r, dict) and "id" in r + ] + if briefing: if self.emit_events: emit_event( @@ -419,12 +460,26 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: "project": project, "section_count": len(sections), "token_estimate": token_estimate, - "results_fetched": results_fetched, - "results_after_rerank": results_after_rerank, + "results_fetched": len(results_fetched or []), + "results_after_rerank": len(results_after_rerank or []), "importance_weight": self.briefing_importance_weight, }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:briefing_assembled", + { + "ok": True, + "project": project, + "section_count": len(sections), + "token_estimate": token_estimate, + "drawer_ids": drawer_ids, + "importance_weight": self.briefing_importance_weight, + }, + ) + except Exception: + pass return HookResult( action="inject_context", context_injection=briefing, @@ -441,6 +496,13 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: data={"reason": "no_content"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:briefing_skipped", + {"ok": False, "reason": "no_content", "project": project}, + ) + except Exception: + pass return HookResult(action="continue") @@ -448,7 +510,15 @@ async def mount( coordinator: Any, config: dict[str, Any] | None = None ) -> dict[str, Any]: """Mount the mempalace-briefing hook into the Amplifier coordinator.""" - hook = MempalaceBriefingHook(config) + register_events( + coordinator, + "memory-mempalace-briefing", + ["memory-mempalace:briefing_assembled", "memory-mempalace:briefing_skipped"], + ) + + bridge_emit = make_async_bridge(coordinator) + + hook = MempalaceBriefingHook(config, bridge_emit=bridge_emit) for event in hook.events: coordinator.hooks.register(event, hook, name=hook.name) return { diff --git a/modules/hooks-mempalace-capture/amplifier_module_hooks_mempalace_capture/__init__.py b/modules/hooks-mempalace-capture/amplifier_module_hooks_mempalace_capture/__init__.py index da5cc3d..ce34b18 100644 --- a/modules/hooks-mempalace-capture/amplifier_module_hooks_mempalace_capture/__init__.py +++ b/modules/hooks-mempalace-capture/amplifier_module_hooks_mempalace_capture/__init__.py @@ -90,6 +90,26 @@ def _emitter_resolve_session_id(session_id: str | None = None) -> str: # type: return os.environ.get("AMPLIFIER_SESSION_ID") or "unknown" +try: + from amplifier_module_tool_mempalace.coordinator_bridge import ( + NOOP_SYNC_BRIDGE, + SyncBridge, + make_sync_bridge, + register_events, + ) +except ImportError: + SyncBridge = Any # type: ignore + + def NOOP_SYNC_BRIDGE(event: str, payload: Any) -> None: # type: ignore[misc] + pass + + def make_sync_bridge(coordinator: Any) -> Any: # type: ignore[misc] + return NOOP_SYNC_BRIDGE + + def register_events(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] + pass + + def _detect_wing(cwd: str | None = None) -> str: """Detect the active project wing from git remote or directory name.""" try: @@ -243,6 +263,8 @@ def _detect_category(text: str) -> str | None: _QUEUE: queue.Queue["_CaptureJob"] | None = None _DRAIN_THREAD: threading.Thread | None = None _DRAIN_LOCK = threading.Lock() +# Bridge for drain thread → coordinator forwarding. Set by mount(). +_DRAIN_BRIDGE: SyncBridge = NOOP_SYNC_BRIDGE @dataclass(frozen=True) @@ -345,6 +367,16 @@ def _drain_loop() -> None: }, session_id=job.session_id, ) + try: + _DRAIN_BRIDGE( + "memory-mempalace:capture_failed", + { + "capture_id": job.capture_id, + "reason": "worker_exception", + }, + ) + except Exception: + pass except Exception: pass _spool_delete(job.spool_path) @@ -383,6 +415,22 @@ def _process_job(job: _CaptureJob) -> None: }, session_id=job.session_id, ) + try: + _DRAIN_BRIDGE( + "memory-mempalace:drawer_filed", + { + "capture_id": job.capture_id, + "wing": wing, + "room": room, + "category": job.category, + "content_bytes": len(job.tool_output.encode("utf-8")), + "source": job.source, + "ok": True, + "preview": truncate_preview(job.tool_output), + }, + ) + except Exception: + pass _spool_delete(job.spool_path) except Exception: if job.emit_events: @@ -394,6 +442,16 @@ def _process_job(job: _CaptureJob) -> None: data={"capture_id": job.capture_id, "reason": "mcp_error"}, session_id=job.session_id, ) + try: + _DRAIN_BRIDGE( + "memory-mempalace:capture_failed", + { + "capture_id": job.capture_id, + "reason": "mcp_error", + }, + ) + except Exception: + pass # Leave the spool entry in place so a future resume can retry. @@ -458,7 +516,12 @@ class MempalaceCaptureHook: name = "hooks-mempalace-capture" events = ["tool:post"] - def __init__(self, config: dict[str, Any] | None = None) -> None: + def __init__( + self, + config: dict[str, Any] | None = None, + *, + bridge_emit: SyncBridge | None = None, + ) -> None: self.config = config or {} self.auto_wing: bool = self.config.get("auto_wing", True) self.auto_room: bool = self.config.get("auto_room", True) @@ -466,6 +529,8 @@ def __init__(self, config: dict[str, Any] | None = None) -> None: self.emit_events: bool = bool(self.config.get("emit_events", True)) # Categories to capture (empty list = capture all palace-worthy content) self.categories: list[str] = self.config.get("categories", []) + # Coordinator bridge — no-op default keeps the drain thread safe in tests + self._bridge_emit: SyncBridge = bridge_emit or NOOP_SYNC_BRIDGE async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: """Hot-path handler. @@ -581,12 +646,25 @@ async def mount( ) -> dict[str, Any]: """Mount the mempalace-capture hook into the Amplifier coordinator. - Side effect: starts the drain thread (idempotent) and replays any - spool entries left over from a prior crashed run of the same session. + Side effect: registers the contributor, wires the coordinator bridge, + starts the drain thread (idempotent), and replays any spool entries + left over from a prior crashed run of the same session. Amplifier's native session re-hydration restores the spool dir; we just sweep it. """ - hook = MempalaceCaptureHook(config) + global _DRAIN_BRIDGE + + cfg = config or {} + + register_events( + coordinator, + "memory-mempalace-capture", + ["memory-mempalace:drawer_filed", "memory-mempalace:capture_failed"], + ) + + bridge_emit = make_sync_bridge(coordinator) + _DRAIN_BRIDGE = bridge_emit + hook = MempalaceCaptureHook(cfg, bridge_emit=bridge_emit) for event in hook.events: coordinator.hooks.register(event, hook, name=hook.name) diff --git a/modules/hooks-mempalace-interject/amplifier_module_hooks_mempalace_interject/__init__.py b/modules/hooks-mempalace-interject/amplifier_module_hooks_mempalace_interject/__init__.py index b5d5e67..a8109cf 100644 --- a/modules/hooks-mempalace-interject/amplifier_module_hooks_mempalace_interject/__init__.py +++ b/modules/hooks-mempalace-interject/amplifier_module_hooks_mempalace_interject/__init__.py @@ -60,6 +60,27 @@ class HookRegistry: # type: ignore def emit_event(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] pass + +try: + from amplifier_module_tool_mempalace.coordinator_bridge import ( + NOOP_ASYNC_BRIDGE, + AsyncBridge, + make_async_bridge, + register_events, + ) +except ImportError: + AsyncBridge = Any # type: ignore + + async def NOOP_ASYNC_BRIDGE(event: str, payload: Any) -> None: # type: ignore[misc] + pass + + def make_async_bridge(coordinator: Any) -> Any: # type: ignore[misc] + return NOOP_ASYNC_BRIDGE + + def register_events(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] + pass + + # ── Constants ──────────────────────────────────────────────────────────────── DEFAULT_COSINE_THRESHOLD = 0.72 @@ -224,7 +245,13 @@ class MempalaceInterjectHook: LLM judge for scores in the uncertain band. """ - def __init__(self, config: dict[str, Any]) -> None: + def __init__( + self, + config: dict[str, Any] | None = None, + *, + bridge_emit: AsyncBridge | None = None, + ) -> None: + config = config or {} self.cosine_threshold: float = float( config.get("cosine_threshold", DEFAULT_COSINE_THRESHOLD) ) @@ -249,8 +276,10 @@ def __init__(self, config: dict[str, Any]) -> None: self._last_injected: dict[str, int] = {} # Turn counter (incremented on orchestrator:complete) self._turn: int = 0 - # Briefing memory IDs (populated at session:start if briefing hook shares state) + # Briefing memory IDs (populated via cross-hook briefing_assembled listener) self._briefed_ids: set[str] = set() + # Coordinator bridge emit function + self._bridge_emit: AsyncBridge = bridge_emit or NOOP_ASYNC_BRIDGE def _is_on_cooldown(self, memory_id: str) -> bool: """Check if a memory was recently injected (within cooldown_turns).""" @@ -333,6 +362,13 @@ async def on_prompt_submit(self, event: str, data: dict[str, Any]) -> HookResult data={"trigger": "prompt_submit", "reason": "disabled"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + {"ok": False, "trigger": "prompt_submit", "reason": "disabled"}, + ) + except Exception: + pass return HookResult(action="continue") prompt_text = data.get("prompt", "") or data.get("content", "") @@ -345,6 +381,17 @@ async def on_prompt_submit(self, event: str, data: dict[str, Any]) -> HookResult data={"trigger": "prompt_submit", "reason": "too_short"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "prompt_submit", + "reason": "too_short", + }, + ) + except Exception: + pass return HookResult(action="continue") # Reset per-turn guard (new user prompt = new turn) @@ -365,6 +412,17 @@ async def on_prompt_submit(self, event: str, data: dict[str, Any]) -> HookResult data={"trigger": "prompt_submit", "reason": skip_reason}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "prompt_submit", + "reason": skip_reason, + }, + ) + except Exception: + pass return HookResult(action="continue") injection = _format_injection(memories, event, self.max_inject_chars) @@ -386,6 +444,20 @@ async def on_prompt_submit(self, event: str, data: dict[str, Any]) -> HookResult }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:memory_surfaced", + { + "ok": True, + "preview": injection[:100] if injection else None, + "trigger": "prompt_submit", + "memory_ids": [m["id"] for m in memories], + "top_score": top_score, + "judge_used": judge_used, + }, + ) + except Exception: + pass return HookResult( action="inject_context", @@ -407,6 +479,13 @@ async def on_tool_pre(self, event: str, data: dict[str, Any]) -> HookResult: data={"trigger": "tool_pre", "reason": "disabled"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + {"ok": False, "trigger": "tool_pre", "reason": "disabled"}, + ) + except Exception: + pass return HookResult(action="continue") tool_name = data.get("tool_name", "") @@ -428,6 +507,13 @@ async def on_tool_pre(self, event: str, data: dict[str, Any]) -> HookResult: data={"trigger": "tool_pre", "reason": "too_short"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + {"ok": False, "trigger": "tool_pre", "reason": "too_short"}, + ) + except Exception: + pass return HookResult(action="continue") ( @@ -445,6 +531,13 @@ async def on_tool_pre(self, event: str, data: dict[str, Any]) -> HookResult: data={"trigger": "tool_pre", "reason": skip_reason}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + {"ok": False, "trigger": "tool_pre", "reason": skip_reason}, + ) + except Exception: + pass return HookResult(action="continue") injection = _format_injection(memories, event, self.max_inject_chars) @@ -465,6 +558,20 @@ async def on_tool_pre(self, event: str, data: dict[str, Any]) -> HookResult: }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:memory_surfaced", + { + "ok": True, + "preview": injection[:100] if injection else None, + "trigger": "tool_pre", + "memory_ids": [m["id"] for m in memories], + "top_score": top_score, + "judge_used": judge_used, + }, + ) + except Exception: + pass return HookResult( action="inject_context", @@ -493,6 +600,17 @@ async def on_orchestrator_complete( data={"trigger": "orchestrator_complete", "reason": "disabled"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "orchestrator_complete", + "reason": "disabled", + }, + ) + except Exception: + pass return HookResult(action="continue") # Infinite loop guard: skip if we already injected this turn @@ -506,6 +624,17 @@ async def on_orchestrator_complete( data={"trigger": "orchestrator_complete", "reason": "guard_flag"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "orchestrator_complete", + "reason": "guard_flag", + }, + ) + except Exception: + pass return HookResult(action="continue") # Extract the LLM's response text @@ -519,6 +648,17 @@ async def on_orchestrator_complete( data={"trigger": "orchestrator_complete", "reason": "too_short"}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "orchestrator_complete", + "reason": "too_short", + }, + ) + except Exception: + pass return HookResult(action="continue") # Only check for contradictions — use a higher threshold @@ -537,6 +677,17 @@ async def on_orchestrator_complete( data={"trigger": "orchestrator_complete", "reason": skip_reason}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "orchestrator_complete", + "reason": skip_reason, + }, + ) + except Exception: + pass return HookResult(action="continue") # Extra filter: only inject if a memory explicitly contradicts the response @@ -571,6 +722,17 @@ async def on_orchestrator_complete( }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:interject_skipped", + { + "ok": False, + "trigger": "orchestrator_complete", + "reason": "below_threshold", + }, + ) + except Exception: + pass return HookResult(action="continue") injection = _format_injection(contradicting, event, self.max_inject_chars) @@ -592,6 +754,20 @@ async def on_orchestrator_complete( }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:memory_surfaced", + { + "ok": True, + "preview": injection[:100] if injection else None, + "trigger": "orchestrator_complete", + "memory_ids": [m["id"] for m in contradicting], + "top_score": top_score, + "judge_used": judge_used, + }, + ) + except Exception: + pass return HookResult( action="inject_context", @@ -611,9 +787,37 @@ async def mount( Registers three separate handlers on prompt:submit, tool:pre, and orchestrator:complete, all at priority 20 (early, non-critical). + + Also registers a contributor for observability events and a cross-hook + listener for memory-mempalace:briefing_assembled to populate _briefed_ids. """ cfg = config or {} - hook = MempalaceInterjectHook(cfg) + + register_events( + coordinator, + "memory-mempalace-interject", + ["memory-mempalace:memory_surfaced", "memory-mempalace:interject_skipped"], + ) + + bridge_emit = make_async_bridge(coordinator) + + # Instantiate the hook with the bridge_emit closure + hook = MempalaceInterjectHook(cfg, bridge_emit=bridge_emit) + + # Cross-hook listener: update _briefed_ids when briefing_assembled fires + async def _on_briefing_assembled(event: str, data: Any) -> HookResult: + try: + ids = data.get("drawer_ids", []) if data else [] + hook._briefed_ids.update(str(i) for i in ids if i) + except Exception: + pass + return HookResult(action="continue") + + coordinator.hooks.register( + "memory-mempalace:briefing_assembled", + _on_briefing_assembled, + name="interject-briefing-listener", + ) # Register each event with its own dedicated handler method # Priority 20: runs early (after critical instrumentation at 50+) @@ -652,5 +856,6 @@ async def mount( "prompt_enabled": hook.prompt_enabled, "tool_pre_enabled": hook.tool_pre_enabled, "orc_enabled": hook.orc_enabled, + "emit_events": hook.emit_events, }, } diff --git a/modules/hooks-project-context/amplifier_module_hooks_project_context/__init__.py b/modules/hooks-project-context/amplifier_module_hooks_project_context/__init__.py index c9fcb9b..37257e2 100644 --- a/modules/hooks-project-context/amplifier_module_hooks_project_context/__init__.py +++ b/modules/hooks-project-context/amplifier_module_hooks_project_context/__init__.py @@ -44,7 +44,27 @@ def emit_event(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] pass -# ── Template stubs ──────────────────────────────────────────────────────────── +try: + from amplifier_module_tool_mempalace.coordinator_bridge import ( + NOOP_ASYNC_BRIDGE, + AsyncBridge, + make_async_bridge, + register_events, + ) +except ImportError: + AsyncBridge = Any # type: ignore + + async def NOOP_ASYNC_BRIDGE(event: str, payload: Any) -> None: # type: ignore[misc] + pass + + def make_async_bridge(coordinator: Any) -> Any: # type: ignore[misc] + return NOOP_ASYNC_BRIDGE + + def register_events(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] + pass + + +# ── Template stubs ───────────────────────────────────────────────────────────── _AGENTS_MD = """\ # Agent Instructions @@ -142,7 +162,7 @@ def emit_event(*args: Any, **kwargs: Any) -> None: # type: ignore[misc] """ -# ── Helpers ─────────────────────────────────────────────────────────────────── +# ── Helpers ──────────────────────────────────────────────────────────────────── def _find_git_root() -> Path | None: @@ -239,19 +259,25 @@ def _read_tier1(pc_dir: Path, token_budget: int) -> tuple[str, list[str], int]: return result, files_read, token_estimate -# ── Hook classes ───────────────────────────────────────────────────────────── +# ── Hook classes ─────────────────────────────────────────────────────────────── class ProjectContextStartHook: name = "hooks-project-context-start" events = ["session:start"] - def __init__(self, config: dict[str, Any] | None = None) -> None: + def __init__( + self, + config: dict[str, Any] | None = None, + *, + bridge_emit: AsyncBridge | None = None, + ) -> None: self.config = config or {} self.tier1_always: bool = self.config.get("tier1_always", True) self.setup_if_missing: bool = self.config.get("setup_if_missing", True) self.token_budget: int = self.config.get("token_budget", 800) self.emit_events: bool = bool(self.config.get("emit_events", True)) + self._bridge_emit: AsyncBridge = bridge_emit or NOOP_ASYNC_BRIDGE async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: sid = data.get("session_id") @@ -273,6 +299,17 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:coordination_scaffolded", + { + "ok": True, + "pc_dir": str(pc_dir), + "files_created": files_created, + }, + ) + except Exception: + pass if pc_dir is None: return HookResult(action="continue") @@ -291,6 +328,17 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: }, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:coordination_read", + { + "ok": True, + "files_read": files_read, + "token_estimate": token_estimate, + }, + ) + except Exception: + pass return HookResult( action="inject_context", context_injection=block, @@ -306,10 +354,16 @@ class ProjectContextEndHook: name = "hooks-project-context-end" events = ["session:end"] - def __init__(self, config: dict[str, Any] | None = None) -> None: + def __init__( + self, + config: dict[str, Any] | None = None, + *, + bridge_emit: AsyncBridge | None = None, + ) -> None: self.config = config or {} self.handoff_on_end: bool = self.config.get("handoff_on_end", True) self.emit_events: bool = bool(self.config.get("emit_events", True)) + self._bridge_emit: AsyncBridge = bridge_emit or NOOP_ASYNC_BRIDGE async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: if not self.handoff_on_end: @@ -339,6 +393,16 @@ async def __call__(self, event: str, data: dict[str, Any]) -> HookResult: data={"prompt_preview": prompt[:200]}, session_id=sid, ) + try: + await self._bridge_emit( + "memory-mempalace:curator_handoff_requested", + { + "ok": True, + "prompt_preview": prompt[:200], + }, + ) + except Exception: + pass return HookResult(action="continue") @@ -346,8 +410,21 @@ async def mount( coordinator: Any, config: dict[str, Any] | None = None ) -> dict[str, Any]: """Mount the project-context hooks into the Amplifier coordinator.""" - start_hook = ProjectContextStartHook(config) - end_hook = ProjectContextEndHook(config) + + register_events( + coordinator, + "memory-mempalace-project-context", + [ + "memory-mempalace:coordination_read", + "memory-mempalace:coordination_scaffolded", + "memory-mempalace:curator_handoff_requested", + ], + ) + + bridge_emit = make_async_bridge(coordinator) + + start_hook = ProjectContextStartHook(config, bridge_emit=bridge_emit) + end_hook = ProjectContextEndHook(config, bridge_emit=bridge_emit) for hook in (start_hook, end_hook): for evt in hook.events: coordinator.hooks.register(evt, hook, name=hook.name) diff --git a/modules/tool-mempalace/amplifier_module_tool_mempalace/__init__.py b/modules/tool-mempalace/amplifier_module_tool_mempalace/__init__.py index d8deacb..89e9ff0 100644 --- a/modules/tool-mempalace/amplifier_module_tool_mempalace/__init__.py +++ b/modules/tool-mempalace/amplifier_module_tool_mempalace/__init__.py @@ -26,6 +26,12 @@ from amplifier_core import Tool, ToolResult # type: ignore +from .coordinator_bridge import ( + NOOP_SYNC_BRIDGE, + SyncBridge, + make_sync_bridge, + register_events, +) from .event_emitter import _read_events_with_skip_count, emit_event from .garden import execute_garden @@ -59,6 +65,11 @@ class PalaceTool(Tool): "MemPalace memory operations. Operations: search, remember, status, " "kg (knowledge graph), traverse, diary, mine, events, garden." ) + + def __init__(self, *, bridge_emit: SyncBridge | None = None) -> None: + super().__init__() + self._bridge_emit: SyncBridge = bridge_emit or NOOP_SYNC_BRIDGE + input_schema = { "type": "object", "properties": { @@ -382,6 +393,31 @@ async def execute(self, operation: str, **kwargs: Any) -> ToolResult: # type: i # eventually complete on its own # Do NOT treat the 120s wall-clock budget as a hard resource # bound; treat it as a response-time guarantee to the caller. + + def combined_emit( + hook: str, + event: str, + *, + ok: bool = True, + preview: str | None = None, + data: dict[str, Any] | None = None, + session_id: str | None = None, + ) -> None: + """Emit to private JSONL and forward to coordinator bridge.""" + emit_event( + hook, + event, + ok=ok, + preview=preview, + data=data, + session_id=session_id, + ) + try: + payload = {"ok": ok, "preview": preview, **(data or {})} + self._bridge_emit(f"memory-mempalace:{event}", payload) + except Exception: + pass + try: garden_result = await asyncio.wait_for( asyncio.to_thread( @@ -393,7 +429,7 @@ async def execute(self, operation: str, **kwargs: Any) -> ToolResult: # type: i cluster_threshold=float( kwargs.get("cluster_threshold", 0.80) ), - emit_fn=emit_event, + emit_fn=combined_emit, session_id=kwargs.get("session_id"), ), timeout=_GARDEN_TIMEOUT_S, @@ -418,6 +454,21 @@ async def execute(self, operation: str, **kwargs: Any) -> ToolResult: # type: i ) except Exception: pass # never let event emission failure crash the error path + try: + self._bridge_emit( + "memory-mempalace:garden_completed", + { + "ok": False, + "scope_wing": kwargs.get("wing"), + "scope_room": kwargs.get("room"), + "drawers_analyzed": 0, + "clusters_found": 0, + "kg_edges_created": 0, + "timed_out": True, + }, + ) + except Exception: + pass return ToolResult( success=False, error={ @@ -442,6 +493,20 @@ async def execute(self, operation: str, **kwargs: Any) -> ToolResult: # type: i }, session_id=kwargs.get("session_id"), ) + try: + self._bridge_emit( + "memory-mempalace:garden_completed", + { + "ok": True, + "scope_wing": kwargs.get("wing"), + "scope_room": kwargs.get("room"), + "drawers_analyzed": garden_result["drawers_analyzed"], + "clusters_found": len(garden_result["clusters"]), + "kg_edges_created": garden_result["kg_edges_created"], + }, + ) + except Exception: + pass return ToolResult(output=json.dumps(garden_result, indent=2)) @@ -468,7 +533,14 @@ async def mount( coordinator: Any, config: dict[str, Any] | None = None ) -> dict[str, Any]: """Mount the palace tool into the Amplifier coordinator.""" - tool = PalaceTool() + register_events( + coordinator, + "memory-mempalace-tool", + ["memory-mempalace:garden_completed", "memory-mempalace:garden_progress"], + ) + + bridge_emit = make_sync_bridge(coordinator) + tool = PalaceTool(bridge_emit=bridge_emit) await coordinator.mount("tools", tool, name=tool.name) return { "name": "tool-mempalace", diff --git a/modules/tool-mempalace/amplifier_module_tool_mempalace/coordinator_bridge.py b/modules/tool-mempalace/amplifier_module_tool_mempalace/coordinator_bridge.py new file mode 100644 index 0000000..50af73b --- /dev/null +++ b/modules/tool-mempalace/amplifier_module_tool_mempalace/coordinator_bridge.py @@ -0,0 +1,109 @@ +"""Shared coordinator bridge factories for memory-mempalace modules. + +One pattern, two flavors. Both swallow producer-side errors so a failing +observer never corrupts the caller. + +* ``make_async_bridge`` — for callers awaiting from the coordinator's + loop (briefing, interject, project-context hooks). +* ``make_sync_bridge`` — for callers running OFF the loop (capture's + drain thread, tool's garden thread). Captures the running loop at + mount time, schedules emits via ``run_coroutine_threadsafe``, guards + against a closed loop. + +``register_events`` factors out the best-effort observability +contributor registration (was inconsistently wrapped across modules). +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable, Sequence +from typing import Any + +AsyncBridge = Callable[[str, Any], Awaitable[None]] +SyncBridge = Callable[[str, Any], None] + + +async def _noop_async(event: str, payload: Any) -> None: # pragma: no cover + return None + + +def _noop_sync(event: str, payload: Any) -> None: # pragma: no cover + return None + + +# Public no-op singletons. Hook/tool classes use these as defaults so +# they remain callable without going through mount() — i.e. testable. +NOOP_ASYNC_BRIDGE: AsyncBridge = _noop_async +NOOP_SYNC_BRIDGE: SyncBridge = _noop_sync + + +def register_events( + coordinator: Any, + contributor: str, + events: Sequence[str], +) -> None: + """Register a module's emitted event names with observability. + + Best-effort: never raises. The events list is snapshotted so the + contributor lambda is decoupled from caller-side mutation. + """ + try: + snapshot = list(events) + coordinator.register_contributor( + "observability.events", + contributor, + lambda: snapshot, + ) + except Exception: + pass + + +def make_async_bridge(coordinator: Any) -> AsyncBridge: + """Build an async bridge to ``coordinator.hooks.emit``. + + Use from within an async hook running on the coordinator's loop. + Errors from the hook bus are swallowed. + """ + + async def bridge_emit(event: str, payload: Any) -> None: + try: + await coordinator.hooks.emit(event, payload) + except Exception: + pass + + return bridge_emit + + +def make_sync_bridge(coordinator: Any) -> SyncBridge: + """Build a thread-safe sync bridge to ``coordinator.hooks.emit``. + + Must be called from ``async def mount`` so a running loop is + available to capture. The returned callable is safe to invoke from + any thread; errors and a closed loop are silently absorbed. + """ + loop = asyncio.get_running_loop() + + def bridge_emit(event: str, payload: Any) -> None: + try: + if loop.is_closed(): + return + asyncio.run_coroutine_threadsafe( + coordinator.hooks.emit(event, payload), + loop, + ) + except Exception: + pass + + return bridge_emit + + +__all__ = [ + "AsyncBridge", + "SyncBridge", + "NOOP_ASYNC_BRIDGE", + "NOOP_SYNC_BRIDGE", + "register_events", + "make_async_bridge", + "make_sync_bridge", +] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0ef5bb0..a7c3719 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,6 +1,9 @@ """Integration test fixtures for the amplifier-bundle-memory DTU. -These fixtures are designed to run INSIDE the DTU container: +These tests run INSIDE the memory-bundle-e2e DTU container. On the host +machine they are automatically skipped — see pytest_collection_modifyitems. + +Fixtures designed to run inside the DTU container: - reset_palace: autouse module-scope fixture that resets the memory palace before each test module so each module starts with a clean slate. Calls the @@ -20,6 +23,31 @@ import pytest +def pytest_collection_modifyitems(config, items): + """Skip all integration tests when not running inside the DTU container. + + The sentinel path /root/.mempalace only exists inside the + memory-bundle-e2e DTU profile. On the host the tests are marked as + SKIP so they are visible in CI output without failing the suite. + + A PermissionError is treated as "not in DTU" — it means we are running + as a non-root user that cannot stat /root/.mempalace. + """ + try: + in_dtu = Path("/root/.mempalace").exists() + except PermissionError: + in_dtu = False + + if in_dtu: + return + + skip_marker = pytest.mark.skip( + reason="DTU environment required (run inside memory-bundle-e2e container)" + ) + for item in items: + item.add_marker(skip_marker) + + @pytest.fixture(scope="module", autouse=True) def reset_palace(): """Reset the memory palace before each test module. diff --git a/tests/integration/test_event_wiring.py b/tests/integration/test_event_wiring.py new file mode 100644 index 0000000..d8be3c7 --- /dev/null +++ b/tests/integration/test_event_wiring.py @@ -0,0 +1,134 @@ +"""End-to-end coordinator event wiring tests. + +Runs INSIDE the memory-bundle-e2e DTU container. +Validates: hook fires -> coordinator.hooks.emit -> hooks-logging writes events.jsonl. + +All tests in this module are automatically skipped on the host machine — +see tests/integration/conftest.py (pytest_collection_modifyitems). + +DTU requirements (provisioned by memory-bundle-e2e.yaml): +- /root/.mempalace (seeded palace + spool dir) +- amplifier installed via uv tool install +- mempalace CLI on PATH +- ANTHROPIC_API_KEY / OPENAI_API_KEY in /root/.amplifier/keys.env +- pytest-asyncio installed +""" + +from __future__ import annotations + +import json +import subprocess +import time +from pathlib import Path + +import pytest + +WORKSPACE = Path("/workspace/amplifier-bundle-memory") + + +def _latest_events_jsonl() -> Path | None: + """Return the most recent session's events.jsonl from /root/.amplifier. + + Searches recursively under /root/.amplifier, sorts by modification time, + and returns the path with the highest mtime. Returns None if no file found. + """ + files = sorted( + Path("/root/.amplifier").rglob("events.jsonl"), + key=lambda p: p.stat().st_mtime, + ) + return files[-1] if files else None + + +def _events_in(path: Path) -> list[dict]: + """Load a JSONL file and return a list of event dicts.""" + events = [] + for line in path.read_text().splitlines(): + line = line.strip() + if line: + events.append(json.loads(line)) + return events + + +def _coordinator_events(path: Path) -> list[dict]: + """Filter events whose 'event' key starts with 'memory-mempalace:'.""" + return [ + e + for e in _events_in(path) + if isinstance(e.get("event"), str) + and e["event"].startswith("memory-mempalace:") + ] + + +def test_drawer_filed_appears_in_events_jsonl(): + """drawer_filed event should appear in events.jsonl after amplifier run. + + Runs an amplifier session with a message that contains an architecture + decision — the mempalace hook should file it as a drawer and emit a + memory-mempalace:drawer_filed coordinator event. + """ + subprocess.run( + [ + "amplifier", + "run", + "--", + "echo 'Architecture decision: we use dual-emit for observability'", + ], + timeout=120, + cwd=WORKSPACE, + ) + time.sleep(2.0) + events_path = _latest_events_jsonl() + assert events_path is not None, "no events.jsonl found — is hooks-logging mounted?" + coordinator_events = _coordinator_events(events_path) + event_names = [e.get("event") for e in coordinator_events] + assert "memory-mempalace:drawer_filed" in event_names + + +def test_briefing_assembled_payload_has_drawer_ids(): + """briefing_assembled event payload should contain a drawer_ids list. + + Runs a short amplifier session and checks that the coordinator emitted a + memory-mempalace:briefing_assembled event whose payload includes a + 'drawer_ids' list. + """ + subprocess.run( + ["amplifier", "run", "--", "echo done"], + timeout=60, + cwd=WORKSPACE, + ) + time.sleep(1.0) + events_path = _latest_events_jsonl() + assert events_path is not None, "no events.jsonl found — is hooks-logging mounted?" + coordinator_events = _coordinator_events(events_path) + briefing_events = [ + e + for e in coordinator_events + if e.get("event") == "memory-mempalace:briefing_assembled" + ] + assert briefing_events, "No briefing_assembled event found in coordinator events" + briefing = briefing_events[-1] + data = briefing.get("data", {}) + # Defensive: if 'drawer_ids' not in data, try the briefing dict directly + if "drawer_ids" not in data: + data = briefing + assert "drawer_ids" in data, ( + f"'drawer_ids' not found in briefing_assembled payload. " + f"Available keys: {list(data.keys())}" + ) + assert isinstance(data["drawer_ids"], list), ( + f"'drawer_ids' should be a list, got {type(data['drawer_ids'])}" + ) + + +def test_briefed_ids_prevents_reinjection(): + """briefed drawer IDs should not be re-injected in subsequent surfaced events. + + This test validates the deduplication contract: drawer_ids present in + briefing_assembled should not reappear in memory_surfaced events for the + same session. + """ + pytest.skip( + "Requires content-pinned session with deterministic retrieval. " + "Validated manually by comparing drawer_ids in briefing_assembled " + "with memory_ids in subsequent memory_surfaced events." + ) diff --git a/tests/test_contract.py b/tests/test_contract.py index 59ac5ba..7159d89 100644 --- a/tests/test_contract.py +++ b/tests/test_contract.py @@ -84,6 +84,11 @@ def __init__(self) -> None: self.hooks = HookRegistry() self.session_id: str | None = "test-session" + self._contributors: dict[str, dict[str, Any]] = {} + + def register_contributor(self, channel: str, name: str, callback: Any) -> None: + """Record a contributor registration (stub — no-op beyond recording).""" + self._contributors.setdefault(channel, {})[name] = callback async def _dispatch(registry: Any, event: str, data: dict[str, Any]) -> list[Any]: diff --git a/tests/test_coordinator_bridge.py b/tests/test_coordinator_bridge.py new file mode 100644 index 0000000..1242e34 --- /dev/null +++ b/tests/test_coordinator_bridge.py @@ -0,0 +1,768 @@ +""" +Coordinator-bridge unit tests. + +These tests use FakeCoordinator — a pure-Python stub that replaces the real +Amplifier kernel (RustCoordinator / RustHookRegistry). No real Amplifier +kernel, filesystem, MCP server, ChromaDB instance, or subprocess is involved. + +The suite verifies four properties of the memory-mempalace coordinator bridge: + +1. **mount() registration** — register_contributor() is called at mount() time + with the expected channel and event list so the hook wires itself into the + coordinator correctly. + +2. **bridge_emit / sync_bridge_emit round-trip** — after each emit_event call + site fires, the bridge emits the corresponding coordinator event via + bridge_emit (async path) or sync_bridge_emit (sync/threaded path). + +3. **emit_events:false suppression** — when the hook is configured with + ``emit_events: false``, BOTH the private JSONL emit_event call AND the + coordinator bridge emit are suppressed; the coordinator sees no events. + +4. **_briefed_ids cross-hook population** — the interject hook's ``_briefed_ids`` + set is populated when a sibling hook emits the + ``memory-mempalace:briefing_assembled`` coordinator event, confirming that + the bridge correctly wires sibling hooks through the coordinator. +""" + +from __future__ import annotations + +import asyncio +import threading # noqa: F401 (used by later tasks in this file) +from typing import Any +from unittest.mock import AsyncMock, MagicMock # noqa: F401 (used by later tasks in this file) + +import pytest # noqa: F401 (used by later tasks in this file) + + +# --------------------------------------------------------------------------- +# Shared stubs +# --------------------------------------------------------------------------- + + +class _Result: + """Minimal stand-in for the RustHookResult returned by emit().""" + + def __init__(self, action: str = "continue") -> None: + self.action = action + + +class FakeHooks: + """ + Stub of ``coordinator.hooks`` (RustHookRegistry). + + Records every register() call and re-dispatches emit() to all handlers + that were registered for the emitted event name. + """ + + def __init__(self) -> None: + # event_name -> list of (handler, name, priority) + self._registered: dict[str, list[tuple[Any, str, int]]] = {} + # Full log of every register() call as dicts for easy assertion + self._register_log: list[dict[str, Any]] = [] + # Full log of every emit() call as (event_name, data) tuples + self._emit_log: list[tuple[str, Any]] = [] + + def register( + self, + event_name: str, + handler: Any, + name: str = "", + priority: int = 0, + ) -> None: + """Record and store a handler registration.""" + self._register_log.append( + { + "event_name": event_name, + "handler": handler, + "name": name, + "priority": priority, + } + ) + self._registered.setdefault(event_name, []).append((handler, name, priority)) + + async def emit(self, event_name: str, data: Any = None) -> _Result: + """ + Record the emit and invoke every registered handler for *event_name*. + + Handlers may be sync or async; both are supported. Returns a + ``_Result`` with ``action='continue'`` unconditionally. + """ + self._emit_log.append((event_name, data)) + for handler, _name, _priority in self._registered.get(event_name, []): + result = handler(event_name, data) + if asyncio.iscoroutine(result): + await result + return _Result(action="continue") + + +class FakeCoordinator: + """ + Stub of ``RustCoordinator``. + + Provides the hooks registry and a contributor registration table that + tests can inspect without starting the real Amplifier kernel. + """ + + def __init__(self) -> None: + self.hooks: FakeHooks = FakeHooks() + # channel -> {contributor_name: callback} + self._contributors: dict[str, dict[str, Any]] = {} + + def register_contributor(self, channel: str, name: str, callback: Any) -> None: + """Record a contributor registration on *channel*.""" + self._contributors.setdefault(channel, {})[name] = callback + + async def mount(self, channel: str, tool: Any, *, name: str = "") -> None: + """No-op fake mount — records nothing, satisfies tool mount() contract.""" + + +# --------------------------------------------------------------------------- +# Capture hook — coordinator bridge tests (RED phase) +# --------------------------------------------------------------------------- + + +class TestCaptureCoordinatorBridge: + """Tests for coordinator bridge wiring in the capture hook. + + These are RED-phase TDD tests. They document the desired behavior + of the coordinator bridge before the implementation exists. + + Expected failing reasons (RED phase): + - mount() does not call register_contributor() → test 1 fails + - MempalaceCaptureHook has no _sync_bridge_emit attribute → test 2 fails + """ + + def test_register_contributor_called_at_mount(self) -> None: + """mount() must call register_contributor on the coordinator with + channel='observability.events' and name='memory-mempalace-capture'. + + The contributor callback must return a list of events that includes: + - 'memory-mempalace:drawer_filed' + - 'memory-mempalace:capture_failed' + + And must NOT include: + - 'memory-mempalace:capture_queued' (private-JSONL-only; intentionally hidden) + """ + import asyncio + + import amplifier_module_hooks_mempalace_capture as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + assert "observability.events" in coordinator._contributors, ( + "mount() must call register_contributor with channel 'observability.events'" + ) + contribs = coordinator._contributors["observability.events"] + assert "memory-mempalace-capture" in contribs, ( + "mount() must register contributor with name 'memory-mempalace-capture'" + ) + + callback = contribs["memory-mempalace-capture"] + events = callback() + assert "memory-mempalace:drawer_filed" in events, ( + "contributor callback must include 'memory-mempalace:drawer_filed'" + ) + assert "memory-mempalace:capture_failed" in events, ( + "contributor callback must include 'memory-mempalace:capture_failed'" + ) + assert "memory-mempalace:capture_queued" not in events, ( + "capture_queued is private-JSONL-only and must NOT be in coordinator events" + ) + + def test_drawer_filed_emits_to_coordinator_from_drain_thread( + self, monkeypatch: pytest.MonkeyPatch, tmp_path: Any + ) -> None: + """After a worthy tool:post event, the drain thread must emit + 'memory-mempalace:drawer_filed' to the coordinator via _bridge_emit. + + The hook must expose a _bridge_emit attribute confirming bridge wiring. + drawer_filed must also appear in the private-JSONL emit log. + """ + import asyncio + import time + + import amplifier_module_hooks_mempalace_capture as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + monkeypatch.setattr(m, "_mcp_add_drawer", lambda *a, **kw: None) + monkeypatch.setattr(m, "_detect_wing", lambda: "wing_test") + monkeypatch.setattr( + m, "_spool_dir_for", lambda sid: tmp_path / "spool" / (sid or "x") + ) + + emit_lock = threading.Lock() + emitted: list[tuple[Any, ...]] = [] + + def _capture(*a: Any, **kw: Any) -> None: + with emit_lock: + emitted.append((a, kw)) + + monkeypatch.setattr(m, "emit_event", _capture) + + hook = m.MempalaceCaptureHook() + asyncio.run( + hook( + "tool:post", + { + "tool_name": "bash", + "tool_input": {"command": "ls -la"}, + "tool_output": "x" * 200, + }, + ) + ) + + # Wait for drain thread to finish (500 iterations × 0.01s = 5s deadline) + for _ in range(500): + if m._QUEUE is None or m._QUEUE.unfinished_tasks == 0: + break + time.sleep(0.01) + + # The hook must have a _bridge_emit attribute (coordinator bridge wiring) + assert hasattr(hook, "_bridge_emit"), ( + "MempalaceCaptureHook must have a _bridge_emit attribute " + "to wire the drain thread into the coordinator bridge" + ) + + # drawer_filed must appear in the private-JSONL emit list + emitted_names = [a[0][1] for a in emitted if a[0] and len(a[0]) > 1] + assert "drawer_filed" in emitted_names, ( + f"Expected 'drawer_filed' in private-JSONL emits after drain, " + f"got: {emitted_names}" + ) + + def test_capture_queued_does_not_emit_to_coordinator( + self, monkeypatch: pytest.MonkeyPatch, tmp_path: Any + ) -> None: + """capture_queued must NOT appear in coordinator.hooks._emit_log. + + capture_queued is intentionally private-JSONL-only. Even after the + bridge is wired, capture_queued events must never reach the coordinator. + """ + import asyncio + import time + + import amplifier_module_hooks_mempalace_capture as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + monkeypatch.setattr(m, "_mcp_add_drawer", lambda *a, **kw: None) + monkeypatch.setattr(m, "_detect_wing", lambda: "wing_test") + monkeypatch.setattr( + m, "_spool_dir_for", lambda sid: tmp_path / "spool" / (sid or "x") + ) + + emit_lock = threading.Lock() + emitted: list[tuple[Any, ...]] = [] + + def _capture(*a: Any, **kw: Any) -> None: + with emit_lock: + emitted.append((a, kw)) + + monkeypatch.setattr(m, "emit_event", _capture) + + hook = m.MempalaceCaptureHook() + asyncio.run( + hook( + "tool:post", + { + "tool_name": "bash", + "tool_input": {"command": "ls -la"}, + "tool_output": "x" * 200, + }, + ) + ) + + # Wait for drain thread + for _ in range(500): + if m._QUEUE is None or m._QUEUE.unfinished_tasks == 0: + break + time.sleep(0.01) + + # capture_queued must never appear in coordinator.hooks._emit_log + coordinator_event_names = [ev[0] for ev in coordinator.hooks._emit_log] + assert "memory-mempalace:capture_queued" not in coordinator_event_names, ( + "capture_queued is private-JSONL-only and must never appear in " + "coordinator.hooks._emit_log" + ) + + def test_emit_events_false_suppresses_both_channels( + self, monkeypatch: pytest.MonkeyPatch, tmp_path: Any + ) -> None: + """emit_events=False must suppress BOTH the private-JSONL channel + AND any coordinator bridge emits. + """ + import asyncio + import time + + import amplifier_module_hooks_mempalace_capture as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator, config={"emit_events": False})) + + monkeypatch.setattr(m, "_mcp_add_drawer", lambda *a, **kw: None) + monkeypatch.setattr(m, "_detect_wing", lambda: "wing_test") + monkeypatch.setattr( + m, "_spool_dir_for", lambda sid: tmp_path / "spool" / (sid or "x") + ) + + emitted: list[tuple[Any, ...]] = [] + monkeypatch.setattr(m, "emit_event", lambda *a, **kw: emitted.append((a, kw))) + + hook = m.MempalaceCaptureHook(config={"emit_events": False}) + asyncio.run( + hook( + "tool:post", + { + "tool_name": "bash", + "tool_input": {}, + "tool_output": "x" * 200, + }, + ) + ) + + # Drain queue + for _ in range(500): + if m._QUEUE is None or m._QUEUE.unfinished_tasks == 0: + break + time.sleep(0.01) + + # Private-JSONL channel: no emits + assert emitted == [], ( + f"emit_events=False must suppress all private-JSONL emits, got: {emitted}" + ) + + # Coordinator channel: no events starting with 'memory-mempalace:' + coordinator_events = [ + ev[0] + for ev in coordinator.hooks._emit_log + if ev[0].startswith("memory-mempalace:") + ] + assert coordinator_events == [], ( + f"emit_events=False must suppress coordinator bridge emits, " + f"got: {coordinator_events}" + ) + + +# --------------------------------------------------------------------------- +# Briefing hook — coordinator bridge tests +# --------------------------------------------------------------------------- + + +class TestBriefingCoordinatorBridge: + """Tests for coordinator bridge wiring in the briefing hook. + + These tests verify that mount() registers a contributor and that the hook + emits coordinator events (briefing_assembled, briefing_skipped) via + bridge_emit, carrying drawer_ids derived from results_after_rerank. + """ + + def test_register_contributor_called_at_mount(self) -> None: + """mount() must call register_contributor on the coordinator with + channel='observability.events' and name='memory-mempalace-briefing'. + + The contributor callback must return a list of events that includes: + - 'memory-mempalace:briefing_assembled' + - 'memory-mempalace:briefing_skipped' + """ + import asyncio + + import amplifier_module_hooks_mempalace_briefing as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + assert "observability.events" in coordinator._contributors, ( + "mount() must call register_contributor with channel 'observability.events'" + ) + contribs = coordinator._contributors["observability.events"] + assert "memory-mempalace-briefing" in contribs, ( + "mount() must register contributor with name 'memory-mempalace-briefing'" + ) + + callback = contribs["memory-mempalace-briefing"] + events = callback() + assert "memory-mempalace:briefing_assembled" in events, ( + "contributor callback must include 'memory-mempalace:briefing_assembled'" + ) + assert "memory-mempalace:briefing_skipped" in events, ( + "contributor callback must include 'memory-mempalace:briefing_skipped'" + ) + + def test_briefing_assembled_emits_with_drawer_ids( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """After briefing_assembled, bridge emits with drawer_ids from results_after_rerank.""" + import asyncio + import subprocess + + import amplifier_module_hooks_mempalace_briefing as m # type: ignore[import] + + def fake_run(cmd: Any, *a: Any, **kw: Any) -> subprocess.CompletedProcess[str]: + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + + monkeypatch.setattr(m.subprocess, "run", fake_run) + + results_with_ids = [ + {"id": "drawer-1", "room": "r", "text": "t1", "score": 0.9}, + {"id": "drawer-2", "room": "r", "text": "t2", "score": 0.8}, + ] + + monkeypatch.setattr( + m, + "_build_briefing", + lambda **kw: ( + "## briefing", + ["section"], + 100, + results_with_ids, + results_with_ids, + ), + ) + monkeypatch.setattr(m, "_detect_project_name", lambda: "testproject") + + bridge_calls: list[tuple[str, Any]] = [] + + async def fake_bridge(event_name: str, payload: Any) -> None: + bridge_calls.append((event_name, payload)) + + hook = m.MempalaceBriefingHook(bridge_emit=fake_bridge) + asyncio.run(hook("session:start", {"opening_prompt": "test"})) + + assembled_calls = [ + (name, payload) + for name, payload in bridge_calls + if name == "memory-mempalace:briefing_assembled" + ] + assert len(assembled_calls) == 1, ( + f"Expected exactly one 'memory-mempalace:briefing_assembled' bridge call, " + f"got: {bridge_calls}" + ) + _, payload = assembled_calls[0] + assert payload.get("ok") is True, f"Expected ok=True in payload, got: {payload}" + assert payload.get("drawer_ids") == ["drawer-1", "drawer-2"], ( + f"Expected drawer_ids=['drawer-1', 'drawer-2'], got: {payload.get('drawer_ids')}" + ) + + def test_emit_events_false_suppresses_both_channels( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + """emit_events=False must suppress BOTH the private-JSONL channel + AND any coordinator bridge emits. + """ + import asyncio + + import amplifier_module_hooks_mempalace_briefing as m # type: ignore[import] + + def raise_not_found(*a: Any, **kw: Any) -> None: + raise FileNotFoundError("mempalace not found") + + monkeypatch.setattr(m.subprocess, "run", raise_not_found) + monkeypatch.setattr(m, "_find_project_context_dir", lambda: None) + + emitted: list[tuple[Any, ...]] = [] + monkeypatch.setattr(m, "emit_event", lambda *a, **kw: emitted.append((a, kw))) + + bridge_calls: list[tuple[str, Any]] = [] + + async def fake_bridge(event_name: str, payload: Any) -> None: + bridge_calls.append((event_name, payload)) + + hook = m.MempalaceBriefingHook( + config={"emit_events": False}, bridge_emit=fake_bridge + ) + asyncio.run(hook("session:start", {})) + + # Private-JSONL channel: no emits + assert emitted == [], ( + f"emit_events=False must suppress all private-JSONL emits, got: {emitted}" + ) + + # Coordinator channel: no events starting with 'memory-mempalace:' + coordinator_events = [ + name for name, _ in bridge_calls if name.startswith("memory-mempalace:") + ] + assert coordinator_events == [], ( + f"emit_events=False must suppress coordinator bridge emits, " + f"got: {coordinator_events}" + ) + + +# --------------------------------------------------------------------------- +# Interject hook — coordinator bridge tests +# --------------------------------------------------------------------------- + + +class TestInterjectCoordinatorBridge: + """Tests for coordinator bridge wiring in the interject hook. + + These tests verify that mount() registers a contributor, registers a + cross-hook listener for briefing_assembled, and emits coordinator events + (memory_surfaced, interject_skipped) via bridge_emit. + """ + + def test_register_contributor_called_at_mount(self) -> None: + """mount() must call register_contributor with contributor name + 'memory-mempalace-interject'. The callback must return a list that + includes 'memory-mempalace:memory_surfaced' and + 'memory-mempalace:interject_skipped'. + """ + import asyncio + + import amplifier_module_hooks_mempalace_interject as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + assert "observability.events" in coordinator._contributors, ( + "mount() must call register_contributor with channel 'observability.events'" + ) + contribs = coordinator._contributors["observability.events"] + assert "memory-mempalace-interject" in contribs, ( + "mount() must register contributor with name 'memory-mempalace-interject'" + ) + + callback = contribs["memory-mempalace-interject"] + events = callback() + assert "memory-mempalace:memory_surfaced" in events, ( + "contributor callback must include 'memory-mempalace:memory_surfaced'" + ) + assert "memory-mempalace:interject_skipped" in events, ( + "contributor callback must include 'memory-mempalace:interject_skipped'" + ) + + def test_briefing_assembled_listener_registered_in_mount(self) -> None: + """mount() must register a handler for 'memory-mempalace:briefing_assembled' + in coordinator.hooks._registered so that when the briefing hook emits + briefing_assembled, the interject hook's _briefed_ids is updated. + """ + import asyncio + + import amplifier_module_hooks_mempalace_interject as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + assert "memory-mempalace:briefing_assembled" in coordinator.hooks._registered, ( + "mount() must register a handler for 'memory-mempalace:briefing_assembled' " + "so that briefing events update _briefed_ids" + ) + + async def test_briefed_ids_populated_from_briefing_event(self) -> None: + """After mount(), emitting 'memory-mempalace:briefing_assembled' with + drawer_ids must populate the interject hook's _briefed_ids set. + + 1. Find hook via prompt:submit registered handler (bound method). + 2. Assert _briefed_ids starts empty. + 3. Emit briefing_assembled with drawer_ids=['d-1', 'd-2']. + 4. Assert _briefed_ids == {'d-1', 'd-2'}. + """ + import amplifier_module_hooks_mempalace_interject as m # type: ignore[import] + + coordinator = FakeCoordinator() + await m.mount(coordinator) + + # Find hook via the prompt:submit registered handler (bound method) + handlers = coordinator.hooks._registered.get("prompt:submit", []) + assert handlers, "Expected prompt:submit handler registered after mount()" + handler = handlers[0][0] # (handler, name, priority) + hook = handler.__self__ + + # Initially _briefed_ids must be empty + assert hook._briefed_ids == set(), ( + f"Expected _briefed_ids == set() before briefing event, " + f"got: {hook._briefed_ids}" + ) + + # Emit briefing_assembled — the registered listener must update _briefed_ids + await coordinator.hooks.emit( + "memory-mempalace:briefing_assembled", + {"drawer_ids": ["d-1", "d-2"]}, + ) + + assert hook._briefed_ids == {"d-1", "d-2"}, ( + f"Expected _briefed_ids == {{'d-1', 'd-2'}} after briefing event, " + f"got: {hook._briefed_ids}" + ) + + async def test_memory_surfaced_emits_to_coordinator(self) -> None: + """After mount(), calling on_prompt_submit with a matching memory must + emit exactly one 'memory-mempalace:memory_surfaced' event to the + coordinator with ok=True, trigger='prompt_submit', memory_ids=['m1']. + """ + import amplifier_module_hooks_mempalace_interject as m # type: ignore[import] + + coordinator = FakeCoordinator() + await m.mount(coordinator) + + # Find hook via prompt:submit registered handler + handlers = coordinator.hooks._registered.get("prompt:submit", []) + assert handlers, "Expected prompt:submit handler registered after mount()" + handler = handlers[0][0] # (handler, name, priority) + hook = handler.__self__ + + # Stub _retrieve_and_gate to return one matching memory + async def _fake_retrieve(query: str, event: str): # type: ignore[no-untyped-def] + return ([{"id": "m1", "text": "hello", "score": 0.9}], True, "", False) + + hook._retrieve_and_gate = _fake_retrieve # type: ignore[method-assign] + + # Call on_prompt_submit with a long-enough prompt + await hook.on_prompt_submit( + "prompt:submit", + {"prompt": "this is a long enough prompt to pass the length check"}, + ) + + # Assert exactly one 'memory-mempalace:memory_surfaced' bridge call + surfaced = [ + (name, data) + for name, data in coordinator.hooks._emit_log + if name == "memory-mempalace:memory_surfaced" + ] + assert len(surfaced) == 1, ( + f"Expected exactly one 'memory-mempalace:memory_surfaced' bridge call, " + f"got: {coordinator.hooks._emit_log}" + ) + _, payload = surfaced[0] + assert payload.get("ok") is True, f"Expected ok=True in payload, got: {payload}" + assert payload.get("trigger") == "prompt_submit", ( + f"Expected trigger='prompt_submit' in payload, got: {payload}" + ) + assert payload.get("memory_ids") == ["m1"], ( + f"Expected memory_ids=['m1'] in payload, got: {payload}" + ) + + +# --------------------------------------------------------------------------- +# Project-context hook — coordinator bridge tests +# --------------------------------------------------------------------------- + + +class TestProjectContextCoordinatorBridge: + """Tests for coordinator bridge wiring in the project-context hooks. + + These tests verify that mount() registers a contributor and that the hooks + emit coordinator events (coordination_read, coordination_scaffolded, + curator_handoff_requested) via bridge_emit. + """ + + def test_register_contributor_called_at_mount(self) -> None: + """mount() must call register_contributor on the coordinator with + channel='observability.events' and name='memory-mempalace-project-context'. + + The contributor callback must return a list of events that is a superset of: + - 'memory-mempalace:coordination_read' + - 'memory-mempalace:coordination_scaffolded' + - 'memory-mempalace:curator_handoff_requested' + """ + import asyncio + + import amplifier_module_hooks_project_context as m # type: ignore[import] + + coordinator = FakeCoordinator() + asyncio.run(m.mount(coordinator)) + + assert "observability.events" in coordinator._contributors, ( + "mount() must call register_contributor with channel 'observability.events'" + ) + contribs = coordinator._contributors["observability.events"] + assert "memory-mempalace-project-context" in contribs, ( + "mount() must register contributor with name 'memory-mempalace-project-context'" + ) + + callback = contribs["memory-mempalace-project-context"] + events = set(callback()) + required_events = { + "memory-mempalace:coordination_read", + "memory-mempalace:coordination_scaffolded", + "memory-mempalace:curator_handoff_requested", + } + assert required_events <= events, ( + f"contributor callback must include all of {required_events}, got: {events}" + ) + + def test_curator_handoff_requested_bridges_at_session_end( + self, monkeypatch: pytest.MonkeyPatch, tmp_path: Any + ) -> None: + """After a session:end event, the hook must emit exactly one + 'memory-mempalace:curator_handoff_requested' bridge call when + project-context dir exists. + """ + import asyncio + + import amplifier_module_hooks_project_context as m # type: ignore[import] + + # Create a fake project-context directory so the hook proceeds + pc_dir = tmp_path / "project-context" + pc_dir.mkdir() + + monkeypatch.setattr(m, "_find_project_context_dir", lambda: pc_dir) + + bridge_calls: list[tuple[str, Any]] = [] + + async def fake_bridge(event_name: str, payload: Any) -> None: + bridge_calls.append((event_name, payload)) + + hook = m.ProjectContextEndHook(bridge_emit=fake_bridge) + asyncio.run(hook("session:end", {"session_id": "sid"})) + + handoff_calls = [ + (name, payload) + for name, payload in bridge_calls + if name == "memory-mempalace:curator_handoff_requested" + ] + assert len(handoff_calls) == 1, ( + f"Expected exactly one 'memory-mempalace:curator_handoff_requested' bridge call, " + f"got: {bridge_calls}" + ) + + +# --------------------------------------------------------------------------- +# Tool-mempalace — coordinator bridge tests +# --------------------------------------------------------------------------- + + +class TestToolMempalaceCoordinatorBridge: + """Tests for coordinator bridge wiring in the palace tool. + + These tests verify that mount() registers a contributor and that garden + operations forward events to the coordinator via the combined_emit / + sync_bridge_emit bridge. + """ + + def test_register_contributor_called_at_mount(self) -> None: + """mount() must call register_contributor on the coordinator with + channel='observability.events' and name='memory-mempalace-tool'. + + The contributor callback must return a list of events that includes: + - 'memory-mempalace:garden_completed' + - 'memory-mempalace:garden_progress' + """ + import asyncio + + import amplifier_module_tool_mempalace as m # type: ignore[import] + + coordinator = FakeCoordinator() + + asyncio.run(m.mount(coordinator)) + + assert "observability.events" in coordinator._contributors, ( + "mount() must call register_contributor with channel 'observability.events'" + ) + contribs = coordinator._contributors["observability.events"] + assert "memory-mempalace-tool" in contribs, ( + "mount() must register contributor with name 'memory-mempalace-tool'" + ) + + callback = contribs["memory-mempalace-tool"] + events = callback() + assert "memory-mempalace:garden_completed" in events, ( + "contributor callback must include 'memory-mempalace:garden_completed'" + ) + assert "memory-mempalace:garden_progress" in events, ( + "contributor callback must include 'memory-mempalace:garden_progress'" + ) diff --git a/tests/test_hook_emissions.py b/tests/test_hook_emissions.py index 4641796..dcf4641 100644 --- a/tests/test_hook_emissions.py +++ b/tests/test_hook_emissions.py @@ -128,9 +128,10 @@ def test_capture_emits_queued_synchronously( _drain() filed = [e for e in emitted if e[0][1] == "drawer_filed"] assert len(filed) == 1, f"Expected drawer_filed after drain in {emitted}" - assert filed[0][1].get("data", {}).get("capture_id") == kwargs["data"][ - "capture_id" - ] + assert ( + filed[0][1].get("data", {}).get("capture_id") + == kwargs["data"]["capture_id"] + ) assert "wing" in filed[0][1].get("data", {}) def test_capture_returns_fast_under_slow_drawer_write( @@ -295,23 +296,19 @@ def test_briefing_emits_on_assemble(self, monkeypatch: pytest.MonkeyPatch) -> No emitted: list[tuple[Any, ...]] = [] monkeypatch.setattr(m, "emit_event", lambda *a, **kw: emitted.append((a, kw))) - def fake_run( - cmd: Any, *a: Any, **kw: Any - ) -> subprocess.CompletedProcess[str]: + def fake_run(cmd: Any, *a: Any, **kw: Any) -> subprocess.CompletedProcess[str]: return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") monkeypatch.setattr(m.subprocess, "run", fake_run) monkeypatch.setattr( m, "_build_briefing", - lambda **kw: ("## Briefing\ntest content", ["semantic"], 100, 3, 3), + lambda **kw: ("## Briefing\ntest content", ["semantic"], 100, [], []), ) monkeypatch.setattr(m, "_detect_project_name", lambda: "testproject") hook = m.MempalaceBriefingHook() - result = asyncio.run( - hook("session:start", {"opening_prompt": "start working"}) - ) + result = asyncio.run(hook("session:start", {"opening_prompt": "start working"})) assert result.action == "inject_context" assembled = [e for e in emitted if e[0][1] == "briefing_assembled"] @@ -322,6 +319,8 @@ def fake_run( data = kwargs.get("data", {}) assert "project" in data assert "section_count" in data + assert isinstance(data["results_fetched"], int) + assert isinstance(data["results_after_rerank"], int) assert data["results_fetched"] == data["results_after_rerank"] assert data["importance_weight"] == 1.0 @@ -385,7 +384,12 @@ async def test_interject_emits_on_surface( hook = m.MempalaceInterjectHook({}) memories = [ - {"id": "mem_1", "text": "test memory content", "score": 0.85, "metadata": {}} + { + "id": "mem_1", + "text": "test memory content", + "score": 0.85, + "metadata": {}, + } ] hook._retrieve_and_gate = AsyncMock( # type: ignore[method-assign]