4 changes: 2 additions & 2 deletions context_scribe/evaluator/__init__.py
@@ -13,15 +13,15 @@
}


def get_evaluator(name: str) -> BaseEvaluator:
def get_evaluator(name: str, **kwargs) -> BaseEvaluator:
"""Return an evaluator instance by name. Raises ValueError for unknown names."""
cls = EVALUATOR_REGISTRY.get(name)
if cls is None:
raise ValueError(
f"Unknown evaluator '{name}'. "
f"Available: {', '.join(sorted(EVALUATOR_REGISTRY))}"
)
return cls()
return cls(**kwargs)


__all__ = [
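The factory now forwards arbitrary keyword arguments to the selected evaluator class. A minimal usage sketch, assuming `"claude"` is one of the registered names (the actual keys live in `EVALUATOR_REGISTRY`):

```python
from context_scribe.evaluator import get_evaluator

# "claude" is an assumed registry key for illustration; any keyword arguments
# are now passed straight through to the evaluator's constructor.
evaluator = get_evaluator("claude", skip_prefilter=True)
```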
136 changes: 109 additions & 27 deletions context_scribe/evaluator/base_evaluator.py
@@ -1,32 +1,122 @@
import importlib.resources
import json
import logging
import re
import subprocess
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

from context_scribe.models.interaction import Interaction
from context_scribe.models.evaluator_models import RuleOutput, INTERNAL_SIGNATURE
from context_scribe.models.evaluator_models import (
RuleOutput, INTERNAL_SIGNATURE, PrefilterResult, PrefilterMetrics,
)

logger = logging.getLogger(__name__)


def _parse_bool(value) -> Optional[bool]:
"""Safely parse a boolean that may arrive as a string from LLM JSON.

Returns ``None`` for unrecognised or null values so the caller can
fall back to full evaluation (fail-open behaviour).
"""
if isinstance(value, bool):
return value
if isinstance(value, str):
normalised = value.strip().lower()
if normalised in ("true", "1", "yes"):
return True
if normalised in ("false", "0", "no"):
return False
return None # unrecognised string → pass through
return None # None / other types → pass through


def _load_package_template(filename: str) -> str:
"""Load a template file from this package using importlib.resources."""
return (
importlib.resources.files("context_scribe.evaluator")
.joinpath(filename)
.read_text(encoding="utf-8")
)


class BaseEvaluator(ABC):
def __init__(self):
# Load the prompt template
template_path = Path(__file__).parent / "prompt_template.md"
with open(template_path, "r", encoding="utf-8") as f:
self.prompt_template = f.read()
def __init__(self, skip_prefilter: bool = False):
self.skip_prefilter = skip_prefilter
self.metrics = PrefilterMetrics()
# Load the prompt templates via importlib.resources (works in
# packaged installs such as wheels / zip imports).
self.prompt_template = _load_package_template("prompt_template.md")
self._prefilter_template = _load_package_template("prefilter_template.md")

@abstractmethod
def _execute_cli(self, prompt: str) -> str:
"""Executes the specific CLI tool and returns the raw stdout.

Should raise subprocess.TimeoutExpired if the execution takes too long.
"""
pass

def _pre_evaluate(self, interaction: Interaction) -> Optional[PrefilterResult]:
"""Stage 1: Lightweight check to filter non-rule interactions."""
prompt = self._prefilter_template.format(
internal_signature=INTERNAL_SIGNATURE,
content=interaction.content,
)
try:
output = self._execute_cli(prompt)

# Extract response text from JSON wrapper if present
response_text = output
try:
data = json.loads(output)
if isinstance(data, dict):
response_text = data.get("result", data.get("response", output))
except json.JSONDecodeError:
pass

response_text = re.sub(r'```(?:json)?\s*', '', str(response_text)).strip()

# Parse the prefilter JSON response
json_match = re.search(r'\{[^}]*"contains_rule"[^}]*\}', response_text)
if json_match:
pf_data = json.loads(json_match.group(0))
parsed = _parse_bool(pf_data.get("contains_rule", True))
if parsed is None:
logger.warning(
"Unrecognised contains_rule value %r, passing through to full eval",
pf_data.get("contains_rule"),
)
return None
return PrefilterResult(
contains_rule=parsed,
confidence=float(pf_data.get("confidence", 0.0)),
)

logger.warning("Could not parse prefilter response, passing through to full eval")
return None

except subprocess.TimeoutExpired:
logger.warning("Prefilter timed out, passing through to full eval")
return None
except Exception as e:
logger.warning("Prefilter error: %s, passing through to full eval", e)
return None

def evaluate_interaction(self, interaction: Interaction, existing_global: str = "", existing_project: str = "") -> Optional[RuleOutput]:
# Stage 1: Pre-filter
if not self.skip_prefilter:
prefilter_result = self._pre_evaluate(interaction)
self.metrics.record_result(prefilter_result)
if prefilter_result and prefilter_result.should_skip_full_eval:
logger.info(
"Prefilter: skipping full eval for %s (confidence=%.2f)",
interaction.project_name, prefilter_result.confidence,
)
return None

# Stage 2: Full extraction
prompt = self.prompt_template.format(
internal_signature=INTERNAL_SIGNATURE,
project_name=interaction.project_name,
@@ -37,36 +127,29 @@ def evaluate_interaction(self, interaction: Interaction, existing_global: str =

try:
output = self._execute_cli(prompt)

# Extract response text
response_text = output
try:
data = json.loads(output)
if isinstance(data, dict):
# Handle both gemini ("response") and claude ("result"/"response") formats
response_text = data.get("result", data.get("response", output))
except json.JSONDecodeError:
pass

# Strip markdown code fences if present (Claude often wraps JSON in ```json ... ```)
# Strip markdown code fences if present
response_text = re.sub(r'```(?:json)?\s*', '', str(response_text)).strip()

# Robust JSON extraction: look for substrings that start with { and end with }
# and contain both "scope" and "rules"
# Robust JSON extraction
best_rule_data = None

# Find all { and } positions
start_indices = [i for i, char in enumerate(response_text) if char == '{']
end_indices = [i for i, char in enumerate(response_text) if char == '}']

# Try progressively smaller substrings starting from the first { and ending at the last }
# until we find a valid JSON object that has our keys.

for start in start_indices:
for end in reversed(end_indices):
if end > start:
try:
candidate = response_text[start:end+1]
# Quick check to avoid expensive json.loads on non-candidates
if '"scope"' in candidate and '"rules"' in candidate:
data = json.loads(candidate)
if isinstance(data, dict) and "scope" in data and "rules" in data:
@@ -81,35 +164,34 @@ def evaluate_interaction(self, interaction: Interaction, existing_global: str =
try:
rules_raw = best_rule_data["rules"]
desc = best_rule_data.get("description", "Updated rules")

if isinstance(rules_raw, list):
rules_content = "\n".join([str(r) for r in rules_raw]).strip()
else:
rules_content = str(rules_raw).strip()

if len(rules_content) > 0:
return RuleOutput(
content=rules_content,
scope=str(best_rule_data["scope"]).upper(),
content=rules_content,
scope=str(best_rule_data["scope"]).upper(),
description=str(desc)
)
except Exception as e:
logger.debug(f"Failed to extract rule fields from JSON: {e}")

if "NO_RULE" in str(response_text):
return None
# Fallback for non-JSON responses (robustness)

# Fallback for non-JSON responses
text_upper = str(response_text).upper()
if "PROJECT" in text_upper or "GLOBAL" in text_upper:
scope = "PROJECT" if "PROJECT" in text_upper else "GLOBAL"
# Try to find some content if rules are just listed
content = str(response_text)
return RuleOutput(content=content, scope=scope, description="Extracted via fallback")

logger.error(f"Failed to parse rule extraction for {interaction.project_name}")
return None

except subprocess.TimeoutExpired:
logger.error(f"Evaluation timed out for {interaction.project_name}")
return None
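For reference, the fail-open contract of `_parse_bool`: anything it cannot map to a boolean comes back as `None`, which makes `_pre_evaluate` return `None` and the interaction fall through to full evaluation. A small illustrative sketch, not part of the diff:

```python
from context_scribe.evaluator.base_evaluator import _parse_bool

assert _parse_bool(True) is True        # native booleans pass through untouched
assert _parse_bool("false") is False    # string booleans from LLM JSON are normalised
assert _parse_bool("YES") is True
assert _parse_bool("maybe") is None     # unrecognised string -> fail open
assert _parse_bool(None) is None        # null / other types -> fail open
```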
4 changes: 2 additions & 2 deletions context_scribe/evaluator/claude_llm.py
@@ -8,8 +8,8 @@
class ClaudeEvaluator(BaseEvaluator):
"""Evaluator that uses Claude Code CLI for headless rule extraction."""

def __init__(self):
super().__init__()
def __init__(self, **kwargs):
super().__init__(**kwargs)
try:
subprocess.run(["claude", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
4 changes: 2 additions & 2 deletions context_scribe/evaluator/copilot_llm.py
@@ -11,8 +11,8 @@
class CopilotEvaluator(BaseEvaluator):
"""Evaluator that uses the GitHub Copilot CLI for headless rule extraction."""

def __init__(self):
super().__init__()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._cli_path = shutil.which("copilot")
if not self._cli_path:
logger.warning(
4 changes: 2 additions & 2 deletions context_scribe/evaluator/gemini_cli_llm.py
@@ -7,8 +7,8 @@
class GeminiCliEvaluator(BaseEvaluator):
"""Evaluator that uses Gemini CLI for headless rule extraction."""

def __init__(self):
super().__init__()
def __init__(self, **kwargs):
super().__init__(**kwargs)
try:
subprocess.run(["gemini", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
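All three evaluators now share the same constructor pattern: accept `**kwargs` and forward them to `BaseEvaluator`, so options like `skip_prefilter` reach the base class without each subclass naming them. A sketch of the pattern with a hypothetical subclass:

```python
from context_scribe.evaluator.base_evaluator import BaseEvaluator

class ExampleCliEvaluator(BaseEvaluator):
    """Hypothetical evaluator, used only to illustrate the kwargs pass-through."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # skip_prefilter, metrics and templates are set up here

    def _execute_cli(self, prompt: str) -> str:
        # A real subclass shells out to its CLI tool here and returns stdout.
        raise NotImplementedError
```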
24 changes: 24 additions & 0 deletions context_scribe/evaluator/prefilter_template.md
@@ -0,0 +1,24 @@
{internal_signature}
You are a lightweight classifier. Your ONLY job is to determine whether the following
user-agent interaction contains a NEW persistent preference, project constraint, or
behavioral rule that should be remembered long-term.

Examples of rule-bearing interactions:
- "Always use tabs instead of spaces"
- "For this project, use PostgreSQL not MySQL"
- "Never use semicolons in TypeScript"

Examples of NON-rule interactions:
- "Can you help me fix this bug?"
- "Explain how async/await works"
- "Generate a function that sorts a list"

INTERACTION:
'''
{content}
'''

Respond with ONLY a JSON object:
{{"contains_rule": true, "confidence": 0.95}}
or
{{"contains_rule": false, "confidence": 0.90}}
21 changes: 17 additions & 4 deletions context_scribe/main.py
@@ -40,6 +40,8 @@ def __init__(self, tool: str, bank_path: str):
self.last_event_time = "N/A"
self.update_count = 0
self.history = [] # List of (time, file_path, description) tuples
self.prefilter_passed = 0
self.prefilter_skipped = 0

def add_history(self, file_path: str, description: str):
self.update_count += 1
@@ -99,9 +101,13 @@ def generate_layout(self) -> Layout:
# Footer
stats = Table.grid(expand=True)
stats.add_column(justify="left")
stats.add_column(justify="center")
stats.add_column(justify="right")
total_processed = self.prefilter_passed + self.prefilter_skipped
skip_rate = (self.prefilter_skipped / total_processed * 100) if total_processed > 0 else 0
stats.add_row(
Text(f" System: Active", style="green"),
Text(f"Prefilter: {self.prefilter_skipped} skipped / {total_processed} total ({skip_rate:.0f}%)", style="dim"),
Text(f"Total Rules Extracted: {self.update_count} ", style="bold green")
)
layout["footer"].update(Panel(stats, border_style="dim"))
@@ -205,7 +211,7 @@ def _status(msg: str, db, live, debug: bool):
live.update(db.generate_layout())


async def run_daemon(tool: str, bank_path: str, debug: bool = False, evaluator_name: str = "auto") -> bool:
async def run_daemon(tool: str, bank_path: str, debug: bool = False, evaluator_name: str = "auto", skip_prefilter: bool = False) -> bool:
if tool == "gemini-cli":
bootstrap_global_config()
provider = GeminiCliProvider()
@@ -221,7 +227,7 @@ def run_daemon(tool: str, bank_path: str, debug: bool = False, evaluator_n

if evaluator_name == "auto":
evaluator_name = _detect_evaluator(tool)
evaluator = get_evaluator(evaluator_name)
evaluator = get_evaluator(evaluator_name, skip_prefilter=skip_prefilter)
mcp_client = MemoryBankClient(bank_path=bank_path)

try:
@@ -255,6 +261,12 @@ async def _loop(live=None):
_status(f"🧠 Thinking: Extracting rules for {interaction.project_name}...", db, live, debug)
rule_output = await loop.run_in_executor(None, evaluator.evaluate_interaction, interaction, existing_global, existing_project)

# Sync prefilter metrics to dashboard
metrics = getattr(evaluator, 'metrics', None)
if metrics and isinstance(getattr(metrics, 'prefilter_passed', None), int):
db.prefilter_passed = metrics.prefilter_passed
db.prefilter_skipped = metrics.prefilter_skipped

if rule_output:
dest_proj = "global" if rule_output.scope == "GLOBAL" else interaction.project_name
dest_file = "global_rules.md" if rule_output.scope == "GLOBAL" else "rules.md"
@@ -301,12 +313,13 @@ async def _loop(live=None):
@click.option('--bank-path', default='~/.memory-bank', help='Path to your Memory Bank root')
@click.option('--evaluator', 'evaluator_name', default='auto', type=click.Choice(['auto'] + sorted(EVALUATOR_REGISTRY)), help='Evaluator LLM to use (default: auto-detect)')
@click.option('--debug', is_flag=True, default=False, help='Stream plain debug logs instead of dashboard UI')
def cli(tool, bank_path, evaluator_name, debug):
@click.option('--skip-prefilter', is_flag=True, default=False, help='Disable Stage 1 prefilter (send all interactions to full eval)')
def cli(tool, bank_path, evaluator_name, debug, skip_prefilter):
"""Context-Scribe: Persistent Secretary Daemon"""
if debug:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
try:
asyncio.run(run_daemon(tool, bank_path, debug=debug, evaluator_name=evaluator_name))
asyncio.run(run_daemon(tool, bank_path, debug=debug, evaluator_name=evaluator_name, skip_prefilter=skip_prefilter))
except KeyboardInterrupt:
pass

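The new flag is threaded from the CLI through `run_daemon` into `get_evaluator`. A programmatic sketch of the same call path; the argument values are illustrative, only the parameter names come from the diff:

```python
import asyncio
from context_scribe.main import run_daemon

asyncio.run(run_daemon(
    "gemini-cli",            # tool
    "~/.memory-bank",        # bank_path
    debug=True,
    evaluator_name="auto",   # auto-detect the evaluator from the tool
    skip_prefilter=True,     # bypass Stage 1; every interaction goes to full eval
))
```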