diff --git a/lexai-functions/.env.example b/lexai-functions/.env.example
new file mode 100644
index 0000000..93e7e0a
--- /dev/null
+++ b/lexai-functions/.env.example
@@ -0,0 +1,23 @@
+# Copy to `.env` for local runs and for `firebase deploy` (Firebase loads this folder’s `.env`
+# when resolving params during deploy).
+
+# --- Google Cloud Secret Manager (production runtime) ---
+# Create or rotate secrets (you will be prompted for the value; paste once, it is not echoed):
+#   cd ..   # repo root (where firebase.json lives)
+#   firebase functions:secrets:set OPENAI_API_KEY
+#   firebase functions:secrets:set PINECONE_API_KEY
+#   firebase functions:secrets:set RUNPOD_API_KEY
+#
+# First deploy after adding secrets: if the CLI asks, allow it to grant the function’s
+# service account access to read these secrets.
+#
+# For local development only, you may also put the same keys in this file so the emulator
+# sees the same variable names as production:
+OPENAI_API_KEY=
+PINECONE_API_KEY=
+RUNPOD_API_KEY=
+
+# --- Plain environment / deploy params (not Secret Manager) ---
+# Supplied via this `.env` on deploy, or via interactive prompts if unset.
+RUNPOD_ENDPOINT_ID=
+OPENAI_TRANSLATION_MODEL=gpt-5.1
diff --git a/lexai-functions/.gitignore b/lexai-functions/.gitignore
index 1609bab..4d8ee00 100644
--- a/lexai-functions/.gitignore
+++ b/lexai-functions/.gitignore
@@ -4,3 +4,4 @@ __pycache__/
 # Python virtual environment
 venv/
 *.local
+.env
diff --git a/lexai-functions/main.py b/lexai-functions/main.py
index 1b64714..bc2ad84 100644
--- a/lexai-functions/main.py
+++ b/lexai-functions/main.py
@@ -1,29 +1,74 @@
-import json
+"""Firebase callable entrypoint for LexAI chat.
+
+Pipeline: optionally translate the user prompt and history into English, embed the English
+prompt, retrieve Michigan legislation chunks from Pinecone, call the English-only RunPod
+legal model, then translate the assistant reply back to the client's UI language when needed.
+
+Pinecone is initialized lazily so deploy-time import/discovery does not require
+``PINECONE_API_KEY`` until a request actually runs.
+""" + import os import time +from typing import Any + import requests from firebase_functions import https_fn +from firebase_functions.params import SecretParam, StringParam from firebase_admin import initialize_app from pinecone import Pinecone from dotenv import load_dotenv +from openai_translate import ( + normalize_to_english, + require_openai_if_translating, + translate_english_to_ui_language, +) + load_dotenv() -# --- Config --- -PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") -RUNPOD_API_KEY = os.getenv("RUNPOD_API_KEY") -RUNPOD_ENDPOINT_ID = os.getenv("RUNPOD_ENDPOINT_ID") +# --- Secret Manager (bind in @https_fn.on_call); runtime values appear in os.environ --- +OPENAI_API_KEY = SecretParam("OPENAI_API_KEY") +PINECONE_API_KEY = SecretParam("PINECONE_API_KEY") +RUNPOD_API_KEY = SecretParam("RUNPOD_API_KEY") + +# --- Deploy-time / runtime params (non-secret); also read from lexai-functions/.env on deploy --- +RUNPOD_ENDPOINT_ID = StringParam( + "RUNPOD_ENDPOINT_ID", + label="RunPod endpoint ID", + description="Serverless endpoint ID from the RunPod console (v2 API path segment).", +) +OPENAI_TRANSLATION_MODEL = StringParam( + "OPENAI_TRANSLATION_MODEL", + default="gpt-5.1", + label="OpenAI translation model", + description="Model id for translating non-English UI around RunPod.", +) + INDEX_NAME = "michigan-legislation" EMBED_MODEL = "multilingual-e5-large" initialize_app() -# Initialize Pinecone -pc = Pinecone(api_key=PINECONE_API_KEY) -index = pc.Index(INDEX_NAME) +_PINECONE_CLIENT: Pinecone | None = None +_PINECONE_INDEX: Any = None + + +def _get_pinecone() -> tuple[Pinecone, Any]: + """Lazy Pinecone client so deploy-time discovery does not require API keys at import.""" + global _PINECONE_CLIENT, _PINECONE_INDEX + if _PINECONE_CLIENT is not None and _PINECONE_INDEX is not None: + return _PINECONE_CLIENT, _PINECONE_INDEX + api_key = (os.environ.get("PINECONE_API_KEY") or "").strip() + if not api_key: + raise RuntimeError("PINECONE_API_KEY is not set") + _PINECONE_CLIENT = Pinecone(api_key=api_key) + _PINECONE_INDEX = _PINECONE_CLIENT.Index(INDEX_NAME) + return _PINECONE_CLIENT, _PINECONE_INDEX def embed_query(query_text: str) -> list: + pc, _ = _get_pinecone() embeddings_response = pc.inference.embed( model=EMBED_MODEL, inputs=[query_text], @@ -33,6 +78,7 @@ def embed_query(query_text: str) -> list: def query_pinecone(query_embedding: list, top_k: int = 5) -> list: + _, index = _get_pinecone() results = index.query( vector=query_embedding, top_k=top_k, @@ -42,10 +88,18 @@ def query_pinecone(query_embedding: list, top_k: int = 5) -> list: def call_runpod(messages: list) -> str: + """POST to RunPod serverless ``/run``, then poll ``/status`` until COMPLETED or timeout.""" + endpoint_id = (os.environ.get("RUNPOD_ENDPOINT_ID") or "").strip() + runpod_key = (os.environ.get("RUNPOD_API_KEY") or "").strip() + if not endpoint_id: + raise RuntimeError("RUNPOD_ENDPOINT_ID is not set") + if not runpod_key: + raise RuntimeError("RUNPOD_API_KEY is not set") + run_response = requests.post( - f"https://api.runpod.ai/v2/{RUNPOD_ENDPOINT_ID}/run", + f"https://api.runpod.ai/v2/{endpoint_id}/run", headers={ - "Authorization": f"Bearer {RUNPOD_API_KEY}", + "Authorization": f"Bearer {runpod_key}", "Content-Type": "application/json", }, json={ @@ -66,8 +120,8 @@ def call_runpod(messages: list) -> str: # Poll for completion, max 5 minutes for _ in range(150): status_response = requests.get( - f"https://api.runpod.ai/v2/{RUNPOD_ENDPOINT_ID}/status/{job_id}", - headers={"Authorization": f"Bearer 
{RUNPOD_API_KEY}"}, + f"https://api.runpod.ai/v2/{endpoint_id}/status/{job_id}", + headers={"Authorization": f"Bearer {runpod_key}"}, ) status_response.raise_for_status() result = status_response.json() @@ -96,8 +150,16 @@ def call_runpod(messages: list) -> str: return "Error: Request timed out." -@https_fn.on_call(enforce_app_check=False, timeout_sec=300) +@https_fn.on_call( + enforce_app_check=False, + timeout_sec=300, + secrets=[OPENAI_API_KEY, PINECONE_API_KEY, RUNPOD_API_KEY], +) def chat(req: https_fn.CallableRequest) -> dict: + """HTTPS callable: ``req.data`` may include ``prompt``, ``chat_history``, ``language`` (default ``en``). + + Returns ``{"response": str}`` on success or ``{"error": str}`` on validation/runtime failure. + """ try: prompt = req.data.get("prompt", "") chat_history = req.data.get("chat_history", []) @@ -106,33 +168,33 @@ def chat(req: https_fn.CallableRequest) -> dict: if not prompt: return {"error": "No prompt provided"} + require_openai_if_translating(language) - query_embedding = embed_query(prompt) + history_en, prompt_en = normalize_to_english(chat_history, prompt, language) + query_embedding = embed_query(prompt_en) relevant_chunks = query_pinecone(query_embedding, top_k=5) - context = "\n\n---\n\n".join(relevant_chunks) - system_message = { "role": "system", "content": ( - f"You are LexAI, a legal assistant specializing in Michigan legislation. " - f"Answer the user's question based on the following legislation excerpts. " - f"If the excerpts don't contain relevant information, say so honestly. " - f"Respond in {language}.\n\n" + "You are LexAI, a legal assistant specializing in Michigan legislation. " + "Answer the user's question based on the following legislation excerpts. " + "If the excerpts don't contain relevant information, say so honestly. " + "Write your entire answer in clear English.\n\n" f"RELEVANT LEGISLATION:\n{context}" ), } - messages = [system_message] + chat_history + [{"role": "user", "content": prompt}] - + messages = [system_message] + history_en + [{"role": "user", "content": prompt_en}] - response_text = call_runpod(messages) + response_en = call_runpod(messages) + response_text = translate_english_to_ui_language(response_en, language) return {"response": response_text} except Exception as e: - return {"error": str(e)} \ No newline at end of file + return {"error": str(e)} diff --git a/lexai-functions/openai_translate.py b/lexai-functions/openai_translate.py new file mode 100644 index 0000000..b7f86d8 --- /dev/null +++ b/lexai-functions/openai_translate.py @@ -0,0 +1,167 @@ +"""OpenAI helpers wrapping a non-English UI around an English-only RunPod model. + +``normalize_to_english`` batches prior turns plus the current user message into one +structured JSON translation call (stable indices, translation-only system prompt). +``translate_english_to_ui_language`` maps the assistant's English reply back to the +client's display language. English UI skips both calls. + +Model id: ``OPENAI_TRANSLATION_MODEL`` (default ``gpt-5.1``). ``_chat_complete`` tries +``max_completion_tokens`` first, then falls back to ``max_tokens`` for older SDK shapes. +""" + +from __future__ import annotations + +import json +import os +from typing import Any + +from openai import OpenAI + +# System prompts: batch in (_BATCH_SYSTEM) vs single assistant string out (_OUT_SYSTEM_TEMPLATE). +_BATCH_SYSTEM = """You translate legal app chat fragments to English for a downstream English-only legal model. + +Rules: +- Output ONLY one JSON object. 
+- Exact shape: {"items":[{"i":<integer>,"t":"<English translation>"}]}
+- Return the same integer keys `i`, with the same count and order as the input.
+- Preserve statute identifiers, MCL / Michigan references, section numbers, docket-style numbers, and party names.
+- Do not add legal analysis or advice — translation only.
+"""
+
+_OUT_SYSTEM_TEMPLATE = """Translate the assistant message from English into {target_language}.
+
+Rules:
+- Preserve citations, statute and section numbers, lists, and paragraph breaks where possible.
+- Output only the translated text — no preamble, no quotes, no markdown wrapper."""
+
+
+def is_ui_english(language: str | None) -> bool:
+    """True when the client is using English; skips translation in/out."""
+    s = (language or "").strip().lower()
+    return s in ("english", "en", "")
+
+
+def _client() -> OpenAI:
+    """Configured OpenAI client; requires ``OPENAI_API_KEY`` in the environment."""
+    api_key = os.environ.get("OPENAI_API_KEY")
+    if not api_key:
+        raise RuntimeError("OPENAI_API_KEY is not set")
+    return OpenAI(api_key=api_key)
+
+
+def translation_model() -> str:
+    """Model id for translation calls (not the RunPod legal model)."""
+    return os.environ.get("OPENAI_TRANSLATION_MODEL", "gpt-5.1").strip()
+
+
+def require_openai_if_translating(ui_language: str) -> None:
+    """Fail fast before RAG/RunPod when a non-English UI is selected but no API key is bound."""
+    if is_ui_english(ui_language):
+        return
+    if not os.environ.get("OPENAI_API_KEY"):
+        raise RuntimeError("OPENAI_API_KEY is required when the UI language is not English")
+
+
+def _chat_complete(
+    client: OpenAI,
+    *,
+    messages: list[dict[str, str]],
+    json_object: bool,
+) -> str:
+    """One chat.completions call; ``json_object=True`` for batch translate payloads."""
+    model = translation_model()
+    kwargs: dict[str, Any] = {
+        "model": model,
+        "messages": messages,
+        "temperature": 0.2,
+    }
+    if json_object:
+        kwargs["response_format"] = {"type": "json_object"}
+    try:
+        # Newer OpenAI Python SDK uses max_completion_tokens.
+        resp = client.chat.completions.create(**kwargs, max_completion_tokens=8192)
+    except TypeError:
+        resp = client.chat.completions.create(**kwargs, max_tokens=8192)
+    content = resp.choices[0].message.content
+    return (content or "").strip()
+
+
+def normalize_to_english(
+    chat_history: list[Any],
+    current_prompt: str,
+    ui_language: str,
+) -> tuple[list[dict[str, str]], str]:
+    """Return ``(history_en, prompt_en)`` for RunPod.
+
+    Non-English: one batched JSON object over all segments so roles stay aligned with
+    translated text. Validates that every input index ``i`` is returned exactly once.
+ """ + history: list[dict[str, str]] = [] + for m in chat_history: + if not isinstance(m, dict): + continue + role = str(m.get("role", "user")) + content = str(m.get("content", "")) + history.append({"role": role, "content": content}) + + prompt = str(current_prompt) + + if is_ui_english(ui_language): + return history, prompt + + client = _client() + payloads: list[dict[str, Any]] = [] + idx = 0 + for m in history: + payloads.append({"i": idx, "role": m["role"], "t": m["content"]}) + idx += 1 + payloads.append({"i": idx, "role": "user", "t": prompt}) + last_i = idx + + user_payload = json.dumps({"items": payloads}, ensure_ascii=False) + raw = _chat_complete( + client, + messages=[ + {"role": "system", "content": _BATCH_SYSTEM}, + {"role": "user", "content": user_payload}, + ], + json_object=True, + ) + data = json.loads(raw or "{}") + items = data.get("items") + if not isinstance(items, list) or len(items) != len(payloads): + raise RuntimeError("OpenAI translation returned invalid items length") + + by_i: dict[int, str] = {} + for x in items: + if not isinstance(x, dict) or "i" not in x: + continue + by_i[int(x["i"])] = str(x.get("t", "")) + + if len(by_i) != len(payloads): + raise RuntimeError("OpenAI translation missing segment keys") + + history_en: list[dict[str, str]] = [] + for p in payloads: + if p["i"] == last_i: + break + history_en.append({"role": str(p["role"]), "content": by_i[p["i"]]}) + prompt_en = by_i[last_i] + return history_en, prompt_en + + +def translate_english_to_ui_language(text: str, ui_language: str) -> str: + """Map RunPod's English assistant string to the UI language; no-op for English.""" + if is_ui_english(ui_language): + return text + client = _client() + target = (ui_language or "English").strip() + system = _OUT_SYSTEM_TEMPLATE.format(target_language=target) + return _chat_complete( + client, + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": text}, + ], + json_object=False, + ) diff --git a/lexai-functions/requirements.txt b/lexai-functions/requirements.txt index 18df91f..ef27de4 100644 --- a/lexai-functions/requirements.txt +++ b/lexai-functions/requirements.txt @@ -2,4 +2,5 @@ firebase-functions>=0.1.0 firebase-admin>=6.0.0 requests>=2.31.0 python-dotenv -pinecone>=5.0.0 \ No newline at end of file +pinecone>=5.0.0 +openai>=1.40.0 \ No newline at end of file