Skip to content

Commit e66554d

Browse files
committed
feat: add Beacon
1 parent 05df771 commit e66554d

24 files changed

+1632
-48
lines changed

.github/workflows/beacon_tests.yml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
name: Beacon Tests
2+
defaults:
3+
run:
4+
shell: bash
5+
working-directory: ./beacon
6+
on:
7+
pull_request:
8+
paths:
9+
- "beacon/**"
10+
11+
push:
12+
branches:
13+
- main
14+
15+
concurrency:
16+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
17+
cancel-in-progress: true
18+
19+
env:
20+
MIX_ENV: test
21+
22+
jobs:
23+
tests:
24+
name: Tests & Lint
25+
runs-on: ubuntu-latest
26+
27+
steps:
28+
- uses: actions/checkout@v2
29+
- name: Setup elixir
30+
id: beam
31+
uses: erlef/setup-beam@v1
32+
with:
33+
otp-version: 27.x # Define the OTP version [required]
34+
elixir-version: 1.18.x # Define the elixir version [required]
35+
- name: Install dependencies
36+
run: mix deps.get
37+
- name: Start epmd
38+
run: epmd -daemon
39+
- name: Run tests
40+
run: MIX_ENV=test mix test
41+
env:
42+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
43+
- name: Check for warnings
44+
run: mix compile --force --warnings-as-errors
45+
- name: Run format check
46+
run: mix format --check-formatted

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ RUN mix local.hex --force && \
3434

3535
# install mix dependencies
3636
COPY mix.exs mix.lock ./
37+
COPY beacon beacon
3738
RUN mix deps.get --only $MIX_ENV
3839
RUN mkdir config
3940

beacon/.formatter.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Used by "mix format"
2+
[
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
4+
]

beacon/.gitignore

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# The directory Mix will write compiled artifacts to.
2+
/_build/
3+
4+
# If you run "mix test --cover", coverage assets end up here.
5+
/cover/
6+
7+
# The directory Mix downloads your dependencies sources to.
8+
/deps/
9+
10+
# Where third-party dependencies like ExDoc output generated docs.
11+
/doc/
12+
13+
# If the VM crashes, it generates a dump, let's ignore it too.
14+
erl_crash.dump
15+
16+
# Also ignore archive artifacts (built via "mix archive.build").
17+
*.ez
18+
19+
# Ignore package tarball (built via "mix hex.build").
20+
beacon-*.tar
21+
22+
# Temporary files, for example, from tests.
23+
/tmp/

beacon/README.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Beacon
2+
3+
Beacon is a scalable process group manager. The main use case for this library is to have membership counts available on the cluster without spamming whenever a process joins or leaves a group. A node can have thousands of processes joining and leaving hundreds of groups while sending just the membership count to other nodes.
4+
5+
The main features are:
6+
7+
* Process pids are available only to the node the where the processes reside;
8+
* Groups are partitioned locally to allow greater concurrency while joining different groups;
9+
* Group counts are periodically broadcasted (defaults to every 5 seconds) to update group membership numbers to all participating nodes;
10+
* Sub-cluster nodes join by using same scope;
11+
12+
## Installation
13+
14+
The package can be installed by adding `beacon` to your list of dependencies in `mix.exs`:
15+
16+
```elixir
17+
def deps do
18+
[
19+
{:beacon, "~> 1.0"}
20+
]
21+
end
22+
```
23+
24+
## Using
25+
26+
Add Beacon to your application's supervision tree specifying a scope name (here it's `:users`)
27+
28+
```elixir
29+
def start(_type, _args) do
30+
children =
31+
[
32+
{Beacon, :users},
33+
# Or passing options:
34+
# {Beacon, [:users, opts]}
35+
# See Beacon.start_link/2 for the options
36+
```
37+
38+
Now process can join groups
39+
40+
```elixir
41+
iex> pid = self()
42+
#PID<0.852.0>
43+
iex> Beacon.join(:users, {:tenant, 123}, pid)
44+
:ok
45+
iex> Beacon.local_member_count(:users, {:tenant, 123})
46+
1
47+
iex> Beacon.local_members(:users, {:tenant, 123})
48+
[#PID<0.852.0>]
49+
iex> Beacon.local_member?(:users, {:tenant, 123}, pid)
50+
true
51+
```
52+
53+
From another node part of the same scope:
54+
55+
```elixir
56+
iex> Beacon.member_counts(:users)
57+
%{{:tenant, 123} => 1}
58+
iex> Beacon.member_count(:users, {:tenant, 123})
59+
1
60+
```

beacon/config/config.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import Config
2+
3+
# Print nothing during tests unless captured or a test failure happens
4+
config :logger, backends: [], level: :debug

beacon/lib/beacon.ex

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
defmodule Beacon do
2+
@moduledoc """
3+
Distributed process group membership tracking.
4+
"""
5+
6+
alias Beacon.Partition
7+
alias Beacon.Scope
8+
9+
@type group :: any
10+
@type start_option :: {:partitions, pos_integer()} | {:broadcast_interval_in_ms, non_neg_integer()}
11+
12+
@doc "Returns a supervisor child specification for a Beacon scope"
13+
def child_spec([scope]) when is_atom(scope), do: child_spec([scope, []])
14+
def child_spec(scope) when is_atom(scope), do: child_spec([scope, []])
15+
16+
def child_spec([scope, opts]) when is_atom(scope) and is_list(opts) do
17+
%{
18+
id: Beacon,
19+
start: {__MODULE__, :start_link, [scope, opts]},
20+
type: :supervisor
21+
}
22+
end
23+
24+
@doc """
25+
Starts the Beacon supervision tree for `scope`.
26+
27+
Options:
28+
29+
* `:partitions` - number of partitions to use (default: number of schedulers online)
30+
* `:broadcast_interval_in_ms`: - interval in milliseconds to broadcast membership counts to other nodes (default: 5000 ms)
31+
* `:message_module` - module implementing `Beacon.Adapter` behaviour (default: `Beacon.Adapter.ErlDist`)
32+
"""
33+
@spec start_link(atom, [start_option]) :: Supervisor.on_start()
34+
def start_link(scope, opts \\ []) when is_atom(scope) do
35+
{partitions, opts} = Keyword.pop(opts, :partitions, System.schedulers_online())
36+
broadcast_interval_in_ms = Keyword.get(opts, :broadcast_interval_in_ms)
37+
38+
if not (is_integer(partitions) and partitions >= 1) do
39+
raise ArgumentError,
40+
"expected :partitions to be a positive integer, got: #{inspect(partitions)}"
41+
end
42+
43+
if broadcast_interval_in_ms != nil and
44+
not (is_integer(broadcast_interval_in_ms) and broadcast_interval_in_ms > 0) do
45+
raise ArgumentError,
46+
"expected :broadcast_interval_in_ms to be a positive integer, got: #{inspect(broadcast_interval_in_ms)}"
47+
end
48+
49+
Beacon.Supervisor.start_link(scope, partitions, opts)
50+
end
51+
52+
@doc "Join pid to group in scope"
53+
@spec join(atom, any, pid) :: :ok | {:error, :not_local}
54+
def join(_scope, _group, pid) when is_pid(pid) and node(pid) != node(), do: {:error, :not_local}
55+
def join(scope, group, pid) when is_atom(scope) and is_pid(pid) do
56+
Partition.join(Beacon.Supervisor.partition(scope, group), group, pid)
57+
end
58+
59+
@doc "Leave pid from group in scope"
60+
@spec leave(atom, group, pid) :: :ok
61+
def leave(scope, group, pid) when is_atom(scope) and is_pid(pid) do
62+
Partition.leave(Beacon.Supervisor.partition(scope, group), group, pid)
63+
end
64+
65+
@doc "Get total members count per group in scope"
66+
@spec member_counts(atom) :: %{group => non_neg_integer}
67+
def member_counts(scope) when is_atom(scope) do
68+
remote_counts = Scope.member_counts(scope)
69+
70+
scope
71+
|> local_member_counts()
72+
|> Map.merge(remote_counts, fn _k, v1, v2 -> v1 + v2 end)
73+
end
74+
75+
@doc "Get total member count of group in scope"
76+
@spec member_count(atom, group) :: non_neg_integer
77+
def member_count(scope, group) do
78+
local_member_count(scope, group) + Scope.member_count(scope, group)
79+
end
80+
81+
@doc "Get total member count of group in scope on specific node"
82+
@spec member_count(atom, group, node) :: non_neg_integer
83+
def member_count(scope, group, node) when node == node(), do: local_member_count(scope, group)
84+
def member_count(scope, group, node), do: Scope.member_count(scope, group, node)
85+
86+
@doc "Get local members of group in scope"
87+
@spec local_members(atom, group) :: [pid]
88+
def local_members(scope, group) when is_atom(scope) do
89+
Partition.members(Beacon.Supervisor.partition(scope, group), group)
90+
end
91+
92+
@doc "Get local member count of group in scope"
93+
@spec local_member_count(atom, group) :: non_neg_integer
94+
def local_member_count(scope, group) when is_atom(scope) do
95+
Partition.member_count(Beacon.Supervisor.partition(scope, group), group)
96+
end
97+
98+
@doc "Get local members count per group in scope"
99+
@spec local_member_counts(atom) :: %{group => non_neg_integer}
100+
def local_member_counts(scope) when is_atom(scope) do
101+
Enum.reduce(Beacon.Supervisor.partitions(scope), %{}, fn partition_name, acc ->
102+
Map.merge(acc, Partition.member_counts(partition_name))
103+
end)
104+
end
105+
106+
@doc "Check if pid is a local member of group in scope"
107+
@spec local_member?(atom, group, pid) :: boolean
108+
def local_member?(scope, group, pid) when is_atom(scope) and is_pid(pid) do
109+
Partition.member?(Beacon.Supervisor.partition(scope, group), group, pid)
110+
end
111+
112+
@doc "Get all local groups in scope"
113+
@spec local_groups(atom) :: [group]
114+
def local_groups(scope) when is_atom(scope) do
115+
Enum.flat_map(Beacon.Supervisor.partitions(scope), fn partition_name ->
116+
Partition.groups(partition_name)
117+
end)
118+
end
119+
120+
@doc "Get local group count in scope"
121+
@spec local_group_count(atom) :: non_neg_integer
122+
def local_group_count(scope) when is_atom(scope) do
123+
Enum.sum_by(Beacon.Supervisor.partitions(scope), fn partition_name ->
124+
Partition.group_count(partition_name)
125+
end)
126+
end
127+
128+
@doc "Get groups in scope"
129+
@spec groups(atom) :: [group]
130+
def groups(scope) when is_atom(scope) do
131+
remote_groups = Scope.groups(scope)
132+
133+
scope
134+
|> local_groups()
135+
|> MapSet.new()
136+
|> MapSet.union(remote_groups)
137+
|> MapSet.to_list()
138+
end
139+
140+
@doc "Get group count in scope"
141+
@spec group_count(atom) :: non_neg_integer
142+
def group_count(scope) when is_atom(scope) do
143+
remote_groups = Scope.groups(scope)
144+
145+
scope
146+
|> local_groups()
147+
|> MapSet.new()
148+
|> MapSet.union(remote_groups)
149+
|> MapSet.size()
150+
end
151+
end

beacon/lib/beacon/adapter.ex

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule Beacon.Adapter do
2+
@moduledoc """
3+
Behaviour module for Beacon messaging adapters.
4+
"""
5+
6+
@doc "Register the current process to receive messages for the given scope"
7+
@callback register(scope :: atom) :: :ok
8+
9+
@doc "Broadcast a message to all nodes in the given scope"
10+
@callback broadcast(scope :: atom, message :: term) :: any
11+
12+
@doc "Broadcast a message to specific nodes in the given scope"
13+
@callback broadcast(scope :: atom, [node], message :: term) :: any
14+
15+
@doc "Send a message to a specific node in the given scope"
16+
@callback send(scope :: atom, node, message :: term) :: any
17+
end
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
defmodule Beacon.Adapter.ErlDist do
2+
@moduledoc false
3+
4+
import Kernel, except: [send: 2]
5+
6+
@behaviour Beacon.Adapter
7+
8+
@impl true
9+
def register(scope) do
10+
Process.register(self(), Beacon.Supervisor.name(scope))
11+
:ok
12+
end
13+
14+
@impl true
15+
def broadcast(scope, message) do
16+
name = Beacon.Supervisor.name(scope)
17+
Enum.each(Node.list(), fn node -> :erlang.send({name, node}, message, [:noconnect]) end)
18+
end
19+
20+
@impl true
21+
def broadcast(scope, nodes, message) do
22+
name = Beacon.Supervisor.name(scope)
23+
Enum.each(nodes, fn node -> :erlang.send({name, node}, message, [:noconnect]) end)
24+
end
25+
26+
@impl true
27+
def send(scope, node, message) do
28+
:erlang.send({Beacon.Supervisor.name(scope), node}, message, [:noconnect])
29+
end
30+
end

0 commit comments

Comments
 (0)