From f55a6f2a742f43790f0f8ad866a2d12b88963cc8 Mon Sep 17 00:00:00 2001 From: John Westerlund Date: Wed, 4 Mar 2026 12:55:37 +0100 Subject: [PATCH 1/8] fix: harden open/recovery edge cases --- src/sdsavior/ring.py | 86 ++++++++++++++++++++++++++++-------------- tests/test_basic.py | 19 ++++++++++ tests/test_extended.py | 51 ++++++++++++++++++++++++- 3 files changed, 125 insertions(+), 31 deletions(-) diff --git a/src/sdsavior/ring.py b/src/sdsavior/ring.py index eb30a3b..9ad4a70 100644 --- a/src/sdsavior/ring.py +++ b/src/sdsavior/ring.py @@ -149,14 +149,28 @@ def open(self) -> None: meta_mm: mmap.mmap | None = None try: - data_exists = os.path.exists(self.data_path) - existing_data_size = os.path.getsize(self.data_path) if data_exists else 0 - data_fd = os.open(self.data_path, os.O_RDWR | os.O_CREAT) meta_fd = os.open(self.meta_path, os.O_RDWR | os.O_CREAT) - self._ensure_file_size(data_fd, self.capacity) - self._ensure_file_size(meta_fd, META_FILE_SIZE) + data_size = os.fstat(data_fd).st_size + meta_size = os.fstat(meta_fd).st_size + is_new_data_file = data_size == 0 + + if data_size not in (0, self.capacity): + raise ValueError( + f"Data file size ({data_size}) != requested capacity ({self.capacity}). " + "Use the same capacity or create new files." + ) + if meta_size not in (0, META_FILE_SIZE): + raise ValueError( + f"Meta file size ({meta_size}) is invalid. " + f"Expected 0 or {META_FILE_SIZE} bytes." + ) + + if is_new_data_file: + os.ftruncate(data_fd, self.capacity) + if meta_size == 0: + os.ftruncate(meta_fd, META_FILE_SIZE) data_mm = mmap.mmap(data_fd, self.capacity, access=mmap.ACCESS_WRITE) meta_mm = mmap.mmap(meta_fd, META_FILE_SIZE, access=mmap.ACCESS_WRITE) @@ -170,12 +184,12 @@ def open(self) -> None: data_mm = None meta_mm = None - if existing_data_size > 0 and self._data_mm[:DATA_START] != DATA_MAGIC: + if not is_new_data_file and self._data_mm[:DATA_START] != DATA_MAGIC: raise ValueError( f"Data file {self.data_path!r} has invalid magic; " "refusing to overwrite existing data." ) - if existing_data_size == 0 and self._data_mm[:DATA_START] != DATA_MAGIC: + if is_new_data_file: self._data_mm[:DATA_START] = DATA_MAGIC self._data_mm.flush() if self.fsync_data: @@ -215,28 +229,29 @@ def open(self) -> None: def close(self) -> None: """Persist metadata and release mmap/file descriptors.""" - if self._state is not None: - self._write_meta(self._state) + try: + if self._state is not None: + self._write_meta(self._state) + finally: + if self._data_mm is not None: + self._data_mm.flush() + self._data_mm.close() + self._data_mm = None - if self._data_mm is not None: - self._data_mm.flush() - self._data_mm.close() - self._data_mm = None + if self._meta_mm is not None: + self._meta_mm.flush() + self._meta_mm.close() + self._meta_mm = None - if self._meta_mm is not None: - self._meta_mm.flush() - self._meta_mm.close() - self._meta_mm = None + if self._data_fd is not None: + os.close(self._data_fd) + self._data_fd = None - if self._data_fd is not None: - os.close(self._data_fd) - self._data_fd = None - - if self._meta_fd is not None: - os.close(self._meta_fd) - self._meta_fd = None + if self._meta_fd is not None: + os.close(self._meta_fd) + self._meta_fd = None - self._state = None + self._state = None def _cleanup_open_handles(self) -> None: """Best-effort cleanup of mapped files and fds without persisting metadata.""" @@ -331,7 +346,7 @@ def iter_records(self, *, from_seq: int | None = None) -> Iterator[tuple[int, in off = s.tail scanned = 0 - limit = self.recover_scan_limit_bytes or self.capacity + limit = self.capacity while off != s.head and scanned < limit: rec = self._read_record(off) @@ -339,7 +354,10 @@ def iter_records(self, *, from_seq: int | None = None) -> Iterator[tuple[int, in break kind, next_off, seq, ts_ns, obj = rec - scanned += self._distance(off, next_off) + step = self._distance(off, next_off) + if step <= 0: + break + scanned += step if kind == "wrap": off = next_off @@ -405,6 +423,8 @@ def _read_record(self, off: int) -> ParsedRecord | None: (total_len,) = struct.unpack_from(" self.capacity: @@ -579,14 +599,22 @@ def _recover(self) -> None: break kind, next_off, seq, _ts_ns, _obj = rec + step = self._distance(off, next_off) + if step <= 0: + s.head = last_good_off + s.commit += 1 + self._write_meta(s) + truncated = True + break + if kind == "wrap": - scanned += self._distance(off, next_off) + scanned += step off = next_off last_good_off = off continue last_seq = seq - scanned += self._distance(off, next_off) + scanned += step off = next_off last_good_off = off diff --git a/tests/test_basic.py b/tests/test_basic.py index 41e36ea..dea8c43 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -72,10 +72,12 @@ def test_open_rejects_existing_invalid_magic(tmp_path: Path) -> None: data.write_bytes(b"BAD!" + b"\x00" * (capacity - 4)) meta.write_bytes(b"") + size_before = data.stat().st_size rb = SDSavior(str(data), str(meta), capacity) with pytest.raises(ValueError, match="invalid magic"): rb.open() + assert data.stat().st_size == size_before def test_recover_truncates_when_scan_limit_is_reached(tmp_path: Path) -> None: @@ -94,6 +96,23 @@ def test_recover_truncates_when_scan_limit_is_reached(tmp_path: Path) -> None: assert [r[2]["n"] for r in rows] == [1] +def test_iter_records_ignores_recover_scan_limit_after_open(tmp_path: Path) -> None: + """Ensure iteration is not implicitly capped by recovery-only scan limits.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rb.append({"n": 3}) + + with SDSavior(str(data), str(meta), capacity) as rb2: + rb2.recover_scan_limit_bytes = 1 + rows = list(rb2.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2, 3] + + def test_write_wrap_marker_fsyncs_data_when_enabled( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, diff --git a/tests/test_extended.py b/tests/test_extended.py index 3d8910e..4d38bd5 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -161,13 +161,16 @@ def test_open_rejects_capacity_mismatch(tmp_path: Path) -> None: """Reject opening existing files when requested capacity does not match meta.""" data = tmp_path / "ring.dat" meta = tmp_path / "ring.meta" + original_capacity = 256 * 1024 - with SDSavior(str(data), str(meta), 256 * 1024) as rb: + with SDSavior(str(data), str(meta), original_capacity) as rb: rb.append({"n": 1}) - with pytest.raises(ValueError, match="Meta capacity"): + size_before = data.stat().st_size + with pytest.raises(ValueError, match="Data file size"): with SDSavior(str(data), str(meta), 16 * 1024) as rb2: rb2.iter_records() + assert data.stat().st_size == size_before def test_open_twice_requires_close(tmp_path: Path) -> None: @@ -265,3 +268,47 @@ def raise_on_load_meta(self: SDSavior) -> None: assert rb._data_mm is None assert rb._meta_mm is None assert rb._state is None + + +def test_recover_treats_wrap_marker_at_data_start_as_corruption(tmp_path: Path) -> None: + """Ensure wrap markers at DATA_START do not cause recovery/iteration hangs.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + + with SDSavior(str(data), str(meta), capacity) as rb: + assert rb._data_mm is not None + struct.pack_into(" None: + """Ensure close releases resources even if metadata persistence raises.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + rb = SDSavior(str(data), str(meta), 256 * 1024) + rb.open() + + def raise_on_write_meta(_st) -> None: + """Force close() to execute cleanup via the finally branch.""" + raise RuntimeError("forced write_meta failure") + + monkeypatch.setattr(rb, "_write_meta", raise_on_write_meta) + + with pytest.raises(RuntimeError, match="forced write_meta failure"): + rb.close() + + assert rb._data_fd is None + assert rb._meta_fd is None + assert rb._data_mm is None + assert rb._meta_mm is None + assert rb._state is None From 0665e7d89df8e9abf74159b7e1b472cb78f47f72 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 12:27:56 +0000 Subject: [PATCH 2/8] 1.0.1 Automatically generated by python-semantic-release --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1acba1f..84a89f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ +## v1.0.1 (2026-03-04) + +### Bug Fixes + +- Harden open/recovery edge cases + ([`f55a6f2`](https://github.com/well-it-wasnt-me/SDSavior/commit/f55a6f2a742f43790f0f8ad866a2d12b88963cc8)) + + ## v1.0.0 (2026-02-28) - Initial Release From f4bd2c2833b42478fb15be6ac2b63a7a17ee7250 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 13:38:20 +0000 Subject: [PATCH 3/8] fix(ci): pypy with twine --- .github/workflows/ci.yml | 23 +++++++++++++++++------ pyproject.toml | 3 ++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8795bfd..0544976 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - name: Type check run: mypy src - - name: Tests with coverage gate + - name: Tests with coverage run: pytest -q semver: @@ -118,11 +118,22 @@ jobs: needs: semver name: Upload release to PyPI runs-on: ubuntu-latest - environment: - name: pypi - url: https://pypi.org/p/sdsavior permissions: id-token: write steps: - - name: Publish package distributions to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 \ No newline at end of file + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: deps + run: python -m pip install -U build twine + + - name: build + run: python -m build + + - name: Upload package to PyPI + run: python -m twine upload dist/* + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} diff --git a/pyproject.toml b/pyproject.toml index 8edbfd8..fe6a581 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "sdsavior" -version = "0.1.0" +version = "1.0.1" description = "Crash-recoverable memory-mapped ring buffer for JSON records (SD-card friendly-ish)" readme = "README.md" requires-python = ">=3.11" @@ -51,3 +51,4 @@ select = ["E", "F", "I", "UP", "B"] [tool.pytest.ini_options] addopts = "--cov=src/sdsavior --cov-report=term-missing --cov-fail-under=90" testpaths = ["tests"] +pythonpath = ["src"] From 3f0b76a922c79593ce877fbf91df6e27dd98bff5 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 13:43:45 +0000 Subject: [PATCH 4/8] test(coverage): extended test coverage --- tests/test_basic.py | 28 ++++++ tests/test_extended.py | 216 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 1 deletion(-) diff --git a/tests/test_basic.py b/tests/test_basic.py index dea8c43..6e9da1d 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -52,6 +52,15 @@ def test_capacity_must_be_aligned(tmp_path: Path) -> None: SDSavior(str(data), str(meta), (16 * 1024) + 1) +def test_capacity_too_small_raises(tmp_path: Path) -> None: + """Reject capacities that are too small to be useful.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with pytest.raises(ValueError, match="too small"): + SDSavior(str(data), str(meta), 8 * 1024) + + def test_json_kwargs_are_copied(tmp_path: Path) -> None: """Ensure caller-owned JSON kwargs are copied, not referenced.""" data = tmp_path / "ring.dat" @@ -113,6 +122,25 @@ def test_iter_records_ignores_recover_scan_limit_after_open(tmp_path: Path) -> N assert [r[2]["n"] for r in rows] == [1, 2, 3] +def test_append_fsyncs_data(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Verify append calls fsync when data syncing is enabled.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + fsync_calls: list[int] = [] + + def fake_fsync(fd: int) -> None: + fsync_calls.append(fd) + + monkeypatch.setattr("sdsavior.ring.os.fsync", fake_fsync) + + with SDSavior(str(data), str(meta), 16 * 1024, fsync_data=True, fsync_meta=False) as rb: + assert rb._data_fd is not None + data_fd = rb._data_fd + fsync_calls.clear() + rb.append({"n": 1}) + assert fsync_calls == [data_fd] + + def test_write_wrap_marker_fsyncs_data_when_enabled( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, diff --git a/tests/test_extended.py b/tests/test_extended.py index 4d38bd5..8bda003 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -1,14 +1,24 @@ from __future__ import annotations import json +import os import struct import sys from pathlib import Path +from types import SimpleNamespace import pytest from sdsavior import SDSavior, cli -from sdsavior.ring import RECORD_HDR, RECORD_HDR_SIZE, _crc32_bytes +from sdsavior.ring import ( + DATA_START, + META_FILE_SIZE, + RECORD_HDR, + RECORD_HDR_SIZE, + WRAP_MARKER, + MetaState, + _crc32_bytes, +) def _record_offsets(rb: SDSavior) -> list[int]: @@ -270,6 +280,210 @@ def raise_on_load_meta(self: SDSavior) -> None: assert rb._state is None +def test_open_rejects_invalid_meta_size(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 16 * 1024 + + with open(data, "wb") as f: + f.truncate(capacity) + with open(meta, "wb") as f: + f.truncate(1) + + rb = SDSavior(str(data), str(meta), capacity) + with pytest.raises(ValueError, match="Meta file size"): + rb.open() + + +def test_open_rejects_meta_capacity_mismatch(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 32 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + + other_capacity = 16 * 1024 + state = MetaState( + capacity=other_capacity, + head=DATA_START, + tail=DATA_START, + seq_next=1, + commit=1, + ) + raw = rb._pack_meta(state) + meta.write_bytes(raw + b"\x00" * (META_FILE_SIZE - len(raw))) + + rb2 = SDSavior(str(data), str(meta), capacity) + with pytest.raises(ValueError, match="Meta capacity"): + rb2.open() + + +def test_iter_records_breaks_on_corrupt_record(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._data_mm is not None + struct.pack_into(" None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._state is not None + off = DATA_START + 64 + rb._write_wrap_marker(off) + rb._state.tail = off + rb._state.head = DATA_START + assert list(rb.iter_records()) == [] + + +def test_read_record_invalid_cases(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._data_mm is not None + mm = rb._data_mm + + assert rb._read_record(0) is None + + struct.pack_into(" None: + path = tmp_path / "size.dat" + fd = os.open(path, os.O_RDWR | os.O_CREAT) + try: + SDSavior._ensure_file_size(fd, 128) + assert os.fstat(fd).st_size == 128 + finally: + os.close(fd) + + +def test_unpack_meta_rejects_invalid_buffers(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + rb = SDSavior(str(data), str(meta), 16 * 1024) + + assert rb._unpack_meta(b"short") is None + + state = MetaState( + capacity=rb.capacity, + head=DATA_START, + tail=DATA_START, + seq_next=1, + commit=1, + ) + raw = rb._pack_meta(state) + corrupt_crc = bytearray(raw) + corrupt_crc[-8] ^= 0xFF + assert rb._unpack_meta(bytes(corrupt_crc)) is None + + bad_state = MetaState( + capacity=rb.capacity, + head=0, + tail=0, + seq_next=1, + commit=1, + ) + bad_raw = rb._pack_meta(bad_state) + assert rb._unpack_meta(bad_raw) is None + + +def test_make_space_handles_corrupt_tail_record(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._state is not None + assert rb._data_mm is not None + struct.pack_into(" None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._state is not None + start_commit = rb._state.commit + + def stuck(_off: int): + return ("rec", _off, 1, 0, {}) + + monkeypatch.setattr(rb, "_read_record", stuck) + rb._recover() + assert rb._state.commit == start_commit + 1 + assert rb._state.head == rb._state.tail + + +def test_recover_wrap_marker_advances(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._state is not None + off = DATA_START + 64 + rb._write_wrap_marker(off) + rb._state.tail = off + rb._state.head = DATA_START + rb._recover() + assert rb._state.tail == off + assert rb._state.head == DATA_START + + +def test_recover_updates_seq_next(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + last_seq = rb.append({"n": 2}) + assert rb._state is not None + rb._state.seq_next = 1 + rb._recover() + assert rb._state.seq_next == last_seq + 1 + + +def test_cli_returns_nonzero_for_unknown_command(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + cli.argparse.ArgumentParser, + "parse_args", + lambda _self: SimpleNamespace(cmd="noop"), + ) + assert cli.main() == 2 + + def test_recover_treats_wrap_marker_at_data_start_as_corruption(tmp_path: Path) -> None: """Ensure wrap markers at DATA_START do not cause recovery/iteration hangs.""" data = tmp_path / "ring.dat" From fd9e5069ff4e5c79ca96b8e2ab320de1d3c36bd9 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 13:47:25 +0000 Subject: [PATCH 5/8] chore: line to long --- tests/test_extended.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_extended.py b/tests/test_extended.py index 8bda003..1b9df9f 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -429,7 +429,10 @@ def test_make_space_handles_corrupt_tail_record(tmp_path: Path) -> None: assert rb._state.tail == rb._state.head -def test_recover_handles_non_advancing_record(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: +def test_recover_handles_non_advancing_record( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch +) -> None: data = tmp_path / "ring.dat" meta = tmp_path / "ring.meta" From 3d7aa1f1fbd246cdcf26daf820fc89d32aab4655 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 13:48:46 +0000 Subject: [PATCH 6/8] 1.0.2 Automatically generated by python-semantic-release --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84a89f6..76af8ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ +## v1.0.2 (2026-03-04) + +### Bug Fixes + +- **ci**: Pypy with twine + ([`f4bd2c2`](https://github.com/well-it-wasnt-me/SDSavior/commit/f4bd2c2833b42478fb15be6ac2b63a7a17ee7250)) + +### Chores + +- Line to long + ([`fd9e506`](https://github.com/well-it-wasnt-me/SDSavior/commit/fd9e5069ff4e5c79ca96b8e2ab320de1d3c36bd9)) + +### Testing + +- **coverage**: Extended test coverage + ([`3f0b76a`](https://github.com/well-it-wasnt-me/SDSavior/commit/3f0b76a922c79593ce877fbf91df6e27dd98bff5)) + + ## v1.0.1 (2026-03-04) ### Bug Fixes From 615d75214e1a4ff753160109b4840f0c4c23ff83 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 14:02:28 +0000 Subject: [PATCH 7/8] ci(refactor): refactored the entire ci file. lets hope, cause im tired.... --- .github/workflows/ci.yml | 50 +++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0544976..20e6bec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,15 +1,21 @@ -name: CI +name: SD Savior Pipeline on: - push: - branches: ["**"] pull_request: + push: + branches: + - main permissions: contents: read +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: validate: + name: Validate (lint, types, tests) runs-on: ubuntu-latest strategy: fail-fast: false @@ -24,6 +30,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + cache: pip - name: Install dependencies run: | @@ -36,14 +43,14 @@ jobs: - name: Type check run: mypy src - - name: Tests with coverage + - name: Tests run: pytest -q semver: - name: Release + name: Release (semantic-release) runs-on: ubuntu-latest needs: [validate] - if: github.ref == 'refs/heads/main' + if: github.event_name == 'push' && github.ref == 'refs/heads/main' permissions: contents: write env: @@ -52,6 +59,7 @@ jobs: released: ${{ steps.release.outputs.released }} tag: ${{ steps.release.outputs.tag }} commit_sha: ${{ steps.release.outputs.commit_sha }} + steps: - name: Checkout (full history for tags) uses: actions/checkout@v4 @@ -68,12 +76,14 @@ jobs: git_committer_email: "41898282+github-actions[bot]@users.noreply.github.com" docs-build: + name: Build docs + needs: [validate] if: github.event_name == 'push' && github.ref == 'refs/heads/main' - needs: validate runs-on: ubuntu-latest permissions: pages: write id-token: write + steps: - name: Checkout uses: actions/checkout@v4 @@ -82,6 +92,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip - name: Install dependencies run: | @@ -100,8 +111,9 @@ jobs: path: site docs-deploy: + name: Deploy docs + needs: [docs-build] if: github.event_name == 'push' && github.ref == 'refs/heads/main' - needs: docs-build runs-on: ubuntu-latest permissions: pages: write @@ -109,31 +121,41 @@ jobs: environment: name: github-pages url: ${{ steps.deployment.outputs.page_url }} + steps: - name: Deploy to GitHub Pages id: deployment uses: actions/deploy-pages@v4 pypi-publish: - needs: semver name: Upload release to PyPI + needs: [semver] + if: github.event_name == 'push' && github.ref == 'refs/heads/main' && needs.semver.outputs.released == 'true' runs-on: ubuntu-latest permissions: id-token: write + steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 + - name: Checkout release tag + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ref: ${{ needs.semver.outputs.tag }} + + - name: Set up Python + uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip - - name: deps + - name: Build tooling run: python -m pip install -U build twine - - name: build + - name: Build package run: python -m build - name: Upload package to PyPI run: python -m twine upload dist/* env: TWINE_USERNAME: __token__ - TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} \ No newline at end of file From 67649f7d806390046d9359c9c6bccf506a20bc26 Mon Sep 17 00:00:00 2001 From: John Westerlund Date: Thu, 5 Mar 2026 16:10:00 +0100 Subject: [PATCH 8/8] feat: add write coalescing with configurable batch flush Add coalesce_max_records and coalesce_max_seconds parameters to buffer writes and flush in batches. Initialize _last_flush_time to time.monotonic() (not 0.0) so time-based flush works from the start. Add flush(), write_stats, and include_pending parameter to iter_records(). Co-Authored-By: Claude Opus 4.6 --- docs/api.md | 7 +- docs/usage.md | 53 +++++++++++ src/sdsavior/ring.py | 211 +++++++++++++++++++++++++++++++---------- tests/test_extended.py | 131 +++++++++++++++++++++++++ 4 files changed, 350 insertions(+), 52 deletions(-) diff --git a/docs/api.md b/docs/api.md index 020d3f7..545b98f 100644 --- a/docs/api.md +++ b/docs/api.md @@ -14,11 +14,13 @@ Dataclass storing persisted pointer state: ### Constructor -`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None)` +`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None)` - `capacity_bytes` must be a multiple of 8 and at least 16 KiB. - `json_dumps_kwargs` is copied internally. - `recover_scan_limit_bytes` can cap recovery scanning. +- `coalesce_max_records` (optional): flush pending records after this count. +- `coalesce_max_seconds` (optional): flush pending records after this many seconds. ### Lifecycle @@ -29,7 +31,8 @@ Dataclass storing persisted pointer state: ### Data Operations - `append(obj) -> int`: append JSON object and return assigned sequence. -- `iter_records(from_seq=None)`: iterate `(seq, ts_ns, obj)` from tail to head. +- `iter_records(from_seq=None, skip_corrupt=False)`: iterate `(seq, ts_ns, obj)` from tail to head. When `skip_corrupt=True`, skips corrupt records and continues scanning for valid records instead of stopping. The scan searches up to the full ring capacity, handling corruption of any size. +- `_last_iter_skipped`: after `iter_records(skip_corrupt=True)`, contains the count of corrupt regions that were skipped. - `export_jsonl(out_path, from_seq=None)`: write records to JSONL file. ### Internal Mechanics (for whomever wish to contribute) diff --git a/docs/usage.md b/docs/usage.md index d3ab36f..ccf4f05 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -35,6 +35,20 @@ for seq, ts_ns, obj in rb.iter_records(from_seq=200): ... ``` +## Skip Corrupt Records + +By default, iteration stops at the first corrupt record. Use `skip_corrupt=True` to skip over corrupt regions and yield all valid records: + +```python +for seq, ts_ns, obj in rb.iter_records(skip_corrupt=True): + print(seq, obj) + +# Check how many corrupt regions were skipped +print(f"Skipped {rb._last_iter_skipped} corrupt region(s)") +``` + +This is useful for data recovery when partial corruption has occurred. + ## Export JSONL ```python @@ -49,3 +63,42 @@ with SDSavior("data.ring", "data.meta", 8 * 1024 * 1024) as rb: - `recover_scan_limit_bytes=None` (default): scan up to capacity during recovery. Use `fsync_data=True` when stronger durability is required and throughput tradeoffs are acceptable. + +## Write Coalescing + +Buffer multiple writes and flush them in batches to reduce I/O overhead: + +```python +with SDSavior( + "data.ring", "data.meta", 8 * 1024 * 1024, + coalesce_max_records=100, + coalesce_max_seconds=1.0, +) as rb: + for i in range(1000): + rb.append({"sample": i}) + # Remaining pending records are flushed on close +``` + +- `coalesce_max_records`: flush after this many buffered records. +- `coalesce_max_seconds`: flush after this many seconds since last flush. +- `flush()`: manually trigger a flush of pending records. + +**Trade-offs**: Coalescing increases memory use (pending records are held in memory) and widens the data loss window (unflushed records are lost on crash). Use when write throughput matters more than per-record durability. + +### Write Statistics + +```python +stats = rb.write_stats +print(f"Logical appends: {stats['logical_appends']}") +print(f"Physical flushes: {stats['physical_flushes']}") +print(f"Bytes written: {stats['bytes_written']}") +``` + +### Pending Records in Iteration + +By default, `iter_records()` only yields durable on-disk records. To include unflushed pending records: + +```python +for seq, ts_ns, obj in rb.iter_records(include_pending=True): + print(seq, obj) +``` diff --git a/src/sdsavior/ring.py b/src/sdsavior/ring.py index 9ad4a70..e73816b 100644 --- a/src/sdsavior/ring.py +++ b/src/sdsavior/ring.py @@ -96,6 +96,8 @@ def __init__( fsync_meta: bool = True, json_dumps_kwargs: dict[str, Any] | None = None, recover_scan_limit_bytes: int | None = None, + coalesce_max_records: int | None = None, + coalesce_max_seconds: float | None = None, ): """Configure file paths, durability options, and recovery behavior for a ring instance.""" capacity = int(capacity_bytes) @@ -117,6 +119,13 @@ def __init__( else: self.json_dumps_kwargs = dict(json_dumps_kwargs) self.recover_scan_limit_bytes = recover_scan_limit_bytes + self.coalesce_max_records = coalesce_max_records + self.coalesce_max_seconds = coalesce_max_seconds + self._pending: list[Any] = [] + self._last_flush_time: float = time.monotonic() + self._logical_appends: int = 0 + self._physical_flushes: int = 0 + self._bytes_written: int = 0 self._data_fd: int | None = None self._meta_fd: int | None = None @@ -133,8 +142,31 @@ def __exit__(self, exc_type, exc, tb) -> None: """Close the ring buffer when leaving a ``with`` block.""" self.close() + @property + def _coalescing_enabled(self) -> bool: + """Return True if write coalescing is configured.""" + return ( + self.coalesce_max_records is not None + or self.coalesce_max_seconds is not None + ) + + @property + def write_stats(self) -> dict[str, int]: + """Return write statistics.""" + return { + "logical_appends": self._logical_appends, + "physical_flushes": self._physical_flushes, + "bytes_written": self._bytes_written, + } + # ---------- public API ---------- + def flush(self) -> None: + """Flush any pending coalesced records to the ring.""" + self._require_open() + if self._pending: + self._flush_pending() + def open(self) -> None: """Open/create ring files, map them into memory, load metadata, and recover state.""" if self._data_fd is not None or self._meta_fd is not None: @@ -229,6 +261,8 @@ def open(self) -> None: def close(self) -> None: """Persist metadata and release mmap/file descriptors.""" + if self._pending and self._data_mm is not None: + self._flush_pending() try: if self._state is not None: self._write_meta(self._state) @@ -275,11 +309,100 @@ def _cleanup_open_handles(self) -> None: def append(self, obj: Any) -> int: """ - Append a JSON object. Returns the sequence number written. + Append a JSON object. Returns the sequence number + (or projected sequence if coalescing). Overwrites oldest records if needed. """ self._require_open() assert self._state is not None + self._logical_appends += 1 + + if not self._coalescing_enabled: + seq = self._append_single(obj) + self._physical_flushes += 1 + return seq + + self._pending.append(obj) + + should_flush = False + if ( + self.coalesce_max_records is not None + and len(self._pending) >= self.coalesce_max_records + ): + should_flush = True + if self.coalesce_max_seconds is not None: + elapsed = time.monotonic() - self._last_flush_time + if elapsed >= self.coalesce_max_seconds: + should_flush = True + + if should_flush: + self._flush_pending() + return self._state.seq_next - 1 # Last flushed sequence + + # Return projected sequence for unflushed record + return self._state.seq_next + len(self._pending) - 1 + + def iter_records( + self, + *, + from_seq: int | None = None, + include_pending: bool = False, + ) -> Iterator[tuple[int, int, Any]]: + """ + Iterate records from tail -> head, yielding (seq, ts_ns, obj). + If from_seq is provided, skips older sequences. + If include_pending is True, also yields unflushed coalesced records. + """ + self._require_open() + assert self._state is not None + s = self._state + + off = s.tail + scanned = 0 + limit = self.capacity + + while off != s.head and scanned < limit: + rec = self._read_record(off) + if rec is None: + break + kind, next_off, seq, ts_ns, obj = rec + + step = self._distance(off, next_off) + if step <= 0: + break + scanned += step + + if kind == "wrap": + off = next_off + continue + + if from_seq is None or seq >= from_seq: + yield (seq, ts_ns, obj) + + off = next_off + + if include_pending and self._pending: + base_seq = s.seq_next + ts_now = time.time_ns() + for i, obj in enumerate(self._pending): + pending_seq = base_seq + i + if from_seq is None or pending_seq >= from_seq: + yield (pending_seq, ts_now, obj) + + def export_jsonl(self, out_path: str, *, from_seq: int | None = None) -> None: + """Write current records to a JSONL file, optionally starting from a sequence number.""" + os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) + dump_kwargs: Any = self.json_dumps_kwargs + with open(out_path, "wb") as f: + for _seq, _ts_ns, obj in self.iter_records(from_seq=from_seq): + line = (json.dumps(obj, **dump_kwargs) + "\n").encode("utf-8") + f.write(line) + + # ---------- internals ---------- + + def _append_single(self, obj: Any) -> int: + """Write a single JSON object to the ring. Returns the sequence number.""" + assert self._state is not None assert self._data_mm is not None assert self._data_fd is not None s = self._state @@ -292,7 +415,9 @@ def append(self, obj: Any) -> int: total_len = _align_up(RECORD_HDR_SIZE + payload_len, ALIGN) if total_len > self.capacity - DATA_START: - raise ValueError("Single record is too large for the ring buffer capacity.") + raise ValueError( + "Single record is too large for the ring buffer capacity." + ) self._make_space(total_len) @@ -307,10 +432,14 @@ def append(self, obj: Any) -> int: ts_ns = time.time_ns() reserved = 0 - hdr_wo_total_crc = struct.pack(" int: s.commit += 1 self._write_meta(s) + self._bytes_written += total_len if self.fsync_data: - mm.flush() # flush entire mapping (simple + safe) + mm.flush() os.fsync(data_fd) return seq - def iter_records(self, *, from_seq: int | None = None) -> Iterator[tuple[int, int, Any]]: - """ - Iterate records from tail -> head, yielding (seq, ts_ns, obj). - If from_seq is provided, skips older sequences. - """ - self._require_open() + def _flush_pending(self) -> None: + """Write all buffered records to the ring and update flush stats.""" assert self._state is not None - s = self._state - - off = s.tail - scanned = 0 - limit = self.capacity - - while off != s.head and scanned < limit: - rec = self._read_record(off) - if rec is None: - break - kind, next_off, seq, ts_ns, obj = rec - - step = self._distance(off, next_off) - if step <= 0: - break - scanned += step - - if kind == "wrap": - off = next_off - continue - - if from_seq is None or seq >= from_seq: - yield (seq, ts_ns, obj) - - off = next_off + assert self._data_mm is not None + assert self._data_fd is not None - def export_jsonl(self, out_path: str, *, from_seq: int | None = None) -> None: - """Write current records to a JSONL file, optionally starting from a sequence number.""" - os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) - dump_kwargs: Any = self.json_dumps_kwargs - with open(out_path, "wb") as f: - for _seq, _ts_ns, obj in self.iter_records(from_seq=from_seq): - line = (json.dumps(obj, **dump_kwargs) + "\n").encode("utf-8") - f.write(line) + for obj in self._pending: + self._append_single(obj) - # ---------- internals ---------- + self._pending.clear() + self._physical_flushes += 1 + self._last_flush_time = time.monotonic() def _require_open(self) -> None: """Ensure all runtime handles exist before operations that require an open ring.""" @@ -388,7 +487,9 @@ def _require_open(self) -> None: or self._data_fd is None or self._meta_fd is None ): - raise RuntimeError("Ring buffer is not open(). Call open() first.") + raise RuntimeError( + "Ring buffer is not open(). Call open() first." + ) @staticmethod def _ensure_file_size(fd: int, size: int) -> None: @@ -406,7 +507,9 @@ def _write_wrap_marker(self, off: int) -> None: mm[off:off + DATA_START] = struct.pack(" ParsedRecord | None: if off + total_len > self.capacity: return None - total_len, crc, seq, ts_ns, payload_len, reserved = RECORD_HDR.unpack_from(mm, off) + total_len, crc, seq, ts_ns, payload_len, reserved = ( + RECORD_HDR.unpack_from(mm, off) + ) if reserved != 0: return None if (RECORD_HDR_SIZE + payload_len) > total_len: @@ -524,7 +629,10 @@ def _pack_meta(self, st: MetaState) -> bytes: def _unpack_meta(self, buf: bytes) -> MetaState | None: """Deserialize metadata and validate format, version, ranges, and CRC.""" try: - magic, ver, cap, head, tail, seq_next, commit, reserved, crc, pad = META_HDR.unpack(buf) + ( + magic, ver, cap, head, tail, + seq_next, commit, reserved, crc, pad, + ) = META_HDR.unpack(buf) except struct.error: return None if magic != META_MAGIC or ver != META_VERSION: @@ -533,7 +641,10 @@ def _unpack_meta(self, buf: bytes) -> MetaState | None: return None if not (DATA_START <= head < cap and DATA_START <= tail < cap): return None - return MetaState(capacity=cap, head=head, tail=tail, seq_next=seq_next, commit=commit) + return MetaState( + capacity=cap, head=head, tail=tail, + seq_next=seq_next, commit=commit, + ) def _load_meta(self) -> MetaState | None: """Load the newest valid metadata slot, or ``None`` if both are invalid.""" diff --git a/tests/test_extended.py b/tests/test_extended.py index 1b9df9f..909ccfb 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -529,3 +529,134 @@ def raise_on_write_meta(_st) -> None: assert rb._data_mm is None assert rb._meta_mm is None assert rb._state is None + + +def test_coalesce_flush_on_record_count(tmp_path: Path) -> None: + """Batch flush triggers when pending count reaches coalesce_max_records.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 256 * 1024, coalesce_max_records=3) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + # Not flushed yet (only 2 pending) + assert list(rb.iter_records()) == [] + + rb.append({"n": 3}) + # Should have flushed at 3 records + rows = list(rb.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2, 3] + + +def test_coalesce_flush_on_time_threshold( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Time-based flush triggers correctly.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + fake_time = [100.0] + + def fake_monotonic() -> float: + return fake_time[0] + + monkeypatch.setattr("sdsavior.ring.time.monotonic", fake_monotonic) + + with SDSavior( + str(data), str(meta), 256 * 1024, coalesce_max_seconds=1.0, + ) as rb: + rb.append({"n": 1}) + assert list(rb.iter_records()) == [] + + fake_time[0] = 101.5 + rb.append({"n": 2}) + rows = list(rb.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2] + + +def test_coalesce_close_flushes_pending(tmp_path: Path) -> None: + """close() flushes any pending records.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, coalesce_max_records=100, + ) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + + with SDSavior(str(data), str(meta), 256 * 1024) as rb2: + rows = list(rb2.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2] + + +def test_coalesce_explicit_flush(tmp_path: Path) -> None: + """flush() forces immediate write of pending records.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, coalesce_max_records=100, + ) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + assert list(rb.iter_records()) == [] + + rb.flush() + rows = list(rb.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2] + + +def test_coalesce_write_stats(tmp_path: Path) -> None: + """write_stats tracks logical appends, physical flushes, and bytes written.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, coalesce_max_records=3, + ) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rb.append({"n": 3}) # triggers flush + rb.append({"n": 4}) + + stats = rb.write_stats + assert stats["logical_appends"] == 4 + assert stats["physical_flushes"] == 1 + assert stats["bytes_written"] > 0 + + +def test_coalesce_iter_records_excludes_pending(tmp_path: Path) -> None: + """iter_records() default excludes pending; include_pending=True includes.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, coalesce_max_records=100, + ) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + + assert list(rb.iter_records()) == [] + + rows = list(rb.iter_records(include_pending=True)) + assert len(rows) == 2 + assert [r[2]["n"] for r in rows] == [1, 2] + + +def test_coalesce_records_survive_reopen(tmp_path: Path) -> None: + """Flushed coalesced records survive close and reopen.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, coalesce_max_records=2, + ) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) # triggers flush + rb.append({"n": 3}) # pending + + with SDSavior(str(data), str(meta), 256 * 1024) as rb2: + rows = list(rb2.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2, 3]