1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2017-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16-module(leader_lease_acquirer).
17
18-behaviour(gen_server).
19
20%% API
21-export([start_link/0]).
22
23%% gen_server callbacks
24-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
25         terminate/2, code_change/3]).
26
27-include("cut.hrl").
28-include("ns_common.hrl").
29
30-define(SERVER, ?MODULE).
31
%% A worker is the pid of a process started with
%% leader_lease_acquire_worker:spawn_link/2; one is kept per node.
-type worker() :: pid().
%% gen_server state of the acquirer.
%%
%%   uuid    - identifier generated once at startup via couch_uuids:random();
%%             passed to every worker and to
%%             leader_lease_agent:abolish_leases/3 on termination.
%%   nodes   - the most recent set of nodes this process was told about
%%             (initially ns_node_disco:nodes_actual(), then updated from
%%             ns_node_disco_events).
%%   workers - {Node, WorkerPid} pairs for the currently running workers.
%%   leader_activities_pid - pid returned by
%%             leader_activities:register_acquirer/1; it is monitored and
%%             this process exits when it goes down.
-record(state, { uuid    :: binary(),
                 nodes   :: sets:set(node()),
                 workers :: [{node(), worker()}],

                 leader_activities_pid :: pid()
               }).
39
40%% API
%% Starts the acquirer linked to the caller. The start itself is wrapped in a
%% fun and handed to leader_utils:ignore_if_new_orchestraction_disabled/1,
%% which decides whether to run it. When run, the new process begins in
%% init/1 via proc_lib.
start_link() ->
    Start = fun () ->
                    proc_lib:start_link(?MODULE, init, [[]])
            end,
    leader_utils:ignore_if_new_orchestraction_disabled(Start).
46
47%% gen_server callbacks
%% Entry point of the process started by proc_lib:start_link/3 (this is not
%% invoked by gen_server itself). The process registers its name and
%% acknowledges the start to its parent *before* waiting in
%% leader_utils:wait_cluster_is_55/0, so start_link/0 returns without
%% waiting for that call. Afterwards it proceeds to enter_loop/0.
init([]) ->
    Self = self(),
    register(?SERVER, Self),
    proc_lib:init_ack({ok, Self}),

    leader_utils:wait_cluster_is_55(),
    enter_loop().
54
%% Completes the setup and turns the calling process into a gen_server:
%%   * raises the process priority and starts trapping exits;
%%   * subscribes to ns_node_disco_events, forwarding node list changes to
%%     this process as {new_nodes, Nodes} messages;
%%   * registers with leader_activities and monitors it;
%%   * spawns workers for the current nodes and enters the gen_server loop.
%% Does not return normally (gen_server:enter_loop/4 takes over).
enter_loop() ->
    process_flag(priority, high),
    process_flag(trap_exit, true),

    Acquirer = self(),
    OnDiscoEvent =
        fun ({ns_node_disco_events, _OldNodes, NewNodes}) ->
                Acquirer ! {new_nodes, NewNodes};
            (_Ignored) ->
                ok
        end,
    ns_pubsub:subscribe_link(ns_node_disco_events, OnDiscoEvent),

    %% We are normally started after leader_activities, so its name should
    %% already be registered. Waiting is only needed when leader_activities
    %% died abnormally and is being restarted; that is expected to be quick,
    %% hence the short timeout.
    ok = misc:wait_for_local_name(leader_activities, 1000),

    {ok, ActivitiesPid} = leader_activities:register_acquirer(Acquirer),
    erlang:monitor(process, ActivitiesPid),

    InitialState = #state{uuid    = couch_uuids:random(),
                          nodes   = sets:new(),
                          workers = [],

                          leader_activities_pid = ActivitiesPid},

    CurrentNodes = ns_node_disco:nodes_actual(),
    LoopState    = handle_new_nodes(CurrentNodes, InitialState),
    gen_server:enter_loop(?MODULE, [], LoopState, {local, ?SERVER}).
86
87
%% No synchronous requests are supported. Any call is logged as an error and
%% answered with nack; the state is left unchanged.
handle_call(Call, _Caller, State) ->
    ?log_error("Received unexpected call ~p when state is~n~p",
               [Call, State]),
    {reply, nack, State}.
92
%% No casts are supported. Any cast is logged as an error and otherwise
%% ignored; the state is left unchanged.
handle_cast(Cast, State) ->
    ?log_error("Received unexpected cast ~p when state is~n~p", [Cast, State]),
    {noreply, State}.
96
%% Dispatches raw messages:
%%   {new_nodes, Nodes} - node list changed; only the most recent pending
%%                        list is acted upon (see flush_new_nodes/1);
%%   'DOWN'             - a monitored process went down (handle_down/4);
%%   'EXIT'             - a linked process exited (handle_exit/3);
%%   anything else      - logged as an error and ignored.
handle_info({new_nodes, Announced}, State) ->
    Latest   = flush_new_nodes(Announced),
    NewState = handle_new_nodes(Latest, State),
    {noreply, NewState};
handle_info({'DOWN', MRef, process, Pid, Reason}, State) ->
    NewState = handle_down(MRef, Pid, Reason, State),
    {noreply, NewState};
handle_info({'EXIT', Pid, Reason}, State) ->
    NewState = handle_exit(Pid, Reason, State),
    {noreply, NewState};
handle_info(Unexpected, State) ->
    ?log_error("Received unexpected message ~p when state is~n~p",
               [Unexpected, State]),
    {noreply, State}.
108
%% gen_server termination callback; all cleanup lives in handle_terminate/2.
terminate(Why, FinalState) ->
    handle_terminate(Why, FinalState).
111
%% Hot code upgrade callback: the state needs no conversion and is returned
%% as is.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
114
115%% Internal functions
%% Reconciles the state with a new list of nodes: records the new node set,
%% shuts down the workers of nodes that are no longer present and then
%% spawns workers for the nodes that were not known before. Returns the
%% updated state.
handle_new_nodes(NodeList, #state{nodes = Known} = State0) ->
    Current = sets:from_list(NodeList),
    Joined  = sets:subtract(Current, Known),
    Gone    = sets:subtract(Known, Current),

    State1 = handle_removed_nodes(Gone, State0#state{nodes = Current}),
    handle_added_nodes(Joined, State1).
123
%% Starts a worker for every node in the given set of newly added nodes and
%% returns the updated state.
handle_added_nodes(AddedNodes, State) ->
    spawn_many_workers(AddedNodes, State).
126
%% Stops the workers of every node in the given set of removed nodes and
%% returns the updated state.
handle_removed_nodes(RemovedNodes, State) ->
    shutdown_many_workers(RemovedNodes, State).
129
%% Handles a 'DOWN' notification. Only the leader_activities process is
%% expected here (it is the only pid matched); in that case the event is
%% logged and this process exits with
%% {leader_activities_died, Pid, Reason}. A 'DOWN' for any other pid fails
%% with function_clause.
handle_down(_MRef, Pid, Reason, #state{leader_activities_pid = ActivitiesPid})
  when Pid =:= ActivitiesPid ->
    ?log_info("Leader activities process ~p terminated with reason ~p",
              [Pid, Reason]),
    exit({leader_activities_died, Pid, Reason}).
134
%% Handles an 'EXIT' message from a linked process. If the pid belongs to
%% one of our workers, the worker is removed from the state and
%% handle_worker_exit/4 takes over. An exit from any other process is logged
%% and makes this process exit as well.
handle_exit(Pid, Reason, State) ->
    case take_worker(Pid, State) of
        {ok, {Node, Pid}, StateWithoutWorker} ->
            handle_worker_exit(Node, Pid, Reason, StateWithoutWorker);
        not_found ->
            ?log_error("Received an EXIT message "
                       "from an unknown process: ~p", [{Pid, Reason}]),
            %% NOTE(review): the atom below is misspelled ("unkown"); it is
            %% kept as is because it is part of the exit reason.
            exit({exit_from_unkown_process, Pid, Reason})
    end.
144
%% Reacts to the unexpected death of a worker: logs it, reports the node's
%% lease as lost (cleanup_after_worker/1) and starts a replacement worker
%% for the same node. State is expected to no longer contain the dead
%% worker. Returns the updated state.
handle_worker_exit(Node, WorkerPid, Reason, State) ->
    ?log_error("Worker ~p for node ~p terminated unexpectedly (reason ~p)",
               [WorkerPid, Node, Reason]),

    cleanup_after_worker(Node),
    spawn_worker(Node, State).
151
%% Cleanup performed on termination. Logs a warning unless
%% misc:is_shutdown/1 recognizes the reason, then stops all workers and
%% abolishes the leases on all known nodes. Always returns ok.
handle_terminate(Reason, State) ->
    case misc:is_shutdown(Reason) of
        false ->
            ?log_warning("Terminating abnormally (reason ~p):~n~p",
                         [Reason, State]);
        true ->
            ok
    end,

    shutdown_all_workers(State),
    abolish_all_leases(State),
    ok.
164
%% Asks leader_lease_agent to abolish the leases on every known node,
%% identifying ourselves by the local node name and our UUID.
abolish_all_leases(State) ->
    NodeList = sets:to_list(State#state.nodes),
    leader_lease_agent:abolish_leases(NodeList, node(), State#state.uuid).
167
%% Starts a worker for a single node and puts it at the front of the
%% workers list. Returns the updated state.
spawn_worker(Node, State) ->
    Worker = spawn_worker_process(Node, State),
    misc:update_field(#state.workers, State,
                      fun (Workers) -> [{Node, Worker} | Workers] end).
170
%% Starts one worker per node and prepends the new {Node, Pid} pairs to the
%% workers list. Nodes may be given either as a list or as a set; anything
%% that is not a list must be a set. Returns the updated state.
spawn_many_workers(NodeList, State)
  when is_list(NodeList) ->
    Spawned = [{Node, spawn_worker_process(Node, State)} || Node <- NodeList],
    misc:update_field(#state.workers, State,
                      fun (Existing) -> Spawned ++ Existing end);
spawn_many_workers(NodeSet, State) ->
    true = sets:is_set(NodeSet),
    spawn_many_workers(sets:to_list(NodeSet), State).
178
%% Starts a linked worker process for Node, passing it our UUID, and returns
%% whatever leader_lease_acquire_worker:spawn_link/2 returns (stored as the
%% worker's pid by the callers).
spawn_worker_process(Node, #state{uuid = UUID}) ->
    leader_lease_acquire_worker:spawn_link(Node, UUID).
181
%% Stops the workers of every node currently recorded in the state and
%% returns the updated state.
shutdown_all_workers(#state{nodes = AllNodes} = State) ->
    shutdown_many_workers(AllNodes, State).
184
%% Stops the workers belonging to the nodes in the set Nodes and drops them
%% from the workers list. Workers of other nodes are left untouched and
%% keep their relative order. Returns the updated state.
shutdown_many_workers(Nodes, State) ->
    StopMatching =
        fun (Workers) ->
                {Doomed, Kept} =
                    lists:partition(fun ({Node, _Pid}) ->
                                            sets:is_element(Node, Nodes)
                                    end, Workers),
                lists:foreach(fun ({Node, Pid}) ->
                                      shutdown_worker(Node, Pid)
                              end, Doomed),
                Kept
        end,
    misc:update_field(#state.workers, State, StopMatching).
196
%% Stops a single worker via misc:unlink_terminate_and_wait/2 with reason
%% kill (presumably synchronous, judging by the name) and then reports the
%% node's lease as lost.
shutdown_worker(Node, WorkerPid) ->
    misc:unlink_terminate_and_wait(WorkerPid, kill),
    cleanup_after_worker(Node).
200
%% Tells leader_activities that the lease on Node is lost, so that a lease
%% we may have owned through the now-gone worker is reported as lost.
%% Crashes unless leader_activities answers ok.
cleanup_after_worker(Node) ->
    Acquirer = self(),
    ok = leader_activities:lease_lost(Acquirer, Node).
204
%% Looks up the worker with the given pid. Returns
%% {ok, {Node, Pid}, StateWithoutThatWorker} when found and not_found
%% otherwise.
take_worker(Pid, #state{workers = Workers} = State) ->
    case lists:keyfind(Pid, 2, Workers) of
        false ->
            not_found;
        Entry ->
            Remaining = lists:keydelete(Pid, 2, Workers),
            {ok, Entry, State#state{workers = Remaining}}
    end.
212
%% Drains all {new_nodes, _} messages currently in the mailbox without
%% blocking and returns the node list carried by the last one. If none are
%% pending, the list passed in is returned.
flush_new_nodes(Latest) ->
    receive
        {new_nodes, Newer} ->
            flush_new_nodes(Newer)
    after 0 ->
            Latest
    end.
221