From 18519f037269fdd1ecec8e218aee5b082e367b45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Mon, 15 Dec 2025 14:04:50 +0000 Subject: [PATCH 1/3] fix: run migrations on tenants nearest region --- lib/realtime/nodes.ex | 5 -- lib/realtime/tenants.ex | 15 ++-- lib/realtime/tenants/migrations.ex | 39 ++++++++--- mix.exs | 2 +- .../region_aware_migrations_test.exs | 70 +++++++++++++++++++ test/realtime/nodes_test.exs | 4 +- .../controllers/tenant_controller_test.exs | 2 +- 7 files changed, 109 insertions(+), 28 deletions(-) create mode 100644 test/integration/region_aware_migrations_test.exs diff --git a/lib/realtime/nodes.ex b/lib/realtime/nodes.ex index 9203539b3..e57b9c7e0 100644 --- a/lib/realtime/nodes.ex +++ b/lib/realtime/nodes.ex @@ -94,11 +94,6 @@ defmodule Realtime.Nodes do @spec launch_node(String.t(), String.t() | nil, atom()) :: atom() def launch_node(tenant_id, region, default) do case region_nodes(region) do - [node] -> - Logger.warning("Only one region node (#{inspect(node)}) for #{region} using default #{inspect(default)}") - - default - [] -> Logger.warning("Zero region nodes for #{region} using #{inspect(default)}") default diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 87d19cd65..2cf2dec34 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -98,13 +98,11 @@ defmodule Realtime.Tenants do connected_cluster when is_integer(connected_cluster) -> tenant = Cache.get_tenant_by_external_id(external_id) - {:ok, db_conn} = Database.connect(tenant, "realtime_health_check") - Process.alive?(db_conn) && GenServer.stop(db_conn) - Migrations.run_migrations(tenant) + result? = Migrations.run_migrations(tenant) {:ok, %{ - healthy: true, + healthy: result? == :ok || result? == :noop, db_connected: false, connected_cluster: connected_cluster, region: region, @@ -475,10 +473,11 @@ defmodule Realtime.Tenants do @doc """ Checks if migrations for a given tenant need to run. """ - @spec run_migrations?(Tenant.t()) :: boolean() - def run_migrations?(%Tenant{} = tenant) do - tenant.migrations_ran < Enum.count(Migrations.migrations()) - end + @spec run_migrations?(Tenant.t() | integer()) :: boolean() + def run_migrations?(%Tenant{} = tenant), do: run_migrations?(tenant.migrations_ran) + + def run_migrations?(migrations_ran) when is_integer(migrations_ran), + do: migrations_ran < Enum.count(Migrations.migrations()) @doc """ Broadcasts an operation event to the tenant's operations channel. diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index 46ea7864f..47f4ad718 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -11,6 +11,8 @@ defmodule Realtime.Tenants.Migrations do alias Realtime.Repo alias Realtime.Api.Tenant alias Realtime.Api + alias Realtime.Nodes + alias Realtime.GenRpc alias Realtime.Tenants.Migrations.{ CreateRealtimeSubscriptionTable, @@ -148,7 +150,7 @@ defmodule Realtime.Tenants.Migrations do {20_251_103_001_201, BroadcastSendIncludePayloadId} ] - defstruct [:tenant_external_id, :settings] + defstruct [:tenant_external_id, :settings, migrations_ran: 0] @type t :: %__MODULE__{ tenant_external_id: binary(), @@ -160,24 +162,39 @@ defmodule Realtime.Tenants.Migrations do """ @spec run_migrations(Tenant.t()) :: :ok | :noop | {:error, any()} def run_migrations(%Tenant{} = tenant) do - %{extensions: [%{settings: settings} | _]} = tenant - attrs = %__MODULE__{tenant_external_id: tenant.external_id, settings: settings} + if Tenants.run_migrations?(tenant) do + %{extensions: [%{settings: settings} | _]} = tenant - supervisor = - {:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, tenant.external_id}} + attrs = %__MODULE__{ + tenant_external_id: tenant.external_id, + settings: settings, + migrations_ran: tenant.migrations_ran + } - spec = {__MODULE__, attrs} + node = + case Nodes.get_node_for_tenant(tenant) do + {:ok, node, _} -> node + {:error, _} -> node() + end - if Tenants.run_migrations?(tenant) do - case DynamicSupervisor.start_child(supervisor, spec) do - :ignore -> :ok - error -> error - end + GenRpc.call(node, __MODULE__, :start_migration, [attrs], tenant_id: tenant.external_id) else :noop end end + def start_migration(attrs) do + supervisor = + {:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, attrs.tenant_external_id}} + + spec = {__MODULE__, attrs} + + case DynamicSupervisor.start_child(supervisor, spec) do + :ignore -> :ok + error -> error + end + end + def start_link(%__MODULE__{tenant_external_id: tenant_external_id} = attrs) do name = {:via, Registry, {Unique, {__MODULE__, :host, tenant_external_id}}} GenServer.start_link(__MODULE__, attrs, name: name) diff --git a/mix.exs b/mix.exs index 50dfb0b53..7ae856195 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.69.0", + version: "2.69.1", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/region_aware_migrations_test.exs b/test/integration/region_aware_migrations_test.exs new file mode 100644 index 000000000..79c42367c --- /dev/null +++ b/test/integration/region_aware_migrations_test.exs @@ -0,0 +1,70 @@ +defmodule Realtime.Integration.RegionAwareMigrationsTest do + use Realtime.DataCase, async: false + use Mimic + + alias Containers + alias Realtime.Tenants + alias Realtime.Tenants.Migrations + + setup do + {:ok, port} = Containers.checkout() + + settings = [ + %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_host" => "127.0.0.1", + "db_name" => "postgres", + "db_user" => "supabase_admin", + "db_password" => "postgres", + "db_port" => "#{port}", + "poll_interval" => 100, + "poll_max_changes" => 100, + "poll_max_record_bytes" => 1_048_576, + "region" => "ap-southeast-2", + "publication" => "supabase_realtime_test", + "ssl_enforced" => false + } + } + ] + + tenant = tenant_fixture(%{extensions: settings}) + region = Application.get_env(:realtime, :region) + + {:ok, node} = + Clustered.start(nil, + extra_config: [ + {:realtime, :region, Tenants.region(tenant)}, + {:realtime, :master_region, region} + ] + ) + + on_exit(fn -> Clustered.stop() end) + + %{tenant: tenant, node: node} + end + + test "run_migrations routes to node in tenant's region with expected arguments", %{tenant: tenant, node: node} do + assert tenant.migrations_ran == 0 + + Realtime.GenRpc + |> Mimic.expect(:call, fn called_node, mod, func, args, opts -> + assert called_node == node + assert mod == Migrations + assert func == :start_migration + assert opts[:tenant_id] == tenant.external_id + + arg = hd(args) + assert arg.tenant_external_id == tenant.external_id + assert arg.migrations_ran == tenant.migrations_ran + assert arg.settings == hd(tenant.extensions).settings + + call_original(Realtime.GenRpc, :call, [node, mod, func, args, opts]) + end) + + assert :ok = Migrations.run_migrations(tenant) + Process.sleep(1000) + tenant = Realtime.Repo.reload!(tenant) + refute tenant.migrations_ran == 0 + end +end diff --git a/test/realtime/nodes_test.exs b/test/realtime/nodes_test.exs index f768bb89f..b127ed605 100644 --- a/test/realtime/nodes_test.exs +++ b/test/realtime/nodes_test.exs @@ -108,7 +108,7 @@ defmodule Realtime.NodesTest do assert region == expected_region end - test "on existing tenant id, and a single node for a given region, returns default", %{ + test "on existing tenant id, and a single node for a given region, returns single node", %{ tenant: tenant, region: region } do @@ -117,7 +117,7 @@ defmodule Realtime.NodesTest do expected_region = Tenants.region(tenant) - assert node == node() + assert node != node() assert region == expected_region end diff --git a/test/realtime_web/controllers/tenant_controller_test.exs b/test/realtime_web/controllers/tenant_controller_test.exs index 86186245b..95c7ab762 100644 --- a/test/realtime_web/controllers/tenant_controller_test.exs +++ b/test/realtime_web/controllers/tenant_controller_test.exs @@ -419,7 +419,7 @@ defmodule RealtimeWeb.TenantControllerTest do conn = get(conn, ~p"/api/tenants/#{tenant.external_id}/health") data = json_response(conn, 200)["data"] - Process.sleep(2000) + Process.sleep(1000) assert {:ok, %{rows: []}} = Postgrex.query(db_conn, "SELECT * FROM realtime.messages", []) From 2bb46bd336e0d576a50cde2684db2c267041fa9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Mon, 22 Dec 2025 14:52:14 +0000 Subject: [PATCH 2/3] reduce flakiness of test as we wait for syn to subscribe the node --- test/integration/region_aware_migrations_test.exs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/integration/region_aware_migrations_test.exs b/test/integration/region_aware_migrations_test.exs index 79c42367c..ed2064e5d 100644 --- a/test/integration/region_aware_migrations_test.exs +++ b/test/integration/region_aware_migrations_test.exs @@ -39,6 +39,8 @@ defmodule Realtime.Integration.RegionAwareMigrationsTest do ] ) + Process.sleep(100) + on_exit(fn -> Clustered.stop() end) %{tenant: tenant, node: node} From c7578c8a673f6c248cdd8ff2668bcd4dfd013751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Tue, 23 Dec 2025 15:21:59 +0000 Subject: [PATCH 3/3] remove flaky code --- test/integration/region_aware_migrations_test.exs | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/integration/region_aware_migrations_test.exs b/test/integration/region_aware_migrations_test.exs index ed2064e5d..892ed2382 100644 --- a/test/integration/region_aware_migrations_test.exs +++ b/test/integration/region_aware_migrations_test.exs @@ -41,8 +41,6 @@ defmodule Realtime.Integration.RegionAwareMigrationsTest do Process.sleep(100) - on_exit(fn -> Clustered.stop() end) - %{tenant: tenant, node: node} end