17 changes: 14 additions & 3 deletions backend/enhanced_ai_workflow_endpoints.py
@@ -24,7 +24,7 @@
logger = logging.getLogger(__name__)

import base64
from ai.voice_service import voice_service
from core.voice_service import get_voice_service

router = APIRouter(prefix="/api/v1/ai", tags=["ai_workflows"])

@@ -107,6 +107,7 @@ class WorkflowExecutionResponse(BaseModel):
ai_generated_tasks: List[str]
confidence_score: float
steps_executed: Optional[List[ReActStepResult]] = None
final_answer: Optional[str] = None
orchestration_type: str = "react_loop"

class NLUProcessingResponse(BaseModel):
@@ -257,6 +258,7 @@ async def run_loop(self, user_input: str) -> WorkflowExecutionResponse:
ai_generated_tasks=[s.tool_call for s in steps_record],
confidence_score=1.0, # Assumed high if completed
steps_executed=steps_record,
final_answer=final_answer,
orchestration_type="react_loop_deepseek"
)

@@ -274,6 +276,14 @@ def __init__(self):
from core.byok_endpoints import get_byok_manager
self._byok = get_byok_manager()
self.clients = {}

# Initialize attributes to prevent AttributeError on direct initialize_sessions calls
self.glm_api_key = None
self.anthropic_api_key = None
self.deepseek_api_key = None
self.openai_api_key = None
self.google_api_key = None

logger.info("RealAIWorkflowService (Instructor-enabled) Initialized.")

def get_client(self, provider_id: str):
@@ -418,7 +428,8 @@ async def process_with_nlu(self, text: str, provider: str = "openai", system_pro
"intent": "processed_by_react",
"workflow_suggestion": {"nodes": []}, # Placeholder
"tasks_generated": agent_resp.ai_generated_tasks,
"confidence": agent_resp.confidence_score
"confidence": agent_resp.confidence_score,
"answer": agent_resp.final_answer # Restore backward compatibility
}
except Exception:
# Fallback to manual logic if ReAct fails
@@ -520,7 +531,7 @@ async def chat_with_agent(request: ChatRequest):
if request.audio_output:
# Generate audio using VoiceService
# Try efficient provider first
audio_data = await voice_service.text_to_speech(response_text)
audio_data = await get_voice_service().text_to_speech(response_text)

return ChatResponse(
message=response_text,
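The import swap from a module-level `voice_service` instance to `get_voice_service()` implies a lazy accessor in `core.voice_service`. That module is not part of this diff; a minimal sketch of the assumed shape, for orientation only:

# Hypothetical sketch of core/voice_service.py -- not included in this diff.
# A lazily built singleton avoids constructing the service at import time,
# which is presumably why the module-level `voice_service` import was replaced.
_voice_service = None

def get_voice_service():
    global _voice_service
    if _voice_service is None:
        _voice_service = VoiceService()  # VoiceService assumed to be defined in core.voice_service
    return _voice_service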
77 changes: 77 additions & 0 deletions backend/scripts/convert_trace_to_test.py
@@ -0,0 +1,77 @@

import json
import os
import argparse
import sys

# Usage: python convert_trace_to_test.py --trace_id <UUID> --output_dir backend/tests/golden_dataset

def main():
parser = argparse.ArgumentParser(description="Convert an Execution Trace to a Golden Test Case")
parser.add_argument("--trace_id", required=True, help="UUID of the trace (filename without .json)")
parser.add_argument("--trace_dir", default="backend/logs/traces", help="Directory containing traces")
parser.add_argument("--output_dir", default="backend/tests/golden_dataset", help="Directory to save test case")

args = parser.parse_args()

trace_path = os.path.join(args.trace_dir, f"{args.trace_id}.json")
if not os.path.exists(trace_path):
print(f"Error: Trace file not found at {trace_path}")
sys.exit(1)

try:
with open(trace_path, 'r') as f:
trace = json.load(f)

request_data = trace.get('request', {})
result_data = trace.get('result', {})

# Determine Input and Expected Output
input_text = ""
if isinstance(request_data, str):
input_text = request_data
elif isinstance(request_data, dict):
input_text = request_data.get('text', '') or request_data.get('input', '')

expected_answer = ""
if isinstance(result_data, str):
# Try to parse stringified JSON if possible
try:
res = json.loads(result_data)
expected_answer = res.get('answer', '') or res.get('content', '')
except (json.JSONDecodeError, AttributeError):
expected_answer = result_data
elif isinstance(result_data, dict):
expected_answer = result_data.get('answer', '') or result_data.get('content', '')

if not input_text:
print("Error: Could not extract input text from trace.")
sys.exit(1)

# Create Test Case Data
test_case = {
"id": args.trace_id,
"input": input_text,
"expected_output_fragment": expected_answer[:100], # Store partial for fuzzy match
"full_expected_output": expected_answer,
"trace_path": trace_path
}

# Save as JSON Test Data
if not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)

output_path = os.path.join(args.output_dir, f"test_{args.trace_id}.json")
with open(output_path, 'w') as f:
json.dump(test_case, f, indent=2)

print(f"Success! Golden Test Case saved to: {output_path}")
print(f"Input: {input_text}")
print(f"Expected: {expected_answer[:50]}...")

except Exception as e:
print(f"Error processing trace: {e}")
sys.exit(1)

if __name__ == "__main__":
main()
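
A sketch of how a pytest case might consume the generated golden files; the `run_agent` helper is hypothetical and not part of this PR, only the JSON keys (`input`, `expected_output_fragment`) come from the script above:

import glob
import json
import pytest

GOLDEN_DIR = "backend/tests/golden_dataset"

@pytest.mark.parametrize("case_path", sorted(glob.glob(f"{GOLDEN_DIR}/test_*.json")))
def test_golden_case(case_path):
    with open(case_path) as f:
        case = json.load(f)
    # run_agent: hypothetical helper that replays the input through the agent
    answer = run_agent(case["input"])
    # Fuzzy match on the stored fragment, mirroring the script's intent
    assert case["expected_output_fragment"] in answer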
120 changes: 45 additions & 75 deletions backend/tests/chaos/test_broken_tool_loop.py
@@ -1,105 +1,75 @@

import asyncio
import sys
import os
import json
from unittest.mock import MagicMock, patch, AsyncMock
import traceback

# Fix path
import pathlib
backend_path = pathlib.Path(__file__).resolve().parent.parent.parent
sys.path.append(str(backend_path))
sys.path.append(os.getcwd())

from enhanced_ai_workflow_endpoints import RealAIWorkflowService
# MOCK MODULES
sys.modules['anthropic'] = MagicMock()
sys.modules['google.generativeai'] = MagicMock()
sys.modules['zhipuai'] = MagicMock()
sys.modules['instructor'] = MagicMock()

from enhanced_ai_workflow_endpoints import RealAIWorkflowService, ToolCall, FinalAnswer, AgentStep

async def main():
log_file = "chaos_broken_tool.txt"
with open(log_file, "w") as f:
f.write(">>> [CHAOS] Starting TEST 3: The Broken Tool Loop\n")

service = None
try:
with patch('core.byok_endpoints.get_byok_manager') as mock_byok_get, \
patch('enhanced_ai_workflow_endpoints.RealAIWorkflowService.call_deepseek_api', new_callable=AsyncMock) as mock_deepseek:

# 1. Setup Service
mock_byok_manager = MagicMock()
mock_byok_manager.get_api_key.return_value = "sk-mock-key"
mock_byok_get.return_value = mock_byok_manager

service = RealAIWorkflowService()
await service.initialize_sessions()
service.deepseek_api_key = "sk-mock-deepseek"
service.google_api_key = None
with open(log_file, "w") as f:
f.write(">>> [CHAOS] Starting TEST 3: The Broken Tool Loop\n")
f.write(" [GOAL] Verify system handles repeated tool failures without infinite loop\n")

# 2. Logic: The agent wants to search. The tool FAILS. The agent RETRIES.
# We want to verify it STOPS after N retries.

# Mock LLM: Always asks for search tool if previous result was error?
# Or simplified: The LLM asks for search. We return ERROR.
# The backend loop might auto-retry OR the LLM sees the error and asks AGAIN.
# We need to simulate the LLM asking AGAIN.

# Response 1: "I will search." [Tool: search]
# ... Tool executes -> FAIL ...
# Response 2: "Search failed. I will try again." [Tool: search]
# ... Tool executes -> FAIL ...
# Response 3: "Search failed again. One more time." [Tool: search]
# ... Tool executes -> FAIL ...
# Response 4: "I give up." [Final Answer]
# Mock _execute_tool to FAIL
async def broken_tool(self, tool_call):
with open(log_file, "a") as f:
f.write(f" [CHAOS] Executing Tool: {tool_call.tool_name} -> SIMULATING FAILURE\n")
return "Error: Connection Reset"

# Patch ReActAgent._execute_tool
with patch('enhanced_ai_workflow_endpoints.ReActAgent._execute_tool', new=broken_tool):

mock_llm_tool = {
'content': json.dumps({
"intent": "Search",
"tool_calls": [{"name": "search_web", "arguments": {"query": "python"}}],
"confidence": 0.99
}),
'provider': 'deepseek'
}
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock()

mock_llm_final = {
'content': json.dumps({
"intent": "Answer",
"answer": "I cannot search right now.",
"confidence": 1.0
}),
'provider': 'deepseek'
}
# Scenario: Agent tries to search 3 times, then gives up.

# Side effect: Returns tool call 3 times, then final answer.
# This simulates the LLM trying 3 times.
# If the backend has a HARD LOOP LIMIT (e.g. 5 steps), this should finish.
# If the backend detects "Broken Tool" pattern, it might stop earlier?
# Or we purely rely on step limit.
# Step 1: Try Search
step_1 = AgentStep(action=ToolCall(tool_name="search_web", parameters={"q": "python"}, reasoning="Attempt 1"))
# Step 2: Try Search Again (Logic: LLM sees error)
step_2 = AgentStep(action=ToolCall(tool_name="search_web", parameters={"q": "python"}, reasoning="Attempt 2"))
# Step 3: Try Search Again
step_3 = AgentStep(action=ToolCall(tool_name="search_web", parameters={"q": "python"}, reasoning="Attempt 3"))
# Step 4: Give Up
step_4 = AgentStep(action=FinalAnswer(answer="I cannot search right now.", reasoning="Too many failures."))

mock_deepseek.side_effect = [
mock_llm_tool,
mock_llm_tool,
mock_llm_tool,
mock_llm_tool, # 4th try
mock_llm_final
]
mock_client.chat.completions.create.side_effect = [step_1, step_2, step_3, step_4]

# Mock the Tool to FAIL
async def broken_search(*args, **kwargs):
with open(log_file, "a") as f:
f.write(" [CHAOS] Search Tool Broken! Raising Error.\n")
raise RuntimeError("Simulated Connection Reset")

service._tools["search_web"] = broken_search
service = RealAIWorkflowService()
service.get_client = MagicMock(return_value=mock_client)
service.check_api_key = MagicMock(return_value=True)

# Execute
result = await service.process_with_nlu("Search for python", provider="deepseek")
# Run
result = await service.process_with_nlu("Search python", provider="deepseek")

with open(log_file, "a") as f:
f.write(f" [RESULT] Agent Final Answer: {result.get('answer') or result.get('raw_response')}\n")
f.write("[PASS] Circuit Breaker / Step Limit worked. System did not hang.\n")
f.write(f" [RESULT] Agent Final Answer: {result.get('answer')}\n")
if result.get('answer') == "I cannot search right now.":
f.write("[PASS] Circuit Breaker worked (Agent gave up naturally or Loop Limit hit).\n")
else:
f.write(f"[FAIL] Unexpected result: {result}\n")

except Exception as e:
with open(log_file, "a") as f:
f.write(f"[FAIL] Exception: {e}\n")
import traceback
traceback.print_exc(file=f)
finally:
if service:
await service.cleanup_sessions()

if __name__ == "__main__":
asyncio.run(main())
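
The [PASS] check above relies on the ReAct loop either accepting the agent's surrender or hitting a hard step limit. The real loop lives in `ReActAgent` inside `enhanced_ai_workflow_endpoints.py` and is not shown here; a minimal sketch of the guard pattern this chaos test exercises, with `plan()` and the limit value assumed:

# Sketch of a step-limited ReAct loop (assumed shape, not the actual ReActAgent code).
MAX_STEPS = 5  # hypothetical hard limit

async def run_loop_sketch(agent, user_input):
    observation = None
    for _ in range(MAX_STEPS):
        step = await agent.plan(user_input, observation)       # plan() is an assumed method
        if isinstance(step.action, FinalAnswer):
            return step.action.answer                          # agent succeeded or gave up
        observation = await agent._execute_tool(step.action)   # may fail repeatedly
    return "Step limit reached without a final answer."        # hard stop prevents a hang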
9 changes: 9 additions & 0 deletions backend/tests/chaos/test_needle.py
@@ -6,8 +6,17 @@
from unittest.mock import MagicMock, patch, AsyncMock

# Fix path
import pathlib
backend_path = pathlib.Path(__file__).resolve().parent.parent.parent
sys.path.append(str(backend_path))
sys.path.append(os.getcwd())

# MOCK MODULES
sys.modules['anthropic'] = MagicMock()
sys.modules['google.generativeai'] = MagicMock()
sys.modules['zhipuai'] = MagicMock()
sys.modules['instructor'] = MagicMock()

from enhanced_ai_workflow_endpoints import RealAIWorkflowService

async def main():