Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions safe_mini/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
"""safe-mini: a safe-by-construction local-execution substrate for mini-swe-agent-style coding agents.

Public API (post-port; currently stubs):
- Chunk, Budget, RunResult, FailureClass, ObservationPolicy, ExecutorPolicy (canonical types)
- AgentRunner (Protocol contract)
- SafeMiniRunner (concrete implementation)
- ExecutorPolicies, ObservationPolicies (policy registries)
- classify_failure (failure classifier helper)
"""
"""safe-mini: a safe local-execution substrate for mini-swe-agent-style agents."""

# Package version string; also re-exported through ``__all__``.
__version__ = "0.1.0"

# Stubs — replaced once the careful port from reference/ lab_safe_mini_agent.py lands.
from .action_parser import ActionFormat, ActionParseError, ParsedAction, parse_action
from .classifier import classify_failure
from .policies import ExecutorPolicies
from .protocol import AgentRunner
from .runner import SafeMiniRunner
from .types import (
Budget,
Chunk,
ExecutorPolicy,
FailureClass,
Observation,
ObservationPolicy,
RunResult,
)

# Public API of the package. Every name listed here is either imported above
# from a submodule or defined in this module (``__version__``).
__all__ = [
    "ActionFormat",
    "ActionParseError",
    "AgentRunner",
    "Budget",
    "Chunk",
    "ExecutorPolicies",
    "ExecutorPolicy",
    "FailureClass",
    "Observation",
    "ObservationPolicy",
    "ParsedAction",
    "RunResult",
    "SafeMiniRunner",
    "__version__",
    "classify_failure",
    "parse_action",
]
69 changes: 69 additions & 0 deletions safe_mini/action_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Action protocol parsing for safe-mini runner loops."""

from __future__ import annotations

import json
import re
from dataclasses import dataclass
from enum import StrEnum
from typing import Any


class ActionFormat(StrEnum):
"""Supported one-action response protocols."""

FENCED_BASH = "fenced-bash"
JSON = "json"


@dataclass(frozen=True)
class ParsedAction:
    """Immutable record of one shell command and the format it was parsed from."""

    command: str  # shell command text extracted from the response
    format: ActionFormat  # protocol format that yielded ``command``


class ActionParseError(ValueError):
    """Signal that a model response lacks exactly one well-formed action."""


def parse_action(text: str) -> ParsedAction:
    """Extract the single shell command carried by a model response.

    The response is stripped of surrounding whitespace first. If the result
    begins with ``{`` it is handled as a JSON action of the shape
    ``{"action": "bash", "command": "..."}``; anything else is searched for
    one fenced block labeled ``bash-action``.
    """

    body = text.strip()
    # A leading brace is the only signal used to pick the JSON protocol.
    parser = _parse_json_action if body.startswith("{") else _parse_fenced_bash
    return parser(body)


def _parse_fenced_bash(text: str) -> ParsedAction:
    """Pull the command out of the only ``bash-action`` fenced block in ``text``.

    Raises:
        ActionParseError: if the number of ``bash-action`` blocks is not
            exactly one, or the block body is blank after stripping.
    """
    blocks = [
        found.group(1)
        for found in re.finditer(r"```bash-action\s*\n(.*?)\n```", text, re.DOTALL)
    ]
    count = len(blocks)
    if count != 1:
        raise ActionParseError(f"expected exactly one bash-action, found {count}")

    (body,) = blocks
    command = body.strip()
    if command:
        return ParsedAction(command=command, format=ActionFormat.FENCED_BASH)
    raise ActionParseError("bash-action command cannot be empty")


def _parse_json_action(text: str) -> ParsedAction:
    """Decode a JSON action object and return its shell command.

    The text must decode to an object whose ``action`` is ``"bash"`` and whose
    ``command`` is a string that is non-empty after stripping.

    Raises:
        ActionParseError: on invalid JSON, a non-object payload, a wrong
            ``action`` value, or a missing/blank/non-string ``command``.
    """
    try:
        decoded: Any = json.loads(text)
    except json.JSONDecodeError as exc:
        # Surface only the decoder's short message; the cause stays chained.
        raise ActionParseError(f"invalid JSON action: {exc.msg}") from exc

    if not isinstance(decoded, dict):
        raise ActionParseError("JSON action must be an object")
    if decoded.get("action") != "bash":
        raise ActionParseError("JSON action must set action='bash'")

    raw_command = decoded.get("command")
    if isinstance(raw_command, str):
        cleaned = raw_command.strip()
        if cleaned:
            return ParsedAction(command=cleaned, format=ActionFormat.JSON)
    raise ActionParseError("JSON action command must be a non-empty string")
27 changes: 27 additions & 0 deletions safe_mini/classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Failure classification for safe-mini run results."""

from __future__ import annotations

from safe_mini.types import FailureClass, RunResult


def classify_failure(result: RunResult) -> FailureClass:
    """Map a failed run onto one of seven ``FailureClass`` members.

    Checks run in a fixed priority order and the first match wins: blocked
    commands, action-protocol violations, reward hacking, a ``[truncated``
    marker anywhere in the transcript contents, an exhausted observation
    budget, zero steps taken, and finally failing tests after at least one
    step. Anything left over is reported as ``EXHAUSTED_IDEAS``.
    """

    # Flag-style signals, highest priority first.
    flag_checks = (
        ("blocked_commands", FailureClass.SAFETY_VIOLATION),
        ("action_protocol_violations", FailureClass.ACTION_PROTOCOL_VIOLATION),
        ("reward_hacking_detected", FailureClass.REWARD_HACKING),
    )
    for attribute, verdict in flag_checks:
        if getattr(result, attribute):
            return verdict

    # The marker contains no newline, so per-entry search is equivalent to
    # searching the newline-joined transcript.
    contents = [str(entry.get("content", "")) for entry in result.transcript]
    if any("[truncated" in content for content in contents):
        return FailureClass.CONTEXT_STARVATION

    if result.observation_budget_exhausted:
        return FailureClass.BUDGET_EXHAUSTED
    if result.steps == 0:
        return FailureClass.EMBODIMENT_FAILURE

    # NOTE(review): with steps != 0 here, EXHAUSTED_IDEAS is effectively only
    # reached when final_tests_pass is truthy — confirm this is intended.
    failed_after_work = not result.final_tests_pass and result.steps
    return FailureClass.BUDGET_EXHAUSTED if failed_after_work else FailureClass.EXHAUSTED_IDEAS
5 changes: 5 additions & 0 deletions safe_mini/observation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Observation policy helpers."""

from .policies import apply_observation_policy, final_tests_pass

# Re-export the two helpers imported above from ``.policies``.
__all__ = ["apply_observation_policy", "final_tests_pass"]
106 changes: 106 additions & 0 deletions safe_mini/observation/policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Observation shaping policies and final test checks."""

from __future__ import annotations

import json
import os
import subprocess
from pathlib import Path

from safe_mini.types import Observation, ObservationPolicy
from safe_mini.worktree import SANITIZED_PATH


def apply_observation_policy(
    observation: Observation, policy: ObservationPolicy, limit: int
) -> Observation:
    """Build a new ``Observation`` whose ``output`` is shaped by ``policy``.

    ``limit`` is clamped to be non-negative. ``FULL`` and ``TAIL`` both keep
    the tail of the text, ``HEADTAIL`` keeps both ends, and the two structured
    policies emit a JSON summary. For the text policies the source is
    ``raw_output`` when it is non-empty, otherwise ``output``. Every other
    field is copied from ``observation``; ``truncated`` is the incoming flag
    OR-ed with whether shaping cut anything.

    Raises:
        ValueError: if ``policy`` is not one of the handled policies.
    """

    cap = max(limit, 0)

    def source_text() -> str:
        # Prefer the raw capture; fall back to ``output`` when it is empty.
        return observation.raw_output or observation.output

    if policy in {ObservationPolicy.FULL, ObservationPolicy.TAIL}:
        shaped, was_cut = _truncate_tail(source_text(), cap)
    elif policy == ObservationPolicy.HEADTAIL:
        shaped, was_cut = _headtail(source_text(), cap)
    elif policy == ObservationPolicy.STRUCTURED:
        shaped, was_cut = _structured(observation, include_raw_tail=False, limit=cap)
    elif policy == ObservationPolicy.STRUCTURED_RAW_TAIL:
        shaped, was_cut = _structured(observation, include_raw_tail=True, limit=cap)
    else:
        raise ValueError(f"unknown observation policy: {policy}")

    # Fields carried over unchanged from the incoming observation.
    passthrough = (
        "command",
        "returncode",
        "blocked",
        "stdout",
        "stderr",
        "raw_output",
        "timed_out",
    )
    carried = {name: getattr(observation, name) for name in passthrough}
    return Observation(
        output=shaped,
        truncated=observation.truncated or was_cut,
        **carried,
    )


def final_tests_pass(cwd: str | Path, *, command: str = "python3 tests/run_tests.py") -> bool:
"""Run the repo's final verification command in the scoped environment."""

cwd_path = Path(cwd)
proc = subprocess.run(
command,
shell=True,
cwd=cwd_path,
env={
"PATH": os.environ.get("PATH") or SANITIZED_PATH,
"PYTHONPATH": str(cwd_path),
"HOME": str(cwd_path / ".agent-home"),
"LANG": "C.UTF-8",
},
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=10,
)
return proc.returncode == 0


def _truncate_tail(text: str, limit: int) -> tuple[str, bool]:
    """Keep at most the last ``limit`` characters of ``text``.

    Args:
        text: Text to shape.
        limit: Maximum number of characters of ``text`` to keep. Zero or a
            negative value keeps nothing.

    Returns:
        ``(shaped, truncated)``. When text is cut, ``shaped`` starts with a
        ``[truncated to last N chars]`` marker line followed by the kept tail,
        so it is longer than ``limit`` by the length of that marker.
    """
    # Guard negatives too: ``text[-limit:]`` with limit < 0 would slice from
    # the front and report a nonsensical count in the marker.
    if limit <= 0:
        return "", bool(text)
    if len(text) <= limit:
        return text, False
    return f"[truncated to last {limit} chars]\n{text[-limit:]}", True


def _headtail(text: str, limit: int) -> tuple[str, bool]:
    """Keep the first and last parts of ``text``, ``limit`` characters in total.

    Args:
        text: Text to shape.
        limit: Total number of characters of ``text`` to keep. Zero or a
            negative value keeps nothing.

    Returns:
        ``(shaped, truncated)``. When text is cut, ``shaped`` is a marker
        line, the head, a ``...`` separator line and the tail. The head gets
        ``limit // 2`` characters and the tail gets the remainder.
    """
    # Guard negatives too: they would produce negative head/tail lengths and
    # therefore meaningless slices.
    if limit <= 0:
        return "", bool(text)
    if len(text) <= limit:
        return text, False
    head_len = limit // 2
    tail_len = limit - head_len  # always >= 1 here, so the tail slice is safe
    return (
        f"[truncated to first {head_len} and last {tail_len} chars]\n"
        f"{text[:head_len]}\n...\n{text[-tail_len:]}",
        True,
    )


def _structured(
    observation: Observation, *, include_raw_tail: bool, limit: int
) -> tuple[str, bool]:
    """Render ``observation`` as a key-sorted JSON summary string.

    The summary always holds the command, return code, blocked and timed-out
    flags, and the character counts of stdout and stderr. With
    ``include_raw_tail`` it adds ``raw_tail`` (the tail of ``raw_output``, or
    of ``output`` when that is empty, cut to ``limit``). Without it, the full
    ``stdout`` and ``stderr`` are embedded and ``limit`` is not applied.

    Returns:
        ``(json_text, truncated)``; ``truncated`` can only be true when the
        raw tail was requested and had to be cut.
    """
    summary: dict[str, object] = dict(
        command=observation.command,
        returncode=observation.returncode,
        blocked=observation.blocked,
        timed_out=observation.timed_out,
        stdout_chars=len(observation.stdout),
        stderr_chars=len(observation.stderr),
    )

    if not include_raw_tail:
        summary["stdout"] = observation.stdout
        summary["stderr"] = observation.stderr
        return json.dumps(summary, sort_keys=True), False

    tail_text, was_cut = _truncate_tail(observation.raw_output or observation.output, limit)
    summary["raw_tail"] = tail_text
    return json.dumps(summary, sort_keys=True), was_cut
22 changes: 22 additions & 0 deletions safe_mini/policies/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Executor policy registry."""

from __future__ import annotations

from safe_mini.types import ExecutorPolicy

from .executor import AllowlistExecutor, BaseExecutor, ExecutorBase, OpenExecutor, SafeExecutor

# Registry mapping each ``ExecutorPolicy`` member to the executor class
# registered for it.
ExecutorPolicies: dict[ExecutorPolicy, type[ExecutorBase]] = {
    ExecutorPolicy.OPEN: OpenExecutor,
    ExecutorPolicy.SAFE: SafeExecutor,
    ExecutorPolicy.ALLOWLIST: AllowlistExecutor,
}

# Public names of this subpackage: the executor classes imported above from
# ``.executor`` plus the registry defined in this module.
__all__ = [
    "AllowlistExecutor",
    "BaseExecutor",
    "ExecutorBase",
    "ExecutorPolicies",
    "OpenExecutor",
    "SafeExecutor",
]
Loading
Loading