xref: /6.0.3/ns_server/src/async.erl (revision 10d30afe)
1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2017-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15
16-module(async).
17
18-include("cut.hrl").
19-include("ns_common.hrl").
20-include_lib("eunit/include/eunit.hrl").
21
22-export([start/1, start/2,
23         start_many/2, start_many/3,
24         perform/1, perform/2,
25         abort/1, abort/2,
26         abort_many/1, abort_many/2,
27         send/2,
28         with/2, with/3,
29         with_many/3, with_many/4,
30         wait/1, wait/2,
31         wait_many/1, wait_many/2,
32         wait_any/1, wait_any/2,
33         race/2, map/2, foreach/2,
34         run_with_timeout/2,
35         get_identity/0]).
36
%% Start an async running Fun with no options.
start(Fun) ->
    start(Fun, []).

%% Start an async running Fun.
%%
%% Options read here:
%%   monitor - when true the controller is spawned with misc:spawn_monitor/1,
%%             otherwise with proc_lib:spawn/1. The return value is whatever
%%             the chosen spawn function returns.
%% The remaining options (adopters, abort_after, type) are consumed by
%% async_init/4 inside the spawned process.
%%
%% The caller's pid and the caller's own controller (undefined unless the
%% caller is an async executor) are captured before spawning, because both
%% are properties of the calling process.
start(Fun, Opts) ->
    Caller           = self(),
    CallerController = get_controller(),

    Body = fun () ->
                   async_init(Caller, CallerController, Opts, Fun)
           end,

    case proplists:get_value(monitor, Opts, false) of
        true ->
            misc:spawn_monitor(Body);
        false ->
            proc_lib:spawn(Body)
    end.
56
%% Start a monitored async of type 'perform'. For this type the controller
%% exits as soon as the body finishes (see async_loop_handle_result/4)
%% instead of holding the result for a later wait/1,2.
perform(Fun) ->
    perform(Fun, [monitor]).

%% Same as perform/1 but with caller-supplied options.
%%
%% The {type, perform} entry is prepended rather than appended:
%% proplists:get_value/3 returns the first matching entry, so appending let
%% a stray {type, _} in Opts silently turn a perform into a wait async.
perform(Fun, Opts) ->
    start(Fun, [{type, perform} | Opts]).
62
%% Start one async per element of Args, with default options.
start_many(Fun, Args) ->
    start_many(Fun, Args, []).

%% Start one async per element of Args; each async evaluates Fun(Arg).
%% Returns the list of start/2 results in the same order as Args.
start_many(Fun, Args, Opts) ->
    Bind = fun (Arg) ->
                   fun () -> Fun(Arg) end
           end,
    [start(Bind(Arg), Opts) || Arg <- Args].
70
%% Abort a single async using the default reason of abort_many/1.
abort(Pid) ->
    Targets = [Pid],
    abort_many(Targets).

%% Abort a single async with an explicit exit Reason.
abort(Pid, Reason) ->
    Targets = [Pid],
    abort_many(Targets, Reason).
76
%% Abort all asyncs in Pids with reason 'shutdown'.
abort_many(Pids) ->
    Reason = shutdown,
    abort_many(Pids, Reason).

%% Abort all asyncs in Pids with the given Reason.
%% Delegates to misc:terminate_and_wait/2 (defined outside this module;
%% presumably it blocks until the processes are gone - confirm in misc).
abort_many(Pids, Reason) ->
    misc:terminate_and_wait(Pids, Reason).
82
%% Send Msg to the executor of Async. The message is wrapped as
%% {'$async_msg', Msg}; the controller unwraps it and forwards the bare Msg
%% to the executor (see async_loop_wait_result/4). Returns Msg.
send(Async, Msg) ->
    _ = erlang:send(Async, {'$async_msg', Msg}),
    Msg.
86
%% Run Fun(Async) against a freshly started async with default options.
with(AsyncBody, Fun) ->
    with(AsyncBody, [], Fun).

%% Start an async from AsyncBody/Opts, pass it to Fun and return Fun's
%% result. The async is always aborted afterwards, whether Fun returns or
%% raises.
with(AsyncBody, Opts, Fun) ->
    Async = start(AsyncBody, Opts),
    try Fun(Async) of
        Result ->
            Result
    after
        abort(Async)
    end.
97
%% Like with_many/4 with default options.
with_many(AsyncBody, Args, Fun) ->
    with_many(AsyncBody, Args, [], Fun).

%% Start one async per element of Args (see start_many/3), pass the list of
%% asyncs to Fun and return Fun's result. All asyncs are aborted afterwards,
%% whether Fun returns or raises.
with_many(AsyncBody, Args, Opts, Fun) ->
    Asyncs = start_many(AsyncBody, Args, Opts),
    try Fun(Asyncs) of
        Result ->
            Result
    after
        abort_many(Asyncs)
    end.
108
%% Wait for the result of a single async, with no flags.
wait(Pid) ->
    call(Pid, get_result, []).

%% Wait for the result of a single async.
%% Flags: 'interruptible' makes a trapped 'EXIT' message received while
%% waiting abort the wait with throw({interrupted, Exit}) (see recv_resp/2).
wait(Pid, Flags) ->
    Request = get_result,
    call(Pid, Request, Flags).
114
%% Wait for the results of all asyncs in Pids, with no flags.
wait_many(Pids) ->
    call_many(Pids, get_result, []).

%% Wait for the results of all asyncs in Pids.
%% Returns a list of {Pid, Result} pairs in the order of Pids.
wait_many(Pids, Flags) ->
    Request = get_result,
    call_many(Pids, Request, Flags).
120
%% Wait for the first result among the asyncs in Pids, with no flags.
wait_any(Pids) ->
    call_any(Pids, get_result, []).

%% Wait for the first result among the asyncs in Pids.
%% Returns {Pid, Result} for the async that answered first; all asyncs in
%% Pids are aborted before returning (see call_any/3).
wait_any(Pids, Flags) ->
    Request = get_result,
    call_any(Pids, Request, Flags).
126
%% Run Fun1 and Fun2 concurrently and return the result of whichever
%% finishes first, tagged as {left, R} (Fun1) or {right, R} (Fun2).
%% Both asyncs are gone by the time this returns: wait_any/1 aborts them
%% and the enclosing with/2 calls abort them again on the way out.
race(Fun1, Fun2) ->
    Pick = fun (Left, Right) ->
                   case wait_any([Left, Right]) of
                       {Left, Result} ->
                           {left, Result};
                       {Right, Result} ->
                           {right, Result}
                   end
           end,

    with(Fun1,
         fun (A1) ->
                 with(Fun2,
                      fun (A2) ->
                              Pick(A1, A2)
                      end)
         end).
142
%% Parallel map: evaluate Fun on every element of List in its own async and
%% return the results in the order of List.
map(Fun, List) ->
    Collect = fun (Asyncs) ->
                      [Result || {_Async, Result} <- wait_many(Asyncs)]
              end,
    with_many(Fun, List, Collect).
150
%% Parallel foreach: evaluate Fun on every element of List in its own
%% async, wait for all of them, discard the results and return ok.
foreach(Fun, List) ->
    WaitAll = fun (Asyncs) ->
                      _ = wait_many(Asyncs),
                      ok
              end,
    with_many(Fun, List, WaitAll).
158
%% Run Fun in an async, racing it against a timer async that sleeps for
%% Timeout and then returns 'timeout'.
%% Returns {ok, Result} if Fun finishes first, {error, timeout} otherwise.
run_with_timeout(Fun, Timeout) ->
    Timer = fun () ->
                    receive
                    after Timeout ->
                            timeout
                    end
            end,

    case race(Fun, Timer) of
        {left, Result} ->
            {ok, Result};
        {right, timeout} ->
            {error, timeout}
    end.
166
%% Tell the calling process whether it is an async executor.
%% Returns {ok, ControllerPid} when the caller's role is 'executor',
%% not_async for any other role (including no role at all).
get_identity() ->
    case get_role() =:= executor of
        true ->
            Controller = get_controller(),
            %% an executor always has its controller recorded
            true = is_pid(Controller),
            {ok, Controller};
        false ->
            not_async
    end.
177
178%% internal
%% Body of the controller process spawned by start/2.
%%
%% In order:
%%  1. monitors the parent;
%%  2. marks itself as a controller and, if the parent was an executor,
%%     registers with the parent's controller;
%%  3. registers with every controller listed in the 'adopters' option;
%%  4. starts trapping exits and spawns the linked executor that runs Fun
%%     and reports {ok, R} or {raised, {Class, Error, Stack}} back;
%%  5. arms the optional 'abort_after' timer;
%%  6. enters async_loop_wait_result/4.
async_init(Parent, ParentController, Opts, Fun) ->
    erlang:monitor(process, Parent),

    set_role(controller),
    maybe_register_with_parent_async(ParentController),

    lists:foreach(fun (Adopter) ->
                          register_for_adoption(Adopter)
                  end,
                  proplists:get_value(adopters, Opts, [])),

    process_flag(trap_exit, true),

    ReplyTag = make_ref(),
    Self     = self(),
    ReplyTo  = {Self, ReplyTag},

    Executor =
        spawn_link(
          fun () ->
                  set_role(executor),
                  set_controller(Self),

                  try Fun() of
                      Value ->
                          reply(ReplyTo, {ok, Value})
                  catch
                      Class:Error ->
                          %% get_stacktrace/0 must be the first call in the
                          %% handler. NOTE(review): it is deprecated from
                          %% OTP 21 in favour of Class:Error:Stack patterns.
                          Stack = erlang:get_stacktrace(),
                          reply(ReplyTo, {raised, {Class, Error, Stack}}),
                          %% re-raise so the executor exits abnormally
                          erlang:raise(Class, Error, Stack)
                  end
          end),

    case proplists:get_value(abort_after, Opts) of
        undefined ->
            ok;
        Millis when is_integer(Millis) ->
            erlang:send_after(Millis, Self, abort_after_expired)
    end,

    async_loop_wait_result(proplists:get_value(type, Opts, wait),
                           Executor, ReplyTag, []).
221
%% Register the calling controller with the parent's controller, if there
%% is one. 'undefined' means the parent was not an async executor, in which
%% case nothing is done.
maybe_register_with_parent_async(ParentController) ->
    case ParentController of
        undefined ->
            ok;
        _ ->
            {ok, _} = register_with_async(ParentController)
    end.
226
%% Register the calling controller as a child async of the controller Pid.
%% Returns {ok, Executor}, where Executor is the executor pid of Pid.
%% If Pid answers nack (it is already terminating or already has a result)
%% the caller exits with reason normal. Must be called from a controller.
register_with_async(Pid) ->
    controller = get_role(),

    Answer = call(Pid, {register_child_async, self()}),
    case Answer of
        nack ->
            ?log_debug("Received nack when trying to register with ~p", [Pid]),
            exit(normal);
        {ok, _} ->
            Answer
    end.
236
%% Controller main loop while the executor (Child) is still running.
%%
%% Type        - perform | wait
%% Child       - executor pid
%% Reply       - reference tagging the executor's result message
%% ChildAsyncs - controllers of asyncs registered with this controller
%%
%% get_result requests are deliberately not matched here; they stay in the
%% mailbox until async_loop_with_result/1 takes over.
async_loop_wait_result(Type, Child, Reply, ChildAsyncs) ->
    receive
        %% any monitored process going down takes the whole async with it
        {'DOWN', _, process, _, DownReason} = DownMsg ->
            maybe_log_down_message(DownMsg),
            terminate_now(Child, ChildAsyncs, DownReason);

        %% the executor died without delivering a result
        {'EXIT', Child, ChildReason} ->
            terminate_on_query(Type, undefined, ChildAsyncs,
                               {child_died, ChildReason});

        %% The sender is intentionally not checked: a termination request
        %% may come from the parent async's controller, which is not the
        %% actual parent of this process.
        {'EXIT', _, ExitReason} ->
            terminate_now(Child, ChildAsyncs, ExitReason);

        {'$async_req', From, {register_child_async, NewAsync}} ->
            reply(From, {ok, Child}),
            async_loop_wait_result(Type, Child, Reply,
                                   [NewAsync | ChildAsyncs]);

        {Reply, Result} ->
            async_loop_handle_result(Type, Child, ChildAsyncs, Result);

        %% message sent through send/2: forward the payload to the executor
        {'$async_msg', Payload} ->
            Child ! Payload,
            async_loop_wait_result(Type, Child, Reply, ChildAsyncs);

        abort_after_expired ->
            terminate_on_query(Type, Child, ChildAsyncs, timeout)
    end.
261
%% Terminate the executor with Reason through misc:unlink_terminate/2, or
%% do nothing when there is no executor left ('undefined').
maybe_terminate_child(Child, Reason)
  when is_pid(Child) ->
    misc:unlink_terminate(Child, Reason);
maybe_terminate_child(undefined, _Reason) ->
    ok.
267
%% Terminate only the registered child asyncs (no executor).
terminate_children(ChildAsyncs, Reason) ->
    terminate_children(undefined, ChildAsyncs, Reason).

%% Terminate the executor (if any) and all child asyncs with Reason, then
%% wait until every one of them is down.
%% All monitors are set up before any exit signal is sent.
terminate_children(Child, ChildAsyncs, Reason) ->
    Targets = lists:filter(fun (Pid) -> Pid =/= undefined end,
                           [Child | ChildAsyncs]),
    MRefs = lists:map(fun (Pid) -> erlang:monitor(process, Pid) end,
                      Targets),

    maybe_terminate_child(Child, Reason),
    lists:foreach(fun (Pid) ->
                          misc:terminate(Pid, Reason)
                  end, ChildAsyncs),

    terminate_children_loop(MRefs).
277
%% Wait for a 'DOWN' message for every monitor reference in the list, in
%% list order, answering nack to any registration request that arrives in
%% the meantime. Returns ok once the list is exhausted.
terminate_children_loop([]) ->
    ok;
terminate_children_loop([Current | Remaining] = Pending) ->
    receive
        {'DOWN', Current, process, _, _} ->
            terminate_children_loop(Remaining);
        {'$async_req', From, {register_child_async, _}} ->
            %% Registration requests must still be answered while we wait.
            %% On termination we signal the executor and wait for it to
            %% die, but the executor may have just spawned a new async
            %% that is now blocked trying to register with us. If the
            %% executor also traps exits and is waiting on that new async
            %% without expecting EXITs, not answering would deadlock.
            reply(From, nack),
            terminate_children_loop(Pending)
    end.
295
%% Tear down the executor and all child asyncs, then exit the controller
%% with the same Reason. Never returns.
terminate_now(Child, ChildAsyncs, Reason) ->
    _ = terminate_children(Child, ChildAsyncs, Reason),
    erlang:exit(Reason).
299
%% Tear down the executor and child asyncs after a failure (executor death
%% or abort_after timeout).
%%   perform - the controller exits with Reason right away;
%%   wait    - the controller stays alive and exits with Reason only when
%%             somebody asks for the result (see async_loop_with_result/1).
terminate_on_query(Type, Child, ChildAsyncs, Reason)
  when Type =:= perform; Type =:= wait ->
    terminate_children(Child, ChildAsyncs, Reason),
    case Type of
        perform ->
            exit(Reason);
        wait ->
            async_loop_with_result({die, Reason})
    end.
305
%% Handle the result message delivered by the executor.
%%
%% The executor is unlinked and any 'EXIT' from it is flushed, child asyncs
%% are shut down, and then:
%%   perform - the controller exits normally on success, or re-raises the
%%             executor's exception;
%%   wait    - the outcome is kept until requested through get_result.
async_loop_handle_result(Type, Child, ChildAsyncs, Result) ->
    unlink(Child),
    ?flush({'EXIT', Child, _}),

    terminate_children(ChildAsyncs, shutdown),

    case {Type, Result} of
        {perform, {raised, {T, E, Stack}}} ->
            erlang:raise(T, E, Stack);
        {perform, {ok, _}} ->
            exit(normal);
        {wait, {ok, Success}} ->
            async_loop_with_result({reply, Success});
        {wait, {raised, _} = Raised} ->
            async_loop_with_result({die, Raised})
    end.
328
%% Controller loop once the outcome is known. Outcome is {reply, Value} or
%% {die, Reason}; it is handed out on the first get_result request, after
%% which the controller exits. Never returns.
-spec async_loop_with_result({die, any()} | {reply, any()}) -> no_return().
async_loop_with_result(Outcome) ->
    receive
        {'DOWN', _, process, _, DownReason} = DownMsg ->
            maybe_log_down_message(DownMsg),
            exit(DownReason);
        {'EXIT', _, ExitReason} ->
            exit(ExitReason);
        {'$async_req', Requester, get_result} ->
            handle_get_result(Requester, Outcome);
        {'$async_req', Requester, {register_child_async, _}} ->
            %% Registration is not expected at this stage, yet a correct
            %% async can still produce it. Without an answer the requester
            %% would block until we die, so send nack to kill it quickly.
            reply(Requester, nack),
            async_loop_with_result(Outcome);
        {'$async_req', _, _} = Unexpected ->
            exit({unexpected_request, Unexpected});
        _Other ->
            %% drop anything else so the mailbox does not grow
            async_loop_with_result(Outcome)
    end.
352
%% Answer a get_result request and terminate the controller.
%%   {die, Reason}   - exit with Reason; the requester learns about it from
%%                     its monitor (see recv_resp/2);
%%   {reply, Result} - send Result to the requester, then exit normally.
handle_get_result(_From, {die, Reason}) ->
    exit(Reason);
handle_get_result(From, {reply, Result}) ->
    reply(From, Result),
    exit(normal).
358
%% Send Req to the controller Pid and wait for the answer, with no flags.
call(Pid, Req) ->
    call(Pid, Req, []).

%% Send Req to the controller Pid and return its answer.
%% Implemented as call_many/3 on a single-element list.
call(Pid, Req, Flags) ->
    Responses = call_many([Pid], Req, Flags),
    [{Pid, Response}] = Responses,
    Response.
365
%% Send Req to every controller in Pids and collect all answers.
%% Returns [{Pid, Response}] in the order of Pids. The monitors set up for
%% the call are always removed, even when receiving fails.
call_many(Pids, Req, Flags) ->
    Monitored = monitor_asyncs(Pids),
    try
        send_req_many(Monitored, Req),
        recv_many(Monitored, Flags)
    after
        demonitor_asyncs(Monitored)
    end.
374
%% Send Req to every controller in Pids and return {Pid, Response} for the
%% first one that answers.
%%
%% Cleanup, which runs on success and on failure alike:
%% monitors are removed, all asyncs are aborted, and any replies that still
%% arrived from the others are flushed from the mailbox.
call_any(Pids, Req, Flags) ->
    Monitored = monitor_asyncs(Pids),
    try
        send_req_many(Monitored, Req),
        recv_any(Monitored, Flags)
    after
        %% the match asserts that we get back the very pids we were given
        Pids = demonitor_asyncs(Monitored),
        abort_many(Pids),
        drop_extra_resps(Monitored)
    end.
385
%% Flush replies tagged with any of the given monitor references from the
%% caller's mailbox (via the ?flush macro, defined outside this file).
drop_extra_resps(PidMRefs) ->
    FlushOne = fun ({_Pid, MRef}) ->
                       ?flush({MRef, _})
               end,
    lists:foreach(FlushOne, PidMRefs).
391
%% Deliver Reply to the requester identified by {Pid, Tag}. The message is
%% sent as {Tag, Reply}, which is also the return value.
reply({Pid, Tag}, Reply) ->
    erlang:send(Pid, {Tag, Reply}).
394
%% Monitor every pid in Pids. Returns [{Pid, MonitorRef}] in the same
%% order.
monitor_asyncs(Pids) ->
    Watch = fun (Pid) ->
                    {Pid, erlang:monitor(process, Pid)}
            end,
    [Watch(Pid) || Pid <- Pids].
397
%% Remove the monitors created by monitor_asyncs/1, flushing any 'DOWN'
%% message already delivered for them. Returns the list of pids in order.
demonitor_asyncs(PidMRefs) ->
    Release = fun ({Pid, MRef}) ->
                      erlang:demonitor(MRef, [flush]),
                      Pid
              end,
    lists:map(Release, PidMRefs).
404
%% Send request Req to the controller Pid. The reply address is
%% {self(), MRef}, so the answer comes back tagged with the monitor
%% reference. Returns the message that was sent.
send_req(Pid, MRef, Req) ->
    ReplyTo = {self(), MRef},
    erlang:send(Pid, {'$async_req', ReplyTo, Req}).
407
%% Send the same request Req to every {Pid, MRef} pair. Returns ok.
send_req_many(PidMRefs, Req) ->
    SendOne = fun ({Pid, MRef}) ->
                      send_req(Pid, MRef, Req)
              end,
    lists:foreach(SendOne, PidMRefs).
413
%% Wait for the answer to a request tagged with MRef.
%%   - a reply {MRef, Response} returns Response;
%%   - a 'DOWN' for MRef re-raises or exits with the async's exit reason
%%     (see recv_resp_handle_down/1);
%%   - when Interruptible is true, any 'EXIT' message in the mailbox aborts
%%     the wait with throw({interrupted, Exit}).
recv_resp(MRef, Interruptible) ->
    receive
        {MRef, Response} ->
            Response;
        {'DOWN', MRef, _, _, DownReason} ->
            recv_resp_handle_down(DownReason);
        {'EXIT', _, _} = ExitMsg when Interruptible ->
            throw({interrupted, ExitMsg})
    end.
423
%% Propagate the termination of an async to the waiting process.
%% If the async died with {raised, {Class, Error, Stack}} the original
%% exception is re-raised with its stacktrace; any other reason becomes an
%% exit with that reason. Never returns.
recv_resp_handle_down(Reason) ->
    case Reason of
        {raised, {T, E, Stack}} ->
            erlang:raise(T, E, Stack);
        _ ->
            exit(Reason)
    end.
428
%% Collect the answers of all asyncs, one after another in list order.
%% Returns [{Pid, Response}].
recv_many(PidMRefs, Flags) ->
    Interruptible = proplists:get_bool(interruptible, Flags),
    Receive = fun (MRef) ->
                      recv_resp(MRef, Interruptible)
              end,
    [{Pid, Receive(MRef)} || {Pid, MRef} <- PidMRefs].
432
%% Wait for the first answer from any of the given asyncs.
%% Starts recv_any_loop/3 with an empty list of set-aside messages.
recv_any(PidMRefs, Flags) ->
    recv_any_loop(PidMRefs,
                  proplists:get_bool(interruptible, Flags),
                  []).
436
%% Receive loop behind wait_any.
%%
%% Monitor references are not known at compile time, so the loop matches
%% every reference-tagged reply and every 'DOWN' message and checks the
%% reference against PidMRefs. Messages that belong to somebody else are
%% set aside in SetAside (newest first) and re-sent to self once a
%% matching message has been found.
recv_any_loop(PidMRefs, Interruptible, SetAside) ->
    receive
        {Ref, Response} = Reply when is_reference(Ref) ->
            case lists:keyfind(Ref, 2, PidMRefs) of
                false ->
                    recv_any_loop(PidMRefs, Interruptible,
                                  [Reply | SetAside]);
                {Pid, Ref} ->
                    recv_any_loop_resend_pending(SetAside),
                    {Pid, Response}
            end;
        {'DOWN', Ref, _, _, DownReason} = Down ->
            case lists:keymember(Ref, 2, PidMRefs) of
                false ->
                    recv_any_loop(PidMRefs, Interruptible,
                                  [Down | SetAside]);
                true ->
                    recv_any_loop_resend_pending(SetAside),
                    recv_resp_handle_down(DownReason)
            end;
        {'EXIT', _, _} = ExitMsg when Interruptible ->
            throw({interrupted, ExitMsg})
    end.
462
%% Put the messages set aside by recv_any_loop/3 back into the caller's
%% mailbox. The list is newest-first, so it is reversed to re-send the
%% messages in their original arrival order. Returns ok.
recv_any_loop_resend_pending(PendingMsgs) ->
    OldestFirst = lists:reverse(PendingMsgs),
    Resend = fun (Msg) ->
                     self() ! Msg
             end,
    lists:foreach(Resend, OldestFirst).
468
%% The async role and controller of a process are kept in its process
%% dictionary under the keys '$async_role' and '$async_controller'.

%% Record the role of the calling process. Returns the previous value
%% ('undefined' if none was set).
set_role(Role) ->
    put('$async_role', Role).

%% Role of the calling process, or 'undefined' if it has none.
get_role() ->
    get('$async_role').

%% Record the controller of the calling process. Only an executor may have
%% a controller; any other role crashes with badmatch.
set_controller(Pid) when is_pid(Pid) ->
    executor = get_role(),
    put('$async_controller', Pid).

%% Controller of the calling process, or 'undefined' if it has none.
get_controller() ->
    get('$async_controller').
481
%% Register the calling controller as a child of an additional ("adopting")
%% controller and monitor that controller's executor, so that this async
%% goes down when the adopting executor does (any 'DOWN' terminates the
%% controller loops). Returns the monitor reference.
register_for_adoption(Controller) ->
    {ok, AdopterExecutor} = register_with_async(Controller),
    erlang:monitor(process, AdopterExecutor).
485
%% Log a warning for a 'DOWN' message unless misc:is_normal_termination/1
%% classifies its reason as normal. Returns ok when nothing is logged.
maybe_log_down_message({'DOWN', _MRef, process, Pid, Reason}) ->
    Abnormal = not misc:is_normal_termination(Reason),
    case Abnormal of
        true ->
            ?log_warning("Monitored process ~p "
                         "terminated abnormally (reason = ~p)", [Pid, Reason]);
        false ->
            ok
    end.
494
495-ifdef(EUNIT).
%% Checks the abort_after option in the situations below.
abort_after_test() ->
    LongSleep = ?cut(timer:sleep(10000)),

    %% waiting starts before the timer fires
    Early = async:start(LongSleep, [{abort_after, 100}]),
    ?assertExit(timeout, async:wait(Early)),

    %% waiting starts after the timer has already fired
    Late = async:start(LongSleep, [{abort_after, 100}]),
    timer:sleep(200),
    ?assertExit(timeout, async:wait(Late)),

    %% the body finishes before the timer fires
    ok = async:with(?cut(timer:sleep(100)),
                    [{abort_after, 200}],
                    fun (Async) -> async:wait(Async) end),

    %% a 'perform' async dies with reason timeout without being queried
    {Performer, MRef} = async:perform(?cut(timer:sleep(1000)),
                                      [monitor, {abort_after, 100}]),
    ?must_flush({'DOWN', MRef, process, Performer, timeout}).
511
%% Test that we can abort an async (A) whose body traps exits and spawns
%% another async (B) that tries to register with A after A has received a
%% termination request.
async_trap_exit_test() ->
    TestProc = self(),

    Body =
        fun () ->
                process_flag(trap_exit, true),
                TestProc ! {child, self()},
                %% hold here until the aborter has been started
                receive
                    go -> ok
                end,

                B = async:start(fun () -> ok end),
                async:wait(B)
        end,
    A = async:start(Body),

    Executor = receive
                   {child, Pid} ->
                       Pid
               end,

    Aborter = spawn(fun () -> async:abort(A) end),
    Executor ! go,
    ok = misc:wait_for_process(Aborter, infinity).
539-endif.
540