Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ JSON file:

## Architecture

For the planned Space background-worker architecture, see
[`docs/phase-3-space-background-workers.md`](docs/phase-3-space-background-workers.md).

### Component Overview

```
Expand Down
267 changes: 266 additions & 1 deletion agent/core/session_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import logging
import os
from datetime import UTC, datetime
import uuid
from datetime import UTC, datetime, timedelta
from typing import Any

from bson import BSON
Expand Down Expand Up @@ -86,6 +87,9 @@ async def append_event(self, *_: Any, **__: Any) -> int | None:
async def load_events_after(self, *_: Any, **__: Any) -> list[dict[str, Any]]:
    """Ignore every argument and report that no events are stored."""
    return []

async def latest_event_seq(self, *_: Any, **__: Any) -> int:
    """Ignore every argument and report 0 as the latest event sequence."""
    return 0

async def append_trace_message(self, *_: Any, **__: Any) -> int | None:
return None

Expand All @@ -98,6 +102,21 @@ async def try_increment_quota(self, *_: Any, **__: Any) -> int | None:
async def refund_quota(self, *_: Any, **__: Any) -> None:
    """Ignore every argument; there is no quota counter to refund here."""

async def enqueue_run(self, *_: Any, **__: Any) -> dict[str, Any] | None:
return None

async def claim_next_run(self, *_: Any, **__: Any) -> dict[str, Any] | None:
return None

async def heartbeat_run(self, *_: Any, **__: Any) -> bool:
    """Ignore every argument and report False (no lease was extended)."""
    return False

async def finish_run(self, *_: Any, **__: Any) -> None:
    """Ignore every argument; there is no run record to finalise here."""

async def interrupt_expired_runs(self, *_: Any, **__: Any) -> int:
    """Ignore every argument and report that 0 runs were interrupted."""
    return 0


class MongoSessionStore(NoopSessionStore):
"""MongoDB-backed session store."""
Expand Down Expand Up @@ -152,6 +171,14 @@ async def _create_indexes(self) -> None:
[("session_id", 1), ("seq", 1)], unique=True
)
await self.db.session_trace_messages.create_index([("created_at", -1)])
await self.db.session_runs.create_index(
[("status", 1), ("lease_until", 1), ("created_at", 1)]
)
await self.db.session_runs.create_index([("session_id", 1), ("created_at", -1)])
await self.db.session_runs.create_index([("user_id", 1), ("created_at", -1)])
await self.db.session_runs.create_index(
[("idempotency_key", 1)], unique=True, sparse=True
)

def _ready(self) -> bool:
    """Report whether the store is enabled and holds a database handle."""
    if not self.enabled:
        return False
    return self.db is not None
Expand Down Expand Up @@ -348,6 +375,15 @@ async def load_events_after(self, session_id: str, after_seq: int = 0) -> list[d
).sort("seq", 1)
return [row async for row in cursor]

async def latest_event_seq(self, session_id: str) -> int:
    """Return the highest stored event ``seq`` for a session, or 0.

    0 is returned when the store is not ready or when no event document
    exists for ``session_id``. A matching document without a ``seq`` field
    also yields 0.
    """
    if not self._ready():
        return 0
    # Sorting by seq descending makes the first match the newest event.
    newest = await self.db.session_events.find_one(
        {"session_id": session_id},
        sort=[("seq", -1)],
    )
    if not newest:
        return 0
    return int(newest.get("seq", 0))

async def append_trace_message(
self, session_id: str, message: dict[str, Any], source: str = "message"
) -> int | None:
Expand Down Expand Up @@ -410,6 +446,235 @@ async def refund_quota(self, user_id: str, day: str) -> None:
{"$inc": {"count": -1}, "$set": {"updated_at": _now()}},
)

async def enqueue_run(
    self,
    *,
    session_id: str,
    user_id: str,
    operation: dict[str, Any],
    idempotency_key: str | None = None,
    surface: str = "space",
) -> dict[str, Any] | None:
    """Create a durable queued run and attach it to the session.

    Args:
        session_id: ``_id`` of the session document the run is attached to.
        user_id: Stored on the run document.
        operation: Stored as-is on the run document.
        idempotency_key: Optional de-duplication key. When a run with the
            same key already exists, that stored run is returned and nothing
            new is created.
        surface: Stored on the run document.

    Returns:
        The new run document, or the already-stored run for a repeated
        idempotency key. None when the store is not ready, or when no session
        document with this ``session_id`` and an unset ``active_run_id``
        could be updated. The caller can surface the latter as a 409 rather
        than starting two concurrent turns against the same context.

    Raises:
        DuplicateKeyError: If the insert collides and no idempotency key was
            supplied to resolve the collision.
    """
    if not self._ready():
        return None
    if idempotency_key:
        # NOTE(review): the lookup is by key only, so the returned run may
        # belong to another session/user or already be cancelled (the
        # rejection path below keeps the key on the cancelled run). Callers
        # should check ``status`` -- confirm this is the intended contract.
        existing = await self.db.session_runs.find_one(
            {"idempotency_key": idempotency_key}
        )
        if existing:
            return existing

    now = _now()
    run_id = str(uuid.uuid4())
    run = {
        "_id": run_id,
        "run_id": run_id,
        "schema_version": SCHEMA_VERSION,
        "session_id": session_id,
        "user_id": user_id,
        "surface": surface,
        "operation": operation,
        "status": "queued",
        "idempotency_key": idempotency_key,
        "lease_owner": None,
        "lease_until": None,
        "retry_count": 0,
        "max_retries": 1,
        "created_at": now,
        "started_at": None,
        "updated_at": now,
        "finished_at": None,
        "error": None,
    }
    # The idempotency_key index is unique and sparse. A sparse index skips
    # documents that lack the field but still indexes an explicit null, so
    # persisting ``None`` would make the second key-less run collide with the
    # first. Store the field only when a key was supplied; the dict handed
    # back to the caller keeps its full shape.
    document = dict(run)
    if not idempotency_key:
        del document["idempotency_key"]
    try:
        await self.db.session_runs.insert_one(document)
    except DuplicateKeyError:
        if idempotency_key:
            # Lost a race against a concurrent request with the same key:
            # hand back the run that won.
            return await self.db.session_runs.find_one(
                {"idempotency_key": idempotency_key}
            )
        raise

    # NOTE(review): the run is already visible as "queued" before the session
    # points at it, so a worker could claim it in this gap and fail its
    # session guard -- confirm whether that window needs closing.
    attached = await self.db.sessions.update_one(
        {
            "_id": session_id,
            "$or": [
                {"active_run_id": {"$exists": False}},
                {"active_run_id": None},
            ],
        },
        {
            "$set": {
                "active_run_id": run_id,
                "runtime_state": "queued",
                "updated_at": now,
            }
        },
    )
    if attached.matched_count == 0:
        # No session could take the run: retire it immediately so that no
        # worker picks it up.
        cancelled_at = _now()
        await self.db.session_runs.update_one(
            {"_id": run_id},
            {
                "$set": {
                    "status": "cancelled",
                    "error": "session already has an active run",
                    "updated_at": cancelled_at,
                    "finished_at": cancelled_at,
                }
            },
        )
        return None
    return run

async def claim_next_run(
    self,
    *,
    worker_id: str,
    lease_seconds: int = 120,
) -> dict[str, Any] | None:
    """Hand the oldest queued run to ``worker_id`` under a fresh lease.

    The run is switched to "running" in a single find-and-update, then the
    owning session is marked "processing" provided its ``active_run_id``
    still points at that run.

    Returns:
        The updated run document, or None when the store is not ready, when
        nothing is queued, or when the session guard did not match (in which
        case the run is finished as "interrupted").
    """
    if not self._ready():
        return None

    claimed_at = _now()
    lease_expiry = claimed_at + timedelta(seconds=lease_seconds)

    run_claim = {
        "$set": {
            "status": "running",
            "lease_owner": worker_id,
            "lease_until": lease_expiry,
            "started_at": claimed_at,
            "updated_at": claimed_at,
        },
        "$inc": {"retry_count": 1},
    }
    claimed = await self.db.session_runs.find_one_and_update(
        {"status": "queued"},
        run_claim,
        sort=[("created_at", 1)],
        return_document=ReturnDocument.AFTER,
    )
    if not claimed:
        return None

    # Only take over the session if it still references this very run.
    session_guard = {
        "_id": claimed["session_id"],
        "active_run_id": claimed["_id"],
    }
    session_patch = {
        "$set": {
            "runtime_state": "processing",
            "worker_owner": worker_id,
            "worker_lease_until": lease_expiry,
            "updated_at": claimed_at,
        }
    }
    outcome = await self.db.sessions.update_one(session_guard, session_patch)
    if outcome.matched_count != 0:
        return claimed

    await self.finish_run(
        claimed["_id"],
        status="interrupted",
        error="session guard failed while claiming run",
    )
    return None

async def heartbeat_run(
    self,
    run_id: str,
    *,
    worker_id: str,
    lease_seconds: int = 120,
) -> bool:
    """Extend the lease of a running run that ``worker_id`` owns.

    Args:
        run_id: ``_id`` of the run document.
        worker_id: Must equal the run's ``lease_owner`` for the renewal to
            apply.
        lease_seconds: Length of the new lease, counted from now.

    Returns:
        True when the run's lease was extended. False when the store is not
        ready or when no run matched (unknown id, status other than
        "running", or a different lease owner).
    """
    if not self._ready():
        return False
    now = _now()
    lease_until = now + timedelta(seconds=lease_seconds)
    # A single find-and-update both renews the lease and yields the session
    # id, avoiding a second read of the same run document.
    run = await self.db.session_runs.find_one_and_update(
        {"_id": run_id, "status": "running", "lease_owner": worker_id},
        {"$set": {"lease_until": lease_until, "updated_at": now}},
        projection={"session_id": 1},
    )
    if run is None:
        return False

    # Mirror the lease on the session, but only while the session still
    # points at this run.
    await self.db.sessions.update_one(
        {"_id": run["session_id"], "active_run_id": run_id},
        {
            "$set": {
                "worker_lease_until": lease_until,
                "updated_at": now,
            }
        },
    )
    return True

async def finish_run(
    self,
    run_id: str,
    *,
    status: str,
    error: str | None = None,
) -> None:
    """Record a run's final ``status`` and release the session it occupied.

    The run document gets the status, the error, matching ``updated_at`` /
    ``finished_at`` timestamps and a cleared ``lease_until``. The session is
    released only if its ``active_run_id`` still equals ``run_id``. Its
    ``runtime_state`` becomes the status itself for "waiting_approval" and
    "interrupted", and "idle" for every other status.

    Does nothing when the store is not ready or when no run has this id.
    """
    if not self._ready():
        return

    stamp = _now()
    run_patch = {
        "$set": {
            "status": status,
            "error": error,
            "updated_at": stamp,
            "finished_at": stamp,
            "lease_until": None,
        }
    }
    finished = await self.db.session_runs.find_one_and_update(
        {"_id": run_id},
        run_patch,
        return_document=ReturnDocument.AFTER,
    )
    if not finished:
        return

    # Statuses that carry over to the session verbatim; all others mean idle.
    carried_over = {
        "waiting_approval": "waiting_approval",
        "interrupted": "interrupted",
    }
    session_state = carried_over.get(status, "idle")

    session_guard = {"_id": finished["session_id"], "active_run_id": run_id}
    session_patch = {
        "$set": {
            "runtime_state": session_state,
            "active_run_id": None,
            "worker_owner": None,
            "worker_lease_until": None,
            "updated_at": stamp,
        }
    }
    await self.db.sessions.update_one(session_guard, session_patch)

async def interrupt_expired_runs(self, *, before: datetime | None = None) -> int:
    """Mark expired running runs interrupted instead of replaying tool calls.

    Args:
        before: Lease cutoff. Running runs whose ``lease_until`` is earlier
            than this are treated as expired. Defaults to the current time.

    Returns:
        The number of runs this call moved to "interrupted"; 0 when the store
        is not ready.
    """
    if not self._ready():
        return 0
    cutoff = before or _now()
    expired = {"status": "running", "lease_until": {"$lt": cutoff}}
    count = 0
    async for run in self.db.session_runs.find(expired):
        # The cursor result may be stale by the time it is processed: the
        # worker can have renewed its lease or finished the run meanwhile.
        # Re-assert the expiry condition in an atomic update and skip the run
        # if it no longer holds, so a live or completed run is not clobbered.
        transitioned = await self.db.session_runs.update_one(
            {"_id": run["_id"], **expired},
            {"$set": {"status": "interrupted", "updated_at": _now()}},
        )
        if transitioned.matched_count == 0:
            continue
        # finish_run fills in the remaining run fields and frees the session.
        await self.finish_run(
            run["_id"],
            status="interrupted",
            error="worker lease expired",
        )
        count += 1
    return count


# Module-level store handle; starts out unset (None).
# NOTE(review): presumably assigned by an accessor defined further down in
# this module -- confirm against the rest of the file.
_store: NoopSessionStore | MongoSessionStore | None = None

Expand Down
Loading
Loading