1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2017-2019 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15
16-module(async).
17
18-include("cut.hrl").
19-include("ns_common.hrl").
20
21-ifdef(TEST).
22-include_lib("eunit/include/eunit.hrl").
23-endif.
24
25-export([start/1, start/2,
26         start_many/2, start_many/3,
27         abort/1, abort/2,
28         abort_many/1, abort_many/2,
29         send/2,
30         with/2, with/3,
31         with_many/3, with_many/4,
32         wait/1, wait/2,
33         wait_many/1, wait_many/2,
34         wait_any/1, wait_any/2,
35         race/2, map/2, foreach/2,
36         run_with_timeout/2,
37         get_identity/0]).
38
%% Starts an async that evaluates Fun, using default options.
start(Fun) ->
    start(Fun, []).

%% Starts an async that evaluates Fun.
%%
%% Opts is a proplist. The only option read here is `monitor': when it is
%% true the controller process is created with misc:spawn_monitor/1,
%% otherwise with proc_lib:spawn/1. The complete Opts list is also passed on
%% to async_init/4. The return value is whatever the chosen spawn function
%% returns.
start(Fun, Opts) ->
    %% Both values describe the calling process, so they have to be captured
    %% here and not inside the spawned fun.
    Caller           = self(),
    CallerController = get_controller(),

    Body = fun () ->
                   async_init(Caller, CallerController, Opts, Fun)
           end,

    case proplists:get_value(monitor, Opts, false) of
        true ->
            misc:spawn_monitor(Body);
        false ->
            proc_lib:spawn(Body)
    end.
58
%% Starts one async per element of Args, using default options.
start_many(Fun, Args) ->
    start_many(Fun, Args, []).

%% Starts one async per element of Args; each of them evaluates Fun(Arg) and
%% is started with the same Opts. The results of start/2 are returned in the
%% order of Args.
start_many(Fun, Args, Opts) ->
    StartOne = fun (Arg) ->
                       start(fun () -> Fun(Arg) end, Opts)
               end,
    [StartOne(Arg) || Arg <- Args].
66
%% Aborts a single async with the default reason used by abort_many/1.
abort(Async) ->
    abort_many([Async]).

%% Aborts a single async with an explicit Reason.
abort(Async, Reason) ->
    abort_many([Async], Reason).
72
%% Aborts all the given asyncs with reason `shutdown'.
abort_many(Asyncs) ->
    abort_many(Asyncs, shutdown).

%% Aborts all the given asyncs with Reason by delegating to
%% misc:terminate_and_wait/2.
abort_many(Asyncs, Reason) ->
    misc:terminate_and_wait(Asyncs, Reason).
78
%% Sends Msg to Async wrapped as {'$async_msg', Msg} and returns Msg.
%% While the controller is still waiting for a result it forwards the
%% unwrapped message to its executor process.
send(Async, Msg) ->
    erlang:send(Async, {'$async_msg', Msg}),
    Msg.
82
%% Same as with/3 with no options.
with(AsyncBody, Fun) ->
    with(AsyncBody, [], Fun).

%% Starts an async running AsyncBody with Opts, applies Fun to the value
%% returned by start/2 and returns Fun's result. The async is aborted
%% afterwards no matter whether Fun returned or raised.
with(AsyncBody, Opts, Fun) ->
    Handle = start(AsyncBody, Opts),
    try Fun(Handle)
    after
        abort(Handle)
    end.
93
%% Same as with_many/4 with no options.
with_many(AsyncBody, Args, Fun) ->
    with_many(AsyncBody, Args, [], Fun).

%% Starts one async per element of Args (see start_many/3), applies Fun to
%% the list of started asyncs and returns Fun's result. All the asyncs are
%% aborted afterwards no matter whether Fun returned or raised.
with_many(AsyncBody, Args, Opts, Fun) ->
    Handles = start_many(AsyncBody, Args, Opts),
    try Fun(Handles)
    after
        abort_many(Handles)
    end.
104
%% Waits for the result of an async with no flags.
wait(Async) ->
    wait(Async, []).

%% Sends a get_result request to the async and returns its reply.
%% Flags is a proplist; `interruptible' is the flag read by the receive
%% helpers.
wait(Async, Flags) ->
    call(Async, get_result, Flags).
110
%% Waits for the results of all the given asyncs with no flags.
wait_many(Asyncs) ->
    wait_many(Asyncs, []).

%% Requests the result from every async and returns a list of
%% {Async, Result} pairs in the order of Asyncs.
wait_many(Asyncs, Flags) ->
    call_many(Asyncs, get_result, Flags).
116
%% Waits for the first result among the given asyncs with no flags.
wait_any(Asyncs) ->
    wait_any(Asyncs, []).

%% Requests the result from every async and returns {Async, Result} for the
%% first one that answers. See call_any/3 for what happens to the others.
wait_any(Asyncs, Flags) ->
    call_any(Asyncs, get_result, Flags).
122
%% Runs Fun1 and Fun2 as two asyncs and waits for whichever produces a
%% result first. Returns {left, Result} when it was Fun1 and {right, Result}
%% when it was Fun2. Both asyncs are aborted on the way out by with/2.
race(Fun1, Fun2) ->
    Decide =
        fun (Left, Right) ->
                case wait_any([Left, Right]) of
                    {Left, Result} ->
                        {left, Result};
                    {Right, Result} ->
                        {right, Result}
                end
        end,

    with(Fun1,
         fun (Left) ->
                 with(Fun2,
                      fun (Right) ->
                              Decide(Left, Right)
                      end)
         end).
138
%% Evaluates Fun on every element of List, one async per element, and
%% returns the results in the order of List.
map(Fun, List) ->
    Collect = fun (Asyncs) ->
                      [Value || {_Async, Value} <- wait_many(Asyncs)]
              end,
    with_many(Fun, List, Collect).
146
%% Evaluates Fun on every element of List, one async per element, waits for
%% all of them to finish and returns ok; the individual results are dropped.
foreach(Fun, List) ->
    WaitAll = fun (Asyncs) ->
                      _ = wait_many(Asyncs),
                      ok
              end,
    with_many(Fun, List, WaitAll).
154
%% Runs Fun in an async started with {abort_after, Timeout}.
%% Returns {ok, Result} when the async produced Result, and {error, timeout}
%% when waiting for it ended in an exit with reason `timeout'. Any other
%% exception is propagated to the caller.
run_with_timeout(Fun, Timeout) ->
    WaitOk = fun (Async) ->
                     {ok, wait(Async)}
             end,

    try with(Fun, [{abort_after, Timeout}], WaitOk) of
        Result ->
            Result
    catch
        exit:timeout ->
            {error, timeout}
    end.
162
%% Tells whether the calling process is the executor of an async.
%% Returns {ok, Controller} with the pid of the controlling process when the
%% caller's role is `executor', and not_async otherwise.
get_identity() ->
    case get_role() =:= executor of
        true ->
            Pid = get_controller(),
            %% An executor always has its controller recorded.
            true = is_pid(Pid),
            {ok, Pid};
        false ->
            not_async
    end.
173
174%% internal
%% Body of the controller process of an async.
%%
%% Parent           - pid of the process that called start/2; it is
%%                    monitored here.
%% ParentController - controller of the async the parent runs in, or
%%                    `undefined'; when defined we register with it.
%% Opts             - proplist; `adopters' (list of controllers to register
%%                    with) and `abort_after' (milliseconds or `infinity')
%%                    are read here.
%% Fun              - zero-arity fun evaluated by the executor process.
%%
%% Spawns a linked executor process that evaluates Fun and reports either
%% {ok, Result} or {raised, {Class, Reason, Stack}} back to this process,
%% then enters async_loop_wait_result/3. Does not return normally.
async_init(Parent, ParentController, Opts, Fun) ->
    erlang:monitor(process, Parent),

    set_role(controller),
    maybe_register_with_parent_async(ParentController),

    Adopters = proplists:get_value(adopters, Opts, []),
    lists:foreach(register_for_adoption(_), Adopters),

    %% Exit signals must arrive as messages so that the receive loops can
    %% clean up the executor and registered child asyncs before dying.
    process_flag(trap_exit, true),

    Reply      = make_ref(),
    Controller = self(),

    Child =
        spawn_link(
          fun () ->
                  set_role(executor),
                  set_controller(Controller),

                  To = {Controller, Reply},

                  try Fun() of
                      R ->
                          reply(To, {ok, R})
                  catch
                      %% The stacktrace is bound in the pattern:
                      %% erlang:get_stacktrace/0 is deprecated since OTP 21
                      %% and no longer exists in OTP 24 and later.
                      T:E:Stack ->
                          reply(To, {raised, {T, E, Stack}}),
                          erlang:raise(T, E, Stack)
                  end
          end),

    case proplists:get_value(abort_after, Opts) of
        undefined ->
            ok;
        infinity ->
            ok;
        AbortAfter when is_integer(AbortAfter) ->
            erlang:send_after(AbortAfter, self(), abort_after_expired)
    end,

    async_loop_wait_result(Child, Reply, []).
218
%% Registers the calling controller with the controller of the enclosing
%% async, if there is one. `undefined' means that the async was not started
%% from inside another async, in which case nothing is done.
maybe_register_with_parent_async(ParentController) ->
    case ParentController of
        undefined ->
            ok;
        _ ->
            {ok, _} = register_with_async(ParentController)
    end.
223
%% Registers the calling process, which must have the `controller' role, as
%% a child async of the controller Pid. Returns the {ok, _} reply of that
%% controller. When the controller answers `nack' the calling process exits
%% with reason `normal'.
register_with_async(Pid) ->
    controller = get_role(),

    Request = {register_child_async, self()},
    case call(Pid, Request) of
        nack ->
            ?log_debug("Received nack when trying to register with ~p", [Pid]),
            exit(normal);
        {ok, _} = Accepted ->
            Accepted
    end.
233
%% Main loop of a controller whose executor has not produced a result yet.
%%
%% Executor   - pid of the linked process evaluating the async body.
%% ReplyTag   - reference tagging the result message sent by the executor.
%% Registered - controllers of child asyncs that registered with us.
async_loop_wait_result(Executor, ReplyTag, Registered) ->
    receive
        {'DOWN', _MRef, process, DeadPid, DownReason} = DownMsg ->
            maybe_log_down_message(DownMsg),

            %% The reason is wrapped in {shutdown, _} to keep crash reports
            %% out of the logs: the process that died has already produced
            %% one, and our own process is not the cause of the problem.
            terminate_now(Executor, Registered,
                          {shutdown,
                           {monitored_process_died, DeadPid, DownReason}});
        {'EXIT', Executor, ExecutorReason} ->
            %% The executor is already gone, so there is nothing to kill.
            terminate_on_query(undefined, Registered,
                               {child_died, ExecutorReason});
        %% Not assumed to come from the parent: we may also be terminated
        %% by, for example, a parent async, which is not the actual parent
        %% of our process.
        {'EXIT', _Sender, ExitReason} ->
            terminate_now(Executor, Registered, ExitReason);
        {'$async_req', Requester, {register_child_async, NewChild}} ->
            reply(Requester, {ok, Executor}),
            async_loop_wait_result(Executor, ReplyTag,
                                   [NewChild | Registered]);
        {ReplyTag, Outcome} ->
            async_loop_handle_result(Executor, Registered, Outcome);
        {'$async_msg', Payload} ->
            %% Messages sent with send/2 are relayed to the executor.
            Executor ! Payload,
            async_loop_wait_result(Executor, ReplyTag, Registered);
        abort_after_expired ->
            terminate_on_query(Executor, Registered, timeout)
    end.
264
%% Terminates the executor process with reason `shutdown' through
%% misc:unlink_terminate/2. `undefined' stands for an executor that is
%% already gone and is a no-op.
maybe_terminate_child(Executor)
  when is_pid(Executor) ->
    misc:unlink_terminate(Executor, shutdown);
maybe_terminate_child(undefined) ->
    ok.
270
%% Terminates the executor (Child, possibly `undefined') and all registered
%% child asyncs with reason `shutdown' and waits until every one of them is
%% down.
terminate_children(Child, ChildAsyncs) ->
    %% Monitors are set up before any exit signal is sent so that a 'DOWN'
    %% message is guaranteed for each process we wait for.
    Targets = [Pid || Pid <- [Child | ChildAsyncs], Pid =/= undefined],
    MRefs   = [erlang:monitor(process, Pid) || Pid <- Targets],

    maybe_terminate_child(Child),
    lists:foreach(
      fun (Async) ->
              misc:terminate(Async, shutdown)
      end, ChildAsyncs),

    terminate_children_loop(MRefs).
277
%% Waits for the 'DOWN' message of every monitor reference in the list, in
%% list order, answering registration requests with `nack' meanwhile.
terminate_children_loop([]) ->
    ok;
terminate_children_loop([Current | Remaining] = Outstanding) ->
    receive
        {'DOWN', Current, process, _Pid, _Reason} ->
            terminate_children_loop(Remaining);
        {'$async_req', Requester, {register_child_async, _Pid}} ->
            %% Registration requests still have to be answered here. Once an
            %% async gets a termination request, it sends an exit signal to
            %% its executor and waits for it to terminate. The executor may
            %% have just spawned a new async, which will try to register
            %% with us and block. If the executor also traps exits, waits on
            %% that new async and does not expect EXITs, not answering
            %% would deadlock.
            reply(Requester, nack),
            terminate_children_loop(Outstanding)
    end.
295
%% Terminates the executor and all registered child asyncs, then exits the
%% calling (controller) process with Reason right away.
terminate_now(Executor, Registered, Reason) ->
    terminate_children(Executor, Registered),
    exit(Reason).
299
%% Terminates the executor and all registered child asyncs but keeps the
%% controller alive holding {die, Reason}; the controller dies only once
%% somebody asks for the result (see async_loop_with_result/1).
terminate_on_query(Executor, Registered, Reason) ->
    terminate_children(Executor, Registered),
    async_loop_with_result({die, Reason}).
303
%% Handles the outcome reported by the executor: cleans up the executor and
%% the child asyncs, then keeps the outcome until it is asked for. {ok, V}
%% is kept as {reply, V}; {raised, _} is kept as {die, {raised, _}}.
async_loop_handle_result(Child, ChildAsyncs, Result) ->
    terminate_children(Child, ChildAsyncs),

    Outcome =
        case Result of
            {ok, Value} ->
                {reply, Value};
            {raised, _} ->
                {die, Result}
        end,
    async_loop_with_result(Outcome).
313
%% Loop of a controller that already knows its outcome and waits for
%% somebody to request it. Never returns: every path ends in exit/1.
-spec async_loop_with_result({die, any()} | {reply, any()}) -> no_return().
async_loop_with_result(Outcome) ->
    receive
        {'DOWN', _MRef, process, DeadPid, DownReason} = DownMsg ->
            maybe_log_down_message(DownMsg),

            %% Wrapped in {shutdown, _} to avoid producing a crash report.
            exit({shutdown, {monitored_process_died, DeadPid, DownReason}});
        {'EXIT', _Sender, ExitReason} ->
            exit(ExitReason);
        {'$async_req', Requester, get_result} ->
            handle_get_result(Requester, Outcome);
        {'$async_req', Requester, {register_child_async, _Pid}} ->
            %% Register requests are not expected at this stage, yet a
            %% correct async can still behave this way. Without a reply the
            %% requester would needlessly wait until we die, so it gets a
            %% nack that kills it quickly.
            reply(Requester, nack),
            async_loop_with_result(Outcome);
        {'$async_req', _, _} = Unexpected ->
            exit({unexpected_request, Unexpected});
        _Ignored ->
            %% Anything else is dropped.
            async_loop_with_result(Outcome)
    end.
339
%% Serves a get_result request and terminates the controller.
%% {die, Reason}  - no reply is sent; the controller exits with
%%                  {shutdown, {async_died, Reason}}. The {shutdown, _}
%%                  wrapper avoids an unneeded crash report.
%% {reply, Value} - Value is sent to the requester and the controller exits
%%                  with reason `normal'.
handle_get_result(_Requester, {die, Reason}) ->
    exit({shutdown, {async_died, Reason}});
handle_get_result(Requester, {reply, Value}) ->
    reply(Requester, Value),
    exit(normal).
347
%% Sends Req to the async Pid and returns its response; no flags.
call(Pid, Req) ->
    call(Pid, Req, []).

%% Sends Req to the async Pid and returns its response. Implemented as a
%% call_many/3 on a one-element list.
call(Pid, Req, Flags) ->
    [{Pid, Response}] = call_many([Pid], Req, Flags),
    Response.
354
%% Sends Req to every async in Pids and collects all the responses as a
%% list of {Pid, Response} pairs. The asyncs are monitored for the duration
%% of the call; the monitors are removed on every exit path.
call_many(Pids, Req, Flags) ->
    Monitored = monitor_asyncs(Pids),
    try
        send_req_many(Monitored, Req),
        recv_many(Monitored, Flags)
    after
        demonitor_asyncs(Monitored)
    end.
363
%% Sends Req to every async in Pids and returns {Pid, Response} for the
%% first response received. On every exit path the monitors are removed,
%% all the asyncs are aborted and any responses still sitting in the
%% mailbox are flushed.
call_any(Pids, Req, Flags) ->
    Monitored = monitor_asyncs(Pids),
    try
        send_req_many(Monitored, Req),
        recv_any(Monitored, Flags)
    after
        Pids = demonitor_asyncs(Monitored),
        abort_many(Pids),
        drop_extra_resps(Monitored)
    end.
374
%% Flushes from the caller's mailbox the responses tagged with any of the
%% monitor references in PidMRefs.
drop_extra_resps(PidMRefs) ->
    FlushOne = fun ({_Pid, MRef}) ->
                       ?flush({MRef, _})
               end,
    lists:foreach(FlushOne, PidMRefs).
380
%% Sends Reply to the requester identified by {Pid, Tag} as {Tag, Reply}.
%% Returns the message that was sent.
reply({Pid, Tag}, Reply) ->
    erlang:send(Pid, {Tag, Reply}).
383
%% Sets up a monitor on each of Pids. Returns a list of {Pid, MRef} pairs in
%% the order of Pids.
monitor_asyncs(Pids) ->
    Watch = fun (Pid) ->
                    {Pid, erlang:monitor(process, Pid)}
            end,
    [Watch(Pid) || Pid <- Pids].
386
%% Removes the monitors created by monitor_asyncs/1, flushing any 'DOWN'
%% message they may already have delivered. Returns the list of pids.
demonitor_asyncs(PidMRefs) ->
    Release = fun ({Pid, MRef}) ->
                      erlang:demonitor(MRef, [flush]),
                      Pid
              end,
    lists:map(Release, PidMRefs).
393
%% Sends the request Req to the async Pid. The calling process and MRef
%% form the return address that the response will be tagged with.
send_req(Pid, MRef, Req) ->
    ReturnAddress = {self(), MRef},
    erlang:send(Pid, {'$async_req', ReturnAddress, Req}).
396
%% Sends the request Req to every async in PidMRefs, using the matching
%% monitor reference as the response tag.
send_req_many(PidMRefs, Req) ->
    SendOne = fun ({Pid, MRef}) ->
                      send_req(Pid, MRef, Req)
              end,
    lists:foreach(SendOne, PidMRefs).
402
%% Waits for the response tagged with MRef and returns it. If the monitored
%% async goes down first, its exit reason is turned into an exception by
%% recv_resp_handle_down/1. When Interruptible is true, an 'EXIT' message
%% received by the caller aborts the wait with throw({interrupted, Exit}).
recv_resp(MRef, Interruptible) ->
    receive
        {MRef, Response} ->
            Response;
        {'DOWN', MRef, _Type, _Object, DownReason} ->
            recv_resp_handle_down(DownReason);
        {'EXIT', _Pid, _Reason} = ExitMsg when Interruptible ->
            throw({interrupted, ExitMsg})
    end.
412
%% Converts the exit reason of an async into an exception in the caller.
%% The {shutdown, {async_died, _}} wrapper is stripped first. A
%% {raised, {Class, Term, Stack}} reason is re-raised as the original
%% exception; anything else becomes an exit with that reason.
recv_resp_handle_down(Reason) ->
    case Reason of
        {shutdown, {async_died, Inner}} ->
            recv_resp_handle_down(Inner);
        %% Normally reached only through the recursive call above; it is
        %% kept as a direct case as well, in case we wait on an async that
        %% runs older code.
        {raised, {Class, Term, Stack}} ->
            erlang:raise(Class, Term, Stack);
        _ ->
            exit(Reason)
    end.
422
%% Collects the response of every async in PidMRefs, in list order, as
%% {Pid, Response} pairs. The `interruptible' flag is read from Flags once
%% and applied to each wait.
recv_many(PidMRefs, Flags) ->
    CanInterrupt = proplists:get_bool(interruptible, Flags),
    [{Pid, recv_resp(MRef, CanInterrupt)} || {Pid, MRef} <- PidMRefs].
426
%% Waits for the first response from any async in PidMRefs; see
%% recv_any_loop/3. The `interruptible' flag is read from Flags.
recv_any(PidMRefs, Flags) ->
    CanInterrupt = proplists:get_bool(interruptible, Flags),
    recv_any_loop(PidMRefs, CanInterrupt, []).
430
%% Waits until one of the asyncs in PidMRefs either responds or goes down.
%%
%% PidMRefs      - list of {Pid, MRef} pairs being waited on.
%% Interruptible - when true, an 'EXIT' message aborts the wait with
%%                 throw({interrupted, Exit}).
%% PendingMsgs   - messages that matched the shape of a response or of a
%%                 'DOWN' message but belong to somebody else, most recent
%%                 first.
%%
%% Returns {Pid, Response} for the first response. If one of the asyncs
%% goes down first, its exit reason is turned into an exception by
%% recv_resp_handle_down/1. Foreign messages taken out of the mailbox along
%% the way are sent back to self() on every exit path, so that they are
%% not lost to the caller.
recv_any_loop(PidMRefs, Interruptible, PendingMsgs) ->
    receive
        {Ref, R} = Msg when is_reference(Ref) ->
            case lists:keyfind(Ref, 2, PidMRefs) of
                {Pid, Ref} ->
                    recv_any_loop_resend_pending(PendingMsgs),
                    {Pid, R};
                false ->
                    recv_any_loop(PidMRefs,
                                  Interruptible,
                                  [Msg | PendingMsgs])
            end;
        {'DOWN', Ref, _, _, Reason} = Msg ->
            case lists:keymember(Ref, 2, PidMRefs) of
                true ->
                    recv_any_loop_resend_pending(PendingMsgs),
                    recv_resp_handle_down(Reason);
                false ->
                    recv_any_loop(PidMRefs,
                                  Interruptible,
                                  [Msg | PendingMsgs])
            end;
        {'EXIT', _Pid, _Reason} = Exit when Interruptible ->
            %% The stashed messages must be put back here too; otherwise
            %% being interrupted would silently drop messages that were
            %% never ours to consume.
            recv_any_loop_resend_pending(PendingMsgs),
            throw({interrupted, Exit})
    end.
456
%% Sends the stashed messages back to the calling process. PendingMsgs is
%% most-recent-first, so it is reversed to restore the order in which the
%% messages were originally received.
recv_any_loop_resend_pending(PendingMsgs) ->
    _ = [self() ! Msg || Msg <- lists:reverse(PendingMsgs)],
    ok.
462
%% Records the async role of the calling process in its process dictionary.
%% Returns the previously stored value, as put/2 does.
set_role(Role) ->
    put('$async_role', Role).
465
%% Returns the async role recorded for the calling process, or `undefined'
%% when none has been set.
get_role() ->
    get('$async_role').
468
%% Records the controller pid in the process dictionary of the calling
%% process. Only a process with the `executor' role may do this.
set_controller(Controller) when is_pid(Controller) ->
    executor = get_role(),
    put('$async_controller', Controller).
472
%% Returns the controller recorded for the calling process, or `undefined'
%% when none has been set.
get_controller() ->
    get('$async_controller').
475
%% Registers the calling controller as a child async of Adopter and starts
%% monitoring the executor process that Adopter reports back. Returns the
%% monitor reference.
register_for_adoption(Adopter) ->
    {ok, AdopterExecutor} = register_with_async(Adopter),
    erlang:monitor(process, AdopterExecutor).
479
%% Logs a warning for a 'DOWN' message unless misc:is_normal_termination/1
%% classifies its reason as a normal termination.
maybe_log_down_message({'DOWN', _MRef, process, Pid, Reason}) ->
    case misc:is_normal_termination(Reason) of
        false ->
            ?log_warning("Monitored process ~p "
                         "terminated abnormally (reason = ~p)", [Pid, Reason]);
        true ->
            ok
    end.
488
489
490-ifdef(TEST).
%% race/2 returns the result of whichever side finishes first and leaves no
%% stray messages in the mailbox of the caller.
race_test() ->
    0 = ?flush(_),

    ReturnA = fun () -> a end,
    ReturnB = fun () -> b end,

    %% Either side may win when both finish immediately.
    {_, Winner} = race(ReturnA, ReturnB),
    ?assert(lists:member(Winner, [a, b])),

    %% Blocks forever: nobody else knows the freshly created reference.
    BlockForever = fun () ->
                           Never = make_ref(),
                           receive
                               Never ->
                                   ok
                           end
                   end,

    ?assertEqual({left, a}, race(ReturnA, BlockForever)),
    ?assertEqual({right, b}, race(BlockForever, ReturnB)),

    0 = ?flush(_).
509
%% The abort_after option makes waiters exit with `timeout' once the
%% deadline has passed, and has no effect on asyncs that finish in time.
abort_after_test() ->
    LongSleep  = fun () -> timer:sleep(10000) end,
    ShortSleep = fun () -> timer:sleep(100) end,
    WaitFor    = fun (Async) -> async:wait(Async) end,

    %% Waiting starts before the deadline expires.
    A1 = async:start(LongSleep, [{abort_after, 100}]),
    ?assertExit(timeout, async:wait(A1)),

    %% Waiting starts after the deadline has already expired.
    A2 = async:start(LongSleep, [{abort_after, 100}]),
    timer:sleep(200),
    ?assertExit(timeout, async:wait(A2)),

    ok = async:with(ShortSleep, [{abort_after, 200}], WaitFor),

    ok = async:with(ShortSleep, [{abort_after, infinity}], WaitFor).
523
%% run_with_timeout/2 wraps successful results in {ok, _} and reports an
%% expired deadline as {error, timeout}.
run_with_timeout_test() ->
    ReturnGood = fun () -> good end,
    TooSlow    = fun () -> timer:sleep(1000) end,

    {ok, good} = run_with_timeout(ReturnGood, 1000),
    {ok, good} = run_with_timeout(ReturnGood, infinity),
    {error, timeout} = run_with_timeout(TooSlow, 100).
528
%% An exception raised in the async body is re-raised in the waiting
%% process, including through a nested async.
exceptions_rethrown_test() ->
    WaitFor = fun (Async) -> wait(Async) end,

    ?assertThrow(test, with(fun () -> throw(test) end, WaitFor)),

    Nested = fun () ->
                     with(fun () -> throw(test2) end, WaitFor)
             end,
    ?assertThrow(test2, with(Nested, WaitFor)).
535
%% Checks that an async (A) can be aborted when its body traps exits and
%% spawns another async (B) that tries to register with A after A has
%% already received a termination request.
async_trap_exit_test() ->
    TestProc = self(),

    Body = fun () ->
                   process_flag(trap_exit, true),
                   TestProc ! {child, self()},

                   %% Hold until the test has started the aborter.
                   receive
                       go -> ok
                   end,

                   B = async:start(fun () ->
                                           ok
                                   end),
                   async:wait(B)
           end,
    A = async:start(Body),

    Executor = receive
                   {child, Pid} ->
                       Pid
               end,

    Aborter = spawn(fun() -> async:abort(A) end),
    Executor ! go,

    %% The abort must complete instead of deadlocking.
    ok = misc:wait_for_process(Aborter, infinity).
563-endif.
564