Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 93 additions & 10 deletions bindu/server/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager, nullcontext
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, AsyncIterator
from uuid import UUID

import anyio
from opentelemetry.trace import get_tracer, use_span
Expand Down Expand Up @@ -242,19 +244,100 @@ def build_artifacts(self, result: Any) -> list[Artifact]:
async def _handle_pause(self, params: TaskIdParams) -> None:
"""Handle pause operation.

TODO: Implement task pause functionality
- Save current execution state
- Update task to 'suspended' state
- Release resources while preserving context
Saves task execution context and transitions task to suspended state.

Implementation:
- Load task from storage
- Save execution state to task metadata (paused_at timestamp)
- Update task state to 'suspended'
- Release resources while preserving task context
- Log the pause operation

Args:
params: Task identification parameters containing task_id
"""
raise NotImplementedError("Pause operation not yet implemented")
task_id: UUID = params["task_id"]

try:
# Load the task to ensure it exists
task = await self.storage.load_task(task_id)
if not task:
logger.error(f"Task {task_id} not found for pause operation")
raise ValueError(f"Task {task_id} not found")

# Prepare metadata with pause timestamp
pause_metadata = {
"paused_at": datetime.now(timezone.utc).isoformat(),
"paused_from_state": task.get("status", {}).get("state", "unknown"),
}

# Update task to suspended state with pause metadata
await self.storage.update_task(
task_id,
state="suspended",
metadata=pause_metadata,
)

logger.info(
f"Task {task_id} paused successfully. "
f"Previous state: {pause_metadata['paused_from_state']}"
)

except Exception as e:
logger.error(f"Failed to pause task {task_id}: {e}", exc_info=True)
raise

async def _handle_resume(self, params: TaskIdParams) -> None:
"""Handle resume operation.

TODO: Implement task resume functionality
- Restore execution state
- Update task to 'resumed' state
- Continue from last checkpoint
Restores task execution context and transitions task to resumed state.

Implementation:
- Load task from storage
- Verify task is in suspended state (graceful if not)
- Restore execution context from task metadata if available
- Update task state to 'resumed'
- Log the resume operation

Args:
params: Task identification parameters containing task_id
"""
raise NotImplementedError("Resume operation not yet implemented")
task_id: UUID = params["task_id"]

try:
# Load the task to ensure it exists
task = await self.storage.load_task(task_id)
if not task:
logger.error(f"Task {task_id} not found for resume operation")
raise ValueError(f"Task {task_id} not found")

current_state = task.get("status", {}).get("state", "unknown")

# Prepare metadata with resume timestamp and original pause info
resume_metadata = {
"resumed_at": datetime.now(timezone.utc).isoformat(),
"resumed_from_state": current_state,
}

# If task was suspended, preserve the original paused_at timestamp
if "metadata" in task and "paused_at" in task.get("metadata", {}):
resume_metadata["paused_at"] = task["metadata"]["paused_at"]
resume_metadata["paused_from_state"] = task["metadata"].get(
"paused_from_state", "unknown"
)

# Update task to resumed state with resume metadata
await self.storage.update_task(
task_id,
state="resumed",
metadata=resume_metadata,
)

logger.info(
f"Task {task_id} resumed successfully. "
f"Previous state: {current_state}"
)

except Exception as e:
logger.error(f"Failed to resume task {task_id}: {e}", exc_info=True)
raise
Loading
Loading