From 831bec87a68f90a64d7114e7738e1d2b3a2f35fd Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Tue, 24 Mar 2026 16:44:14 -0700 Subject: [PATCH 1/7] Fix abi3 dataloader lifecycle handling --- TODOS.md | 10 + dataloader.zig | 147 +++++++---- .../2026-03-24-limited-abi-remediation.md | 188 ++++++++++++++ lua_dataloader.zig | 58 ++++- lua_rt.zig | 18 ++ python/python.zig | 229 ++++++++++-------- python/tests/test_limited_abi.py | 213 ++++++++++++++++ 7 files changed, 707 insertions(+), 156 deletions(-) create mode 100644 TODOS.md create mode 100644 docs/plans/2026-03-24-limited-abi-remediation.md create mode 100644 python/tests/test_limited_abi.py diff --git a/TODOS.md b/TODOS.md new file mode 100644 index 0000000..4bc7072 --- /dev/null +++ b/TODOS.md @@ -0,0 +1,10 @@ +# TODOs + +## Re-import stress test for module-state binding + +- **What:** Add a follow-up stress test that imports the extension multiple times in one process and verifies module-owned type state stays isolated. +- **Why:** The current plan widens the architecture to module-owned state but intentionally skips explicit repeated-import or multi-module-object verification. +- **Pros:** Closes the largest remaining review gap and gives direct evidence that the redesign solved the architecture problem it set out to solve. +- **Cons:** Adds test complexity and may require awkward import mechanics or a dedicated subprocess harness. +- **Context:** `python/python.zig` is being redesigned away from process-global type pointers toward module-owned state. The current implementation plan keeps subprocess lifecycle coverage and negative-path tests, but it explicitly defers repeated-import coverage. +- **Depends on / blocked by:** The module-state redesign in `docs/plans/2026-03-24-limited-abi-remediation.md` landing first. diff --git a/dataloader.zig b/dataloader.zig index 38a475b..4df23ef 100644 --- a/dataloader.zig +++ b/dataloader.zig @@ -7,15 +7,35 @@ const concurrent_ring = @import("concurrent_ring.zig"); const logger = std.log.scoped(.dataloader); const wlog = std.log.scoped(.dataloader_io_thread); -pub const FileHandle = packed struct { - _: u14 = 0, - idx: u20, - generation: u20, - path_checksum: u8, -}; +pub const FileHandle = u64; + +const file_handle_idx_bits = 20; +const file_handle_generation_bits = 20; +const file_handle_checksum_bits = 8; +const file_handle_generation_shift = file_handle_checksum_bits; +const file_handle_idx_shift = file_handle_generation_shift + file_handle_generation_bits; +const file_handle_field_mask = (1 << file_handle_idx_bits) - 1; + +const max_file_slots = std.math.maxInt(u20); +const max_generation = std.math.maxInt(u20); + +inline fn makeFileHandle(idx: u20, generation: u20, checksum: u8) FileHandle { + return (@as(u64, idx) << file_handle_idx_shift) | + (@as(u64, generation) << file_handle_generation_shift) | + checksum; +} + +inline fn fileHandleIdx(file: FileHandle) usize { + return @intCast((file >> file_handle_idx_shift) & file_handle_field_mask); +} -const max_file_slots = std.math.maxInt(@FieldType(FileHandle, "idx")); -const max_generation = std.math.maxInt(@FieldType(FileHandle, "generation")); +inline fn fileHandleGeneration(file: FileHandle) u32 { + return @intCast((file >> file_handle_generation_shift) & file_handle_field_mask); +} + +inline fn fileHandleChecksum(file: FileHandle) u8 { + return @intCast(file & std.math.maxInt(u8)); +} pub const ReadBlockReq = struct { base: u64, @@ -101,17 +121,15 @@ pub const LoaderCtx = struct { req_mem_pool: std.heap.MemoryPool(XevReq), fn findFreeFileSlot(self: *Self) !FileHandle { + const file_slots = self.file_slots[0..]; + // This isn't called too often, just linear scan for (0..max_file_slots) |offset| { const i = (self.last_slot + offset) % max_file_slots; - const f = self.file_slots[i]; + const f = file_slots[i]; if (f == null) { self.last_slot = (i + 1) % max_file_slots; - return .{ - .idx = @intCast(i), - .generation = 0, - .path_checksum = 0, - }; + return makeFileHandle(@intCast(i), 0, 0); } } @@ -119,10 +137,14 @@ pub const LoaderCtx = struct { } fn checkFilehandle(self: *Self, file: FileHandle) !void { - const slot: usize = @intCast(file.idx); - if (self.file_slots[slot] == null) return LoaderError.InvalidFileHandle; - if (self.file_slots_generation[slot] != file.generation or self.file_slots_checksum[slot] != file.path_checksum) { - logger.warn("File handle {} is corrupted, current generation: {}, checksum: {}", .{ file, self.file_slots_generation[slot], self.file_slots_checksum[slot] }); + const slot = fileHandleIdx(file); + const file_slots = self.file_slots[0..]; + const generations = self.file_slots_generation[0..]; + const checksums = self.file_slots_checksum[0..]; + + if (file_slots[slot] == null) return LoaderError.InvalidFileHandle; + if (generations[slot] != fileHandleGeneration(file) or checksums[slot] != fileHandleChecksum(file)) { + logger.warn("File handle {} is corrupted, current generation: {}, checksum: {}", .{ file, generations[slot], checksums[slot] }); return LoaderError.InvalidFileHandle; } } @@ -153,12 +175,15 @@ pub const LoaderCtx = struct { const self = ud orelse unreachable; const xreq: *XevReq = @fieldParentPtr("c", c); + const slot = fileHandleIdx(xreq.req.file); + const request_id = xreq.request_id; const actual = r catch { - self.sendResponseSynced(xreq.request_id, LoaderError.ReadError); + self.fileDecRef(slot); + self.req_mem_pool.destroy(xreq); + self.sendResponseSynced(request_id, LoaderError.ReadError); return .disarm; }; - const slot: usize = @intCast(xreq.req.file.idx); self.fileDecRef(slot); if (actual != xreq.req.result_buffer.len) { @@ -172,24 +197,34 @@ pub const LoaderCtx = struct { } fn fileAddRef(self: *Self, slot: usize) void { + const refcounts = self.file_refcount[0..]; + // Should start on 1 - std.debug.assert(self.file_refcount[slot] > 0); - self.file_refcount[slot] += 1; + std.debug.assert(refcounts[slot] > 0); + refcounts[slot] += 1; } fn fileDecRef(self: *Self, slot: usize) void { - std.debug.assert(self.file_refcount[slot] > 0); - self.file_refcount[slot] -= 1; - if (self.file_refcount[slot] == 0) { - const f = self.file_slots[slot] orelse unreachable; + const refcounts = self.file_refcount[0..]; + const file_slots = self.file_slots[0..]; + + std.debug.assert(refcounts[slot] > 0); + refcounts[slot] -= 1; + if (refcounts[slot] == 0) { + const f = file_slots[slot] orelse unreachable; f.close(); - self.file_slots[slot] = null; + file_slots[slot] = null; } } fn handleReq(self: *Self, req_id: u64, req: Request) void { switch (req) { .open_file => |open_req| { + const file_slots = self.file_slots[0..]; + const xfile_slots = self.xfile_slots[0..]; + const refcounts = self.file_refcount[0..]; + const generations = self.file_slots_generation[0..]; + const checksums = self.file_slots_checksum[0..]; const file_path = open_req.file_path; wlog.debug("Req {}: open_file: file = {s}", .{ req_id, file_path }); @@ -206,16 +241,16 @@ pub const LoaderCtx = struct { const xf = xev.File.init(f) catch unreachable; // Commit state - const slot: usize = @intCast(h.idx); - h.path_checksum = path_checksum(file_path); - self.file_slots[slot] = f; - self.xfile_slots[slot] = xf; - self.file_refcount[slot] = 1; - self.file_slots_generation[slot] += 1; // gen 0 is reserved to catch errors - self.file_slots_checksum[slot] = h.path_checksum; - const gen = self.file_slots_generation[slot]; + const slot = fileHandleIdx(h); + const checksum = path_checksum(file_path); + file_slots[slot] = f; + xfile_slots[slot] = xf; + refcounts[slot] = 1; + generations[slot] += 1; // gen 0 is reserved to catch errors + checksums[slot] = checksum; + const gen = generations[slot]; if (gen > max_generation) @panic("Open file generation overflow"); - h.generation = @intCast(gen); + h = makeFileHandle(@intCast(slot), @intCast(gen), checksum); self.sendResponseSynced(req_id, .{ .open_file = h }); }, @@ -228,7 +263,7 @@ pub const LoaderCtx = struct { return; }; - const slot: usize = @intCast(file_handle.idx); + const slot = fileHandleIdx(file_handle); self.fileDecRef(slot); }, @@ -241,7 +276,7 @@ pub const LoaderCtx = struct { return; }; - const slot: usize = @intCast(read_req.file.idx); + const slot = fileHandleIdx(read_req.file); const xf = self.xfile_slots[slot]; // Prepare read request @@ -383,14 +418,26 @@ pub const LoaderCtx = struct { logger.debug("Worker thread started", .{}); } - pub fn init(alloc: std.mem.Allocator) !Self { - return .{ - .alloc = alloc, - .file_slots = [_]?std.fs.File{null} ** max_file_slots, - .file_slots_generation = [_]u32{0} ** max_file_slots, - .loop = try xev.Loop.init(.{}), - .req_mem_pool = try std.heap.MemoryPool(XevReq).initPreheated(alloc, 16), - }; + pub fn initInPlace(self: *Self, alloc: std.mem.Allocator) !void { + self.alloc = alloc; + self.request_ring = ReqRing.init(); + self.result_ring = ResultRing.init(); + self.file_slots = @splat(null); + self.xfile_slots = undefined; + self.file_refcount = @splat(0); + self.file_slots_generation = std.mem.zeroes([max_file_slots]u32); + self.file_slots_checksum = undefined; + self.loop = try xev.Loop.init(.{}); + errdefer self.loop.deinit(); + self.worker_thread = null; + self.req_cnt = 0; + self.is_running = false; + self.is_draining = false; + self.last_slot = 0; + self.debug_max_req_id = std.math.maxInt(u64); + self.tick = 0; + self.debug_max_tick = std.math.maxInt(u64); + self.req_mem_pool = try std.heap.MemoryPool(XevReq).initPreheated(alloc, 16); } pub fn deinit(self: *Self) void { @@ -421,7 +468,9 @@ test "test dataloader" { var debug_alloc = std.heap.DebugAllocator(.{}).init; defer _ = debug_alloc.deinit(); - var ctx = try LoaderCtx.init(debug_alloc.allocator()); + const ctx = try debug_alloc.allocator().create(LoaderCtx); + defer debug_alloc.allocator().destroy(ctx); + try ctx.initInPlace(debug_alloc.allocator()); ctx.debug_max_req_id = 5; ctx.debug_max_tick = 1000; try ctx.start(); @@ -450,7 +499,9 @@ test "test dataloader blocked join" { var debug_alloc = std.heap.DebugAllocator(.{}).init; defer _ = debug_alloc.deinit(); - var ctx = try LoaderCtx.init(debug_alloc.allocator()); + const ctx = try debug_alloc.allocator().create(LoaderCtx); + defer debug_alloc.allocator().destroy(ctx); + try ctx.initInPlace(debug_alloc.allocator()); ctx.debug_max_req_id = 100; ctx.debug_max_tick = 1000; try ctx.start(); diff --git a/docs/plans/2026-03-24-limited-abi-remediation.md b/docs/plans/2026-03-24-limited-abi-remediation.md new file mode 100644 index 0000000..7e07505 --- /dev/null +++ b/docs/plans/2026-03-24-limited-abi-remediation.md @@ -0,0 +1,188 @@ +# Ultar Limited ABI Remediation Implementation Plan + +> **Execution:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development to implement this plan task-by-task. +> **Plan Review:** Before execution, dispatch a subagent to review this plan using the appropriate review skill. +> **Important:** This plan is not implementation code. Do not write code or heavy pseudocode in this document. + +**Goal:** Remove non-compliant Limited-API object/type access from Ultar's Python binding and replace it with documented Python 3.11 abi3-compatible lifecycle handling using module-owned type state. + +**Architecture:** Keep the current Zig extension structure and abi3 packaging intact, but redesign the binding around module-owned state rather than process-global type pointers. The implementation should move from single-phase, global-type initialization to a clean module-state design with module-associated heap types, a small explicit `ModuleState` helper layer, and documented lifecycle handling for allocation, partial initialization, teardown, and test coverage. + +**Tech Stack:** Zig, CPython Limited API / Stable ABI (`Py_LIMITED_API=0x030b0000`), setuptools abi3 wheel packaging + +--- + +## What Already Exists + +- `python/python.zig` already contains the full binding surface: type definitions, alloc/dealloc paths, row wrapping, and ownership comments. +- `build.zig` and `python/setup.py` already agree on `_native.abi3` and `cp311` wheel tagging; they are verification targets, not redesign targets. +- `python/tests/test_dataloader.py` and `python/tests/loader_script.lua` already define the current public behavior and Lua-side loading shape. +- Previous commits in this repo already show lifecycle fragility in this area (`087626b`, `4ce71c3`), so teardown and ownership paths deserve extra rigor. + +## Architecture Diagram + +```text +Before +====== + +process globals + | + +--> DataLoaderType + +--> LoadedRowType + | + v + wrapOwnedRow() allocates rows + +After +===== + +module object + | + v ++-------------------+ +| ModuleState | +| - DataLoaderType | +| - LoadedRowType | ++-------------------+ + | | + | +--> wrapOwnedRow() resolves LoadedRowType from module state + | + +--> module init creates module-associated heap types + +instance lifecycle + DataLoader.__new__ -> native loader init -> iteration -> LoadedRow alloc + | | + +------ partial-init cleanup ------+ + | + clear/dealloc destroy exactly once +``` + +## Not in Scope + +- Broad Stable-ABI cleanup outside `python/python.zig` - this plan stays focused on the Ultar binding. +- Packaging refactors in `build.zig` or `python/setup.py` unless implementation uncovers a concrete abi3 mismatch. +- Explicit repeated-import or multiple-module-object verification in this change - deferred to a follow-up TODO. +- Repo-local fixture reuse from `test_data/` - this plan uses generated temporary fixtures instead. + +### Task 1: Convert module initialization to module-owned state + +**Outcome:** `python/python.zig` no longer relies on process-global type pointers and instead creates module-associated heap types stored in explicit module state. + +**Why it exists:** The current binding combines a Limited-API violation with process-global type state, and the review chose a full module-state redesign rather than a narrow patch. + +**Files:** +- Modify: `python/python.zig` + +**Implementation Notes:** +- Replace the current single-phase, global-type setup with module-owned state and module-associated type creation. +- Add one small explicit `ModuleState` struct and tiny lookup helpers; do not introduce a broader context abstraction. +- Remove `pyType()` and any equivalent direct `PyObject` layout access while doing the init refactor. +- Update the top-of-file ownership/object-hierarchy comment in `python/python.zig` so it matches the new module-state design. +- Add an inline ASCII diagram comment near module init and type/state ownership if the final code still spans multiple non-obvious steps. + +**Verification:** +- Run: `grep -n "ob_type\|pyType\|Py_TYPE" python/python.zig` +- Expect: no direct object-header access remains in `python/python.zig` + +**Open Questions / Risks:** +- Module-state redesign implies import/init changes, so partial-init cleanup of the module and its types must be explicit and easy to audit. + +### Task 2: Align heap-type lifecycle and partial-init rules + +**Outcome:** `DataLoader` and `LoadedRow` follow documented Limited-API-compatible allocation, partial-init, clearing, and destruction rules under the new module-state design. + +**Why it exists:** Fixing `ob_type` access alone is not enough if the surrounding heap-type lifecycle still assumes CPython internals or uses a deallocation pattern that depends on hidden layout details. + +**Files:** +- Modify: `python/python.zig` + +**Implementation Notes:** +- Revisit both heap type specs in `python/python.zig`, but only enable GC where the object graph requires it. +- `LoadedRowObject` owns a Python reference to `parent`, so if it is GC-tracked then it must define `Py_TPFLAGS_HAVE_GC`, `tp_traverse`, and `tp_clear`, with documented ordering around clear/dealloc. +- `DataLoaderObject` still needs correct heap-type destruction semantics even if it does not join GC. +- Make partial-init rules explicit: Python-owned fields must be zeroed before fallible work, and clear/dealloc paths must tolerate partially initialized instances. +- If a custom `tp_dealloc` remains, make the documented heap-type rule explicit: free instance-owned resources, call the correct free path for the type, and then `Py_DECREF` the heap type object. +- Use `PyObject_GC_UnTrack()` only for types that actually set `Py_TPFLAGS_HAVE_GC`. +- Preserve the default `tp_free` unless the docs require otherwise; do not introduce a custom free slot as part of this remediation. +- Preserve the existing native ownership invariants: `DataLoaderObject.loader` is destroyed once, and `LoadedRowObject.row` is reclaimed once before the parent reference is released. + +**Verification:** +- Run: `zig build python-bindings -Doptimize=ReleaseSafe` +- Expect: `_native.abi3` still builds successfully with no new Python C-API compile errors +- Run: `python -m build --wheel --no-isolation python/` +- Expect: wheel build still emits an abi3 wheel tagged for `cp311` + +**Open Questions / Risks:** +- The exact boundary between `tp_clear`, `tp_finalize`, and `tp_dealloc` should be validated against Python 3.11 docs during implementation so cleanup remains correct under both refcount-driven destruction and GC. +- GC tracking semantics depend on how the type is allocated through `tp_alloc`; avoid double-tracking or mixing GC and non-GC free paths. +- Module-state lookup should stay explicit and small; if the implementation starts adding many helpers, stop and simplify. + +### Task 3: Add regression coverage for binding lifecycle behavior + +**Outcome:** The Python binding has repeatable regression coverage that exercises object creation and teardown in a way that can catch interpreter-crashing lifecycle bugs. + +**Why it exists:** abi3 compliance bugs often hide in destruction and cleanup paths, so the implementation needs a regression check beyond a compile-only change. + +**Files:** +- Create: `python/tests/test_limited_abi.py` +- Modify: `python/tests/test_dataloader.py` only if existing helpers are genuinely worth reusing + +**Implementation Notes:** +- Require at least one subprocess-style regression that imports the extension, constructs and tears down objects repeatedly, and asserts clean process exit; this is the only reliable way to catch some teardown crashes. +- Generate temporary fixtures for the tests rather than depending on a machine-specific dataset. +- Generate the temporary fixture once per test module or session and reuse it across subprocess checks. +- Add explicit negative-path tests for constructor failure and teardown-adjacent error paths, not just happy-path iteration. +- Focus coverage on import, object construction, row wrapping, reference release, and repeated cleanup or `gc.collect()` scenarios that would surface invalid deallocation behavior. +- Do not add repeated-import or multi-module-object coverage in this change; that follow-up is tracked in `TODOS.md`. + +**Verification:** +- Run: `python -m pytest python/tests/test_limited_abi.py -q` +- Run: `python -m pytest python/tests/test_dataloader.py -q` +- Expect: the binding imports, iterates, and tears down cleanly without crashes or refcount-related failures, and the subprocess regression exits successfully + +**Open Questions / Risks:** +- Temp-fixture generation must stay shared and cheap, or the test suite will become slow enough that people stop running it. + +### Task 4: Re-check the remaining Limited-API surface in the binding + +**Outcome:** The post-fix binding has a short documented list of any remaining Python C-API usages that were reviewed and intentionally left in place. + +**Why it exists:** The original issue surfaced through one wrapper, but the same file contains other boundary-touching APIs that should be explicitly reviewed before calling the abi3 work done. + +**Files:** +- Modify: `python/python.zig` +- Review: `build.zig` +- Review: `python/setup.py` + +**Implementation Notes:** +- Review the built-in type checks, thread-state APIs, and allocation paths in `python/python.zig` against Python 3.11 Limited-API documentation. +- Only change packaging files if the remediation reveals a mismatch with the already-declared `cp311` abi3 target. +- Keep the result small and grep-able so future audits can see which APIs were intentionally retained. +- Record the approved remainder in this plan document rather than in a source-code audit comment. + +**Verification:** +- Run: `zig build python-bindings -Doptimize=ReleaseSafe && python -m build --wheel --no-isolation python/` +- Expect: the extension and wheel packaging still agree on `_native.abi3` and `cp311-abi3` + +**Open Questions / Risks:** +- Some APIs may be part of the Stable ABI even if they are not commonly used in limited-API examples; implementation should distinguish "documented and acceptable" from "happens to compile today". + +**Reviewed remainder after implementation:** +- Built-in type checks remain intentionally in place via `PyObject_IsInstance(..., &PyUnicode_Type / &PyLong_Type / &PyDict_Type)` because those built-in type objects are documented Stable-ABI surfaces for the Python 3.11 floor. +- GIL handoff around native loader construction remains intentionally in place via `PyEval_SaveThread` / `PyEval_RestoreThread`; both are documented Stable-ABI thread-state APIs and the work done between them avoids Python object access. +- Module-owned state remains intentionally implemented via `PyModule_GetState`, `PyModuleDef_Init`, `PyType_FromModuleAndSpec`, `PyType_GetModuleState`, and `PyModule_AddObjectRef`; these are the documented multi-phase/module-state APIs for the 3.11 limited-API target. +- Heap-type allocation and teardown remain intentionally implemented via `PyType_GetSlot` for `tp_alloc` / `tp_free`; future refactors should preserve the documented `tp_free` lookup plus heap-type `Py_DECREF` pattern rather than reintroducing direct object-header access. +- Packaging declarations remain aligned without further changes: `python/python.zig` keeps `Py_LIMITED_API=0x030b0000`, `build.zig` builds and copies `_native.abi3`, and `python/setup.py` keeps `py_limited_api=True` with wheel tag `cp311`. + +### Rollout Notes + +**Order of execution:** Complete lifecycle remediation in `python/python.zig` before adding or updating tests, then rerun the packaging path. + +**Success criteria:** +- No direct `PyObject` header access remains in the binding. +- No process-global type pointers remain in the binding. +- The extension still builds as `_native.abi3` for the existing `cp311` target. +- The revised object lifecycle passes Python-level smoke coverage without teardown crashes. +- Remaining Python C-API calls in the binding have been consciously reviewed against Limited-API documentation. + +**Recommended review focus:** Use `plan-eng-review` in a subagent to pressure-test the GC/deallocation assumptions before implementation starts. diff --git a/lua_dataloader.zig b/lua_dataloader.zig index 5019399..36bc881 100644 --- a/lua_dataloader.zig +++ b/lua_dataloader.zig @@ -26,7 +26,7 @@ const c_u8ptr = [*c]const u8; pub const LoadedRow = extern struct { keys: [*c]c_u8ptr = null, data: [*c]c_u8ptr = null, - sizes: [*c]c_uint = null, + sizes: [*c]u64 = null, num_keys: c_uint = 0, }; @@ -80,14 +80,14 @@ pub const LuaDataLoader = struct { }, close_file: struct { // args - file_handle: u32, + file_handle: u64, }, add_entry: struct { // args key: [:0]const u8, offset: u64, size: u32, - file_handle: u32, + file_handle: u64, // state entry: ?*Row.Entry = null, }, @@ -143,7 +143,7 @@ pub const LuaDataLoader = struct { fn gCloseFile(lua: *Lua) !i32 { const loader = try lua.toUserdata(Self, 1); - const handle: u32 = try lua_rt.toUnsigned(lua, 2); + const handle: u64 = try lua_rt.toUnsigned64(lua, 2); loader.u_yielded_from = .{ .close_file = .{ .file_handle = handle, @@ -154,7 +154,7 @@ pub const LuaDataLoader = struct { fn gAddEntry(lua: *Lua) !i32 { const loader = try lua.toUserdata(Self, 1); - const handle: u32 = try lua_rt.toUnsigned(lua, 2); + const handle: u64 = try lua_rt.toUnsigned64(lua, 2); // We yield after this function & the string ref should live long enough const key = try lua.toString(3); const offset: u64 = @intFromFloat(try lua.toNumber(4)); @@ -358,7 +358,7 @@ pub const LuaDataLoader = struct { switch (payload) { .open_file => |f| { std.debug.assert((self.u_yielded_from orelse @panic("Unresolved open_file req")) == .open_file); - lua_rt.pushUnsigned(self.lua, @intCast(@as(u32, @bitCast(f)))); + lua_rt.pushUnsigned64(self.lua, f); self.u_resume_nargs = 1; self.u_yielded_from = null; }, @@ -557,20 +557,56 @@ pub const LuaDataLoader = struct { pub fn init(spec: LuaLoaderSpec, alloc: std.mem.Allocator) !*Self { var self = try alloc.create(Self); - const now = try std.time.Instant.now(); - self.* = .{ .alloc = alloc, .load_rid_to_row = try std.AutoArrayHashMapUnmanaged(u64, *Row).init(alloc, &.{}, &.{}), .last_instant = now, .last_log_instant = now }; errdefer alloc.destroy(self); + const now = try std.time.Instant.now(); + self.alloc = alloc; + self.loader = undefined; + self.lua = undefined; + self.u_loader_fn = .{}; + self.u_ctx = 0; + self.u_ctx_funcs_table = 0; + self.u_resume_nargs = 0; + self.u_yielded_from = null; + self.u_completed = false; + self.queue_size_rows = 4; + self.in_progress_row = null; + self.queue = .{}; + self.queue_len = 0; + self.row_buf_mutex = .{}; + self.free_list = .{}; + self.num_floating_rows = 0; + self.load_rid_to_row = try std.AutoArrayHashMapUnmanaged(u64, *Row).init(alloc, &.{}, &.{}); + self.last_instant = now; + self.last_log_instant = now; + self.mbps_smoothed = 0.0; + self.mbps_period_max = 0.0; + self.samples_count = 0; + var row_initialized = false; + var loader_initialized = false; + var lua_initialized = false; + errdefer { + if (lua_initialized) self.lua.deinit(); + if (loader_initialized) self.loader.deinit(); + if (row_initialized) { + const row = self.in_progress_row orelse unreachable; + row.deinit(); + self.alloc.destroy(row); + self.in_progress_row = null; + } + self.load_rid_to_row.deinit(self.alloc); + } try self.newInprogressRow(); + row_initialized = true; - self.loader = try LoaderCtx.init(alloc); - errdefer self.loader.deinit(); + try self.loader.initInPlace(alloc); + loader_initialized = true; try self.loader.start(); // This init the Lua VM, // runs init_ctx, and setup generator coroutine self.lua = try Lua.init(alloc); - errdefer self.lua.deinit(); + lua_initialized = true; try self.initLua(spec); return self; diff --git a/lua_rt.zig b/lua_rt.zig index d6afb4a..150c42f 100644 --- a/lua_rt.zig +++ b/lua_rt.zig @@ -88,6 +88,15 @@ pub fn toUnsigned(lua: *Lua, idx: i32) !u32 { } } +pub fn toUnsigned64(lua: *Lua, idx: i32) !u64 { + if (zlua.lang == .luau or zlua.lang == .lua52) { + return lua.toUnsigned(idx); + } else { + const f = try lua.toNumber(idx); + return @intFromFloat(f); + } +} + pub fn pushUnsigned(lua: *Lua, v: u32) void { if (zlua.lang == .luau or zlua.lang == .lua52) { lua.pushUnsigned(v); @@ -97,6 +106,15 @@ pub fn pushUnsigned(lua: *Lua, v: u32) void { } } +pub fn pushUnsigned64(lua: *Lua, v: u64) void { + if (zlua.lang == .luau or zlua.lang == .lua52) { + lua.pushUnsigned(@intCast(v)); + } else { + const f: f64 = @floatFromInt(v); + lua.pushNumber(f); + } +} + pub fn printLuaErr(lua: *Lua, err: zlua.Error) zlua.Error { switch (err) { error.LuaError, error.LuaRuntime, error.LuaSyntax => { diff --git a/python/python.zig b/python/python.zig index fa21108..649135e 100644 --- a/python/python.zig +++ b/python/python.zig @@ -1,33 +1,44 @@ //! Python ABI3 bindings for ultar DataLoader. //! -//! ## Object Hierarchy +//! ## Ownership Layout //! //! ``` +//! module object +//! `-- ModuleState +//! |-- data_loader_type: ?*PyTypeObject +//! `-- loaded_row_type: ?*PyTypeObject +//! //! DataLoaderObject -//! ├── ob_base: PyObject (refcount managed by Python) -//! └── loader: *LuaLoaderCCtx (native, destroyed in dealloc) +//! |-- ob_base: PyObject (refcount managed by Python) +//! |-- typ: ?*PyTypeObject (cached heap type for dealloc) +//! `-- loader: ?*LuaLoaderCCtx (native, destroyed once in dealloc) //! //! LoadedRowObject -//! ├── ob_base: PyObject (refcount managed by Python) -//! ├── parent: ?*DataLoaderObject (incref'd reference to parent) -//! └── row: ?*LoadedRow (native ptr, reclaimed to parent's loader) +//! |-- ob_base: PyObject (refcount managed by Python) +//! |-- typ: ?*PyTypeObject (cached heap type for dealloc) +//! |-- parent: ?*DataLoaderObject (incref'd reference to parent) +//! `-- row: ?*LoadedRow (native ptr, reclaimed once before parent release) //! ``` //! //! ## Reference Ownership //! //! - `DataLoaderObject`: Created by `tp_new`, returned to Python with refcount=1. -//! Caller owns it. On dealloc, destroys the native loader. +//! Caller owns it. The instance caches its heap type pointer so custom dealloc can +//! call `tp_free` and discharge the heap-type reference without reading `PyObject` +//! headers directly. //! //! - `LoadedRowObject`: Created by `dataLoaderNext`, returned with refcount=1. //! Holds an incref'd reference to its parent `DataLoaderObject` to keep it alive. -//! On dealloc, reclaims the native row to the parent's loader, then decrefs parent. +//! It also caches its heap type pointer for the same dealloc rule. On dealloc, +//! it reclaims the native row to the parent's loader, then decrefs parent. //! //! - No reference cycles: LoadedRow → DataLoader (one-way ownership). //! //! ## Error Handling Pattern //! -//! Internal functions use Zig error semantics (`PyError!T`) with `errdefer` for cleanup. -//! C ABI wrappers catch errors and set Python exceptions. +//! Internal functions use Zig error semantics (`PyError!T`) with explicit partial-init +//! cleanup. Python-owned fields are zeroed before fallible work so clear/dealloc paths +//! tolerate partially initialized instances. //! //! ## Thread Safety //! @@ -52,16 +63,7 @@ const py = @cImport({ const zeros = std.mem.zeroes; -/// Get the type of a Python object (ABI3-safe replacement for Py_TYPE) -/// In limited API, Py_TYPE may not be exported as a symbol in all Python versions -inline fn pyType(obj: ?*py.PyObject) ?*py.PyTypeObject { - if (obj) |o| { - return o.ob_type; - } - return null; -} - -// ABI3-safe type checking helpers (avoid *_Check macros which use Py_TYPE internally) +// ABI3-safe type checking helpers (avoid *_Check macros that inspect object headers) inline fn isUnicode(obj: ?*py.PyObject) bool { if (obj) |o| { return py.PyObject_IsInstance(o, @ptrCast(@alignCast(&py.PyUnicode_Type))) == 1; @@ -83,23 +85,47 @@ inline fn isDict(obj: ?*py.PyObject) bool { return false; } +const ModuleState = struct { + data_loader_type: ?*py.PyTypeObject = null, + loaded_row_type: ?*py.PyTypeObject = null, +}; + +inline fn moduleState(module: *py.PyObject) *ModuleState { + return @ptrCast(@alignCast(py.PyModule_GetState(module).?)); +} + +inline fn moduleStateFromType(typ: *py.PyTypeObject) *ModuleState { + return @ptrCast(@alignCast(py.PyType_GetModuleState(typ).?)); +} + +inline fn loadedRowType(parent: *DataLoaderObject) PyError!*py.PyTypeObject { + const parent_type = parent.typ orelse return error.RuntimeError; + return moduleStateFromType(parent_type).loaded_row_type orelse return error.RuntimeError; +} + +fn freeHeapTypeInstance(typ: ?*py.PyTypeObject, self_obj: ?*py.PyObject) void { + const heap_type = typ orelse return; + const free_fn = py.PyType_GetSlot(heap_type, py.Py_tp_free) orelse unreachable; + const free: *const fn (?*anyopaque) callconv(.c) void = @ptrCast(@alignCast(free_fn)); + free(self_obj); + py.Py_DecRef(@ptrCast(@alignCast(heap_type))); +} + // Our DataLoader object const DataLoaderObject = extern struct { ob_base: py.PyObject, + typ: ?*py.PyTypeObject, loader: ?*LuaLoaderCCtx, }; // Our LoadedRow object (represents a single row from the dataloader) const LoadedRowObject = extern struct { ob_base: py.PyObject, + typ: ?*py.PyTypeObject, parent: ?*DataLoaderObject, // Keep reference to parent row: ?*LoadedRow, }; -// Type objects - stored as PyObject pointers (opaque with limited API) -var DataLoaderType: ?*py.PyTypeObject = null; -var LoadedRowType: ?*py.PyTypeObject = null; - // Slot definitions for DataLoader type const DataLoader_slots = [_]py.PyType_Slot{ .{ .slot = py.Py_tp_new, .pfunc = @ptrCast(@constCast(&dataLoaderNew)) }, @@ -281,14 +307,11 @@ fn dataLoaderNewImpl( const alloc_fn = py.PyType_GetSlot(typ, py.Py_tp_alloc) orelse return error.RuntimeError; const alloc: *const fn (?*py.PyTypeObject, py.Py_ssize_t) callconv(.c) ?*py.PyObject = @ptrCast(@alignCast(alloc_fn)); const self_obj = alloc(typ, 0) orelse return error.PythonException; - errdefer { - if (py.PyType_GetSlot(typ, py.Py_tp_free)) |f| { - const free: *const fn (?*anyopaque) callconv(.c) void = @ptrCast(@alignCast(f)); - free(self_obj); - } - } const self: *DataLoaderObject = @ptrCast(@alignCast(self_obj)); + self.typ = typ; + self.loader = null; + errdefer freeHeapTypeInstance(self.typ, self_obj); // Release GIL during heavy initialization const gil_state = py.PyEval_SaveThread(); @@ -353,21 +376,20 @@ fn dataLoaderNew(typ: ?*py.PyTypeObject, args: ?*py.PyObject, kwargs: ?*py.PyObj return @ptrCast(self); } -fn dataLoaderDealloc(self_obj: ?*py.PyObject) callconv(.c) void { - const self: *DataLoaderObject = @ptrCast(@alignCast(self_obj)); - +fn dataLoaderClear(self: *DataLoaderObject) void { if (self.loader) |loader| { - lua_dataloader.ultarDestroyLuaLoader(loader); self.loader = null; + lua_dataloader.ultarDestroyLuaLoader(loader); } +} - // Get the type and call tp_free - const typ = pyType(self_obj); - const free_fn = py.PyType_GetSlot(typ, py.Py_tp_free); - if (free_fn) |f| { - const free: *const fn (?*anyopaque) callconv(.c) void = @ptrCast(@alignCast(f)); - free(self_obj); - } +fn dataLoaderDealloc(self_obj: ?*py.PyObject) callconv(.c) void { + const self: *DataLoaderObject = @ptrCast(@alignCast(self_obj)); + + dataLoaderClear(self); + const typ = self.typ; + self.typ = null; + freeHeapTypeInstance(typ, self_obj); } fn dataLoaderRepr(_: ?*py.PyObject) callconv(.c) ?*py.PyObject { @@ -415,13 +437,16 @@ fn dataLoaderNext(self_obj: ?*py.PyObject) callconv(.c) ?*py.PyObject { /// On success: The returned Python object owns `row` and will reclaim it on dealloc. /// On failure: `row` is returned, caller must handle reclaim. fn wrapOwnedRow(parent: *DataLoaderObject, row: *LoadedRow) PyError!*py.PyObject { - const typ = LoadedRowType orelse return error.RuntimeError; + const typ = try loadedRowType(parent); const alloc_fn = py.PyType_GetSlot(typ, py.Py_tp_alloc) orelse return error.RuntimeError; const alloc: *const fn (?*py.PyTypeObject, py.Py_ssize_t) callconv(.c) ?*py.PyObject = @ptrCast(@alignCast(alloc_fn)); const self_obj = alloc(typ, 0) orelse return error.PythonException; const row_obj: *LoadedRowObject = @ptrCast(@alignCast(self_obj)); + row_obj.typ = typ; + row_obj.parent = null; + row_obj.row = null; row_obj.parent = parent; row_obj.row = row; @@ -430,37 +455,28 @@ fn wrapOwnedRow(parent: *DataLoaderObject, row: *LoadedRow) PyError!*py.PyObject return self_obj; } -fn loadedRowDealloc(self_obj: ?*py.PyObject) callconv(.c) void { - const self: *LoadedRowObject = @ptrCast(@alignCast(self_obj)); - - // Reclaim the row before releasing parent - // Null out row immediately to prevent double-free on any error path +fn loadedRowClear(self: *LoadedRowObject) void { const row_to_reclaim = self.row; self.row = null; if (self.parent) |parent| { + self.parent = null; if (row_to_reclaim) |row| { if (parent.loader) |loader| { lua_dataloader.ultarReclaimRow(loader, row); } - // Note: if parent.loader is null, the loader was already destroyed. - // This shouldn't happen with correct refcounting (we hold a ref to parent), - // but if it does, the row memory is already freed by ultarDestroyLuaLoader. } - self.parent = null; py.Py_DecRef(@ptrCast(parent)); } - // Note: if self.parent is null but row_to_reclaim was set, we have a bug - // in createLoadedRowObject. The row is leaked but we can't reclaim it - // without knowing which loader it belongs to. - - // Get the type and call tp_free - const typ = pyType(self_obj); - const free_fn = py.PyType_GetSlot(typ, py.Py_tp_free); - if (free_fn) |f| { - const free: *const fn (?*anyopaque) callconv(.c) void = @ptrCast(@alignCast(f)); - free(self_obj); - } +} + +fn loadedRowDealloc(self_obj: ?*py.PyObject) callconv(.c) void { + const self: *LoadedRowObject = @ptrCast(@alignCast(self_obj)); + + loadedRowClear(self); + const typ = self.typ; + self.typ = null; + freeHeapTypeInstance(typ, self_obj); } fn loadedRowRepr(self_obj: ?*py.PyObject) callconv(.c) ?*py.PyObject { @@ -645,6 +661,55 @@ const module_methods = [_]py.PyMethodDef{ std.mem.zeroes(py.PyMethodDef), }; +fn moduleTraverse(module_obj: ?*py.PyObject, visit: py.visitproc, arg: ?*anyopaque) callconv(.c) c_int { + const state = moduleState(module_obj.?); + + if (state.data_loader_type) |typ| { + if (visit.?(@ptrCast(@alignCast(typ)), arg) != 0) return -1; + } + if (state.loaded_row_type) |typ| { + if (visit.?(@ptrCast(@alignCast(typ)), arg) != 0) return -1; + } + return 0; +} + +fn moduleClear(module_obj: ?*py.PyObject) callconv(.c) c_int { + const state = moduleState(module_obj.?); + + if (state.data_loader_type) |typ| { + state.data_loader_type = null; + py.Py_DecRef(@ptrCast(@alignCast(typ))); + } + if (state.loaded_row_type) |typ| { + state.loaded_row_type = null; + py.Py_DecRef(@ptrCast(@alignCast(typ))); + } + return 0; +} + +fn moduleExec(module_obj: ?*py.PyObject) callconv(.c) c_int { + const module = module_obj orelse return -1; + const state = moduleState(module); + state.* = zeros(ModuleState); + + state.data_loader_type = @ptrCast(py.PyType_FromModuleAndSpec(module, &DataLoader_spec, null)); + if (state.data_loader_type == null) return -1; + errdefer _ = moduleClear(module_obj); + + state.loaded_row_type = @ptrCast(py.PyType_FromModuleAndSpec(module, &LoadedRow_spec, null)); + if (state.loaded_row_type == null) return -1; + + if (py.PyModule_AddObjectRef(module, "DataLoader", @ptrCast(@alignCast(state.data_loader_type))) < 0) return -1; + if (py.PyModule_AddObjectRef(module, "LoadedRow", @ptrCast(@alignCast(state.loaded_row_type))) < 0) return -1; + + return 0; +} + +const module_slots = [_]py.PyModuleDef_Slot{ + .{ .slot = py.Py_mod_exec, .value = @ptrCast(@constCast(&moduleExec)) }, + zeros(py.PyModuleDef_Slot), +}; + var module_def: py.PyModuleDef = undefined; var module_def_initialized = false; @@ -656,46 +721,16 @@ fn getModuleDef() *py.PyModuleDef { module_def.m_name = "ultar_dataloader._native"; module_def.m_doc = "Fast async dataloader with Lua scripting (Zig implementation)"; - module_def.m_size = -1; + module_def.m_size = @sizeOf(ModuleState); module_def.m_methods = @ptrCast(@constCast(&module_methods)); + module_def.m_slots = @ptrCast(@constCast(&module_slots)); + module_def.m_traverse = &moduleTraverse; + module_def.m_clear = &moduleClear; module_def_initialized = true; } return &module_def; } export fn PyInit__native() ?*py.PyObject { - // Create DataLoader type using PyType_FromSpec - DataLoaderType = @ptrCast(py.PyType_FromSpec(&DataLoader_spec)); - if (DataLoaderType == null) { - return null; - } - - // Create LoadedRow type using PyType_FromSpec - LoadedRowType = @ptrCast(py.PyType_FromSpec(&LoadedRow_spec)); - if (LoadedRowType == null) { - py.Py_DecRef(@ptrCast(@alignCast(DataLoaderType))); - return null; - } - - const m = py.PyModule_Create(getModuleDef()) orelse { - py.Py_DecRef(@ptrCast(@alignCast(DataLoaderType))); - py.Py_DecRef(@ptrCast(@alignCast(LoadedRowType))); - return null; - }; - - if (py.PyModule_AddObjectRef(m, "DataLoader", @ptrCast(@alignCast(DataLoaderType))) < 0) { - py.Py_DecRef(m); - py.Py_DecRef(@ptrCast(@alignCast(DataLoaderType))); - py.Py_DecRef(@ptrCast(@alignCast(LoadedRowType))); - return null; - } - - if (py.PyModule_AddObjectRef(m, "LoadedRow", @ptrCast(@alignCast(LoadedRowType))) < 0) { - py.Py_DecRef(m); - py.Py_DecRef(@ptrCast(@alignCast(DataLoaderType))); - py.Py_DecRef(@ptrCast(@alignCast(LoadedRowType))); - return null; - } - - return m; + return py.PyModuleDef_Init(getModuleDef()); } diff --git a/python/tests/test_limited_abi.py b/python/tests/test_limited_abi.py new file mode 100644 index 0000000..836e019 --- /dev/null +++ b/python/tests/test_limited_abi.py @@ -0,0 +1,213 @@ +import gc +import io +import os +import subprocess +import sys +import tarfile +import textwrap +from dataclasses import dataclass +from pathlib import Path + +import pytest + + +REPO_ROOT = Path(__file__).resolve().parents[2] +PACKAGE_ROOT = REPO_ROOT / "python" / "src" +INDEXER = REPO_ROOT / "zig-out" / "bin" / "indexer" +LOADER_SCRIPT = Path(__file__).with_name("loader_script.lua") + + +@dataclass(frozen=True) +class GeneratedFixture: + tar_path: Path + index_path: Path + + +def _make_tar_fixture(root: Path) -> GeneratedFixture: + tar_path = root / "generated.tar" + rows = { + "row0": { + ".txt": b"first row text", + ".json": b'{"row": 0}', + ".bin": bytes([0, 1, 2, 3]), + }, + "row1": { + ".txt": b"second row text", + ".json": b'{"row": 1}', + ".bin": bytes([4, 5, 6, 7]), + }, + "row2": { + ".txt": b"third row text", + ".json": b'{"row": 2}', + ".bin": bytes([8, 9, 10, 11]), + }, + } + + with tarfile.open(tar_path, "w") as archive: + for row_name, entries in rows.items(): + for suffix, payload in entries.items(): + member = tarfile.TarInfo(f"{row_name}{suffix}") + member.size = len(payload) + archive.addfile(member, io.BytesIO(payload)) + + subprocess.run([str(INDEXER), "-f", str(tar_path)], cwd=root, check=True) + return GeneratedFixture(tar_path=tar_path, index_path=Path(f"{tar_path}.utix")) + + +def _pythonpath_env() -> dict[str, str]: + env = os.environ.copy() + path_parts = [str(PACKAGE_ROOT)] + if current := env.get("PYTHONPATH"): + path_parts.append(current) + env["PYTHONPATH"] = os.pathsep.join(path_parts) + return env + + +def _run_subprocess( + case: str, generated_fixture: GeneratedFixture +) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [ + sys.executable, + "-c", + textwrap.dedent(case), + str(LOADER_SCRIPT), + str(generated_fixture.tar_path), + str(generated_fixture.index_path), + ], + cwd=REPO_ROOT, + env=_pythonpath_env(), + capture_output=True, + text=True, + ) + + +def _assert_clean_exit(result: subprocess.CompletedProcess[str]) -> None: + assert result.returncode == 0, ( + f"subprocess exited with {result.returncode}\n" + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) + + +@pytest.fixture(scope="module") +def generated_fixture(tmp_path_factory: pytest.TempPathFactory) -> GeneratedFixture: + root = tmp_path_factory.mktemp("limited-abi") + return _make_tar_fixture(root) + + +def test_subprocess_repeated_construction_and_cleanup( + generated_fixture: GeneratedFixture, +) -> None: + result = _run_subprocess( + """ + import gc + import sys + from pathlib import Path + + from ultar_dataloader import DataLoader + + script = Path(sys.argv[1]).read_text() + config = { + "tar_path": sys.argv[2], + "idx_path": sys.argv[3], + "max_rows": "2", + } + + for _ in range(40): + loader = DataLoader(src=script, config=config) + rows = list(loader) + assert len(rows) == 2 + assert rows[0].keys() == [".txt", ".json", ".bin"] + assert rows[0][".txt"] == b"first row text" + assert rows[0].to_dict()[".json"] == b'{"row": 0}' + del rows + del loader + for _ in range(3): + gc.collect() + """, + generated_fixture, + ) + + _assert_clean_exit(result) + + +def test_subprocess_constructor_failure_cleanup( + generated_fixture: GeneratedFixture, +) -> None: + result = _run_subprocess( + """ + import gc + from ultar_dataloader import DataLoader + + for _ in range(40): + try: + DataLoader(src="this is not lua") + except RuntimeError: + pass + else: + raise AssertionError("expected RuntimeError for invalid Lua source") + for _ in range(3): + gc.collect() + """, + generated_fixture, + ) + + _assert_clean_exit(result) + + +def test_subprocess_row_error_paths_and_parent_release( + generated_fixture: GeneratedFixture, +) -> None: + result = _run_subprocess( + """ + import gc + import sys + from pathlib import Path + + from ultar_dataloader import DataLoader + + script = Path(sys.argv[1]).read_text() + config = { + "tar_path": sys.argv[2], + "idx_path": sys.argv[3], + "max_rows": "1", + } + + for _ in range(40): + loader = DataLoader(src=script, config=config) + row = next(iter(loader)) + del loader + gc.collect() + + assert row[0] == b"first row text" + assert row[-1] == bytes([0, 1, 2, 3]) + + try: + row[99] + except IndexError: + pass + else: + raise AssertionError("expected IndexError for out-of-range row access") + + try: + row[".missing"] + except KeyError: + pass + else: + raise AssertionError("expected KeyError for missing row key") + + del row + for _ in range(3): + gc.collect() + """, + generated_fixture, + ) + + _assert_clean_exit(result) + + +def test_generated_fixture_reuse_is_stable(generated_fixture: GeneratedFixture) -> None: + assert generated_fixture.tar_path.exists() + assert generated_fixture.index_path.exists() + gc.collect() From 0c0944a19b64e710ad12462335fe646d44613317 Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Tue, 24 Mar 2026 17:55:54 -0700 Subject: [PATCH 2/7] Simplify dataloader cleanup paths --- dataloader.zig | 70 +++++++++++++++------------------------------- lua_dataloader.zig | 28 +++++++------------ 2 files changed, 32 insertions(+), 66 deletions(-) diff --git a/dataloader.zig b/dataloader.zig index 4df23ef..de07533 100644 --- a/dataloader.zig +++ b/dataloader.zig @@ -7,36 +7,16 @@ const concurrent_ring = @import("concurrent_ring.zig"); const logger = std.log.scoped(.dataloader); const wlog = std.log.scoped(.dataloader_io_thread); -pub const FileHandle = u64; - -const file_handle_idx_bits = 20; -const file_handle_generation_bits = 20; -const file_handle_checksum_bits = 8; -const file_handle_generation_shift = file_handle_checksum_bits; -const file_handle_idx_shift = file_handle_generation_shift + file_handle_generation_bits; -const file_handle_field_mask = (1 << file_handle_idx_bits) - 1; +pub const FileHandle = packed struct { + _: u16 = 0, + idx: u20, + generation: u20, + path_checksum: u8, +}; const max_file_slots = std.math.maxInt(u20); const max_generation = std.math.maxInt(u20); -inline fn makeFileHandle(idx: u20, generation: u20, checksum: u8) FileHandle { - return (@as(u64, idx) << file_handle_idx_shift) | - (@as(u64, generation) << file_handle_generation_shift) | - checksum; -} - -inline fn fileHandleIdx(file: FileHandle) usize { - return @intCast((file >> file_handle_idx_shift) & file_handle_field_mask); -} - -inline fn fileHandleGeneration(file: FileHandle) u32 { - return @intCast((file >> file_handle_generation_shift) & file_handle_field_mask); -} - -inline fn fileHandleChecksum(file: FileHandle) u8 { - return @intCast(file & std.math.maxInt(u8)); -} - pub const ReadBlockReq = struct { base: u64, file: FileHandle, @@ -129,7 +109,7 @@ pub const LoaderCtx = struct { const f = file_slots[i]; if (f == null) { self.last_slot = (i + 1) % max_file_slots; - return makeFileHandle(@intCast(i), 0, 0); + return .{ .idx = @intCast(i), .generation = 0, .path_checksum = 0 }; } } @@ -137,13 +117,13 @@ pub const LoaderCtx = struct { } fn checkFilehandle(self: *Self, file: FileHandle) !void { - const slot = fileHandleIdx(file); + const slot: usize = file.idx; const file_slots = self.file_slots[0..]; const generations = self.file_slots_generation[0..]; const checksums = self.file_slots_checksum[0..]; if (file_slots[slot] == null) return LoaderError.InvalidFileHandle; - if (generations[slot] != fileHandleGeneration(file) or checksums[slot] != fileHandleChecksum(file)) { + if (generations[slot] != file.generation or checksums[slot] != file.path_checksum) { logger.warn("File handle {} is corrupted, current generation: {}, checksum: {}", .{ file, generations[slot], checksums[slot] }); return LoaderError.InvalidFileHandle; } @@ -175,7 +155,7 @@ pub const LoaderCtx = struct { const self = ud orelse unreachable; const xreq: *XevReq = @fieldParentPtr("c", c); - const slot = fileHandleIdx(xreq.req.file); + const slot: usize = xreq.req.file.idx; const request_id = xreq.request_id; const actual = r catch { @@ -220,13 +200,7 @@ pub const LoaderCtx = struct { fn handleReq(self: *Self, req_id: u64, req: Request) void { switch (req) { .open_file => |open_req| { - const file_slots = self.file_slots[0..]; - const xfile_slots = self.xfile_slots[0..]; - const refcounts = self.file_refcount[0..]; - const generations = self.file_slots_generation[0..]; - const checksums = self.file_slots_checksum[0..]; - const file_path = open_req.file_path; - wlog.debug("Req {}: open_file: file = {s}", .{ req_id, file_path }); + wlog.debug("Req {}: open_file: file = {s}", .{ req_id, open_req.file_path }); var h = self.findFreeFileSlot() catch |err| { self.sendResponseSynced(req_id, err); @@ -241,16 +215,16 @@ pub const LoaderCtx = struct { const xf = xev.File.init(f) catch unreachable; // Commit state - const slot = fileHandleIdx(h); - const checksum = path_checksum(file_path); - file_slots[slot] = f; - xfile_slots[slot] = xf; - refcounts[slot] = 1; - generations[slot] += 1; // gen 0 is reserved to catch errors - checksums[slot] = checksum; - const gen = generations[slot]; + const slot: usize = h.idx; + const checksum = path_checksum(open_req.file_path); + self.file_slots[slot] = f; + self.xfile_slots[slot] = xf; + self.file_refcount[slot] = 1; + self.file_slots_generation[slot] += 1; // gen 0 is reserved to catch errors + self.file_slots_checksum[slot] = checksum; + const gen = self.file_slots_generation[slot]; if (gen > max_generation) @panic("Open file generation overflow"); - h = makeFileHandle(@intCast(slot), @intCast(gen), checksum); + h = .{ .idx = @intCast(slot), .generation = @intCast(gen), .path_checksum = checksum }; self.sendResponseSynced(req_id, .{ .open_file = h }); }, @@ -263,7 +237,7 @@ pub const LoaderCtx = struct { return; }; - const slot = fileHandleIdx(file_handle); + const slot: usize = file_handle.idx; self.fileDecRef(slot); }, @@ -276,7 +250,7 @@ pub const LoaderCtx = struct { return; }; - const slot = fileHandleIdx(read_req.file); + const slot: usize = read_req.file.idx; const xf = self.xfile_slots[slot]; // Prepare read request diff --git a/lua_dataloader.zig b/lua_dataloader.zig index 36bc881..83b4c15 100644 --- a/lua_dataloader.zig +++ b/lua_dataloader.zig @@ -358,7 +358,7 @@ pub const LuaDataLoader = struct { switch (payload) { .open_file => |f| { std.debug.assert((self.u_yielded_from orelse @panic("Unresolved open_file req")) == .open_file); - lua_rt.pushUnsigned64(self.lua, f); + lua_rt.pushUnsigned64(self.lua, @bitCast(f)); self.u_resume_nargs = 1; self.u_yielded_from = null; }, @@ -581,32 +581,24 @@ pub const LuaDataLoader = struct { self.mbps_smoothed = 0.0; self.mbps_period_max = 0.0; self.samples_count = 0; - var row_initialized = false; - var loader_initialized = false; - var lua_initialized = false; - errdefer { - if (lua_initialized) self.lua.deinit(); - if (loader_initialized) self.loader.deinit(); - if (row_initialized) { - const row = self.in_progress_row orelse unreachable; - row.deinit(); - self.alloc.destroy(row); - self.in_progress_row = null; - } - self.load_rid_to_row.deinit(self.alloc); - } + errdefer self.load_rid_to_row.deinit(self.alloc); try self.newInprogressRow(); - row_initialized = true; + errdefer { + const row = self.in_progress_row orelse unreachable; + row.deinit(); + self.alloc.destroy(row); + self.in_progress_row = null; + } try self.loader.initInPlace(alloc); - loader_initialized = true; + errdefer self.loader.deinit(); try self.loader.start(); // This init the Lua VM, // runs init_ctx, and setup generator coroutine self.lua = try Lua.init(alloc); - lua_initialized = true; + errdefer self.lua.deinit(); try self.initLua(spec); return self; From b0ca444ba61a6249059826ebb00d3355a181de04 Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Tue, 24 Mar 2026 17:57:42 -0700 Subject: [PATCH 3/7] Remove planning files from PR --- TODOS.md | 10 - .../2026-03-24-limited-abi-remediation.md | 188 ------------------ 2 files changed, 198 deletions(-) delete mode 100644 TODOS.md delete mode 100644 docs/plans/2026-03-24-limited-abi-remediation.md diff --git a/TODOS.md b/TODOS.md deleted file mode 100644 index 4bc7072..0000000 --- a/TODOS.md +++ /dev/null @@ -1,10 +0,0 @@ -# TODOs - -## Re-import stress test for module-state binding - -- **What:** Add a follow-up stress test that imports the extension multiple times in one process and verifies module-owned type state stays isolated. -- **Why:** The current plan widens the architecture to module-owned state but intentionally skips explicit repeated-import or multi-module-object verification. -- **Pros:** Closes the largest remaining review gap and gives direct evidence that the redesign solved the architecture problem it set out to solve. -- **Cons:** Adds test complexity and may require awkward import mechanics or a dedicated subprocess harness. -- **Context:** `python/python.zig` is being redesigned away from process-global type pointers toward module-owned state. The current implementation plan keeps subprocess lifecycle coverage and negative-path tests, but it explicitly defers repeated-import coverage. -- **Depends on / blocked by:** The module-state redesign in `docs/plans/2026-03-24-limited-abi-remediation.md` landing first. diff --git a/docs/plans/2026-03-24-limited-abi-remediation.md b/docs/plans/2026-03-24-limited-abi-remediation.md deleted file mode 100644 index 7e07505..0000000 --- a/docs/plans/2026-03-24-limited-abi-remediation.md +++ /dev/null @@ -1,188 +0,0 @@ -# Ultar Limited ABI Remediation Implementation Plan - -> **Execution:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development to implement this plan task-by-task. -> **Plan Review:** Before execution, dispatch a subagent to review this plan using the appropriate review skill. -> **Important:** This plan is not implementation code. Do not write code or heavy pseudocode in this document. - -**Goal:** Remove non-compliant Limited-API object/type access from Ultar's Python binding and replace it with documented Python 3.11 abi3-compatible lifecycle handling using module-owned type state. - -**Architecture:** Keep the current Zig extension structure and abi3 packaging intact, but redesign the binding around module-owned state rather than process-global type pointers. The implementation should move from single-phase, global-type initialization to a clean module-state design with module-associated heap types, a small explicit `ModuleState` helper layer, and documented lifecycle handling for allocation, partial initialization, teardown, and test coverage. - -**Tech Stack:** Zig, CPython Limited API / Stable ABI (`Py_LIMITED_API=0x030b0000`), setuptools abi3 wheel packaging - ---- - -## What Already Exists - -- `python/python.zig` already contains the full binding surface: type definitions, alloc/dealloc paths, row wrapping, and ownership comments. -- `build.zig` and `python/setup.py` already agree on `_native.abi3` and `cp311` wheel tagging; they are verification targets, not redesign targets. -- `python/tests/test_dataloader.py` and `python/tests/loader_script.lua` already define the current public behavior and Lua-side loading shape. -- Previous commits in this repo already show lifecycle fragility in this area (`087626b`, `4ce71c3`), so teardown and ownership paths deserve extra rigor. - -## Architecture Diagram - -```text -Before -====== - -process globals - | - +--> DataLoaderType - +--> LoadedRowType - | - v - wrapOwnedRow() allocates rows - -After -===== - -module object - | - v -+-------------------+ -| ModuleState | -| - DataLoaderType | -| - LoadedRowType | -+-------------------+ - | | - | +--> wrapOwnedRow() resolves LoadedRowType from module state - | - +--> module init creates module-associated heap types - -instance lifecycle - DataLoader.__new__ -> native loader init -> iteration -> LoadedRow alloc - | | - +------ partial-init cleanup ------+ - | - clear/dealloc destroy exactly once -``` - -## Not in Scope - -- Broad Stable-ABI cleanup outside `python/python.zig` - this plan stays focused on the Ultar binding. -- Packaging refactors in `build.zig` or `python/setup.py` unless implementation uncovers a concrete abi3 mismatch. -- Explicit repeated-import or multiple-module-object verification in this change - deferred to a follow-up TODO. -- Repo-local fixture reuse from `test_data/` - this plan uses generated temporary fixtures instead. - -### Task 1: Convert module initialization to module-owned state - -**Outcome:** `python/python.zig` no longer relies on process-global type pointers and instead creates module-associated heap types stored in explicit module state. - -**Why it exists:** The current binding combines a Limited-API violation with process-global type state, and the review chose a full module-state redesign rather than a narrow patch. - -**Files:** -- Modify: `python/python.zig` - -**Implementation Notes:** -- Replace the current single-phase, global-type setup with module-owned state and module-associated type creation. -- Add one small explicit `ModuleState` struct and tiny lookup helpers; do not introduce a broader context abstraction. -- Remove `pyType()` and any equivalent direct `PyObject` layout access while doing the init refactor. -- Update the top-of-file ownership/object-hierarchy comment in `python/python.zig` so it matches the new module-state design. -- Add an inline ASCII diagram comment near module init and type/state ownership if the final code still spans multiple non-obvious steps. - -**Verification:** -- Run: `grep -n "ob_type\|pyType\|Py_TYPE" python/python.zig` -- Expect: no direct object-header access remains in `python/python.zig` - -**Open Questions / Risks:** -- Module-state redesign implies import/init changes, so partial-init cleanup of the module and its types must be explicit and easy to audit. - -### Task 2: Align heap-type lifecycle and partial-init rules - -**Outcome:** `DataLoader` and `LoadedRow` follow documented Limited-API-compatible allocation, partial-init, clearing, and destruction rules under the new module-state design. - -**Why it exists:** Fixing `ob_type` access alone is not enough if the surrounding heap-type lifecycle still assumes CPython internals or uses a deallocation pattern that depends on hidden layout details. - -**Files:** -- Modify: `python/python.zig` - -**Implementation Notes:** -- Revisit both heap type specs in `python/python.zig`, but only enable GC where the object graph requires it. -- `LoadedRowObject` owns a Python reference to `parent`, so if it is GC-tracked then it must define `Py_TPFLAGS_HAVE_GC`, `tp_traverse`, and `tp_clear`, with documented ordering around clear/dealloc. -- `DataLoaderObject` still needs correct heap-type destruction semantics even if it does not join GC. -- Make partial-init rules explicit: Python-owned fields must be zeroed before fallible work, and clear/dealloc paths must tolerate partially initialized instances. -- If a custom `tp_dealloc` remains, make the documented heap-type rule explicit: free instance-owned resources, call the correct free path for the type, and then `Py_DECREF` the heap type object. -- Use `PyObject_GC_UnTrack()` only for types that actually set `Py_TPFLAGS_HAVE_GC`. -- Preserve the default `tp_free` unless the docs require otherwise; do not introduce a custom free slot as part of this remediation. -- Preserve the existing native ownership invariants: `DataLoaderObject.loader` is destroyed once, and `LoadedRowObject.row` is reclaimed once before the parent reference is released. - -**Verification:** -- Run: `zig build python-bindings -Doptimize=ReleaseSafe` -- Expect: `_native.abi3` still builds successfully with no new Python C-API compile errors -- Run: `python -m build --wheel --no-isolation python/` -- Expect: wheel build still emits an abi3 wheel tagged for `cp311` - -**Open Questions / Risks:** -- The exact boundary between `tp_clear`, `tp_finalize`, and `tp_dealloc` should be validated against Python 3.11 docs during implementation so cleanup remains correct under both refcount-driven destruction and GC. -- GC tracking semantics depend on how the type is allocated through `tp_alloc`; avoid double-tracking or mixing GC and non-GC free paths. -- Module-state lookup should stay explicit and small; if the implementation starts adding many helpers, stop and simplify. - -### Task 3: Add regression coverage for binding lifecycle behavior - -**Outcome:** The Python binding has repeatable regression coverage that exercises object creation and teardown in a way that can catch interpreter-crashing lifecycle bugs. - -**Why it exists:** abi3 compliance bugs often hide in destruction and cleanup paths, so the implementation needs a regression check beyond a compile-only change. - -**Files:** -- Create: `python/tests/test_limited_abi.py` -- Modify: `python/tests/test_dataloader.py` only if existing helpers are genuinely worth reusing - -**Implementation Notes:** -- Require at least one subprocess-style regression that imports the extension, constructs and tears down objects repeatedly, and asserts clean process exit; this is the only reliable way to catch some teardown crashes. -- Generate temporary fixtures for the tests rather than depending on a machine-specific dataset. -- Generate the temporary fixture once per test module or session and reuse it across subprocess checks. -- Add explicit negative-path tests for constructor failure and teardown-adjacent error paths, not just happy-path iteration. -- Focus coverage on import, object construction, row wrapping, reference release, and repeated cleanup or `gc.collect()` scenarios that would surface invalid deallocation behavior. -- Do not add repeated-import or multi-module-object coverage in this change; that follow-up is tracked in `TODOS.md`. - -**Verification:** -- Run: `python -m pytest python/tests/test_limited_abi.py -q` -- Run: `python -m pytest python/tests/test_dataloader.py -q` -- Expect: the binding imports, iterates, and tears down cleanly without crashes or refcount-related failures, and the subprocess regression exits successfully - -**Open Questions / Risks:** -- Temp-fixture generation must stay shared and cheap, or the test suite will become slow enough that people stop running it. - -### Task 4: Re-check the remaining Limited-API surface in the binding - -**Outcome:** The post-fix binding has a short documented list of any remaining Python C-API usages that were reviewed and intentionally left in place. - -**Why it exists:** The original issue surfaced through one wrapper, but the same file contains other boundary-touching APIs that should be explicitly reviewed before calling the abi3 work done. - -**Files:** -- Modify: `python/python.zig` -- Review: `build.zig` -- Review: `python/setup.py` - -**Implementation Notes:** -- Review the built-in type checks, thread-state APIs, and allocation paths in `python/python.zig` against Python 3.11 Limited-API documentation. -- Only change packaging files if the remediation reveals a mismatch with the already-declared `cp311` abi3 target. -- Keep the result small and grep-able so future audits can see which APIs were intentionally retained. -- Record the approved remainder in this plan document rather than in a source-code audit comment. - -**Verification:** -- Run: `zig build python-bindings -Doptimize=ReleaseSafe && python -m build --wheel --no-isolation python/` -- Expect: the extension and wheel packaging still agree on `_native.abi3` and `cp311-abi3` - -**Open Questions / Risks:** -- Some APIs may be part of the Stable ABI even if they are not commonly used in limited-API examples; implementation should distinguish "documented and acceptable" from "happens to compile today". - -**Reviewed remainder after implementation:** -- Built-in type checks remain intentionally in place via `PyObject_IsInstance(..., &PyUnicode_Type / &PyLong_Type / &PyDict_Type)` because those built-in type objects are documented Stable-ABI surfaces for the Python 3.11 floor. -- GIL handoff around native loader construction remains intentionally in place via `PyEval_SaveThread` / `PyEval_RestoreThread`; both are documented Stable-ABI thread-state APIs and the work done between them avoids Python object access. -- Module-owned state remains intentionally implemented via `PyModule_GetState`, `PyModuleDef_Init`, `PyType_FromModuleAndSpec`, `PyType_GetModuleState`, and `PyModule_AddObjectRef`; these are the documented multi-phase/module-state APIs for the 3.11 limited-API target. -- Heap-type allocation and teardown remain intentionally implemented via `PyType_GetSlot` for `tp_alloc` / `tp_free`; future refactors should preserve the documented `tp_free` lookup plus heap-type `Py_DECREF` pattern rather than reintroducing direct object-header access. -- Packaging declarations remain aligned without further changes: `python/python.zig` keeps `Py_LIMITED_API=0x030b0000`, `build.zig` builds and copies `_native.abi3`, and `python/setup.py` keeps `py_limited_api=True` with wheel tag `cp311`. - -### Rollout Notes - -**Order of execution:** Complete lifecycle remediation in `python/python.zig` before adding or updating tests, then rerun the packaging path. - -**Success criteria:** -- No direct `PyObject` header access remains in the binding. -- No process-global type pointers remain in the binding. -- The extension still builds as `_native.abi3` for the existing `cp311` target. -- The revised object lifecycle passes Python-level smoke coverage without teardown crashes. -- Remaining Python C-API calls in the binding have been consciously reviewed against Limited-API documentation. - -**Recommended review focus:** Use `plan-eng-review` in a subagent to pressure-test the GC/deallocation assumptions before implementation starts. From 1d0ba4f0e451b84f276991c139e18995cb40f091 Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Tue, 24 Mar 2026 18:02:59 -0700 Subject: [PATCH 4/7] Trim dataloader alias bloat --- dataloader.zig | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/dataloader.zig b/dataloader.zig index de07533..0c5a733 100644 --- a/dataloader.zig +++ b/dataloader.zig @@ -101,12 +101,10 @@ pub const LoaderCtx = struct { req_mem_pool: std.heap.MemoryPool(XevReq), fn findFreeFileSlot(self: *Self) !FileHandle { - const file_slots = self.file_slots[0..]; - // This isn't called too often, just linear scan for (0..max_file_slots) |offset| { const i = (self.last_slot + offset) % max_file_slots; - const f = file_slots[i]; + const f = self.file_slots[0..][i]; if (f == null) { self.last_slot = (i + 1) % max_file_slots; return .{ .idx = @intCast(i), .generation = 0, .path_checksum = 0 }; @@ -177,23 +175,18 @@ pub const LoaderCtx = struct { } fn fileAddRef(self: *Self, slot: usize) void { - const refcounts = self.file_refcount[0..]; - // Should start on 1 - std.debug.assert(refcounts[slot] > 0); - refcounts[slot] += 1; + std.debug.assert(self.file_refcount[0..][slot] > 0); + self.file_refcount[0..][slot] += 1; } fn fileDecRef(self: *Self, slot: usize) void { - const refcounts = self.file_refcount[0..]; - const file_slots = self.file_slots[0..]; - - std.debug.assert(refcounts[slot] > 0); - refcounts[slot] -= 1; - if (refcounts[slot] == 0) { - const f = file_slots[slot] orelse unreachable; + std.debug.assert(self.file_refcount[0..][slot] > 0); + self.file_refcount[0..][slot] -= 1; + if (self.file_refcount[0..][slot] == 0) { + const f = self.file_slots[0..][slot] orelse unreachable; f.close(); - file_slots[slot] = null; + self.file_slots[0..][slot] = null; } } From 030fe78cabf3a2c2fba1c6e780f7de046438f49c Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Tue, 24 Mar 2026 20:36:24 -0700 Subject: [PATCH 5/7] Harden dataloader handle validation --- dataloader.zig | 20 ++++++++++++++++++++ lua_rt.zig | 10 ++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/dataloader.zig b/dataloader.zig index 0c5a733..b30a5c7 100644 --- a/dataloader.zig +++ b/dataloader.zig @@ -120,6 +120,7 @@ pub const LoaderCtx = struct { const generations = self.file_slots_generation[0..]; const checksums = self.file_slots_checksum[0..]; + if (slot >= file_slots.len) return LoaderError.InvalidFileHandle; if (file_slots[slot] == null) return LoaderError.InvalidFileHandle; if (generations[slot] != file.generation or checksums[slot] != file.path_checksum) { logger.warn("File handle {} is corrupted, current generation: {}, checksum: {}", .{ file, generations[slot], checksums[slot] }); @@ -483,3 +484,22 @@ test "test dataloader blocked join" { try std.testing.expect(ctx.is_running == false); try std.testing.expectEqual(null, ctx.result_ring.dequeue()); } + +test "invalid file handle index is rejected" { + var debug_alloc = std.heap.DebugAllocator(.{}).init; + defer _ = debug_alloc.deinit(); + + const ctx = try debug_alloc.allocator().create(LoaderCtx); + defer debug_alloc.allocator().destroy(ctx); + try ctx.initInPlace(debug_alloc.allocator()); + defer ctx.deinit(); + + try std.testing.expectError( + LoaderError.InvalidFileHandle, + ctx.checkFilehandle(.{ + .idx = std.math.maxInt(u20), + .generation = 0, + .path_checksum = 0, + }), + ); +} diff --git a/lua_rt.zig b/lua_rt.zig index 150c42f..139e006 100644 --- a/lua_rt.zig +++ b/lua_rt.zig @@ -5,6 +5,10 @@ const msgpack = @import("msgpack.zig"); const logger = std.log.scoped(.lua_rt); +const max_exact_lua_handle = std.math.maxInt(u48); +const can_use_lua_unsigned64 = + (zlua.lang == .luau or zlua.lang == .lua52) and std.math.maxInt(zlua.Unsigned) >= max_exact_lua_handle; + const ScanCtx = struct { dir: std.fs.Dir, iter: std.fs.Dir.Iterator, @@ -89,10 +93,11 @@ pub fn toUnsigned(lua: *Lua, idx: i32) !u32 { } pub fn toUnsigned64(lua: *Lua, idx: i32) !u64 { - if (zlua.lang == .luau or zlua.lang == .lua52) { + if (can_use_lua_unsigned64) { return lua.toUnsigned(idx); } else { const f = try lua.toNumber(idx); + std.debug.assert(f >= 0 and f <= max_exact_lua_handle); return @intFromFloat(f); } } @@ -107,9 +112,10 @@ pub fn pushUnsigned(lua: *Lua, v: u32) void { } pub fn pushUnsigned64(lua: *Lua, v: u64) void { - if (zlua.lang == .luau or zlua.lang == .lua52) { + if (can_use_lua_unsigned64) { lua.pushUnsigned(@intCast(v)); } else { + std.debug.assert(v <= max_exact_lua_handle); const f: f64 = @floatFromInt(v); lua.pushNumber(f); } From b3b7aa34c481cdfbf7450ef7c11acdce59f70bcc Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Wed, 25 Mar 2026 16:38:19 -0700 Subject: [PATCH 6/7] Fix FileHandle f64 precision loss and worker thread data race Move FileHandle padding to high bits so idx/generation/checksum fit within f64 exact integer range (48 bits), preventing silent handle corruption when round-tripping through LuaJIT numbers. Replace non-atomic worker_thread null-signaling with atomic worker_done flag to eliminate data race between worker and join threads. Replace std.debug.assert in open_file response handler with explicit error return that survives ReleaseFast. Co-Authored-By: Claude Opus 4.6 (1M context) --- dataloader.zig | 13 ++++++++----- lua_dataloader.zig | 10 +++++----- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/dataloader.zig b/dataloader.zig index b30a5c7..e71dc1f 100644 --- a/dataloader.zig +++ b/dataloader.zig @@ -8,10 +8,10 @@ const logger = std.log.scoped(.dataloader); const wlog = std.log.scoped(.dataloader_io_thread); pub const FileHandle = packed struct { - _: u16 = 0, idx: u20, generation: u20, path_checksum: u8, + _: u16 = 0, }; const max_file_slots = std.math.maxInt(u20); @@ -88,6 +88,7 @@ pub const LoaderCtx = struct { loop: xev.Loop, worker_thread: ?std.Thread = null, + worker_done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), req_cnt: u64 = 0, is_running: bool = false, @@ -301,7 +302,7 @@ pub const LoaderCtx = struct { } } - self.worker_thread = null; + self.worker_done.store(true, .release); } // Loader side functions @@ -361,14 +362,14 @@ pub const LoaderCtx = struct { self.drainResponse(); std.Thread.yield() catch {}; } - // Now wait until the worker thread is done - while (self.worker_thread != null) { + // Now wait until the worker thread signals done + while (!self.worker_done.load(.acquire)) { self.drainResponse(); std.Thread.yield() catch {}; } - // When worker thread is null, it means the thread has exited self.drainResponse(); thread.join(); + self.worker_thread = null; } pub fn start(self: *Self) !void { @@ -377,6 +378,7 @@ pub const LoaderCtx = struct { } self.is_running = true; + self.worker_done.store(false, .monotonic); self.worker_thread = try std.Thread.spawn(.{ .allocator = self.alloc, @@ -398,6 +400,7 @@ pub const LoaderCtx = struct { self.loop = try xev.Loop.init(.{}); errdefer self.loop.deinit(); self.worker_thread = null; + self.worker_done = std.atomic.Value(bool).init(false); self.req_cnt = 0; self.is_running = false; self.is_draining = false; diff --git a/lua_dataloader.zig b/lua_dataloader.zig index 83b4c15..5f075af 100644 --- a/lua_dataloader.zig +++ b/lua_dataloader.zig @@ -357,16 +357,16 @@ pub const LuaDataLoader = struct { switch (payload) { .open_file => |f| { - std.debug.assert((self.u_yielded_from orelse @panic("Unresolved open_file req")) == .open_file); + if (self.u_yielded_from == null or self.u_yielded_from.? != .open_file) { + return error.UnexpectedOpenFileResponse; + } lua_rt.pushUnsigned64(self.lua, @bitCast(f)); self.u_resume_nargs = 1; self.u_yielded_from = null; }, .read_block => { - const rid = resp.request_id; - const kv = self.load_rid_to_row.fetchSwapRemove(rid) orelse @panic("read_block rid not found in map"); - const row = kv.value; - row.num_fullfilled += 1; + const kv = self.load_rid_to_row.fetchSwapRemove(resp.request_id) orelse @panic("read_block rid not found in map"); + kv.value.num_fullfilled += 1; }, } } From a40b574bc4e52b543a989f618cd6353b85d50d6e Mon Sep 17 00:00:00 2001 From: Cheng Cao Date: Wed, 25 Mar 2026 18:36:46 -0700 Subject: [PATCH 7/7] Fix type object leak on module init failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit errdefer was dead code — moduleExec returns c_int, not an error union, so the deferred cleanup never fired. Replace with explicit moduleClear calls on each failure path after data_loader_type is allocated. Co-Authored-By: Claude Opus 4.6 (1M context) --- python/python.zig | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/python/python.zig b/python/python.zig index 649135e..c1aa109 100644 --- a/python/python.zig +++ b/python/python.zig @@ -694,13 +694,21 @@ fn moduleExec(module_obj: ?*py.PyObject) callconv(.c) c_int { state.data_loader_type = @ptrCast(py.PyType_FromModuleAndSpec(module, &DataLoader_spec, null)); if (state.data_loader_type == null) return -1; - errdefer _ = moduleClear(module_obj); state.loaded_row_type = @ptrCast(py.PyType_FromModuleAndSpec(module, &LoadedRow_spec, null)); - if (state.loaded_row_type == null) return -1; + if (state.loaded_row_type == null) { + _ = moduleClear(module_obj); + return -1; + } - if (py.PyModule_AddObjectRef(module, "DataLoader", @ptrCast(@alignCast(state.data_loader_type))) < 0) return -1; - if (py.PyModule_AddObjectRef(module, "LoadedRow", @ptrCast(@alignCast(state.loaded_row_type))) < 0) return -1; + if (py.PyModule_AddObjectRef(module, "DataLoader", @ptrCast(@alignCast(state.data_loader_type))) < 0) { + _ = moduleClear(module_obj); + return -1; + } + if (py.PyModule_AddObjectRef(module, "LoadedRow", @ptrCast(@alignCast(state.loaded_row_type))) < 0) { + _ = moduleClear(module_obj); + return -1; + } return 0; }