Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions examples/structured_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import os
import json
import time
from codevf import CodeVFClient

# Configuration helper for environment variables
def load_env_file(path: str = ".env") -> None:
"""Manually parse a .env file to avoid extra dependencies like python-dotenv."""
if not os.path.exists(path):
return
with open(path, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
# Only set if not already in environment
if key.strip() not in os.environ:
os.environ[key.strip()] = value.strip().strip("'\"")

def get_config(key: str, default: str = "") -> str:
"""Helper to fetch environment variables with clean values."""
value = os.environ.get(key, default)
return value.strip("'\"")

def main():
# Load .env if it exists in current or parent directory
load_env_file()
if not os.environ.get("CODEVF_API_KEY"):
load_env_file("codevf-sdk-python/.env")

# 1. Initialize the client
api_key = get_config("CODEVF_API_KEY")
if not api_key:
print("Error: CODEVF_API_KEY environment variable is not set.")
print("Please set it: export CODEVF_API_KEY='your-key-here' (Linux/macOS) or $env:CODEVF_API_KEY='your-key-here' (PowerShell)")
return

base_url = get_config("CODEVF_BASE_URL", "https://codevf.com/api/v1/")
client = CodeVFClient(api_key=api_key, base_url=base_url)

# 2. Define a JSON Schema for the structured output you want
schema = {
"type": "object",
"properties": {
"vulnerabilities": {
"type": "array",
"items": {
"type": "object",
"properties": {
"severity": {
"type": "string",
"enum": ["critical", "high", "medium", "low"]
},
"description": { "type": "string" },
"location": { "type": "string" },
"recommendation": { "type": "string" }
},
"required": ["severity", "description"]
}
},
"securityScore": {
"type": "number",
"minimum": 0,
"maximum": 100
}
},
"required": ["vulnerabilities", "securityScore"]
}

print("Submitting task with response_schema and attachments...")

# 3. Create a task with response_schema
# Using realtime_answer mode for instant results
prompt = (
"Analyze this code for security issues, considering the database configuration in the attachment:\n\n"
"def login(user, pwd):\n"
" query = f\"SELECT * FROM users WHERE user='{user}' AND pwd='{pwd}'\"\n"
" return db.execute(query)"
)

# Attachments can provide additional context for the analysis
attachments = [
{
"fileName": "db_config.py",
"mimeType": "text/x-python",
"content": "DB_HOST = 'localhost'\nDB_USER = 'admin'\nDB_PASS = '123456'\nDB_NAME = 'production_db'"
}
]

project_id = int(get_config("CODEVF_PROJECT_ID", "1"))

try:
task = client.tasks.create(
prompt=prompt,
project_id=project_id,
max_credits=60,
mode="realtime_answer",
response_schema=schema,
attachments=attachments
)
except Exception as e:
print(f"Failed to create task: {e}")
return

print(f"Task created: {task.id}, Status: {task.status}")

# 4. Handle results
# For realtime_answer, the result might be available immediately
if task.status == "completed":
print("\nStructured Result received immediately:")
if isinstance(task.result, dict):
# When response_schema is used, result is returned as a raw dict
print(json.dumps(task.result, indent=2))
else:
# Fallback for standard result
print(f"Message: {task.result.message}")
else:
# For other modes or if not immediately finished, poll until completed
print("\nWaiting for task completion...")
while task.status not in ["completed", "cancelled"]:
time.sleep(2)
task = client.tasks.retrieve(task.id)
print(f"Status: {task.status}")

if task.status == "completed":
print("\nResult received:")
if isinstance(task.result, dict):
print(json.dumps(task.result, indent=2))
else:
print(f"Message: {task.result.message}")
else:
print(f"\nTask ended with status: {task.status}")

if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/codevf/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
InsufficientCreditsError,
InvalidMetadataError,
InvalidModeError,
InvalidSchemaError,
InvalidTagError,
MaxCreditsExceededError,
NotFoundError,
Expand All @@ -43,6 +44,7 @@
"attachment_too_large": AttachmentTooLargeError,
"idempotency_conflict": IdempotencyConflictError,
"insufficient_credits": InsufficientCreditsError,
"invalid_schema": InvalidSchemaError,
"token_expired": AuthenticationError,
"rate_limit_exceeded": RateLimitError,
}
Expand Down
5 changes: 5 additions & 0 deletions src/codevf/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ class InsufficientCreditsError(APIError):
pass


class InvalidSchemaError(APIError):
"""Raised when the provided responseSchema is invalid JSON Schema."""
pass


class PayloadTooLargeError(APIError):
"""Raised when the JSON body exceeds 150KB."""
pass
26 changes: 23 additions & 3 deletions src/codevf/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataclasses import dataclass
from decimal import Decimal, ROUND_UP
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Sequence
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

from .types import MetadataDict, JSONPrimitive

Expand Down Expand Up @@ -76,14 +76,30 @@ class TaskResponse:
max_credits: int
created_at: str
credits_used: Optional[int] = None
result: Optional[TaskResult] = None
result: Optional[Union[TaskResult, Dict[str, Any]]] = None
response_schema: Optional[Dict[str, Any]] = None

@classmethod
def from_payload(cls, payload: Dict[str, Any]) -> "TaskResponse":
mode_value = str(payload.get("mode", ServiceMode.STANDARD.value))
mode = ServiceMode.validate(mode_value)
result_payload = payload.get("result")
result = TaskResult.from_payload(result_payload) if isinstance(result_payload, dict) else None
response_schema = payload.get("responseSchema")

result: Optional[Union[TaskResult, Dict[str, Any]]] = None
if isinstance(result_payload, dict):
# If a responseSchema was used, the result is ALWAYS treated as a raw dict
# matching that schema, even if it contains keys like "message".
if response_schema is not None:
result = result_payload
# Otherwise, check if it looks like a standard TaskResult
elif "message" in result_payload and "deliverables" in result_payload:
result = TaskResult.from_payload(result_payload)
else:
# Fallback for structured output if responseSchema wasn't returned in payload
# but the shape doesn't match TaskResult.
result = result_payload

return cls(
id=str(payload["id"]),
status=str(payload["status"]),
Expand All @@ -92,6 +108,7 @@ def from_payload(cls, payload: Dict[str, Any]) -> "TaskResponse":
created_at=str(payload["createdAt"]),
credits_used=payload.get("creditsUsed"),
result=result,
response_schema=response_schema,
)


Expand All @@ -105,6 +122,7 @@ class TaskCreatePayload:
tag_id: Optional[int] = None
idempotency_key: Optional[str] = None
attachments: Optional[List[Dict[str, Any]]] = None
response_schema: Optional[Dict[str, Any]] = None

def to_dict(self) -> Dict[str, Any]:
payload: Dict[str, Any] = {
Expand All @@ -122,5 +140,7 @@ def to_dict(self) -> Dict[str, Any]:
payload["idempotencyKey"] = self.idempotency_key
if self.attachments:
payload["attachments"] = self.attachments
if self.response_schema is not None:
payload["responseSchema"] = self.response_schema

return payload
3 changes: 3 additions & 0 deletions src/codevf/resources/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def create(
idempotency_key: Optional[str] = None,
attachments: Optional[Sequence[Mapping[str, Any]]] = None,
tag_id: Optional[int] = None,
response_schema: Optional[Dict[str, Any]] = None,
) -> TaskResponse:
"""
Submit a new task request.
Expand All @@ -55,6 +56,7 @@ def create(
idempotency_key: Optional UUID v4 to deduplicate submissions.
attachments: File attachments (JSON-compatible dicts).
tag_id: Expert-level tag ID to control cost multiplier.
response_schema: Optional JSON Schema for structured output.

Returns:
A `TaskResponse` wrapping the server payload.
Expand Down Expand Up @@ -84,6 +86,7 @@ def create(
tag_id=tag_id,
idempotency_key=idempotency_key,
attachments=normalized_attachments or None,
response_schema=response_schema,
)

response = cast(Dict[str, Any], self._client.post("tasks/create", data=payload.to_dict()))
Expand Down
64 changes: 64 additions & 0 deletions tests/test_structured_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pytest
from codevf.models.task import TaskResponse, TaskResult, ServiceMode

def test_parse_standard_result():
"""Verify that standard results (message + deliverables) parse into TaskResult."""
payload = {
"id": "task_1",
"status": "completed",
"mode": "standard",
"maxCredits": 240,
"createdAt": "2026-01-01T00:00:00Z",
"result": {
"message": "Standard analysis",
"deliverables": []
}
}
task = TaskResponse.from_payload(payload)
assert isinstance(task.result, TaskResult)
assert task.result.message == "Standard analysis"
assert task.response_schema is None

def test_parse_structured_result_with_schema_discriminator():
"""
Verify that if responseSchema is present, the result is returned as a raw dict
even if it contains 'message' and 'deliverables' keys.
"""
schema = {"type": "object", "properties": {"message": {"type": "string"}}}
payload = {
"id": "task_2",
"status": "completed",
"mode": "realtime_answer",
"maxCredits": 60,
"createdAt": "2026-01-01T00:00:00Z",
"responseSchema": schema,
"result": {
"message": "Structured message",
"deliverables": "This is NOT a list, but schema allows it as a string maybe"
}
}
task = TaskResponse.from_payload(payload)
# It MUST be a dict because responseSchema was provided
assert isinstance(task.result, dict)
assert task.result["message"] == "Structured message"
assert task.response_schema == schema

def test_parse_structured_result_fallback():
"""
Verify that if responseSchema is NOT present but the shape doesn't match
standard TaskResult, it still returns a raw dict.
"""
payload = {
"id": "task_3",
"status": "completed",
"mode": "realtime_answer",
"maxCredits": 60,
"createdAt": "2026-01-01T00:00:00Z",
"result": {
"score": 95,
"issues": []
}
}
task = TaskResponse.from_payload(payload)
assert isinstance(task.result, dict)
assert task.result["score"] == 95