14 changes: 14 additions & 0 deletions priv/vmq_server.schema
@@ -21,6 +21,20 @@
{default, off},
{datatype, flag}
]}.
%%
%% @doc Allows multiple clients to subscribe to a group topic. A group topic
%% enables the distribution of messages among the different subscriber group
%% members. To start a new subscriber group or to become a group member, a
%% client must prefix a SUBSCRIBE topic with $GROUP-<mygroupname> followed by
%% the topic.
%% Assuming multiple clients are subscribed to $GROUP-mygroup/a/b/c, messages
%% published to a/b/c are randomly distributed among the members of the
%% subscriber group 'mygroup' (non-standard behaviour!).
{mapping, "allow_subscriber_groups", "vmq_server.allow_subscriber_groups", [
{default, off},
{datatype, flag},
hidden
]}.
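
To illustrate the resulting configuration, a minimal sketch (the group and topic names are made up, and the conf line assumes the standard cuttlefish mapping of the schema entry above):

    # vernemq.conf -- hypothetical example
    allow_subscriber_groups = on

    # Two clients both subscribe to:
    #   $GROUP-mygroup/a/b/c
    # A message published to a/b/c is then delivered to one member of
    # 'mygroup', chosen at random.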

%% @doc If allow_multiple_sessions is enabled, 'queue_deliver_mode' specifies
%% how the messages are delivered to multiple sessions. Default is 'fanout', which
7 changes: 1 addition & 6 deletions rebar.config
@@ -1,12 +1,9 @@
%%-*- mode: erlang -*-
{require_otp_vsn, "17"}.
{cover_enabled, true}.
{erl_opts, [{platform_define, "^[0-9]+", namespaced_types},
{parse_transform, lager_transform},
warnings_as_errors,
debug_info]}.
{ct_use_short_names, true}.
{ct_extra_params, "-cover test/cover.spec -epmd_port 4369"}.
{xref_checks, []}.
{deps, [
{lager, "3.0.2"},
@@ -33,13 +30,11 @@
{vmq_commons, {git, "git://github.com/erlio/vmq_commons.git", {branch, "master"}}},
{vmq_plugin, {git, "git://github.com/erlio/vmq_plugin.git", {branch, "master"}}},

%% simulating netsplits for dummies, only needed in test cases
{epmdpxy, {git, "git://github.com/dergraf/epmdpxy", {branch, "master"}}},

{time_compat, {git, "git://github.com/lasp-lang/time_compat.git", {branch, "master"}}}
]}.
{overrides, [
{override, jobs, [{deps, [{parse_trans, "2.9.0"}]}]},
{override, sext, [{src_dirs, ["src"]}]}
]}.

{cover_enabled, true}.
46 changes: 39 additions & 7 deletions src/vmq_cluster_com.erl
@@ -146,7 +146,7 @@ process(<<"msg", L:32, Bin:L/binary, Rest/binary>>) ->
#vmq_msg{mountpoint=MP,
routing_key=Topic,
reg_view=RegView} = Msg = binary_to_term(Bin),
_ = vmq_reg_view:fold(RegView, MP, Topic, fun publish/2, Msg),
_ = vmq_reg:publish(RegView, MP, Topic, fun publish_fold_fun/2, Msg),
process(Rest);
process(<<"enq", L:32, Bin:L/binary, Rest/binary>>) ->
{CallerPid, Ref, {enqueue, QueuePid, Msgs}} = binary_to_term(Bin),
@@ -165,9 +165,41 @@ process(<<"enq", L:32, Bin:L/binary, Rest/binary>>) ->
process(Rest);
process(<<>>) -> ok.

publish({_, _} = SubscriberIdAndQoS, Msg) ->
vmq_reg:publish(SubscriberIdAndQoS, Msg);
publish(_Node, Msg) ->
%% we ignore remote subscriptions, they are already covered
%% by the original publisher
Msg.
publish_fold_fun({{_,_}, _} = SubscriberIdAndQoS, Acc) ->
%% Local Subscription
vmq_reg:publish_fold_fun(SubscriberIdAndQoS, Acc);
publish_fold_fun({{_, Node, _}, _} = GroupSub, {Msg, _} = Acc) when Node == node() ->
%% Only handle SubscriberGroups for local node
%% Why this clause?
%%
%% Assuming you have two nodes A and B.
%% Node A has one subscriber with a/b
%% and one subscriber with $GROUP-test/a/b
%%
%% A message with a routing key a/b issued on Node B
%% would result in two matching routing entries for Node A,
%% not just one in the case of 'normal' subscriptions.
%%
%% For this reason we have to filter out the messages
%% not targeted at the subscriber group member.
%%
%% Note: Currently the message gets routed at most twice:
%% once for the subscriber group and once for the
%% 'normal' subscription.
case Msg#vmq_msg.subscriber_group of
undefined ->
%% Filter out the message
Acc;
Group when is_binary(Group) ->
%% It's a message for the subscriber group member
vmq_reg:publish_fold_fun(GroupSub, Acc)
end;
publish_fold_fun(_, Acc) ->
%% Could be messages targeted at remote subscriptions, which are
%% already covered by the origin publisher, or messages targeted
%% at subscriber groups that are already covered by the origin
%% publisher.
Acc.
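
To make the three clause shapes concrete, here is a minimal sketch of the fold entries this fun receives (illustration only: the client ids, group name, and node name are made up, Msg is assumed bound to the #vmq_msg being routed, and the {Msg, SubscriberGroups} accumulator shape follows the comment in vmq_queue:republish/2 below):

    %% hypothetical fold entries, not part of this change
    Acc0 = {Msg, undefined},
    %% local subscription: {{MountPoint, ClientId}, QoS}
    Acc1 = publish_fold_fun({{"", <<"client-a">>}, 1}, Acc0),
    %% subscriber group member on this node: {{Group, Node, SubscriberId}, QoS}
    Acc2 = publish_fold_fun({{<<"mygroup">>, node(), {"", <<"client-b">>}}, 1}, Acc1),
    %% plain remote subscription, already covered by the origin publisher
    Acc3 = publish_fold_fun('nodeB@127.0.0.1', Acc2).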
1 change: 1 addition & 0 deletions src/vmq_config_cli.erl
@@ -27,6 +27,7 @@ register_config_() ->
["allow_anonymous",
"trade_consistency",
"allow_multiple_sessions",
"allow_subscriber_groups",
"queue_deliver_mode",
"queue_type",
"retry_interval",
13 changes: 9 additions & 4 deletions src/vmq_mqtt_fsm.erl
@@ -75,6 +75,7 @@
retry_interval=20000 :: pos_integer(),
upgrade_qos=false :: boolean(),
trade_consistency=false :: boolean(),
allow_subscriber_groups=false :: boolean(),
reg_view=vmq_reg_trie :: atom()
}).

@@ -93,6 +94,7 @@ init(Peer, Opts) ->
end,
AllowAnonymous = vmq_config:get_env(allow_anonymous, false),
TradeConsistency = vmq_config:get_env(trade_consistency, false),
AllowSubscriberGroups = vmq_config:get_env(allow_subscriber_groups, false),
RetryInterval = vmq_config:get_env(retry_interval, 20),
MaxClientIdSize = vmq_config:get_env(max_client_id_size, 23),
MaxInflightMsgs = vmq_config:get_env(max_inflight_messages, 20),
@@ -113,6 +115,7 @@ init(Peer, Opts) ->
keep_alive_tref=TRef,
retry_interval=1000 * RetryInterval,
trade_consistency=TradeConsistency,
allow_subscriber_groups=AllowSubscriberGroups,
reg_view=RegView}}.

data_in(Data, SessionState) when is_binary(Data) ->
@@ -217,7 +220,7 @@ connected(#mqtt_publish{message_id=MessageId, topic=Topic,
Ret =
case {Topic, valid_msg_size(Payload, MaxMessageSize)} of
{[<<"$", _binary>> |_], _} ->
%% $SYS
%% $SYS and $GROUP
{State#state{recv_cnt=incr_msg_recv_cnt(RecvCnt)}, []};
{_, true} ->
Msg = #vmq_msg{routing_key=Topic,
@@ -344,8 +347,9 @@ connected(#mqtt_pubcomp{message_id=MessageId}, State) ->
end;
connected(#mqtt_subscribe{message_id=MessageId, topics=Topics}, State) ->
#state{subscriber_id=SubscriberId, username=User,
trade_consistency=Consistency} = State,
case vmq_reg:subscribe(Consistency, User, SubscriberId, Topics) of
trade_consistency=Consistency,
allow_subscriber_groups=SubscriberGroups} = State,
case vmq_reg:subscribe(Consistency, SubscriberGroups, User, SubscriberId, Topics) of
{ok, QoSs} ->
{NewState, Out} = send_frame(#mqtt_suback{message_id=MessageId,
qos_table=QoSs}, State),
@@ -588,7 +592,8 @@ auth_on_register(User, Password, State) ->
max_inflight_messages=?state_val(max_inflight_messages, Args, State),
retry_interval=?state_val(retry_interval, Args, State),
upgrade_qos=?state_val(upgrade_qos, Args, State),
trade_consistency=?state_val(trade_consistency, Args, State)
trade_consistency=?state_val(trade_consistency, Args, State),
allow_subscriber_groups=?state_val(allow_subscriber_groups, Args, State)
},
{ok, queue_opts(ChangedState, Args), ChangedState};
{error, Reason} ->
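For orientation, a hedged sketch of the new subscribe call shape (illustration only: username, client id, and topic are made up; topics are word lists of binaries, as the <<"$", _/binary>> match above suggests, and the {TopicWords, QoS} pairing is assumed):

    %% hypothetical call, not part of this change:
    %% with trade_consistency=false and allow_subscriber_groups=true,
    %% a SUBSCRIBE to $GROUP-mygroup/a/b at QoS 1 roughly becomes
    vmq_reg:subscribe(false, true, <<"test-user">>, {"", <<"client-a">>},
                      [{[<<"$GROUP-mygroup">>, <<"a">>, <<"b">>], 1}]).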
105 changes: 86 additions & 19 deletions src/vmq_queue.erl
@@ -176,7 +176,7 @@ online({change_state, NewSessionState, SessionPid}, State) ->
{next_state, online, change_session_state(NewSessionState, SessionPid, State)};
online({notify_recv, SessionPid}, #state{id=SId, sessions=Sessions} = State) ->
#session{queue=#queue{backup=Backup} = Queue} = Session = maps:get(SessionPid, Sessions),
cleanup_queue(SId, Backup),
_ = cleanup_queue(SId, Backup),
NewSessions = maps:update(SessionPid,
Session#session{queue=Queue#queue{backup=queue:new()}},
Sessions),
@@ -242,7 +242,7 @@ drain(drain_start, #state{id=SId, offline=#queue{queue=Q} = Queue,
%% instead of the erlang distribution link.
case vmq_cluster:remote_enqueue(node(RemoteQueue), {enqueue, RemoteQueue, Msgs}) of
ok ->
cleanup_queue(SId, DrainQ),
_ = cleanup_queue(SId, DrainQ),
case queue:len(NewQ) of
L when L > 0 ->
gen_fsm:send_event(self(), drain_start),
@@ -317,7 +317,7 @@ offline(expire_session, #state{id=SId, offline=#queue{queue=Q}} = State) ->
vmq_exo:decr_inactive_clients(),
vmq_exo:incr_expired_clients(),
vmq_reg:delete_subscriptions(SId),
cleanup_queue(SId, Q),
_ = cleanup_queue(SId, Q),
{stop, normal, State};
offline(Event, State) ->
lager:error("got unknown event in offline state ~p", [Event]),
@@ -466,18 +466,18 @@ del_session(SessionPid, #state{id=SId, sessions=Sessions} = State) ->
NewSessions = maps:remove(SessionPid, Sessions),
case maps:get(SessionPid, Sessions) of
#session{clean=true} = Session ->
cleanup_session(SId, Session),
{State#state{sessions=NewSessions}, Session};
Republish = cleanup_session(SId, Session, maps:size(NewSessions) == 0),
{State#state{sessions=NewSessions}, Session, Republish};
Session ->
%% give queue content of this session to other alive sessions
%% or to offline queue
{insert_from_session(Session, State#state{sessions=NewSessions}),
Session}
Session, []}
end.

handle_session_down(SessionPid, StateName,
#state{id=SId, waiting_call=WaitingCall} = State) ->
{NewState, DeletedSession} = del_session(SessionPid, State),
{NewState, DeletedSession, Republish} = del_session(SessionPid, State),
case {maps:size(NewState#state.sessions), StateName, WaitingCall} of
{0, wait_for_offline, {add_session, NewSessionPid, Opts, From}} ->
%% last session gone
@@ -515,6 +515,8 @@ handle_session_down(SessionPid, StateName,
%% clean session flag
vmq_exo:decr_active_clients(),
vmq_reg:delete_subscriptions(SId),
%% Republish queued subscriber group messages to other group members, if any
republish(SId, Republish),
_ = vmq_plugin:all(on_client_gone, [SId]),
{stop, normal, NewState};
{0, OldStateName, _} ->
@@ -569,7 +571,7 @@ disconnect_sessions(#state{sessions=Sessions}) ->
change_session_state(NewState, SessionPid, #state{id=SId, sessions=Sessions} = State) ->

#session{queue=#queue{backup=Backup} = Queue} = Session = maps:get(SessionPid, Sessions),
cleanup_queue(SId, Backup),
_ = cleanup_queue(SId, Backup),
UpdatedSession = change_session_state(NewState, Session#session{queue=Queue#queue{backup=queue:new()}}),
NewSessions = maps:update(SessionPid, UpdatedSession, Sessions),
State#state{sessions=NewSessions}.
Expand Down Expand Up @@ -702,23 +704,40 @@ send_notification(#session{pid=Pid} = Session) ->
vmq_mqtt_fsm:send(Pid, {mail, self(), new_data}),
Session#session{status=passive}.

cleanup_session(SubscriberId, #session{queue=#queue{queue=Q}}) ->
cleanup_queue(SubscriberId, Q).
cleanup_session(SubscriberId, #session{queue=#queue{queue=Q}}, DoCollectRepublish) ->
cleanup_queue(SubscriberId, Q, DoCollectRepublish).

cleanup_queue(_, {[],[]}) -> ok; %% optimization
cleanup_queue(SId, Queue) ->
cleanup_queue_(SId, queue:out(Queue)).
cleanup_queue(SId, Queue, false).

cleanup_queue_(SId, {{value, {deliver, _, _} = Msg}, NewQueue}) ->
cleanup_queue(_, {[],[]}, false) -> ok; %% optimization
cleanup_queue(_, {[],[]}, true) -> []; %% optimization, nothing to collect
cleanup_queue(SId, Queue, false) ->
cleanup_queue_(SId, queue:out(Queue), false);
cleanup_queue(SId, Queue, true) ->
cleanup_queue_(SId, queue:out(Queue), []).

cleanup_queue_(SId, {{value, {deliver, _, _} = Msg}, NewQueue}, false) ->
maybe_offline_delete(SId, Msg),
cleanup_queue_(SId, queue:out(NewQueue));
cleanup_queue_(SId, {{value, {deliver_bin, _}}, NewQueue}) ->
cleanup_queue_(SId, queue:out(NewQueue), false);
cleanup_queue_(SId, {{value, {deliver, _, #vmq_msg{subscriber_group=Group}} =
Msg}, NewQueue}, RepublishAcc) ->
NewRepublishAcc =
case Group of
undefined ->
RepublishAcc;
_ ->
[Msg|RepublishAcc]
end,
maybe_offline_delete(SId, Msg),
cleanup_queue_(SId, queue:out(NewQueue), NewRepublishAcc);
cleanup_queue_(SId, {{value, {deliver_bin, _}}, NewQueue}, RepublishAcc) ->
% no need to deref
cleanup_queue_(SId, queue:out(NewQueue));
cleanup_queue_(SId, {{value, MsgRef}, NewQueue}) when is_binary(MsgRef) ->
cleanup_queue_(SId, queue:out(NewQueue), RepublishAcc);
cleanup_queue_(SId, {{value, MsgRef}, NewQueue}, RepublishAcc) when is_binary(MsgRef) ->
maybe_offline_delete(SId, MsgRef),
cleanup_queue_(SId, queue:out(NewQueue));
cleanup_queue_(_, {empty, _}) -> ok.
cleanup_queue_(SId, queue:out(NewQueue), RepublishAcc);
cleanup_queue_(_, {empty, _}, false) -> ok;
cleanup_queue_(_, {empty, _}, RepublishAcc) -> RepublishAcc.
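
A small sketch of the two cleanup modes (illustration only; an empty queue is used so the results follow directly from the clauses above):

    %% hypothetical, not part of this change
    SId = {"", <<"client-a">>},
    ok = cleanup_queue(SId, queue:new(), false), %% dereference only
    [] = cleanup_queue(SId, queue:new(), true).  %% also collect republish
                                                 %% candidates (none here)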


session_fold(SId, Fun, Acc, Map) ->
@@ -840,3 +859,51 @@ queue_split(N, Queue) ->
L -> L
end,
queue:split(NN, Queue).

republish(_, []) -> ok;
republish(SId, [{deliver, _, Msg}|Rest]) ->
#vmq_msg{mountpoint=MP,
routing_key=Topic,
reg_view=RegView,
subscriber_group=G} = Msg,
NewMsg = Msg#vmq_msg{persisted=false, msg_ref=undefined},
_ = vmq_reg:publish(RegView, MP, Topic,
%% The publish fold fun is normally used to publish
%% the message to every subscriber. But it also
%% accumulates the subscribers of subscriber groups
%% which are (once accumulated) used to route the
%% message to a specific member of a group.
%%
%% In this case we only want to accumulate the
%% subscriber groups, as the goal is to find another
%% available member of the group that could handle
%% the message.
%%
%% TODO: improve the interface to the publish_fold_fun
%% so the usage from outside vmq_reg is more
%% intuitive.
%%
%% Fun
%% -- Local Subscription
%% ({SubscriberId, QoS}, {Msg, SubGroups})
%% -> {Msg, SubGroups};
%%
%% -- A SubscriberGroup Subscription
%% ({{Group, Node, SubscriberId}, QoS}, {Msg, SubGroups})
%% -> {Msg, SubGroups};
%%
%% -- Remote Subscription
%% (Node, {Msg, SubGroups})
%% -> {Msg, SubGroups};
%%
fun({{Group, _, SubscriberId}, _} = Sub, {AccMsg, SubscriberGroups})
when (Group == G) and (SubscriberId =/= SId) ->
%% only include this subscription if the
%% SubscriberId is not ourself and it is
%% in the same group as we are.
{AccMsg, vmq_reg:add_to_subscriber_group(Sub, SubscriberGroups)};
(_, Acc) ->
%% ignore every other subscription
Acc
end, NewMsg),
republish(SId, Rest).