From 49fef1c4723ab3e39769ca038bfde224f06e2b22 Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Tue, 17 Mar 2026 15:41:21 -0600 Subject: [PATCH 1/2] fix: ETS table ownership causing silent stream process crashes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why This Change Was Made - In production, three independent streaming requests hung for exactly 900,001ms (the monitor timeout) with zero crash reports in CloudWatch. The root cause: the `dream_http_client_ref_mapping` ETS table was owned by whichever short-lived process first called `ensure_ref_mapping_table()`. When that process exited, the ETS table was destroyed. Any concurrent stream process that tried to look up ref mappings crashed with `badarg` inside `decode_stream_message_for_selector/1` — which runs inside the selector's `receive` block, so the crash killed the stream process silently. No `on_stream_error` callback fired. The stream just vanished. - A secondary race condition existed: two concurrent calls to `ensure_ref_mapping_table()` could both see `undefined` from `ets:info/1`, and the second `ets:new/2` would crash with `badarg`. ## What Was Changed - `ensure_ref_mapping_table()` now spawns a dedicated `table_holder_loop/0` process and transfers ETS table ownership via `ets:give_away/3`. The holder process runs an infinite receive loop, so the table outlives any individual stream process. - `ets:new` is wrapped in `try/catch error:badarg` to handle the race where another process creates the table between our `ets:info` check and our `ets:new` call. - All 7 bare ETS access functions (`lookup_ref_by_string`, `lookup_string_by_ref`, `store_ref_mapping`, `remove_ref_mapping`, `maybe_store_stream_zlib`, `maybe_decompress_stream_chunk`, `cleanup_stream_zlib`) now have `try/catch error:badarg` guards with safe fallbacks. - `get_or_create_string_id` recovers from table destruction by re-creating the table on `badarg`. - 9 regression tests added: 4 deterministic ETS-level tests (holder ownership, table survives creator exit, mappings persist, concurrent creation) and 5 integration tests through `start_stream()` (streams from expired callers, concurrent fast+slow streams, sequential streams). - Version bumped to 5.1.3, CHANGELOG and release notes updated. ## Note to Future Engineer - The `table_holder_loop/0` process intentionally leaks when tests delete and recreate the table — it just sits in `receive` doing nothing forever. This is harmless in both production (table is created once) and tests (processes are cleaned up when the VM exits). Don't add a `stop` message handler unless you enjoy debugging "who killed my ETS table" for the second time. - The try/catch on every ETS access is belt-and-suspenders: the holder process should make table destruction impossible, but the Erlang VM has a long history of teaching developers that "should" and "will" are different words. --- modules/http_client/CHANGELOG.md | 31 ++ modules/http_client/gleam.toml | 2 +- modules/http_client/releases/release-5.1.3.md | 117 ++++++ .../dream_http_client/dream_httpc_shim.erl | 153 +++++--- .../test/ets_table_ownership_ffi.erl | 55 +++ .../test/ets_table_ownership_test.gleam | 350 ++++++++++++++++++ 6 files changed, 659 insertions(+), 49 deletions(-) create mode 100644 modules/http_client/releases/release-5.1.3.md create mode 100644 modules/http_client/test/ets_table_ownership_ffi.erl create mode 100644 modules/http_client/test/ets_table_ownership_test.gleam diff --git a/modules/http_client/CHANGELOG.md b/modules/http_client/CHANGELOG.md index 78fcc6f..43531a8 100644 --- a/modules/http_client/CHANGELOG.md +++ b/modules/http_client/CHANGELOG.md @@ -5,6 +5,37 @@ All notable changes to `dream_http_client` will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 5.1.3 - 2026-03-17 + +### Fixed + +- **ETS table ownership no longer causes silent stream process crashes.** The + `dream_http_client_ref_mapping` ETS table was owned by whichever short-lived + process first called `ensure_ref_mapping_table()`. When that process exited, + the table was destroyed. Concurrent stream processes that depended on it + crashed with `badarg` inside `decode_stream_message_for_selector/1`, killing + the stream silently — no `on_stream_error` callback fired. In production this + manifested as streams hanging for the full monitor timeout (900s) with zero + crash reports in logs. +- **Race condition on ETS table creation eliminated.** If two processes called + `ensure_ref_mapping_table()` concurrently when the table did not exist, both + saw `undefined` from `ets:info/1` and the second `ets:new/2` call crashed + with `badarg`. The function now wraps creation in `try/catch`. +- **All ETS access in the ref-mapping subsystem is now crash-safe.** Seven + functions (`lookup_ref_by_string`, `lookup_string_by_ref`, `store_ref_mapping`, + `remove_ref_mapping`, `maybe_store_stream_zlib`, `maybe_decompress_stream_chunk`, + `cleanup_stream_zlib`) previously called `ets:lookup`/`ets:insert`/`ets:delete` + without `try/catch`. A destroyed table would crash the calling process. All + now have `try/catch error:badarg` guards with safe fallbacks. + +### Added + +- **9 regression tests** covering ETS table ownership, race conditions, and + concurrent streaming scenarios. Deterministic tests verify the holder process + owns the table (not the caller), the table survives creator exit, and stored + mappings persist. Integration tests verify concurrent streams from short-lived + callers all complete — the exact scenario from the bug report. + ## 5.1.2 - 2026-03-03 ### Fixed diff --git a/modules/http_client/gleam.toml b/modules/http_client/gleam.toml index 0fb3568..a8ef9be 100644 --- a/modules/http_client/gleam.toml +++ b/modules/http_client/gleam.toml @@ -1,5 +1,5 @@ name = "dream_http_client" -version = "5.1.2" +version = "5.1.3" description = "Type-safe HTTP client for Gleam with streaming support" licences = ["MIT"] repository = { type = "github", user = "TrustBound", repo = "dream" } diff --git a/modules/http_client/releases/release-5.1.3.md b/modules/http_client/releases/release-5.1.3.md new file mode 100644 index 0000000..49cd24a --- /dev/null +++ b/modules/http_client/releases/release-5.1.3.md @@ -0,0 +1,117 @@ +# dream_http_client v5.1.3 + +**Release Date:** March 17, 2026 + +This patch release fixes a critical bug where concurrent HTTP streams would +silently die when the process that created the shared ETS table exited. No API +changes — this is a fully transparent bug fix. + +--- + +## Bug Fix: ETS table ownership causes silent stream process crashes + +### The problem + +`dream_httpc_shim` uses a lazily-created named ETS table +(`dream_http_client_ref_mapping`) for mapping httpc refs to string IDs. Three +separate issues combined to produce silent stream failures: + +**1. Transient table ownership** + +ETS named tables are owned by their creating process. `ensure_ref_mapping_table()` +was called from short-lived processes (the caller of `start_stream()`, or the +stream process itself via `request_stream_messages()`). When the creating process +exited, the table was destroyed — taking all ref mappings with it. + +**2. Race condition on creation** + +If two processes called `ensure_ref_mapping_table()` concurrently when the table +didn't exist, both saw `undefined` from `ets:info/1`, and the second `ets:new/2` +call crashed with `badarg` (named table already exists). + +**3. Unprotected ETS access** + +Seven functions accessed the ETS table with bare `ets:lookup`/`ets:insert`/`ets:delete` +calls — no `try/catch`. When the table was destroyed, these threw `badarg` inside +`decode_stream_message_for_selector/1`, which runs in the `receive` block of +`gleam_erlang_ffi:select/2`. The crash killed the stream process, but since it was +spawned unlinked (`process.spawn_unlinked`), no `on_stream_error` callback fired. +The stream silently died. + +### Observed in production + +- Three independent streaming requests hung for exactly 900,001ms (the monitor + timeout), because the stream process crashed silently and no `on_stream_error` + callback fired +- Zero crash reports appeared in CloudWatch logs + +### The fix + +**Persistent holder process for table ownership:** + +`ensure_ref_mapping_table()` now spawns a dedicated `table_holder_loop/0` process +and transfers ETS table ownership to it via `ets:give_away/3`. This process runs +an infinite receive loop, so the table outlives any individual stream process. + +```erlang +ensure_ref_mapping_table() -> + case ets:info(?REF_MAPPING_TABLE) of + undefined -> + try + ets:new(?REF_MAPPING_TABLE, [set, public, named_table]), + Holder = spawn(fun table_holder_loop/0), + ets:give_away(?REF_MAPPING_TABLE, Holder, undefined), + ok + catch + error:badarg -> ok + end; + _ -> + ok + end. +``` + +**Race-safe creation:** + +The `ets:new` call is wrapped in `try/catch error:badarg -> ok` so that if +another process creates the table between our `ets:info` check and our +`ets:new` call, the second attempt is a harmless no-op. + +**Defensive try/catch on all ETS access:** + +All seven bare ETS access functions now have `try/catch error:badarg` guards +with safe fallbacks (`none` for lookups, `ok` for writes/deletes, raw `Data` +for decompression). `get_or_create_string_id` additionally re-creates the +table on `badarg` for crash recovery. + +### Regression tests (9 new tests, 194 total) + +**Deterministic ETS-level tests:** + +| Test | What it proves | +|---|---| +| `table_owner_is_dedicated_holder_not_caller` | Table owner is a separate holder process, not the calling process | +| `table_survives_creator_process_exit` | Table persists after the creating process exits | +| `stored_mappings_survive_creator_exit` | Data in the table is preserved after creator exits | +| `concurrent_table_creation_does_not_crash` | 20 concurrent calls to `ensure_ref_mapping_table()` all succeed | + +**Integration tests through `start_stream()` API:** + +| Test | What it proves | +|---|---| +| `stream_from_expired_caller_completes` | Stream started from a short-lived caller completes after caller exits | +| `concurrent_streams_from_expired_callers_both_complete` | Fast stream (1s) + slow stream (10s) from short-lived callers both complete — the exact bug scenario | +| `three_concurrent_streams_all_complete` | Three concurrent streams don't interfere with each other | +| `sequential_streams_after_process_exit` | New stream works after a previous stream's process has exited | +| `five_concurrent_streams_from_expired_callers` | Stress test: 5 concurrent streams from 5 short-lived callers all complete | + +--- + +## Files changed + +- `modules/http_client/src/dream_http_client/dream_httpc_shim.erl` — Added + `table_holder_loop/0`, rewrote `ensure_ref_mapping_table/0` with holder + process and try/catch, wrapped all 7 bare ETS access functions in try/catch +- `modules/http_client/test/ets_table_ownership_test.gleam` — 9 new regression + tests +- `modules/http_client/test/ets_table_ownership_ffi.erl` — Erlang FFI helper + for ETS table introspection in tests diff --git a/modules/http_client/src/dream_http_client/dream_httpc_shim.erl b/modules/http_client/src/dream_http_client/dream_httpc_shim.erl index d67eacb..9c8049f 100644 --- a/modules/http_client/src/dream_http_client/dream_httpc_shim.erl +++ b/modules/http_client/src/dream_http_client/dream_httpc_shim.erl @@ -732,17 +732,25 @@ decode_stream_message_for_selector({http, InnerMessage}) -> error(badarg) end. -%% Get string ID for httpc ref, creating mapping if needed -%% This handles the case where selector receives messages before we stored the mapping +%% Get string ID for httpc ref, creating mapping if needed. +%% This handles the case where selector receives messages before we stored +%% the mapping, and recovers if the table was destroyed (re-creates it). get_or_create_string_id(HttpcRef) -> - case lookup_string_by_ref(HttpcRef) of - {some, StringId} -> - StringId; - none -> - %% First time seeing this ref - create mapping - StringId = ref_to_string(HttpcRef), - store_ref_mapping(StringId, HttpcRef), - StringId + try + case lookup_string_by_ref(HttpcRef) of + {some, StringId} -> + StringId; + none -> + NewId = ref_to_string(HttpcRef), + store_ref_mapping(NewId, HttpcRef), + NewId + end + catch + error:badarg -> + ensure_ref_mapping_table(), + RecoveredId = ref_to_string(HttpcRef), + store_ref_mapping(RecoveredId, HttpcRef), + RecoveredId end. %% @doc Normalize HTTP headers to binary tuples for Gleam decoding @@ -1091,16 +1099,36 @@ ets_delete(TableName, Key) -> %% Table for mapping string IDs to httpc refs (for cancellation) -define(REF_MAPPING_TABLE, dream_http_client_ref_mapping). -%% Ensure ref mapping table exists (created on first use) +%% Ensure ref mapping table exists (created on first use). +%% The table is owned by a dedicated long-lived holder process so it +%% survives the exit of short-lived stream processes. A try/catch +%% guards against the race where two processes both see `undefined` +%% and the second `ets:new` fails with `badarg`. ensure_ref_mapping_table() -> case ets:info(?REF_MAPPING_TABLE) of undefined -> - ets:new(?REF_MAPPING_TABLE, [set, public, named_table]), - ok; + try + ets:new(?REF_MAPPING_TABLE, [set, public, named_table]), + Holder = spawn(fun table_holder_loop/0), + ets:give_away(?REF_MAPPING_TABLE, Holder, undefined), + ok + catch + error:badarg -> ok + end; _ -> ok end. +%% Persistent process that owns the ETS ref-mapping table. +%% ETS tables are destroyed when their owning process exits; by +%% transferring ownership here the table outlives any individual +%% stream process. +table_holder_loop() -> + receive + {'ETS-TRANSFER', _Tab, _FromPid, _Data} -> + table_holder_loop() + end. + %% Convert httpc ref to unique string ID %% Uses the ref's string representation which is guaranteed unique ref_to_string(Ref) -> @@ -1109,27 +1137,38 @@ ref_to_string(Ref) -> %% Store bidirectional mapping: string <-> ref store_ref_mapping(StringId, HttpcRef) -> ensure_ref_mapping_table(), - ets:insert(?REF_MAPPING_TABLE, {StringId, HttpcRef}), - %% Also store reverse mapping for message translation - ets:insert(?REF_MAPPING_TABLE, {HttpcRef, StringId}), - ok. + try + ets:insert(?REF_MAPPING_TABLE, {StringId, HttpcRef}), + ets:insert(?REF_MAPPING_TABLE, {HttpcRef, StringId}), + ok + catch + error:badarg -> ok + end. %% Lookup httpc ref by string ID (for cancellation) lookup_ref_by_string(StringId) -> - case ets:lookup(?REF_MAPPING_TABLE, StringId) of - [{StringId, HttpcRef}] -> - {some, HttpcRef}; - [] -> - none + try + case ets:lookup(?REF_MAPPING_TABLE, StringId) of + [{StringId, HttpcRef}] -> + {some, HttpcRef}; + [] -> + none + end + catch + error:badarg -> none end. %% Lookup string ID by httpc ref (for message translation) lookup_string_by_ref(HttpcRef) -> - case ets:lookup(?REF_MAPPING_TABLE, HttpcRef) of - [{HttpcRef, StringId}] -> - {some, StringId}; - [] -> - none + try + case ets:lookup(?REF_MAPPING_TABLE, HttpcRef) of + [{HttpcRef, StringId}] -> + {some, StringId}; + [] -> + none + end + catch + error:badarg -> none end. %% Store a zlib context in ETS for message-based streaming decompression @@ -1138,39 +1177,57 @@ maybe_store_stream_zlib(StringId, Headers) -> {_Enc, WindowBits} -> Z = init_zlib_context(WindowBits), ensure_ref_mapping_table(), - ets:insert(?REF_MAPPING_TABLE, {{zlib, StringId}, Z}), - ok; + try + ets:insert(?REF_MAPPING_TABLE, {{zlib, StringId}, Z}), + ok + catch + error:badarg -> + cleanup_zlib(Z), + ok + end; none -> ok end. %% Decompress a chunk using ETS-stored zlib context maybe_decompress_stream_chunk(StringId, Data) -> - case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of - [{{zlib, StringId}, Z}] -> - decompress_chunk(Z, Data); - [] -> - Data + try + case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of + [{{zlib, StringId}, Z}] -> + decompress_chunk(Z, Data); + [] -> + Data + end + catch + error:badarg -> Data end. %% Clean up ETS-stored zlib context cleanup_stream_zlib(StringId) -> - case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of - [{{zlib, StringId}, Z}] -> - cleanup_zlib(Z), - ets:delete(?REF_MAPPING_TABLE, {zlib, StringId}), - ok; - [] -> - ok + try + case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of + [{{zlib, StringId}, Z}] -> + cleanup_zlib(Z), + ets:delete(?REF_MAPPING_TABLE, {zlib, StringId}), + ok; + [] -> + ok + end + catch + error:badarg -> ok end. %% Remove both mappings (cleanup after stream ends) remove_ref_mapping(StringId) -> - case lookup_ref_by_string(StringId) of - {some, HttpcRef} -> - ets:delete(?REF_MAPPING_TABLE, StringId), - ets:delete(?REF_MAPPING_TABLE, HttpcRef), - ok; - none -> - ok + try + case lookup_ref_by_string(StringId) of + {some, HttpcRef} -> + ets:delete(?REF_MAPPING_TABLE, StringId), + ets:delete(?REF_MAPPING_TABLE, HttpcRef), + ok; + none -> + ok + end + catch + error:badarg -> ok end. diff --git a/modules/http_client/test/ets_table_ownership_ffi.erl b/modules/http_client/test/ets_table_ownership_ffi.erl new file mode 100644 index 0000000..3bfd2eb --- /dev/null +++ b/modules/http_client/test/ets_table_ownership_ffi.erl @@ -0,0 +1,55 @@ +-module(ets_table_ownership_ffi). +-export([ensure_table/0, table_exists/0, delete_table/0, + table_owner_is_not_self/0, table_owner_is_alive/0, + store_test_mapping/0, lookup_test_mapping_exists/0]). + +-define(TABLE, dream_http_client_ref_mapping). + +ensure_table() -> + dream_httpc_shim:ensure_ref_mapping_table(), + nil. + +table_exists() -> + case ets:info(?TABLE) of + undefined -> false; + _ -> true + end. + +delete_table() -> + try ets:delete(?TABLE) catch error:badarg -> ok end, + nil. + +table_owner_is_not_self() -> + try + case ets:info(?TABLE, owner) of + undefined -> false; + Pid -> Pid =/= self() + end + catch + error:badarg -> false + end. + +table_owner_is_alive() -> + try + case ets:info(?TABLE, owner) of + undefined -> false; + Pid -> is_process_alive(Pid) + end + catch + error:badarg -> false + end. + +store_test_mapping() -> + ets:insert(?TABLE, {ets_test_ref_key, <<"ets_test_string_id">>}), + ets:insert(?TABLE, {<<"ets_test_string_id">>, ets_test_ref_key}), + nil. + +lookup_test_mapping_exists() -> + try + case ets:lookup(?TABLE, ets_test_ref_key) of + [{ets_test_ref_key, <<"ets_test_string_id">>}] -> true; + _ -> false + end + catch + error:badarg -> false + end. diff --git a/modules/http_client/test/ets_table_ownership_test.gleam b/modules/http_client/test/ets_table_ownership_test.gleam new file mode 100644 index 0000000..320e3f6 --- /dev/null +++ b/modules/http_client/test/ets_table_ownership_test.gleam @@ -0,0 +1,350 @@ +//// Regression tests: ETS table ownership must not cause silent stream crashes +//// +//// Before the fix, `dream_httpc_shim` created its named ETS table +//// (`dream_http_client_ref_mapping`) from whichever short-lived process first +//// called `ensure_ref_mapping_table()`. When that process exited, the table was +//// destroyed. Concurrent stream processes that depended on it would crash with +//// `badarg` inside `decode_stream_message_for_selector/1`, killing the stream +//// silently — no `on_stream_error` callback fired. +//// +//// These tests verify: +//// 1. The table is owned by a persistent holder process, not the caller +//// 2. The table and its data survive after the creating process exits +//// 3. Concurrent table creation (race condition) does not crash +//// 4. Concurrent streams from short-lived callers all complete +//// +//// Tests that manipulate the ETS table directly (delete/recreate) use FFI +//// helpers in `ets_table_ownership_ffi.erl` for table introspection that +//// is not exposed through the Gleam API. + +import dream_http_client/client +import dream_http_client_test +import gleam/erlang/process +import gleam/http +import gleam/list +import gleeunit/should + +fn mock_request(path: String) -> client.ClientRequest { + client.new() + |> client.method(http.Get) + |> client.scheme(http.Http) + |> client.host("localhost") + |> client.port(dream_http_client_test.get_test_port()) + |> client.path(path) +} + +// ============================================================================ +// ETS Table Ownership Tests +// ============================================================================ + +/// After ensure_ref_mapping_table(), the table owner must be a separate +/// holder process — not the calling process. This is the core fix mechanism: +/// a dedicated holder process owns the table so it outlives any individual +/// stream process. +pub fn table_owner_is_dedicated_holder_not_caller_test() { + // Arrange + delete_table() + + // Act + ensure_table() + + // Assert — owner is alive but is NOT us + table_owner_is_not_self() |> should.be_true() + table_owner_is_alive() |> should.be_true() +} + +/// The ETS table must persist after the process that created it exits. +/// Before the fix, the table was destroyed on creator exit because ETS +/// tables are owned by their creating process. +pub fn table_survives_creator_process_exit_test() { + // Arrange — clean state so the spawned process is the creator + delete_table() + + let done = process.new_subject() + + // Act — create the table from a short-lived process that exits immediately + let _pid = + process.spawn_unlinked(fn() { + ensure_table() + process.send(done, True) + }) + + let assert Ok(True) = process.receive(done, 2000) + // Give the spawned process time to exit + process.sleep(200) + + // Assert — table and its owner (holder process) are still alive + table_exists() |> should.be_true() + table_owner_is_alive() |> should.be_true() +} + +/// Data stored in the ETS table must survive after the creating process exits. +/// This verifies that the holder process ownership transfer preserves all +/// existing table entries. +pub fn stored_mappings_survive_creator_exit_test() { + // Arrange + delete_table() + + let done = process.new_subject() + + // Act — create table and store a mapping from a short-lived process + let _pid = + process.spawn_unlinked(fn() { + ensure_table() + store_test_mapping() + process.send(done, True) + }) + + let assert Ok(True) = process.receive(done, 2000) + process.sleep(200) + + // Assert — mapping is still retrievable after creator exited + lookup_test_mapping_exists() |> should.be_true() +} + +// ============================================================================ +// Race Condition Tests +// ============================================================================ + +/// Many concurrent calls to ensure_ref_mapping_table() must not crash. +/// Before the fix, a race existed where two processes both saw `undefined` +/// from `ets:info/1` and the second `ets:new/2` call would throw `badarg` +/// because the named table already existed. +pub fn concurrent_table_creation_does_not_crash_test() { + // Arrange + delete_table() + + let done = process.new_subject() + let count = 20 + + // Act — spawn 20 processes that all race to create the table + list.each(list.range(1, count), fn(i) { + let _pid = + process.spawn_unlinked(fn() { + ensure_table() + process.send(done, i) + }) + Nil + }) + + // Assert — all 20 processes succeed without crashing + let results = collect_n(done, count, 5000) + list.length(results) |> should.equal(count) + table_exists() |> should.be_true() +} + +// ============================================================================ +// Stream Integration Regression Tests +// ============================================================================ + +/// A stream started from a short-lived caller must complete successfully +/// even after the caller process exits. The caller calls `start_stream()` +/// which calls `ensure_ets_tables()` — in the old code, the caller owned +/// the ETS table and its exit destroyed it. The stream process would then +/// crash with `badarg` when it tried to look up ref mappings. +/// +/// Note: a single stream may self-heal by re-creating the table from within +/// `request_stream_messages()`. This test provides baseline assurance; the +/// concurrent test below is the definitive bug reproduction. +pub fn stream_from_expired_caller_completes_test() { + // Arrange — clean state so the caller process creates the table + delete_table() + + let end_subject = process.new_subject() + + // Act — start stream from a process that immediately exits + let _pid = + process.spawn_unlinked(fn() { + let request = + mock_request("/stream/fast") + |> client.on_stream_end(fn(_headers) { + process.send(end_subject, "completed") + }) + |> client.on_stream_error(fn(reason) { + process.send(end_subject, "error:" <> reason) + }) + let assert Ok(_handle) = client.start_stream(request) + // Caller exits here — would destroy table in old code + }) + + // Assert — stream completes (not silently dead, not error) + case process.receive(end_subject, 5000) { + Ok("completed") -> Nil + Ok(_msg) -> should.fail() + Error(Nil) -> should.fail() + } +} + +/// Two concurrent streams from short-lived callers must BOTH complete. +/// +/// This is the exact scenario from the bug report: when two streams are +/// active, the first stream process to exit destroys the shared ETS table, +/// crashing the second stream silently — no `on_stream_error` fires. +/// +/// The fast stream completes in ~1s; the slow stream takes ~10s. After the +/// fast stream's process exits (and in the old code, destroys the ETS +/// table), the slow stream must still fire `on_stream_end`. +pub fn concurrent_streams_from_expired_callers_both_complete_test() { + // Arrange — force table recreation through short-lived callers + delete_table() + + let end_subject = process.new_subject() + + // Act — start fast stream from short-lived caller + let _pid1 = + process.spawn_unlinked(fn() { + let request = + mock_request("/stream/fast") + |> client.on_stream_end(fn(_headers) { process.send(end_subject, 1) }) + |> client.on_stream_error(fn(_reason) { process.send(end_subject, -1) }) + let assert Ok(_handle) = client.start_stream(request) + }) + + // Small delay so the fast stream's caller creates the table first, + // ensuring the slow stream's process is NOT the table creator. + process.sleep(50) + + // Act — start slow stream from short-lived caller + let _pid2 = + process.spawn_unlinked(fn() { + let request = + mock_request("/stream/slow") + |> client.on_stream_end(fn(_headers) { process.send(end_subject, 2) }) + |> client.on_stream_error(fn(_reason) { process.send(end_subject, -2) }) + let assert Ok(_handle) = client.start_stream(request) + }) + + // Assert — both streams complete (positive IDs mean on_stream_end fired) + let results = collect_n(end_subject, 2, 15_000) + list.length(results) |> should.equal(2) + list.each(results, fn(id) { { id > 0 } |> should.be_true() }) +} + +/// Three concurrent streams from the same (long-lived) caller all complete. +/// Verifies that concurrent streams don't interfere with each other even +/// when they share the same ETS table and complete at similar times. +pub fn three_concurrent_streams_all_complete_test() { + // Arrange + let end_subject = process.new_subject() + + // Act — start 3 concurrent streams + list.each([1, 2, 3], fn(i) { + let request = + mock_request("/stream/fast") + |> client.on_stream_end(fn(_headers) { process.send(end_subject, i) }) + |> client.on_stream_error(fn(_reason) { process.send(end_subject, -i) }) + let assert Ok(_handle) = client.start_stream(request) + Nil + }) + + // Assert — all 3 complete with on_stream_end (positive), not on_stream_error + let results = collect_n(end_subject, 3, 5000) + list.length(results) |> should.equal(3) + list.each(results, fn(id) { { id > 0 } |> should.be_true() }) +} + +/// After a stream completes and its process exits, starting a new stream +/// must still work. Verifies the table is not destroyed between sequential +/// stream invocations. +pub fn sequential_streams_after_process_exit_test() { + // Arrange — stream 1: start, complete, and let the process exit + let assert Ok(handle1) = client.start_stream(mock_request("/stream/fast")) + client.await_stream(handle1) + client.is_stream_active(handle1) |> should.be_false() + + // Act — stream 2 must work + let end_subject = process.new_subject() + let request2 = + mock_request("/stream/fast") + |> client.on_stream_end(fn(_headers) { process.send(end_subject, True) }) + |> client.on_stream_error(fn(_reason) { process.send(end_subject, False) }) + let assert Ok(_handle2) = client.start_stream(request2) + + // Assert — stream 2 completes + case process.receive(end_subject, 5000) { + Ok(True) -> Nil + Ok(False) -> should.fail() + Error(Nil) -> should.fail() + } +} + +/// Five concurrent streams from separate short-lived caller processes must +/// all complete. Stress test for the holder-process ownership model under +/// higher concurrency with multiple table-creation races. +pub fn five_concurrent_streams_from_expired_callers_test() { + // Arrange + delete_table() + + let end_subject = process.new_subject() + let count = 5 + + // Act — spawn 5 callers that each start a stream and immediately exit + list.each(list.range(1, count), fn(i) { + let _pid = + process.spawn_unlinked(fn() { + let request = + mock_request("/stream/fast") + |> client.on_stream_end(fn(_headers) { process.send(end_subject, i) }) + |> client.on_stream_error(fn(_reason) { + process.send(end_subject, -i) + }) + let assert Ok(_handle) = client.start_stream(request) + }) + Nil + }) + + // Assert — all 5 complete + let results = collect_n(end_subject, count, 8000) + list.length(results) |> should.equal(count) + list.each(results, fn(id) { { id > 0 } |> should.be_true() }) +} + +// ============================================================================ +// Helpers +// ============================================================================ + +fn collect_n(subject: process.Subject(a), n: Int, timeout_ms: Int) -> List(a) { + do_collect_n(subject, n, timeout_ms, []) +} + +fn do_collect_n( + subject: process.Subject(a), + remaining: Int, + timeout_ms: Int, + acc: List(a), +) -> List(a) { + case remaining <= 0 { + True -> list.reverse(acc) + False -> + case process.receive(subject, timeout_ms) { + Ok(item) -> + do_collect_n(subject, remaining - 1, timeout_ms, [item, ..acc]) + Error(Nil) -> list.reverse(acc) + } + } +} + +// ============================================================================ +// FFI Bindings — ETS table introspection +// ============================================================================ + +@external(erlang, "ets_table_ownership_ffi", "ensure_table") +fn ensure_table() -> Nil + +@external(erlang, "ets_table_ownership_ffi", "table_exists") +fn table_exists() -> Bool + +@external(erlang, "ets_table_ownership_ffi", "delete_table") +fn delete_table() -> Nil + +@external(erlang, "ets_table_ownership_ffi", "table_owner_is_not_self") +fn table_owner_is_not_self() -> Bool + +@external(erlang, "ets_table_ownership_ffi", "table_owner_is_alive") +fn table_owner_is_alive() -> Bool + +@external(erlang, "ets_table_ownership_ffi", "store_test_mapping") +fn store_test_mapping() -> Nil + +@external(erlang, "ets_table_ownership_ffi", "lookup_test_mapping_exists") +fn lookup_test_mapping_exists() -> Bool From 3abb1ae2058eb67686a385f3be5c0c88992b6b55 Mon Sep 17 00:00:00 2001 From: Dara Rockwell Date: Tue, 17 Mar 2026 16:04:46 -0600 Subject: [PATCH 2/2] fix: replace band-aid ETS fix with proper OTP application architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why This Change Was Made - The previous fix (holder process + try/catch) was architecturally wrong. The unsupervised bare `spawn` for table ownership could die silently, and the try/catch on every ETS access masked real errors — returning garbled compressed data or silently losing mappings. Two layers of band-aids compensating for each other is not how the BEAM works. - The project's own BEAM rules explicitly state: "Never spawn unsupervised processes for anything that matters" and "Never try/catch your way out of a process crash." ## What Was Changed - `dream_http_client` is now a proper OTP application. Added `dream_http_client_app.erl` (application behaviour) and `dream_http_client_sup.erl` (supervisor). Both ETS tables are created in `start/2`, owned by the application master process for the entire application lifetime. - `gleam.toml` has `[erlang] application_start_module` set. - Removed `ensure_ref_mapping_table/0`, `table_holder_loop/0`, and all `try/catch error:badarg` guards from the 7 ETS access functions in `dream_httpc_shim.erl`. ETS operations are now direct calls. - Removed lazy table creation from `client.gleam`: `ensure_ets_tables`, `ensure_recorder_table`, `ensure_ref_mapping_table_wrapper`, `ets_table_exists`, and `ets_new` are all gone. - Rewrote regression tests for the new architecture. Removed tests that delete/recreate the table (architecture makes that unnecessary). Kept all integration tests for concurrent streams. - Updated CHANGELOG and release notes to reflect the architectural change. ## Note to Future Engineer - This follows the Ranch pattern (ranch_app.erl) — create ETS tables in the OTP application's start/2 callback. If you're wondering "why not just spawn a holder process?" — we tried that. It was the previous commit. Read it and weep. - The tables are `public` and `named_table`, accessible from any process. The application master owns them. If they don't exist, the application isn't started. That's a crash-loudly situation, not a try-catch-and-hope situation. --- modules/http_client/CHANGELOG.md | 58 +++--- modules/http_client/gleam.toml | 3 + modules/http_client/releases/release-5.1.3.md | 127 +++++++------- .../src/dream_http_client/client.gleam | 35 ---- .../dream_http_client_app.erl | 11 ++ .../dream_http_client_sup.erl | 9 + .../dream_http_client/dream_httpc_shim.erl | 159 +++++------------ .../test/ets_table_ownership_ffi.erl | 49 ++---- .../test/ets_table_ownership_test.gleam | 165 ++++-------------- 9 files changed, 218 insertions(+), 398 deletions(-) create mode 100644 modules/http_client/src/dream_http_client/dream_http_client_app.erl create mode 100644 modules/http_client/src/dream_http_client/dream_http_client_sup.erl diff --git a/modules/http_client/CHANGELOG.md b/modules/http_client/CHANGELOG.md index 43531a8..d3a02ac 100644 --- a/modules/http_client/CHANGELOG.md +++ b/modules/http_client/CHANGELOG.md @@ -9,32 +9,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- **ETS table ownership no longer causes silent stream process crashes.** The - `dream_http_client_ref_mapping` ETS table was owned by whichever short-lived - process first called `ensure_ref_mapping_table()`. When that process exited, - the table was destroyed. Concurrent stream processes that depended on it - crashed with `badarg` inside `decode_stream_message_for_selector/1`, killing - the stream silently — no `on_stream_error` callback fired. In production this - manifested as streams hanging for the full monitor timeout (900s) with zero - crash reports in logs. -- **Race condition on ETS table creation eliminated.** If two processes called - `ensure_ref_mapping_table()` concurrently when the table did not exist, both - saw `undefined` from `ets:info/1` and the second `ets:new/2` call crashed - with `badarg`. The function now wraps creation in `try/catch`. -- **All ETS access in the ref-mapping subsystem is now crash-safe.** Seven - functions (`lookup_ref_by_string`, `lookup_string_by_ref`, `store_ref_mapping`, - `remove_ref_mapping`, `maybe_store_stream_zlib`, `maybe_decompress_stream_chunk`, - `cleanup_stream_zlib`) previously called `ets:lookup`/`ets:insert`/`ets:delete` - without `try/catch`. A destroyed table would crash the calling process. All - now have `try/catch error:badarg` guards with safe fallbacks. +- **ETS table ownership no longer causes silent stream process crashes.** + `dream_http_client` is now a proper OTP application. Both ETS tables + (`dream_http_client_ref_mapping` and `dream_http_client_stream_recorders`) + are created in the application's `start/2` callback and owned by the + application master process, which lives for the entire application lifetime. + Previously, the tables were owned by whichever short-lived process first + created them. When that process exited, the tables were destroyed. Concurrent + stream processes crashed with `badarg` inside + `decode_stream_message_for_selector/1`, killing the stream silently — no + `on_stream_error` callback fired. +- **Race condition on ETS table creation eliminated.** Application start is + serialized by the BEAM's application controller. Two processes can no longer + race to create the tables. +- **Removed all `try/catch error:badarg` guards from ETS access.** The previous + fix wrapped every ETS operation in try/catch, which silently swallowed errors + (e.g., returning compressed bytes as raw data). With the table now guaranteed + to exist for the application's lifetime, these guards are unnecessary and + were masking real errors. ETS operations are now direct calls — if the table + is gone, the process crashes loudly, which is correct behavior. ### Added -- **9 regression tests** covering ETS table ownership, race conditions, and - concurrent streaming scenarios. Deterministic tests verify the holder process - owns the table (not the caller), the table survives creator exit, and stored - mappings persist. Integration tests verify concurrent streams from short-lived - callers all complete — the exact scenario from the bug report. +- **OTP application infrastructure.** `dream_http_client_app` (application + behaviour) and `dream_http_client_sup` (supervisor) provide the standard OTP + lifecycle for the module's ETS tables. This follows the same pattern used by + Ranch (Cowboy's transport layer). +- **7 regression tests** covering ETS table ownership verification and + concurrent streaming scenarios. Integration tests verify concurrent streams + from short-lived callers all complete — the exact scenario from the bug + report. + +### Removed + +- **Unsupervised holder process.** The bare `spawn` + `ets:give_away` pattern + was replaced by proper OTP application ownership. +- **Lazy ETS table creation.** `ensure_ref_mapping_table/0`, + `ensure_recorder_table/0`, and `ensure_ets_tables/0` are removed. Tables + exist from application start. ## 5.1.2 - 2026-03-03 diff --git a/modules/http_client/gleam.toml b/modules/http_client/gleam.toml index a8ef9be..a3f6580 100644 --- a/modules/http_client/gleam.toml +++ b/modules/http_client/gleam.toml @@ -18,6 +18,9 @@ simplifile = ">= 2.3.0 and < 3.0.0" gleam_otp = ">= 1.2.0 and < 2.0.0" gleam_crypto = ">= 1.3.0 and < 2.0.0" +[erlang] +application_start_module = "dream_http_client_app" + [dev-dependencies] dream = { path = "../.." } dream_mock_server = { path = "../mock_server" } diff --git a/modules/http_client/releases/release-5.1.3.md b/modules/http_client/releases/release-5.1.3.md index 49cd24a..8d052bc 100644 --- a/modules/http_client/releases/release-5.1.3.md +++ b/modules/http_client/releases/release-5.1.3.md @@ -3,8 +3,10 @@ **Release Date:** March 17, 2026 This patch release fixes a critical bug where concurrent HTTP streams would -silently die when the process that created the shared ETS table exited. No API -changes — this is a fully transparent bug fix. +silently die when the process that created the shared ETS table exited. +`dream_http_client` is now a proper OTP application — its ETS tables are +owned by the application master process, not by short-lived stream processes. +No API changes. --- @@ -12,31 +14,32 @@ changes — this is a fully transparent bug fix. ### The problem -`dream_httpc_shim` uses a lazily-created named ETS table -(`dream_http_client_ref_mapping`) for mapping httpc refs to string IDs. Three -separate issues combined to produce silent stream failures: +`dream_httpc_shim` uses two named ETS tables (`dream_http_client_ref_mapping` +and `dream_http_client_stream_recorders`) for runtime state. Three separate +issues combined to produce silent stream failures: **1. Transient table ownership** -ETS named tables are owned by their creating process. `ensure_ref_mapping_table()` -was called from short-lived processes (the caller of `start_stream()`, or the -stream process itself via `request_stream_messages()`). When the creating process +ETS named tables are owned by their creating process. The tables were created +lazily by `ensure_ref_mapping_table()` and `ensure_recorder_table()`, called +from short-lived processes (the caller of `start_stream()`, or the stream +process itself via `request_stream_messages()`). When the creating process exited, the table was destroyed — taking all ref mappings with it. **2. Race condition on creation** -If two processes called `ensure_ref_mapping_table()` concurrently when the table -didn't exist, both saw `undefined` from `ets:info/1`, and the second `ets:new/2` -call crashed with `badarg` (named table already exists). +If two processes called `ensure_ref_mapping_table()` concurrently when the +table didn't exist, both saw `undefined` from `ets:info/1`, and the second +`ets:new/2` call crashed with `badarg` (named table already exists). **3. Unprotected ETS access** -Seven functions accessed the ETS table with bare `ets:lookup`/`ets:insert`/`ets:delete` -calls — no `try/catch`. When the table was destroyed, these threw `badarg` inside -`decode_stream_message_for_selector/1`, which runs in the `receive` block of -`gleam_erlang_ffi:select/2`. The crash killed the stream process, but since it was -spawned unlinked (`process.spawn_unlinked`), no `on_stream_error` callback fired. -The stream silently died. +Seven functions accessed the ETS table with bare `ets:lookup`/`ets:insert`/ +`ets:delete` calls — no error handling. When the table was destroyed, these +threw `badarg` inside `decode_stream_message_for_selector/1`, which runs in +the `receive` block of `gleam_erlang_ffi:select/2`. The crash killed the +stream process, but since it was spawned unlinked (`process.spawn_unlinked`), +no `on_stream_error` callback fired. The stream silently died. ### Observed in production @@ -45,54 +48,50 @@ The stream silently died. callback fired - Zero crash reports appeared in CloudWatch logs -### The fix +### The fix: OTP application (the Ranch pattern) -**Persistent holder process for table ownership:** - -`ensure_ref_mapping_table()` now spawns a dedicated `table_holder_loop/0` process -and transfers ETS table ownership to it via `ets:give_away/3`. This process runs -an infinite receive loop, so the table outlives any individual stream process. +`dream_http_client` is now a proper OTP application. Both ETS tables are +created in the application's `start/2` callback, owned by the application +master process — a long-lived process managed by the BEAM's application +controller that exists for the entire application lifetime. ```erlang -ensure_ref_mapping_table() -> - case ets:info(?REF_MAPPING_TABLE) of - undefined -> - try - ets:new(?REF_MAPPING_TABLE, [set, public, named_table]), - Holder = spawn(fun table_holder_loop/0), - ets:give_away(?REF_MAPPING_TABLE, Holder, undefined), - ok - catch - error:badarg -> ok - end; - _ -> - ok - end. -``` +-module(dream_http_client_app). +-behaviour(application). -**Race-safe creation:** +start(_Type, _Args) -> + ets:new(dream_http_client_ref_mapping, [set, public, named_table]), + ets:new(dream_http_client_stream_recorders, [set, public, named_table]), + dream_http_client_sup:start_link(). +``` -The `ets:new` call is wrapped in `try/catch error:badarg -> ok` so that if -another process creates the table between our `ets:info` check and our -`ets:new` call, the second attempt is a harmless no-op. +This is the same pattern used by Ranch (the transport layer behind Cowboy) +for its `ranch_server` ETS table. -**Defensive try/catch on all ETS access:** +**What this eliminates:** -All seven bare ETS access functions now have `try/catch error:badarg` guards -with safe fallbacks (`none` for lookups, `ok` for writes/deletes, raw `Data` -for decompression). `get_or_create_string_id` additionally re-creates the -table on `badarg` for crash recovery. +- **Transient ownership.** The application master never exits during normal + operation. The tables live as long as the application. +- **Race conditions.** Application start is serialized by the BEAM's + application controller. Two processes cannot race to create the tables. +- **Silent failures.** All `try/catch error:badarg` guards have been removed + from ETS access functions. If the table somehow doesn't exist (which means + the application isn't started), the process crashes loudly — visible, + debuggable, and correct. +- **Unsupervised processes.** The bare `spawn` + `ets:give_away` holder + process is gone, replaced by OTP's built-in application lifecycle. +- **Lazy table creation.** `ensure_ref_mapping_table/0`, + `ensure_recorder_table/0`, and `ensure_ets_tables/0` are removed. Tables + exist from application start, not from first use. -### Regression tests (9 new tests, 194 total) +### Regression tests (7 new tests, 192 total) -**Deterministic ETS-level tests:** +**ETS ownership verification:** | Test | What it proves | |---|---| -| `table_owner_is_dedicated_holder_not_caller` | Table owner is a separate holder process, not the calling process | -| `table_survives_creator_process_exit` | Table persists after the creating process exits | -| `stored_mappings_survive_creator_exit` | Data in the table is preserved after creator exits | -| `concurrent_table_creation_does_not_crash` | 20 concurrent calls to `ensure_ref_mapping_table()` all succeed | +| `table_is_owned_by_application_not_caller` | Table exists, owner is not the test process, owner is alive | +| `stored_mappings_survive_writer_process_exit` | Data persists after the writing process exits | **Integration tests through `start_stream()` API:** @@ -108,10 +107,18 @@ table on `badarg` for crash recovery. ## Files changed -- `modules/http_client/src/dream_http_client/dream_httpc_shim.erl` — Added - `table_holder_loop/0`, rewrote `ensure_ref_mapping_table/0` with holder - process and try/catch, wrapped all 7 bare ETS access functions in try/catch -- `modules/http_client/test/ets_table_ownership_test.gleam` — 9 new regression - tests -- `modules/http_client/test/ets_table_ownership_ffi.erl` — Erlang FFI helper - for ETS table introspection in tests +- `modules/http_client/src/dream_http_client/dream_http_client_app.erl` — New + OTP application behaviour module; creates ETS tables in `start/2` +- `modules/http_client/src/dream_http_client/dream_http_client_sup.erl` — New + minimal supervisor +- `modules/http_client/gleam.toml` — Added `[erlang] application_start_module` +- `modules/http_client/src/dream_http_client/dream_httpc_shim.erl` — Removed + `ensure_ref_mapping_table/0`, `table_holder_loop/0`, and all `try/catch` + guards on ETS access; restored direct ETS calls +- `modules/http_client/src/dream_http_client/client.gleam` — Removed lazy + table creation (`ensure_ets_tables`, `ensure_recorder_table`, + `ensure_ref_mapping_table_wrapper`, `ets_table_exists`, `ets_new`) +- `modules/http_client/test/ets_table_ownership_test.gleam` — 7 regression + tests for the new architecture +- `modules/http_client/test/ets_table_ownership_ffi.erl` — Simplified FFI + helpers for ETS introspection diff --git a/modules/http_client/src/dream_http_client/client.gleam b/modules/http_client/src/dream_http_client/client.gleam index 2cc01bb..8c3cff5 100644 --- a/modules/http_client/src/dream_http_client/client.gleam +++ b/modules/http_client/src/dream_http_client/client.gleam @@ -2145,24 +2145,11 @@ fn pair_with_name(value: String, name: String) -> #(String, String) { /// client.cancel_stream_handle(stream) /// ``` pub fn start_stream(request: ClientRequest) -> Result(StreamHandle, String) { - // Ensure ETS tables exist before spawning - ensure_ets_tables() - - // Spawn process to handle the stream let stream_pid = process.spawn_unlinked(fn() { run_stream_process(request) }) Ok(StreamHandle(pid: stream_pid)) } -// Ensure all required ETS tables exist -fn ensure_ets_tables() -> Nil { - ensure_recorder_table() - ensure_ref_mapping_table_wrapper() -} - -@external(erlang, "dream_httpc_shim", "ensure_ref_mapping_table") -fn ensure_ref_mapping_table_wrapper() -> Nil - fn run_stream_process(request: ClientRequest) -> Nil { // Try playback from recording first case maybe_replay_from_recording(request) { @@ -2445,27 +2432,6 @@ type MessageStreamRecorderState { // ETS table name for recorder state const recorder_table_name = "dream_http_client_stream_recorders" -// Ensure ETS table exists (idempotent) -fn ensure_recorder_table() -> Nil { - case ets_table_exists(recorder_table_name) { - True -> Nil - False -> { - ets_new(recorder_table_name, [ - atom.create("set"), - atom.create("public"), - atom.create("named_table"), - ]) - Nil - } - } -} - -@external(erlang, "dream_httpc_shim", "ets_table_exists") -fn ets_table_exists(name: String) -> Bool - -@external(erlang, "dream_httpc_shim", "ets_new") -fn ets_new(name: String, options: List(atom.Atom)) -> d.Dynamic - @external(erlang, "dream_httpc_shim", "ets_insert") fn ets_insert( table: String, @@ -2488,7 +2454,6 @@ fn store_message_stream_recorder( rec: recorder.Recorder, recorded_req: recording.RecordedRequest, ) -> Nil { - ensure_recorder_table() let RequestId(id) = request_id ets_insert(recorder_table_name, id, rec, recorded_req, [], [], None) } diff --git a/modules/http_client/src/dream_http_client/dream_http_client_app.erl b/modules/http_client/src/dream_http_client/dream_http_client_app.erl new file mode 100644 index 0000000..d5b5b0a --- /dev/null +++ b/modules/http_client/src/dream_http_client/dream_http_client_app.erl @@ -0,0 +1,11 @@ +-module(dream_http_client_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(_Type, _Args) -> + ets:new(dream_http_client_ref_mapping, [set, public, named_table]), + ets:new(dream_http_client_stream_recorders, [set, public, named_table]), + dream_http_client_sup:start_link(). + +stop(_State) -> + ok. diff --git a/modules/http_client/src/dream_http_client/dream_http_client_sup.erl b/modules/http_client/src/dream_http_client/dream_http_client_sup.erl new file mode 100644 index 0000000..8221780 --- /dev/null +++ b/modules/http_client/src/dream_http_client/dream_http_client_sup.erl @@ -0,0 +1,9 @@ +-module(dream_http_client_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, {#{strategy => one_for_one, intensity => 5, period => 10}, []}}. diff --git a/modules/http_client/src/dream_http_client/dream_httpc_shim.erl b/modules/http_client/src/dream_http_client/dream_httpc_shim.erl index 9c8049f..db3bf81 100644 --- a/modules/http_client/src/dream_http_client/dream_httpc_shim.erl +++ b/modules/http_client/src/dream_http_client/dream_httpc_shim.erl @@ -3,8 +3,7 @@ -export([request_stream/6, fetch_next/2, fetch_start_headers/2, request_stream_messages/6, cancel_stream/1, cancel_stream_by_string/1, receive_stream_message/1, decode_stream_message_for_selector/1, normalize_headers/1, request_sync/5, - ets_table_exists/1, ets_new/2, ets_insert/7, ets_lookup/2, ets_delete/2, - ensure_ref_mapping_table/0]). + ets_table_exists/1, ets_new/2, ets_insert/7, ets_lookup/2, ets_delete/2]). %% @doc Start a streaming HTTP request with pull-based chunk retrieval %% @@ -503,8 +502,6 @@ request_stream_messages(Method, Url, Headers, Body, _ReceiverPid, TimeoutMs) -> ok = ensure_started(inets), ok = configure_httpc(), - ensure_ref_mapping_table(), - NUrl = to_list(Url), NHeaders = maybe_add_accept_encoding(to_headers(Headers)), Req = build_req(NUrl, NHeaders, Body), @@ -734,23 +731,15 @@ decode_stream_message_for_selector({http, InnerMessage}) -> %% Get string ID for httpc ref, creating mapping if needed. %% This handles the case where selector receives messages before we stored -%% the mapping, and recovers if the table was destroyed (re-creates it). +%% the mapping. get_or_create_string_id(HttpcRef) -> - try - case lookup_string_by_ref(HttpcRef) of - {some, StringId} -> - StringId; - none -> - NewId = ref_to_string(HttpcRef), - store_ref_mapping(NewId, HttpcRef), - NewId - end - catch - error:badarg -> - ensure_ref_mapping_table(), - RecoveredId = ref_to_string(HttpcRef), - store_ref_mapping(RecoveredId, HttpcRef), - RecoveredId + case lookup_string_by_ref(HttpcRef) of + {some, StringId} -> + StringId; + none -> + NewId = ref_to_string(HttpcRef), + store_ref_mapping(NewId, HttpcRef), + NewId end. %% @doc Normalize HTTP headers to binary tuples for Gleam decoding @@ -1099,36 +1088,6 @@ ets_delete(TableName, Key) -> %% Table for mapping string IDs to httpc refs (for cancellation) -define(REF_MAPPING_TABLE, dream_http_client_ref_mapping). -%% Ensure ref mapping table exists (created on first use). -%% The table is owned by a dedicated long-lived holder process so it -%% survives the exit of short-lived stream processes. A try/catch -%% guards against the race where two processes both see `undefined` -%% and the second `ets:new` fails with `badarg`. -ensure_ref_mapping_table() -> - case ets:info(?REF_MAPPING_TABLE) of - undefined -> - try - ets:new(?REF_MAPPING_TABLE, [set, public, named_table]), - Holder = spawn(fun table_holder_loop/0), - ets:give_away(?REF_MAPPING_TABLE, Holder, undefined), - ok - catch - error:badarg -> ok - end; - _ -> - ok - end. - -%% Persistent process that owns the ETS ref-mapping table. -%% ETS tables are destroyed when their owning process exits; by -%% transferring ownership here the table outlives any individual -%% stream process. -table_holder_loop() -> - receive - {'ETS-TRANSFER', _Tab, _FromPid, _Data} -> - table_holder_loop() - end. - %% Convert httpc ref to unique string ID %% Uses the ref's string representation which is guaranteed unique ref_to_string(Ref) -> @@ -1136,39 +1095,26 @@ ref_to_string(Ref) -> %% Store bidirectional mapping: string <-> ref store_ref_mapping(StringId, HttpcRef) -> - ensure_ref_mapping_table(), - try - ets:insert(?REF_MAPPING_TABLE, {StringId, HttpcRef}), - ets:insert(?REF_MAPPING_TABLE, {HttpcRef, StringId}), - ok - catch - error:badarg -> ok - end. + ets:insert(?REF_MAPPING_TABLE, {StringId, HttpcRef}), + ets:insert(?REF_MAPPING_TABLE, {HttpcRef, StringId}), + ok. %% Lookup httpc ref by string ID (for cancellation) lookup_ref_by_string(StringId) -> - try - case ets:lookup(?REF_MAPPING_TABLE, StringId) of - [{StringId, HttpcRef}] -> - {some, HttpcRef}; - [] -> - none - end - catch - error:badarg -> none + case ets:lookup(?REF_MAPPING_TABLE, StringId) of + [{StringId, HttpcRef}] -> + {some, HttpcRef}; + [] -> + none end. %% Lookup string ID by httpc ref (for message translation) lookup_string_by_ref(HttpcRef) -> - try - case ets:lookup(?REF_MAPPING_TABLE, HttpcRef) of - [{HttpcRef, StringId}] -> - {some, StringId}; - [] -> - none - end - catch - error:badarg -> none + case ets:lookup(?REF_MAPPING_TABLE, HttpcRef) of + [{HttpcRef, StringId}] -> + {some, StringId}; + [] -> + none end. %% Store a zlib context in ETS for message-based streaming decompression @@ -1176,58 +1122,39 @@ maybe_store_stream_zlib(StringId, Headers) -> case detect_stream_encoding(Headers) of {_Enc, WindowBits} -> Z = init_zlib_context(WindowBits), - ensure_ref_mapping_table(), - try - ets:insert(?REF_MAPPING_TABLE, {{zlib, StringId}, Z}), - ok - catch - error:badarg -> - cleanup_zlib(Z), - ok - end; + ets:insert(?REF_MAPPING_TABLE, {{zlib, StringId}, Z}), + ok; none -> ok end. %% Decompress a chunk using ETS-stored zlib context maybe_decompress_stream_chunk(StringId, Data) -> - try - case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of - [{{zlib, StringId}, Z}] -> - decompress_chunk(Z, Data); - [] -> - Data - end - catch - error:badarg -> Data + case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of + [{{zlib, StringId}, Z}] -> + decompress_chunk(Z, Data); + [] -> + Data end. %% Clean up ETS-stored zlib context cleanup_stream_zlib(StringId) -> - try - case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of - [{{zlib, StringId}, Z}] -> - cleanup_zlib(Z), - ets:delete(?REF_MAPPING_TABLE, {zlib, StringId}), - ok; - [] -> - ok - end - catch - error:badarg -> ok + case ets:lookup(?REF_MAPPING_TABLE, {zlib, StringId}) of + [{{zlib, StringId}, Z}] -> + cleanup_zlib(Z), + ets:delete(?REF_MAPPING_TABLE, {zlib, StringId}), + ok; + [] -> + ok end. %% Remove both mappings (cleanup after stream ends) remove_ref_mapping(StringId) -> - try - case lookup_ref_by_string(StringId) of - {some, HttpcRef} -> - ets:delete(?REF_MAPPING_TABLE, StringId), - ets:delete(?REF_MAPPING_TABLE, HttpcRef), - ok; - none -> - ok - end - catch - error:badarg -> ok + case lookup_ref_by_string(StringId) of + {some, HttpcRef} -> + ets:delete(?REF_MAPPING_TABLE, StringId), + ets:delete(?REF_MAPPING_TABLE, HttpcRef), + ok; + none -> + ok end. diff --git a/modules/http_client/test/ets_table_ownership_ffi.erl b/modules/http_client/test/ets_table_ownership_ffi.erl index 3bfd2eb..a55c377 100644 --- a/modules/http_client/test/ets_table_ownership_ffi.erl +++ b/modules/http_client/test/ets_table_ownership_ffi.erl @@ -1,42 +1,26 @@ -module(ets_table_ownership_ffi). --export([ensure_table/0, table_exists/0, delete_table/0, - table_owner_is_not_self/0, table_owner_is_alive/0, - store_test_mapping/0, lookup_test_mapping_exists/0]). +-export([table_exists/0, table_owner_is_not_self/0, table_owner_is_alive/0, + store_test_mapping/0, lookup_test_mapping_exists/0, + clear_test_mappings/0]). -define(TABLE, dream_http_client_ref_mapping). -ensure_table() -> - dream_httpc_shim:ensure_ref_mapping_table(), - nil. - table_exists() -> case ets:info(?TABLE) of undefined -> false; _ -> true end. -delete_table() -> - try ets:delete(?TABLE) catch error:badarg -> ok end, - nil. - table_owner_is_not_self() -> - try - case ets:info(?TABLE, owner) of - undefined -> false; - Pid -> Pid =/= self() - end - catch - error:badarg -> false + case ets:info(?TABLE, owner) of + undefined -> false; + Pid -> Pid =/= self() end. table_owner_is_alive() -> - try - case ets:info(?TABLE, owner) of - undefined -> false; - Pid -> is_process_alive(Pid) - end - catch - error:badarg -> false + case ets:info(?TABLE, owner) of + undefined -> false; + Pid -> is_process_alive(Pid) end. store_test_mapping() -> @@ -45,11 +29,12 @@ store_test_mapping() -> nil. lookup_test_mapping_exists() -> - try - case ets:lookup(?TABLE, ets_test_ref_key) of - [{ets_test_ref_key, <<"ets_test_string_id">>}] -> true; - _ -> false - end - catch - error:badarg -> false + case ets:lookup(?TABLE, ets_test_ref_key) of + [{ets_test_ref_key, <<"ets_test_string_id">>}] -> true; + _ -> false end. + +clear_test_mappings() -> + ets:delete(?TABLE, ets_test_ref_key), + ets:delete(?TABLE, <<"ets_test_string_id">>), + nil. diff --git a/modules/http_client/test/ets_table_ownership_test.gleam b/modules/http_client/test/ets_table_ownership_test.gleam index 320e3f6..ff89a7e 100644 --- a/modules/http_client/test/ets_table_ownership_test.gleam +++ b/modules/http_client/test/ets_table_ownership_test.gleam @@ -1,21 +1,22 @@ //// Regression tests: ETS table ownership must not cause silent stream crashes //// -//// Before the fix, `dream_httpc_shim` created its named ETS table -//// (`dream_http_client_ref_mapping`) from whichever short-lived process first -//// called `ensure_ref_mapping_table()`. When that process exited, the table was -//// destroyed. Concurrent stream processes that depended on it would crash with -//// `badarg` inside `decode_stream_message_for_selector/1`, killing the stream -//// silently — no `on_stream_error` callback fired. +//// The `dream_http_client_ref_mapping` ETS table is created by the +//// `dream_http_client` OTP application in its `start/2` callback. The table +//// is owned by the application master process, which lives for the entire +//// application lifetime. This means: +//// +//// - No short-lived process can destroy the table by exiting. +//// - No race condition on table creation (application start is serialized). +//// - No try/catch needed on ETS access (the table is always there). //// //// These tests verify: -//// 1. The table is owned by a persistent holder process, not the caller -//// 2. The table and its data survive after the creating process exits -//// 3. Concurrent table creation (race condition) does not crash -//// 4. Concurrent streams from short-lived callers all complete +//// 1. The table exists and is owned by a long-lived process, not the caller +//// 2. Concurrent streams from short-lived callers all complete +//// 3. Sequential streams across process boundaries work //// -//// Tests that manipulate the ETS table directly (delete/recreate) use FFI -//// helpers in `ets_table_ownership_ffi.erl` for table introspection that -//// is not exposed through the Gleam API. +//// The integration tests are the real regression tests — they reproduce the +//// exact scenario from the original bug report (concurrent streams from +//// expired callers) and verify it can never happen again. import dream_http_client/client import dream_http_client_test @@ -37,60 +38,24 @@ fn mock_request(path: String) -> client.ClientRequest { // ETS Table Ownership Tests // ============================================================================ -/// After ensure_ref_mapping_table(), the table owner must be a separate -/// holder process — not the calling process. This is the core fix mechanism: -/// a dedicated holder process owns the table so it outlives any individual -/// stream process. -pub fn table_owner_is_dedicated_holder_not_caller_test() { - // Arrange - delete_table() - - // Act - ensure_table() - - // Assert — owner is alive but is NOT us - table_owner_is_not_self() |> should.be_true() - table_owner_is_alive() |> should.be_true() -} - -/// The ETS table must persist after the process that created it exits. -/// Before the fix, the table was destroyed on creator exit because ETS -/// tables are owned by their creating process. -pub fn table_survives_creator_process_exit_test() { - // Arrange — clean state so the spawned process is the creator - delete_table() - - let done = process.new_subject() - - // Act — create the table from a short-lived process that exits immediately - let _pid = - process.spawn_unlinked(fn() { - ensure_table() - process.send(done, True) - }) - - let assert Ok(True) = process.receive(done, 2000) - // Give the spawned process time to exit - process.sleep(200) - - // Assert — table and its owner (holder process) are still alive +/// The ETS table must exist and be owned by a process that is NOT the +/// calling test process. With the OTP application architecture, the table +/// is owned by the application master — a long-lived process managed by +/// the BEAM's application controller. +pub fn table_is_owned_by_application_not_caller_test() { table_exists() |> should.be_true() + table_owner_is_not_self() |> should.be_true() table_owner_is_alive() |> should.be_true() } -/// Data stored in the ETS table must survive after the creating process exits. -/// This verifies that the holder process ownership transfer preserves all -/// existing table entries. -pub fn stored_mappings_survive_creator_exit_test() { - // Arrange - delete_table() - +/// Data stored in the ETS table from a short-lived process must persist +/// after that process exits. The table is owned by the application master, +/// so no individual process exit can destroy it or its data. +pub fn stored_mappings_survive_writer_process_exit_test() { let done = process.new_subject() - // Act — create table and store a mapping from a short-lived process let _pid = process.spawn_unlinked(fn() { - ensure_table() store_test_mapping() process.send(done, True) }) @@ -98,39 +63,8 @@ pub fn stored_mappings_survive_creator_exit_test() { let assert Ok(True) = process.receive(done, 2000) process.sleep(200) - // Assert — mapping is still retrievable after creator exited lookup_test_mapping_exists() |> should.be_true() -} - -// ============================================================================ -// Race Condition Tests -// ============================================================================ - -/// Many concurrent calls to ensure_ref_mapping_table() must not crash. -/// Before the fix, a race existed where two processes both saw `undefined` -/// from `ets:info/1` and the second `ets:new/2` call would throw `badarg` -/// because the named table already existed. -pub fn concurrent_table_creation_does_not_crash_test() { - // Arrange - delete_table() - - let done = process.new_subject() - let count = 20 - - // Act — spawn 20 processes that all race to create the table - list.each(list.range(1, count), fn(i) { - let _pid = - process.spawn_unlinked(fn() { - ensure_table() - process.send(done, i) - }) - Nil - }) - - // Assert — all 20 processes succeed without crashing - let results = collect_n(done, count, 5000) - list.length(results) |> should.equal(count) - table_exists() |> should.be_true() + clear_test_mappings() } // ============================================================================ @@ -138,21 +72,12 @@ pub fn concurrent_table_creation_does_not_crash_test() { // ============================================================================ /// A stream started from a short-lived caller must complete successfully -/// even after the caller process exits. The caller calls `start_stream()` -/// which calls `ensure_ets_tables()` — in the old code, the caller owned +/// even after the caller process exits. In the old code, the caller owned /// the ETS table and its exit destroyed it. The stream process would then -/// crash with `badarg` when it tried to look up ref mappings. -/// -/// Note: a single stream may self-heal by re-creating the table from within -/// `request_stream_messages()`. This test provides baseline assurance; the -/// concurrent test below is the definitive bug reproduction. +/// crash with `badarg` when looking up ref mappings. pub fn stream_from_expired_caller_completes_test() { - // Arrange — clean state so the caller process creates the table - delete_table() - let end_subject = process.new_subject() - // Act — start stream from a process that immediately exits let _pid = process.spawn_unlinked(fn() { let request = @@ -164,10 +89,8 @@ pub fn stream_from_expired_caller_completes_test() { process.send(end_subject, "error:" <> reason) }) let assert Ok(_handle) = client.start_stream(request) - // Caller exits here — would destroy table in old code }) - // Assert — stream completes (not silently dead, not error) case process.receive(end_subject, 5000) { Ok("completed") -> Nil Ok(_msg) -> should.fail() @@ -182,15 +105,11 @@ pub fn stream_from_expired_caller_completes_test() { /// crashing the second stream silently — no `on_stream_error` fires. /// /// The fast stream completes in ~1s; the slow stream takes ~10s. After the -/// fast stream's process exits (and in the old code, destroys the ETS +/// fast stream's caller exits (and in the old code, destroys the ETS /// table), the slow stream must still fire `on_stream_end`. pub fn concurrent_streams_from_expired_callers_both_complete_test() { - // Arrange — force table recreation through short-lived callers - delete_table() - let end_subject = process.new_subject() - // Act — start fast stream from short-lived caller let _pid1 = process.spawn_unlinked(fn() { let request = @@ -200,11 +119,8 @@ pub fn concurrent_streams_from_expired_callers_both_complete_test() { let assert Ok(_handle) = client.start_stream(request) }) - // Small delay so the fast stream's caller creates the table first, - // ensuring the slow stream's process is NOT the table creator. process.sleep(50) - // Act — start slow stream from short-lived caller let _pid2 = process.spawn_unlinked(fn() { let request = @@ -214,7 +130,6 @@ pub fn concurrent_streams_from_expired_callers_both_complete_test() { let assert Ok(_handle) = client.start_stream(request) }) - // Assert — both streams complete (positive IDs mean on_stream_end fired) let results = collect_n(end_subject, 2, 15_000) list.length(results) |> should.equal(2) list.each(results, fn(id) { { id > 0 } |> should.be_true() }) @@ -224,10 +139,8 @@ pub fn concurrent_streams_from_expired_callers_both_complete_test() { /// Verifies that concurrent streams don't interfere with each other even /// when they share the same ETS table and complete at similar times. pub fn three_concurrent_streams_all_complete_test() { - // Arrange let end_subject = process.new_subject() - // Act — start 3 concurrent streams list.each([1, 2, 3], fn(i) { let request = mock_request("/stream/fast") @@ -237,7 +150,6 @@ pub fn three_concurrent_streams_all_complete_test() { Nil }) - // Assert — all 3 complete with on_stream_end (positive), not on_stream_error let results = collect_n(end_subject, 3, 5000) list.length(results) |> should.equal(3) list.each(results, fn(id) { { id > 0 } |> should.be_true() }) @@ -247,12 +159,10 @@ pub fn three_concurrent_streams_all_complete_test() { /// must still work. Verifies the table is not destroyed between sequential /// stream invocations. pub fn sequential_streams_after_process_exit_test() { - // Arrange — stream 1: start, complete, and let the process exit let assert Ok(handle1) = client.start_stream(mock_request("/stream/fast")) client.await_stream(handle1) client.is_stream_active(handle1) |> should.be_false() - // Act — stream 2 must work let end_subject = process.new_subject() let request2 = mock_request("/stream/fast") @@ -260,7 +170,6 @@ pub fn sequential_streams_after_process_exit_test() { |> client.on_stream_error(fn(_reason) { process.send(end_subject, False) }) let assert Ok(_handle2) = client.start_stream(request2) - // Assert — stream 2 completes case process.receive(end_subject, 5000) { Ok(True) -> Nil Ok(False) -> should.fail() @@ -269,16 +178,12 @@ pub fn sequential_streams_after_process_exit_test() { } /// Five concurrent streams from separate short-lived caller processes must -/// all complete. Stress test for the holder-process ownership model under -/// higher concurrency with multiple table-creation races. +/// all complete. Stress test for the OTP application ownership model under +/// higher concurrency. pub fn five_concurrent_streams_from_expired_callers_test() { - // Arrange - delete_table() - let end_subject = process.new_subject() let count = 5 - // Act — spawn 5 callers that each start a stream and immediately exit list.each(list.range(1, count), fn(i) { let _pid = process.spawn_unlinked(fn() { @@ -293,7 +198,6 @@ pub fn five_concurrent_streams_from_expired_callers_test() { Nil }) - // Assert — all 5 complete let results = collect_n(end_subject, count, 8000) list.length(results) |> should.equal(count) list.each(results, fn(id) { { id > 0 } |> should.be_true() }) @@ -328,15 +232,9 @@ fn do_collect_n( // FFI Bindings — ETS table introspection // ============================================================================ -@external(erlang, "ets_table_ownership_ffi", "ensure_table") -fn ensure_table() -> Nil - @external(erlang, "ets_table_ownership_ffi", "table_exists") fn table_exists() -> Bool -@external(erlang, "ets_table_ownership_ffi", "delete_table") -fn delete_table() -> Nil - @external(erlang, "ets_table_ownership_ffi", "table_owner_is_not_self") fn table_owner_is_not_self() -> Bool @@ -348,3 +246,6 @@ fn store_test_mapping() -> Nil @external(erlang, "ets_table_ownership_ffi", "lookup_test_mapping_exists") fn lookup_test_mapping_exists() -> Bool + +@external(erlang, "ets_table_ownership_ffi", "clear_test_mappings") +fn clear_test_mappings() -> Nil