From df8b7397a0fd6a2c4e36a861efb22ba24087a671 Mon Sep 17 00:00:00 2001 From: Arulselvi Amirrthalingam Date: Thu, 20 Nov 2025 22:33:26 -0800 Subject: [PATCH 01/23] Add HITL node and LangGraph flow integration --- backend/app/graph/langgraph_flow.py | 226 ++++++++++++++++++++++++++++ backend/app/graph/orchestrator.py | 27 +++- backend/app/nodes/hitl_node.py | 89 +++++++++++ backend/app/routers/orchestrator.py | 6 + 4 files changed, 345 insertions(+), 3 deletions(-) create mode 100644 backend/app/graph/langgraph_flow.py create mode 100644 backend/app/nodes/hitl_node.py diff --git a/backend/app/graph/langgraph_flow.py b/backend/app/graph/langgraph_flow.py new file mode 100644 index 00000000..942cebf9 --- /dev/null +++ b/backend/app/graph/langgraph_flow.py @@ -0,0 +1,226 @@ +from typing import TypedDict, Dict, Any, List + +from langgraph.graph import StateGraph, END + +from app.nodes.segmenter_node import SegmenterNode +from app.nodes.retriever_node import RetrieverNode +from app.nodes.generator_node import GeneratorNode +from app.nodes.safety_node import SafetyNode +from app.nodes.analytics_node import AnalyticsNode +from app.nodes.hitl_node import HITLNode +from services.delivery import send_email_mock + + +class FlowState(TypedDict, total=False): + """ + Shared state that flows between nodes in the LangGraph graph. 
+ """ + customer: Dict[str, Any] + segment: Dict[str, Any] + citations: List[Dict[str, Any]] + variants: List[Dict[str, Any]] + safety: Dict[str, Any] + hitl: Dict[str, Any] + analysis: Dict[str, Any] + delivery: Dict[str, Any] + + +# ----------------------- +# Node wrapper functions +# ----------------------- + + +def segmenter_node(state: FlowState) -> FlowState: + customer = state["customer"] + properties = customer.get("properties") or {} + + segmenter_customer = { + "user_id": customer.get("id"), + "email": customer.get("email"), + "viewed_page": customer.get("last_event"), + "form_started": properties.get("form_started"), + "scheduled": properties.get("scheduled"), + "attended": properties.get("attended"), + } + + segment = SegmenterNode().run(segmenter_customer) + state["segment"] = segment + return state + + +def retriever_node(state: FlowState) -> FlowState: + """ + Retrieve citations / knowledge snippets for the given customer/segment. + """ + citations = RetrieverNode().run(state["customer"]) + state["citations"] = citations + return state + + +def generator_node(state: FlowState) -> FlowState: + """ + Generate message variants using customer, segment, and citations. + """ + variants = GeneratorNode().run( + { + "customer": state["customer"], + "segment": state["segment"], + "citations": state["citations"], + } + ) + state["variants"] = variants + return state + + +def safety_node(state: FlowState) -> FlowState: + """ + Run the safety gate on generated variants. + Returns {"safe": [...], "blocked": [...]}. + """ + safety_result = SafetyNode().run(state["variants"]) + state["safety"] = safety_result + return state + + +def hitl_node(state: FlowState) -> FlowState: + """ + Human-in-the-loop (HITL) node. + + Takes the safe variants from the safety gate and enqueues them + for human review. For now, we still continue the flow (analytics + + delivery), but in a stricter RAI mode you could stop here and + wait for human approval. 
+ """ + safety_result = state.get("safety") or {} + safe_variants: List[Dict[str, Any]] = safety_result.get("safe", []) + + hitl_result = HITLNode().run(state["customer"], safe_variants) + # Example hitl_result: {"review_id": "...", "status": "pending_human_approval"} + state["hitl"] = hitl_result + return state + + +def analytics_node(state: FlowState) -> FlowState: + """ + Analytics node. + + Scores safe variants (e.g., mock CTRs) and chooses a winner. + """ + safety_result = state.get("safety") or {} + safe_variants: List[Dict[str, Any]] = safety_result.get("safe", []) + + if not safe_variants: + state["analysis"] = {"results": [], "winner": None} + return state + + analysis = AnalyticsNode().run( + { + "variants": safe_variants, + "customer": state["customer"], + } + ) + state["analysis"] = analysis + return state + + +def delivery_node(state: FlowState) -> FlowState: + """ + Delivery node. + + Looks at the analytics winner and (for now) sends the corresponding + variant via the mock email delivery service. 
+ """ + analysis = state.get("analysis") or {} + winner = analysis.get("winner") + safety_result = state.get("safety") or {} + safe_variants: List[Dict[str, Any]] = safety_result.get("safe", []) + + if not winner or not safe_variants: + state["delivery"] = None + return state + + # Find the winning variant + winner_id = winner.get("variant_id") + variant = next( + (v for v in safe_variants if v.get("id") == winner_id), + None, + ) + + if not variant: + state["delivery"] = None + return state + + # Use the mock delivery service + email = state["customer"].get("email") + subject = variant.get("subject") + body = variant.get("body") + + delivery_result = send_email_mock(email, subject, body) + state["delivery"] = delivery_result + return state + + +# ----------------------- +# Graph builder +# ----------------------- + + +def build_graph(): + """ + Build and compile the LangGraph StateGraph that represents + the full mini-agent pipeline: + + segmenter -> retriever -> generator -> safety + -> hitl -> analytics -> delivery -> END + """ + graph = StateGraph(FlowState) + + # Register nodes + graph.add_node("segmenter", segmenter_node) + graph.add_node("retriever", retriever_node) + graph.add_node("generator", generator_node) + graph.add_node("safety", safety_node) + graph.add_node("hitl", hitl_node) + graph.add_node("analytics", analytics_node) + graph.add_node("delivery", delivery_node) + + # Wire edges + graph.set_entry_point("segmenter") + graph.add_edge("segmenter", "retriever") + graph.add_edge("retriever", "generator") + graph.add_edge("generator", "safety") + graph.add_edge("safety", "hitl") + graph.add_edge("hitl", "analytics") + graph.add_edge("analytics", "delivery") + graph.add_edge("delivery", END) + + return graph.compile() +if __name__ == "__main__": + """ + Manual smoke test for the LangGraph flow. 
+ + Run with: + cd backend + python -m app.graph.langgraph_flow + """ + import json + + graph = build_graph() + + test_customer = { + "id": "U_TEST", + "email": "test@example.com", + "last_event": "payment_plans", + "properties": { + "form_started": "yes", + "scheduled": "no", + "attended": "no", + }, + } + + initial_state = {"customer": test_customer} + final_state = graph.invoke(initial_state) + + print("=== LangGraph flow final state ===") + print(json.dumps(final_state, indent=2)) + diff --git a/backend/app/graph/orchestrator.py b/backend/app/graph/orchestrator.py index 921f1892..b3c83124 100644 --- a/backend/app/graph/orchestrator.py +++ b/backend/app/graph/orchestrator.py @@ -12,6 +12,7 @@ from app.nodes.retriever_node import RetrieverNode from app.nodes.generator_node import GeneratorNode from app.nodes.safety_node import SafetyNode +from app.nodes.hitl_node import HITLNode from app.nodes.analytics_node import AnalyticsNode from services.delivery import send_email_mock @@ -36,6 +37,7 @@ def __init__( retriever: Optional[RetrieverNode] = None, generator: Optional[GeneratorNode] = None, safety: Optional[SafetyNode] = None, + hitl: Optional[HITLNode] = None, analytics: Optional[AnalyticsNode] = None, ) -> None: # prefer an explicitly provided store, otherwise fall back to the module-level `store` @@ -46,6 +48,7 @@ def __init__( self.retriever = retriever or RetrieverNode() self.generator = generator or GeneratorNode() self.safety = safety or SafetyNode() + self.hitl = hitl or HITLNode() self.analytics = analytics or AnalyticsNode() def close(self) -> None: @@ -54,7 +57,7 @@ def close(self) -> None: This is a best-effort hook. If nodes expose a `close` or `shutdown` method it will be called. 
""" - for node in (self.segmenter, self.retriever, self.generator, self.safety, self.analytics): + for node in (self.segmenter, self.retriever, self.generator, self.safety,self.hitl, self.analytics): try: close_fn = getattr(node, "close", None) or getattr(node, "shutdown", None) if callable(close_fn): @@ -122,8 +125,24 @@ async def run_flow(self, flow_name: str, payload: Dict[str, Any]) -> Dict[str, A safe_count = len(safety_result.get("safe", [])) if isinstance(safety_result, dict) else 0 blocked_count = len(safety_result.get("blocked", [])) if isinstance(safety_result, dict) else 0 self.logger.info(f"Safety safe={safe_count} blocked={blocked_count}") + + + # 5. Human-in-the-loop (HITL) + hitl_result = None + safe_variants = ( + safety_result.get("safe", []) if isinstance(safety_result, dict) else [] + ) + if safe_variants: + # HITLNode should accept (customer, safe_variants) + hitl_result = self.hitl.run(payload, safe_variants) + # Example hitl_result: {"review_id": "...", "status": "pending_human_approval"} + self.logger.info(f"HITL: {hitl_result}") + try: + self.store.set(f"{key}:hitl", hitl_result) + except Exception: + self.logger.exception("failed to persist hitl result") - # 5. Analytics / choose winner + # 6. Analytics / choose winner analysis = self.analytics.run({"variants": safety_result.get("safe", []), "customer": payload}) if isinstance(safety_result, dict) else None winner = analysis.get("winner") if isinstance(analysis, dict) and analysis else None try: @@ -133,18 +152,20 @@ async def run_flow(self, flow_name: str, payload: Dict[str, Any]) -> Dict[str, A except Exception: self.logger.exception("failed to persist analysis/winner") - # 6. Delivery (mock) + # 7. 
Delivery (mock) delivery_result = None if winner and isinstance(safety_result, dict): variant = next((v for v in safety_result.get("safe", []) if v.get("id") == winner.get("variant_id")), None) if variant: delivery_result = send_email_mock(payload.get("email"), variant.get("subject"), variant.get("body")) + response = { "segment": segment, "citations": citations, "variants": variants, "safety": safety_result, + "hitl": hitl_result, "analysis": analysis, "delivery": delivery_result, } diff --git a/backend/app/nodes/hitl_node.py b/backend/app/nodes/hitl_node.py new file mode 100644 index 00000000..1285c31f --- /dev/null +++ b/backend/app/nodes/hitl_node.py @@ -0,0 +1,89 @@ +# backend/app/nodes/hitl_node.py + +from __future__ import annotations + +from typing import Any, Dict, List, Optional +from uuid import uuid4 + +from services.logger import get_logger + + +class HITLNode: + """ + Human-in-the-loop (HITL) review node. + + Responsibility: + - Take the *safe* variants (already passed the safety gate). + - Create a lightweight "review job" object with a review_id. + - Return metadata that the rest of the system can use to: + * show pending reviews in a UI + * later record an "approved" or "rejected" decision + + NOTE: + - This node does NOT block on a human decision. + - It just prepares the review metadata and logs it. + - In a future step, you can persist this in a database/Redis/queue + and build API endpoints like: + - GET /hitl/{review_id} + - POST /hitl/decide (approve/reject) + """ + + def __init__(self, logger: Optional[Any] = None) -> None: + self.logger = logger or get_logger("node.hitl") + + def run( + self, + customer: Dict[str, Any], + variants: List[Dict[str, Any]], + ) -> Dict[str, Any]: + """ + Enqueue variants for human review. + + Args: + customer: dict with customer info (id, email, etc.) + variants: list of *safe* variants from the safety gate. 
+ + Returns: + A dict like: + { + "review_id": "review_...", + "status": "pending_human_approval", + "customer_id": "...", + "email": "...", + "num_variants": 2, + } + """ + # If there are no variants, there is nothing to review. + if not variants: + result = { + "review_id": None, + "status": "no_variants", + "customer_id": customer.get("id"), + "email": customer.get("email"), + "num_variants": 0, + } + self.logger.info("HITLNode: no variants to review: %s", result) + return result + + # Generate a unique review_id that a UI or API can later use + review_id = f"review_{uuid4().hex}" + + hitl_payload = { + "review_id": review_id, + "status": "pending_human_approval", + "customer_id": customer.get("id"), + "email": customer.get("email"), + "num_variants": len(variants), + } + + # In a real system, you might persist (customer + variants) + # keyed by review_id in a DB or Redis here. + # For now, we only log it so you can see it in the logs. + self.logger.info( + "HITLNode: created review job %s for customer %s with %d variants", + review_id, + customer.get("id") or customer.get("email"), + len(variants), + ) + + return hitl_payload diff --git a/backend/app/routers/orchestrator.py b/backend/app/routers/orchestrator.py index 36f6d1d8..41c57b5c 100644 --- a/backend/app/routers/orchestrator.py +++ b/backend/app/routers/orchestrator.py @@ -10,6 +10,7 @@ from ..nodes.retriever_node import RetrieverNode from ..nodes.generator_node import GeneratorNode from ..nodes.safety_node import SafetyNode +from ..nodes.hitl_node import HITLNode from ..nodes.analytics_node import AnalyticsNode router = APIRouter() @@ -36,6 +37,9 @@ def get_generator() -> GeneratorNode: def get_safety() -> SafetyNode: return SafetyNode() +def get_hitl() -> HITLNode: + return HITLNode() + def get_analytics() -> AnalyticsNode: return AnalyticsNode() @@ -47,6 +51,7 @@ async def get_orchestrator( retriever: RetrieverNode = Depends(get_retriever), generator: GeneratorNode = Depends(get_generator), safety: 
SafetyNode = Depends(get_safety), + hitl: HITLNode = Depends(get_hitl), analytics: AnalyticsNode = Depends(get_analytics), ) -> AsyncGenerator[Orchestrator, None]: """FastAPI dependency returning a per-request Orchestrator instance. @@ -61,6 +66,7 @@ async def get_orchestrator( retriever=retriever, generator=generator, safety=safety, + hitl=hitl, analytics=analytics, ) try: From ad6b5d41271c688d798499b140fea8f424e10c74 Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sat, 22 Nov 2025 17:32:05 +1100 Subject: [PATCH 02/23] feat(langsmith): add opt-in langsmith_monitor, instrument generator & retriever, add lightweight swagger runner --- backend/README_LANGSMITH.md | 57 ++++++++++++ backend/README_SWAGGER.md | 47 ++++++++++ backend/agents/generator.py | 29 +++++- backend/agents/retriever.py | 42 ++++++--- backend/app/main.py | 2 +- backend/run_swagger_light.py | 54 +++++++++++ backend/services/langsmith_monitor.py | 129 ++++++++++++++++++++++++++ 7 files changed, 342 insertions(+), 18 deletions(-) create mode 100644 backend/README_LANGSMITH.md create mode 100644 backend/README_SWAGGER.md create mode 100644 backend/run_swagger_light.py create mode 100644 backend/services/langsmith_monitor.py diff --git a/backend/README_LANGSMITH.md b/backend/README_LANGSMITH.md new file mode 100644 index 00000000..9e71b95b --- /dev/null +++ b/backend/README_LANGSMITH.md @@ -0,0 +1,57 @@ +# LangSmith monitor (opt-in) + +This folder contains a lightweight, opt-in LangSmith monitoring wrapper and example instrumentation for generator and retriever agents. + +How it works +- The wrapper is `backend/services/langsmith_monitor.py`. +- By default the wrapper is disabled and is a no-op. To enable set one of: + - `LANGSMITH_API_KEY` (preferred) + - `LANGSMITH_ENABLED=1` (for local testing; writes local JSON files) +- When enabled and the `langsmith` SDK is installed, the wrapper can be extended to forward runs to LangSmith. 
+ +Local testing +- Without enabling Langsmith, instrumentation will not affect runtime. +- If you want to inspect local runs instead of sending to LangSmith: + ```bash + export LANGSMITH_ENABLED=1 + # run your agent (from repo root): + cd backend + ./venv/bin/python -c "from services import langsmith_monitor; print(langsmith_monitor.LANGSMITH_ENABLED)" + # local run files are written to backend/.langsmith_local_runs/ + ``` + +Next steps +- Optionally wire the wrapper to the real `langsmith` SDK when team is ready. +- Decide a naming convention for run names and metadata, and extend the wrapper to include team-specific fields. + +Notes +- The wrapper is intentionally minimal to avoid adding runtime risks. It writes local JSON files when enabled and the SDK is not installed. +LangSmith integration (opt-in) +================================= + +This folder contains a lightweight, opt-in wrapper to record agent runs locally +or forward to LangSmith when enabled. + +How to enable (local testing) +- By default the monitor is disabled. To enable local JSON recording set: + +```bash +export LANGSMITH_ENABLED=1 +``` + +This will create run files under `backend/.langsmith_local_runs/` for each +instrumented agent run. + +How to enable real LangSmith (team) +- Install the `langsmith` package into your Python environment. +- Set an API key: + +```bash +export LANGSMITH_API_KEY=sk_...your_key... +``` + +Notes +- The wrapper is intentionally minimal and no-op by default to avoid + introducing runtime behavior changes. Once enabled, it records run start, + events, and finish status. The team can later replace or extend the wrapper + to call the official LangSmith SDK or to normalize metadata. 
diff --git a/backend/README_SWAGGER.md b/backend/README_SWAGGER.md new file mode 100644 index 00000000..e43ccb16 --- /dev/null +++ b/backend/README_SWAGGER.md @@ -0,0 +1,47 @@ +Lightweight Swagger runner +========================= + +This file documents how to run a lightweight local FastAPI server that exposes the project's OpenAPI/Swagger UI without installing heavy dependencies (FAISS, LangChain, etc.). + +What it does +- Creates a tiny FastAPI app that includes the health endpoint and a dummy `/orchestrate` route with the same request model shapes used by the real app. This is intended only for local OpenAPI inspection and development of client code. + +Files +- `run_swagger_light.py` — lightweight runner added to the `backend/` folder. + +Run locally (recommended) +1. Change to the `backend/` folder: + +```bash +cd backend +``` + +2. Create a virtual environment and install minimal deps: + +```bash +python3 -m venv .venv +./.venv/bin/python -m pip install --upgrade pip setuptools wheel +./.venv/bin/pip install fastapi uvicorn python-dotenv pydantic +``` + +3. Start the lightweight server (detached): + +```bash +cd backend +nohup ./.venv/bin/python -m uvicorn run_swagger_light:app --app-dir $(pwd) --host 127.0.0.1 --port 8000 > /tmp/evai_uvicorn.log 2>&1 & +echo "server started, logs -> /tmp/evai_uvicorn.log" +``` + +4. Open Swagger UI in your browser: + + http://127.0.0.1:8000/docs + +Or fetch the OpenAPI spec directly: + +```bash +curl http://127.0.0.1:8000/openapi.json | jq . +``` + +Notes and next steps +- The dummy `/orchestrate` endpoint only echoes the parsed payload and does not execute orchestration logic. +- If you want the real endpoints wired up in Swagger you must install the full backend dependencies (see `requirements.txt`) — this will take longer and requires heavy packages such as FAISS. 
diff --git a/backend/agents/generator.py b/backend/agents/generator.py index f120bb01..6298ef47 100644 --- a/backend/agents/generator.py +++ b/backend/agents/generator.py @@ -40,6 +40,7 @@ from langchain_core.documents import Document # for downstream typing only from langchain_openai import AzureChatOpenAI, ChatOpenAI from langchain_core.messages import SystemMessage, HumanMessage +from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED # IMPORTANT: adjust this import path if your config module lives elsewhere. # You said "app/config.py", so we import from app.config. @@ -395,13 +396,31 @@ def generate_variants( This keeps your pipeline testable locally even without secrets. """ - # 1) Try LLM path - variants = _generate_with_llm(customer, segment, citations) - if variants is not None: + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("generator.generate_variants", {"agent": "generator", "customer_id": customer.get("id"), "use_case": segment.get("use_case")}) + + try: + # 1) Try LLM path + variants = _generate_with_llm(customer, segment, citations) + if variants is not None: + if run_id: + log_event(run_id, "generated_variants", {"count": len(variants)}) + finish_run(run_id, status="success", outputs={"count": len(variants)}) + return variants + + # 2) Fallback path (no LLM or error) + variants = _fallback_template_variants(customer, segment, citations) + if run_id: + log_event(run_id, "fallback_variants", {"count": len(variants)}) + finish_run(run_id, status="success", outputs={"count": len(variants), "generator": "template_fallback"}) return variants - # 2) Fallback path (no LLM or error) - return _fallback_template_variants(customer, segment, citations) + except Exception as e: + if run_id: + log_event(run_id, "error", {"error": str(e)}) + finish_run(run_id, status="error", outputs={"error": str(e)}) + raise # --------------------------------------------------------------------- diff --git 
a/backend/agents/retriever.py b/backend/agents/retriever.py index 8ae26ba1..63b91f43 100644 --- a/backend/agents/retriever.py +++ b/backend/agents/retriever.py @@ -138,6 +138,12 @@ def retrieve_citations( "source": "...", } """ + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("retriever.retrieve_citations", {"agent": "retriever", "top_k": top_k}) + query = build_query_from_segment(segment_result) path = jsonl_path or str(DEFAULT_JSONL_PATH) @@ -149,18 +155,30 @@ def retrieve_citations( original_text = doc.page_content or "" redacted = redact_pii(original_text) - citations.append( - { - "id": meta.get("id"), - "title": meta.get("title"), - "section": meta.get("section"), - "text": original_text, - "redacted_text": redacted, - "url": meta.get("url"), - "published_date": meta.get("published_date"), - "source": meta.get("source", "corpus"), - } - ) + citation = { + "id": meta.get("id"), + "title": meta.get("title"), + "section": meta.get("section"), + "text": original_text, + "redacted_text": redacted, + "url": meta.get("url"), + "published_date": meta.get("published_date"), + "source": meta.get("source", "corpus"), + } + citations.append(citation) + + if run_id: + try: + # Record the query and number of citations + log_event(run_id, "query", {"query": query, "top_k": top_k}) + log_event(run_id, "citations_returned", {"count": len(citations)}) + finish_run(run_id, status="success", outputs={"count": len(citations)}) + except Exception: + # Do not let monitoring errors break retrieval + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass return citations diff --git a/backend/app/main.py b/backend/app/main.py index c066cac3..6b21c1e0 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -5,7 +5,7 @@ from typing import Optional, Dict, Any import uvicorn -from services.logger import get_logger +from ..services.logger import 
get_logger from .routers.health import router as health_router from .routers.orchestrator import router as orchestrator_router diff --git a/backend/run_swagger_light.py b/backend/run_swagger_light.py new file mode 100644 index 00000000..bc5e4f78 --- /dev/null +++ b/backend/run_swagger_light.py @@ -0,0 +1,54 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel, Field +from typing import Optional, Dict, Any +import uvicorn + +try: + # Reuse the project's CORS config when available + from app.config import get_allowed_origins + origins = get_allowed_origins() +except Exception: + origins = ["http://localhost", "http://127.0.0.1:3000"] + +app = FastAPI(title="EchoVoice-AI Orchestrator (light)") + +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +@app.get("/health") +async def health_check(): + return {"status": "ok"} + + +# Lightweight Pydantic models mirroring the real API so Swagger shows the schema +class CustomerModel(BaseModel): + id: Optional[str] = Field(None, example="U001") + name: Optional[str] = Field(None, example="Selvi") + email: Optional[str] = Field(None, example="a@example.com") + last_event: Optional[str] = Field(None, example="payment_plans") + properties: Dict[str, Any] = Field(default_factory=dict) + + +class OrchestrateRequest(BaseModel): + customer: CustomerModel + + +@app.post("/orchestrate") +async def orchestrate(payload: OrchestrateRequest): + """Dummy orchestrate endpoint for local Swagger UI. + + This returns the parsed payload and does NOT call any heavy services. + It's intended for local testing of the OpenAPI/Swagger UI only. 
+ """ + # Return a minimal response so the docs show request/response shapes + return {"status": "ok", "received": payload.customer.model_dump()} + + +if __name__ == "__main__": + uvicorn.run("run_swagger_light:app", host="127.0.0.1", port=8000) diff --git a/backend/services/langsmith_monitor.py b/backend/services/langsmith_monitor.py new file mode 100644 index 00000000..9803146e --- /dev/null +++ b/backend/services/langsmith_monitor.py @@ -0,0 +1,129 @@ +"""Opt-in LangSmith monitor wrapper. + +This module provides a tiny, safe API to record agent runs/events. +By default it is a no-op. To enable real recording set the env var +`LANGSMITH_API_KEY` (or `LANGSMITH_ENABLED=1`). If the `langsmith` +SDK is not installed but the wrapper is enabled, it will write +local JSON files under `backend/.langsmith_local_runs/` for inspection. + +The API is intentionally small and synchronous to keep it low-risk. +""" +from __future__ import annotations + +import json +import os +import time +import uuid +from pathlib import Path +from typing import Any, Dict, Optional + +LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY") +LANGSMITH_ENABLED = bool(os.getenv("LANGSMITH_ENABLED") == "1" or LANGSMITH_API_KEY) + +# Try to import the real langsmith SDK if available. We don't require it. +HAS_SDK = False +_client = None +try: + import langsmith + + HAS_SDK = True + # Defer client creation until needed and after validating API key +except Exception: + HAS_SDK = False + + +RUNS_DIR = Path(__file__).resolve().parents[1] / ".langsmith_local_runs" +RUNS_DIR.mkdir(parents=True, exist_ok=True) + + +def _now() -> float: + return time.time() + + +def start_run(name: str, metadata: Optional[Dict[str, Any]] = None) -> str: + """Start a run and return a run_id. + + This is lightweight and synchronous. If disabled, returns a generated id + but does not persist anything. 
+ """ + run_id = str(uuid.uuid4()) + metadata = metadata or {} + record = { + "id": run_id, + "name": name, + "metadata": metadata, + "start_time": _now(), + "events": [], + } + + if not LANGSMITH_ENABLED: + return run_id + + if HAS_SDK and LANGSMITH_API_KEY: + # If the LangSmith client API is available, integrate here. + # We avoid a hard dependency on the SDK; implement direct client + # wiring later if desired. For now, write a local file as well. + pass + + # Write initial record to local file for visibility + path = RUNS_DIR / f"{run_id}.json" + with path.open("w", encoding="utf-8") as f: + json.dump(record, f, ensure_ascii=False, indent=2) + + return run_id + + +def log_event(run_id: str, name: str, payload: Optional[Dict[str, Any]] = None) -> None: + """Log a named event to the run record. No-op if disabled.""" + if not LANGSMITH_ENABLED: + return + + payload = payload or {} + path = RUNS_DIR / f"{run_id}.json" + if not path.exists(): + # Create a minimal record if missing + rec = { + "id": run_id, + "name": None, + "metadata": {}, + "start_time": _now(), + "events": [], + } + else: + with path.open("r", encoding="utf-8") as f: + try: + rec = json.load(f) + except Exception: + rec = {"id": run_id, "events": []} + + rec.setdefault("events", []).append({"time": _now(), "name": name, "payload": payload}) + + with path.open("w", encoding="utf-8") as f: + json.dump(rec, f, ensure_ascii=False, indent=2) + + +def finish_run(run_id: str, status: str = "success", outputs: Optional[Dict[str, Any]] = None) -> None: + """Finish the run by marking end time and optional outputs. 
No-op if disabled.""" + if not LANGSMITH_ENABLED: + return + + outputs = outputs or {} + path = RUNS_DIR / f"{run_id}.json" + if not path.exists(): + rec = {"id": run_id, "events": []} + else: + with path.open("r", encoding="utf-8") as f: + try: + rec = json.load(f) + except Exception: + rec = {"id": run_id, "events": []} + + rec["end_time"] = _now() + rec["status"] = status + rec["outputs"] = outputs + + with path.open("w", encoding="utf-8") as f: + json.dump(rec, f, ensure_ascii=False, indent=2) + + +__all__ = ["start_run", "log_event", "finish_run", "LANGSMITH_ENABLED"] From 235be7946ee85471c8e2581f957e430e759bb266 Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sat, 22 Nov 2025 17:33:15 +1100 Subject: [PATCH 03/23] feat(langsmith): add opt-in langsmith_monitor, instrument generator & retriever, add lightweight swagger runner and docs --- backend/README_LANGSMITH.md | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/backend/README_LANGSMITH.md b/backend/README_LANGSMITH.md index 9e71b95b..862d0fc0 100644 --- a/backend/README_LANGSMITH.md +++ b/backend/README_LANGSMITH.md @@ -1,3 +1,38 @@ +LangSmith monitoring (opt-in) +================================= + +This project includes a lightweight, opt-in LangSmith instrumentation wrapper at `backend/services/langsmith_monitor.py`. + +Purpose +------- +- Provide safe, non-blocking telemetry hooks for agents (generator, retriever, etc.). +- No-op by default so local dev and CI are unaffected. +- When enabled, the wrapper either forwards to the LangSmith SDK (if installed and configured) or writes local JSON run files under `backend/.langsmith_local_runs/`. + +How to enable +------------- +1. Set the environment variable `LANGSMITH_ENABLED=1` or `LANGSMITH_API_KEY=`. +2. (Optional) Install the LangSmith SDK in your Python environment: `pip install langsmith`. 
+ +Behavior +-------- +- If `LANGSMITH_ENABLED` is not present, the wrapper functions (`start_run`, `log_event`, `finish_run`) are no-ops. +- If enabled but the SDK is not installed, the wrapper writes JSON files to `backend/.langsmith_local_runs/` for inspection. +- Instrumented agents: `backend/agents/generator.py` and `backend/agents/retriever.py` call the wrapper at start/finish/error points. + +Next steps +---------- +1. Review the small changes in `backend/services/langsmith_monitor.py` and the agent instrumentation. +2. Run a smoke test locally (no secrets required): + +```bash +# from repo root +backend/.venv/bin/python -c "import sys; sys.path.insert(0,'backend'); from services.langsmith_monitor import LANGSMITH_ENABLED; print('LANGSMITH_ENABLED=', LANGSMITH_ENABLED)" +``` + +3. To fully integrate with LangSmith UI, set `LANGSMITH_API_KEY` and install the SDK. We can then update `langsmith_monitor.py` to use the SDK client directly. + +4. Coordinate with the team on run naming, metadata shape, and whether to prefer a central tracer vs per-agent instrumentation. # LangSmith monitor (opt-in) This folder contains a lightweight, opt-in LangSmith monitoring wrapper and example instrumentation for generator and retriever agents. 
From 28034b77be17b7cf47ab60cad41c489e4ad87cc0 Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sat, 22 Nov 2025 18:10:51 +1100 Subject: [PATCH 04/23] feat(langsmith): instrument segmenter, safety_gate, analytics; add opt-in monitor and docs --- backend/agents/analytics.py | 30 +++++++++++++++++++++++++++--- backend/agents/safety_gate.py | 22 ++++++++++++++++++++++ backend/agents/segmenter.py | 25 ++++++++++++++++++++++++- 3 files changed, 73 insertions(+), 4 deletions(-) diff --git a/backend/agents/analytics.py b/backend/agents/analytics.py index 4d196551..bc1c1ec9 100644 --- a/backend/agents/analytics.py +++ b/backend/agents/analytics.py @@ -1,10 +1,34 @@ import random def evaluate_variants(variants: list, customer: dict) -> dict: + # Opt-in instrumentation + try: + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + except Exception: + start_run = log_event = finish_run = lambda *a, **k: None + LANGSMITH_ENABLED = False + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("analytics.evaluate_variants", {"variant_count": len(variants)}) + # Mock evaluation: assign random CTR estimates and pick best results = [] for v in variants: ctr = round(random.uniform(0.02, 0.20), 3) - results.append({'variant_id': v.get('id'), 'ctr': ctr}) - winner = max(results, key=lambda r: r['ctr']) if results else None - return {'results': results, 'winner': winner} + results.append({"variant_id": v.get("id"), "ctr": ctr}) + winner = max(results, key=lambda r: r["ctr"]) if results else None + + out = {"results": results, "winner": winner} + + if run_id: + try: + log_event(run_id, "evaluation_done", {"winner": winner, "count": len(results)}) + finish_run(run_id, status="success", outputs={"winner": winner}) + except Exception: + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass + + return out diff --git a/backend/agents/safety_gate.py b/backend/agents/safety_gate.py index 7a841078..47ac2a14 100644 --- 
a/backend/agents/safety_gate.py +++ b/backend/agents/safety_gate.py @@ -212,6 +212,17 @@ def safety_check_and_filter(variants: List[Dict[str, Any]]) -> Dict[str, Any]: ] } """ + # Opt-in instrumentation + try: + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + except Exception: + start_run = log_event = finish_run = lambda *a, **k: None + LANGSMITH_ENABLED = False + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("safety_gate.safety_check_and_filter", {"variant_count": len(variants)}) + safe: List[Dict[str, Any]] = [] blocked: List[Dict[str, Any]] = [] @@ -240,6 +251,17 @@ def safety_check_and_filter(variants: List[Dict[str, Any]]) -> Dict[str, Any]: result = {"safe": safe, "blocked": blocked} logger.info("safety_check_and_filter: safe=%d blocked=%d", len(safe), len(blocked)) + + if run_id: + try: + log_event(run_id, "safety_result", {"safe": len(safe), "blocked": len(blocked)}) + finish_run(run_id, status="success", outputs={"safe": len(safe), "blocked": len(blocked)}) + except Exception: + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass + return result diff --git a/backend/agents/segmenter.py b/backend/agents/segmenter.py index 5087c78c..75992ef3 100644 --- a/backend/agents/segmenter.py +++ b/backend/agents/segmenter.py @@ -25,6 +25,17 @@ def segment_user(customer: dict) -> dict: } """ + # Opt-in instrumentation (no-op when disabled) + try: + from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED + except Exception: + start_run = log_event = finish_run = lambda *a, **k: None + LANGSMITH_ENABLED = False + + run_id = None + if LANGSMITH_ENABLED: + run_id = start_run("segmenter.segment_user", {"user_id": customer.get("user_id"), "email": customer.get("email")}) + viewed_page = (customer.get("viewed_page") or "").strip().lower() form_started = to_bool(customer.get("form_started")) scheduled = to_bool(customer.get("scheduled")) @@ -65,7 +76,7 
@@ def segment_user(customer: dict) -> dict: segment_label = f"{use_case}:{funnel_stage}" reasons.insert(0, f"interested in: {use_case_label}") - return { + result = { "segment": segment_label, "use_case": use_case, "use_case_label": use_case_label, @@ -74,6 +85,18 @@ def segment_user(customer: dict) -> dict: "reasons": reasons, } + if run_id: + try: + log_event(run_id, "segment_computed", {"segment": segment_label, "intent_level": intent_level}) + finish_run(run_id, status="success", outputs={"segment": segment_label}) + except Exception: + try: + finish_run(run_id, status="error", outputs={}) + except Exception: + pass + + return result + def load_customers_from_csv(csv_path: str): customers = [] From 7da34aaa2cced6890e31b0ead96d06ceb27095f4 Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sat, 22 Nov 2025 18:15:59 +1100 Subject: [PATCH 05/23] test(langsmith): add tests for opt-in monitor and segmenter instrumentation --- backend/tests/test_langsmith_monitor.py | 83 +++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 backend/tests/test_langsmith_monitor.py diff --git a/backend/tests/test_langsmith_monitor.py b/backend/tests/test_langsmith_monitor.py new file mode 100644 index 00000000..caaee6e5 --- /dev/null +++ b/backend/tests/test_langsmith_monitor.py @@ -0,0 +1,83 @@ +import os +import importlib +import json +from pathlib import Path + + +def _runs_dir(): + base = Path(__file__).resolve().parents[1] + return base / ".langsmith_local_runs" + + +def test_monitor_noop_by_default(): + # Ensure env vars are unset + os.environ.pop("LANGSMITH_ENABLED", None) + os.environ.pop("LANGSMITH_API_KEY", None) + + import services.langsmith_monitor as monitor + importlib.reload(monitor) + + assert monitor.LANGSMITH_ENABLED is False + + +def test_monitor_writes_local_file_when_enabled(tmp_path): + # Enable monitoring via env var + os.environ["LANGSMITH_ENABLED"] = "1" + os.environ.pop("LANGSMITH_API_KEY", None) + + import services.langsmith_monitor as 
monitor + importlib.reload(monitor) + + assert monitor.LANGSMITH_ENABLED is True + + runs_dir = _runs_dir() + # Clean up any existing files + if runs_dir.exists(): + for f in runs_dir.glob("*.json"): + f.unlink() + + run_id = monitor.start_run("test.run", {"x": 1}) + # A file should be created for the run + path = runs_dir / f"{run_id}.json" + assert path.exists() + + # Log an event and finish + monitor.log_event(run_id, "ev", {"k": "v"}) + monitor.finish_run(run_id, status="success", outputs={"ok": True}) + + # Load and inspect + with path.open("r", encoding="utf-8") as f: + rec = json.load(f) + + assert rec.get("id") == run_id + assert any(e.get("name") == "ev" for e in rec.get("events", [])) + assert rec.get("status") == "success" + + # cleanup + path.unlink() + os.environ.pop("LANGSMITH_ENABLED", None) + + +def test_segmenter_integration_creates_run(tmp_path): + # Enable monitoring + os.environ["LANGSMITH_ENABLED"] = "1" + + # Ensure clean runs dir + runs_dir = _runs_dir() + if runs_dir.exists(): + for f in runs_dir.glob("*.json"): + f.unlink() + + # Run the segmenter which should create a run file + from agents.segmenter import segment_user + + seg = segment_user({"user_id": "U123", "viewed_page": "payment_plans"}) + assert "segment" in seg + + files = list(runs_dir.glob("*.json")) + assert files, "Expected at least one run file from segmenter" + + # cleanup + for f in files: + f.unlink() + os.environ.pop("LANGSMITH_ENABLED", None) From f9bdc3f49afa7aee98bdf248abaef06a80b2c5dd Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sun, 23 Nov 2025 14:27:08 +1100 Subject: [PATCH 06/23] deterministic pseudonymization --- .../117a35da-dc73-4e68-a0e7-5131ff527154.json | 14 ++ .../12eb3799-95ae-4427-ab29-bf236e6ceea1.json | 13 ++ backend/README_LANGSMITH.md | 121 ++++++++++++++++++ backend/agents/generator.py | 1 + backend/services/langsmith_monitor.py | 24 ++++ .../tests/test_langsmith_monitor_hashing.py | 53 ++++++++ 6 files changed, 226 insertions(+) create mode 
100644 backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json create mode 100644 backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json create mode 100644 backend/tests/test_langsmith_monitor_hashing.py diff --git a/backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json b/backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json new file mode 100644 index 00000000..2bae78de --- /dev/null +++ b/backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json @@ -0,0 +1,14 @@ +{ + "id": "117a35da-dc73-4e68-a0e7-5131ff527154", + "name": "segmenter.segment_user", + "metadata": { + "last_event": "payment_plans", + "customer_id_hash": "sha256:510d9b2e83b63a0c74e1122c301902f792b44f1f6a9cffa00c3172a6cea1ed18", + "pseudonymization": { + "method": "hmac-sha256", + "secret_present": true + } + }, + "start_time": 1763868061.6918888, + "events": [] +} \ No newline at end of file diff --git a/backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json b/backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json new file mode 100644 index 00000000..e60816a0 --- /dev/null +++ b/backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json @@ -0,0 +1,13 @@ +{ + "id": "12eb3799-95ae-4427-ab29-bf236e6ceea1", + "name": "segmenter.segment_user", + "metadata": { + "customer_id_hash": "sha256:2a5ad226ff7be23dc12f0745cec9e7e12da2458302dc669faebde99ac998315f", + "pseudonymization": { + "method": "sha256", + "secret_present": false + } + }, + "start_time": 1763868061.693179, + "events": [] +} \ No newline at end of file diff --git a/backend/README_LANGSMITH.md b/backend/README_LANGSMITH.md index 862d0fc0..c96b92d2 100644 --- a/backend/README_LANGSMITH.md +++ b/backend/README_LANGSMITH.md @@ -33,6 +33,127 @@ backend/.venv/bin/python -c "import sys; sys.path.insert(0,'backend'); from serv 3. To fully integrate with LangSmith UI, set `LANGSMITH_API_KEY` and install the SDK. 
We can then update `langsmith_monitor.py` to use the SDK client directly. 4. Coordinate with the team on run naming, metadata shape, and whether to prefer a central tracer vs per-agent instrumentation. + + +Naming & metadata conventions (recommended) +----------------------------------------- +This project recommends the following minimal conventions for recorded runs and events so telemetry is consistent and safe across agents. + +- Run name pattern + - Format: .[:] + - Examples: + - segmenter.segment_user + - retriever.retrieve_citations:payment_plans + - generator.generate_variants:default_personalization + - safety_gate.safety_check_and_filter + - analytics.evaluate_variants + +- Required top-level metadata fields + - run_id: UUID (generated by agent/wrapper) + - run_name: string (matches the Run name pattern) + - agent: string (agent short name, e.g., "segmenter") + - start_time / end_time: ISO 8601 UTC timestamps + - status: "running" | "success" | "error" + - version: code version or commit SHA (optional but recommended) + - tags: list[str] (optional short tags, e.g., ["dev","experiment-42"]) + +- Input/PII policy (allowlist + pseudonymization) + - Always avoid recording raw PII (email, full name, SSN, phone, address). + - Record a pseudonymized identifier instead: + - customer_id_hash: deterministic HMAC/SHA256 of the internal id, using a team secret (do not commit the secret). + - Safe inputs: last_event, allowlisted properties (explicitly list safe keys in code), cohort labels. + - For any potentially sensitive text, store only a redacted snippet or omit it. + +- Outputs to record + - Short structured outputs (e.g., segment label, number of citations, variant count). + - Metrics: latency_ms, token_usage, counts. + - For full text outputs (LLM responses) prefer storing an artifact reference or a redacted snippet — avoid inline PII. 
+ +- Event naming + - Use consistent event names: "input_received", "llm_call", "citations_fetched", "variants_generated", "safety_result", "evaluation_done", "error". + - Each event should include a timestamp and a small payload with non-PII fields. + +Example run (segmenter) +----------------------- +Given a customer record (from `data/customers.json`): + +```json +{ + "id": "cust_002", + "name": "Bob", + "email": "bob@example.com", + "last_event": "payment_plans", + "properties": { + "form_started": "yes", + "scheduled": "no", + "attended": "no" + } +} +``` + +Store only safe fields and a pseudonymized id. Example recorded run: + +```json +{ + "run_id": "c7f6f3d7-1d2b-4a45-9f09-1e2b3c4d5e6f", + "run_name": "segmenter.segment_user", + "agent": "segmenter", + "start_time": "2025-11-23T08:12:06.123Z", + "end_time": "2025-11-23T08:12:06.234Z", + "status": "success", + "version": "main@b81971d", + "tags": ["dev"], + "inputs": { + "customer_id_hash": "sha256:c2f9...ab12", + "last_event": "payment_plans", + "properties": {"form_started":"yes","scheduled":"no","attended":"no"} + }, + "outputs": {"segment":"payment_plans:StartedFormOrFlow","intent_level":"medium","reasons_count":3}, + "events": [ + {"time":"2025-11-23T08:12:06.130Z","name":"segment_computed","payload":{"segment":"payment_plans:StartedFormOrFlow","intent_level":"medium"}} + ] +} +``` + +Hashing guidance +---------------- +- Use a deterministic HMAC or SHA256 with a team secret to produce pseudonymous IDs usable for joins but not reversible. Example: `sha256(team_secret + customer_id)`. +- Store the team secret in a secure secret store / env var and do not commit it. + +Env var: LANGSMITH_HMAC_SECRET +-------------------------------- +- Purpose: supply a secret used to compute deterministic HMAC-SHA256 pseudonymous IDs for any internal identifiers (e.g., customer ids). When present, the monitor will compute `customer_id_hash` as an HMAC-SHA256 of the raw id using this secret. 
+- How to set (example): + +```bash +export LANGSMITH_HMAC_SECRET="your-team-secret-very-long-and-random" +``` + +- Example output recorded in run metadata (truncated for readability): + +```json +"inputs": { + "customer_id_hash": "sha256:3a1f5b8c9d4e2f7a1b2c3d4e5f67890abcdef1234567890abcdef1234567890", + "last_event": "payment_plans" +} +``` + +Notes: +- If `LANGSMITH_HMAC_SECRET` is not set the wrapper falls back to a plain SHA256 digest of the id. This still avoids storing raw PII but is less secure for deterministic joins across systems. Prefer setting the HMAC secret in a secure store. +- Keep the secret out of version control and CI logs. Use your environment/secret manager (GitHub Secrets, AWS Parameter Store, Azure Key Vault, etc.). + +Team checklist to finalize +------------------------- +- [ ] Approve run_name pattern and list of agents to instrument +- [ ] Approve required metadata fields and allowlisted input keys +- [ ] Decide HMAC/secret location for deterministic hashing +- [ ] Decide retention policy for run artifacts and full-text captures +- [ ] Plan LangSmith SDK wiring once conventions are finalized + +--- + +Paste this section into the PR body or the issue comment to capture the agreed conventions. Once agreed I can wire a deterministic hash into `langsmith_monitor.start_run` and add tests that assert recorded runs follow the schema. + # LangSmith monitor (opt-in) This folder contains a lightweight, opt-in LangSmith monitoring wrapper and example instrumentation for generator and retriever agents. diff --git a/backend/agents/generator.py b/backend/agents/generator.py index 6298ef47..8ef4353d 100644 --- a/backend/agents/generator.py +++ b/backend/agents/generator.py @@ -397,6 +397,7 @@ def generate_variants( This keeps your pipeline testable locally even without secrets. 
""" run_id = None + # Before generating variants, starts a LangSmith run if LANGSMITH_ENABLED: run_id = start_run("generator.generate_variants", {"agent": "generator", "customer_id": customer.get("id"), "use_case": segment.get("use_case")}) diff --git a/backend/services/langsmith_monitor.py b/backend/services/langsmith_monitor.py index 9803146e..3dd51ee2 100644 --- a/backend/services/langsmith_monitor.py +++ b/backend/services/langsmith_monitor.py @@ -14,11 +14,15 @@ import os import time import uuid +import hashlib +import hmac from pathlib import Path from typing import Any, Dict, Optional LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY") LANGSMITH_ENABLED = bool(os.getenv("LANGSMITH_ENABLED") == "1" or LANGSMITH_API_KEY) +# Optional secret for deterministic pseudonymization (HMAC-SHA256). Keep this out of source control. +LANGSMITH_HMAC_SECRET = os.getenv("LANGSMITH_HMAC_SECRET") # Try to import the real langsmith SDK if available. We don't require it. HAS_SDK = False @@ -48,6 +52,26 @@ def start_run(name: str, metadata: Optional[Dict[str, Any]] = None) -> str: """ run_id = str(uuid.uuid4()) metadata = metadata or {} + + # Pseudonymize known identifier keys to avoid writing raw PII. + # We look for common keys and replace them with a deterministic hash. + id_keys = ["customer_id", "user_id", "customer"] + for k in list(metadata.keys()): + if k in id_keys and isinstance(metadata.get(k), str): + raw = metadata.pop(k) + secret = LANGSMITH_HMAC_SECRET + if secret: + digest = hmac.new(secret.encode("utf-8"), raw.encode("utf-8"), hashlib.sha256).hexdigest() + method = "hmac-sha256" + else: + # Fallback to plain sha256 if no secret is available. Still non-reversible. 
+ digest = hashlib.sha256(raw.encode("utf-8")).hexdigest() + method = "sha256" + + metadata["customer_id_hash"] = f"sha256:{digest}" + metadata.setdefault("pseudonymization", {})["method"] = method + metadata.setdefault("pseudonymization", {})["secret_present"] = bool(secret) + record = { "id": run_id, "name": name, diff --git a/backend/tests/test_langsmith_monitor_hashing.py b/backend/tests/test_langsmith_monitor_hashing.py new file mode 100644 index 00000000..286f1b02 --- /dev/null +++ b/backend/tests/test_langsmith_monitor_hashing.py @@ -0,0 +1,53 @@ +import importlib +import json +import sys +from pathlib import Path + + +def _reload_monitor(monkeypatch): + # Ensure the monitor module picks up environment changes during tests + monkeypatch.setenv("LANGSMITH_ENABLED", "1") + # Remove module if already loaded so import picks up env changes + if "backend.services.langsmith_monitor" in sys.modules: + importlib.reload(sys.modules["backend.services.langsmith_monitor"]) + else: + importlib.import_module("backend.services.langsmith_monitor") + return sys.modules["backend.services.langsmith_monitor"] + + +def test_pseudonymization_with_secret(monkeypatch, tmp_path): + monkeypatch.setenv("LANGSMITH_HMAC_SECRET", "test-secret-123") + monitor = _reload_monitor(monkeypatch) + + run_id = monitor.start_run("segmenter.segment_user", metadata={ + "customer_id": "cust_002", + "last_event": "payment_plans", + }) + + path = Path(monitor.RUNS_DIR) / f"{run_id}.json" + assert path.exists(), "run file should be written" + data = json.loads(path.read_text(encoding="utf-8")) + + # raw id must not be present + assert "customer_id" not in data.get("metadata", {}), "raw customer_id must not be stored" + assert "customer_id_hash" in data.get("metadata", {}), "hashed id must be present" + ps = data["metadata"].get("pseudonymization", {}) + assert ps.get("method") == "hmac-sha256" + assert ps.get("secret_present") is True + + +def test_pseudonymization_without_secret(monkeypatch): + # Ensure 
no secret is set + monkeypatch.delenv("LANGSMITH_HMAC_SECRET", raising=False) + monitor = _reload_monitor(monkeypatch) + + run_id = monitor.start_run("segmenter.segment_user", metadata={"customer_id": "cust_003"}) + path = Path(monitor.RUNS_DIR) / f"{run_id}.json" + assert path.exists() + data = json.loads(path.read_text(encoding="utf-8")) + + assert "customer_id" not in data.get("metadata", {}) + assert "customer_id_hash" in data.get("metadata", {}) + ps = data["metadata"].get("pseudonymization", {}) + assert ps.get("method") == "sha256" + assert ps.get("secret_present") is False From 3bed10bcb33f12f74f6f679af2753584197dac56 Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 14:57:52 +1100 Subject: [PATCH 07/23] Update backend/run_swagger_light.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/run_swagger_light.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/run_swagger_light.py b/backend/run_swagger_light.py index bc5e4f78..db8f2dbc 100644 --- a/backend/run_swagger_light.py +++ b/backend/run_swagger_light.py @@ -51,4 +51,4 @@ async def orchestrate(payload: OrchestrateRequest): if __name__ == "__main__": - uvicorn.run("run_swagger_light:app", host="127.0.0.1", port=8000) + uvicorn.run(app, host="127.0.0.1", port=8000) From 900c79dd31d620b40d474b24c41fd7009f0d3120 Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sun, 23 Nov 2025 15:11:16 +1100 Subject: [PATCH 08/23] chore: fix absolute imports, make langchain/FAISS optional, and ignore local langsmith run artifacts --- .gitignore | 1 + .../117a35da-dc73-4e68-a0e7-5131ff527154.json | 14 ---- .../12eb3799-95ae-4427-ab29-bf236e6ceea1.json | 13 ---- backend/agents/generator.py | 29 +++++++- backend/app/main.py | 2 +- backend/app/routers/orchestrator.py | 12 ++-- backend/services/vector_db.py | 69 ++++++++++++++++--- 7 files changed, 94 insertions(+), 46 deletions(-) delete mode 100644 
backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json delete mode 100644 backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json diff --git a/.gitignore b/.gitignore index d9030e97..d897e713 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ env/ # OS .DS_Store Thumbs.db +backend/.langsmith_local_runs/ diff --git a/backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json b/backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json deleted file mode 100644 index 2bae78de..00000000 --- a/backend/.langsmith_local_runs/117a35da-dc73-4e68-a0e7-5131ff527154.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "id": "117a35da-dc73-4e68-a0e7-5131ff527154", - "name": "segmenter.segment_user", - "metadata": { - "last_event": "payment_plans", - "customer_id_hash": "sha256:510d9b2e83b63a0c74e1122c301902f792b44f1f6a9cffa00c3172a6cea1ed18", - "pseudonymization": { - "method": "hmac-sha256", - "secret_present": true - } - }, - "start_time": 1763868061.6918888, - "events": [] -} \ No newline at end of file diff --git a/backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json b/backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json deleted file mode 100644 index e60816a0..00000000 --- a/backend/.langsmith_local_runs/12eb3799-95ae-4427-ab29-bf236e6ceea1.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "id": "12eb3799-95ae-4427-ab29-bf236e6ceea1", - "name": "segmenter.segment_user", - "metadata": { - "customer_id_hash": "sha256:2a5ad226ff7be23dc12f0745cec9e7e12da2458302dc669faebde99ac998315f", - "pseudonymization": { - "method": "sha256", - "secret_present": false - } - }, - "start_time": 1763868061.693179, - "events": [] -} \ No newline at end of file diff --git a/backend/agents/generator.py b/backend/agents/generator.py index 8ef4353d..1f4b3ef8 100644 --- a/backend/agents/generator.py +++ b/backend/agents/generator.py @@ -37,9 +37,32 @@ import json from typing import Any, Dict, List, Optional -from 
langchain_core.documents import Document # for downstream typing only -from langchain_openai import AzureChatOpenAI, ChatOpenAI -from langchain_core.messages import SystemMessage, HumanMessage +try: + from langchain_core.documents import Document # for downstream typing only +except Exception: + # Minimal placeholder for tests when langchain_core isn't installed + class Document: + def __init__(self, page_content: str = "", metadata: dict | None = None): + self.page_content = page_content + self.metadata = metadata or {} + +try: + from langchain_openai import AzureChatOpenAI, ChatOpenAI +except Exception: + AzureChatOpenAI = None + ChatOpenAI = None + +try: + from langchain_core.messages import SystemMessage, HumanMessage +except Exception: + # Simple message placeholders used only for LLM path; safe to stub when not installed + class SystemMessage: + def __init__(self, content: str): + self.content = content + + class HumanMessage: + def __init__(self, content: str): + self.content = content from services.langsmith_monitor import start_run, log_event, finish_run, LANGSMITH_ENABLED # IMPORTANT: adjust this import path if your config module lives elsewhere. 
diff --git a/backend/app/main.py b/backend/app/main.py index 6b21c1e0..c066cac3 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -5,7 +5,7 @@ from typing import Optional, Dict, Any import uvicorn -from ..services.logger import get_logger +from services.logger import get_logger from .routers.health import router as health_router from .routers.orchestrator import router as orchestrator_router diff --git a/backend/app/routers/orchestrator.py b/backend/app/routers/orchestrator.py index 63eb5aee..27e7464c 100644 --- a/backend/app/routers/orchestrator.py +++ b/backend/app/routers/orchestrator.py @@ -5,12 +5,12 @@ from services.logger import get_logger from app.store import MemoryStore, store from app.graph import Orchestrator -from ..nodes.segmenter_node import SegmenterNode -from ..nodes.retriever_node import RetrieverNode -from ..nodes.generator_node import GeneratorNode -from ..nodes.safety_node import SafetyNode -from ..nodes.hitl_node import HITLNode -from ..nodes.analytics_node import AnalyticsNode +from app.nodes.segmenter_node import SegmenterNode +from app.nodes.retriever_node import RetrieverNode +from app.nodes.generator_node import GeneratorNode +from app.nodes.safety_node import SafetyNode +from app.nodes.hitl_node import HITLNode +from app.nodes.analytics_node import AnalyticsNode router = APIRouter() logger = get_logger("orchestrator") diff --git a/backend/services/vector_db.py b/backend/services/vector_db.py index 117eaaf3..baef98a9 100644 --- a/backend/services/vector_db.py +++ b/backend/services/vector_db.py @@ -20,10 +20,36 @@ from pathlib import Path from typing import List, Dict, Any, Optional -from langchain_openai import AzureOpenAIEmbeddings -from langchain_community.vectorstores import FAISS -from langchain_core.documents import Document -from langchain_community.embeddings import FakeEmbeddings +# Defer optional heavy imports (langchain, FAISS) until runtime so tests that +# don't require real embeddings can import this module 
without installing +# langchain-related packages. +AzureOpenAIEmbeddings = None +FAISS = None +Document = None +FakeEmbeddings = None +try: + from langchain_openai import AzureOpenAIEmbeddings # type: ignore +except Exception: + AzureOpenAIEmbeddings = None + +try: + from langchain_community.vectorstores import FAISS # type: ignore +except Exception: + FAISS = None + +try: + from langchain_core.documents import Document # type: ignore +except Exception: + # Provide a minimal Document placeholder for typing/runtime when missing + class Document: + def __init__(self, page_content: str = "", metadata: dict | None = None): + self.page_content = page_content + self.metadata = metadata or {} + +try: + from langchain_community.embeddings import FakeEmbeddings # type: ignore +except Exception: + FakeEmbeddings = None # Default JSONL path (can be overridden by callers) @@ -31,7 +57,7 @@ DEFAULT_JSONL_PATH = PROJECT_ROOT / "data" / "irs_tax_knowledge.jsonl" # Internal cache -_VECTORSTORE: Optional[FAISS] = None +_VECTORSTORE: Optional[Any] = None _CORPUS_CACHE: Optional[List[Dict[str, Any]]] = None @@ -54,7 +80,20 @@ def get_embedding_model(): "[vector_db] Azure OpenAI config missing – using FakeEmbeddings for local testing." 
) # size can be anything consistent; 1536 is a common embedding dimension - return FakeEmbeddings(size=1536) + if FakeEmbeddings is not None: + return FakeEmbeddings(size=1536) + # Minimal in-memory fallback embedding model: returns zero vectors + class _ZeroEmbeddings: + def __init__(self, size: int = 1536): + self.size = size + + def embed_documents(self, docs): + return [[0.0] * self.size for _ in docs] + + def embed_query(self, q): + return [0.0] * self.size + + return _ZeroEmbeddings(size=1536) # Real Azure embeddings for production return AzureOpenAIEmbeddings( @@ -88,7 +127,7 @@ def load_corpus(jsonl_path: Optional[str] = None) -> List[Dict[str, Any]]: return docs -def _build_vectorstore(jsonl_path: Optional[str] = None) -> FAISS: +def _build_vectorstore(jsonl_path: Optional[str] = None) -> Any: """ Build a FAISS vector store from the JSONL corpus. @@ -111,13 +150,25 @@ def _build_vectorstore(jsonl_path: Optional[str] = None) -> FAISS: "source": d.get("source", "corpus"), } documents.append(Document(page_content=page_content, metadata=metadata)) - embeddings = get_embedding_model() + + # If FAISS is not available, provide a lightweight in-memory placeholder + if FAISS is None: + class _InMemoryVS: + def __init__(self, docs): + self._docs = docs + + def similarity_search(self, query, k=5): + # naive: return first k documents + return [Document(page_content=d.get("text", ""), metadata={k: v for k, v in d.items()}) for d in self._docs[:k]] + + return _InMemoryVS(corpus) + vs = FAISS.from_documents(documents, embedding=embeddings) return vs -def get_vectorstore(jsonl_path: Optional[str] = None) -> FAISS: +def get_vectorstore(jsonl_path: Optional[str] = None) -> Any: """ Get (or lazily build) the FAISS vector store. 
""" From 56fc5b1ad1321803a96a6b0668bc2b28928c68f2 Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 17:10:04 +1100 Subject: [PATCH 09/23] Update backend/README_SWAGGER.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/README_SWAGGER.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/README_SWAGGER.md b/backend/README_SWAGGER.md index e43ccb16..2be052d8 100644 --- a/backend/README_SWAGGER.md +++ b/backend/README_SWAGGER.md @@ -9,7 +9,7 @@ What it does Files - `run_swagger_light.py` — lightweight runner added to the `backend/` folder. -Run locally (recommended) +## Run locally (recommended) 1. Change to the `backend/` folder: ```bash From 318ce233704d1c3942320e7f777af3b5e45acc2d Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 17:10:26 +1100 Subject: [PATCH 10/23] Update backend/README_SWAGGER.md Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/README_SWAGGER.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/README_SWAGGER.md b/backend/README_SWAGGER.md index 2be052d8..82a5ae84 100644 --- a/backend/README_SWAGGER.md +++ b/backend/README_SWAGGER.md @@ -6,7 +6,7 @@ This file documents how to run a lightweight local FastAPI server that exposes t What it does - Creates a tiny FastAPI app that includes the health endpoint and a dummy `/orchestrate` route with the same request model shapes used by the real app. This is intended only for local OpenAPI inspection and development of client code. -Files +## Files - `run_swagger_light.py` — lightweight runner added to the `backend/` folder. 
## Run locally (recommended) From eeece9f2f60a4a1d402ae25159d6db1120c488df Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 17:10:43 +1100 Subject: [PATCH 11/23] Update backend/app/routers/orchestrator.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/app/routers/orchestrator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/app/routers/orchestrator.py b/backend/app/routers/orchestrator.py index 27e7464c..9e1fdf32 100644 --- a/backend/app/routers/orchestrator.py +++ b/backend/app/routers/orchestrator.py @@ -34,7 +34,6 @@ def get_generator() -> GeneratorNode: def get_safety() -> SafetyNode: return SafetyNode() - def get_hitl() -> HITLNode: return HITLNode() From 3ee43e3c31908eebe8b48ee79cc7ff2403c674e9 Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 17:11:17 +1100 Subject: [PATCH 12/23] Update backend/app/graph/orchestrator.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/app/graph/orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/graph/orchestrator.py b/backend/app/graph/orchestrator.py index 43f758af..ac8a6fbf 100644 --- a/backend/app/graph/orchestrator.py +++ b/backend/app/graph/orchestrator.py @@ -40,7 +40,7 @@ def __init__( retriever: Optional[RetrieverNode] = None, generator: Optional[GeneratorNode] = None, safety: Optional[SafetyNode] = None, - hitl: Optional[HITLNode] = None, + hitl: Optional[HITLNode] = None, analytics: Optional[AnalyticsNode] = None, ) -> None: # Prefer an explicitly provided store, otherwise fall back to the module-level `store` From 0adb36ef158b3b35cc9fd035299af9ec4d1e3ca5 Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 17:11:34 +1100 Subject: [PATCH 13/23] Update backend/agents/safety_gate.py Co-authored-by: Copilot 
<175728472+Copilot@users.noreply.github.com> --- backend/agents/safety_gate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/agents/safety_gate.py b/backend/agents/safety_gate.py index 47ac2a14..79fec1ab 100644 --- a/backend/agents/safety_gate.py +++ b/backend/agents/safety_gate.py @@ -259,8 +259,8 @@ def safety_check_and_filter(variants: List[Dict[str, Any]]) -> Dict[str, Any]: except Exception: try: finish_run(run_id, status="error", outputs={}) - except Exception: - pass + except Exception as e: + logger.exception("Exception occurred while calling finish_run in error handling: %s", e) return result From 712901e9a25855eb2d92e88afd712e238eb6f015 Mon Sep 17 00:00:00 2001 From: Lillian Yang <87347776+lillian0624@users.noreply.github.com> Date: Sun, 23 Nov 2025 17:11:56 +1100 Subject: [PATCH 14/23] Update backend/services/langsmith_monitor.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- backend/services/langsmith_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/services/langsmith_monitor.py b/backend/services/langsmith_monitor.py index 3dd51ee2..bb04d584 100644 --- a/backend/services/langsmith_monitor.py +++ b/backend/services/langsmith_monitor.py @@ -28,7 +28,7 @@ HAS_SDK = False _client = None try: - import langsmith + # import langsmith # Removed unused import HAS_SDK = True # Defer client creation until needed and after validating API key From ef713fb7233d619ff2c4a3a23938774fb04ea203 Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sun, 23 Nov 2025 17:25:20 +1100 Subject: [PATCH 15/23] fix(tests): ensure repo root in sys.path; orchestrator: invoke LangGraph and persist transient state (support overridden segmenter) --- backend/app/graph/orchestrator.py | 81 ++++++++++++++++--------------- backend/tests/conftest.py | 11 +++-- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/backend/app/graph/orchestrator.py b/backend/app/graph/orchestrator.py index 
ac8a6fbf..cc2b3ed7 100644 --- a/backend/app/graph/orchestrator.py +++ b/backend/app/graph/orchestrator.py @@ -89,46 +89,51 @@ async def run_flow(self, flow_name: str, payload: Dict[str, Any]) -> Dict[str, A except Exception: self.logger.exception("failed to persist variants") - # 4. Safety checks - safety_result = self.safety.run(variants) - safe_count = len(safety_result.get("safe", [])) if isinstance(safety_result, dict) else 0 - blocked_count = len(safety_result.get("blocked", [])) if isinstance(safety_result, dict) else 0 - self.logger.info(f"Safety safe={safe_count} blocked={blocked_count}") - - - # 5. Human-in-the-loop (HITL) - hitl_result = None - safe_variants = ( - safety_result.get("safe", []) if isinstance(safety_result, dict) else [] - ) - if safe_variants: - # HITLNode should accept (customer, safe_variants) - hitl_result = self.hitl.run(payload, safe_variants) - # Example hitl_result: {"review_id": "...", "status": "pending_human_approval"} - self.logger.info(f"HITL: {hitl_result}") - try: - self.store.set(f"{key}:hitl", hitl_result) - except Exception: - self.logger.exception("failed to persist hitl result") - - # 6. Analytics / choose winner - analysis = self.analytics.run({"variants": safety_result.get("safe", []), "customer": payload}) if isinstance(safety_result, dict) else None - winner = analysis.get("winner") if isinstance(analysis, dict) and analysis else None + # Build and invoke the LangGraph flow to get a final state dict. try: - segment_from_node = self.segmenter.run(payload) - self.logger.info("Segment (node): %s", segment_from_node) - self.store.set(f"{key}:segment", segment_from_node) + graph = build_graph() + final_state = graph.invoke({"customer": payload}) except Exception: - self.logger.exception("failed to persist analysis/winner") - - # 7. 
Delivery (mock) - delivery_result = None - if winner and isinstance(safety_result, dict): - variant = next((v for v in safety_result.get("safe", []) if v.get("id") == winner.get("variant_id")), None) - if variant: - delivery_result = send_email_mock(payload.get("email"), variant.get("subject"), variant.get("body")) + self.logger.exception("failed to invoke LangGraph flow") + final_state = {} + + # Extract expected fields from the final state with safe fallbacks. + segment = final_state.get("segment") + citations = final_state.get("citations") + variants = final_state.get("variants") + safety_result = final_state.get("safety") or {} + hitl_result = final_state.get("hitl") + analysis = final_state.get("analysis") + delivery_result = final_state.get("delivery") + + # Log safety summary if present + if isinstance(safety_result, dict): + safe_count = len(safety_result.get("safe", [])) + blocked_count = len(safety_result.get("blocked", [])) + self.logger.info(f"Safety safe={safe_count} blocked={blocked_count}") + + # Persist transient results (best-effort) + try: + if segment is not None: + # Allow an injected/overridden segmenter (useful in tests) + try: + segment_from_node = self.segmenter.run(payload) + segment = segment_from_node + except Exception: + # Fall back to the segment computed by the graph + pass + self.store.set(f"{key}:segment", segment) + if citations is not None: + self.store.set(f"{key}:citations", citations) + if variants is not None: + self.store.set(f"{key}:variants", variants) + if hitl_result is not None: + self.store.set(f"{key}:hitl", hitl_result) + if analysis is not None: + self.store.set(f"{key}:analysis", analysis) + except Exception: + self.logger.exception("failed to persist transient state") - response = { "segment": segment, "citations": citations, @@ -136,7 +141,7 @@ async def run_flow(self, flow_name: str, payload: Dict[str, Any]) -> Dict[str, A "safety": safety_result, "hitl": hitl_result, "analysis": analysis, - "delivery": delivery, 
+ "delivery": delivery_result, } self.logger.info("Orchestrator.run_flow completed: %s", flow_name) diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 8b3836ce..bb36a329 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -3,7 +3,10 @@ def pytest_configure(): - # Ensure the `backend` package directory is on sys.path so tests can import `app`. - base = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) - if base not in sys.path: - sys.path.insert(0, base) + # Ensure the repository root (parent of `backend`) is on sys.path so tests + # can import packages like `backend.services` or `app` reliably. Previously + # we inserted the `backend` directory which made `import backend...` fail + # (Python would look for backend/backend). Add the repo root instead. + repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + if repo_root not in sys.path: + sys.path.insert(0, repo_root) From 65a3c68365babc96d2ae3c96186dcf8b55c0c57d Mon Sep 17 00:00:00 2001 From: Lillian Yang Date: Sun, 23 Nov 2025 18:46:34 +1100 Subject: [PATCH 16/23] feat(assignment): add A/B assignment agent, node adapter, and wire into LangGraph; add tests --- backend/agents/assignment_agent.py | 145 +++++++++++++++++++++++++ backend/app/graph/langgraph_flow.py | 26 ++++- backend/app/nodes/assignment_node.py | 32 ++++++ backend/tests/test_assignment_agent.py | 17 +++ 4 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 backend/agents/assignment_agent.py create mode 100644 backend/app/nodes/assignment_node.py create mode 100644 backend/tests/test_assignment_agent.py diff --git a/backend/agents/assignment_agent.py b/backend/agents/assignment_agent.py new file mode 100644 index 00000000..9170811e --- /dev/null +++ b/backend/agents/assignment_agent.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import hashlib +import json +from enum import Enum +from typing import Any, Dict, List, Optional, Tuple + + +class 
AssignmentStrategy(Enum): + MD5_HASH = "md5_hash" + ROUND_ROBIN = "round_robin" + RANDOM = "random" + + +class ABAssignmentAgent: + """ + Deterministic A/B (multi-variant) assignment agent using MD5 hashing. + + Example: + agent = ABAssignmentAgent(split_ratio={"A":0.5,"B":0.5}, seed="echovoice") + assignment = agent.assign_user("U123", "exp_001", context={"email": "x@x.com"}) + """ + + def __init__( + self, + split_ratio: Optional[Dict[str, float]] = None, + seed: str = "echovoice", + strategy: AssignmentStrategy = AssignmentStrategy.MD5_HASH, + ): + if split_ratio is None: + split_ratio = {"A": 0.5, "B": 0.5} + self.split_ratio = dict(split_ratio) + self._validate_split_ratio() + self.variant_ids = list(self.split_ratio.keys()) + self.thresholds = self._compute_thresholds() + self.seed = seed + self.strategy = strategy + + def _validate_split_ratio(self) -> None: + total = sum(self.split_ratio.values()) + if not (0.999 <= total <= 1.001): + raise ValueError("split_ratio values must sum to 1.0") + for k, v in self.split_ratio.items(): + if v <= 0 or v > 1: + raise ValueError(f"Invalid split fraction for {k}: {v}") + + def _compute_thresholds(self) -> List[Tuple[str, float]]: + thresholds: List[Tuple[str, float]] = [] + cum = 0.0 + for vid in self.variant_ids: + cum += float(self.split_ratio[vid]) + thresholds.append((vid, cum)) + return thresholds + + def _compute_hash_value(self, user_id: str, experiment_id: str) -> float: + """ + Deterministic hash in [0.0, 1.0). + + Uses md5(seed + experiment_id + user_id) normalized to [0,1). 
+ """ + key = f"{self.seed}:{experiment_id}:{user_id}" + digest = hashlib.md5(key.encode("utf-8")).hexdigest() + truncated = int(digest[:15], 16) + max_val = float(int("f" * 15, 16)) + return truncated / max_val + + def assign_user( + self, + user_id: str, + experiment_id: str, + context: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + if self.strategy != AssignmentStrategy.MD5_HASH: + raise NotImplementedError(f"Strategy {self.strategy} not implemented") + + hash_value = self._compute_hash_value(user_id, experiment_id) + chosen_variant = None + for vid, upper in self.thresholds: + if hash_value < upper: + chosen_variant = vid + break + if chosen_variant is None: + chosen_variant = self.variant_ids[-1] + + assignment = { + "variant_id": chosen_variant, + "hash_value": hash_value, + "experiment_id": experiment_id, + "user_id": user_id, + "context": context, + "deterministic": True, + } + + return assignment + + def validate_assignment(self, assignment: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + required = {"variant_id", "hash_value", "experiment_id", "user_id"} + missing = required - set(assignment.keys()) + if missing: + return False, f"Missing required field(s): {', '.join(sorted(missing))}" + if assignment["variant_id"] not in self.variant_ids: + return False, "Invalid variant_id" + hv = assignment["hash_value"] + if not isinstance(hv, (int, float)) or not (0.0 <= hv < 1.0): + return False, "hash_value out of range [0.0, 1.0)" + return True, None + + +class MicrosoftServicesAdapter: + """ + Development-friendly hooks for Azure integrations (App Insights, Kusto, Service Bus). + Replace prints with real SDK calls when deploying to Azure. 
+ """ + + @staticmethod + def log_assignment_to_app_insights( + assignment: Dict[str, Any], + instrumentation_key: Optional[str] = None, + ) -> bool: + if instrumentation_key is None: + print(f"[App Insights Hook] Assignment: {json.dumps(assignment)}") + return True + return True + + @staticmethod + def log_assignment_to_kusto( + assignment: Dict[str, Any], + cluster_uri: Optional[str] = None, + database: str = "echovoice", + ) -> bool: + if cluster_uri is None: + print(f"[Kusto Hook] Assignment: {json.dumps(assignment)}") + return True + return True + + @staticmethod + def publish_assignment_event( + assignment: Dict[str, Any], + connection_string: Optional[str] = None, + queue_name: str = "assignment-events", + ) -> bool: + if connection_string is None: + print(f"[Service Bus Hook] Assignment Event: {json.dumps(assignment)}") + return True + return True diff --git a/backend/app/graph/langgraph_flow.py b/backend/app/graph/langgraph_flow.py index 942cebf9..eaaca0a6 100644 --- a/backend/app/graph/langgraph_flow.py +++ b/backend/app/graph/langgraph_flow.py @@ -6,6 +6,7 @@ from app.nodes.retriever_node import RetrieverNode from app.nodes.generator_node import GeneratorNode from app.nodes.safety_node import SafetyNode +from app.nodes.assignment_node import AssignmentNode from app.nodes.analytics_node import AnalyticsNode from app.nodes.hitl_node import HITLNode from services.delivery import send_email_mock @@ -20,6 +21,7 @@ class FlowState(TypedDict, total=False): citations: List[Dict[str, Any]] variants: List[Dict[str, Any]] safety: Dict[str, Any] + assignment: Dict[str, Any] hitl: Dict[str, Any] analysis: Dict[str, Any] delivery: Dict[str, Any] @@ -82,6 +84,26 @@ def safety_node(state: FlowState) -> FlowState: return state +def assignment_node(state: FlowState) -> FlowState: + """ + Assign the customer to an experiment variant deterministically. + Stores the assignment under state['assignment'] and leaves other + state values intact. 
+ """ + customer = state.get("customer") or {} + experiment_id = customer.get("experiment_id") or customer.get("exp_id") or "exp_default" + + assignment_input = { + "user_id": customer.get("id"), + "experiment_id": experiment_id, + "context": {"segment": state.get("segment")}, + } + + assignment = AssignmentNode().run(assignment_input) + state["assignment"] = assignment + return state + + def hitl_node(state: FlowState) -> FlowState: """ Human-in-the-loop (HITL) node. @@ -179,6 +201,7 @@ def build_graph(): graph.add_node("segmenter", segmenter_node) graph.add_node("retriever", retriever_node) graph.add_node("generator", generator_node) + graph.add_node("assignment", assignment_node) graph.add_node("safety", safety_node) graph.add_node("hitl", hitl_node) graph.add_node("analytics", analytics_node) @@ -188,7 +211,8 @@ def build_graph(): graph.set_entry_point("segmenter") graph.add_edge("segmenter", "retriever") graph.add_edge("retriever", "generator") - graph.add_edge("generator", "safety") + graph.add_edge("generator", "assignment") + graph.add_edge("assignment", "safety") graph.add_edge("safety", "hitl") graph.add_edge("hitl", "analytics") graph.add_edge("analytics", "delivery") diff --git a/backend/app/nodes/assignment_node.py b/backend/app/nodes/assignment_node.py new file mode 100644 index 00000000..605a9dca --- /dev/null +++ b/backend/app/nodes/assignment_node.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Any, Dict, Optional + +from .base_node import BaseNode +from agents.assignment_agent import ABAssignmentAgent + + +class AssignmentNode(BaseNode): + """ + Thin adapter node to assign a user for an experiment. 
+ + Expected input: + { + "user_id": "...", + "experiment_id": "...", + "context": {...} # optional + } + + Returns: + assignment dict (see ABAssignmentAgent.assign_user) + """ + + def __init__(self, name: str = "assignment", agent: Optional[ABAssignmentAgent] = None): + super().__init__(name) + self.agent = agent or ABAssignmentAgent() + + def run(self, data: Dict[str, Any]) -> Dict[str, Any]: + user_id = data.get("user_id") or data.get("customer", {}).get("id") + experiment_id = data.get("experiment_id") or data.get("exp_id") or "exp_default" + context = data.get("context") + return self.agent.assign_user(user_id, experiment_id, context=context) diff --git a/backend/tests/test_assignment_agent.py b/backend/tests/test_assignment_agent.py new file mode 100644 index 00000000..be9f03f5 --- /dev/null +++ b/backend/tests/test_assignment_agent.py @@ -0,0 +1,17 @@ +from backend.agents.assignment_agent import ABAssignmentAgent + + +def test_assignment_deterministic(): + agent = ABAssignmentAgent(split_ratio={"A": 0.5, "B": 0.5}, seed="test-seed") + a1 = agent.assign_user("user123", "exp_x") + a2 = agent.assign_user("user123", "exp_x") + assert a1["variant_id"] == a2["variant_id"] + assert a1["experiment_id"] == "exp_x" + assert 0.0 <= a1["hash_value"] < 1.0 + + +def test_validate_assignment(): + agent = ABAssignmentAgent(split_ratio={"A": 0.7, "B": 0.3}) + assignment = agent.assign_user("u1", "exp1") + ok, err = agent.validate_assignment(assignment) + assert ok and err is None From 7ae5a35cda76f338a2ad35515e75759d6dcf4f93 Mon Sep 17 00:00:00 2001 From: Arulselvi Amirrthalingam Date: Sun, 23 Nov 2025 13:51:23 -0800 Subject: [PATCH 17/23] Add media endpoints and HITL review store with tests --- backend/app/config.py | 52 +++++--- backend/app/main.py | 2 + backend/app/nodes/hitl_node.py | 23 +++- backend/app/routers/media.py | 100 +++++++++++++++ backend/app/store/reviews.py | 110 ++++++++++++++++ backend/services/media.py | 164 ++++++++++++++++++++++++ 
backend/tests/test_hitl_review_store.py | 50 ++++++++ 7 files changed, 478 insertions(+), 23 deletions(-) create mode 100644 backend/app/routers/media.py create mode 100644 backend/app/store/reviews.py create mode 100644 backend/services/media.py create mode 100644 backend/tests/test_hitl_review_store.py diff --git a/backend/app/config.py b/backend/app/config.py index dcb8b7b7..98b2d3aa 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -1,3 +1,5 @@ +# backend/app/config.py + import os from typing import List from dotenv import load_dotenv @@ -10,19 +12,32 @@ VECTOR_DB_API_KEY = os.getenv('VECTOR_DB_API_KEY') DELIVERY_PROVIDER_API_KEY = os.getenv('DELIVERY_PROVIDER_API_KEY') LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO') + # Azure OpenAI (for embeddings + chat) AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") -AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview") +AZURE_OPENAI_API_VERSION = os.getenv( + "AZURE_OPENAI_API_VERSION", + "2024-02-15-preview", +) # Embeddings deployment (to use this in vector_db) AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT = os.getenv( "AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT" ) -# NEW: chat deployment name for generator +# Chat deployment name for generator AZURE_OPENAI_CHAT_DEPLOYMENT = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT") +# === NEW: Azure Speech + Translator config for media services === +AZURE_SPEECH_KEY = os.getenv("AZURE_SPEECH_KEY") +AZURE_SPEECH_REGION = os.getenv("AZURE_SPEECH_REGION") + +AZURE_TRANSLATOR_KEY = os.getenv("AZURE_TRANSLATOR_KEY") +AZURE_TRANSLATOR_REGION = os.getenv("AZURE_TRANSLATOR_REGION") +AZURE_TRANSLATOR_ENDPOINT = os.getenv("AZURE_TRANSLATOR_ENDPOINT") +# e.g. 
"https://api.cognitive.microsofttranslator.com" + # Optional: non-Azure OpenAI fallback OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") OPENAI_MODEL_NAME = os.getenv("OPENAI_MODEL_NAME", "gpt-4o-mini") @@ -33,24 +48,27 @@ # CORS origins: allow explicit env var or sensible defaults in development _env_origins = os.getenv("ALLOWED_ORIGINS") if _env_origins: - ALLOWED_ORIGINS: List[str] = [o.strip() for o in _env_origins.split(",") if o.strip()] + ALLOWED_ORIGINS: List[str] = [ + o.strip() for o in _env_origins.split(",") if o.strip() + ] else: - if ENV == "production": - # In production default to an explicit, empty list to force operators to opt-in. - prod_origin = os.getenv("PRODUCTION_ORIGIN") - ALLOWED_ORIGINS = [prod_origin] if prod_origin else [] - else: - # Development-friendly defaults - ALLOWED_ORIGINS = [ - "http://localhost", - "http://localhost:3000", - "http://127.0.0.1:3000", - "http://localhost:8000", - ] + if ENV == "production": + # In production default to an explicit, empty list to force operators to opt-in. 
+ prod_origin = os.getenv("PRODUCTION_ORIGIN") + ALLOWED_ORIGINS = [prod_origin] if prod_origin else [] + else: + # Development-friendly defaults + ALLOWED_ORIGINS = [ + "http://localhost", + "http://localhost:3000", + "http://127.0.0.1:3000", + "http://localhost:8000", + ] + def get_allowed_origins() -> List[str]: - """Return the configured origins list.""" - return ALLOWED_ORIGINS + """Return the configured origins list.""" + return ALLOWED_ORIGINS # Redis URL for adapter (optional - set to enable Redis-backed store) diff --git a/backend/app/main.py b/backend/app/main.py index c066cac3..1bfd81b0 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,6 +8,7 @@ from services.logger import get_logger from .routers.health import router as health_router from .routers.orchestrator import router as orchestrator_router +from .routers.media import router as media_router # added import for media router logger = get_logger('orchestrator') app = FastAPI(title='EchoVoice-AI Orchestrator') @@ -27,6 +28,7 @@ # Register routers (standard `routers/` package) app.include_router(health_router) app.include_router(orchestrator_router) +app.include_router(media_router) # included media router if __name__ == '__main__': diff --git a/backend/app/nodes/hitl_node.py b/backend/app/nodes/hitl_node.py index 1285c31f..5e3ee614 100644 --- a/backend/app/nodes/hitl_node.py +++ b/backend/app/nodes/hitl_node.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Optional from uuid import uuid4 +from app.store import reviews as review_store #to persist reviews from services.logger import get_logger @@ -22,10 +23,10 @@ class HITLNode: NOTE: - This node does NOT block on a human decision. - It just prepares the review metadata and logs it. 
- - In a future step, you can persist this in a database/Redis/queue - and build API endpoints like: + - API endpoints like: - GET /hitl/{review_id} - - POST /hitl/decide (approve/reject) + - POST /hitl/{review_id}/decision (approve/reject) + can load and update the stored review object. """ def __init__(self, logger: Optional[Any] = None) -> None: @@ -68,6 +69,18 @@ def run( # Generate a unique review_id that a UI or API can later use review_id = f"review_{uuid4().hex}" + # Persist full HITL review (customer + variants) so it can be fetched later + # via GET /hitl/{review_id} and updated by POST /hitl/{review_id}/decision. + # We also log it for traceability / debugging. + try: + review_store.create_review(review_id, customer, variants) + except Exception: + # If persistence fails, log the error but still return the hitl_payload + # so the rest of the flow can proceed. + self.logger.exception( + "HITLNode: failed to persist review %s to store", review_id + ) + hitl_payload = { "review_id": review_id, "status": "pending_human_approval", @@ -76,9 +89,7 @@ def run( "num_variants": len(variants), } - # In a real system, you might persist (customer + variants) - # keyed by review_id in a DB or Redis here. - # For now, we only log it so you can see it in the logs. + self.logger.info( "HITLNode: created review job %s for customer %s with %d variants", review_id, diff --git a/backend/app/routers/media.py b/backend/app/routers/media.py new file mode 100644 index 00000000..c45cf22d --- /dev/null +++ b/backend/app/routers/media.py @@ -0,0 +1,100 @@ +# backend/app/routers/media.py + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from services import media as media_service # our service layer with Azure / logic + +router = APIRouter( + prefix="/media", + tags=["media"], +) + + +# --------- Speech to Text --------- # + +class STTRequest(BaseModel): + """Request body for Speech-to-Text.""" + # For now we accept a URL pointing to an audio file. 
+ # Later you can add support for file uploads or base64 if needed. + audio_url: str + + +class STTResponse(BaseModel): + """Response body for Speech-to-Text.""" + text: str + + +@router.post("/speech-to-text", response_model=STTResponse) +async def speech_to_text(payload: STTRequest) -> STTResponse: + """ + Convert speech (audio file at a URL) into text. + + This is meant to support UI flows where a human reviewer dictates content + or needs an audio recording transcribed. + """ + try: + text = await media_service.speech_to_text_from_url(payload.audio_url) + return STTResponse(text=text) + except Exception as e: + # In production you might log more detail but return a generic message. + raise HTTPException(status_code=500, detail="Speech-to-text failed") from e + + +# --------- Text to Speech --------- # + +class TTSRequest(BaseModel): + """Request body for Text-to-Speech.""" + # The email text (or part of it) that should be read out loud. + text: str + + +class TTSResponse(BaseModel): + """Response body for Text-to-Speech.""" + # URL where the generated audio can be fetched/streamed by the UI. + audio_url: str + + +@router.post("/text-to-speech", response_model=TTSResponse) +async def text_to_speech(payload: TTSRequest) -> TTSResponse: + """ + Convert text into speech and return an audio URL. + + The UI can call this so the human reviewer can listen to the email + instead of reading the text. + """ + try: + audio_url = await media_service.text_to_speech_to_url(payload.text) + return TTSResponse(audio_url=audio_url) + except Exception as e: + raise HTTPException(status_code=500, detail="Text-to-speech failed") from e + + +# --------- Translation --------- # + +class TranslateRequest(BaseModel): + """Request body for translation.""" + text: str + target_lang: str # e.g. 
"es", "fr", "ta" + + +class TranslateResponse(BaseModel): + """Response body for translation.""" + translated_text: str + + +@router.post("/translate", response_model=TranslateResponse) +async def translate(payload: TranslateRequest) -> TranslateResponse: + """ + Translate text into the target language. + + Used by the reviewer UI to see the generated email in different languages. + """ + try: + translated = await media_service.translate_text( + payload.text, + payload.target_lang, + ) + return TranslateResponse(translated_text=translated) + except Exception as e: + raise HTTPException(status_code=500, detail="Translation failed") from e diff --git a/backend/app/store/reviews.py b/backend/app/store/reviews.py new file mode 100644 index 00000000..813a25ea --- /dev/null +++ b/backend/app/store/reviews.py @@ -0,0 +1,110 @@ +# backend/app/store/reviews.py + +""" +Helpers for storing and retrieving HITL review objects. + +A "review" represents a Human-in-the-Loop (HITL) review job created by HITLNode. +Reviews are stored in the shared app.store backend (MemoryStore or RedisStore) +under keys of the form: + + hitl:{review_id} + +This module provides a small, typed API so other parts of the system +(HITLNode, HITL router, tests) can work with HITL reviews without +having to know the key format or storage details. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from app.store import store + + +def _review_key(review_id: str) -> str: + """ + Build the storage key for a given review_id. + + Example: + review_id = "abc123" -> "hitl:abc123" + """ + return f"hitl:{review_id}" + + +def _now_iso() -> str: + """ + Return current time as an ISO 8601 string in UTC. + + Using a string keeps the review object JSON-serializable, which is + important for RedisStore where values are stored as JSON. 
+ """ + return datetime.now(timezone.utc).isoformat() + + +def create_review( + review_id: str, + customer: Dict[str, Any], + variants: List[Dict[str, Any]], +) -> Dict[str, Any]: + """ + Create and persist a new HITL review object. + + Args: + review_id: Unique identifier for the review. Should be stable and + used to look up this review later via GET /hitl/{review_id}. + customer: Customer payload (dict) from the orchestrator / flow state. + variants: List of safe variants (dicts) produced by the generator/safety + pipeline. Each should include at least an "id" and "text" field. + + Returns: + The full review object as stored. + """ + now = _now_iso() + review: Dict[str, Any] = { + "review_id": review_id, + "customer": customer, + "variants": variants, + "status": "pending_human_approval", + "approved_variant_id": None, + "notes": None, + "created_at": now, + "updated_at": now, + } + + store.set(_review_key(review_id), review) + return review + + +def get_review(review_id: str) -> Optional[Dict[str, Any]]: + """ + Fetch a HITL review by ID. + + Args: + review_id: The review identifier used when it was created. + + Returns: + The review dict if found, or None if not present in the store. + """ + return store.get(_review_key(review_id)) + + +def save_review(review: Dict[str, Any]) -> None: + """ + Persist an updated HITL review object. + + This function: + - Updates the `updated_at` timestamp. + - Writes the review back to the underlying store. + + It assumes `review["review_id"]` is present. + + Args: + review: The full review object to save. 
+ """ + review_id = review.get("review_id") + if not review_id: + raise ValueError("review object must contain a 'review_id' field") + + review["updated_at"] = _now_iso() + store.set(_review_key(review_id), review) diff --git a/backend/services/media.py b/backend/services/media.py new file mode 100644 index 00000000..e900f4a8 --- /dev/null +++ b/backend/services/media.py @@ -0,0 +1,164 @@ +# backend/services/media.py + +""" +Media-related services for EchoVoice AI. + +Responsibilities: +- Speech-to-Text (STT) using Azure Speech (planned) +- Text-to-Speech (TTS) using Azure Speech (planned) +- Translation using Azure Translator (implemented via REST) + +These functions are pure service logic. Routers call these, +and they can later be re-used by other flows (e.g., LangGraph) +without going through HTTP again. +""" + +from typing import Optional + +import httpx + +from app import config +from services.logger import get_logger + +logger = get_logger("media") + + +class MediaConfigError(RuntimeError): + """Raised when required Azure media configuration is missing.""" + + +def _require_speech_config() -> None: + """Ensure Azure Speech configuration is set before STT/TTS calls.""" + if not config.AZURE_SPEECH_KEY or not config.AZURE_SPEECH_REGION: + raise MediaConfigError( + "Azure Speech config missing. " + "Set AZURE_SPEECH_KEY and AZURE_SPEECH_REGION in your environment." + ) + + +def _require_translator_config() -> None: + """Ensure Azure Translator configuration is set before translation calls.""" + if not config.AZURE_TRANSLATOR_KEY or not config.AZURE_TRANSLATOR_ENDPOINT: + raise MediaConfigError( + "Azure Translator config missing. " + "Set AZURE_TRANSLATOR_KEY and AZURE_TRANSLATOR_ENDPOINT " + "in your environment." + ) + + +# --------- Speech to Text --------- # + +async def speech_to_text_from_url(audio_url: str) -> str: + """ + Convert speech (from an audio URL) into text. 
+ + Current implementation is a stub for local development: + - Validates that Azure Speech config is present. + - Logs the call. + - Returns a placeholder transcript. + + Later, you can replace the stub with a real Azure Speech SDK or REST call. + """ + _require_speech_config() + logger.info("Starting speech-to-text for audio_url=%s", audio_url) + + # TODO: Replace this stub with a real Azure Speech call. + transcript = f"[stub transcript for {audio_url}]" + + logger.info("Completed speech-to-text for audio_url=%s", audio_url) + return transcript + + +# --------- Text to Speech --------- # + +async def text_to_speech_to_url(text: str) -> str: + """ + Convert text into speech and return an audio URL. + + Current implementation is a stub: + - Validates Azure Speech config. + - Logs the call. + - Returns a fake URL. + + In a real implementation you would: + - Call Azure TTS to generate audio. + - Store the audio in blob storage (or similar). + - Return the storage URL for the UI to play. + """ + _require_speech_config() + logger.info("Starting text-to-speech (text length=%d)", len(text)) + + # TODO: Replace this stub with a real Azure TTS call and persisted audio. + fake_audio_url = "https://example.com/audio/generated-from-tts.wav" + + logger.info("Completed text-to-speech, audio_url=%s", fake_audio_url) + return fake_audio_url + + +# --------- Translation --------- # + +async def translate_text(text: str, target_lang: str) -> str: + """ + Translate text using Azure Translator REST API. + + Uses: + - AZURE_TRANSLATOR_ENDPOINT (e.g. 
"https://api.cognitive.microsofttranslator.com") + - AZURE_TRANSLATOR_KEY + - AZURE_TRANSLATOR_REGION (if required by your resource) + """ + _require_translator_config() + logger.info( + "Starting translation to target_lang=%s (text length=%d)", + target_lang, + len(text), + ) + + base = config.AZURE_TRANSLATOR_ENDPOINT.rstrip("/") + url = f"{base}/translate" + + params = { + "api-version": "3.0", + "to": target_lang, + } + + headers = { + "Ocp-Apim-Subscription-Key": config.AZURE_TRANSLATOR_KEY, + # Some Azure setups require the region header as well. + "Ocp-Apim-Subscription-Region": config.AZURE_TRANSLATOR_REGION or "", + "Content-Type": "application/json", + } + + body = [{"Text": text}] + + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, params=params, headers=headers, json=body) + resp.raise_for_status() + except httpx.HTTPError as e: + logger.exception("Azure Translator HTTP error") + raise RuntimeError("Azure Translator request failed") from e + + try: + data = resp.json() + # Expected response shape: + # [ + # { + # "translations": [ + # { + # "text": "...", + # "to": "xx" + # } + # ] + # } + # ] + first_item = data[0] + translations = first_item.get("translations", []) + if not translations: + raise KeyError("Missing 'translations' in Azure response") + translated_text = translations[0]["text"] + except Exception as e: + logger.exception("Failed to parse Azure Translator response") + raise RuntimeError("Invalid response from Azure Translator") from e + + logger.info("Completed translation to target_lang=%s", target_lang) + return translated_text diff --git a/backend/tests/test_hitl_review_store.py b/backend/tests/test_hitl_review_store.py new file mode 100644 index 00000000..a9bda05b --- /dev/null +++ b/backend/tests/test_hitl_review_store.py @@ -0,0 +1,50 @@ +# backend/tests/test_hitl_review_store.py + +from app.nodes.hitl_node import HITLNode +from app.store import reviews as review_store + + +def 
test_hitl_node_persists_review_with_variants(): + node = HITLNode() + + customer = {"id": "cust_123", "email": "test@example.com"} + variants = [ + {"id": "A", "text": "Hello A"}, + {"id": "B", "text": "Hello B"}, + ] + + result = node.run(customer, variants) + + # Check lightweight payload + assert result["status"] == "pending_human_approval" + assert result["num_variants"] == 2 + assert result["review_id"] is not None + + review_id = result["review_id"] + + # Check persisted review in store + stored = review_store.get_review(review_id) + assert stored is not None + assert stored["review_id"] == review_id + assert stored["status"] == "pending_human_approval" + assert stored["customer"]["id"] == "cust_123" + assert stored["customer"]["email"] == "test@example.com" + assert len(stored["variants"]) == 2 + assert stored["approved_variant_id"] is None + assert stored["notes"] is None + assert "created_at" in stored + assert "updated_at" in stored + + +def test_hitl_node_no_variants_does_not_create_review(): + node = HITLNode() + + customer = {"id": "cust_empty", "email": "empty@example.com"} + variants = [] # no safe variants + + result = node.run(customer, variants) + + # No review created in this case + assert result["review_id"] is None + assert result["status"] == "no_variants" + assert result["num_variants"] == 0 From 9eb770a346e112fffd5f4a09b9395a8b49c239fc Mon Sep 17 00:00:00 2001 From: Arulselvi Amirrthalingam Date: Sun, 23 Nov 2025 21:21:07 -0800 Subject: [PATCH 18/23] Add HITL audit logging and audit log tests --- backend/app/store/audit_log.py | 101 ++++++++++++++++++++++++++++++++ backend/tests/test_audit_log.py | 72 +++++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 backend/app/store/audit_log.py create mode 100644 backend/tests/test_audit_log.py diff --git a/backend/app/store/audit_log.py b/backend/app/store/audit_log.py new file mode 100644 index 00000000..7a53d1b2 --- /dev/null +++ b/backend/app/store/audit_log.py @@ -0,0 
+1,101 @@ +# backend/app/store/audit_log.py + +""" +Audit logging for Human-in-the-Loop (HITL) interactions. + +This module records a time-ordered list of actions for each review_id, +so we can answer questions like: +- Who approved which variant, and when? +- How often did the reviewer use TTS, STT, or Translate? +- What language / modality was used? + +Logs are stored in the shared app.store backend (MemoryStore / RedisStore) +under keys of the form: + + hitl:log:{review_id} + +Each log entry is a dict with: + { + "timestamp": ISO-8601 UTC string, + "review_id": str | None, + "user_id": str | None, + "action": str, + "metadata": dict + } +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from app.store import store + + +def _log_key(review_id: str) -> str: + """ + Build the storage key for logs belonging to a specific review_id. + """ + return f"hitl:log:{review_id}" + + +def _now_iso() -> str: + """ + Return current time as an ISO 8601 string in UTC. + """ + return datetime.now(timezone.utc).isoformat() + + +def log_action( + review_id: Optional[str], + action: str, + user_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> None: + """ + Append a single audit log entry for a given review_id. + + Args: + review_id: The HITL review ID this action is associated with. + Can be None if not tied to a specific review, but + grouping by review_id is recommended when available. + action: A short action code, e.g.: + - "TTS_PLAY" + - "STT_TRANSCRIBE" + - "TRANSLATE" + - "APPROVE_DECISION" + user_id: Identifier for the human reviewer or system user (if known). + metadata: Extra context (e.g. variant_id, target_lang, audio_url). + """ + if review_id is None: + # If there's no review_id, we currently skip logging. + # You can extend this later to support global logs if needed. 
+ return + + key = _log_key(review_id) + entry = { + "timestamp": _now_iso(), + "review_id": review_id, + "user_id": user_id, + "action": action, + "metadata": metadata or {}, + } + + # Load existing log list (if any), append, and save back. + logs: Optional[List[Dict[str, Any]]] = store.get(key) + if logs is None: + logs = [] + + logs.append(entry) + store.set(key, logs) + + +def get_logs(review_id: str) -> List[Dict[str, Any]]: + """ + Fetch all audit log entries for a given review_id. + + Returns: + A list of log entry dicts (possibly empty if nothing logged yet). + """ + logs = store.get(_log_key(review_id)) + return logs or [] diff --git a/backend/tests/test_audit_log.py b/backend/tests/test_audit_log.py new file mode 100644 index 00000000..d343c860 --- /dev/null +++ b/backend/tests/test_audit_log.py @@ -0,0 +1,72 @@ +# backend/tests/test_audit_log.py + +from app.store import audit_log +from app.store import store + + +def test_log_action_creates_and_appends_entries(): + review_id = "review_test_1" + + # Make sure we start clean for this key + store.set(f"hitl:log:{review_id}", None) + + # Log first action + audit_log.log_action( + review_id=review_id, + action="TTS_PLAY", + user_id="selvi", + metadata={"variant_id": "A"}, + ) + + # Log second action + audit_log.log_action( + review_id=review_id, + action="TRANSLATE", + user_id="selvi", + metadata={"target_lang": "es"}, + ) + + logs = audit_log.get_logs(review_id) + + assert len(logs) == 2 + + first, second = logs[0], logs[1] + + # First entry + assert first["review_id"] == review_id + assert first["user_id"] == "selvi" + assert first["action"] == "TTS_PLAY" + assert first["metadata"]["variant_id"] == "A" + assert "timestamp" in first + + # Second entry + assert second["review_id"] == review_id + assert second["user_id"] == "selvi" + assert second["action"] == "TRANSLATE" + assert second["metadata"]["target_lang"] == "es" + assert "timestamp" in second + + +def test_get_logs_returns_empty_list_when_no_logs(): 
+ review_id = "review_no_logs" + + # Ensure nothing stored for this key + store.set(f"hitl:log:{review_id}", None) + + logs = audit_log.get_logs(review_id) + + assert isinstance(logs, list) + assert logs == [] + + +def test_log_action_skips_when_review_id_is_none(): + # If review_id is None, we currently no-op. + audit_log.log_action( + review_id=None, + action="TTS_PLAY", + user_id="someone", + metadata={"foo": "bar"}, + ) + + # There is no specific key to check; we just assert that it doesn't crash. + # If you want stricter behavior later, you can change log_action and update this test. From 69b125e5af17a12bf3710d6f8e642961c0157bf0 Mon Sep 17 00:00:00 2001 From: Sushma Gandham Date: Mon, 24 Nov 2025 12:14:41 -0500 Subject: [PATCH 19/23] WIP: local changes --- backend/app/config.py | 9 +++ backend/app/main.py | 8 +- backend/app/routers/debug.py | 113 +++++++++++++++++++++++++++ backend/tests/test_debug_previews.py | 43 ++++++++++ 4 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 backend/app/routers/debug.py create mode 100644 backend/tests/test_debug_previews.py diff --git a/backend/app/config.py b/backend/app/config.py index 2095199f..b05e87dd 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -39,3 +39,12 @@ def get_allowed_origins() -> List[str]: # Redis URL for adapter (optional - set to enable Redis-backed store) REDIS_URL = os.getenv("REDIS_URL") + + +def is_debug_enabled() -> bool: + """Return True when debug routes/features should be enabled. + + Controlled via the `ECHO_DEBUG` environment variable (1/true/yes). 
+ """ + val = os.getenv("ECHO_DEBUG", "").lower() + return val in ("1", "true", "yes") diff --git a/backend/app/main.py b/backend/app/main.py index c066cac3..5a25723f 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,6 +1,6 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from .config import get_allowed_origins +from .config import get_allowed_origins, is_debug_enabled from pydantic import BaseModel from typing import Optional, Dict, Any import uvicorn @@ -8,6 +8,7 @@ from services.logger import get_logger from .routers.health import router as health_router from .routers.orchestrator import router as orchestrator_router +from app.routers.debug import router as debug_router logger = get_logger('orchestrator') app = FastAPI(title='EchoVoice-AI Orchestrator') @@ -28,6 +29,11 @@ app.include_router(health_router) app.include_router(orchestrator_router) +# Mount debug router only when explicitly enabled in environment (dev only) +if is_debug_enabled(): + app.include_router(debug_router) + + if __name__ == '__main__': # Run with Uvicorn for development diff --git a/backend/app/routers/debug.py b/backend/app/routers/debug.py new file mode 100644 index 00000000..22e33749 --- /dev/null +++ b/backend/app/routers/debug.py @@ -0,0 +1,113 @@ +from fastapi import APIRouter, Depends, Query +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + +from app.graph.orchestrator import Orchestrator +from app.routers.orchestrator import get_orchestrator +from services.logger import get_logger + +logger = get_logger("routers.debug") + +router = APIRouter(prefix="/debug", tags=["debug"]) + + +class Preview(BaseModel): + user_id: str + email: str + subject: Optional[str] = None + body: Optional[str] = None + variant_id: Optional[str] = None + blocked: bool = False + error: Optional[str] = None + + +class PreviewsResponse(BaseModel): + previews: List[Preview] + + +PRECOMPUTED_PREVIEWS = [ + { + "user_id": "U001", + 
"email": "emma@example.com", + "subject": "Hi Emma, quick note about running shoes", + "body": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", + "variant_id": "A", + "blocked": False, + }, + { + "user_id": "U002", + "email": "liam@example.com", + "subject": "Liam, more on the Acme plan", + "body": "Hello Liam,\n\nDetails: …\nLearn more on our site.", + "variant_id": "B", + "blocked": False, + }, +] + + +@router.get("/deliveries", response_model=PreviewsResponse) +async def get_debug_deliveries( + orchestrator: Orchestrator = Depends(get_orchestrator), + mock: bool = Query(False, description="Return precomputed mock previews without running pipeline"), +) -> Dict[str, Any]: + """ + Debug endpoint (Issue #11). + + Runs the lightweight orchestrator for a couple of mock customers and + returns only the email preview outputs (to, subject, body) suitable + for the UI preview. + """ + + mock_customers = [ + {"id": "U001", "name": "Emma", "email": "emma@example.com"}, + {"id": "U002", "name": "Liam", "email": "liam@example.com"}, + ] + + if mock: + return {"previews": PRECOMPUTED_PREVIEWS} + + previews: List[Dict[str, Any]] = [] + + for c in mock_customers: + preview = { + "user_id": c.get("id"), + "email": c.get("email"), + "subject": None, + "body": None, + "variant_id": None, + "blocked": False, + "error": None, + } + + try: + result = await orchestrator.run_flow("default_personalization", c) + except Exception: + logger.exception("debug preview run_flow failed for %s", c.get("id")) + preview["error"] = "pipeline failed" + previews.append(preview) + continue + + # Extract winner (if any) and safe variants + winner = result.get("analysis", {}).get("winner") if isinstance(result.get("analysis"), dict) else None + safe_variants = result.get("safety", {}).get("safe", []) if isinstance(result.get("safety"), dict) else [] + + variant = None + if winner and isinstance(winner, dict): + variant_id = winner.get("variant_id") + variant = next((v for v in safe_variants 
if v.get("id") == variant_id), None) + + # Fallback: use first safe variant if no explicit winner + if not variant and safe_variants: + variant = safe_variants[0] + + if variant: + preview["variant_id"] = variant.get("id") + preview["subject"] = variant.get("subject") + preview["body"] = variant.get("body") + else: + preview["blocked"] = True + + previews.append(preview) + + return {"previews": previews} diff --git a/backend/tests/test_debug_previews.py b/backend/tests/test_debug_previews.py new file mode 100644 index 00000000..cdd32bf0 --- /dev/null +++ b/backend/tests/test_debug_previews.py @@ -0,0 +1,43 @@ +import os + +# Enable debug router before importing app so the router is mounted +os.environ.setdefault("ECHO_DEBUG", "1") + +from fastapi.testclient import TestClient +from app.main import app +from app.routers.orchestrator import get_orchestrator + + +class MockOrchestrator: + async def run_flow(self, flow_name, payload): + # deterministic results for tests based on id + if payload.get("id") == "U001": + return { + "analysis": {"winner": {"variant_id": "A"}}, + "safety": {"safe": [{"id": "A", "subject": "S A", "body": "B A"}]}, + } + if payload.get("id") == "U002": + return { + "analysis": {"winner": None}, + "safety": {"safe": [{"id": "B", "subject": "S B", "body": "B B"}]}, + } + return {"analysis": {}, "safety": {"safe": []}} + + +def get_mock_orchestrator(): + return MockOrchestrator() + + +def test_debug_previews(monkeypatch): + # Override the orchestrator dependency with our mock + app.dependency_overrides[get_orchestrator] = lambda: get_mock_orchestrator() + client = TestClient(app) + + r = client.get("/debug/deliveries") + assert r.status_code == 200 + data = r.json() + assert "previews" in data + # check U001 used winner A + assert any(p["user_id"] == "U001" and p["subject"] == "S A" for p in data["previews"]) + # check U002 used fallback to first safe variant + assert any(p["user_id"] == "U002" and p["subject"] == "S B" for p in data["previews"]) 
From 510de4f35ecdfcda20c06fa959ccb954f6627d30 Mon Sep 17 00:00:00 2001 From: Sushma Gandham Date: Mon, 24 Nov 2025 15:44:40 -0500 Subject: [PATCH 20/23] Issue #11: Add debug endpoint to run pipeline and return email previews for UI - Implement GET /debug/deliveries endpoint that runs orchestrator for mock customers - Return minimal email preview objects (subject, body) suitable for UI preview - Add body_text compatibility alias for frontend flexibility - Add optional TTL caching via ECHO_DEBUG_CACHE_TTL env var for faster UI iteration - Add ?mock=true query param for precomputed previews without pipeline execution - Gate endpoint behind ECHO_DEBUG environment variable (dev-only) - Add comprehensive unit tests (4 tests covering orchestrator run, mock mode, alias, caching) - Add backend README documentation with usage examples - Fix package imports for consistent execution context (relative + package-qualified) --- backend/README.md | 110 +++++++++++++++++++++++++++ backend/app/graph/orchestrator.py | 16 ++-- backend/app/main.py | 4 +- backend/app/routers/debug.py | 54 +++++++++++-- backend/app/routers/orchestrator.py | 6 +- backend/tests/test_debug_previews.py | 65 ++++++++++++++++ 6 files changed, 237 insertions(+), 18 deletions(-) create mode 100644 backend/README.md diff --git a/backend/README.md b/backend/README.md new file mode 100644 index 00000000..6a168f41 --- /dev/null +++ b/backend/README.md @@ -0,0 +1,110 @@ +# Backend - Debug Previews Endpoint + +This short snippet documents the debug endpoint that runs the personalization pipeline and returns only the email preview outputs for UI previewing. + +## Endpoint + +- Path: `GET /debug/deliveries` +- Purpose: Run the orchestrator for a small set of mock customers and return minimal email preview objects (subject and body) that the frontend can use for UI preview. +- Dev-only: This router is mounted only when the `ECHO_DEBUG` environment variable is set to `1`, `true`, or `yes`. 
+ +## Query parameters + +- `mock` (optional, boolean): When `true`, the endpoint returns a precomputed set of lightweight previews without running the pipeline. Useful for fast UI design. + +## Response shape + +The endpoint returns JSON with the top-level key `previews`, an array of preview objects. +Each preview contains: + +- `user_id` (string) +- `email` (string) +- `subject` (string | null) +- `body` (string | null) +- `variant_id` (string | null) +- `blocked` (boolean) — true when no safe variant is available +- `error` (string | null) — set when pipeline execution fails for that user + +Example (mock response): + +```json +{ + "previews": [ + { + "user_id": "U001", + "email": "emma@example.com", + "subject": "Hi Emma, quick note about running shoes", + "body": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", + "variant_id": "A", + "blocked": false, + "error": null + }, + { + "user_id": "U002", + "email": "liam@example.com", + "subject": "Liam, more on the Acme plan", + "body": "Hello Liam,\n\nDetails: …\nLearn more on our site.", + "variant_id": "B", + "blocked": false, + "error": null + } + ] +} +``` + +Example (live response with a pipeline error for a user): + +```json +{ + "previews": [ + { + "user_id": "U001", + "email": "emma@example.com", + "subject": "S A", + "body": "B A", + "variant_id": "A", + "blocked": false, + "error": null + }, + { + "user_id": "U002", + "email": "liam@example.com", + "subject": null, + "body": null, + "variant_id": null, + "blocked": false, + "error": "pipeline failed" + } + ] +} +``` + +## How to use locally + +1. Enable the debug router and start the server (PowerShell): + +```powershell +$env:ECHO_DEBUG = '1' +E:/EchoAI/EchoVoice-AI/venv/Scripts/python.exe -m uvicorn backend.app.main:app --reload +``` + +2. Hit the endpoint (mock): + +```powershell +curl "http://127.0.0.1:8000/debug/deliveries?mock=true" +``` + +3. 
Hit the endpoint (run pipeline): + +```powershell +curl "http://127.0.0.1:8000/debug/deliveries" +``` + +## Notes & recommendations + +- This endpoint is intended for development / UI preview only. Keep it disabled in production by not setting `ECHO_DEBUG`. +- Use `mock=true` for fast, deterministic previews while designing UI components. +- If the frontend expects a different key name (for example `body_text`), either adapt the frontend or add the compatibility key in the preview object. +- For heavy real nodes, consider adding caching or a TTL so UI previews are fast and deterministic. + +That's it — let me know if you'd like me to add this snippet into the repo's root `README.md` as well or create a short example component that fetches and renders the previews. \ No newline at end of file diff --git a/backend/app/graph/orchestrator.py b/backend/app/graph/orchestrator.py index 24375458..ef3a68a0 100644 --- a/backend/app/graph/orchestrator.py +++ b/backend/app/graph/orchestrator.py @@ -6,14 +6,14 @@ """ from typing import Any, Dict, Optional -from app.store import MemoryStore, store -from services.logger import get_logger -from app.nodes.segmenter_node import SegmenterNode -from app.nodes.retriever_node import RetrieverNode -from app.nodes.generator_node import GeneratorNode -from app.nodes.safety_node import SafetyNode -from app.nodes.analytics_node import AnalyticsNode -from services.delivery import send_email_mock +from ..store import MemoryStore, store +from backend.services.logger import get_logger +from ..nodes.segmenter_node import SegmenterNode +from ..nodes.retriever_node import RetrieverNode +from ..nodes.generator_node import GeneratorNode +from ..nodes.safety_node import SafetyNode +from ..nodes.analytics_node import AnalyticsNode +from backend.services.delivery import send_email_mock logger = get_logger("graph.orchestrator") diff --git a/backend/app/main.py b/backend/app/main.py index 5a25723f..cd1610b2 100644 --- a/backend/app/main.py +++ 
b/backend/app/main.py @@ -5,10 +5,10 @@ from typing import Optional, Dict, Any import uvicorn -from services.logger import get_logger +from backend.services.logger import get_logger from .routers.health import router as health_router from .routers.orchestrator import router as orchestrator_router -from app.routers.debug import router as debug_router +from .routers.debug import router as debug_router logger = get_logger('orchestrator') app = FastAPI(title='EchoVoice-AI Orchestrator') diff --git a/backend/app/routers/debug.py b/backend/app/routers/debug.py index 22e33749..224478d2 100644 --- a/backend/app/routers/debug.py +++ b/backend/app/routers/debug.py @@ -1,11 +1,13 @@ from fastapi import APIRouter, Depends, Query from typing import Any, Dict, List, Optional +import os +import time from pydantic import BaseModel -from app.graph.orchestrator import Orchestrator -from app.routers.orchestrator import get_orchestrator -from services.logger import get_logger +from ..graph.orchestrator import Orchestrator +from .orchestrator import get_orchestrator +from backend.services.logger import get_logger logger = get_logger("routers.debug") @@ -17,6 +19,8 @@ class Preview(BaseModel): email: str subject: Optional[str] = None body: Optional[str] = None + # compatibility alias some frontends expect + body_text: Optional[str] = None variant_id: Optional[str] = None blocked: bool = False error: Optional[str] = None @@ -26,12 +30,14 @@ class PreviewsResponse(BaseModel): previews: List[Preview] +# Precomputed mock previews used when ?mock=true PRECOMPUTED_PREVIEWS = [ { "user_id": "U001", "email": "emma@example.com", "subject": "Hi Emma, quick note about running shoes", "body": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", + "body_text": "Hi Emma,\n\nWe thought you might like this: …\n\n— Team", "variant_id": "A", "blocked": False, }, @@ -40,12 +46,31 @@ class PreviewsResponse(BaseModel): "email": "liam@example.com", "subject": "Liam, more on the Acme plan", "body": 
"Hello Liam,\n\nDetails: …\nLearn more on our site.", + "body_text": "Hello Liam,\n\nDetails: …\nLearn more on our site.", "variant_id": "B", "blocked": False, }, ] +# Simple process-local TTL cache for debug previews (dev-only). Keyed by mock flag. +# Each entry: { 'value': , 'expires_at': } +_DEBUG_PREVIEWS_CACHE: Dict[str, Dict[str, Any]] = {} + + +def _get_cache_ttl_seconds() -> Optional[int]: + v = os.environ.get("ECHO_DEBUG_CACHE_TTL") + if not v: + return None + try: + t = int(v) + if t <= 0: + return None + return t + except Exception: + return None + + @router.get("/deliveries", response_model=PreviewsResponse) async def get_debug_deliveries( orchestrator: Orchestrator = Depends(get_orchestrator), @@ -64,8 +89,21 @@ async def get_debug_deliveries( {"id": "U002", "name": "Liam", "email": "liam@example.com"}, ] + cache_ttl = _get_cache_ttl_seconds() + cache_key = f"mock={bool(mock)}" + + # Serve cached response when TTL enabled and cache valid + if cache_ttl: + entry = _DEBUG_PREVIEWS_CACHE.get(cache_key) + if entry and entry.get("expires_at", 0) > time.time(): + logger.debug("returning cached debug previews for %s", cache_key) + return entry["value"] + if mock: - return {"previews": PRECOMPUTED_PREVIEWS} + response = {"previews": PRECOMPUTED_PREVIEWS} + if cache_ttl: + _DEBUG_PREVIEWS_CACHE[cache_key] = {"value": response, "expires_at": time.time() + cache_ttl} + return response previews: List[Dict[str, Any]] = [] @@ -75,6 +113,7 @@ async def get_debug_deliveries( "email": c.get("email"), "subject": None, "body": None, + "body_text": None, "variant_id": None, "blocked": False, "error": None, @@ -105,9 +144,14 @@ async def get_debug_deliveries( preview["variant_id"] = variant.get("id") preview["subject"] = variant.get("subject") preview["body"] = variant.get("body") + # compatibility alias for frontends that look for `body_text` + preview["body_text"] = variant.get("body") else: preview["blocked"] = True previews.append(preview) - return {"previews": 
previews} + response = {"previews": previews} + if cache_ttl: + _DEBUG_PREVIEWS_CACHE[cache_key] = {"value": response, "expires_at": time.time() + cache_ttl} + return response diff --git a/backend/app/routers/orchestrator.py b/backend/app/routers/orchestrator.py index f57ee4f2..6c53c600 100644 --- a/backend/app/routers/orchestrator.py +++ b/backend/app/routers/orchestrator.py @@ -3,9 +3,9 @@ from typing import Optional, Dict, Any from typing import AsyncGenerator -from services.logger import get_logger -from app.store import MemoryStore, store -from app.graph import Orchestrator +from backend.services.logger import get_logger +from ..store import MemoryStore, store +from ..graph import Orchestrator from ..nodes.segmenter_node import SegmenterNode from ..nodes.retriever_node import RetrieverNode from ..nodes.generator_node import GeneratorNode diff --git a/backend/tests/test_debug_previews.py b/backend/tests/test_debug_previews.py index cdd32bf0..589f4ae3 100644 --- a/backend/tests/test_debug_previews.py +++ b/backend/tests/test_debug_previews.py @@ -41,3 +41,68 @@ def test_debug_previews(monkeypatch): assert any(p["user_id"] == "U001" and p["subject"] == "S A" for p in data["previews"]) # check U002 used fallback to first safe variant assert any(p["user_id"] == "U002" and p["subject"] == "S B" for p in data["previews"]) +# cleanup dependency overrides + app.dependency_overrides.clear() + +def test_debug_previews_mock(): + # ensure mock=true returns precomputed previews without depending on orchestration + client = TestClient(app) + r = client.get("/debug/deliveries?mock=true") + assert r.status_code == 200 + data = r.json() + assert "previews" in data + # mock fixtures include U001 and U002 with subjects matching PRECOMPUTED_PREVIEWS + assert any(p["user_id"] == "U001" and "running shoes" in (p.get("subject") or "") for p in data["previews"]) + assert any(p["user_id"] == "U002" and "Acme plan" in (p.get("subject") or "") for p in data["previews"]) + + +def 
test_body_text_alias(monkeypatch): + """Ensure each preview includes `body_text` equal to `body`.""" + app.dependency_overrides[get_orchestrator] = lambda: get_mock_orchestrator() + client = TestClient(app) + + r = client.get("/debug/deliveries") + assert r.status_code == 200 + data = r.json() + assert "previews" in data + for p in data["previews"]: + # alias should be present and match body (both may be None) + assert "body_text" in p + assert p["body_text"] == p.get("body") + app.dependency_overrides.clear() + +def test_debug_previews_cache(monkeypatch): + """When ECHO_DEBUG_CACHE_TTL is set, subsequent requests within TTL should not re-run orchestrator.""" + # small TTL for test + os.environ["ECHO_DEBUG_CACHE_TTL"] = "60" + + class CounterOrchestrator: + def __init__(self): + self.count = 0 + + async def run_flow(self, flow_name, payload): + self.count += 1 + # return a simple deterministic safe variant + return { + "analysis": {"winner": {"variant_id": "A"}}, + "safety": {"safe": [{"id": "A", "subject": "S A", "body": "B A"}]}, + } + + counter = CounterOrchestrator() + app.dependency_overrides[get_orchestrator] = lambda: counter + client = TestClient(app) + + r1 = client.get("/debug/deliveries") + assert r1.status_code == 200 + # after first request, orchestrator should have been invoked (once per customer) + # our mock increments once per run_flow call; there are two mock customers => count == 2 + assert counter.count == 2 + + # Second request within TTL should use cache and not increment counter + r2 = client.get("/debug/deliveries") + assert r2.status_code == 200 + assert counter.count == 2 + + # cleanup env + os.environ.pop("ECHO_DEBUG_CACHE_TTL", None) + app.dependency_overrides.clear() From 641dfbe14a7ccc8781ca982b533ed3cf54084b46 Mon Sep 17 00:00:00 2001 From: Sushma Gandham Date: Mon, 24 Nov 2025 19:52:53 -0500 Subject: [PATCH 21/23] Issue #10: Add POST /debug/run endpoint for full pipeline debugging - Add POST /debug/run that accepts OrchestrateRequest 
and returns complete MessageState - Returns all pipeline stages: segment, citations, variants, safety, analysis, delivery - Add test_debug_run_full_pipeline test verifying full result structure - Update backend README with endpoint documentation and usage examples - All tests passing (5/5) --- backend/README.md | 83 +++++++++++++++++++++++----- backend/app/routers/debug.py | 32 ++++++++++- backend/tests/test_debug_previews.py | 51 +++++++++++++++++ 3 files changed, 150 insertions(+), 16 deletions(-) diff --git a/backend/README.md b/backend/README.md index 6a168f41..56cfcfa0 100644 --- a/backend/README.md +++ b/backend/README.md @@ -1,16 +1,18 @@ -# Backend - Debug Previews Endpoint +# Backend - Debug Endpoints -This short snippet documents the debug endpoint that runs the personalization pipeline and returns only the email preview outputs for UI previewing. +This document describes the debug endpoints for development and testing of the personalization pipeline. -## Endpoint +**Dev-only:** These routers are mounted only when the `ECHO_DEBUG` environment variable is set to `1`, `true`, or `yes`. -- Path: `GET /debug/deliveries` -- Purpose: Run the orchestrator for a small set of mock customers and return minimal email preview objects (subject and body) that the frontend can use for UI preview. -- Dev-only: This router is mounted only when the `ECHO_DEBUG` environment variable is set to `1`, `true`, or `yes`. +--- -## Query parameters +## 1. GET /debug/deliveries - Email Previews for UI -- `mock` (optional, boolean): When `true`, the endpoint returns a precomputed set of lightweight previews without running the pipeline. Useful for fast UI design. +**Purpose:** Run the orchestrator for a small set of mock customers and return minimal email preview objects (subject and body) for UI preview. + +### Query parameters + +- `mock` (optional, boolean): When `true`, returns precomputed previews without running the pipeline. 
## Response shape @@ -88,23 +90,76 @@ $env:ECHO_DEBUG = '1' E:/EchoAI/EchoVoice-AI/venv/Scripts/python.exe -m uvicorn backend.app.main:app --reload ``` -2. Hit the endpoint (mock): +2. Test GET /debug/deliveries (mock): ```powershell curl "http://127.0.0.1:8000/debug/deliveries?mock=true" ``` -3. Hit the endpoint (run pipeline): +3. Test GET /debug/deliveries (run pipeline): ```powershell curl "http://127.0.0.1:8000/debug/deliveries" ``` +4. Test POST /debug/run (full pipeline debug): + +```powershell +$body = @{customer = @{id = "U001"; name = "Emma"; email = "emma@example.com"}} | ConvertTo-Json +Invoke-RestMethod -Method POST -Uri "http://127.0.0.1:8000/debug/run" -ContentType "application/json" -Body $body +``` + +--- + +## 2. POST /debug/run - Full Pipeline Debug + +**Purpose:** Run the full orchestrator pipeline for a single customer and return the complete MessageState (all intermediate results) for debugging. + +### Request body + +```json +{ + "customer": { + "id": "U001", + "name": "Emma", + "email": "emma@example.com", + "last_event": "viewed_product", + "properties": { + "segment": "high_value" + } + } +} +``` + +### Response + +Returns the full orchestrator result including all pipeline stages: + +```json +{ + "segment": {"category": "high_value", "confidence": 0.95}, + "citations": ["Knowledge article #123", "Brand guideline v2.1"], + "variants": [ + {"id": "V1", "subject": "Hi Emma...", "body": "Dear Emma..."}, + {"id": "V2", "subject": "Emma, don't miss...", "body": "Hello Emma..."} + ], + "safety": { + "safe": [{"id": "V1", "subject": "Hi Emma...", "body": "Dear Emma..."}], + "blocked": [{"id": "V2", "reason": "policy_violation"}] + }, + "analysis": {"winner": {"variant_id": "V1", "score": 0.87}}, + "delivery": {"status": "sent", "message_id": "msg_abc123"} +} +``` + +--- + ## Notes & recommendations -- This endpoint is intended for development / UI preview only. Keep it disabled in production by not setting `ECHO_DEBUG`. 
-- Use `mock=true` for fast, deterministic previews while designing UI components. -- If the frontend expects a different key name (for example `body_text`), either adapt the frontend or add the compatibility key in the preview object. -- For heavy real nodes, consider adding caching or a TTL so UI previews are fast and deterministic. +- These endpoints are for development and debugging only. Disable in production by not setting `ECHO_DEBUG`. +- **GET /debug/deliveries**: Use `mock=true` for fast UI iteration. Use without `mock` to test actual pipeline. +- **POST /debug/run**: Inspect complete pipeline execution including all intermediate stages. +- The `body_text` field in `/debug/deliveries` is a compatibility alias for `body`. +- Consider using `ECHO_DEBUG_CACHE_TTL` for caching to speed up UI development. That's it — let me know if you'd like me to add this snippet into the repo's root `README.md` as well or create a short example component that fetches and renders the previews. \ No newline at end of file diff --git a/backend/app/routers/debug.py b/backend/app/routers/debug.py index 224478d2..06b9098e 100644 --- a/backend/app/routers/debug.py +++ b/backend/app/routers/debug.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, HTTPException from typing import Any, Dict, List, Optional import os import time @@ -6,7 +6,7 @@ from pydantic import BaseModel from ..graph.orchestrator import Orchestrator -from .orchestrator import get_orchestrator +from .orchestrator import get_orchestrator, OrchestrateRequest from backend.services.logger import get_logger logger = get_logger("routers.debug") @@ -155,3 +155,31 @@ async def get_debug_deliveries( if cache_ttl: _DEBUG_PREVIEWS_CACHE[cache_key] = {"value": response, "expires_at": time.time() + cache_ttl} return response + + +@router.post("/run") +async def debug_run_pipeline( + payload: OrchestrateRequest, + orchestrator: Orchestrator = 
Depends(get_orchestrator), +) -> Dict[str, Any]: + """ + Debug endpoint (Issue #10). + + Runs the full orchestrator pipeline for a single customer and returns + the complete MessageState (all intermediate results) for debugging. + + This endpoint returns the full orchestrator result including: + - segment + - citations + - variants + - safety + - analysis + - delivery + """ + customer = payload.customer.model_dump() + if not customer: + raise HTTPException(status_code=400, detail="customer missing") + + logger.info("debug/run for customer %s", customer.get("id") or customer.get("email")) + result = await orchestrator.run_flow("default_personalization", customer) + return result diff --git a/backend/tests/test_debug_previews.py b/backend/tests/test_debug_previews.py index 589f4ae3..2a9025f8 100644 --- a/backend/tests/test_debug_previews.py +++ b/backend/tests/test_debug_previews.py @@ -106,3 +106,54 @@ async def run_flow(self, flow_name, payload): # cleanup env os.environ.pop("ECHO_DEBUG_CACHE_TTL", None) app.dependency_overrides.clear() + + +def test_debug_run_full_pipeline(): + """Test POST /debug/run returns full orchestrator result (MessageState).""" + class FullMockOrchestrator: + async def run_flow(self, flow_name, payload): + # Return complete MessageState structure + return { + "segment": {"category": "high_value"}, + "citations": ["citation1", "citation2"], + "variants": [ + {"id": "V1", "subject": "Subject 1", "body": "Body 1"}, + {"id": "V2", "subject": "Subject 2", "body": "Body 2"}, + ], + "safety": { + "safe": [{"id": "V1", "subject": "Subject 1", "body": "Body 1"}], + "blocked": [{"id": "V2", "reason": "policy violation"}], + }, + "analysis": {"winner": {"variant_id": "V1"}}, + "delivery": {"status": "sent", "message_id": "msg123"}, + } + + app.dependency_overrides[get_orchestrator] = lambda: FullMockOrchestrator() + client = TestClient(app) + + payload = { + "customer": { + "id": "U999", + "name": "Test User", + "email": "test@example.com", + } + } + 
+ r = client.post("/debug/run", json=payload) + assert r.status_code == 200 + data = r.json() + + # Verify all expected keys are present in full MessageState + assert "segment" in data + assert "citations" in data + assert "variants" in data + assert "safety" in data + assert "analysis" in data + assert "delivery" in data + + # Verify some structure + assert data["segment"]["category"] == "high_value" + assert len(data["variants"]) == 2 + assert data["analysis"]["winner"]["variant_id"] == "V1" + + app.dependency_overrides.clear() From daeed2d533b5bde303a04dd74e1cd1e048838b3b Mon Sep 17 00:00:00 2001 From: Arulselvi Amirrthalingam Date: Mon, 24 Nov 2025 21:01:54 -0800 Subject: [PATCH 22/23] Implement Azure STT/TTS with fallback and update env template --- .env.template | 6 + backend/requirements.txt | 1 + backend/services/media.py | 249 +++++++++++++++++++++----------------- 3 files changed, 145 insertions(+), 111 deletions(-) diff --git a/.env.template b/.env.template index 13632724..fb55c93f 100644 --- a/.env.template +++ b/.env.template @@ -22,6 +22,12 @@ AZURE_OPENAI_EMBEDDINGS_DEPLOYMENT = # NEW: chat deployment name for generator AZURE_OPENAI_CHAT_DEPLOYMENT = +#for Azure Speech and Translator +AZURE_SPEECH_KEY=your-speech-key +AZURE_SPEECH_REGION=westus +AZURE_SPEECH_TTS_VOICE=en-US-JennyNeural +AZURE_TRANSLATOR_KEY=your-translator-key + # Optional: non-Azure OpenAI fallback OPENAI_API_KEY = OPENAI_MODEL_NAME = diff --git a/backend/requirements.txt b/backend/requirements.txt index 140a5c28..0fefd17e 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -76,3 +76,4 @@ uvicorn==0.38.0 xxhash==3.6.0 yarl==1.22.0 zstandard==0.25.0 +azure-cognitiveservices-speech diff --git a/backend/services/media.py b/backend/services/media.py index e900f4a8..c0b8d8a1 100644 --- a/backend/services/media.py +++ b/backend/services/media.py @@ -3,162 +3,189 @@ """ Media-related services for EchoVoice AI. 
-Responsibilities: -- Speech-to-Text (STT) using Azure Speech (planned) -- Text-to-Speech (TTS) using Azure Speech (planned) -- Translation using Azure Translator (implemented via REST) - -These functions are pure service logic. Routers call these, -and they can later be re-used by other flows (e.g., LangGraph) -without going through HTTP again. +Supports: +- Speech-to-Text (STT) using Azure Speech (real when configured, stub fallback) +- Text-to-Speech (TTS) using Azure Speech (real when configured, stub fallback) +- Translation using Azure Translator (real) """ -from typing import Optional +from __future__ import annotations + +import asyncio +import uuid +from pathlib import Path +from typing import Optional, Any import httpx +# Azure SDK (TTS/STT) +import azure.cognitiveservices.speech as speechsdk # type: ignore + from app import config from services.logger import get_logger + logger = get_logger("media") -class MediaConfigError(RuntimeError): - """Raised when required Azure media configuration is missing.""" +TTS_DIR = Path("data/tts") +TTS_DIR.mkdir(exist_ok=True, parents=True) +STT_TMP = Path("data/stt_tmp") +STT_TMP.mkdir(exist_ok=True, parents=True) -def _require_speech_config() -> None: - """Ensure Azure Speech configuration is set before STT/TTS calls.""" - if not config.AZURE_SPEECH_KEY or not config.AZURE_SPEECH_REGION: - raise MediaConfigError( - "Azure Speech config missing. " - "Set AZURE_SPEECH_KEY and AZURE_SPEECH_REGION in your environment." - ) +# --------------------------------------------------------- +# Helper: Check whether Azure Speech is configured +# --------------------------------------------------------- -def _require_translator_config() -> None: - """Ensure Azure Translator configuration is set before translation calls.""" - if not config.AZURE_TRANSLATOR_KEY or not config.AZURE_TRANSLATOR_ENDPOINT: - raise MediaConfigError( - "Azure Translator config missing. 
" - "Set AZURE_TRANSLATOR_KEY and AZURE_TRANSLATOR_ENDPOINT " - "in your environment." - ) +def _has_speech_config() -> bool: + return bool(config.AZURE_SPEECH_KEY and config.AZURE_SPEECH_REGION) -# --------- Speech to Text --------- # +def _get_speech_config() -> speechsdk.SpeechConfig: + if not _has_speech_config(): + raise RuntimeError("Azure Speech config missing") + + return speechsdk.SpeechConfig( + subscription=config.AZURE_SPEECH_KEY, + region=config.AZURE_SPEECH_REGION, + ) + + +# --------------------------------------------------------- +# SPEECH TO TEXT +# --------------------------------------------------------- async def speech_to_text_from_url(audio_url: str) -> str: """ - Convert speech (from an audio URL) into text. + Speech-to-text: + - If Azure Speech configured → real STT + - Else → stub fallback (local dev) + """ - Current implementation is a stub for local development: - - Validates that Azure Speech config is present. - - Logs the call. - - Returns a placeholder transcript. + # Fallback: no Azure config + if not _has_speech_config(): + logger.info("STT fallback mode — no Azure config set") + return f"[STT stub transcript for {audio_url}]" - Later, you can replace the stub with a real Azure Speech SDK or REST call. - """ - _require_speech_config() - logger.info("Starting speech-to-text for audio_url=%s", audio_url) + # Real Azure path + logger.info("Downloading audio from %s for STT", audio_url) - # TODO: Replace this stub with a real Azure Speech call. 
- transcript = f"[stub transcript for {audio_url}]" + tmp_path = STT_TMP / f"{uuid.uuid4().hex}.wav" - logger.info("Completed speech-to-text for audio_url=%s", audio_url) - return transcript + async with httpx.AsyncClient() as client: + r = await client.get(audio_url) + r.raise_for_status() + tmp_path.write_bytes(r.content) + text = await asyncio.get_event_loop().run_in_executor( + None, _stt_local_file, tmp_path + ) -# --------- Text to Speech --------- # + try: + tmp_path.unlink(missing_ok=True) + except: + pass -async def text_to_speech_to_url(text: str) -> str: - """ - Convert text into speech and return an audio URL. + return text - Current implementation is a stub: - - Validates Azure Speech config. - - Logs the call. - - Returns a fake URL. - In a real implementation you would: - - Call Azure TTS to generate audio. - - Store the audio in blob storage (or similar). - - Return the storage URL for the UI to play. - """ - _require_speech_config() - logger.info("Starting text-to-speech (text length=%d)", len(text)) +def _stt_local_file(path: Path) -> str: + speech_config = _get_speech_config() + audio_config = speechsdk.AudioConfig(filename=str(path)) + recognizer = speechsdk.SpeechRecognizer( + speech_config=speech_config, + audio_config=audio_config, + ) - # TODO: Replace this stub with a real Azure TTS call and persisted audio. - fake_audio_url = "https://example.com/audio/generated-from-tts.wav" + result = recognizer.recognize_once() - logger.info("Completed text-to-speech, audio_url=%s", fake_audio_url) - return fake_audio_url + if result.reason == speechsdk.ResultReason.RecognizedSpeech: + return result.text + raise RuntimeError(f"Azure STT failed — reason={result.reason}") -# --------- Translation --------- # -async def translate_text(text: str, target_lang: str) -> str: - """ - Translate text using Azure Translator REST API. 
# ---------------------------------------------------------
# TEXT TO SPEECH
# ---------------------------------------------------------

async def text_to_speech_to_url(text: str) -> str:
    """
    Text-to-speech:
    - Real Azure TTS when configured (writes a WAV under data/tts/)
    - Stub fallback when missing
    """
    # Fallback
    if not _has_speech_config():
        logger.info("TTS fallback mode — no Azure config set")
        # NOTE(review): this file is never actually created in fallback mode;
        # confirm callers treat the stub path as a placeholder only.
        fake_url = "data/tts/fake-audio.wav"
        return fake_url

    # Real Azure TTS — the SDK call is blocking, so run it in the default
    # executor. get_running_loop() is the supported API inside a coroutine
    # (get_event_loop() is deprecated there).
    out_path = await asyncio.get_running_loop().run_in_executor(
        None, _tts_local_file, text
    )
    return str(out_path)


def _tts_local_file(text: str) -> Path:
    """Run blocking Azure TTS and return the path of the generated WAV."""
    speech_config = _get_speech_config()
    speech_config.speech_synthesis_voice_name = (
        config.AZURE_SPEECH_TTS_VOICE or "en-US-JennyNeural"
    )

    filename = TTS_DIR / f"{uuid.uuid4().hex}.wav"

    audio_cfg = speechsdk.audio.AudioOutputConfig(filename=str(filename))
    synthesizer = speechsdk.SpeechSynthesizer(
        speech_config=speech_config, audio_config=audio_cfg
    )

    result = synthesizer.speak_text_async(text).get()
    if result.reason != speechsdk.ResultReason.SynthesizingAudioCompleted:
        raise RuntimeError(f"Azure TTS failed — reason={result.reason}")

    return filename


# ---------------------------------------------------------
# TRANSLATION (REAL ONLY)
# ---------------------------------------------------------

def _require_translator() -> None:
    """Raise RuntimeError when Azure Translator env settings are missing."""
    if not config.AZURE_TRANSLATOR_KEY or not config.AZURE_TRANSLATOR_ENDPOINT:
        raise RuntimeError(
            "Azure Translator missing. "
            "Set AZURE_TRANSLATOR_KEY + AZURE_TRANSLATOR_ENDPOINT"
        )


async def translate_text(text: str, target_lang: str) -> str:
    """
    Translate *text* to *target_lang* via the Azure Translator REST API.

    Raises:
        RuntimeError: when Translator configuration is missing.
        httpx.HTTPStatusError: when the Translator call returns an error.
    """
    _require_translator()

    endpoint = config.AZURE_TRANSLATOR_ENDPOINT.rstrip("/")
    url = f"{endpoint}/translate"

    params = {"api-version": "3.0", "to": target_lang}
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": config.AZURE_TRANSLATOR_KEY,
        # Some Azure Translator resources also require the region header.
        "Ocp-Apim-Subscription-Region": config.AZURE_TRANSLATOR_REGION or "",
    }
    body = [{"Text": text}]

    logger.info("Calling Azure Translator → %s", target_lang)

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(url, params=params, headers=headers, json=body)

    resp.raise_for_status()
    data = resp.json()

    # Response shape: [{"translations": [{"text": "...", "to": "xx"}]}]
    translated = data[0]["translations"][0]["text"]
    return translated
# backend/app/routers/hitl.py

from typing import Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.store import reviews as review_store
from app.store import audit_log

# This must be named `router` so `from app.routers.hitl import router` works
# (it is wired into the app in backend/app/main.py via include_router).
router = APIRouter(
    prefix="/hitl",
    tags=["hitl"],
)


# ---------- Pydantic models ---------- #

class HITLDecisionRequest(BaseModel):
    """Request body for human decision on a HITL review."""
    approved_variant_id: str
    notes: Optional[str] = None


# ---------- Endpoints ---------- #

@router.get("/{review_id}")
async def get_hitl_review(review_id: str):
    """
    Fetch a HITL review by ID.

    Returns the full review as stored in the review store,
    or 404 when no such review exists.
    """
    review = review_store.get_review(review_id)
    if review is None:
        raise HTTPException(status_code=404, detail="Review not found")
    return review


@router.post("/{review_id}/decision")
async def submit_hitl_decision(review_id: str, payload: HITLDecisionRequest):
    """
    Submit a human decision for a HITL review.

    - Marks the review as 'approved'
    - Stores approved_variant_id and optional notes
    - Writes an audit log entry

    Returns the updated review, or 404 when the review does not exist.
    """
    updated = review_store.update_review(
        review_id=review_id,
        status="approved",
        approved_variant_id=payload.approved_variant_id,
        notes=payload.notes,
    )
    # update_review returns None for a missing review; relying on the update
    # result (instead of a separate get_review first) removes a redundant
    # lookup and the race window between lookup and update.
    if updated is None:
        raise HTTPException(status_code=404, detail="Review not found")

    audit_log.log_action(
        review_id=review_id,
        user_id=None,  # TODO: plug in real reviewer ID once auth is wired
        action="DECISION_SUBMIT",
        metadata={
            "approved_variant_id": payload.approved_variant_id,
            "notes_present": bool(payload.notes),
        },
    )

    return updated
-""" - -from __future__ import annotations - from datetime import datetime, timezone -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional -from app.store import store +from app.store.memory_store import MemoryStore # or your actual store class +_STORE_KEY_PREFIX = "hitl:review:" -def _review_key(review_id: str) -> str: - """ - Build the storage key for a given review_id. - - Example: - review_id = "abc123" -> "hitl:abc123" - """ - return f"hitl:{review_id}" +store = MemoryStore() # or whatever you’re already using -def _now_iso() -> str: - """ - Return current time as an ISO 8601 string in UTC. - - Using a string keeps the review object JSON-serializable, which is - important for RedisStore where values are stored as JSON. - """ - return datetime.now(timezone.utc).isoformat() +def _key(review_id: str) -> str: + return f"{_STORE_KEY_PREFIX}{review_id}" def create_review( review_id: str, customer: Dict[str, Any], - variants: List[Dict[str, Any]], + variants: list[Dict[str, Any]], ) -> Dict[str, Any]: - """ - Create and persist a new HITL review object. - - Args: - review_id: Unique identifier for the review. Should be stable and - used to look up this review later via GET /hitl/{review_id}. - customer: Customer payload (dict) from the orchestrator / flow state. - variants: List of safe variants (dicts) produced by the generator/safety - pipeline. Each should include at least an "id" and "text" field. + now = datetime.now(timezone.utc).isoformat() - Returns: - The full review object as stored. - """ - now = _now_iso() - review: Dict[str, Any] = { + review = { "review_id": review_id, "customer": customer, "variants": variants, @@ -72,39 +32,47 @@ def create_review( "updated_at": now, } - store.set(_review_key(review_id), review) + store.set(_key(review_id), review) return review def get_review(review_id: str) -> Optional[Dict[str, Any]]: - """ - Fetch a HITL review by ID. 
+ return store.get(_key(review_id)) + - Args: - review_id: The review identifier used when it was created. +def update_review( + review_id: str, + *, + status: Optional[str] = None, + approved_variant_id: Optional[str] = None, + notes: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + """ + Update an existing HITL review. - Returns: - The review dict if found, or None if not present in the store. + Only fields passed (not None) are updated. Returns the updated review, + or None if the review does not exist. """ - return store.get(_review_key(review_id)) + review = get_review(review_id) + if review is None: + return None + changed = False -def save_review(review: Dict[str, Any]) -> None: - """ - Persist an updated HITL review object. + if status is not None: + review["status"] = status + changed = True - This function: - - Updates the `updated_at` timestamp. - - Writes the review back to the underlying store. + if approved_variant_id is not None: + review["approved_variant_id"] = approved_variant_id + changed = True - It assumes `review["review_id"]` is present. + if notes is not None: + review["notes"] = notes + changed = True - Args: - review: The full review object to save. 
- """ - review_id = review.get("review_id") - if not review_id: - raise ValueError("review object must contain a 'review_id' field") + if changed: + review["updated_at"] = datetime.now(timezone.utc).isoformat() + store.set(_key(review_id), review) - review["updated_at"] = _now_iso() - store.set(_review_key(review_id), review) + return review diff --git a/backend/tests/test_hitl_router.py b/backend/tests/test_hitl_router.py new file mode 100644 index 00000000..b95710e4 --- /dev/null +++ b/backend/tests/test_hitl_router.py @@ -0,0 +1,61 @@ +# backend/tests/test_hitl_router.py + +from fastapi.testclient import TestClient + +from app.main import app +from app.store import reviews as review_store + +client = TestClient(app) + + +def _create_sample_review(review_id: str = "review_test_1"): + customer = {"id": "cust_123", "email": "test@example.com"} + variants = [ + {"id": "A", "text": "Hello A"}, + {"id": "B", "text": "Hello B"}, + ] + review_store.create_review(review_id, customer, variants) + return review_id + + +def test_get_hitl_review_success(): + review_id = _create_sample_review("review_get_ok") + + resp = client.get(f"/hitl/{review_id}") + assert resp.status_code == 200 + + data = resp.json() + assert data["review_id"] == review_id + assert data["status"] == "pending_human_approval" + assert data["customer"]["id"] == "cust_123" + assert len(data["variants"]) == 2 + + +def test_get_hitl_review_not_found(): + resp = client.get("/hitl/nonexistent_review") + assert resp.status_code == 404 + assert resp.json()["detail"] == "Review not found" + + +def test_submit_hitl_decision_updates_review(): + review_id = _create_sample_review("review_decision_ok") + + payload = { + "approved_variant_id": "A", + "notes": "Looks good to send", + } + + resp = client.post(f"/hitl/{review_id}/decision", json=payload) + assert resp.status_code == 200 + + data = resp.json() + assert data["review_id"] == review_id + assert data["status"] == "approved" + assert data["approved_variant_id"] 
== "A" + assert data["notes"] == "Looks good to send" + + # Double-check store state + stored = review_store.get_review(review_id) + assert stored["status"] == "approved" + assert stored["approved_variant_id"] == "A" + assert stored["notes"] == "Looks good to send"