xref: /6.0.3/ns_server/src/ns_rebalancer.erl (revision 08ae4a4a)
1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2010-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
%% Functions for orchestrating rebalance, failover and graceful failover
%% of the cluster.
%%
19%% @doc Rebalancing functions.
20%%
21
22-module(ns_rebalancer).
23
24-include("cut.hrl").
25-include("ns_common.hrl").
26-include("ns_stats.hrl").
27
28-include_lib("eunit/include/eunit.hrl").
29
30-export([orchestrate_failover/1,
31         check_graceful_failover_possible/2,
32         validate_autofailover/1,
33         generate_initial_map/1,
34         start_link_rebalance/5,
35         move_vbuckets/2,
36         unbalanced/2,
37         map_options_changed/1,
38         eject_nodes/1,
39         maybe_cleanup_old_buckets/1,
40         get_delta_recovery_nodes/2,
41         verify_replication/3,
42         start_link_graceful_failover/1,
43         generate_vbucket_map_options/2,
44         run_failover/2,
45         rebalance_topology_aware_services/4]).
46
47-export([wait_local_buckets_shutdown_complete/0]). % used via rpc:multicall
48
49
50-define(DATA_LOST, 1).
51-define(BAD_REPLICATORS, 2).
52
53-define(BUCKETS_SHUTDOWN_WAIT_TIMEOUT, ?get_timeout(buckets_shutdown, 20000)).
54
55-define(REBALANCER_READINESS_WAIT_TIMEOUT, ?get_timeout(readiness, 60000)).
56-define(REBALANCER_QUERY_STATES_TIMEOUT,   ?get_timeout(query_states, 10000)).
57-define(REBALANCER_APPLY_CONFIG_TIMEOUT,   ?get_timeout(apply_config, 300000)).
58-define(FAILOVER_CONFIG_SYNC_TIMEOUT,
59        ?get_timeout(failover_config_sync, 2000)).
60%%
61%% API
62%%
63
64run_failover(Nodes, AllowUnsafe) ->
65    ok = check_no_tap_buckets(),
66    case check_failover_possible(Nodes) of
67        ok ->
68            Result = leader_activities:run_activity(
69                       failover, majority,
70                       ?cut(orchestrate_failover(Nodes)),
71                       [{unsafe, AllowUnsafe}]),
72
73            case Result of
74                {leader_activities_error, _, {quorum_lost, _}} ->
75                    orchestration_unsafe;
76                {leader_activities_error, _, {no_quorum, _}} ->
77                    orchestration_unsafe;
78                _ ->
79                    Result
80            end;
81        Error ->
82            Error
83    end.
84
85orchestrate_failover(Nodes) ->
86    ale:info(?USER_LOGGER, "Starting failing over ~p", [Nodes]),
87    master_activity_events:note_failover(Nodes),
88
89    ErrorNodes = failover(Nodes),
90
91    case ErrorNodes of
92        [] ->
93            ns_cluster:counter_inc(failover_complete),
94            ale:info(?USER_LOGGER, "Failed over ~p: ok", [Nodes]);
95        _ ->
96            ns_cluster:counter_inc(failover_incomplete),
97            ale:error(?USER_LOGGER,
98                      "Failover couldn't "
99                      "complete on some nodes:~n~p", [ErrorNodes])
100    end,
101
102    ok = leader_activities:deactivate_quorum_nodes(Nodes),
103
104    ns_cluster:counter_inc(failover),
105    deactivate_nodes(Nodes),
106
107    ok.
108
109deactivate_nodes([]) ->
110    ok;
111deactivate_nodes(Nodes) ->
112    ale:info(?USER_LOGGER, "Deactivating failed over nodes ~p", [Nodes]),
113    ns_cluster_membership:deactivate(Nodes).
114
%% @doc Fail over one or more nodes. Doesn't eject the nodes from the
%% cluster. Takes effect immediately.
117failover(Nodes) ->
118    lists:umerge([failover_buckets(Nodes),
119                  failover_services(Nodes)]).
120
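%% Fail over every bucket on the given nodes, record the failed over
%% vbuckets per node in ns_config, and return the nodes on which some step
%% of the failover didn't succeed.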
121failover_buckets(Nodes) ->
122    Results = lists:flatmap(fun ({Bucket, BucketConfig}) ->
123                                    failover_bucket(Bucket, BucketConfig, Nodes)
124                            end, ns_bucket:get_buckets()),
125    failover_buckets_handle_failover_vbuckets(Results),
126    failover_handle_results(Results).
127
128failover_buckets_handle_failover_vbuckets(Results) ->
129    FailoverVBuckets =
130        misc:groupby_map(fun (Result) ->
131                                 Node   = proplists:get_value(node, Result),
132                                 Bucket = proplists:get_value(bucket, Result),
133                                 VBs    = proplists:get_value(vbuckets, Result),
134
135                                 {Node, {Bucket, VBs}}
136                         end, Results),
137
138    KVs = [{{node, N, failover_vbuckets}, VBs} || {N, VBs} <- FailoverVBuckets],
139    ns_config:set(KVs).
140
141failover_handle_results(Results) ->
142    NodeStatuses =
143        misc:groupby_map(fun (Result) ->
144                                 Node   = proplists:get_value(node, Result),
145                                 Status = proplists:get_value(status, Result),
146
147                                 {Node, Status}
148                         end, Results),
149
150    lists:filtermap(fun ({Node, Statuses}) ->
151                            NonOKs = [S || S <- Statuses, S =/= ok],
152
153                            case NonOKs of
154                                [] ->
155                                    false;
156                                _ ->
157                                    {true, Node}
158                            end
159                    end, NodeStatuses).
160
161failover_bucket(Bucket, BucketConfig, Nodes) ->
162    master_activity_events:note_bucket_failover_started(Bucket, Nodes),
163
164    Type   = ns_bucket:bucket_type(BucketConfig),
165    Result = do_failover_bucket(Type, Bucket, BucketConfig, Nodes),
166
167    master_activity_events:note_bucket_failover_ended(Bucket, Nodes),
168
169    Result.
170
171do_failover_bucket(memcached, Bucket, BucketConfig, Nodes) ->
172    failover_memcached_bucket(Nodes, Bucket, BucketConfig),
173    [];
174do_failover_bucket(membase, Bucket, BucketConfig, Nodes) ->
175    Map = proplists:get_value(map, BucketConfig, []),
176    R = failover_membase_bucket(Nodes, Bucket, BucketConfig, Map),
177
178    [[{bucket, Bucket},
179      {node, N},
180      {status, R},
181      {vbuckets, node_vbuckets(Map, N)}] || N <- Nodes].
182
183failover_services(Nodes) ->
184    failover_services(cluster_compat_mode:is_cluster_41(), Nodes).
185
186failover_services(false, _Nodes) ->
187    [];
188failover_services(true, Nodes) ->
189    Config    = ns_config:get(),
190    Services0 = lists:flatmap(
191                  ns_cluster_membership:node_services(Config, _), Nodes),
192    Services  = lists:usort(Services0) -- [kv],
193
194    Results = lists:flatmap(failover_service(Config, _, Nodes), Services),
195    failover_handle_results(Results).
196
197failover_service(Config, Service, Nodes) ->
198    ns_cluster_membership:failover_service_nodes(Config, Service, Nodes),
199
200    %% We're refetching the config since failover_service_nodes updated the
201    %% one that we had.
202    Result = service_janitor:complete_service_failover(ns_config:get(),
203                                                       Service,
204                                                       Nodes),
205    case Result of
206        ok ->
207            ?log_debug("Failed over service ~p on nodes ~p successfully",
208                       [Service, Nodes]);
209        _ ->
210            ?log_error("Failed to failover service ~p on nodes ~p: ~p",
211                       [Service, Nodes, Result])
212    end,
213
214    [[{node, Node},
215      {status, Result},
216      {service, Service}] || Node <- Nodes].
217
218get_failover_vbuckets(Config, Node) ->
219    ns_config:search(Config, {node, Node, failover_vbuckets}, []).
220
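%% @doc Check whether auto-failover of the given nodes is safe. Returns ok,
%% or {error, UnsafeBuckets} listing buckets that would end up with
%% vbuckets that have no remaining copies.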
221validate_autofailover(Nodes) ->
222    BucketPairs = ns_bucket:get_buckets(),
223    UnsafeBuckets =
224        [BucketName
225         || {BucketName, BucketConfig} <- BucketPairs,
226            validate_autofailover_bucket(BucketConfig, Nodes) =:= false],
227    case UnsafeBuckets of
228        [] -> ok;
229        _ -> {error, UnsafeBuckets}
230    end.
231
232validate_autofailover_bucket(BucketConfig, Nodes) ->
233    case proplists:get_value(type, BucketConfig) of
234        membase ->
235            Map = proplists:get_value(map, BucketConfig),
236            Map1 = mb_map:promote_replicas(Map, Nodes),
237            case Map1 of
238                undefined ->
239                    true;
240                _ ->
241                    case [I || {I, [undefined|_]} <- misc:enumerate(Map1, 0)] of
242                        [] -> true;
243                        _MissingVBuckets ->
244                            false
245                    end
246            end;
247        _ ->
248            true
249    end.
250
251failover_memcached_bucket(Nodes, Bucket, BucketConfig) ->
252    remove_nodes_from_server_list(Nodes, Bucket, BucketConfig).
253
254failover_membase_bucket(Nodes, Bucket, BucketConfig, Map) when Map =:= [] ->
    %% this is possible if the bucket has just been created and ns_janitor
    %% hasn't had a chance to create a map yet; or, alternatively, if it
    %% failed to do so because, for example, one of the nodes was down
258    failover_membase_bucket_with_no_map(Nodes, Bucket, BucketConfig);
259failover_membase_bucket(Nodes, Bucket, BucketConfig, Map) ->
260    failover_membase_bucket_with_map(Nodes, Bucket, BucketConfig, Map).
261
262failover_membase_bucket_with_no_map(Nodes, Bucket, BucketConfig) ->
263    ?log_debug("Skipping failover of bucket ~p because it has no vbuckets. "
264               "Config:~n~p", [Bucket, BucketConfig]),
265
266    %% we still need to make sure to remove ourselves from the bucket server
267    %% list
268    remove_nodes_from_server_list(Nodes, Bucket, BucketConfig),
269    ok.
270
271failover_membase_bucket_with_map(Nodes, Bucket, BucketConfig, Map) ->
272    %% Promote replicas of vbuckets on this node
273    NewMap = mb_map:promote_replicas(Map, Nodes),
274    true = (NewMap =/= undefined),
275
276    case [I || {I, [undefined|_]} <- misc:enumerate(NewMap, 0)] of
277        [] -> ok; % Phew!
278        MissingVBuckets ->
279            ?rebalance_error("Lost data in ~p for ~w", [Bucket, MissingVBuckets]),
280            ?user_log(?DATA_LOST,
281                      "Data has been lost for ~B% of vbuckets in bucket ~p.",
282                      [length(MissingVBuckets) * 100 div length(Map), Bucket])
283    end,
284
285    ns_bucket:set_fast_forward_map(Bucket, undefined),
286    ns_bucket:set_map(Bucket, NewMap),
287    remove_nodes_from_server_list(Nodes, Bucket, BucketConfig),
288    try ns_janitor:cleanup(Bucket, []) of
289        ok ->
290            ok;
291        {error, _, BadNodes} ->
292            ?rebalance_error("Skipped vbucket activations and "
293                             "replication topology changes because not "
294                             "all remaining nodes were found to have "
295                             "healthy bucket ~p: ~p", [Bucket, BadNodes]),
296            janitor_failed
297    catch
298        E:R ->
299            ?rebalance_error("Janitor cleanup of ~p failed after failover of ~p: ~p",
300                             [Bucket, Nodes, {E, R}]),
301            janitor_failed
302    end.
303
304remove_nodes_from_server_list(Nodes, Bucket, BucketConfig) ->
305    Servers = proplists:get_value(servers, BucketConfig),
306    ns_bucket:set_servers(Bucket, Servers -- Nodes).
307
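%% @doc Compute the options used when generating a vbucket map for the
%% given nodes: the bucket's own map options plus rack-awareness tags
%% derived from server_groups.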
308generate_vbucket_map_options(KeepNodes, BucketConfig) ->
309    Config = ns_config:get(),
310    generate_vbucket_map_options(KeepNodes, BucketConfig, Config).
311
312generate_vbucket_map_options(KeepNodes, BucketConfig, Config) ->
313    Tags = case ns_config:search(Config, server_groups) of
314               false ->
315                   undefined;
316               {value, ServerGroups} ->
317                   case [G || G <- ServerGroups,
318                              proplists:get_value(nodes, G) =/= []] of
319                       [_] ->
                           %% note that we don't need to handle this case
                           %% specially; but, unfortunately, removing it would
                           %% make 2.5 nodes always believe that a rebalance is
                           %% required when there's only one server group
324                           undefined;
325                       _ ->
326                           Tags0 = [case proplists:get_value(uuid, G) of
327                                        T ->
328                                            [{N, T} || N <- proplists:get_value(nodes, G),
329                                                       lists:member(N, KeepNodes)]
330                                    end || G <- ServerGroups],
331
332                           TagsRV = lists:append(Tags0),
333
334                           case KeepNodes -- [N || {N, _T} <- TagsRV] of
335                               [] -> ok;
336                               _ ->
                                   %% there's a tiny race between the start of rebalance and
                                   %% somebody changing server_groups. We largely ignore it,
                                   %% but in cases where it can clearly cause a problem we
                                   %% raise an exception
341                                   erlang:error(server_groups_race_detected)
342                           end,
343
344                           TagsRV
345                   end
346           end,
347
348    Opts0 = ns_bucket:config_to_map_options(BucketConfig),
349
350    %% Note that we don't need to have replication_topology here (in fact as
351    %% of today it's still returned by ns_bucket:config_to_map_options/1), but
352    %% these options are used to compute map_opts_hash which in turn is used
353    %% to decide if rebalance is needed. So if we remove this, old nodes will
354    %% wrongly believe that rebalance is needed even when the cluster is
355    %% balanced. See MB-15543 for details.
356    misc:update_proplist(Opts0, [{replication_topology, star},
357                                 {tags, Tags}]).
358
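%% Generate the target vbucket map for the given nodes. Reuses the delta
%% recovery map from the bucket config when it's trivially compatible with
%% the current map; otherwise generates a new map using past map history.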
359generate_vbucket_map(CurrentMap, KeepNodes, BucketConfig) ->
360    Opts = generate_vbucket_map_options(KeepNodes, BucketConfig),
361
362    Map0 =
363        case lists:keyfind(deltaRecoveryMap, 1, BucketConfig) of
364            {deltaRecoveryMap, DRMapAndOpts} when DRMapAndOpts =/= undefined ->
365                {DRMap, DROpts} = DRMapAndOpts,
366
367                case mb_map:is_trivially_compatible_past_map(KeepNodes, CurrentMap,
368                                                             Opts, DRMap, DROpts) of
369                    true ->
370                        DRMap;
371                    false ->
372                        undefined
373                end;
374            _ ->
375                undefined
376        end,
377
378    Map = case Map0 of
379              undefined ->
380                  EffectiveOpts = [{maps_history, ns_bucket:past_vbucket_maps()} | Opts],
381                  mb_map:generate_map(CurrentMap, KeepNodes, EffectiveOpts);
382              _ ->
383                  Map0
384          end,
385
386    {Map, Opts}.
387
388generate_initial_map(BucketConfig) ->
389    Chain = lists:duplicate(proplists:get_value(num_replicas, BucketConfig) + 1,
390                            undefined),
391    Map1 = lists:duplicate(proplists:get_value(num_vbuckets, BucketConfig),
392                           Chain),
393    Servers = proplists:get_value(servers, BucketConfig),
394    generate_vbucket_map(Map1, Servers, BucketConfig).
395
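%% Wait until every bucket instance that should no longer live on this node
%% has shut down. Gives up (exits) if buckets are still around when the
%% timeout message arrives.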
396local_buckets_shutdown_loop(Ref, CanWait) ->
397    ExcessiveBuckets = ns_memcached:active_buckets() -- ns_bucket:node_bucket_names(node()),
398    case ExcessiveBuckets of
399        [] ->
400            ok;
401        _ ->
402            case CanWait of
403                false ->
404                    exit({old_buckets_shutdown_wait_failed, ExcessiveBuckets});
405                true ->
406                    ?log_debug("Waiting until the following old bucket instances are gone: ~p", [ExcessiveBuckets]),
407                    receive
408                        {Ref, timeout} ->
409                            local_buckets_shutdown_loop(Ref, false);
410                        {Ref, _Msg} ->
411                            local_buckets_shutdown_loop(Ref, true)
412                    end
413            end
414    end.
415
416%% note: this is rpc:multicall-ed
417wait_local_buckets_shutdown_complete() ->
418    ExcessiveBuckets =
419        ns_memcached:active_buckets() -- ns_bucket:node_bucket_names(node()),
420    do_wait_local_buckets_shutdown_complete(ExcessiveBuckets).
421
422do_wait_local_buckets_shutdown_complete([]) ->
423    ok;
424do_wait_local_buckets_shutdown_complete(ExcessiveBuckets) ->
425    Timeout = ?BUCKETS_SHUTDOWN_WAIT_TIMEOUT * length(ExcessiveBuckets),
426    misc:executing_on_new_process(
427      fun () ->
428              Ref = erlang:make_ref(),
429              Parent = self(),
430              Subscription = ns_pubsub:subscribe_link(buckets_events,
431                                                      fun ({stopped, _, _, _} = StoppedMsg) ->
432                                                              Parent ! {Ref, StoppedMsg};
433                                                          (_) ->
434                                                              ok
435                                                      end),
436              erlang:send_after(Timeout, Parent, {Ref, timeout}),
437              try
438                  local_buckets_shutdown_loop(Ref, true)
439              after
440                  (catch ns_pubsub:unsubscribe(Subscription))
441              end
442      end).
443
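%% Wait on all nodes we're keeping for old bucket instances to finish
%% shutting down; exits if some node fails to do so or is unreachable.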
444do_wait_buckets_shutdown(KeepNodes) ->
445    {Good, ReallyBad, FailedNodes} =
446        misc:rpc_multicall_with_plist_result(
447          KeepNodes, ns_rebalancer, wait_local_buckets_shutdown_complete, []),
448    NonOk = [Pair || {_Node, Result} = Pair <- Good,
449                     Result =/= ok],
450    Failures = ReallyBad ++ NonOk ++ [{N, node_was_down} || N <- FailedNodes],
451    case Failures of
452        [] ->
453            ok;
454        _ ->
            ?rebalance_error("Failed to wait for deletion of some buckets on some nodes: ~p~n", [Failures]),
456            exit({buckets_shutdown_wait_failed, Failures})
457    end.
458
459sanitize(Config) ->
460    misc:rewrite_key_value_tuple(sasl_password, "*****", Config).
461
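%% Pull the config from the given nodes and then make sure all of them have
%% seen our config; exits if either direction of the sync fails.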
462pull_and_push_config(Nodes) ->
463    case ns_config_rep:pull_remotes(Nodes) of
464        ok ->
465            ok;
466        Error ->
467            exit({config_sync_failed, Error})
468    end,
469
    %% And after we have that, make sure that recovery, rebalance and
    %% graceful failover all reliably start with the latest config
472    case ns_config_rep:ensure_config_seen_by_nodes(Nodes) of
473        ok ->
474            cool;
475        {error, SyncFailedNodes} ->
476            exit({config_sync_failed, SyncFailedNodes})
477    end.
478
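%% Spawn the process that runs the rebalance. Refuses to start (and reports
%% the error via init_ack) if no KV nodes would be left or if the requested
%% delta recovery is not possible.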
479start_link_rebalance(KeepNodes, EjectNodes,
480                     FailedNodes, DeltaNodes, DeltaRecoveryBucketNames) ->
481    proc_lib:start_link(
482      erlang, apply,
483      [fun () ->
484               ok = check_no_tap_buckets(),
485
486               KVKeep = ns_cluster_membership:service_nodes(KeepNodes, kv),
487               case KVKeep =:= [] of
488                   true ->
489                       proc_lib:init_ack({error, no_kv_nodes_left}),
490                       exit(normal);
491                   false ->
492                       ok
493               end,
494
495               KVDeltaNodes = ns_cluster_membership:service_nodes(DeltaNodes,
496                                                                  kv),
497               BucketConfigs = ns_bucket:get_buckets(),
498               case build_delta_recovery_buckets(KVKeep, KVDeltaNodes,
499                                                 BucketConfigs, DeltaRecoveryBucketNames) of
500                   {ok, DeltaRecoveryBucketTuples} ->
501                       proc_lib:init_ack({ok, self()}),
502
503                       master_activity_events:note_rebalance_start(
504                         self(), KeepNodes, EjectNodes, FailedNodes, DeltaNodes),
505
506                       rebalance(KeepNodes, EjectNodes, FailedNodes,
507                                 BucketConfigs,
508                                 DeltaNodes, DeltaRecoveryBucketTuples);
509                   {error, not_possible} ->
510                       proc_lib:init_ack({error, delta_recovery_not_possible})
511               end
512       end, []]).
513
514move_vbuckets(Bucket, Moves) ->
515    {ok, Config} = ns_bucket:get_bucket(Bucket),
516    Map = proplists:get_value(map, Config),
517    TMap = lists:foldl(fun ({VBucket, TargetChain}, Map0) ->
518                               setelement(VBucket+1, Map0, TargetChain)
519                       end, list_to_tuple(Map), Moves),
520    NewMap = tuple_to_list(TMap),
521    ProgressFun = make_progress_fun(0, 1),
522    run_mover(Bucket, Config,
523              proplists:get_value(servers, Config),
524              ProgressFun, Map, NewMap).
525
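%% Rebalance all non-KV services: "simple" services only need their service
%% map updated, topology-aware services go through a full service
%% rebalance. Afterwards, possibly delay so ejected nodes aren't shut down
%% too soon after their services were rebalanced.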
526rebalance_services(KeepNodes, EjectNodes) ->
527    Config = ns_config:get(),
528
529    AllServices = ns_cluster_membership:cluster_supported_services() -- [kv],
530    TopologyAwareServices = ns_cluster_membership:topology_aware_services(),
531    SimpleServices = AllServices -- TopologyAwareServices,
532
533    SimpleTSs = rebalance_simple_services(Config, SimpleServices, KeepNodes),
534    TopologyAwareTSs = rebalance_topology_aware_services(Config, TopologyAwareServices,
535                                                         KeepNodes, EjectNodes),
536
537    maybe_delay_eject_nodes(SimpleTSs ++ TopologyAwareTSs, EjectNodes).
538
539rebalance_simple_services(Config, Services, KeepNodes) ->
540    case cluster_compat_mode:is_cluster_41(Config) of
541        true ->
542            lists:filtermap(
543              fun (Service) ->
544                      ServiceNodes = ns_cluster_membership:service_nodes(KeepNodes, Service),
545                      Updated = update_service_map_with_config(Config, Service, ServiceNodes),
546
547                      case Updated of
548                          false ->
549                              false;
550                          true ->
551                              {true, {Service, os:timestamp()}}
552                      end
553              end, Services);
554        false ->
555            []
556    end.
557
558update_service_map_with_config(Config, Service, ServiceNodes0) ->
559    CurrentNodes0 = ns_cluster_membership:get_service_map(Config, Service),
560    update_service_map(Service, CurrentNodes0, ServiceNodes0).
561
562update_service_map(Service, CurrentNodes0, ServiceNodes0) ->
563    CurrentNodes = lists:sort(CurrentNodes0),
564    ServiceNodes = lists:sort(ServiceNodes0),
565
566    case CurrentNodes =:= ServiceNodes of
567        true ->
568            false;
569        false ->
570            ?rebalance_info("Updating service map for ~p:~n~p",
571                            [Service, ServiceNodes]),
572            ok = ns_cluster_membership:set_service_map(Service, ServiceNodes),
573            true
574    end.
575
576rebalance_topology_aware_services(Config, Services, KeepNodesAll, EjectNodesAll) ->
577    %% TODO: support this one day
578    DeltaNodesAll = [],
579
580    lists:filtermap(
581      fun (Service) ->
582              KeepNodes = ns_cluster_membership:service_nodes(Config, KeepNodesAll, Service),
583              DeltaNodes = ns_cluster_membership:service_nodes(Config, DeltaNodesAll, Service),
584
              %% if a node being ejected is not active, it means that it was
              %% never rebalanced in to begin with; so we can postpone the
              %% heat death of the universe a little bit by ignoring such
              %% nodes
589              ActiveNodes = ns_cluster_membership:get_service_map(Config, Service),
590              EjectNodes = [N || N <- EjectNodesAll,
591                                 lists:member(N, ActiveNodes)],
592
593              AllNodes = EjectNodes ++ KeepNodes,
594
595              case AllNodes of
596                  [] ->
597                      false;
598                  _ ->
599                      update_service_map_with_config(Config, Service, AllNodes),
600                      ok = rebalance_topology_aware_service(Service, KeepNodes,
601                                                            EjectNodes, DeltaNodes),
602                      update_service_map(Service, AllNodes, KeepNodes),
603                      {true, {Service, os:timestamp()}}
604              end
605      end, Services).
606
607rebalance_topology_aware_service(Service, KeepNodes, EjectNodes, DeltaNodes) ->
608    ProgressCallback =
609        fun (Progress) ->
610                ns_orchestrator:update_progress(Service, Progress)
611        end,
612
613    misc:with_trap_exit(
614      fun () ->
615              {Pid, MRef} = service_rebalancer:spawn_monitor_rebalance(
616                              Service, KeepNodes,
617                              EjectNodes, DeltaNodes, ProgressCallback),
618
619              receive
620                  {'EXIT', _Pid, Reason} = Exit ->
621                      ?log_debug("Got an exit signal while waiting "
622                                 "for the service rebalance to complete. "
623                                 "Service: ~p. Exit message: ~p",
624                                 [Service, Exit]),
625
626                      misc:terminate_and_wait(Pid, Reason),
627                      exit(Reason);
628                  {'DOWN', MRef, _, _, Reason} ->
629                      case Reason of
630                          normal ->
631                              ok;
632                          _ ->
633                              exit({service_rebalance_failed, Service, Reason})
634                      end
635              end
636      end).
637
638get_service_eject_delay(Service) ->
639    Default =
640        case Service of
641            n1ql ->
642                20000;
643            fts ->
644                10000;
645            _ ->
646                0
647        end,
648
649    ?get_param({eject_delay, Service}, Default).
650
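%% Some services want a grace period between being rebalanced and their
%% node being ejected; sleep for whatever part of the per-service eject
%% delay hasn't elapsed yet.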
651maybe_delay_eject_nodes(Timestamps, EjectNodes) ->
652    case cluster_compat_mode:is_cluster_41() of
653        true ->
654            do_maybe_delay_eject_nodes(Timestamps, EjectNodes);
655        false ->
656            ok
657    end.
658
659do_maybe_delay_eject_nodes(_Timestamps, []) ->
660    ok;
661do_maybe_delay_eject_nodes(Timestamps, EjectNodes) ->
662    EjectedServices =
663        ordsets:union([ordsets:from_list(ns_cluster_membership:node_services(N))
664                       || N <- EjectNodes]),
665    Now = os:timestamp(),
666
667    Delays = [begin
668                  ServiceDelay = get_service_eject_delay(Service),
669
670                  case proplists:get_value(Service, Timestamps) of
671                      undefined ->
                          %% it's possible that a node is ejected without ever
                          %% getting rebalanced in; there's no point in
                          %% delaying anything in such a case
675                          0;
676                      RebalanceTS ->
677                          SinceRebalance = max(0, timer:now_diff(Now, RebalanceTS) div 1000),
678                          ServiceDelay - SinceRebalance
679                  end
680              end || Service <- EjectedServices],
681
682    Delay = lists:max(Delays),
683
684    case Delay > 0 of
685        true ->
686            ?log_info("Waiting ~pms before ejecting nodes:~n~p",
687                      [Delay, EjectNodes]),
688            timer:sleep(Delay);
689        false ->
690            ok
691    end.
692
693rebalance(KeepNodes, EjectNodesAll, FailedNodesAll,
694          BucketConfigs,
695          DeltaNodes, DeltaRecoveryBuckets) ->
696    ok = leader_activities:run_activity(
697           rebalance, majority,
698           ?cut(rebalance_body(KeepNodes, EjectNodesAll,
699                               FailedNodesAll, BucketConfigs,
700                               DeltaNodes, DeltaRecoveryBuckets))).
701
702rebalance_body(KeepNodes,
703               EjectNodesAll,
704               FailedNodesAll,
705               BucketConfigs, DeltaNodes, DeltaRecoveryBuckets) ->
706    KVDeltaNodes = ns_cluster_membership:service_nodes(DeltaNodes, kv),
707
708    ok = drop_old_2i_indexes(KeepNodes),
709    ok = apply_delta_recovery_buckets(DeltaRecoveryBuckets,
710                                      KVDeltaNodes, BucketConfigs),
711    ok = maybe_clear_full_recovery_type(KeepNodes),
712    ok = service_janitor:cleanup(),
713
714    ok = leader_activities:activate_quorum_nodes(KeepNodes),
715    ns_cluster_membership:activate(KeepNodes),
716
717    pull_and_push_config(EjectNodesAll ++ KeepNodes),
718
719    %% Eject failed nodes first so they don't cause trouble
720    FailedNodes = FailedNodesAll -- [node()],
721    eject_nodes(FailedNodes),
722
723    rebalance_kv(KeepNodes, EjectNodesAll, BucketConfigs, DeltaRecoveryBuckets),
724    rebalance_services(KeepNodes, EjectNodesAll),
725
726    ok = leader_activities:deactivate_quorum_nodes(EjectNodesAll),
727
728    %% don't eject ourselves at all here; this will be handled by
729    %% ns_orchestrator
730    EjectNowNodes = EjectNodesAll -- [node()],
731    eject_nodes(EjectNowNodes),
732
733    ok.
734
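%% Build a progress callback that rescales a single bucket's progress into
%% the overall KV rebalance progress.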
735make_progress_fun(BucketCompletion, NumBuckets) ->
736    fun (P) ->
737            Progress = dict:map(fun (_, N) ->
738                                        N / NumBuckets + BucketCompletion
739                                end, P),
740            update_kv_progress(Progress)
741    end.
742
743update_kv_progress(Progress) ->
744    ns_orchestrator:update_progress(kv, Progress).
745
746update_kv_progress(Nodes, Progress) ->
747    update_kv_progress(dict:from_list([{N, Progress} || N <- Nodes])).
748
749rebalance_kv(KeepNodes, EjectNodes, BucketConfigs, DeltaRecoveryBuckets) ->
    %% wait until all bucket shutdowns are done on the nodes we're
    %% adding (or maybe adding)
752    do_wait_buckets_shutdown(KeepNodes),
753
754    NumBuckets = length(BucketConfigs),
755    ?rebalance_debug("BucketConfigs = ~p", [sanitize(BucketConfigs)]),
756
757    KeepKVNodes = ns_cluster_membership:service_nodes(KeepNodes, kv),
758    LiveKVNodes = ns_cluster_membership:service_nodes(KeepNodes ++ EjectNodes, kv),
759
760    case maybe_cleanup_old_buckets(KeepNodes) of
761        ok ->
762            ok;
763        Error ->
764            exit(Error)
765    end,
766
767    {ok, RebalanceObserver} = ns_rebalance_observer:start_link(length(BucketConfigs)),
768
769    lists:foreach(fun ({I, {BucketName, BucketConfig}}) ->
770                          BucketCompletion = I / NumBuckets,
771                          update_kv_progress(LiveKVNodes, BucketCompletion),
772
773                          ProgressFun = make_progress_fun(BucketCompletion, NumBuckets),
774                          rebalance_bucket(BucketName, BucketConfig, ProgressFun,
775                                           KeepKVNodes, EjectNodes, DeltaRecoveryBuckets)
776                  end, misc:enumerate(BucketConfigs, 0)),
777
778    update_kv_progress(LiveKVNodes, 1.0),
779    misc:unlink_terminate_and_wait(RebalanceObserver, shutdown).
780
781rebalance_bucket(BucketName, BucketConfig, ProgressFun,
782                 KeepKVNodes, EjectNodes, DeltaRecoveryBuckets) ->
783    ale:info(?USER_LOGGER, "Started rebalancing bucket ~s", [BucketName]),
784    ?rebalance_info("Rebalancing bucket ~p with config ~p",
785                    [BucketName, sanitize(BucketConfig)]),
786    case proplists:get_value(type, BucketConfig) of
787        memcached ->
788            rebalance_memcached_bucket(BucketName, KeepKVNodes);
789        membase ->
790            rebalance_membase_bucket(BucketName, BucketConfig, ProgressFun,
791                                     KeepKVNodes, EjectNodes, DeltaRecoveryBuckets)
792    end.
793
794rebalance_memcached_bucket(BucketName, KeepKVNodes) ->
795    master_activity_events:note_bucket_rebalance_started(BucketName),
796    ns_bucket:set_servers(BucketName, KeepKVNodes),
797    master_activity_events:note_bucket_rebalance_ended(BucketName).
798
799rebalance_membase_bucket(BucketName, BucketConfig, ProgressFun,
800                         KeepKVNodes, EjectNodes, DeltaRecoveryBuckets) ->
801    %% Only start one bucket at a time to avoid
802    %% overloading things
803    ThisEjected = ordsets:intersection(lists:sort(proplists:get_value(servers, BucketConfig, [])),
804                                       lists:sort(EjectNodes)),
805    ThisLiveNodes = KeepKVNodes ++ ThisEjected,
806    ns_bucket:set_servers(BucketName, ThisLiveNodes),
807    ?rebalance_info("Waiting for bucket ~p to be ready on ~p", [BucketName, ThisLiveNodes]),
808    {ok, _States, Zombies} = janitor_agent:query_states(BucketName, ThisLiveNodes, ?REBALANCER_READINESS_WAIT_TIMEOUT),
809    case Zombies of
810        [] ->
811            ?rebalance_info("Bucket is ready on all nodes"),
812            ok;
813        _ ->
814            exit({not_all_nodes_are_ready_yet, Zombies})
815    end,
816
817    run_janitor_pre_rebalance(BucketName),
818
819    {ok, NewConf} =
820        ns_bucket:get_bucket(BucketName),
821    master_activity_events:note_bucket_rebalance_started(BucketName),
822    {NewMap, MapOptions} =
823        do_rebalance_membase_bucket(BucketName, NewConf,
824                                    KeepKVNodes, ProgressFun, DeltaRecoveryBuckets),
825    ns_bucket:set_map_opts(BucketName, MapOptions),
826    ns_bucket:update_bucket_props(BucketName,
827                                  [{deltaRecoveryMap, undefined}]),
828    master_activity_events:note_bucket_rebalance_ended(BucketName),
829    run_verify_replication(BucketName, KeepKVNodes, NewMap).
830
831run_janitor_pre_rebalance(BucketName) ->
832    case ns_janitor:cleanup(BucketName,
833                            [{query_states_timeout, ?REBALANCER_QUERY_STATES_TIMEOUT},
834                             {apply_config_timeout, ?REBALANCER_APPLY_CONFIG_TIMEOUT}]) of
835        ok ->
836            ok;
837        {error, _, BadNodes} ->
838            exit({pre_rebalance_janitor_run_failed, BadNodes})
839    end.
840
%% @doc Rebalance a single membase bucket. Returns the new vbucket map and
%% map options, or exits with reason 'stopped' or whatever reason was given
%% by whatever failed.
844do_rebalance_membase_bucket(Bucket, Config,
845                            KeepNodes, ProgressFun, DeltaRecoveryBuckets) ->
846    Map = proplists:get_value(map, Config),
847    {FastForwardMap, MapOptions} =
848        case lists:keyfind(Bucket, 1, DeltaRecoveryBuckets) of
849            false ->
850                generate_vbucket_map(Map, KeepNodes, Config);
851            {_, _, V} ->
852                V
853        end,
854
855    ns_bucket:update_vbucket_map_history(FastForwardMap, MapOptions),
856    ?rebalance_debug("Target map options: ~p (hash: ~p)", [MapOptions, erlang:phash2(MapOptions)]),
857    {run_mover(Bucket, Config, KeepNodes, ProgressFun, Map, FastForwardMap),
858     MapOptions}.
859
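%% Move the bucket's vbuckets from Map to FastForwardMap. After the mover
%% finishes, optionally wait a bit before dropping rebalanced-out nodes
%% from the server list, then return the new map.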
860run_mover(Bucket, Config, KeepNodes, ProgressFun, Map, FastForwardMap) ->
861    ?rebalance_info("Target map (distance: ~p):~n~p", [(catch mb_map:vbucket_movements(Map, FastForwardMap)), FastForwardMap]),
862    ns_bucket:set_fast_forward_map(Bucket, FastForwardMap),
863    misc:with_trap_exit(
864      fun () ->
865              {ok, Pid} = ns_vbucket_mover:start_link(Bucket, Map,
866                                                      FastForwardMap,
867                                                      ProgressFun),
868              wait_for_mover(Pid)
869      end),
870
871    HadRebalanceOut = ((proplists:get_value(servers, Config, []) -- KeepNodes) =/= []),
872    case HadRebalanceOut of
873        true ->
874            SecondsToWait = ns_config:read_key_fast(rebalance_out_delay_seconds, 10),
            ?rebalance_info("Waiting ~w seconds before completing rebalance out,"
                            " so that clients receive a graceful not-my-vbucket error instead of a silently closed connection", [SecondsToWait]),
877            timer:sleep(SecondsToWait * 1000);
878        false ->
879            ok
880    end,
881    ns_bucket:set_fast_forward_map(Bucket, undefined),
882    ns_bucket:set_servers(Bucket, KeepNodes),
883    FastForwardMap.
884
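%% Returns true if the bucket needs rebalancing: some vbucket chain is
%% missing a copy (given the number of servers available), or active or
%% replica vbuckets are spread unevenly across the servers.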
885unbalanced(Map, BucketConfig) ->
886    Servers = proplists:get_value(servers, BucketConfig, []),
887    NumServers = length(Servers),
888
889    R = lists:any(
890          fun (Chain) ->
891                  lists:member(
892                    undefined,
893                    %% Don't warn about missing replicas when you have
894                    %% fewer servers than your copy count!
895                    lists:sublist(Chain, NumServers))
896          end, Map),
897
898    R orelse do_unbalanced(Map, Servers).
899
900do_unbalanced(Map, Servers) ->
901    {Masters, Replicas} =
902        lists:foldl(
903          fun ([M | R], {AccM, AccR}) ->
904                  {[M | AccM], R ++ AccR}
905          end, {[], []}, Map),
906    Masters1 = lists:sort([M || M <- Masters, lists:member(M, Servers)]),
907    Replicas1 = lists:sort([R || R <- Replicas, lists:member(R, Servers)]),
908
909    MastersCounts = misc:uniqc(Masters1),
910    ReplicasCounts = misc:uniqc(Replicas1),
911
912    NumServers = length(Servers),
913
914    lists:any(
915      fun (Counts0) ->
916              Counts1 = [C || {_, C} <- Counts0],
917              Len = length(Counts1),
918              Counts = case Len < NumServers of
919                           true ->
920                               lists:duplicate(NumServers - Len, 0) ++ Counts1;
921                           false ->
922                               true = Len =:= NumServers,
923                               Counts1
924                       end,
925              Counts =/= [] andalso lists:max(Counts) - lists:min(Counts) > 1
926      end, [MastersCounts, ReplicasCounts]).
927
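%% @doc Check whether the vbucket map options we would generate now differ
%% (by hash) from the ones recorded in the bucket config.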
928map_options_changed(BucketConfig) ->
929    Config = ns_config:get(),
930
931    Servers = proplists:get_value(servers, BucketConfig, []),
932
933    Opts = generate_vbucket_map_options(Servers, BucketConfig, Config),
934    OptsHash = proplists:get_value(map_opts_hash, BucketConfig),
935    case OptsHash of
936        undefined ->
937            true;
938        _ ->
939            erlang:phash2(Opts) =/= OptsHash
940    end.
941
942%%
943%% Internal functions
944%%
945
946%% @private
947
948
949%% @doc Eject a list of nodes from the cluster, making sure this node is last.
950eject_nodes(Nodes) ->
951    %% Leave myself last
952    LeaveNodes = case lists:member(node(), Nodes) of
953                     true ->
954                         (Nodes -- [node()]) ++ [node()];
955                     false ->
956                         Nodes
957                 end,
958    lists:foreach(fun (N) ->
959                          ns_cluster_membership:deactivate([N]),
960                          ns_cluster:leave(N)
961                  end, LeaveNodes).
962
963run_verify_replication(Bucket, Nodes, Map) ->
964    Pid = proc_lib:spawn_link(?MODULE, verify_replication, [Bucket, Nodes, Map]),
965    ?log_debug("Spawned verify_replication worker: ~p", [Pid]),
966    {trap_exit, false} = erlang:process_info(self(), trap_exit),
967    misc:wait_for_process(Pid, infinity).
968
969verify_replication(Bucket, Nodes, Map) ->
970    ExpectedReplicators0 = ns_bucket:map_to_replicas(Map),
971    ExpectedReplicators = lists:sort(ExpectedReplicators0),
972
973    {ActualReplicators, BadNodes} = janitor_agent:get_src_dst_vbucket_replications(Bucket, Nodes),
974    case BadNodes of
975        [] -> ok;
976        _ ->
977            ale:error(?USER_LOGGER, "Rebalance is done, but failed to verify replications on following nodes:~p", [BadNodes]),
978            exit(bad_replicas_due_to_bad_results)
979    end,
980
981    case misc:comm(ExpectedReplicators, ActualReplicators) of
982        {[], [], _} ->
983            ok;
984        {Missing, Extra, _} ->
985            ?user_log(?BAD_REPLICATORS,
986                      "Bad replicators after rebalance:~nMissing = ~p~nExtras = ~p",
987                      [Missing, Extra]),
988            exit(bad_replicas)
989    end.
990
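%% Wait for the vbucket mover to exit, translating mover crashes and
%% rebalance stop requests into the appropriate exit reasons.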
991wait_for_mover(Pid) ->
992    receive
993        {'EXIT', Pid, Reason} ->
994            case Reason of
995                normal ->
996                    ok;
997                _ ->
998                    exit({mover_crashed, Reason})
999            end;
1000        {'EXIT', _Pid, {shutdown, stop} = Stop} ->
1001            ?log_debug("Got rebalance stop request"),
1002            TimeoutPid = diag_handler:arm_timeout(
1003                           5000,
1004                           fun (_) ->
1005                                   ?log_debug("Observing slow rebalance stop (mover pid: ~p)", [Pid]),
1006                                   timeout_diag_logger:log_diagnostics(slow_rebalance_stop)
1007                           end),
1008            try
1009                terminate_mover(Pid, Stop)
1010            after
1011                diag_handler:disarm_timeout(TimeoutPid)
1012            end;
1013        {'EXIT', _Pid, Reason} ->
1014            exit(Reason)
1015    end.
1016
1017terminate_mover(Pid, StopReason) ->
1018    ?log_debug("Terminating mover ~p with reason ~p", [Pid, StopReason]),
1019    exit(Pid, StopReason),
1020
1021    receive
1022        {'EXIT', Pid, MoverReason} ->
1023            ?log_debug("Mover ~p terminated with reason ~p",
1024                       [Pid, MoverReason]),
1025            %% No matter what the mover's termination reason was, we terminate
1026            %% with the reason that was asked of us. This is to deal with the
1027            %% cases when the mover just happens to terminate at around the
1028            %% time we request its termination.
1029            exit(StopReason);
1030        {'EXIT', _OtherPid, OtherReason} = Exit ->
1031            ?log_debug("Received an exit ~p while waiting for "
1032                       "mover ~p to terminate.", [Exit, Pid]),
1033            exit(OtherReason)
1034    end.
1035
1036maybe_cleanup_old_buckets(KeepNodes) ->
1037    case misc:rpc_multicall_with_plist_result(KeepNodes, ns_storage_conf, delete_unused_buckets_db_files, []) of
1038        {_, _, DownNodes} when DownNodes =/= [] ->
1039            ?rebalance_error("Failed to cleanup old buckets on some nodes: ~p",
1040                             [DownNodes]),
1041            {buckets_cleanup_failed, DownNodes};
1042        {Good, ReallyBad, []} ->
1043            ReallyBadNodes =
1044                case ReallyBad of
1045                    [] ->
1046                        [];
1047                    _ ->
1048                        ?rebalance_error(
1049                           "Failed to cleanup old buckets on some nodes: ~n~p",
1050                           [ReallyBad]),
1051                        lists:map(fun ({Node, _}) -> Node end, ReallyBad)
1052                end,
1053
1054            FailedNodes =
1055                lists:foldl(
1056                  fun ({Node, Result}, Acc) ->
1057                          case Result of
1058                              ok ->
1059                                  Acc;
1060                              Error ->
1061                                  ?rebalance_error(
1062                                     "Failed to cleanup old buckets on node ~p: ~p",
1063                                     [Node, Error]),
1064                                  [Node | Acc]
1065                          end
1066                  end, [], Good),
1067
1068            case FailedNodes ++ ReallyBadNodes of
1069                [] ->
1070                    ok;
1071                AllFailedNodes ->
1072                    {buckets_cleanup_failed, AllFailedNodes}
1073            end
1074    end.
1075
1076node_vbuckets(Map, Node) ->
1077    [V || {V, Chain} <- misc:enumerate(Map, 0),
1078          lists:member(Node, Chain)].
1079
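%% Look through past vbucket maps for one that matches the current map
%% options and that places on each delta node exactly the vbuckets recorded
%% for it at failover time; such a map can be used for delta recovery.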
1080find_delta_recovery_map(Config, AllNodes, DeltaNodes, Bucket, BucketConfig) ->
1081    {map, CurrentMap} = lists:keyfind(map, 1, BucketConfig),
1082    CurrentOptions = generate_vbucket_map_options(AllNodes, BucketConfig),
1083
1084    History = ns_bucket:past_vbucket_maps(Config),
1085    MatchingMaps = mb_map:find_matching_past_maps(AllNodes, CurrentMap,
1086                                                  CurrentOptions, History),
1087
1088    find_delta_recovery_map_loop(MatchingMaps,
1089                                 Config, Bucket, CurrentOptions, DeltaNodes).
1090
1091find_delta_recovery_map_loop([], _Config, _Bucket, _Options, _DeltaNodes) ->
1092    false;
1093find_delta_recovery_map_loop([TargetMap | Rest], Config, Bucket, Options, DeltaNodes) ->
1094    {_, TargetVBucketsDict} =
1095        lists:foldl(
1096          fun (Chain, {V, D}) ->
1097                  D1 = lists:foldl(
1098                         fun (Node, Acc) ->
1099                                 case lists:member(Node, Chain) of
1100                                     true ->
1101                                         dict:update(Node,
1102                                                     fun (Vs) ->
1103                                                             [V | Vs]
1104                                                     end, Acc);
1105                                     false ->
1106                                         Acc
1107                                 end
1108                         end, D, DeltaNodes),
1109
1110                  {V+1, D1}
1111          end,
1112          {0, dict:from_list([{N, []} || N <- DeltaNodes])}, TargetMap),
1113
1114    Usable =
1115        lists:all(
1116          fun (Node) ->
1117                  AllFailoverVBuckets = get_failover_vbuckets(Config, Node),
1118                  FailoverVBuckets = proplists:get_value(Bucket, AllFailoverVBuckets),
1119                  TargetVBuckets = lists:reverse(dict:fetch(Node, TargetVBucketsDict)),
1120
1121                  TargetVBuckets =:= FailoverVBuckets
1122          end, DeltaNodes),
1123
1124    case Usable of
1125        true ->
1126            {TargetMap, Options};
1127        false ->
1128            find_delta_recovery_map_loop(Rest, Config, Bucket, Options, DeltaNodes)
1129    end.
1130
1131membase_delta_recovery_buckets(DeltaRecoveryBuckets, MembaseBucketConfigs) ->
1132    MembaseBuckets = [Bucket || {Bucket, _} <- MembaseBucketConfigs],
1133
1134    case DeltaRecoveryBuckets of
1135        all ->
1136            MembaseBuckets;
1137        _ when is_list(DeltaRecoveryBuckets) ->
1138            ordsets:to_list(ordsets:intersection(ordsets:from_list(MembaseBuckets),
1139                                                 ordsets:from_list(DeltaRecoveryBuckets)))
1140    end.
1141
1142build_delta_recovery_buckets(_AllNodes, [] = _DeltaNodes, _AllBucketConfigs, _DeltaRecoveryBuckets) ->
1143    {ok, []};
1144build_delta_recovery_buckets(AllNodes, DeltaNodes, AllBucketConfigs, DeltaRecoveryBuckets0) ->
1145    Config = ns_config:get(),
1146
1147    MembaseBuckets = [P || {_, BucketConfig} = P <- AllBucketConfigs,
1148                           proplists:get_value(type, BucketConfig) =:= membase],
1149    DeltaRecoveryBuckets = membase_delta_recovery_buckets(DeltaRecoveryBuckets0, MembaseBuckets),
    %% such eager computation of the recovery map is suboptimal, but not by
    %% much. It's done this way for better testability of
    %% build_delta_recovery_buckets_loop
1153    %% testability of build_delta_recovery_buckets_loop
1154    MappedConfigs = [{Bucket,
1155                      BucketConfig,
1156                      find_delta_recovery_map(Config, AllNodes, DeltaNodes,
1157                                              Bucket, BucketConfig)}
1158                     || {Bucket, BucketConfig} <- MembaseBuckets],
1159
1160    case build_delta_recovery_buckets_loop(MappedConfigs, DeltaRecoveryBuckets, []) of
1161        {ok, Recovered0} ->
1162            RV = [{Bucket,
1163                   build_transitional_bucket_config(BucketConfig, Map, Opts, DeltaNodes),
1164                   {Map, Opts}}
1165                  || {Bucket, BucketConfig, {Map, Opts}} <- Recovered0],
1166            {ok, RV};
1167        Error ->
1168            Error
1169    end.
1170
1171build_delta_recovery_buckets_loop([] = _MappedConfigs, _DeltaRecoveryBuckets, Acc) ->
1172    {ok, Acc};
1173build_delta_recovery_buckets_loop(MappedConfigs, DeltaRecoveryBuckets, Acc) ->
1174    [{Bucket, BucketConfig, RecoverResult0} | RestMapped] = MappedConfigs,
1175
1176    NeedBucket = lists:member(Bucket, DeltaRecoveryBuckets),
1177    RecoverResult = case NeedBucket of
1178                        true ->
1179                            RecoverResult0;
1180                        false ->
1181                            false
1182                    end,
1183    case RecoverResult of
1184        {Map, Opts} ->
1185            ?rebalance_debug("Found delta recovery map for bucket ~s: ~p",
1186                             [Bucket, {Map, Opts}]),
1187
1188            NewAcc = [{Bucket, BucketConfig, {Map, Opts}} | Acc],
1189            build_delta_recovery_buckets_loop(RestMapped, DeltaRecoveryBuckets, NewAcc);
1190        false ->
1191            case NeedBucket of
1192                true ->
                    ?rebalance_debug("Couldn't delta recover bucket ~s even though delta recovery of that bucket was requested", [Bucket]),
1194                    %% run rest of elements for logging
1195                    _ = build_delta_recovery_buckets_loop(RestMapped, DeltaRecoveryBuckets, []),
1196                    {error, not_possible};
1197                false ->
1198                    build_delta_recovery_buckets_loop(RestMapped, DeltaRecoveryBuckets, Acc)
1199            end
1200    end.
1201
1202membase_delta_recovery_buckets_test() ->
1203    MembaseBuckets = [{"b1", conf}, {"b3", conf}],
1204    ["b1", "b3"] = membase_delta_recovery_buckets(["b1", "b2", "b3", "b4"], MembaseBuckets),
1205    ["b1", "b3"] = membase_delta_recovery_buckets(all, MembaseBuckets).
1206
1207build_delta_recovery_buckets_loop_test() ->
1208    MappedConfigs = [{"b1", conf1, {map, opts}},
1209                     {"b2", conf2, false}],
1210    All = membase_delta_recovery_buckets(all, [{"b1", conf}, {"b2", conf}]),
1211
1212    {ok, []} = build_delta_recovery_buckets_loop([], All, []),
1213    {error, not_possible} = build_delta_recovery_buckets_loop(MappedConfigs, All, []),
1214    {error, not_possible} = build_delta_recovery_buckets_loop(MappedConfigs, ["b2"], []),
1215    {error, not_possible} = build_delta_recovery_buckets_loop(MappedConfigs, ["b1", "b2"], []),
1216    {ok, []} = build_delta_recovery_buckets_loop(MappedConfigs, [], []),
1217    ?assertEqual({ok, [{"b1", conf1, {map, opts}}]},
1218                 build_delta_recovery_buckets_loop(MappedConfigs, ["b1"], [])),
1219    ?assertEqual({ok, [{"b1", conf1, {map, opts}}]},
1220                 build_delta_recovery_buckets_loop([hd(MappedConfigs)], All, [])).
1221
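%% Install the transitional bucket configs for delta recovery, mark the
%% delta nodes active again, sync the config to them and wait for the
%% buckets to become ready there.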
1222apply_delta_recovery_buckets([], _DeltaNodes, _CurrentBuckets) ->
1223    ok;
1224apply_delta_recovery_buckets(DeltaRecoveryBuckets, DeltaNodes, CurrentBuckets) ->
1225    NewBuckets = misc:update_proplist(
1226                   CurrentBuckets,
1227                   [{Bucket, BucketConfig} ||
1228                       {Bucket, BucketConfig, _} <- DeltaRecoveryBuckets]),
1229    NodeChanges = [[{{node, N, recovery_type}, none},
1230                    {{node, N, failover_vbuckets}, []},
1231                    {{node, N, membership}, active}] || N <- DeltaNodes],
1232    BucketChanges = {buckets, [{configs, NewBuckets}]},
1233
1234    Changes = lists:flatten([BucketChanges, NodeChanges]),
1235    ok = ns_config:set(Changes),
1236
1237    case ns_config_rep:ensure_config_seen_by_nodes(DeltaNodes) of
1238        ok ->
1239            cool;
1240        {error, SyncFailedNodes} ->
1241            exit({delta_recovery_config_synchronization_failed, SyncFailedNodes})
1242    end,
1243
1244    lists:foreach(
1245      fun ({Bucket, _, _}) ->
1246              ok = wait_for_bucket(Bucket, DeltaNodes)
1247      end, DeltaRecoveryBuckets),
1248
1249    ok.
1250
1251maybe_clear_full_recovery_type(Nodes) ->
1252    Cfg = ns_config:latest(),
1253    NodeChanges = [[{{node, N, recovery_type}, none},
1254                    {{node, N, failover_vbuckets}, []}]
1255                   || N <- Nodes,
1256                      ns_cluster_membership:get_recovery_type(Cfg, N) =:= full],
1257    ok = ns_config:set(lists:flatten(NodeChanges)).
1258
1259wait_for_bucket(Bucket, Nodes) ->
1260    ?log_debug("Waiting until bucket ~p gets ready on nodes ~p", [Bucket, Nodes]),
1261    do_wait_for_bucket(Bucket, Nodes).
1262
1263do_wait_for_bucket(Bucket, Nodes) ->
1264    case janitor_agent:query_states_details(Bucket, Nodes, 60000) of
1265        {ok, _States, []} ->
1266            ?log_debug("Bucket ~p became ready on nodes ~p", [Bucket, Nodes]),
1267            ok;
1268        {ok, _States, Failures} ->
1269            case check_failures(Failures) of
1270                keep_waiting ->
1271                    Zombies = [N || {N, _} <- Failures],
1272                    ?log_debug("Bucket ~p still not ready on nodes ~p",
1273                               [Bucket, Zombies]),
1274                    do_wait_for_bucket(Bucket, Zombies);
1275                fail ->
1276                    ?log_error("Bucket ~p not available on nodes ~p",
1277                               [Bucket, Failures]),
1278                    fail
1279            end
1280    end.
1281
1282check_failures(Failures) ->
1283    case [F || {_Node, Reason} = F <- Failures, Reason =/= warming_up] of
1284        [] ->
1285            keep_waiting;
1286        _ ->
1287            fail
1288    end.
1289
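%% Build the intermediate bucket config used during delta recovery: delta
%% nodes are appended to the current chains (and to the server list) so
%% their existing data can be reused, and the eventual target map is
%% remembered under deltaRecoveryMap.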
1290build_transitional_bucket_config(BucketConfig, TargetMap, Options, DeltaNodes) ->
1291    {num_replicas, NumReplicas} = lists:keyfind(num_replicas, 1, BucketConfig),
1292    {map, CurrentMap} = lists:keyfind(map, 1, BucketConfig),
1293    {servers, Servers} = lists:keyfind(servers, 1, BucketConfig),
1294    TransitionalMap =
1295        lists:map(
1296          fun ({CurrentChain, TargetChain}) ->
1297                  case CurrentChain of
1298                      [undefined | _] ->
1299                          CurrentChain;
1300                      _ ->
1301                          ChainDeltaNodes = [N || N <- TargetChain,
1302                                                  lists:member(N, DeltaNodes)],
1303                          PreservedNodes = lists:takewhile(
1304                                             fun (N) ->
1305                                                     N =/= undefined andalso
1306                                                         not lists:member(N, DeltaNodes)
1307                                             end, CurrentChain),
1308
1309                          TransitionalChain0 = PreservedNodes ++ ChainDeltaNodes,
1310                          N = length(TransitionalChain0),
1311                          true = N =< NumReplicas + 1,
1312
1313                          TransitionalChain0 ++
1314                              lists:duplicate(NumReplicas - N + 1, undefined)
1315                  end
1316          end, lists:zip(CurrentMap, TargetMap)),
1317
1318    NewServers = DeltaNodes ++ Servers,
1319
1320    misc:update_proplist(BucketConfig, [{map, TransitionalMap},
1321                                        {servers, NewServers},
1322                                        {deltaRecoveryMap, {TargetMap, Options}}]).
1323
1324get_delta_recovery_nodes(Config, Nodes) ->
1325    [N || N <- Nodes,
1326          ns_cluster_membership:get_cluster_membership(N, Config) =:= inactiveAdded
1327              andalso ns_cluster_membership:get_recovery_type(Config, N) =:= delta].
1328
1329start_link_graceful_failover(Node) ->
1330    proc_lib:start_link(erlang, apply, [fun run_graceful_failover/1, [Node]]).
1331
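%% Graceful failover flow: sync the config, refuse non-KV nodes and
%% failovers that check_failover_possible/1 rejects, then (under the
%% majority leader activity) move every vbucket that is active on the
%% node off of it, bucket by bucket, reporting progress as an equal
%% share per bucket, and finally run a regular failover of the node.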
1332run_graceful_failover(Node) ->
1333    ok = check_no_tap_buckets(),
1334    pull_and_push_config(ns_node_disco:nodes_wanted()),
1335
1336    %% No graceful failover for non-KV nodes
1337    case lists:member(kv, ns_cluster_membership:node_services(Node)) of
1338        true ->
1339            ok;
1340        false ->
1341            erlang:exit(non_kv_node)
1342    end,
1343    case check_failover_possible([Node]) of
1344        ok ->
1345            ok;
1346        Error ->
1347            erlang:exit(Error)
1348    end,
1349
1350    AllBucketConfigs = ns_bucket:get_buckets(),
1351    InterestingBuckets = [BC || BC = {_, Conf} <- AllBucketConfigs,
1352                                proplists:get_value(type, Conf) =:= membase,
1353                                %% when a bucket doesn't have a vbucket
1354                                %% map, there's nothing for graceful
1355                                %% failover to do, so we skip it;
1356                                %%
1357                                %% note that the subsequent hard failover
1358                                %% will still operate on such buckets and,
1359                                %% if needed, remove the node from the server list
1360                                proplists:get_value(map, Conf, []) =/= []],
1361    NumBuckets = length(InterestingBuckets),
1362
1363    case check_graceful_failover_possible(Node, InterestingBuckets) of
1364        true -> ok;
1365        false ->
1366            erlang:exit(not_graceful)
1367    end,
1368    proc_lib:init_ack({ok, self()}),
1369
1370    ok = leader_activities:run_activity(
1371           graceful_failover, majority,
1372           fun () ->
1373                   ale:info(?USER_LOGGER,
1374                            "Starting vbucket moves for "
1375                            "graceful failover of ~p", [Node]),
1376
1377                   lists:foldl(
1378                     fun ({BucketName, BucketConfig}, I) ->
1379                             do_run_graceful_failover_moves(Node,
1380                                                            BucketName,
1381                                                            BucketConfig,
1382                                                            I / NumBuckets,
1383                                                            NumBuckets),
1384                             I+1
1385                     end, 0, InterestingBuckets),
1386                   orchestrate_failover([Node]),
1387
1388                   ok
1389           end).
1390
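%% mb_map:promote_replicas_for_graceful_failover/2 produces a map in
%% which, wherever a replica exists, Node no longer heads the chain, so
%% run_mover can migrate the active vbuckets off Node with ordinary
%% vbucket moves while the node is still up.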
1391do_run_graceful_failover_moves(Node, BucketName, BucketConfig, I, N) ->
1392    run_janitor_pre_rebalance(BucketName),
1393
1394    Map = proplists:get_value(map, BucketConfig, []),
1395    Map1 = mb_map:promote_replicas_for_graceful_failover(Map, Node),
1396
1397    ProgressFun = make_progress_fun(I, N),
1398    run_mover(BucketName, BucketConfig,
1399              proplists:get_value(servers, BucketConfig),
1400              ProgressFun, Map, Map1).
1401
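%% Graceful failover is possible only for a KV node, and only if every
%% vbucket that is currently active on the node has a replica that can
%% take over: after replica promotion no chain may still have Node as
%% its active.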
1402check_graceful_failover_possible(Node, BucketsAll) ->
1403    Services = ns_cluster_membership:node_services(Node),
1404    case lists:member(kv, Services) of
1405        true ->
1406            case check_graceful_failover_possible_rec(Node, BucketsAll) of
1407                false -> false;
1408                _ -> true
1409            end;
1410        false ->
1411            false
1412    end.
1413
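%% Returns false as soon as some bucket has a vbucket that would stay
%% active on Node even after replica promotion; otherwise returns the
%% (possibly empty) list of checked buckets that have Node on their
%% server list.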
1414check_graceful_failover_possible_rec(_Node, []) ->
1415    [];
1416check_graceful_failover_possible_rec(Node, [{BucketName, BucketConfig} | RestBucketConfigs]) ->
1417    Map = proplists:get_value(map, BucketConfig, []),
1418    Servers = proplists:get_value(servers, BucketConfig, []),
1419    case lists:member(Node, Servers) of
1420        true ->
1421            Map1 = mb_map:promote_replicas_for_graceful_failover(Map, Node),
1422            case lists:any(fun (Chain) -> hd(Chain) =:= Node end, Map1) of
1423                true ->
1424                    false;
1425                false ->
1426                    case check_graceful_failover_possible_rec(Node, RestBucketConfigs) of
1427                        false -> false;
1428                        RecRV -> [BucketName | RecRV]
1429                    end
1430            end;
1431        false ->
1432            check_graceful_failover_possible_rec(Node, RestBucketConfigs)
1433    end.
1434
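%% Returns ok when failing over Nodes leaves at least one active KV
%% node behind, last_node when Nodes covers all active nodes or all
%% active KV nodes, and unknown_node when some requested node is not
%% currently active.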
1435check_failover_possible(Nodes) ->
1436    ActiveNodes = lists:sort(ns_cluster_membership:active_nodes()),
1437    FailoverNodes = lists:sort(Nodes),
1438    case ActiveNodes of
1439        FailoverNodes ->
1440            last_node;
1441        _ ->
1442            case lists:subtract(FailoverNodes, ActiveNodes) of
1443                [] ->
1444                    case ns_cluster_membership:service_nodes(ActiveNodes, kv) of
1445                        FailoverNodes ->
1446                            last_node;
1447                        _ ->
1448                            ok
1449                    end;
1450                _ ->
1451                    unknown_node
1452            end
1453    end.
1454
1455drop_old_2i_indexes(KeepNodes) ->
1456    Config = ns_config:get(),
1457    NewNodes = KeepNodes -- ns_cluster_membership:active_nodes(Config),
1458    %% Only delta recovery is supported for the index service.
1459    %% Note that if a node is running both the KV and index services,
1460    %% and the user selects the full recovery option for such
1461    %% a node, then recovery_type will be set to full.
1462    %% But we treat delta and full recovery the same for
1463    %% the index data.
1464    %% Also, delta recovery for the index service differs from
1465    %% delta recovery for the KV service: for index, it just
1466    %% means that we do not drop the indexes and their metadata.
1467    CleanupNodes = [N || N <- NewNodes,
1468                         ns_cluster_membership:get_recovery_type(Config, N) =:= none],
1469    ?rebalance_info("Going to drop possible old 2i indexes on nodes ~p",
1470                    [CleanupNodes]),
1471    {Oks, RPCErrors, Downs} = misc:rpc_multicall_with_plist_result(
1472                                CleanupNodes,
1473                                ns_storage_conf, delete_old_2i_indexes, []),
1474    RecoveryNodes = NewNodes -- CleanupNodes,
1475    ?rebalance_info("Going to keep possible 2i indexes on nodes ~p",
1476                    [RecoveryNodes]),
1477    %% Clear recovery type for non-KV nodes here.
1478    %% recovery_type for nodes running the KV service is cleared later.
1479    NonKV = [N || N <- RecoveryNodes,
1480                  not lists:member(kv, ns_cluster_membership:node_services(Config, N))],
1481    NodeChanges = [[{{node, N, recovery_type}, none},
1482                    {{node, N, membership}, active}] || N <- NonKV],
1483    ok = ns_config:set(lists:flatten(NodeChanges)),
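    %% Collate per-node cleanup failures: non-ok results, RPC errors and
    %% nodes that were unreachable (shapes as implied by the usage below).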
1484    Errors = [{N, RV}
1485              || {N, RV} <- Oks,
1486                 RV =/= ok]
1487        ++ RPCErrors
1488        ++ [{N, node_down} || N <- Downs],
1489    case Errors of
1490        [] ->
1491            ?rebalance_debug("Cleanup succeeded: ~p", [Oks]),
1492            ok;
1493        _ ->
1494            ?rebalance_error("Failed to cleanup indexes: ~p", [Errors]),
1495            {old_indexes_cleanup_failed, Errors}
1496    end.
1497
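%% Rebalance and failover require every bucket to use DCP replication;
%% any remaining legacy (TAP) buckets must be dealt with first.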
1498check_no_tap_buckets() ->
1499    case cluster_compat_mode:have_non_dcp_buckets() of
1500        false ->
1501            ok;
1502        {true, BadBuckets} ->
1503            ale:error(?USER_LOGGER,
1504                      "Cannot rebalance/failover with non-dcp buckets. "
1505                      "Non-dcp buckets: ~p", [BadBuckets]),
1506            {error, {found_non_dcp_buckets, BadBuckets}}
1507    end.
1508