1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2017-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16
17%% @doc implementation of cluster topology related REST API's
18
19-module(menelaus_web_cluster).
20
21-include_lib("eunit/include/eunit.hrl").
22-include("cut.hrl").
23-include("ns_common.hrl").
24-include("menelaus_web.hrl").
25
26-export([handle_engage_cluster2/1,
27         handle_complete_join/1,
28         handle_join/1,
29         serve_node_services/1,
30         serve_node_services_streaming/1,
31         handle_setup_services_post/1,
32         handle_rebalance_progress/2,
33         handle_eject_post/1,
34         handle_add_node/1,
35         handle_add_node_to_group/2,
36         handle_failover/1,
37         handle_start_graceful_failover/1,
38         handle_rebalance/1,
39         handle_re_add_node/1,
40         handle_re_failover/1,
41         handle_stop_rebalance/1,
42         handle_set_recovery_type/1]).
43
44-import(menelaus_util,
45        [reply_json/2,
46         reply_json/3,
47         reply/2,
48         reply_text/3,
49         reply_ok/3,
50         parse_validate_port_number/1,
51         handle_streaming/2]).
52
%% Handles the engage-cluster request a cluster sends to a node it wants to
%% add. On success replies with this node's full node info as JSON; on
%% failure replies with 400 and the error message. The handling process
%% always finishes with exit(normal) after replying.
handle_engage_cluster2(Req) ->
    RawBody = Req:recv_body(),
    {struct, NodeKVList} = mochijson2:decode(RawBody),
    %% Trapping exits is admittedly a kludge, but it is a fully correct way
    %% to keep this request alive while everything around it restarts.
    process_flag(trap_exit, true),
    case ns_cluster:engage_cluster(NodeKVList) of
        {error, _What, Message, _Nested} ->
            reply_json(Req, [Message], 400);
        {ok, _} ->
            %% NOTE: for 2.1+ cluster compat something fancier may be
            %% needed. For now 2.0 is compatible only with itself and
            %% 1.8.x, so no extra work is required.
            %%
            %% The engage_cluster/complete_join sequence is the cluster
            %% version compat negotiation. The cluster first sends the
            %% joinee its info in the engage_cluster payload. The joinee
            %% then checks whether it can work in that compat mode (as a
            %% single node cluster it runs in the highest compat mode it
            %% supports, which may be higher than the cluster's). If it
            %% can, it must answer with the cluster's compat mode as its
            %% clusterCompatibility; otherwise the cluster would refuse
            %% the node for running in a higher compat mode. That mode
            %% could belong to a far future version, so only the joinee
            %% knows whether it can run backwards compatible. Sending back
            %% a 'corrected' clusterCompatibility in the engage_cluster
            %% response is therefore the only option.
            %%
            %% NOTE: there is no need to actually switch to the lower
            %% compat mode during engage_cluster, because complete_join
            %% fully restarts the joinee, which then comes back in the
            %% cluster's compat mode.
            %%
            %% For now we only look at whether 1.8.x is asking us to join;
            %% if so we reply with clusterCompatibility of 1, the only
            %% value such a cluster supports.
            %%
            %% NOTE: simply echoing the cluster's clusterCompatibility was
            %% considered, but that would break a much later version's
            %% (say 10.x) backwards compatibility check against us: 2.0
            %% does not check the cluster's compatibility (there is no
            %% need), so lying about our compat mode would prevent the
            %% cluster from verifying that we are compatible with it.
            %%
            %% The 127.0.0.1 (misc:localhost()) below is subtle, see
            %% MB-8404. In CBSE-385 a mis-configured node that was forced
            %% onto the 127.0.0.1 address was successfully added to a
            %% cluster. The suspected culprit is the "rename node in
            %% node-details output if node is 127.0.0.1" behavior that
            %% clients need. Since engage cluster is strictly an
            %% intra-cluster exchange, 127.0.0.1 can be returned as
            %% 127.0.0.1, so a join attempt like the one in CBSE-385 gets
            %% stopped at the completeJoin step, which would be sent to
            %% 127.0.0.1 (the joiner) and bounced.
            {struct, NodeInfo} =
                menelaus_web_node:build_full_node_info(node(), misc:localhost()),
            %% The payload must carry clusterCompatibility; a missing key
            %% fails this match on purpose.
            {_, ClusterCompat} = ClusterCompatKV =
                lists:keyfind(<<"clusterCompatibility">>, 1, NodeKVList),
            SupportedCompat = cluster_compat_mode:supported_compat_version(),
            OurCompat =
                cluster_compat_mode:effective_cluster_compat_version_for(SupportedCompat),
            Reply =
                case ClusterCompat < OurCompat of
                    true ->
                        ?log_info("Lowering our advertised clusterCompatibility in order to enable joining older cluster"),
                        %% Replace the entry under both the binary and the
                        %% atom key; keyreplace leaves the list untouched
                        %% when a key is absent.
                        BinaryKeyFixed = lists:keyreplace(<<"clusterCompatibility">>, 1,
                                                          NodeInfo, ClusterCompatKV),
                        lists:keyreplace(clusterCompatibility, 1,
                                         BinaryKeyFixed, ClusterCompatKV);
                    false ->
                        NodeInfo
                end,
            reply_json(Req, {struct, Reply})
    end,
    exit(normal).
127
%% Handles the complete-join request: decodes the JSON body, hands it to
%% ns_cluster:complete_join/1 and replies with 200 and an empty list on
%% success or 400 and the error message on failure. Exits are trapped before
%% the join is attempted and the process ends with exit(normal).
handle_complete_join(Req) ->
    Body = Req:recv_body(),
    {struct, NodeKVList} = mochijson2:decode(Body),
    process_flag(trap_exit, true),
    {Payload, Status} =
        case ns_cluster:complete_join(NodeKVList) of
            {ok, _} ->
                {[], 200};
            {error, _What, Message, _Nested} ->
                {[Message], 400}
        end,
    reply_json(Req, Payload, Status),
    exit(normal).
138
%% Entry point for a node asking to join a cluster.
%%
%% A node that is already provisioned refuses with a 400 and a JSON error
%% pointing at the cluster's controller/addNode API. Otherwise the request
%% is handed to handle_join_clean_node/1, which replies 200 once the join
%% succeeds or 400 with a JSON error message when it fails for any reason.
%%
%% Parameter example: clusterMemberHostIp=192%2E168%2E0%2E1&
%%                    clusterMemberPort=8091&
%%                    user=admin&password=admin123
%%
%% NOTE(review): earlier documentation mentioned a Location header pointing
%% at the new /pool/default and a 401 case; neither is visible in this
%% function - confirm whether they are produced elsewhere.
handle_join(Req) ->
    case ns_config_auth:is_system_provisioned() of
        false ->
            handle_join_clean_node(Req);
        true ->
            AlreadyProvisioned =
                <<"Node is already provisioned. To join use controller/addNode api of the cluster">>,
            reply_json(Req, [AlreadyProvisioned], 400)
    end.
162
%% Parses a comma separated list of service names and validates it against
%% ns_cluster_membership:supported_services().
%%
%% Returns {ok, Services} with the sorted, de-duplicated service atoms, or
%% {error, Binary} when a name is unknown or no service was given at all.
parse_validate_services_list(ServicesList) ->
    NameToService = [{atom_to_list(Svc), Svc}
                     || Svc <- ns_cluster_membership:supported_services()],
    Requested = string:tokens(ServicesList, ","),
    IsKnown = fun (Name) -> lists:keymember(Name, 1, NameToService) end,
    case lists:partition(IsKnown, Requested) of
        {[], []} ->
            {error, <<"At least one service has to be selected">>};
        {Recognized, []} ->
            ToService = fun (Name) ->
                                {_, Svc} = lists:keyfind(Name, 1, NameToService),
                                Svc
                        end,
            {ok, lists:usort(lists:map(ToService, Recognized))};
        {_, UnknownServices} ->
            %% Unknown names are reported in the order they were given.
            Msg = io_lib:format("Unknown services: ~p", [UnknownServices]),
            {error, iolist_to_binary(Msg)}
    end.
182
%% EUnit test for parse_validate_services_list/1. All checks use eunit
%% assertion macros so that a failure reports the expected and actual values
%% instead of a bare badmatch.
parse_validate_services_list_test() ->
    %% An empty selection is rejected.
    ?assertMatch({error, _}, parse_validate_services_list("")),
    %% The result is sorted regardless of input order.
    ?assertEqual({ok, [index, kv, n1ql]}, parse_validate_services_list("n1ql,kv,index")),
    ?assertEqual({ok, [kv]}, parse_validate_services_list("kv")),
    %% Any unknown name invalidates the whole list.
    ?assertMatch({error, _}, parse_validate_services_list("n1ql,kv,s")),
    ?assertMatch({error, _}, parse_validate_services_list("neeql,kv")).
189
%% Parses and validates the POST parameters shared by the join and add-node
%% requests.
%%
%% Params is the proplist of string keys and values from the request;
%% ThisIsJoin enables the legacy clusterMemberHostIp/clusterMemberPort
%% fallback for the hostname.
%%
%% Returns {ok, PList} with the keys services, host, port, user and password,
%% or {errors, [Binary]} listing every problem found.
parse_join_cluster_params(Params, ThisIsJoin) ->
    Version = proplists:get_value("version", Params, "3.0"),
    IsLegacyVersion = (Version =:= "3.0"),

    Hostname =
        case proplists:get_value("hostname", Params) of
            undefined when ThisIsJoin andalso IsLegacyVersion ->
                %% Backward compatibility: build the hostname out of the
                %% old clusterMemberHostIp/clusterMemberPort pair, but only
                %% when both are present.
                LegacyHost = proplists:get_value("clusterMemberHostIp", Params),
                LegacyPort = proplists:get_value("clusterMemberPort", Params),
                case {LegacyHost, LegacyPort} of
                    {undefined, _} ->
                        "";
                    {_, undefined} ->
                        "";
                    _ ->
                        lists:concat([LegacyHost, ":", LegacyPort])
                end;
            undefined ->
                "";
            GivenHostname ->
                GivenHostname
        end,

    Credentials = [{user, proplists:get_value("user", Params)},
                   {password, proplists:get_value("password", Params)}],

    Version40 = cluster_compat_mode:compat_mode_string_40(),

    VersionErrors =
        if
            IsLegacyVersion ->
                [];
            Version =:= Version40 ->
                %% The 4.0 flavour of the API is strict about its parameters.
                Allowed = ["hostname", "version", "user", "password", "services"],
                case [Key || {Key, _} <- Params, not lists:member(Key, Allowed)] of
                    [] ->
                        [];
                    Unexpected ->
                        Msg = io_lib:format("Got unknown parameters: ~p", [Unexpected]),
                        [iolist_to_binary(Msg)]
                end;
            true ->
                [<<"version is not recognized">>]
        end,

    Services =
        case proplists:get_value("services", Params) of
            undefined ->
                {ok, ns_cluster_membership:default_services()};
            SvcParams ->
                parse_validate_services_list(SvcParams)
        end,

    MissingFieldErrors =
        [iolist_to_binary([atom_to_list(Field), <<" is missing">>])
         || {Field, undefined} <- Credentials],

    %% parse_hostname/1 reports problems by throwing {error, Messages}.
    {HostnameErrors, HostAndPort} =
        case (catch parse_hostname(Hostname)) of
            {ParsedHost, ParsedPort} when is_list(ParsedHost) ->
                {[], {ParsedHost, ParsedPort}};
            {error, HMsgs} ->
                {HMsgs, undefined}
        end,

    ServiceErrors =
        case Services of
            {error, ServicesError} ->
                [ServicesError];
            _ ->
                []
        end,

    case MissingFieldErrors ++ VersionErrors ++ HostnameErrors ++ ServiceErrors of
        [] ->
            {ok, ServicesList} = Services,
            {Host, Port} = HostAndPort,
            {ok, [{services, ServicesList}, {host, Host}, {port, Port} | Credentials]};
        Errors ->
            {errors, Errors}
    end.
285
%% Join handler for a node that is not provisioned yet: validates the POST
%% parameters and either replies 400 with the list of errors or continues
%% with handle_join_tail/6.
handle_join_clean_node(Req) ->
    case parse_join_cluster_params(Req:parse_post(), true) of
        {ok, Fields} ->
            [OtherHost, OtherPort, OtherUser, OtherPswd, Services] =
                [proplists:get_value(Key, Fields)
                 || Key <- [host, port, user, password, services]],
            handle_join_tail(Req, OtherHost, OtherPort, OtherUser, OtherPswd, Services);
        {errors, Errors} ->
            reply_json(Req, Errors, 400)
    end.
300
%% Performs the actual join: checks connectivity to the other host, then
%% asks that host to add this node by POSTing to its addNode endpoint
%% (addNodeV2 when a non-default set of services was requested).
%%
%% Replies 200 on success and 400 with a JSON error otherwise. Exits are
%% trapped for the duration and the process ends with exit(normal).
handle_join_tail(Req, OtherHost, OtherPort, OtherUser, OtherPswd, Services) ->
    process_flag(trap_exit, true),
    Outcome =
        case ns_cluster:check_host_connectivity(OtherHost) of
            {ok, MyIP} ->
                {struct, MyInfo} = menelaus_web_node:build_full_node_info(node(), MyIP),
                MyHostname = misc:expect_prop_value(hostname, MyInfo),

                %% User and password are sent empty on purpose here.
                CommonPayload = [{<<"hostname">>, MyHostname},
                                 {<<"user">>, []},
                                 {<<"password">>, []}],

                {Endpoint, Payload} =
                    case ns_cluster_membership:default_services() of
                        Services ->
                            {"/controller/addNode", CommonPayload};
                        _ ->
                            ServicesStr = string:join([atom_to_list(Svc) || Svc <- Services], ","),
                            {"/controller/addNodeV2",
                             [{"version", cluster_compat_mode:compat_mode_string_40()},
                              {"services", ServicesStr}
                              | CommonPayload]}
                    end,

                Request = {OtherHost, OtherPort, Endpoint,
                           "application/x-www-form-urlencoded",
                           mochiweb_util:urlencode(Payload)},
                case menelaus_rest:json_request_hilevel(post, Request, {OtherUser, OtherPswd}) of
                    {error, What, _M, {bad_status, 404, _} = Nested} ->
                        %% A 404 is reported as an attempt to join an older
                        %% cluster that lacks the requested services.
                        {error, What,
                         <<"Node attempting to join an older cluster. Some of the selected services are not available.">>,
                         Nested};
                    RestResult ->
                        RestResult
                end;
            ConnectivityFailure ->
                ConnectivityFailure
        end,

    case Outcome of
        {error, _What, Message, _Nested} ->
            reply_json(Req, [Message], 400);
        {client_error, JSON} ->
            reply_json(Req, JSON, 400);
        {ok, _} ->
            reply(Req, 200)
    end,
    exit(normal).
349
%% Polls ns_node_disco:nodes_actual() until this node is the only one left,
%% sleeping Period milliseconds between polls. Exits with self_eject_failed
%% once IterationsLeft reaches 0.
do_eject_myself_rec(0, _) ->
    exit(self_eject_failed);
do_eject_myself_rec(IterationsLeft, Period) ->
    case ns_node_disco:nodes_actual() =:= [node()] of
        true ->
            ok;
        false ->
            timer:sleep(Period),
            do_eject_myself_rec(IterationsLeft - 1, Period)
    end.
361
%% Makes this node leave the cluster and waits (up to 10 polls, 250 ms
%% apart) until it is the only node left.
do_eject_myself() ->
    ns_cluster:leave(),
    MaxPolls = 10,
    PollPeriodMs = 250,
    do_eject_myself_rec(MaxPolls, PollPeriodMs).
365
%% Ejects a node from the cluster, whether that node is running or down.
%%
%% The request is a urlencoded form with an otpNode field. Two values are
%% special: "Self" stands for this node and "zzzzForce" force-ejects this
%% node and ends the handling process right away.
%%
%% Replies visible here:
%%   400 "Bad Request" when otpNode is missing
%%   400 when the node is still an active member
%%   400 when the node name is not known to this VM at all
%% everything else is decided by do_handle_eject_post/2.
%% NOTE(review): 401/403 for missing or wrong credentials are presumably
%% produced before this handler runs - confirm.
handle_eject_post(Req) ->
    PostArgs = Req:parse_post(),
    OtpNodeStr = case proplists:get_value("otpNode", PostArgs) of
                     undefined -> undefined;
                     "Self" -> atom_to_list(node());
                     "zzzzForce" ->
                         handle_force_self_eject(Req),
                         exit(normal);
                     Given -> Given
                 end,
    case OtpNodeStr of
        undefined ->
            reply_text(Req, "Bad Request\n", 400);
        _ ->
            %% The name comes straight from the request, so it must not be
            %% turned into a new atom: atoms are never garbage collected and
            %% arbitrary names would eventually exhaust the atom table. A
            %% name with no existing atom cannot belong to a cluster member.
            try list_to_existing_atom(OtpNodeStr) of
                OtpNode ->
                    case ns_cluster_membership:get_cluster_membership(OtpNode) of
                        active ->
                            reply_text(Req, "Cannot remove active server.\n", 400);
                        _ ->
                            do_handle_eject_post(Req, OtpNode)
                    end
            catch
                error:badarg ->
                    reply_text(Req, "Server does not exist.\n", 400)
            end
    end.
398
%% Force-ejects this node from the cluster, records the removal in the audit
%% log and replies 200 "done". Exits are trapped first. Returns ok.
handle_force_self_eject(Req) ->
    process_flag(trap_exit, true),
    ns_cluster:force_eject_self(),
    %% node() is read only after the eject, as before.
    Self = node(),
    ns_audit:remove_node(Req, Self),
    reply_text(Req, "done", 200),
    ok.
405
%% Ejects OtpNode and replies to the request.
%%
%% When OtpNode is this node it leaves the cluster itself; otherwise the
%% node has to be listed in ns_node_disco:nodes_wanted(), else the request
%% is answered with 400 "Server does not exist.".
do_handle_eject_post(Req, OtpNode) when OtpNode =:= node() ->
    do_eject_myself(),
    ns_audit:remove_node(Req, node()),
    reply(Req, 200);
do_handle_eject_post(Req, OtpNode) ->
    case lists:member(OtpNode, ns_node_disco:nodes_wanted()) of
        false ->
            %% The node is not part of the cluster.
            ?MENELAUS_WEB_LOG(0018, "Request to eject nonexistant server failed.  Requested node: ~p",
                              [OtpNode]),
            reply_text(Req, "Server does not exist.\n", 400);
        true ->
            ns_cluster:leave(OtpNode),
            ?MENELAUS_WEB_LOG(?NODE_EJECTED, "Node ejected: ~p from node: ~p",
                              [OtpNode, node()]),
            ns_audit:remove_node(Req, OtpNode),
            reply(Req, 200)
    end.
427
%% Validates a setup-services request.
%%
%% Returns {ok, Services} when the node is not provisioned yet, the
%% requested list is valid and contains kv, the topology limitation check
%% passes and the memory quotas fit. Otherwise returns the error produced by
%% the first failing check.
validate_setup_services_post(Req) ->
    Params = Req:parse_post(),
    case ns_config_auth:is_system_provisioned() of
        true ->
            {error, <<"cannot change node services after cluster is provisioned">>};
        _ ->
            Requested = proplists:get_value("services", Params, ""),
            case parse_validate_services_list(Requested) of
                {error, Msg} ->
                    {error, Msg};
                {ok, Svcs} ->
                    case lists:member(kv, Svcs) of
                        false ->
                            {error, <<"cannot setup first cluster node without kv service">>};
                        true ->
                            case ns_cluster:enforce_topology_limitation(Svcs) of
                                ok ->
                                    setup_services_check_quota(Svcs, Params);
                                TopologyError ->
                                    TopologyError
                            end
                    end
            end
    end.
452
%% Checks that the memory quotas fit this node for the given services.
%%
%% With setDefaultMemQuotas=true the default quotas for Services are written
%% to the config first and then checked; with "false" (the default) the
%% currently configured quotas of the quota-aware services are checked.
%% Returns {ok, Services} or {error, Binary}.
setup_services_check_quota(Services, Params) ->
    Quotas =
        case proplists:get_value("setDefaultMemQuotas", Params, "false") of
            "true" ->
                do_update_with_default_quotas(memory_quota:default_quotas(Services));
            "false" ->
                AwareServices = memory_quota:aware_services(
                                  cluster_compat_mode:get_compat_version()),
                [begin
                     {ok, Quota} = memory_quota:get_quota(Svc),
                     {Svc, Quota}
                 end || Svc <- AwareServices]
        end,

    case Quotas of
        {error, _Msg} ->
            %% Storing the default quotas failed; pass the error through.
            Quotas;
        _ ->
            case memory_quota:check_this_node_quotas(Services, Quotas) of
                {error, {total_quota_too_high, _, TotalQuota, MaxAllowed}} ->
                    Text = io_lib:format("insufficient memory to satisfy memory quota "
                                         "for the services "
                                         "(requested quota is ~bMB, "
                                         "maximum allowed quota for the node is ~bMB)",
                                         [TotalQuota, MaxAllowed]),
                    {error, iolist_to_binary(Text)};
                ok ->
                    {ok, Services}
            end
    end.
482
%% Stores Quotas in the config, allowing up to 10 attempts.
%% Returns Quotas on success or {error, Binary} when every attempt asked for
%% a retry.
do_update_with_default_quotas(Quotas) ->
    MaxAttempts = 10,
    do_update_with_default_quotas(Quotas, MaxAttempts).

do_update_with_default_quotas(_, 0) ->
    {error, <<"Could not update the config with default memory quotas">>};
do_update_with_default_quotas(Quotas, AttemptsLeft) ->
    %% A fresh config snapshot is taken for every attempt.
    Config = ns_config:get(),
    case memory_quota:set_quotas(Config, Quotas) of
        retry_needed ->
            do_update_with_default_quotas(Quotas, AttemptsLeft - 1);
        ok ->
            Quotas
    end.
495
%% REST handler that sets the services of this node. Replies 200 after
%% storing and auditing the validated services, or 400 with the validation
%% error.
handle_setup_services_post(Req) ->
    case validate_setup_services_post(Req) of
        {ok, Services} ->
            ThisNode = node(),
            ns_config:set({node, ThisNode, services}, Services),
            ns_audit:setup_node_services(Req, ThisNode, Services),
            reply(Req, 200);
        {error, Error} ->
            reply_json(Req, [Error], 400)
    end.
505
%% Validates the user/password pair of an add-node request.
%%
%% Returns [] when the pair is acceptable (both empty or both non-empty),
%% otherwise a one-element list with the error message as a binary.
validate_add_node_params(User, Password) ->
    case {User, Password} of
        {undefined, _} ->
            [<<"Missing required parameter.">>];
        {_, undefined} ->
            [<<"Missing required parameter.">>];
        {[], []} ->
            [];
        {[_ | _], [_ | _]} ->
            [];
        {[], [_ | _]} ->
            [<<"If a username is not specified, a password must not be supplied.">>];
        _ ->
            [<<"A password must be supplied.">>]
    end.
517
%% Parses a "[http://]host[:port]" string into {Host, Port} after trimming
%% it. Problems are reported by throwing {error, [Binary]}.
%%
%% Erlang R15B03 ships http_uri:parse/2, which does this job; reimplement on
%% top of it once R14B04 support is dropped.
parse_hostname(Hostname) ->
    Trimmed = misc:trim(Hostname),
    do_parse_hostname(Trimmed).
522
%% Worker for parse_hostname/1 operating on an already trimmed string.
%% Throws {error, [Binary]} for an empty hostname or a scheme other than
%% http (compared case-insensitively). The port defaults to "8091" and is
%% validated with parse_validate_port_number/1.
do_parse_hostname([]) ->
    throw({error, [<<"Hostname is required.">>]});
do_parse_hostname(Hostname) ->
    HostAndPort =
        case string:str(Hostname, "://") of
            0 ->
                %% No scheme given at all.
                Hostname;
            SeparatorPos ->
                Scheme = string:sub_string(Hostname, 1, SeparatorPos - 1),
                case string:to_lower(Scheme) of
                    "http" ->
                        %% Skip the three characters of "://".
                        string:sub_string(Hostname, SeparatorPos + 3);
                    _ ->
                        throw({error, [list_to_binary("Unsupported protocol " ++ Scheme)]})
                end
        end,

    {Host, StringPort} = misc:split_host_port(HostAndPort, "8091"),
    {Host, parse_validate_port_number(StringPort)}.
541
%% REST handler: adds a node without naming a server group (the group UUID
%% is passed on as undefined).
handle_add_node(Req) ->
    NoGroup = undefined,
    do_handle_add_node(Req, NoGroup).
544
%% REST handler: adds a node to the server group whose UUID is given as a
%% string.
handle_add_node_to_group(GroupUUIDString, Req) ->
    GroupUUID = list_to_binary(GroupUUIDString),
    do_handle_add_node(Req, GroupUUID).
547
%% Adds the node described by the POST parameters to the cluster, into the
%% group GroupUUID (undefined when no group was requested).
%%
%% Parameter example: hostname=epsilon.local, user=Administrator, password=asd!23
%%
%% Replies 200 with the new otpNode on success, 404 for an unknown group and
%% 400 for invalid parameters or any other failure.
do_handle_add_node(Req, GroupUUID) ->
    Params = Req:parse_post(),

    Validated =
        case parse_join_cluster_params(Params, false) of
            {errors, _} = ParseFailure ->
                ParseFailure;
            {ok, ParsedKV} ->
                GivenUser = proplists:get_value(user, ParsedKV),
                GivenPassword = proplists:get_value(password, ParsedKV),
                case validate_add_node_params(GivenUser, GivenPassword) of
                    [] ->
                        {ok, ParsedKV};
                    CredErrors ->
                        {errors, CredErrors}
                end
        end,

    case Validated of
        {errors, ErrorList} ->
            reply_json(Req, ErrorList, 400);
        {ok, KV} ->
            [User, Password, Hostname, Port, Services] =
                [proplists:get_value(Key, KV)
                 || Key <- [user, password, host, port, services]],
            AddResult = ns_cluster:add_node_to_group(Hostname, Port,
                                                     {User, Password},
                                                     GroupUUID,
                                                     Services),
            case AddResult of
                {ok, OtpNode} ->
                    ns_audit:add_node(Req, Hostname, Port, User, GroupUUID, Services, OtpNode),
                    reply_json(Req, {struct, [{otpNode, OtpNode}]}, 200);
                {error, unknown_group, Message, _} ->
                    reply_json(Req, [Message], 404);
                {error, _What, Message, _Nested} ->
                    reply_json(Req, [Message], 400)
            end
    end.
588
%% Converts a node name to an atom without ever creating a new atom.
%%
%% Returns {ok, Node} when an atom of that name already exists,
%% {error, "No server specified."} when the name is literally "undefined",
%% and {error, "Unknown server given."} when the conversion fails.
validate_node(NodeArg) ->
    try list_to_existing_atom(NodeArg) of
        undefined ->
            {error, "No server specified."};
        Node ->
            {ok, Node}
    catch
        %% Any failure of the conversion means no such node is known.
        error:_ ->
            {error, "Unknown server given."}
    end.
599
%% Extracts the single node named in a graceful failover request.
%% Returns {ok, Node}, or {error, Message} when the node list is invalid or
%% names more than one node.
parse_graceful_failover_args(Req) ->
    case parse_otp_nodes(Req:parse_post()) of
        {ok, [OnlyNode]} ->
            {ok, OnlyNode};
        {ok, _Several} ->
            {error, "Multiple servers cannot be specified."};
        Failure ->
            Failure
    end.
610
%% Collects every otpNode parameter and validates each with validate_node/1.
%%
%% Returns {ok, Nodes} with the sorted, de-duplicated node atoms, or
%% {error, Message} when no node was given or at least one is unknown.
parse_otp_nodes(Params) ->
    Verdicts = lists:map(fun ({_Key, Val}) ->
                                 {Val, validate_node(Val)}
                         end, proplists:lookup_all("otpNode", Params)),
    {Accepted, Rejected} = lists:partition(fun ({_Val, {ok, _}}) -> true;
                                               (_) -> false
                                           end, Verdicts),
    case Rejected of
        [] ->
            case Accepted of
                [] ->
                    {error, "No server specified."};
                _ ->
                    %% usort also removes duplicates.
                    {ok, lists:usort([Node || {_Val, {ok, Node}} <- Accepted])}
            end;
        _ ->
            %% Rejected names are reported last-given first.
            Bad = lists:reverse([Val || {Val, _Verdict} <- Rejected]),
            {error, io_lib:format("Unknown server given: ~p", [Bad])}
    end.
632
%% Parses the arguments of a hard failover request.
%% Returns {ok, Nodes, AllowUnsafe}, where AllowUnsafe is true only when the
%% allowUnsafe parameter is exactly "true", or the error from
%% parse_otp_nodes/1.
parse_hard_failover_args(Req) ->
    Params = Req:parse_post(),
    case parse_otp_nodes(Params) of
        {ok, Nodes} ->
            AllowUnsafe = (proplists:get_value("allowUnsafe", Params) =:= "true"),
            {ok, Nodes, AllowUnsafe};
        Failure ->
            Failure
    end.
642
%% REST handler for hard failover of one or more nodes.
%% Replies 200 after a successful, audited failover; otherwise replies with
%% a text message and the status code matching the refusal.
handle_failover(Req) ->
    case parse_hard_failover_args(Req) of
        {error, ErrorMsg} ->
            reply_text(Req, ErrorMsg, 400);
        {ok, Nodes, AllowUnsafe} ->
            case ns_cluster_membership:failover(Nodes, AllowUnsafe) of
                ok ->
                    ns_audit:failover_nodes(Req, Nodes, hard),
                    reply(Req, 200);
                Refusal ->
                    {Code, Text} =
                        case Refusal of
                            rebalance_running ->
                                {503, "Rebalance running."};
                            in_recovery ->
                                {503, "Cluster is in recovery mode."};
                            orchestration_unsafe ->
                                %% 504 is a stretch here of course, but the
                                %% information has to reach the client
                                %% somehow.
                                {504, "Cannot safely perform a failover at the moment"};
                            last_node ->
                                {400, "Last active node cannot be failed over."};
                            unknown_node ->
                                {400, "Unknown server given."};
                            _ ->
                                {500, io_lib:format("Unexpected server error: ~p", [Refusal])}
                        end,
                    reply_text(Req, Text, Code)
            end
    end.
673
%% REST handler that starts a graceful failover of a single node.
%% Replies 200 once the failover has been started and audited; otherwise
%% replies with a text message and the status code matching the refusal.
handle_start_graceful_failover(Req) ->
    case parse_graceful_failover_args(Req) of
        {error, ErrorMsg} ->
            reply_text(Req, ErrorMsg, 400);
        {ok, Node} ->
            case ns_orchestrator:start_graceful_failover(Node) of
                ok ->
                    ns_audit:failover_nodes(Req, [Node], graceful),
                    reply(Req, 200);
                Refusal ->
                    {Code, Text} =
                        case Refusal of
                            in_progress ->
                                {503, "Rebalance running."};
                            in_recovery ->
                                {503, "Cluster is in recovery mode."};
                            not_graceful ->
                                {400, "Failover cannot be done gracefully (would lose vbuckets)."};
                            non_kv_node ->
                                {400, "Failover cannot be done gracefully for a node without data. Use hard failover."};
                            unknown_node ->
                                {400, "Unknown server given."};
                            last_node ->
                                {400, "Last active node cannot be failed over."};
                            {config_sync_failed, _} ->
                                {500, "Failed to synchronize config to other nodes"};
                            _ ->
                                {500, io_lib:format("Unexpected server error: ~p", [Refusal])}
                        end,
                    reply_text(Req, Text, Code)
            end
    end.
708
%% REST handler that starts a rebalance.
%%
%% knownNodes and ejectedNodes are comma separated node names and
%% deltaRecoveryBuckets an optional comma separated list of bucket names
%% (all buckets when absent). Replies 400 when knownNodes is empty or when
%% any node name has no existing atom; otherwise continues with
%% do_handle_rebalance/4.
handle_rebalance(Req) ->
    Params = Req:parse_post(),
    CommaList = fun (Key) ->
                        string:tokens(proplists:get_value(Key, Params, ""), ",")
                end,
    case CommaList("knownNodes") of
        [] ->
            reply_json(Req, {struct, [{empty_known_nodes, 1}]}, 400);
        KnownNodesS ->
            EjectedNodesS = CommaList("ejectedNodes"),
            %% A name without an existing atom cannot be a cluster node.
            IsUnknown = fun (Name) ->
                                try list_to_existing_atom(Name) of
                                    _ -> false
                                catch
                                    error:badarg -> true
                                end
                        end,
            case lists:any(IsUnknown, EjectedNodesS ++ KnownNodesS) of
                true ->
                    reply_json(Req, {struct, [{mismatch, 1}]}, 400);
                false ->
                    DeltaRecoveryBuckets =
                        case proplists:get_value("deltaRecoveryBuckets", Params) of
                            undefined ->
                                all;
                            RawRecoveryBuckets ->
                                string:tokens(RawRecoveryBuckets, ",")
                        end,
                    do_handle_rebalance(Req, KnownNodesS, EjectedNodesS, DeltaRecoveryBuckets)
            end
    end.
732
%% @doc Starts a rebalance for already validated node name strings and maps
%% the result of ns_cluster_membership:start_rebalance/3 to an HTTP reply.
%%
%% Node names are converted with list_to_existing_atom/1. A successful start
%% ('ok') is audited before replying 200; 'already_balanced' and
%% 'in_progress' reply 200 without an audit record.
-spec do_handle_rebalance(any(), [string()], [string()], all | [bucket_name()]) -> any().
do_handle_rebalance(Req, KnownNodesS, EjectedNodesS, DeltaRecoveryBuckets) ->
    ToNodes = fun (Names) ->
                      [list_to_existing_atom(Name) || Name <- Names]
              end,
    Ejected = ToNodes(EjectedNodesS),
    Known = ToNodes(KnownNodesS),
    Outcome = ns_cluster_membership:start_rebalance(Known, Ejected,
                                                    DeltaRecoveryBuckets),
    case Outcome of
        ok ->
            ns_audit:rebalance_initiated(Req, Known, Ejected,
                                         DeltaRecoveryBuckets),
            reply(Req, 200);
        NoOp when NoOp =:= already_balanced; NoOp =:= in_progress ->
            reply(Req, 200);
        in_recovery ->
            reply_text(Req, "Cluster is in recovery mode.", 503);
        no_active_nodes_left ->
            reply_text(Req, "No active nodes left", 400);
        nodes_mismatch ->
            reply_json(Req, {struct, [{mismatch, 1}]}, 400);
        delta_recovery_not_possible ->
            reply_json(Req, {struct, [{deltaRecoveryNotPossible, 1}]}, 400);
        no_kv_nodes_left ->
            reply_json(Req, {struct, [{noKVNodesLeft, 1}]}, 400)
    end.
757
%% @doc Replies 200 with a JSON object describing rebalance progress.
%%
%% While a rebalance is running the object holds status "running" plus one
%% {progress: P} entry per node, keyed by the node name. Otherwise the status
%% is "none", with an errorMessage field when the rebalance_status config key
%% holds {none, ErrorMessage}.
handle_rebalance_progress(_PoolId, Req) ->
    Idle = {status, <<"none">>},
    Props =
        case ns_cluster_membership:get_rebalance_status() of
            {running, PerNode} ->
                NodeProps = [{atom_to_binary(Node, latin1),
                              {struct, [{progress, Progress}]}}
                             || {Node, Progress} <- PerNode],
                [{status, <<"running">>} | NodeProps];
            _ ->
                case ns_config:search(rebalance_status) of
                    {value, {none, ErrorMessage}} ->
                        [Idle, {errorMessage, iolist_to_binary(ErrorMessage)}];
                    _ ->
                        [Idle]
                end
        end,
    reply_json(Req, {struct, Props}, 200).
773
%% @doc Entry point for the stop-rebalance POST request. Validates the
%% boolean form field allowUnsafe and passes the validated parameters to
%% handle_stop_rebalance/2.
handle_stop_rebalance(Req) ->
    OnValid = fun (Params) -> handle_stop_rebalance(Req, Params) end,
    Validators = [fun (State) -> validator:boolean(allowUnsafe, State) end],
    validator:handle(OnValid, Req, form, Validators).
777
%% @doc Asks ns_cluster_membership to stop the rebalance. allowUnsafe
%% defaults to false when absent from Params. Replies 504 with an
%% explanatory text when the stop is reported as 'unsafe', 200 for any other
%% result.
handle_stop_rebalance(Req, Params) ->
    Unsafe = proplists:get_value(allowUnsafe, Params, false),
    case ns_cluster_membership:stop_rebalance(Unsafe) =:= unsafe of
        false ->
            reply(Req, 200);
        true ->
            Text = "Cannot communicate to the orchestrator node. "
                "Stopping rebalance is unsafe. "
                "This can be overriden by passing allowUnsafe=true "
                "in the POST form.",
            reply_text(Req, Text, 504)
    end.
791
%% @doc Handles the re-add node POST request by setting the recovery type of
%% the node named in the form to 'full'.
handle_re_add_node(Req) ->
    do_handle_set_recovery_type(Req, full, Req:parse_post()).
795
%% @doc Handles the re-failover POST request for the node given in the
%% "otpNode" form field (the string "undefined" is used when the field is
%% absent). On success the action is audited as cancel_recovery and 200 is
%% returned; 'not_possible' yields 400.
handle_re_failover(Req) ->
    OtpNode = proplists:get_value("otpNode", Req:parse_post(), "undefined"),
    Outcome = ns_cluster_membership:re_failover(OtpNode),
    case Outcome of
        not_possible ->
            reply(Req, 400);
        ok ->
            Nodes = [list_to_existing_atom(OtpNode)],
            ns_audit:failover_nodes(Req, Nodes, cancel_recovery),
            reply(Req, 200)
    end.
807
%% @doc Replies with the output of bucket_info_cache:build_node_services/0,
%% served as application/json.
serve_node_services(Req) ->
    Payload = bucket_info_cache:build_node_services(),
    reply_ok(Req, "application/json", Payload).
810
%% @doc Streaming variant of serve_node_services/1: each time the streaming
%% helper invokes the callback, the freshly built node services payload is
%% written out.
serve_node_services_streaming(Req) ->
    Producer = fun (_) ->
                       Payload = bucket_info_cache:build_node_services(),
                       {just_write, {write, Payload}}
               end,
    handle_streaming(Producer, Req).
817
%% @doc Maps the "recoveryType" request value to an atom: "delta" -> delta,
%% "full" -> full, anything else (including a missing value) -> undefined.
decode_recovery_type(Raw) ->
    case Raw of
        "delta" -> delta;
        "full" -> full;
        _ -> undefined
    end.
824
%% @doc Handles the set-recovery-type POST request: decodes the
%% "recoveryType" form field and applies it to the node named in the form.
handle_set_recovery_type(Req) ->
    PostArgs = Req:parse_post(),
    RawType = proplists:get_value("recoveryType", PostArgs),
    do_handle_set_recovery_type(Req, decode_recovery_type(RawType), PostArgs).
829
%% @doc Validates and applies a recovery type for the node named by the
%% "otpNode" entry of Params.
%%
%% Type is delta, full or undefined (undefined meaning the request carried no
%% valid recovery type). Validation errors are collected and returned
%% together as a 400 JSON object:
%%   recoveryType - Type is undefined;
%%   otpNode      - the node name is not an existing atom, or delta recovery
%%                  was requested for a node with neither kv nor index
%%                  service.
%% When validation passes the recovery type is updated; success is audited
%% and answered with 200, 'bad_node' is answered with a 400 otpNode error.
do_handle_set_recovery_type(Req, Type, Params) ->
    BadNodeMsg = <<"invalid node name or node can't be used for delta recovery">>,

    NodeStr = proplists:get_value("otpNode", Params),
    %% A missing or never-seen node name makes list_to_existing_atom/1 raise
    %% badarg; that is folded into the 'undefined' marker checked below.
    Node = try list_to_existing_atom(NodeStr) of
               Atom -> Atom
           catch
               error:badarg -> undefined
           end,

    Services = ns_cluster_membership:node_services(ns_config:latest(), Node),
    HasKVOrIndex = lists:member(kv, Services)
        orelse lists:member(index, Services),

    TypeErrors = [{recoveryType,
                   <<"recovery type must be either 'delta' or 'full'">>}
                  || Type =:= undefined],
    NodeErrors = [{otpNode, BadNodeMsg} || Node =:= undefined],
    DeltaErrors = [{otpNode, BadNodeMsg}
                   || Type =:= delta andalso not HasKVOrIndex],
    Errors = TypeErrors ++ NodeErrors ++ DeltaErrors,

    case Errors of
        [_ | _] ->
            reply_json(Req, {struct, Errors}, 400);
        [] ->
            case ns_cluster_membership:update_recovery_type(Node, Type) of
                bad_node ->
                    reply_json(Req, {struct, [{otpNode, BadNodeMsg}]}, 400);
                ok ->
                    ns_audit:enter_node_recovery(Req, Node, Type),
                    reply_json(Req, [], 200)
            end
    end.
879
880-ifdef(EUNIT).
881
%% Checks parse_hostname/1 against a table of {Input, ExpectedResult} pairs.
%% Each call is wrapped in 'catch' so that a thrown value is compared as the
%% result.
hostname_parsing_test() ->
    PortRangeError =
        {error, [<<"The port number must be greater than 1023 and less than 65536.">>]},
    Cases =
        [{" \t\r\nhttp://host:1025\n\r\t ", {"host", 1025}},
         {"http://host:100", PortRangeError},
         {"http://host:100000", PortRangeError},
         {"hTTp://host:8000", {"host", 8000}},
         {"ftp://host:600", {error, [<<"Unsupported protocol ftp">>]}},
         {"http://host", {"host", 8091}},
         {"127.0.0.1:6000", {"127.0.0.1", 6000}},
         {"host:port", {error, [<<"Port must be a number.">>]}},
         {"aaa:bb:cc",
          {error, [<<"The hostname is malformed. If using an IPv6 address, please enclose the address within '[' and ']'">>]}},
         {" \t\r\nhost\n", {"host", 8091}},
         {" ", {error, [<<"Hostname is required.">>]}}],

    {Urls, ExpectedResults} = lists:unzip(Cases),
    Results = lists:map(fun (Url) -> (catch parse_hostname(Url)) end, Urls),

    ?assertEqual(ExpectedResults, Results),
    ok.
911
912-endif.
913