Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions desloppify/app/cli_support/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ def create_parser(*, langs: list[str], detector_names: list[str]) -> argparse.Ar
action="version",
version=_cli_version_string(),
)
parser.add_argument(
"--allow-unsafe-coerce",
action="store_true",
default=False,
help=(
"Allow unsafe persistence coercion/re-save for recovery workflows. "
"Use only when explicitly repairing corrupted state/plan payloads."
),
)
sub = parser.add_subparsers(
dest="command",
parser_class=_NoAbbrevArgumentParser,
Expand Down
5 changes: 5 additions & 0 deletions desloppify/base/exception_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class TriageValidationError(CommandError):
"""Raised for invalid triage-stage attestation or workflow inputs."""


class PersistenceSafetyError(CommandError):
    """Signal that a plan/state persistence safety check did not pass."""


PLAN_LOAD_EXCEPTIONS = (
ImportError,
AttributeError,
Expand All @@ -42,6 +46,7 @@ class TriageValidationError(CommandError):
__all__ = [
"CommandError",
"PLAN_LOAD_EXCEPTIONS",
"PersistenceSafetyError",
"PacketValidationError",
"RunnerTimeoutError",
"TriageValidationError",
Expand Down
1 change: 1 addition & 0 deletions desloppify/base/runtime_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class RuntimeContext:
source_file_cache: SourceFileCache = field(
default_factory=lambda: SourceFileCache(max_entries=16)
)
allow_unsafe_coerce: bool = False


_PROCESS_RUNTIME_CONTEXT = RuntimeContext()
Expand Down
4 changes: 4 additions & 0 deletions desloppify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from desloppify.base.discovery.paths import get_default_path, get_project_root
from desloppify.base.registry import detector_names, on_detector_registered
from desloppify.base.runtime_state import runtime_scope
from desloppify.base.runtime_state import current_runtime_context
from desloppify.languages import available_langs
from desloppify.state import load_state

Expand Down Expand Up @@ -169,6 +170,9 @@ def main() -> None:

try:
with runtime_scope():
current_runtime_context().allow_unsafe_coerce = bool(
getattr(args, "allow_unsafe_coerce", False)
)
_resolve_default_path(args)
_load_shared_runtime(args)

Expand Down
162 changes: 142 additions & 20 deletions desloppify/engine/_plan/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from pathlib import Path

from desloppify.base.discovery.file_paths import safe_write_text
from desloppify.base.exception_sets import PersistenceSafetyError
from desloppify.base.output.fallbacks import log_best_effort_failure
from desloppify.base.runtime_state import current_runtime_context
from desloppify.engine._plan.schema import (
PLAN_VERSION,
PlanModel,
Expand All @@ -22,69 +24,189 @@
logger = logging.getLogger(__name__)

PLAN_FILE = STATE_DIR / "plan.json"
_UNSAFE_MARKER_KEY = "_unsafe_load_reasons"
_QUARANTINE_KEY = "_load_quarantine"
_SAFE_ERROR_PREFIX = "DLP_PERSISTENCE_PLAN"


def load_plan(path: Path | None = None) -> PlanModel:
"""Load plan from disk, or return empty plan on missing/corruption."""
def _allow_unsafe_coerce(allow_unsafe_coerce: bool | None) -> bool:
    """Resolve the effective unsafe-coercion flag.

    An explicit (non-``None``) argument takes precedence; otherwise the
    ``allow_unsafe_coerce`` attribute of the current runtime context is used.
    The result is always coerced to ``bool``.
    """
    if allow_unsafe_coerce is None:
        return bool(current_runtime_context().allow_unsafe_coerce)
    return bool(allow_unsafe_coerce)


def _quarantine_path(plan_path: Path) -> Path:
    """Return a timestamped sibling path for a quarantine snapshot.

    The file name has the form ``<stem>.quarantine.<stamp><suffix>``, where
    ``<stamp>`` is the ``utc_now()`` value with every colon replaced by a
    dash (presumably to keep the name filesystem-safe).
    """
    timestamp = utc_now().replace(":", "-")
    snapshot_name = f"{plan_path.stem}.quarantine.{timestamp}{plan_path.suffix}"
    return plan_path.with_name(snapshot_name)


def _write_quarantine_snapshot(plan_path: Path, *, raw_text: str, reason: str) -> Path | None:
    """Write a JSON snapshot describing an unloadable plan file.

    The snapshot records the source path, the failure reason, a capture
    timestamp, and the raw text that was read.

    Returns:
        The path of the snapshot, or ``None`` when writing it fails with
        ``OSError`` (the failure is logged as best-effort and not raised).
    """
    target = _quarantine_path(plan_path)
    snapshot = {
        "source_path": str(plan_path),
        "reason": reason,
        "captured_at": utc_now(),
        "raw_text": raw_text,
    }
    try:
        safe_write_text(
            target,
            json.dumps(snapshot, indent=2, default=json_default) + "\n",
        )
    except OSError as exc:
        log_best_effort_failure(logger, "write plan quarantine snapshot", exc)
        return None
    else:
        return target


def _raise_plan_safety_error(*, code: str, detail: str, quarantine_path: Path | None = None) -> None:
    """Raise ``PersistenceSafetyError`` with a coded plan-persistence message.

    The message starts with ``[<_SAFE_ERROR_PREFIX>_<code>]`` followed by
    *detail*; when *quarantine_path* is given, the recovery snapshot location
    is appended. This function never returns normally.

    Raises:
        PersistenceSafetyError: always, with ``exit_code=2``.
    """
    parts = [f"[{_SAFE_ERROR_PREFIX}_{code}] {detail}"]
    if quarantine_path is not None:
        parts.append(f" Recovery snapshot: {quarantine_path}")
    raise PersistenceSafetyError("".join(parts), exit_code=2)


def load_plan(
    path: Path | None = None,
    *,
    allow_unsafe_coerce: bool | None = None,
) -> PlanModel:
    """Load plan from disk with explicit safety checks.

    Args:
        path: Plan file to read; defaults to ``PLAN_FILE``.
        allow_unsafe_coerce: Explicit override for the unsafe-coercion flag.
            ``None`` defers to the current runtime context.

    Returns:
        An empty plan when the file does not exist; otherwise the parsed
        payload after defaults have been applied and validation has passed.
        When the payload required unsafe handling, the ``_unsafe_load_reasons``
        key lists the reasons.

    Raises:
        PersistenceSafetyError: when neither the plan nor its backup can be
            parsed, the root is not a JSON object, the ``version`` field is
            not numeric, the version is newer than supported (unless unsafe
            coercion is allowed), or plan validation fails.
    """
    plan_path = path or PLAN_FILE
    unsafe_allowed = _allow_unsafe_coerce(allow_unsafe_coerce)
    if not plan_path.exists():
        return empty_plan()

    raw_primary = ""
    # Text of whichever file was actually parsed; later quarantine snapshots
    # must capture this so the recorded reason and raw text describe the
    # same payload.
    raw_loaded = ""
    try:
        raw_primary = plan_path.read_text()
        data = json.loads(raw_primary)
        raw_loaded = raw_primary
    except (json.JSONDecodeError, UnicodeDecodeError, OSError) as ex:
        # Try backup before giving up
        backup = plan_path.with_suffix(".json.bak")
        if backup.exists():
            try:
                raw_backup = backup.read_text()
                data = json.loads(raw_backup)
            except (json.JSONDecodeError, UnicodeDecodeError, OSError) as backup_ex:
                logger.warning("Plan file and backup both corrupted: %s / %s", ex, backup_ex)
                quarantine_path = _write_quarantine_snapshot(
                    plan_path,
                    raw_text=raw_primary,
                    reason=f"primary parse error: {ex}; backup parse error: {backup_ex}",
                )
                _raise_plan_safety_error(
                    code="PARSE_FAILED",
                    detail="Plan file and backup are unreadable.",
                    quarantine_path=quarantine_path,
                )
            else:
                logger.warning("Plan file corrupted (%s), loaded from backup.", ex)
                print(f" Warning: Plan file corrupted ({ex}), loaded from backup.", file=sys.stderr)
                raw_loaded = raw_backup
                # Fall through to validation below
        else:
            logger.warning("Plan file corrupted (%s) and no backup is available.", ex)
            quarantine_path = _write_quarantine_snapshot(
                plan_path,
                raw_text=raw_primary,
                reason=f"primary parse error: {ex}",
            )
            _raise_plan_safety_error(
                code="PARSE_FAILED",
                detail="Plan file is unreadable and no backup is available.",
                quarantine_path=quarantine_path,
            )

    if not isinstance(data, dict):
        logger.warning("Plan file root is not a JSON object.")
        quarantine_path = _write_quarantine_snapshot(
            plan_path,
            raw_text=raw_loaded,
            reason="plan root is not a JSON object",
        )
        _raise_plan_safety_error(
            code="ROOT_NOT_OBJECT",
            detail="Plan file root must be a JSON object.",
            quarantine_path=quarantine_path,
        )

    version = data.get("version", 1)
    # A corrupted payload may carry a non-numeric version (string, null,
    # list, ...). Comparing that against PLAN_VERSION would raise a bare
    # TypeError, so route it through the same safety-error path instead.
    if not isinstance(version, (int, float)):
        quarantine_path = _write_quarantine_snapshot(
            plan_path,
            raw_text=raw_loaded,
            reason=f"plan version is not numeric: {version!r}",
        )
        _raise_plan_safety_error(
            code="INVALID_VERSION",
            detail=f"Plan schema version must be a number, got {version!r}.",
            quarantine_path=quarantine_path,
        )

    if version > PLAN_VERSION:
        if not unsafe_allowed:
            _raise_plan_safety_error(
                code="FUTURE_VERSION",
                detail=(
                    f"Plan schema version {version} is newer than supported ({PLAN_VERSION}). "
                    "Re-run with --allow-unsafe-coerce only for manual recovery."
                ),
            )
        logger.warning(
            "Unsafe plan coercion enabled for future schema version %s (supported=%s).",
            version,
            PLAN_VERSION,
        )

    ensure_plan_defaults(data)
    try:
        validate_plan(data)
    except ValueError as ex:
        logger.warning("Plan invariants invalid (%s).", ex)
        quarantine_path = _write_quarantine_snapshot(
            plan_path,
            raw_text=raw_loaded,
            reason=f"plan invariants invalid: {ex}",
        )
        _raise_plan_safety_error(
            code="INVALID_INVARIANTS",
            detail=f"Plan invariants invalid: {ex}",
            quarantine_path=quarantine_path,
        )

    # Record why this payload is considered unsafe so that a later save can
    # be refused unless the caller explicitly opts in.
    reasons: list[str] = []
    if version > PLAN_VERSION:
        reasons.append("future_schema_version")
    quarantine_payload = data.get(_QUARANTINE_KEY)
    if isinstance(quarantine_payload, dict) and quarantine_payload:
        reasons.append("normalized_malformed_sections")
    if reasons:
        data[_UNSAFE_MARKER_KEY] = reasons

    return data  # type: ignore[return-value]


def save_plan(plan: PlanModel | dict, path: Path | None = None) -> None:
def _assert_safe_to_save(
    plan: PlanModel | dict[str, object],
    *,
    allow_unsafe_coerce: bool | None,
) -> None:
    """Refuse to persist a plan that carries unsafe-load markers.

    Does nothing when unsafe coercion is allowed, or when the plan has no
    non-empty list stored under the unsafe-marker key. Otherwise raises via
    ``_raise_plan_safety_error`` with code ``UNSAFE_SAVE_BLOCKED``.
    """
    override_active = _allow_unsafe_coerce(allow_unsafe_coerce)
    markers = plan.get(_UNSAFE_MARKER_KEY)
    if override_active or not isinstance(markers, list) or not markers:
        return
    _raise_plan_safety_error(
        code="UNSAFE_SAVE_BLOCKED",
        detail=(
            "Plan payload contains unsafe normalization markers "
            f"({', '.join(str(item) for item in markers)}). "
            "Use --allow-unsafe-coerce only after manual verification."
        ),
    )


def save_plan(
plan: PlanModel | dict,
path: Path | None = None,
*,
allow_unsafe_coerce: bool | None = None,
) -> None:
"""Validate and save plan to disk atomically."""
_assert_safe_to_save(plan, allow_unsafe_coerce=allow_unsafe_coerce)
ensure_plan_defaults(plan)
plan["updated"] = utc_now()
validate_plan(plan)

plan_path = path or PLAN_FILE
plan_path.parent.mkdir(parents=True, exist_ok=True)

content = json.dumps(plan, indent=2, default=json_default) + "\n"
serializable_plan = dict(plan)
serializable_plan.pop(_UNSAFE_MARKER_KEY, None)
content = json.dumps(serializable_plan, indent=2, default=json_default) + "\n"

if plan_path.exists():
backup = plan_path.with_suffix(".json.bak")
Expand Down
37 changes: 37 additions & 0 deletions desloppify/engine/_plan/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,43 @@ def ensure_plan_defaults(plan: dict[str, Any]) -> None:
for key, value in defaults.items():
plan.setdefault(key, value)
_upgrade_plan_to_v7(plan)
_normalize_skipped_entries(plan)


def _normalize_skipped_entries(plan: dict[str, Any]) -> None:
"""Normalize malformed skipped entries without dropping original payloads."""
skipped = plan.get("skipped")
if not isinstance(skipped, dict):
return

quarantine = plan.setdefault("_load_quarantine", {})
if not isinstance(quarantine, dict):
quarantine = {}
plan["_load_quarantine"] = quarantine

quarantined_entries = quarantine.setdefault("invalid_skipped_entries", {})
if not isinstance(quarantined_entries, dict):
quarantined_entries = {}
quarantine["invalid_skipped_entries"] = quarantined_entries

for issue_id, entry in list(skipped.items()):
if not isinstance(entry, dict):
quarantined_entries[issue_id] = entry
skipped[issue_id] = {
"issue_id": issue_id,
"kind": "temporary",
"reason": "Recovered malformed skipped entry",
}
continue

kind = entry.get("kind")
if kind in VALID_SKIP_KINDS:
continue

quarantined_entries[issue_id] = dict(entry)
entry["kind"] = "temporary"
entry.setdefault("issue_id", issue_id)
entry.setdefault("reason", "Recovered invalid skip kind")


def triage_clusters(plan: dict[str, Any]) -> dict[str, Cluster]:
Expand Down
26 changes: 21 additions & 5 deletions desloppify/engine/_plan/schema_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,22 @@ def _ensure_container(
key: str,
expected_type: type[list] | type[dict],
default_factory,
*,
quarantine: dict[str, Any] | None = None,
) -> None:
if not isinstance(plan.get(key), expected_type):
plan[key] = default_factory()
current = plan.get(key)
if isinstance(current, expected_type):
return
if quarantine is not None and key in plan:
quarantine.setdefault("container_type_mismatches", {})[key] = current
plan[key] = default_factory()


def ensure_container_types(plan: dict[str, Any]) -> None:
quarantine = plan.setdefault("_load_quarantine", {})
if not isinstance(quarantine, dict):
quarantine = {}
plan["_load_quarantine"] = quarantine
for key, expected_type, default_factory in (
("queue_order", list, list),
("deferred", list, list),
Expand All @@ -39,11 +49,17 @@ def ensure_container_types(plan: dict[str, Any]) -> None:
("execution_log", list, list),
("epic_triage_meta", dict, dict),
):
_ensure_container(plan, key, expected_type, default_factory)
_ensure_container(
plan,
key,
expected_type,
default_factory,
quarantine=quarantine,
)
_rename_key(plan["epic_triage_meta"], "finding_snapshot_hash", "issue_snapshot_hash")
_ensure_container(plan, "commit_log", list, list)
_ensure_container(plan, "commit_log", list, list, quarantine=quarantine)
_rename_key(plan, "uncommitted_findings", "uncommitted_issues")
_ensure_container(plan, "uncommitted_issues", list, list)
_ensure_container(plan, "uncommitted_issues", list, list, quarantine=quarantine)
if "commit_tracking_branch" not in plan:
plan["commit_tracking_branch"] = None

Expand Down
Loading