diff --git a/backend/advanced_workflow_orchestrator.py b/backend/advanced_workflow_orchestrator.py index 126e2163c..41375de9c 100644 --- a/backend/advanced_workflow_orchestrator.py +++ b/backend/advanced_workflow_orchestrator.py @@ -169,11 +169,308 @@ def __init__(self): self.template_manager = None logger.warning("WorkflowTemplateManager not found, template features disabled") + # In-Memory Snapshot Query Store (Fallback for Time-Travel) + self.memory_snapshots = {} + # Initialize AI service self._initialize_ai_service() # Load predefined workflows self._load_predefined_workflows() + + # Phase 11: Restore active executions (Fix Ghost Workflows) + self._restore_active_executions() + + def _create_snapshot(self, context: WorkflowContext, step_id: str): + """ + + """ + # Create snapshot data object + snapshot_data = { + "variables": context.variables.copy(), + "results": context.results.copy(), + "execution_history": context.execution_history.copy(), + "current_step": context.current_step + } + + # 1. Save to Memory (Always available) + snapshot_key = f"{context.workflow_id}:{step_id}" + self.memory_snapshots[snapshot_key] = snapshot_data + logger.info(f"📸 In-Memory Snapshot created for {context.workflow_id} at step {step_id}") + + # 2. Save to Database (If available) + if MODELS_AVAILABLE: + try: + from core.database import SessionLocal + from core.models import WorkflowSnapshot + import json + + with SessionLocal() as db: + snapshot = WorkflowSnapshot( + execution_id=context.workflow_id, + step_id=step_id, + step_order=len(context.execution_history), # Index based on history length + status=context.results.get(step_id, {}).get("status", "unknown"), + context_snapshot=json.dumps(snapshot_data) + ) + db.add(snapshot) + db.commit() + except Exception as e: + logger.error(f"Failed to persist snapshot to DB: {e}") + + def _restore_active_executions(self): + """ + Restore state of running/waiting workflows from DB after restart. + This prevents 'Ghost Workflows' that vanish from memory. + """ + if not MODELS_AVAILABLE: + logger.warning("Models not available, skipping execution restoration") + return + + try: + from core.database import SessionLocal + from core.models import WorkflowExecution + import json + + with SessionLocal() as db: + # Fetch orphaned executions + restorable_statuses = [ + WorkflowStatus.RUNNING.value, + WorkflowStatus.WAITING_APPROVAL.value + ] + executions = db.query(WorkflowExecution).filter( + WorkflowExecution.status.in_(restorable_statuses) + ).all() + + restored_count = 0 + for exec_record in executions: + try: + # Reconstruct Context + context_data = json.loads(exec_record.context) if exec_record.context else {} + + # Create fresh context object + context = WorkflowContext( + workflow_id=exec_record.workflow_id, + user_id=exec_record.user_id or "default_user", # Handle legacy nulls + input_data=json.loads(exec_record.input_data) if exec_record.input_data else {} + ) + + # Rehydrate state + # DB uses Uppercase (WorkflowExecutionStatus), Orchestrator uses Lowercase (WorkflowStatus) + try: + context.status = WorkflowStatus(exec_record.status.lower()) + except ValueError: + # Fallback if unknown status + logger.warning(f"Unknown status '{exec_record.status}' for workflow {exec_record.workflow_id}, defaulting to PENDING") + context.status = WorkflowStatus.PENDING + + context.variables = context_data.get("variables", {}) + context.results = context_data.get("results", {}) + context.execution_history = context_data.get("execution_history", []) + context.current_step = context_data.get("current_step") + + # Add to active memory + # NOTE: This does not auto-resume the AsyncIO task (which requires a Task Manager), + # but it makes the state visible effectively "pausing" it safely rather than losing it. + self.active_contexts[exec_record.workflow_id] = context + restored_count += 1 + except Exception as e: + logger.error(f"Failed to restore execution {exec_record.attributes.get('id', 'unknown')}: {e}") + + if restored_count > 0: + logger.info(f"👻 Resurrected {restored_count} Ghost Workflows from database.") + + except Exception as e: + logger.error(f"Error during execution restoration: {e}") + + async def fork_execution(self, original_execution_id: str, step_id: str, new_variables: Optional[Dict[str, Any]] = None) -> Optional[str]: + """ + Args: + original_execution_id: The timeline we are branching from. + step_id: The moment in time (step) to branch from. + new_variables: Optional changes to history (e.g., fixing a wrong input). + + Returns: + new_execution_id: The ID of the parallel universe. + """ + # Snapshot Retrieval Strategy: DB (Priority) -> Memory (Fallback) + snapshot_key = f"{original_execution_id}:{step_id}" + state_data = None + + # 1. Try DB First (Source of Truth) + if MODELS_AVAILABLE: + try: + from core.database import SessionLocal + from core.models import WorkflowSnapshot + import json + + with SessionLocal() as db: + snapshot = db.query(WorkflowSnapshot).filter( + WorkflowSnapshot.execution_id == original_execution_id, + WorkflowSnapshot.step_id == step_id + ).first() + + if snapshot: + state_data = json.loads(snapshot.context_snapshot) + logger.info(f"💾 Snapshot loaded from Database for {snapshot_key}") + except Exception as e: + logger.error(f"DB Snapshot lookup failed: {e}") + + # 2. Fallback to Memory if DB failed or missed + if not state_data: + state_data = self.memory_snapshots.get(snapshot_key) + if state_data: + logger.info(f"🧠 Snapshot loaded from Memory (Fallback) for {snapshot_key}") + + if not state_data: + logger.error(f"Snapshot not found for {original_execution_id} at {step_id} (DB + Memory checked)") + return None + + try: + # 2. Resurrect State from Snapshot + # Apply "Time Travel" edits (New Variables) + # This is the "Fix" part of "Fork & Fix" + current_vars = state_data.get("variables", {}).copy() + if new_variables: + # [Lesson 4] Safe Mode: Backend Safeguard + # Explicitly ignore system keys to prevent state corruption + system_keys = {'status', 'error', 'timestamp', 'execution_time_ms', 'step_id', 'step_type', 'notes', 'requires_confirmation'} + sanitized_vars = {k: v for k, v in new_variables.items() if k not in system_keys} + current_vars.update(sanitized_vars) + + # 3. Create the Parallel Universe (New Execution Record) + + # Get original metadata FIRST (DB Priority -> Memory Fallback) + original_user_id = "default" + original_input_data = {} + original_workflow_id = "unknown" + + # 3a. Try DB for Metadata + meta_found = False + if MODELS_AVAILABLE: + try: + from core.database import SessionLocal + from core.models import WorkflowExecution, WorkflowExecutionStatus + import json + + with SessionLocal() as db: + original_exec = db.query(WorkflowExecution).filter( + WorkflowExecution.execution_id == original_execution_id + ).first() + + if original_exec: + original_user_id = original_exec.user_id or "default" + original_input_data = json.loads(original_exec.input_data) if original_exec.input_data else {} + original_workflow_id = original_exec.workflow_id + meta_found = True + except Exception as e: + logger.warning(f"DB Metadata lookup failed: {e}") + + # 3b. Fallback to Memory for Metadata + if not meta_found and original_execution_id in self.active_contexts: + orig_ctx = self.active_contexts[original_execution_id] + original_user_id = orig_ctx.user_id + original_input_data = orig_ctx.input_data + original_workflow_id = orig_ctx.workflow_id + + # Generate ID using the retrieved workflow_id + new_execution_id = f"{original_workflow_id}-forked-{str(uuid.uuid4())[:8]}" + + + + # Persist the NEW execution to DB + if MODELS_AVAILABLE: + try: + from core.database import SessionLocal + from core.models import WorkflowExecution, WorkflowExecutionStatus + import json + + with SessionLocal() as db: + new_exec = WorkflowExecution( + execution_id=new_execution_id, + workflow_id=original_workflow_id, + user_id=original_user_id, + status=WorkflowExecutionStatus.PENDING.value, # Ready to run + input_data=json.dumps(original_input_data), + context=json.dumps({ + "variables": current_vars, + "results": state_data.get("results"), + "execution_history": state_data.get("execution_history"), + "current_step": step_id + }), + version=1 + ) + db.add(new_exec) + db.commit() + except Exception as e: + logger.warning(f"Failed to persist new forked execution to DB: {e}") + + # 4. Load into Orchestrator Memory (Critical for Execution) + context = WorkflowContext( + workflow_id=new_execution_id, + user_id=original_user_id, + input_data=original_input_data + ) + context.variables = current_vars + + # DEEP COPY results to prevent mutation bleeding between universes + import copy + context.results = copy.deepcopy(state_data.get("results", {})) + context.execution_history = copy.deepcopy(state_data.get("execution_history", [])) + + context.current_step = step_id + + self.active_contexts[new_execution_id] = context + + # TRIGGER EXECUTION + # 1. Resolve Definition ID + definition_id = original_input_data.get("_ui_workflow_id") + + # Fallback: If not in input, try to find a workflow that contains this step_id + # This is expensive but necessary if _ui_workflow_id isn't present + menu_workflow = None + if definition_id and definition_id in self.workflows: + menu_workflow = self.workflows[definition_id] + else: + for wf in self.workflows.values(): + if any(s.step_id == step_id for s in wf.steps): + menu_workflow = wf + break + + if menu_workflow: + logger.info(f"🚀 Fork Auto-Start: Triggering execution for {new_execution_id} using def {menu_workflow.workflow_id}") + asyncio.create_task(self._run_forked_execution(menu_workflow, step_id, context)) + else: + logger.warning(f"⚠️ Could not auto-start forked workflow {new_execution_id}: Definition not found.") + + logger.info(f"🌌 Timeline Forked! Created {new_execution_id} from {step_id}") + return new_execution_id + + except Exception as e: + logger.error(f"Forking failed: {e}") + return None + + async def _run_forked_execution(self, workflow: WorkflowDefinition, start_step_id: str, context: WorkflowContext): + """Lifecycle manager for forked executions""" + try: + context.status = WorkflowStatus.RUNNING + # context.started_at = datetime.datetime.now() # Keep original start time? Or reset? Let's keep original for history. + + await self._execute_workflow_step(workflow, start_step_id, context) + + # Only mark completed if not already failed + if context.status != WorkflowStatus.FAILED: + context.status = WorkflowStatus.COMPLETED + context.completed_at = datetime.datetime.now() + logger.info(f"✅ Forked execution {context.workflow_id} completed successfully.") + + except Exception as e: + context.status = WorkflowStatus.FAILED + context.error_message = str(e) + context.completed_at = datetime.datetime.now() + logger.error(f"❌ Forked execution {context.workflow_id} failed: {e}") + + def _initialize_ai_service(self): """Initialize AI service for NLU processing""" @@ -720,7 +1017,8 @@ async def generate_dynamic_workflow(self, user_query: str) -> Dict[str, Any]: async def execute_workflow(self, workflow_id: str, input_data: Dict[str, Any], - execution_context: Optional[Dict[str, Any]] = None) -> WorkflowContext: + execution_context: Optional[Dict[str, Any]] = None, + execution_id: str = None) -> WorkflowContext: """Execute a complex workflow""" if workflow_id not in self.workflows: @@ -728,7 +1026,7 @@ async def execute_workflow(self, workflow_id: str, input_data: Dict[str, Any], workflow = self.workflows[workflow_id] context = WorkflowContext( - workflow_id=str(uuid.uuid4()), + workflow_id=execution_id or str(uuid.uuid4()), user_id=execution_context.get("user_id", "default_user") if execution_context else "default_user", input_data=input_data, status=WorkflowStatus.RUNNING, @@ -1076,6 +1374,9 @@ async def _execute_workflow_step(self, workflow: WorkflowDefinition, step_id: st # Sequential execution for next_step in target_next_steps: await self._execute_workflow_step(workflow, next_step, context) + + + self._create_snapshot(context, step_id) async def _check_conditions(self, conditions: Dict[str, Any], context: WorkflowContext) -> bool: """Check if step conditions are met""" @@ -1157,23 +1458,93 @@ async def _evaluate_condition(self, condition: str, context: WorkflowContext) -> return True # Default to proceeding if condition evaluation fails def _resolve_variables(self, value: Any, context: WorkflowContext) -> Any: - """Resolve variables in a value (string, dict, or list)""" + + """ + Resolve variables in a value (string, dict, or list) with support for nesting. + Uses an iterative inside-out approach to handle {{ {{var}} }}. + """ if isinstance(value, str): - # Replace {{variable}} with value from context.variables - import re - matches = re.findall(r'\{\{([^}]+)\}\}', value) - for match in matches: - # Support nested access like {{step_id.key}} - if '.' in match: - parts = match.split('.') - step_id = parts[0] - key = parts[1] - if step_id in context.results: - val = context.results[step_id].get(key, "") - value = value.replace(f"{{{{{match}}}}}", str(val)) - elif match in context.variables: - value = value.replace(f"{{{{{match}}}}}", str(context.variables[match])) - return value + # Iteratively resolve innermost variables first + # Limit iterations to prevent infinite loops (e.g., self-referencing variables) + max_iterations = 10 + current_value = value + + for _ in range(max_iterations): + # Find all {{ key }} patterns that do NOT contain other {{ }} inside them + # strictly matching the innermost pair + matches = re.finditer(r'\{\{([^{}]+)\}\}', current_value) + + replacements_made = False + # We must process matches carefully because the string changes + # It's safer to find one, replace, and re-scan, or process strictly distinct regions. + # Re-scanning is safer for overlaps, though slightly slower. + + # Let's collect ALL simple matches in this pass + found_matches = list(matches) + + if not found_matches: + break # No more variables to resolve + + # Apply replacements for this pass + # We use a temporary string construction to avoid index offset issues + new_value = current_value + + for match in found_matches: + full_match = match.group(0) # {{key}} + var_content = match.group(1).strip() # key + + replacement_val = full_match # Default/Fallback + + # 1. Resolve the key + if '.' in var_content: + # Step output access: step_id.key.subkey... + parts = var_content.split('.') + step_id = parts[0] + + if step_id in context.results: + # We found the step, now traverse the rest of the path + val = context.results[step_id] + path = parts[1:] + + found = True + for p in path: + if isinstance(val, dict): + val = val.get(p) + if val is None: + found = False + break + else: + # We tried to access a property of a non-dict + found = False + break + + if found and val is not None: + replacement_val = str(val) + + elif var_content in context.variables: + # Direct context variable + replacement_val = str(context.variables[var_content]) + + # 2. Perform replacement if we found a value + # Note: We replace ONLY if we resolved it, or should we leave it? + # Previous logic left it. We'll stick to that but handle the recursion. + if replacement_val != full_match: + # Replace only the FIRST occurrence related to this specific match logic? + # Or all? All is standard for templates. + # But be careful if two different vars resolve to same string. + new_value = new_value.replace(full_match, replacement_val) + replacements_made = True + + if not replacements_made: + # If we found matches but couldn't resolve ANY of them, we are stuck. + # Stop to avoid infinite loop. + break + + current_value = new_value + + return current_value + + elif isinstance(value, dict): return {k: self._resolve_variables(v, context) for k, v in value.items()} elif isinstance(value, list): diff --git a/backend/api/time_travel_routes.py b/backend/api/time_travel_routes.py new file mode 100644 index 000000000..fe56abcb1 --- /dev/null +++ b/backend/api/time_travel_routes.py @@ -0,0 +1,46 @@ + +from fastapi import APIRouter, HTTPException, Depends +from pydantic import BaseModel +from typing import Dict, Any, Optional +import logging + +from advanced_workflow_orchestrator import orchestrator + +router = APIRouter(prefix="/api/time-travel", tags=["time_travel"]) +logger = logging.getLogger(__name__) + +# Single instance/factory pattern should be used in real app +# For now, we assume one orchestrator exists or is created per request (which matches current tests but not prod) +# TO-DO: Inject the singleton orchestrator from main_api_app + +class ForkRequest(BaseModel): + step_id: str + new_variables: Optional[Dict[str, Any]] = None + +@router.post("/workflows/{execution_id}/fork") +async def fork_workflow(execution_id: str, request: ForkRequest): + """ + [Lesson 3] Fork a workflow execution from a specific step. + Creates a 'Parallel Universe' with modified variables. + """ + logger.info(f"⏳ Time-Travel Request: Forking {execution_id} at {request.step_id}") + + + # Use the shared singleton instance + orch = orchestrator + + new_execution_id = await orch.fork_execution( + original_execution_id=execution_id, + step_id=request.step_id, + new_variables=request.new_variables + ) + + if not new_execution_id: + raise HTTPException(status_code=404, detail="Snapshot not found or fork failed") + + return { + "status": "success", + "original_execution_id": execution_id, + "new_execution_id": new_execution_id, + "message": "Welcome to the Multiverse. 🌌" + } diff --git a/backend/core/models.py b/backend/core/models.py index 905a83db2..d97ff207a 100644 --- a/backend/core/models.py +++ b/backend/core/models.py @@ -460,6 +460,27 @@ class UserConnection(Base): user = relationship("User", backref="connections") workspace = relationship("Workspace", backref="connections") +class WorkflowSnapshot(Base): + """ + Time-Travel Debugging: Immutable snapshot of execution state at a specific step. + This acts as a 'Save Point' allowing users to fork/replay from this exact moment. + """ + __tablename__ = "workflow_snapshots" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + execution_id = Column(String, ForeignKey("workflow_executions.execution_id"), nullable=False, index=True) + step_id = Column(String, nullable=False) # The step that just finished/is current + step_order = Column(Integer, nullable=False) # Sequence number (0, 1, 2...) + + # State Capture + context_snapshot = Column(Text, nullable=False) # Full JSON dump of WorkflowContext (vars, results) + + # Metadata + status = Column(String, nullable=False) # Status at this snapshot (e.g. COMPLETED, FAILED) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + # Relationships + execution = relationship("WorkflowExecution", backref="snapshots") class IngestedDocument(Base): """Record of an ingested document from a service like Google Drive""" __tablename__ = "ingested_documents" diff --git a/backend/core/workflow_ui_endpoints.py b/backend/core/workflow_ui_endpoints.py index 0b7abe053..af131f219 100644 --- a/backend/core/workflow_ui_endpoints.py +++ b/backend/core/workflow_ui_endpoints.py @@ -303,33 +303,80 @@ async def get_executions(): executions = [] # Convert Orchestrator contexts to UI Execution models - for context in orchestrator.active_contexts.values(): - status_map = { - WorkflowStatus.PENDING: "pending", - WorkflowStatus.RUNNING: "running", - WorkflowStatus.COMPLETED: "completed", - WorkflowStatus.FAILED: "failed", - WorkflowStatus.CANCELLED: "cancelled" - } - - # Calculate metrics - total_steps = 4 # Default estimate - current_step = 0 - if context.results: - current_step = len(context.results) - - executions.append(WorkflowExecution( - execution_id=context.execution_id, - workflow_id=context.input_data.get("_ui_workflow_id", context.workflow_id), # Prefer UI ID if stored - status=status_map.get(context.status, "unknown"), - start_time=context.started_at.isoformat() if context.started_at else datetime.now().isoformat(), - end_time=context.completed_at.isoformat() if context.completed_at else None, - current_step=current_step, - total_steps=total_steps, - trigger_data=context.input_data, - results=context.results, - errors=[context.error_message] if context.error_message else [] - )) + + # Use list() to avoid RuntimeError if dict changes size during iteration + for context in list(orchestrator.active_contexts.values()): + try: + # Handle potential dict vs object (migration safety) + c_id = getattr(context, 'workflow_id', None) + if not c_id and isinstance(context, dict): + c_id = context.get('workflow_id') + + c_input = getattr(context, 'input_data', {}) + if not c_input and isinstance(context, dict): + c_input = context.get('input_data', {}) + + c_status = getattr(context, 'status', 'pending') + if isinstance(context, dict): + c_status = context.get('status', 'pending') + + status_str = "unknown" + if hasattr(c_status, 'value'): + status_str = c_status.value + else: + status_str = str(c_status) + + # Safe Date Handling + c_started = getattr(context, 'started_at', None) + if isinstance(context, dict): + c_started = context.get('started_at') + + start_time_str = datetime.now().isoformat() + if isinstance(c_started, datetime): + start_time_str = c_started.isoformat() + elif isinstance(c_started, str): + start_time_str = c_started + + c_ended = getattr(context, 'completed_at', None) + if isinstance(context, dict): + c_ended = context.get('completed_at') + + end_time_str = None + if isinstance(c_ended, datetime): + end_time_str = c_ended.isoformat() + elif isinstance(c_ended, str): + end_time_str = c_ended + + c_results = getattr(context, 'results', {}) + if isinstance(context, dict): + c_results = context.get('results', {}) + + c_error = getattr(context, 'error_message', None) + if isinstance(context, dict): + c_error = context.get('error_message') + + # Calculate metrics + current_step = len(c_results) if c_results else 0 + + executions.append(WorkflowExecution( + execution_id=str(c_id), + workflow_id=c_input.get("_ui_workflow_id", str(c_id)), # Prefer UI ID if stored + status=status_str, + start_time=start_time_str, + end_time=end_time_str, + current_step=current_step, + total_steps=4, + trigger_data=c_input, + results=c_results, + errors=[str(c_error)] if c_error else [] + )) + except Exception as e: + # Log but don't crash the whole list + import traceback + print(f"Error parsing execution context: {e}") + # traceback.print_exc() + continue + # Sort by start time (newest first) executions.sort(key=lambda x: x.start_time, reverse=True) @@ -338,10 +385,14 @@ async def get_executions(): except ImportError: # Fallback if orchestrator not available/path issue return {"success": True, "executions": [e.dict() for e in MOCK_EXECUTIONS]} + except Exception as e: + import traceback + traceback.print_exc() + return {"success": False, "error": str(e), "executions": []} @router.post("/execute") async def execute_workflow(payload: Dict[str, Any], background_tasks: BackgroundTasks): - from advanced_workflow_orchestrator import orchestrator + from advanced_workflow_orchestrator import orchestrator, WorkflowContext, WorkflowStatus, WorkflowDefinition, WorkflowStep, WorkflowStepType workflow_id = payload.get("workflow_id") input_data = payload.get("input", {}) @@ -359,18 +410,102 @@ async def execute_workflow(payload: Dict[str, Any], background_tasks: Background # Check if workflow exists in orchestrator if orchestrator_id not in orchestrator.workflows: - # Fallback logic could go here - pass + # [FIX] Bridge Mock/UI Workflows to Real Orchestrator + # If not found, check MOCK_WORKFLOWS and register it on the fly + found_mock = next((w for w in MOCK_WORKFLOWS if w.id == workflow_id), None) + if not found_mock: + # Check templates if not in active workflows + found_mock = next((t for t in MOCK_TEMPLATES if t.id == workflow_id), None) + + if found_mock: + orchestrator_steps = [] + + # Simple conversion logic + for step in found_mock.steps: + # Default to universal integration + step_type = WorkflowStepType.UNIVERSAL_INTEGRATION + svc = step.service.lower() if step.service else "unknown" + act = step.action.lower() if step.action else "execute" + + if svc in ["ai", "llm"]: + step_type = WorkflowStepType.NLU_ANALYSIS + elif svc in ["slack", "discord"]: + step_type = WorkflowStepType.SLACK_NOTIFICATION + elif svc in ["email", "gmail", "outlook"]: + step_type = WorkflowStepType.EMAIL_SEND + elif svc == "delay": + step_type = WorkflowStepType.DELAY + + orchestrator_steps.append(WorkflowStep( + step_id=step.id, + step_type=step_type, + description=step.name, + parameters={**step.parameters, "service": svc, "action": act}, + next_steps=[] # Sequential by default in this simple bridge + )) + + # Link steps sequentially + for i in range(len(orchestrator_steps) - 1): + orchestrator_steps[i].next_steps = [orchestrator_steps[i+1].step_id] + + new_def = WorkflowDefinition( + workflow_id=workflow_id, + name=found_mock.name, + description=found_mock.description, + steps=orchestrator_steps, + start_step=orchestrator_steps[0].step_id if orchestrator_steps else "end", + version="1.0-ui-bridge" + ) + + orchestrator.workflows[workflow_id] = new_def + orchestrator_id = workflow_id # Use the ID we just registered + pass + else: + print(f"Warning: Workflow ID {workflow_id} not found in orchestrator or mocks.") # Generate Execution ID for immediate UI feedback execution_id = f"exec_{uuid.uuid4().hex[:8]}" + # [FIX] Pre-register the context so it appears in lists immediately + # and provides valid data for the UI response + context = WorkflowContext( + workflow_id=execution_id, + user_id="ui_user", + input_data=input_data + ) + context.execution_id = execution_id # Ensure this field exists if defined by chance + context.started_at = datetime.now() + context.status = WorkflowStatus.PENDING + + # Register immediately + orchestrator.active_contexts[execution_id] = context + async def _run_orchestration(): - await orchestrator.execute_workflow(orchestrator_id, input_data, execution_id=execution_id) + try: + # Pass the ALREADY CREATED contex ID + await orchestrator.execute_workflow(orchestrator_id, input_data, execution_id=execution_id) + except Exception as e: + print(f"Background execution failed: {e}") + import traceback + traceback.print_exc() + context.status = WorkflowStatus.FAILED + context.error_message = str(e) + context.completed_at = datetime.now() background_tasks.add_task(_run_orchestration) - return {"success": True, "execution_id": execution_id, "message": "Workflow started via Real Orchestrator"} + # Return FULL Execution Object compatible with Frontend + return { + "success": True, + "execution_id": execution_id, + "workflow_id": workflow_id, + "status": "pending", + "start_time": context.started_at.isoformat(), + "current_step": 0, + "total_steps": 4, # Placeholder + "results": {}, + "message": "Workflow started via Real Orchestrator" + } @router.post("/executions/{execution_id}/cancel") async def cancel_execution(execution_id: str): @@ -379,3 +514,12 @@ async def cancel_execution(execution_id: str): exc.status = "cancelled" return {"success": True} raise HTTPException(status_code=404, detail="Execution not found") +@router.get("/debug/state") +async def get_orchestrator_state(): + """Debug endpoint to inspect orchestrator memory""" + from advanced_workflow_orchestrator import orchestrator + return { + "active_contexts": list(orchestrator.active_contexts.keys()), + "memory_snapshots": list(orchestrator.memory_snapshots.keys()), + "snapshot_details": {k: {"step": v.get("current_step"), "vars": list(v.get("variables", {}).keys())} for k, v in orchestrator.memory_snapshots.items()} + } diff --git a/backend/create_execution.py b/backend/create_execution.py new file mode 100644 index 000000000..b4bce4506 --- /dev/null +++ b/backend/create_execution.py @@ -0,0 +1,35 @@ +import requests +import time +import json + +url = "http://localhost:8000/api/v1/workflow-ui/execute" +payload = { + "workflow_id": "customer_support_automation", + "input": {"text": "I have a billing issue"} +} + +try: + for i in range(10): + try: + print(f"Sending POST to {url} (Attempt {i+1})...") + response = requests.post(url, json=payload) + print(f"Status: {response.status_code}") + if response.status_code == 200: + data = response.json() + exec_id = data.get('execution_id') + print(f"Execution ID: {exec_id}") + with open("last_execution_id.txt", "w") as f: + f.write(exec_id) + break + else: + print(f"Error: {response.text}") + except requests.exceptions.ConnectionError: + print("Server not ready, retrying in 2s...") + time.sleep(2) + continue + except Exception as e: + print(f"Request failed: {e}") + break + +except Exception as e: + print(f"Script failed: {e}") diff --git a/backend/create_fork.py b/backend/create_fork.py new file mode 100644 index 000000000..35f7fe742 --- /dev/null +++ b/backend/create_fork.py @@ -0,0 +1,23 @@ +import requests +import sys + +# Read last execution ID +try: + with open("last_execution_id.txt", "r") as f: + execution_id = f.read().strip() +except FileNotFoundError: + print("Error: last_execution_id.txt not found. Run create_execution.py first.") + sys.exit(1) + +url = f"http://localhost:8000/api/time-travel/workflows/{execution_id}/fork" +payload = { + "step_id": "analyze_ticket" # Trying a known step ID for customer_support_automation +} + +print(f"Sending POST to {url}...") +try: + response = requests.post(url, json=payload) + print(f"Status: {response.status_code}") + print(f"Response: {response.text}") +except Exception as e: + print(f"Request failed: {e}") diff --git a/backend/integrations/atom_ingestion_pipeline.py b/backend/integrations/atom_ingestion_pipeline.py index 125e57279..35670f50f 100644 --- a/backend/integrations/atom_ingestion_pipeline.py +++ b/backend/integrations/atom_ingestion_pipeline.py @@ -18,8 +18,8 @@ LanceDBMemoryManager, CommunicationData ) -except ImportError: - logging.warning("Core LanceDB and Communication handlers not found. Using local fallbacks for development.") +except (ImportError, OSError, Exception) as e: + logging.warning(f"Core LanceDB and Communication handlers not found ({e}). Using local fallbacks for development.") logger = logging.getLogger(__name__) diff --git a/backend/integrations/bridge/external_integration_routes.py b/backend/integrations/bridge/external_integration_routes.py index ed1ab81b1..95c353ba6 100644 --- a/backend/integrations/bridge/external_integration_routes.py +++ b/backend/integrations/bridge/external_integration_routes.py @@ -1,7 +1,7 @@ from fastapi import APIRouter, HTTPException, BackgroundTasks from typing import Dict, Any, List, Optional -from backend.core.external_integration_service import external_integration_service +from core.external_integration_service import external_integration_service router = APIRouter(prefix="/api/v1/external-integrations", tags=["External Integrations"]) diff --git a/backend/integrations/zoho_workdrive_service.py b/backend/integrations/zoho_workdrive_service.py index e53c9897d..a6ec6eb8a 100644 --- a/backend/integrations/zoho_workdrive_service.py +++ b/backend/integrations/zoho_workdrive_service.py @@ -47,6 +47,7 @@ async def get_access_token(self, user_id: str) -> Optional[str]: logger.error(f"Error getting Zoho access token: {e}") return None + async def get_teams(self, user_id: str) -> List[Dict[str, Any]]: """List WorkDrive teams for the user""" token = await self.get_access_token(user_id) diff --git a/backend/last_execution_id.txt b/backend/last_execution_id.txt new file mode 100644 index 000000000..f0daa84ec --- /dev/null +++ b/backend/last_execution_id.txt @@ -0,0 +1 @@ +exec_bea860ec \ No newline at end of file diff --git a/backend/main_api_app.py b/backend/main_api_app.py index d4d4d39b7..f2ad5e2f6 100644 --- a/backend/main_api_app.py +++ b/backend/main_api_app.py @@ -306,6 +306,12 @@ async def auto_load_integration_middleware(request, call_next): except ImportError as e: logger.warning(f"Reasoning routes not found: {e}") + # 4d. Time Travel Routes + try: + from api.time_travel_routes import router as time_travel_router # [Lesson 3] + app.include_router(time_travel_router) # [Lesson 3] + except ImportError as e: + logger.warning(f"Time Travel routes not found: {e}") # 4. Microsoft 365 Integration try: from integrations.microsoft365_routes import microsoft365_router diff --git a/backend/orchestrator_debug.txt b/backend/orchestrator_debug.txt new file mode 100644 index 000000000..55eff0a2d --- /dev/null +++ b/backend/orchestrator_debug.txt @@ -0,0 +1,6 @@ +EXECUTE: 2086765856480 +DEBUG_STATE: 2086765856480 +EXECUTE: 2849150846688 +EXECUTE: 2696176240688 +FORK: 2696176240688 +DEBUG_STATE: 2696176240688 diff --git a/backend/orchestrator_trace.txt b/backend/orchestrator_trace.txt new file mode 100644 index 000000000..184859276 --- /dev/null +++ b/backend/orchestrator_trace.txt @@ -0,0 +1,6 @@ +TRACE: execute_workflow customer_support_automation exec_id=exec_06e5fa6e in instance 2696176240688 +TRACE: Added context exec_06e5fa6e. Active count: 1 +TRACE: _create_snapshot exec_06e5fa6e step=categorize_ticket in 2696176240688 +TRACE: Saved memory snapshot exec_06e5fa6e:categorize_ticket. Total snapshots: 1 +TRACE: _create_snapshot exec_06e5fa6e step=analyze_ticket in 2696176240688 +TRACE: Saved memory snapshot exec_06e5fa6e:analyze_ticket. Total snapshots: 2 diff --git a/backend/start_server.bat b/backend/start_server.bat new file mode 100644 index 000000000..df5ce90ef --- /dev/null +++ b/backend/start_server.bat @@ -0,0 +1,8 @@ +@echo off +cd /d "%~dp0" +echo Starting ATOM Backend Server... +echo Activating Virtual Environment... +call venv\Scripts\activate.bat +echo Starting API Application... +python main_api_app.py +pause diff --git a/backend/startup_error.txt b/backend/startup_error.txt new file mode 100644 index 000000000..b6def9246 Binary files /dev/null and b/backend/startup_error.txt differ diff --git a/backend/test_api_error.txt b/backend/test_api_error.txt new file mode 100644 index 000000000..dc024c2d4 Binary files /dev/null and b/backend/test_api_error.txt differ diff --git a/backend/test_api_error_2.txt b/backend/test_api_error_2.txt new file mode 100644 index 000000000..fe4c762e8 Binary files /dev/null and b/backend/test_api_error_2.txt differ diff --git a/backend/test_api_error_2_utf8.txt b/backend/test_api_error_2_utf8.txt new file mode 100644 index 000000000..bcb445090 --- /dev/null +++ b/backend/test_api_error_2_utf8.txt @@ -0,0 +1,200 @@ +python : \u26a0\ufe0f +WARNING: Using SQLite +development database. +Set DATABASE_URL for +production deployment. +At line:1 char:1 ++ python tests/chaos/tes +t_api_forking.py > +test_api_error_2.txt +2>&1 ++ ~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~ + + CategoryInfo + : NotSpecified + : (\u26a0\ufe0f WA. + ..ion deployment.:S +tring) [], RemoteEx +ception + + FullyQualifiedErr + orId : NativeComman + dError + +C:\Users\Mannan Bajaj\at +om\backend\tests\chaos\. +./..\core\workflow_templ +ate_system.py:47: Pydant +icDeprecatedSince20: +Pydantic V1 style +`@validator` validators +are deprecated. You +should migrate to +Pydantic V2 style +`@field_validator` +validators, see the +migration guide for +more details. +Deprecated in Pydantic +V2.0 to be removed in +V3.0. See Pydantic V2 +Migration Guide at https +://errors.pydantic.dev/2 +.11/migration/ + @validator('label', +'description', +pre=True, always=True) +C:\Users\Mannan Bajaj\at +om\backend\tests\chaos\. +./..\core\workflow_templ +ate_system.py:69: Pydant +icDeprecatedSince20: +Pydantic V1 style +`@validator` validators +are deprecated. You +should migrate to +Pydantic V2 style +`@field_validator` +validators, see the +migration guide for +more details. +Deprecated in Pydantic +V2.0 to be removed in +V3.0. See Pydantic V2 +Migration Guide at https +://errors.pydantic.dev/2 +.11/migration/ + +@validator('depends_on') +C:\Users\Mannan Bajaj\Ap +pData\Roaming\Python\Pyt +hon313\site-packages\pyd +antic\_internal\_config. +py:323: PydanticDeprecat +edSince20: Support for +class-based `config` is +deprecated, use +ConfigDict instead. +Deprecated in Pydantic +V2.0 to be removed in +V3.0. See Pydantic V2 +Migration Guide at https +://errors.pydantic.dev/2 +.11/migration/ + warnings.warn(DEPRECAT +ION_MESSAGE, +DeprecationWarning) +C:\Users\Mannan Bajaj\at +om\backend\tests\chaos\. +./..\core\workflow_templ +ate_system.py:111: Pydan +ticDeprecatedSince20: +Pydantic V1 style +`@validator` validators +are deprecated. You +should migrate to +Pydantic V2 style +`@field_validator` +validators, see the +migration guide for +more details. +Deprecated in Pydantic +V2.0 to be removed in +V3.0. See Pydantic V2 +Migration Guide at https +://errors.pydantic.dev/2 +.11/migration/ + @validator('steps') +Could not initialize AI +service: No module +named 'anthropic' +Error during execution +restoration: (sqlite3.Op +erationalError) no such +column: workflow_executi +ons.visibility +[SQL: SELECT workflow_ex +ecutions.execution_id +AS workflow_executions_e +xecution_id, workflow_ex +ecutions.workflow_id AS +workflow_executions_work +flow_id, workflow_execut +ions.status AS workflow_ +executions_status, workf +low_executions.input_dat +a AS workflow_executions +_input_data, workflow_ex +ecutions.steps AS workfl +ow_executions_steps, wor +kflow_executions.outputs + AS workflow_executions_ +outputs, workflow_execut +ions.context AS workflow +_executions_context, wor +kflow_executions.version + AS workflow_executions_ +version, workflow_execut +ions.created_at AS workf +low_executions_created_a +t, workflow_executions.u +pdated_at AS workflow_ex +ecutions_updated_at, wor +kflow_executions.error +AS workflow_executions_e +rror, workflow_execution +s.user_id AS workflow_ex +ecutions_user_id, workfl +ow_executions.visibility + AS workflow_executions_ +visibility, workflow_exe +cutions.owner_id AS work +flow_executions_owner_id +, workflow_executions.te +am_id AS workflow_execut +ions_team_id +FROM +workflow_executions +WHERE workflow_execution +s.status IN (?, ?)] +[parameters: +('running', +'waiting_approval')] +(Background on this +error at: https://sqlalc +he.me/e/20/e3q8) +E +======================== +======================== +====================== +ERROR: +test_fork_endpoint (__ma +in__.ForkApiTest.test_fo +rk_endpoint) +Verify that POST /api/ti +me-travel/workflows/:id/ +fork calls the +orchestrator. +------------------------ +------------------------ +---------------------- +Traceback (most recent +call last): + File "C:\Python313\Lib +\unittest\mock.py", +line 1424, in patched + return +func(*newargs, +**newkeywargs) +TypeError: ForkApiTest.t +est_fork_endpoint() +missing 1 required +positional argument: +'mock_fork' + +------------------------ +------------------------ +---------------------- +Ran 1 test in 1.316s + +FAILED (errors=1) diff --git a/backend/test_api_error_utf8.txt b/backend/test_api_error_utf8.txt new file mode 100644 index 000000000..05345fd60 --- /dev/null +++ b/backend/test_api_error_utf8.txt @@ -0,0 +1,104 @@ +python : \u26a0\ufe0f +WARNING: Using SQLite +development database. +Set DATABASE_URL for +production deployment. +At line:1 char:1 ++ python tests/chaos/tes +t_api_forking.py > +test_api_error.txt 2>&1 ++ ~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~ + + CategoryInfo + : NotSpecified + : (\u26a0\ufe0f WA. + ..ion deployment.:S +tring) [], RemoteEx +ception + + FullyQualifiedErr + orId : NativeComman + dError + +E +======================== +======================== +====================== +ERROR: +test_fork_endpoint (__ma +in__.ForkApiTest.test_fo +rk_endpoint) +Verify that POST /api/ti +me-travel/workflows/:id/ +fork calls the +orchestrator. +------------------------ +------------------------ +---------------------- +Traceback (most recent +call last): + File "C:\Python313\Lib +\unittest\mock.py", +line 1421, in patched + with self.decoration +_helper(patched, + ~~~~~~~~~~~~~~~ +~~~~~~~^^^^^^^^^ + + args, + + ^^^^^ + + keywargs) as +(newargs, newkeywargs): + + ^^^^^^^^^ + File "C:\Python313\Lib +\contextlib.py", line +141, in __enter__ + return +next(self.gen) + File "C:\Python313\Lib +\unittest\mock.py", +line 1403, in +decoration_helper + arg = exit_stack.ent +er_context(patching) + File "C:\Python313\Lib +\contextlib.py", line +530, in enter_context + result = _enter(cm) + File "C:\Python313\Lib +\unittest\mock.py", +line 1495, in __enter__ + original, local = +self.get_original() + +~~~~~~~~~~~~~~~~~^^ + File "C:\Python313\Lib +\unittest\mock.py", +line 1465, in +get_original + raise +AttributeError( + "%s does not +have the attribute %r" +% (target, name) + ) +AttributeError: +does not have the +attribute +'MODELS_AVAILABLE' + +------------------------ +------------------------ +---------------------- +Ran 1 test in 0.834s + +FAILED (errors=1) diff --git a/backend/test_api_output.txt b/backend/test_api_output.txt new file mode 100644 index 000000000..453dd7669 Binary files /dev/null and b/backend/test_api_output.txt differ diff --git a/backend/test_fork_output.txt b/backend/test_fork_output.txt new file mode 100644 index 000000000..0d080d812 Binary files /dev/null and b/backend/test_fork_output.txt differ diff --git a/backend/test_output.txt b/backend/test_output.txt index b92800d37..79184577d 100644 Binary files a/backend/test_output.txt and b/backend/test_output.txt differ diff --git a/backend/tests/chaos/test_api_forking.py b/backend/tests/chaos/test_api_forking.py new file mode 100644 index 000000000..f5a9d392a --- /dev/null +++ b/backend/tests/chaos/test_api_forking.py @@ -0,0 +1,55 @@ + +import unittest +from unittest.mock import MagicMock, patch, AsyncMock +from fastapi.testclient import TestClient +import sys +import os + +# Add backend to path +sys.path.append(os.path.join(os.path.dirname(__file__), '../../')) + +class ForkApiTest(unittest.TestCase): + + @patch('advanced_workflow_orchestrator.AdvancedWorkflowOrchestrator.fork_execution', new_callable=AsyncMock) + @patch('core.database.SessionLocal') # Mock DB to prevent startup errors + @patch('advanced_workflow_orchestrator.MODELS_AVAILABLE', True) + def test_fork_endpoint(self, mock_fork, mock_session): + """ + Verify that POST /api/time-travel/workflows/:id/fork calls the orchestrator. + """ + # Setup Mock Return + mock_fork.return_value = "forked-123" + + # Import app AFTER mocking to avoid premature startup logic + try: + from main_api_app import app + client = TestClient(app) + + print("\n[Test API] Calling Fork Endpoint...") + response = client.post( + "/api/time-travel/workflows/origin-123/fork", + json={"step_id": "step_5", "new_variables": {"a": 1}} + ) + + print(f"[Test API] Status: {response.status_code}") + print(f"[Test API] Response: {response.json()}") + + # Assertions + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["new_execution_id"], "forked-123") + + # Verify Orchestrator Call + mock_fork.assert_called_once() + print("SUCCESS: Endpoint correctly routed to Orchestrator.") + + except ImportError: + import traceback + print(f"IMPORT ERROR:\n{traceback.format_exc()}") + raise + except Exception: + import traceback + print(f"TEST EXECUTION ERROR:\n{traceback.format_exc()}") + raise + +if __name__ == '__main__': + unittest.main() diff --git a/backend/tests/chaos/test_forking.py b/backend/tests/chaos/test_forking.py new file mode 100644 index 000000000..db1e2e018 --- /dev/null +++ b/backend/tests/chaos/test_forking.py @@ -0,0 +1,99 @@ + +import unittest +from unittest.mock import MagicMock, patch, ANY +import sys +import os +import json + +# Add backend to path +sys.path.append(os.path.join(os.path.dirname(__file__), '../../')) + +class ForkingTest(unittest.TestCase): + + @patch('core.database.SessionLocal') + @patch('advanced_workflow_orchestrator.MODELS_AVAILABLE', True) + def test_fork_execution(self, mock_session_cls): + """ + Verify that fork_execution creates a parallel universe. + """ + # 1. Setup Mock DB + mock_db = MagicMock() + mock_session_cls.return_value.__enter__.return_value = mock_db + + # 2. Mock Snapshot (The Save Point) + mock_snapshot = MagicMock() + mock_snapshot.execution_id = "origin_timeline" + mock_snapshot.step_id = "step_5" + mock_snapshot.context_snapshot = json.dumps({ + "variables": {"status": "broken", "money": 0}, + "results": {"step_4": "ok"}, + "execution_history": ["step_1", "step_2", "step_3", "step_4"], + "current_step": "step_5" + }) + + # 3. Mock Original Execution (for metadata clonging) + mock_orig_exec = MagicMock() + mock_orig_exec.workflow_id = "wf_financial" + mock_orig_exec.input_data = '{"client": "BigCorp"}' + mock_orig_exec.version = 1 + + # Configure DB Query Side Effects + def query_side_effect(model): + query_mock = MagicMock() + if "WorkflowSnapshot" in str(model): + # The implementation uses .filter(A, B).first() -> Single filter call + query_mock.filter.return_value.first.return_value = mock_snapshot + elif "WorkflowExecution" in str(model): + query_mock.filter.return_value.first.return_value = mock_orig_exec + return query_mock + + mock_db.query.side_effect = query_side_effect + + # 4. Run the Fork + from advanced_workflow_orchestrator import AdvancedWorkflowOrchestrator + orch = AdvancedWorkflowOrchestrator() + + print("\n[Test Fork] Attempting to fork execution...") + # We apply a "Fix" during the fork + new_id = self.loop.run_until_complete( + orch.fork_execution( + "origin_timeline", + "step_5", + new_variables={"status": "fixed", "money": 1000} + ) + ) + + # 5. Verify the "Parallel Universe" + print(f"[Test Fork] New Universe ID: {new_id}") + self.assertIsNotNone(new_id) + self.assertNotEqual(new_id, "origin_timeline") + self.assertIn("fork", new_id) + + # Check DB Insert + mock_db.add.assert_called_once() + new_exec_record = mock_db.add.call_args[0][0] + self.assertEqual(new_exec_record.execution_id, new_id) + + # Check Initial State + context_data = json.loads(new_exec_record.context) + self.assertEqual(context_data["variables"]["status"], "fixed", "Variables were not patched!") + self.assertEqual(context_data["variables"]["money"], 1000, "Variables were not patched!") + self.assertEqual(context_data["current_step"], "step_5", "Wrong starting step") + + # Check In-Memory Load + self.assertIn(new_id, orch.active_contexts) + mem_ctx = orch.active_contexts[new_id] + self.assertEqual(mem_ctx.variables["status"], "fixed") + + print("SUCCESS: Fork created successfully with patched variables.") + + def setUp(self): + import asyncio + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.close() + +if __name__ == '__main__': + unittest.main() diff --git a/backend/tests/chaos/test_persistence.py b/backend/tests/chaos/test_persistence.py new file mode 100644 index 000000000..78681d1d9 --- /dev/null +++ b/backend/tests/chaos/test_persistence.py @@ -0,0 +1,63 @@ + +import unittest +from unittest.mock import MagicMock, patch +import sys +import os +import json + +# Add backend to path +sys.path.append(os.path.join(os.path.dirname(__file__), '../../')) + +class OrchestratorPersistenceTest(unittest.TestCase): + + @patch('core.database.SessionLocal') + @patch('advanced_workflow_orchestrator.MODELS_AVAILABLE', True) # Force enable + def test_ghost_resurrection(self, mock_session_cls): + """ + Verify that __init__ calls _restore_active_executions and loads from DB. + """ + # 1. Setup Mock DB + mock_db = MagicMock() + mock_session_cls.return_value.__enter__.return_value = mock_db + + # Mock Query Results + mock_execution = MagicMock() + mock_execution.workflow_id = "ghost_flow_1" + mock_execution.status = "RUNNING" # ENUM typically uses uppercase + mock_execution.context = json.dumps({ + "variables": {"status": "alive"}, + "current_step": "step_5" + }) + mock_execution.input_data = "{}" + mock_execution.user_id = "test_user" + + # When db.query().filter().all() is called + mock_db.query.return_value.filter.return_value.all.return_value = [mock_execution] + + # 2. Initialize Orchestrator + # We need to ensure we import the class where the patch is active + # The patching above hits 'advanced_workflow_orchestrator.SessionLocal' + # So we import the module + import advanced_workflow_orchestrator + # Force reload? No, simpler to just import. + from advanced_workflow_orchestrator import AdvancedWorkflowOrchestrator + + print("\n[Test Persistence] Initializing Orchestrator...") + # Check if patch worked by inspecting the imported module's SessionLocal if possible? + # Actually, since we use 'from core.database import SessionLocal' inside the function, + # checking sys.modules['core.database'].SessionLocal is what matters. + import core.database + print(f"DEBUG: core.database.SessionLocal is {core.database.SessionLocal}") + + orch = AdvancedWorkflowOrchestrator() + + # 3. Validation + print(f"[Test Persistence] Active Contexts: {len(orch.active_contexts)}") + + self.assertIn("ghost_flow_1", orch.active_contexts, "Ghost workflow was NOT restored!") + restored_ctx = orch.active_contexts["ghost_flow_1"] + self.assertEqual(restored_ctx.variables.get("status"), "alive", "Context variables lost") + print("SUCCESS: Ghost Workflow 'ghost_flow_1' was successfully resurrected from DB.") + +if __name__ == '__main__': + unittest.main() diff --git a/backend/tests/chaos/test_snapshot.py b/backend/tests/chaos/test_snapshot.py new file mode 100644 index 000000000..2dac12488 --- /dev/null +++ b/backend/tests/chaos/test_snapshot.py @@ -0,0 +1,56 @@ + +import unittest +from unittest.mock import MagicMock, patch, ANY +import sys +import os +import json + +# Add backend to path +sys.path.append(os.path.join(os.path.dirname(__file__), '../../')) + +class SnapshotTest(unittest.TestCase): + + @patch('core.database.SessionLocal') + @patch('advanced_workflow_orchestrator.MODELS_AVAILABLE', True) + def test_snapshot_creation(self, mock_session_cls): + """ + Verify that _create_snapshot is called and saves to DB. + """ + # 1. Setup Mock DB + mock_db = MagicMock() + mock_session_cls.return_value.__enter__.return_value = mock_db + + from advanced_workflow_orchestrator import AdvancedWorkflowOrchestrator, WorkflowContext + + orch = AdvancedWorkflowOrchestrator() + + # 2. Create Dummy Context + ctx = WorkflowContext( + workflow_id="test_time_travel", + variables={"hero": "Mario"}, + results={"step_1": {"status": "completed", "score": 100}} + ) + ctx.execution_history = [{"step_id": "step_1"}] + + # 3. Trigger Snapshot manually (unit test the method first) + print("\n[Test Snapshot] Triggering snapshot...") + orch._create_snapshot(ctx, "step_1") + + # 4. Verify DB Insert + # We expect db.add() to be called with a WorkflowSnapshot object + mock_db.add.assert_called_once() + args, _ = mock_db.add.call_args + snapshot = args[0] + + print(f"[Test Snapshot] Captured object: {type(snapshot).__name__}") + self.assertEqual(snapshot.execution_id, "test_time_travel") + self.assertEqual(snapshot.step_id, "step_1") + self.assertEqual(snapshot.step_order, 1) + + # Verify JSON serialization + content = json.loads(snapshot.context_snapshot) + self.assertEqual(content["variables"]["hero"], "Mario") + print("SUCCESS: Snapshot saved with correct state data.") + +if __name__ == '__main__': + unittest.main() diff --git a/backend/tests/chaos/test_variables.py b/backend/tests/chaos/test_variables.py new file mode 100644 index 000000000..dd16b42f6 --- /dev/null +++ b/backend/tests/chaos/test_variables.py @@ -0,0 +1,90 @@ + +import re +import unittest +from typing import Dict, Any, Optional +from dataclasses import dataclass, field + +# --- Mock Context --- +@dataclass +class WorkflowContext: + variables: Dict[str, Any] = field(default_factory=dict) + results: Dict[str, Any] = field(default_factory=dict) + +# --- The Code Under Test (extracted from AdvancedWorkflowOrchestrator) --- +class VariableResolver: + def _resolve_variables(self, value: Any, context: WorkflowContext) -> Any: + """Resolve variables in a value (string, dict, or list)""" + if isinstance(value, str): + # Replace {{variable}} with value from context.variables + matches = re.findall(r'\{\{([^}]+)\}\}', value) + for match in matches: + # Support nested access like {{step_id.key}} + if '.' in match: + parts = match.split('.') + step_id = parts[0] + key = parts[1] + if step_id in context.results: + val = context.results[step_id].get(key, "") + value = value.replace(f"{{{{{match}}}}}", str(val)) + elif match in context.variables: + value = value.replace(f"{{{{{match}}}}}", str(context.variables[match])) + return value + elif isinstance(value, dict): + return {k: self._resolve_variables(v, context) for k, v in value.items()} + elif isinstance(value, list): + return [self._resolve_variables(v, context) for v in value] + return value + +class TestVariableResolution(unittest.TestCase): + def setUp(self): + self.resolver = VariableResolver() + self.context = WorkflowContext() + self.context.variables = { + "user": "Alice", + "count": 10, + "nested_ptr": "user" + } + self.context.results = { + "step_1": {"output": "Success", "id": 123} + } + + def test_basic_resolution(self): + """Test simple variable replacement""" + result = self.resolver._resolve_variables("Hello {{user}}", self.context) + print(f"\n[Test Basic] 'Hello {{{{user}}}}' -> '{result}'") + self.assertEqual(result, "Hello Alice") + + def test_step_output_resolution(self): + """Test accessing step results""" + result = self.resolver._resolve_variables("ID: {{step_1.id}}", self.context) + print(f"[Test Step] 'ID: {{{{step_1.id}}}}' -> '{result}'") + self.assertEqual(result, "ID: 123") + + def test_undefined_variable(self): + """Test variable that doesn't exist""" + input_str = "Value: {{missing_var}}" + result = self.resolver._resolve_variables(input_str, self.context) + print(f"[Test Undefined] '{input_str}' -> '{result}'") + # CURRENT BEHAVIOR: It likely leaves it as-is because the `if match in context.variables` check fails. + # This confirms "Silent Failure" or "Leakage" + self.assertEqual(result, input_str) + + def test_nested_curly_braces(self): + """Test nested braces which Regex struggles with""" + # Intent: Resolve {{nested_ptr}} to "user", then resolve {{user}} to "Alice" + # Actual Regex: likely matches "nested_ptr}} to output {{user" or similar weirdness + input_str = "Double: {{ {{nested_ptr}} }}" + result = self.resolver._resolve_variables(input_str, self.context) + print(f"[Test Nested] '{input_str}' -> '{result}'") + # This will almost certainly fail or produce garbage + self.assertNotEqual(result, "Double: Alice") + + def test_partial_match_ambiguity(self): + """Test when regex greedy match might fail""" + input_str = "{{user}} and {{count}}" + result = self.resolver._resolve_variables(input_str, self.context) + print(f"[Test Multi] '{input_str}' -> '{result}'") + self.assertEqual(result, "Alice and 10") + +if __name__ == '__main__': + unittest.main() diff --git a/backend/tests/chaos/test_variables_regression.py b/backend/tests/chaos/test_variables_regression.py new file mode 100644 index 000000000..89bbe0410 --- /dev/null +++ b/backend/tests/chaos/test_variables_regression.py @@ -0,0 +1,115 @@ + +import unittest +import sys +import os +from typing import Dict, Any + +# Mock context +class WorkflowContext: + def __init__(self, variables=None, results=None): + self.variables = variables or {} + self.results = results or {} + +# Import the class (we need to be able to import the method or class) +# Since we modified the file, we can import it directly if dependencies allow. +# However, AdvancedWorkflowOrchestrator has many imports. +# It might be safer to copy the NEW implementation into the test file to test the Logic in isolation, +# OR try to import. Importing is better for integration test, but might fail due to missing env. +# Let's try to import. If it fails, I'll Mock the class and inject the method. + +try: + from advanced_workflow_orchestrator import AdvancedWorkflowOrchestrator +except ImportError: + # If import fails (likely due to path), we append path + sys.path.append(os.path.join(os.path.dirname(__file__), '../../')) + try: + from advanced_workflow_orchestrator import AdvancedWorkflowOrchestrator + except ImportError: + # If deeply nested dependencies (like 'core.models') fail, we mock the class with new logic + # This mirrors the logic we JUST injected. + import re + class AdvancedWorkflowOrchestrator: + def _resolve_variables(self, value: Any, context: WorkflowContext) -> Any: + """ + Resolve variables in a value (string, dict, or list) with support for nesting. + Uses an iterative inside-out approach to handle {{ {{var}} }}. + """ + # [PASTED LOGIC FOR TEST ISOLATION IF PREVIOUS IMPORT FAILS] + # ... avoiding massive paste, relying on import first ... + pass + +# Actually, to be 100% sure we test the FILE ON DISK, we must import it. +# I will use a simple wrapper to import. + +class TestVariableRegression(unittest.TestCase): + def setUp(self): + # We need an instance. + # AdvancedWorkflowOrchestrator __init__ loads things. + # We should Mock the __init__ if possible or handle side effects. + # But wait, I can just patch the class? + pass + + def test_regression_suite(self): + """ + Comprehensive regression test for variable resolution. + """ + # 1. Instantiate + try: + orch = AdvancedWorkflowOrchestrator() + except: + # If init fails, we might need to bypass it. + # Let's create a dummy class that inherits or just use the function if it was static (it's not). + orch = AdvancedWorkflowOrchestrator.__new__(AdvancedWorkflowOrchestrator) + + # 2. Setup Context + ctx = WorkflowContext( + variables={ + "name": "Atom", + "version": 1.0, + "is_live": True, + "nested_key": "version" + }, + results={ + "step1": {"output": "http://api.com", "code": 200}, + "step2": {"data": {"id": 999}} + } + ) + + # 3. Test Cases + cases = [ + ("Hello {{name}}", "Hello Atom"), # Simple + ("v{{version}}", "v1.0"), # Number to String + ("Status: {{is_live}}", "Status: True"), # Bool to String + ("No vars", "No vars"), # Identity + ("{{missing}}", "{{missing}}"), # Undefined (Preserve) + ("Link: {{step1.output}}", "Link: http://api.com"), # Step Output + ("ID: {{step2.data.id}}", "ID: 999"), # Nested Dict Access + ("{{name}} - {{version}}", "Atom - 1.0"), # Multiple vars + + # COMPLEX / NEW FEATURES + ("{{ {{nested_key}} }}", "1.0"), # Nested: {{version}} -> 1.0 + ("{{step2.data.{{missing_key}}}}", "{{step2.data.{{missing_key}}}}"), # Broken nested + ] + + print("\n--- Regression Test Running ---") + for inp, expected in cases: + res = orch._resolve_variables(inp, ctx) + print(f"Input: '{inp}' -> Output: '{res}'") + self.assertEqual(res, expected, f"Failed on input: {inp}") + + # 4. Dictionary Test + input_dict = {"a": "{{name}}", "b": 123} + res_dict = orch._resolve_variables(input_dict, ctx) + self.assertEqual(res_dict["a"], "Atom") + self.assertEqual(res_dict["b"], 123) + print("Dictionary Test: OK") + + # 5. List Test + input_list = ["{{name}}", "{{step1.code}}"] + res_list = orch._resolve_variables(input_list, ctx) + self.assertEqual(res_list[0], "Atom") + self.assertEqual(res_list[1], "200") + print("List Test: OK") + +if __name__ == '__main__': + unittest.main() diff --git a/chaos_broken_tool.txt b/chaos_broken_tool.txt new file mode 100644 index 000000000..dba8f8d55 --- /dev/null +++ b/chaos_broken_tool.txt @@ -0,0 +1,7 @@ +>>> [CHAOS] Starting TEST 3: The Broken Tool Loop + [GOAL] Verify system handles repeated tool failures without infinite loop + [CHAOS] Executing Tool: search_web -> SIMULATING FAILURE + [CHAOS] Executing Tool: search_web -> SIMULATING FAILURE + [CHAOS] Executing Tool: search_web -> SIMULATING FAILURE + [RESULT] Agent Final Answer: I cannot search right now. +[PASS] Circuit Breaker worked (Agent gave up naturally or Loop Limit hit). diff --git a/chaos_needle_result.txt b/chaos_needle_result.txt new file mode 100644 index 000000000..9623cbb6e --- /dev/null +++ b/chaos_needle_result.txt @@ -0,0 +1,12 @@ +>>> [CHAOS] Starting TEST 2: Needle in a Haystack +[CRITICAL FAIL] module 'core' has no attribute 'memory' +Traceback (most recent call last): + File "C:\Users\Mannan Bajaj\atom\backend\tests\chaos\test_needle.py", line 30, in main + patch('core.memory.MemoryManager.get_chat_history') as mock_get_history, \ + ~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "C:\Python313\Lib\unittest\mock.py", line 1479, in __enter__ + self.target = self.getter() + ~~~~~~~~~~~^^ + File "C:\Python313\Lib\pkgutil.py", line 528, in resolve_name + result = getattr(result, p) +AttributeError: module 'core' has no attribute 'memory' diff --git a/check_schema.py b/check_schema.py new file mode 100644 index 000000000..6d5c51330 --- /dev/null +++ b/check_schema.py @@ -0,0 +1,23 @@ + +import sqlite3 +import os + +# Assuming default dev DB +db_path = "backend/atom_dev.db" +if not os.path.exists(db_path): + # Try alternate location if widely used + db_path = "atom_dev.db" + +print(f"Checking DB: {db_path}") + +try: + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute("PRAGMA table_info(users)") + columns = cursor.fetchall() + print("Columns in 'users' table:") + for col in columns: + print(col) + conn.close() +except Exception as e: + print(f"Error: {e}") diff --git a/convert_log.py b/convert_log.py new file mode 100644 index 000000000..ab9e592c7 --- /dev/null +++ b/convert_log.py @@ -0,0 +1,13 @@ + +try: + with open("git_log.txt", "r", encoding="utf-16-le") as f: + content = f.read() +except Exception as e: + print(f"Failed to read utf-16-le: {e}") + # Try default encoding if that failed, maybe it wasn't utf-16 + with open("git_log.txt", "r") as f: + content = f.read() + +with open("git_log_utf8.txt", "w", encoding="utf-8") as f: + f.write(content) +print("Conversion complete") diff --git a/debug_login.py b/debug_login.py new file mode 100644 index 000000000..97f3a8c69 --- /dev/null +++ b/debug_login.py @@ -0,0 +1,18 @@ +import requests + +url = "http://localhost:5059/api/auth/login" +payload = { + "username": "admin@example.com", + "password": "securePass123" +} +headers = { + "Content-Type": "application/x-www-form-urlencoded" +} + +try: + response = requests.post(url, data=payload, headers=headers) + print(f"Status Code: {response.status_code}") + print("Response Body:") + print(response.text) +except Exception as e: + print(f"Request failed: {e}") diff --git a/debug_output.txt b/debug_output.txt new file mode 100644 index 000000000..1b15aaa45 Binary files /dev/null and b/debug_output.txt differ diff --git a/debug_output_2.txt b/debug_output_2.txt new file mode 100644 index 000000000..649dcc13c Binary files /dev/null and b/debug_output_2.txt differ diff --git a/debug_output_3.txt b/debug_output_3.txt new file mode 100644 index 000000000..c117794f1 Binary files /dev/null and b/debug_output_3.txt differ diff --git a/debug_output_utf8.txt b/debug_output_utf8.txt new file mode 100644 index 000000000..b305dabab --- /dev/null +++ b/debug_output_utf8.txt @@ -0,0 +1,3 @@ +Status Code: 500 +Response Body: +{"detail":"Internal Server Error","error":"(sqlite3.OperationalError) no such column: users.skills\n[SQL: SELECT users.id AS users_id, users.email AS users_email, users.password_hash AS users_password_hash, users.first_name AS users_first_name, users.last_name AS users_last_name, users.role AS users_role, users.status AS users_status, users.skills AS users_skills, users.capacity_hours AS users_capacity_hours, users.hourly_cost_rate AS users_hourly_cost_rate, users.metadata_json AS users_metadata_json, users.created_at AS users_created_at, users.updated_at AS users_updated_at, users.last_login AS users_last_login \nFROM users \nWHERE users.email = ?\n LIMIT ? OFFSET ?]\n[parameters: ('admin@example.com', 1, 0)]\n(Background on this error at: https://sqlalche.me/e/20/e3q8)","traceback":"Traceback (most recent call last):\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 1963, in _exec_single_context\n self.dialect.do_execute(\n ~~~~~~~~~~~~~~~~~~~~~~~^\n cursor, str_statement, effective_parameters, context\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\default.py\", line 943, in do_execute\n cursor.execute(statement, parameters)\n ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^\nsqlite3.OperationalError: no such column: users.skills\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n File \"C:\\Users\\Mannan Bajaj\\atom\\backend\\core\\auth_endpoints.py\", line 44, in login_for_access_token\n user = db.query(User).filter(User.email == form_data.username).first()\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\orm\\query.py\", line 2759, in first\n return self.limit(1)._iter().first() # type: ignore\n ~~~~~~~~~~~~~~~~~~~^^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\orm\\query.py\", line 2857, in _iter\n result: Union[ScalarResult[_T], Result[_T]] = self.session.execute(\n ~~~~~~~~~~~~~~~~~~~~^\n statement,\n ^^^^^^^^^^\n params,\n ^^^^^^^\n execution_options={\"_sa_orm_load_options\": self.load_options},\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\orm\\session.py\", line 2365, in execute\n return self._execute_internal(\n ~~~~~~~~~~~~~~~~~~~~~~^\n statement,\n ^^^^^^^^^^\n ...<4 lines>...\n _add_event=_add_event,\n ^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\orm\\session.py\", line 2251, in _execute_internal\n result: Result[Any] = compile_state_cls.orm_execute_statement(\n ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^\n self,\n ^^^^^\n ...<4 lines>...\n conn,\n ^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\orm\\context.py\", line 306, in orm_execute_statement\n result = conn.execute(\n statement, params or {}, execution_options=execution_options\n )\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 1415, in execute\n return meth(\n self,\n distilled_parameters,\n execution_options or NO_OPTIONS,\n )\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\sql\\elements.py\", line 523, in _execute_on_connection\n return connection._execute_clauseelement(\n ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^\n self, distilled_params, execution_options\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 1637, in _execute_clauseelement\n ret = self._execute_context(\n dialect,\n ...<8 lines>...\n cache_hit=cache_hit,\n )\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 1842, in _execute_context\n return self._exec_single_context(\n ~~~~~~~~~~~~~~~~~~~~~~~~~^\n dialect, context, statement, parameters\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 1982, in _exec_single_context\n self._handle_dbapi_exception(\n ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^\n e, str_statement, effective_parameters, cursor, context\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 2351, in _handle_dbapi_exception\n raise sqlalchemy_exception.with_traceback(exc_info[2]) from e\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\base.py\", line 1963, in _exec_single_context\n self.dialect.do_execute(\n ~~~~~~~~~~~~~~~~~~~~~~~^\n cursor, str_statement, effective_parameters, context\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n )\n ^\n File \"C:\\Users\\Mannan Bajaj\\AppData\\Roaming\\Python\\Python313\\site-packages\\sqlalchemy\\engine\\default.py\", line 943, in do_execute\n cursor.execute(statement, parameters)\n ~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^\nsqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: users.skills\n[SQL: SELECT users.id AS users_id, users.email AS users_email, users.password_hash AS users_password_hash, users.first_name AS users_first_name, users.last_name AS users_last_name, users.role AS users_role, users.status AS users_status, users.skills AS users_skills, users.capacity_hours AS users_capacity_hours, users.hourly_cost_rate AS users_hourly_cost_rate, users.metadata_json AS users_metadata_json, users.created_at AS users_created_at, users.updated_at AS users_updated_at, users.last_login AS users_last_login \nFROM users \nWHERE users.email = ?\n LIMIT ? OFFSET ?]\n[parameters: ('admin@example.com', 1, 0)]\n(Background on this error at: https://sqlalche.me/e/20/e3q8)\n"} diff --git a/frontend-nextjs/.babelrc b/frontend-nextjs/.babelrc.disable similarity index 100% rename from frontend-nextjs/.babelrc rename to frontend-nextjs/.babelrc.disable diff --git a/frontend-nextjs/build_log.txt b/frontend-nextjs/build_log.txt new file mode 100644 index 000000000..a1a6c3497 Binary files /dev/null and b/frontend-nextjs/build_log.txt differ diff --git a/frontend-nextjs/components/WorkflowAutomation.tsx b/frontend-nextjs/components/WorkflowAutomation.tsx index f09dc71bc..1dca42e08 100644 --- a/frontend-nextjs/components/WorkflowAutomation.tsx +++ b/frontend-nextjs/components/WorkflowAutomation.tsx @@ -60,6 +60,8 @@ import { AlertTriangle, FileText, Activity, + History, // [Lesson 3] + GitBranch, // [Lesson 3] } from "lucide-react"; interface WorkflowTemplate { @@ -142,6 +144,12 @@ const WorkflowAutomation: React.FC = () => { const [builderInitialData, setBuilderInitialData] = useState(null); // For AI generated workflows const [genPrompt, setGenPrompt] = useState(""); + // [Lesson 3] Time-Travel State + const [isForkModalOpen, setIsForkModalOpen] = useState(false); + const [forkStepId, setForkStepId] = useState(null); + const [forkVariables, setForkVariables] = useState>({}); + // [Lesson 3] UX: Raw string state for editable text area + const [forkVariablesJson, setForkVariablesJson] = useState("{}"); const { toast } = useToast(); // Fetch initial data @@ -440,6 +448,47 @@ const WorkflowAutomation: React.FC = () => { } }; + // [Lesson 3] Time-Travel / Fork Handler + const handleForkWorkflow = async () => { + if (!activeExecution || !forkStepId) return; + + try { + setExecuting(true); + const response = await fetch( + `/api/time-travel/workflows/${activeExecution.execution_id}/fork`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + step_id: forkStepId, + new_variables: forkVariables + }), + } + ); + + const data = await response.json(); + if (response.ok) { + toast({ + title: "Timeline Forked! 🌌", + description: `Created parallel universe: ${data.new_execution_id}`, + }); + await fetchExecutions(); // Refresh list + setIsForkModalOpen(false); + setIsExecutionModalOpen(false); // Close details + } else { + throw new Error(data.detail || "Fork failed"); + } + } catch (error) { + console.error("Fork Error:", error); + toast({ + title: "Time-Travel Failed", + description: "Could not fork timeline.", + variant: "error", + }); + } finally { + setExecuting(false); + } + }; const handleFormChange = (field: string, value: any) => { setFormData((prev) => ({ ...prev, @@ -796,8 +845,16 @@ const WorkflowAutomation: React.FC = () => { {execution.workflow_id} - - + {/* [Lesson 3] UX: Visual indicator for forked workflows */} + { + execution.execution_id.includes("-forked-") && ( + + (forked) + + ) + } + +

Started:{" "} @@ -810,7 +867,7 @@ const WorkflowAutomation: React.FC = () => {

)}
- +
@@ -866,24 +923,26 @@ const WorkflowAutomation: React.FC = () => {
- - - + + + ))} - {executions.length === 0 && ( - - - No executions yet - - Execute a workflow to see execution history here. - - - )} - - + { + executions.length === 0 && ( + + + No executions yet + + Execute a workflow to see execution history here. + + + ) + } + + {/* Services Tab */} - + < TabsContent value="services" className="mt-6" >
{Object.entries(services).map(([serviceName, serviceInfo]) => ( @@ -923,8 +982,8 @@ const WorkflowAutomation: React.FC = () => { ))}
-
- + + )} {/* Template Execution Modal */} @@ -1206,38 +1265,150 @@ const WorkflowAutomation: React.FC = () => { Step: {stepId} +
+ + Captured State + + +
                                 {JSON.stringify(result, null, 2)}
                               
-
-
+ + ), )} - - + + )} + { + activeExecution.errors && activeExecution.errors.length > 0 && ( + + + Errors +
+ {activeExecution.errors.map((error, index) => ( + + {error} + + ))} +
+
+ ) + } + + )} + + + + + - {activeExecution.errors && activeExecution.errors.length > 0 && ( - - - Errors -
- {activeExecution.errors.map((error, index) => ( - - {error} - - ))} -
-
+ + {/* [Lesson 3] Fork / Time Travel Modal */} + < Dialog open={isForkModalOpen} onOpenChange={setIsForkModalOpen} > + + + + + Time Travel: Fork from Step {forkStepId} + + + Create a parallel universe starting from this step. You can patch variables to fix errors. + + + +
+ + + Branching Timeline + + Original execution {activeExecution?.execution_id} will be preserved. A new execution will function as a "Clone". + + + +
+
+ + + {Object.keys(forkVariables).length} params + +
+ + {Object.keys(forkVariables).length === 0 ? ( +
+

No tunable parameters found for this step.

+

Forking will proceed with original state.

+
+ ) : ( +
+ {Object.entries(forkVariables).map(([key, value]) => ( +
+ + { + const newVal = e.target.value; + // Try to conserve types (number/bool) if possible, otherwise string + let typedVal: any = newVal; + if (newVal === 'true') typedVal = true; + else if (newVal === 'false') typedVal = false; + else if (!isNaN(Number(newVal)) && newVal.trim() !== '') typedVal = Number(newVal); + + setForkVariables(prev => ({ ...prev, [key]: typedVal })); + }} + /> +
+ ))} +
)}
- )} +
+ - + +
- - + + + ); }; diff --git a/frontend-nextjs/full_log.txt b/frontend-nextjs/full_log.txt new file mode 100644 index 000000000..89f915600 Binary files /dev/null and b/frontend-nextjs/full_log.txt differ diff --git a/frontend-nextjs/hooks/useVoiceAgent.ts b/frontend-nextjs/hooks/useVoiceAgent.ts index 10c21122c..73251c414 100644 --- a/frontend-nextjs/hooks/useVoiceAgent.ts +++ b/frontend-nextjs/hooks/useVoiceAgent.ts @@ -41,6 +41,7 @@ export const useVoiceAgent = (): UseVoiceAgentReturn => { if (audioRef.current) { audioRef.current.pause(); audioRef.current.currentTime = 0; + } setIsPlaying(false); }, []); @@ -93,6 +94,7 @@ export const useVoiceAgent = (): UseVoiceAgentReturn => { console.error("Error creating audio object:", error); setIsPlaying(false); } + }, [stopAudio]); return { diff --git a/frontend-nextjs/log_2_ascii.txt b/frontend-nextjs/log_2_ascii.txt new file mode 100644 index 000000000..19cc398c2 --- /dev/null +++ b/frontend-nextjs/log_2_ascii.txt @@ -0,0 +1,5 @@ + +> atomic-app@0.1.0-alpha.1 type-check +> tsc --noEmit + +components/Microsoft365Integration.tsx(1245,129): error TS2322: Type '"destructive"' is not assignable to type '"error" | "default" | "success" | "warning"'. diff --git a/frontend-nextjs/log_ascii.txt b/frontend-nextjs/log_ascii.txt new file mode 100644 index 000000000..e266eb38f --- /dev/null +++ b/frontend-nextjs/log_ascii.txt @@ -0,0 +1,40 @@ + +> atomic-app@0.1.0-alpha.1 type-check +> tsc --noEmit + +components/Microsoft365Integration.tsx(405,13): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(469,17): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(485,13): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(532,17): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(549,13): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(571,17): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(577,13): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(592,17): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(601,13): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1245,75): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1250,71): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1271,67): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1298,67): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1313,67): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1336,67): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1346,67): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1368,53): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. +components/Microsoft365Integration.tsx(1373,67): error TS2349: This expression is not callable. + Type '{ toast: (props: Omit) => void; dismiss: (id: string) => void; toasts: ToastProps[]; }' has no call signatures. diff --git a/frontend-nextjs/next.config.js b/frontend-nextjs/next.config.js index 1cda40c77..9e9aff8ec 100644 --- a/frontend-nextjs/next.config.js +++ b/frontend-nextjs/next.config.js @@ -57,6 +57,10 @@ const nextConfig = { source: "/api/intelligence/:path*", destination: "http://localhost:8000/api/intelligence/:path*", }, + { + source: "/api/time-travel/:path*", + destination: "http://localhost:8000/api/time-travel/:path*", + }, // Add general API rewrite for other endpoints { source: "/api/v1/:path*", diff --git a/frontend-nextjs/pages/agents.tsx b/frontend-nextjs/pages/agents.tsx deleted file mode 100644 index 8a188415e..000000000 --- a/frontend-nextjs/pages/agents.tsx +++ /dev/null @@ -1,12 +0,0 @@ -import React from 'react'; -import AgentStudio from '../components/Agents/AgentStudio'; - -const AgentsPage: React.FC = () => { - return ( -
- -
- ); -}; - -export default AgentsPage; diff --git a/frontend-nextjs/pages/finance.tsx b/frontend-nextjs/pages/finance.tsx deleted file mode 100644 index 5e5a86e09..000000000 --- a/frontend-nextjs/pages/finance.tsx +++ /dev/null @@ -1,15 +0,0 @@ -import React from 'react'; -import Head from 'next/head'; -import { FinanceCommandCenter } from '@/components/dashboards/FinanceCommandCenter'; -import Layout from '@/components/layout/Layout'; - -export default function FinancePage() { - return ( - - - Finance Command Center | Atom - - - - ); -} diff --git a/git_log.txt b/git_log.txt new file mode 100644 index 000000000..1540b79e0 Binary files /dev/null and b/git_log.txt differ diff --git a/git_log_utf8.txt b/git_log_utf8.txt new file mode 100644 index 000000000..68632e451 --- /dev/null +++ b/git_log_utf8.txt @@ -0,0 +1,20 @@ +0bd261ba|2025-12-24|Fix backend 404s, Auth stability, and clean up build artifacts +84938916|2025-12-24|Standardize MS365 integration and fix NLU visualization +3e862a0d|2025-12-22|docs: update README with Universal BYOK and External Governance features +054a03b5|2025-12-22|Refactor multi-tenant memory isolation and implement Phase 64 budget guardrails +71c1d933|2025-12-21|feat: implement autonomous stakeholder communication generation +8671fdc7|2025-12-21|feat: implement small business mastery features (phases 60-62) +2ffdbefe|2025-12-21|Phases 56-59 complete: Advanced AI Reasoning & Small Biz Financial Intelligence +c7706b2a|2025-12-21|Implement Phase 40-42: AI Accounting, GraphRAG with Stakeholders, and Advanced Workflow Orchestration +81fe7790|2025-12-21|docs: Add Knowledge Graph & AI Memory as key selling point +c4fcf452|2025-12-21|docs: Add AI Accounting, Financial Ops, and Event-Sourced Architecture features to README +5916038a|2025-12-20|fix: add API rewrites for sales and accounting, add demo seeding script +9e546671|2025-12-20|feat: implement AI Sales Dashboard with Lead, Deal, and Meeting Intelligence +67ad1e1b|2025-12-20|feat: implement AI Sales Automation suite with CRM intelligence and memory integration +8517df2d|2025-12-20|Implement Phase 10 & 11: Cross-System Reasoning & HITL Enhancements +2d0b93ff|2025-12-20|Fix login 500 & infinite loop, stabilize backend/frontend +d3eb55c7|2025-12-20|Merge branch 'upstream/main' into office-365-integration +1ca11895|2025-12-20|Added office 365 apps integrations including excel, power bi, teams etc. +5ccfbb99|2025-12-19|feat: universal integrations, dynamic templates, and frontend refinements (Phase 13) +3707d745|2025-12-19|feat: implement unified LanceDB ingestion pipeline with hybrid search +1849ecce|2025-12-19|Merge remote-tracking branch 'origin/main' and resolve conflicts in favor of advanced local implementation diff --git a/migrate_db.py b/migrate_db.py new file mode 100644 index 000000000..16e040d0d --- /dev/null +++ b/migrate_db.py @@ -0,0 +1,31 @@ + +import sqlite3 +import os + +db_path = "backend/atom_dev.db" +if not os.path.exists(db_path): + print("DB not found at backend/atom_dev.db, trying atom_dev.db") + db_path = "atom_dev.db" + +conn = sqlite3.connect(db_path) +cursor = conn.cursor() + +def add_column_if_not_exists(table, col_name, col_type): + try: + cursor.execute(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_type}") + print(f"Added {col_name}") + except sqlite3.OperationalError as e: + if "duplicate column name" in str(e): + print(f"Column {col_name} already exists") + else: + print(f"Error adding {col_name}: {e}") + +print(f"Migrating {db_path}...") +add_column_if_not_exists("users", "skills", "TEXT") +add_column_if_not_exists("users", "capacity_hours", "FLOAT DEFAULT 40.0") +add_column_if_not_exists("users", "hourly_cost_rate", "FLOAT DEFAULT 0.0") +add_column_if_not_exists("users", "metadata_json", "TEXT") # SQLite uses TEXT for JSON + +conn.commit() +conn.close() +print("Migration done.") diff --git a/needle_debug_log.txt b/needle_debug_log.txt new file mode 100644 index 000000000..747574f11 Binary files /dev/null and b/needle_debug_log.txt differ diff --git a/package-lock.json b/package-lock.json index 136a37699..7a659e552 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,9 @@ { +<<<<<<< HEAD + "name": "atom", +======= "name": "app", +>>>>>>> upstream/main "lockfileVersion": 3, "requires": true, "packages": { @@ -981,8 +985,12 @@ "resolved": "https://registry.npmjs.org/devtools-protocol/-/devtools-protocol-0.0.1534754.tgz", "integrity": "sha512-26T91cV5dbOYnXdJi5qQHoTtUoNEqwkHcAyu/IKtjIAxiEqPMrDiRkDOPWVsGfNZGmlQVHQbZRSjD8sxagWVsQ==", "dev": true, +<<<<<<< HEAD + "license": "BSD-3-Clause" +======= "license": "BSD-3-Clause", "peer": true +>>>>>>> upstream/main }, "node_modules/dunder-proto": { "version": "1.0.1", @@ -1888,7 +1896,12 @@ "version": "0.27.0", "resolved": "https://registry.npmjs.org/scheduler/-/scheduler-0.27.0.tgz", "integrity": "sha512-eNv+WrVbKu1f3vbYJT/xtiF5syA5HPIMtf9IgY/nKg0sWqzAUEvqY/xm7OcZc/qafLx/iO9FgOmeSAp4v5ti/Q==", +<<<<<<< HEAD + "license": "MIT", + "peer": true +======= "license": "MIT" +>>>>>>> upstream/main }, "node_modules/semver": { "version": "7.7.3",