Commit a11236f

Improve worker cleanup on early coordinator exit
Previously, if the coordinator process was killed too quickly, before the
stream worker cleanup process was spawned, remote workers could be left
around waiting until the default 5 minute timeout expired. To reliably clean
up workers in that state, we need to start the cleaner process, with all the
job references, before we start submitting the jobs for execution.

At first it may seem impossible to track a job before its process is even
spawned. That's true for regular processes; rexi, however, operates on plain
references. For each process we spawn remotely, we create a reference on the
coordinator side, which we can then use to track that job. Since those are
just plain, manually created references, nothing stops us from creating them
first, adding them to a cleaner process, and only then submitting them.
That's exactly what this commit accomplishes:

* Create a streams-specific `fabric_streams:submit_jobs/4` function, which
spawns the cleanup process early, generates worker references, and then
submits the jobs. This way, every existing streaming submit_jobs call can be
replaced with a one-line change: fabric_util -> fabric_streams.

* The cleanup process operates as before: it monitors the coordinator for
exits and fires off a `kill_all` message to each node.

* Create `rexi:cast_ref(...)` variants of the `rexi:cast(...)` calls, where
the caller specifies the references as a new argument. This is what allows
us to start the cleanup process before the jobs even get submitted. The
older calls can simply call into the `cast_ref` versions with their own
freshly created references.

Since we added the new `rexi:cast_ref(...)` variants, also add more test
coverage, including for the streaming logic. It's not at 100% yet, but
getting there.

Also, the comments in `rexi.erl` were full of erldoc stanzas, and we don't
actually build erldocs anywhere, so replace them with something more
helpful. The streaming protocol itself was never really described anywhere,
and it can take some time to figure out (at least it took me a while), so
take the chance to also add a very basic, high-level description of the
message flow.

Related: #5127 (comment)
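The `rexi.erl` changes themselves are not part of the diff shown on this
page, but the idea behind the `cast_ref` variants can be sketched roughly as
follows. This is a minimal, hypothetical sketch: the only thing taken from
the diff below is the `cast_ref(Ref, Node, MFA)` argument order; the real
signatures and internals in `rexi.erl` may differ.

    % Hypothetical sketch, not the actual rexi.erl code: the new cast_ref
    % variant takes a caller-created reference instead of making its own,
    % so the caller can register the reference with a cleaner process
    % before the job is ever submitted. The old cast variant then simply
    % creates the reference itself and delegates.
    cast(Node, MFA) ->
        cast_ref(make_ref(), Node, MFA).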
1 parent d0cf54e commit a11236f

7 files changed: +326 -68 lines

src/couch_replicator/src/couch_replicator_fabric.erl (+1 -1)

@@ -22,7 +22,7 @@
 
 docs(DbName, Options, QueryArgs, Callback, Acc) ->
     Shards = mem3:shards(DbName),
-    Workers0 = fabric_util:submit_jobs(
+    Workers0 = fabric_streams:submit_jobs(
         Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]
     ),
     RexiMon = fabric_util:create_monitors(Workers0),

src/fabric/src/fabric_streams.erl (+102 -11)

@@ -13,6 +13,7 @@
 -module(fabric_streams).
 
 -export([
+    submit_jobs/4,
     start/2,
     start/3,
     start/4,
@@ -27,6 +28,23 @@
 
 -define(WORKER_CLEANER, fabric_worker_cleaner).
 
+% This is the streams equivalent of fabric_util:submit_jobs/4. Besides
+% submitting the jobs it also starts the worker cleaner and adds each started
+% job to the cleaner first before the job is submitted.
+%
+submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
+    % Create refs first and add them to the cleaner to ensure if our process
+    % gets killed, the remote workers will be cleaned up as well.
+    RefFun = fun(#shard{} = Shard) -> Shard#shard{ref = make_ref()} end,
+    Workers = lists:map(RefFun, Shards),
+    ClientReq = chttpd_util:mochiweb_client_req_get(),
+    spawn_worker_cleaner(self(), Workers, ClientReq),
+    SubmitFun = fun(#shard{node = Node, name = ShardName, ref = Ref}) ->
+        rexi:cast_ref(Ref, Node, {Module, EndPoint, [ShardName | ExtraArgs]})
+    end,
+    ok = lists:foreach(SubmitFun, Workers),
+    Workers.
+
 start(Workers, Keypos) ->
     start(Workers, Keypos, undefined, undefined).
 
@@ -158,39 +176,49 @@ handle_stream_start(Else, _, _) ->
 % Spawn an auxiliary rexi worker cleaner. This will be used in cases
 % when the coordinator (request) process is forceably killed and doesn't
 % get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers, ClientReq) ->
+spawn_worker_cleaner(Coordinator, Workers, ClientReq) when
+    is_pid(Coordinator), is_list(Workers)
+->
     case get(?WORKER_CLEANER) of
         undefined ->
             Pid = spawn(fun() ->
                 erlang:monitor(process, Coordinator),
-                cleaner_loop(Coordinator, Workers, ClientReq)
+                NodeRefSet = set_from_list(shards_to_node_refs(Workers)),
+                cleaner_loop(Coordinator, NodeRefSet, ClientReq)
             end),
             put(?WORKER_CLEANER, Pid),
             Pid;
-        ExistingCleaner ->
+        ExistingCleaner when is_pid(ExistingCleaner) ->
             ExistingCleaner
     end.
 
-cleaner_loop(Pid, Workers, ClientReq) ->
+cleaner_loop(Pid, NodeRefSet, ClientReq) ->
     CheckMSec = chttpd_util:mochiweb_client_req_check_msec(),
     receive
-        {add_worker, Pid, Worker} ->
-            cleaner_loop(Pid, [Worker | Workers], ClientReq);
+        {add_node_ref, Pid, {_, _} = NodeRef} ->
+            cleaner_loop(Pid, sets:add_element(NodeRef, NodeRefSet), ClientReq);
         {'DOWN', _, _, Pid, _} ->
-            fabric_util:cleanup(Workers)
+            rexi:kill_all(sets:to_list(NodeRefSet))
     after CheckMSec ->
         chttpd_util:stop_client_process_if_disconnected(Pid, ClientReq),
-        cleaner_loop(Pid, Workers, ClientReq)
+        cleaner_loop(Pid, NodeRefSet, ClientReq)
     end.
 
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
+add_worker_to_cleaner(CoordinatorPid, #shard{node = Node, ref = Ref}) ->
     case get(?WORKER_CLEANER) of
         CleanerPid when is_pid(CleanerPid) ->
-            CleanerPid ! {add_worker, CoordinatorPid, Worker};
+            CleanerPid ! {add_node_ref, CoordinatorPid, {Node, Ref}};
         _ ->
             ok
     end.
 
+set_from_list(List) when is_list(List) ->
+    sets:from_list(List, [{version, 2}]).
+
+shards_to_node_refs(Workers) when is_list(Workers) ->
+    Fun = fun(#shard{node = Node, ref = Ref}) -> {Node, Ref} end,
+    lists:map(Fun, Workers).
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
@@ -207,7 +235,8 @@ worker_cleaner_test_() ->
            ?TDEF_FE(does_not_fire_if_cleanup_called),
            ?TDEF_FE(should_clean_additional_worker_too),
            ?TDEF_FE(coordinator_is_killed_if_client_disconnects),
-           ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected)
+           ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
+           ?TDEF_FE(submit_jobs_sets_up_cleaner)
        ]
    }
 }.
@@ -351,6 +380,68 @@ coordinator_is_not_killed_if_client_is_connected(_) ->
         {'DOWN', CleanerRef, _, _, _} -> ok
     end.
 
+submit_jobs_sets_up_cleaner(_) ->
+    meck:reset(rexi),
+    erase(?WORKER_CLEANER),
+    Shards = [
+        #shard{node = 'n1'},
+        #shard{node = 'n2'}
+    ],
+    meck:expect(rexi, cast_ref, fun(Ref, _, _) -> Ref end),
+    {Coord, CoordRef} = spawn_monitor(fun() ->
+        Workers = submit_jobs(Shards, fabric_rpc, potatoes, []),
+        receive
+            {get_workers_and_cleaner, From} ->
+                From ! {Workers, get(?WORKER_CLEANER)},
+                timer:sleep(999999)
+        end
+    end),
+    Coord ! {get_workers_and_cleaner, self()},
+    {Workers, Cleaner} =
+        receive
+            Msg -> Msg
+        end,
+    ?assert(is_pid(Cleaner)),
+    ?assert(is_process_alive(Cleaner)),
+    ?assert(is_process_alive(Coord)),
+    CheckWorkerFun = fun(#shard{node = Node, ref = Ref}) ->
+        ?assert(is_reference(Ref)),
+        {Node, Ref}
+    end,
+    NodeRefs = lists:map(CheckWorkerFun, Workers),
+    ?assertEqual(length(Shards), length(Workers)),
+    ?assertEqual(length(lists:usort(NodeRefs)), length(NodeRefs)),
+    % Were the jobs actually submitted?
+    meck:wait(2, rexi, cast_ref, '_', 1000),
+    % If we kill the coordinator, the cleaner should kill the workers
+    meck:reset(rexi),
+    CleanupMon = erlang:monitor(process, Cleaner),
+    exit(Coord, kill),
+    receive
+        {'DOWN', CoordRef, _, _, WorkerReason} ->
+            ?assertEqual(killed, WorkerReason)
+    after 1000 ->
+        ?assert(is_process_alive(Coord))
+    end,
+    % Cleaner should do the cleanup
+    meck:wait(1, rexi, kill_all, '_', 1000),
+    History = meck:history(rexi),
+    ?assertMatch([{_, {rexi, kill_all, _}, ok}], History),
+    [{Pid, {rexi, kill_all, Args}, ok}] = History,
+    % It was the cleaner who called it
+    ?assertEqual(Cleaner, Pid),
+    ?assertMatch([[{_, _}, {_, _}]], Args),
+    [NodeRefsKilled] = Args,
+    % The node refs killed are the ones we expect
+    ?assertEqual(lists:sort(NodeRefs), lists:sort(NodeRefsKilled)),
+    % Cleanup process should exit when done
+    receive
+        {'DOWN', CleanupMon, _, _, CleanerReason} ->
+            ?assertEqual(normal, CleanerReason)
+    after 1000 ->
+        ?assert(is_process_alive(Cleaner))
+    end.
+
 setup() ->
     ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
     % Speed up disconnect socket timeout for the test to 200 msec
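To make the cleanup flow concrete, this is roughly what the cleaner ends up
holding and doing once the coordinator dies. It is an illustration only,
with made-up node names; the shapes used here ({Node, Ref} tuples, a
version-2 set, and the `rexi:kill_all/1` call) are taken from the
`cleaner_loop/3` changes in the diff above.

    % Illustration only, with hypothetical nodes; see cleaner_loop/3 above.
    Ref1 = make_ref(),
    Ref2 = make_ref(),
    % Each worker is tracked as a {Node, Ref} tuple in a version-2 set,
    % so re-adding the same worker is a no-op.
    NodeRefSet = sets:from_list(
        [{'n1@host', Ref1}, {'n2@host', Ref2}],
        [{version, 2}]
    ),
    % On the coordinator's 'DOWN' message, a single kill_all call covers
    % every outstanding worker on every node.
    rexi:kill_all(sets:to_list(NodeRefSet)).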

src/fabric/src/fabric_view_all_docs.erl (+1 -1)

@@ -25,7 +25,7 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
     {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
     DbName = fabric:dbname(Db),
     {Shards, RingOpts} = shards(Db, QueryArgs),
-    Workers0 = fabric_util:submit_jobs(
+    Workers0 = fabric_streams:submit_jobs(
         Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
     ),
     RexiMon = fabric_util:create_monitors(Workers0),

src/fabric/src/fabric_view_map.erl (+2 -2)

@@ -40,9 +40,9 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
     Repls = fabric_ring:get_shard_replacements(DbName, Shards),
     RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
     StartFun = fun(Shard) ->
-        hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
+        hd(fabric_streams:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,
-    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
+    Workers0 = fabric_streams:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case

src/fabric/src/fabric_view_reduce.erl (+2 -2)

@@ -31,9 +31,9 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_ring:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->
-        hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
+        hd(fabric_streams:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
     end,
-    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs),
+    Workers0 = fabric_streams:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case
