diff --git a/.env.template b/.env.template index 13632724..fb55c93f 100644 --- a/.env.template +++ b/.env.template @@ -22,6 +22,12 @@ AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT = # NEW: chat deployment name for generator AZURE_OPENAI_CHAT_DEPLOYMENT = +#for Azure Speech and Translator +AZURE_SPEECH_KEY=your-speech-key +AZURE_SPEECH_REGION=westus +AZURE_SPEECH_TTS_VOICE=en-US-JennyNeural +AZURE_TRANSLATOR_KEY=your-translator-key + # Optional: non-Azure OpenAI fallback OPENAI_API_KEY = OPENAI_MODEL_NAME = diff --git a/.gitignore b/.gitignore index d9030e97..d897e713 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ env/ # OS .DS_Store Thumbs.db +backend/.langsmith_local_runs/ diff --git a/backend/README.md b/backend/README.md new file mode 100644 index 00000000..56cfcfa0 --- /dev/null +++ b/backend/README.md @@ -0,0 +1,165 @@ +# Backend - Debug Endpoints + +This document describes the debug endpoints for development and testing of the personalization pipeline. + +**Dev-only:** These routers are mounted only when the `ECHO_DEBUG` environment variable is set to `1`, `true`, or `yes`. + +--- + +## 1. GET /debug/deliveries - Email Previews for UI + +**Purpose:** Run the orchestrator for a small set of mock customers and return minimal email preview objects (subject and body) for UI preview. + +### Query parameters + +- `mock` (optional, boolean): When `true`, returns precomputed previews without running the pipeline. + +## Response shape + +The endpoint returns JSON with the top-level key `previews`, an array of preview objects. 
+Each preview contains: + +- `user_id` (string) +- `email` (string) +- `subject` (string | null) +- `body` (string | null) +- `variant_id` (string | null) +- `blocked` (boolean) — true when no safe variant is available +- `error` (string | null) — set when pipeline execution fails for that user + +Example (mock response): + +```json +{ + "previews": [ + { + "user_id": "U001", + "email": "emma@example.com", + "subject": "Hi Emma, quick note about running shoes", + "body": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", + "variant_id": "A", + "blocked": false, + "error": null + }, + { + "user_id": "U002", + "email": "liam@example.com", + "subject": "Liam, more on the Acme plan", + "body": "Hello Liam,\n\nDetails: …\nLearn more on our site.", + "variant_id": "B", + "blocked": false, + "error": null + } + ] +} +``` + +Example (live response with a pipeline error for a user): + +```json +{ + "previews": [ + { + "user_id": "U001", + "email": "emma@example.com", + "subject": "S A", + "body": "B A", + "variant_id": "A", + "blocked": false, + "error": null + }, + { + "user_id": "U002", + "email": "liam@example.com", + "subject": null, + "body": null, + "variant_id": null, + "blocked": false, + "error": "pipeline failed" + } + ] +} +``` + +## How to use locally + +1. Enable the debug router and start the server (PowerShell): + +```powershell +$env:ECHO_DEBUG = '1' +E:/EchoAI/EchoVoice-AI/venv/Scripts/python.exe -m uvicorn backend.app.main:app --reload +``` + +2. Test GET /debug/deliveries (mock): + +```powershell +curl "http://127.0.0.1:8000/debug/deliveries?mock=true" +``` + +3. Test GET /debug/deliveries (run pipeline): + +```powershell +curl "http://127.0.0.1:8000/debug/deliveries" +``` + +4. 
Test POST /debug/run (full pipeline debug): + +```powershell +$body = @{customer = @{id = "U001"; name = "Emma"; email = "emma@example.com"}} | ConvertTo-Json +Invoke-RestMethod -Method POST -Uri "http://127.0.0.1:8000/debug/run" -ContentType "application/json" -Body $body +``` + +--- + +## 2. POST /debug/run - Full Pipeline Debug + +**Purpose:** Run the full orchestrator pipeline for a single customer and return the complete MessageState (all intermediate results) for debugging. + +### Request body + +```json +{ + "customer": { + "id": "U001", + "name": "Emma", + "email": "emma@example.com", + "last_event": "viewed_product", + "properties": { + "segment": "high_value" + } + } +} +``` + +### Response + +Returns the full orchestrator result including all pipeline stages: + +```json +{ + "segment": {"category": "high_value", "confidence": 0.95}, + "citations": ["Knowledge article #123", "Brand guideline v2.1"], + "variants": [ + {"id": "V1", "subject": "Hi Emma...", "body": "Dear Emma..."}, + {"id": "V2", "subject": "Emma, don't miss...", "body": "Hello Emma..."} + ], + "safety": { + "safe": [{"id": "V1", "subject": "Hi Emma...", "body": "Dear Emma..."}], + "blocked": [{"id": "V2", "reason": "policy_violation"}] + }, + "analysis": {"winner": {"variant_id": "V1", "score": 0.87}}, + "delivery": {"status": "sent", "message_id": "msg_abc123"} +} +``` + +--- + +## Notes & recommendations + +- These endpoints are for development and debugging only. Disable in production by not setting `ECHO_DEBUG`. +- **GET /debug/deliveries**: Use `mock=true` for fast UI iteration. Use without `mock` to test actual pipeline. +- **POST /debug/run**: Inspect complete pipeline execution including all intermediate stages. +- The `body_text` field in `/debug/deliveries` is a compatibility alias for `body`. +- Consider using `ECHO_DEBUG_CACHE_TTL` for caching to speed up UI development. 
+ +That's it — let me know if you'd like me to add this snippet into the repo's root `README.md` as well or create a short example component that fetches and renders the previews. \ No newline at end of file diff --git a/backend/README_LANGSMITH.md b/backend/README_LANGSMITH.md new file mode 100644 index 00000000..c96b92d2 --- /dev/null +++ b/backend/README_LANGSMITH.md @@ -0,0 +1,213 @@ +LangSmith monitoring (opt-in) +================================= + +This project includes a lightweight, opt-in LangSmith instrumentation wrapper at `backend/services/langsmith_monitor.py`. + +Purpose +------- +- Provide safe, non-blocking telemetry hooks for agents (generator, retriever, etc.). +- No-op by default so local dev and CI are unaffected. +- When enabled, the wrapper either forwards to the LangSmith SDK (if installed and configured) or writes local JSON run files under `backend/.langsmith_local_runs/`. + +How to enable +------------- +1. Set the environment variable `LANGSMITH_ENABLED=1` or `LANGSMITH_API_KEY=`. +2. (Optional) Install the LangSmith SDK in your Python environment: `pip install langsmith`. + +Behavior +-------- +- If `LANGSMITH_ENABLED` is not present, the wrapper functions (`start_run`, `log_event`, `finish_run`) are no-ops. +- If enabled but the SDK is not installed, the wrapper writes JSON files to `backend/.langsmith_local_runs/` for inspection. +- Instrumented agents: `backend/agents/generator.py` and `backend/agents/retriever.py` call the wrapper at start/finish/error points. + +Next steps +---------- +1. Review the small changes in `backend/services/langsmith_monitor.py` and the agent instrumentation. +2. Run a smoke test locally (no secrets required): + +```bash +# from repo root +backend/.venv/bin/python -c "import sys; sys.path.insert(0,'backend'); from services.langsmith_monitor import LANGSMITH_ENABLED; print('LANGSMITH_ENABLED=', LANGSMITH_ENABLED)" +``` + +3. To fully integrate with LangSmith UI, set `LANGSMITH_API_KEY` and install the SDK. 
We can then update `langsmith_monitor.py` to use the SDK client directly. + +4. Coordinate with the team on run naming, metadata shape, and whether to prefer a central tracer vs per-agent instrumentation. + + +Naming & metadata conventions (recommended) +----------------------------------------- +This project recommends the following minimal conventions for recorded runs and events so telemetry is consistent and safe across agents. + +- Run name pattern + - Format: .[:] + - Examples: + - segmenter.segment_user + - retriever.retrieve_citations:payment_plans + - generator.generate_variants:default_personalization + - safety_gate.safety_check_and_filter + - analytics.evaluate_variants + +- Required top-level metadata fields + - run_id: UUID (generated by agent/wrapper) + - run_name: string (matches the Run name pattern) + - agent: string (agent short name, e.g., "segmenter") + - start_time / end_time: ISO 8601 UTC timestamps + - status: "running" | "success" | "error" + - version: code version or commit SHA (optional but recommended) + - tags: list[str] (optional short tags, e.g., ["dev","experiment-42"]) + +- Input/PII policy (allowlist + pseudonymization) + - Always avoid recording raw PII (email, full name, SSN, phone, address). + - Record a pseudonymized identifier instead: + - customer_id_hash: deterministic HMAC/SHA256 of the internal id, using a team secret (do not commit the secret). + - Safe inputs: last_event, allowlisted properties (explicitly list safe keys in code), cohort labels. + - For any potentially sensitive text, store only a redacted snippet or omit it. + +- Outputs to record + - Short structured outputs (e.g., segment label, number of citations, variant count). + - Metrics: latency_ms, token_usage, counts. + - For full text outputs (LLM responses) prefer storing an artifact reference or a redacted snippet — avoid inline PII. 
+ +- Event naming + - Use consistent event names: "input_received", "llm_call", "citations_fetched", "variants_generated", "safety_result", "evaluation_done", "error". + - Each event should include a timestamp and a small payload with non-PII fields. + +Example run (segmenter) +----------------------- +Given a customer record (from `data/customers.json`): + +```json +{ + "id": "cust_002", + "name": "Bob", + "email": "bob@example.com", + "last_event": "payment_plans", + "properties": { + "form_started": "yes", + "scheduled": "no", + "attended": "no" + } +} +``` + +Store only safe fields and a pseudonymized id. Example recorded run: + +```json +{ + "run_id": "c7f6f3d7-1d2b-4a45-9f09-1e2b3c4d5e6f", + "run_name": "segmenter.segment_user", + "agent": "segmenter", + "start_time": "2025-11-23T08:12:06.123Z", + "end_time": "2025-11-23T08:12:06.234Z", + "status": "success", + "version": "main@b81971d", + "tags": ["dev"], + "inputs": { + "customer_id_hash": "sha256:c2f9...ab12", + "last_event": "payment_plans", + "properties": {"form_started":"yes","scheduled":"no","attended":"no"} + }, + "outputs": {"segment":"payment_plans:StartedFormOrFlow","intent_level":"medium","reasons_count":3}, + "events": [ + {"time":"2025-11-23T08:12:06.130Z","name":"segment_computed","payload":{"segment":"payment_plans:StartedFormOrFlow","intent_level":"medium"}} + ] +} +``` + +Hashing guidance +---------------- +- Use a deterministic HMAC or SHA256 with a team secret to produce pseudonymous IDs usable for joins but not reversible. Example: `sha256(team_secret + customer_id)`. +- Store the team secret in a secure secret store / env var and do not commit it. + +Env var: LANGSMITH_HMAC_SECRET +-------------------------------- +- Purpose: supply a secret used to compute deterministic HMAC-SHA256 pseudonymous IDs for any internal identifiers (e.g., customer ids). When present, the monitor will compute `customer_id_hash` as an HMAC-SHA256 of the raw id using this secret. 
+- How to set (example): + +```bash +export LANGSMITH_HMAC_SECRET="your-team-secret-very-long-and-random" +``` + +- Example output recorded in run metadata (truncated for readability): + +```json +"inputs": { + "customer_id_hash": "sha256:3a1f5b8c9d4e2f7a1b2c3d4e5f67890abcdef1234567890abcdef1234567890", + "last_event": "payment_plans" +} +``` + +Notes: +- If `LANGSMITH_HMAC_SECRET` is not set the wrapper falls back to a plain SHA256 digest of the id. This still avoids storing raw PII but is less secure for deterministic joins across systems. Prefer setting the HMAC secret in a secure store. +- Keep the secret out of version control and CI logs. Use your environment/secret manager (GitHub Secrets, AWS Parameter Store, Azure Key Vault, etc.). + +Team checklist to finalize +------------------------- +- [ ] Approve run_name pattern and list of agents to instrument +- [ ] Approve required metadata fields and allowlisted input keys +- [ ] Decide HMAC/secret location for deterministic hashing +- [ ] Decide retention policy for run artifacts and full-text captures +- [ ] Plan LangSmith SDK wiring once conventions are finalized + +--- + +Paste this section into the PR body or the issue comment to capture the agreed conventions. Once agreed I can wire a deterministic hash into `langsmith_monitor.start_run` and add tests that assert recorded runs follow the schema. + +# LangSmith monitor (opt-in) + +This folder contains a lightweight, opt-in LangSmith monitoring wrapper and example instrumentation for generator and retriever agents. + +How it works +- The wrapper is `backend/services/langsmith_monitor.py`. +- By default the wrapper is disabled and is a no-op. To enable set one of: + - `LANGSMITH_API_KEY` (preferred) + - `LANGSMITH_ENABLED=1` (for local testing; writes local JSON files) +- When enabled and the `langsmith` SDK is installed, the wrapper can be extended to forward runs to LangSmith. 
+ +Local testing +- Without enabling Langsmith, instrumentation will not affect runtime. +- If you want to inspect local runs instead of sending to LangSmith: + ```bash + export LANGSMITH_ENABLED=1 + # run your agent (from repo root): + cd backend + ./venv/bin/python -c "from services import langsmith_monitor; print(langsmith_monitor.LANGSMITH_ENABLED)" + # local run files are written to backend/.langsmith_local_runs/ + ``` + +Next steps +- Optionally wire the wrapper to the real `langsmith` SDK when team is ready. +- Decide a naming convention for run names and metadata, and extend the wrapper to include team-specific fields. + +Notes +- The wrapper is intentionally minimal to avoid adding runtime risks. It writes local JSON files when enabled and the SDK is not installed. +LangSmith integration (opt-in) +================================= + +This folder contains a lightweight, opt-in wrapper to record agent runs locally +or forward to LangSmith when enabled. + +How to enable (local testing) +- By default the monitor is disabled. To enable local JSON recording set: + +```bash +export LANGSMITH_ENABLED=1 +``` + +This will create run files under `backend/.langsmith_local_runs/` for each +instrumented agent run. + +How to enable real LangSmith (team) +- Install the `langsmith` package into your Python environment. +- Set an API key: + +```bash +export LANGSMITH_API_KEY=sk_...your_key... +``` + +Notes +- The wrapper is intentionally minimal and no-op by default to avoid + introducing runtime behavior changes. Once enabled, it records run start, + events, and finish status. The team can later replace or extend the wrapper + to call the official LangSmith SDK or to normalize metadata. 
diff --git a/backend/README_SWAGGER.md b/backend/README_SWAGGER.md new file mode 100644 index 00000000..82a5ae84 --- /dev/null +++ b/backend/README_SWAGGER.md @@ -0,0 +1,47 @@ +Lightweight Swagger runner +========================= + +This file documents how to run a lightweight local FastAPI server that exposes the project's OpenAPI/Swagger UI without installing heavy dependencies (FAISS, LangChain, etc.). + +What it does +- Creates a tiny FastAPI app that includes the health endpoint and a dummy `/orchestrate` route with the same request model shapes used by the real app. This is intended only for local OpenAPI inspection and development of client code. + +## Files +- `run_swagger_light.py` — lightweight runner added to the `backend/` folder. + +## Run locally (recommended) +1. Change to the `backend/` folder: + +```bash +cd backend +``` + +2. Create a virtual environment and install minimal deps: + +```bash +python3 -m venv .venv +./.venv/bin/python -m pip install --upgrade pip setuptools wheel +./.venv/bin/pip install fastapi uvicorn python-dotenv pydantic +``` + +3. Start the lightweight server (detached): + +```bash +cd backend +nohup ./.venv/bin/python -m uvicorn run_swagger_light:app --app-dir $(pwd) --host 127.0.0.1 --port 8000 > /tmp/evai_uvicorn.log 2>&1 & +echo "server started, logs -> /tmp/evai_uvicorn.log" +``` + +4. Open Swagger UI in your browser: + + http://127.0.0.1:8000/docs + +Or fetch the OpenAPI spec directly: + +```bash +curl http://127.0.0.1:8000/openapi.json | jq . +``` + +Notes and next steps +- The dummy `/orchestrate` endpoint only echoes the parsed payload and does not execute orchestration logic. +- If you want the real endpoints wired up in Swagger you must install the full backend dependencies (see `requirements.txt`) — this will take longer and requires heavy packages such as FAISS. 
diff --git a/backend/agents/analytics.py b/backend/agents/analytics.py index 4d196551..bc1c1ec9 100644 --- a/backend/agents/analytics.py +++ b/backend/agents/analytics.py @@ -1,10 +1,34 @@ import random def evaluate_variants(variants: list, customer: dict) -> dict: + # Opt-in instrumentation + try: + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + except Exception: + start_run = log_event = finish_run = lambda *a, **k: None + LANGSMITH_ENABLED = False + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("analytics.evaluate_variants", {"variant_count": len(variants)}) + # Mock evaluation: assign random CTR estimates and pick best results = [] for v in variants: ctr = round(random.uniform(0.02, 0.20), 3) - results.append({'variant_id': v.get('id'), 'ctr': ctr}) - winner = max(results, key=lambda r: r['ctr']) if results else None - return {'results': results, 'winner': winner} + results.append({"variant_id": v.get("id"), "ctr": ctr}) + winner = max(results, key=lambda r: r["ctr"]) if results else None + + out = {"results": results, "winner": winner} + + if run_id: + try: + log_event(run_id, "evaluation_done", {"winner": winner, "count": len(results)}) + finish_run(run_id, status="success", outputs={"winner": winner}) + except Exception: + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass + + return out diff --git a/backend/agents/assignment_agent.py b/backend/agents/assignment_agent.py new file mode 100644 index 00000000..9170811e --- /dev/null +++ b/backend/agents/assignment_agent.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import hashlib +import json +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple + + +class AssignmentStrategy(Enum): + MD5_HASH = "md5_hash" + ROUND_ROBIN = "round_robin" + RANDOM = "random" + + +class ABAssignmentAgent: + """ + Deterministic A/B (multi-variant) assignment agent using MD5 hashing. 
+ + Example: + agent = ABAssignmentAgent(split_ratio={"A":0.5,"B":0.5}, seed="echovoice") + assignment = agent.assign_user("U123", "exp_001", context={"email": "x@x.com"}) + """ + + def __init__( + self, + split_ratio: Optional[Dict[str, float]] = None, + seed: str = "echovoice", + strategy: AssignmentStrategy = AssignmentStrategy.MD5_HASH, + ): + if split_ratio is None: + split_ratio = {"A": 0.5, "B": 0.5} + self.split_ratio = dict(split_ratio) + self._validate_split_ratio() + self.variant_ids = list(self.split_ratio.keys()) + self.thresholds = self._compute_thresholds() + self.seed = seed + self.strategy = strategy + + def _validate_split_ratio(self) -> None: + total = sum(self.split_ratio.values()) + if not (0.999 <= total <= 1.001): + raise ValueError("split_ratio values must sum to 1.0") + for k, v in self.split_ratio.items(): + if v <= 0 or v > 1: + raise ValueError(f"Invalid split fraction for {k}: {v}") + + def _compute_thresholds(self) -> List[Tuple[str, float]]: + thresholds: List[Tuple[str, float]] = [] + cum = 0.0 + for vid in self.variant_ids: + cum += float(self.split_ratio[vid]) + thresholds.append((vid, cum)) + return thresholds + + def _compute_hash_value(self, user_id: str, experiment_id: str) -> float: + """ + Deterministic hash in [0.0, 1.0). + + Uses md5(seed + experiment_id + user_id) normalized to [0,1). 
+ """ + key = f"{self.seed}:{experiment_id}:{user_id}" + digest = hashlib.md5(key.encode("utf-8")).hexdigest() + truncated = int(digest[:15], 16) + max_val = float(int("f" * 15, 16)) + return truncated / max_val + + def assign_user( + self, + user_id: str, + experiment_id: str, + context: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + if self.strategy != AssignmentStrategy.MD5_HASH: + raise NotImplementedError(f"Strategy {self.strategy} not implemented") + + hash_value = self._compute_hash_value(user_id, experiment_id) + chosen_variant = None + for vid, upper in self.thresholds: + if hash_value < upper: + chosen_variant = vid + break + if chosen_variant is None: + chosen_variant = self.variant_ids[-1] + + assignment = { + "variant_id": chosen_variant, + "hash_value": hash_value, + "experiment_id": experiment_id, + "user_id": user_id, + "context": context, + "deterministic": True, + } + + return assignment + + def validate_assignment(self, assignment: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + required = {"variant_id", "hash_value", "experiment_id", "user_id"} + missing = required - set(assignment.keys()) + if missing: + return False, f"Missing required field(s): {', '.join(sorted(missing))}" + if assignment["variant_id"] not in self.variant_ids: + return False, "Invalid variant_id" + hv = assignment["hash_value"] + if not isinstance(hv, (int, float)) or not (0.0 <= hv < 1.0): + return False, "hash_value out of range [0.0, 1.0)" + return True, None + + +class MicrosoftServicesAdapter: + """ + Development-friendly hooks for Azure integrations (App Insights, Kusto, Service Bus). + Replace prints with real SDK calls when deploying to Azure. 
+ """ + + @staticmethod + def log_assignment_to_app_insights( + assignment: Dict[str, Any], + instrumentation_key: Optional[str] = None, + ) -> bool: + if instrumentation_key is None: + print(f"[App Insights Hook] Assignment: {json.dumps(assignment)}") + return True + return True + + @staticmethod + def log_assignment_to_kusto( + assignment: Dict[str, Any], + cluster_uri: Optional[str] = None, + database: str = "echovoice", + ) -> bool: + if cluster_uri is None: + print(f"[Kusto Hook] Assignment: {json.dumps(assignment)}") + return True + return True + + @staticmethod + def publish_assignment_event( + assignment: Dict[str, Any], + connection_string: Optional[str] = None, + queue_name: str = "assignment-events", + ) -> bool: + if connection_string is None: + print(f"[Service Bus Hook] Assignment Event: {json.dumps(assignment)}") + return True + return True diff --git a/backend/agents/generator.py b/backend/agents/generator.py index f120bb01..1f4b3ef8 100644 --- a/backend/agents/generator.py +++ b/backend/agents/generator.py @@ -37,9 +37,33 @@ import json from typing import Any, Dict, List, Optional -from langchain_core.documents import Document # for downstream typing only -from langchain_openai import AzureChatOpenAI, ChatOpenAI -from langchain_core.messages import SystemMessage, HumanMessage +try: + from langchain_core.documents import Document # for downstream typing only +except Exception: + # Minimal placeholder for tests when langchain_core isn't installed + class Document: + def __init__(self, page_content: str = "", metadata: dict | None = None): + self.page_content = page_content + self.metadata = metadata or {} + +try: + from langchain_openai import AzureChatOpenAI, ChatOpenAI +except Exception: + AzureChatOpenAI = None + ChatOpenAI = None + +try: + from langchain_core.messages import SystemMessage, HumanMessage +except Exception: + # Simple message placeholders used only for LLM path; safe to stub when not installed + class SystemMessage: + def 
__init__(self, content: str): + self.content = content + + class HumanMessage: + def __init__(self, content: str): + self.content = content +from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED # IMPORTANT: adjust this import path if your config module lives elsewhere. # You said "app/config.py", so we import from app.config. @@ -395,13 +419,32 @@ def generate_variants( This keeps your pipeline testable locally even without secrets. """ - # 1) Try LLM path - variants = _generate_with_llm(customer, segment, citations) - if variants is not None: + run_id = None + # Before generating variants, starts a LangSmith run + if LANGSMITH_ENABLED: + run_id = start_run("generator.generate_variants", {"agent": "generator", "customer_id": customer.get("id"), "use_case": segment.get("use_case")}) + + try: + # 1) Try LLM path + variants = _generate_with_llm(customer, segment, citations) + if variants is not None: + if run_id: + log_event(run_id, "generated_variants", {"count": len(variants)}) + finish_run(run_id, status="success", outputs={"count": len(variants)}) + return variants + + # 2) Fallback path (no LLM or error) + variants = _fallback_template_variants(customer, segment, citations) + if run_id: + log_event(run_id, "fallback_variants", {"count": len(variants)}) + finish_run(run_id, status="success", outputs={"count": len(variants), "generator": "template_fallback"}) return variants - # 2) Fallback path (no LLM or error) - return _fallback_template_variants(customer, segment, citations) + except Exception as e: + if run_id: + log_event(run_id, "error", {"error": str(e)}) + finish_run(run_id, status="error", outputs={"error": str(e)}) + raise # --------------------------------------------------------------------- diff --git a/backend/agents/retriever.py b/backend/agents/retriever.py index 8ae26ba1..63b91f43 100644 --- a/backend/agents/retriever.py +++ b/backend/agents/retriever.py @@ -138,6 +138,12 @@ def retrieve_citations( "source": 
"...", } """ + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("retriever.retrieve_citations", {"agent": "retriever", "top_k": top_k}) + query = build_query_from_segment(segment_result) path = jsonl_path or str(DEFAULT_JSONL_PATH) @@ -149,18 +155,30 @@ def retrieve_citations( original_text = doc.page_content or "" redacted = redact_pii(original_text) - citations.append( - { - "id": meta.get("id"), - "title": meta.get("title"), - "section": meta.get("section"), - "text": original_text, - "redacted_text": redacted, - "url": meta.get("url"), - "published_date": meta.get("published_date"), - "source": meta.get("source", "corpus"), - } - ) + citation = { + "id": meta.get("id"), + "title": meta.get("title"), + "section": meta.get("section"), + "text": original_text, + "redacted_text": redacted, + "url": meta.get("url"), + "published_date": meta.get("published_date"), + "source": meta.get("source", "corpus"), + } + citations.append(citation) + + if run_id: + try: + # Record the query and number of citations + log_event(run_id, "query", {"query": query, "top_k": top_k}) + log_event(run_id, "citations_returned", {"count": len(citations)}) + finish_run(run_id, status="success", outputs={"count": len(citations)}) + except Exception: + # Do not let monitoring errors break retrieval + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass return citations diff --git a/backend/agents/safety_gate.py b/backend/agents/safety_gate.py index 7a841078..79fec1ab 100644 --- a/backend/agents/safety_gate.py +++ b/backend/agents/safety_gate.py @@ -212,6 +212,17 @@ def safety_check_and_filter(variants: List[Dict[str, Any]]) -> Dict[str, Any]: ] } """ + # Opt-in instrumentation + try: + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + except Exception: + start_run = log_event = finish_run = lambda *a, **k: None + 
LANGSMITH_ENABLED = False + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("safety_gate.safety_check_and_filter", {"variant_count": len(variants)}) + safe: List[Dict[str, Any]] = [] blocked: List[Dict[str, Any]] = [] @@ -240,6 +251,17 @@ def safety_check_and_filter(variants: List[Dict[str, Any]]) -> Dict[str, Any]: result = {"safe": safe, "blocked": blocked} logger.info("safety_check_and_filter: safe=%d blocked=%d", len(safe), len(blocked)) + + if run_id: + try: + log_event(run_id, "safety_result", {"safe": len(safe), "blocked": len(blocked)}) + finish_run(run_id, status="success", outputs={"safe": len(safe), "blocked": len(blocked)}) + except Exception: + try: + finish_run(run_id, status="error", outputs={}) + except Exception as e: + logger.exception("Exception occurred while calling finish_run in error handling: %s", e) + return result diff --git a/backend/agents/segmenter.py b/backend/agents/segmenter.py index 5087c78c..75992ef3 100644 --- a/backend/agents/segmenter.py +++ b/backend/agents/segmenter.py @@ -25,6 +25,17 @@ def segment_user(customer: dict) -> dict: } """ + # Opt-in instrumentation (no-op when disabled) + try: + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + except Exception: + start_run = log_event = finish_run = lambda *a, **k: None + LANGSMITH_ENABLED = False + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("segmenter.segment_user", {"user_id": customer.get("user_id"), "email": customer.get("email")}) + viewed_page = (customer.get("viewed_page") or "").strip().lower() form_started = to_bool(customer.get("form_started")) scheduled = to_bool(customer.get("scheduled")) @@ -65,7 +76,7 @@ def segment_user(customer: dict) -> dict: segment_label = f"{use_case}:{funnel_stage}" reasons.insert(0, f"interested in: {use_case_label}") - return { + result = { "segment": segment_label, "use_case": use_case, "use_case_label": use_case_label, @@ -74,6 +85,18 @@ def segment_user(customer: 
dict) -> dict: "reasons": reasons, } + if run_id: + try: + log_event(run_id, "segment_computed", {"segment": segment_label, "intent_level": intent_level}) + finish_run(run_id, status="success", outputs={"segment": segment_label}) + except Exception: + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass + + return result + def load_customers_from_csv(csv_path: str): customers = [] diff --git a/backend/app/config.py b/backend/app/config.py index dcb8b7b7..2aa956d3 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -1,3 +1,5 @@ +# backend/app/config.py + import os from typing import List from dotenv import load_dotenv @@ -10,19 +12,32 @@ VECTOR_DB_API_KEY = os.getenv('VECTOR_DB_API_KEY') DELIVERY_PROVIDER_API_KEY = os.getenv('DELIVERY_PROVIDER_API_KEY') LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO') + # Azure OpenAI (for embeddings + chat) AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") -AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview") +AZURE_OPENAI_API_VERSION = os.getenv( + "AZURE_OPENAI_API_VERSION", + "2024-02-15-preview", +) # Embeddings deployment (to use this in vector_db) AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT = os.getenv( "AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT" ) -# NEW: chat deployment name for generator +# Chat deployment name for generator AZURE_OPENAI_CHAT_DEPLOYMENT = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") +# === NEW: Azure Speech + Translator config for media services === +AZURE_SPEECH_KEY = os.getenv("AZURE_SPEECH_KEY") +AZURE_SPEECH_REGION = os.getenv("AZURE_SPEECH_REGION") + +AZURE_TRANSLATOR_KEY = os.getenv("AZURE_TRANSLATOR_KEY") +AZURE_TRANSLATOR_REGION = os.getenv("AZURE_TRANSLATOR_REGION") +AZURE_TRANSLATOR_ENDPOINT = os.getenv("AZURE_TRANSLATOR_ENDPOINT") +# e.g. 
"https://api.cognitive.microsofttranslator.com" + # Optional: non-Azure OpenAI fallback OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") OPENAI_MODEL_NAME = os.getenv("OPENAI_MODEL_NAME", "gpt-4o-mini") @@ -33,25 +48,37 @@ # CORS origins: allow explicit env var or sensible defaults in development _env_origins = os.getenv("ALLOWED_ORIGINS") if _env_origins: - ALLOWED_ORIGINS: List[str] = [o.strip() for o in _env_origins.split(",") if o.strip()] + ALLOWED_ORIGINS: List[str] = [ + o.strip() for o in _env_origins.split(",") if o.strip() + ] else: - if ENV == "production": - # In production default to an explicit, empty list to force operators to opt-in. - prod_origin = os.getenv("PRODUCTION_ORIGIN") - ALLOWED_ORIGINS = [prod_origin] if prod_origin else [] - else: - # Development-friendly defaults - ALLOWED_ORIGINS = [ - "http://localhost", - "http://localhost:3000", - "http://127.0.0.1:3000", - "http://localhost:8000", - ] + if ENV == "production": + # In production default to an explicit, empty list to force operators to opt-in. + prod_origin = os.getenv("PRODUCTION_ORIGIN") + ALLOWED_ORIGINS = [prod_origin] if prod_origin else [] + else: + # Development-friendly defaults + ALLOWED_ORIGINS = [ + "http://localhost", + "http://localhost:3000", + "http://127.0.0.1:3000", + "http://localhost:8000", + ] + def get_allowed_origins() -> List[str]: - """Return the configured origins list.""" - return ALLOWED_ORIGINS + """Return the configured origins list.""" + return ALLOWED_ORIGINS # Redis URL for adapter (optional - set to enable Redis-backed store) REDIS_URL = os.getenv("REDIS_URL") + + +def is_debug_enabled() -> bool: + """Return True when debug routes/features should be enabled. + + Controlled via the `ECHO_DEBUG` environment variable (1/true/yes). 
+ """ + val = os.getenv("ECHO_DEBUG", "").lower() + return val in ("1", "true", "yes") diff --git a/backend/app/graph/langgraph_flow.py b/backend/app/graph/langgraph_flow.py index 942cebf9..eaaca0a6 100644 --- a/backend/app/graph/langgraph_flow.py +++ b/backend/app/graph/langgraph_flow.py @@ -6,6 +6,7 @@ from app.nodes.retriever_node import RetrieverNode from app.nodes.generator_node import GeneratorNode from app.nodes.safety_node import SafetyNode +from app.nodes.assignment_node import AssignmentNode from app.nodes.analytics_node import AnalyticsNode from app.nodes.hitl_node import HITLNode from services.delivery import send_email_mock @@ -20,6 +21,7 @@ class FlowState(TypedDict, total=False): citations: List[Dict[str, Any]] variants: List[Dict[str, Any]] safety: Dict[str, Any] + assignment: Dict[str, Any] hitl: Dict[str, Any] analysis: Dict[str, Any] delivery: Dict[str, Any] @@ -82,6 +84,26 @@ def safety_node(state: FlowState) -> FlowState: return state +def assignment_node(state: FlowState) -> FlowState: + """ + Assign the customer to an experiment variant deterministically. + Stores the assignment under state['assignment'] and leaves other + state values intact. + """ + customer = state.get("customer") or {} + experiment_id = customer.get("experiment_id") or customer.get("exp_id") or "exp_default" + + assignment_input = { + "user_id": customer.get("id"), + "experiment_id": experiment_id, + "context": {"segment": state.get("segment")}, + } + + assignment = AssignmentNode().run(assignment_input) + state["assignment"] = assignment + return state + + def hitl_node(state: FlowState) -> FlowState: """ Human-in-the-loop (HITL) node. 
@@ -179,6 +201,7 @@ def build_graph(): graph.add_node("segmenter", segmenter_node) graph.add_node("retriever", retriever_node) graph.add_node("generator", generator_node) + graph.add_node("assignment", assignment_node) graph.add_node("safety", safety_node) graph.add_node("hitl", hitl_node) graph.add_node("analytics", analytics_node) @@ -188,7 +211,8 @@ def build_graph(): graph.set_entry_point("segmenter") graph.add_edge("segmenter", "retriever") graph.add_edge("retriever", "generator") - graph.add_edge("generator", "safety") + graph.add_edge("generator", "assignment") + graph.add_edge("assignment", "safety") graph.add_edge("safety", "hitl") graph.add_edge("hitl", "analytics") graph.add_edge("analytics", "delivery") diff --git a/backend/app/graph/orchestrator.py b/backend/app/graph/orchestrator.py index b369be80..cc2b3ed7 100644 --- a/backend/app/graph/orchestrator.py +++ b/backend/app/graph/orchestrator.py @@ -12,6 +12,12 @@ from app.graph.langgraph_flow import build_graph from services.logger import get_logger from app.nodes.segmenter_node import SegmenterNode +from app.nodes.retriever_node import RetrieverNode +from app.nodes.generator_node import GeneratorNode +from app.nodes.safety_node import SafetyNode +from app.nodes.hitl_node import HITLNode +from app.nodes.analytics_node import AnalyticsNode +from services.delivery import send_email_mock logger = get_logger("graph.orchestrator") @@ -31,12 +37,11 @@ def __init__( store_: Optional[MemoryStore] = None, logger_=None, segmenter: Optional[SegmenterNode] = None, - # kept for backward compatibility, but no longer used directly: - retriever=None, - generator=None, - safety=None, - hitl=None, - analytics=None, + retriever: Optional[RetrieverNode] = None, + generator: Optional[GeneratorNode] = None, + safety: Optional[SafetyNode] = None, + hitl: Optional[HITLNode] = None, + analytics: Optional[AnalyticsNode] = None, ) -> None: # Prefer an explicitly provided store, otherwise fall back to the module-level `store` 
self.store = store_ or store
@@ -44,9 +49,11 @@ def __init__(
 
         # Segmenter can be overridden via FastAPI dependency overrides in tests
         self.segmenter = segmenter or SegmenterNode()
-
-        # Build the LangGraph graph once per orchestrator instance
-        self.graph = build_graph()
+        self.retriever = retriever or RetrieverNode()
+        self.generator = generator or GeneratorNode()
+        self.safety = safety or SafetyNode()
+        self.hitl = hitl or HITLNode()
+        self.analytics = analytics or AnalyticsNode()
 
     def close(self) -> None:
         """Cleanup resources held by the orchestrator or graph.
@@ -54,14 +61,13 @@ def close(self) -> None:
         This is a best-effort hook. If the graph exposes a `close` or
         `shutdown` method it will be called.
         """
-        try:
-            close_fn = getattr(self.graph, "close", None) or getattr(
-                self.graph, "shutdown", None
-            )
-            if callable(close_fn):
-                close_fn()
-        except Exception:
-            logger.exception("error closing graph")
+        for node in (self.segmenter, self.retriever, self.generator, self.safety, self.hitl, self.analytics):
+            try:
+                close_fn = getattr(node, "close", None) or getattr(node, "shutdown", None)
+                if callable(close_fn):
+                    close_fn()
+            except Exception:
+                logger.exception("error closing node %s", type(node).__name__)
 
     async def run_flow(self, flow_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
         """Run a named flow with the given payload via the LangGraph graph.
@@ -81,57 +87,61 @@ async def run_flow(self, flow_name: str, payload: Dict[str, Any]) -> Dict[str, A
                 f"{key}:flow_started", {"flow": flow_name, "payload": payload}
            )
         except Exception:
-            # best-effort: store is optional and should not break execution
-            logger.exception("failed to persist flow marker")
+            self.logger.exception("failed to persist flow marker")
 
-        # Compatibility hook: call the injected segmenter and persist its output.
-        # This allows tests to override `get_segmenter` and assert on
-        # store["{id}:segment"].
+        # Build and invoke the LangGraph flow to get a final state dict.
try: - segment_from_node = self.segmenter.run(payload) - self.logger.info("Segment (node): %s", segment_from_node) - self.store.set(f"{key}:segment", segment_from_node) + graph = build_graph() + final_state = graph.invoke({"customer": payload}) except Exception: - self.logger.exception("failed to run or persist segmenter output") - - # Invoke the LangGraph graph with the expected input shape - # The graph is expected to work off `{"customer": payload}` - self.logger.info("Invoking graph for flow '%s'", flow_name) - result_state = await self.graph.ainvoke({"customer": payload}) or {} - - # Extract values from the graph state - segment = result_state.get("segment") - citations = result_state.get("citations") - variants = result_state.get("variants") - safety = result_state.get("safety") - hitl = result_state.get("hitl") - analysis = result_state.get("analysis") - delivery = result_state.get("delivery") - - # Persist other pieces of state for inspection/debugging. - # NOTE: we intentionally do NOT overwrite "segment" here so tests - # that override the segmenter still see their injected value in the store. - for field_name, value in [ - ("citations", citations), - ("variants", variants), - ("safety", safety), - ("hitl", hitl), - ("analysis", analysis), - ("delivery", delivery), - ]: - try: - self.store.set(f"{key}:{field_name}", value) - except Exception: - self.logger.exception("failed to persist %s", field_name) + self.logger.exception("failed to invoke LangGraph flow") + final_state = {} + + # Extract expected fields from the final state with safe fallbacks. 
+ segment = final_state.get("segment") + citations = final_state.get("citations") + variants = final_state.get("variants") + safety_result = final_state.get("safety") or {} + hitl_result = final_state.get("hitl") + analysis = final_state.get("analysis") + delivery_result = final_state.get("delivery") + + # Log safety summary if present + if isinstance(safety_result, dict): + safe_count = len(safety_result.get("safe", [])) + blocked_count = len(safety_result.get("blocked", [])) + self.logger.info(f"Safety safe={safe_count} blocked={blocked_count}") + + # Persist transient results (best-effort) + try: + if segment is not None: + # Allow an injected/overridden segmenter (useful in tests) + try: + segment_from_node = self.segmenter.run(payload) + segment = segment_from_node + except Exception: + # Fall back to the segment computed by the graph + pass + self.store.set(f"{key}:segment", segment) + if citations is not None: + self.store.set(f"{key}:citations", citations) + if variants is not None: + self.store.set(f"{key}:variants", variants) + if hitl_result is not None: + self.store.set(f"{key}:hitl", hitl_result) + if analysis is not None: + self.store.set(f"{key}:analysis", analysis) + except Exception: + self.logger.exception("failed to persist transient state") - response: Dict[str, Any] = { + response = { "segment": segment, "citations": citations, "variants": variants, - "safety": safety, - "hitl": hitl, + "safety": safety_result, + "hitl": hitl_result, "analysis": analysis, - "delivery": delivery, + "delivery": delivery_result, } self.logger.info("Orchestrator.run_flow completed: %s", flow_name) diff --git a/backend/app/main.py b/backend/app/main.py index c066cac3..26fddbc5 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,14 +1,15 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from .config import get_allowed_origins +from .config import get_allowed_origins, is_debug_enabled from pydantic import BaseModel from 
typing import Optional, Dict, Any
 import uvicorn
-from services.logger import get_logger
+from backend.services.logger import get_logger
 from .routers.health import router as health_router
 from .routers.orchestrator import router as orchestrator_router
-
+from .routers.media import router as media_router # added import for media router
+from .routers.hitl import router as hitl_router # added import for hitl router
 logger = get_logger('orchestrator')
 
 app = FastAPI(title='EchoVoice-AI Orchestrator')
 
@@ -27,6 +28,13 @@
 # Register routers (standard `routers/` package)
 app.include_router(health_router)
 app.include_router(orchestrator_router)
+app.include_router(media_router) # included media router
+app.include_router(hitl_router) # included hitl router
+
+from .routers.debug import router as debug_router  # dev-only debug router; imported here so the mount below cannot NameError
+if is_debug_enabled():
+    app.include_router(debug_router)
+
 
 if __name__ == '__main__':
diff --git a/backend/app/nodes/assignment_node.py b/backend/app/nodes/assignment_node.py
new file mode 100644
index 00000000..605a9dca
--- /dev/null
+++ b/backend/app/nodes/assignment_node.py
@@ -0,0 +1,32 @@
+from __future__ import annotations
+
+from typing import Any, Dict, Optional
+
+from .base_node import BaseNode
+from agents.assignment_agent import ABAssignmentAgent
+
+
+class AssignmentNode(BaseNode):
+    """
+    Thin adapter node to assign a user for an experiment. 
+ + Expected input: + { + "user_id": "...", + "experiment_id": "...", + "context": {...} # optional + } + + Returns: + assignment dict (see ABAssignmentAgent.assign_user) + """ + + def __init__(self, name: str = "assignment", agent: Optional[ABAssignmentAgent] = None): + super().__init__(name) + self.agent = agent or ABAssignmentAgent() + + def run(self, data: Dict[str, Any]) -> Dict[str, Any]: + user_id = data.get("user_id") or data.get("customer", {}).get("id") + experiment_id = data.get("experiment_id") or data.get("exp_id") or "exp_default" + context = data.get("context") + return self.agent.assign_user(user_id, experiment_id, context=context) diff --git a/backend/app/nodes/hitl_node.py b/backend/app/nodes/hitl_node.py index 1285c31f..5e3ee614 100644 --- a/backend/app/nodes/hitl_node.py +++ b/backend/app/nodes/hitl_node.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional from uuid import uuid4 +from app.store import reviews as review_store #to persist reviews from services.logger import get_logger @@ -22,10 +23,10 @@ class HITLNode: NOTE: - This node does NOT block on a human decision. - It just prepares the review metadata and logs it. - - In a future step, you can persist this in a database/Redis/queue - and build API endpoints like: + - API endpoints like: - GET /hitl/{review_id} - - POST /hitl/decide (approve/reject) + - POST /hitl/{review_id}/decision (approve/reject) + can load and update the stored review object. """ def __init__(self, logger: Optional[Any] = None) -> None: @@ -68,6 +69,18 @@ def run( # Generate a unique review_id that a UI or API can later use review_id = f"review_{uuid4().hex}" + # Persist full HITL review (customer + variants) so it can be fetched later + # via GET /hitl/{review_id} and updated by POST /hitl/{review_id}/decision. + # We also log it for traceability / debugging. 
+ try: + review_store.create_review(review_id, customer, variants) + except Exception: + # If persistence fails, log the error but still return the hitl_payload + # so the rest of the flow can proceed. + self.logger.exception( + "HITLNode: failed to persist review %s to store", review_id + ) + hitl_payload = { "review_id": review_id, "status": "pending_human_approval", @@ -76,9 +89,7 @@ def run( "num_variants": len(variants), } - # In a real system, you might persist (customer + variants) - # keyed by review_id in a DB or Redis here. - # For now, we only log it so you can see it in the logs. + self.logger.info( "HITLNode: created review job %s for customer %s with %d variants", review_id, diff --git a/backend/app/routers/debug.py b/backend/app/routers/debug.py new file mode 100644 index 00000000..06b9098e --- /dev/null +++ b/backend/app/routers/debug.py @@ -0,0 +1,185 @@ +from fastapi import APIRouter, Depends, Query, HTTPException +from typing import Any, Dict, List, Optional +import os +import time + +from pydantic import BaseModel + +from ..graph.orchestrator import Orchestrator +from .orchestrator import get_orchestrator, OrchestrateRequest +from backend.services.logger import get_logger + +logger = get_logger("routers.debug") + +router = APIRouter(prefix="/debug", tags=["debug"]) + + +class Preview(BaseModel): + user_id: str + email: str + subject: Optional[str] = None + body: Optional[str] = None + # compatibility alias some frontends expect + body_text: Optional[str] = None + variant_id: Optional[str] = None + blocked: bool = False + error: Optional[str] = None + + +class PreviewsResponse(BaseModel): + previews: List[Preview] + + +# Precomputed mock previews used when ?mock=true +PRECOMPUTED_PREVIEWS = [ + { + "user_id": "U001", + "email": "emma@example.com", + "subject": "Hi Emma, quick note about running shoes", + "body": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", + "body_text": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", + 
"variant_id": "A", + "blocked": False, + }, + { + "user_id": "U002", + "email": "liam@example.com", + "subject": "Liam, more on the Acme plan", + "body": "Hello Liam,\n\nDetails: …\nLearn more on our site.", + "body_text": "Hello Liam,\n\nDetails: …\nLearn more on our site.", + "variant_id": "B", + "blocked": False, + }, +] + + +# Simple process-local TTL cache for debug previews (dev-only). Keyed by mock flag. +# Each entry: { 'value': , 'expires_at': } +_DEBUG_PREVIEWS_CACHE: Dict[str, Dict[str, Any]] = {} + + +def _get_cache_ttl_seconds() -> Optional[int]: + v = os.environ.get("ECHO_DEBUG_CACHE_TTL") + if not v: + return None + try: + t = int(v) + if t <= 0: + return None + return t + except Exception: + return None + + +@router.get("/deliveries", response_model=PreviewsResponse) +async def get_debug_deliveries( + orchestrator: Orchestrator = Depends(get_orchestrator), + mock: bool = Query(False, description="Return precomputed mock previews without running pipeline"), +) -> Dict[str, Any]: + """ + Debug endpoint (Issue #11). + + Runs the lightweight orchestrator for a couple of mock customers and + returns only the email preview outputs (to, subject, body) suitable + for the UI preview. 
+ """ + + mock_customers = [ + {"id": "U001", "name": "Emma", "email": "emma@example.com"}, + {"id": "U002", "name": "Liam", "email": "liam@example.com"}, + ] + + cache_ttl = _get_cache_ttl_seconds() + cache_key = f"mock={bool(mock)}" + + # Serve cached response when TTL enabled and cache valid + if cache_ttl: + entry = _DEBUG_PREVIEWS_CACHE.get(cache_key) + if entry and entry.get("expires_at", 0) > time.time(): + logger.debug("returning cached debug previews for %s", cache_key) + return entry["value"] + + if mock: + response = {"previews": PRECOMPUTED_PREVIEWS} + if cache_ttl: + _DEBUG_PREVIEWS_CACHE[cache_key] = {"value": response, "expires_at": time.time() + cache_ttl} + return response + + previews: List[Dict[str, Any]] = [] + + for c in mock_customers: + preview = { + "user_id": c.get("id"), + "email": c.get("email"), + "subject": None, + "body": None, + "body_text": None, + "variant_id": None, + "blocked": False, + "error": None, + } + + try: + result = await orchestrator.run_flow("default_personalization", c) + except Exception: + logger.exception("debug preview run_flow failed for %s", c.get("id")) + preview["error"] = "pipeline failed" + previews.append(preview) + continue + + # Extract winner (if any) and safe variants + winner = result.get("analysis", {}).get("winner") if isinstance(result.get("analysis"), dict) else None + safe_variants = result.get("safety", {}).get("safe", []) if isinstance(result.get("safety"), dict) else [] + + variant = None + if winner and isinstance(winner, dict): + variant_id = winner.get("variant_id") + variant = next((v for v in safe_variants if v.get("id") == variant_id), None) + + # Fallback: use first safe variant if no explicit winner + if not variant and safe_variants: + variant = safe_variants[0] + + if variant: + preview["variant_id"] = variant.get("id") + preview["subject"] = variant.get("subject") + preview["body"] = variant.get("body") + # compatibility alias for frontends that look for `body_text` + 
preview["body_text"] = variant.get("body") + else: + preview["blocked"] = True + + previews.append(preview) + + response = {"previews": previews} + if cache_ttl: + _DEBUG_PREVIEWS_CACHE[cache_key] = {"value": response, "expires_at": time.time() + cache_ttl} + return response + + +@router.post("/run") +async def debug_run_pipeline( + payload: OrchestrateRequest, + orchestrator: Orchestrator = Depends(get_orchestrator), +) -> Dict[str, Any]: + """ + Debug endpoint (Issue #10). + + Runs the full orchestrator pipeline for a single customer and returns + the complete MessageState (all intermediate results) for debugging. + + This endpoint returns the full orchestrator result including: + - segment + - citations + - variants + - safety + - analysis + - delivery + """ + customer = payload.customer.model_dump() + if not customer: + raise HTTPException(status_code=400, detail="customer missing") + + logger.info("debug/run for customer %s", customer.get("id") or customer.get("email")) + result = await orchestrator.run_flow("default_personalization", customer) + return result diff --git a/backend/app/routers/hitl.py b/backend/app/routers/hitl.py new file mode 100644 index 00000000..2ea6eec2 --- /dev/null +++ b/backend/app/routers/hitl.py @@ -0,0 +1,71 @@ +# backend/app/routers/hitl.py + +from typing import Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from app.store import reviews as review_store +from app.store import audit_log + +# This must be named `router` so `from app.routers.hitl import router` works +router = APIRouter( + prefix="/hitl", + tags=["hitl"], +) + + +# ---------- Pydantic models ---------- # + +class HITLDecisionRequest(BaseModel): + """Request body for human decision on a HITL review.""" + approved_variant_id: str + notes: Optional[str] = None + + +# ---------- Endpoints ---------- # + +@router.get("/{review_id}") +async def get_hitl_review(review_id: str): + """ + Fetch a HITL review by ID. 
+ + Returns the full review as stored in the review store. + """ + review = review_store.get_review(review_id) + if review is None: + raise HTTPException(status_code=404, detail="Review not found") + return review + + +@router.post("/{review_id}/decision") +async def submit_hitl_decision(review_id: str, payload: HITLDecisionRequest): + """ + Submit a human decision for a HITL review. + + - Marks the review as 'approved' + - Stores approved_variant_id and optional notes + - Writes an audit log entry + """ + review = review_store.get_review(review_id) + if review is None: + raise HTTPException(status_code=404, detail="Review not found") + + updated = review_store.update_review( + review_id=review_id, + status="approved", + approved_variant_id=payload.approved_variant_id, + notes=payload.notes, + ) + + audit_log.log_action( + review_id=review_id, + user_id=None, # TODO: plug in real reviewer ID once auth is wired + action="DECISION_SUBMIT", + metadata={ + "approved_variant_id": payload.approved_variant_id, + "notes_present": bool(payload.notes), + }, + ) + + return updated diff --git a/backend/app/routers/media.py b/backend/app/routers/media.py new file mode 100644 index 00000000..c45cf22d --- /dev/null +++ b/backend/app/routers/media.py @@ -0,0 +1,100 @@ +# backend/app/routers/media.py + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from services import media as media_service # our service layer with Azure / logic + +router = APIRouter( + prefix="/media", + tags=["media"], +) + + +# --------- Speech to Text --------- # + +class STTRequest(BaseModel): + """Request body for Speech-to-Text.""" + # For now we accept a URL pointing to an audio file. + # Later you can add support for file uploads or base64 if needed. 
+ audio_url: str + + +class STTResponse(BaseModel): + """Response body for Speech-to-Text.""" + text: str + + +@router.post("/speech-to-text", response_model=STTResponse) +async def speech_to_text(payload: STTRequest) -> STTResponse: + """ + Convert speech (audio file at a URL) into text. + + This is meant to support UI flows where a human reviewer dictates content + or needs an audio recording transcribed. + """ + try: + text = await media_service.speech_to_text_from_url(payload.audio_url) + return STTResponse(text=text) + except Exception as e: + # In production you might log more detail but return a generic message. + raise HTTPException(status_code=500, detail="Speech-to-text failed") from e + + +# --------- Text to Speech --------- # + +class TTSRequest(BaseModel): + """Request body for Text-to-Speech.""" + # The email text (or part of it) that should be read out loud. + text: str + + +class TTSResponse(BaseModel): + """Response body for Text-to-Speech.""" + # URL where the generated audio can be fetched/streamed by the UI. + audio_url: str + + +@router.post("/text-to-speech", response_model=TTSResponse) +async def text_to_speech(payload: TTSRequest) -> TTSResponse: + """ + Convert text into speech and return an audio URL. + + The UI can call this so the human reviewer can listen to the email + instead of reading the text. + """ + try: + audio_url = await media_service.text_to_speech_to_url(payload.text) + return TTSResponse(audio_url=audio_url) + except Exception as e: + raise HTTPException(status_code=500, detail="Text-to-speech failed") from e + + +# --------- Translation --------- # + +class TranslateRequest(BaseModel): + """Request body for translation.""" + text: str + target_lang: str # e.g. 
"es", "fr", "ta" + + +class TranslateResponse(BaseModel): + """Response body for translation.""" + translated_text: str + + +@router.post("/translate", response_model=TranslateResponse) +async def translate(payload: TranslateRequest) -> TranslateResponse: + """ + Translate text into the target language. + + Used by the reviewer UI to see the generated email in different languages. + """ + try: + translated = await media_service.translate_text( + payload.text, + payload.target_lang, + ) + return TranslateResponse(translated_text=translated) + except Exception as e: + raise HTTPException(status_code=500, detail="Translation failed") from e diff --git a/backend/app/routers/orchestrator.py b/backend/app/routers/orchestrator.py index 6abcccec..9e1fdf32 100644 --- a/backend/app/routers/orchestrator.py +++ b/backend/app/routers/orchestrator.py @@ -6,7 +6,11 @@ from app.store import MemoryStore, store from app.graph import Orchestrator from app.nodes.segmenter_node import SegmenterNode - +from app.nodes.retriever_node import RetrieverNode +from app.nodes.generator_node import GeneratorNode +from app.nodes.safety_node import SafetyNode +from app.nodes.hitl_node import HITLNode +from app.nodes.analytics_node import AnalyticsNode router = APIRouter() logger = get_logger("orchestrator") @@ -19,11 +23,35 @@ def get_segmenter() -> SegmenterNode: """Default segmenter provider (can be overridden in tests).""" return SegmenterNode() + +def get_retriever() -> RetrieverNode: + return RetrieverNode() + + +def get_generator() -> GeneratorNode: + return GeneratorNode() + + +def get_safety() -> SafetyNode: + return SafetyNode() +def get_hitl() -> HITLNode: + return HITLNode() + + +def get_analytics() -> AnalyticsNode: + return AnalyticsNode() + + async def get_orchestrator( store: MemoryStore = Depends(get_store), - segmenter: SegmenterNode = Depends(get_segmenter), -) -> AsyncGenerator[Orchestrator, None]: - """Yield a per-request Orchestrator instance. 
+ segmenter: SegmenterNode = Depends(get_segmenter), + retriever: RetrieverNode = Depends(get_retriever), + generator: GeneratorNode = Depends(get_generator), + safety: SafetyNode = Depends(get_safety), + hitl: HITLNode = Depends(get_hitl), + analytics: AnalyticsNode = Depends(get_analytics), + ) -> AsyncGenerator[Orchestrator, None]: + """FastAPI dependency returning a per-request Orchestrator instance. The Orchestrator delegates flow execution to the LangGraph graph. We ensure resources are cleaned up at the end of the request. @@ -31,7 +59,12 @@ async def get_orchestrator( orch = Orchestrator( store_=store, logger_=logger, - segmenter=segmenter, + segmenter=segmenter, + retriever=retriever, + generator=generator, + safety=safety, + hitl=hitl, + analytics=analytics, ) try: yield orch diff --git a/backend/app/store/audit_log.py b/backend/app/store/audit_log.py new file mode 100644 index 00000000..7a53d1b2 --- /dev/null +++ b/backend/app/store/audit_log.py @@ -0,0 +1,101 @@ +# backend/app/store/audit_log.py + +""" +Audit logging for Human-in-the-Loop (HITL) interactions. + +This module records a time-ordered list of actions for each review_id, +so we can answer questions like: +- Who approved which variant, and when? +- How often did the reviewer use TTS, STT, or Translate? +- What language / modality was used? + +Logs are stored in the shared app.store backend (MemoryStore / RedisStore) +under keys of the form: + + hitl:log:{review_id} + +Each log entry is a dict with: + { + "timestamp": ISO-8601 UTC string, + "review_id": str | None, + "user_id": str | None, + "action": str, + "metadata": dict + } +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from app.store import store + + +def _log_key(review_id: str) -> str: + """ + Build the storage key for logs belonging to a specific review_id. 
+ """ + return f"hitl:log:{review_id}" + + +def _now_iso() -> str: + """ + Return current time as an ISO 8601 string in UTC. + """ + return datetime.now(timezone.utc).isoformat() + + +def log_action( + review_id: Optional[str], + action: str, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> None: + """ + Append a single audit log entry for a given review_id. + + Args: + review_id: The HITL review ID this action is associated with. + Can be None if not tied to a specific review, but + grouping by review_id is recommended when available. + action: A short action code, e.g.: + - "TTS_PLAY" + - "STT_TRANSCRIBE" + - "TRANSLATE" + - "APPROVE_DECISION" + user_id: Identifier for the human reviewer or system user (if known). + metadata: Extra context (e.g. variant_id, target_lang, audio_url). + """ + if review_id is None: + # If there's no review_id, we currently skip logging. + # You can extend this later to support global logs if needed. + return + + key = _log_key(review_id) + entry = { + "timestamp": _now_iso(), + "review_id": review_id, + "user_id": user_id, + "action": action, + "metadata": metadata or {}, + } + + # Load existing log list (if any), append, and save back. + logs: Optional[List[Dict[str, Any]]] = store.get(key) + if logs is None: + logs = [] + + logs.append(entry) + store.set(key, logs) + + +def get_logs(review_id: str) -> List[Dict[str, Any]]: + """ + Fetch all audit log entries for a given review_id. + + Returns: + A list of log entry dicts (possibly empty if nothing logged yet). 
+ """ + logs = store.get(_log_key(review_id)) + return logs or [] diff --git a/backend/app/store/reviews.py b/backend/app/store/reviews.py new file mode 100644 index 00000000..83cf21b7 --- /dev/null +++ b/backend/app/store/reviews.py @@ -0,0 +1,78 @@ +# backend/app/store/reviews.py + +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +from app.store.memory_store import MemoryStore # or your actual store class + +_STORE_KEY_PREFIX = "hitl:review:" + +store = MemoryStore() # or whatever you’re already using + + +def _key(review_id: str) -> str: + return f"{_STORE_KEY_PREFIX}{review_id}" + + +def create_review( + review_id: str, + customer: Dict[str, Any], + variants: list[Dict[str, Any]], +) -> Dict[str, Any]: + now = datetime.now(timezone.utc).isoformat() + + review = { + "review_id": review_id, + "customer": customer, + "variants": variants, + "status": "pending_human_approval", + "approved_variant_id": None, + "notes": None, + "created_at": now, + "updated_at": now, + } + + store.set(_key(review_id), review) + return review + + +def get_review(review_id: str) -> Optional[Dict[str, Any]]: + return store.get(_key(review_id)) + + +def update_review( + review_id: str, + *, + status: Optional[str] = None, + approved_variant_id: Optional[str] = None, + notes: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + """ + Update an existing HITL review. + + Only fields passed (not None) are updated. Returns the updated review, + or None if the review does not exist. 
+ """ + review = get_review(review_id) + if review is None: + return None + + changed = False + + if status is not None: + review["status"] = status + changed = True + + if approved_variant_id is not None: + review["approved_variant_id"] = approved_variant_id + changed = True + + if notes is not None: + review["notes"] = notes + changed = True + + if changed: + review["updated_at"] = datetime.now(timezone.utc).isoformat() + store.set(_key(review_id), review) + + return review diff --git a/backend/requirements.txt b/backend/requirements.txt index 140a5c28..0fefd17e 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -76,3 +76,4 @@ uvicorn==0.38.0 xxhash==3.6.0 yarl==1.22.0 zstandard==0.25.0 +azure-cognitiveservices-speech diff --git a/backend/run_swagger_light.py b/backend/run_swagger_light.py new file mode 100644 index 00000000..db8f2dbc --- /dev/null +++ b/backend/run_swagger_light.py @@ -0,0 +1,54 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, Field +from typing import Optional, Dict, Any +import uvicorn + +try: + # Reuse the project's CORS config when available + from app.config import get_allowed_origins + origins = get_allowed_origins() +except Exception: + origins = ["http://localhost", "http://127.0.0.1:3000"] + +app = FastAPI(title="EchoVoice-AI Orchestrator (light)") + +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +@app.get("/health") +async def health_check(): + return {"status": "ok"} + + +# Lightweight Pydantic models mirroring the real API so Swagger shows the schema +class CustomerModel(BaseModel): + id: Optional[str] = Field(None, example="U001") + name: Optional[str] = Field(None, example="Selvi") + email: Optional[str] = Field(None, example="a@example.com") + last_event: Optional[str] = Field(None, example="payment_plans") + properties: Dict[str, Any] = 
Field(default_factory=dict) + + +class OrchestrateRequest(BaseModel): + customer: CustomerModel + + +@app.post("/orchestrate") +async def orchestrate(payload: OrchestrateRequest): + """Dummy orchestrate endpoint for local Swagger UI. + + This returns the parsed payload and does NOT call any heavy services. + It's intended for local testing of the OpenAPI/Swagger UI only. + """ + # Return a minimal response so the docs show request/response shapes + return {"status": "ok", "received": payload.customer.model_dump()} + + +if __name__ == "__main__": + uvicorn.run(app, host="127.0.0.1", port=8000) diff --git a/backend/services/langsmith_monitor.py b/backend/services/langsmith_monitor.py new file mode 100644 index 00000000..bb04d584 --- /dev/null +++ b/backend/services/langsmith_monitor.py @@ -0,0 +1,153 @@ +"""Opt-in LangSmith monitor wrapper. + +This module provides a tiny, safe API to record agent runs/events. +By default it is a no-op. To enable real recording set the env var +`LANGSMITH_API_KEY` (or `LANGSMITH_ENABLED=1`). If the `langsmith` +SDK is not installed but the wrapper is enabled, it will write +local JSON files under `backend/.langsmith_local_runs/` for inspection. + +The API is intentionally small and synchronous to keep it low-risk. +""" +from __future__ import annotations + +import json +import os +import time +import uuid +import hashlib +import hmac +from pathlib import Path +from typing import Any, Dict, Optional + +LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY") +LANGSMITH_ENABLED = bool(os.getenv("LANGSMITH_ENABLED") == "1" or LANGSMITH_API_KEY) +# Optional secret for deterministic pseudonymization (HMAC-SHA256). Keep this out of source control. +LANGSMITH_HMAC_SECRET = os.getenv("LANGSMITH_HMAC_SECRET") + +# Try to import the real langsmith SDK if available. We don't require it. 
+HAS_SDK = False +_client = None +try: + # import langsmith # Removed unused import + + HAS_SDK = True + # Defer client creation until needed and after validating API key +except Exception: + HAS_SDK = False + + +RUNS_DIR = Path(__file__).resolve().parents[1] / ".langsmith_local_runs" +RUNS_DIR.mkdir(parents=True, exist_ok=True) + + +def _now() -> float: + return time.time() + + +def start_run(name: str, metadata: Optional[Dict[str, Any]] = None) -> str: + """Start a run and return a run_id. + + This is lightweight and synchronous. If disabled, returns a generated id + but does not persist anything. + """ + run_id = str(uuid.uuid4()) + metadata = metadata or {} + + # Pseudonymize known identifier keys to avoid writing raw PII. + # We look for common keys and replace them with a deterministic hash. + id_keys = ["customer_id", "user_id", "customer"] + for k in list(metadata.keys()): + if k in id_keys and isinstance(metadata.get(k), str): + raw = metadata.pop(k) + secret = LANGSMITH_HMAC_SECRET + if secret: + digest = hmac.new(secret.encode("utf-8"), raw.encode("utf-8"), hashlib.sha256).hexdigest() + method = "hmac-sha256" + else: + # Fallback to plain sha256 if no secret is available. Still non-reversible. + digest = hashlib.sha256(raw.encode("utf-8")).hexdigest() + method = "sha256" + + metadata["customer_id_hash"] = f"sha256:{digest}" + metadata.setdefault("pseudonymization", {})["method"] = method + metadata.setdefault("pseudonymization", {})["secret_present"] = bool(secret) + + record = { + "id": run_id, + "name": name, + "metadata": metadata, + "start_time": _now(), + "events": [], + } + + if not LANGSMITH_ENABLED: + return run_id + + if HAS_SDK and LANGSMITH_API_KEY: + # If the LangSmith client API is available, integrate here. + # We avoid a hard dependency on the SDK; implement direct client + # wiring later if desired. For now, write a local file as well. 
+ pass + + # Write initial record to local file for visibility + path = RUNS_DIR / f"{run_id}.json" + with path.open("w", encoding="utf-8") as f: + json.dump(record, f, ensure_ascii=False, indent=2) + + return run_id + + +def log_event(run_id: str, name: str, payload: Optional[Dict[str, Any]] = None) -> None: + """Log a named event to the run record. No-op if disabled.""" + if not LANGSMITH_ENABLED: + return + + payload = payload or {} + path = RUNS_DIR / f"{run_id}.json" + if not path.exists(): + # Create a minimal record if missing + rec = { + "id": run_id, + "name": None, + "metadata": {}, + "start_time": _now(), + "events": [], + } + else: + with path.open("r", encoding="utf-8") as f: + try: + rec = json.load(f) + except Exception: + rec = {"id": run_id, "events": []} + + rec.setdefault("events", []).append({"time": _now(), "name": name, "payload": payload}) + + with path.open("w", encoding="utf-8") as f: + json.dump(rec, f, ensure_ascii=False, indent=2) + + +def finish_run(run_id: str, status: str = "success", outputs: Optional[Dict[str, Any]] = None) -> None: + """Finish the run by marking end time and optional outputs. No-op if disabled.""" + if not LANGSMITH_ENABLED: + return + + outputs = outputs or {} + path = RUNS_DIR / f"{run_id}.json" + if not path.exists(): + rec = {"id": run_id, "events": []} + else: + with path.open("r", encoding="utf-8") as f: + try: + rec = json.load(f) + except Exception: + rec = {"id": run_id, "events": []} + + rec["end_time"] = _now() + rec["status"] = status + rec["outputs"] = outputs + + with path.open("w", encoding="utf-8") as f: + json.dump(rec, f, ensure_ascii=False, indent=2) + + +__all__ = ["start_run", "log_event", "finish_run", "LANGSMITH_ENABLED"] diff --git a/backend/services/media.py b/backend/services/media.py new file mode 100644 index 00000000..c0b8d8a1 --- /dev/null +++ b/backend/services/media.py @@ -0,0 +1,191 @@ +# backend/services/media.py + +""" +Media-related services for EchoVoice AI. 
+ +Supports: +- Speech-to-Text (STT) using Azure Speech (real when configured, stub fallback) +- Text-to-Speech (TTS) using Azure Speech (real when configured, stub fallback) +- Translation using Azure Translator (real) +""" + +from __future__ import annotations + +import asyncio +import uuid +from pathlib import Path +from typing import Optional, Any + +import httpx + +# Azure SDK (TTS/STT) +import azure.cognitiveservices.speech as speechsdk # type: ignore + +from app import config +from services.logger import get_logger + + +logger = get_logger("media") + + +TTS_DIR = Path("data/tts") +TTS_DIR.mkdir(exist_ok=True, parents=True) + +STT_TMP = Path("data/stt_tmp") +STT_TMP.mkdir(exist_ok=True, parents=True) + + +# --------------------------------------------------------- +# Helper: Check whether Azure Speech is configured +# --------------------------------------------------------- + +def _has_speech_config() -> bool: + return bool(config.AZURE_SPEECH_KEY and config.AZURE_SPEECH_REGION) + + +def _get_speech_config() -> speechsdk.SpeechConfig: + if not _has_speech_config(): + raise RuntimeError("Azure Speech config missing") + + return speechsdk.SpeechConfig( + subscription=config.AZURE_SPEECH_KEY, + region=config.AZURE_SPEECH_REGION, + ) + + +# --------------------------------------------------------- +# SPEECH TO TEXT +# --------------------------------------------------------- + +async def speech_to_text_from_url(audio_url: str) -> str: + """ + Speech-to-text: + - If Azure Speech configured → real STT + - Else → stub fallback (local dev) + """ + + # Fallback: no Azure config + if not _has_speech_config(): + logger.info("STT fallback mode — no Azure config set") + return f"[STT stub transcript for {audio_url}]" + + # Real Azure path + logger.info("Downloading audio from %s for STT", audio_url) + + tmp_path = STT_TMP / f"{uuid.uuid4().hex}.wav" + + async with httpx.AsyncClient() as client: + r = await client.get(audio_url) + r.raise_for_status() + 
tmp_path.write_bytes(r.content) + + text = await asyncio.get_event_loop().run_in_executor( + None, _stt_local_file, tmp_path + ) + + try: + tmp_path.unlink(missing_ok=True) + except: + pass + + return text + + +def _stt_local_file(path: Path) -> str: + speech_config = _get_speech_config() + audio_config = speechsdk.AudioConfig(filename=str(path)) + recognizer = speechsdk.SpeechRecognizer( + speech_config=speech_config, + audio_config=audio_config, + ) + + result = recognizer.recognize_once() + + if result.reason == speechsdk.ResultReason.RecognizedSpeech: + return result.text + + raise RuntimeError(f"Azure STT failed — reason={result.reason}") + + +# --------------------------------------------------------- +# TEXT TO SPEECH +# --------------------------------------------------------- + +async def text_to_speech_to_url(text: str) -> str: + """ + Text-to-speech: + - Real Azure TTS when configured + - Stub fallback when missing + """ + + # Fallback + if not _has_speech_config(): + logger.info("TTS fallback mode — no Azure config set") + fake_url = "data/tts/fake-audio.wav" + return fake_url + + # Real Azure TTS + out_path = await asyncio.get_event_loop().run_in_executor( + None, _tts_local_file, text + ) + return str(out_path) + + +def _tts_local_file(text: str) -> Path: + speech_config = _get_speech_config() + speech_config.speech_synthesis_voice_name = ( + config.AZURE_SPEECH_TTS_VOICE or "en-US-JennyNeural" + ) + + filename = TTS_DIR / f"{uuid.uuid4().hex}.wav" + + audio_cfg = speechsdk.audio.AudioOutputConfig(filename=str(filename)) + synthesizer = speechsdk.SpeechSynthesizer( + speech_config=speech_config, audio_config=audio_cfg + ) + + result = synthesizer.speak_text_async(text).get() + + if result.reason != speechsdk.ResultReason.SynthesizingAudioCompleted: + raise RuntimeError(f"Azure TTS failed — reason={result.reason}") + + return filename + + +# --------------------------------------------------------- +# TRANSLATION (REAL ONLY) +# 
--------------------------------------------------------- + +def _require_translator(): + if not config.AZURE_TRANSLATOR_KEY or not config.AZURE_TRANSLATOR_ENDPOINT: + raise RuntimeError( + "Azure Translator missing. " + "Set AZURE_TRANSLATOR_KEY + AZURE_TRANSLATOR_ENDPOINT" + ) + + +async def translate_text(text: str, target_lang: str) -> str: + _require_translator() + + endpoint = config.AZURE_TRANSLATOR_ENDPOINT.rstrip("/") + url = f"{endpoint}/translate" + + params = {"api-version": "3.0", "to": target_lang} + headers = { + "Content-Type": "application/json", + "Ocp-Apim-Subscription-Key": config.AZURE_TRANSLATOR_KEY, + "Ocp-Apim-Subscription-Region": config.AZURE_TRANSLATOR_REGION or "", + } + + body = [{"Text": text}] + + logger.info("Calling Azure Translator → %s", target_lang) + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, params=params, headers=headers, json=body) + + resp.raise_for_status() + data = resp.json() + + translated = data[0]["translations"][0]["text"] + return translated diff --git a/backend/services/vector_db.py b/backend/services/vector_db.py index 117eaaf3..baef98a9 100644 --- a/backend/services/vector_db.py +++ b/backend/services/vector_db.py @@ -20,10 +20,36 @@ from pathlib import Path from typing import List, Dict, Any, Optional -from langchain_openai import AzureOpenAIEmbeddings -from langchain_community.vectorstores import FAISS -from langchain_core.documents import Document -from langchain_community.embeddings import FakeEmbeddings +# Defer optional heavy imports (langchain, FAISS) until runtime so tests that +# don't require real embeddings can import this module without installing +# langchain-related packages. 
+AzureOpenAIEmbeddings = None +FAISS = None +Document = None +FakeEmbeddings = None +try: + from langchain_openai import AzureOpenAIEmbeddings # type: ignore +except Exception: + AzureOpenAIEmbeddings = None + +try: + from langchain_community.vectorstores import FAISS # type: ignore +except Exception: + FAISS = None + +try: + from langchain_core.documents import Document # type: ignore +except Exception: + # Provide a minimal Document placeholder for typing/runtime when missing + class Document: + def __init__(self, page_content: str = "", metadata: dict | None = None): + self.page_content = page_content + self.metadata = metadata or {} + +try: + from langchain_community.embeddings import FakeEmbeddings # type: ignore +except Exception: + FakeEmbeddings = None # Default JSONL path (can be overridden by callers) @@ -31,7 +57,7 @@ DEFAULT_JSONL_PATH = PROJECT_ROOT / "data" / "irs_tax_knowledge.jsonl" # Internal cache -_VECTORSTORE: Optional[FAISS] = None +_VECTORSTORE: Optional[Any] = None _CORPUS_CACHE: Optional[List[Dict[str, Any]]] = None @@ -54,7 +80,20 @@ def get_embedding_model(): "[vector_db] Azure OpenAI config missing – using FakeEmbeddings for local testing." 
) # size can be anything consistent; 1536 is a common embedding dimension - return FakeEmbeddings(size=1536) + if FakeEmbeddings is not None: + return FakeEmbeddings(size=1536) + # Minimal in-memory fallback embedding model: returns zero vectors + class _ZeroEmbeddings: + def __init__(self, size: int = 1536): + self.size = size + + def embed_documents(self, docs): + return [[0.0] * self.size for _ in docs] + + def embed_query(self, q): + return [0.0] * self.size + + return _ZeroEmbeddings(size=1536) # Real Azure embeddings for production return AzureOpenAIEmbeddings( @@ -88,7 +127,7 @@ def load_corpus(jsonl_path: Optional[str] = None) -> List[Dict[str, Any]]: return docs -def _build_vectorstore(jsonl_path: Optional[str] = None) -> FAISS: +def _build_vectorstore(jsonl_path: Optional[str] = None) -> Any: """ Build a FAISS vector store from the JSONL corpus. @@ -111,13 +150,25 @@ def _build_vectorstore(jsonl_path: Optional[str] = None) -> FAISS: "source": d.get("source", "corpus"), } documents.append(Document(page_content=page_content, metadata=metadata)) - embeddings = get_embedding_model() + + # If FAISS is not available, provide a lightweight in-memory placeholder + if FAISS is None: + class _InMemoryVS: + def __init__(self, docs): + self._docs = docs + + def similarity_search(self, query, k=5): + # naive: return first k documents + return [Document(page_content=d.get("text", ""), metadata={k: v for k, v in d.items()}) for d in self._docs[:k]] + + return _InMemoryVS(corpus) + vs = FAISS.from_documents(documents, embedding=embeddings) return vs -def get_vectorstore(jsonl_path: Optional[str] = None) -> FAISS: +def get_vectorstore(jsonl_path: Optional[str] = None) -> Any: """ Get (or lazily build) the FAISS vector store. 
""" diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 8b3836ce..bb36a329 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -3,7 +3,10 @@ def pytest_configure(): - # Ensure the `backend` package directory is on sys.path so tests can import `app`. - base = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) - if base not in sys.path: - sys.path.insert(0, base) + # Ensure the repository root (parent of `backend`) is on sys.path so tests + # can import packages like `backend.services` or `app` reliably. Previously + # we inserted the `backend` directory which made `import backend...` fail + # (Python would look for backend/backend). Add the repo root instead. + repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + if repo_root not in sys.path: + sys.path.insert(0, repo_root) diff --git a/backend/tests/test_assignment_agent.py b/backend/tests/test_assignment_agent.py new file mode 100644 index 00000000..be9f03f5 --- /dev/null +++ b/backend/tests/test_assignment_agent.py @@ -0,0 +1,17 @@ +from backend.agents.assignment_agent import ABAssignmentAgent + + +def test_assignment_deterministic(): + agent = ABAssignmentAgent(split_ratio={"A": 0.5, "B": 0.5}, seed="test-seed") + a1 = agent.assign_user("user123", "exp_x") + a2 = agent.assign_user("user123", "exp_x") + assert a1["variant_id"] == a2["variant_id"] + assert a1["experiment_id"] == "exp_x" + assert 0.0 <= a1["hash_value"] < 1.0 + + +def test_validate_assignment(): + agent = ABAssignmentAgent(split_ratio={"A": 0.7, "B": 0.3}) + assignment = agent.assign_user("u1", "exp1") + ok, err = agent.validate_assignment(assignment) + assert ok and err is None diff --git a/backend/tests/test_audit_log.py b/backend/tests/test_audit_log.py new file mode 100644 index 00000000..d343c860 --- /dev/null +++ b/backend/tests/test_audit_log.py @@ -0,0 +1,72 @@ +# backend/tests/test_audit_log.py + +from app.store import audit_log +from app.store import 
store + + +def test_log_action_creates_and_appends_entries(): + review_id = "review_test_1" + + # Make sure we start clean for this key + store.set(f"hitl:log:{review_id}", None) + + # Log first action + audit_log.log_action( + review_id=review_id, + action="TTS_PLAY", + user_id="selvi", + metadata={"variant_id": "A"}, + ) + + # Log second action + audit_log.log_action( + review_id=review_id, + action="TRANSLATE", + user_id="selvi", + metadata={"target_lang": "es"}, + ) + + logs = audit_log.get_logs(review_id) + + assert len(logs) == 2 + + first, second = logs[0], logs[1] + + # First entry + assert first["review_id"] == review_id + assert first["user_id"] == "selvi" + assert first["action"] == "TTS_PLAY" + assert first["metadata"]["variant_id"] == "A" + assert "timestamp" in first + + # Second entry + assert second["review_id"] == review_id + assert second["user_id"] == "selvi" + assert second["action"] == "TRANSLATE" + assert second["metadata"]["target_lang"] == "es" + assert "timestamp" in second + + +def test_get_logs_returns_empty_list_when_no_logs(): + review_id = "review_no_logs" + + # Ensure nothing stored for this key + store.set(f"hitl:log:{review_id}", None) + + logs = audit_log.get_logs(review_id) + + assert isinstance(logs, list) + assert logs == [] + + +def test_log_action_skips_when_review_id_is_none(): + # If review_id is None, we currently no-op. + audit_log.log_action( + review_id=None, + action="TTS_PLAY", + user_id="someone", + metadata={"foo": "bar"}, + ) + + # There is no specific key to check; we just assert that it doesn't crash. + # If you want stricter behavior later, you can change log_action and update this test. 
diff --git a/backend/tests/test_debug_previews.py b/backend/tests/test_debug_previews.py new file mode 100644 index 00000000..2a9025f8 --- /dev/null +++ b/backend/tests/test_debug_previews.py @@ -0,0 +1,159 @@ +import os + +# Enable debug router before importing app so the router is mounted +os.environ.setdefault("ECHO_DEBUG", "1") + +from fastapi.testclient import TestClient +from app.main import app +from app.routers.orchestrator import get_orchestrator + + +class MockOrchestrator: + async def run_flow(self, flow_name, payload): + # deterministic results for tests based on id + if payload.get("id") == "U001": + return { + "analysis": {"winner": {"variant_id": "A"}}, + "safety": {"safe": [{"id": "A", "subject": "S A", "body": "B A"}]}, + } + if payload.get("id") == "U002": + return { + "analysis": {"winner": None}, + "safety": {"safe": [{"id": "B", "subject": "S B", "body": "B B"}]}, + } + return {"analysis": {}, "safety": {"safe": []}} + + +def get_mock_orchestrator(): + return MockOrchestrator() + + +def test_debug_previews(monkeypatch): + # Override the orchestrator dependency with our mock + app.dependency_overrides[get_orchestrator] = lambda: get_mock_orchestrator() + client = TestClient(app) + + r = client.get("/debug/deliveries") + assert r.status_code == 200 + data = r.json() + assert "previews" in data + # check U001 used winner A + assert any(p["user_id"] == "U001" and p["subject"] == "S A" for p in data["previews"]) + # check U002 used fallback to first safe variant + assert any(p["user_id"] == "U002" and p["subject"] == "S B" for p in data["previews"]) +# cleanup dependency overrides + app.dependency_overrides.clear() + +def test_debug_previews_mock(): + # ensure mock=true returns precomputed previews without depending on orchestration + client = TestClient(app) + r = client.get("/debug/deliveries?mock=true") + assert r.status_code == 200 + data = r.json() + assert "previews" in data + # mock fixtures include U001 and U002 with subjects matching 
PRECOMPUTED_PREVIEWS + assert any(p["user_id"] == "U001" and "running shoes" in (p.get("subject") or "") for p in data["previews"]) + assert any(p["user_id"] == "U002" and "Acme plan" in (p.get("subject") or "") for p in data["previews"]) + + +def test_body_text_alias(monkeypatch): + """Ensure each preview includes `body_text` equal to `body`.""" + app.dependency_overrides[get_orchestrator] = lambda: get_mock_orchestrator() + client = TestClient(app) + + r = client.get("/debug/deliveries") + assert r.status_code == 200 + data = r.json() + assert "previews" in data + for p in data["previews"]: + # alias should be present and match body (both may be None) + assert "body_text" in p + assert p["body_text"] == p.get("body") + app.dependency_overrides.clear() + +def test_debug_previews_cache(monkeypatch): + """When ECHO_DEBUG_CACHE_TTL is set, subsequent requests within TTL should not re-run orchestrator.""" + # small TTL for test + os.environ["ECHO_DEBUG_CACHE_TTL"] = "60" + + class CounterOrchestrator: + def __init__(self): + self.count = 0 + + async def run_flow(self, flow_name, payload): + self.count += 1 + # return a simple deterministic safe variant + return { + "analysis": {"winner": {"variant_id": "A"}}, + "safety": {"safe": [{"id": "A", "subject": "S A", "body": "B A"}]}, + } + + counter = CounterOrchestrator() + app.dependency_overrides[get_orchestrator] = lambda: counter + client = TestClient(app) + + r1 = client.get("/debug/deliveries") + assert r1.status_code == 200 + # after first request, orchestrator should have been invoked (once per customer) + # our mock increments once per run_flow call; there are two mock customers => count == 2 + assert counter.count == 2 + + # Second request within TTL should use cache and not increment counter + r2 = client.get("/debug/deliveries") + assert r2.status_code == 200 + assert counter.count == 2 + + # cleanup env + os.environ.pop("ECHO_DEBUG_CACHE_TTL", None) + app.dependency_overrides.clear() + + +def 
test_debug_run_full_pipeline(): + """Test POST /debug/run returns full orchestrator result (MessageState).""" + class FullMockOrchestrator: + async def run_flow(self, flow_name, payload): + # Return complete MessageState structure + return { + "segment": {"category": "high_value"}, + "citations": ["citation1", "citation2"], + "variants": [ + {"id": "V1", "subject": "Subject 1", "body": "Body 1"}, + {"id": "V2", "subject": "Subject 2", "body": "Body 2"}, + ], + "safety": { + "safe": [{"id": "V1", "subject": "Subject 1", "body": "Body 1"}], + "blocked": [{"id": "V2", "reason": "policy violation"}], + }, + "analysis": {"winner": {"variant_id": "V1"}}, + "delivery": {"status": "sent", "message_id": "msg123"}, + } + + app.dependency_overrides[get_orchestrator] = lambda: FullMockOrchestrator() + client = TestClient(app) + + payload = { + "customer": { + "id": "U999", + "name": "Test User", + "email": "test@example.com", + } + } + + r = client.post("/debug/run", json=payload) + assert r.status_code == 200 + data = r.json() + + # Verify all expected keys are present in full MessageState + assert "segment" in data + assert "citations" in data + assert "variants" in data + assert "safety" in data + assert "analysis" in data + assert "delivery" in data + + # Verify some structure + assert data["segment"]["category"] == "high_value" + assert len(data["variants"]) == 2 + assert data["analysis"]["winner"]["variant_id"] == "V1" + + app.dependency_overrides.clear() diff --git a/backend/tests/test_hitl_review_store.py b/backend/tests/test_hitl_review_store.py new file mode 100644 index 00000000..a9bda05b --- /dev/null +++ b/backend/tests/test_hitl_review_store.py @@ -0,0 +1,50 @@ +# backend/tests/test_hitl_review_store.py + +from app.nodes.hitl_node import HITLNode +from app.store import reviews as review_store + + +def test_hitl_node_persists_review_with_variants(): + node = HITLNode() + + customer = {"id": "cust_123", "email": "test@example.com"} + variants = [ + {"id": "A", 
"text": "Hello A"}, + {"id": "B", "text": "Hello B"}, + ] + + result = node.run(customer, variants) + + # Check lightweight payload + assert result["status"] == "pending_human_approval" + assert result["num_variants"] == 2 + assert result["review_id"] is not None + + review_id = result["review_id"] + + # Check persisted review in store + stored = review_store.get_review(review_id) + assert stored is not None + assert stored["review_id"] == review_id + assert stored["status"] == "pending_human_approval" + assert stored["customer"]["id"] == "cust_123" + assert stored["customer"]["email"] == "test@example.com" + assert len(stored["variants"]) == 2 + assert stored["approved_variant_id"] is None + assert stored["notes"] is None + assert "created_at" in stored + assert "updated_at" in stored + + +def test_hitl_node_no_variants_does_not_create_review(): + node = HITLNode() + + customer = {"id": "cust_empty", "email": "empty@example.com"} + variants = [] # no safe variants + + result = node.run(customer, variants) + + # No review created in this case + assert result["review_id"] is None + assert result["status"] == "no_variants" + assert result["num_variants"] == 0 diff --git a/backend/tests/test_hitl_router.py b/backend/tests/test_hitl_router.py new file mode 100644 index 00000000..b95710e4 --- /dev/null +++ b/backend/tests/test_hitl_router.py @@ -0,0 +1,61 @@ +# backend/tests/test_hitl_router.py + +from fastapi.testclient import TestClient + +from app.main import app +from app.store import reviews as review_store + +client = TestClient(app) + + +def _create_sample_review(review_id: str = "review_test_1"): + customer = {"id": "cust_123", "email": "test@example.com"} + variants = [ + {"id": "A", "text": "Hello A"}, + {"id": "B", "text": "Hello B"}, + ] + review_store.create_review(review_id, customer, variants) + return review_id + + +def test_get_hitl_review_success(): + review_id = _create_sample_review("review_get_ok") + + resp = client.get(f"/hitl/{review_id}") + 
assert resp.status_code == 200 + + data = resp.json() + assert data["review_id"] == review_id + assert data["status"] == "pending_human_approval" + assert data["customer"]["id"] == "cust_123" + assert len(data["variants"]) == 2 + + +def test_get_hitl_review_not_found(): + resp = client.get("/hitl/nonexistent_review") + assert resp.status_code == 404 + assert resp.json()["detail"] == "Review not found" + + +def test_submit_hitl_decision_updates_review(): + review_id = _create_sample_review("review_decision_ok") + + payload = { + "approved_variant_id": "A", + "notes": "Looks good to send", + } + + resp = client.post(f"/hitl/{review_id}/decision", json=payload) + assert resp.status_code == 200 + + data = resp.json() + assert data["review_id"] == review_id + assert data["status"] == "approved" + assert data["approved_variant_id"] == "A" + assert data["notes"] == "Looks good to send" + + # Double-check store state + stored = review_store.get_review(review_id) + assert stored["status"] == "approved" + assert stored["approved_variant_id"] == "A" + assert stored["notes"] == "Looks good to send" diff --git a/backend/tests/test_langsmith_monitor.py b/backend/tests/test_langsmith_monitor.py new file mode 100644 index 00000000..caaee6e5 --- /dev/null +++ b/backend/tests/test_langsmith_monitor.py @@ -0,0 +1,83 @@ +import os +import importlib +import json +from pathlib import Path + + +def _runs_dir(): + base = Path(__file__).resolve().parents[1] + return base / ".langsmith_local_runs" + + +def test_monitor_noop_by_default(): + # Ensure env vars are unset + os.environ.pop("LANGSMITH_ENABLED", None) + os.environ.pop("LANGSMITH_API_KEY", None) + + import services.langsmith_monitor as monitor + importlib.reload(monitor) + + assert monitor.LANGSMITH_ENABLED is False + + +def test_monitor_writes_local_file_when_enabled(tmp_path): + # Enable monitoring via env var + os.environ["LANGSMITH_ENABLED"] = "1" + os.environ.pop("LANGSMITH_API_KEY", None) + + import services.langsmith_monitor 
as monitor + importlib.reload(monitor) + + assert monitor.LANGSMITH_ENABLED is True + + runs_dir = _runs_dir() + # Clean up any existing files + if runs_dir.exists(): + for f in runs_dir.glob("*.json"): + f.unlink() + + run_id = monitor.start_run("test.run", {"x": 1}) + # A file should be created for the run + path = runs_dir / f"{run_id}.json" + assert path.exists() + + # Log an event and finish + monitor.log_event(run_id, "ev", {"k": "v"}) + monitor.finish_run(run_id, status="success", outputs={"ok": True}) + + # Load and inspect + with path.open("r", encoding="utf-8") as f: + rec = json.load(f) + + assert rec.get("id") == run_id + assert any(e.get("name") == "ev" for e in rec.get("events", [])) + assert rec.get("status") == "success" + + # cleanup + path.unlink() + os.environ.pop("LANGSMITH_ENABLED", None) + + +def test_segmenter_integration_creates_run(tmp_path): + # Enable monitoring + os.environ["LANGSMITH_ENABLED"] = "1" + + # Ensure clean runs dir + runs_dir = _runs_dir() + if runs_dir.exists(): + for f in runs_dir.glob("*.json"): + f.unlink() + + # Run the segmenter which should create a run file + from agents.segmenter import segment_user + + seg = segment_user({"user_id": "U123", "viewed_page": "payment_plans"}) + assert "segment" in seg + + files = list(runs_dir.glob("*.json")) + assert files, "Expected at least one run file from segmenter" + + # cleanup + for f in files: + f.unlink() + os.environ.pop("LANGSMITH_ENABLED", None) diff --git a/backend/tests/test_langsmith_monitor_hashing.py b/backend/tests/test_langsmith_monitor_hashing.py new file mode 100644 index 00000000..286f1b02 --- /dev/null +++ b/backend/tests/test_langsmith_monitor_hashing.py @@ -0,0 +1,53 @@ +import importlib +import json +import sys +from pathlib import Path + + +def _reload_monitor(monkeypatch): + # Ensure the monitor module picks up environment changes during tests + monkeypatch.setenv("LANGSMITH_ENABLED", "1") + # Remove module if already loaded so import picks up env changes 
+ if "backend.services.langsmith_monitor" in sys.modules: + importlib.reload(sys.modules["backend.services.langsmith_monitor"]) + else: + importlib.import_module("backend.services.langsmith_monitor") + return sys.modules["backend.services.langsmith_monitor"] + + +def test_pseudonymization_with_secret(monkeypatch, tmp_path): + monkeypatch.setenv("LANGSMITH_HMAC_SECRET", "test-secret-123") + monitor = _reload_monitor(monkeypatch) + + run_id = monitor.start_run("segmenter.segment_user", metadata={ + "customer_id": "cust_002", + "last_event": "payment_plans", + }) + + path = Path(monitor.RUNS_DIR) / f"{run_id}.json" + assert path.exists(), "run file should be written" + data = json.loads(path.read_text(encoding="utf-8")) + + # raw id must not be present + assert "customer_id" not in data.get("metadata", {}), "raw customer_id must not be stored" + assert "customer_id_hash" in data.get("metadata", {}), "hashed id must be present" + ps = data["metadata"].get("pseudonymization", {}) + assert ps.get("method") == "hmac-sha256" + assert ps.get("secret_present") is True + + +def test_pseudonymization_without_secret(monkeypatch): + # Ensure no secret is set + monkeypatch.delenv("LANGSMITH_HMAC_SECRET", raising=False) + monitor = _reload_monitor(monkeypatch) + + run_id = monitor.start_run("segmenter.segment_user", metadata={"customer_id": "cust_003"}) + path = Path(monitor.RUNS_DIR) / f"{run_id}.json" + assert path.exists() + data = json.loads(path.read_text(encoding="utf-8")) + + assert "customer_id" not in data.get("metadata", {}) + assert "customer_id_hash" in data.get("metadata", {}) + ps = data["metadata"].get("pseudonymization", {}) + assert ps.get("method") == "sha256" + assert ps.get("secret_present") is False