1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2009-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16-module(ns_cluster_membership).
17
18-include("cut.hrl").
19-include("ns_common.hrl").
20-include_lib("eunit/include/eunit.hrl").
21
22-export([get_nodes_with_status/1,
23         get_nodes_with_status/2,
24         get_nodes_with_status/3,
25         active_nodes/0,
26         active_nodes/1,
27         inactive_added_nodes/0,
28         actual_active_nodes/0,
29         actual_active_nodes/1,
30         get_nodes_cluster_membership/0,
31         get_nodes_cluster_membership/1,
32         get_cluster_membership/1,
33         get_cluster_membership/2,
34         get_node_server_group/2,
35         activate/1,
36         deactivate/1,
37         failover/2,
38         re_failover/1,
39         system_joinable/0,
40         start_rebalance/3,
41         stop_rebalance/1,
42         get_rebalance_status/0,
43         is_balanced/0,
44         get_recovery_type/2,
45         update_recovery_type/2
46        ]).
47
48-export([supported_services/0,
49         allowed_services/1,
50         supported_services_for_version/1,
51         cluster_supported_services/0,
52         topology_aware_services/0,
53         topology_aware_services_for_version/1,
54         default_services/0,
55         set_service_map/2,
56         get_service_map/2,
57         failover_service_nodes/3,
58         service_has_pending_failover/2,
59         service_clear_pending_failover/1,
60         node_active_services/1,
61         node_active_services/2,
62         node_services/1,
63         node_services/2,
64         service_active_nodes/1,
65         service_active_nodes/2,
66         service_actual_nodes/2,
67         service_nodes/2,
68         service_nodes/3,
69         should_run_service/2,
70         should_run_service/3,
71         user_friendly_service_name/1]).
72
%% @doc Returns the nodes from nodes_wanted whose cluster membership matches
%% PredOrStatus, using ns_config:latest/0 as the config.
%% PredOrStatus is either a membership atom (compared with =:=) or a
%% 1-arity predicate applied to each node's membership.
get_nodes_with_status(PredOrStatus) ->
    LatestConfig = ns_config:latest(),
    get_nodes_with_status(LatestConfig, PredOrStatus).

%% @doc Like get_nodes_with_status/1, but takes the nodes_wanted list and
%% every membership lookup from the given Config.
get_nodes_with_status(Config, PredOrStatus) ->
    Wanted = ns_node_disco:nodes_wanted(Config),
    get_nodes_with_status(Config, Wanted, PredOrStatus).

%% @doc Keeps, in their original order, the members of Nodes whose membership
%% in Config equals Status (an atom) or satisfies Pred (a 1-arity fun).
%% A node with no recorded membership is looked up as `inactiveAdded'
%% by get_cluster_membership/2.
get_nodes_with_status(Config, Nodes, Status) when is_atom(Status) ->
    HasStatus = fun (Membership) -> Membership =:= Status end,
    get_nodes_with_status(Config, Nodes, HasStatus);
get_nodes_with_status(Config, Nodes, Pred) when is_function(Pred, 1) ->
    lists:filter(fun (Node) ->
                         Pred(get_cluster_membership(Node, Config))
                 end,
                 Nodes).
87
%% @doc Nodes from nodes_wanted whose membership is `active', using the
%% config returned by ns_config:get/0.
active_nodes() ->
    Config = ns_config:get(),
    active_nodes(Config).

%% @doc Nodes from nodes_wanted(Config) whose membership is `active'.
active_nodes(Config) ->
    get_nodes_with_status(Config, active).

%% @doc Nodes from nodes_wanted whose membership is `inactiveAdded' in the
%% latest config.
inactive_added_nodes() ->
    get_nodes_with_status(inactiveAdded).

%% @doc Like actual_active_nodes/1, using the config from ns_config:get/0.
actual_active_nodes() ->
    Config = ns_config:get(),
    actual_active_nodes(Config).

%% @doc Members of ns_node_disco:nodes_actual/0 whose membership in Config is
%% `active'. The candidate node list itself is not taken from Config.
actual_active_nodes(Config) ->
    ActualNodes = ns_node_disco:nodes_actual(),
    get_nodes_with_status(Config, ActualNodes, active).
102
%% @doc Returns [{Node, Membership}] for every node in nodes_wanted.
get_nodes_cluster_membership() ->
    Wanted = ns_node_disco:nodes_wanted(),
    get_nodes_cluster_membership(Wanted).

%% @doc Returns [{Node, Membership}] for each of Nodes, in the same order.
%% All lookups use a single config value obtained from ns_config:get/0.
get_nodes_cluster_membership(Nodes) ->
    Config = ns_config:get(),
    lists:map(fun (Node) ->
                      {Node, get_cluster_membership(Node, Config)}
              end,
              Nodes).

%% @doc Membership of Node according to the config from ns_config:get/0.
get_cluster_membership(Node) ->
    Config = ns_config:get(),
    get_cluster_membership(Node, Config).

%% @doc Membership stored under {node, Node, membership} in Config, or
%% `inactiveAdded' when the key is not found.
get_cluster_membership(Node, Config) ->
    Key = {node, Node, membership},
    case ns_config:search(Config, Key) of
        {value, Membership} ->
            Membership;
        _NotFound ->
            inactiveAdded
    end.
120
%% @doc Returns the `name' of the first entry in Config's `server_groups'
%% whose `nodes' list contains Node, or `undefined' if none does.
%% Crashes with badmatch if searching for `server_groups' does not yield
%% {value, _}.
get_node_server_group(Node, Config) ->
    {value, Groups} = ns_config:search(Config, server_groups),
    get_node_server_group_inner(Node, Groups).

%% Scans Groups (proplists carrying `name' and `nodes') in order and returns
%% the `name' of the first group that lists Node, or `undefined'.
get_node_server_group_inner(Node, Groups) ->
    NotInGroup = fun (Group) ->
                         not lists:member(Node, proplists:get_value(nodes, Group))
                 end,
    case lists:dropwhile(NotInGroup, Groups) of
        [] ->
            undefined;
        [Group | _] ->
            proplists:get_value(name, Group)
    end.
134
%% @doc True exactly when nodes_wanted is the single-element list [node()].
system_joinable() ->
    case ns_node_disco:nodes_wanted() of
        [OnlyNode] ->
            OnlyNode =:= node();
        _ ->
            false
    end.

%% @doc Rebalance progress as reported by ns_orchestrator.
get_rebalance_status() ->
    ns_orchestrator:rebalance_progress().

%% @doc Delegates to ns_orchestrator:start_rebalance/3 with the arguments
%% passed through unchanged.
start_rebalance(KnownNodes, EjectedNodes, DeltaRecoveryBuckets) ->
    ns_orchestrator:start_rebalance(KnownNodes,
                                    EjectedNodes,
                                    DeltaRecoveryBuckets).
143
%% @doc Sets the membership of every node in Nodes to `active'.
activate(Nodes) ->
    set_nodes_membership(Nodes, active).

%% @doc Sets the membership of every node in Nodes to `inactiveFailed'.
deactivate(Nodes) ->
    set_nodes_membership(Nodes, inactiveFailed).

%% Writes {node, Node, membership} = Membership for all Nodes with a single
%% ns_config:set/1 call and returns whatever that call returns.
set_nodes_membership(Nodes, Membership) ->
    Updates = [{{node, Node, membership}, Membership} || Node <- Nodes],
    ns_config:set(Updates).
151
%% Stopping is considered safe when no rebalancer pid is recorded (key absent
%% or `undefined'), or when the recorded pid lives on the node that
%% mb_master:master_node/0 reports.
is_stop_rebalance_safe() ->
    case ns_config:search(rebalancer_pid) of
        {value, Pid} when Pid =/= undefined ->
            RebalancerNode = node(Pid),
            RebalancerNode =:= mb_master:master_node();
        {value, undefined} ->
            true;
        false ->
            true
    end.

%% @doc Stops the running rebalance. With AllowUnsafe = true the stop is
%% requested unconditionally. With false it is requested only when
%% is_stop_rebalance_safe/0 holds; otherwise `unsafe' is returned.
stop_rebalance(true) ->
    stop_rebalance();
stop_rebalance(false) ->
    stop_rebalance_if_safe().

stop_rebalance() ->
    ns_orchestrator:stop_rebalance().

stop_rebalance_if_safe() ->
    %% NOTE: this is inherently raceful (the safety check and the stop are
    %% not atomic). But race is tiny and largely harmless. So we KISS instead.
    case is_stop_rebalance_safe() of
        true ->
            stop_rebalance();
        false ->
            unsafe
    end.
184
%% @doc True unless ns_orchestrator reports that a rebalance is needed.
is_balanced() ->
    NeedsRebalance = ns_orchestrator:needs_rebalance(),
    not NeedsRebalance.

%% @doc Delegates failover of Nodes to ns_orchestrator, passing AllowUnsafe
%% through unchanged.
failover(Nodes, AllowUnsafe) ->
    ns_orchestrator:failover(Nodes, AllowUnsafe).
190
%% Checks whether the node named by NodeString is in the pending-recovery
%% state, so that it can be moved back to the failed-over state.
%%
%% Returns {ok, Node} when all of the following hold, otherwise not_possible:
%%   - NodeString names an already existing atom. list_to_existing_atom/1 is
%%     used so that arbitrary caller input cannot grow the atom table;
%%   - the node is in nodes_wanted;
%%   - its recovery_type is set to something other than `none';
%%   - its membership is exactly `inactiveAdded' (a missing membership key
%%     does not qualify).
%%
%% All three config checks read the same ns_config:latest/0 value, so they
%% cannot observe two different config states.
re_failover_possible(NodeString) ->
    try list_to_existing_atom(NodeString) of
        Node ->
            Config = ns_config:latest(),
            RecoveryType = ns_config:search(Config, {node, Node, recovery_type}, none),
            Membership = ns_config:search(Config, {node, Node, membership}),
            Possible = lists:member(Node, ns_node_disco:nodes_wanted(Config))
                andalso RecoveryType =/= none
                andalso Membership =:= {value, inactiveAdded},
            case Possible of
                true ->
                    {ok, Node};
                false ->
                    not_possible
            end
    catch
        error:badarg ->
            %% Not an existing atom (or not a valid string), so it cannot
            %% name a known node.
            not_possible
    end.

%% @doc Moves a node from the pending-recovery state back to the failed-over
%% state. This is used when the user hits Cancel for a pending-recovery node
%% in the UI. On success it sets the node's membership to `inactiveFailed'
%% and its recovery_type to `none' and returns ok. Returns not_possible when
%% re_failover_possible/1 rejects the node. NodeString must be a list; this
%% is asserted with a badmatch.
re_failover(NodeString) ->
    true = is_list(NodeString),
    case re_failover_possible(NodeString) of
        {ok, Node} ->
            KVList = [{{node, Node, membership}, inactiveFailed},
                      {{node, Node, recovery_type}, none}],
            ns_config:set(KVList),
            ok;
        not_possible ->
            not_possible
    end.
222
%% @doc Recovery type recorded for Node in Config, or `none' if none is
%% recorded.
get_recovery_type(Config, Node) ->
    Key = {node, Node, recovery_type},
    ns_config:search(Config, Key, none).

%% @doc Sets Node's recovery type to NewType inside one ns_config transaction.
%% This is allowed when Node is failed over (membership `inactiveFailed') or
%% already pending recovery (membership `inactiveAdded' with a recovery type
%% other than `none'). In both cases membership becomes `inactiveAdded' and
%% recovery_type becomes NewType.
%% Returns ok, or bad_node when Node is in neither state. Raises
%% exceeded_retries if ns_config:run_txn/1 returns retry_needed.
%% NOTE(review): the spec also lists `conflict', which is never returned here.
-spec update_recovery_type(node(), delta | full) -> ok | bad_node | conflict.
update_recovery_type(Node, NewType) ->
    Txn =
        fun (Config, Set) ->
                Membership = ns_config:search(Config, {node, Node, membership}),
                PendingRecovery = Membership =:= {value, inactiveAdded}
                    andalso get_recovery_type(Config, Node) =/= none,
                FailedOver = Membership =:= {value, inactiveFailed},
                case PendingRecovery orelse FailedOver of
                    false ->
                        {abort, {error, bad_node}};
                    true ->
                        WithMembership =
                            Set({node, Node, membership}, inactiveAdded, Config),
                        {commit,
                         Set({node, Node, recovery_type}, NewType, WithMembership)}
                end
        end,
    case ns_config:run_txn(Txn) of
        {commit, _} ->
            ok;
        {abort, not_needed} ->
            ok;
        {abort, {error, Error}} ->
            Error;
        retry_needed ->
            erlang:error(exceeded_retries)
    end.
255
%% @doc Services supported at this build's supported compat version
%% (cluster_compat_mode:supported_compat_version/0).
supported_services() ->
    Version = cluster_compat_mode:supported_compat_version(),
    supported_services_for_version(Version).

%% @doc Services permitted for an edition: everything supported for
%% `enterprise', and everything except enterprise_only_services/0 for
%% `community'.
allowed_services(enterprise) ->
    supported_services();
allowed_services(community) ->
    Supported = supported_services(),
    Supported -- enterprise_only_services().
263
%% Returns [{?VERSION_45, example}] when the ENABLE_EXAMPLE_SERVICE
%% environment variable is set (to any value), and [] otherwise.
maybe_example_service() ->
    case os:getenv("ENABLE_EXAMPLE_SERVICE") of
        false ->
            [];
        _Set ->
            [{?VERSION_45, example}]
    end.
271
%% Services that allowed_services(community) strips from the supported set.
enterprise_only_services() ->
    [cbas,
     eventing].
274
%% Every known service paired with the compat version it is tied to, as
%% checked by cluster_compat_mode:is_enabled_at/2 in
%% filter_services_by_version/2. `kv' is tagged with [0, 0].
services_by_version() ->
    Builtin = [{[0, 0],      kv},
               {?VERSION_40, n1ql},
               {?VERSION_40, index},
               {?VERSION_45, fts},
               {?VERSION_55, cbas},
               {?VERSION_55, eventing}],
    Builtin ++ maybe_example_service().

%% Same shape as services_by_version/0, restricted to topology-aware services.
topology_aware_services_by_version() ->
    Builtin = [{?VERSION_45, fts},
               {?VERSION_50, index},
               {?VERSION_55, cbas},
               {?VERSION_55, eventing}],
    Builtin ++ maybe_example_service().
290
%% Keeps, in order, the services from a [{MinVersion, Service}] table for
%% which cluster_compat_mode:is_enabled_at(Version, MinVersion) is true.
filter_services_by_version(Version, Services) ->
    IsEnabled = fun ({MinVersion, _Service}) ->
                        cluster_compat_mode:is_enabled_at(Version, MinVersion)
                end,
    [Service || {_, Service} <- lists:filter(IsEnabled, Services)].

%% @doc Services available at cluster compat version ClusterVersion.
supported_services_for_version(ClusterVersion) ->
    AllServices = services_by_version(),
    filter_services_by_version(ClusterVersion, AllServices).
303
-ifdef(EUNIT).
supported_services_for_version_test() ->
    %% Service order is not part of the contract, so compare sorted lists.
    Sorted = fun (Version) ->
                     lists:sort(supported_services_for_version(Version))
             end,
    ?assertEqual([kv], supported_services_for_version(?VERSION_25)),
    ?assertEqual([kv], supported_services_for_version(?VERSION_30)),
    ?assertEqual([index, kv, n1ql], Sorted(?VERSION_40)),
    ?assertEqual([index, kv, n1ql], Sorted(?VERSION_41)),
    ?assertEqual([fts, index, kv, n1ql], Sorted(?VERSION_45)).
-endif.
315
%% @doc Services available at the cluster's current compat version.
cluster_supported_services() ->
    Version = cluster_compat_mode:get_compat_version(),
    supported_services_for_version(Version).

%% @doc Services a node gets when none are recorded for it.
default_services() ->
    [kv].

%% @doc Topology-aware services available at compat version Version.
topology_aware_services_for_version(Version) ->
    AllTopologyAware = topology_aware_services_by_version(),
    filter_services_by_version(Version, AllTopologyAware).

%% @doc Topology-aware services at the cluster's current compat version.
topology_aware_services() ->
    Version = cluster_compat_mode:get_compat_version(),
    topology_aware_services_for_version(Version).
327
-ifdef(EUNIT).
topology_aware_services_for_version_test() ->
    %% No service is topology aware before 4.5.
    lists:foreach(
      fun (Version) ->
              ?assertEqual([], topology_aware_services_for_version(Version))
      end,
      [?VERSION_25, ?VERSION_30, ?VERSION_40]),
    Sorted = fun (Version) ->
                     lists:sort(topology_aware_services_for_version(Version))
             end,
    ?assertEqual([fts], Sorted(?VERSION_45)),
    ?assertEqual([fts, index], Sorted(?VERSION_50)).
-endif.
338
%% @doc Stores Nodes under {service_map, Service} and notifies
%% master_activity_events. For kv this is a no-op returning ok, because kv
%% is handled by a different set of functions.
set_service_map(kv, _Nodes) ->
    ok;
set_service_map(Service, Nodes) ->
    Key = {service_map, Service},
    master_activity_events:note_set_service_map(Service, Nodes),
    ns_config:set(Key, Nodes).

%% @doc Nodes assigned to Service in Config. For kv this is derived from the
%% active nodes that list kv among their services. For other services it is
%% the stored {service_map, Service} value, defaulting to [].
get_service_map(Config, kv) ->
    service_nodes(Config, active_nodes(Config), kv);
get_service_map(Config, Service) ->
    Key = {service_map, Service},
    ns_config:search(Config, Key, []).
352
%% @doc Removes Nodes from Service's map (as read from Config) and sets
%% {service_failover_pending, Service} to true, both in one ns_config:set/1
%% call. Crashes with badmatch unless the set returns ok.
%% NOTE(review): the map is read from the caller-supplied Config and written
%% back unconditionally. A concurrent change to the map after Config was read
%% would be overwritten.
failover_service_nodes(Config, Service, Nodes) ->
    Remaining = ns_cluster_membership:get_service_map(Config, Service) -- Nodes,
    Updates = [{{service_map, Service}, Remaining},
               {{service_failover_pending, Service}, true}],
    ok = ns_config:set(Updates).

%% @doc Whether Config marks Service as having a pending failover; false if
%% nothing is recorded.
service_has_pending_failover(Config, Service) ->
    Key = {service_failover_pending, Service},
    ns_config:search(Config, Key, false).

%% @doc Sets {service_failover_pending, Service} to false.
service_clear_pending_failover(Service) ->
    Key = {service_failover_pending, Service},
    ns_config:set(Key, false).
364
%% @doc node_active_services/2 against ns_config:latest/0.
node_active_services(Node) ->
    Config = ns_config:latest(),
    node_active_services(Config, Node).

%% @doc The services configured for Node for which Node also appears in
%% service_active_nodes(Config, Service). Configured order is kept.
node_active_services(Config, Node) ->
    IsActiveOnNode = fun (Service) ->
                             lists:member(Node, service_active_nodes(Config, Service))
                     end,
    lists:filter(IsActiveOnNode, node_services(Config, Node)).

%% @doc node_services/2 against ns_config:latest/0.
node_services(Node) ->
    Config = ns_config:latest(),
    node_services(Config, Node).

%% @doc Services recorded under {node, Node, services} in Config, or
%% default_services/0 when nothing is recorded.
node_services(Config, Node) ->
    case ns_config:search(Config, {node, Node, services}) of
        {value, Services} ->
            Services;
        false ->
            default_services()
    end.
383
%% @doc should_run_service/3 against ns_config:latest/0.
should_run_service(Service, Node) ->
    Config = ns_config:latest(),
    should_run_service(Config, Service, Node).

%% @doc True when the system is provisioned, Node's membership in Config is
%% `active', and Service is among Node's configured services. Later checks
%% are skipped once an earlier one fails.
should_run_service(Config, Service, Node) ->
    IsActiveMember =
        ns_config_auth:is_system_provisioned()
        andalso ns_cluster_membership:get_cluster_membership(Node, Config) =:= active,
    IsActiveMember
        andalso lists:member(Service,
                             ns_cluster_membership:node_services(Config, Node)).
395
%% @doc service_active_nodes/2 against ns_config:latest/0.
service_active_nodes(Service) ->
    Config = ns_config:latest(),
    service_active_nodes(Config, Service).

%% @doc Like get_service_map/2, but when the cluster is not yet 4.1
%% (cluster_compat_mode:is_cluster_41/1 is false) it returns every active
%% node that lists Service among its services.
service_active_nodes(Config, Service) ->
    case cluster_compat_mode:is_cluster_41(Config) of
        false ->
            service_nodes(Config, active_nodes(Config), Service);
        true ->
            get_service_map(Config, Service)
    end.

%% @doc Ordset of nodes that are both in actual_active_nodes(Config) and in
%% service_active_nodes(Config, Service).
service_actual_nodes(Config, Service) ->
    Actual = ordsets:from_list(actual_active_nodes(Config)),
    WithService = ordsets:from_list(service_active_nodes(Config, Service)),
    ordsets:intersection(Actual, WithService).

%% @doc service_nodes/3 against ns_config:latest/0.
service_nodes(Nodes, Service) ->
    Config = ns_config:latest(),
    service_nodes(Config, Nodes, Service).

%% @doc The members of Nodes (in order) whose configured services include
%% Service. A node is repeated once per matching entry in its services list.
service_nodes(Config, Nodes, Service) ->
    lists:flatmap(fun (Node) ->
                          [Node || S <- node_services(Config, Node), S =:= Service]
                  end,
                  Nodes).
422
%% @doc Human-readable name for a service atom. Services without a dedicated
%% name fall back to the atom's text (so a non-atom raises badarg).
user_friendly_service_name(Service) ->
    FriendlyNames = [{kv,   "data"},
                     {n1ql, "query"},
                     {fts,  "full text search"},
                     {cbas, "analytics"}],
    case lists:keyfind(Service, 1, FriendlyNames) of
        {_, Name} ->
            Name;
        false ->
            atom_to_list(Service)
    end.
433