407 changes: 389 additions & 18 deletions backend/advanced_workflow_orchestrator.py

Large diffs are not rendered by default.
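The orchestrator diff itself is not rendered, but the routes below pin down its surface: fork_execution(original_execution_id, step_id, new_variables) returns a new execution ID or None, and memory_snapshots maps keys to dicts carrying current_step and variables (see the /debug/state endpoint). A minimal sketch of that contract (the snapshot key format and the body are assumptions, not this PR's implementation):

import uuid
from typing import Any, Dict, Optional

async def fork_execution(self, original_execution_id: str, step_id: str,
                         new_variables: Optional[Dict[str, Any]] = None) -> Optional[str]:
    # Key format assumed; the unrendered diff may index snapshots differently
    snapshot = self.memory_snapshots.get(f"{original_execution_id}:{step_id}")
    if not snapshot:
        return None  # The fork route maps this to a 404
    new_id = f"exec_{uuid.uuid4().hex[:8]}"
    # Clone the saved variables, overlaying the caller's overrides
    forked_vars = {**snapshot.get("variables", {}), **(new_variables or {})}
    self.memory_snapshots[f"{new_id}:{step_id}"] = {
        "current_step": snapshot.get("current_step"),
        "variables": forked_vars,
    }
    return new_id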

46 changes: 46 additions & 0 deletions backend/api/time_travel_routes.py
@@ -0,0 +1,46 @@

from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from typing import Dict, Any, Optional
import logging

from advanced_workflow_orchestrator import orchestrator

router = APIRouter(prefix="/api/time-travel", tags=["time_travel"])
logger = logging.getLogger(__name__)

# A singleton/factory pattern should be used in a real app.
# For now we assume one orchestrator exists or is created per request (matches current tests, not prod).
# TODO: Inject the singleton orchestrator from main_api_app

class ForkRequest(BaseModel):
step_id: str
new_variables: Optional[Dict[str, Any]] = None

@router.post("/workflows/{execution_id}/fork")
async def fork_workflow(execution_id: str, request: ForkRequest):
"""
[Lesson 3] Fork a workflow execution from a specific step.
Creates a 'Parallel Universe' with modified variables.
"""
logger.info(f"⏳ Time-Travel Request: Forking {execution_id} at {request.step_id}")


# Use the shared singleton instance
orch = orchestrator

new_execution_id = await orch.fork_execution(
original_execution_id=execution_id,
step_id=request.step_id,
new_variables=request.new_variables
)

if not new_execution_id:
raise HTTPException(status_code=404, detail="Snapshot not found or fork failed")

return {
"status": "success",
"original_execution_id": execution_id,
"new_execution_id": new_execution_id,
"message": "Welcome to the Multiverse. 🌌"
}
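One way to resolve the TODO above is a FastAPI dependency instead of the module-level import. A minimal sketch (get_orchestrator is a hypothetical helper, not part of this PR):

from fastapi import Depends

def get_orchestrator():
    # Swap this for an app-state lookup once main_api_app owns the instance
    from advanced_workflow_orchestrator import orchestrator
    return orchestrator

@router.post("/workflows/{execution_id}/fork")
async def fork_workflow(execution_id: str, request: ForkRequest,
                        orch=Depends(get_orchestrator)):
    ...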
21 changes: 21 additions & 0 deletions backend/core/models.py
@@ -460,6 +460,27 @@ class UserConnection(Base):
user = relationship("User", backref="connections")
workspace = relationship("Workspace", backref="connections")

class WorkflowSnapshot(Base):
"""
Time-Travel Debugging: Immutable snapshot of execution state at a specific step.
This acts as a 'Save Point' allowing users to fork/replay from this exact moment.
"""
__tablename__ = "workflow_snapshots"

id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
execution_id = Column(String, ForeignKey("workflow_executions.execution_id"), nullable=False, index=True)
step_id = Column(String, nullable=False) # The step that just finished/is current
step_order = Column(Integer, nullable=False) # Sequence number (0, 1, 2...)

# State Capture
context_snapshot = Column(Text, nullable=False) # Full JSON dump of WorkflowContext (vars, results)

# Metadata
status = Column(String, nullable=False) # Status at this snapshot (e.g. COMPLETED, FAILED)
created_at = Column(DateTime(timezone=True), server_default=func.now())

# Relationships
execution = relationship("WorkflowExecution", backref="snapshots")
class IngestedDocument(Base):
"""Record of an ingested document from a service like Google Drive"""
__tablename__ = "ingested_documents"
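For reference, persisting one of these save points might look like the following sketch, assuming a SQLAlchemy session and a context object exposing variables and results (the helper name and import path are illustrative):

import json
import uuid

from core.models import WorkflowSnapshot

def save_snapshot(session, context, step_id: str, step_order: int, status: str) -> WorkflowSnapshot:
    # Freeze the mutable context into an immutable JSON save point
    snap = WorkflowSnapshot(
        id=str(uuid.uuid4()),
        execution_id=context.execution_id,
        step_id=step_id,
        step_order=step_order,
        context_snapshot=json.dumps({
            "variables": getattr(context, "variables", {}),
            "results": getattr(context, "results", {}),
        }, default=str),
        status=status,
    )
    session.add(snap)
    session.commit()
    return snap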
208 changes: 176 additions & 32 deletions backend/core/workflow_ui_endpoints.py
@@ -303,33 +303,80 @@ async def get_executions():

executions = []
# Convert Orchestrator contexts to UI Execution models
for context in orchestrator.active_contexts.values():
status_map = {
WorkflowStatus.PENDING: "pending",
WorkflowStatus.RUNNING: "running",
WorkflowStatus.COMPLETED: "completed",
WorkflowStatus.FAILED: "failed",
WorkflowStatus.CANCELLED: "cancelled"
}

# Calculate metrics
total_steps = 4 # Default estimate
current_step = 0
if context.results:
current_step = len(context.results)

executions.append(WorkflowExecution(
execution_id=context.execution_id,
workflow_id=context.input_data.get("_ui_workflow_id", context.workflow_id), # Prefer UI ID if stored
status=status_map.get(context.status, "unknown"),
start_time=context.started_at.isoformat() if context.started_at else datetime.now().isoformat(),
end_time=context.completed_at.isoformat() if context.completed_at else None,
current_step=current_step,
total_steps=total_steps,
trigger_data=context.input_data,
results=context.results,
errors=[context.error_message] if context.error_message else []
))

# Use list() to avoid RuntimeError if dict changes size during iteration
for context in list(orchestrator.active_contexts.values()):
try:
# Handle potential dict vs object (migration safety)
c_id = getattr(context, 'workflow_id', None)
if not c_id and isinstance(context, dict):
c_id = context.get('workflow_id')

c_input = getattr(context, 'input_data', {})
if not c_input and isinstance(context, dict):
c_input = context.get('input_data', {})

c_status = getattr(context, 'status', 'pending')
if isinstance(context, dict):
c_status = context.get('status', 'pending')

status_str = "unknown"
if hasattr(c_status, 'value'):
status_str = c_status.value
else:
status_str = str(c_status)

# Safe Date Handling
c_started = getattr(context, 'started_at', None)
if isinstance(context, dict):
c_started = context.get('started_at')

start_time_str = datetime.now().isoformat()
if isinstance(c_started, datetime):
start_time_str = c_started.isoformat()
elif isinstance(c_started, str):
start_time_str = c_started

c_ended = getattr(context, 'completed_at', None)
if isinstance(context, dict):
c_ended = context.get('completed_at')

end_time_str = None
if isinstance(c_ended, datetime):
end_time_str = c_ended.isoformat()
elif isinstance(c_ended, str):
end_time_str = c_ended

c_results = getattr(context, 'results', {})
if isinstance(context, dict):
c_results = context.get('results', {})

c_error = getattr(context, 'error_message', None)
if isinstance(context, dict):
c_error = context.get('error_message')

# Calculate metrics
current_step = len(c_results) if c_results else 0

executions.append(WorkflowExecution(
execution_id=str(c_id),
workflow_id=c_input.get("_ui_workflow_id", str(c_id)), # Prefer UI ID if stored
status=status_str,
start_time=start_time_str,
end_time=end_time_str,
current_step=current_step,
total_steps=4,
trigger_data=c_input,
results=c_results,
errors=[str(c_error)] if c_error else []
))
        except Exception as e:
            # Log, but don't crash the whole list
            print(f"Error parsing execution context: {e}")
            continue


# Sort by start time (newest first)
executions.sort(key=lambda x: x.start_time, reverse=True)
@@ -338,10 +338,14 @@ async def get_executions():
except ImportError:
# Fallback if orchestrator not available/path issue
return {"success": True, "executions": [e.dict() for e in MOCK_EXECUTIONS]}
except Exception as e:
import traceback
traceback.print_exc()
return {"success": False, "error": str(e), "executions": []}

@router.post("/execute")
async def execute_workflow(payload: Dict[str, Any], background_tasks: BackgroundTasks):
from advanced_workflow_orchestrator import orchestrator
from advanced_workflow_orchestrator import orchestrator, WorkflowContext, WorkflowStatus, WorkflowDefinition, WorkflowStep, WorkflowStepType

workflow_id = payload.get("workflow_id")
input_data = payload.get("input", {})
@@ -359,18 +359,102 @@ async def execute_workflow(payload: Dict[str, Any], background_tasks: Background

# Check if workflow exists in orchestrator
if orchestrator_id not in orchestrator.workflows:
# Fallback logic could go here
pass
# [FIX] Bridge Mock/UI Workflows to Real Orchestrator
# If not found, check MOCK_WORKFLOWS and register it on the fly
found_mock = next((w for w in MOCK_WORKFLOWS if w.id == workflow_id), None)
if not found_mock:
# Check templates if not in active workflows
found_mock = next((t for t in MOCK_TEMPLATES if t.id == workflow_id), None)

if found_mock:
orchestrator_steps = []

# Simple conversion logic
for step in found_mock.steps:
# Default to universal integration
step_type = WorkflowStepType.UNIVERSAL_INTEGRATION
svc = step.service.lower() if step.service else "unknown"
act = step.action.lower() if step.action else "execute"

if svc in ["ai", "llm"]:
step_type = WorkflowStepType.NLU_ANALYSIS
elif svc in ["slack", "discord"]:
step_type = WorkflowStepType.SLACK_NOTIFICATION
elif svc in ["email", "gmail", "outlook"]:
step_type = WorkflowStepType.EMAIL_SEND
elif svc == "delay":
step_type = WorkflowStepType.DELAY

orchestrator_steps.append(WorkflowStep(
step_id=step.id,
step_type=step_type,
description=step.name,
parameters={**step.parameters, "service": svc, "action": act},
next_steps=[] # Sequential by default in this simple bridge
))

# Link steps sequentially
for i in range(len(orchestrator_steps) - 1):
orchestrator_steps[i].next_steps = [orchestrator_steps[i+1].step_id]

new_def = WorkflowDefinition(
workflow_id=workflow_id,
name=found_mock.name,
description=found_mock.description,
steps=orchestrator_steps,
start_step=orchestrator_steps[0].step_id if orchestrator_steps else "end",
version="1.0-ui-bridge"
)

orchestrator.workflows[workflow_id] = new_def
orchestrator_id = workflow_id # Use the ID we just registered
else:
print(f"Warning: Workflow ID {workflow_id} not found in orchestrator or mocks.")

# Generate Execution ID for immediate UI feedback
execution_id = f"exec_{uuid.uuid4().hex[:8]}"

# [FIX] Pre-register the context so it appears in lists immediately
# and provides valid data for the UI response
context = WorkflowContext(
workflow_id=execution_id,
user_id="ui_user",
input_data=input_data
)
    context.execution_id = execution_id  # Ensure the field exists even if the model does not define it
context.started_at = datetime.now()
context.status = WorkflowStatus.PENDING

# Register immediately
orchestrator.active_contexts[execution_id] = context

async def _run_orchestration():
await orchestrator.execute_workflow(orchestrator_id, input_data, execution_id=execution_id)
try:
            # Pass the already-created execution ID
await orchestrator.execute_workflow(orchestrator_id, input_data, execution_id=execution_id)
except Exception as e:
print(f"Background execution failed: {e}")
import traceback
traceback.print_exc()
context.status = WorkflowStatus.FAILED
context.error_message = str(e)
context.completed_at = datetime.now()

background_tasks.add_task(_run_orchestration)

return {"success": True, "execution_id": execution_id, "message": "Workflow started via Real Orchestrator"}
# Return FULL Execution Object compatible with Frontend
return {
"success": True,
"execution_id": execution_id,
"workflow_id": workflow_id,
"status": "pending",
"start_time": context.started_at.isoformat(),
"current_step": 0,
"total_steps": 4, # Placeholder
"results": {},
"message": "Workflow started via Real Orchestrator"
}

@router.post("/executions/{execution_id}/cancel")
async def cancel_execution(execution_id: str):
@@ -379,3 +379,12 @@ async def cancel_execution(execution_id: str):
exc.status = "cancelled"
return {"success": True}
raise HTTPException(status_code=404, detail="Execution not found")
@router.get("/debug/state")
async def get_orchestrator_state():
"""Debug endpoint to inspect orchestrator memory"""
from advanced_workflow_orchestrator import orchestrator
return {
"active_contexts": list(orchestrator.active_contexts.keys()),
"memory_snapshots": list(orchestrator.memory_snapshots.keys()),
"snapshot_details": {k: {"step": v.get("current_step"), "vars": list(v.get("variables", {}).keys())} for k, v in orchestrator.memory_snapshots.items()}
}
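Given the /api/v1/workflow-ui prefix that create_execution.py below targets, inspecting orchestrator memory during a run is a one-liner (base URL assumed from that script):

import requests

state = requests.get("http://localhost:8000/api/v1/workflow-ui/debug/state").json()
print(state["active_contexts"], state["memory_snapshots"])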
35 changes: 35 additions & 0 deletions backend/create_execution.py
@@ -0,0 +1,35 @@
import requests
import time

url = "http://localhost:8000/api/v1/workflow-ui/execute"
payload = {
"workflow_id": "customer_support_automation",
"input": {"text": "I have a billing issue"}
}

try:
for i in range(10):
try:
print(f"Sending POST to {url} (Attempt {i+1})...")
response = requests.post(url, json=payload)
print(f"Status: {response.status_code}")
if response.status_code == 200:
data = response.json()
exec_id = data.get('execution_id')
print(f"Execution ID: {exec_id}")
with open("last_execution_id.txt", "w") as f:
f.write(exec_id)
break
else:
print(f"Error: {response.text}")
except requests.exceptions.ConnectionError:
print("Server not ready, retrying in 2s...")
time.sleep(2)
continue
except Exception as e:
print(f"Request failed: {e}")
break

except Exception as e:
print(f"Script failed: {e}")
23 changes: 23 additions & 0 deletions backend/create_fork.py
@@ -0,0 +1,23 @@
import requests
import sys

# Read last execution ID
try:
with open("last_execution_id.txt", "r") as f:
execution_id = f.read().strip()
except FileNotFoundError:
print("Error: last_execution_id.txt not found. Run create_execution.py first.")
sys.exit(1)

url = f"http://localhost:8000/api/time-travel/workflows/{execution_id}/fork"
payload = {
"step_id": "analyze_ticket" # Trying a known step ID for customer_support_automation
}

print(f"Sending POST to {url}...")
try:
response = requests.post(url, json=payload)
print(f"Status: {response.status_code}")
print(f"Response: {response.text}")
except Exception as e:
print(f"Request failed: {e}")
4 changes: 2 additions & 2 deletions backend/integrations/atom_ingestion_pipeline.py
@@ -18,8 +18,8 @@
LanceDBMemoryManager,
CommunicationData
)
except ImportError:
logging.warning("Core LanceDB and Communication handlers not found. Using local fallbacks for development.")
except (ImportError, OSError, Exception) as e:
logging.warning(f"Core LanceDB and Communication handlers not found ({e}). Using local fallbacks for development.")

logger = logging.getLogger(__name__)

2 changes: 1 addition & 1 deletion backend/integrations/bridge/external_integration_routes.py
@@ -1,7 +1,7 @@

from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import Dict, Any, List, Optional
from backend.core.external_integration_service import external_integration_service
from core.external_integration_service import external_integration_service

router = APIRouter(prefix="/api/v1/external-integrations", tags=["External Integrations"])
