1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2017-Present Couchbase, Inc.
3%%
4%% Use of this software is governed by the Business Source License included
5%% in the file licenses/BSL-Couchbase.txt.  As of the Change Date specified
6%% in that file, in accordance with the Business Source License, use of this
7%% software will be governed by the Apache License, Version 2.0, included in
8%% the file licenses/APL2.txt.
9%%
10
%% @doc implementation of cluster topology related REST APIs
12
13-module(menelaus_web_cluster).
14
15-include("cut.hrl").
16-include("ns_common.hrl").
17-include("menelaus_web.hrl").
18
19%% Remove by OTP25
20-compile([{nowarn_deprecated_function, [{ http_uri,parse,2 }]}]).
21
22-ifdef(TEST).
23-include_lib("eunit/include/eunit.hrl").
24-endif.
25
26-export([handle_cluster_init/1,
27         handle_engage_cluster2/1,
28         handle_complete_join/1,
29         handle_join/1,
30         serve_node_services/1,
31         serve_node_services_streaming/1,
32         handle_setup_services_post/1,
33         handle_rebalance_progress/2,
34         handle_eject_post/1,
35         handle_add_node/1,
36         handle_add_node_to_group/2,
37         handle_failover/1,
38         handle_start_failover/1,
39         handle_start_graceful_failover/1,
40         handle_rebalance/1,
41         handle_re_add_node/1,
42         handle_re_failover/1,
43         handle_stop_rebalance/1,
44         handle_set_recovery_type/1,
45         get_rebalance_error/0]).
46
47-import(menelaus_util,
48        [reply_json/2,
49         reply_json/3,
50         reply/2,
51         reply_text/3,
52         reply_ok/3,
53         parse_validate_port_number/1,
54         handle_streaming/2]).
55
%% @doc REST entry point for one-shot cluster initialization.
%% Calls menelaus_web_rbac:assert_no_users_upgrade/0 first, then runs the
%% actual initialization (with a budget of 10 attempts) inside
%% menelaus_util:survive_web_server_restart/1.
handle_cluster_init(Req) ->
    menelaus_web_rbac:assert_no_users_upgrade(),
    MaxAttempts = 10,
    Body = fun () -> handle_cluster_init(Req, MaxAttempts) end,
    menelaus_util:survive_web_server_restart(Body).
60
%% Validates the posted form and applies it. Retries is the number of
%% attempts still allowed: each time the body throws retry_needed the whole
%% handler (fresh config and snapshot included) is re-run with one attempt
%% fewer, and once the budget is used up the process fails with
%% exceeded_retries.
handle_cluster_init(Req, Retries) when Retries > 0 ->
    Config = ns_config:get(),
    Fetchers = [ns_bucket:fetch_snapshot(all, _),
                ns_cluster_membership:fetch_snapshot(_)],
    Snapshot = chronicle_compat:get_snapshot(Fetchers,
                                             #{ns_config => Config}),
    Validators = menelaus_web_node:node_init_validators() ++
        cluster_init_validators(Config, Snapshot),

    Apply =
        fun (Props) ->
                try
                    ok = menelaus_web_node:node_init(Req, Props),
                    reply_json(Req, cluster_init(Req, Config, Props))
                catch
                    throw:retry_needed ->
                        handle_cluster_init(Req, Retries - 1);
                    throw:{error, Code, Msg} ->
                        menelaus_util:global_error_exception(Code, Msg)
                end
        end,
    validator:handle(Apply, Req, form, Validators);
handle_cluster_init(_Req, _Retries) ->
    erlang:error(exceeded_retries).
85
%% Applies, in order, the settings that would otherwise be posted to several
%% separate endpoints. Failures of the services and node-encryption steps are
%% reported by throwing {error, 400, Message}. Returns whatever
%% menelaus_web_settings:handle_settings_web_post/2 returns.
cluster_init(Req, Config, Params) ->
    %% Equivalent of POST /pools/default
    menelaus_web_pools:handle_pool_settings_post_body(Req, Config, Params),

    %% Equivalent of POST /settings/stats
    menelaus_web_settings:apply_stats_settings(Params),

    %% Equivalent of POST /node/controller/setupServices
    case do_setup_services_post(Req, Params) of
        {error, SetupError} -> throw({error, 400, SetupError});
        ok -> ok
    end,

    %% Node-to-node encryption; untouched when the parameter is absent
    NodeEncryption = proplists:get_value(nodeEncryption, Params),
    case NodeEncryption of
        undefined ->
            ok;
        _ ->
            Listeners = [{cb_dist:address_family(), NodeEncryption}],
            DistCfg = [{nodeEncryption, NodeEncryption},
                       {externalListeners, Listeners}],
            case netconfig_updater:apply_config(DistCfg) of
                {error, NetError} ->
                    throw({error, 400, NetError});
                ok ->
                    %% Wait for web servers to restart
                    ns_config:sync_announcements(),
                    menelaus_event:sync(
                      chronicle_compat_events:event_manager()),
                    cluster_compat_mode:is_enterprise() andalso
                        ns_ssl_services_setup:sync()
            end
    end,

    %% Equivalent of POST /settings/indexes
    IndexerParams =
        lists:filtermap(
          fun ({indexerStorageMode, Mode}) -> {true, {storageMode, Mode}};
              (_) -> false
          end, Params),
    menelaus_web_indexes:apply_indexes_settings(Req, IndexerParams),

    %% Equivalent of POST /settings/web
    menelaus_web_settings:handle_settings_web_post(Req, Params).
119
%% Full validator chain for cluster initialization: the validators of every
%% sub-request that cluster_init/3 applies, followed by the storage-mode
%% check and the generic has_params / unsupported checks.
cluster_init_validators(Config, Snapshot) ->
    lists:append(
      [menelaus_web_pools:pool_settings_post_validators(Config, Snapshot),
       menelaus_web_settings:settings_stats_validators(),
       setup_services_validators(),
       menelaus_web_node:node_encryption_validators(),
       menelaus_web_settings:settings_web_post_validators(),
       [menelaus_web_indexes:validate_storage_mode(indexerStorageMode, _),
        validator:has_params(_),
        validator:unsupported(_)]]).
129
%% @doc Handles the engage-cluster step of a node join. The request body is
%% a JSON object describing the cluster node; on success the reply is this
%% node's full node info (possibly with a lowered clusterCompatibility), on
%% failure a 400 with the error message. The handling process always exits
%% with reason normal afterwards.
handle_engage_cluster2(Req) ->
    {struct, NodeKVList} = mochijson2:decode(mochiweb_request:recv_body(Req)),
    %% A bit kludgy, but a 100% correct way to protect ourselves when
    %% everything restarts.
    process_flag(trap_exit, true),
    case ns_cluster:engage_cluster(NodeKVList) of
        {error, _What, Message} ->
            reply_json(Req, [Message], 400);
        {ok, _} ->
            %% NOTE: for 2.1+ cluster compat we may need something fancier.
            %% For now 2.0 is compatible only with itself and 1.8.x, thus no
            %% extra work is needed.
            %%
            %% The engage_cluster/complete_join sequence is our cluster
            %% version compat negotiation. First the cluster sends the
            %% joinee node its info in the engage_cluster payload. The node
            %% then checks whether it can work in that compat mode (the node
            %% itself, being a single node cluster, works in the max compat
            %% mode it supports, which is perhaps higher than the
            %% cluster's). If the node supports this mode, it needs to send
            %% back an engage_cluster reply carrying the cluster's compat
            %% mode as clusterCompatibility. Otherwise the cluster would
            %% refuse this node because it runs in a higher compat mode.
            %% That could be a much, much higher future compat mode, so only
            %% the joinee knows whether it can work in backwards compatible
            %% mode. Thus sending back a 'corrected' clusterCompatibility in
            %% the engage_cluster response is our only option.
            %%
            %% NOTE: we don't need to actually switch to the lower compat
            %% mode during engage_cluster, because complete_join will cause
            %% a full restart of the joinee node, which will make it start
            %% back in the cluster's compat mode.
            %%
            %% For now we just look at whether 1.8.x is asking us to join
            %% it, and if it is, we reply with clusterCompatibility of 1,
            %% which is the only thing it will support.
            %%
            %% NOTE: I was thinking about simply sending back the
            %% clusterCompatibility of the cluster, but that would break a
            %% 10.x (i.e. much later version) check of backwards
            %% compatibility with us. I.e. because 2.0 is not checking the
            %% cluster's compatibility (there's no need), lying about our
            %% cluster compat mode would not allow the cluster to check that
            %% we're compatible with it.
            %%
            %% The handling of 127.0.0.1 is a bit subtle. See MB-8404. In
            %% CBSE-385 we saw how a mis-configured node that was forced
            %% into the 127.0.0.1 address was successfully added to a
            %% cluster. My thinking is that the "rename node in node-details
            %% output if node is 127.0.0.1" behavior, needed for correct
            %% client operation, is to blame. But given that engage cluster
            %% is strictly an intra-cluster thing, we can return 127.0.0.1
            %% back as 127.0.0.1, and thus the join attempt in CBSE-385
            %% would be prevented at the completeJoin step, which would be
            %% sent to 127.0.0.1 (the joiner) and bounced.
            {struct, NodeInfo} =
                menelaus_web_node:build_full_node_info(node()),
            {_, ClusterCompat} = CompatPair =
                lists:keyfind(<<"clusterCompatibility">>, 1, NodeKVList),
            OurCompat =
                cluster_compat_mode:effective_cluster_compat_version_for(
                  cluster_compat_mode:supported_compat_version()),
            Advertised =
                case ClusterCompat < OurCompat of
                    true ->
                        ?log_info("Lowering our advertised clusterCompatibility in order to enable joining older cluster"),
                        %% The entry is replaced under both the binary and
                        %% the atom spelling of the key.
                        WithBinaryKey =
                            lists:keyreplace(<<"clusterCompatibility">>, 1,
                                             NodeInfo, CompatPair),
                        lists:keyreplace(clusterCompatibility, 1,
                                         WithBinaryKey, CompatPair);
                    false ->
                        NodeInfo
                end,
            reply_json(Req, {struct, Advertised})
    end,
    exit(normal).
204
%% @doc Handles the complete-join step of a node join. Replies 200 with an
%% empty JSON list on success or 400 with the error message, then exits the
%% handling process with reason normal.
handle_complete_join(Req) ->
    Body = mochiweb_request:recv_body(Req),
    {struct, NodeKVList} = mochijson2:decode(Body),
    erlang:process_flag(trap_exit, true),
    {Payload, Status} =
        case ns_cluster:complete_join(NodeKVList) of
            {ok, _} -> {[], 200};
            {error, _What, Message} -> {[Message], 400}
        end,
    reply_json(Req, Payload, Status),
    exit(normal).
215
%% @doc Asks this (not yet provisioned) node to join an existing cluster.
%%
%% A node that is already provisioned refuses with a 400 and a message that
%% points at the cluster's controller/addNode API. Otherwise the request is
%% passed on to handle_join_clean_node/1, which replies 200 once the join
%% has happened, or 400 with a JSON error message when it fails for
%% whatever reason.
%%
%% Parameter example: clusterMemberHostIp=192%2E168%2E0%2E1&
%%                    clusterMemberPort=8091&
%%                    user=admin&password=admin123
handle_join(Req) ->
    Provisioned = ns_config_auth:is_system_provisioned(),
    case Provisioned of
        false ->
            handle_join_clean_node(Req);
        true ->
            reply_json(
              Req,
              [<<"Node is already provisioned. To join use controller/addNode api of the cluster">>],
              400)
    end.
239
%% Parses a comma separated list of service names. Returns {ok, Services}
%% with a sorted, duplicate-free list of service atoms, or {error, Binary}
%% when a name is not among ns_cluster_membership:supported_services/0 or
%% when no service is named at all.
parse_validate_services_list(ServicesList) ->
    ByName = [{erlang:atom_to_list(Svc), Svc}
              || Svc <- ns_cluster_membership:supported_services()],
    Lookup = fun (Name) -> lists:keyfind(Name, 1, ByName) end,
    Requested = string:tokens(ServicesList, ","),
    case [Name || Name <- Requested, Lookup(Name) =:= false] of
        [] ->
            %% Every requested name is known here, so each lookup yields
            %% a {Name, Service} pair.
            case lists:usort([Svc || Name <- Requested,
                                     {_, Svc} <- [Lookup(Name)]]) of
                [] ->
                    {error, <<"At least one service has to be selected">>};
                Selected ->
                    {ok, Selected}
            end;
        Unknown ->
            {error,
             iolist_to_binary(
               io_lib:format("Unknown services: ~p", [Unknown]))}
    end.
259
%% Parses the form parameters shared by the join (ThisIsJoin =:= true) and
%% add-node (ThisIsJoin =:= false) requests.
%%
%% Returns {ok, PropList} carrying services, scheme, host, port, user,
%% password and, optionally, new_node_hostname; or {errors, [Binary]} with
%% every problem that was found.
parse_join_cluster_params(Params, ThisIsJoin) ->
    Get = fun (Key) -> proplists:get_value(Key, Params) end,

    Hostname =
        case {Get("hostname"), ThisIsJoin} of
            {undefined, true} ->
                %% Backward compatibility: a join may name the cluster via
                %% clusterMemberHostIp / clusterMemberPort instead.
                LegacyPort = Get("clusterMemberPort"),
                LegacyHostIp = Get("clusterMemberHostIp"),
                case {LegacyPort, LegacyHostIp} of
                    {undefined, _} -> "";
                    {_, undefined} -> "";
                    _ -> lists:concat([LegacyHostIp, ":", LegacyPort])
                end;
            {undefined, _} ->
                "";
            {Given, _} ->
                Given
        end,

    %% Only add-node rejects parameters it does not know about.
    AddNodeErrors =
        case ThisIsJoin of
            true ->
                [];
            false ->
                Allowed = ["hostname", "user", "password", "services"],
                case [K || {K, _} <- Params,
                           not lists:member(K, Allowed)] of
                    [] ->
                        [];
                    Unexpected ->
                        [iolist_to_binary(
                           io_lib:format("Got unknown parameters: ~p",
                                         [Unexpected]))]
                end
        end,

    Services =
        case Get("services") of
            undefined ->
                {ok, ns_cluster_membership:default_services()};
            SvcParams ->
                parse_validate_services_list(SvcParams)
        end,

    Credentials = [{user, Get("user")},
                   {password, Get("password")}],

    MissingFieldErrors =
        [iolist_to_binary([atom_to_list(Field), <<" is missing">>])
         || {Field, undefined} <- Credentials],

    DefaultScheme =
        case cluster_compat_mode:tls_supported() of
            true -> https;
            false -> http
        end,

    %% parse_hostname/2 reports bad input by throwing {error, Messages}
    {HostnameError, ParsedHostnameRV} =
        case (catch parse_hostname(Hostname, DefaultScheme)) of
            {error, HostnameMessages} ->
                {HostnameMessages, undefined};
            {_, ParsedHost, _} = Parsed when is_list(ParsedHost) ->
                {[], Parsed}
        end,

    %% Empty and loopback values of newNodeHostname are ignored. Note that
    %% the value passed on is the untrimmed one.
    NewHostnameParams =
        case Get("newNodeHostname") of
            undefined ->
                [];
            NewHostname ->
                case string:trim(NewHostname) of
                    "" -> [];
                    "127.0.0.1" -> [];
                    "::1" -> [];
                    _ -> [{new_node_hostname, NewHostname}]
                end
        end,

    ServiceErrors =
        case Services of
            {error, ServicesError} -> [ServicesError];
            _ -> []
        end,

    case MissingFieldErrors ++ HostnameError ++ AddNodeErrors ++
        ServiceErrors of
        [] ->
            {ok, ServicesList} = Services,
            {Scheme, Host, Port} = ParsedHostnameRV,
            {ok, [{services, ServicesList},
                  {scheme, Scheme},
                  {host, Host},
                  {port, Port}
                  | Credentials ++ NewHostnameParams]};
        Errors ->
            {errors, Errors}
    end.
358
%% Join handling for a node that is not provisioned yet: parses the form and
%% either replies 400 with the list of errors or continues with
%% handle_join_tail/8.
handle_join_clean_node(Req) ->
    case parse_join_cluster_params(mochiweb_request:parse_post(Req), true) of
        {ok, Fields} ->
            [OtherScheme, OtherHost, OtherPort, OtherUser, OtherPswd,
             Services, Hostname] =
                [proplists:get_value(Key, Fields)
                 || Key <- [scheme, host, port, user, password,
                            services, new_node_hostname]],
            handle_join_tail(Req, OtherScheme, OtherHost, OtherPort,
                             OtherUser, OtherPswd, Services, Hostname);
        {errors, Errors} ->
            reply_json(Req, Errors, 400)
    end.
376
%% Performs the join: checks connectivity to the cluster node, works out the
%% URL under which this node should be added, and asks the cluster node to
%% add us through call_add_node/7.
%%
%% Hostname is the explicitly requested name for this node, or undefined, in
%% which case the host part of the hostname reported by
%% menelaus_web_node:build_full_node_info/2 is used.
%%
%% Replies 200 on success and 400 with a JSON error otherwise; the handling
%% process then exits with reason normal.
handle_join_tail(Req, OtherScheme, OtherHost, OtherPort, OtherUser, OtherPswd,
                 Services, Hostname) ->
    process_flag(trap_exit, true),
    RV = case ns_cluster:check_host_port_connectivity(OtherHost, OtherPort) of
             {ok, MyIP, AFamily} ->
                 Host =
                     case Hostname of
                         undefined ->
                             {struct, MyPList} =
                                 menelaus_web_node:build_full_node_info(
                                   {ip, MyIP}, node()),
                             HostnamePort =
                                 binary_to_list(
                                   misc:expect_prop_value(hostname, MyPList)),
                             %% Split on the last ":" so that only the port
                             %% is cut off.
                             [OwnHost, _] =
                                 string:split(HostnamePort, ":", trailing),
                             OwnHost;
                         _ ->
                             Hostname
                     end,
                 NodeURL = build_node_url(OtherScheme, Host),

                 AddNode = call_add_node(OtherScheme, OtherHost, OtherPort,
                                         {OtherUser, OtherPswd}, AFamily,
                                         _, Services),
                 case AddNode(NodeURL) of
                     {client_error, [<<"Unsupported protocol https">>]} ->
                         %% Happens when adding a 6.5 node to a pre-6.5
                         %% cluster
                         ?log_warning("Node ~p:~p doesn't support adding nodes "
                                      "via https; http will be used instead",
                                      [OtherHost, OtherPort]),
                         AddNode(build_node_url(http, Host));
                     Other ->
                         Other
                 end;
             {error, Reason} ->
                 M = case ns_error_messages:connection_error_message(
                            Reason, OtherHost, OtherPort) of
                         undefined ->
                             %% io_lib:format/2 returns the formatted text;
                             %% io:format/2 would print it to stdout and
                             %% return 'ok', which would then end up in the
                             %% error message instead of the reason.
                             io_lib:format("~p", [Reason]);
                         Msg ->
                             Msg
                     end,
                 URL = menelaus_rest:rest_url(OtherHost, OtherPort, "",
                                              OtherScheme),
                 ReasonStr = io_lib:format("Failed to connect to ~s. ~s",
                                           [URL, M]),
                 {error, host_connectivity, iolist_to_binary(ReasonStr)}
         end,

    case RV of
        {ok, _} ->
            reply(Req, 200);
        {client_error, JSON} ->
            reply_json(Req, JSON, 400);
        {error, _What, Message} ->
            reply_json(Req, [Message], 400)
    end,
    exit(normal).
433
%% Builds "Scheme://Host:Port" for this node as a flat string. The port is
%% this node's rest_port for http and ssl_rest_port for https; any other
%% scheme crashes with case_clause. Host is passed through
%% misc:maybe_add_brackets/1 before being formatted.
build_node_url(Scheme, Host) ->
    PortKey = case Scheme of
                  http -> rest_port;
                  https -> ssl_rest_port
              end,
    Port = service_ports:get_port(PortKey),
    lists:flatten(
      io_lib:format("~p://~s:~b",
                    [Scheme, misc:maybe_add_brackets(Host), Port])).
442
%% Asks the node at OtherScheme://OtherHost:OtherPort to add ThisNodeURL to
%% its cluster. /controller/addNode is used when Services equals the default
%% service list, otherwise /controller/addNodeV2 with an explicit "services"
%% field. REST errors are reduced to {error, rest_error, Message}; a 404 is
%% reported with a message about joining an older cluster.
call_add_node(OtherScheme, OtherHost, OtherPort, Creds, AFamily,
              ThisNodeURL, Services) ->
    CommonFields = [{<<"hostname">>, list_to_binary(ThisNodeURL)},
                    {<<"user">>, []},
                    {<<"password">>, []}],

    UsesDefaults = Services =:= ns_cluster_membership:default_services(),
    {Endpoint, Form} =
        case UsesDefaults of
            true ->
                {"/controller/addNode", CommonFields};
            false ->
                Names = [erlang:atom_to_list(Svc) || Svc <- Services],
                {"/controller/addNodeV2",
                 [{"services", string:join(Names, ",")} | CommonFields]}
        end,

    Request = {OtherScheme, OtherHost, OtherPort, Endpoint,
               "application/x-www-form-urlencoded",
               mochiweb_util:urlencode(Form)},
    case menelaus_rest:json_request_hilevel(
           post, Request, Creds, [{connect_options, [AFamily]}]) of
        {error, rest_error, _, {bad_status, 404, _}} ->
            {error, rest_error,
             <<"Node attempting to join an older cluster. Some of the "
               "selected services are not available.">>};
        {error, rest_error, Message, _} ->
            {error, rest_error, Message};
        Result ->
            Result
    end.
477
%% Waits until this node is the only one reported by
%% ns_node_disco:nodes_actual/0, sleeping Period milliseconds between
%% checks. Exits with self_eject_failed once IterationsLeft reaches 0.
do_eject_myself_rec(IterationsLeft, Period) ->
    case IterationsLeft of
        0 ->
            exit(self_eject_failed);
        _ ->
            OnlyMe = [node()],
            case ns_node_disco:nodes_actual() =:= OnlyMe of
                true ->
                    ok;
                false ->
                    timer:sleep(Period),
                    do_eject_myself_rec(IterationsLeft - 1, Period)
            end
    end.
489
%% Makes this node leave the cluster and then waits, in up to 10 checks
%% 250 ms apart, until it is the only node left.
do_eject_myself() ->
    ns_cluster:leave(),
    Checks = 10,
    PeriodMs = 250,
    do_eject_myself_rec(Checks, PeriodMs).
493
%% @doc Ejects a node, whether it is running or down.
%%
%% The request is a urlencoded form with an otpNode field; the special value
%% "Self" stands for this node.
%%
%% Replies produced here:
%%   400 "Bad Request" when otpNode is missing;
%%   400 when the node is still an active cluster member;
%%   400 when the node name is unknown to this VM.
%% Everything else is decided by do_handle_eject_post/2.
%%
%% NOTE(review): the original comment also listed 401 (credentials missing)
%% and 403 (credentials incorrect); those are not produced in this function
%% and presumably come from the authentication layer.
handle_eject_post(Req) ->
    PostArgs = mochiweb_request:parse_post(Req),
    case proplists:get_value("otpNode", PostArgs) of
        undefined ->
            reply_text(Req, "Bad Request\n", 400);
        OtpNodeStr ->
            %% The name comes straight from the request, so it must not be
            %% turned into a new atom: atoms are never garbage collected. A
            %% name with no existing atom cannot belong to a node this VM
            %% knows about.
            Resolved =
                case OtpNodeStr of
                    "Self" ->
                        {ok, node()};
                    _ ->
                        try
                            {ok, list_to_existing_atom(OtpNodeStr)}
                        catch
                            error:badarg -> unknown
                        end
                end,
            case Resolved of
                unknown ->
                    ?MENELAUS_WEB_LOG(0018, "Request to eject nonexistant server failed.  Requested node: ~p",
                                      [OtpNodeStr]),
                    reply_text(Req, "Server does not exist.\n", 400);
                {ok, OtpNode} ->
                    case ns_cluster_membership:get_cluster_membership(
                           OtpNode) of
                        active ->
                            reply_text(Req, "Cannot remove active server.\n",
                                       400);
                        _ ->
                            do_handle_eject_post(Req, OtpNode)
                    end
            end
    end.
523
%% Ejects OtpNode after asserting that the server list of every bucket
%% passes ns_janitor:check_server_list/2. Ejecting this node itself goes
%% through do_eject_myself/0; any other node must be listed in
%% ns_node_disco:nodes_wanted/0, otherwise the reply is a 400.
do_handle_eject_post(Req, OtpNode) ->
    %% Verify that the server lists are consistent with cluster membership
    %% states in all buckets.
    AssertConsistent =
        fun ({Bucket, BucketConfig}) ->
                ok = ns_janitor:check_server_list(Bucket, BucketConfig)
        end,
    lists:foreach(AssertConsistent, ns_bucket:get_buckets()),

    IsSelf = OtpNode =:= node(),
    case IsSelf of
        true ->
            do_eject_myself(),
            ns_audit:remove_node(Req, node()),
            reply(Req, 200);
        false ->
            IsKnown = lists:member(OtpNode, ns_node_disco:nodes_wanted()),
            case IsKnown of
                false ->
                    %% Node doesn't exist.
                    ?MENELAUS_WEB_LOG(0018, "Request to eject nonexistant server failed.  Requested node: ~p",
                                      [OtpNode]),
                    reply_text(Req, "Server does not exist.\n", 400);
                true ->
                    ns_cluster:leave(OtpNode),
                    ?MENELAUS_WEB_LOG(?NODE_EJECTED, "Node ejected: ~p from node: ~p",
                                      [OtpNode, erlang:node()]),
                    ns_audit:remove_node(Req, OtpNode),
                    reply(Req, 200)
            end
    end.
552
%% Validators for the setup-services form: setDefaultMemQuotas is a boolean
%% defaulting to false, and services is required. The services value is
%% rejected once the system is provisioned, must parse as a list of known
%% services, must include kv, and must pass
%% ns_cluster:enforce_topology_limitation/1.
setup_services_validators() ->
    CheckTopology =
        fun (Svcs) ->
                case lists:member(kv, Svcs) of
                    false ->
                        {error, <<"cannot setup first cluster "
                                  "node without kv service">>};
                    true ->
                        case ns_cluster:enforce_topology_limitation(Svcs) of
                            ok -> {value, Svcs};
                            Error -> Error
                        end
                end
        end,
    CheckServices =
        fun (ServicesString) ->
                case ns_config_auth:is_system_provisioned() of
                    true ->
                        {error, <<"cannot change node services after "
                                  "cluster is provisioned">>};
                    false ->
                        case parse_validate_services_list(ServicesString) of
                            {ok, Svcs} -> CheckTopology(Svcs);
                            {error, Msg} -> {error, Msg}
                        end
                end
        end,
    [validator:boolean(setDefaultMemQuotas, _),
     validator:default(setDefaultMemQuotas, false, _),
     validator:required(services, _),
     validator:validate(CheckServices, services, _)].
581
%% Checks that the memory quotas fit this node for the given Services.
%% With SetDefaultMemQuotas =:= true the default quotas for Services are
%% first written to the config; otherwise the currently configured quotas
%% of all quota-aware services are checked. Returns ok or {error, Binary}.
setup_services_check_quota(Services, SetDefaultMemQuotas) ->
    Quotas =
        case SetDefaultMemQuotas of
            true ->
                do_update_with_default_quotas(
                  memory_quota:default_quotas(Services));
            false ->
                [begin
                     {ok, Quota} = memory_quota:get_quota(Service),
                     {Service, Quota}
                 end || Service <- memory_quota:aware_services()]
        end,

    case Quotas of
        {error, _Msg} ->
            Quotas;
        _ ->
            case memory_quota:check_this_node_quotas(Services, Quotas) of
                {error, {total_quota_too_high, _, TotalQuota, MaxAllowed}} ->
                    Text = io_lib:format(
                             "insufficient memory to satisfy memory quota "
                             "for the services "
                             "(requested quota is ~bMB, "
                             "maximum allowed quota for the node is ~bMB)",
                             [TotalQuota, MaxAllowed]),
                    {error, iolist_to_binary(Text)};
                ok ->
                    ok
            end
    end.
610
%% Stores Quotas in the config, allowing up to 10 attempts while
%% memory_quota:set_quotas/2 answers retry_needed. Returns Quotas on success
%% and {error, Binary} when the attempts run out.
do_update_with_default_quotas(Quotas) ->
    do_update_with_default_quotas(Quotas, 10).

do_update_with_default_quotas(Quotas, RetriesLeft) when RetriesLeft =/= 0 ->
    %% The config is re-read on every attempt.
    case memory_quota:set_quotas(ns_config:get(), Quotas) of
        retry_needed ->
            do_update_with_default_quotas(Quotas, RetriesLeft - 1);
        ok ->
            Quotas
    end;
do_update_with_default_quotas(_Quotas, 0) ->
    {error, <<"Could not update the config with default memory quotas">>}.
623
%% @doc REST handler that sets the services of this node. Replies 200 on
%% success and 400 with a one-element JSON list holding the error otherwise.
handle_setup_services_post(Req) ->
    Apply =
        fun (Props) ->
                case do_setup_services_post(Req, Props) of
                    {error, Error} -> reply_json(Req, [Error], 400);
                    ok -> reply(Req, 200)
                end
        end,
    validator:handle(Apply, Req, form, setup_services_validators()).
632
%% Stores the validated services of this node under {node, node(), services}
%% and audits the change, provided the quota check passes. Returns ok or
%% {error, Reason} from the quota check.
do_setup_services_post(Req, Props) ->
    Services = proplists:get_value(services, Props),
    UseDefaultQuotas = proplists:get_value(setDefaultMemQuotas, Props),
    case setup_services_check_quota(Services, UseDefaultQuotas) of
        {error, _} = QuotaError ->
            QuotaError;
        ok ->
            ok = chronicle_compat:set({node, node(), services}, Services),
            ns_audit:setup_node_services(Req, node(), Services),
            ok
    end.
644
%% Checks the credentials given to add-node. Returns a list of error
%% binaries, which is empty when the combination is acceptable: either both
%% user and password are empty or both are non-empty. undefined stands for
%% a parameter that was not supplied at all.
validate_add_node_params(User, Password) ->
    case {User, Password} of
        {undefined, _} ->
            [<<"Missing required parameter.">>];
        {_, undefined} ->
            [<<"Missing required parameter.">>];
        {[], []} ->
            [];
        {[_ | _], [_ | _]} ->
            [];
        {[], [_ | _]} ->
            [<<"If a username is not specified, a password must not be supplied.">>];
        _ ->
            [<<"A password must be supplied.">>]
    end.
656
%% Error text (as a binary) for a hostname that could not be parsed as a URL.
malformed_url_message(Hostname) ->
    Text = io_lib:format(
             "Malformed URL ~s; if using IPv6, enclose in square brackets",
             [Hostname]),
    iolist_to_binary(Text).
662
%% Parses a user supplied "[scheme://]host[:port]" string into
%% {Scheme, Host, Port}; DefaultScheme is assumed when no scheme is given.
%% Surrounding whitespace is ignored. Invalid input is reported by throwing
%% {error, [Binary]} (see do_parse_hostname/2).
parse_hostname(Hostname, DefaultScheme) ->
    Trimmed = string:trim(Hostname),
    do_parse_hostname(Trimmed, DefaultScheme).
667
%% Does the parsing for parse_hostname/2 on an already trimmed string.
%% Throws {error, [Binary]} for an empty hostname, for a scheme other than
%% http/https, and for anything that is not a bare scheme://host[:port] URL.
%% The port defaults to 8091 for http and 18091 for https.
do_parse_hostname(Hostname, DefaultScheme) ->
    case Hostname of
        [] ->
            throw({error, [<<"Hostname is required.">>]});
        _ ->
            HasScheme = string:str(Hostname, "://") > 0,
            URL = case HasScheme of
                      true -> Hostname;
                      false -> atom_to_list(DefaultScheme) ++ "://" ++ Hostname
                  end,
            ValidateScheme =
                fun (S) ->
                        case lists:member(string:to_lower(S),
                                          ["http", "https"]) of
                            true -> valid;
                            false -> {error, {invalid_scheme, S}}
                        end
                end,
            Options = [{scheme_validation_fun, ValidateScheme},
                       {ipv6_host_with_brackets, false},
                       {scheme_defaults, [{http, 8091},
                                          {https, 18091}]}],
            case http_uri:parse(URL, Options) of
                %% Only a URL with no user info, the root path and no query
                %% is accepted.
                {ok, {Scheme, "", Host, Port, "/", ""}} ->
                    {Scheme, Host,
                     parse_validate_port_number(integer_to_list(Port))};
                {error, {invalid_scheme, BadScheme}} ->
                    throw({error,
                           [list_to_binary("Unsupported protocol " ++
                                               BadScheme)]});
                _ ->
                    throw({error, [malformed_url_message(Hostname)]})
            end
    end.
694
%% @doc REST handler that adds a node without naming a server group
%% (the group UUID is passed on as undefined).
handle_add_node(Req) ->
    NoGroup = undefined,
    do_handle_add_node(Req, NoGroup).
697
%% @doc REST handler that adds a node to the server group whose UUID is
%% given as a string; the UUID is passed on as a binary.
handle_add_node_to_group(GroupUUIDString, Req) ->
    GroupUUID = list_to_binary(GroupUUIDString),
    do_handle_add_node(Req, GroupUUID).
700
%% Maps an add-node failure reason to the HTTP status used in the reply:
%% 503 for cannot_acquire_lock, 404 for unknown_group, 400 for the rest.
add_node_error_code(Reason) ->
    case Reason of
        cannot_acquire_lock -> 503;
        unknown_group -> 404;
        _ -> 400
    end.
707
%% Shared implementation of the add-node handlers. GroupUUID is the target
%% server group as a binary, or undefined.
%%
%% Parameter example: hostname=epsilon.local, user=Administrator,
%% password=asd!23
%%
%% Replies 200 with {"otpNode": Node} on success, 400 with a list of
%% messages for invalid parameters, and the status chosen by
%% add_node_error_code/1 when adding the node fails.
do_handle_add_node(Req, GroupUUID) ->
    Params = mochiweb_request:parse_post(Req),

    Parsed =
        case parse_join_cluster_params(Params, false) of
            {errors, ParseErrors} ->
                {errors, ParseErrors};
            {ok, Fields} ->
                case validate_add_node_params(
                       proplists:get_value(user, Fields),
                       proplists:get_value(password, Fields)) of
                    [] -> {ok, Fields};
                    CredErrors -> {errors, CredErrors}
                end
        end,

    case Parsed of
        {errors, ErrorList} ->
            reply_json(Req, ErrorList, 400);
        {ok, KV} ->
            [User, Password, Scheme, Hostname, Port, Services] =
                [proplists:get_value(Key, KV)
                 || Key <- [user, password, scheme, host, port, services]],

            AddNode =
                fun () ->
                        case ns_cluster:add_node_to_group(
                               Scheme, Hostname, Port, {User, Password},
                               GroupUUID, Services) of
                            {error, What, Message} ->
                                reply_json(Req, [Message],
                                           add_node_error_code(What));
                            {ok, OtpNode} ->
                                ns_audit:add_node(Req, Hostname, Port, User,
                                                  GroupUUID, Services,
                                                  OtpNode),
                                ServicesJSON =
                                    lists:map(
                                      fun ns_cluster_membership:json_service_name/1,
                                      Services),
                                event_log:add_log(
                                  node_join_success,
                                  [{node_added, list_to_binary(Hostname)},
                                   {node_services, ServicesJSON}]),
                                reply_json(Req,
                                           {struct, [{otpNode, OtpNode}]},
                                           200)
                        end
                end,
            menelaus_util:survive_web_server_restart(AddNode)
    end.
760
%% Resolves a node name given as a string to an already existing atom.
%%
%% Returns {ok, Node} on success, {error, "No server specified."} when
%% the name is the literal "undefined", and
%% {error, "Unknown server given."} when no such atom exists (or the
%% argument cannot be converted at all). No new atoms are ever created.
validate_node(NodeArg) ->
    try list_to_existing_atom(NodeArg) of
        undefined ->
            {error, "No server specified."};
        Node ->
            {ok, Node}
    catch
        error:_ ->
            {error, "Unknown server given."}
    end.
771
%% Extracts the nodes to fail over gracefully from the POST form of Req.
%% Returns whatever parse_otp_nodes/1 returns for the form parameters.
parse_graceful_failover_args(Req) ->
    parse_otp_nodes(mochiweb_request:parse_post(Req)).
775
%% Collects every "otpNode" form parameter and validates each value with
%% validate_node/1.
%%
%% Returns {ok, Nodes} with duplicates removed (sorted via lists:usort/1)
%% when all values are valid, {error, "No server specified."} when no
%% "otpNode" parameter was given, and {error, IoList} listing the
%% rejected values when at least one of them failed validation.
parse_otp_nodes(Params) ->
    Classify =
        fun ({_Key, Val}, {Known, Unknown}) ->
                case validate_node(Val) of
                    {ok, Node} -> {[Node | Known], Unknown};
                    _ -> {Known, [Val | Unknown]}
                end
        end,
    Given = proplists:lookup_all("otpNode", Params),
    case lists:foldl(Classify, {[], []}, Given) of
        {[], []} ->
            {error, "No server specified."};
        {Known, []} ->
            %% Remove duplicates.
            {ok, lists:usort(Known)};
        {_Known, Unknown} ->
            {error, io_lib:format("Unknown server given: ~p", [Unknown])}
    end.
797
%% Extracts the arguments of a hard failover request from the POST form.
%%
%% Returns {ok, Nodes, AllowUnsafe}, where AllowUnsafe is true only when
%% the "allowUnsafe" parameter is exactly "true"; any non-ok result of
%% parse_otp_nodes/1 is returned unchanged.
parse_hard_failover_args(Req) ->
    PostParams = mochiweb_request:parse_post(Req),
    case parse_otp_nodes(PostParams) of
        {ok, Nodes} ->
            IsUnsafeAllowed =
                "true" =:= proplists:get_value("allowUnsafe", PostParams),
            {ok, Nodes, IsUnsafeAllowed};
        NotOk ->
            NotOk
    end.
807
%% Translates a failover result into 200 or {HttpCode, Message}.
%%
%% The request is only needed for incompatible_with_previous, where the
%% hostnames of the required nodes are looked up for the message; every
%% other result is handled by failover_reply/1.
failover_reply({incompatible_with_previous, Nodes}, Req) ->
    HostnamePairs = menelaus_web_node:get_hostnames(Req, Nodes),
    NodeList = string:join([binary_to_list(H) || {_, H} <- HostnamePairs],
                           ", "),
    Message = io_lib:format("Failover must include the following nodes: ~s.",
                            [NodeList]),
    {400, Message};
failover_reply(RV, _Req) ->
    failover_reply(RV).
815
%% Translates a failover result into the HTTP reply to send: the bare
%% status 200 on success, otherwise {HttpCode, Message} where Message is
%% a string or an iolist. Results without a dedicated clause become a
%% 500 "Unexpected server error".
failover_reply(ok) ->
    200;
failover_reply(Busy) when Busy =:= in_progress; Busy =:= rebalance_running ->
    {503, "Rebalance running."};
failover_reply(in_recovery) ->
    {503, "Cluster is in recovery mode."};
failover_reply(orchestration_unsafe) ->
    %% 504 is a stretch here of course, but we do
    %% need to convey the information to the client somehow.
    {504, "Cannot safely perform a failover at the moment"};
failover_reply(config_sync_failed) ->
    {500, "Failed to synchronize config to other nodes"};
failover_reply({config_sync_failed, _}) ->
    {500, "Failed to synchronize config to other nodes"};
failover_reply(quorum_lost) ->
    {500, "Cannot safely perform a failover at the moment"};
failover_reply(last_node) ->
    {400, "Last active node cannot be failed over."};
failover_reply(not_graceful) ->
    {400, "Failover cannot be done gracefully (would lose vbuckets)."};
failover_reply(non_kv_node) ->
    {400, "Failover cannot be done gracefully for a node without data service."
     " Use hard failover."};
failover_reply(unknown_node) ->
    {400, "Unknown server given."};
failover_reply(inactive_node) ->
    {400, "Inactive server given."};
failover_reply(stopped_by_user) ->
    {409, "Stopped by user."};
failover_reply({not_in_peers, Node, _ClusterNodes}) ->
    Message = io_lib:format("~p which is orchestrating the failover must be "
                            "one of the nodes that survive the failover",
                            [Node]),
    {400, Message};
failover_reply({aborted, Map}) ->
    %% One message per key of Map, in maps:fold/3 order, joined by " and ".
    Format = [{failed_peers, "Failover could not be processed on nodes ~p"},
              {diverged_peers, "Failover is unsafe on nodes ~p due to "
                               "diverged histories"}],
    Collected = maps:fold(
                  fun (Key, Peers, Acc) ->
                          Template = proplists:get_value(Key, Format),
                          Text = io_lib:format(Template, [Peers]),
                          [lists:flatten(Text) | Acc]
                  end, [], Map),
    {503, string:join(lists:reverse(Collected), " and ")};
failover_reply(Other) ->
    {500, io_lib:format("Unexpected server error: ~p", [Other])}.
866
%% Sends the HTTP reply for a failover result. A successful failover is
%% audited before the empty 200 reply is sent; any other result is
%% reported as text with the code chosen by failover_reply/2.
failover_audit_and_reply(RV, Req, Nodes, Type) ->
    Reply = failover_reply(RV, Req),
    case Reply of
        {Code, Message} ->
            reply_text(Req, Message, Code);
        200 ->
            ns_audit:failover_nodes(Req, Nodes, Type),
            reply(Req, 200)
    end.
875
%% REST handler for hard failover performed through
%% ns_cluster_membership:failover/2. Malformed arguments are answered
%% with a 400 text reply.
handle_failover(Req) ->
    case parse_hard_failover_args(Req) of
        {error, ErrorMsg} ->
            reply_text(Req, ErrorMsg, 400);
        {ok, Nodes, AllowUnsafe} ->
            RV = ns_cluster_membership:failover(Nodes, AllowUnsafe),
            failover_audit_and_reply(RV, Req, Nodes, hard)
    end.
885
%% REST handler that starts a hard failover through
%% ns_orchestrator:start_failover/2. Malformed arguments are answered
%% with a 400 text reply.
handle_start_failover(Req) ->
    case parse_hard_failover_args(Req) of
        {error, ErrorMsg} ->
            reply_text(Req, ErrorMsg, 400);
        {ok, Nodes, AllowUnsafe} ->
            RV = ns_orchestrator:start_failover(Nodes, AllowUnsafe),
            failover_audit_and_reply(RV, Req, Nodes, hard)
    end.
895
%% REST handler that starts a graceful failover through
%% ns_orchestrator:start_graceful_failover/1. Malformed arguments are
%% answered with a 400 text reply.
handle_start_graceful_failover(Req) ->
    case parse_graceful_failover_args(Req) of
        {error, ErrorMsg} ->
            reply_text(Req, ErrorMsg, 400);
        {ok, Nodes} ->
            RV = ns_orchestrator:start_graceful_failover(Nodes),
            failover_audit_and_reply(RV, Req, Nodes, graceful)
    end.
905
%% REST handler that starts a rebalance.
%%
%% Form parameters: "knownNodes" and "ejectedNodes" are comma-separated
%% node names; "deltaRecoveryBuckets" is an optional comma-separated list
%% of bucket names (absent means `all').
%%
%% Replies 400 with {empty_known_nodes, 1} when no known node is given
%% and 400 with {mismatch, 1} when any listed name is not an existing
%% atom; otherwise hands over to do_handle_rebalance/4.
handle_rebalance(Req) ->
    Params = mochiweb_request:parse_post(Req),
    SplitParam =
        fun (Name) ->
                string:tokens(proplists:get_value(Name, Params, ""), ",")
        end,
    %% Names are only ever looked up as existing atoms, never created.
    IsUnknown =
        fun (Name) ->
                try list_to_existing_atom(Name) of
                    _ -> false
                catch
                    error:badarg -> true
                end
        end,
    case SplitParam("knownNodes") of
        [] ->
            reply_json(Req, {struct, [{empty_known_nodes, 1}]}, 400);
        KnownNodesS ->
            EjectedNodesS = SplitParam("ejectedNodes"),
            case lists:any(IsUnknown, EjectedNodesS ++ KnownNodesS) of
                true ->
                    reply_json(Req, {struct, [{mismatch, 1}]}, 400);
                false ->
                    DeltaRecoveryBuckets =
                        case proplists:get_value("deltaRecoveryBuckets",
                                                 Params) of
                            undefined ->
                                all;
                            RawRecoveryBuckets ->
                                string:tokens(RawRecoveryBuckets, ",")
                        end,
                    do_handle_rebalance(Req, KnownNodesS, EjectedNodesS,
                                        DeltaRecoveryBuckets)
            end
    end.
929
%% Starts the rebalance via rebalance:start/3 and translates its result
%% into the HTTP reply. The node names must already exist as atoms. Only
%% a freshly started rebalance (`ok') is audited.
-spec do_handle_rebalance(any(), [string()], [string()], all | [bucket_name()]) -> any().
do_handle_rebalance(Req, KnownNodesS, EjectedNodesS, DeltaRecoveryBuckets) ->
    ToAtoms = fun (Names) ->
                      lists:map(fun list_to_existing_atom/1, Names)
              end,
    EjectedNodes = ToAtoms(EjectedNodesS),
    KnownNodes = ToAtoms(KnownNodesS),
    ReplyFlag = fun (Flag) -> reply_json(Req, {struct, [{Flag, 1}]}, 400) end,
    case rebalance:start(KnownNodes, EjectedNodes, DeltaRecoveryBuckets) of
        ok ->
            ns_audit:rebalance_initiated(Req, KnownNodes, EjectedNodes,
                                         DeltaRecoveryBuckets),
            reply(Req, 200);
        NothingToStart when NothingToStart =:= already_balanced;
                            NothingToStart =:= in_progress ->
            reply(Req, 200);
        nodes_mismatch ->
            ReplyFlag(mismatch);
        delta_recovery_not_possible ->
            ReplyFlag(deltaRecoveryNotPossible);
        no_kv_nodes_left ->
            ReplyFlag(noKVNodesLeft);
        no_active_nodes_left ->
            reply_text(Req, "No active nodes left", 400);
        in_recovery ->
            reply_text(Req, "Cluster is in recovery mode.", 503)
    end.
953
%% REST handler reporting rebalance progress as JSON.
%%
%% While a rebalance runs the reply holds status "running" plus one
%% {progress, P} object per node. When none is running the status is
%% "none", extended by get_rebalance_error/0. A timeout from
%% rebalance:progress/0 is answered with 503.
handle_rebalance_progress(_PoolId, Req) ->
    case rebalance:progress() of
        not_running ->
            Idle = [{status, <<"none">>} | get_rebalance_error()],
            reply_json(Req, {struct, Idle}, 200);
        {running, PerNode} ->
            NodeEntries = [{atom_to_binary(Node, latin1),
                            {struct, [{progress, Progress}]}}
                           || {Node, Progress} <- PerNode],
            Running = [{status, <<"running">>} | NodeEntries],
            reply_json(Req, {struct, Running}, 200);
        {error, timeout} ->
            reply_json(Req, {[{error, timeout}]}, 503)
    end.
968
%% Returns [{errorMessage, Binary}] when rebalance:status/0 reports
%% {none, ErrorMessage}, and [] for any other status.
get_rebalance_error() ->
    case rebalance:status() of
        {none, ErrorMessage} ->
            [{errorMessage, iolist_to_binary(ErrorMessage)}];
        _ ->
            []
    end.
972
%% REST handler for stopping a rebalance. Validates the optional boolean
%% form field allowUnsafe and passes the validated parameters to
%% handle_stop_rebalance/2.
handle_stop_rebalance(Req) ->
    Handler = fun (Params) -> handle_stop_rebalance(Req, Params) end,
    AllowUnsafeValidator =
        fun (State) -> validator:boolean(allowUnsafe, State) end,
    validator:handle(Handler, Req, form, [AllowUnsafeValidator]).
976
%% Stops the rebalance via rebalance:stop/1, with allowUnsafe defaulting
%% to false. An `unsafe' result is reported as a 504 with an explanatory
%% text; every other result is answered with an empty 200.
handle_stop_rebalance(Req, Params) ->
    AllowUnsafe = proplists:get_value(allowUnsafe, Params, false),
    StopResult = rebalance:stop(AllowUnsafe),
    case StopResult =:= unsafe of
        true ->
            UnsafeMessage =
                "Cannot communicate to the orchestrator node. "
                "Stopping rebalance is unsafe. "
                "This can be overriden by passing allowUnsafe=true "
                "in the POST form.",
            reply_text(Req, UnsafeMessage, 504);
        false ->
            reply(Req, 200)
    end.
990
%% REST handler for re-adding a node: behaves like setting the recovery
%% type of the node named in the POST form to `full'.
handle_re_add_node(Req) ->
    do_handle_set_recovery_type(Req, full, mochiweb_request:parse_post(Req)).
994
%% REST handler that asks ns_cluster_membership:re_failover/1 to act on
%% the node named by the "otpNode" form parameter (default "undefined").
%% Success is audited as a cancel_recovery failover and answered with
%% 200; not_possible is answered with 400.
handle_re_failover(Req) ->
    PostParams = mochiweb_request:parse_post(Req),
    NodeString = proplists:get_value("otpNode", PostParams, "undefined"),
    case ns_cluster_membership:re_failover(NodeString) of
        not_possible ->
            reply(Req, 400);
        {ok, _} ->
            Node = list_to_existing_atom(NodeString),
            ns_audit:failover_nodes(Req, [Node], cancel_recovery),
            reply(Req, 200)
    end.
1006
%% Replies with the third element of the tuple built by
%% bucket_info_cache:build_node_services/0, served as application/json.
serve_node_services(Req) ->
    {_, _, Payload, _} = bucket_info_cache:build_node_services(),
    reply_ok(Req, "application/json", Payload).
1011
%% Streaming variant of serve_node_services/1: each time
%% handle_streaming/2 invokes the producer, the current node services
%% payload is rebuilt and written out.
serve_node_services_streaming(Req) ->
    Producer =
        fun (_, _UpdateID) ->
                {_, _, Payload, _} = bucket_info_cache:build_node_services(),
                {just_write, {write, Payload}}
        end,
    handle_streaming(Producer, Req).
1019
%% Converts the textual recovery type from a request into an atom:
%% "delta" -> delta, "full" -> full, anything else -> undefined.
decode_recovery_type(Raw) ->
    case Raw of
        "delta" -> delta;
        "full" -> full;
        _ -> undefined
    end.
1026
%% REST handler that sets the recovery type of a node. The
%% "recoveryType" form parameter is decoded here; all validation and the
%% update itself happen in do_handle_set_recovery_type/3.
handle_set_recovery_type(Req) ->
    Params = mochiweb_request:parse_post(Req),
    RawType = proplists:get_value("recoveryType", Params),
    do_handle_set_recovery_type(Req, decode_recovery_type(RawType), Params).
1031
%% Validates and applies a recovery type change for one node.
%%
%% Req    - the request being answered.
%% Type   - delta | full | undefined (undefined means the requested type
%%          was not recognised).
%% Params - parsed POST form; the node is taken from "otpNode".
%%
%% Replies 400 with a JSON object of errors when the type is undefined,
%% when the node name does not resolve to an existing atom, or when delta
%% recovery is requested for a node that runs neither kv nor index.
%% Otherwise the recovery type is updated: success is audited and
%% answered with 200, bad_node is answered with 400.
%%
%% The services lookup is done only for a resolved node with a delta
%% request, since that is the only check that needs it. Doing it for an
%% unresolved node could also add a second, identical otpNode entry to
%% the error object; there is now at most one.
do_handle_set_recovery_type(Req, Type, Params) ->
    NodeStr = proplists:get_value("otpNode", Params),

    %% A missing parameter (undefined is not a string) and an unknown
    %% name both raise badarg and are treated as "no such node".
    Node = try
               list_to_existing_atom(NodeStr)
           catch
               error:badarg ->
                   undefined
           end,

    OtpNodeErrorMsg = <<"invalid node name or node can't be used for delta recovery">>,

    TypeErrors =
        case Type of
            undefined ->
                [{recoveryType, <<"recovery type must be either 'delta' or 'full'">>}];
            _ ->
                []
        end,

    NodeErrors =
        case Node of
            undefined ->
                [{otpNode, OtpNodeErrorMsg}];
            _ when Type =:= delta ->
                NodeSvcs = ns_cluster_membership:node_services(
                             ns_config:latest(), Node),
                case lists:member(kv, NodeSvcs) orelse
                    lists:member(index, NodeSvcs) of
                    true ->
                        [];
                    false ->
                        [{otpNode, OtpNodeErrorMsg}]
                end;
            _ ->
                []
        end,

    case TypeErrors ++ NodeErrors of
        [] ->
            case ns_cluster_membership:update_recovery_type(Node, Type) of
                {ok, _} ->
                    ns_audit:enter_node_recovery(Req, Node, Type),
                    reply_json(Req, [], 200);
                bad_node ->
                    reply_json(Req, {struct, [{otpNode, OtpNodeErrorMsg}]}, 400)
            end;
        Errors ->
            reply_json(Req, {struct, Errors}, 400)
    end.
1081
1082
1083-ifdef(TEST).
%% Checks parse_validate_services_list/1 with cluster_compat_mode mocked
%% to report an enterprise build.
%%
%% The assertions run inside try ... after so the mock is unloaded even
%% when one of them fails; otherwise a failure would leave
%% cluster_compat_mode mocked for the tests that run afterwards.
parse_validate_services_list_test() ->
    meck:new(cluster_compat_mode, [passthrough]),
    try
        meck:expect(cluster_compat_mode, is_enterprise, fun () -> true end),
        ?assertMatch({error, _}, parse_validate_services_list("")),
        ?assertEqual({ok, [index, kv, n1ql]},
                     parse_validate_services_list("n1ql,kv,index")),
        ?assertEqual({ok, [kv]}, parse_validate_services_list("kv")),
        ?assertMatch({error, _}, parse_validate_services_list("n1ql,kv,s")),
        ?assertMatch({error, _}, parse_validate_services_list("neeql,kv"))
    after
        meck:unload(cluster_compat_mode)
    end.
1093
%% Table-driven check of parse_hostname/2 with https as the second
%% argument. Each entry pairs an input with the expected result, which is
%% either a {Scheme, Host, Port} tuple or the {error, Messages} term
%% thrown by the parser (captured with catch).
hostname_parsing_test() ->
    PortRangeError =
        {error, [<<"The port number must be greater than 1023 and less than 65536.">>]},
    Cases =
        [{" \t\r\nhttp://host:1025\n\r\t ", {http, "host",1025}},
         {"http://host:100", PortRangeError},
         {"http://host:100000", PortRangeError},
         {"hTTp://host:8000", {http, "host", 8000}},
         {"ftp://host:600", {error, [<<"Unsupported protocol ftp">>]}},
         {"http://host", {http, "host", 8091}},
         {"127.0.0.1:6000", {https, "127.0.0.1", 6000}},
         {"host:port",
          {error, [<<"Malformed URL host:port; "
                     "if using IPv6, enclose in square brackets">>]}},
         {"aaa:bb:cc",
          {error, [<<"Malformed URL aaa:bb:cc; "
                     "if using IPv6, enclose in square brackets">>]}},
         {" \t\r\nhost\n", {https, "host", 18091}},
         {" ", {error, [<<"Hostname is required.">>]}},
         {"https://host:2000", {https, "host", 2000}},
         {"[::1]", {https, "::1", 18091}},
         {"http://::1:10000",
          {error, [<<"Malformed URL http://::1:10000; "
                     "if using IPv6, enclose in square brackets">>]}},
         {"http://[::1]:10000", {http, "::1", 10000}}],

    {Urls, ExpectedResults} = lists:unzip(Cases),
    Results = [(catch parse_hostname(Url, https)) || Url <- Urls],

    ?assertEqual(ExpectedResults, Results),
    ok.
1135-endif.
1136