Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion tests/test_run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import time

from toolkit.core.run_context import RunContext, get_run_dir, read_run_record
from toolkit.core.run_context import RunContext, get_run_dir, read_run_record, write_run_record


def _read_context(path: Path) -> dict[str, object]:
Expand Down Expand Up @@ -165,3 +165,36 @@ def test_read_run_record_does_not_treat_error_message_as_path(tmp_path: Path) ->
assert record["error"] == "/diagnostic text that is not a filesystem path"
assert record["_portability"]["portable"] is True
assert record["_portability"]["warnings"] == []


def test_write_run_record_retries_on_permission_error(tmp_path: Path, monkeypatch) -> None:
run_dir = get_run_dir(tmp_path, "demo_ds", 2022)
payload = {
"dataset": "demo_ds",
"year": 2022,
"run_id": "retry_case",
"started_at": "2026-02-28T09:00:00+00:00",
"finished_at": None,
"status": "RUNNING",
"layers": {"raw": {"status": "PENDING"}, "clean": {"status": "PENDING"}, "mart": {"status": "PENDING"}},
"validations": {"raw": {}, "clean": {}, "mart": {}},
"error": None,
}

replace_calls = {"n": 0}
original_replace = Path.replace

def flaky_replace(self: Path, target: Path) -> Path:
replace_calls["n"] += 1
if replace_calls["n"] == 1 and self.name.endswith(".tmp"):
raise PermissionError("[WinError 5] Access is denied")
return original_replace(self, target)

monkeypatch.setattr(Path, "replace", flaky_replace)

written = write_run_record(run_dir, "retry_case", payload)

assert written.exists()
assert replace_calls["n"] >= 2
stored = json.loads(written.read_text(encoding="utf-8"))
assert stored["run_id"] == "retry_case"
19 changes: 18 additions & 1 deletion toolkit/core/run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import re
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath
Expand All @@ -11,6 +12,7 @@

_LAYER_NAMES = ("raw", "clean", "mart")
_WINDOWS_ABS_RE = re.compile(r"^[A-Za-z]:[\\/]")
_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS = (0.05, 0.1, 0.2)
_PORTABLE_RUN_PATH_FIELDS: set[tuple[str, ...]] = {
("layers", "raw", "artifact_path"),
("layers", "clean", "artifact_path"),
Expand Down Expand Up @@ -119,7 +121,22 @@ def write_run_record(run_dir: Path, run_id: str, payload: dict[str, Any]) -> Pat
json.dumps(payload, indent=2, ensure_ascii=False),
encoding="utf-8",
)
tmp.replace(path)

last_error: PermissionError | None = None
for attempt in range(len(_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS) + 1):
try:
tmp.replace(path)
return path
except PermissionError as exc:
# On Windows, AV/indexing can transiently hold the tmp/target handle.
# Retrying keeps run tracking resilient without changing record format.
last_error = exc
if attempt >= len(_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS):
raise
time.sleep(_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS[attempt])

if last_error is not None:
raise last_error
return path


Expand Down
Loading