Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do

@type conn() :: Postgrex.conn()
@type filter :: {binary, binary, binary}
@type subscription_params :: {binary, binary, [filter]}
@type subscription_params :: {action_filter :: binary, schema :: binary, table :: binary, [filter]}
@type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]

@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
Expand Down Expand Up @@ -36,26 +36,32 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
subscription_id,
entity,
filters,
claims
claims,
action_filter
)
select
$4::text::uuid,
sub_tables.entity,
$6,
$5
$5,
$7
from
sub_tables
on conflict
(subscription_id, entity, filters)
(subscription_id, entity, filters, action_filter)
do update set
claims = excluded.claims,
created_at = now()
returning
id"

transaction(conn, fn conn ->
Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params = {schema, table, filters}} ->
case query(conn, sql, [publication, schema, table, id, claims, filters]) do
Enum.map(subscription_list, fn %{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

later we should extract this to a function as this is a bit cluttered but not important now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will refactor!

id: id,
claims: claims,
subscription_params: params = {action_filter, schema, table, filters}
} ->
case query(conn, sql, [publication, schema, table, id, claims, filters, action_filter]) do
{:ok, %{num_rows: num} = result} when num > 0 ->
send(manager, {:subscribed, {caller, id}})
result
Expand All @@ -80,8 +86,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
:exit, reason -> {:error, {:exit, reason}}
end

defp params_to_log({schema, table, filters}) do
[schema: schema, table: table, filters: filters]
defp params_to_log({action_filter, schema, table, filters}) do
[event: action_filter, schema: schema, table: table, filters: filters]
|> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
end

Expand Down Expand Up @@ -168,27 +174,27 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
## Examples

iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
{:ok, {"public", "messages", [{"subject", "eq", "hey"}]}}
{:ok, {"*", "public", "messages", [{"subject", "eq", "hey"}]}}

`in` filter:

iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
{:ok, {"public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
{:ok, {"*", "public", "messages", [{"subject", "in", "{hidee,ho}"}]}}

no filter:

iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
{:ok, {"public", "messages", []}}
{:ok, {"*", "public", "messages", []}}

only schema:

iex> parse_subscription_params(%{"schema" => "public"})
{:ok, {"public", "*", []}}
{:ok, {"*", "public", "*", []}}

only table:

iex> parse_subscription_params(%{"table" => "messages"})
{:ok, {"public", "messages", []}}
{:ok, {"*", "public", "messages", []}}

An unsupported filter will respond with an error tuple:

Expand All @@ -209,13 +215,15 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do

@spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
def parse_subscription_params(params) do
action_filter = action_filter(params)

case params do
%{"schema" => schema, "table" => table, "filter" => filter} ->
with [col, rest] <- String.split(filter, "=", parts: 2),
[filter_type, value] when filter_type in @filter_types <-
String.split(rest, ".", parts: 2),
{:ok, formatted_value} <- format_filter_value(filter_type, value) do
{:ok, {schema, table, [{col, filter_type, formatted_value}]}}
{:ok, {action_filter, schema, table, [{col, filter_type, formatted_value}]}}
else
{:error, msg} ->
{:error, "Error parsing `filter` params: #{msg}"}
Expand All @@ -225,13 +233,13 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
end

%{"schema" => schema, "table" => table} ->
{:ok, {schema, table, []}}
{:ok, {action_filter, schema, table, []}}

%{"schema" => schema} ->
{:ok, {schema, "*", []}}
{:ok, {action_filter, schema, "*", []}}

%{"table" => table} ->
{:ok, {"public", table, []}}
{:ok, {action_filter, "public", table, []}}

map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
{:error,
Expand All @@ -243,6 +251,19 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
end
end

defp action_filter(%{"event" => "*"}), do: "*"

defp action_filter(%{"event" => event}) when is_binary(event) do
case String.upcase(event) do
"INSERT" -> "INSERT"
"UPDATE" -> "UPDATE"
"DELETE" -> "DELETE"
_ -> "*"
end
end

defp action_filter(_), do: "*"

defp format_filter_value(filter, value) do
case filter do
"in" ->
Expand Down
8 changes: 6 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ defmodule Realtime.Tenants.Migrations do
RunSubscriptionIndexBridgingDisabled,
BroadcastSendErrorLogging,
CreateMessagesReplayIndex,
BroadcastSendIncludePayloadId
BroadcastSendIncludePayloadId,
AddActionToSubscriptions,
FilterActionPostgresChanges
}

@migrations [
Expand Down Expand Up @@ -145,7 +147,9 @@ defmodule Realtime.Tenants.Migrations do
{20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
{20_250_714_121_412, BroadcastSendErrorLogging},
{20_250_905_041_441, CreateMessagesReplayIndex},
{20_251_103_001_201, BroadcastSendIncludePayloadId}
{20_251_103_001_201, BroadcastSendIncludePayloadId},
{20_251_120_212_548, AddActionToSubscriptions},
{20_251_120_215_549, FilterActionPostgresChanges}
]

defstruct [:tenant_external_id, :settings]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Realtime.Tenants.Migrations.AddActionToSubscriptions do
@moduledoc false
use Ecto.Migration

def up do
execute("""
ALTER TABLE realtime.subscription
ADD COLUMN action_filter text DEFAULT '*' CHECK (action_filter IN ('*', 'INSERT', 'UPDATE', 'DELETE'));
""")

execute("""
CREATE UNIQUE INDEX subscription_subscription_id_entity_filters_action_filter_key on realtime.subscription (subscription_id, entity, filters, action_filter);
""")

execute("""
DROP INDEX IF EXISTS "realtime"."subscription_subscription_id_entity_filters_key";
""")
end

def down do
execute("""
ALTER TABLE realtime.subscription DROP COLUMN action_filter;
""")

execute("""
CREATE UNIQUE INDEX subscription_subscription_id_entity_filters_key on realtime.subscription (subscription_id, entity, filters)
""")

execute("""
DROP INDEX IF EXISTS "realtime"."subscription_subscription_id_entity_filters_action_filter_key";
""")
end
end
Loading