Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions lib/pg2pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ defmodule Pg2PubSub do
"""
require Logger

@typedoc """
The name of a topic
"""
@type topic_name :: any

@doc """
Subscribe to a topic

Expand All @@ -23,22 +28,26 @@ defmodule Pg2PubSub do
iex> Pg2PubSub.subscribe("foo")
{:ok, :already_registered}

# subscribing to non-string topics
iex> Pg2PubSub.subscribe({"job", 14})
{:ok, :registered}

"""
@spec subscribe(String.t) :: term
@spec subscribe(topic_name) :: term
def subscribe(topic) do
:ok = Logger.debug "#{inspect self} subscribing to #{topic}..."
:ok = Logger.debug "#{inspect self} subscribing to #{inspect topic}..."
:pg2.create(topic)
case :pg2.get_members(topic) do
{:error, error} ->
:ok = Logger.error "Publisher failed to get members of topic #{topic}: #{error}"
:ok = Logger.error "Publisher failed to get members of topic #{inspect topic}: #{error}"
{:error, error}
pids ->
unless self in pids do
:pg2.join(topic, self)
:ok = Logger.debug "#{inspect self} subscribed to #{topic}"
:ok = Logger.debug "#{inspect self} subscribed to #{inspect topic}"
{:ok, :registered}
else
:ok = Logger.debug "#{inspect self} already subscribed to #{topic}"
:ok = Logger.debug "#{inspect self} already subscribed to #{inspect topic}"
{:ok, :already_registered}
end
end
Expand All @@ -64,16 +73,21 @@ defmodule Pg2PubSub do
iex> Pg2PubSub.unsubscribe("foo")
:ok

# unsubscribing from non-string topics
iex> Pg2PubSub.subscribe({"job", 14})
{:ok, :registered}
iex> Pg2PubSub.unsubscribe({"job", 14})
:ok
"""
@spec unsubscribe(String.t) :: term
@spec unsubscribe(topic_name) :: term
def unsubscribe(topic) do
:ok = Logger.debug "#{inspect self} unsubscribing from #{topic}..."
:ok = Logger.debug "#{inspect self} unsubscribing from #{inspect topic}..."
case :pg2.leave(topic, self) do
{:error, {:no_such_group, _topic}} ->
:ok = Logger.warn "no subscribers for topic #{topic}"
:ok = Logger.warn "no subscribers for topic #{inspect topic}"
:ok
:ok ->
:ok = Logger.debug "#{inspect self} unsubscribed from #{topic}"
:ok = Logger.debug "#{inspect self} unsubscribed from #{inspect topic}"
:ok
end
end
Expand All @@ -97,7 +111,7 @@ defmodule Pg2PubSub do
"bar"

"""
@spec publish(String.t, any) :: :ok
@spec publish(topic_name, any) :: :ok
def publish(topic, msg) do
case :pg2.get_members(topic) do
{:error, err} ->
Expand Down
13 changes: 11 additions & 2 deletions test/pg2pubsub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Pg2PubSubTest do

setup do
:pg2.which_groups
|> Enum.map fn (x) -> :pg2.delete(x) end
|> Enum.map(fn (x) -> :pg2.delete(x) end)
:ok
end

Expand Down Expand Up @@ -42,4 +42,13 @@ defmodule Pg2PubSubTest do
assert_receive :foo
refute_receive :foo # check only received once
end
end

test "supports more complex topics than just strings" do
Pg2PubSub.subscribe({:context, "bar"})

Pg2PubSub.publish({:context, "bar"}, :bar)
Pg2PubSub.unsubscribe({:context, "bar"})

assert_receive :bar
end
end