From f55a6f2a742f43790f0f8ad866a2d12b88963cc8 Mon Sep 17 00:00:00 2001 From: John Westerlund Date: Wed, 4 Mar 2026 12:55:37 +0100 Subject: [PATCH 1/8] fix: harden open/recovery edge cases --- src/sdsavior/ring.py | 86 ++++++++++++++++++++++++++++-------------- tests/test_basic.py | 19 ++++++++++ tests/test_extended.py | 51 ++++++++++++++++++++++++- 3 files changed, 125 insertions(+), 31 deletions(-) diff --git a/src/sdsavior/ring.py b/src/sdsavior/ring.py index eb30a3b..9ad4a70 100644 --- a/src/sdsavior/ring.py +++ b/src/sdsavior/ring.py @@ -149,14 +149,28 @@ def open(self) -> None: meta_mm: mmap.mmap | None = None try: - data_exists = os.path.exists(self.data_path) - existing_data_size = os.path.getsize(self.data_path) if data_exists else 0 - data_fd = os.open(self.data_path, os.O_RDWR | os.O_CREAT) meta_fd = os.open(self.meta_path, os.O_RDWR | os.O_CREAT) - self._ensure_file_size(data_fd, self.capacity) - self._ensure_file_size(meta_fd, META_FILE_SIZE) + data_size = os.fstat(data_fd).st_size + meta_size = os.fstat(meta_fd).st_size + is_new_data_file = data_size == 0 + + if data_size not in (0, self.capacity): + raise ValueError( + f"Data file size ({data_size}) != requested capacity ({self.capacity}). " + "Use the same capacity or create new files." + ) + if meta_size not in (0, META_FILE_SIZE): + raise ValueError( + f"Meta file size ({meta_size}) is invalid. " + f"Expected 0 or {META_FILE_SIZE} bytes." + ) + + if is_new_data_file: + os.ftruncate(data_fd, self.capacity) + if meta_size == 0: + os.ftruncate(meta_fd, META_FILE_SIZE) data_mm = mmap.mmap(data_fd, self.capacity, access=mmap.ACCESS_WRITE) meta_mm = mmap.mmap(meta_fd, META_FILE_SIZE, access=mmap.ACCESS_WRITE) @@ -170,12 +184,12 @@ def open(self) -> None: data_mm = None meta_mm = None - if existing_data_size > 0 and self._data_mm[:DATA_START] != DATA_MAGIC: + if not is_new_data_file and self._data_mm[:DATA_START] != DATA_MAGIC: raise ValueError( f"Data file {self.data_path!r} has invalid magic; " "refusing to overwrite existing data." ) - if existing_data_size == 0 and self._data_mm[:DATA_START] != DATA_MAGIC: + if is_new_data_file: self._data_mm[:DATA_START] = DATA_MAGIC self._data_mm.flush() if self.fsync_data: @@ -215,28 +229,29 @@ def open(self) -> None: def close(self) -> None: """Persist metadata and release mmap/file descriptors.""" - if self._state is not None: - self._write_meta(self._state) + try: + if self._state is not None: + self._write_meta(self._state) + finally: + if self._data_mm is not None: + self._data_mm.flush() + self._data_mm.close() + self._data_mm = None - if self._data_mm is not None: - self._data_mm.flush() - self._data_mm.close() - self._data_mm = None + if self._meta_mm is not None: + self._meta_mm.flush() + self._meta_mm.close() + self._meta_mm = None - if self._meta_mm is not None: - self._meta_mm.flush() - self._meta_mm.close() - self._meta_mm = None + if self._data_fd is not None: + os.close(self._data_fd) + self._data_fd = None - if self._data_fd is not None: - os.close(self._data_fd) - self._data_fd = None - - if self._meta_fd is not None: - os.close(self._meta_fd) - self._meta_fd = None + if self._meta_fd is not None: + os.close(self._meta_fd) + self._meta_fd = None - self._state = None + self._state = None def _cleanup_open_handles(self) -> None: """Best-effort cleanup of mapped files and fds without persisting metadata.""" @@ -331,7 +346,7 @@ def iter_records(self, *, from_seq: int | None = None) -> Iterator[tuple[int, in off = s.tail scanned = 0 - limit = self.recover_scan_limit_bytes or self.capacity + limit = self.capacity while off != s.head and scanned < limit: rec = self._read_record(off) @@ -339,7 +354,10 @@ def iter_records(self, *, from_seq: int | None = None) -> Iterator[tuple[int, in break kind, next_off, seq, ts_ns, obj = rec - scanned += self._distance(off, next_off) + step = self._distance(off, next_off) + if step <= 0: + break + scanned += step if kind == "wrap": off = next_off @@ -405,6 +423,8 @@ def _read_record(self, off: int) -> ParsedRecord | None: (total_len,) = struct.unpack_from(" self.capacity: @@ -579,14 +599,22 @@ def _recover(self) -> None: break kind, next_off, seq, _ts_ns, _obj = rec + step = self._distance(off, next_off) + if step <= 0: + s.head = last_good_off + s.commit += 1 + self._write_meta(s) + truncated = True + break + if kind == "wrap": - scanned += self._distance(off, next_off) + scanned += step off = next_off last_good_off = off continue last_seq = seq - scanned += self._distance(off, next_off) + scanned += step off = next_off last_good_off = off diff --git a/tests/test_basic.py b/tests/test_basic.py index 41e36ea..dea8c43 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -72,10 +72,12 @@ def test_open_rejects_existing_invalid_magic(tmp_path: Path) -> None: data.write_bytes(b"BAD!" + b"\x00" * (capacity - 4)) meta.write_bytes(b"") + size_before = data.stat().st_size rb = SDSavior(str(data), str(meta), capacity) with pytest.raises(ValueError, match="invalid magic"): rb.open() + assert data.stat().st_size == size_before def test_recover_truncates_when_scan_limit_is_reached(tmp_path: Path) -> None: @@ -94,6 +96,23 @@ def test_recover_truncates_when_scan_limit_is_reached(tmp_path: Path) -> None: assert [r[2]["n"] for r in rows] == [1] +def test_iter_records_ignores_recover_scan_limit_after_open(tmp_path: Path) -> None: + """Ensure iteration is not implicitly capped by recovery-only scan limits.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rb.append({"n": 3}) + + with SDSavior(str(data), str(meta), capacity) as rb2: + rb2.recover_scan_limit_bytes = 1 + rows = list(rb2.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2, 3] + + def test_write_wrap_marker_fsyncs_data_when_enabled( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, diff --git a/tests/test_extended.py b/tests/test_extended.py index 3d8910e..4d38bd5 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -161,13 +161,16 @@ def test_open_rejects_capacity_mismatch(tmp_path: Path) -> None: """Reject opening existing files when requested capacity does not match meta.""" data = tmp_path / "ring.dat" meta = tmp_path / "ring.meta" + original_capacity = 256 * 1024 - with SDSavior(str(data), str(meta), 256 * 1024) as rb: + with SDSavior(str(data), str(meta), original_capacity) as rb: rb.append({"n": 1}) - with pytest.raises(ValueError, match="Meta capacity"): + size_before = data.stat().st_size + with pytest.raises(ValueError, match="Data file size"): with SDSavior(str(data), str(meta), 16 * 1024) as rb2: rb2.iter_records() + assert data.stat().st_size == size_before def test_open_twice_requires_close(tmp_path: Path) -> None: @@ -265,3 +268,47 @@ def raise_on_load_meta(self: SDSavior) -> None: assert rb._data_mm is None assert rb._meta_mm is None assert rb._state is None + + +def test_recover_treats_wrap_marker_at_data_start_as_corruption(tmp_path: Path) -> None: + """Ensure wrap markers at DATA_START do not cause recovery/iteration hangs.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + + with SDSavior(str(data), str(meta), capacity) as rb: + assert rb._data_mm is not None + struct.pack_into(" None: + """Ensure close releases resources even if metadata persistence raises.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + rb = SDSavior(str(data), str(meta), 256 * 1024) + rb.open() + + def raise_on_write_meta(_st) -> None: + """Force close() to execute cleanup via the finally branch.""" + raise RuntimeError("forced write_meta failure") + + monkeypatch.setattr(rb, "_write_meta", raise_on_write_meta) + + with pytest.raises(RuntimeError, match="forced write_meta failure"): + rb.close() + + assert rb._data_fd is None + assert rb._meta_fd is None + assert rb._data_mm is None + assert rb._meta_mm is None + assert rb._state is None From 0665e7d89df8e9abf74159b7e1b472cb78f47f72 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 12:27:56 +0000 Subject: [PATCH 2/8] 1.0.1 Automatically generated by python-semantic-release --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1acba1f..84a89f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ +## v1.0.1 (2026-03-04) + +### Bug Fixes + +- Harden open/recovery edge cases + ([`f55a6f2`](https://github.com/well-it-wasnt-me/SDSavior/commit/f55a6f2a742f43790f0f8ad866a2d12b88963cc8)) + + ## v1.0.0 (2026-02-28) - Initial Release From f4bd2c2833b42478fb15be6ac2b63a7a17ee7250 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 13:38:20 +0000 Subject: [PATCH 3/8] fix(ci): pypy with twine --- .github/workflows/ci.yml | 23 +++++++++++++++++------ pyproject.toml | 3 ++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8795bfd..0544976 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - name: Type check run: mypy src - - name: Tests with coverage gate + - name: Tests with coverage run: pytest -q semver: @@ -118,11 +118,22 @@ jobs: needs: semver name: Upload release to PyPI runs-on: ubuntu-latest - environment: - name: pypi - url: https://pypi.org/p/sdsavior permissions: id-token: write steps: - - name: Publish package distributions to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 \ No newline at end of file + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + + - name: deps + run: python -m pip install -U build twine + + - name: build + run: python -m build + + - name: Upload package to PyPI + run: python -m twine upload dist/* + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} diff --git a/pyproject.toml b/pyproject.toml index 8edbfd8..fe6a581 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "sdsavior" -version = "0.1.0" +version = "1.0.1" description = "Crash-recoverable memory-mapped ring buffer for JSON records (SD-card friendly-ish)" readme = "README.md" requires-python = ">=3.11" @@ -51,3 +51,4 @@ select = ["E", "F", "I", "UP", "B"] [tool.pytest.ini_options] addopts = "--cov=src/sdsavior --cov-report=term-missing --cov-fail-under=90" testpaths = ["tests"] +pythonpath = ["src"] From 3f0b76a922c79593ce877fbf91df6e27dd98bff5 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 13:43:45 +0000 Subject: [PATCH 4/8] test(coverage): extended test coverage --- tests/test_basic.py | 28 ++++++ tests/test_extended.py | 216 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 1 deletion(-) diff --git a/tests/test_basic.py b/tests/test_basic.py index dea8c43..6e9da1d 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -52,6 +52,15 @@ def test_capacity_must_be_aligned(tmp_path: Path) -> None: SDSavior(str(data), str(meta), (16 * 1024) + 1) +def test_capacity_too_small_raises(tmp_path: Path) -> None: + """Reject capacities that are too small to be useful.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with pytest.raises(ValueError, match="too small"): + SDSavior(str(data), str(meta), 8 * 1024) + + def test_json_kwargs_are_copied(tmp_path: Path) -> None: """Ensure caller-owned JSON kwargs are copied, not referenced.""" data = tmp_path / "ring.dat" @@ -113,6 +122,25 @@ def test_iter_records_ignores_recover_scan_limit_after_open(tmp_path: Path) -> N assert [r[2]["n"] for r in rows] == [1, 2, 3] +def test_append_fsyncs_data(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Verify append calls fsync when data syncing is enabled.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + fsync_calls: list[int] = [] + + def fake_fsync(fd: int) -> None: + fsync_calls.append(fd) + + monkeypatch.setattr("sdsavior.ring.os.fsync", fake_fsync) + + with SDSavior(str(data), str(meta), 16 * 1024, fsync_data=True, fsync_meta=False) as rb: + assert rb._data_fd is not None + data_fd = rb._data_fd + fsync_calls.clear() + rb.append({"n": 1}) + assert fsync_calls == [data_fd] + + def test_write_wrap_marker_fsyncs_data_when_enabled( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, diff --git a/tests/test_extended.py b/tests/test_extended.py index 4d38bd5..8bda003 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -1,14 +1,24 @@ from __future__ import annotations import json +import os import struct import sys from pathlib import Path +from types import SimpleNamespace import pytest from sdsavior import SDSavior, cli -from sdsavior.ring import RECORD_HDR, RECORD_HDR_SIZE, _crc32_bytes +from sdsavior.ring import ( + DATA_START, + META_FILE_SIZE, + RECORD_HDR, + RECORD_HDR_SIZE, + WRAP_MARKER, + MetaState, + _crc32_bytes, +) def _record_offsets(rb: SDSavior) -> list[int]: @@ -270,6 +280,210 @@ def raise_on_load_meta(self: SDSavior) -> None: assert rb._state is None +def test_open_rejects_invalid_meta_size(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 16 * 1024 + + with open(data, "wb") as f: + f.truncate(capacity) + with open(meta, "wb") as f: + f.truncate(1) + + rb = SDSavior(str(data), str(meta), capacity) + with pytest.raises(ValueError, match="Meta file size"): + rb.open() + + +def test_open_rejects_meta_capacity_mismatch(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 32 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + + other_capacity = 16 * 1024 + state = MetaState( + capacity=other_capacity, + head=DATA_START, + tail=DATA_START, + seq_next=1, + commit=1, + ) + raw = rb._pack_meta(state) + meta.write_bytes(raw + b"\x00" * (META_FILE_SIZE - len(raw))) + + rb2 = SDSavior(str(data), str(meta), capacity) + with pytest.raises(ValueError, match="Meta capacity"): + rb2.open() + + +def test_iter_records_breaks_on_corrupt_record(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._data_mm is not None + struct.pack_into(" None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._state is not None + off = DATA_START + 64 + rb._write_wrap_marker(off) + rb._state.tail = off + rb._state.head = DATA_START + assert list(rb.iter_records()) == [] + + +def test_read_record_invalid_cases(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._data_mm is not None + mm = rb._data_mm + + assert rb._read_record(0) is None + + struct.pack_into(" None: + path = tmp_path / "size.dat" + fd = os.open(path, os.O_RDWR | os.O_CREAT) + try: + SDSavior._ensure_file_size(fd, 128) + assert os.fstat(fd).st_size == 128 + finally: + os.close(fd) + + +def test_unpack_meta_rejects_invalid_buffers(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + rb = SDSavior(str(data), str(meta), 16 * 1024) + + assert rb._unpack_meta(b"short") is None + + state = MetaState( + capacity=rb.capacity, + head=DATA_START, + tail=DATA_START, + seq_next=1, + commit=1, + ) + raw = rb._pack_meta(state) + corrupt_crc = bytearray(raw) + corrupt_crc[-8] ^= 0xFF + assert rb._unpack_meta(bytes(corrupt_crc)) is None + + bad_state = MetaState( + capacity=rb.capacity, + head=0, + tail=0, + seq_next=1, + commit=1, + ) + bad_raw = rb._pack_meta(bad_state) + assert rb._unpack_meta(bad_raw) is None + + +def test_make_space_handles_corrupt_tail_record(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._state is not None + assert rb._data_mm is not None + struct.pack_into(" None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._state is not None + start_commit = rb._state.commit + + def stuck(_off: int): + return ("rec", _off, 1, 0, {}) + + monkeypatch.setattr(rb, "_read_record", stuck) + rb._recover() + assert rb._state.commit == start_commit + 1 + assert rb._state.head == rb._state.tail + + +def test_recover_wrap_marker_advances(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._state is not None + off = DATA_START + 64 + rb._write_wrap_marker(off) + rb._state.tail = off + rb._state.head = DATA_START + rb._recover() + assert rb._state.tail == off + assert rb._state.head == DATA_START + + +def test_recover_updates_seq_next(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + last_seq = rb.append({"n": 2}) + assert rb._state is not None + rb._state.seq_next = 1 + rb._recover() + assert rb._state.seq_next == last_seq + 1 + + +def test_cli_returns_nonzero_for_unknown_command(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + cli.argparse.ArgumentParser, + "parse_args", + lambda _self: SimpleNamespace(cmd="noop"), + ) + assert cli.main() == 2 + + def test_recover_treats_wrap_marker_at_data_start_as_corruption(tmp_path: Path) -> None: """Ensure wrap markers at DATA_START do not cause recovery/iteration hangs.""" data = tmp_path / "ring.dat" From fd9e5069ff4e5c79ca96b8e2ab320de1d3c36bd9 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 13:47:25 +0000 Subject: [PATCH 5/8] chore: line to long --- tests/test_extended.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_extended.py b/tests/test_extended.py index 8bda003..1b9df9f 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -429,7 +429,10 @@ def test_make_space_handles_corrupt_tail_record(tmp_path: Path) -> None: assert rb._state.tail == rb._state.head -def test_recover_handles_non_advancing_record(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: +def test_recover_handles_non_advancing_record( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch +) -> None: data = tmp_path / "ring.dat" meta = tmp_path / "ring.meta" From 3d7aa1f1fbd246cdcf26daf820fc89d32aab4655 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 13:48:46 +0000 Subject: [PATCH 6/8] 1.0.2 Automatically generated by python-semantic-release --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84a89f6..76af8ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ +## v1.0.2 (2026-03-04) + +### Bug Fixes + +- **ci**: Pypy with twine + ([`f4bd2c2`](https://github.com/well-it-wasnt-me/SDSavior/commit/f4bd2c2833b42478fb15be6ac2b63a7a17ee7250)) + +### Chores + +- Line to long + ([`fd9e506`](https://github.com/well-it-wasnt-me/SDSavior/commit/fd9e5069ff4e5c79ca96b8e2ab320de1d3c36bd9)) + +### Testing + +- **coverage**: Extended test coverage + ([`3f0b76a`](https://github.com/well-it-wasnt-me/SDSavior/commit/3f0b76a922c79593ce877fbf91df6e27dd98bff5)) + + ## v1.0.1 (2026-03-04) ### Bug Fixes From 615d75214e1a4ff753160109b4840f0c4c23ff83 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 4 Mar 2026 14:02:28 +0000 Subject: [PATCH 7/8] ci(refactor): refactored the entire ci file. lets hope, cause im tired.... --- .github/workflows/ci.yml | 50 +++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0544976..20e6bec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,15 +1,21 @@ -name: CI +name: SD Savior Pipeline on: - push: - branches: ["**"] pull_request: + push: + branches: + - main permissions: contents: read +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: validate: + name: Validate (lint, types, tests) runs-on: ubuntu-latest strategy: fail-fast: false @@ -24,6 +30,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + cache: pip - name: Install dependencies run: | @@ -36,14 +43,14 @@ jobs: - name: Type check run: mypy src - - name: Tests with coverage + - name: Tests run: pytest -q semver: - name: Release + name: Release (semantic-release) runs-on: ubuntu-latest needs: [validate] - if: github.ref == 'refs/heads/main' + if: github.event_name == 'push' && github.ref == 'refs/heads/main' permissions: contents: write env: @@ -52,6 +59,7 @@ jobs: released: ${{ steps.release.outputs.released }} tag: ${{ steps.release.outputs.tag }} commit_sha: ${{ steps.release.outputs.commit_sha }} + steps: - name: Checkout (full history for tags) uses: actions/checkout@v4 @@ -68,12 +76,14 @@ jobs: git_committer_email: "41898282+github-actions[bot]@users.noreply.github.com" docs-build: + name: Build docs + needs: [validate] if: github.event_name == 'push' && github.ref == 'refs/heads/main' - needs: validate runs-on: ubuntu-latest permissions: pages: write id-token: write + steps: - name: Checkout uses: actions/checkout@v4 @@ -82,6 +92,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip - name: Install dependencies run: | @@ -100,8 +111,9 @@ jobs: path: site docs-deploy: + name: Deploy docs + needs: [docs-build] if: github.event_name == 'push' && github.ref == 'refs/heads/main' - needs: docs-build runs-on: ubuntu-latest permissions: pages: write @@ -109,31 +121,41 @@ jobs: environment: name: github-pages url: ${{ steps.deployment.outputs.page_url }} + steps: - name: Deploy to GitHub Pages id: deployment uses: actions/deploy-pages@v4 pypi-publish: - needs: semver name: Upload release to PyPI + needs: [semver] + if: github.event_name == 'push' && github.ref == 'refs/heads/main' && needs.semver.outputs.released == 'true' runs-on: ubuntu-latest permissions: id-token: write + steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 + - name: Checkout release tag + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ref: ${{ needs.semver.outputs.tag }} + + - name: Set up Python + uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip - - name: deps + - name: Build tooling run: python -m pip install -U build twine - - name: build + - name: Build package run: python -m build - name: Upload package to PyPI run: python -m twine upload dist/* env: TWINE_USERNAME: __token__ - TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} \ No newline at end of file From f2697f5632de438723f98594f04456d8ecf64712 Mon Sep 17 00:00:00 2001 From: John Westerlund Date: Thu, 5 Mar 2026 16:04:27 +0100 Subject: [PATCH 8/8] feat: add async wrapper as extension module Add AsyncSDSavior in sdsavior.aio with dedicated single-thread executor for thread safety. Implements async iteration that yields records in chunks to avoid list materialization. Import explicitly from sdsavior.aio. Co-Authored-By: Claude Opus 4.6 --- docs/api.md | 29 +++++++++ docs/usage.md | 31 ++++++++++ pyproject.toml | 1 + src/sdsavior/aio.py | 148 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_async.py | 128 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 337 insertions(+) create mode 100644 src/sdsavior/aio.py create mode 100644 tests/test_async.py diff --git a/docs/api.md b/docs/api.md index 020d3f7..6f98c25 100644 --- a/docs/api.md +++ b/docs/api.md @@ -44,3 +44,32 @@ Dataclass storing persisted pointer state: ```python from sdsavior import SDSavior, MetaState ``` + +## `AsyncSDSavior` + +Async wrapper around `SDSavior`, imported from `sdsavior.aio`. + +```python +from sdsavior.aio import AsyncSDSavior +``` + +### Constructor + +`AsyncSDSavior(data_path, meta_path, capacity_bytes, *, executor=None, **kwargs)` + +- `executor`: optional `ThreadPoolExecutor`. Defaults to a dedicated single-thread executor for thread safety. +- `**kwargs`: passed through to `SDSavior`. + +### Async Lifecycle + +- `await open()` / `await close()` +- Async context manager: `async with AsyncSDSavior(...) as rb:` + +### Async Data Operations + +- `await append(obj) -> int`: append and return sequence number. +- `async for seq, ts_ns, obj in rb.iter_records(from_seq=None)`: async iteration yielding records in chunks (avoids full list materialization). +- `async for seq, ts_ns, obj in rb.from_seq(seq)`: convenience filter by sequence. +- `await export_jsonl(out_path, from_seq=None)`: export to JSONL. +- `await flush()`: flush pending coalesced records. +- `rb.ring`: access the underlying `SDSavior` instance. diff --git a/docs/usage.md b/docs/usage.md index d3ab36f..f1d62a1 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -49,3 +49,34 @@ with SDSavior("data.ring", "data.meta", 8 * 1024 * 1024) as rb: - `recover_scan_limit_bytes=None` (default): scan up to capacity during recovery. Use `fsync_data=True` when stronger durability is required and throughput tradeoffs are acceptable. + +## Async Usage + +For asyncio applications, use the async wrapper: + +```python +from sdsavior.aio import AsyncSDSavior + +async with AsyncSDSavior("data.ring", "data.meta", 8 * 1024 * 1024) as rb: + await rb.append({"event": "boot"}) + + async for seq, ts_ns, obj in rb.iter_records(): + print(seq, obj) + + # Filter by sequence + async for seq, ts_ns, obj in rb.from_seq(100): + print(seq, obj) + + # Export + await rb.export_jsonl("out/export.jsonl") +``` + +`AsyncSDSavior` uses a dedicated single-thread executor to safely wrap the non-thread-safe `SDSavior`. A custom executor can be provided: + +```python +from concurrent.futures import ThreadPoolExecutor + +executor = ThreadPoolExecutor(max_workers=1) +async with AsyncSDSavior("data.ring", "data.meta", 8 * 1024 * 1024, executor=executor) as rb: + await rb.append({"event": "boot"}) +``` diff --git a/pyproject.toml b/pyproject.toml index fe6a581..3b088e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,3 +52,4 @@ select = ["E", "F", "I", "UP", "B"] addopts = "--cov=src/sdsavior --cov-report=term-missing --cov-fail-under=90" testpaths = ["tests"] pythonpath = ["src"] +asyncio_mode = "auto" diff --git a/src/sdsavior/aio.py b/src/sdsavior/aio.py new file mode 100644 index 0000000..b6740ee --- /dev/null +++ b/src/sdsavior/aio.py @@ -0,0 +1,148 @@ +"""Async wrapper for SDSavior ring buffer. + +Usage:: + + from sdsavior.aio import AsyncSDSavior + + async with AsyncSDSavior("data.ring", "data.meta", 8 * 1024 * 1024) as rb: + await rb.append({"event": "boot"}) + async for seq, ts_ns, obj in rb.iter_records(): + print(seq, obj) +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from concurrent.futures import ThreadPoolExecutor +from typing import Any + +from .ring import SDSavior + + +class AsyncSDSavior: + """ + Async wrapper around SDSavior. + + Uses a dedicated single-thread executor by default to prevent concurrent + access to the non-thread-safe SDSavior instance. + """ + + def __init__( + self, + data_path: str, + meta_path: str, + capacity_bytes: int, + *, + executor: ThreadPoolExecutor | None = None, + **kwargs: Any, + ): + """Configure the async ring buffer wrapper. + + Args: + data_path: Path to the data ring file. + meta_path: Path to the metadata file. + capacity_bytes: Ring buffer capacity in bytes. + executor: Optional custom ThreadPoolExecutor. If None, a dedicated + single-thread executor is created for thread safety. + **kwargs: Additional keyword arguments passed to SDSavior. + """ + self._ring = SDSavior(data_path, meta_path, capacity_bytes, **kwargs) + self._owns_executor = executor is None + self._executor = executor or ThreadPoolExecutor(max_workers=1) + self._lock = asyncio.Lock() + + async def __aenter__(self) -> AsyncSDSavior: + """Open the ring buffer.""" + await self.open() + return self + + async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None: + """Close the ring buffer and shut down the executor if owned.""" + await self.close() + + async def _run(self, fn: Any, *args: Any) -> Any: + """Run a blocking function in the executor under the async lock.""" + loop = asyncio.get_running_loop() + async with self._lock: + return await loop.run_in_executor(self._executor, fn, *args) + + async def open(self) -> None: + """Open the underlying ring buffer.""" + await self._run(self._ring.open) + + async def close(self) -> None: + """Close the underlying ring buffer and shut down owned executor.""" + await self._run(self._ring.close) + if self._owns_executor: + self._executor.shutdown(wait=False) + + async def append(self, obj: Any) -> int: + """Append a JSON object and return the sequence number.""" + return await self._run(self._ring.append, obj) + + async def flush(self) -> None: + """Flush pending coalesced records if coalescing is enabled.""" + if hasattr(self._ring, "flush"): + await self._run(self._ring.flush) + + async def export_jsonl(self, out_path: str, *, from_seq: int | None = None) -> None: + """Export records to a JSONL file.""" + loop = asyncio.get_running_loop() + async with self._lock: + await loop.run_in_executor( + self._executor, + lambda: self._ring.export_jsonl(out_path, from_seq=from_seq), + ) + + async def iter_records( + self, *, from_seq: int | None = None + ) -> AsyncIterator[tuple[int, int, Any]]: + """ + Async iterator over records, yielding (seq, ts_ns, obj) one at a time. + + Records are fetched in chunks from the executor to avoid blocking the + event loop while still avoiding full list materialization for large rings. + """ + chunk_size = 256 + loop = asyncio.get_running_loop() + + def _read_chunk(skip: int) -> list[tuple[int, int, Any]]: + results: list[tuple[int, int, Any]] = [] + count = 0 + skipped = 0 + for rec in self._ring.iter_records(from_seq=from_seq): + if skipped < skip: + skipped += 1 + continue + results.append(rec) + count += 1 + if count >= chunk_size: + break + return results + + offset = 0 + while True: + async with self._lock: + chunk = await loop.run_in_executor( + self._executor, _read_chunk, offset + ) + if not chunk: + break + for rec in chunk: + yield rec + offset += len(chunk) + if len(chunk) < chunk_size: + break + + async def from_seq( + self, seq: int + ) -> AsyncIterator[tuple[int, int, Any]]: + """Convenience async iterator filtering records from a sequence number.""" + async for rec in self.iter_records(from_seq=seq): + yield rec + + @property + def ring(self) -> SDSavior: + """Access the underlying SDSavior instance (for advanced use).""" + return self._ring diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..e662afc --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import asyncio +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +import pytest + +from sdsavior.aio import AsyncSDSavior + + +@pytest.fixture +def ring_paths(tmp_path: Path) -> tuple[str, str]: + """Return data and meta file paths.""" + return str(tmp_path / "ring.dat"), str(tmp_path / "ring.meta") + + +CAPACITY = 256 * 1024 + + +@pytest.mark.asyncio +async def test_async_append_and_iter(ring_paths: tuple[str, str]) -> None: + """Basic append and iteration through async wrapper.""" + data, meta = ring_paths + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + s1 = await rb.append({"n": 1}) + s2 = await rb.append({"n": 2}) + assert s2 == s1 + 1 + + rows = [rec async for rec in rb.iter_records()] + assert len(rows) == 2 + assert rows[0][2] == {"n": 1} + assert rows[1][2] == {"n": 2} + + +@pytest.mark.asyncio +async def test_async_context_manager(ring_paths: tuple[str, str]) -> None: + """Context manager opens and closes correctly.""" + data, meta = ring_paths + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + await rb.append({"ok": True}) + rows = [rec async for rec in rb.iter_records()] + assert len(rows) == 1 + + +@pytest.mark.asyncio +async def test_async_export(ring_paths: tuple[str, str], tmp_path: Path) -> None: + """Export JSONL through async wrapper.""" + data, meta = ring_paths + out = str(tmp_path / "out.jsonl") + + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + await rb.append({"n": 1}) + await rb.append({"n": 2}) + await rb.export_jsonl(out) + + import json + + with open(out) as f: + lines = [json.loads(line) for line in f] + assert lines == [{"n": 1}, {"n": 2}] + + +@pytest.mark.asyncio +async def test_async_flush(ring_paths: tuple[str, str]) -> None: + """flush() call works through async wrapper.""" + data, meta = ring_paths + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + await rb.append({"n": 1}) + await rb.flush() # Should not raise + + +@pytest.mark.asyncio +async def test_async_concurrent_appends_serialized(ring_paths: tuple[str, str]) -> None: + """Concurrent appends are serialized by the async lock.""" + data, meta = ring_paths + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + # Launch many concurrent appends + tasks = [rb.append({"n": i}) for i in range(50)] + seqs = await asyncio.gather(*tasks) + + # All sequences should be unique + assert len(set(seqs)) == 50 + + rows = [rec async for rec in rb.iter_records()] + assert len(rows) == 50 + + +@pytest.mark.asyncio +async def test_async_custom_executor(ring_paths: tuple[str, str]) -> None: + """Custom executor is used instead of default single-thread executor.""" + data, meta = ring_paths + custom_executor = ThreadPoolExecutor(max_workers=1) + try: + async with AsyncSDSavior(data, meta, CAPACITY, executor=custom_executor) as rb: + await rb.append({"n": 1}) + rows = [rec async for rec in rb.iter_records()] + assert len(rows) == 1 + finally: + custom_executor.shutdown(wait=True) + + +@pytest.mark.asyncio +async def test_async_reopen_persistence(ring_paths: tuple[str, str]) -> None: + """Records persist across close and reopen.""" + data, meta = ring_paths + + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + await rb.append({"n": 1}) + await rb.append({"n": 2}) + + async with AsyncSDSavior(data, meta, CAPACITY) as rb2: + rows = [rec async for rec in rb2.iter_records()] + assert [r[2]["n"] for r in rows] == [1, 2] + + +@pytest.mark.asyncio +async def test_async_from_seq(ring_paths: tuple[str, str]) -> None: + """from_seq filters records by sequence number.""" + data, meta = ring_paths + + async with AsyncSDSavior(data, meta, CAPACITY) as rb: + await rb.append({"n": 1}) + s2 = await rb.append({"n": 2}) + await rb.append({"n": 3}) + + rows = [rec async for rec in rb.from_seq(s2)] + assert [r[2]["n"] for r in rows] == [2, 3]