diff --git a/docs/memory.md b/docs/memory.md index bea56c8..3585e0e 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -1,70 +1,167 @@ -# Octopal Hybrid Memory System +# Octopal Memory -Octopal uses a sophisticated three-tier hybrid memory architecture designed to balance immediate conversational context, semantic retrieval of past events, and stable long-term knowledge ("Crystallization"). +Octopal remembers things in a few different ways, because not all memory is the same. -## 1. Architecture Overview +Some things are only useful for the current conversation. Some are useful later if they become relevant again. Some should become durable knowledge that survives across sessions. And some things change over time, so the system needs to distinguish between "this was true before" and "this is true now". -Memory is divided into three distinct layers: +The goal of the memory system is simple: -| Layer | Storage | Retrieval Strategy | Purpose | -| :--- | :--- | :--- | :--- | -| **Temporal** | SQLite | Last N messages | Immediate conversational flow. | -| **Semantic** | SQLite + Vectors | Top-K Cosine Similarity | Recalling relevant past events/conversations. | -| **Canonical** | Filesystem (`.md`) | Hybrid (Inject + Search) | Crystallized knowledge, decisions, and lessons. | +- keep Octo grounded in the current conversation +- help Octo recall relevant past work +- preserve important long-term knowledge +- reduce prompt noise instead of endlessly stuffing more text into context ---- +## The Short Version -## 2. The Canonical Layer (`memory/canon/`) +Octopal memory has five high-level parts: -This is the "Stable" memory of the agent. Unlike the SQLite event store, the Canonical layer is curated by the Octo and is not subject to automatic expiration. +1. recent conversation memory +2. searchable past memory +3. durable canon files +4. current facts derived from durable knowledge and observed history +5. 
reset and continuity notes that help Octo resume work after context resets -### Core Files -- `facts.md`: Verified truths about the user, project, or world. -- `decisions.md`: Architectural choices, policy rulings, and persistent preferences. -- `failures.md`: Lessons learned from errors to prevent repetition. +These parts work together. Not everything is loaded all the time. -### Context Tiers -1. **Tier 1 (Automatic Injection):** `decisions.md` and `failures.md` are always injected into the Octo's system prompt (summarized if they exceed size limits). -2. **Tier 2 (Semantic Search):** The Octo uses the `search_canon` tool to query `facts.md` and other canon files when she needs specific details. +## 1. Recent Conversation Memory ---- +This is the short-term memory for active chats. -## 3. Decoupled Vector Storage +It keeps track of recent user messages, assistant replies, worker outcomes, and other nearby context that matters for the current flow. This is what helps Octo stay coherent inside an ongoing conversation without having to rediscover everything from scratch. -To ensure scalability and model independence, Octopal decouples text storage from the vector index. +This layer is useful for: -- **Table `memory_entries`**: Stores the raw text, role, and metadata. -- **Table `memory_embeddings`**: Stores vectors mapped to entries by UUID, including the model name used for embedding. -- **Table `canon_embeddings`**: Stores semantic chunks of the canonical files for Tier 2 retrieval. +- following the current thread +- avoiding repeating the same reply +- keeping track of what was just said or done -This allows for: -- **Re-embedding:** Switching embedding models without losing the original text. -- **Multi-model support:** Using different models for different types of retrieval. +This memory is not meant to become permanent truth by itself. ---- +## 2. Searchable Past Memory -## 4. 
The Memory Contract +Octopal also keeps a broader searchable memory of past messages and events. -Octopal enforces a strict hierarchy for writing to memory: +When the current conversation is not enough, the system can pull in relevant past material. It does not just blindly replay old chat logs. It tries to retrieve the pieces that are most useful for the current question. -1. **Workers (Propose):** Workers cannot modify the Canon. They use the `propose_knowledge` tool to flag facts or lessons. -2. **Octo (Curate):** The Octo reviews worker proposals and her own experiences. She uses `manage_canon` to "crystallize" information into the `.md` files. -3. **Automatic (Log):** All raw interactions are automatically logged to the SQLite temporal/semantic store. +This layer is useful for: ---- +- recalling old discussions +- finding earlier solutions +- remembering past attempts, problems, and trade-offs -## 5. Maintenance & Guardrails +To make this cleaner, memory entries are also tagged at a high level when possible, for example as: -- **Compaction:** If a canonical file exceeds 4,000 characters, the `CanonService` issues a warning to the Octo, who is then responsible for summarizing and refactoring the file. -- **Cleanup:** Ephemeral SQLite memory is pruned according to settings (default: entries older than 30 days or exceeding 1,000 records). Canonical memory is **never** automatically deleted. +- decisions +- preferences +- problems +- milestones +- emotional or human-context moments -## 6. Quality Controls (Recent Improvements) +That helps Octo search more intelligently instead of treating every memory as the same kind of thing. -Octopal now applies additional quality controls in the transient/semantic memory pipeline: +## 3. Canon Files -- **Deduplication on Write:** Exact normalized duplicates in recent same-chat history are skipped to reduce noise. 
-- **Contradiction Tagging:** Simple assertion conflicts (for example, `X is Y` vs `X is not Y`) are flagged in metadata (`contradiction_detected`, `contradiction_with`) instead of silently treated as equal truth. -- **Confidence Scoring:** Entries carry a confidence value in metadata (role-defaulted and adjustable), which influences retrieval rank. -- **Recency Weighting:** Semantic retrieval applies a recency decay factor so newer relevant memories are favored while still retaining long-term recall. +The canon is the durable memory of the system. -These controls improve retrieval precision while preserving auditability of conflicting observations. +These are the files under `workspace/memory/canon/`, especially: + +- `facts.md` +- `decisions.md` +- `failures.md` + +Think of canon as crystallized knowledge. It is not just "whatever happened in chat". It is the part that should keep mattering later. + +In practice: + +- `facts.md` holds durable truths worth remembering +- `decisions.md` holds important choices and policies +- `failures.md` holds lessons learned and mistakes not worth repeating + +Canon is intentionally more stable and curated than normal chat memory. + +## 4. Current Facts + +Some knowledge changes over time. + +For example: + +- which tool is currently preferred +- which provider is active now +- whether something is working or broken +- what the current next step is + +For this, Octopal now keeps a separate facts layer. This layer is meant to answer questions like: + +- what seems true right now? +- what used to be true, but no longer is? +- what current fact is worth surfacing without dragging in a lot of old raw conversation? + +This keeps Octo from having to infer the present state from a pile of old messages every time. + +## 5. Reset and Continuity Memory + +Long-running agent work sometimes needs context resets. 
+ +When that happens, Octopal stores structured handoff and continuity notes so Octo can wake up with a useful summary instead of just losing the thread. + +This includes things like: + +- current goal +- open threads +- constraints +- next step +- a short reflection on what mattered before the reset + +This layer is not the same as canon. It is there to preserve momentum and continuity. + +## How Memory Gets Used + +Octo does not load all memory all the time. + +Very roughly, the system works like this: + +- durable canon is always important +- recent history is used for current conversation flow +- searchable memory is pulled in when needed +- current facts are used when the question is about active state +- reset continuity notes are mainly used after a context reset + +This keeps the system practical. The point is not to remember everything equally. The point is to remember the right things at the right time. + +## What Gets Saved Automatically + +A lot of memory is created automatically: + +- conversation entries +- assistant and worker outcomes +- semantic/searchable memory +- fact candidates inferred from memory +- continuity notes created during context reset + +This means the system can improve recall without requiring constant manual upkeep. + +## What Still Needs Curation + +Not every remembered thing should become durable knowledge. + +The canon is still the place for carefully chosen long-term knowledge. That means: + +- raw chat history is not automatically the same as truth +- temporary observations are not automatically permanent facts +- reflection notes are not automatically canon + +This separation matters. It helps Octo remember more without turning memory into a junk drawer. 
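
The candidate-versus-canon split described above can be sketched as a tiny state machine: chat observations only ever enter as candidates, and a fact is active only while curated canon backs it. This is an illustrative sketch under those assumptions; the names (`Fact`, `FactLedger`) and the plain in-memory dict are hypothetical, not Octopal's actual `FactsService` API:

```python
from dataclasses import dataclass, field

@dataclass
class Fact:
    subject: str
    value: str
    status: str = "candidate"  # candidate -> active -> superseded

@dataclass
class FactLedger:
    facts: dict = field(default_factory=dict)

    def observe(self, subject: str, value: str) -> Fact:
        # Raw chat history is not automatically truth:
        # new observations enter as candidates only.
        fact = Fact(subject, value, status="candidate")
        self.facts[(subject, value)] = fact
        return fact

    def sync_with_canon(self, canon_values: set) -> None:
        # Curated canon promotes matching facts to active; active facts
        # the canon no longer supports become superseded, not deleted.
        for key, fact in self.facts.items():
            if key in canon_values:
                fact.status = "active"
            elif fact.status == "active":
                fact.status = "superseded"
        for key in canon_values - self.facts.keys():
            self.facts[key] = Fact(key[0], key[1], status="active")

ledger = FactLedger()
ledger.observe("provider", "openai")               # candidate only
ledger.sync_with_canon({("provider", "openai")})   # canon confirms it
print(ledger.facts[("provider", "openai")].status)  # active
ledger.sync_with_canon(set())                      # canon drops it
print(ledger.facts[("provider", "openai")].status)  # superseded
```

Keeping superseded facts around instead of deleting them is what lets the system answer "what used to be true, but no longer is?".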
+ +## Why This Design Helps + +The memory system is designed to make Octo better in a practical way: + +- less likely to forget important project decisions +- less likely to drag irrelevant old context into the prompt +- better at recalling current state versus historical state +- better at resuming work after long tasks or resets +- better at preserving useful lessons without bloating every conversation + +In short: + +Octopal tries to remember broadly, use memory selectively, and keep durable knowledge clean. diff --git a/src/octopal/cli/main.py b/src/octopal/cli/main.py index 8b52a6c..5fe4e7a 100644 --- a/src/octopal/cli/main.py +++ b/src/octopal/cli/main.py @@ -843,6 +843,14 @@ def memory_stats() -> None: with console.status("[bold green]Analyzing memory...[/bold green]"): entries = store.list_memory_entries(limit=1000000) # Get all for stats + facts = store.list_memory_facts( + settings.memory_owner_id, + limit=1000000, + ) + diary_entries = store.list_octo_diary_entries( + settings.memory_owner_id, + limit=1000000, + ) total = len(entries) @@ -861,8 +869,18 @@ def memory_stats() -> None: if chat_id: by_chat[chat_id] = by_chat.get(chat_id, 0) + 1 + facts_by_status: dict[str, int] = {} + for fact in facts: + facts_by_status[fact.status] = facts_by_status.get(fact.status, 0) + 1 + console.print("\n") console.print(Align.center(f"[bold white]Total Memory Entries:[/bold white] [bright_cyan]{total}[/bright_cyan] [dim]|[/dim] [bold white]Unique Chats:[/bold white] [bright_cyan]{len(by_chat)}[/bright_cyan]")) + console.print( + Align.center( + f"[bold white]Memory Facts:[/bold white] [bright_cyan]{len(facts)}[/bright_cyan] [dim]|[/dim] " + f"[bold white]Diary Entries:[/bold white] [bright_cyan]{len(diary_entries)}[/bright_cyan]" + ) + ) role_table = Table(title="Entries by Role", border_style="bright_blue", show_header=True, expand=False) role_table.add_column("Role", style="magenta", width=20) @@ -873,6 +891,15 @@ def memory_stats() -> None: console.print("\n") 
console.print(Align.center(role_table)) + + if facts_by_status: + facts_table = Table(title="Facts by Status", border_style="bright_blue", show_header=True, expand=False) + facts_table.add_column("Status", style="magenta", width=20) + facts_table.add_column("Count", style="bright_green", justify="right", width=10) + for status, count in sorted(facts_by_status.items()): + facts_table.add_row(status, str(count)) + console.print("\n") + console.print(Align.center(facts_table)) console.print("\n") diff --git a/src/octopal/infrastructure/store/base.py b/src/octopal/infrastructure/store/base.py index 197e3f5..44c03cc 100644 --- a/src/octopal/infrastructure/store/base.py +++ b/src/octopal/infrastructure/store/base.py @@ -7,6 +7,9 @@ AuditEvent, IntentRecord, MemoryEntry, + MemoryFactRecord, + MemoryFactSourceRecord, + OctoDiaryEntryRecord, PermitRecord, WorkerRecord, WorkerTemplateRecord, @@ -78,6 +81,36 @@ def search_memory_entries_lexical( def cleanup_old_memory(self, keep_days: int = 30, keep_count: int = 1000) -> int: ... def delete_memory_entries_by_chat(self, chat_id: int, keep_recent: int = 0) -> int: ... + def upsert_memory_fact(self, record: MemoryFactRecord) -> None: ... + + def list_memory_facts( + self, + owner_id: str, + *, + limit: int = 100, + status: str | None = None, + subject: str | None = None, + key: str | None = None, + source_kind: str | None = None, + source_ref: str | None = None, + ) -> list[MemoryFactRecord]: ... + + def invalidate_memory_fact(self, fact_id: str, valid_to: datetime, status: str = "invalidated") -> None: ... + + def add_memory_fact_source(self, record: MemoryFactSourceRecord) -> None: ... + + def list_memory_fact_sources(self, fact_id: str) -> list[MemoryFactSourceRecord]: ... + + def add_octo_diary_entry(self, record: OctoDiaryEntryRecord) -> None: ... + + def list_octo_diary_entries( + self, + owner_id: str, + *, + chat_id: int | None = None, + limit: int = 20, + ) -> list[OctoDiaryEntryRecord]: ... 
+ def is_chat_bootstrapped(self, chat_id: int) -> bool: ... def mark_chat_bootstrapped(self, chat_id: int, ts: datetime) -> None: ... diff --git a/src/octopal/infrastructure/store/models.py b/src/octopal/infrastructure/store/models.py index ed79b63..1b6fc2e 100644 --- a/src/octopal/infrastructure/store/models.py +++ b/src/octopal/infrastructure/store/models.py @@ -94,3 +94,46 @@ class MemoryEntry(BaseModel): embedding: list[float] | None = None created_at: datetime metadata: dict[str, Any] = Field(default_factory=dict) + + +class MemoryFactRecord(BaseModel): + model_config = ConfigDict(frozen=True) + + id: str + owner_id: str + subject: str + key: str + value_text: str + value_json: dict[str, Any] | None = None + fact_type: str + confidence: float + status: str + valid_from: datetime | None = None + valid_to: datetime | None = None + facets: list[str] = Field(default_factory=list) + source_kind: str | None = None + source_ref: str | None = None + created_at: datetime + updated_at: datetime + + +class MemoryFactSourceRecord(BaseModel): + model_config = ConfigDict(frozen=True) + + fact_id: str + memory_entry_uuid: str | None = None + canon_filename: str | None = None + source_note: str | None = None + created_at: datetime + + +class OctoDiaryEntryRecord(BaseModel): + model_config = ConfigDict(frozen=True) + + id: str + owner_id: str + chat_id: int | None = None + kind: str + summary: str + details: dict[str, Any] = Field(default_factory=dict) + created_at: datetime diff --git a/src/octopal/infrastructure/store/sqlite.py b/src/octopal/infrastructure/store/sqlite.py index 5961f11..6f8db21 100644 --- a/src/octopal/infrastructure/store/sqlite.py +++ b/src/octopal/infrastructure/store/sqlite.py @@ -14,6 +14,9 @@ AuditEvent, IntentRecord, MemoryEntry, + MemoryFactRecord, + MemoryFactSourceRecord, + OctoDiaryEntryRecord, PermitRecord, WorkerRecord, WorkerTemplateRecord, @@ -181,6 +184,57 @@ def _init_schema(self) -> None: CREATE INDEX IF NOT EXISTS 
ix_canon_embeddings_filename ON canon_embeddings (filename); + CREATE TABLE IF NOT EXISTS memory_facts ( + id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + subject TEXT NOT NULL, + key TEXT NOT NULL, + value_text TEXT NOT NULL, + value_json TEXT, + fact_type TEXT NOT NULL, + confidence REAL NOT NULL, + status TEXT NOT NULL, + valid_from TEXT, + valid_to TEXT, + facets_json TEXT NOT NULL, + source_kind TEXT, + source_ref TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS ix_memory_facts_owner_subject_key_status + ON memory_facts (owner_id, subject, key, status); + CREATE INDEX IF NOT EXISTS ix_memory_facts_owner_status_valid_to + ON memory_facts (owner_id, status, valid_to); + CREATE INDEX IF NOT EXISTS ix_memory_facts_source + ON memory_facts (source_kind, source_ref, status); + + CREATE TABLE IF NOT EXISTS memory_fact_sources ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + fact_id TEXT NOT NULL, + memory_entry_uuid TEXT, + canon_filename TEXT, + source_note TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY(fact_id) REFERENCES memory_facts(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS ix_memory_fact_sources_fact_id ON memory_fact_sources (fact_id); + + CREATE TABLE IF NOT EXISTS octo_diary_entries ( + id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + chat_id INTEGER, + kind TEXT NOT NULL, + summary TEXT NOT NULL, + details_json TEXT NOT NULL, + created_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS ix_octo_diary_entries_owner_chat_created + ON octo_diary_entries (owner_id, chat_id, created_at DESC); + CREATE TABLE IF NOT EXISTS chat_state ( chat_id INTEGER PRIMARY KEY, bootstrapped_at TEXT, @@ -234,6 +288,67 @@ def _ensure_schema_upgrades(self) -> None: """ ) self._conn.execute("CREATE INDEX IF NOT EXISTS ix_canon_embeddings_filename ON canon_embeddings (filename)") + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS memory_facts ( + id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + subject TEXT NOT NULL, 
+ key TEXT NOT NULL, + value_text TEXT NOT NULL, + value_json TEXT, + fact_type TEXT NOT NULL, + confidence REAL NOT NULL, + status TEXT NOT NULL, + valid_from TEXT, + valid_to TEXT, + facets_json TEXT NOT NULL, + source_kind TEXT, + source_ref TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + self._conn.execute( + "CREATE INDEX IF NOT EXISTS ix_memory_facts_owner_subject_key_status ON memory_facts (owner_id, subject, key, status)" + ) + self._conn.execute( + "CREATE INDEX IF NOT EXISTS ix_memory_facts_owner_status_valid_to ON memory_facts (owner_id, status, valid_to)" + ) + self._conn.execute( + "CREATE INDEX IF NOT EXISTS ix_memory_facts_source ON memory_facts (source_kind, source_ref, status)" + ) + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS memory_fact_sources ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + fact_id TEXT NOT NULL, + memory_entry_uuid TEXT, + canon_filename TEXT, + source_note TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY(fact_id) REFERENCES memory_facts(id) ON DELETE CASCADE + ) + """ + ) + self._conn.execute("CREATE INDEX IF NOT EXISTS ix_memory_fact_sources_fact_id ON memory_fact_sources (fact_id)") + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS octo_diary_entries ( + id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL, + chat_id INTEGER, + kind TEXT NOT NULL, + summary TEXT NOT NULL, + details_json TEXT NOT NULL, + created_at TEXT NOT NULL + ) + """ + ) + self._conn.execute( + "CREATE INDEX IF NOT EXISTS ix_octo_diary_entries_owner_chat_created ON octo_diary_entries (owner_id, chat_id, created_at DESC)" + ) self._conn.commit() except sqlite3.OperationalError: pass @@ -806,6 +921,149 @@ def list_canon_embeddings(self, filename: str | None = None) -> list[dict[str, A for row in rows ] + def upsert_memory_fact(self, record: MemoryFactRecord) -> None: + self._conn.execute( + """ + INSERT OR REPLACE INTO memory_facts ( + id, owner_id, subject, key, value_text, value_json, fact_type, confidence, + status, 
valid_from, valid_to, facets_json, source_kind, source_ref, + created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + record.id, + record.owner_id, + record.subject, + record.key, + record.value_text, + json.dumps(record.value_json) if record.value_json is not None else None, + record.fact_type, + float(record.confidence), + record.status, + record.valid_from.isoformat() if record.valid_from else None, + record.valid_to.isoformat() if record.valid_to else None, + json.dumps(record.facets), + record.source_kind, + record.source_ref, + record.created_at.isoformat(), + record.updated_at.isoformat(), + ), + ) + self._conn.commit() + + def list_memory_facts( + self, + owner_id: str, + *, + limit: int = 100, + status: str | None = None, + subject: str | None = None, + key: str | None = None, + source_kind: str | None = None, + source_ref: str | None = None, + ) -> list[MemoryFactRecord]: + query = ["SELECT * FROM memory_facts WHERE owner_id = ?"] + params: list[Any] = [owner_id] + if status is not None: + query.append("AND status = ?") + params.append(status) + if subject is not None: + query.append("AND subject = ?") + params.append(subject) + if key is not None: + query.append("AND key = ?") + params.append(key) + if source_kind is not None: + query.append("AND source_kind = ?") + params.append(source_kind) + if source_ref is not None: + query.append("AND source_ref = ?") + params.append(source_ref) + query.append("ORDER BY updated_at DESC LIMIT ?") + params.append(limit) + cursor = self._conn.execute(" ".join(query), tuple(params)) + return [self._row_to_memory_fact(row) for row in cursor.fetchall()] + + def invalidate_memory_fact(self, fact_id: str, valid_to: datetime, status: str = "invalidated") -> None: + self._conn.execute( + """ + UPDATE memory_facts + SET status = ?, valid_to = ?, updated_at = ? + WHERE id = ? 
+ """, + (status, valid_to.isoformat(), utc_now().isoformat(), fact_id), + ) + self._conn.commit() + + def add_memory_fact_source(self, record: MemoryFactSourceRecord) -> None: + self._conn.execute( + """ + INSERT INTO memory_fact_sources (fact_id, memory_entry_uuid, canon_filename, source_note, created_at) + VALUES (?, ?, ?, ?, ?) + """, + ( + record.fact_id, + record.memory_entry_uuid, + record.canon_filename, + record.source_note, + record.created_at.isoformat(), + ), + ) + self._conn.commit() + + def list_memory_fact_sources(self, fact_id: str) -> list[MemoryFactSourceRecord]: + cursor = self._conn.execute( + "SELECT * FROM memory_fact_sources WHERE fact_id = ? ORDER BY id ASC", + (fact_id,), + ) + return [self._row_to_memory_fact_source(row) for row in cursor.fetchall()] + + def add_octo_diary_entry(self, record: OctoDiaryEntryRecord) -> None: + self._conn.execute( + """ + INSERT OR REPLACE INTO octo_diary_entries (id, owner_id, chat_id, kind, summary, details_json, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + record.id, + record.owner_id, + record.chat_id, + record.kind, + record.summary, + json.dumps(record.details), + record.created_at.isoformat(), + ), + ) + self._conn.commit() + + def list_octo_diary_entries( + self, + owner_id: str, + *, + chat_id: int | None = None, + limit: int = 20, + ) -> list[OctoDiaryEntryRecord]: + if chat_id is None: + cursor = self._conn.execute( + """ + SELECT * FROM octo_diary_entries + WHERE owner_id = ? + ORDER BY created_at DESC LIMIT ? + """, + (owner_id, limit), + ) + else: + cursor = self._conn.execute( + """ + SELECT * FROM octo_diary_entries + WHERE owner_id = ? AND (chat_id = ? OR chat_id IS NULL) + ORDER BY created_at DESC LIMIT ? + """, + (owner_id, chat_id, limit), + ) + return [self._row_to_octo_diary_entry(row) for row in cursor.fetchall()] + def cleanup_old_memory(self, keep_days: int = 30, keep_count: int = 1000) -> int: """ Cleanup old memory entries to prevent database bloat. 
@@ -1028,6 +1286,46 @@ def _row_to_memory(self, row: sqlite3.Row) -> MemoryEntry: metadata=_loads_json(row["metadata_json"]), ) + def _row_to_memory_fact(self, row: sqlite3.Row) -> MemoryFactRecord: + return MemoryFactRecord( + id=row["id"], + owner_id=row["owner_id"], + subject=row["subject"], + key=row["key"], + value_text=row["value_text"], + value_json=_loads_json(row["value_json"]), + fact_type=row["fact_type"], + confidence=float(row["confidence"]), + status=row["status"], + valid_from=_parse_dt(row["valid_from"]) if row["valid_from"] else None, + valid_to=_parse_dt(row["valid_to"]) if row["valid_to"] else None, + facets=_loads_json(row["facets_json"], []), + source_kind=_row_get(row, "source_kind"), + source_ref=_row_get(row, "source_ref"), + created_at=_parse_dt(row["created_at"]), + updated_at=_parse_dt(row["updated_at"]), + ) + + def _row_to_memory_fact_source(self, row: sqlite3.Row) -> MemoryFactSourceRecord: + return MemoryFactSourceRecord( + fact_id=row["fact_id"], + memory_entry_uuid=_row_get(row, "memory_entry_uuid"), + canon_filename=_row_get(row, "canon_filename"), + source_note=_row_get(row, "source_note"), + created_at=_parse_dt(row["created_at"]), + ) + + def _row_to_octo_diary_entry(self, row: sqlite3.Row) -> OctoDiaryEntryRecord: + return OctoDiaryEntryRecord( + id=row["id"], + owner_id=row["owner_id"], + chat_id=_row_get(row, "chat_id"), + kind=row["kind"], + summary=row["summary"], + details=_loads_json(row["details_json"], {}), + created_at=_parse_dt(row["created_at"]), + ) + def _backfill_memory_scope_columns(self) -> None: cursor = self._conn.execute( """ diff --git a/src/octopal/runtime/app.py b/src/octopal/runtime/app.py index 1c7bf31..709856f 100644 --- a/src/octopal/runtime/app.py +++ b/src/octopal/runtime/app.py @@ -9,6 +9,8 @@ from octopal.infrastructure.providers.openai_embeddings import OpenAIEmbeddingsProvider from octopal.infrastructure.store.sqlite import SQLiteStore from octopal.runtime.memory.canon import CanonService +from 
octopal.runtime.memory.facts import FactsService +from octopal.runtime.memory.reflection import ReflectionService from octopal.runtime.memory.service import MemoryService from octopal.runtime.octo.core import Octo from octopal.runtime.policy.engine import PolicyEngine @@ -43,6 +45,14 @@ def build_octo(settings: Settings) -> Octo: ) approvals = ApprovalManager(bot=None) embeddings = OpenAIEmbeddingsProvider(settings) if settings.openai_api_key else None + facts = FactsService( + store=store, + owner_id=settings.memory_owner_id, + ) + reflection = ReflectionService( + store=store, + owner_id=settings.memory_owner_id, + ) memory = MemoryService( store=store, embeddings=embeddings, @@ -51,11 +61,13 @@ def build_octo(settings: Settings) -> Octo: prefilter_k=settings.memory_prefilter_k, min_score=settings.memory_min_score, max_chars=settings.memory_max_chars, + facts=facts, ) canon = CanonService( workspace_dir=settings.workspace_dir, store=store, embeddings=embeddings, + facts=facts, ) scheduler = SchedulerService(store=store, workspace_dir=settings.workspace_dir) @@ -75,6 +87,8 @@ def build_octo(settings: Settings) -> Octo: approvals=approvals, memory=memory, canon=canon, + facts=facts, + reflection=reflection, scheduler=scheduler, mcp_manager=mcp_manager, connector_manager=connector_manager, diff --git a/src/octopal/runtime/memory/canon.py b/src/octopal/runtime/memory/canon.py index 64b6918..a9bdce9 100644 --- a/src/octopal/runtime/memory/canon.py +++ b/src/octopal/runtime/memory/canon.py @@ -14,6 +14,7 @@ if TYPE_CHECKING: from octopal.infrastructure.providers.embeddings import EmbeddingsProvider from octopal.infrastructure.store.base import Store + from octopal.runtime.memory.facts import FactsService logger = structlog.get_logger(__name__) @@ -24,6 +25,7 @@ class CanonService: store: Store embeddings: EmbeddingsProvider | None = None max_file_chars: int = 4000 # Guardrail for canon bloat + facts: FactsService | None = None def __post_init__(self) -> None: 
self.canon_dir = self.workspace_dir / "memory" / "canon" @@ -35,6 +37,16 @@ def __post_init__(self) -> None: if not path.exists(): path.write_text(f"# {filename.replace('.md', '').title()}\n\n", encoding="utf-8") self._ensure_event_log_bootstrap() + if self.facts is not None: + try: + self.facts.prune_unsupported_canon_facts() + except Exception: + logger.exception("Failed to prune unsupported canon facts on startup") + for filename in self.list_files(): + try: + self.facts.sync_verified_facts_from_canon(filename, self.read_canon(filename)) + except Exception: + logger.exception("Failed to sync canon facts on startup", filename=filename) def _normalize_filename(self, filename: str) -> str: candidate = filename.strip() @@ -61,6 +73,12 @@ async def write_canon(self, filename: str, content: str, mode: Literal["append", rebuilt = await asyncio.to_thread(self._compact_from_events) new_content = rebuilt.get(filename, "") + if self.facts is not None: + try: + await asyncio.to_thread(self.facts.sync_verified_facts_from_canon, filename, new_content) + except Exception: + logger.exception("Failed to sync canon facts", filename=filename) + # Trigger async re-indexing if self.embeddings: asyncio.create_task(self.index_canon(filename)) diff --git a/src/octopal/runtime/memory/facts.py b/src/octopal/runtime/memory/facts.py new file mode 100644 index 0000000..1e9353f --- /dev/null +++ b/src/octopal/runtime/memory/facts.py @@ -0,0 +1,327 @@ +from __future__ import annotations + +import hashlib +import re +from dataclasses import dataclass + +from octopal.infrastructure.store.base import Store +from octopal.infrastructure.store.models import ( + MemoryEntry, + MemoryFactRecord, + MemoryFactSourceRecord, +) +from octopal.runtime.memory.service import infer_memory_facets +from octopal.utils import utc_now + +_ASSERTION_RE = re.compile( + r"^\s*(?P<subject>.+?)\s+is\s+(?P<negated>not\s+)?(?P<value>.+?)\s*[.!?]?\s*$", + re.IGNORECASE, +) +_TOKEN_RE = re.compile(r"[a-z0-9_]{3,}") +_SUPPORTED_CANON_FACT_FILES = 
{"facts.md", "decisions.md", "failures.md"} +_LOW_SIGNAL_SUBJECTS = { + "this", + "that", + "it", + "they", + "these", + "those", + "here", + "there", +} + + +@dataclass +class FactsService: + store: Store + owner_id: str = "default" + + def record_candidate_from_memory(self, entry: MemoryEntry) -> MemoryFactRecord | None: + metadata = entry.metadata or {} + if not metadata.get("fact_candidate"): + return None + + subject = _normalize_component(metadata.get("fact_subject_hint")) + value_text = _normalize_component(metadata.get("fact_value_hint")) + if not subject or not value_text: + extracted = _extract_assertion(entry.content) + if extracted is None: + return None + subject = subject or extracted["subject"] + value_text = value_text or extracted["value_text"] + + now = utc_now() + record = MemoryFactRecord( + id=_fact_id(self.owner_id, "memory", entry.id, subject, "is", value_text, "candidate"), + owner_id=self.owner_id, + subject=subject, + key="is", + value_text=value_text, + value_json=None, + fact_type="assertion", + confidence=float(metadata.get("confidence", 0.5) or 0.5), + status="candidate", + valid_from=entry.created_at, + valid_to=None, + facets=sorted(set(_clean_facets(metadata.get("memory_facets")))), + source_kind="memory", + source_ref=entry.id, + created_at=entry.created_at, + updated_at=now, + ) + self.store.upsert_memory_fact(record) + self.store.add_memory_fact_source( + MemoryFactSourceRecord( + fact_id=record.id, + memory_entry_uuid=entry.id, + canon_filename=None, + source_note="memory_candidate", + created_at=now, + ) + ) + return record + + def sync_verified_facts_from_canon(self, filename: str, content: str) -> dict[str, int]: + parsed = self._parse_canon_facts(filename, content) + parsed_by_id = {record.id: record for record in parsed} + existing_active = self.store.list_memory_facts( + self.owner_id, + limit=500, + status="active", + source_kind="canon", + source_ref=filename, + ) + + superseded = 0 + now = utc_now() + existing_ids = 
{record.id for record in existing_active} + for existing in existing_active: + if existing.id in parsed_by_id: + continue + self.store.invalidate_memory_fact(existing.id, now, status="superseded") + superseded += 1 + + for record in parsed: + self.store.upsert_memory_fact(record) + if record.id not in existing_ids: + self.store.add_memory_fact_source( + MemoryFactSourceRecord( + fact_id=record.id, + memory_entry_uuid=None, + canon_filename=filename, + source_note="canon_verified", + created_at=now, + ) + ) + + return {"active": len(parsed), "superseded": superseded} + + def prune_unsupported_canon_facts(self) -> int: + active = self.store.list_memory_facts( + self.owner_id, + limit=500, + status="active", + source_kind="canon", + ) + now = utc_now() + pruned = 0 + for record in active: + if (record.source_ref or "") in _SUPPORTED_CANON_FACT_FILES: + continue + self.store.invalidate_memory_fact(record.id, now, status="superseded") + pruned += 1 + return pruned + + def get_relevant_facts( + self, + query: str, + *, + memory_facets: list[str] | None = None, + limit: int = 3, + ) -> list[str]: + tokens = _tokenize(query) + if not tokens: + return [] + + active = self.store.list_memory_facts( + self.owner_id, + limit=max(limit * 12, 50), + status="active", + ) + if not active: + return [] + + requested_facets = set(_clean_facets(memory_facets)) + filtered = [ + record for record in active if set(record.facets) & requested_facets + ] if requested_facets else [] + candidates = filtered or active + + scored: list[tuple[tuple[int, float, str], MemoryFactRecord]] = [] + for record in candidates: + haystack = " ".join( + [ + record.subject, + record.key, + record.value_text, + record.fact_type, + " ".join(record.facets), + ] + ).lower() + overlap = sum(1 for token in tokens if token in haystack) + if overlap <= 0: + continue + scored.append(((overlap, float(record.confidence), record.updated_at.isoformat()), record)) + + scored.sort(key=lambda item: item[0], reverse=True) + 
return [_format_fact(record) for _, record in scored[:limit]] + + def _parse_canon_facts(self, filename: str, content: str) -> list[MemoryFactRecord]: + if filename not in _SUPPORTED_CANON_FACT_FILES: + return [] + lines = [line.strip() for line in content.splitlines()] + now = utc_now() + records: list[MemoryFactRecord] = [] + for raw_line in lines: + cleaned = _clean_canon_line(raw_line) + if not cleaned: + continue + assertion = _extract_assertion(cleaned) + if assertion is None: + continue + if not _should_accept_verified_assertion(cleaned, assertion): + continue + facets = set(_clean_facets(infer_memory_facets(cleaned))) + facets.discard("fact_candidate") + if filename == "decisions.md": + facets.add("decision") + elif filename == "failures.md": + facets.add("problem") + + record = MemoryFactRecord( + id=_fact_id( + self.owner_id, + "canon", + filename, + assertion["subject"], + "is_not" if assertion["negated"] else "is", + assertion["value_text"], + "active", + ), + owner_id=self.owner_id, + subject=assertion["subject"], + key="is_not" if assertion["negated"] else "is", + value_text=assertion["value_text"], + value_json=None, + fact_type=filename.replace(".md", ""), + confidence=0.95, + status="active", + valid_from=now, + valid_to=None, + facets=sorted(facets), + source_kind="canon", + source_ref=filename, + created_at=now, + updated_at=now, + ) + records.append(record) + return records + + +def _format_fact(record: MemoryFactRecord) -> str: + source = f" ({record.source_ref})" if record.source_ref else "" + return f"{record.subject} {record.key.replace('_', ' ')} {record.value_text}{source}" + + +def _fact_id( + owner_id: str, + source_kind: str, + source_ref: str, + subject: str, + key: str, + value_text: str, + status: str, +) -> str: + payload = "|".join([owner_id, source_kind, source_ref, subject, key, value_text, status]) + digest = hashlib.sha1(payload.encode("utf-8")).hexdigest() + return f"fact_{digest[:24]}" + + +def _extract_assertion(value: str) 
-> dict[str, str | bool] | None: + match = _ASSERTION_RE.match(value or "") + if not match: + return None + subject = _normalize_component(match.group("subject")) + predicate = _normalize_component(match.group("predicate")) + if not subject or not predicate: + return None + return { + "subject": subject, + "value_text": predicate, + "negated": bool(match.group("neg")), + } + + +def _clean_canon_line(value: str) -> str: + line = (value or "").strip() + if not line or line.startswith("#"): + return "" + line = re.sub(r"^[-*]\s+", "", line) + line = re.sub(r"^\d+[.)]\s+", "", line) + line = line.replace("**", "").replace("__", "").replace("`", "") + return line.strip() + + +def _normalize_component(value: object) -> str: + text = str(value or "").strip().lower() + text = re.sub(r"\s+", " ", text) + return text + + +def _clean_facets(value: object) -> list[str]: + if not isinstance(value, list): + return [] + result: list[str] = [] + for item in value: + text = str(item).strip() + if text: + result.append(text) + return result + + +def _tokenize(value: str) -> set[str]: + return {match.group(0) for match in _TOKEN_RE.finditer((value or "").lower())} + + +def _should_accept_verified_assertion( + original_line: str, + assertion: dict[str, str | bool], +) -> bool: + line = _normalize_component(original_line) + subject = str(assertion["subject"]) + predicate = str(assertion["value_text"]) + + if "?" 
in line: + return False + if "open question" in line: + return False + if re.search(r"[.!?]\s+\S", line): + return False + if len(line) > 160: + return False + + subject_tokens = _split_words(subject) + predicate_tokens = _split_words(predicate) + if not subject_tokens or not predicate_tokens: + return False + if len(subject_tokens) > 8 or len(predicate_tokens) > 16: + return False + if subject in _LOW_SIGNAL_SUBJECTS: + return False + if '"' in subject or "'" in subject: + return False + return '"' not in predicate + + +def _split_words(value: str) -> list[str]: + return re.findall(r"[a-z0-9_]+", (value or "").lower()) diff --git a/src/octopal/runtime/memory/reflection.py b/src/octopal/runtime/memory/reflection.py new file mode 100644 index 0000000..2c63153 --- /dev/null +++ b/src/octopal/runtime/memory/reflection.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from typing import Any + +from octopal.infrastructure.store.base import Store +from octopal.infrastructure.store.models import OctoDiaryEntryRecord +from octopal.utils import utc_now + + +@dataclass +class ReflectionService: + store: Store + owner_id: str = "default" + + def record_context_reset(self, chat_id: int, handoff: dict[str, Any]) -> OctoDiaryEntryRecord: + created_at = utc_now() + goal_now = str(handoff.get("goal_now", "") or "").strip() + next_step = str(handoff.get("next_step", "") or "").strip() + reason = str(handoff.get("reason", "") or "").strip() or "context reset" + summary_parts = [f"reset: {reason}"] + if goal_now: + summary_parts.append(f"goal: {goal_now}") + if next_step: + summary_parts.append(f"next: {next_step}") + summary = " | ".join(summary_parts)[:600] + + record = OctoDiaryEntryRecord( + id=str(uuid.uuid4()), + owner_id=self.owner_id, + chat_id=chat_id, + kind="context_reset", + summary=summary, + details={ + "reason": reason, + "goal_now": goal_now, + "next_step": next_step, + "open_threads": 
list(handoff.get("open_threads") or []),
+                "critical_constraints": list(handoff.get("critical_constraints") or []),
+                "health_snapshot": dict(handoff.get("health_snapshot") or {}),
+            },
+            created_at=created_at,
+        )
+        self.store.add_octo_diary_entry(record)
+        return record
+
+    def list_recent(self, chat_id: int | None = None, limit: int = 5) -> list[OctoDiaryEntryRecord]:
+        return self.store.list_octo_diary_entries(
+            self.owner_id,
+            chat_id=chat_id,
+            limit=limit,
+        )
+
+    def build_wakeup_context(self, chat_id: int, limit: int = 2, max_chars: int = 600) -> str:
+        entries = self.list_recent(chat_id=chat_id, limit=limit)
+        if not entries:
+            return ""
+        lines = ["Recent reflection relevant to this wake-up:"]
+        for entry in reversed(entries):
+            lines.append(f"- {entry.summary}")
+        text = "\n".join(lines)
+        if len(text) <= max_chars:
+            return text
+        return text[: max_chars - 28].rstrip() + "\n...[reflection truncated]..."
diff --git a/src/octopal/runtime/memory/service.py b/src/octopal/runtime/memory/service.py
index 9a34a91..153d0f7 100644
--- a/src/octopal/runtime/memory/service.py
+++ b/src/octopal/runtime/memory/service.py
@@ -4,18 +4,43 @@
 import math
 import re
 import uuid
+from contextlib import suppress
 from dataclasses import dataclass
-from typing import Any
+from typing import TYPE_CHECKING, Any

 from octopal.infrastructure.providers.embeddings import EmbeddingsProvider
 from octopal.infrastructure.store.base import Store
 from octopal.infrastructure.store.models import MemoryEntry
 from octopal.utils import utc_now

+if TYPE_CHECKING:
+    from octopal.runtime.memory.facts import FactsService
+
 _ASSERTION_RE = re.compile(
     r"^\s*(?P<subject>.+?)\s+is\s+(?P<neg>not\s+)?(?P<predicate>.+?)\s*[.!?]?\s*$",
     re.IGNORECASE,
 )
+_DECISION_PATTERNS = (
+    re.compile(r"\b(decide|decided|choose|chose|picked|settled on|went with|switch(?:ed)? 
to|migrat(?:e|ed) to)\b", re.IGNORECASE), + re.compile(r"\b(instead of|rather than|trade-?off|the reason is|the reason was|because)\b", re.IGNORECASE), +) +_PREFERENCE_PATTERNS = ( + re.compile(r"\b(i prefer|we prefer|prefer to|always use|never use|please always|please never)\b", re.IGNORECASE), + re.compile(r"\b(i like|i don't like|i dont like|we always|we never)\b", re.IGNORECASE), +) +_MILESTONE_PATTERNS = ( + re.compile(r"\b(it works|it worked|got it working|fixed|solved|resolved|figured it out)\b", re.IGNORECASE), + re.compile(r"\b(implemented|shipped|deployed|launched|breakthrough|released)\b", re.IGNORECASE), +) +_PROBLEM_PATTERNS = ( + re.compile(r"\b(bug|error|crash|broken|issue|problem|failed|failing|stuck)\b", re.IGNORECASE), + re.compile(r"\b(doesn't work|doesnt work|not working|won't work|wont work|root cause|workaround)\b", re.IGNORECASE), + re.compile(r"\b(not healthy|unhealthy|degraded|down|outage)\b", re.IGNORECASE), +) +_EMOTIONAL_PATTERNS = ( + re.compile(r"\b(worried|afraid|proud|happy|sad|sorry|angry|grateful|excited)\b", re.IGNORECASE), + re.compile(r"\b(frustrated|confused|love|hate|i feel|i need|i wish)\b", re.IGNORECASE), +) @dataclass @@ -27,6 +52,7 @@ class MemoryService: prefilter_k: int = 80 min_score: float = 0.25 max_chars: int = 32000 + facts: FactsService | None = None async def add_message( self, @@ -50,6 +76,7 @@ async def add_message( merged_metadata = dict(metadata or {}) merged_metadata.setdefault("owner_id", self.owner_id) merged_metadata.setdefault("confidence", _default_confidence(role)) + _merge_enrichment_metadata(merged_metadata, trimmed) chat_id = _coerce_chat_id(merged_metadata.get("chat_id")) owner_id = str(merged_metadata.get("owner_id", self.owner_id)) @@ -79,8 +106,24 @@ async def add_message( metadata=merged_metadata, ) await asyncio.to_thread(self.store.add_memory_entry, entry) + if self.facts is not None: + with suppress(Exception): + await asyncio.to_thread(self.facts.record_candidate_from_memory, entry) async 
def get_context(self, query: str, exclude_chat_id: int | None = None) -> list[str]: + return await self.get_context_by_facets( + query, + exclude_chat_id=exclude_chat_id, + memory_facets=None, + ) + + async def get_context_by_facets( + self, + query: str, + *, + exclude_chat_id: int | None = None, + memory_facets: list[str] | None = None, + ) -> list[str]: if self.embeddings is None: return [] trimmed = query.strip() @@ -106,6 +149,9 @@ async def get_context(self, query: str, exclude_chat_id: int | None = None) -> l self.owner_id, max(self.prefilter_k, 200), ) + facet_filtered = _filter_entries_by_facets(candidates, memory_facets) + if facet_filtered: + candidates = facet_filtered scored: list[tuple[float, MemoryEntry]] = [] for entry in candidates: if not entry.embedding: @@ -209,6 +255,67 @@ def _default_confidence(role: str) -> float: return by_role.get((role or "").lower(), 0.6) +def _merge_enrichment_metadata(metadata: dict[str, Any], content: str) -> None: + facets = set(_coerce_str_list(metadata.get("memory_facets"))) + facets.update(infer_memory_facets(content)) + if metadata.get("fact_candidate") is False: + facets.discard("fact_candidate") + if facets: + metadata["memory_facets"] = sorted(facets) + + assertion = _extract_assertion(content) + if assertion is None: + return + + metadata.setdefault("fact_candidate", True) + metadata.setdefault("fact_subject_hint", assertion.subject) + metadata.setdefault("fact_value_hint", assertion.predicate) + metadata.setdefault("fact_negated", assertion.negated) + + +def infer_memory_facets(content: str) -> set[str]: + facets: set[str] = set() + for facet, patterns in ( + ("decision", _DECISION_PATTERNS), + ("preference", _PREFERENCE_PATTERNS), + ("milestone", _MILESTONE_PATTERNS), + ("problem", _PROBLEM_PATTERNS), + ("emotional", _EMOTIONAL_PATTERNS), + ): + if any(pattern.search(content) for pattern in patterns): + facets.add(facet) + if _extract_assertion(content) is not None: + facets.add("fact_candidate") + return 
facets + + +def _filter_entries_by_facets( + entries: list[MemoryEntry], + memory_facets: list[str] | None, +) -> list[MemoryEntry]: + requested = set(_coerce_str_list(memory_facets)) + if not requested: + return [] + + matches: list[MemoryEntry] = [] + for entry in entries: + entry_facets = set(_coerce_str_list((entry.metadata or {}).get("memory_facets"))) + if entry_facets & requested: + matches.append(entry) + return matches + + +def _coerce_str_list(value: Any) -> list[str]: + if not isinstance(value, list): + return [] + result: list[str] = [] + for item in value: + text = str(item).strip() + if text: + result.append(text) + return result + + def _coerce_confidence(value: Any, default: float) -> float: try: conf = float(value) diff --git a/src/octopal/runtime/octo/core.py b/src/octopal/runtime/octo/core.py index 367333f..02ef93a 100644 --- a/src/octopal/runtime/octo/core.py +++ b/src/octopal/runtime/octo/core.py @@ -29,7 +29,9 @@ ) from octopal.runtime.intents.types import ActionIntent from octopal.runtime.memory.canon import CanonService +from octopal.runtime.memory.facts import FactsService from octopal.runtime.memory.memchain import memchain_record +from octopal.runtime.memory.reflection import ReflectionService from octopal.runtime.memory.service import MemoryService from octopal.runtime.metrics import update_component_gauges from octopal.runtime.octo.delivery import ( @@ -709,6 +711,8 @@ class Octo: approvals: ApprovalManager memory: MemoryService canon: CanonService + facts: FactsService | None = None + reflection: ReflectionService | None = None scheduler: SchedulerService | None = None mcp_manager: MCPManager | None = None connector_manager: ConnectorManager | None = None @@ -1664,6 +1668,21 @@ async def request_context_reset(self, chat_id: int, args: dict[str, Any]) -> dic workspace_dir = Path(os.getenv("OCTOPAL_WORKSPACE_DIR", "workspace")).resolve() file_info = await asyncio.to_thread(_persist_context_reset_files, workspace_dir, handoff) + 
reflection_entry: dict[str, Any] | None = None + if self.reflection is not None: + try: + record = await asyncio.to_thread( + self.reflection.record_context_reset, + chat_id, + handoff, + ) + reflection_entry = { + "id": record.id, + "kind": record.kind, + "summary": record.summary, + } + except Exception: + logger.warning("Reflection record failed during context reset", chat_id=chat_id, exc_info=True) memchain_info: dict[str, Any] | None = None try: memchain_info = await asyncio.to_thread( @@ -1705,6 +1724,7 @@ async def request_context_reset(self, chat_id: int, args: dict[str, Any]) -> dic "requires_confirmation_for": requires_confirm_reasons, "health_snapshot": health, "files": file_info, + "reflection": reflection_entry or {}, "memchain": memchain_info or {}, }, ), @@ -1716,6 +1736,7 @@ async def request_context_reset(self, chat_id: int, args: dict[str, Any]) -> dic "deleted_entries": deleted_entries, "handoff": handoff, "files": file_info, + "reflection": reflection_entry or {}, "memchain": memchain_info or {}, "health_before": health, "requires_confirmation_for": requires_confirm_reasons, diff --git a/src/octopal/runtime/octo/prompt_builder.py b/src/octopal/runtime/octo/prompt_builder.py index 5761e89..319898a 100644 --- a/src/octopal/runtime/octo/prompt_builder.py +++ b/src/octopal/runtime/octo/prompt_builder.py @@ -9,11 +9,14 @@ from typing import TYPE_CHECKING from octopal.runtime.memory.memchain import memchain_verify +from octopal.runtime.memory.service import infer_memory_facets if TYPE_CHECKING: from octopal.infrastructure.providers.base import Message from octopal.infrastructure.store.base import Store from octopal.runtime.memory.canon import CanonService + from octopal.runtime.memory.facts import FactsService + from octopal.runtime.memory.reflection import ReflectionService from octopal.runtime.memory.service import MemoryService @@ -29,6 +32,16 @@ class BootstrapContext: files: list[tuple[str, int]] +@dataclass +class MemoryContextBundle: + 
canon_context: str + facts_context: list[str] + memory_context: list[str] + recent_history: list[tuple[str, str]] + prune_stats: dict[str, int] + selected_facets: list[str] + + async def _load_system_prompt_file() -> str: """Loads the content of octo_system.md.""" @@ -264,6 +277,60 @@ def _prune_recent_history_window( } +async def _build_memory_context_bundle( + memory: MemoryService, + canon: CanonService, + user_text: str, + chat_id: int, + facts: FactsService | None = None, +) -> MemoryContextBundle: + canon_context = await asyncio.to_thread(canon.get_tier1_context) + + selected_facets = sorted( + facet for facet in infer_memory_facets(user_text) if facet != "fact_candidate" + ) + facts_context: list[str] = [] + if facts is not None: + try: + facts_context = await asyncio.to_thread( + facts.get_relevant_facts, + user_text, + memory_facets=selected_facets or None, + ) + except Exception: + facts_context = [] + memory_getter = getattr(memory, "get_context_by_facets", None) + if callable(memory_getter): + memory_context = await memory_getter( + user_text, + exclude_chat_id=chat_id, + memory_facets=selected_facets or None, + ) + else: + memory_context = await memory.get_context(user_text, exclude_chat_id=chat_id) + + recent_history = await memory.get_recent_history(chat_id, limit=20) + if recent_history and recent_history[-1][0] == "user" and recent_history[-1][1] == user_text: + recent_history = recent_history[:-1] + max_history_chars = _env_int("OCTOPAL_CONTEXT_PRUNE_MAX_HISTORY_CHARS", 100_000, minimum=2_000) + keep_recent = _env_int("OCTOPAL_CONTEXT_PRUNE_KEEP_RECENT", 12, minimum=1) + per_message_chars = _env_int("OCTOPAL_CONTEXT_PRUNE_MESSAGE_CHARS", 32_000, minimum=500) + recent_history, prune_stats = _prune_recent_history_window( + recent_history, + max_history_chars=max_history_chars, + keep_recent=keep_recent, + per_message_chars=per_message_chars, + ) + return MemoryContextBundle( + canon_context=canon_context, + facts_context=facts_context, + 
memory_context=memory_context, + recent_history=recent_history, + prune_stats=prune_stats, + selected_facets=selected_facets, + ) + + async def build_octo_prompt( store: Store, memory: MemoryService, @@ -276,6 +343,8 @@ async def build_octo_prompt( saved_file_paths: list[str] | None = None, wake_notice: str = "", tool_policy_summary: str = "", + facts: FactsService | None = None, + reflection: ReflectionService | None = None, ) -> list[Message]: """Assembles all the pieces into the final message list for the LLM.""" @@ -299,22 +368,7 @@ async def build_octo_prompt( datetime_prompt = _current_datetime_prompt() - canon_context = await asyncio.to_thread(canon.get_tier1_context) - - memory_context = await memory.get_context(user_text, exclude_chat_id=chat_id) - - recent_history = await memory.get_recent_history(chat_id, limit=20) - if recent_history and recent_history[-1][0] == "user" and recent_history[-1][1] == user_text: - recent_history = recent_history[:-1] - max_history_chars = _env_int("OCTOPAL_CONTEXT_PRUNE_MAX_HISTORY_CHARS", 100_000, minimum=2_000) - keep_recent = _env_int("OCTOPAL_CONTEXT_PRUNE_KEEP_RECENT", 12, minimum=1) - per_message_chars = _env_int("OCTOPAL_CONTEXT_PRUNE_MESSAGE_CHARS", 32_000, minimum=500) - recent_history, prune_stats = _prune_recent_history_window( - recent_history, - max_history_chars=max_history_chars, - keep_recent=keep_recent, - per_message_chars=per_message_chars, - ) + memory_bundle = await _build_memory_context_bundle(memory, canon, user_text, chat_id, facts) messages: list[Message] = [Message(role="system", content=system_prompt)] if persona_prompt_lines: @@ -332,6 +386,16 @@ async def build_octo_prompt( ), ) ) + if reflection is not None: + try: + reflection_context = await asyncio.to_thread( + reflection.build_wakeup_context, + chat_id, + ) + except Exception: + reflection_context = "" + if reflection_context: + messages.append(Message(role="system", content=reflection_context)) messages.append(Message(role="system", 
content=datetime_prompt)) if tool_policy_summary.strip(): messages.append( @@ -341,29 +405,38 @@ async def build_octo_prompt( ) ) - if canon_context: - messages.append(Message(role="system", content=canon_context)) + if memory_bundle.canon_context: + messages.append(Message(role="system", content=memory_bundle.canon_context)) - if memory_context: + if memory_bundle.facts_context: messages.append( Message( - role="system", content="\n" + "\n".join(memory_context) + "\n" + role="system", + content="\n" + "\n".join(memory_bundle.facts_context) + "\n", + ) + ) + + if memory_bundle.memory_context: + messages.append( + Message( + role="system", + content="\n" + "\n".join(memory_bundle.memory_context) + "\n", ) ) - if prune_stats["trimmed"] > 0 or prune_stats["dropped"] > 0: + if memory_bundle.prune_stats["trimmed"] > 0 or memory_bundle.prune_stats["dropped"] > 0: messages.append( Message( role="system", content=( "Context pruning applied before inference:\n" - f"- trimmed_messages={prune_stats['trimmed']}\n" - f"- dropped_old_messages={prune_stats['dropped']}\n" - f"- history_chars_after_prune={prune_stats['total_chars']}" + f"- trimmed_messages={memory_bundle.prune_stats['trimmed']}\n" + f"- dropped_old_messages={memory_bundle.prune_stats['dropped']}\n" + f"- history_chars_after_prune={memory_bundle.prune_stats['total_chars']}" ), ) ) - if recent_history: - for role, content in recent_history: + if memory_bundle.recent_history: + for role, content in memory_bundle.recent_history: messages.append(Message(role=role, content=content)) if images: diff --git a/src/octopal/runtime/octo/router.py b/src/octopal/runtime/octo/router.py index bfa4453..eff0a28 100644 --- a/src/octopal/runtime/octo/router.py +++ b/src/octopal/runtime/octo/router.py @@ -196,6 +196,8 @@ async def route_or_reply( saved_file_paths=saved_file_paths, wake_notice=wake_notice, tool_policy_summary=tool_policy_summary, + facts=getattr(octo, "facts", None), + reflection=getattr(octo, "reflection", None), ) if 
not internal_followup: messages.append( diff --git a/tests/test_memory_facts.py b/tests/test_memory_facts.py new file mode 100644 index 0000000..052d5db --- /dev/null +++ b/tests/test_memory_facts.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path + +from octopal.infrastructure.store.models import MemoryFactRecord +from octopal.infrastructure.store.sqlite import SQLiteStore +from octopal.runtime.memory.canon import CanonService +from octopal.runtime.memory.facts import FactsService +from octopal.runtime.memory.service import MemoryService +from octopal.utils import utc_now + + +class _StoreSettings: + def __init__(self, state_dir: Path, workspace_dir: Path) -> None: + self.state_dir = state_dir + self.workspace_dir = workspace_dir + + +def test_memory_service_records_fact_candidates_in_store(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + facts = FactsService(store=store, owner_id="default") + service = MemoryService(store=store, embeddings=None, owner_id="default", facts=facts) + + async def scenario() -> None: + await service.add_message("assistant", "Service is healthy.", {"chat_id": 7}) + + asyncio.run(scenario()) + rows = store.list_memory_facts("default", status="candidate", limit=20) + assert len(rows) == 1 + assert rows[0].subject == "service" + assert rows[0].value_text == "healthy" + assert rows[0].source_kind == "memory" + + sources = store.list_memory_fact_sources(rows[0].id) + assert len(sources) == 1 + assert sources[0].memory_entry_uuid is not None + + +def test_canon_service_syncs_verified_facts(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + facts = FactsService(store=store, owner_id="default") + canon = CanonService( + workspace_dir=tmp_path / "workspace", + store=store, + embeddings=None, + facts=facts, + ) + + async def scenario() -> None: + await canon.write_canon("facts", "# 
Facts\n\nService is healthy.\n", "overwrite") + + asyncio.run(scenario()) + rows = store.list_memory_facts( + "default", + status="active", + source_kind="canon", + source_ref="facts.md", + limit=20, + ) + assert len(rows) == 1 + assert rows[0].subject == "service" + assert rows[0].value_text == "healthy" + + +def test_facts_service_returns_relevant_active_facts(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + facts = FactsService(store=store, owner_id="default") + canon = CanonService( + workspace_dir=tmp_path / "workspace", + store=store, + embeddings=None, + facts=facts, + ) + + async def scenario() -> None: + await canon.write_canon("decisions", "# Decisions\n\nPrimary installer is uv.\n", "overwrite") + + asyncio.run(scenario()) + context = facts.get_relevant_facts( + "what did we decide about installer?", + memory_facets=["decision"], + limit=3, + ) + assert len(context) == 1 + assert "primary installer is uv" in context[0] + + +def test_canon_fact_sync_ignores_low_signal_or_rhetorical_lines(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + facts = FactsService(store=store, owner_id="default") + canon = CanonService( + workspace_dir=tmp_path / "workspace", + store=store, + embeddings=None, + facts=facts, + ) + + content = """# Facts + +Service is healthy. +This is my system. +Context is finite - files, markdown, memory layers. They give fullness, but the task is to look beyond the known. +Open question: is 70 percent content loss a bug or feature? 
+""" + + async def scenario() -> None: + await canon.write_canon("facts", content, "overwrite") + + asyncio.run(scenario()) + rows = store.list_memory_facts( + "default", + status="active", + source_kind="canon", + source_ref="facts.md", + limit=20, + ) + assert len(rows) == 1 + assert rows[0].subject == "service" + assert rows[0].value_text == "healthy" + + +def test_canon_fact_sync_only_treats_supported_canon_files_as_verified_facts(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + facts = FactsService(store=store, owner_id="default") + + now = utc_now() + store.upsert_memory_fact( + MemoryFactRecord( + id="fact_agents_1", + owner_id="default", + subject="this", + key="is", + value_text="my system", + value_json=None, + fact_type="AGENTS", + confidence=0.95, + status="active", + valid_from=now, + valid_to=None, + facets=[], + source_kind="canon", + source_ref="AGENTS.md", + created_at=now, + updated_at=now, + ) + ) + + result = facts.sync_verified_facts_from_canon("AGENTS.md", "This is my system.\n") + assert result == {"active": 0, "superseded": 1} + assert store.list_memory_facts( + "default", + status="active", + source_kind="canon", + source_ref="AGENTS.md", + limit=20, + ) == [] + + +def test_canon_service_prunes_existing_unsupported_canon_facts_on_startup(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + now = utc_now() + store.upsert_memory_fact( + MemoryFactRecord( + id="fact_agents_startup", + owner_id="default", + subject="this", + key="is", + value_text="my system", + value_json=None, + fact_type="AGENTS", + confidence=0.95, + status="active", + valid_from=now, + valid_to=None, + facets=[], + source_kind="canon", + source_ref="AGENTS.md", + created_at=now, + updated_at=now, + ) + ) + + facts = FactsService(store=store, owner_id="default") + CanonService( + workspace_dir=tmp_path / "workspace", + store=store, + embeddings=None, + 
facts=facts, + ) + + assert store.list_memory_facts( + "default", + status="active", + source_kind="canon", + source_ref="AGENTS.md", + limit=20, + ) == [] diff --git a/tests/test_memory_quality_controls.py b/tests/test_memory_quality_controls.py index 7e36aa9..9fcccec 100644 --- a/tests/test_memory_quality_controls.py +++ b/tests/test_memory_quality_controls.py @@ -94,3 +94,104 @@ async def scenario() -> list[str]: context = asyncio.run(scenario()) assert len(context) == 1 assert "High confidence old fact" in context[0] + + +def test_memory_adds_typed_enrichment_metadata() -> None: + store = _StoreStub() + service = MemoryService(store=store, embeddings=None, owner_id="default") + + async def scenario() -> None: + await service.add_message( + "assistant", + "We decided to switch to uv because pip was too slow.", + {"chat_id": 11}, + ) + + asyncio.run(scenario()) + assert len(store.entries) == 1 + metadata = store.entries[0].metadata + facets = metadata.get("memory_facets") or [] + assert "decision" in facets + assert "fact_candidate" not in facets + + +def test_memory_adds_problem_emotional_and_fact_hints() -> None: + store = _StoreStub() + service = MemoryService(store=store, embeddings=None, owner_id="default") + + async def scenario() -> None: + await service.add_message( + "assistant", + "Service is not healthy and I'm worried about the deploy.", + {"chat_id": 12}, + ) + + asyncio.run(scenario()) + metadata = store.entries[-1].metadata + facets = metadata.get("memory_facets") or [] + assert "problem" in facets + assert "emotional" in facets + assert "fact_candidate" in facets + assert metadata.get("fact_candidate") is True + assert metadata.get("fact_subject_hint") == "service" + assert metadata.get("fact_value_hint") == "healthy and i'm worried about the deploy" + assert metadata.get("fact_negated") is True + + +def test_memory_preserves_explicit_enrichment_overrides() -> None: + store = _StoreStub() + service = MemoryService(store=store, embeddings=None, 
owner_id="default") + + async def scenario() -> None: + await service.add_message( + "assistant", + "Service is healthy.", + { + "chat_id": 13, + "memory_facets": ["custom_marker"], + "fact_candidate": False, + }, + ) + + asyncio.run(scenario()) + metadata = store.entries[-1].metadata + facets = metadata.get("memory_facets") or [] + assert "custom_marker" in facets + assert "fact_candidate" not in facets + assert metadata.get("fact_candidate") is False + + +def test_memory_context_prefers_matching_facets_when_available() -> None: + store = _StoreStub() + store.entries.extend( + [ + MemoryEntry( + id=str(uuid.uuid4()), + role="assistant", + content="We decided to use uv for installs.", + embedding=[1.0, 0.0], + created_at=utc_now(), + metadata={"owner_id": "default", "chat_id": 21, "memory_facets": ["decision"]}, + ), + MemoryEntry( + id=str(uuid.uuid4()), + role="assistant", + content="The deploy is broken right now.", + embedding=[1.0, 0.0], + created_at=utc_now(), + metadata={"owner_id": "default", "chat_id": 22, "memory_facets": ["problem"]}, + ), + ] + ) + service = MemoryService(store=store, embeddings=_EmbedStub(), owner_id="default", top_k=1, min_score=0.05) + + async def scenario() -> list[str]: + return await service.get_context_by_facets( + "why did we decide to use uv?", + exclude_chat_id=None, + memory_facets=["decision"], + ) + + context = asyncio.run(scenario()) + assert len(context) == 1 + assert "decided to use uv" in context[0] diff --git a/tests/test_memory_reflection.py b/tests/test_memory_reflection.py new file mode 100644 index 0000000..6caa299 --- /dev/null +++ b/tests/test_memory_reflection.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path + +from octopal.infrastructure.store.sqlite import SQLiteStore +from octopal.runtime.memory.reflection import ReflectionService +from octopal.runtime.octo.core import Octo +from octopal.runtime.octo.prompt_builder import build_octo_prompt + + +class 
_StoreSettings: + def __init__(self, state_dir: Path, workspace_dir: Path) -> None: + self.state_dir = state_dir + self.workspace_dir = workspace_dir + + +def test_reflection_service_builds_wakeup_context(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + reflection = ReflectionService(store=store, owner_id="default") + reflection.record_context_reset( + 42, + { + "reason": "context overloaded", + "goal_now": "Finish memory rollout.", + "next_step": "Run the targeted tests.", + "open_threads": ["facts layer"], + "critical_constraints": ["do not break live agent"], + "health_snapshot": {"context_size_estimate": 1234}, + }, + ) + + text = reflection.build_wakeup_context(42) + assert "Recent reflection relevant to this wake-up:" in text + assert "Finish memory rollout." in text + assert "Run the targeted tests." in text + + +def test_build_octo_prompt_includes_reflection_on_wakeup(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + reflection = ReflectionService(store=store, owner_id="default") + reflection.record_context_reset( + 5, + { + "reason": "context overloaded", + "goal_now": "Resume the task.", + "next_step": "Check the latest state first.", + "open_threads": [], + "critical_constraints": [], + "health_snapshot": {}, + }, + ) + + class DummyMemory: + async def get_context(self, user_text: str, exclude_chat_id: int | None = None): + return [] + + async def get_recent_history(self, chat_id: int, limit: int = 20): + return [] + + class DummyCanon: + def get_tier1_context(self): + return "" + + async def scenario() -> None: + messages = await build_octo_prompt( + store=store, + memory=DummyMemory(), + canon=DummyCanon(), + user_text="continue", + chat_id=5, + bootstrap_context="", + wake_notice="You woke up after a reset.", + reflection=reflection, + ) + merged = "\n".join(str(message.content) for message in messages if isinstance(message.content, 
str)) + assert "Wake-up directive after context reset:" in merged + assert "Recent reflection relevant to this wake-up:" in merged + assert "Resume the task." in merged + + asyncio.run(scenario()) + + +def test_octo_context_reset_records_reflection_entry(tmp_path: Path) -> None: + store = SQLiteStore(_StoreSettings(tmp_path / "data", tmp_path / "workspace")) + reflection = ReflectionService(store=store, owner_id="default") + + class DummyMemory: + async def add_message(self, role: str, content: str, metadata: dict): + return None + + octo = Octo( + provider=object(), + store=store, + policy=object(), + runtime=object(), + approvals=object(), + memory=DummyMemory(), + canon=object(), + reflection=reflection, + ) + + async def scenario() -> None: + result = await octo.request_context_reset( + 99, + { + "mode": "soft", + "reason": "context overloaded", + "goal_now": "Resume testing.", + "next_step": "Review the latest handoff.", + }, + ) + assert result["status"] == "reset_complete" + + asyncio.run(scenario()) + entries = store.list_octo_diary_entries("default", chat_id=99, limit=10) + assert len(entries) == 1 + assert entries[0].kind == "context_reset" + assert "Resume testing." 
in entries[0].summary diff --git a/tests/test_prompt_builder_images.py b/tests/test_prompt_builder_images.py index 53d1282..b9916c0 100644 --- a/tests/test_prompt_builder_images.py +++ b/tests/test_prompt_builder_images.py @@ -102,3 +102,89 @@ async def scenario() -> None: assert "do not repeat the same call" in merged asyncio.run(scenario()) + + +def test_build_octo_prompt_uses_facets_aware_memory_getter_when_available() -> None: + class DummyMemory: + def __init__(self) -> None: + self.calls: list[tuple[str, list[str] | None]] = [] + + async def get_context(self, user_text: str, exclude_chat_id: int | None = None): + raise AssertionError("facets-aware getter should be preferred when available") + + async def get_context_by_facets( + self, + user_text: str, + *, + exclude_chat_id: int | None = None, + memory_facets: list[str] | None = None, + ): + self.calls.append((user_text, memory_facets)) + return ["assistant: We decided to use uv."] + + async def get_recent_history(self, chat_id: int, limit: int = 20): + return [] + + class DummyCanon: + def get_tier1_context(self): + return "" + + memory = DummyMemory() + + async def scenario() -> None: + messages = await build_octo_prompt( + store=object(), + memory=memory, + canon=DummyCanon(), + user_text="why did we decide to use uv?", + chat_id=123, + bootstrap_context="", + ) + contents = [str(msg.content) for msg in messages if isinstance(msg.content, str)] + merged = "\n".join(contents) + assert "We decided to use uv." 
in merged + + asyncio.run(scenario()) + assert memory.calls == [("why did we decide to use uv?", ["decision"])] + + +def test_build_octo_prompt_includes_compact_facts_context_when_available() -> None: + class DummyMemory: + async def get_context(self, user_text: str, exclude_chat_id: int | None = None): + return [] + + async def get_context_by_facets( + self, + user_text: str, + *, + exclude_chat_id: int | None = None, + memory_facets: list[str] | None = None, + ): + return [] + + async def get_recent_history(self, chat_id: int, limit: int = 20): + return [] + + class DummyCanon: + def get_tier1_context(self): + return "" + + class DummyFacts: + def get_relevant_facts(self, query: str, *, memory_facets: list[str] | None = None, limit: int = 3): + return ["primary installer is uv (decisions.md)"] + + async def scenario() -> None: + messages = await build_octo_prompt( + store=object(), + memory=DummyMemory(), + canon=DummyCanon(), + user_text="what did we decide about installer?", + chat_id=123, + bootstrap_context="", + facts=DummyFacts(), + ) + merged = "\n".join(str(message.content) for message in messages if isinstance(message.content, str)) + assert "" in merged + assert "primary installer is uv" in merged + + asyncio.run(scenario())