diff --git a/integration_test/ownership/manager_test.exs b/integration_test/ownership/manager_test.exs index a34c73d..e2dce28 100644 --- a/integration_test/ownership/manager_test.exs +++ b/integration_test/ownership/manager_test.exs @@ -681,6 +681,59 @@ defmodule ManagerTest do assert log =~ "** (RuntimeError) oops" end + test "includes label in ownership error when label is provided" do + {:ok, pool, opts} = start_pool(label: MyApp.Repo) + + # Try to use the pool without checking out - should raise OwnershipError with label + error = + assert_raise DBConnection.OwnershipError, fn -> + P.run(pool, fn _ -> :ok end, opts) + end + + # Verify the error message includes the label + assert error.message =~ "cannot find ownership process" + assert error.message =~ "(MyApp.Repo)" + assert error.message =~ "using mode :manual" + end + + test "doesn't require a label to produce an ownership error" do + {:ok, pool, opts} = start_pool() + + # Try to use the pool without checking out - should raise OwnershipError with label + error = + assert_raise DBConnection.OwnershipError, fn -> + P.run(pool, fn _ -> :ok end, opts) + end + + assert error.message =~ "cannot find ownership process" + end + + test "includes label in connection closed error when label is provided" do + alias TestQuery, as: Q + {pool, _agent} = start_pool_with_disconnect_on_execute(label: MyApp.Repo) + Ownership.ownership_checkout(pool, []) + + P.transaction(pool, fn conn -> + P.execute(conn, %Q{}, [:param]) + error = assert_raise DBConnection.ConnectionError, fn -> P.execute!(conn, %Q{}, [:param]) end + assert String.starts_with?(error.message, "MyApp.Repo connection is closed") + :closed + end) + end + + test "doesn't require a label to produce a connection closed error" do + alias TestQuery, as: Q + {pool, _agent} = start_pool_with_disconnect_on_execute() + Ownership.ownership_checkout(pool, []) + + P.transaction(pool, fn conn -> + P.execute(conn, %Q{}, [:param]) + error = assert_raise DBConnection.ConnectionError, fn -> P.execute!(conn, %Q{}, [:param]) end + assert String.starts_with?(error.message, "connection is closed") + :closed + end) + end + defp start_pool(opts \\ []) do stack = [{:ok, :state}] ++ List.duplicate({:idle, :state}, 10) {:ok, agent} = A.start_link(stack) @@ -690,6 +743,21 @@ defmodule ManagerTest do {:ok, pid, opts} end + # Helper to set up a pool that will disconnect during a transaction. + # This is needed to trigger Holder's "connection is closed" error. + defp start_pool_with_disconnect_on_execute(opts \\ []) do + err = %DBConnection.ConnectionError{message: "test error"} + # The stack represents TestAgent responses: initial connect, begin transaction, + # disconnect on first execute, then allow reconnect. + stack = [{:ok, :state}, {:ok, :began, :new_state}, {:disconnect, err, :newer_state}, :ok, {:ok, :state}] + + {:ok, agent} = A.start_link(stack) + pool_opts = [agent: agent, parent: self(), ownership_mode: :manual] ++ opts + {:ok, pool} = P.start_link(pool_opts) + + {pool, agent} + end + defp async_no_callers(fun) do Task.async(fn -> Process.delete(:"$callers") diff --git a/lib/db_connection/holder.ex b/lib/db_connection/holder.ex index e8f399c..ee00846 100644 --- a/lib/db_connection/holder.ex +++ b/lib/db_connection/holder.ex @@ -9,7 +9,7 @@ defmodule DBConnection.Holder do @time_unit 1000 Record.defrecord(:conn, [:connection, :module, :state, :lock, :ts, deadline: nil, status: :ok]) - Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock]) + Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock, :label]) @type t :: :ets.tid() @type checkin_time :: non_neg_integer() | nil @@ -131,24 +131,38 @@ defmodule DBConnection.Holder do end defp handle_or_cleanup(type, pool_ref, fun, args, opts) do - pool_ref(holder: holder, lock: lock) = pool_ref + pool_ref(holder: holder, lock: lock, label: label) = pool_ref try do :ets.lookup(holder, :conn) rescue ArgumentError -> - msg = "connection is closed because of an error, disconnect or timeout" + msg = + maybe_prefix_label( + "connection is closed because of an error, disconnect or timeout", + label + ) + {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused} else [conn(lock: conn_lock)] when conn_lock != lock -> - raise "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}" + raise maybe_prefix_label( + "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}", + label, + ":" + ) [conn(status: :error)] -> - msg = "connection is closed because of an error, disconnect or timeout" + msg = + maybe_prefix_label( + "connection is closed because of an error, disconnect or timeout", + label + ) + {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused} [conn(status: :aborted)] when type != :cleanup -> - msg = "transaction rolling back" + msg = maybe_prefix_label("transaction rolling back", label) {:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused} [conn(module: module, state: state)] -> @@ -156,6 +170,10 @@ defmodule DBConnection.Holder do end end + defp maybe_prefix_label(msg, label, separator \\ "") do + if label, do: "#{inspect(label)} " <> separator <> msg, else: msg + end + ## Pool state helpers API (invoked by callers) @spec put_state(pool_ref :: any, term) :: :ok @@ -196,9 +214,9 @@ defmodule DBConnection.Holder do :ok end - @spec handle_checkout(t, {pid, reference}, reference, checkin_time) :: boolean - def handle_checkout(holder, {pid, mref}, ref, checkin_time) do - :ets.give_away(holder, pid, {mref, ref, checkin_time}) + @spec handle_checkout(t, {pid, reference}, reference, checkin_time, label :: any) :: boolean + def handle_checkout(holder, {pid, mref}, ref, checkin_time, label \\ nil) do + :ets.give_away(holder, pid, {mref, ref, checkin_time, label}) rescue ArgumentError -> if Process.alive?(pid) or :ets.info(holder, :owner) != self() do @@ -290,13 +308,20 @@ defmodule DBConnection.Holder do send(pid, {:db_connection, {self(), lock}, {:checkout, callers, start, queue?}}) receive do - {:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time}} -> + {:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time, label}} -> Process.demonitor(lock, [:flush]) {deadline, ops} = start_deadline(timeout, pool, ref, holder, start) :ets.update_element(holder, :conn, [{conn(:lock) + 1, lock} | ops]) pool_ref = - pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder, lock: lock) + pool_ref( + pool: pool, + reference: ref, + deadline: deadline, + holder: holder, + lock: lock, + label: label + ) checkout_result(holder, pool_ref, checkin_time) diff --git a/lib/db_connection/ownership/manager.ex b/lib/db_connection/ownership/manager.ex index 482bc7b..8728837 100644 --- a/lib/db_connection/ownership/manager.ex +++ b/lib/db_connection/ownership/manager.ex @@ -98,6 +98,8 @@ defmodule DBConnection.Ownership.Manager do mode = Keyword.get(pool_opts, :ownership_mode, :auto) checkout_opts = Keyword.take(pool_opts, [:ownership_timeout, :queue_target, :queue_interval]) + Util.set_label({__MODULE__, pool_opts[:label]}) + {:ok, %{ pool: pool, @@ -107,7 +109,8 @@ defmodule DBConnection.Ownership.Manager do mode: mode, mode_ref: nil, ets: ets, - log: log + log: log, + label: pool_opts[:label] }} end @@ -223,7 +226,7 @@ defmodule DBConnection.Ownership.Manager do {:noreply, state} :not_found when mode == :manual -> - not_found(from, mode) + not_found(from, mode, state.label) {:noreply, state} :not_found -> @@ -253,6 +256,9 @@ defmodule DBConnection.Ownership.Manager do defp proxy_checkout(state, caller, opts) do %{pool: pool, checkouts: checkouts, owners: owners, ets: ets, log: log, mode: mode} = state + label = state.label + opts = if label, do: Keyword.put(opts, :label, label), else: opts + {:ok, proxy} = DynamicSupervisor.start_child( DBConnection.Ownership.Supervisor, @@ -405,10 +411,12 @@ defmodule DBConnection.Ownership.Manager do caller end - defp not_found({pid, _} = from, mode) do + defp not_found({pid, _} = from, mode, label) do msg = """ cannot find ownership process for #{Util.inspect_pid(pid)} - using mode #{inspect(mode)}. + #{if label, do: "(#{inspect(label)})"} using mode #{inspect(mode)}. + (Note that a connection's mode reverts to :manual if its owner + terminates.) When using ownership, you must manage connections in one of the four ways: diff --git a/lib/db_connection/ownership/proxy.ex b/lib/db_connection/ownership/proxy.ex index 41ac7f3..404a1f9 100644 --- a/lib/db_connection/ownership/proxy.ex +++ b/lib/db_connection/ownership/proxy.ex @@ -34,6 +34,11 @@ defmodule DBConnection.Ownership.Proxy do pre_checkin = Keyword.get(pool_opts, :pre_checkin, fn _, mod, state -> {:ok, mod, state} end) post_checkout = Keyword.get(pool_opts, :post_checkout, &{:ok, &1, &2}) + label = pool_opts[:label] + + if label do + Process.set_label({__MODULE__, label}) + end state = %{ client: nil, @@ -193,8 +198,10 @@ defmodule DBConnection.Ownership.Proxy do {:reply, connection_metrics, state} end - defp checkout({pid, ref} = from, %{holder: holder} = state) do - if Holder.handle_checkout(holder, from, ref, nil) do + defp checkout({pid, ref} = from, %{holder: holder, pool_opts: pool_opts} = state) do + label = pool_opts[:label] + + if Holder.handle_checkout(holder, from, ref, nil, label) do {:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}} else next(state)