Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%-*-Erlang-*-

{deps, [
{lager, ".*", {git, "https://github.com/basho/lager.git", {tag, "2.1.1"}}},
% {lager, ".*", {git, "https://github.com/basho/lager.git", {tag, "2.1.1"}}},
{gen_listener_tcp, ".*", {git, "https://github.com/travelping/gen_listener_tcp.git", {tag, "0.3.1"}}}

%% This is needed to run ezmq_zmq2_SUITE.erl
Expand All @@ -15,4 +15,4 @@
{cover_export_enabled, false}.
{cover_print_enabled, true}.

{erl_opts, [debug_info, {parse_transform, lager_transform}]}.
{erl_opts, [debug_info]}.
4 changes: 2 additions & 2 deletions rebar.config.travis
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%-*-Erlang-*-

{deps, [
{lager, ".*", {git, "https://github.com/basho/lager.git", {branch, "master"}}},
% {lager, ".*", {git, "https://github.com/basho/lager.git", {branch, "master"}}},
{gen_listener_tcp, ".*", {git, "https://github.com/travelping/gen_listener_tcp.git", {branch, "master"}}},
{erlzmq, ".*", {git, "https://github.com/zeromq/erlzmq2.git", {branch, "2.x"}}}
]}.
Expand All @@ -10,4 +10,4 @@
{cover_export_enabled, false}.
{cover_print_enabled, true}.

{erl_opts, [debug_info, {parse_transform, lager_transform}]}.
{erl_opts, [debug_info]}.
1 change: 0 additions & 1 deletion src/ezmq.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
kernel,
stdlib,
sasl,
lager,
gen_listener_tcp
]},
{mod, { ezmq_app, []}},
Expand Down
25 changes: 16 additions & 9 deletions src/ezmq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ handle_call({bind, tcp, Port, Opts}, _From, MqSState = #ezmq_socket{version = Ve
{nodelay,true}, {packet,raw}, {reuseaddr,true}],
TcpOpts1 = pass_inet_opts(Opts, TcpOpts0),

lager:debug("bind: ~p", [TcpOpts1]),
logger:debug("bind: ~p", [TcpOpts1]),
case ezmq_tcp_socket:start_link(Version, Type, Identity, Port, TcpOpts1) of
{ok, Pid} ->
Listen = orddict:append(Pid, {tcp, Port, Opts}, MqSState#ezmq_socket.listen_trans),
Expand Down Expand Up @@ -344,14 +344,21 @@ handle_call(sockname, _From, #ezmq_socket{listen_trans = ListenTrans, transports
handle_cast({deliver_accept, Transport, RemoteId}, State) ->
State1 = transports_activate(Transport, RemoteId, State),
send_owner_event(RemoteId, accepted, State1),
lager:debug("DELIVER_ACCPET: ~p", [lager:pr(State1, ?MODULE)]),
logger:debug("DELIVER_ACCPET: ~p", [State1, ?MODULE]),
if(State1#ezmq_socket.type == sub) ->
logger:debug("Subscribe transport: ~p, S ~p", [Transport,State]),
%TODO: add fitlters to State and subscribe to needed channels only
ezmq_link_send({[Transport], [<<1,"">>]}, State1);
true ->
ok
end,
State2 = send_queue_run(State1),
{noreply, State2};

handle_cast({deliver_connect, Transport, {ok, RemoteId}}, State) ->
State1 = transports_activate(Transport, RemoteId, State),
send_owner_event(RemoteId, connected, State1),
lager:debug("DELIVER_CONNECT: ~p", [lager:pr(State1, ?MODULE)]),
logger:debug("DELIVER_CONNECT: ~p", [State1]),
State2 = send_queue_run(State1),
{noreply, State2};

Expand All @@ -361,7 +368,7 @@ handle_cast({deliver_connect, Transport, Reply}, State = #ezmq_socket{connecting
{error, Reason} when Reason == eagain; Reason == ealready;
Reason == econnrefused; Reason == econnreset ->
ConnectArgs = orddict:fetch(Transport, Connecting),
lager:debug("CArgs: ~w", [ConnectArgs]),
logger:debug("CArgs: ~w", [ConnectArgs]),
erlang:send_after(3000, self(), {reconnect, ConnectArgs#cargs{failcnt = ConnectArgs#cargs.failcnt + 1}}),
State2 = State#ezmq_socket{connecting = orddict:erase(Transport, Connecting)},
{noreply, State2};
Expand All @@ -384,7 +391,7 @@ handle_cast({deliver_close, Transport}, State = #ezmq_socket{connecting = Connec
check_send_queue(State2)
end,
_State4 = queue_run(State3),
lager:debug("DELIVER_CLOSE: ~p", [lager:pr(_State4, ?MODULE)]),
logger:debug("DELIVER_CLOSE: ~p", [_State4]),
{noreply, State3};

handle_cast({deliver_recv, Transport, IdMsg}, State) ->
Expand Down Expand Up @@ -413,7 +420,7 @@ handle_info({reconnect, ConnectArgs}, #ezmq_socket{} = State) ->
{noreply, NewState};

handle_info({'EXIT', Pid, Reason}, #ezmq_socket{owner = Pid} = State) ->
lager:debug("owner process died: ~p~n", [Reason]),
logger:debug("owner process died: ~p~n", [Reason]),
{stop, normal, State};

handle_info({'EXIT', Pid, _Reason}, MqSState) ->
Expand Down Expand Up @@ -457,7 +464,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================

do_connect(ConnectArgs = #cargs{family = tcp}, MqSState = #ezmq_socket{version = Version, type = Type, identity = Identity}) ->
lager:debug("starting connect: ~w", [ConnectArgs]),
logger:debug("starting connect: ~w", [ConnectArgs]),
#cargs{address = Address, port = Port, tcpopts = TcpOpts,
timeout = Timeout, failcnt = _FailCnt} = ConnectArgs,
{ok, Transport} = ezmq_link:start_connection(),
Expand Down Expand Up @@ -560,7 +567,7 @@ next_mode(#ezmq_socket{mode = active_once} = State) ->
State#ezmq_socket{mode = passive}.

handle_deliver_recv(Transport, IdMsg, MqSState) ->
lager:debug("deliver_recv: ~w, ~w", [Transport, IdMsg]),
logger:debug("deliver_recv: ~w, ~w", [Transport, IdMsg]),
case ezmq_socket_fsm:check({deliver_recv, Transport, IdMsg}, MqSState) of
ok ->
MqSState0 = handle_deliver_recv_2(Transport, IdMsg, queue_size(MqSState), MqSState),
Expand Down Expand Up @@ -645,7 +652,7 @@ queue_close(Transport, MqSState = #ezmq_socket{recv_q = Q}) ->
MqSState#ezmq_socket{recv_q = Q1}.

dequeue(MqSState = #ezmq_socket{recv_q = Q}) ->
lager:debug("TRANS: ~p, PENDING: ~p", [MqSState#ezmq_socket.transports, Q]),
logger:debug("TRANS: ~p, PENDING: ~p", [MqSState#ezmq_socket.transports, Q]),
case transports_while(fun do_dequeue/2, Q, empty, MqSState) of
{{Transport, Value}, Q1} ->
MqSState0 = MqSState#ezmq_socket{recv_q = Q1},
Expand Down
64 changes: 32 additions & 32 deletions src/ezmq_link.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ init([]) ->
%% @end
%%--------------------------------------------------------------------
setup({accept, MqSocket, Version, Type, Identity, Socket}, State) ->
lager:debug("got setup"),
logger:debug("got setup"),
NewState = State#state{version = Version, role = server,
type = Type, mqsocket = MqSocket,
identity = Identity, socket = Socket},
lager:debug("NewState: ~p", [NewState]),
logger:debug("NewState: ~p", [NewState]),
send_greeting({next_state, open, NewState, ?CONNECT_TIMEOUT});

setup({connect, MqSocket, Version, Type, Identity, tcp, Address, Port, TcpOpts, Timeout}, State) ->
lager:debug("got connect: ~w, ~w", [Address, Port]),
logger:debug("got connect: ~w, ~w", [Address, Port]),
State1 = State#state{version = Version, role = client,
type = Type, mqsocket = MqSocket,
identity = Identity},
Expand All @@ -147,67 +147,67 @@ setup({connect, MqSocket, Version, Type, Identity, tcp, Address, Port, TcpOpts,
end.

connecting(timeout, State) ->
lager:debug("timeout in connecting"),
logger:debug("timeout in connecting"),
deliver_connect(State, {error, timeout}),
{stop, normal, State};

%% if we get a ZMTP 1.0 greeting or we are talking ZMTP 1.0 only, select ZMTP 1.0
connecting({_FrameType, IdLength}, State = #state{version = {Major, _}})
when Major == 1 ->
lager:debug("in connecting v1, got greeting: ~p", [IdLength]),
logger:debug("in connecting v1, got greeting: ~p", [IdLength]),
handle_zmtp13_greeting(IdLength, State);

connecting({short, IdLength}, State = #state{version = {_Major, _}}) ->
lager:debug("in connecting v~w, got greeting: ~p", [_Major, IdLength]),
logger:debug("in connecting v~w, got greeting: ~p", [_Major, IdLength]),
NextStateInfo = handle_zmtp13_greeting(IdLength, State),
finish_zmtp13_handshake(NextStateInfo);

connecting({long, IdLength}, State = #state{version = {Major, _}}) ->
lager:debug("in connecting v~w, got signature: ~p", [Major, IdLength]),
logger:debug("in connecting v~w, got signature: ~p", [Major, IdLength]),
send_major(next_handshake_state('$major', State#state{remote_id_len = IdLength}));

connecting(_Msg, State) ->
lager:debug("Invalid message in connecting: ~p", [_Msg]),
logger:debug("Invalid message in connecting: ~p", [_Msg]),
deliver_connect(State, {error, data}),
{stop, normal, State}.

open(timeout, State) ->
lager:debug("timeout in open"),
logger:debug("timeout in open"),
{stop, normal, State};

%% if we get a ZMTP 1.0 greeting or we are talking ZMTP 1.0 only, select ZMTP 1.0
open({_FrameType, IdLength}, State = #state{version = {Major, _}})
when Major == 1 ->
lager:debug("in open v1, got greeting: ~p", [IdLength]),
logger:debug("in open v1, got greeting: ~p", [IdLength]),
handle_zmtp13_greeting(IdLength, State);

open({short, IdLength}, State = #state{version = {_Major, _}}) ->
lager:debug("in open v~w, got greeting: ~p", [_Major, IdLength]),
logger:debug("in open v~w, got greeting: ~p", [_Major, IdLength]),
NextStateInfo = handle_zmtp13_greeting(IdLength, State),
finish_zmtp13_handshake(NextStateInfo);

open({long, IdLength}, State = #state{version = {Major, _}}) ->
lager:debug("in open v~w, got signature: ~p", [Major, IdLength]),
logger:debug("in open v~w, got signature: ~p", [Major, IdLength]),
send_major(next_handshake_state('$major', State#state{remote_id_len = IdLength}));

open(_Msg, State) ->
lager:debug("Invalid message in open: ~p", [_Msg]),
logger:debug("Invalid message in open: ~p", [_Msg]),
{stop, normal, State}.

handshake(timeout, State) ->
lager:debug("timeout in open"),
logger:debug("timeout in open"),
{stop, normal, State};

handshake(_Msg, State = #state{hs_state = HsState}) ->
lager:debug("Invalid handshake message in ~w: ~p", [HsState, _Msg]),
logger:debug("Invalid handshake message in ~w: ~p", [HsState, _Msg]),
{stop, normal, State}.

handshake({1, _}, '$identity', Data, #state{remote_id_len = IdLength})
when byte_size(Data) < IdLength ->
more;
handshake({1, _}, '$identity', Data, #state{remote_id_len = IdLength} = State) ->
<<RemoteId:IdLength/bytes, Rest/binary>> = Data,
lager:debug("in '$identity' v1, got remoteId: ~p", [RemoteId]),
logger:debug("in '$identity' v1, got remoteId: ~p", [RemoteId]),
State1 = State#state{pending = Rest},
State2 = remote_id_assign(RemoteId, State1),
deliver_connected(State2),
Expand All @@ -219,7 +219,7 @@ handshake({2, _}, '$identity', Data, #state{remote_id_len = IdLength})

handshake({2, _}, '$identity', <<0:8, IdLength:8/integer, RemoteId:IdLength/bytes, Rest/binary>>,
#state{remote_id_len = IdLength} = State) ->
lager:debug("in '$identity' v2, remoteId: ~p", [RemoteId]),
logger:debug("in '$identity' v2, remoteId: ~p", [RemoteId]),
State1 = State#state{pending = Rest},
State2 = remote_id_assign(RemoteId, State1),
deliver_connected(State2),
Expand All @@ -237,7 +237,7 @@ handshake(_, '$major', <<Major:8, Rest/binary>>, State)
State1 = State#state{pending = Rest},
negotiate_major(Major, State1);
handshake(_, '$major', <<Major:8, _/binary>>, State) ->
lager:error("peer tried invalid protocol version ~w", [Major]),
logger:error("peer tried invalid protocol version ~w", [Major]),
{stop, normal, State};

%% TODO: handle ZMTP 3.0+
Expand All @@ -249,15 +249,15 @@ handshake(_, '$socketType', <<SocketType:8, Rest/binary>>, State) ->
handle_socket_type(SocketType, State1);

handshake(Version, HsState, Data, State) ->
lager:debug("in ~p:~p, invalid data ~p", [Version, HsState, Data]),
logger:debug("in ~p:~p, invalid data ~p", [Version, HsState, Data]),
{stop, normal, State}.

connected(timeout, State) ->
lager:debug("timeout in connected"),
logger:debug("timeout in connected"),
{stop, normal, State};

connected({in, Frames}, #state{mqsocket = MqSocket, remote_id = RemoteId} = State) ->
lager:debug("in connected Frames: ~p", [Frames]),
logger:debug("in connected Frames: ~p", [Frames]),
ezmq:deliver_recv(MqSocket, {RemoteId, Frames}),
{next_state, connected, State};

Expand Down Expand Up @@ -356,12 +356,12 @@ handle_info({'EXIT', MqSocket, _Reason}, _StateName, #state{mqsocket = MqSocket}
{stop, normal, State#state{mqsocket = undefined}};

handle_info({tcp, Socket, Data}, StateName, #state{socket = Socket} = State) ->
lager:debug("handle_info: ~p", [Data]),
logger:debug("handle_info: ~p", [Data]),
State1 = State#state{pending = <<(State#state.pending)/binary, Data/binary>>},
handle_data(StateName, State1, {next_state, StateName, State1});

handle_info({tcp_closed, Socket}, _StateName, #state{socket = Socket} = State) ->
lager:debug("client disconnected: ~w", [Socket]),
logger:debug("client disconnected: ~w", [Socket]),
{stop, normal, State}.

handle_data(_StateName, #state{socket = Socket, pending = <<>>}, ProcessStateNext) ->
Expand All @@ -372,7 +372,7 @@ handle_data(StateName, #state{socket = Socket, pending = Pending} = State, Proce
StateName =:= open ->
{Msg, DataRest} = ezmq_frame:decode_greeting(Pending),
State1 = State#state{pending = DataRest},
lager:debug("handle_data (greeting): decoded: ~p, rest: ~p", [Msg, DataRest]),
logger:debug("handle_data (greeting): decoded: ~p, rest: ~p", [Msg, DataRest]),

case Msg of
more ->
Expand All @@ -392,7 +392,7 @@ handle_data(StateName, #state{socket = Socket, pending = Pending} = State, Proce

handle_data(handshake, State = #state{version = Version, hs_state = HsState, socket = Socket, pending = Pending},
ProcessStateNext) ->
lager:debug("handshake ~w.~w ~w, got data: ~p", [element(1, Version), element(2, Version), HsState, Pending]),
logger:debug("handshake ~w.~w ~w, got data: ~p", [element(1, Version), element(2, Version), HsState, Pending]),
case handshake(Version, HsState, Pending, State) of
more ->
next_state_socket_active(Socket, setelement(3, ProcessStateNext, State));
Expand All @@ -405,7 +405,7 @@ handle_data(handshake, State = #state{version = Version, hs_state = HsState, soc
handle_data(StateName, #state{socket = Socket, version = Ver, pending = Pending} = State, ProcessStateNext) ->
{Msg, DataRest} = ezmq_frame:decode(Ver, Pending),
State1 = State#state{pending = DataRest},
lager:debug("handle_data: (~w, ~p) decoded: ~p, rest: ~p", [Ver, StateName, Msg, DataRest]),
logger:debug("handle_data: (~w, ~p) decoded: ~p, rest: ~p", [Ver, StateName, Msg, DataRest]),

case Msg of
more ->
Expand All @@ -421,9 +421,9 @@ handle_data(StateName, #state{socket = Socket, version = Ver, pending = Pending}
{false, Frame} ->
Frames = lists:reverse([Frame|State1#state.frames]),
State2 = State1#state{frames = []},
lager:debug("handle_data: finale decoded: ~p", [Frames]),
logger:debug("handle_data: finale decoded: ~p", [Frames]),
Reply = exec_sync(Frames, StateName, State2),
lager:debug("handle_data: reply: ~p", [lager:pr(?MODULE, Reply)]),
logger:debug("handle_data: reply: ~p", [logger:pr(?MODULE, Reply)]),
handle_data_reply(Reply)
end.

Expand All @@ -440,12 +440,12 @@ handle_data(StateName, #state{socket = Socket, version = Ver, pending = Pending}
%%--------------------------------------------------------------------
terminate(_Reason, _StateName, #state{mqsocket = MqSocket, socket = Socket})
when is_port(Socket) ->
lager:debug("terminate"),
logger:debug("terminate"),
catch ezmq:deliver_close(MqSocket),
gen_tcp:close(Socket),
ok;
terminate(_Reason, _StateName, #state{mqsocket = MqSocket}) ->
lager:debug("terminate"),
logger:debug("terminate"),
catch ezmq:deliver_close(MqSocket),
ok.

Expand Down Expand Up @@ -523,7 +523,7 @@ handle_socket_type(RemoteSocketType, State = #state{type = Type}) ->
ok ->
next_handshake_state('$identity', State);
Other ->
lager:warning("socket types ~w: ~w", [{Type, RemoteSocketTypeAtom}, Other]),
logger:warning("socket types ~w: ~w", [{Type, RemoteSocketTypeAtom}, Other]),
{stop, normal, State}
end.

Expand Down Expand Up @@ -574,7 +574,7 @@ send_packet(Packet, NextStateInfo) ->
ok ->
next_state_socket_active(Socket, NextStateInfo);
{error, Reason} ->
lager:debug("error - Reason: ~p", [Reason]),
logger:debug("error - Reason: ~p", [Reason]),
{stop, normal, State}
end.

Expand Down
10 changes: 5 additions & 5 deletions src/ezmq_socket_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ init(Module, Opts, MqSState) ->
check(Action, MqSState = #ezmq_socket{fsm = Fsm}) ->
#fsm_state{module = Module, state_name = StateName, state = State} = Fsm,
R = Module:StateName(check, Action, MqSState, State),
lager:debug("ezmq_socket_fsm state: ~w, check: ~w, Result: ~w", [StateName, Action, R]),
logger:debug("ezmq_socket_fsm state: ~w, check: ~w, Result: ~w", [StateName, Action, R]),
R.

do(Action, MqSState = #ezmq_socket{fsm = Fsm}) ->
#fsm_state{module = Module, state_name = StateName, state = State} = Fsm,
case Module:StateName(do, Action, MqSState, State) of
{error, Reason} ->
lager:error("socket fsm for ~w exited with ~p, (~p,~p)~n", [Action, Reason, MqSState, State]),
logger:error("socket fsm for ~w exited with ~p, (~p,~p)~n", [Action, Reason, MqSState, State]),
error(Reason);
{next_state, NextStateName, NextMqSState, NextState} ->
lager:debug("ezmq_socket_fsm: state: ~w, Action: ~w, next_state: ~w", [StateName, Action, NextStateName]),
logger:debug("ezmq_socket_fsm: state: ~w, Action: ~w, next_state: ~w", [StateName, Action, NextStateName]),
NewFsm = Fsm#fsm_state{state_name = NextStateName, state = NextState},
NextMqSState#ezmq_socket{fsm = NewFsm}
end.
Expand All @@ -100,10 +100,10 @@ close(Transport, MqSState = #ezmq_socket{fsm = Fsm}) ->
#fsm_state{module = Module, state_name = StateName, state = State} = Fsm,
case Module:close(StateName, Transport, MqSState, State) of
{error, Reason} ->
lager:error("socket fsm for ~w exited with ~p, (~p,~p)~n", [Transport, Reason, MqSState, State]),
logger:error("socket fsm for ~w exited with ~p, (~p,~p)~n", [Transport, Reason, MqSState, State]),
error(Reason);
{next_state, NextStateName, NextMqSState, NextState} ->
lager:debug("ezmq_socket_fsm: state: ~w, Transport: ~w, next_state: ~w", [StateName, Transport, NextStateName]),
logger:debug("ezmq_socket_fsm: state: ~w, Transport: ~w, next_state: ~w", [StateName, Transport, NextStateName]),
NewFsm = Fsm#fsm_state{state_name = NextStateName, state = NextState},
NextMqSState#ezmq_socket{fsm = NewFsm}
end.
Expand Down
Loading