From 7f87fb89c60ea0e8aa42107f45c9c6bf75de4a74 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Pinho Date: Mon, 8 Dec 2025 14:18:27 +1300 Subject: [PATCH] feat: add Beacon --- .github/workflows/beacon_tests.yml | 46 +++ Dockerfile | 1 + beacon/.formatter.exs | 4 + beacon/.gitignore | 23 ++ beacon/README.md | 60 ++++ beacon/config/config.exs | 4 + beacon/lib/beacon.ex | 153 ++++++++ beacon/lib/beacon/adapter.ex | 17 + beacon/lib/beacon/adapter/erl_dist.ex | 30 ++ beacon/lib/beacon/partition.ex | 148 ++++++++ beacon/lib/beacon/scope.ex | 200 +++++++++++ beacon/lib/beacon/supervisor.ex | 53 +++ beacon/mix.exs | 34 ++ beacon/mix.lock | 5 + beacon/test/beacon/partition_test.exs | 172 +++++++++ beacon/test/beacon_test.exs | 469 +++++++++++++++++++++++++ beacon/test/support/peer.ex | 89 +++++ beacon/test/test_helper.exs | 3 + lib/realtime/application.ex | 11 + lib/realtime/beacon_pub_sub_adapter.ex | 33 ++ lib/realtime/user_counter.ex | 15 +- mix.exs | 1 + test/realtime/gen_rpc_pub_sub_test.exs | 2 +- test/realtime/user_counter_test.exs | 127 ++++--- 24 files changed, 1652 insertions(+), 48 deletions(-) create mode 100644 .github/workflows/beacon_tests.yml create mode 100644 beacon/.formatter.exs create mode 100644 beacon/.gitignore create mode 100644 beacon/README.md create mode 100644 beacon/config/config.exs create mode 100644 beacon/lib/beacon.ex create mode 100644 beacon/lib/beacon/adapter.ex create mode 100644 beacon/lib/beacon/adapter/erl_dist.ex create mode 100644 beacon/lib/beacon/partition.ex create mode 100644 beacon/lib/beacon/scope.ex create mode 100644 beacon/lib/beacon/supervisor.ex create mode 100644 beacon/mix.exs create mode 100644 beacon/mix.lock create mode 100644 beacon/test/beacon/partition_test.exs create mode 100644 beacon/test/beacon_test.exs create mode 100644 beacon/test/support/peer.ex create mode 100644 beacon/test/test_helper.exs create mode 100644 lib/realtime/beacon_pub_sub_adapter.ex diff --git a/.github/workflows/beacon_tests.yml b/.github/workflows/beacon_tests.yml new file mode 100644 index 000000000..bf2f8fae8 --- /dev/null +++ b/.github/workflows/beacon_tests.yml @@ -0,0 +1,46 @@ +name: Beacon Tests +defaults: + run: + shell: bash + working-directory: ./beacon +on: + pull_request: + paths: + - "beacon/**" + + push: + branches: + - main + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +env: + MIX_ENV: test + +jobs: + tests: + name: Tests & Lint + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Setup elixir + id: beam + uses: erlef/setup-beam@v1 + with: + otp-version: 27.x # Define the OTP version [required] + elixir-version: 1.18.x # Define the elixir version [required] + - name: Install dependencies + run: mix deps.get + - name: Start epmd + run: epmd -daemon + - name: Run tests + run: MIX_ENV=test mix test + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Check for warnings + run: mix compile --force --warnings-as-errors + - name: Run format check + run: mix format --check-formatted diff --git a/Dockerfile b/Dockerfile index 547fc5709..6eb90206b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,6 +34,7 @@ RUN mix local.hex --force && \ # install mix dependencies COPY mix.exs mix.lock ./ +COPY beacon beacon RUN mix deps.get --only $MIX_ENV RUN mkdir config diff --git a/beacon/.formatter.exs b/beacon/.formatter.exs new file mode 100644 index 000000000..d2cda26ed --- /dev/null +++ b/beacon/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + 
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
+]
diff --git a/beacon/.gitignore b/beacon/.gitignore
new file mode 100644
index 000000000..65fb2f5eb
--- /dev/null
+++ b/beacon/.gitignore
@@ -0,0 +1,23 @@
+# The directory Mix will write compiled artifacts to.
+/_build/
+
+# If you run "mix test --cover", coverage assets end up here.
+/cover/
+
+# The directory Mix downloads your dependencies sources to.
+/deps/
+
+# Where third-party dependencies like ExDoc output generated docs.
+/doc/
+
+# If the VM crashes, it generates a dump, let's ignore it too.
+erl_crash.dump
+
+# Also ignore archive artifacts (built via "mix archive.build").
+*.ez
+
+# Ignore package tarball (built via "mix hex.build").
+beacon-*.tar
+
+# Temporary files, for example, from tests.
+/tmp/
diff --git a/beacon/README.md b/beacon/README.md
new file mode 100644
index 000000000..b89093b1e
--- /dev/null
+++ b/beacon/README.md
@@ -0,0 +1,60 @@
+# Beacon
+
+Beacon is a scalable process group manager. The main use case for this library is to have membership counts available across the cluster without broadcasting a message every time a process joins or leaves a group. A node can have thousands of processes joining and leaving hundreds of groups while sending only the membership counts to other nodes.
+
+The main features are:
+
+* Process pids are available only on the node where the processes reside;
+* Groups are partitioned locally to allow greater concurrency when joining different groups;
+* Group counts are periodically broadcast (every 5 seconds by default) to update group membership numbers on all participating nodes;
+* Nodes form a sub-cluster by using the same scope.
+
+## Installation
+
+The package can be installed by adding `beacon` to your list of dependencies in `mix.exs`:
+
+```elixir
+def deps do
+  [
+    {:beacon, "~> 1.0"}
+  ]
+end
+```
+
+## Using
+
+Add Beacon to your application's supervision tree, specifying a scope name (here, `:users`):
+
+```elixir
+def start(_type, _args) do
+  children =
+    [
+      {Beacon, :users}
+      # Or passing options:
+      # {Beacon, [:users, opts]}
+      # See Beacon.start_link/2 for the options
+    ]
+
+  Supervisor.start_link(children, strategy: :one_for_one)
+end
+```
+
+Now processes can join groups:
+
+```elixir
+iex> pid = self()
+#PID<0.852.0>
+iex> Beacon.join(:users, {:tenant, 123}, pid)
+:ok
+iex> Beacon.local_member_count(:users, {:tenant, 123})
+1
+iex> Beacon.local_members(:users, {:tenant, 123})
+[#PID<0.852.0>]
+iex> Beacon.local_member?(:users, {:tenant, 123}, pid)
+true
+```
+
+From another node in the same scope:
+
+```elixir
+iex> Beacon.member_counts(:users)
+%{{:tenant, 123} => 1}
+iex> Beacon.member_count(:users, {:tenant, 123})
+1
+```
diff --git a/beacon/config/config.exs b/beacon/config/config.exs
new file mode 100644
index 000000000..e17c52707
--- /dev/null
+++ b/beacon/config/config.exs
@@ -0,0 +1,4 @@
+import Config
+
+# Print nothing during tests unless captured or a test failure happens
+config :logger, backends: [], level: :debug
diff --git a/beacon/lib/beacon.ex b/beacon/lib/beacon.ex
new file mode 100644
index 000000000..ba8e7987c
--- /dev/null
+++ b/beacon/lib/beacon.ex
@@ -0,0 +1,153 @@
+defmodule Beacon do
+  @moduledoc """
+  Distributed process group membership tracking.
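+
+  A group can be any term. Processes join and leave groups on their local
+  node, and per-group membership counts are periodically exchanged between
+  nodes running the same scope. A minimal sketch, assuming a scope named
+  `:users` was started under a supervision tree:
+
+      pid = self()
+      :ok = Beacon.join(:users, {:tenant, 123}, pid)
+      1 = Beacon.local_member_count(:users, {:tenant, 123})
+
+      # Cluster-wide count: the local count plus the latest counts
+      # reported by peers in the same scope
+      1 = Beacon.member_count(:users, {:tenant, 123})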
+  """
+
+  alias Beacon.Partition
+  alias Beacon.Scope
+
+  @type group :: any
+  @type start_option ::
+          {:partitions, pos_integer()}
+          | {:broadcast_interval_in_ms, non_neg_integer()}
+          | {:message_module, module()}
+
+  @doc "Returns a supervisor child specification for a Beacon scope"
+  def child_spec([scope]) when is_atom(scope), do: child_spec([scope, []])
+  def child_spec(scope) when is_atom(scope), do: child_spec([scope, []])
+
+  def child_spec([scope, opts]) when is_atom(scope) and is_list(opts) do
+    %{
+      id: Beacon,
+      start: {__MODULE__, :start_link, [scope, opts]},
+      type: :supervisor
+    }
+  end
+
+  @doc """
+  Starts the Beacon supervision tree for `scope`.
+
+  Options:
+
+    * `:partitions` - number of partitions to use (default: number of schedulers online)
+    * `:broadcast_interval_in_ms` - interval in milliseconds to broadcast membership counts to other nodes (default: 5000 ms)
+    * `:message_module` - module implementing the `Beacon.Adapter` behaviour (default: `Beacon.Adapter.ErlDist`)
+  """
+  @spec start_link(atom, [start_option]) :: Supervisor.on_start()
+  def start_link(scope, opts \\ []) when is_atom(scope) do
+    {partitions, opts} = Keyword.pop(opts, :partitions, System.schedulers_online())
+    broadcast_interval_in_ms = Keyword.get(opts, :broadcast_interval_in_ms)
+
+    if not (is_integer(partitions) and partitions >= 1) do
+      raise ArgumentError,
+            "expected :partitions to be a positive integer, got: #{inspect(partitions)}"
+    end
+
+    if broadcast_interval_in_ms != nil and
+         not (is_integer(broadcast_interval_in_ms) and broadcast_interval_in_ms > 0) do
+      raise ArgumentError,
+            "expected :broadcast_interval_in_ms to be a positive integer, got: #{inspect(broadcast_interval_in_ms)}"
+    end
+
+    Beacon.Supervisor.start_link(scope, partitions, opts)
+  end
+
+  @doc "Join pid to group in scope"
+  @spec join(atom, group, pid) :: :ok | {:error, :not_local}
+  def join(_scope, _group, pid) when is_pid(pid) and node(pid) != node(), do: {:error, :not_local}
+
+  def join(scope, group, pid) when is_atom(scope) and is_pid(pid) do
+    Partition.join(Beacon.Supervisor.partition(scope, group), group, pid)
+  end
+
+  @doc "Leave pid from group in scope"
+  @spec leave(atom, group, pid) :: :ok
+  def leave(scope, group, pid) when is_atom(scope) and is_pid(pid) do
+    Partition.leave(Beacon.Supervisor.partition(scope, group), group, pid)
+  end
+
+  @doc "Get total member counts per group in scope"
+  @spec member_counts(atom) :: %{group => non_neg_integer}
+  def member_counts(scope) when is_atom(scope) do
+    remote_counts = Scope.member_counts(scope)
+
+    scope
+    |> local_member_counts()
+    |> Map.merge(remote_counts, fn _k, v1, v2 -> v1 + v2 end)
+  end
+
+  @doc "Get total member count of group in scope"
+  @spec member_count(atom, group) :: non_neg_integer
+  def member_count(scope, group) do
+    local_member_count(scope, group) + Scope.member_count(scope, group)
+  end
+
+  @doc "Get total member count of group in scope on a specific node"
+  @spec member_count(atom, group, node) :: non_neg_integer
+  def member_count(scope, group, node) when node == node(), do: local_member_count(scope, group)
+  def member_count(scope, group, node), do: Scope.member_count(scope, group, node)
+
+  @doc "Get local members of group in scope"
+  @spec local_members(atom, group) :: [pid]
+  def local_members(scope, group) when is_atom(scope) do
+    Partition.members(Beacon.Supervisor.partition(scope, group), group)
+  end
+
+  @doc "Get local member count of group in scope"
+  @spec local_member_count(atom, group) :: non_neg_integer
+  def local_member_count(scope, group) when is_atom(scope)
do + Partition.member_count(Beacon.Supervisor.partition(scope, group), group) + end + + @doc "Get local members count per group in scope" + @spec local_member_counts(atom) :: %{group => non_neg_integer} + def local_member_counts(scope) when is_atom(scope) do + Enum.reduce(Beacon.Supervisor.partitions(scope), %{}, fn partition_name, acc -> + Map.merge(acc, Partition.member_counts(partition_name)) + end) + end + + @doc "Check if pid is a local member of group in scope" + @spec local_member?(atom, group, pid) :: boolean + def local_member?(scope, group, pid) when is_atom(scope) and is_pid(pid) do + Partition.member?(Beacon.Supervisor.partition(scope, group), group, pid) + end + + @doc "Get all local groups in scope" + @spec local_groups(atom) :: [group] + def local_groups(scope) when is_atom(scope) do + Enum.flat_map(Beacon.Supervisor.partitions(scope), fn partition_name -> + Partition.groups(partition_name) + end) + end + + @doc "Get local group count in scope" + @spec local_group_count(atom) :: non_neg_integer + def local_group_count(scope) when is_atom(scope) do + Enum.sum_by(Beacon.Supervisor.partitions(scope), fn partition_name -> + Partition.group_count(partition_name) + end) + end + + @doc "Get groups in scope" + @spec groups(atom) :: [group] + def groups(scope) when is_atom(scope) do + remote_groups = Scope.groups(scope) + + scope + |> local_groups() + |> MapSet.new() + |> MapSet.union(remote_groups) + |> MapSet.to_list() + end + + @doc "Get group count in scope" + @spec group_count(atom) :: non_neg_integer + def group_count(scope) when is_atom(scope) do + remote_groups = Scope.groups(scope) + + scope + |> local_groups() + |> MapSet.new() + |> MapSet.union(remote_groups) + |> MapSet.size() + end +end diff --git a/beacon/lib/beacon/adapter.ex b/beacon/lib/beacon/adapter.ex new file mode 100644 index 000000000..cc3fb6abf --- /dev/null +++ b/beacon/lib/beacon/adapter.ex @@ -0,0 +1,17 @@ +defmodule Beacon.Adapter do + @moduledoc """ + Behaviour module for Beacon messaging adapters. 
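+
+  An adapter decides how scope processes reach their peers. The default,
+  `Beacon.Adapter.ErlDist`, sends messages directly to the registered scope
+  process on each node over Erlang distribution. A minimal sketch of a custom
+  adapter (the module name is illustrative; the bodies mirror the default
+  adapter):
+
+      defmodule MyApp.BeaconAdapter do
+        import Kernel, except: [send: 2]
+
+        @behaviour Beacon.Adapter
+
+        @impl true
+        def register(scope) do
+          # Make the current process reachable under the scope's registered name
+          Process.register(self(), Beacon.Supervisor.name(scope))
+          :ok
+        end
+
+        @impl true
+        def broadcast(scope, message), do: broadcast(scope, Node.list(), message)
+
+        @impl true
+        def broadcast(scope, nodes, message) do
+          Enum.each(nodes, fn node -> send(scope, node, message) end)
+        end
+
+        @impl true
+        def send(scope, node, message) do
+          # :noconnect avoids re-establishing connections to nodes that dropped
+          :erlang.send({Beacon.Supervisor.name(scope), node}, message, [:noconnect])
+        end
+      end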
+ """ + + @doc "Register the current process to receive messages for the given scope" + @callback register(scope :: atom) :: :ok + + @doc "Broadcast a message to all nodes in the given scope" + @callback broadcast(scope :: atom, message :: term) :: any + + @doc "Broadcast a message to specific nodes in the given scope" + @callback broadcast(scope :: atom, [node], message :: term) :: any + + @doc "Send a message to a specific node in the given scope" + @callback send(scope :: atom, node, message :: term) :: any +end diff --git a/beacon/lib/beacon/adapter/erl_dist.ex b/beacon/lib/beacon/adapter/erl_dist.ex new file mode 100644 index 000000000..4f3c2b55a --- /dev/null +++ b/beacon/lib/beacon/adapter/erl_dist.ex @@ -0,0 +1,30 @@ +defmodule Beacon.Adapter.ErlDist do + @moduledoc false + + import Kernel, except: [send: 2] + + @behaviour Beacon.Adapter + + @impl true + def register(scope) do + Process.register(self(), Beacon.Supervisor.name(scope)) + :ok + end + + @impl true + def broadcast(scope, message) do + name = Beacon.Supervisor.name(scope) + Enum.each(Node.list(), fn node -> :erlang.send({name, node}, message, [:noconnect]) end) + end + + @impl true + def broadcast(scope, nodes, message) do + name = Beacon.Supervisor.name(scope) + Enum.each(nodes, fn node -> :erlang.send({name, node}, message, [:noconnect]) end) + end + + @impl true + def send(scope, node, message) do + :erlang.send({Beacon.Supervisor.name(scope), node}, message, [:noconnect]) + end +end diff --git a/beacon/lib/beacon/partition.ex b/beacon/lib/beacon/partition.ex new file mode 100644 index 000000000..c8d58e9a9 --- /dev/null +++ b/beacon/lib/beacon/partition.ex @@ -0,0 +1,148 @@ +defmodule Beacon.Partition do + @moduledoc false + + use GenServer + require Logger + + defmodule State do + @moduledoc false + @type t :: %__MODULE__{ + name: atom, + scope: atom, + monitors: %{{Beacon.group(), pid} => reference} + } + defstruct [:name, :scope, monitors: %{}] + end + + @spec join(atom, Beacon.group(), pid) :: :ok + def join(partition_name, group, pid), do: GenServer.call(partition_name, {:join, group, pid}) + + @spec leave(atom, Beacon.group(), pid) :: :ok + def leave(partition_name, group, pid), do: GenServer.call(partition_name, {:leave, group, pid}) + + @spec members(atom, Beacon.group()) :: [pid] + def members(partition_name, group) do + case :ets.lookup_element(partition_name, group, 2, []) do + [] -> [] + pids -> MapSet.to_list(pids) + end + end + + @spec member_count(atom, Beacon.group()) :: non_neg_integer + def member_count(partition_name, group), do: :ets.lookup_element(partition_name, group, 3, 0) + + @spec member_counts(atom) :: %{Beacon.group() => non_neg_integer} + def member_counts(partition_name) do + partition_name + |> :ets.select([{{:"$1", :_, :"$2"}, [], [{{:"$1", :"$2"}}]}]) + |> Map.new() + end + + @spec member?(atom, Beacon.group(), pid) :: boolean + def member?(partition_name, group, pid) do + case :ets.lookup_element(partition_name, group, 2, []) do + [] -> false + pids -> MapSet.member?(pids, pid) + end + end + + @spec groups(atom) :: [Beacon.group()] + def groups(partition_name), do: :ets.select(partition_name, [{{:"$1", :_, :_}, [], [:"$1"]}]) + + @spec group_count(atom) :: non_neg_integer + def group_count(partition_name), do: :ets.info(partition_name, :size) + + @spec start_link(atom, atom) :: GenServer.on_start() + def start_link(scope, partition_name), + do: GenServer.start_link(__MODULE__, [scope, partition_name], name: partition_name) + + @impl true + @spec init(any) :: {:ok, State.t()} + def 
init([scope, name]) do + {:ok, %State{scope: scope, name: name}, {:continue, :rebuild_monitors}} + end + + @impl true + @spec handle_continue(:rebuild_monitors, State.t()) :: {:noreply, State.t()} + def handle_continue(:rebuild_monitors, state) do + monitors = + for {group, pids, _counter} <- :ets.tab2list(state.name), pid <- pids, into: %{} do + ref = Process.monitor(pid, tag: {:DOWN, group}) + {{group, pid}, ref} + end + + {:noreply, %{state | monitors: monitors}} + end + + @impl true + @spec handle_call({:join, Beacon.group(), pid}, GenServer.from(), State.t()) :: + {:reply, :ok, State.t()} + def handle_call({:join, group, pid}, _from, state) do + case :ets.lookup(state.name, group) do + [{^group, pids, counter}] -> + if MapSet.member?(pids, pid) do + # Already being tracked + {:reply, :ok, state} + else + new_pids = MapSet.put(pids, pid) + :ets.insert(state.name, {group, new_pids, counter + 1}) + ref = Process.monitor(pid, tag: {:DOWN, group}) + monitors = Map.put(state.monitors, {group, pid}, ref) + {:reply, :ok, %{state | monitors: monitors}} + end + + [] -> + :ets.insert(state.name, {group, MapSet.new([pid]), 1}) + ref = Process.monitor(pid, tag: {:DOWN, group}) + monitors = Map.put(state.monitors, {group, pid}, ref) + {:reply, :ok, %{state | monitors: monitors}} + end + end + + def handle_call({:leave, group, pid}, _from, state) do + state = remove(group, pid, state) + {:reply, :ok, state} + end + + @impl true + @spec handle_info({{:DOWN, Beacon.group()}, reference, :process, pid, term}, State.t()) :: + {:noreply, State.t()} + def handle_info({{:DOWN, group}, _ref, :process, pid, _reason}, state) do + state = remove(group, pid, state) + {:noreply, state} + end + + def handle_info(_, state), do: {:noreply, state} + + defp remove(group, pid, state) do + case :ets.lookup(state.name, group) do + [{^group, pids, counter}] -> + if MapSet.member?(pids, pid) do + new_pids = MapSet.delete(pids, pid) + new_counter = counter - 1 + + if new_counter == 0 do + :ets.delete(state.name, group) + else + :ets.insert(state.name, {group, new_pids, new_counter}) + end + else + Logger.warning( + "Beacon[#{node()}|#{state.scope}] Trying to remove an unknown process #{inspect(pid)}" + ) + end + + [] -> + :ok + end + + case Map.pop(state.monitors, {group, pid}) do + {nil, _} -> + state + + {ref, new_monitors} -> + Process.demonitor(ref, [:flush]) + %{state | monitors: new_monitors} + end + end +end diff --git a/beacon/lib/beacon/scope.ex b/beacon/lib/beacon/scope.ex new file mode 100644 index 000000000..00582d9b2 --- /dev/null +++ b/beacon/lib/beacon/scope.ex @@ -0,0 +1,200 @@ +defmodule Beacon.Scope do + @moduledoc false + + use GenServer + require Logger + + @default_broadcast_interval 5_000 + + @spec member_counts(atom) :: %{Beacon.group() => non_neg_integer} + def member_counts(scope) do + scope + |> table_name() + |> :ets.select([{{:_, :"$1"}, [], [:"$1"]}]) + |> Enum.reduce(%{}, fn member_counts, acc -> + Map.merge(acc, member_counts, fn _k, v1, v2 -> v1 + v2 end) + end) + end + + @spec member_count(atom, Beacon.group()) :: non_neg_integer + def member_count(scope, group) do + scope + |> table_name() + |> :ets.select([{{:_, :"$1"}, [], [:"$1"]}]) + |> Enum.sum_by(fn member_counts -> Map.get(member_counts, group, 0) end) + end + + @spec member_count(atom, Beacon.group(), node) :: non_neg_integer + def member_count(scope, group, node) do + case :ets.lookup(table_name(scope), node) do + [{^node, member_counts}] -> Map.get(member_counts, group, 0) + [] -> 0 + end + end + + @spec groups(atom) :: 
MapSet.t(Beacon.group()) + def groups(scope) do + scope + |> table_name() + |> :ets.select([{{:_, :"$1"}, [], [:"$1"]}]) + |> Enum.reduce(MapSet.new(), fn member_counts, acc -> + member_counts + |> Map.keys() + |> MapSet.new() + |> MapSet.union(acc) + end) + end + + @typep member_counts :: %{Beacon.group() => non_neg_integer} + + defp table_name(scope), do: :"#{scope}_beacon_peer_counts" + + defmodule State do + @moduledoc false + @type t :: %__MODULE__{ + scope: atom, + message_module: module, + broadcast_interval: non_neg_integer, + peer_counts_table: :ets.tid(), + peers: %{pid => reference} + } + defstruct [ + :scope, + :message_module, + :broadcast_interval, + :peer_counts_table, + peers: %{} + ] + end + + @spec start_link(atom, Keyword.t()) :: GenServer.on_start() + def start_link(scope, opts \\ []), do: GenServer.start_link(__MODULE__, [scope, opts]) + + @impl true + def init([scope, opts]) do + :ok = :net_kernel.monitor_nodes(true) + + peer_counts_table = + :ets.new(table_name(scope), [:set, :protected, :named_table, read_concurrency: true]) + + broadcast_interval = + Keyword.get(opts, :broadcast_interval_in_ms, @default_broadcast_interval) + + message_module = Keyword.get(opts, :message_module, Beacon.Adapter.ErlDist) + + Logger.info("Beacon[#{node()}|#{scope}] Starting") + + :ok = message_module.register(scope) + + {:ok, + %State{ + scope: scope, + message_module: message_module, + broadcast_interval: broadcast_interval, + peer_counts_table: peer_counts_table + }, {:continue, :discover}} + end + + @impl true + @spec handle_continue(:discover, State.t()) :: {:noreply, State.t()} + def handle_continue(:discover, state) do + state.message_module.broadcast(state.scope, {:discover, self()}) + Process.send_after(self(), :broadcast_counts, state.broadcast_interval) + {:noreply, state} + end + + @impl true + @spec handle_info( + {:discover, pid} + | {:sync, pid, member_counts} + | :broadcast_counts + | {:nodeup, node} + | {:nodedown, node} + | {:DOWN, reference, :process, pid, term}, + State.t() + ) :: {:noreply, State.t()} + def handle_info({:discover, peer}, state) do + Logger.info( + "Beacon[#{node()}|#{state.scope}] Received DISCOVER request from node #{node(peer)}" + ) + + state.message_module.send( + state.scope, + node(peer), + {:sync, self(), Beacon.local_member_counts(state.scope)} + ) + + # We don't do anything if we already know about this peer + if Map.has_key?(state.peers, peer) do + Logger.debug( + "Beacon[#{node()}|#{state.scope}] already know peer #{inspect(peer)} from node #{node(peer)}" + ) + + {:noreply, state} + else + Logger.debug( + "Beacon[#{node()}|#{state.scope}] discovered peer #{inspect(peer)} from node #{node(peer)}" + ) + + ref = Process.monitor(peer) + new_peers = Map.put(state.peers, peer, ref) + state.message_module.send(state.scope, node(peer), {:discover, self()}) + {:noreply, %State{state | peers: new_peers}} + end + end + + def handle_info({:sync, peer, member_counts}, state) do + :ets.insert(state.peer_counts_table, {node(peer), member_counts}) + {:noreply, state} + end + + def handle_info(:broadcast_counts, state) do + nodes = + state.peers + |> Map.keys() + |> Enum.map(&node/1) + + state.message_module.broadcast( + state.scope, + nodes, + {:sync, self(), Beacon.local_member_counts(state.scope)} + ) + + Process.send_after(self(), :broadcast_counts, state.broadcast_interval) + {:noreply, state} + end + + def handle_info({:nodeup, node}, state) when node == node(), do: {:noreply, state} + + def handle_info({:nodeup, node}, state) do + 
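# A new node joined the cluster: send it a discover message. If a scope
+    # process with the same name runs there, it replies with its own
+    # {:discover, pid}; both peers then monitor each other and exchange
+    # {:sync, pid, member_counts} messages on every broadcast interval.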
:telemetry.execute([:beacon, :node, :up], %{}, %{node: node})
+
+    Logger.info(
+      "Beacon[#{node()}|#{state.scope}] Node #{node} has joined the cluster, sending discover message"
+    )
+
+    state.message_module.send(state.scope, node, {:discover, self()})
+    {:noreply, state}
+  end
+
+  # Do nothing and wait for the DOWN message from monitor
+  def handle_info({:nodedown, _node}, state), do: {:noreply, state}
+
+  def handle_info({:DOWN, ref, :process, peer, reason}, state) do
+    Logger.info(
+      "Beacon[#{node()}|#{state.scope}] Scope process is DOWN on node #{node(peer)}: #{inspect(reason)}"
+    )
+
+    case Map.pop(state.peers, peer) do
+      {nil, _} ->
+        {:noreply, state}
+
+      {^ref, new_peers} ->
+        :ets.delete(state.peer_counts_table, node(peer))
+        :telemetry.execute([:beacon, :node, :down], %{}, %{node: node(peer)})
+        {:noreply, %State{state | peers: new_peers}}
+    end
+  end
+
+  def handle_info(_msg, state), do: {:noreply, state}
+end
diff --git a/beacon/lib/beacon/supervisor.ex b/beacon/lib/beacon/supervisor.ex
new file mode 100644
index 000000000..5ba9d7d0d
--- /dev/null
+++ b/beacon/lib/beacon/supervisor.ex
@@ -0,0 +1,53 @@
+defmodule Beacon.Supervisor do
+  @moduledoc false
+  use Supervisor
+
+  def name(scope), do: :"#{scope}_beacon"
+  def supervisor_name(scope), do: :"#{scope}_beacon_supervisor"
+  def partition_name(scope, partition), do: :"#{scope}_beacon_partition_#{partition}"
+
+  @spec partition(atom, Beacon.group()) :: atom
+  def partition(scope, group) do
+    case :persistent_term.get(scope, :unknown) do
+      :unknown -> raise "Beacon for scope #{inspect(scope)} is not started"
+      partition_names -> elem(partition_names, :erlang.phash2(group, tuple_size(partition_names)))
+    end
+  end
+
+  @spec partitions(atom) :: [atom]
+  def partitions(scope) do
+    case :persistent_term.get(scope, :unknown) do
+      :unknown -> raise "Beacon for scope #{inspect(scope)} is not started"
+      partition_names -> Tuple.to_list(partition_names)
+    end
+  end
+
+  @spec start_link(atom, pos_integer(), Keyword.t()) :: Supervisor.on_start()
+  def start_link(scope, partitions, opts \\ []) do
+    args = [scope, partitions, opts]
+    Supervisor.start_link(__MODULE__, args, name: supervisor_name(scope))
+  end
+
+  @impl true
+  def init([scope, partitions, opts]) do
+    children =
+      for i <- 0..(partitions - 1) do
+        partition_name = partition_name(scope, i)
+
+        ^partition_name =
+          :ets.new(partition_name, [:set, :public, :named_table, read_concurrency: true])
+
+        %{id: i, start: {Beacon.Partition, :start_link, [scope, partition_name]}}
+      end
+
+    partition_names = for i <- 0..(partitions - 1), do: partition_name(scope, i)
+
+    :persistent_term.put(scope, List.to_tuple(partition_names))
+
+    children = [
+      %{id: :scope, start: {Beacon.Scope, :start_link, [scope, opts]}} | children
+    ]
+
+    Supervisor.init(children, strategy: :one_for_one)
+  end
+end
diff --git a/beacon/mix.exs b/beacon/mix.exs
new file mode 100644
index 000000000..4448f5f1e
--- /dev/null
+++ b/beacon/mix.exs
@@ -0,0 +1,34 @@
+defmodule Beacon.MixProject do
+  use Mix.Project
+
+  def project do
+    [
+      app: :beacon,
+      version: "1.0.0",
+      elixir: "~> 1.18",
+      start_permanent: Mix.env() == :prod,
+      elixirc_paths: elixirc_paths(Mix.env()),
+      deps: deps()
+    ]
+  end
+
+  # Run "mix help compile.app" to learn about applications.
+  def application do
+    [
+      extra_applications: [:logger]
+    ]
+  end
+
+  # Specifies which paths to compile per environment.
+  defp elixirc_paths(:test), do: ["lib", "test/support"]
+  defp elixirc_paths(_), do: ["lib"]
+
+  # Run "mix help deps" to learn about dependencies.
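+  # :telemetry is the only runtime dependency; Beacon.Scope uses it to emit
+  # the [:beacon, :node, :up] and [:beacon, :node, :down] events.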
+ defp deps do + [ + {:telemetry, "~> 1.3"}, + {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false} + # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} + ] + end +end diff --git a/beacon/mix.lock b/beacon/mix.lock new file mode 100644 index 000000000..2ba2a6c23 --- /dev/null +++ b/beacon/mix.lock @@ -0,0 +1,5 @@ +%{ + "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, + "mix_test_watch": {:hex, :mix_test_watch, "1.4.0", "d88bcc4fbe3198871266e9d2f00cd8ae350938efbb11d3fa1da091586345adbb", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "2b4693e17c8ead2ef56d4f48a0329891e8c2d0d73752c0f09272a2b17dc38d1b"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, +} diff --git a/beacon/test/beacon/partition_test.exs b/beacon/test/beacon/partition_test.exs new file mode 100644 index 000000000..c8751269c --- /dev/null +++ b/beacon/test/beacon/partition_test.exs @@ -0,0 +1,172 @@ +defmodule Beacon.PartitionTest do + use ExUnit.Case, async: true + alias Beacon.Partition + + setup do + scope = __MODULE__ + partition_name = Beacon.Supervisor.partition_name(scope, System.unique_integer([:positive])) + + ^partition_name = + :ets.new(partition_name, [:set, :public, :named_table, read_concurrency: true]) + + spec = %{ + id: partition_name, + start: {Partition, :start_link, [scope, partition_name]}, + type: :supervisor, + restart: :temporary + } + + pid = start_supervised!(spec) + + {:ok, partition_name: partition_name, partition_pid: pid} + end + + test "members/2 returns empty list for non-existent group", %{partition_name: partition} do + assert Partition.members(partition, :nonexistent) == [] + end + + test "member_count/2 returns 0 for non-existent group", %{partition_name: partition} do + assert Partition.member_count(partition, :nonexistent) == 0 + end + + test "member?/3 returns false for non-member", %{partition_name: partition} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + refute Partition.member?(partition, :group1, pid) + end + + test "join and query member", %{partition_name: partition} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + + assert :ok = Partition.join(partition, :group1, pid) + assert Partition.member?(partition, :group1, pid) + assert Partition.member_count(partition, :group1) == 1 + assert pid in Partition.members(partition, :group1) + end + + test "join multiple times and query member", %{partition_name: partition} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + + assert :ok = Partition.join(partition, :group1, pid) + assert :ok = Partition.join(partition, :group1, pid) + assert :ok = Partition.join(partition, :group1, pid) + + assert Partition.member?(partition, :group1, pid) + assert Partition.member_count(partition, :group1) == 1 + assert pid in Partition.members(partition, :group1) + end + + test "leave removes member", %{partition_name: partition} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + + Partition.join(partition, :group1, pid) + assert Partition.member?(partition, :group1, pid) + + Partition.leave(partition, :group1, pid) + refute Partition.member?(partition, :group1, pid) + end + + test "leave multiple times removes 
member", %{partition_name: partition} do
+    pid = spawn_link(fn -> Process.sleep(:infinity) end)
+
+    Partition.join(partition, :group1, pid)
+    assert Partition.member?(partition, :group1, pid)
+
+    Partition.leave(partition, :group1, pid)
+    Partition.leave(partition, :group1, pid)
+    Partition.leave(partition, :group1, pid)
+    refute Partition.member?(partition, :group1, pid)
+  end
+
+  test "member_counts returns counts for all groups", %{partition_name: partition} do
+    pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+    pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+    pid3 = spawn_link(fn -> Process.sleep(:infinity) end)
+
+    Partition.join(partition, :group1, pid1)
+    Partition.join(partition, :group1, pid2)
+    Partition.join(partition, :group2, pid3)
+
+    counts = Partition.member_counts(partition)
+    assert map_size(counts) == 2
+    assert counts[:group1] == 2
+    assert counts[:group2] == 1
+  end
+
+  test "groups returns all groups", %{partition_name: partition} do
+    pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+    pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+
+    Partition.join(partition, :group1, pid1)
+    Partition.join(partition, :group2, pid2)
+
+    groups = Partition.groups(partition)
+    assert :group1 in groups
+    assert :group2 in groups
+  end
+
+  test "group_count returns number of groups", %{partition_name: partition} do
+    pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+    pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+    pid3 = spawn_link(fn -> Process.sleep(:infinity) end)
+    pid4 = spawn_link(fn -> Process.sleep(:infinity) end)
+
+    Partition.join(partition, :group1, pid1)
+    Partition.join(partition, :group1, pid2)
+    Partition.join(partition, :group2, pid3)
+    Partition.join(partition, :group3, pid4)
+
+    assert Partition.group_count(partition) == 3
+  end
+
+  test "process death removes member from group", %{partition_name: partition} do
+    pid = spawn(fn -> Process.sleep(:infinity) end)
+
+    Partition.join(partition, :group1, pid)
+    assert Partition.member?(partition, :group1, pid)
+
+    Process.exit(pid, :kill)
+    Process.sleep(50)
+
+    refute Partition.member?(partition, :group1, pid)
+    assert Partition.member_count(partition, :group1) == 0
+  end
+
+  test "partition recovery monitors processes again", %{
+    partition_name: partition,
+    partition_pid: partition_pid
+  } do
+    pid1 = spawn(fn -> Process.sleep(:infinity) end)
+    pid2 = spawn(fn -> Process.sleep(:infinity) end)
+
+    Partition.join(partition, :group1, pid1)
+    Partition.join(partition, :group2, pid2)
+
+    monitors = Process.info(partition_pid, [:monitors])[:monitors] |> Enum.map(&elem(&1, 1))
+    assert length(monitors) == 2
+    assert pid1 in monitors
+    assert pid2 in monitors
+
+    assert %{{:group1, ^pid1} => _ref1, {:group2, ^pid2} => _ref2} =
+             :sys.get_state(partition_pid).monitors
+
+    Process.monitor(partition_pid)
+    Process.exit(partition_pid, :kill)
+    assert_receive {:DOWN, _ref, :process, ^partition_pid, :killed}
+
+    spec = %{
+      id: :recover,
+      start: {Partition, :start_link, [__MODULE__, partition]},
+      type: :supervisor
+    }
+
+    partition_pid = start_supervised!(spec)
+
+    assert %{{:group1, ^pid1} => _ref1, {:group2, ^pid2} => _ref2} =
+             :sys.get_state(partition_pid).monitors
+
+    monitors = Process.info(partition_pid, [:monitors])[:monitors] |> Enum.map(&elem(&1, 1))
+    assert length(monitors) == 2
+    assert pid1 in monitors
+    assert pid2 in monitors
+  end
+end
diff --git a/beacon/test/beacon_test.exs b/beacon/test/beacon_test.exs
new file mode 100644
index 
000000000..fa40cabad --- /dev/null +++ b/beacon/test/beacon_test.exs @@ -0,0 +1,469 @@ +defmodule BeaconTest do + use ExUnit.Case, async: true + + setup do + scope = :"test_scope#{System.unique_integer([:positive])}" + + %{scope: scope} + end + + defp spec(scope, opts) do + %{ + id: scope, + start: {Beacon, :start_link, [scope, opts]}, + type: :supervisor + } + end + + describe "start_link/2" do + test "starts beacon with default partitions", %{scope: scope} do + pid = start_supervised!({Beacon, [scope, []]}) + assert Process.alive?(pid) + assert is_list(Beacon.Supervisor.partitions(scope)) + assert length(Beacon.Supervisor.partitions(scope)) == System.schedulers_online() + end + + test "starts beacon with custom partition count", %{scope: scope} do + pid = start_supervised!(spec(scope, partitions: 3)) + assert Process.alive?(pid) + assert length(Beacon.Supervisor.partitions(scope)) == 3 + end + + test "raises on invalid partition count", %{scope: scope} do + assert_raise ArgumentError, ~r/expected :partitions to be a positive integer/, fn -> + Beacon.start_link(scope, partitions: 0) + end + + assert_raise ArgumentError, ~r/expected :partitions to be a positive integer/, fn -> + Beacon.start_link(scope, partitions: -1) + end + + assert_raise ArgumentError, ~r/expected :partitions to be a positive integer/, fn -> + Beacon.start_link(scope, partitions: :invalid) + end + end + + test "raises on invalid broadcast_interval_in_ms", %{scope: scope} do + assert_raise ArgumentError, + ~r/expected :broadcast_interval_in_ms to be a positive integer/, + fn -> + Beacon.start_link(scope, broadcast_interval_in_ms: 0) + end + + assert_raise ArgumentError, + ~r/expected :broadcast_interval_in_ms to be a positive integer/, + fn -> + Beacon.start_link(scope, broadcast_interval_in_ms: -1) + end + + assert_raise ArgumentError, + ~r/expected :broadcast_interval_in_ms to be a positive integer/, + fn -> + Beacon.start_link(scope, broadcast_interval_in_ms: :invalid) + end + end + end + + describe "join/3 and leave/3" do + setup %{scope: scope} do + start_supervised!(spec(scope, partitions: 2)) + :ok + end + + test "can join a group", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + assert :ok = Beacon.join(scope, :group1, pid) + assert Beacon.local_member?(scope, :group1, pid) + end + + test "can leave a group", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + assert :ok = Beacon.join(scope, :group1, pid) + assert Beacon.local_member?(scope, :group1, pid) + + assert :ok = Beacon.leave(scope, :group1, pid) + refute Beacon.local_member?(scope, :group1, pid) + end + + test "joining same group twice is idempotent", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + assert :ok = Beacon.join(scope, :group1, pid) + assert :ok = Beacon.join(scope, :group1, pid) + assert Beacon.local_member_count(scope, :group1) == 1 + end + + test "multiple processes can join same group", %{scope: scope} do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + + assert :ok = Beacon.join(scope, :group1, pid1) + assert :ok = Beacon.join(scope, :group1, pid2) + + members = Beacon.local_members(scope, :group1) + assert length(members) == 2 + assert pid1 in members + assert pid2 in members + end + + test "process can join multiple groups", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + + assert :ok = Beacon.join(scope, :group1, pid) + assert :ok = Beacon.join(scope, :group2, 
pid) + + assert Beacon.local_member?(scope, :group1, pid) + assert Beacon.local_member?(scope, :group2, pid) + end + + test "automatically removes member when process dies", %{scope: scope} do + pid = spawn(fn -> Process.sleep(:infinity) end) + assert :ok = Beacon.join(scope, :group1, pid) + assert Beacon.local_member?(scope, :group1, pid) + + Process.exit(pid, :kill) + Process.sleep(50) + + refute Beacon.local_member?(scope, :group1, pid) + assert Beacon.local_member_count(scope, :group1) == 0 + end + end + + describe "local_members/2" do + setup %{scope: scope} do + start_supervised!(spec(scope, partitions: 2)) + :ok + end + + test "returns empty list for non-existent group", %{scope: scope} do + assert Beacon.local_members(scope, :nonexistent) == [] + end + + test "returns all members of a group", %{scope: scope} do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + pid3 = spawn_link(fn -> Process.sleep(:infinity) end) + + Beacon.join(scope, :group1, pid1) + Beacon.join(scope, :group1, pid2) + Beacon.join(scope, :group2, pid3) + + members = Beacon.local_members(scope, :group1) + assert length(members) == 2 + assert pid1 in members + assert pid2 in members + refute pid3 in members + end + end + + describe "local_member_count/2" do + setup %{scope: scope} do + start_supervised!(spec(scope, partitions: 2)) + :ok + end + + test "returns 0 for non-existent group", %{scope: scope} do + assert Beacon.local_member_count(scope, :nonexistent) == 0 + end + + test "returns correct count", %{scope: scope} do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + + assert Beacon.local_member_count(scope, :group1) == 0 + + Beacon.join(scope, :group1, pid1) + assert Beacon.local_member_count(scope, :group1) == 1 + + Beacon.join(scope, :group1, pid2) + assert Beacon.local_member_count(scope, :group1) == 2 + + Beacon.leave(scope, :group1, pid1) + assert Beacon.local_member_count(scope, :group1) == 1 + end + end + + describe "local_member_counts/1" do + setup %{scope: scope} do + start_supervised!(spec(scope, partitions: 2)) + :ok + end + + test "returns empty map when no groups exist", %{scope: scope} do + assert Beacon.local_member_counts(scope) == %{} + end + + test "returns counts for all groups", %{scope: scope} do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + pid3 = spawn_link(fn -> Process.sleep(:infinity) end) + + Beacon.join(scope, :group1, pid1) + Beacon.join(scope, :group1, pid2) + Beacon.join(scope, :group2, pid3) + + assert Beacon.local_member_counts(scope) == %{ + group1: 2, + group2: 1 + } + end + end + + describe "local_member?/3" do + setup %{scope: scope} do + start_supervised!(spec(scope, partitions: 2)) + :ok + end + + test "returns false for non-member", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + refute Beacon.local_member?(scope, :group1, pid) + end + + test "returns true for member", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + Beacon.join(scope, :group1, pid) + assert Beacon.local_member?(scope, :group1, pid) + end + + test "returns false after leaving", %{scope: scope} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + + Beacon.join(scope, :group1, pid) + Beacon.leave(scope, :group1, pid) + + refute Beacon.local_member?(scope, :group1, pid) + end + end + + describe "local_groups/1" do + setup %{scope: scope} do + 
start_supervised!(spec(scope, partitions: 2))
+      :ok
+    end
+
+    test "returns empty list when no groups exist", %{scope: scope} do
+      assert Beacon.local_groups(scope) == []
+    end
+
+    test "returns all groups with members", %{scope: scope} do
+      pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+      pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+
+      Beacon.join(scope, :group1, pid1)
+      Beacon.join(scope, :group2, pid2)
+      Beacon.join(scope, :group3, pid1)
+
+      groups = Beacon.local_groups(scope)
+      assert :group1 in groups
+      assert :group2 in groups
+      assert :group3 in groups
+      assert length(groups) == 3
+    end
+
+    test "removes group from list when last member leaves", %{scope: scope} do
+      pid = spawn_link(fn -> Process.sleep(:infinity) end)
+      Beacon.join(scope, :group1, pid)
+      assert :group1 in Beacon.local_groups(scope)
+
+      Beacon.leave(scope, :group1, pid)
+      refute :group1 in Beacon.local_groups(scope)
+    end
+  end
+
+  describe "local_group_count/1" do
+    setup %{scope: scope} do
+      start_supervised!(spec(scope, partitions: 2))
+      :ok
+    end
+
+    test "returns 0 when no groups exist", %{scope: scope} do
+      assert Beacon.local_group_count(scope) == 0
+    end
+
+    test "returns correct count of groups", %{scope: scope} do
+      pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+      pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+      Beacon.join(scope, :group1, pid1)
+      Beacon.join(scope, :group2, pid2)
+      Beacon.join(scope, :group3, pid2)
+      Beacon.join(scope, :group3, pid1)
+      assert Beacon.local_group_count(scope) == 3
+      Beacon.leave(scope, :group2, pid2)
+      assert Beacon.local_group_count(scope) == 2
+    end
+  end
+
+  describe "member_counts/1" do
+    setup %{scope: scope} do
+      start_supervised!(spec(scope, partitions: 2))
+      :ok
+    end
+
+    test "returns local counts when no peers", %{scope: scope} do
+      pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+      pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+
+      Beacon.join(scope, :group1, pid1)
+      Beacon.join(scope, :group1, pid2)
+
+      counts = Beacon.member_counts(scope)
+      assert counts[:group1] == 2
+    end
+  end
+
+  describe "partition distribution" do
+    setup %{scope: scope} do
+      start_supervised!(spec(scope, partitions: 4))
+      :ok
+    end
+
+    test "distributes groups across partitions", %{scope: scope} do
+      # Create multiple processes and verify they're split across different partitions
+      pids = for _ <- 1..20, do: spawn_link(fn -> Process.sleep(:infinity) end)
+
+      Enum.each(pids, fn pid ->
+        Beacon.join(scope, pid, pid)
+      end)
+
+      # Check that every partition is being used
+      partition_names = Beacon.Supervisor.partitions(scope)
+
+      Enum.each(partition_names, fn partition_name ->
+        assert map_size(Beacon.Partition.member_counts(partition_name)) >= 1
+      end)
+    end
+
+    test "same group always maps to same partition", %{scope: scope} do
+      partition1 = Beacon.Supervisor.partition(scope, :my_group)
+      partition2 = Beacon.Supervisor.partition(scope, :my_group)
+      partition3 = Beacon.Supervisor.partition(scope, :my_group)
+
+      assert partition1 == partition2
+      assert partition2 == partition3
+    end
+  end
+
+  @aux_mod (quote do
+              defmodule PeerAux do
+                def start(scope) do
+                  spawn(fn ->
+                    {:ok, _} = Beacon.start_link(scope, broadcast_interval_in_ms: 50)
+
+                    pid1 = spawn_link(fn -> Process.sleep(:infinity) end)
+                    pid2 = spawn_link(fn -> Process.sleep(:infinity) end)
+                    Beacon.join(scope, :group1, pid1)
+                    Beacon.join(scope, :group2, pid2)
+                    Beacon.join(scope, :group3, pid2)
+
+                    Process.sleep(:infinity)
+                  end)
+                end
+              end
+            end)
+
+  describe "distributed tests" do
+    setup do
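+      # Start a peer node that is initially disconnected and attach telemetry
+      # handlers: each test connects the node itself and asserts on the
+      # [:beacon, :node, :up] / [:beacon, :node, :down] events Beacon emits.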
scope = :"broadcast_scope#{System.unique_integer([:positive])}" + supervisor_pid = start_supervised!(spec(scope, partitions: 2, broadcast_interval_in_ms: 50)) + {:ok, peer, node} = Peer.start_disconnected(aux_mod: @aux_mod) + + ref = + :telemetry_test.attach_event_handlers(self(), [ + [:beacon, :node, :up], + [:beacon, :node, :down] + ]) + + %{scope: scope, supervisor_pid: supervisor_pid, peer: peer, node: node, telemetry_ref: ref} + end + + test "node up", %{scope: scope, peer: peer, node: node, telemetry_ref: telemetry_ref} do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + Beacon.join(scope, :group1, pid1) + Beacon.join(scope, :group1, pid2) + Beacon.join(scope, :group2, pid2) + + true = Node.connect(node) + :peer.call(peer, PeerAux, :start, [scope]) + + assert_receive {[:beacon, :node, :up], ^telemetry_ref, %{}, %{node: ^node}} + + # Wait for at least one broadcast interval + Process.sleep(150) + assert Beacon.group_count(scope) == 3 + groups = Beacon.groups(scope) + + assert length(groups) == 3 + assert :group1 in groups + assert :group2 in groups + assert :group3 in groups + + assert Beacon.member_counts(scope) == %{group1: 3, group2: 2, group3: 1} + assert Beacon.member_count(scope, :group1) == 3 + assert Beacon.member_count(scope, :group3, node) == 1 + assert Beacon.member_count(scope, :group1, node()) == 2 + end + + test "node down", %{scope: scope, peer: peer, node: node, telemetry_ref: telemetry_ref} do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + Beacon.join(scope, :group1, pid1) + Beacon.join(scope, :group1, pid2) + Beacon.join(scope, :group2, pid2) + + true = Node.connect(node) + :peer.call(peer, PeerAux, :start, [scope]) + assert_receive {[:beacon, :node, :up], ^telemetry_ref, %{}, %{node: ^node}} + # Wait for remote scope to communicate with local + Process.sleep(150) + + true = Node.disconnect(node) + + assert_receive {[:beacon, :node, :down], ^telemetry_ref, %{}, %{node: ^node}} + + assert Beacon.member_counts(scope) == %{group1: 2, group2: 1} + assert Beacon.member_count(scope, :group1) == 2 + end + + test "scope restart can recover", %{ + scope: scope, + supervisor_pid: supervisor_pid, + peer: peer, + node: node, + telemetry_ref: telemetry_ref + } do + pid1 = spawn_link(fn -> Process.sleep(:infinity) end) + pid2 = spawn_link(fn -> Process.sleep(:infinity) end) + Beacon.join(scope, :group1, pid1) + Beacon.join(scope, :group1, pid2) + Beacon.join(scope, :group2, pid2) + + true = Node.connect(node) + :peer.call(peer, PeerAux, :start, [scope]) + assert_receive {[:beacon, :node, :up], ^telemetry_ref, %{}, %{node: ^node}} + + # Wait for remote scope to communicate with local + Process.sleep(150) + + [ + {1, _, :worker, [Beacon.Partition]}, + {0, _, :worker, [Beacon.Partition]}, + {:scope, scope_pid, :worker, [Beacon.Scope]} + ] = Supervisor.which_children(supervisor_pid) + + # Restart the scope process + Process.monitor(scope_pid) + Process.exit(scope_pid, :kill) + assert_receive {:DOWN, _ref, :process, ^scope_pid, :killed} + # Wait for recovery and communication + Process.sleep(200) + assert Beacon.group_count(scope) == 3 + groups = Beacon.groups(scope) + assert length(groups) == 3 + assert :group1 in groups + assert :group2 in groups + assert :group3 in groups + assert Beacon.member_counts(scope) == %{group1: 3, group2: 2, group3: 1} + end + end +end diff --git a/beacon/test/support/peer.ex b/beacon/test/support/peer.ex new file mode 100644 
index 000000000..42ab7e8cd
--- /dev/null
+++ b/beacon/test/support/peer.ex
@@ -0,0 +1,89 @@
+defmodule Peer do
+  @moduledoc """
+  Starts peer nodes for testing. Adapted from the gist https://gist.github.com/ityonemo/177cbc96f8c8722bfc4d127ff9baec62
+  """
+
+  @doc """
+  Starts a node for testing.
+
+  Can receive an auxiliary module to be evaluated in the node, so you can set up functions within the test context and outside of the normal code context
+
+  e.g.
+  ```
+  @aux_mod (quote do
+             defmodule Aux do
+               def checker(res), do: res
+             end
+           end)
+
+  Code.eval_quoted(@aux_mod)
+  test "clustered call" do
+    {:ok, _peer, node} = Peer.start(aux_mod: @aux_mod)
+    assert :ok = :rpc.call(node, Aux, :checker, [:ok])
+  end
+  ```
+  """
+  @spec start(Keyword.t()) :: {:ok, :peer.server_ref(), node}
+  def start(opts \\ []) do
+    {:ok, peer, node} = start_disconnected(opts)
+
+    true = Node.connect(node)
+
+    {:ok, peer, node}
+  end
+
+  @doc """
+  Similar to `start/1` but the node is not connected automatically
+  """
+  @spec start_disconnected(Keyword.t()) :: {:ok, :peer.server_ref(), node}
+  def start_disconnected(opts \\ []) do
+    extra_config = Keyword.get(opts, :extra_config, [])
+    name = Keyword.get(opts, :name, :peer.random_name())
+    aux_mod = Keyword.get(opts, :aux_mod, nil)
+
+    true = :erlang.set_cookie(:cookie)
+
+    {:ok, pid, node} =
+      ExUnit.Callbacks.start_supervised(%{
+        id: {:peer, name},
+        start:
+          {:peer, :start_link,
+           [
+             %{
+               name: name,
+               host: ~c"127.0.0.1",
+               longnames: true,
+               connection: :standard_io
+             }
+           ]}
+      })
+
+    :peer.call(pid, :erlang, :set_cookie, [:cookie])
+
+    :ok = :peer.call(pid, :code, :add_paths, [:code.get_path()])
+
+    for {app_name, _, _} <- Application.loaded_applications(),
+        {key, value} <- Application.get_all_env(app_name) do
+      :ok = :peer.call(pid, Application, :put_env, [app_name, key, value])
+    end
+
+    # Override with extra config
+    for {app_name, key, value} <- extra_config do
+      :ok = :peer.call(pid, Application, :put_env, [app_name, key, value])
+    end
+
+    {:ok, _} = :peer.call(pid, Application, :ensure_all_started, [:mix])
+    :ok = :peer.call(pid, Mix, :env, [Mix.env()])
+
+    Enum.each(
+      [:logger, :runtime_tools, :mix, :os_mon, :beacon],
+      fn app -> {:ok, _} = :peer.call(pid, Application, :ensure_all_started, [app]) end
+    )
+
+    if aux_mod do
+      {{:module, _, _, _}, []} = :peer.call(pid, Code, :eval_quoted, [aux_mod])
+    end
+
+    {:ok, pid, node}
+  end
+end
diff --git a/beacon/test/test_helper.exs b/beacon/test/test_helper.exs
new file mode 100644
index 000000000..eea6cb589
--- /dev/null
+++ b/beacon/test/test_helper.exs
@@ -0,0 +1,3 @@
+ExUnit.start(capture_log: true)
+
+:net_kernel.start([:"beacon@127.0.0.1"])
diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex
index 482ffb798..00058acc9 100644
--- a/lib/realtime/application.ex
+++ b/lib/realtime/application.ex
@@ -53,6 +53,8 @@ defmodule Realtime.Application do
     connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
     no_channel_timeout_in_ms = Application.get_env(:realtime, :no_channel_timeout_in_ms)
     master_region = Application.get_env(:realtime, :master_region) || region
+    user_scope_shards = Application.fetch_env!(:realtime, :users_scope_shards)
+    user_scope_broadcast_interval_in_ms = Application.get_env(:realtime, :users_scope_broadcast_interval_in_ms, 10_000)
 
     :syn.join(RegionNodes, region, self(), node: node())
 
@@ -66,6 +68,15 @@
       {Cluster.Supervisor, [topologies, [name: Realtime.ClusterSupervisor]]},
       {Phoenix.PubSub, name: Realtime.PubSub, pool_size: 10, adapter: pubsub_adapter(), broadcast_pool_size: broadcast_pool_size},
+      {Beacon,
+       [
+         :users,
+         [
+           partitions: user_scope_shards,
+           broadcast_interval_in_ms: user_scope_broadcast_interval_in_ms,
+           message_module: Realtime.BeaconPubSubAdapter
+         ]
+       ]},
       {Cachex, name: Realtime.RateCounter},
       Realtime.Tenants.Cache,
       Realtime.RateCounter.DynamicSupervisor,
diff --git a/lib/realtime/beacon_pub_sub_adapter.ex b/lib/realtime/beacon_pub_sub_adapter.ex
new file mode 100644
index 000000000..f4b551f6d
--- /dev/null
+++ b/lib/realtime/beacon_pub_sub_adapter.ex
@@ -0,0 +1,33 @@
+defmodule Realtime.BeaconPubSubAdapter do
+  @moduledoc "Beacon adapter that delivers messages over Phoenix.PubSub"
+
+  import Kernel, except: [send: 2]
+
+  @behaviour Beacon.Adapter
+
+  @impl true
+  def register(scope) do
+    :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, topic(scope))
+  end
+
+  @impl true
+  def broadcast(scope, message) do
+    Phoenix.PubSub.broadcast_from(Realtime.PubSub, self(), topic(scope), message)
+  end
+
+  @impl true
+  def broadcast(scope, _nodes, message) do
+    # Note that we don't filter by nodes here, as PubSub broadcasts to all subscribers.
+    # We broadcast to everyone to take advantage of Realtime.PubSub's regional
+    # broadcasting, which is more efficient in this multi-region setup.
+
+    broadcast(scope, message)
+  end
+
+  @impl true
+  def send(scope, node, message) do
+    Phoenix.PubSub.direct_broadcast(node, Realtime.PubSub, topic(scope), message)
+  end
+
+  defp topic(scope), do: "beacon:#{scope}"
+end
diff --git a/lib/realtime/user_counter.ex b/lib/realtime/user_counter.ex
index afcb357af..b6f85a920 100644
--- a/lib/realtime/user_counter.ex
+++ b/lib/realtime/user_counter.ex
@@ -8,7 +8,16 @@ defmodule Realtime.UsersCounter do
   Adds a RealtimeChannel pid to the `:users` scope for a tenant so we can keep track of all connected clients for a tenant.
   """
   @spec add(pid(), String.t()) :: :ok
-  def add(pid, tenant_id), do: tenant_id |> scope() |> :syn.join(tenant_id, pid)
+  def add(pid, tenant_id) when is_pid(pid) and is_binary(tenant_id) do
+    beacon_join(pid, tenant_id)
+    tenant_id |> scope() |> :syn.join(tenant_id, pid)
+  end
+
+  defp beacon_join(pid, tenant_id) do
+    :ok = Beacon.join(:users, tenant_id, pid)
+  rescue
+    _ -> Logger.error("Failed to join Beacon users scope for tenant #{tenant_id}")
+  end
 
   @doc """
   Returns the count of all connected clients for a tenant for the cluster.
@@ -75,13 +84,13 @@ defmodule Realtime.UsersCounter do """ @spec scope(String.t()) :: atom() def scope(tenant_id) do - shards = Application.get_env(:realtime, :users_scope_shards) + shards = Application.fetch_env!(:realtime, :users_scope_shards) shard = :erlang.phash2(tenant_id, shards) :"users_#{shard}" end def scopes() do - shards = Application.get_env(:realtime, :users_scope_shards) + shards = Application.fetch_env!(:realtime, :users_scope_shards) Enum.map(0..(shards - 1), fn shard -> :"users_#{shard}" end) end end diff --git a/mix.exs b/mix.exs index 50dfb0b53..d495f4f33 100644 --- a/mix.exs +++ b/mix.exs @@ -81,6 +81,7 @@ defmodule Realtime.MixProject do {:mint, "~> 1.4"}, {:logflare_logger_backend, "~> 0.11"}, {:syn, "~> 3.3"}, + {:beacon, path: "./beacon"}, {:cachex, "~> 4.0"}, {:open_api_spex, "~> 3.16"}, {:corsica, "~> 2.0"}, diff --git a/test/realtime/gen_rpc_pub_sub_test.exs b/test/realtime/gen_rpc_pub_sub_test.exs index 297394b14..4c5ded562 100644 --- a/test/realtime/gen_rpc_pub_sub_test.exs +++ b/test/realtime/gen_rpc_pub_sub_test.exs @@ -71,7 +71,7 @@ defmodule Realtime.GenRpcPubSubTest do # Avoid port collision client_config_per_node = %{ - :"main@127.0.0.1" => 5369, + :"main@127.0.0.1" => 5969, :"#{us_node}@127.0.0.1" => 16970, :"#{ap2_nodeX}@127.0.0.1" => 16971, :"#{ap2_nodeY}@127.0.0.1" => 16972 diff --git a/test/realtime/user_counter_test.exs b/test/realtime/user_counter_test.exs index d01b43640..f7725885d 100644 --- a/test/realtime/user_counter_test.exs +++ b/test/realtime/user_counter_test.exs @@ -5,8 +5,9 @@ defmodule Realtime.UsersCounterTest do setup_all do tenant_id = random_string() - {nodes, count} = generate_load(tenant_id) - %{tenant_id: tenant_id, count: count, nodes: nodes} + count = generate_load(tenant_id) + + %{tenant_id: tenant_id, count: count, nodes: Node.list()} end describe "add/1" do @@ -17,12 +18,13 @@ defmodule Realtime.UsersCounterTest do @aux_mod (quote do defmodule Aux do - def ping(), - do: - spawn(fn -> - Process.sleep(15000) - :pong - end) + def ping() do + spawn(fn -> Process.sleep(:infinity) end) + end + + def join(pid, group) do + UsersCounter.add(pid, group) + end end end) @@ -33,9 +35,19 @@ defmodule Realtime.UsersCounterTest do assert UsersCounter.add(self(), tenant_id) == :ok Process.sleep(1000) counts = UsersCounter.tenant_counts() + assert counts[tenant_id] == expected + 1 + assert map_size(counts) >= 61 + + counts = Beacon.local_member_counts(:users) + + assert counts[tenant_id] == 1 + assert map_size(counts) >= 1 - assert map_size(counts) >= 41 + counts = Beacon.member_counts(:users) + + assert counts[tenant_id] == expected + 1 + assert map_size(counts) >= 61 end end @@ -51,6 +63,8 @@ defmodule Realtime.UsersCounterTest do assert another_node_counts[tenant_id] == 2 assert map_size(another_node_counts) == 21 + + assert Beacon.local_member_counts(:users) == %{tenant_id => 1} end end @@ -62,46 +76,71 @@ defmodule Realtime.UsersCounterTest do end describe "tenant_users/2" do - test "returns count of connected clients for tenant on target cluster", %{tenant_id: tenant_id} do - {:ok, node} = Clustered.start(@aux_mod) - pid = Rpc.call(node, Aux, :ping, []) - UsersCounter.add(pid, tenant_id) - assert UsersCounter.tenant_users(node, tenant_id) == 1 + test "returns count of connected clients for tenant on target cluster", %{tenant_id: tenant_id, nodes: nodes} do + node = hd(nodes) + assert UsersCounter.tenant_users(node, tenant_id) == 2 + + assert Beacon.member_count(:users, tenant_id, node) == 2 end end - defp generate_load(tenant_id, 
n_nodes \\ 2, processes \\ 2) do - nodes = - for i <- 1..n_nodes do - # Avoid port collision - extra_config = [ - {:gen_rpc, :tcp_server_port, 15970 + i} - ] - - {:ok, node} = Clustered.start(@aux_mod, extra_config: extra_config, phoenix_port: 4012 + i) - - for _ <- 1..processes do - pid = Rpc.call(node, Aux, :ping, []) - - for _ <- 1..10 do - # replicate same pid added multiple times concurrently - Task.start(fn -> - UsersCounter.add(pid, tenant_id) - Process.sleep(10000) - end) - - # noisy neighbors to test handling of bigger loads on concurrent calls - Task.start(fn -> - pid = Rpc.call(node, Aux, :ping, []) - UsersCounter.add(pid, random_string()) - Process.sleep(10000) - end) - end - end - + defp generate_load(tenant_id) do + processes = 2 + + nodes = %{ + :"main@127.0.0.1" => 5969, + :"us_node@127.0.0.1" => 16980, + :"ap2_nodeX@127.0.0.1" => 16981, + :"ap2_nodeY@127.0.0.1" => 16982 + } + + regions = %{ + :"us_node@127.0.0.1" => "us-east-1", + :"ap2_nodeX@127.0.0.1" => "ap-southeast-2", + :"ap2_nodeY@127.0.0.1" => "ap-southeast-2" + } + + on_exit(fn -> Application.put_env(:gen_rpc, :client_config_per_node, {:internal, %{}}) end) + Application.put_env(:gen_rpc, :client_config_per_node, {:internal, nodes}) + + nodes + |> Enum.filter(fn {node, _port} -> node != Node.self() end) + |> Enum.with_index(1) + |> Enum.each(fn {{node, gen_rpc_port}, i} -> + # Avoid port collision + extra_config = [ + {:gen_rpc, :tcp_server_port, gen_rpc_port}, + {:gen_rpc, :client_config_per_node, {:internal, nodes}}, + {:realtime, :users_scope_broadcast_interval_in_ms, 100}, + {:realtime, :region, regions[node]} + ] + + node_name = node + |> to_string() + |> String.split("@") + |> hd() + |> String.to_atom() + + {:ok, node} = Clustered.start(@aux_mod, name: node_name, extra_config: extra_config, phoenix_port: 4012 + i) + + for _ <- 1..processes do + pid = Rpc.call(node, Aux, :ping, []) + + for _ <- 1..10 do + # replicate same pid added multiple times concurrently + Task.start(fn -> + Rpc.call(node, Aux, :join, [pid, tenant_id]) + end) + + # noisy neighbors to test handling of bigger loads on concurrent calls + Task.start(fn -> + Rpc.call(node, Aux, :join, [pid, random_string()]) + end) + end end + end) - {nodes, n_nodes * processes} + 3 * processes end end
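A sketch of how a host application could observe the `[:beacon, :node, :up]` and `[:beacon, :node, :down]` telemetry events emitted by `Beacon.Scope` (the handler id and log message below are illustrative, not part of this patch):

```elixir
defmodule MyApp.BeaconTelemetry do
  require Logger

  def attach do
    :telemetry.attach_many(
      "myapp-beacon-node-events",
      [[:beacon, :node, :up], [:beacon, :node, :down]],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Beacon passes the affected node in the event metadata
  def handle_event([:beacon, :node, status], _measurements, %{node: node}, _config) do
    Logger.info("Beacon peer #{node} is #{status}")
  end
end
```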