Skip to content

Commit d0cf54e

Browse files
committed
Ensure rexi_buffer metric includes the internal buffered messages
Previously, the rexi_buffer metric reported just the mailbox size of the buffers. However, rexi buffers are special as they themselves act as explicit message queues, and the current length of those explicit buffers wasn't reflected anywhere in metrics. So, improve the metric's usefulness and reflect both the explicit and the implicit queue lengths. There was already an existing `get_buffered_count` gen_server call; however, since the buffers could potentially be in the hot path, avoid using it and instead use a persistent term + a counter scheme, similar to how we do for all couch_stats metrics. In addition, noticed that rexi had no tests at all. It is, of course, battle-tested in production, but since we made changes to it, added some tests to cover the changed bits.
1 parent 4214120 commit d0cf54e

File tree

4 files changed

+284
-12
lines changed

4 files changed

+284
-12
lines changed

src/rexi/src/rexi_buffer.erl

+40-12
Original file line numberDiff line numberDiff line change
@@ -18,55 +18,76 @@
1818
init/1,
1919
handle_call/3,
2020
handle_cast/2,
21-
handle_info/2
21+
handle_info/2,
22+
terminate/2
2223
]).
2324

2425
-export([
2526
send/2,
26-
start_link/1
27+
start_link/1,
28+
get_buffered_count/1,
29+
erase_buffer/1
2730
]).
2831

32+
-define(BUFFER_COUNT_DEFAULT, 2000).
33+
-define(COUNTER, counter).
34+
2935
-record(state, {
36+
server_id,
3037
buffer = queue:new(),
3138
sender = nil,
3239
count = 0,
40+
counter,
3341
max_count
3442
}).
3543

3644
start_link(ServerId) ->
37-
gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
45+
gen_server:start_link({local, ServerId}, ?MODULE, [ServerId], []).
3846

3947
send(Dest, Msg) ->
4048
Server = list_to_atom(lists:concat([rexi_buffer, "_", get_node(Dest)])),
4149
gen_server:cast(Server, {deliver, Dest, Msg}).
4250

43-
init(_) ->
51+
get_buffered_count(ServerId) when is_atom(ServerId) ->
52+
case persistent_term:get(counter_key(ServerId), undefined) of
53+
undefined -> 0;
54+
Ref -> counters:get(Ref, 1)
55+
end.
56+
57+
erase_buffer(ServerId) ->
58+
gen_server:call(ServerId, erase_buffer, infinity).
59+
60+
init([ServerId]) ->
4461
%% TODO Leverage os_mon to discover available memory in the system
45-
Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
46-
{ok, #state{max_count = Max}}.
62+
Counter = counters:new(1, []),
63+
persistent_term:put(counter_key(ServerId), Counter),
64+
Max = config:get_integer("rexi", "buffer_count", ?BUFFER_COUNT_DEFAULT),
65+
{ok, #state{server_id = ServerId, max_count = Max, counter = Counter}}.
4766

48-
handle_call(erase_buffer, _From, State) ->
49-
{reply, ok, State#state{buffer = queue:new(), count = 0}, 0};
50-
handle_call(get_buffered_count, _From, State) ->
51-
{reply, State#state.count, State, 0}.
67+
handle_call(erase_buffer, _From, #state{counter = Counter} = State) ->
68+
counters:put(Counter, 1, 0),
69+
{reply, ok, State#state{buffer = queue:new(), count = 0}, 0}.
5270

53-
handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
71+
handle_cast({deliver, Dest, Msg}, #state{} = State) ->
72+
#state{counter = Counter, buffer = Q, count = C} = State,
5473
couch_stats:increment_counter([rexi, buffered]),
5574
Q2 = queue:in({Dest, Msg}, Q),
5675
case should_drop(State) of
5776
true ->
5877
couch_stats:increment_counter([rexi, dropped]),
5978
{noreply, State#state{buffer = queue:drop(Q2)}, 0};
6079
false ->
80+
counters:add(Counter, 1, 1),
6181
{noreply, State#state{buffer = Q2, count = C + 1}, 0}
6282
end.
6383

6484
handle_info(timeout, #state{sender = nil, buffer = {[], []}, count = 0} = State) ->
6585
{noreply, State};
6686
handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 ->
67-
#state{buffer = Q, count = C} = State,
87+
#state{counter = Counter, buffer = Q} = State,
6888
{{value, {Dest, Msg}}, Q2} = queue:out_r(Q),
6989
NewState = State#state{buffer = Q2, count = C - 1},
90+
counters:add(Counter, 1, -1),
7091
case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
7192
ok when C =:= 1 ->
7293
% We just sent the last queued message, we'll use this opportunity
@@ -86,10 +107,17 @@ handle_info(timeout, State) ->
86107
handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
87108
{noreply, State#state{sender = nil}, 0}.
88109

110+
terminate(_Reason, #state{server_id = ServerId}) ->
111+
persistent_term:erase(counter_key(ServerId)),
112+
ok.
113+
89114
should_drop(#state{count = Count, max_count = Max}) ->
90115
Count >= Max.
91116

92117
get_node({_, Node}) when is_atom(Node) ->
93118
Node;
94119
get_node(Pid) when is_pid(Pid) ->
95120
node(Pid).
121+
122+
counter_key(ServerId) when is_atom(ServerId) ->
123+
{?MODULE, ?COUNTER, ServerId}.

src/rexi/src/rexi_server_mon.erl

+8
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ start_link(ChildMod) ->
4343
status() ->
4444
gen_server:call(?MODULE, status).
4545

46+
aggregate_queue_len(rexi_buffer) ->
47+
% rexi_buffer acts as an explicit message queue. In order to get useful
48+
% metrics from it we really need to add both its process' message queue and
49+
% already buffered messages.
50+
ServerIds = server_ids(rexi_buffer),
51+
MQLengths = [message_queue_len(ServerId) || ServerId <- ServerIds],
52+
BufLengths = [rexi_buffer:get_buffered_count(ServerId) || ServerId <- ServerIds],
53+
lists:sum(MQLengths) + lists:sum(BufLengths);
4654
aggregate_queue_len(ChildMod) ->
4755
lists:sum([message_queue_len(ServerId) || ServerId <- server_ids(ChildMod)]).
4856

src/rexi/test/rexi_buffer_tests.erl

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
% use this file except in compliance with the License. You may obtain a copy of
3+
% the License at
4+
%
5+
% http://www.apache.org/licenses/LICENSE-2.0
6+
%
7+
% Unless required by applicable law or agreed to in writing, software
8+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
% License for the specific language governing permissions and limitations under
11+
% the License.
12+
13+
-module(rexi_buffer_tests).
14+
15+
-include_lib("couch/include/couch_eunit.hrl").
16+
17+
rexi_buffer_test_() ->
18+
{
19+
foreach,
20+
fun setup/0,
21+
fun teardown/1,
22+
[
23+
?TDEF_FE(t_send),
24+
?TDEF_FE(t_get_buffered_count),
25+
?TDEF_FE(t_buffer_erase),
26+
?TDEF_FE(t_terminate_clears_persistent_term)
27+
]
28+
}.
29+
30+
setup() ->
31+
Module = atom_to_binary(?MODULE),
32+
RandSuffix = binary:encode_hex(rand:bytes(4)),
33+
ServerId = binary_to_atom(<<Module/binary, "_", RandSuffix/binary>>),
34+
{ok, Pid} = rexi_buffer:start_link(ServerId),
35+
unlink(Pid),
36+
{ServerId, Pid}.
37+
38+
teardown({_ServerId, Pid}) ->
39+
case is_process_alive(Pid) of
40+
true -> test_util:stop_sync(Pid);
41+
false -> ok
42+
end.
43+
44+
t_send({ServerId, Pid}) ->
45+
?assert(is_process_alive(Pid)),
46+
?assertEqual(Pid, whereis(ServerId)),
47+
{DestPid, DestRef} = spawn_monitor(fun() ->
48+
receive
49+
Msg -> exit({got, Msg})
50+
end
51+
end),
52+
gen_server:cast(ServerId, {deliver, DestPid, potato}),
53+
ReceivedVal =
54+
receive
55+
{'DOWN', DestRef, process, DestPid, Res} -> Res
56+
end,
57+
?assertEqual({got, potato}, ReceivedVal).
58+
59+
t_get_buffered_count({ServerId, _}) ->
60+
NonExistentDest = {foo, 'nonexistent@127.0.0.1'},
61+
?assertEqual(0, rexi_buffer:get_buffered_count('nonexistent_server_id')),
62+
?assertEqual(0, rexi_buffer:get_buffered_count(ServerId)),
63+
% Set a fake sender to make the buffer block
64+
sys:replace_state(ServerId, fun(OldSt) -> setelement(4, OldSt, {foo, bar}) end),
65+
gen_server:cast(ServerId, {deliver, NonExistentDest, potato}),
66+
test_util:wait(fun() ->
67+
case rexi_buffer:get_buffered_count(ServerId) of
68+
0 -> wait;
69+
N when is_integer(N), N > 0 -> ok
70+
end
71+
end),
72+
?assertEqual(1, rexi_buffer:get_buffered_count(ServerId)),
73+
gen_server:cast(ServerId, {deliver, NonExistentDest, tomato}),
74+
gen_server:cast(ServerId, {deliver, NonExistentDest, cabbage}),
75+
test_util:wait(fun() ->
76+
case rexi_buffer:get_buffered_count(ServerId) of
77+
N when is_integer(N), N =< 2 -> wait;
78+
N when is_integer(N), N > 2 -> ok
79+
end
80+
end),
81+
?assertEqual(3, rexi_buffer:get_buffered_count(ServerId)),
82+
% Unblock sender
83+
sys:replace_state(ServerId, fun(OldSt) -> setelement(4, OldSt, nil) end),
84+
gen_server:cast(ServerId, {deliver, NonExistentDest, cucumber}),
85+
test_util:wait(fun() ->
86+
case rexi_buffer:get_buffered_count(ServerId) of
87+
N when is_integer(N), N > 0 -> wait;
88+
0 -> ok
89+
end
90+
end),
91+
?assertEqual(ok, rexi_buffer:erase_buffer(ServerId)),
92+
?assertEqual(0, rexi_buffer:get_buffered_count(ServerId)).
93+
94+
t_buffer_erase({ServerId, _}) ->
95+
NonExistentDest = {foo, 'nonexistent@127.0.0.1'},
96+
?assertEqual(0, rexi_buffer:get_buffered_count('nonexistent_server_id')),
97+
?assertEqual(0, rexi_buffer:get_buffered_count(ServerId)),
98+
% Set a fake sender to make the buffer block
99+
sys:replace_state(ServerId, fun(OldSt) -> setelement(4, OldSt, {foo, bar}) end),
100+
gen_server:cast(ServerId, {deliver, NonExistentDest, potato}),
101+
test_util:wait(fun() ->
102+
case rexi_buffer:get_buffered_count(ServerId) of
103+
0 -> wait;
104+
N when is_integer(N), N > 0 -> ok
105+
end
106+
end),
107+
?assertEqual(1, rexi_buffer:get_buffered_count(ServerId)),
108+
?assertEqual(ok, rexi_buffer:erase_buffer(ServerId)),
109+
?assertEqual(0, rexi_buffer:get_buffered_count(ServerId)).
110+
111+
t_terminate_clears_persistent_term({ServerId, Pid}) ->
112+
?assertNotEqual(undefined, persistent_term:get({rexi_buffer, counter, ServerId}, undefined)),
113+
?assertEqual(ok, gen_server:stop(Pid, shutdown, infinity)),
114+
?assertEqual(undefined, persistent_term:get({rexi_buffer, counter, ServerId}, undefined)).

src/rexi/test/rexi_tests.erl

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
% use this file except in compliance with the License. You may obtain a copy of
3+
% the License at
4+
%
5+
% http://www.apache.org/licenses/LICENSE-2.0
6+
%
7+
% Unless required by applicable law or agreed to in writing, software
8+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
% License for the specific language governing permissions and limitations under
11+
% the License.
12+
13+
-module(rexi_tests).
14+
15+
-export([
16+
rpc_test_fun/1
17+
]).
18+
19+
-include_lib("couch/include/couch_eunit.hrl").
20+
21+
rexi_buffer_test_() ->
22+
{
23+
foreach,
24+
fun setup/0,
25+
fun teardown/1,
26+
[
27+
?TDEF_FE(t_cast),
28+
?TDEF_FE(t_sync_cast),
29+
?TDEF_FE(t_kill),
30+
?TDEF_FE(t_cast_error),
31+
?TDEF_FE(t_metrics),
32+
?TDEF_FE(t_ping)
33+
]
34+
}.
35+
36+
setup() ->
37+
test_util:start_couch([rexi]).
38+
39+
teardown(Ctx) ->
40+
test_util:stop_couch(Ctx).
41+
42+
rpc_test_fun({sleep, MSec}) ->
43+
rexi:reply({sleeping, self()}),
44+
timer:sleep(MSec);
45+
rpc_test_fun({error, Error}) ->
46+
error(Error);
47+
rpc_test_fun(ping) ->
48+
rexi:ping();
49+
rpc_test_fun(Arg) ->
50+
rexi:reply({Arg, get()}).
51+
52+
t_cast(_) ->
53+
?assertMatch({RexiServer, node42} when is_atom(RexiServer), rexi_utils:server_pid(node42)),
54+
put(nonce, yup),
55+
Ref = rexi:cast(node(), {?MODULE, rpc_test_fun, [potato]}),
56+
{Res, Dict} =
57+
receive
58+
{Ref, {R, D}} -> {R, maps:from_list(D)}
59+
end,
60+
?assertEqual(potato, Res),
61+
?assertMatch(
62+
#{
63+
nonce := yup,
64+
'$initial_call' := {?MODULE, rpc_test_fun, 1},
65+
rexi_from := {_Pid, _Ref}
66+
},
67+
Dict
68+
).
69+
70+
t_sync_cast(_) ->
71+
?assertMatch({RexiServer, node42} when is_atom(RexiServer), rexi_utils:server_pid(node42)),
72+
put(nonce, yup),
73+
Ref = rexi:cast(node(), self(), {?MODULE, rpc_test_fun, [potato]}, [sync]),
74+
{Res, Dict} =
75+
receive
76+
{Ref, {R, D}} -> {R, maps:from_list(D)}
77+
end,
78+
?assertEqual(potato, Res),
79+
?assertMatch(
80+
#{
81+
nonce := yup,
82+
'$initial_call' := {?MODULE, rpc_test_fun, 1},
83+
rexi_from := {_Pid, _Ref}
84+
},
85+
Dict
86+
).
87+
88+
t_cast_error(_) ->
89+
?assertMatch({RexiServer, node42} when is_atom(RexiServer), rexi_utils:server_pid(node42)),
90+
Ref = rexi:cast(node(), self(), {?MODULE, rpc_test_fun, [{error, tomato}]}, []),
91+
Res =
92+
receive
93+
{Ref, RexiExit} -> RexiExit
94+
end,
95+
?assertMatch({rexi_EXIT, {tomato, [{?MODULE, rpc_test_fun, 1, _} | _]}}, Res).
96+
97+
t_kill(_) ->
98+
Ref = rexi:cast(node(), {?MODULE, rpc_test_fun, [{sleep, 10000}]}),
99+
WorkerPid =
100+
receive
101+
{Ref, {sleeping, Pid}} -> Pid
102+
end,
103+
?assert(is_process_alive(WorkerPid)),
104+
Mon = monitor(process, WorkerPid),
105+
rexi:kill_all([{node(), Ref}]),
106+
KillReason =
107+
receive
108+
{'DOWN', Mon, _, _, Res} -> Res
109+
end,
110+
?assertEqual(killed, KillReason).
111+
112+
t_metrics(_) ->
113+
?assertEqual(0, rexi:aggregate_buffer_queue_len()),
114+
?assertEqual(0, rexi:aggregate_server_queue_len()).
115+
116+
t_ping(_) ->
117+
rexi:cast(node(), {?MODULE, rpc_test_fun, [ping]}),
118+
Res =
119+
receive
120+
{rexi, Ping} -> Ping
121+
end,
122+
?assertEqual('$rexi_ping', Res).

0 commit comments

Comments
 (0)