From 4b3bb27cac1654a5f818f0a2fcb021dd21daa4be Mon Sep 17 00:00:00 2001 From: Nathan Long Date: Tue, 28 Oct 2025 09:26:25 -0400 Subject: [PATCH] Add label (repo name) to ownership and connection errors A label like MyApp.Repo can be passed via the :label option. It will be used in the process label for the DBConnection.Ownership.Manager process and will appear in error messages from both DBConnection.Ownership.Manager and DBConnection.Holder. This is helpful when applications use multiple repos (e.g., read replicas). Before: (DBConnection.OwnershipError) cannot find ownership process for #PID<...> (DBConnection.ConnectionError) connection is closed because of an error... After: (DBConnection.OwnershipError) cannot find ownership process for #PID<...> (MyApp.Repo) using mode :manual. (DBConnection.ConnectionError) MyApp.Repo connection is closed because of an error, disconnect or timeout Tests verify labels appear correctly in both error types, and that errors work correctly if no label is given. In https://github.com/elixir-ecto/ecto_sql/pull/698/ the adapter is updated to pass the repo name as a label. --- integration_test/ownership/manager_test.exs | 68 +++++++++++++++++++++ lib/db_connection/holder.ex | 47 ++++++++++---- lib/db_connection/ownership/manager.ex | 16 +++-- lib/db_connection/ownership/proxy.ex | 11 +++- 4 files changed, 125 insertions(+), 17 deletions(-) diff --git a/integration_test/ownership/manager_test.exs b/integration_test/ownership/manager_test.exs index a34c73d..e2dce28 100644 --- a/integration_test/ownership/manager_test.exs +++ b/integration_test/ownership/manager_test.exs @@ -681,6 +681,59 @@ defmodule ManagerTest do assert log =~ "** (RuntimeError) oops" end + test "includes label in ownership error when label is provided" do + {:ok, pool, opts} = start_pool(label: MyApp.Repo) + + # Try to use the pool without checking out - should raise OwnershipError with label + error = + assert_raise DBConnection.OwnershipError, fn -> + P.run(pool, fn _ -> :ok end, opts) + end + + # Verify the error message includes the label + assert error.message =~ "cannot find ownership process" + assert error.message =~ "(MyApp.Repo)" + assert error.message =~ "using mode :manual" + end + + test "doesn't require a label to produce an ownership error" do + {:ok, pool, opts} = start_pool() + + # Try to use the pool without checking out - should raise OwnershipError with label + error = + assert_raise DBConnection.OwnershipError, fn -> + P.run(pool, fn _ -> :ok end, opts) + end + + assert error.message =~ "cannot find ownership process" + end + + test "includes label in connection closed error when label is provided" do + alias TestQuery, as: Q + {pool, _agent} = start_pool_with_disconnect_on_execute(label: MyApp.Repo) + Ownership.ownership_checkout(pool, []) + + P.transaction(pool, fn conn -> + P.execute(conn, %Q{}, [:param]) + error = assert_raise DBConnection.ConnectionError, fn -> P.execute!(conn, %Q{}, [:param]) end + assert String.starts_with?(error.message, "MyApp.Repo connection is closed") + :closed + end) + end + + test "doesn't require a label to produce a connection closed error" do + alias TestQuery, as: Q + {pool, _agent} = start_pool_with_disconnect_on_execute() + Ownership.ownership_checkout(pool, []) + + P.transaction(pool, fn conn -> + P.execute(conn, %Q{}, [:param]) + error = assert_raise DBConnection.ConnectionError, fn -> P.execute!(conn, %Q{}, [:param]) end + assert String.starts_with?(error.message, "connection is closed") + :closed + end) + end + defp start_pool(opts \\ []) do stack = [{:ok, :state}] ++ List.duplicate({:idle, :state}, 10) {:ok, agent} = A.start_link(stack) @@ -690,6 +743,21 @@ defmodule ManagerTest do {:ok, pid, opts} end + # Helper to set up a pool that will disconnect during a transaction. + # This is needed to trigger Holder's "connection is closed" error. + defp start_pool_with_disconnect_on_execute(opts \\ []) do + err = %DBConnection.ConnectionError{message: "test error"} + # The stack represents TestAgent responses: initial connect, begin transaction, + # disconnect on first execute, then allow reconnect. + stack = [{:ok, :state}, {:ok, :began, :new_state}, {:disconnect, err, :newer_state}, :ok, {:ok, :state}] + + {:ok, agent} = A.start_link(stack) + pool_opts = [agent: agent, parent: self(), ownership_mode: :manual] ++ opts + {:ok, pool} = P.start_link(pool_opts) + + {pool, agent} + end + defp async_no_callers(fun) do Task.async(fn -> Process.delete(:"$callers") diff --git a/lib/db_connection/holder.ex b/lib/db_connection/holder.ex index e8f399c..ee00846 100644 --- a/lib/db_connection/holder.ex +++ b/lib/db_connection/holder.ex @@ -9,7 +9,7 @@ defmodule DBConnection.Holder do @time_unit 1000 Record.defrecord(:conn, [:connection, :module, :state, :lock, :ts, deadline: nil, status: :ok]) - Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock]) + Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock, :label]) @type t :: :ets.tid() @type checkin_time :: non_neg_integer() | nil @@ -131,24 +131,38 @@ defmodule DBConnection.Holder do end defp handle_or_cleanup(type, pool_ref, fun, args, opts) do - pool_ref(holder: holder, lock: lock) = pool_ref + pool_ref(holder: holder, lock: lock, label: label) = pool_ref try do :ets.lookup(holder, :conn) rescue ArgumentError -> - msg = "connection is closed because of an error, disconnect or timeout" + msg = + maybe_prefix_label( + "connection is closed because of an error, disconnect or timeout", + label + ) + {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused} else [conn(lock: conn_lock)] when conn_lock != lock -> - raise "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}" + raise maybe_prefix_label( + "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}", + label, + ":" + ) [conn(status: :error)] -> - msg = "connection is closed because of an error, disconnect or timeout" + msg = + maybe_prefix_label( + "connection is closed because of an error, disconnect or timeout", + label + ) + {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused} [conn(status: :aborted)] when type != :cleanup -> - msg = "transaction rolling back" + msg = maybe_prefix_label("transaction rolling back", label) {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused} [conn(module: module, state: state)] -> @@ -156,6 +170,10 @@ defmodule DBConnection.Holder do end end + defp maybe_prefix_label(msg, label, separator \\ "") do + if label, do: "#{inspect(label)} " <> separator <> msg, else: msg + end + ## Pool state helpers API (invoked by callers) @spec put_state(pool_ref :: any, term) :: :ok @@ -196,9 +214,9 @@ defmodule DBConnection.Holder do :ok end - @spec handle_checkout(t, {pid, reference}, reference, checkin_time) :: boolean - def handle_checkout(holder, {pid, mref}, ref, checkin_time) do - :ets.give_away(holder, pid, {mref, ref, checkin_time}) + @spec handle_checkout(t, {pid, reference}, reference, checkin_time, label :: any) :: boolean + def handle_checkout(holder, {pid, mref}, ref, checkin_time, label \\ nil) do + :ets.give_away(holder, pid, {mref, ref, checkin_time, label}) rescue ArgumentError -> if Process.alive?(pid) or :ets.info(holder, :owner) != self() do @@ -290,13 +308,20 @@ defmodule DBConnection.Holder do send(pid, {:db_connection, {self(), lock}, {:checkout, callers, start, queue?}}) receive do - {:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time}} -> + {:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time, label}} -> Process.demonitor(lock, [:flush]) {deadline, ops} = start_deadline(timeout, pool, ref, holder, start) :ets.update_element(holder, :conn, [{conn(:lock) + 1, lock} | ops]) pool_ref = - pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder, lock: lock) + pool_ref( + pool: pool, + reference: ref, + deadline: deadline, + holder: holder, + lock: lock, + label: label + ) checkout_result(holder, pool_ref, checkin_time) diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index 482bc7b..8728837 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -98,6 +98,8 @@ defmodule DBConnection.Ownership.Manager do mode = Keyword.get(pool_opts, :ownership_mode, :auto) checkout_opts = Keyword.take(pool_opts, [:ownership_timeout, :queue_target, :queue_interval]) + Util.set_label({__MODULE__, pool_opts[:label]}) + {:ok, %{ pool: pool, @@ -107,7 +109,8 @@ defmodule DBConnection.Ownership.Manager do mode: mode, mode_ref: nil, ets: ets, - log: log + log: log, + label: pool_opts[:label] }} end @@ -223,7 +226,7 @@ defmodule DBConnection.Ownership.Manager do {:noreply, state} :not_found when mode == :manual -> - not_found(from, mode) + not_found(from, mode, state.label) {:noreply, state} :not_found -> @@ -253,6 +256,9 @@ defmodule DBConnection.Ownership.Manager do defp proxy_checkout(state, caller, opts) do %{pool: pool, checkouts: checkouts, owners: owners, ets: ets, log: log, mode: mode} = state + label = state.label + opts = if label, do: Keyword.put(opts, :label, label), else: opts + {:ok, proxy} = DynamicSupervisor.start_child( DBConnection.Ownership.Supervisor, @@ -405,10 +411,12 @@ defmodule DBConnection.Ownership.Manager do caller end - defp not_found({pid, _} = from, mode) do + defp not_found({pid, _} = from, mode, label) do msg = """ cannot find ownership process for #{Util.inspect_pid(pid)} - using mode #{inspect(mode)}. + #{if label, do: "(#{inspect(label)})"} using mode #{inspect(mode)}. + (Note that a connection's mode reverts to :manual if its owner + terminates.) When using ownership, you must manage connections in one of the four ways: diff --git a/lib/db_connection/ownership/proxy.ex b/lib/db_connection/ownership/proxy.ex index 41ac7f3..404a1f9 100644 --- a/lib/db_connection/ownership/proxy.ex +++ b/lib/db_connection/ownership/proxy.ex @@ -34,6 +34,11 @@ defmodule DBConnection.Ownership.Proxy do pre_checkin = Keyword.get(pool_opts, :pre_checkin, fn _, mod, state -> {:ok, mod, state} end) post_checkout = Keyword.get(pool_opts, :post_checkout, &{:ok, &1, &2}) + label = pool_opts[:label] + + if label do + Process.set_label({__MODULE__, label}) + end state = %{ client: nil, @@ -193,8 +198,10 @@ defmodule DBConnection.Ownership.Proxy do {:reply, connection_metrics, state} end - defp checkout({pid, ref} = from, %{holder: holder} = state) do - if Holder.handle_checkout(holder, from, ref, nil) do + defp checkout({pid, ref} = from, %{holder: holder, pool_opts: pool_opts} = state) do + label = pool_opts[:label] + + if Holder.handle_checkout(holder, from, ref, nil, label) do {:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}} else next(state)