diff --git a/priv/vmq_server.schema b/priv/vmq_server.schema index 7a6565a..b3d9b23 100644 --- a/priv/vmq_server.schema +++ b/priv/vmq_server.schema @@ -21,6 +21,20 @@ {default, off}, {datatype, flag} ]}. +%% +%% @doc Allows multiple clients to subscribe to a group topic. A group topic +%% enables the distribution of messages among the different subscriber group +%% members. To start a new subscriber group or to join an existing one, a client +%% must prefix the SUBSCRIBE topic with $GROUP-<name>/ followed by the +%% actual topic. +%% Assuming multiple clients are subscribed to $GROUP-mygroup/a/b/c, messages +%% published to a/b/c are randomly distributed among the members of the subscriber group 'mygroup'. +%% (non-standard behaviour!). +{mapping, "allow_subscriber_groups", "vmq_server.allow_subscriber_groups", [ + {default, off}, + {datatype, flag}, + hidden + ]}. %% @doc If allow_multiple_sessions is enabled 'queue_deliver_mode' will specify %% how the messages are delivered to multiple sessions. Default is 'fanout' which diff --git a/rebar.config b/rebar.config index 5f7a1cd..a0edd59 100644 --- a/rebar.config +++ b/rebar.config @@ -1,12 +1,9 @@ %%-*- mode: erlang -*- {require_otp_vsn, "17"}. -{cover_enabled, true}. {erl_opts, [{platform_define, "^[0-9]+", namespaced_types}, {parse_transform, lager_transform}, warnings_as_errors, debug_info]}. -{ct_use_short_names, true}. -{ct_extra_params, "-cover test/cover.spec -epmd_port 4369"}. {xref_checks, []}. {deps, [ {lager, "3.0.2"}, @@ -33,9 +30,6 @@ {vmq_commons, {git, "git://github.com/erlio/vmq_commons.git", {branch, "master"}}}, {vmq_plugin, {git, "git://github.com/erlio/vmq_plugin.git", {branch, "master"}}}, - %% simulating netsplits for dummies, only needed in test cases - {epmdpxy, {git, "git://github.com/dergraf/epmdpxy", {branch, "master"}}}, - {time_compat, {git, "git://github.com/lasp-lang/time_compat.git", {branch, "master"}}} ]}. {overrides, [ @@ -43,3 +37,4 @@ {override, sext, [{src_dirs, ["src"]}]} ]}. +{cover_enabled, true}. diff --git a/src/vmq_cluster_com.erl b/src/vmq_cluster_com.erl index fbc7b1b..48fc2c7 100644 --- a/src/vmq_cluster_com.erl +++ b/src/vmq_cluster_com.erl @@ -146,7 +146,7 @@ process(<<"msg", L:32, Bin:L/binary, Rest/binary>>) -> #vmq_msg{mountpoint=MP, routing_key=Topic, reg_view=RegView} = Msg = binary_to_term(Bin), - _ = vmq_reg_view:fold(RegView, MP, Topic, fun publish/2, Msg), + _ = vmq_reg:publish(RegView, MP, Topic, fun publish_fold_fun/2, Msg), process(Rest); process(<<"enq", L:32, Bin:L/binary, Rest/binary>>) -> {CallerPid, Ref, {enqueue, QueuePid, Msgs}} = binary_to_term(Bin), @@ -165,9 +165,41 @@ process(<<"enq", L:32, Bin:L/binary, Rest/binary>>) -> process(Rest); process(<<>>) -> ok. -publish({_, _} = SubscriberIdAndQoS, Msg) -> - vmq_reg:publish(SubscriberIdAndQoS, Msg); -publish(_Node, Msg) -> - %% we ignore remote subscriptions, they are already covered - %% by original publisher - Msg. +publish_fold_fun({{_,_}, _} = SubscriberIdAndQoS, Acc) -> + %% Local Subscription + vmq_reg:publish_fold_fun(SubscriberIdAndQoS, Acc); +publish_fold_fun({{_, Node, _}, _} = GroupSub, {Msg, _} = Acc) when Node == node() -> + %% Only handle subscriber groups for the local node + %% Why is this clause needed? + %% + %% Assuming you have two nodes A and B. + %% Node A has one subscriber with a/b + %% and one subscriber with $GROUP-test/a/b + %% + %% A message with a routing key a/b issued on Node B + %% would result in two matching routing entries for Node A, + %% not just one as it would with only 'normal' subscriptions.
+ %% + %% For this reason we have to filter out the messages + %% not targeted at the subscriber group member. + %% + %% Note: Currently the message is transferred at most twice, + %% once for the subscriber group and once for the + %% 'normal' subscription. + case Msg#vmq_msg.subscriber_group of + undefined -> + %% Filter out the message + Acc; + Group when is_binary(Group) -> + %% It's a message for the subscriber group member + vmq_reg:publish_fold_fun(GroupSub, Acc) + end; +publish_fold_fun(_, Acc) -> + %% Could be messages targeted at remote subscriptions that are already + %% covered by the origin publisher + %% + %% or + %% + %% messages targeted at subscriber groups that are already + %% covered by the origin publisher + Acc. diff --git a/src/vmq_config_cli.erl b/src/vmq_config_cli.erl index 52417f7..fa8a5ae 100644 --- a/src/vmq_config_cli.erl +++ b/src/vmq_config_cli.erl @@ -27,6 +27,7 @@ register_config_() -> ["allow_anonymous", "trade_consistency", "allow_multiple_sessions", + "allow_subscriber_groups", "queue_deliver_mode", "queue_type", "retry_interval", diff --git a/src/vmq_mqtt_fsm.erl b/src/vmq_mqtt_fsm.erl index f146748..6ed1753 100644 --- a/src/vmq_mqtt_fsm.erl +++ b/src/vmq_mqtt_fsm.erl @@ -75,6 +75,7 @@ retry_interval=20000 :: pos_integer(), upgrade_qos=false :: boolean(), trade_consistency=false :: boolean(), + allow_subscriber_groups=false :: boolean(), reg_view=vmq_reg_trie :: atom() }). @@ -93,6 +94,7 @@ init(Peer, Opts) -> end, AllowAnonymous = vmq_config:get_env(allow_anonymous, false), TradeConsistency = vmq_config:get_env(trade_consistency, false), + AllowSubscriberGroups = vmq_config:get_env(allow_subscriber_groups, false), RetryInterval = vmq_config:get_env(retry_interval, 20), MaxClientIdSize = vmq_config:get_env(max_client_id_size, 23), MaxInflightMsgs = vmq_config:get_env(max_inflight_messages, 20), @@ -113,6 +115,7 @@ init(Peer, Opts) -> keep_alive_tref=TRef, retry_interval=1000 * RetryInterval, trade_consistency=TradeConsistency, + allow_subscriber_groups=AllowSubscriberGroups, reg_view=RegView}}.
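The allow_subscriber_groups flag threaded through vmq_mqtt_fsm above only gates whether such subscriptions are accepted; the group itself is encoded in the first word of the SUBSCRIBE topic. A minimal sketch of that convention, assuming topics are represented as lists of binary words as elsewhere in vmq_server; the module and function names are illustrative and not part of the patch:

-module(group_topic_example).
-export([classify/1]).

%% Split a subscribe topic into its subscriber group and the effective topic,
%% mirroring the <<"$GROUP-", Group/binary>> pattern used in vmq_reg_trie.
classify([<<"$GROUP-", Group/binary>>|Topic]) ->
    {subscriber_group, Group, Topic};
classify(Topic) when is_list(Topic) ->
    {normal, Topic}.

%% group_topic_example:classify([<<"$GROUP-mygroup">>, <<"a">>, <<"b">>, <<"c">>])
%% returns {subscriber_group, <<"mygroup">>, [<<"a">>, <<"b">>, <<"c">>]}.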
data_in(Data, SessionState) when is_binary(Data) -> @@ -217,7 +220,7 @@ connected(#mqtt_publish{message_id=MessageId, topic=Topic, Ret = case {Topic, valid_msg_size(Payload, MaxMessageSize)} of {[<<"$", _binary>> |_], _} -> - %% $SYS + %% $SYS and $GROUP {State#state{recv_cnt=incr_msg_recv_cnt(RecvCnt)}, []}; {_, true} -> Msg = #vmq_msg{routing_key=Topic, @@ -344,8 +347,9 @@ connected(#mqtt_pubcomp{message_id=MessageId}, State) -> end; connected(#mqtt_subscribe{message_id=MessageId, topics=Topics}, State) -> #state{subscriber_id=SubscriberId, username=User, - trade_consistency=Consistency} = State, - case vmq_reg:subscribe(Consistency, User, SubscriberId, Topics) of + trade_consistency=Consistency, + allow_subscriber_groups=SubscriberGroups} = State, + case vmq_reg:subscribe(Consistency, SubscriberGroups, User, SubscriberId, Topics) of {ok, QoSs} -> {NewState, Out} = send_frame(#mqtt_suback{message_id=MessageId, qos_table=QoSs}, State), @@ -588,7 +592,8 @@ auth_on_register(User, Password, State) -> max_inflight_messages=?state_val(max_inflight_messages, Args, State), retry_interval=?state_val(retry_interval, Args, State), upgrade_qos=?state_val(upgrade_qos, Args, State), - trade_consistency=?state_val(trade_consistency, Args, State) + trade_consistency=?state_val(trade_consistency, Args, State), + allow_subscriber_groups=?state_val(allow_subscriber_groups, Args, State) }, {ok, queue_opts(ChangedState, Args), ChangedState}; {error, Reason} -> diff --git a/src/vmq_queue.erl b/src/vmq_queue.erl index 446c68e..bfa5f65 100644 --- a/src/vmq_queue.erl +++ b/src/vmq_queue.erl @@ -176,7 +176,7 @@ online({change_state, NewSessionState, SessionPid}, State) -> {next_state, online, change_session_state(NewSessionState, SessionPid, State)}; online({notify_recv, SessionPid}, #state{id=SId, sessions=Sessions} = State) -> #session{queue=#queue{backup=Backup} = Queue} = Session = maps:get(SessionPid, Sessions), - cleanup_queue(SId, Backup), + _ = cleanup_queue(SId, Backup), NewSessions = maps:update(SessionPid, Session#session{queue=Queue#queue{backup=queue:new()}}, Sessions), @@ -242,7 +242,7 @@ drain(drain_start, #state{id=SId, offline=#queue{queue=Q} = Queue, %% instead of the erlang distribution link. case vmq_cluster:remote_enqueue(node(RemoteQueue), {enqueue, RemoteQueue, Msgs}) of ok -> - cleanup_queue(SId, DrainQ), + _ = cleanup_queue(SId, DrainQ), case queue:len(NewQ) of L when L > 0 -> gen_fsm:send_event(self(), drain_start), @@ -317,7 +317,7 @@ offline(expire_session, #state{id=SId, offline=#queue{queue=Q}} = State) -> vmq_exo:decr_inactive_clients(), vmq_exo:incr_expired_clients(), vmq_reg:delete_subscriptions(SId), - cleanup_queue(SId, Q), + _ = cleanup_queue(SId, Q), {stop, normal, State}; offline(Event, State) -> lager:error("got unknown event in offline state ~p", [Event]), @@ -466,18 +466,18 @@ del_session(SessionPid, #state{id=SId, sessions=Sessions} = State) -> NewSessions = maps:remove(SessionPid, Sessions), case maps:get(SessionPid, Sessions) of #session{clean=true} = Session -> - cleanup_session(SId, Session), - {State#state{sessions=NewSessions}, Session}; + Republish = cleanup_session(SId, Session, maps:size(NewSessions) == 0), + {State#state{sessions=NewSessions}, Session, Republish}; Session -> %% give queue content of this session to other alive sessions %% or to offline queue {insert_from_session(Session, State#state{sessions=NewSessions}), - Session} + Session, []} end. 
handle_session_down(SessionPid, StateName, #state{id=SId, waiting_call=WaitingCall} = State) -> - {NewState, DeletedSession} = del_session(SessionPid, State), + {NewState, DeletedSession, Republish} = del_session(SessionPid, State), case {maps:size(NewState#state.sessions), StateName, WaitingCall} of {0, wait_for_offline, {add_session, NewSessionPid, Opts, From}} -> %% last session gone @@ -515,6 +515,8 @@ handle_session_down(SessionPid, StateName, %% clean session flag vmq_exo:decr_active_clients(), vmq_reg:delete_subscriptions(SId), + %% If possible republish to subscriber groups if available + republish(SId, Republish), _ = vmq_plugin:all(on_client_gone, [SId]), {stop, normal, NewState}; {0, OldStateName, _} -> @@ -569,7 +571,7 @@ disconnect_sessions(#state{sessions=Sessions}) -> change_session_state(NewState, SessionPid, #state{id=SId, sessions=Sessions} = State) -> #session{queue=#queue{backup=Backup} = Queue} = Session = maps:get(SessionPid, Sessions), - cleanup_queue(SId, Backup), + _ = cleanup_queue(SId, Backup), UpdatedSession = change_session_state(NewState, Session#session{queue=Queue#queue{backup=queue:new()}}), NewSessions = maps:update(SessionPid, UpdatedSession, Sessions), State#state{sessions=NewSessions}. @@ -702,23 +704,40 @@ send_notification(#session{pid=Pid} = Session) -> vmq_mqtt_fsm:send(Pid, {mail, self(), new_data}), Session#session{status=passive}. -cleanup_session(SubscriberId, #session{queue=#queue{queue=Q}}) -> - cleanup_queue(SubscriberId, Q). +cleanup_session(SubscriberId, #session{queue=#queue{queue=Q}}, DoCollectRepublish) -> + cleanup_queue(SubscriberId, Q, DoCollectRepublish). -cleanup_queue(_, {[],[]}) -> ok; %% optimization cleanup_queue(SId, Queue) -> - cleanup_queue_(SId, queue:out(Queue)). + cleanup_queue(SId, Queue, false). -cleanup_queue_(SId, {{value, {deliver, _, _} = Msg}, NewQueue}) -> +cleanup_queue(_, {[],[]}, _) -> ok; %% optimization +cleanup_queue(SId, Queue, false) -> + cleanup_queue_(SId, queue:out(Queue), false); +cleanup_queue(SId, Queue, true) -> + cleanup_queue_(SId, queue:out(Queue), []). + +cleanup_queue_(SId, {{value, {deliver, _, _} = Msg}, NewQueue}, false) -> maybe_offline_delete(SId, Msg), - cleanup_queue_(SId, queue:out(NewQueue)); -cleanup_queue_(SId, {{value, {deliver_bin, _}}, NewQueue}) -> + cleanup_queue_(SId, queue:out(NewQueue), false); +cleanup_queue_(SId, {{value, {deliver, _, #vmq_msg{subscriber_group=Group}} = + Msg}, NewQueue}, RepublishAcc)-> + NewRepublishAcc = + case Group of + undefined -> + RepublishAcc; + _ -> + [Msg|RepublishAcc] + end, + maybe_offline_delete(SId, Msg), + cleanup_queue_(SId, queue:out(NewQueue), NewRepublishAcc); +cleanup_queue_(SId, {{value, {deliver_bin, _}}, NewQueue}, RepublishAcc) -> % no need to deref - cleanup_queue_(SId, queue:out(NewQueue)); -cleanup_queue_(SId, {{value, MsgRef}, NewQueue}) when is_binary(MsgRef) -> + cleanup_queue_(SId, queue:out(NewQueue), RepublishAcc); +cleanup_queue_(SId, {{value, MsgRef}, NewQueue}, RepublishAcc) when is_binary(MsgRef) -> maybe_offline_delete(SId, MsgRef), - cleanup_queue_(SId, queue:out(NewQueue)); -cleanup_queue_(_, {empty, _}) -> ok. + cleanup_queue_(SId, queue:out(NewQueue), RepublishAcc); +cleanup_queue_(_, {empty, _}, false) -> ok; +cleanup_queue_(_, {empty, _}, RepublishAcc) -> RepublishAcc. session_fold(SId, Fun, Acc, Map) -> @@ -840,3 +859,51 @@ queue_split(N, Queue) -> L -> L end, queue:split(NN, Queue). 
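The cleanup_queue_/3 change above turns queue cleanup into a fold that, when the last clean session goes away, keeps the deliveries belonging to a subscriber group so they can be republished to another group member. A condensed, self-contained sketch of that accumulation; the message is modelled here as a map instead of a #vmq_msg{} record and the names are illustrative:

-module(collect_republish_example).
-export([collect/1]).

%% Walk a queue of {deliver, QoS, Msg} entries and keep only those that carry
%% a subscriber group, i.e. the ones worth handing to another group member.
collect(Q) ->
    collect(queue:out(Q), []).

collect({{value, {deliver, _QoS, #{subscriber_group := undefined}}}, Q}, Acc) ->
    collect(queue:out(Q), Acc);
collect({{value, {deliver, _QoS, _Msg} = Item}, Q}, Acc) ->
    collect(queue:out(Q), [Item|Acc]);
collect({empty, _}, Acc) ->
    Acc.

%% Q = queue:from_list([{deliver, 1, #{subscriber_group => undefined}},
%%                      {deliver, 1, #{subscriber_group => <<"mygroup">>}}]),
%% collect_republish_example:collect(Q) keeps only the second entry.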
+ +republish(_, []) -> ok; +republish(SId, [{deliver, _, Msg}|Rest]) -> + #vmq_msg{mountpoint=MP, + routing_key=Topic, + reg_view=RegView, + subscriber_group=G} = Msg, + NewMsg = Msg#vmq_msg{persisted=false, msg_ref=undefined}, + _ = vmq_reg:publish(RegView, MP, Topic, + %% The Publish Fold Fun is 'normally' used to publish + %% the message to every subscriber. But it also + %% accumulates the subscribers of subscriber groups + %% which are (once accumulated) used to route the + %% message to a specific member of a group. + %% + %% In this case we only want to accumulate the + %% subscriber groups as the goal is to find a new + %% available member of the group that could handle the + %% messages. + %% + %% TODO: improve the interface to the publish_fold_fun + %% so the usage from outside vmq_reg is more + %% intuitive. + %% + %% Fun + %% -- Local Subscription + %% ({SubscriberId, QoS}, {Msg, SubGroups}) + %% -> {Msg, SubGroups}; + %% + %% -- A SubscriberGroup Subscription + %% ({{Group, Node, SubscriberId}, QoS}, {Msg, SubGroups}) + %% -> {Msg, SubGroups}; + %% + %% -- Remote Subscription + %% (Node, {Msg, SubGroups}) + %% -> {Msg, SubGroups}; + %% + fun({{Group, _, SubscriberId}, _} = Sub, {AccMsg, SubscriberGroups}) + when (Group == G) and (SubscriberId =/= SId) -> + %% only include this subscription if the + %% SubscriberId is not our own and it is in + %% the same Group as we are. + {AccMsg, vmq_reg:add_to_subscriber_group(Sub, SubscriberGroups)}; + (_, Acc) -> + %% ignore every other subscription + Acc + end, NewMsg), + republish(SId, Rest). diff --git a/src/vmq_reg.erl b/src/vmq_reg.erl index 0a7173e..78a2d24 100644 --- a/src/vmq_reg.erl +++ b/src/vmq_reg.erl @@ -19,7 +19,7 @@ %% API -export([ %% used in mqtt fsm handling - subscribe/4, + subscribe/5, unsubscribe/4, register_subscriber/2, delete_subscriptions/1, @@ -51,8 +51,10 @@ terminate/2, code_change/3]). -%% called by vmq_cluster_com --export([publish/2]). +%% used in vmq_cluster_com +-export([publish/4]). +-export([publish/5]). +-export([publish_fold_fun/2]). %% used from plugins -export([direct_plugin_exports/1]). @@ -71,34 +73,34 @@ -define(TOMBSTONE, '$deleted'). -define(NR_OF_REG_RETRIES, 10). --spec subscribe(flag(), username() | plugin_id(), subscriber_id(), +-spec subscribe(flag(), flag(), username() | plugin_id(), subscriber_id(), [{topic(), qos()}]) -> {ok, [qos() | not_allowed]} | {error, not_allowed | overloaded | not_ready}. -subscribe(false, User, SubscriberId, Topics) -> +subscribe(false, AllowSubscriberGroups, User, SubscriberId, Topics) -> %% trade availability for consistency - vmq_cluster:if_ready(fun subscribe_/3, [User, SubscriberId, Topics]); -subscribe(true, User, SubscriberId, Topics) -> + vmq_cluster:if_ready(fun subscribe_/4, [User, AllowSubscriberGroups, SubscriberId, Topics]); +subscribe(true, AllowSubscriberGroups, User, SubscriberId, Topics) -> %% trade consistency for availability - subscribe_(User, SubscriberId, Topics). + subscribe_(User, AllowSubscriberGroups, SubscriberId, Topics). -subscribe_(User, SubscriberId, Topics) -> +subscribe_(User, AllowSubscriberGroups, SubscriberId, Topics) -> case vmq_plugin:all_till_ok(auth_on_subscribe, [User, SubscriberId, Topics]) of ok -> - subscribe_op(User, SubscriberId, Topics); + subscribe_op(User, AllowSubscriberGroups, SubscriberId, Topics); {ok, NewTopics} when is_list(NewTopics) -> - subscribe_op(User, SubscriberId, NewTopics); + subscribe_op(User, AllowSubscriberGroups, SubscriberId, NewTopics); {error, _} -> {error, not_allowed} end.
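subscribe_/4 above runs the requested topics through the auth_on_subscribe hook chain before they are stored: ok keeps them as-is, {ok, NewTopics} substitutes a rewritten list, and {error, _} surfaces to the client as not_allowed. A hedged sketch of a hook with that shape; the module name and the QoS-downgrade rewrite are purely illustrative:

-module(sample_subscribe_hook).
-export([auth_on_subscribe/3]).

%% Called with the authenticated user, the {MountPoint, ClientId} subscriber id
%% and the requested [{Topic, QoS}] list; here every QoS is downgraded to 0.
auth_on_subscribe(_User, {_MountPoint, _ClientId}, Topics) ->
    {ok, [{Topic, 0} || {Topic, _QoS} <- Topics]}.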
-subscribe_op(User, SubscriberId, Topics) -> +subscribe_op(User, AllowSubscriberGroups, SubscriberId, Topics) -> rate_limited_op( fun() -> - add_subscriber(Topics, SubscriberId) + add_subscriber(AllowSubscriberGroups, SubscriberId, Topics) end, fun(_) -> QoSTable = @@ -250,11 +252,9 @@ publish(#vmq_msg{trade_consistency=true, true -> %% retain set action vmq_retain_srv:insert(MP, Topic, Payload), - RegView:fold(MP, Topic, fun publish/2, Msg#vmq_msg{retain=false}), - ok; + publish(RegView, MP, Topic, Msg#vmq_msg{retain=false}); false -> - RegView:fold(MP, Topic, fun publish/2, Msg), - ok + publish(RegView, MP, Topic, Msg) end; publish(#vmq_msg{trade_consistency=false, reg_view=RegView, @@ -270,33 +270,88 @@ publish(#vmq_msg{trade_consistency=false, true when (IsRetain == true) -> %% retain set action vmq_retain_srv:insert(MP, Topic, Payload), - vmq_reg_view:fold(RegView, MP, Topic, fun publish/2, Msg#vmq_msg{retain=false}), - ok; + publish(RegView, MP, Topic, Msg#vmq_msg{retain=false}); true -> - RegView:fold(MP, Topic, fun publish/2, Msg), - ok; + publish(RegView, MP, Topic, Msg); false -> {error, not_ready} end. +publish(RegView, MP, Topic, Msg) -> + publish(RegView, MP, Topic, fun publish_fold_fun/2, Msg). + +publish(RegView, MP, Topic, Fun, Msg) -> + Acc = publish_fold_acc(Msg), + {NewMsg, SubscriberGroups} = vmq_reg_view:fold(RegView, MP, Topic, Fun, Acc), + publish_to_subscriber_groups(NewMsg, SubscriberGroups). + %% publish/2 is used as the fold function in RegView:fold/4 -publish({SubscriberId, QoS}, Msg) -> - publish(Msg, QoS, get_queue_pid(SubscriberId)); -publish(Node, Msg) -> +publish_fold_fun({{_,_} = SubscriberId, QoS}, {Msg, _} = Acc) -> + %% Local Subscriber + case get_queue_pid(SubscriberId) of + not_found -> Acc; + QPid -> + ok = vmq_queue:enqueue(QPid, {deliver, QoS, Msg}), + Acc + end; +publish_fold_fun({{_Group, _Node, _SubscriberId}, _QoS} = Sub, {Msg, SubscriberGroups}) -> + %% Subscriber Group + {Msg, add_to_subscriber_group(Sub, SubscriberGroups)}; +publish_fold_fun(Node, {Msg, _} = Acc) -> + %% Remote Subscriber case vmq_cluster:publish(Node, Msg) of ok -> - Msg; + Acc; {error, Reason} -> lager:warning("can't publish to remote node ~p due to '~p'", [Node, Reason]), - Msg + Acc end. -publish(Msg, _, not_found) -> Msg; -publish(Msg, QoS, QPid) -> - ok = vmq_queue:enqueue(QPid, {deliver, QoS, Msg}), - Msg. +publish_fold_acc(Msg) -> {Msg, undefined}. 
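The accumulator contract described in the republish/2 comment block boils down to: the fold starts from publish_fold_acc(Msg) = {Msg, undefined}, subscriber-group matches are collected into a map of Group => [{Node, SubscriberId, QoS}], and afterwards one member per group is picked at random. A condensed model of that flow, with delivery replaced by simply returning the chosen members; the module and function names are illustrative:

-module(group_routing_example).
-export([route/2]).

%% Fold over the matches produced by the reg view and pick one random member
%% per subscriber group; local and remote matches are skipped here because
%% vmq_reg delivers those directly inside publish_fold_fun/2.
route(Msg, Matches) ->
    {_, Groups} = lists:foldl(fun fold_fun/2, {Msg, undefined}, Matches),
    pick_members(Groups).

fold_fun({{Group, Node, SubscriberId}, QoS}, {Msg, Groups0}) ->
    Groups = case Groups0 of undefined -> #{}; _ -> Groups0 end,
    Members = maps:get(Group, Groups, []),
    {Msg, maps:put(Group, [{Node, SubscriberId, QoS}|Members], Groups)};
fold_fun(_LocalOrRemote, Acc) ->
    Acc.

pick_members(undefined) -> [];
pick_members(Groups) ->
    [{Group, lists:nth(random:uniform(length(Members)), Members)}
     || {Group, Members} <- maps:to_list(Groups), Members =/= []].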
+ +publish_to_subscriber_groups(_, undefined) -> ok; +publish_to_subscriber_groups(Msg, SubscriberGroups) when is_map(SubscriberGroups) -> + publish_to_subscriber_groups(Msg, maps:to_list(SubscriberGroups)); +publish_to_subscriber_groups(_, []) -> ok; +publish_to_subscriber_groups(Msg, [{Group, []}|Rest]) -> + lager:warning("can't publish to subscriber group ~p due to no subscriber available", [Group]), + publish_to_subscriber_groups(Msg, Rest); +publish_to_subscriber_groups(Msg, [{Group, SubscriberGroup}|Rest]) -> + NewMsg = Msg#vmq_msg{subscriber_group=Group}, + N = random:uniform(length(SubscriberGroup)), + case lists:nth(N, SubscriberGroup) of + {Node, SubscriberId, QoS} = Sub when Node == node() -> + case get_queue_pid(SubscriberId) of + not_found -> + NewSubscriberGroup = lists:delete(Sub, SubscriberGroup), + %% retry with other members of this group + publish_to_subscriber_groups(NewMsg, [{Group, NewSubscriberGroup}|Rest]); + QPid -> + ok = vmq_queue:enqueue(QPid, {deliver, QoS, NewMsg}), + publish_to_subscriber_groups(NewMsg, Rest) + end; + {Node, _, _} = Sub -> + case vmq_cluster:publish(Node, NewMsg) of + ok -> + publish_to_subscriber_groups(NewMsg, Rest); + {error, Reason} -> + lager:warning("can't publish for subscriber group to remote node ~p due to '~p'", [Node, Reason]), + NewSubscriberGroup = lists:delete(Sub, SubscriberGroup), + %% retry with other members of this group + publish_to_subscriber_groups(NewMsg, [{Group, NewSubscriberGroup}|Rest]) + end + end. + +add_to_subscriber_group(Sub, undefined) -> + add_to_subscriber_group(Sub, #{}); +add_to_subscriber_group({{Group, Node, SubscriberId}, QoS}, SubscriberGroups) -> + SubscriberGroup = maps:get(Group, SubscriberGroups, []), + maps:put(Group, [{Node, SubscriberId, QoS}|SubscriberGroup], + SubscriberGroups). -spec deliver_retained(subscriber_id(), topic(), qos()) -> 'ok'. +deliver_retained(SubscriberId, [<<"$GROUP-", _/binary>>|Topic], QoS) -> + deliver_retained(SubscriberId, Topic, QoS); deliver_retained({MP, _} = SubscriberId, Topic, QoS) -> QPid = get_queue_pid(SubscriberId), vmq_retain_srv:match_fold( @@ -564,8 +619,10 @@ direct_plugin_exports(Mod) when is_atom(Mod) -> fun([W|_] = Topic) when is_binary(W) -> wait_til_ready(), CallingPid = self(), + AllowSubscriberGroups = + vmq_config:get_env(allow_subscriber_groups, false), User = {plugin, Mod, CallingPid}, - subscribe(TradeConsistency, User, + subscribe(TradeConsistency, AllowSubscriberGroups, User, {MountPoint, ClientId(CallingPid)}, [{Topic, 0}]); (_) -> {error, invalid_topic} @@ -691,28 +748,25 @@ fold_sessions(FoldFun, Acc) -> end, AccAcc, vmq_queue:get_sessions(QPid)) end, Acc). --spec add_subscriber([{topic(), qos() | not_allowed}], subscriber_id()) -> ok. -add_subscriber(Topics, SubscriberId) -> +-spec add_subscriber(flag(), subscriber_id(), [{topic(), qos() | not_allowed}]) -> ok. 
+add_subscriber(AllowSubscriberGroups, SubscriberId, Topics) -> + OldSubs = plumtree_metadata:get(?SUBSCRIBER_DB, SubscriberId, [{default, []}]), NewSubs = - case plumtree_metadata:get(?SUBSCRIBER_DB, SubscriberId) of - undefined -> - %% not_allowed topics are filtered out here - [{Topic, QoS, node()} || {Topic, QoS} <- Topics, is_integer(QoS)]; - Subs -> - lists:foldl(fun ({_Topic, not_allowed}, NewSubsAcc) -> - NewSubsAcc; - ({Topic, QoS}, NewSubsAcc) -> - NewSub = {Topic, QoS, node()}, - case lists:member(NewSub, NewSubsAcc) of - true -> NewSubsAcc; - false -> - [NewSub|NewSubsAcc] - end - end, Subs, Topics) - end, + lists:foldl(fun ({_Topic, not_allowed}, NewSubsAcc) -> + NewSubsAcc; + ({[<<"$GROUP-", _/binary>>|_], _}, NewSubsAcc) + when not AllowSubscriberGroups -> + NewSubsAcc; + ({Topic, QoS}, NewSubsAcc) when is_integer(QoS) -> + NewSub = {Topic, QoS, node()}, + case lists:member(NewSub, NewSubsAcc) of + true -> NewSubsAcc; + false -> + [NewSub|NewSubsAcc] + end + end, OldSubs, Topics), plumtree_metadata:put(?SUBSCRIBER_DB, SubscriberId, NewSubs). - -spec del_subscriber(subscriber_id()) -> ok. del_subscriber(SubscriberId) -> plumtree_metadata:delete(?SUBSCRIBER_DB, SubscriberId). diff --git a/src/vmq_reg_trie.erl b/src/vmq_reg_trie.erl index 2fe001e..2919e7f 100644 --- a/src/vmq_reg_trie.erl +++ b/src/vmq_reg_trie.erl @@ -193,6 +193,10 @@ handle_event(Handler, Event) -> ok end. +handle_delete_event({MP, _} = SubscriberId, [{[<<"$GROUP-", Group/binary>>|Topic], QoS, Node}|Rest]) -> + del_topic(MP, Topic, Node), + del_subscriber(MP, Topic, {Group, Node, SubscriberId}, QoS), + handle_delete_event(SubscriberId, Rest); handle_delete_event({MP, _} = SubscriberId, [{Topic, QoS, Node}|Rest]) when Node == node() -> del_topic(MP, Topic, Node), del_subscriber(MP, Topic, SubscriberId, QoS), @@ -202,6 +206,11 @@ handle_delete_event({MP, _} = SubscriberId, [{Topic, _, Node}|Rest]) -> handle_delete_event(SubscriberId, Rest); handle_delete_event(_, []) -> ok. +handle_add_event({MP, _} = SubscriberId, [{[<<"$GROUP-", Group/binary>>|Topic], QoS, Node}|Rest]) -> + %% A topic that is part of a subscriber group subscription + add_topic(MP, Topic, Node), + add_subscriber(MP, Topic, {Group, Node, SubscriberId}, QoS), + handle_add_event(SubscriberId, Rest); handle_add_event({MP, _} = SubscriberId, [{Topic, QoS, Node}|Rest]) when Node == node() -> add_topic(MP, Topic, Node), add_subscriber(MP, Topic, SubscriberId, QoS), diff --git a/src/vmq_server.app.src b/src/vmq_server.app.src index c8817d0..0fefaaa 100644 --- a/src/vmq_server.app.src +++ b/src/vmq_server.app.src @@ -22,6 +22,7 @@ {env, [ % session opts {allow_anonymous, true}, + {allow_subscriber_groups, false}, {max_client_id_size, 23}, {retry_interval, 20}, {max_inflight_messages, 20}, diff --git a/src/vmq_server.hrl b/src/vmq_server.hrl index 5b29ff3..cc17edf 100644 --- a/src/vmq_server.hrl +++ b/src/vmq_server.hrl @@ -11,6 +11,7 @@ trade_consistency=false :: boolean(), reg_view=vmq_reg_trie :: atom(), mountpoint :: mountpoint(), - persisted=false :: flag() + persisted=false :: flag(), + subscriber_group :: undefined | binary() }). -type msg() :: #vmq_msg{}. diff --git a/test/vmq_cluster_SUITE.erl b/test/vmq_cluster_SUITE.erl index 56a865f..d4b49d3 100644 --- a/test/vmq_cluster_SUITE.erl +++ b/test/vmq_cluster_SUITE.erl @@ -20,37 +20,50 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("kernel/include/inet.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("vmq_commons/include/vmq_types.hrl").
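For reference, the filter applied by add_subscriber/3 earlier in this patch reduces to a three-way decision per requested topic: drop not_allowed entries, drop $GROUP- topics while the feature is disabled, and keep everything else. A small illustrative predicate capturing that decision (not part of the patch):

-module(subscription_filter_example).
-export([keep/2]).

%% keep(AllowSubscriberGroups, {Topic, QoS}) -> boolean()
keep(_Allow, {_Topic, not_allowed}) ->
    false;
keep(false, {[<<"$GROUP-", _/binary>>|_], _QoS}) ->
    %% group subscriptions are rejected while allow_subscriber_groups is off
    false;
keep(_Allow, {_Topic, QoS}) when is_integer(QoS) ->
    true.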
%% =================================================================== %% common_test callbacks %% =================================================================== init_per_suite(_Config) -> + %lager:start(), %% this might help, might not... - os:cmd(os:find_executable("epmd") ++ " -daemon"), - case net_kernel:start([test_master, shortnames]) of + os:cmd(os:find_executable("epmd")++" -daemon"), + {ok, Hostname} = inet:gethostname(), + case net_kernel:start([list_to_atom("runner@"++Hostname), shortnames]) of {ok, _} -> ok; - {error, _} -> ok + {error, {already_started, _}} -> ok end, - cover:start(), + lager:info("node name ~p", [node()]), _Config. end_per_suite(_Config) -> + application:stop(lager), _Config. -init_per_testcase(_Case, Config) -> - {_MasterNode, Nodes} = start_cluster(5), - CoverNodes = [begin - wait_til_ready(Node), - Node - end || {Node, _} <- Nodes], +init_per_testcase(Case, Config) -> + Nodes = vmq_cluster_test_utils:pmap( + fun({N, P}) -> + Node = vmq_cluster_test_utils:start_node(N, Config, Case), + {ok, _} = rpc:call(Node, vmq_server_cmd, listener_start, + [P, []]), + %% allow all + ok = rpc:call(Node, vmq_auth, register_hooks, []), + {Node, P} + end, [{test1, 18883}, + {test2, 18884}, + {test3, 18885}, + {test4, 18886}, + {test5, 18887}]), + {CoverNodes, _} = lists:unzip(Nodes), {ok, _} = ct_cover:add_nodes(CoverNodes), [{nodes, Nodes}|Config]. -end_per_testcase(_, Config) -> - {_, Nodes} = lists:keyfind(nodes, 1, Config), - stop_cluster(Nodes), - Config. +end_per_testcase(_, _Config) -> + vmq_cluster_test_utils:pmap(fun(Node) -> ct_slave:stop(Node) end, + [test1, test2, test3, test4, test5]), + ok. all() -> [multiple_connect_test @@ -63,7 +76,21 @@ all() -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Actual Tests %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +ensure_cluster(Config) -> + [{Node1, _}|OtherNodes] = Nodes = proplists:get_value(nodes, Config), + [begin + {ok, _} = rpc:call(Node, vmq_server_cmd, node_join, [Node1]) + end || {Node, _} <- OtherNodes], + {NodeNames, _} = lists:unzip(Nodes), + Expected = lists:sort(NodeNames), + ok = vmq_cluster_test_utils:wait_until_joined(NodeNames, Expected), + [?assertEqual({Node, Expected}, {Node, + lists:sort(vmq_cluster_test_utils:get_cluster_members(Node))}) + || Node <- NodeNames], + ok. + multiple_connect_test(Config) -> + ok = ensure_cluster(Config), {_, Nodes} = lists:keyfind(nodes, 1, Config), NrOfConnects = 250, NrOfProcesses = NrOfConnects div 50, %random:uniform(NrOfConnects), @@ -73,7 +100,19 @@ multiple_connect_test(Config) -> true = check_unique_client("connect-multiple", Nodes), Config. +wait_until_converged(Nodes, Fun, ExpectedReturn) -> + {NodeNames, _} = lists:unzip(Nodes), + vmq_cluster_test_utils:wait_until( + fun() -> + lists:all(fun(X) -> X == true end, + vmq_cluster_test_utils:pmap( + fun(Node) -> + ExpectedReturn == Fun(Node) + end, NodeNames)) + end, 60*2, 500). 
+ multiple_connect_unclean_test(Config) -> + ok = ensure_cluster(Config), {_, Nodes} = lists:keyfind(nodes, 1, Config), Topic = "qos1/multiple/test", Connect = packet:gen_connect("connect-unclean", [{clean_session, false}, @@ -82,39 +121,27 @@ multiple_connect_unclean_test(Config) -> Subscribe = packet:gen_subscribe(123, Topic, 1), Suback = packet:gen_suback(123, 1), Disconnect = packet:gen_disconnect(), - {ok, Socket} = packet:do_client_connect(Connect, Connack, opts(Nodes)), + {RandomNode, RandomPort} = random_node(Nodes), + {ok, Socket} = packet:do_client_connect(Connect, Connack, [{port, + RandomPort}]), ok = gen_tcp:send(Socket, Subscribe), ok = packet:expect_packet(Socket, "suback", Suback), ok = gen_tcp:send(Socket, Disconnect), - timer:sleep(2000), - Subs = fun() -> rpc:multicall([N || {N, _} <-Nodes], - vmq_reg, total_subscriptions, []) - end, - io:format(user, "!!!!!!!!!!!!!!!!!!! Subs before send ~p~n", [Subs()]), - %% publish random content to the topic - Strd = fun() -> rpc:multicall([N || {N, _} <-Nodes], - vmq_reg, stored, [{"", <<"connect-unclean">>}]) - end, - io:format(user, "!!!!!!!!!!!!!!!!!!! stored msgs before send ~p~n", [Strd()]), - Payloads = publish_random(Nodes, 1000, Topic), - Ports = fun() -> rpc:multicall([N || {N, _} <-Nodes], - erlang, system_info, [port_count]) - end, - Procs = fun() -> rpc:multicall([N || {N, _} <-Nodes], - erlang, system_info, [process_count]) - end, - io:format(user, "~n!!!!!!!!!!!!!!!!!!! stored msgs after send ~p~n", [Strd()]), - io:format(user, "!!!!!!!!!!!!!!!!!!! port_count ~p~n", [Ports()]), - io:format(user, "!!!!!!!!!!!!!!!!!!! process_count ~p~n", [Procs()]), - timer:sleep(2000), - ok = receive_publishes(Nodes, Topic, Payloads), - timer:sleep(2000), - io:format(user, "!!!!!!!!!!!!!!!!!!! stored msgs after deliver~p~n", [Strd()]), - io:format(user, "!!!!!!!!!!!!!!!!!!! port_count ~p~n", [Ports()]), - io:format(user, "!!!!!!!!!!!!!!!!!!! process_count ~p~n", [Procs()]), - Config. + gen_tcp:close(Socket), + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_reg, total_subscriptions, []) + end, [{total, 1}]), + Payloads = publish_random(Nodes, 100, Topic), + ok = vmq_cluster_test_utils:wait_until( + fun() -> + 100 == rpc:call(RandomNode, vmq_reg, stored, + [{"", <<"connect-unclean">>}]) + end, 60, 500), + ok = receive_publishes(Nodes, Topic, Payloads). distributed_subscribe_test(Config) -> + ok = ensure_cluster(Config), {_, Nodes} = lists:keyfind(nodes, 1, Config), Topic = "qos1/distributed/test", Sockets = @@ -130,7 +157,10 @@ distributed_subscribe_test(Config) -> ok = packet:expect_packet(Socket, "suback", Suback), Socket end || {_, Port} <- Nodes], - timer:sleep(2000), + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_reg, total_subscriptions, []) + end, [{total, length(Sockets)}]), [PubSocket|Rest] = Sockets, Publish = packet:gen_publish(Topic, 1, <<"test-message">>, [{mid, 1}]), Puback = packet:gen_puback(1), @@ -144,10 +174,11 @@ distributed_subscribe_test(Config) -> Config. 
cluster_leave_test(Config) -> + ok = ensure_cluster(Config), {_, [{Node, Port}|RestNodes] = Nodes} = lists:keyfind(nodes, 1, Config), Topic = "cluster/leave/topic", %% create 100 sessions - [PubSocket|_] = + [PubSocket|_] = Sockets = [begin Connect = packet:gen_connect("connect-" ++ integer_to_list(I), [{clean_session, false}, @@ -160,39 +191,33 @@ cluster_leave_test(Config) -> ok = packet:expect_packet(Socket, "suback", Suback), Socket end || I <- lists:seq(1,100)], - timer:sleep(2000), - - Subs = fun(Nds) -> rpc:multicall([N || {N, _} <-Nds], - vmq_reg, total_subscriptions, []) - end, - Strd = fun(Nds) -> rpc:multicall([N || {N, _} <-Nds], - vmq_queue_sup, summary, []) - end, - io:format(user, "!!!!!!!!!!!!!!!!!!! Subs before leave ~p~n", [Subs(Nodes)]), - + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_reg, total_subscriptions, []) + end, [{total, length(Sockets)}]), %% publish a message for every session Publish = packet:gen_publish(Topic, 1, <<"test-message">>, [{mid, 1}]), Puback = packet:gen_puback(1), ok = gen_tcp:send(PubSocket, Publish), ok = packet:expect_packet(PubSocket, "puback", Puback), - timer:sleep(1000), %% just to ensure that the message was routed to all subscribers - %% let first node leave the cluster, this should migrate the sessions + ok = vmq_cluster_test_utils:wait_until( + fun() -> + {100, 0, 0, 0, 0} == rpc:call(Node, vmq_queue_sup, summary, []) + end, 60, 500), + %% Pick a control node for initiating the cluster leave [{CtrlNode, _}|_] = RestNodes, - %% Node_leave might return before migration has finished {ok, _} = rpc:call(CtrlNode, vmq_server_cmd, node_leave, [Node]), - - io:format(user, "!!!!!!!!!!!!!!!!!!! Subs after leave ~p~n", [Subs(RestNodes)]), - io:format(user, "!!!!!!!!!!!!!!!!!!! queue summary after leave ~p~n", [Strd(RestNodes)]), - - - %% the 100 sessions should now be migrated to the rest of the nodes - %% each holding 25 queues each containing 1 message - [{0, 0, 0, 25, 25} = rpc:call(N, vmq_queue_sup, summary, []) - || {N, _} <- RestNodes], - Config. + %% Leaving Node will disconnect all sessions and give away all messages + %% The disconnected sessions are equally migrated to the rest of the nodes + %% As the clients don't reconnect (in this test), their sessions are offline + ok = wait_until_converged(RestNodes, + fun(N) -> + rpc:call(N, vmq_queue_sup, summary, []) + end, {0, 0, 0, 25, 25}). cluster_leave_dead_node_test(Config) -> - {_, [Node = {Name, Port}|RestNodes] = Nodes} = lists:keyfind(nodes, 1, Config), + ok = ensure_cluster(Config), + {_, [{Node, Port}|RestNodes] = Nodes} = lists:keyfind(nodes, 1, Config), Topic = "cluster/leave/dead/topic", %% create 100 sessions on first Node _ = @@ -209,37 +234,24 @@ cluster_leave_dead_node_test(Config) -> gen_tcp:send(Socket, packet:gen_disconnect()), gen_tcp:close(Socket) end || I <- lists:seq(1,100)], - timer:sleep(2000), - - Subs = fun(Nds) -> rpc:multicall([N || {N, _} <-Nds], - vmq_reg, total_subscriptions, []) - end, - - Strd = fun(Nds) -> rpc:multicall([N || {N, _} <-Nds], - vmq_queue_sup, summary, []) - end, - - io:format(user, "!!!!!!!!!!!!!!!!!!! Subs before leave ~p~n", [Subs(Nodes)]), - io:format(user, "!!!!!!!!!!!!!!!!!!! 
queue summary before leave ~p~n", [Strd(Nodes)]), - + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_reg, total_subscriptions, []) + end, [{total, 100}]), %% stop first node - stop_node(Node), + ct_slave:stop(Node), - %% let first node leave the cluster, this should migrate the sessions + %% Pick a control node for initiating the cluster leave + %% let first node leave the cluster, this should migrate the sessions, + %% but not the messages [{CtrlNode, _}|_] = RestNodes, - %% Node_leave might return before migration has finished - {ok, _} = rpc:call(CtrlNode, vmq_server_cmd, node_leave, [Name]), - - io:format(user, "!!!!!!!!!!!!!!!!!!! Subs after leave ~p~n", [Subs(RestNodes)]), - io:format(user, "!!!!!!!!!!!!!!!!!!! queue summary after leave ~p~n", [Strd(RestNodes)]), - - - %% the 100 sessions should now be migrated to the rest of the nodes - %% each holding 25 queues each containing 1 message - [{0, 0, 0, 25, 0} = rpc:call(N, vmq_queue_sup, summary, []) - || {N, _} <- RestNodes], - Config. + {ok, _} = rpc:call(CtrlNode, vmq_server_cmd, node_leave, [Node]), + %% The disconnected sessions are equally migrated to the rest of the nodes + ok = wait_until_converged(RestNodes, + fun(N) -> + rpc:call(N, vmq_queue_sup, summary, []) + end, {0, 0, 0, 25, 0}). publish(Nodes, NrOfProcesses, NrOfMsgsPerProcess) -> publish(self(), Nodes, NrOfProcesses, NrOfMsgsPerProcess, []). @@ -287,6 +299,7 @@ publish_random(Nodes, N, Topic, Acc) -> ok = gen_tcp:send(Socket, Publish), ok = packet:expect_packet(Socket, "puback", Puback), ok = gen_tcp:send(Socket, Disconnect), + gen_tcp:close(Socket), publish_random(Nodes, N - 1, Topic, [Payload|Acc]). receive_publishes(_, _, []) -> ok; @@ -294,15 +307,11 @@ receive_publishes([{_,Port}=N|Nodes], Topic, Payloads) -> Connect = packet:gen_connect("connect-unclean", [{clean_session, false}, {keepalive, 10}]), Connack = packet:gen_connack(true, 0), - Disconnect = packet:gen_disconnect(), - %Opts = opts(Nodes), Opts = [{port, Port}], {ok, Socket} = packet:do_client_connect(Connect, Connack, Opts), case recv(Socket, <<>>) of {ok, #mqtt_publish{message_id=MsgId, payload=Payload}} -> ok = gen_tcp:send(Socket, packet:gen_puback(MsgId)), - ok = gen_tcp:send(Socket, Disconnect), - gen_tcp:close(Socket), receive_publishes(Nodes ++ [N], Topic, Payloads -- [Payload]); {error, _} -> receive_publishes(Nodes ++ [N], Topic, Payloads) @@ -338,92 +347,9 @@ hook_auth_on_subscribe(_, _, _) -> ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Internal %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -start_cluster(NrOfNodes) -> - Self = self(), - {MasterNode, Nodes} = State = - lists:foldl( - fun(I, {MasterNode, Acc}) -> - Name = list_to_atom("test_"++integer_to_list(I)), - Port = 18880 + I, - {NewMasterNode, Node} = - case MasterNode of - undefined -> - N = start_node(Name, Port, []), - {N, N}; - _ -> - N = start_node(Name, Port, [MasterNode]), - {MasterNode, N} - end, - Self ! done, - {NewMasterNode, [{Node, Port}|Acc]} - end , {undefined, []}, lists:seq(1, NrOfNodes)), - receive_times(done, NrOfNodes), - wait_til_ready(MasterNode), - Readies = fun() -> - RPCNodes = [N || {N, _} <- Nodes], - rpc:multicall(RPCNodes, vmq_cluster, recheck, []), - timer:sleep(500), - rpc:multicall(RPCNodes, vmq_cluster, is_ready, []) - end, - io:format(user, "cluster state ~p~n", [Readies()]), - State. 
- -start_node(Name, Port, DiscoveryNodes) -> - CodePath = lists:filter(fun filelib:is_dir/1, code:get_path()), - NodeConfig = [{monitor_master, true}, - {startup_functions, - [{code, set_path, [CodePath]}]}], - case ct_slave:start(Name, NodeConfig) of - {ok, Node} -> - ok = rpc:call(Node, vmq_test_utils, setup, []), - case DiscoveryNodes of - [] -> - ignore; - _ -> - {ok, _} = rpc:call(Node, vmq_server_cmd, node_join, DiscoveryNodes) - end, - wait_til_ready(Node), - {ok, _} = rpc:call(Node, vmq_server_cmd, set_config, [allow_anonymous, false]), - {ok, _} = rpc:call(Node, vmq_server_cmd, listener_start, [Port, []]), - - ok = rpc:call(Node, vmq_plugin_mgr, enable_module_plugin, - [auth_on_register, ?MODULE, hook_uname_password_success, 5]), - ok = rpc:call(Node, vmq_plugin_mgr, enable_module_plugin, - [auth_on_publish, ?MODULE, hook_auth_on_publish, 6]), - ok = rpc:call(Node, vmq_plugin_mgr, enable_module_plugin, - [auth_on_subscribe, ?MODULE, hook_auth_on_subscribe, 3]), - Node; - {error, already_started, Node} -> - ct_slave:stop(Node), - timer:sleep(1000), - start_node(Name, Port, DiscoveryNodes) - end. - - -stop_cluster(Nodes) -> - error_logger:info_msg("stop cluster nodes ~p", [Nodes]), - lists:foreach(fun stop_node/1, lists:reverse(Nodes)). - -stop_node({Node, _Port}) -> - [NodeNameStr|_] = re:split(atom_to_list(Node), "@", [{return, list}]), - try - ok = rpc:call(Node, vmq_test_utils, teardown, []), - ShortNodeName = list_to_existing_atom(NodeNameStr), - {ok, _} = ct_slave:stop(ShortNodeName) - catch - _:_ -> - ok - end. - +random_node(Nodes) -> + lists:nth(random:uniform(length(Nodes)), Nodes). -wait_til_ready(Node) -> - wait_til_ready(Node, rpc:call(Node, vmq_cluster, is_ready, []), 100). -wait_til_ready(_, true, _) -> ok; -wait_til_ready(Node, false, I) when I > 0 -> - timer:sleep(100), - wait_til_ready(Node, rpc:call(Node, vmq_cluster, is_ready, []), I - 1); -wait_til_ready(N, _, _) -> - exit({not_ready, N, rpc:call(N, erlang, whereis, [vmq_cluster])}). opts(Nodes) -> {_, Port} = lists:nth(random:uniform(length(Nodes)), Nodes), diff --git a/test/vmq_cluster_netsplit_SUITE.erl b/test/vmq_cluster_netsplit_SUITE.erl new file mode 100644 index 0000000..636798f --- /dev/null +++ b/test/vmq_cluster_netsplit_SUITE.erl @@ -0,0 +1,259 @@ +-module(vmq_cluster_netsplit_SUITE). +-export([ + %% suite/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2, + all/0 + ]). + +-export([publish_qos0_test/1, + register_consistency_test/1, + register_consistency_multiple_sessions_test/1, + register_not_ready_test/1]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/inet.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("vmq_commons/include/vmq_types.hrl"). + +%% =================================================================== +%% common_test callbacks +%% =================================================================== +init_per_suite(_Config) -> + lager:start(), + %% this might help, might not... + os:cmd(os:find_executable("epmd")++" -daemon"), + {ok, Hostname} = inet:gethostname(), + case net_kernel:start([list_to_atom("runner@"++Hostname), shortnames]) of + {ok, _} -> ok; + {error, {already_started, _}} -> ok + end, + lager:info("node name ~p", [node()]), + _Config. + +end_per_suite(_Config) -> + application:stop(lager), + _Config. 
+ +init_per_testcase(Case, Config) -> + Nodes = vmq_cluster_test_utils:pmap( + fun({N, P}) -> + Node = vmq_cluster_test_utils:start_node(N, Config, Case), + {ok, _} = rpc:call(Node, vmq_server_cmd, listener_start, + [P, []]), + %% allow all + ok = rpc:call(Node, vmq_auth, register_hooks, []), + {Node, P} + end, [{test1, 18883}, + {test2, 18884}, + {test3, 18885}, + {test4, 18886}, + {test5, 18887}]), + {CoverNodes, _} = lists:unzip(Nodes), + {ok, _} = ct_cover:add_nodes(CoverNodes), + [{nodes, Nodes}|Config]. + +end_per_testcase(_, _Config) -> + vmq_cluster_test_utils:pmap(fun(Node) -> ct_slave:stop(Node) end, + [test1, test2, test3, test4, test5]), + ok. + +all() -> + [publish_qos0_test, + register_consistency_test, + register_consistency_multiple_sessions_test, + register_not_ready_test]. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Actual Tests +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +set_config(Key, Val) -> + rpc:multicall(vmq_server_cmd, set_config, [Key, Val]), + ok. + +register_consistency_test(Config) -> + ok = ensure_cluster(Config), + {_, Nodes} = lists:keyfind(nodes, 1, Config), + {Island1, Island2} = lists:split(length(Nodes) div 2, Nodes), + + %% Create Partitions + {Island1Names, _} = lists:unzip(Island1), + {Island2Names, _} = lists:unzip(Island2), + vmq_cluster_test_utils:partition_cluster(Island1Names, Island2Names), + + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_cluster, is_ready, []) + end, false), + + {_, Island1Port} = random_node(Island1), + {_, Island2Port} = random_node(Island2), + + Connect = packet:gen_connect("test-client", [{clean_session, true}, + {keepalive, 10}]), + %% Island 1 should return us the proper CONNACK(3) + {ok, _} = packet:do_client_connect(Connect, packet:gen_connack(3), + [{port, Island1Port}]), + %% Island 2 should return us the proper CONACK(3) + {ok, _} = packet:do_client_connect(Connect, packet:gen_connack(3), + [{port, Island2Port}]), + vmq_cluster_test_utils:heal_cluster(Island1Names, Island2Names), + ok. + +register_consistency_multiple_sessions_test(Config) -> + ok = ensure_cluster(Config), + {_, Nodes} = lists:keyfind(nodes, 1, Config), + {Island1, Island2} = lists:split(length(Nodes) div 2, Nodes), + + %% we configure the nodes to trade consistency for availability + set_config(trade_consistency, true), + set_config(allow_multiple_sessions, true), + + %% Create Partitions + {Island1Names, _} = lists:unzip(Island1), + {Island2Names, _} = lists:unzip(Island2), + vmq_cluster_test_utils:partition_cluster(Island1Names, Island2Names), + + {_, Island1Port} = random_node(Island1), + {_, Island2Port} = random_node(Island2), + + Connect = packet:gen_connect("test-client-multiple", [{clean_session, true}, + {keepalive, 10}]), + Connack = packet:gen_connack(0), + {ok, Socket1} = packet:do_client_connect(Connect, Connack, + [{port, Island1Port}]), + + {ok, Socket2} = packet:do_client_connect(Connect, Connack, + [{port, Island2Port}]), + vmq_cluster_test_utils:heal_cluster(Island1Names, Island2Names), + gen_tcp:close(Socket1), + gen_tcp:close(Socket2), + ok. 
+ +register_not_ready_test(Config) -> + ok = ensure_cluster(Config), + {_, Nodes} = lists:keyfind(nodes, 1, Config), + {Island1, Island2} = lists:split(length(Nodes) div 2, Nodes), + + %% Connect a test-client + Connect = packet:gen_connect("test-client-not-ready", [{clean_session, true}, + {keepalive, 10}]), + Connack = packet:gen_connack(0), + {_, Port} = random_node(Nodes), + {ok, _Socket} = packet:do_client_connect(Connect, Connack, + [{port, Port}]), + + %% Create Partitions + {Island1Names, _} = lists:unzip(Island1), + {Island2Names, _} = lists:unzip(Island2), + vmq_cluster_test_utils:partition_cluster(Island1Names, Island2Names), + + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_cluster, is_ready, []) + end, false), + + %% we are now on a partitioned network and SHOULD NOT allow new connections + ConnNack = packet:gen_connack(3), %% server unavailable + [begin + {ok, S} = packet:do_client_connect(Connect, ConnNack, [{port, P}]), + gen_tcp:close(S) + end || {_, P} <- Nodes], + + %% fix cables + vmq_cluster_test_utils:heal_cluster(Island1Names, Island2Names), + + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_cluster, is_ready, []) + end, true), + + %% connect MUST go through now. + [begin + {ok, S} = packet:do_client_connect(Connect, Connack, [{port, P}]), + gen_tcp:close(S) + end || {_, P} <- Nodes], + ok. + +publish_qos0_test(Config) -> + ok = ensure_cluster(Config), + {_, Nodes} = lists:keyfind(nodes, 1, Config), + {Island1, Island2} = lists:split(length(Nodes) div 2, Nodes), + Connect = packet:gen_connect("test-netsplit-client", [{clean_session, false}, + {keepalive, 60}]), + Connack = packet:gen_connack(0), + Subscribe = packet:gen_subscribe(53, "netsplit/0/test", 0), + Suback = packet:gen_suback(53, 0), + {_, Island1Port} = random_node(Island1), + {ok, Socket} = packet:do_client_connect(Connect, Connack, [{port, + Island1Port}]), + ok = gen_tcp:send(Socket, Subscribe), + ok = packet:expect_packet(Socket, "suback", Suback), + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_reg, total_subscriptions, []) + end, [{total, 1}]), + + %% Create Partitions + {Island1Names, _} = lists:unzip(Island1), + {Island2Names, _} = lists:unzip(Island2), + vmq_cluster_test_utils:partition_cluster(Island1Names, Island2Names), + + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_cluster, is_ready, []) + end, false), + + {_, Island2Port} = random_node(Island2), + set_config(trade_consistency, true), + set_config(allow_multiple_sessions, true), + Publish = packet:gen_publish("netsplit/0/test", 0, <<"message">>, + [{mid, 1}]), + helper_pub_qos1("test-netsplit-sender", Publish, Island2Port), + + %% fix the network + vmq_cluster_test_utils:heal_cluster(Island1Names, Island2Names), + + %% the publish is expected once the netsplit is fixed + ok = packet:expect_packet(Socket, "publish", Publish). + + +helper_pub_qos1(ClientId, Publish, Port) -> + Connect = packet:gen_connect(ClientId, [{keepalive, 60}]), + Connack = packet:gen_connack(0), + {ok, Socket} = packet:do_client_connect(Connect, Connack, [{port, Port}]), + ok = gen_tcp:send(Socket, Publish), + gen_tcp:close(Socket). 
+ +ensure_cluster(Config) -> + [{Node1, _}|OtherNodes] = Nodes = proplists:get_value(nodes, Config), + [begin + {ok, _} = rpc:call(Node, vmq_server_cmd, node_join, [Node1]) + end || {Node, _} <- OtherNodes], + {NodeNames, _} = lists:unzip(Nodes), + Expected = lists:sort(NodeNames), + ok = vmq_cluster_test_utils:wait_until_joined(NodeNames, Expected), + [?assertEqual({Node, Expected}, {Node, + lists:sort(vmq_cluster_test_utils:get_cluster_members(Node))}) + || Node <- NodeNames], + ok. + +wait_until_converged(Nodes, Fun, ExpectedReturn) -> + {NodeNames, _} = lists:unzip(Nodes), + vmq_cluster_test_utils:wait_until( + fun() -> + lists:all(fun(X) -> X == true end, + vmq_cluster_test_utils:pmap( + fun(Node) -> + ExpectedReturn == Fun(Node) + end, NodeNames)) + end, 60*2, 500). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Internal +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +random_node(Nodes) -> + lists:nth(random:uniform(length(Nodes)), Nodes). diff --git a/test/vmq_cluster_test_utils.erl b/test/vmq_cluster_test_utils.erl new file mode 100644 index 0000000..ba1e54a --- /dev/null +++ b/test/vmq_cluster_test_utils.erl @@ -0,0 +1,185 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2015 Helium Systems, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(vmq_cluster_test_utils). + +-export([ + get_cluster_members/1, + pmap/2, + wait_until/3, + wait_until_left/2, + wait_until_joined/2, + wait_until_offline/1, + wait_until_disconnected/2, + wait_until_connected/2, + start_node/3, + partition_cluster/2, + heal_cluster/2 + ]). +get_cluster_members(Node) -> + {Node, {ok, Res}} = {Node, rpc:call(Node, plumtree_peer_service_manager, get_local_state, [])}, + riak_dt_orswot:value(Res). + +pmap(F, L) -> + Parent = self(), + lists:foldl( + fun(X, N) -> + spawn_link(fun() -> + Parent ! {pmap, N, F(X)} + end), + N+1 + end, 0, L), + L2 = [receive {pmap, N, R} -> {N,R} end || _ <- L], + {_, L3} = lists:unzip(lists:keysort(1, L2)), + L3. + +wait_until(Fun, Retry, Delay) when Retry > 0 -> + Res = Fun(), + case Res of + true -> + ok; + _ when Retry == 1 -> + {fail, Res}; + _ -> + timer:sleep(Delay), + wait_until(Fun, Retry-1, Delay) + end. + +wait_until_left(Nodes, LeavingNode) -> + wait_until(fun() -> + lists:all(fun(X) -> X == true end, + pmap(fun(Node) -> + not + lists:member(LeavingNode, + get_cluster_members(Node)) + end, Nodes)) + end, 60*2, 500). + +wait_until_joined(Nodes, ExpectedCluster) -> + wait_until(fun() -> + lists:all(fun(X) -> X == true end, + pmap(fun(Node) -> + lists:sort(ExpectedCluster) == + lists:sort(get_cluster_members(Node)) + end, Nodes)) + end, 60*2, 500). + +wait_until_offline(Node) -> + wait_until(fun() -> + pang == net_adm:ping(Node) + end, 60*2, 500). 
+ +wait_until_disconnected(Node1, Node2) -> + wait_until(fun() -> + pang == rpc:call(Node1, net_adm, ping, [Node2]) + end, 60*2, 500). + +wait_until_connected(Node1, Node2) -> + wait_until(fun() -> + pong == rpc:call(Node1, net_adm, ping, [Node2]) + end, 60*2, 500). + +start_node(Name, Config, Case) -> + CodePath = lists:filter(fun filelib:is_dir/1, code:get_path()), + %% have the slave nodes monitor the runner node, so they can't outlive it + NodeConfig = [ + {monitor_master, true}, + {erl_flags, "-smp"}, %% smp for the eleveldb god + {startup_functions, [ + {code, set_path, [CodePath]} + ]}], + case ct_slave:start(Name, NodeConfig) of + {ok, Node} -> + + PrivDir = proplists:get_value(priv_dir, Config), + NodeDir = filename:join([PrivDir, Node, Case]), + ok = rpc:call(Node, application, load, [vmq_server]), + ok = rpc:call(Node, application, load, [vmq_plugin]), + ok = rpc:call(Node, application, load, [plumtree]), + ok = rpc:call(Node, application, load, [lager]), + ok = rpc:call(Node, application, set_env, [lager, + log_root, + NodeDir]), + ok = rpc:call(Node, application, set_env, [plumtree, + plumtree_data_dir, + NodeDir]), + ok = rpc:call(Node, application, set_env, [plumtree, + metadata_root, + NodeDir ++ "/meta/"]), + ok = rpc:call(Node, application, set_env, [vmq_server, + listeners, + [{vmq, [{{{127,0,0,1}, + random_port(Node)}, + []}]} + ]]), + ok = rpc:call(Node, application, set_env, [vmq_server, + msg_store_opts, + [{store_dir, + NodeDir++"/msgstore"}] + ]), + ok = rpc:call(Node, application, set_env, [vmq_plugin, + wait_for_proc, + vmq_server_sup]), + ok = rpc:call(Node, application, set_env, [vmq_plugin, + plugin_dir, + NodeDir]), + + {ok, _} = rpc:call(Node, application, ensure_all_started, + [vmq_server]), + ok = wait_until(fun() -> + case rpc:call(Node, plumtree_peer_service_manager, get_local_state, []) of + {ok, _Res} -> + case rpc:call(Node, erlang, whereis, + [vmq_server_sup]) of + undefined -> + false; + P when is_pid(P) -> + true + end; + _ -> false + end + end, 60, 500), + Node; + {error, already_started, Node} -> + ct_slave:stop(Name), + wait_until_offline(Node), + start_node(Name, Config, Case) + end. + +partition_cluster(ANodes, BNodes) -> + pmap(fun({Node1, Node2}) -> + true = rpc:call(Node1, erlang, set_cookie, [Node2, canttouchthis]), + true = rpc:call(Node1, erlang, disconnect_node, [Node2]), + ok = wait_until_disconnected(Node1, Node2) + end, + [{Node1, Node2} || Node1 <- ANodes, Node2 <- BNodes]), + ok. + +heal_cluster(ANodes, BNodes) -> + GoodCookie = erlang:get_cookie(), + pmap(fun({Node1, Node2}) -> + true = rpc:call(Node1, erlang, set_cookie, [Node2, GoodCookie]), + ok = wait_until_connected(Node1, Node2) + end, + [{Node1, Node2} || Node1 <- ANodes, Node2 <- BNodes]), + ok. + +random_port(Node) -> + 10000 + (erlang:phash2(Node) rem 10000). diff --git a/test/vmq_netsplit_publish_SUITE.erl b/test/vmq_netsplit_publish_SUITE.erl deleted file mode 100644 index cd81ded..0000000 --- a/test/vmq_netsplit_publish_SUITE.erl +++ /dev/null @@ -1,86 +0,0 @@ --module(vmq_netsplit_publish_SUITE). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_testcase/2, - end_per_testcase/2, - all/0 - ]). - --export([publish_qos0_test/1]). - --define(NET_TICK_TIME, 5). - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(_Config) -> - cover:start(), - _Config. - -end_per_suite(_Config) -> - _Config. 
- -init_per_testcase(_Case, Config) -> - Nodes = vmq_netsplit_utils:setup(?NET_TICK_TIME), - [{nodes, Nodes}|Config]. - -end_per_testcase(_, Config) -> - vmq_netsplit_utils:teardown(proplists:get_value(nodes, Config, [])), - Config. - -all() -> - [publish_qos0_test]. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -publish_qos0_test(Config) -> - Nodes = proplists:get_value(nodes, Config, []), - ok = vmq_netsplit_utils:check_connected(Nodes), - Connect = packet:gen_connect("test-netsplit-client", [{clean_session, false}, - {keepalive, 60}]), - Connack = packet:gen_connack(0), - Subscribe = packet:gen_subscribe(53, "netsplit/0/test", 0), - Suback = packet:gen_suback(53, 0), - Port = vmq_netsplit_utils:get_port(Nodes), - {ok, Socket} = packet:do_client_connect(Connect, Connack, - [{port, Port}]), - ok = gen_tcp:send(Socket, Subscribe), - ok = packet:expect_packet(Socket, "suback", Suback), - - % ensures the subscription is replicated - timer:sleep(100), - %% Create Partitions - {Island1, Island2} = vmq_netsplit_utils:partition_network(Nodes), - - %%================================%% - %% Window of Uncertanity %% - %% %% - %% Erlang thinks everything is ok %% - %% ..outch.. %% - %%================================%% - - %% in order the test sender can connect and register itself - %% we enable allow_multiple_session - ok = vmq_netsplit_utils:configure_trade_consistency(Nodes), - Publish = packet:gen_publish("netsplit/0/test", 0, <<"message">>, - [{mid, 1}]), - Island2Port = vmq_netsplit_utils:get_port(Island2), - helper_pub_qos1("test-netsplit-sender", Publish, Island2Port), - - %% fix the network - vmq_netsplit_utils:fix_network(Island1, Island2), - - %% the publish is expected once the netsplit is fixed - ok = packet:expect_packet(Socket, "publish", Publish). - - -helper_pub_qos1(ClientId, Publish, Port) -> - Connect = packet:gen_connect(ClientId, [{keepalive, 60}]), - Connack = packet:gen_connack(0), - {ok, Socket} = packet:do_client_connect(Connect, Connack, [{port, Port}]), - ok = gen_tcp:send(Socket, Publish), - gen_tcp:close(Socket). - diff --git a/test/vmq_netsplit_register_consistency_SUITE.erl b/test/vmq_netsplit_register_consistency_SUITE.erl deleted file mode 100644 index 66bf6d4..0000000 --- a/test/vmq_netsplit_register_consistency_SUITE.erl +++ /dev/null @@ -1,76 +0,0 @@ --module(vmq_netsplit_register_consistency_SUITE). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_testcase/2, - end_per_testcase/2, - all/0 - ]). - --export([register_consistency_test/1]). - --define(NET_TICK_TIME, 5). - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(_Config) -> - cover:start(), - _Config. - -end_per_suite(_Config) -> - _Config. - -init_per_testcase(_Case, Config) -> - Nodes = vmq_netsplit_utils:setup(?NET_TICK_TIME), - [{nodes, Nodes}|Config]. - -end_per_testcase(_, Config) -> - vmq_netsplit_utils:teardown(proplists:get_value(nodes, Config, [])), - Config. - -all() -> - [register_consistency_test]. 
- -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -register_consistency_test(Config) -> - Nodes = proplists:get_value(nodes, Config, []), - ok = vmq_netsplit_utils:check_connected(Nodes), - - %% Create Partitions - {Island1, Island2} = vmq_netsplit_utils:partition_network(Nodes), - %% Island1 : [vmq1@marvin, vmq2@marvin] - %% Island2 : [vmq3@marvin, vmq4@marvin, vmq5@marvin] - - %%================================%% - %% Window of Uncertanity %% - %% %% - %% Erlang thinks everything is ok %% - %% ..outch.. %% - %%================================%% - - %% the registration is synchronized using vmq_reg_leader, depending if - %% we connect to a node sitting in the same partition as the leader we - %% get disconnected earlier in the registration process. - %% - %% vmq_reg_leader will use phash2(SubscriberId) rem length(Nodes) + 1 - %% to find the leader node... in this case this will always be the third - %% cluster node as phash2({"", <<"test-client">>}) rem 5 + 1 -> 3 - - PortInIsland1 = vmq_netsplit_utils:get_port(Island1), - PortInIsland2 = vmq_netsplit_utils:get_port(Island2), - - Connect = packet:gen_connect("test-client", [{clean_session, true}, - {keepalive, 10}]), - %% Island 1 should return us the proper CONNACK(3) - {ok, _} = packet:do_client_connect(Connect, packet:gen_connack(3), - [{port, PortInIsland1}]), - %% Island 2 should return us the proper CONACK(0) as the leader is in this - %% partition - {ok, _} = packet:do_client_connect(Connect, packet:gen_connack(0), - [{port, PortInIsland2}]), - vmq_netsplit_utils:fix_network(Island1, Island2), - ok. diff --git a/test/vmq_netsplit_register_multiple_session_SUITE.erl b/test/vmq_netsplit_register_multiple_session_SUITE.erl deleted file mode 100644 index 1675386..0000000 --- a/test/vmq_netsplit_register_multiple_session_SUITE.erl +++ /dev/null @@ -1,71 +0,0 @@ --module(vmq_netsplit_register_multiple_session_SUITE). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_testcase/2, - end_per_testcase/2, - all/0 - ]). - --export([register_consistency_test/1]). - --define(NET_TICK_TIME, 5). - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(_Config) -> - cover:start(), - _Config. - -end_per_suite(_Config) -> - _Config. - -init_per_testcase(_Case, Config) -> - Nodes = vmq_netsplit_utils:setup(?NET_TICK_TIME), - [{nodes, Nodes}|Config]. - -end_per_testcase(_, Config) -> - vmq_netsplit_utils:teardown(proplists:get_value(nodes, Config, [])), - Config. - -all() -> - [register_consistency_test]. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -register_consistency_test(Config) -> - Nodes = proplists:get_value(nodes, Config, []), - ok = vmq_netsplit_utils:check_connected(Nodes), - %% we configure the nodes to trade consistency for availability - ok = vmq_netsplit_utils:configure_trade_consistency(Nodes), - - %% Create Partitions - {Island1, Island2} = vmq_netsplit_utils:partition_network(Nodes), - - %%================================%% - %% Window of Uncertanity %% - %% %% - %% Erlang thinks everything is ok %% - %% ..outch.. 
%% - %%================================%% - - PortInIsland1 = vmq_netsplit_utils:get_port(Island1), - PortInIsland2 = vmq_netsplit_utils:get_port(Island2), - - Connect = packet:gen_connect("test-client-multiple", [{clean_session, true}, - {keepalive, 10}]), - Connack = packet:gen_connack(0), - {ok, Socket1} = packet:do_client_connect(Connect, Connack, - [{port, PortInIsland1}]), - - {ok, Socket2} = packet:do_client_connect(Connect, Connack, - [{port, PortInIsland2}]), - vmq_netsplit_utils:fix_network(Island1, Island2), - gen_tcp:close(Socket1), - gen_tcp:close(Socket2), - ok. - - diff --git a/test/vmq_netsplit_register_not_ready_SUITE.erl b/test/vmq_netsplit_register_not_ready_SUITE.erl deleted file mode 100644 index d602f14..0000000 --- a/test/vmq_netsplit_register_not_ready_SUITE.erl +++ /dev/null @@ -1,96 +0,0 @@ --module(vmq_netsplit_register_not_ready_SUITE). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_testcase/2, - end_per_testcase/2, - all/0 - ]). - --export([register_not_ready_test/1]). - --define(NET_TICK_TIME, 5). - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(_Config) -> - cover:start(), - _Config. - -end_per_suite(_Config) -> - _Config. - -init_per_testcase(_Case, Config) -> - Nodes = vmq_netsplit_utils:setup(?NET_TICK_TIME), - [{nodes, Nodes}|Config]. - -end_per_testcase(_, Config) -> - vmq_netsplit_utils:teardown(proplists:get_value(nodes, Config, [])), - Config. - -all() -> - [register_not_ready_test]. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -register_not_ready_test(Config) -> - Nodes = proplists:get_value(nodes, Config, []), - ok = vmq_netsplit_utils:check_connected(Nodes), - - %% Connect a test-client - Connect = packet:gen_connect("test-client-not-ready", [{clean_session, true}, - {keepalive, 10}]), - Connack = packet:gen_connack(0), - Port = vmq_netsplit_utils:get_port(Nodes), - {ok, _Socket} = packet:do_client_connect(Connect, Connack, - [{port, Port}]), - - %% Create Partitions - {Island1, Island2} = vmq_netsplit_utils:partition_network(Nodes), - - %%================================%% - %% Window of Uncertanity %% - %% %% - %% Erlang thinks everything is ok %% - %% ..outch.. %% - %%================================%% - - - %% SLEEP until cluster knows about net split - true = vmq_netsplit_utils:ensure_not_ready(Nodes), - %%================================%% - %% Window of Uncertanity %% - %% %% - %% -- CLOSED -- %% - %%================================%% - - vmq_netsplit_utils:check_connected(Island1), - vmq_netsplit_utils:check_connected(Island2), - - %% we are now on a partitioned network and SHOULD NOT allow new connections - ConnNack = packet:gen_connack(3), %% server unavailable - [begin - P = vmq_netsplit_utils:get_port([N]), - {ok, S} = packet:do_client_connect(Connect, ConnNack, [{port, P}]), - gen_tcp:close(S) - end || N <- Nodes], - - - %% fix cables - vmq_netsplit_utils:fix_network(Island1, Island2), - timer:sleep(1000), - vmq_netsplit_utils:check_connected(Nodes), - - - %% connect MUST go through now. - [begin - P = vmq_netsplit_utils:get_port([N]), - {ok, S} = packet:do_client_connect(Connect, Connack, [{port, P}]), - gen_tcp:close(S) - end || N <- Nodes], - - - ok. 
diff --git a/test/vmq_netsplit_subscribe_SUITE.erl b/test/vmq_netsplit_subscribe_SUITE.erl deleted file mode 100644 index 6181351..0000000 --- a/test/vmq_netsplit_subscribe_SUITE.erl +++ /dev/null @@ -1,101 +0,0 @@ --module(vmq_netsplit_subscribe_SUITE). --export([ - %% suite/0, - init_per_suite/1, - end_per_suite/1, - init_per_testcase/2, - end_per_testcase/2, - all/0 - ]). - --export([subscribe_clean_session_test/1]). - --define(NET_TICK_TIME, 5). - -%% =================================================================== -%% common_test callbacks -%% =================================================================== -init_per_suite(_Config) -> - cover:start(), - _Config. - -end_per_suite(_Config) -> - _Config. - -init_per_testcase(_Case, Config) -> - Nodes = vmq_netsplit_utils:setup(?NET_TICK_TIME), - [{nodes, Nodes}|Config]. - -end_per_testcase(_, Config) -> - vmq_netsplit_utils:teardown(proplists:get_value(nodes, Config, [])), - Config. - -all() -> - [subscribe_clean_session_test]. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Actual Tests -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -subscribe_clean_session_test(Config) -> - Nodes = proplists:get_value(nodes, Config, []), - ok = vmq_netsplit_utils:check_connected(Nodes), - Connect = packet:gen_connect("test-netsplit-client", [{clean_session, false}, - {keepalive, 10}]), - Connack = packet:gen_connack(0), - Subscribe = packet:gen_subscribe(53, "netsplit/0/test", 0), - Suback = packet:gen_suback(53, 0), - Port = vmq_netsplit_utils:get_port(Nodes), % port for first node - [FirstNode|_] = Nodes, - {ok, Socket} = packet:do_client_connect(Connect, Connack, - [{port, Port}]), - - %% Create Partitions - {Island1, Island2} = vmq_netsplit_utils:partition_network(Nodes), - - %%================================%% - %% Window of Uncertanity %% - %% %% - %% Erlang thinks everything is ok %% - %% ..outch.. %% - %%================================%% - - %% this Subscription will only be visible in one partition - ok = gen_tcp:send(Socket, Subscribe), - ok = packet:expect_packet(Socket, "suback", Suback), - timer:sleep(100), - Island1Res = vmq_netsplit_utils:proxy_multicall(Island1, vmq_reg, - subscriptions_for_subscriber_id, - [{"", <<"test-netsplit-client">>}]), - io:format(user, "subscrptions on island 1: ~p", [Island1Res]), - Island1Res = [[{[<<"netsplit">>, <<"0">>, <<"test">>], 0, FirstNode}] - || _ <- lists:seq(1, length(Island1))], - Island2Res = vmq_netsplit_utils:proxy_multicall(Island2, vmq_reg, - subscriptions_for_subscriber_id, - [{"", <<"test-netsplit-client">>}]), - Island2Res = [[] || _ <- lists:seq(1, length(Island2))], - - %% SLEEP until cluster knows about net split - true = vmq_netsplit_utils:ensure_not_ready(Nodes), - - %%================================%% - %% Window of Uncertanity %% - %% %% - %% -- CLOSED -- %% - %%================================%% - - vmq_netsplit_utils:check_connected(Island1), - vmq_netsplit_utils:check_connected(Island2), - - - %% fix the network - vmq_netsplit_utils:fix_network(Island1, Island2), - timer:sleep(?NET_TICK_TIME * 2000), - vmq_netsplit_utils:check_connected(Nodes), - - %% unsplit should have merged the tables - timer:sleep(10000), %% wait until async rebuild triggered - NodesRes = vmq_netsplit_utils:proxy_multicall(Nodes, vmq_reg, - subscriptions_for_subscriber_id, - [{"", <<"test-netsplit-client">>}]), - NodesRes = [[{[<<"netsplit">>, <<"0">>, <<"test">>], 0, FirstNode}] - || _ <- lists:seq(1, length(Nodes))]. 
diff --git a/test/vmq_netsplit_utils.erl b/test/vmq_netsplit_utils.erl deleted file mode 100644 index d25c8c6..0000000 --- a/test/vmq_netsplit_utils.erl +++ /dev/null @@ -1,254 +0,0 @@ --module(vmq_netsplit_utils). --export([setup/1, - teardown/1, - partition_network/1, - fix_network/2, - get_port/1, - check_connected/1, - configure_trade_consistency/1, - ensure_not_ready/1, - call_proxy/4, - proxy_multicall/4]). - --export([start_app/3, - proxy/0, - wait_til_ready/0]). - --define(NR_OF_NODES, 5). --define(INIT_PORT, 18880). --define(DEFAULT_EPMDPXY_PORT, 4369). - --export([hook_auth_on_publish/6, - hook_auth_on_subscribe/3]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Setup Functions -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -setup(NetTickTime) -> - {A, B, C} = os:timestamp(), - random:seed(A, B, C), - os:cmd("killall epmd"), - epmdpxy:start(?DEFAULT_EPMDPXY_PORT), - timer:sleep(1000), - %% ct:pal("Started EPMDPXY on node ~p with EPMD port ~p~n", - %% [node(), ?DEFAULT_EPMDPXY_PORT]), - vmq_test_utils:maybe_start_distribution(vmq_ct_master), - set_net_ticktime(NetTickTime), - Hosts = hosts(), - [DiscoveryNode|_] = Ns = start_slaves(NetTickTime, Hosts), - ct_cover:add_nodes(Ns), - try - [ok = rpc:call(Node, ?MODULE, start_app, [NetTickTime, DiscoveryNode, I]) - || {I, Node} <- lists:zip(lists:seq(1, length(Ns)), Ns)], - wait_till_cluster_ready(Ns), - lists:sort(Ns) - catch - _:R -> - stop_slaves(Ns), - application:stop(epmdpxy), - exit(R) - end. - -teardown(Nodes) -> - stop_slaves(Nodes), - application:stop(epmdpxy). - -partition_network(Nodes) -> - %% Create Partitions - Size = ?NR_OF_NODES div 2, - Island1 = lists:sublist(Nodes, Size), - Island2 = Nodes -- Island1, - io:format(user, "Create two partitions ~p and ~p~n", [Island1, Island2]), - epmdpxy:cut_cables(Island1, Island2), - {Island1, Island2}. - -fix_network(Island1, Island2) -> - epmdpxy:fix_cables(Island1, Island2). - -get_port([Node|_]) -> - [Name, _] = re:split(atom_to_list(Node), "@", [{return, list}]), - [I|_] = lists:reverse(Name), - ?INIT_PORT + list_to_integer([I]). - -check_connected(Nodes) -> - check_connected(Nodes, 0). -check_connected([N1, N2|Rest] = Nodes, I) when length(Nodes) < I -> - %% ensure all nodes are connected - [N2|Rest] = call_proxy(N1, erlang, nodes, []) -- [node()], - check_connected([N2|Rest] ++ [N1], I + 1); -check_connected(_ , _) -> ok. - -configure_trade_consistency(Nodes) -> - proxy_multicall(Nodes, vmq_server_cmd, set_config, [ - trade_consistency, - true]), - %% we must also allow multiple sessions, if we want to let - %% new clients register during netsplit, otherwise - %% vmq_reg_leader will complain about the unstable cluster - proxy_multicall(Nodes, vmq_server_cmd, set_config, [ - allow_multiple_sessions, - true]), - ok. - -ensure_not_ready(Nodes) -> - ensure_not_ready(Nodes, 60000). - -ensure_not_ready(_, 0) -> - error_logger:error_report("cluster can't agree on cluster state on time"), - false; -ensure_not_ready(Nodes, Max) -> - Ready = proxy_multicall(Nodes, vmq_cluster, is_ready, []), - case lists:member(true, Ready) of - false -> - %% ct:pal("cluster is not ready anymore"), - true; - _ -> - %% ct:pal("still no consensus about cluster state ~p", [Ready]), - timer:sleep(1000), - ensure_not_ready(Nodes, Max - 1000) - end. - -hosts() -> - [list_to_atom("vmq"++integer_to_list(I)) - || I <- lists:seq(1, ?NR_OF_NODES)]. 
- - - - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Hooks -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -hook_auth_on_publish(_, _, _, _, _, _) -> ok. -hook_auth_on_subscribe(_, _, _) -> ok. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Internal -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -start_app(NetTickTime, DiscoveryNode, I) -> - Port = ?INIT_PORT + I, - ok = set_net_ticktime(NetTickTime), - vmq_test_utils:setup(), - application:load(vmq_server), - vmq_server_cmd:set_config(allow_anonymous, true), - vmq_server_cmd:set_config(retry_interval, 10), - vmq_server_cmd:listener_start(Port, []), - case I of - 1 -> - % we are the discovery node - ignore; - _ -> - vmq_server_cmd:node_join(DiscoveryNode) - end, - vmq_plugin_mgr:enable_module_plugin(auth_on_publish, ?MODULE, - hook_auth_on_publish, 6), - vmq_plugin_mgr:enable_module_plugin(auth_on_subscribe, ?MODULE, - hook_auth_on_subscribe, 3), - - io:fwrite(user, "vernemq started ~p~n", [node()]). - -wait_till_cluster_ready([N|Nodes]) -> - call_proxy(N, ?MODULE, wait_til_ready, []), - wait_till_cluster_ready(Nodes); -wait_till_cluster_ready([]) -> ok. - -wait_til_ready() -> - wait_til_ready(vmq_cluster:is_ready(), 100). - -wait_til_ready(true, _) -> ok; -wait_til_ready(false, I) when I > 0 -> - timer:sleep(500), - wait_til_ready(vmq_cluster:is_ready(), I - 1); -wait_til_ready(_, _) -> - exit(not_ready). - - -%% from uwiger/locks --define(PROXY, vmq_server_test_proxy). -proxy() -> - register(?PROXY, self()), - process_flag(trap_exit, true), - proxy_loop(). - -proxy_loop() -> - receive - {From, Ref, apply, M, F, A} -> - From ! {Ref, (catch apply(M,F,A))}; - _ -> - ok - end, - proxy_loop(). - -proxy_multicall(Ns, M, F, A) -> - [call_proxy(N, M, F, A) || N <- Ns]. - -call_proxy(N, M, F, A) -> - Ref = erlang:monitor(process, {?PROXY, N}), - {?PROXY, N} ! {self(), Ref, apply, M, F, A}, - receive - {'DOWN', Ref, _, _, Reason} -> - error({proxy_died, N, Reason}); - {Ref, Result} -> - Result - after 5000 -> - error(proxy_call_timeout) - end. - -start_slaves(NetTickTime, Ns) -> - - Nodes = [start_slave(NetTickTime, N) || N <- Ns], - Nodes. - -start_slave(NetTickTime, Name) -> - CodePath = lists:filter(fun filelib:is_dir/1, code:get_path()), - NodeConfig = [{monitor_master, true}, - {erl_flags, "-kernel net_ticktime " ++ integer_to_list(NetTickTime)}, - {startup_functions, - [{code, set_path, [CodePath]}]}], - {ok, Node} = ct_slave:start(host(), Name, NodeConfig), - spawn(Node, ?MODULE, proxy, []), - Node. - -stop_slaves(Ns) -> - [ok = stop_slave(N) || N <- Ns], - ok. - -stop_slave(N) -> - try erlang:monitor_node(N, true) of - true -> - rpc:call(N, erlang, halt, []), - receive - {nodedown, N} -> ok - after 10000 -> - erlang:error(slave_stop_timeout) - end - catch - error:badarg -> - ok - end. - -host() -> - [_Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]), - list_to_atom(Host). - -set_net_ticktime(NetTickTime) -> - %% ct:pal("change net_ticktime on node ~p to ~p initiated", [node(), NetTickTime]), - case net_kernel:set_net_ticktime(NetTickTime, NetTickTime) of - unchanged -> - %% ct:pal("net_ticktime on node ~p changed", [node()]), - ok; - change_initiated -> - wait_till_net_tick_converged(NetTickTime); - {ongoing_change_to, _} -> - wait_till_net_tick_converged(NetTickTime) - end. 
- -wait_till_net_tick_converged(NetTickTime) -> - case net_kernel:get_net_ticktime() of - NetTickTime -> - %% ct:pal("net_ticktime on node ~p changed", [node()]), - ok; - {ongoing_change_to, NetTickTime} -> - timer:sleep(1000), - wait_till_net_tick_converged(NetTickTime) - end. diff --git a/test/vmq_queue_SUITE.erl b/test/vmq_queue_SUITE.erl index fd28bee..673680d 100644 --- a/test/vmq_queue_SUITE.erl +++ b/test/vmq_queue_SUITE.erl @@ -58,7 +58,7 @@ queue_crash_test(_) -> SessionPid1 = spawn(fun() -> mock_session(Parent) end), {ok, false, QPid1} = vmq_reg_leader:register_subscriber(SessionPid1, SubscriberId, QueueOpts), - {ok, [1]} = vmq_reg:subscribe(false, <<"mock-user">>, SubscriberId, + {ok, [1]} = vmq_reg:subscribe(false, false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"topic">>], 1}]), %% at this point we've a working subscription timer:sleep(10), @@ -96,7 +96,7 @@ queue_fifo_test(_) -> SessionPid1 = spawn(fun() -> mock_session(Parent) end), {ok, false, QPid} = vmq_reg_leader:register_subscriber(SessionPid1, SubscriberId, QueueOpts), - {ok, [1]} = vmq_reg:subscribe(false, <<"mock-user">>, SubscriberId, + {ok, [1]} = vmq_reg:subscribe(false, false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"fifo">>, <<"topic">>], 1}]), %% teardown session SessionPid1 ! go_down, @@ -117,7 +117,7 @@ queue_lifo_test(_) -> SessionPid1 = spawn(fun() -> mock_session(Parent) end), {ok, false, QPid} = vmq_reg_leader:register_subscriber(SessionPid1, SubscriberId, QueueOpts), - {ok, [1]} = vmq_reg:subscribe(false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"lifo">>, <<"topic">>], 1}]), + {ok, [1]} = vmq_reg:subscribe(false, false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"lifo">>, <<"topic">>], 1}]), %% teardown session SessionPid1 ! go_down, timer:sleep(10), @@ -138,7 +138,7 @@ queue_fifo_offline_drop_test(_) -> SessionPid1 = spawn(fun() -> mock_session(Parent) end), {ok, false, QPid} = vmq_reg_leader:register_subscriber(SessionPid1, SubscriberId, QueueOpts), - {ok, [1]} = vmq_reg:subscribe(false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"fifo">>, <<"topic">>], 1}]), + {ok, [1]} = vmq_reg:subscribe(false, false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"fifo">>, <<"topic">>], 1}]), %% teardown session SessionPid1 ! go_down, timer:sleep(10), @@ -162,7 +162,7 @@ queue_lifo_offline_drop_test(_) -> SessionPid1 = spawn(fun() -> mock_session(Parent) end), {ok, false, QPid} = vmq_reg_leader:register_subscriber(SessionPid1, SubscriberId, QueueOpts), - {ok, [1]} = vmq_reg:subscribe(false, <<"mock-user">>, SubscriberId, + {ok, [1]} = vmq_reg:subscribe(false, false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"lifo">>, <<"topic">>], 1}]), %% teardown session SessionPid1 ! go_down, @@ -186,7 +186,7 @@ queue_offline_transition_test(_) -> queue_type => fifo}), SessionPid1 = spawn(fun() -> mock_session(Parent) end), {ok, false, QPid} = vmq_reg_leader:register_subscriber(SessionPid1, SubscriberId, QueueOpts), - {ok, [1]} = vmq_reg:subscribe(false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"transition">>], 1}]), + {ok, [1]} = vmq_reg:subscribe(false, false, <<"mock-user">>, SubscriberId, [{[<<"test">>, <<"transition">>], 1}]), timer:sleep(10), % give some time to plumtree %% teardown session diff --git a/test/vmq_queue_hooks_SUITE.erl b/test/vmq_queue_hooks_SUITE.erl index 4b62183..06db197 100644 --- a/test/vmq_queue_hooks_SUITE.erl +++ b/test/vmq_queue_hooks_SUITE.erl @@ -121,26 +121,22 @@ hook_auth_on_subscribe(_, _, _) -> ok. hook_auth_on_publish(_, _, _, _, _, _) -> ok. 
hook_on_client_wakeup({"", <<"queue-client">>}) -> - ets:insert(?MODULE, {on_client_wakeup, true}), - io:format(user, "on_client_wakeup~n", []); + ets:insert(?MODULE, {on_client_wakeup, true}); hook_on_client_wakeup(_) -> ok. hook_on_client_gone({"", <<"queue-client">>}) -> - ets:insert(?MODULE, {on_client_gone, true}), - io:format(user, "on_client_gone~n", []); + ets:insert(?MODULE, {on_client_gone, true}); hook_on_client_gone(_) -> ok. hook_on_client_offline({"", <<"queue-client">>}) -> - ets:insert(?MODULE, {on_client_offline, true}), - io:format(user, "on_client_offline~n", []); + ets:insert(?MODULE, {on_client_offline, true}); hook_on_client_offline(_) -> ok. hook_on_offline_message({"", <<"queue-client">>}) -> - ets:insert(?MODULE, {on_offline_message, true}), - io:format(user, "on_offline_message~n", []); + ets:insert(?MODULE, {on_offline_message, true}); hook_on_offline_message(_) -> ok. diff --git a/test/vmq_subscriber_groups_SUITE.erl b/test/vmq_subscriber_groups_SUITE.erl new file mode 100644 index 0000000..d077a0c --- /dev/null +++ b/test/vmq_subscriber_groups_SUITE.erl @@ -0,0 +1,260 @@ +-module(vmq_subscriber_groups_SUITE). +-export([ + %% suite/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2, + all/0 + ]). + +-export([subscriber_groups_test/1, + retain_test/1]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/inet.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("vmq_commons/include/vmq_types.hrl"). + +%% =================================================================== +%% common_test callbacks +%% =================================================================== +init_per_suite(_Config) -> + %lager:start(), + %% this might help, might not... + os:cmd(os:find_executable("epmd")++" -daemon"), + {ok, Hostname} = inet:gethostname(), + case net_kernel:start([list_to_atom("runner@"++Hostname), shortnames]) of + {ok, _} -> ok; + {error, {already_started, _}} -> ok + end, + lager:info("node name ~p", [node()]), + _Config. + +end_per_suite(_Config) -> + application:stop(lager), + _Config. + +init_per_testcase(Case, Config) -> + Nodes = vmq_cluster_test_utils:pmap( + fun({N, P}) -> + Node = vmq_cluster_test_utils:start_node(N, Config, Case), + {ok, _} = rpc:call(Node, vmq_server_cmd, listener_start, + [P, []]), + {ok, _} = rpc:call(Node, vmq_server_cmd, set_config, [ + allow_subscriber_groups, true]), + %% allow all + ok = rpc:call(Node, vmq_auth, register_hooks, []), + {Node, P} + end, [{test1, 18883}, + {test2, 18884}, + {test3, 18885}, + {test4, 18886}, + {test5, 18887}]), + {CoverNodes, _} = lists:unzip(Nodes), + {ok, _} = ct_cover:add_nodes(CoverNodes), + [{nodes, Nodes}|Config]. + +end_per_testcase(_, _Config) -> + vmq_cluster_test_utils:pmap(fun(Node) -> ct_slave:stop(Node) end, + [test1, test2, test3, test4, test5]), + ok. + +all() -> + [subscriber_groups_test, + retain_test]. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Actual Tests +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +subscriber_groups_test(Config) -> + ok = ensure_cluster(Config), + {_, Nodes} = lists:keyfind(nodes, 1, Config), + NumSubs = 10, + NumMsgs = 100, + _ = start_subscribers(NumSubs, Nodes), + ok = wait_until_converged(Nodes, + fun(N) -> + rpc:call(N, vmq_reg, total_subscriptions, []) + end, [{total, NumSubs}]), + Payloads = send(NumMsgs, Nodes, []), + Ret = mgr_recv_loop(Payloads, #{}), + check_uniformness(NumSubs, NumMsgs, Ret), + ?assertEqual([], flush_mailbox([])). 
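+
+%% subscriber_groups_test/1 and retain_test/1 verify the distribution with
+%% the helpers defined further below: every subscriber process reports each
+%% received payload back to the test runner (mgr_recv_loop/2), which removes
+%% it from the list of published payloads and counts the messages per
+%% subscriber. check_uniformness/3 then asserts that every group member
+%% received at least one message and that the per-member counts add up to
+%% the total number of published messages.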
+
+retain_test(Config) ->
+    %% We have to test that all subscribers receive the proper retained
+    %% message. Retain handling shouldn't change when the subscription
+    %% is part of a subscriber group.
+    ok = ensure_cluster(Config),
+    {_, [{_, Port}|_] = Nodes} = lists:keyfind(nodes, 1, Config),
+    NumSubs = 10,
+    Connect = packet:gen_connect("sub-group-sender", [{clean_session, true},
+                                                      {keepalive, 60}]),
+    Connack = packet:gen_connack(0),
+    {ok, PubSocket} = packet:do_client_connect(Connect, Connack, [{port, Port}]),
+    Topic = "a/b/c",
+    Payload = crypto:rand_bytes(64),
+    Publish = packet:gen_publish(Topic, 1, Payload, [{mid, 1}, {retain, true}]),
+    Puback = packet:gen_puback(1),
+    ok = gen_tcp:send(PubSocket, Publish),
+    ok = packet:expect_packet(PubSocket, "puback", Puback),
+    gen_tcp:close(PubSocket),
+    ok = wait_until_converged(Nodes,
+                              fun(N) ->
+                                      rpc:call(N, vmq_reg, retained, [])
+                              end, 1),
+    _ = start_subscribers(NumSubs, Nodes),
+    ok = wait_until_converged(Nodes,
+                              fun(N) ->
+                                      rpc:call(N, vmq_reg, total_subscriptions, [])
+                              end, [{total, NumSubs}]),
+    Ret = mgr_recv_loop([Payload || _ <- lists:seq(1, NumSubs)], #{}),
+    check_uniformness(NumSubs, NumSubs, Ret),
+    ?assertEqual([], flush_mailbox([])).
+
+%% Note: republish_test/1 is not yet part of all/0.
+republish_test(Config) ->
+    ok = ensure_cluster(Config),
+    {_, [{_, Port}|_] = Nodes} = lists:keyfind(nodes, 1, Config),
+    NumSubs = 10,
+    NumMsgs = 100,
+
+    %% Create NumSubs - 1 subscribers on all nodes which are not acking
+    %% any PUBLISH they receive.
+    _NotAckingPids = start_subscribers(NumSubs - 1, Nodes, false, []),
+    %% Create 1 subscriber on one node which is behaving normally
+    Self = self(),
+    ClientId = "sub-group-client-xyz",
+    _AckingPid = spawn_link(fun() -> recv_connect(Self, ClientId, Port, true) end),
+
+    %% Wait until cluster has converged
+    ok = wait_until_converged(Nodes,
+                              fun(N) ->
+                                      rpc:call(N, vmq_reg, total_subscriptions, [])
+                              end, [{total, NumSubs}]),
+    %% Send messages, only one subscriber is properly acking the messages
+    Payloads = send(NumMsgs, Nodes, []),
+    %% Let's kill one after the other
+    Ret = mgr_recv_loop(Payloads, #{}),
+    check_uniformness(NumSubs, NumMsgs, Ret),
+    ?assertEqual([], flush_mailbox([])).
+
+
+start_subscribers(N, Nodes) ->
+    start_subscribers(N, Nodes, true, []).
+
+start_subscribers(0, _, _DoAck, Pids) -> Pids;
+start_subscribers(N, [{_, Port} = Node|Nodes], DoAck, Acc) ->
+    Self = self(),
+    ClientId = "sub-group-client-" ++ integer_to_list(N),
+    Pid = spawn_link(fun() -> recv_connect(Self, ClientId, Port, DoAck) end),
+    start_subscribers(N - 1, Nodes ++ [Node], DoAck, [Pid|Acc]).
+
+recv_connect(Parent, ClientId, Port, DoAck) ->
+    Connect = packet:gen_connect(ClientId, [{clean_session, true},
+                                            {keepalive, 60}]),
+    Connack = packet:gen_connack(0),
+    %% join subscriber group 'mygroup' for topic a/b/c
+    SubscriberGroupTopic = "$GROUP-mygroup/a/b/c",
+    Subscribe = packet:gen_subscribe(123, SubscriberGroupTopic, 1),
+    Suback = packet:gen_suback(123, 1),
+    {ok, Socket} = packet:do_client_connect(Connect, Connack, [{port, Port}]),
+    ok = gen_tcp:send(Socket, Subscribe),
+    ok = packet:expect_packet(Socket, "suback", Suback),
+    inet:setopts(Socket, [{active, true}]),
+    recv_loop(Parent, Port, Socket, <<>>, DoAck).
+
+recv_loop(Parent, Port, Socket, Buf, DoAck) ->
+    case vmq_parser:parse(Buf) of
+        more ->
+            ok;
+        {error, _} = E ->
+            exit(E);
+        {#mqtt_publish{message_id=MsgId, payload=Payload}, NewBuf} when DoAck ->
+            ok = gen_tcp:send(Socket, packet:gen_puback(MsgId)),
+            Parent ! {recv, self(), Payload},
+            recv_loop(Parent, Port, Socket, NewBuf, DoAck);
+        {_, NewBuf} ->
+            recv_loop(Parent, Port, Socket, NewBuf, DoAck)
+    end,
+    receive
+        {tcp, Socket, Data} ->
+            recv_loop(Parent, Port, Socket, <<Buf/binary, Data/binary>>, DoAck);
+        {tcp_closed, Socket} ->
+            ok;
+        Else ->
+            exit(Else)
+    end.
+
+flush_mailbox(Acc) ->
+    receive
+        M ->
+            flush_mailbox([M|Acc])
+    after
+        0 ->
+            Acc
+    end.
+
+mgr_recv_loop([], Acc) -> Acc;
+mgr_recv_loop(Payloads, Acc) ->
+    receive
+        {recv, Pid, Payload} ->
+            Cnt = maps:get(Pid, Acc, 0),
+            mgr_recv_loop(Payloads -- [Payload], maps:put(Pid, Cnt + 1, Acc))
+    end.
+
+check_uniformness(TotalSubs, TotalMsgs, RecvRet) ->
+    ?assertEqual({0,0},
+                 maps:fold(fun(_, Cnt, {NumSubs, NumMsgs}) ->
+                                   ?assert(Cnt > 0),
+                                   {NumSubs - 1, NumMsgs - Cnt}
+                           end, {TotalSubs, TotalMsgs}, RecvRet)).
+
+send(0, _, Acc) -> Acc;
+send(N, [{_, Port} = Node|Nodes], Acc) ->
+    Connect = packet:gen_connect("sub-group-sender", [{clean_session, true},
+                                                      {keepalive, 60}]),
+    Connack = packet:gen_connack(0),
+    {ok, PubSocket} = packet:do_client_connect(Connect, Connack, [{port, Port}]),
+    Topic = "a/b/c",
+    Payload = crypto:rand_bytes(64),
+    Publish = packet:gen_publish(Topic, 1, Payload, [{mid, 1}]),
+    Puback = packet:gen_puback(1),
+    ok = gen_tcp:send(PubSocket, Publish),
+    ok = packet:expect_packet(PubSocket, "puback", Puback),
+    Disconnect = packet:gen_disconnect(),
+    gen_tcp:send(PubSocket, Disconnect),
+    gen_tcp:close(PubSocket),
+    send(N - 1, [Node|Nodes], [Payload|Acc]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Internal
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+ensure_cluster(Config) ->
+    [{Node1, _}|OtherNodes] = Nodes = proplists:get_value(nodes, Config),
+    [begin
+         {ok, _} = rpc:call(Node, vmq_server_cmd, node_join, [Node1])
+     end || {Node, _} <- OtherNodes],
+    {NodeNames, _} = lists:unzip(Nodes),
+    Expected = lists:sort(NodeNames),
+    ok = vmq_cluster_test_utils:wait_until_joined(NodeNames, Expected),
+    [?assertEqual({Node, Expected},
+                  {Node, lists:sort(vmq_cluster_test_utils:get_cluster_members(Node))})
+     || Node <- NodeNames],
+    ok.
+
+wait_until_converged(Nodes, Fun, ExpectedReturn) ->
+    {NodeNames, _} = lists:unzip(Nodes),
+    vmq_cluster_test_utils:wait_until(
+      fun() ->
+              lists:all(fun(X) -> X == true end,
+                        vmq_cluster_test_utils:pmap(
+                          fun(Node) ->
+                                  ExpectedReturn == Fun(Node)
+                          end, NodeNames))
+      end, 60*2, 500).
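+
+%% Illustration of check_uniformness/3 (not executed by the suite; PidA and
+%% PidB stand for two arbitrary subscriber pids):
+%%   check_uniformness(2, 5, #{PidA => 2, PidB => 3}) passes, because both
+%%   group members received at least one message and the per-member counts
+%%   add up to the number of published messages. A member with a count of 0,
+%%   or counts that don't sum up to the total, make the assertions fail.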