diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index 45da8b052..55788e83d 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -8,7 +8,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do @type conn() :: Postgrex.conn() @type filter :: {binary, binary, binary} - @type subscription_params :: {binary, binary, [filter]} + @type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]} @type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}] @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"] @@ -36,17 +36,19 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do subscription_id, entity, filters, - claims + claims, + action_filter ) select $4::text::uuid, sub_tables.entity, $6, - $5 + $5, + $7 from sub_tables on conflict - (subscription_id, entity, filters) + (subscription_id, entity, filters, action_filter) do update set claims = excluded.claims, created_at = now() @@ -54,8 +56,12 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do id" transaction(conn, fn conn -> - Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params = {schema, table, filters}} -> - case query(conn, sql, [publication, schema, table, id, claims, filters]) do + Enum.map(subscription_list, fn %{ + id: id, + claims: claims, + subscription_params: params = {action_filter, schema, table, filters} + } -> + case query(conn, sql, [publication, schema, table, id, claims, filters, action_filter]) do {:ok, %{num_rows: num} = result} when num > 0 -> send(manager, {:subscribed, {caller, id}}) result @@ -80,8 +86,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do :exit, reason -> {:error, {:exit, reason}} end - defp params_to_log({schema, table, filters}) do - [schema: schema, table: table, filters: filters] + defp params_to_log({action_filter, schema, table, filters}) do + [event: action_filter, schema: schema, table: table, filters: filters] |> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end) end @@ -168,27 +174,27 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do ## Examples iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"}) - {:ok, {"public", "messages", [{"subject", "eq", "hey"}]}} + {:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}]}} `in` filter: iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"}) - {:ok, {"public", "messages", [{"subject", "in", "{hidee,ho}"}]}} + {:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}]}} no filter: iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"}) - {:ok, {"public", "messages", []}} + {:ok, {"*", "public", "messages", []}} only schema: iex> parse_subscription_params(%{"schema" => "public"}) - {:ok, {"public", "*", []}} + {:ok, {"*", "public", "*", []}} only table: iex> parse_subscription_params(%{"table" => "messages"}) - {:ok, {"public", "messages", []}} + {:ok, {"*", "public", "messages", []}} An unsupported filter will respond with an error tuple: @@ -209,13 +215,15 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do @spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()} def parse_subscription_params(params) do + action_filter = action_filter(params) + case params do %{"schema" => schema, "table" => table, "filter" => filter} -> with [col, rest] <- String.split(filter, "=", parts: 2), [filter_type, value] when filter_type in @filter_types <- String.split(rest, ".", parts: 2), {:ok, formatted_value} <- format_filter_value(filter_type, value) do - {:ok, {schema, table, [{col, filter_type, formatted_value}]}} + {:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}} else {:error, msg} -> {:error, "Error parsing `filter` params: #{msg}"} @@ -225,13 +233,13 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do end %{"schema" => schema, "table" => table} -> - {:ok, {schema, table, []}} + {:ok, {action_filter, schema, table, []}} %{"schema" => schema} -> - {:ok, {schema, "*", []}} + {:ok, {action_filter, schema, "*", []}} %{"table" => table} -> - {:ok, {"public", table, []}} + {:ok, {action_filter, "public", table, []}} map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") -> {:error, @@ -243,6 +251,19 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do end end + defp action_filter(%{"event" => "*"}), do: "*" + + defp action_filter(%{"event" => event}) when is_binary(event) do + case String.upcase(event) do + "INSERT" -> "INSERT" + "UPDATE" -> "UPDATE" + "DELETE" -> "DELETE" + _ -> "*" + end + end + + defp action_filter(_), do: "*" + defp format_filter_value(filter, value) do case filter do "in" -> diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index 46ea7864f..12f79f449 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -77,7 +77,9 @@ defmodule Realtime.Tenants.Migrations do RunSubscriptionIndexBridgingDisabled, BroadcastSendErrorLogging, CreateMessagesReplayIndex, - BroadcastSendIncludePayloadId + BroadcastSendIncludePayloadId, + AddActionToSubscriptions, + FilterActionPostgresChanges } @migrations [ @@ -145,7 +147,9 @@ defmodule Realtime.Tenants.Migrations do {20_250_523_164_012, RunSubscriptionIndexBridgingDisabled}, {20_250_714_121_412, BroadcastSendErrorLogging}, {20_250_905_041_441, CreateMessagesReplayIndex}, - {20_251_103_001_201, BroadcastSendIncludePayloadId} + {20_251_103_001_201, BroadcastSendIncludePayloadId}, + {20_251_120_212_548, AddActionToSubscriptions}, + {20_251_120_215_549, FilterActionPostgresChanges} ] defstruct [:tenant_external_id, :settings] diff --git a/lib/realtime/tenants/repo/migrations/20251120212548_add_action_to_subscriptions.ex b/lib/realtime/tenants/repo/migrations/20251120212548_add_action_to_subscriptions.ex new file mode 100644 index 000000000..2157ac630 --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20251120212548_add_action_to_subscriptions.ex @@ -0,0 +1,33 @@ +defmodule Realtime.Tenants.Migrations.AddActionToSubscriptions do + @moduledoc false + use Ecto.Migration + + def up do + execute(""" + ALTER TABLE realtime.subscription + ADD COLUMN action_filter text DEFAULT '*' CHECK (action_filter IN ('*', 'INSERT', 'UPDATE', 'DELETE')); + """) + + execute(""" + CREATE UNIQUE INDEX subscription_subscription_id_entity_filters_action_filter_key on realtime.subscription (subscription_id, entity, filters, action_filter); + """) + + execute(""" + DROP INDEX IF EXISTS "realtime"."subscription_subscription_id_entity_filters_key"; + """) + end + + def down do + execute(""" + ALTER TABLE realtime.subscription DROP COLUMN action_filter; + """) + + execute(""" + CREATE UNIQUE INDEX subscription_subscription_id_entity_filters_key on realtime.subscription (subscription_id, entity, filters) + """) + + execute(""" + DROP INDEX IF EXISTS "realtime"."subscription_subscription_id_entity_filters_action_filter_key"; + """) + end +end diff --git a/lib/realtime/tenants/repo/migrations/20251120215549_filter_action_postgres_changes.ex b/lib/realtime/tenants/repo/migrations/20251120215549_filter_action_postgres_changes.ex new file mode 100644 index 000000000..6421acb9a --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20251120215549_filter_action_postgres_changes.ex @@ -0,0 +1,619 @@ +defmodule Realtime.Tenants.Migrations.FilterActionPostgresChanges do + @moduledoc false + use Ecto.Migration + + def up do + execute """ + create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + returns setof realtime.wal_rls + language plpgsql + volatile + as $$ + declare + -- Regclass of the table e.g. public.notes + entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; + + -- I, U, D, T: insert, update ... + action realtime.action = ( + case wal ->> 'action' + when 'I' then 'INSERT' + when 'U' then 'UPDATE' + when 'D' then 'DELETE' + else 'ERROR' + end + ); + + -- Is row level security enabled for the table + is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; + + subscriptions realtime.subscription[] = array_agg(subs) + from + realtime.subscription subs + where + subs.entity = entity_ + -- Filter by action early - only get subscriptions interested in this action + -- action_filter column can be: '*' (all), 'INSERT', 'UPDATE', or 'DELETE' + and (subs.action_filter = '*' or subs.action_filter = action::text); + + -- Subscription vars + roles regrole[] = array_agg(distinct us.claims_role::text) + from + unnest(subscriptions) us; + + working_role regrole; + claimed_role regrole; + claims jsonb; + + subscription_id uuid; + subscription_has_access bool; + visible_to_subscription_ids uuid[] = '{}'; + + -- structured info for wal's columns + columns realtime.wal_column[]; + -- previous identity values for update/delete + old_columns realtime.wal_column[]; + + error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; + + -- Primary jsonb output for record + output jsonb; + + begin + perform set_config('role', null, true); + + columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'columns') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + old_columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'identity') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + for working_role in select * from unnest(roles) loop + + -- Update `is_selectable` for columns and old_columns + columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(columns) c; + + old_columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(old_columns) c; + + if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + -- subscriptions is already filtered by entity + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 400: Bad Request, no primary key'] + )::realtime.wal_rls; + + -- The claims role does not have SELECT permission to the primary key of entity + elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 401: Unauthorized'] + )::realtime.wal_rls; + + else + output = jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action, + 'commit_timestamp', to_char( + ((wal ->> 'timestamp')::timestamptz at time zone 'utc'), + 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"' + ), + 'columns', ( + select + jsonb_agg( + jsonb_build_object( + 'name', pa.attname, + 'type', pt.typname + ) + order by pa.attnum asc + ) + from + pg_attribute pa + join pg_type pt + on pa.atttypid = pt.oid + where + attrelid = entity_ + and attnum > 0 + and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') + ) + ) + -- Add "record" key for insert and update + || case + when action in ('INSERT', 'UPDATE') then + jsonb_build_object( + 'record', + ( + select + jsonb_object_agg( + -- if unchanged toast, get column name and value from old record + coalesce((c).name, (oc).name), + case + when (c).name is null then (oc).value + else (c).value + end + ) + from + unnest(columns) c + full outer join unnest(old_columns) oc + on (c).name = (oc).name + where + coalesce((c).is_selectable, (oc).is_selectable) + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + else '{}'::jsonb + end + -- Add "old_record" key for update and delete + || case + when action = 'UPDATE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + when action = 'DELETE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey + ) + ) + else '{}'::jsonb + end; + + -- Create the prepared statement + if is_rls_enabled and action <> 'DELETE' then + if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then + deallocate walrus_rls_stmt; + end if; + execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); + end if; + + visible_to_subscription_ids = '{}'; + + for subscription_id, claims in ( + select + subs.subscription_id, + subs.claims + from + unnest(subscriptions) subs + where + subs.entity = entity_ + and subs.claims_role = working_role + and ( + realtime.is_visible_through_filters(columns, subs.filters) + or ( + action = 'DELETE' + and realtime.is_visible_through_filters(old_columns, subs.filters) + ) + ) + ) loop + + if not is_rls_enabled or action = 'DELETE' then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + else + -- Check if RLS allows the role to see the record + perform + -- Trim leading and trailing quotes from working_role because set_config + -- doesn't recognize the role as valid if they are included + set_config('role', trim(both '"' from working_role::text), true), + set_config('request.jwt.claims', claims::text, true); + + execute 'execute walrus_rls_stmt' into subscription_has_access; + + if subscription_has_access then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + end if; + end if; + end loop; + + perform set_config('role', null, true); + + return next ( + output, + is_rls_enabled, + visible_to_subscription_ids, + case + when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] + else '{}' + end + )::realtime.wal_rls; + + end if; + end loop; + + perform set_config('role', null, true); + end; + $$; + """ + end + + def down do + execute """ + create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) + returns setof realtime.wal_rls + language plpgsql + volatile + as $$ + declare + -- Regclass of the table e.g. public.notes + entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; + + -- I, U, D, T: insert, update ... + action realtime.action = ( + case wal ->> 'action' + when 'I' then 'INSERT' + when 'U' then 'UPDATE' + when 'D' then 'DELETE' + else 'ERROR' + end + ); + + -- Is row level security enabled for the table + is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; + + subscriptions realtime.subscription[] = array_agg(subs) + from + realtime.subscription subs + where + subs.entity = entity_; + + -- Subscription vars + roles regrole[] = array_agg(distinct us.claims_role::text) + from + unnest(subscriptions) us; + + working_role regrole; + claimed_role regrole; + claims jsonb; + + subscription_id uuid; + subscription_has_access bool; + visible_to_subscription_ids uuid[] = '{}'; + + -- structured info for wal's columns + columns realtime.wal_column[]; + -- previous identity values for update/delete + old_columns realtime.wal_column[]; + + error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; + + -- Primary jsonb output for record + output jsonb; + + begin + perform set_config('role', null, true); + + columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'columns') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + old_columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'identity') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + + for working_role in select * from unnest(roles) loop + + -- Update `is_selectable` for columns and old_columns + columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(columns) c; + + old_columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(old_columns) c; + + if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + -- subscriptions is already filtered by entity + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 400: Bad Request, no primary key'] + )::realtime.wal_rls; + + -- The claims role does not have SELECT permission to the primary key of entity + elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 401: Unauthorized'] + )::realtime.wal_rls; + + else + output = jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action, + 'commit_timestamp', to_char( + ((wal ->> 'timestamp')::timestamptz at time zone 'utc'), + 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"' + ), + 'columns', ( + select + jsonb_agg( + jsonb_build_object( + 'name', pa.attname, + 'type', pt.typname + ) + order by pa.attnum asc + ) + from + pg_attribute pa + join pg_type pt + on pa.atttypid = pt.oid + where + attrelid = entity_ + and attnum > 0 + and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') + ) + ) + -- Add "record" key for insert and update + || case + when action in ('INSERT', 'UPDATE') then + jsonb_build_object( + 'record', + ( + select + jsonb_object_agg( + -- if unchanged toast, get column name and value from old record + coalesce((c).name, (oc).name), + case + when (c).name is null then (oc).value + else (c).value + end + ) + from + unnest(columns) c + full outer join unnest(old_columns) oc + on (c).name = (oc).name + where + coalesce((c).is_selectable, (oc).is_selectable) + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + else '{}'::jsonb + end + -- Add "old_record" key for update and delete + || case + when action = 'UPDATE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + when action = 'DELETE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey + ) + ) + else '{}'::jsonb + end; + + -- Create the prepared statement + if is_rls_enabled and action <> 'DELETE' then + if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then + deallocate walrus_rls_stmt; + end if; + execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); + end if; + + visible_to_subscription_ids = '{}'; + + for subscription_id, claims in ( + select + subs.subscription_id, + subs.claims + from + unnest(subscriptions) subs + where + subs.entity = entity_ + and subs.claims_role = working_role + and ( + realtime.is_visible_through_filters(columns, subs.filters) + or ( + action = 'DELETE' + and realtime.is_visible_through_filters(old_columns, subs.filters) + ) + ) + ) loop + + if not is_rls_enabled or action = 'DELETE' then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + else + -- Check if RLS allows the role to see the record + perform + -- Trim leading and trailing quotes from working_role because set_config + -- doesn't recognize the role as valid if they are included + set_config('role', trim(both '"' from working_role::text), true), + set_config('request.jwt.claims', claims::text, true); + + execute 'execute walrus_rls_stmt' into subscription_has_access; + + if subscription_has_access then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + end if; + end if; + end loop; + + perform set_config('role', null, true); + + return next ( + output, + is_rls_enabled, + visible_to_subscription_ids, + case + when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] + else '{}' + end + )::realtime.wal_rls; + + end if; + end loop; + + perform set_config('role', null, true); + end; + $$; + """ + end +end diff --git a/mix.exs b/mix.exs index bd1294c58..da904db2a 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.67.2", + version: "2.68.0", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index a35c5ef43..1c38b6947 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -34,9 +34,10 @@ defmodule Realtime.Integration.RtChannelTest do {:ok, conn} = Database.connect(tenant, "realtime_test") # Let's drop the publication to cause an error - Database.transaction(conn, fn db_conn -> - Postgrex.query!(db_conn, "drop publication if exists supabase_realtime_test") - end) + {:ok, _} = + Database.transaction(conn, fn db_conn -> + Postgrex.query!(db_conn, "drop publication if exists supabase_realtime_test") + end) {socket, _} = get_connection(tenant, serializer) topic = "realtime:any" @@ -52,7 +53,7 @@ defmodule Realtime.Integration.RtChannelTest do "channel" => "any", "extension" => "postgres_changes", "message" => - "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: *, filters: []]", + "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [event: INSERT, schema: public, table: *, filters: []]", "status" => "error" }, ref: nil, @@ -2314,7 +2315,7 @@ defmodule Realtime.Integration.RtChannelTest do # Does it recover? assert Connect.ready?(tenant.external_id) {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - Process.sleep(1000) + Process.sleep(5000) %{rows: [[new_db_pid]]} = Postgrex.query!(db_conn, active_slot_query, []) assert new_db_pid != original_db_pid diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index cdda5bee8..006b8ffd0 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -201,14 +201,14 @@ defmodule Realtime.Extensions.CdcRlsTest do describe "integration" do setup [:integration] - test "subscribe inserts", %{tenant: tenant, conn: conn} do + test "subscribe inserts only", %{tenant: tenant, conn: conn} do on_exit(fn -> PostgresCdcRls.handle_stop(tenant.external_id, 10_000) end) %Tenant{extensions: extensions, external_id: external_id} = tenant postgres_extension = PostgresCdc.filter_settings("postgres_cdc_rls", extensions) args = Map.put(postgres_extension, "id", external_id) - pg_change_params = pubsub_subscribe(external_id) + pg_change_params = pubsub_subscribe(external_id, "INSERT") # First time it will return nil PostgresCdcRls.handle_connect(args) @@ -229,12 +229,17 @@ defmodule Realtime.Extensions.CdcRlsTest do # Now subscribe to the Postgres Changes {:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id) - assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) + + assert %Postgrex.Result{num_rows: 1} = Postgrex.query!(conn, "select id from realtime.subscription", []) # Insert a record %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", []) + # Delete the record + %{num_rows: 1} = Postgrex.query!(conn, "delete from test", []) assert_receive {:socket_push, :text, data}, 5000 + # No DELETE should be received + refute_receive {:socket_push, :text, _data}, 1000 assert %{ "event" => "postgres_changes", @@ -343,7 +348,7 @@ defmodule Realtime.Extensions.CdcRlsTest do %{node: node, response: response} end - test "subscribe inserts distributed mode", %{tenant: tenant, conn: conn, node: node, response: response} do + test "subscribe distributed mode", %{tenant: tenant, conn: conn, node: node, response: response} do %Tenant{extensions: extensions, external_id: external_id} = tenant postgres_extension = PostgresCdc.filter_settings("postgres_cdc_rls", extensions) @@ -353,8 +358,13 @@ defmodule Realtime.Extensions.CdcRlsTest do {:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id) assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) + # Wait for subscription to be executing + Process.sleep(200) + # Insert a record %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", []) + # Delete the record + %{num_rows: 1} = Postgrex.query!(conn, "delete from test", []) assert_receive {:socket_push, :text, data}, 5000 @@ -376,6 +386,26 @@ defmodule Realtime.Extensions.CdcRlsTest do "topic" => "realtime:test" } = Jason.decode!(data) + assert_receive {:socket_push, :text, data}, 5000 + + assert %{ + "event" => "postgres_changes", + "payload" => %{ + "data" => %{ + "columns" => [%{"name" => "id", "type" => "int4"}, %{"name" => "details", "type" => "text"}], + "commit_timestamp" => _, + "errors" => nil, + "type" => "DELETE", + "old_record" => %{"id" => ^id}, + "schema" => "public", + "table" => "test" + }, + "ids" => _ + }, + "ref" => nil, + "topic" => "realtime:test" + } = Jason.decode!(data) + assert_receive { :telemetry, [:realtime, :rpc], @@ -461,11 +491,11 @@ defmodule Realtime.Extensions.CdcRlsTest do %{tenant: tenant, conn: conn} end - defp pubsub_subscribe(external_id) do + defp pubsub_subscribe(external_id, event \\ "*") do pg_change_params = [ %{ id: UUID.uuid1(), - params: %{"event" => "*", "schema" => "public"}, + params: %{"event" => event, "schema" => "public"}, channel_pid: self(), claims: %{ "exp" => System.system_time(:second) + 100_000, diff --git a/test/realtime/extensions/cdc_rls/subscription_manager_test.exs b/test/realtime/extensions/cdc_rls/subscription_manager_test.exs index 3fbde34b5..b087ad953 100644 --- a/test/realtime/extensions/cdc_rls/subscription_manager_test.exs +++ b/test/realtime/extensions/cdc_rls/subscription_manager_test.exs @@ -147,7 +147,7 @@ defmodule Realtime.Extensions.CdcRls.SubscriptionManagerTest do pg_change_params = %{ id: uuid, - subscription_params: {"public", "*", []}, + subscription_params: {"*", "public", "*", []}, claims: %{ "exp" => System.system_time(:second) + 100_000, "iat" => 0, diff --git a/test/realtime/extensions/cdc_rls/subscriptions_test.exs b/test/realtime/extensions/cdc_rls/subscriptions_test.exs index fd2563cf4..e57c985fa 100644 --- a/test/realtime/extensions/cdc_rls/subscriptions_test.exs +++ b/test/realtime/extensions/cdc_rls/subscriptions_test.exs @@ -30,7 +30,19 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do assert {:ok, [%Postgrex.Result{}]} = Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) - %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) + %Postgrex.Result{rows: [[[], "*"]]} = + Postgrex.query!(conn, "select filters, action_filter from realtime.subscription", []) + end + + test "create all tables & all events on INSERT", %{conn: conn} do + {:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"event" => "INSERT", "schema" => "public"}) + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + + %Postgrex.Result{rows: [[[], "INSERT"]]} = + Postgrex.query!(conn, "select filters, action_filter from realtime.subscription", []) end test "create specific table all events", %{conn: conn} do @@ -53,7 +65,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do assert {:error, {:subscription_insert_failed, - "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: test, filters: []]"}} = + "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [event: *, schema: public, table: test, filters: []]"}} = Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self()) %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) @@ -67,7 +79,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do assert {:error, {:subscription_insert_failed, - "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: doesnotexist, filters: []]"}} = + "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [event: *, schema: public, table: doesnotexist, filters: []]"}} = Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self()) %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) @@ -85,7 +97,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do assert {:error, {:subscription_insert_failed, - "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"subject\", \"eq\", \"hey\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter subject"}} = + "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [event: *, schema: public, table: test, filters: [{\"subject\", \"eq\", \"hey\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter subject"}} = Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self()) %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) @@ -103,7 +115,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do assert {:error, {:subscription_insert_failed, - "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"id\", \"eq\", \"hey\"}]]. Exception: ERROR 22P02 (invalid_text_representation) invalid input syntax for type integer: \"hey\""}} = + "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [event: *, schema: public, table: test, filters: [{\"id\", \"eq\", \"hey\"}]]. Exception: ERROR 22P02 (invalid_text_representation) invalid input syntax for type integer: \"hey\""}} = Subscriptions.create(conn, "supabase_realtime_test", subscription_list, self(), self()) %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) @@ -219,7 +231,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.Subscriptions do "role" => "anon" }, id: UUID.uuid1(), - subscription_params: {"public", "*", []} + subscription_params: {"*", "public", "*", []} } | acc ] diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index d1530c5d3..fde96ea7d 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -108,9 +108,9 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert %{ postgres_changes: [ - %{:id => sub_id, "event" => "INSERT", "schema" => "public", "table" => "test"}, + %{:id => insert_sub_id, "event" => "INSERT", "schema" => "public", "table" => "test"}, %{ - :id => 4_845_530, + :id => delete_sub_id, "event" => "DELETE", "schema" => "public", "table" => "test" @@ -124,9 +124,12 @@ defmodule RealtimeWeb.RealtimeChannelTest do 5000 {:ok, conn} = Connect.lookup_or_start_connection(tenant.external_id) + # Insert, update and delete but update should not be received %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", []) + Postgrex.query!(conn, "update test set details = 'test' where id = $1", [id]) + Postgrex.query!(conn, "delete from test where id = $1", [id]) - assert_push "postgres_changes", %{data: data, ids: [4_845_530, ^sub_id]}, 500 + assert_push "postgres_changes", %{data: data, ids: [^insert_sub_id]}, 500 # we encode and decode because the data is a Jason.Fragment assert %{ @@ -139,8 +142,20 @@ defmodule RealtimeWeb.RealtimeChannelTest do "commit_timestamp" => _ } = Jason.encode!(data) |> Jason.decode!() - refute_receive %Socket.Message{} - refute_receive %Socket.Reply{} + assert_push "postgres_changes", %{data: data, ids: [^delete_sub_id]}, 500 + + # we encode and decode because the data is a Jason.Fragment + assert %{ + "table" => "test", + "type" => "DELETE", + "old_record" => %{"id" => ^id}, + "columns" => [%{"name" => "id", "type" => "int4"}, %{"name" => "details", "type" => "text"}], + "errors" => nil, + "schema" => "public", + "commit_timestamp" => _ + } = Jason.encode!(data) |> Jason.decode!() + + refute_receive _any end test "malformed subscription params", %{tenant: tenant} do @@ -187,7 +202,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert_push "system", %{ message: - "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [schema: public, table: doesnotexist, filters: []]", + "Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [event: *, schema: public, table: doesnotexist, filters: []]", status: "error", extension: "postgres_changes", channel: "test" @@ -218,7 +233,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert_push "system", %{ message: - "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [schema: public, table: test, filters: [{\"notacolumn\", \"eq\", \"123\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter notacolumn", + "Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [event: *, schema: public, table: test, filters: [{\"notacolumn\", \"eq\", \"123\"}]]. Exception: ERROR P0001 (raise_exception) invalid column for filter notacolumn", status: "error", extension: "postgres_changes", channel: "test"