From 37c4caca0fbd6c14293ceb2c7a92fbad0347dc0c Mon Sep 17 00:00:00 2001 From: Zio Gabber <78922322+Gabrymi93@users.noreply.github.com> Date: Sat, 4 Apr 2026 16:19:01 +0100 Subject: [PATCH] Rende robusto il write dei run record su Windows --- tests/test_run_context.py | 35 ++++++++++++++++++++++++++++++++++- toolkit/core/run_context.py | 19 ++++++++++++++++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/tests/test_run_context.py b/tests/test_run_context.py index d753f97..d68180d 100644 --- a/tests/test_run_context.py +++ b/tests/test_run_context.py @@ -5,7 +5,7 @@ import time -from toolkit.core.run_context import RunContext, get_run_dir, read_run_record +from toolkit.core.run_context import RunContext, get_run_dir, read_run_record, write_run_record def _read_context(path: Path) -> dict[str, object]: @@ -165,3 +165,36 @@ def test_read_run_record_does_not_treat_error_message_as_path(tmp_path: Path) -> assert record["error"] == "/diagnostic text that is not a filesystem path" assert record["_portability"]["portable"] is True assert record["_portability"]["warnings"] == [] + + +def test_write_run_record_retries_on_permission_error(tmp_path: Path, monkeypatch) -> None: + run_dir = get_run_dir(tmp_path, "demo_ds", 2022) + payload = { + "dataset": "demo_ds", + "year": 2022, + "run_id": "retry_case", + "started_at": "2026-02-28T09:00:00+00:00", + "finished_at": None, + "status": "RUNNING", + "layers": {"raw": {"status": "PENDING"}, "clean": {"status": "PENDING"}, "mart": {"status": "PENDING"}}, + "validations": {"raw": {}, "clean": {}, "mart": {}}, + "error": None, + } + + replace_calls = {"n": 0} + original_replace = Path.replace + + def flaky_replace(self: Path, target: Path) -> Path: + replace_calls["n"] += 1 + if replace_calls["n"] == 1 and self.name.endswith(".tmp"): + raise PermissionError("[WinError 5] Access is denied") + return original_replace(self, target) + + monkeypatch.setattr(Path, "replace", flaky_replace) + + written = write_run_record(run_dir, "retry_case", payload) + + assert written.exists() + assert replace_calls["n"] >= 2 + stored = json.loads(written.read_text(encoding="utf-8")) + assert stored["run_id"] == "retry_case" diff --git a/toolkit/core/run_context.py b/toolkit/core/run_context.py index c12dc30..774983f 100644 --- a/toolkit/core/run_context.py +++ b/toolkit/core/run_context.py @@ -2,6 +2,7 @@ import json import re +import time import uuid from datetime import datetime, timezone from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath @@ -11,6 +12,7 @@ _LAYER_NAMES = ("raw", "clean", "mart") _WINDOWS_ABS_RE = re.compile(r"^[A-Za-z]:[\\/]") +_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS = (0.05, 0.1, 0.2) _PORTABLE_RUN_PATH_FIELDS: set[tuple[str, ...]] = { ("layers", "raw", "artifact_path"), ("layers", "clean", "artifact_path"), @@ -119,7 +121,22 @@ def write_run_record(run_dir: Path, run_id: str, payload: dict[str, Any]) -> Pat json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8", ) - tmp.replace(path) + + last_error: PermissionError | None = None + for attempt in range(len(_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS) + 1): + try: + tmp.replace(path) + return path + except PermissionError as exc: + # On Windows, AV/indexing can transiently hold the tmp/target handle. + # Retrying keeps run tracking resilient without changing record format. + last_error = exc + if attempt >= len(_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS): + raise + time.sleep(_RUN_RECORD_RENAME_RETRY_DELAYS_SECONDS[attempt]) + + if last_error is not None: + raise last_error return path