"""Example: request structured (JSON-Schema-shaped) output from a CodeVF task.

The script reads its configuration from the environment (optionally seeded
from a local ``.env`` file), submits a ``realtime_answer`` task with a
``response_schema`` and one attachment, then prints the result.
"""
import json
import os
import time

# Seconds to sleep between two status checks while polling.
POLL_INTERVAL_SECONDS = 2
# Upper bound on the total polling time so the example can never hang forever.
POLL_TIMEOUT_SECONDS = 300
# Statuses at which polling stops. Any other end state the API may report is
# caught by the timeout above instead of looping indefinitely.
TERMINAL_STATUSES = ("completed", "cancelled")


def load_env_file(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Parsed by hand to avoid extra dependencies such as python-dotenv.

    * A missing file is silently ignored.
    * Blank lines, ``#`` comments and lines without ``=`` are skipped.
    * A leading ``export `` on the key is dropped, so shell-style files work.
    * Lines with an empty key (``=value``) are skipped; assigning an empty
      name to ``os.environ`` would raise ``ValueError``.
    * Variables already present in the environment are never overwritten.
    * One layer of surrounding quote characters is stripped from the value.

    Args:
        path: Location of the .env file, relative to the working directory.
    """
    if not os.path.isfile(path):
        return
    # utf-8-sig reads plain UTF-8 and also swallows a BOM left by some editors.
    with open(path, "r", encoding="utf-8-sig") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            if key.startswith("export "):
                key = key[len("export "):].strip()
            if not key:
                continue
            # The real environment always wins over the file.
            if key not in os.environ:
                os.environ[key] = value.strip().strip("'\"")


def get_config(key: str, default: str = "") -> str:
    """Return environment variable ``key`` (or ``default``) without quotes.

    Args:
        key: Name of the environment variable.
        default: Value used when the variable is not set.

    Returns:
        The value with any leading/trailing ``'`` or ``"`` characters removed.
    """
    value = os.environ.get(key, default)
    return value.strip("'\"")


def _print_result(result) -> None:
    """Print a task result, whichever of the supported shapes it has."""
    if result is None:
        print("(task completed without a result payload)")
    elif isinstance(result, dict):
        # When response_schema is used, result is returned as a raw dict
        print(json.dumps(result, indent=2))
    else:
        # Fallback for standard result
        print(f"Message: {result.message}")


def main():
    """Submit a structured-output task and print its result.

    Returns early, after printing a message, when the API key is missing,
    ``CODEVF_PROJECT_ID`` is not an integer, a request fails, or polling
    exceeds ``POLL_TIMEOUT_SECONDS``.
    """
    # Imported here so the helper functions above stay importable without the
    # SDK being installed.
    from codevf import CodeVFClient

    # Load .env from the working directory, then fall back to the SDK
    # checkout's .env when the key is still missing.
    load_env_file()
    if not os.environ.get("CODEVF_API_KEY"):
        load_env_file("codevf-sdk-python/.env")

    # 1. Initialize the client
    api_key = get_config("CODEVF_API_KEY")
    if not api_key:
        print("Error: CODEVF_API_KEY environment variable is not set.")
        print("Please set it: export CODEVF_API_KEY='your-key-here' (Linux/macOS) or $env:CODEVF_API_KEY='your-key-here' (PowerShell)")
        return

    base_url = get_config("CODEVF_BASE_URL", "https://codevf.com/api/v1/")
    client = CodeVFClient(api_key=api_key, base_url=base_url)

    # 2. Define a JSON Schema for the structured output you want
    schema = {
        "type": "object",
        "properties": {
            "vulnerabilities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "severity": {
                            "type": "string",
                            "enum": ["critical", "high", "medium", "low"],
                        },
                        "description": {"type": "string"},
                        "location": {"type": "string"},
                        "recommendation": {"type": "string"},
                    },
                    "required": ["severity", "description"],
                },
            },
            "securityScore": {
                "type": "number",
                "minimum": 0,
                "maximum": 100,
            },
        },
        "required": ["vulnerabilities", "securityScore"],
    }

    print("Submitting task with response_schema and attachments...")

    # 3. Create a task with response_schema
    # Using realtime_answer mode for instant results
    prompt = (
        "Analyze this code for security issues, considering the database configuration in the attachment:\n\n"
        "def login(user, pwd):\n"
        "    query = f\"SELECT * FROM users WHERE user='{user}' AND pwd='{pwd}'\"\n"
        "    return db.execute(query)"
    )

    # Attachments can provide additional context for the analysis
    attachments = [
        {
            "fileName": "db_config.py",
            "mimeType": "text/x-python",
            "content": "DB_HOST = 'localhost'\nDB_USER = 'admin'\nDB_PASS = '123456'\nDB_NAME = 'production_db'",
        }
    ]

    raw_project_id = get_config("CODEVF_PROJECT_ID", "1")
    try:
        project_id = int(raw_project_id)
    except ValueError:
        print(f"Error: CODEVF_PROJECT_ID must be an integer, got {raw_project_id!r}.")
        return

    try:
        task = client.tasks.create(
            prompt=prompt,
            project_id=project_id,
            max_credits=60,
            mode="realtime_answer",
            response_schema=schema,
            attachments=attachments,
        )
    except Exception as e:
        print(f"Failed to create task: {e}")
        return

    print(f"Task created: {task.id}, Status: {task.status}")

    # 4. Handle results
    # For realtime_answer, the result might be available immediately
    if task.status == "completed":
        print("\nStructured Result received immediately:")
        _print_result(task.result)
        return

    # For other modes or if not immediately finished, poll until the task
    # reaches a terminal status or the deadline passes.
    print("\nWaiting for task completion...")
    deadline = time.monotonic() + POLL_TIMEOUT_SECONDS
    while task.status not in TERMINAL_STATUSES:
        if time.monotonic() >= deadline:
            print(f"\nTimed out after {POLL_TIMEOUT_SECONDS}s; last status: {task.status}")
            return
        time.sleep(POLL_INTERVAL_SECONDS)
        try:
            task = client.tasks.retrieve(task.id)
        except Exception as e:
            print(f"Failed to retrieve task: {e}")
            return
        print(f"Status: {task.status}")

    if task.status == "completed":
        print("\nResult received:")
        _print_result(task.result)
    else:
        print(f"\nTask ended with status: {task.status}")


if __name__ == "__main__":
    main()
a/src/codevf/models/task.py b/src/codevf/models/task.py index 9ed4b7c..63c1a64 100644 --- a/src/codevf/models/task.py +++ b/src/codevf/models/task.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from decimal import Decimal, ROUND_UP from enum import Enum -from typing import Any, Dict, Iterable, List, Optional, Sequence +from typing import Any, Dict, Iterable, List, Optional, Sequence, Union from .types import MetadataDict, JSONPrimitive @@ -76,14 +76,30 @@ class TaskResponse: max_credits: int created_at: str credits_used: Optional[int] = None - result: Optional[TaskResult] = None + result: Optional[Union[TaskResult, Dict[str, Any]]] = None + response_schema: Optional[Dict[str, Any]] = None @classmethod def from_payload(cls, payload: Dict[str, Any]) -> "TaskResponse": mode_value = str(payload.get("mode", ServiceMode.STANDARD.value)) mode = ServiceMode.validate(mode_value) result_payload = payload.get("result") - result = TaskResult.from_payload(result_payload) if isinstance(result_payload, dict) else None + response_schema = payload.get("responseSchema") + + result: Optional[Union[TaskResult, Dict[str, Any]]] = None + if isinstance(result_payload, dict): + # If a responseSchema was used, the result is ALWAYS treated as a raw dict + # matching that schema, even if it contains keys like "message". + if response_schema is not None: + result = result_payload + # Otherwise, check if it looks like a standard TaskResult + elif "message" in result_payload and "deliverables" in result_payload: + result = TaskResult.from_payload(result_payload) + else: + # Fallback for structured output if responseSchema wasn't returned in payload + # but the shape doesn't match TaskResult. 
+ result = result_payload + return cls( id=str(payload["id"]), status=str(payload["status"]), @@ -92,6 +108,7 @@ def from_payload(cls, payload: Dict[str, Any]) -> "TaskResponse": created_at=str(payload["createdAt"]), credits_used=payload.get("creditsUsed"), result=result, + response_schema=response_schema, ) @@ -105,6 +122,7 @@ class TaskCreatePayload: tag_id: Optional[int] = None idempotency_key: Optional[str] = None attachments: Optional[List[Dict[str, Any]]] = None + response_schema: Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: payload: Dict[str, Any] = { @@ -122,5 +140,7 @@ def to_dict(self) -> Dict[str, Any]: payload["idempotencyKey"] = self.idempotency_key if self.attachments: payload["attachments"] = self.attachments + if self.response_schema is not None: + payload["responseSchema"] = self.response_schema return payload diff --git a/src/codevf/resources/tasks.py b/src/codevf/resources/tasks.py index 06250ae..6232cea 100644 --- a/src/codevf/resources/tasks.py +++ b/src/codevf/resources/tasks.py @@ -42,6 +42,7 @@ def create( idempotency_key: Optional[str] = None, attachments: Optional[Sequence[Mapping[str, Any]]] = None, tag_id: Optional[int] = None, + response_schema: Optional[Dict[str, Any]] = None, ) -> TaskResponse: """ Submit a new task request. @@ -55,6 +56,7 @@ def create( idempotency_key: Optional UUID v4 to deduplicate submissions. attachments: File attachments (JSON-compatible dicts). tag_id: Expert-level tag ID to control cost multiplier. + response_schema: Optional JSON Schema for structured output. Returns: A `TaskResponse` wrapping the server payload. 
import pytest
from codevf.models.task import TaskResponse, TaskResult, ServiceMode


def _task_payload(task_id, mode, max_credits, **extra):
    """Build a completed-task server payload, merging in any extra keys."""
    base = {
        "id": task_id,
        "status": "completed",
        "mode": mode,
        "maxCredits": max_credits,
        "createdAt": "2026-01-01T00:00:00Z",
    }
    base.update(extra)
    return base


def test_parse_standard_result():
    """A result carrying message + deliverables is parsed into a TaskResult."""
    standard_result = {"message": "Standard analysis", "deliverables": []}
    parsed = TaskResponse.from_payload(
        _task_payload("task_1", "standard", 240, result=standard_result)
    )

    assert isinstance(parsed.result, TaskResult)
    assert parsed.result.message == "Standard analysis"
    assert parsed.response_schema is None


def test_parse_structured_result_with_schema_discriminator():
    """
    With responseSchema in the payload the result stays a raw dict, even when
    it happens to contain 'message' and 'deliverables' keys.
    """
    schema = {"type": "object", "properties": {"message": {"type": "string"}}}
    structured_result = {
        "message": "Structured message",
        "deliverables": "This is NOT a list, but schema allows it as a string maybe",
    }
    parsed = TaskResponse.from_payload(
        _task_payload(
            "task_2",
            "realtime_answer",
            60,
            responseSchema=schema,
            result=structured_result,
        )
    )

    # responseSchema was provided, so no TaskResult conversion may happen
    assert isinstance(parsed.result, dict)
    assert parsed.result["message"] == "Structured message"
    assert parsed.response_schema == schema


def test_parse_structured_result_fallback():
    """
    Without responseSchema, a result whose shape does not match the standard
    TaskResult is still handed back as a raw dict.
    """
    odd_result = {"score": 95, "issues": []}
    parsed = TaskResponse.from_payload(
        _task_payload("task_3", "realtime_answer", 60, result=odd_result)
    )

    assert isinstance(parsed.result, dict)
    assert parsed.result["score"] == 95