Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/beacon_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Beacon Tests
defaults:
run:
shell: bash
working-directory: ./beacon
on:
pull_request:
paths:
- "beacon/**"

push:
branches:
- main

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
MIX_ENV: test

jobs:
tests:
name: Tests & Lint
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Setup elixir
id: beam
uses: erlef/setup-beam@v1
with:
otp-version: 27.x # Define the OTP version [required]
elixir-version: 1.18.x # Define the elixir version [required]
- name: Install dependencies
run: mix deps.get
- name: Start epmd
run: epmd -daemon
- name: Run tests
run: MIX_ENV=test mix test
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Check for warnings
run: mix compile --force --warnings-as-errors
- name: Run format check
run: mix format --check-formatted
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ RUN mix local.hex --force && \

# install mix dependencies
COPY mix.exs mix.lock ./
COPY beacon beacon
RUN mix deps.get --only $MIX_ENV
RUN mkdir config

Expand Down
4 changes: 4 additions & 0 deletions beacon/.formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
23 changes: 23 additions & 0 deletions beacon/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
beacon-*.tar

# Temporary files, for example, from tests.
/tmp/
60 changes: 60 additions & 0 deletions beacon/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Beacon

Beacon is a scalable process group manager. The main use case for this library is to have membership counts available on the cluster without spamming whenever a process joins or leaves a group. A node can have thousands of processes joining and leaving hundreds of groups while sending just the membership count to other nodes.

The main features are:

* Process pids are available only to the node the where the processes reside;
* Groups are partitioned locally to allow greater concurrency while joining different groups;
* Group counts are periodically broadcasted (defaults to every 5 seconds) to update group membership numbers to all participating nodes;
* Sub-cluster nodes join by using same scope;

## Installation

The package can be installed by adding `beacon` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:beacon, "~> 1.0"}
]
end
```

## Using

Add Beacon to your application's supervision tree specifying a scope name (here it's `:users`)

```elixir
def start(_type, _args) do
children =
[
{Beacon, :users},
# Or passing options:
# {Beacon, [:users, opts]}
# See Beacon.start_link/2 for the options
```

Now process can join groups

```elixir
iex> pid = self()
#PID<0.852.0>
iex> Beacon.join(:users, {:tenant, 123}, pid)
:ok
iex> Beacon.local_member_count(:users, {:tenant, 123})
1
iex> Beacon.local_members(:users, {:tenant, 123})
[#PID<0.852.0>]
iex> Beacon.local_member?(:users, {:tenant, 123}, pid)
true
```

From another node part of the same scope:

```elixir
iex> Beacon.member_counts(:users)
%{{:tenant, 123} => 1}
iex> Beacon.member_count(:users, {:tenant, 123})
1
```
4 changes: 4 additions & 0 deletions beacon/config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import Config

# Print nothing during tests unless captured or a test failure happens
config :logger, backends: [], level: :debug
153 changes: 153 additions & 0 deletions beacon/lib/beacon.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
defmodule Beacon do
@moduledoc """
Distributed process group membership tracking.
"""

alias Beacon.Partition
alias Beacon.Scope

@type group :: any
@type start_option ::
{:partitions, pos_integer()} | {:broadcast_interval_in_ms, non_neg_integer()}

@doc "Returns a supervisor child specification for a Beacon scope"
def child_spec([scope]) when is_atom(scope), do: child_spec([scope, []])
def child_spec(scope) when is_atom(scope), do: child_spec([scope, []])

def child_spec([scope, opts]) when is_atom(scope) and is_list(opts) do
%{
id: Beacon,
start: {__MODULE__, :start_link, [scope, opts]},
type: :supervisor
}
end

@doc """
Starts the Beacon supervision tree for `scope`.

Options:

* `:partitions` - number of partitions to use (default: number of schedulers online)
* `:broadcast_interval_in_ms`: - interval in milliseconds to broadcast membership counts to other nodes (default: 5000 ms)
* `:message_module` - module implementing `Beacon.Adapter` behaviour (default: `Beacon.Adapter.ErlDist`)
"""
@spec start_link(atom, [start_option]) :: Supervisor.on_start()
def start_link(scope, opts \\ []) when is_atom(scope) do
{partitions, opts} = Keyword.pop(opts, :partitions, System.schedulers_online())
broadcast_interval_in_ms = Keyword.get(opts, :broadcast_interval_in_ms)

if not (is_integer(partitions) and partitions >= 1) do
raise ArgumentError,
"expected :partitions to be a positive integer, got: #{inspect(partitions)}"
end

if broadcast_interval_in_ms != nil and
not (is_integer(broadcast_interval_in_ms) and broadcast_interval_in_ms > 0) do
raise ArgumentError,
"expected :broadcast_interval_in_ms to be a positive integer, got: #{inspect(broadcast_interval_in_ms)}"
end

Beacon.Supervisor.start_link(scope, partitions, opts)
end

@doc "Join pid to group in scope"
@spec join(atom, any, pid) :: :ok | {:error, :not_local}
def join(_scope, _group, pid) when is_pid(pid) and node(pid) != node(), do: {:error, :not_local}

def join(scope, group, pid) when is_atom(scope) and is_pid(pid) do
Partition.join(Beacon.Supervisor.partition(scope, group), group, pid)
end

@doc "Leave pid from group in scope"
@spec leave(atom, group, pid) :: :ok
def leave(scope, group, pid) when is_atom(scope) and is_pid(pid) do
Partition.leave(Beacon.Supervisor.partition(scope, group), group, pid)
end

@doc "Get total members count per group in scope"
@spec member_counts(atom) :: %{group => non_neg_integer}
def member_counts(scope) when is_atom(scope) do
remote_counts = Scope.member_counts(scope)

scope
|> local_member_counts()
|> Map.merge(remote_counts, fn _k, v1, v2 -> v1 + v2 end)
end

@doc "Get total member count of group in scope"
@spec member_count(atom, group) :: non_neg_integer
def member_count(scope, group) do
local_member_count(scope, group) + Scope.member_count(scope, group)
end

@doc "Get total member count of group in scope on specific node"
@spec member_count(atom, group, node) :: non_neg_integer
def member_count(scope, group, node) when node == node(), do: local_member_count(scope, group)
def member_count(scope, group, node), do: Scope.member_count(scope, group, node)

@doc "Get local members of group in scope"
@spec local_members(atom, group) :: [pid]
def local_members(scope, group) when is_atom(scope) do
Partition.members(Beacon.Supervisor.partition(scope, group), group)
end

@doc "Get local member count of group in scope"
@spec local_member_count(atom, group) :: non_neg_integer
def local_member_count(scope, group) when is_atom(scope) do
Partition.member_count(Beacon.Supervisor.partition(scope, group), group)
end

@doc "Get local members count per group in scope"
@spec local_member_counts(atom) :: %{group => non_neg_integer}
def local_member_counts(scope) when is_atom(scope) do
Enum.reduce(Beacon.Supervisor.partitions(scope), %{}, fn partition_name, acc ->
Map.merge(acc, Partition.member_counts(partition_name))
end)
end

@doc "Check if pid is a local member of group in scope"
@spec local_member?(atom, group, pid) :: boolean
def local_member?(scope, group, pid) when is_atom(scope) and is_pid(pid) do
Partition.member?(Beacon.Supervisor.partition(scope, group), group, pid)
end

@doc "Get all local groups in scope"
@spec local_groups(atom) :: [group]
def local_groups(scope) when is_atom(scope) do
Enum.flat_map(Beacon.Supervisor.partitions(scope), fn partition_name ->
Partition.groups(partition_name)
end)
end

@doc "Get local group count in scope"
@spec local_group_count(atom) :: non_neg_integer
def local_group_count(scope) when is_atom(scope) do
Enum.sum_by(Beacon.Supervisor.partitions(scope), fn partition_name ->
Partition.group_count(partition_name)
end)
end

@doc "Get groups in scope"
@spec groups(atom) :: [group]
def groups(scope) when is_atom(scope) do
remote_groups = Scope.groups(scope)

scope
|> local_groups()
|> MapSet.new()
|> MapSet.union(remote_groups)
|> MapSet.to_list()
end

@doc "Get group count in scope"
@spec group_count(atom) :: non_neg_integer
def group_count(scope) when is_atom(scope) do
remote_groups = Scope.groups(scope)

scope
|> local_groups()
|> MapSet.new()
|> MapSet.union(remote_groups)
|> MapSet.size()
end
end
17 changes: 17 additions & 0 deletions beacon/lib/beacon/adapter.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Beacon.Adapter do
@moduledoc """
Behaviour module for Beacon messaging adapters.
"""

@doc "Register the current process to receive messages for the given scope"
@callback register(scope :: atom) :: :ok

@doc "Broadcast a message to all nodes in the given scope"
@callback broadcast(scope :: atom, message :: term) :: any

@doc "Broadcast a message to specific nodes in the given scope"
@callback broadcast(scope :: atom, [node], message :: term) :: any

@doc "Send a message to a specific node in the given scope"
@callback send(scope :: atom, node, message :: term) :: any
end
30 changes: 30 additions & 0 deletions beacon/lib/beacon/adapter/erl_dist.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Beacon.Adapter.ErlDist do
@moduledoc false

import Kernel, except: [send: 2]

@behaviour Beacon.Adapter

@impl true
def register(scope) do
Process.register(self(), Beacon.Supervisor.name(scope))
:ok
end

@impl true
def broadcast(scope, message) do
name = Beacon.Supervisor.name(scope)
Enum.each(Node.list(), fn node -> :erlang.send({name, node}, message, [:noconnect]) end)
end

@impl true
def broadcast(scope, nodes, message) do
name = Beacon.Supervisor.name(scope)
Enum.each(nodes, fn node -> :erlang.send({name, node}, message, [:noconnect]) end)
end

@impl true
def send(scope, node, message) do
:erlang.send({Beacon.Supervisor.name(scope), node}, message, [:noconnect])
end
end
Loading