Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions integration_test/ownership/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,59 @@
assert log =~ "** (RuntimeError) oops"
end

test "includes label in ownership error when label is provided" do
{:ok, pool, opts} = start_pool(label: MyApp.Repo)

# Try to use the pool without checking out - should raise OwnershipError with label
error =
assert_raise DBConnection.OwnershipError, fn ->
P.run(pool, fn _ -> :ok end, opts)
end

# Verify the error message includes the label
assert error.message =~ "cannot find ownership process"
assert error.message =~ "(MyApp.Repo)"
assert error.message =~ "using mode :manual"
end

test "doesn't require a label to produce an ownership error" do
{:ok, pool, opts} = start_pool()

# Try to use the pool without checking out - should raise OwnershipError with label
error =
assert_raise DBConnection.OwnershipError, fn ->
P.run(pool, fn _ -> :ok end, opts)
end

assert error.message =~ "cannot find ownership process"
end

test "includes label in connection closed error when label is provided" do

Check failure on line 711 in integration_test/ownership/manager_test.exs

View workflow job for this annotation

GitHub Actions / test (1.14, 24.2)

test includes label in connection closed error when label is provided (ManagerTest)
alias TestQuery, as: Q
{pool, _agent} = start_pool_with_disconnect_on_execute(label: MyApp.Repo)
Ownership.ownership_checkout(pool, [])

P.transaction(pool, fn conn ->
P.execute(conn, %Q{}, [:param])
error = assert_raise DBConnection.ConnectionError, fn -> P.execute!(conn, %Q{}, [:param]) end
assert String.starts_with?(error.message, "MyApp.Repo connection is closed")
:closed
end)
end

test "doesn't require a label to produce a connection closed error" do
alias TestQuery, as: Q
{pool, _agent} = start_pool_with_disconnect_on_execute()
Ownership.ownership_checkout(pool, [])

P.transaction(pool, fn conn ->
P.execute(conn, %Q{}, [:param])
error = assert_raise DBConnection.ConnectionError, fn -> P.execute!(conn, %Q{}, [:param]) end
assert String.starts_with?(error.message, "connection is closed")
:closed
end)
end

defp start_pool(opts \\ []) do
stack = [{:ok, :state}] ++ List.duplicate({:idle, :state}, 10)
{:ok, agent} = A.start_link(stack)
Expand All @@ -690,6 +743,21 @@
{:ok, pid, opts}
end

# Helper to set up a pool that will disconnect during a transaction.
# This is needed to trigger Holder's "connection is closed" error.
defp start_pool_with_disconnect_on_execute(opts \\ []) do
err = %DBConnection.ConnectionError{message: "test error"}
# The stack represents TestAgent responses: initial connect, begin transaction,
# disconnect on first execute, then allow reconnect.
stack = [{:ok, :state}, {:ok, :began, :new_state}, {:disconnect, err, :newer_state}, :ok, {:ok, :state}]

{:ok, agent} = A.start_link(stack)
pool_opts = [agent: agent, parent: self(), ownership_mode: :manual] ++ opts
{:ok, pool} = P.start_link(pool_opts)

{pool, agent}
end

defp async_no_callers(fun) do
Task.async(fn ->
Process.delete(:"$callers")
Expand Down
47 changes: 36 additions & 11 deletions lib/db_connection/holder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule DBConnection.Holder do
@time_unit 1000

Record.defrecord(:conn, [:connection, :module, :state, :lock, :ts, deadline: nil, status: :ok])
Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock])
Record.defrecord(:pool_ref, [:pool, :reference, :deadline, :holder, :lock, :label])

@type t :: :ets.tid()
@type checkin_time :: non_neg_integer() | nil
Expand Down Expand Up @@ -131,31 +131,49 @@ defmodule DBConnection.Holder do
end

defp handle_or_cleanup(type, pool_ref, fun, args, opts) do
pool_ref(holder: holder, lock: lock) = pool_ref
pool_ref(holder: holder, lock: lock, label: label) = pool_ref

try do
:ets.lookup(holder, :conn)
rescue
ArgumentError ->
msg = "connection is closed because of an error, disconnect or timeout"
msg =
maybe_prefix_label(
"connection is closed because of an error, disconnect or timeout",
label
)

{:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}
else
[conn(lock: conn_lock)] when conn_lock != lock ->
raise "an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}"
raise maybe_prefix_label(
"an outdated connection has been given to DBConnection on #{fun}/#{length(args) + 2}",
label,
":"
)

[conn(status: :error)] ->
msg = "connection is closed because of an error, disconnect or timeout"
msg =
maybe_prefix_label(
"connection is closed because of an error, disconnect or timeout",
label
)

{:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}

[conn(status: :aborted)] when type != :cleanup ->
msg = "transaction rolling back"
msg = maybe_prefix_label("transaction rolling back", label)
{:disconnect, DBConnection.ConnectionError.exception(msg), _state = :unused}

[conn(module: module, state: state)] ->
holder_apply(holder, module, fun, args ++ [opts, state])
end
end

defp maybe_prefix_label(msg, label, separator \\ "") do
if label, do: "#{inspect(label)} " <> separator <> msg, else: msg
end

## Pool state helpers API (invoked by callers)

@spec put_state(pool_ref :: any, term) :: :ok
Expand Down Expand Up @@ -196,9 +214,9 @@ defmodule DBConnection.Holder do
:ok
end

@spec handle_checkout(t, {pid, reference}, reference, checkin_time) :: boolean
def handle_checkout(holder, {pid, mref}, ref, checkin_time) do
:ets.give_away(holder, pid, {mref, ref, checkin_time})
@spec handle_checkout(t, {pid, reference}, reference, checkin_time, label :: any) :: boolean
def handle_checkout(holder, {pid, mref}, ref, checkin_time, label \\ nil) do
:ets.give_away(holder, pid, {mref, ref, checkin_time, label})
rescue
ArgumentError ->
if Process.alive?(pid) or :ets.info(holder, :owner) != self() do
Expand Down Expand Up @@ -290,13 +308,20 @@ defmodule DBConnection.Holder do
send(pid, {:db_connection, {self(), lock}, {:checkout, callers, start, queue?}})

receive do
{:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time}} ->
{:"ETS-TRANSFER", holder, pool, {^lock, ref, checkin_time, label}} ->
Process.demonitor(lock, [:flush])
{deadline, ops} = start_deadline(timeout, pool, ref, holder, start)
:ets.update_element(holder, :conn, [{conn(:lock) + 1, lock} | ops])

pool_ref =
pool_ref(pool: pool, reference: ref, deadline: deadline, holder: holder, lock: lock)
pool_ref(
pool: pool,
reference: ref,
deadline: deadline,
holder: holder,
lock: lock,
label: label
)

checkout_result(holder, pool_ref, checkin_time)

Expand Down
16 changes: 12 additions & 4 deletions lib/db_connection/ownership/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ defmodule DBConnection.Ownership.Manager do
mode = Keyword.get(pool_opts, :ownership_mode, :auto)
checkout_opts = Keyword.take(pool_opts, [:ownership_timeout, :queue_target, :queue_interval])

Util.set_label({__MODULE__, pool_opts[:label]})

{:ok,
%{
pool: pool,
Expand All @@ -107,7 +109,8 @@ defmodule DBConnection.Ownership.Manager do
mode: mode,
mode_ref: nil,
ets: ets,
log: log
log: log,
label: pool_opts[:label]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are passing the label, wouldn't make more sense to use it for Process.set_label? And then we don't. need to pass it around? It just becomes part of our Util.inspect_pid ?

Copy link
Contributor Author

@nathanl nathanl Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the repo name would be a good label for the ownership manager - it isn't the repo per se, and probably ecto_sql and other callers shouldn't tell it how to label itself. I'm just trying to have this info available for the not_found error for this connection.

Copy link
Member

@josevalim josevalim Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could compose it {__MODULE__, label}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@josevalim I updated to set the process label. However, to get the repo name in the errors in DBConnection.Holder we have to pass them along, and I thought fetching the label back out of the process label was awkward. So I'm also putting it in to process state. Thoughts?

}}
end

Expand Down Expand Up @@ -223,7 +226,7 @@ defmodule DBConnection.Ownership.Manager do
{:noreply, state}

:not_found when mode == :manual ->
not_found(from, mode)
not_found(from, mode, state.label)
{:noreply, state}

:not_found ->
Expand Down Expand Up @@ -253,6 +256,9 @@ defmodule DBConnection.Ownership.Manager do
defp proxy_checkout(state, caller, opts) do
%{pool: pool, checkouts: checkouts, owners: owners, ets: ets, log: log, mode: mode} = state

label = state.label
opts = if label, do: Keyword.put(opts, :label, label), else: opts

{:ok, proxy} =
DynamicSupervisor.start_child(
DBConnection.Ownership.Supervisor,
Expand Down Expand Up @@ -405,10 +411,12 @@ defmodule DBConnection.Ownership.Manager do
caller
end

defp not_found({pid, _} = from, mode) do
defp not_found({pid, _} = from, mode, label) do
msg = """
cannot find ownership process for #{Util.inspect_pid(pid)}
using mode #{inspect(mode)}.
#{if label, do: "(#{inspect(label)})"} using mode #{inspect(mode)}.
(Note that a connection's mode reverts to :manual if its owner
terminates.)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on line 381


When using ownership, you must manage connections in one
of the four ways:
Expand Down
11 changes: 9 additions & 2 deletions lib/db_connection/ownership/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

pre_checkin = Keyword.get(pool_opts, :pre_checkin, fn _, mod, state -> {:ok, mod, state} end)
post_checkout = Keyword.get(pool_opts, :post_checkout, &{:ok, &1, &2})
label = pool_opts[:label]

if label do
Process.set_label({__MODULE__, label})

Check warning on line 40 in lib/db_connection/ownership/proxy.ex

View workflow job for this annotation

GitHub Actions / test (1.14, 24.2)

Process.set_label/1 is undefined or private
end

state = %{
client: nil,
Expand Down Expand Up @@ -193,8 +198,10 @@
{:reply, connection_metrics, state}
end

defp checkout({pid, ref} = from, %{holder: holder} = state) do
if Holder.handle_checkout(holder, from, ref, nil) do
defp checkout({pid, ref} = from, %{holder: holder, pool_opts: pool_opts} = state) do
label = pool_opts[:label]

if Holder.handle_checkout(holder, from, ref, nil, label) do
{:noreply, %{state | client: {pid, ref, pruned_stacktrace(pid)}}}
else
next(state)
Expand Down
Loading