xref: /vulcan/ns_server/src/ns_bucket.erl (revision 2bec42c5)
1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2009-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16-module(ns_bucket).
17
18-include("ns_common.hrl").
19-include("ns_config.hrl").
20-include_lib("eunit/include/eunit.hrl").
21
22%% API
23-export([auth_type/1,
24         bucket_nodes/1,
25         bucket_type/1,
26         replication_type/1,
27         create_bucket/3,
28         credentials/1,
29         delete_bucket/1,
30         delete_bucket_returning_config/1,
31         failover_warnings/0,
32         get_bucket/1,
33         get_bucket/2,
34         get_bucket_names/0,
35         get_bucket_names/1,
36         get_bucket_names_of_type/2,
37         get_bucket_names_of_type/3,
38         get_buckets/0,
39         get_buckets/1,
40         is_persistent/1,
41         is_port_free/1,
42         is_port_free/2,
43         is_valid_bucket_name/1,
44         json_map_from_config/2,
45         json_map_with_full_config/3,
46         live_bucket_nodes/1,
47         live_bucket_nodes_from_config/1,
48         map_to_replicas/1,
49         replicated_vbuckets/3,
50         maybe_get_bucket/2,
51         moxi_port/1,
52         name_conflict/1,
53         name_conflict/2,
54         names_conflict/2,
55         node_locator/1,
56         num_replicas/1,
57         ram_quota/1,
58         conflict_resolution_type/1,
59         drift_thresholds/1,
60         eviction_policy/1,
61         storage_mode/1,
62         raw_ram_quota/1,
63         sasl_password/1,
64         set_bucket_config/2,
65         set_fast_forward_map/2,
66         set_map/2,
67         set_map_opts/2,
68         set_servers/2,
69         filter_ready_buckets/1,
70         update_bucket_props/2,
71         update_bucket_props/4,
72         node_bucket_names/1,
73         node_bucket_names/2,
74         node_bucket_names_of_type/3,
75         node_bucket_names_of_type/4,
76         all_node_vbuckets/1,
77         update_vbucket_map_history/2,
78         past_vbucket_maps/0,
79         past_vbucket_maps/1,
80         config_to_map_options/1,
81         needs_rebalance/2,
82         can_have_views/1,
83         bucket_view_nodes/1,
84         bucket_config_view_nodes/1,
85         get_num_vbuckets/0,
86         config_upgrade_to_50/1,
87         config_upgrade_to_51/1,
88         config_upgrade_to_55/1]).
89
90
91%%%===================================================================
92%%% API
93%%%===================================================================
94
%% @doc Return the {Username, Password} pair for a bucket. The username is
%% the bucket name itself; the password is the bucket's sasl_password
%% property, or "" when that property is absent. Fails with a badmatch when
%% the bucket cannot be found.
-spec credentials(nonempty_string()) ->
                         {nonempty_string(), string()}.
credentials(Bucket) ->
    {ok, Props} = get_bucket(Bucket),
    Password = proplists:get_value(sasl_password, Props, ""),
    {Bucket, Password}.
101
%% @doc Look up a bucket's configuration in the latest ns_config.
get_bucket(Bucket) ->
    Latest = ns_config:latest(),
    get_bucket(Bucket, Latest).

%% @doc Look up a bucket's configuration in the given config. Returns
%% {ok, BucketConfig} when the bucket is present and the atom not_present
%% otherwise.
get_bucket(Bucket, Config) ->
    case lists:keyfind(Bucket, 1, get_buckets(Config)) of
        false ->
            not_present;
        {_, BucketConfig} ->
            {ok, BucketConfig}
    end.
112
%% @doc Return {ok, BucketConfig} for an already known config; when the
%% config argument is undefined, fall back to looking the bucket up by name.
maybe_get_bucket(BucketName, BucketConfig) ->
    case BucketConfig of
        undefined ->
            get_bucket(BucketName);
        _ ->
            {ok, BucketConfig}
    end.
117
%% @doc Names of all buckets in the latest config.
get_bucket_names() ->
    AllBuckets = get_buckets(),
    get_bucket_names(AllBuckets).

%% @doc Names (keys) of the given {Name, Config} list, as produced by
%% proplists:get_keys/1.
get_bucket_names(BucketConfigs) ->
    proplists:get_keys(BucketConfigs).
123
%% @doc Names of the buckets in the latest config that have the given
%% bucket type and storage mode.
-spec get_bucket_names_of_type(memcached|membase,
                               undefined|couchstore|ephemeral) -> list().
get_bucket_names_of_type(Type, Mode) ->
    AllBuckets = get_buckets(),
    get_bucket_names_of_type(Type, Mode, AllBuckets).

%% @doc Same as get_bucket_names_of_type/2, but over an explicit list of
%% {Name, Config} pairs.
-spec get_bucket_names_of_type(memcached|membase,
                               undefined|couchstore|ephemeral, list()) -> list().
get_bucket_names_of_type(Type, Mode, BucketConfigs) ->
    %% The storage mode is only inspected once the type already matches.
    Matches = fun (Config) ->
                      bucket_type(Config) == Type andalso
                          storage_mode(Config) == Mode
              end,
    [Name || {Name, Config} <- BucketConfigs, Matches(Config)].
135
%% @doc All {Name, Config} bucket entries from the latest config.
get_buckets() ->
    Latest = ns_config:latest(),
    get_buckets(Latest).

%% @doc The `configs' property stored under the `buckets' key of the given
%% config, or [] when it is missing.
get_buckets(Config) ->
    ns_config:search_prop(Config, buckets, configs, []).
141
%% @doc Servers of the named bucket that are currently connected to this
%% node (including this node itself). Fails with a badmatch when the bucket
%% cannot be found.
live_bucket_nodes(Bucket) ->
    {ok, Props} = get_bucket(Bucket),
    live_bucket_nodes_from_config(Props).
145
%% @doc Subset of the bucket's `servers' that is either this node or one of
%% the nodes reported by nodes(). The order of `servers' is preserved.
live_bucket_nodes_from_config(BucketConfig) ->
    Connected = [node() | nodes()],
    IsConnected = fun (N) -> lists:member(N, Connected) end,
    [Server || Server <- proplists:get_value(servers, BucketConfig),
               IsConnected(Server)].
150
%% @doc Conflict resolution type of the bucket; seqno when the property is
%% not set.
-spec conflict_resolution_type([{_,_}]) -> atom().
conflict_resolution_type(BucketConfig) ->
    Default = seqno,
    proplists:get_value(conflict_resolution_type, BucketConfig, Default).
154
%% @doc For lww buckets return {AheadThresholdMs, BehindThresholdMs} as
%% stored in the bucket config; for seqno buckets return undefined.
drift_thresholds(BucketConfig) ->
    ResolutionType = conflict_resolution_type(BucketConfig),
    case ResolutionType of
        seqno ->
            undefined;
        lww ->
            Ahead = proplists:get_value(drift_ahead_threshold_ms, BucketConfig),
            Behind = proplists:get_value(drift_behind_threshold_ms, BucketConfig),
            {Ahead, Behind}
    end.
163
%% @doc Eviction policy of the bucket. When the property is not set the
%% default depends on the storage mode: no_eviction for ephemeral buckets
%% and value_only otherwise.
eviction_policy(BucketConfig) ->
    Mode = storage_mode(BucketConfig),
    Fallback = case Mode of
                   ephemeral -> no_eviction;
                   couchstore -> value_only;
                   undefined -> value_only
               end,
    proplists:get_value(eviction_policy, BucketConfig, Fallback).
171
%% @doc Storage mode of the bucket: undefined for memcached buckets; for
%% membase buckets the `storage_mode' property, defaulting to couchstore.
-spec storage_mode([{_,_}]) -> atom().
storage_mode(BucketConfig) ->
    Type = bucket_type(BucketConfig),
    case Type of
        membase ->
            proplists:get_value(storage_mode, BucketConfig, couchstore);
        memcached ->
            undefined
    end.
180
%% Returns the bucket's per-node RAM quota multiplied by the number of
%% nodes the bucket resides on, i.e. the total amount of RAM quota this
%% bucket uses across the cluster. Crashes if `ram_quota' is not an integer.
-spec ram_quota([{_,_}]) -> integer().
ram_quota(Bucket) ->
    case proplists:get_value(ram_quota, Bucket) of
        PerNode when is_integer(PerNode) ->
            Servers = proplists:get_value(servers, Bucket, []),
            length(Servers) * PerNode
    end.
190
%% Returns the bucket's RAM quota for a _single_ node. Each node subtracts
%% this much from its node quota. Crashes if `ram_quota' is not an integer.
-spec raw_ram_quota([{_,_}]) -> integer().
raw_ram_quota(Bucket) ->
    case proplists:get_value(ram_quota, Bucket) of
        Quota when is_integer(Quota) ->
            Quota
    end.
199
%% Failover safety levels. They are integers so that levels of different
%% buckets can be compared numerically; a larger value wins when levels are
%% combined in failover_safety_rec/5.
-define(FS_HARD_NODES_NEEDED, 4).
-define(FS_FAILOVER_NEEDED, 3).
-define(FS_REBALANCE_NEEDED, 2).
-define(FS_SOFT_REBALANCE_NEEDED, 1).
-define(FS_OK, 0).

%% Computes {BaseSafety, ExtraSafety} for a single bucket. BaseSafety is
%% one of the ?FS_* levels; ExtraSafety is softNodesNeeded when there are
%% not more live nodes than replicas (and the base level is not already
%% ?FS_HARD_NODES_NEEDED), and ok otherwise.
bucket_failover_safety(BucketConfig, ActiveNodes, LiveNodes) ->
    case ns_bucket:num_replicas(BucketConfig) of
        0 ->
            %% if replica count for bucket is 0 we cannot failover at all
            {?FS_OK, ok};
        ReplicaNum ->
            MinLiveCopies = min_live_copies(LiveNodes, BucketConfig),
            BucketNodes = proplists:get_value(servers, BucketConfig),
            BaseSafety = base_failover_safety(MinLiveCopies, BucketConfig,
                                              BucketNodes, ActiveNodes,
                                              LiveNodes),
            ExtraSafety =
                if
                    length(LiveNodes) =< ReplicaNum andalso
                    BaseSafety =/= ?FS_HARD_NODES_NEEDED ->
                        %% not enough nodes to place all replicas on
                        softNodesNeeded;
                    true ->
                        ok
                end,
            {BaseSafety, ExtraSafety}
    end.

%% Picks the ?FS_* level from the minimum number of live copies of any
%% vbucket.
base_failover_safety(undefined, _BucketConfig, _BucketNodes,
                     _ActiveNodes, LiveNodes) ->
    %% No map yet (janitor run pending): only the number of live nodes
    %% can be judged.
    case LiveNodes of
        [_, _ | _] -> ?FS_OK;
        _ -> ?FS_HARD_NODES_NEEDED
    end;
base_failover_safety(MinLiveCopies, _BucketConfig, BucketNodes,
                     _ActiveNodes, LiveNodes) when MinLiveCopies =< 1 ->
    %% We cannot failover without losing data. Are some of the bucket's
    %% nodes down?
    AnyBucketNodeDown =
        lists:any(fun (N) -> not lists:member(N, LiveNodes) end,
                  BucketNodes),
    case {AnyBucketNodeDown, LiveNodes} of
        {true, _} ->
            %% Yes. The user should bring them back or failover/replace
            %% them (and possibly add more).
            ?FS_FAILOVER_NEEDED;
        {false, [_, _ | _]} ->
            %% More than one live node: generally fault tolerant, just
            %% not balanced enough.
            ?FS_REBALANCE_NEEDED;
        {false, _} ->
            %% One (or zero) live nodes; at least one more is needed to
            %% be fault tolerant.
            ?FS_HARD_NODES_NEEDED
    end;
base_failover_safety(_MinLiveCopies, BucketConfig, _BucketNodes,
                     ActiveNodes, _LiveNodes) ->
    case needs_rebalance(BucketConfig, ActiveNodes) of
        true -> ?FS_SOFT_REBALANCE_NEEDED;
        false -> ?FS_OK
    end.
256
%% Folds bucket_failover_safety/3 over a list of bucket configs, keeping
%% the numerically largest base level and remembering whether any bucket
%% reported softNodesNeeded. Stops as soon as the base level reaches
%% ?FS_HARD_NODES_NEEDED, in which case the extra safety is reported as ok.
failover_safety_rec(?FS_HARD_NODES_NEEDED, _Extra, _Configs,
                    _ActiveNodes, _LiveNodes) ->
    {?FS_HARD_NODES_NEEDED, ok};
failover_safety_rec(Base, Extra, [], _ActiveNodes, _LiveNodes) ->
    {Base, Extra};
failover_safety_rec(Base, Extra, [Config | Rest], ActiveNodes, LiveNodes) ->
    {BucketBase, BucketExtra} =
        bucket_failover_safety(Config, ActiveNodes, LiveNodes),
    WorstBase = erlang:max(Base, BucketBase),
    WorstExtra = case lists:member(softNodesNeeded, [BucketExtra, Extra]) of
                     true -> softNodesNeeded;
                     false -> ok
                 end,
    failover_safety_rec(WorstBase, WorstExtra, Rest, ActiveNodes, LiveNodes).
275
%% @doc Returns the list of failover-related warnings for the cluster,
%% computed over all membase buckets of the current config.
%%
%% The list contains at most two atoms: one derived from the worst base
%% safety level (hardNodesNeeded, failoverNeeded, rebalanceNeeded or
%% softRebalanceNeeded) and, independently, softNodesNeeded. An empty list
%% means there is nothing to warn about.
%%
%% The spec lists softRebalanceNeeded because ?FS_SOFT_REBALANCE_NEEDED is
%% mapped to it below; it was previously missing from the return type.
-spec failover_warnings() -> [failoverNeeded | rebalanceNeeded |
                              softRebalanceNeeded | hardNodesNeeded |
                              softNodesNeeded].
failover_warnings() ->
    Config = ns_config:get(),

    ActiveNodes = ns_cluster_membership:service_active_nodes(Config, kv),
    LiveNodes = ns_cluster_membership:service_actual_nodes(Config, kv),
    MembaseConfigs = [C || {_, C} <- get_buckets(Config),
                           membase =:= bucket_type(C)],
    {BaseSafety0, ExtraSafety} =
        failover_safety_rec(?FS_OK, ok, MembaseConfigs,
                            ActiveNodes, LiveNodes),
    BaseSafety = case BaseSafety0 of
                     ?FS_HARD_NODES_NEEDED -> hardNodesNeeded;
                     ?FS_FAILOVER_NEEDED -> failoverNeeded;
                     ?FS_REBALANCE_NEEDED -> rebalanceNeeded;
                     ?FS_SOFT_REBALANCE_NEEDED -> softRebalanceNeeded;
                     ?FS_OK -> ok
                 end,
    %% `ok' means "no warning" for either component; drop it.
    [S || S <- [BaseSafety, ExtraSafety], S =/= ok].
296
%% Flattens a vbucket map into {Master, Replica, VBucket} triples, in map
%% order, with vbuckets numbered from 0. Chains whose master is undefined
%% contribute nothing, and undefined replicas are skipped.
map_to_replicas(Map) ->
    ChainToTriples =
        fun ({_VBucket, [undefined | _Replicas]}) ->
                [];
            ({VBucket, [Master | Replicas]}) ->
                [{Master, R, VBucket} || R <- Replicas, R =/= undefined]
        end,
    lists:flatmap(ChainToTriples, misc:enumerate(Map, 0)).
308
%% Returns the _sorted_ list of vbuckets that, according to the given Map,
%% are replicated from SrcNode to DstNode.
replicated_vbuckets(Map, SrcNode, DstNode) ->
    IsWanted = fun (Src, Dst) ->
                       Src =:= SrcNode andalso Dst =:= DstNode
               end,
    lists:sort([V || {S, D, V} <- map_to_replicas(Map), IsWanted(S, D)]).
315
%% @doc Return the minimum, over all vbucket chains of the bucket's map, of
%% the number of chain entries that are members of LiveNodes. Returns
%% undefined when the bucket config has no map.
-spec min_live_copies([node()], list()) -> non_neg_integer() | undefined.
min_live_copies(LiveNodes, Config) ->
    case proplists:get_value(map, Config) of
        undefined ->
            undefined;
        Map ->
            LiveCounts =
                [length([N || N <- Chain, lists:member(N, LiveNodes)])
                 || Chain <- Map],
            %% The fold starts from the length of the first chain.
            lists:foldl(fun erlang:min/2, length(hd(Map)), LiveCounts)
    end.
335
%% Node locator for the bucket type: vbucket for membase buckets and ketama
%% for memcached buckets.
node_locator(BucketConfig) ->
    Type = proplists:get_value(type, BucketConfig),
    case Type of
        memcached -> ketama;
        membase -> vbucket
    end.
343
%% Number of replicas configured for the bucket. Crashes if `num_replicas'
%% is not an integer.
-spec num_replicas([{_,_}]) -> integer().
num_replicas(Bucket) ->
    case proplists:get_value(num_replicas, Bucket) of
        Count when is_integer(Count) ->
            Count
    end.
350
%% Plain accessors for bucket config properties. Each returns undefined
%% when the property is missing, except sasl_password/1 which returns "".

%% The `type' property of the bucket config.
bucket_type(Bucket) ->
    proplists:get_value(type, Bucket, undefined).

%% The `auth_type' property of the bucket config.
auth_type(Bucket) ->
    proplists:get_value(auth_type, Bucket, undefined).

%% The `sasl_password' property of the bucket config.
sasl_password(Bucket) ->
    DefaultPassword = "",
    proplists:get_value(sasl_password, Bucket, DefaultPassword).

%% The `moxi_port' property of the bucket config.
moxi_port(Bucket) ->
    proplists:get_value(moxi_port, Bucket, undefined).

%% The `servers' property of the bucket config.
bucket_nodes(Bucket) ->
    proplists:get_value(servers, Bucket, undefined).
364    proplists:get_value(servers, Bucket).
365
366-spec replication_type([{_,_}]) -> bucket_replication_type().
367replication_type(Bucket) ->
368    proplists:get_value(repl_type, Bucket, tap).
369
%% Builds the JSON vbucket map structure for a bucket using the config
%% returned by ns_config:get/0. See json_map_with_full_config/3.
json_map_from_config(LocalAddr, BucketConfig) ->
    FullConfig = ns_config:get(),
    json_map_with_full_config(LocalAddr, BucketConfig, FullConfig).
373
%% Builds the {struct, Props} term describing the bucket's vbucket map:
%% hashAlgorithm, numReplicas, serverList (host:port binaries), vBucketMap
%% (chains of positions into serverList, -1 for undefined) and, when the
%% bucket has a fastForwardMap, vBucketMapForward in the same encoding.
json_map_with_full_config(LocalAddr, BucketConfig, Config) ->
    NumReplicas = num_replicas(BucketConfig),
    EMap = proplists:get_value(map, BucketConfig, []),
    BucketNodes = proplists:get_value(servers, BucketConfig, []),
    %% Sorted, de-duplicated union of bucket servers and map nodes.
    AllNodes = lists:usort(lists:append([BucketNodes | EMap])),
    ENodes = lists:delete(undefined, AllNodes),
    Servers = [json_server_address(ENode, LocalAddr, Config)
               || ENode <- ENodes],
    %% Position of every node within serverList; undefined maps to -1.
    Positions = lists:zip(ENodes, lists:seq(0, length(ENodes) - 1)),
    NodesToPositions = dict:from_list([{undefined, -1} | Positions]),
    Encode = fun (Chains) ->
                     [[dict:fetch(N, NodesToPositions) || N <- Chain]
                      || Chain <- Chains]
             end,
    Map = Encode(EMap),
    FastForwardMapList =
        case proplists:get_value(fastForwardMap, BucketConfig) of
            undefined ->
                [];
            FFM ->
                [{vBucketMapForward, Encode(FFM)}]
        end,
    {struct, [{hashAlgorithm, <<"CRC">>},
              {numReplicas, NumReplicas},
              {serverList, Servers},
              {vBucketMap, Map} |
              FastForwardMapList]}.

%% "Host:Port" binary for a node's memcached port. Loopback hosts
%% ("127.0.0.1" and "::1") are replaced by LocalAddr.
json_server_address(ENode, LocalAddr, Config) ->
    Port = ns_config:search_node_prop(ENode, Config, memcached, port),
    {_Name, NodeHost} = misc:node_name_host(ENode),
    Host = case lists:member(NodeHost, ["127.0.0.1", "::1"]) of
               true -> LocalAddr;
               false -> NodeHost
           end,
    list_to_binary(misc:join_host_port(Host, Port)).
410
%% Replaces the whole config of an existing bucket with NewConfig.
set_bucket_config(Bucket, NewConfig) ->
    Replace = fun (_OldConfig) -> NewConfig end,
    update_bucket_config(Bucket, Replace).
413
%% Validates a bucket name. Returns true, or {error, Reason} where Reason
%% is empty, starts_with_dot, invalid or reserved.
%%
%% The allowed character set follows this snippet from bucket-engine:
%%
%% static bool has_valid_bucket_name(const char *n) {
%%     bool rv = strlen(n) > 0;
%%     for (; *n; n++) {
%%         rv &= isalpha(*n) || isdigit(*n) || *n == '.' || *n == '%' || *n == '_' || *n == '-';
%%     }
%%     return rv;
%% }
%%
%% In addition, '.' and '..' are disallowed because they cause problems
%% with browsers even when properly escaped (see bug 953). More generally,
%% names starting with '.' are disallowed because couchdb creates (at
%% least now) auxiliary directories that start with a dot and we don't
%% want to conflict with them. Finally, names that start
%% (case-insensitively) with "_users.couch." or "_replicator.couch." are
%% reserved.
is_valid_bucket_name([]) -> {error, empty};
is_valid_bucket_name([$. | _]) -> {error, starts_with_dot};
is_valid_bucket_name(BucketName) ->
    case is_valid_bucket_name_inner(BucketName) of
        true ->
            Lowered = string:to_lower(BucketName),
            IsReserved =
                lists:any(fun (Prefix) -> lists:prefix(Prefix, Lowered) end,
                          ["_users.couch.", "_replicator.couch."]),
            case IsReserved of
                true -> {error, reserved};
                false -> true
            end;
        {error, _} = Error ->
            Error
    end.
446
%% Returns true when every character of the non-empty name is an ASCII
%% letter, a digit, or one of '.', '%', '_', '-'; {error, invalid}
%% otherwise.
is_valid_bucket_name_inner([_ | _] = Name) ->
    case lists:all(fun is_valid_bucket_name_char/1, Name) of
        true -> true;
        false -> {error, invalid}
    end.

%% Character class accepted in bucket names.
is_valid_bucket_name_char(C) when $A =< C, C =< $Z -> true;
is_valid_bucket_name_char(C) when $a =< C, C =< $z -> true;
is_valid_bucket_name_char(C) when $0 =< C, C =< $9 -> true;
is_valid_bucket_name_char($.) -> true;
is_valid_bucket_name_char($%) -> true;
is_valid_bucket_name_char($_) -> true;
is_valid_bucket_name_char($-) -> true;
is_valid_bucket_name_char(_) -> false.
460
%% True when Port is not the moxi_port of any bucket other than
%% BucketName. Buckets without a moxi_port are ignored.
is_not_a_bucket_port(BucketName, Port) ->
    OtherPorts = [proplists:get_value(moxi_port, Config)
                  || {Name, Config} <- get_buckets(),
                     Name /= BucketName],
    UsedPorts = [P || P <- OtherPorts, P =/= undefined],
    not lists:member(Port, UsedPorts).
469
%% True when Port lies outside the [inet_dist_listen_min,
%% inet_dist_listen_max] range from the kernel application environment.
%% When a bound is not configured, defaults are used (1000000 for the
%% minimum, 0 for the maximum) that make the corresponding comparison
%% succeed for ordinary port numbers.
is_not_a_kernel_port(Port) ->
    KernelEnv = application:get_all_env(kernel),
    MinPort = proplists:get_value(inet_dist_listen_min, KernelEnv, 1000000),
    MaxPort = proplists:get_value(inet_dist_listen_max, KernelEnv, 0),
    Port < MinPort orelse Port > MaxPort.
485
%% Checks whether Port is free, without excluding any bucket's own port.
is_port_free(Port) ->
    is_port_free([], Port).

%% Checks whether Port may be used by bucket BucketName, against the
%% config returned by ns_config:get/0.
is_port_free(BucketName, Port) ->
    Config = ns_config:get(),
    is_port_free(BucketName, Port, Config).

%% Port is considered free when it differs from this node's memcached
%% ports, moxi port, capi port, (for a named bucket) the web port, the
%% default epmd port, the moxi ports of other buckets, the kernel
%% distribution port range and the SSL capi/rest ports. Port must not be
%% undefined.
is_port_free(BucketName, Port, Config) ->
    true = (Port /= undefined),
    TakenWebPort =
        case BucketName of
            [] ->
                0;
            _ ->
                proplists:get_value(port, menelaus_web:webconfig(Config))
        end,
    SSLCapiPort = ns_config:search(Config, {node, node(), ssl_capi_port}, undefined),
    SSLRestPort = ns_config:search(Config, {node, node(), ssl_rest_port}, undefined),
    NotMemcached = fun (Key) ->
                           Port =/= ns_config:search_node_prop(Config, memcached, Key)
                   end,

    NotMemcached(port)
        andalso NotMemcached(dedicated_port)
        andalso NotMemcached(ssl_port)
        andalso Port =/= ns_config:search_node_prop(Config, moxi, port)
        andalso Port =/= capi_utils:get_capi_port(node(), Config)
        andalso Port =/= TakenWebPort
        andalso Port =/= 4369 %% default epmd port
        andalso is_not_a_bucket_port(BucketName, Port)
        andalso is_not_a_kernel_port(Port)
        andalso Port =/= SSLCapiPort
        andalso Port =/= SSLRestPort.
514
%% Validates the name and the optional moxi_port of a bucket about to be
%% created. Returns ok, {error, {invalid_bucket_name, Name}} or
%% {error, {port_conflict, Port}}.
validate_bucket_config(BucketName, NewConfig) ->
    case is_valid_bucket_name(BucketName) of
        {error, _} ->
            {error, {invalid_bucket_name, BucketName}};
        true ->
            case proplists:get_value(moxi_port, NewConfig) of
                undefined ->
                    ok;
                Port ->
                    case is_port_free(BucketName, Port) of
                        true -> ok;
                        false -> {error, {port_conflict, Port}}
                    end
            end
    end.
528
%% Default number of vbuckets: the couchbase_num_vbuckets_default config
%% value when set, otherwise the COUCHBASE_NUM_VBUCKETS environment
%% variable read via misc:getenv_int/2 with 1024 as the fallback.
get_num_vbuckets() ->
    case ns_config:search(couchbase_num_vbuckets_default) of
        {value, NumVBuckets} ->
            NumVBuckets;
        false ->
            misc:getenv_int("COUCHBASE_NUM_VBUCKETS", 1024)
    end.
536
%% Default properties of a newly created bucket of the given type. For
%% memcached buckets the servers are the active kv nodes and the map is
%% empty; membase buckets start without servers.
new_bucket_default_params(membase) ->
    NumVBuckets = get_num_vbuckets(),
    [{type, membase},
     {num_vbuckets, NumVBuckets},
     {num_replicas, 1},
     {ram_quota, 0},
     {replication_topology, star},
     {servers, []}];
new_bucket_default_params(memcached) ->
    [{type, memcached},
     {num_vbuckets, 0},
     {num_replicas, 0},
     {servers, ns_cluster_membership:service_active_nodes(kv)},
     {map, []},
     {ram_quota, 0}].
552
%% Drops the property that does not apply to the bucket's auth type:
%% moxi_port for sasl buckets, sasl_password for buckets with auth_type
%% none. Crashes for any other auth_type.
cleanup_bucket_props_pre_50(Props) ->
    AuthType = proplists:get_value(auth_type, Props),
    Obsolete = case AuthType of
                   none -> sasl_password;
                   sasl -> moxi_port
               end,
    lists:keydelete(Obsolete, 1, Props).
558
%% Removes a moxi_port entry whose value is undefined; leaves the
%% properties untouched when moxi_port has a real value.
cleanup_bucket_props(Props) ->
    case proplists:get_value(moxi_port, Props) =:= undefined of
        true ->
            lists:keydelete(moxi_port, 1, Props);
        false ->
            Props
    end.
566
%% A fresh password string built from couch_uuids:random/0.
generate_sasl_password() ->
    UUID = couch_uuids:random(),
    binary_to_list(UUID).

%% Stores a freshly generated sasl_password in Props (replacing an
%% existing one) and prepends {auth_type, sasl}.
generate_sasl_password(Props) ->
    Password = generate_sasl_password(),
    WithPassword = lists:keystore(sasl_password, 1, Props,
                                  {sasl_password, Password}),
    [{auth_type, sasl} | WithPassword].
574
%% Creates a bucket entry in the config. NewConfig is validated, merged
%% over the defaults for BucketType, given a sasl password (5.0 clusters)
%% or cleaned of the unused auth property (older clusters), and tagged
%% with a uuid and {repl_type, dcp}. Returns ok, or the validation error.
%% If the bucket already exists, the config update fun exits with
%% {already_exists, Tuple}.
create_bucket(BucketType, BucketName, NewConfig) ->
    case validate_bucket_config(BucketName, NewConfig) of
        ok ->
            Defaults = new_bucket_default_params(BucketType),
            Merged = misc:update_proplist(Defaults, NewConfig),
            WithAuth =
                case cluster_compat_mode:is_cluster_50() of
                    true -> generate_sasl_password(Merged);
                    false -> cleanup_bucket_props_pre_50(Merged)
                end,
            BucketUUID = couch_uuids:random(),
            BucketConfig = [{repl_type, dcp}, {uuid, BucketUUID} | WithAuth],
            AddBucket =
                fun (List) ->
                        case lists:keyfind(BucketName, 1, List) of
                            false ->
                                [{BucketName, BucketConfig} | List];
                            Tuple ->
                                exit({already_exists, Tuple})
                        end
                end,
            ns_config:update_sub_key(buckets, configs, AddBucket),
            %% The janitor will handle creating the map.
            ok;
        E ->
            E
    end.
605
%% Removes the bucket from the config. Returns whatever
%% ns_config:update_sub_key/3 returned, which must be either ok or an
%% {exit, {not_found, _}, _} tuple; anything else crashes.
-spec delete_bucket(bucket_name()) -> ok | {exit, {not_found, bucket_name()}, any()}.
delete_bucket(BucketName) ->
    Remove = fun (List) ->
                     case lists:keyfind(BucketName, 1, List) of
                         false -> exit({not_found, BucketName});
                         Tuple -> lists:delete(Tuple, List)
                     end
             end,
    RV = ns_config:update_sub_key(buckets, configs, Remove),
    %% Assert that the result has one of the two expected shapes.
    case RV of
        ok -> RV;
        {exit, {not_found, _}, _} -> RV
    end.
621
%% Removes the bucket from the config and returns the config it had. The
%% update fun sends the removed config back to the calling process tagged
%% with a fresh reference, which is picked up after a successful update.
-spec delete_bucket_returning_config(bucket_name()) ->
                                            {ok, BucketConfig :: list()} |
                                            {exit, {not_found, bucket_name()}, any()}.
delete_bucket_returning_config(BucketName) ->
    Caller = self(),
    Tag = make_ref(),
    Remove = fun (List) ->
                     case lists:keyfind(BucketName, 1, List) of
                         false ->
                             exit({not_found, BucketName});
                         {_, Removed} = Tuple ->
                             Caller ! {Tag, Removed},
                             lists:delete(Tuple, List)
                     end
             end,
    case ns_config:update_sub_key(buckets, configs, Remove) of
        ok ->
            receive
                {Tag, BucketConfig} ->
                    {ok, BucketConfig}
            after 0 ->
                    exit(this_cannot_happen)
            end;
        {exit, {not_found, _}, _} = NotFound ->
            NotFound
    end.
648
%% Keeps only the {Name, Props} entries whose `servers' property is a
%% non-empty list containing this node.
filter_ready_buckets(BucketInfos) ->
    Self = node(),
    IsReady = fun ({_Name, PList}) ->
                      case proplists:get_value(servers, PList, []) of
                          [_ | _] = Servers ->
                              lists:member(Self, Servers);
                          _ ->
                              false
                      end
              end,
    lists:filter(IsReady, BucketInfos).
657
%% Updates properties of the bucket with the given name, type and storage
%% mode. Checking the type protects us from type change races in certain
%% cases.
%%
%% If a bucket with the given name exists but has a different type or
%% storage mode, {exit, {not_found, BucketName}, []} is returned.
update_bucket_props(Type, StorageMode, BucketName, Props) ->
    Candidates = get_bucket_names_of_type(Type, StorageMode),
    case lists:member(BucketName, Candidates) of
        false ->
            {exit, {not_found, BucketName}, []};
        true ->
            update_bucket_props(BucketName, Props)
    end.
671
%% Merges Props into the stored properties of BucketName: every given key
%% replaces an existing entry with the same key. The result is then
%% cleaned with cleanup_bucket_props/1 (5.0 clusters) or
%% cleanup_bucket_props_pre_50/1 (older clusters). The config update fun
%% exits with {not_found, BucketName} when the bucket does not exist.
update_bucket_props(BucketName, Props) ->
    Override = fun ({K, _V} = Tuple, Acc) ->
                       [Tuple | lists:keydelete(K, 1, Acc)]
               end,
    Merge = fun (OldProps) ->
                    Merged = lists:foldl(Override, OldProps, Props),
                    case cluster_compat_mode:is_cluster_50() of
                        true -> cleanup_bucket_props(Merged);
                        false -> cleanup_bucket_props_pre_50(Merged)
                    end
            end,
    ns_config:update_sub_key(
      buckets, configs,
      fun (List) ->
              case misc:key_update(BucketName, List, Merge) of
                  false -> exit({not_found, BucketName});
                  Updated -> Updated
              end
      end).
696
%% Stores Map as the bucket's fastForwardMap and reports the change
%% (new and previous map) to master_activity_events.
set_fast_forward_map(Bucket, Map) ->
    Store = fun (OldConfig) ->
                    Previous = proplists:get_value(fastForwardMap, OldConfig, []),
                    master_activity_events:note_set_ff_map(Bucket, Map, Previous),
                    lists:keystore(fastForwardMap, 1, OldConfig,
                                   {fastForwardMap, Map})
            end,
    update_bucket_config(Bucket, Store).
706
707
%% Stores Map as the bucket's vbucket map and reports the change (new and
%% previous map) to master_activity_events. Crashes with a badmatch when
%% mb_map:is_valid/1 does not return true for Map.
set_map(Bucket, Map) ->
    true = mb_map:is_valid(Map),
    Store = fun (OldConfig) ->
                    Previous = proplists:get_value(map, OldConfig, []),
                    master_activity_events:note_set_map(Bucket, Map, Previous),
                    lists:keystore(map, 1, OldConfig, {map, Map})
            end,
    update_bucket_config(Bucket, Store).
717
%% Stores erlang:phash2/1 of the map options as the bucket's
%% map_opts_hash property.
set_map_opts(Bucket, Opts) ->
    Entry = {map_opts_hash, erlang:phash2(Opts)},
    update_bucket_config(
      Bucket,
      fun (OldConfig) ->
              lists:keystore(map_opts_hash, 1, OldConfig, Entry)
      end).
725
%% Replaces (or adds) the bucket's `servers' property.
set_servers(Bucket, Servers) ->
    Entry = {servers, Servers},
    update_bucket_config(
      Bucket,
      fun (OldConfig) ->
              lists:keystore(servers, 1, OldConfig, Entry)
      end).
732
%% Updates the config of one bucket inside a single ns_config:update_key/2
%% call on the `buckets' key. Fun receives the bucket's current config
%% (undefined when the bucket is not present) and returns the new one. The
%% new config only replaces an existing entry; nothing is added for an
%% unknown bucket.
update_bucket_config(Bucket, Fun) ->
    Update = fun (BucketsKV) ->
                     Configs = proplists:get_value(configs, BucketsKV, []),
                     Current = proplists:get_value(Bucket, Configs),
                     Updated = lists:keyreplace(Bucket, 1, Configs,
                                                {Bucket, Fun(Current)}),
                     lists:keyreplace(configs, 1, BucketsKV,
                                      {configs, Updated})
             end,
    ok = ns_config:update_key(buckets, Update).
744
%% True when the named bucket is a membase bucket whose storage mode is
%% couchstore. Fails with a badmatch when the bucket cannot be found.
is_persistent(BucketName) ->
    {ok, BucketConfig} = get_bucket(BucketName),
    case bucket_type(BucketConfig) of
        membase ->
            storage_mode(BucketConfig) =:= couchstore;
        _ ->
            false
    end.
749
%% True when the two bucket names are equal ignoring case.
names_conflict(BucketNameA, BucketNameB) ->
    LowerA = string:to_lower(BucketNameA),
    LowerB = string:to_lower(BucketNameB),
    LowerA =:= LowerB.
752
%% @doc Check whether the list of {Name, Config} pairs contains a bucket
%% whose name equals BucketName ignoring case.
name_conflict(BucketName, ListOfBuckets) ->
    Wanted = string:to_lower(BucketName),
    SameName = fun ({Name, _}) -> Wanted == string:to_lower(Name) end,
    lists:any(SameName, ListOfBuckets).
758
%% @doc Check whether a bucket with this name (ignoring case) exists in
%% the latest config.
name_conflict(BucketName) ->
    AllBuckets = get_buckets(),
    name_conflict(BucketName, AllBuckets).
762
%% Names of the buckets in BucketsConfigs whose `servers' list contains
%% Node.
node_bucket_names(Node, BucketsConfigs) ->
    HostsNode = fun (Config) ->
                        lists:member(Node,
                                     proplists:get_value(servers, Config, []))
                end,
    [Name || {Name, Config} <- BucketsConfigs, HostsNode(Config)].
766
%% Names of the buckets in the latest config whose `servers' list contains
%% Node.
node_bucket_names(Node) ->
    AllBuckets = get_buckets(),
    node_bucket_names(Node, AllBuckets).
769
%% Names of the buckets in the latest config that reside on Node and have
%% the given bucket type and storage mode.
-spec node_bucket_names_of_type(node(), memcached|membase,
                                undefined|couchstore|ephemeral) -> list().
node_bucket_names_of_type(Node, Type, Mode) ->
    AllBuckets = get_buckets(),
    node_bucket_names_of_type(Node, Type, Mode, AllBuckets).

%% Same as node_bucket_names_of_type/3, but over an explicit list of
%% {Name, Config} pairs.
-spec node_bucket_names_of_type(node(), memcached|membase,
                                undefined|couchstore|ephemeral, list()) -> list().
node_bucket_names_of_type(Node, Type, Mode, BucketConfigs) ->
    %% Checks run in this order and stop at the first one that fails.
    Matches = fun (Config) ->
                      lists:member(Node,
                                   proplists:get_value(servers, Config, []))
                          andalso bucket_type(Config) =:= Type
                          andalso storage_mode(Config) =:= Mode
              end,
    [Name || {Name, Config} <- BucketConfigs, Matches(Config)].
782
%% All the vbuckets (active or replica) of the bucket's map whose chain
%% contains this node. Each result is the ordinal produced by
%% misc:enumerate/1 minus one.
-spec all_node_vbuckets(term()) -> list(integer()).
all_node_vbuckets(BucketConfig) ->
    Self = node(),
    Chains = couch_util:get_value(map, BucketConfig, []),
    OnThisNode = fun (Chain) -> lists:member(Self, Chain) end,
    [Ordinal - 1 || {Ordinal, Chain} <- misc:enumerate(Chains),
                    OnThisNode(Chain)].
791
%% Extracts the map generation options from a bucket config: max_slaves
%% (default 10) and replication_topology (default star).
config_to_map_options(Config) ->
    MaxSlaves = proplists:get_value(max_slaves, Config, 10),
    Topology = proplists:get_value(replication_topology, Config, star),
    [{max_slaves, MaxSlaves},
     {replication_topology, Topology}].
795
%% @doc Record {Map, SanifiedOptions} in the vbucket map history stored under
%% the `vbucket_map_history' config key.
%%
%% An entry that is already part of the history leaves the history as is (its
%% position is not refreshed). A new entry is put at the head and the history
%% is capped at ?VBMAP_HISTORY_SIZE entries by dropping entries from the
%% tail. In both cases the resulting history is written back with
%% ns_config:set/2, whose result is returned.
update_vbucket_map_history(Map, SanifiedOptions) ->
    History = past_vbucket_maps(),
    NewEntry = {Map, SanifiedOptions},
    NewHistory =
        case lists:member(NewEntry, History) of
            true ->
                History;
            false ->
                %% lists:sublist/2 returns the whole list when it is shorter
                %% than the limit, so no separate length/1 check is needed.
                lists:sublist([NewEntry | History], ?VBMAP_HISTORY_SIZE)
        end,
    ns_config:set(vbucket_map_history, NewHistory).
810
%% @doc Vbucket map history read from the config returned by
%% ns_config:latest/0. See past_vbucket_maps/1.
past_vbucket_maps() ->
    LatestConfig = ns_config:latest(),
    past_vbucket_maps(LatestConfig).
813
%% @doc Vbucket map history stored in Config under `vbucket_map_history', as
%% a list of {Map, Options} pairs; [] when the key is not set.
%%
%% Entries whose options carry no `replication_topology' value get
%% {replication_topology, chain} prepended to their options; all other
%% entries are returned untouched.
past_vbucket_maps(Config) ->
    case ns_config:search(Config, vbucket_map_history) of
        false ->
            [];
        {value, History} ->
            WithTopology =
                fun ({Map, Options} = Entry) ->
                        Topology = proplists:get_value(replication_topology,
                                                       Options),
                        case Topology =:= undefined of
                            true ->
                                {Map, [{replication_topology, chain} | Options]};
                            false ->
                                Entry
                        end
                end,
            lists:map(WithTopology, History)
    end.
828
%% @doc Decide whether the bucket described by BucketConfig needs a
%% rebalance, given the list of Nodes it is compared against.
%%
%% memcached buckets: true when Nodes and the bucket's `servers' differ as
%% sets of sorted elements.
%%
%% membase buckets: false when the bucket has no servers; otherwise true when
%% any of the following holds (checked in this order, stopping at the first
%% hit):
%%   - the bucket has no `map',
%%   - Nodes and `servers' differ once sorted,
%%   - ns_rebalancer:map_options_changed/1 says so,
%%   - ns_rebalancer:unbalanced/2 says the map is unbalanced and the map is
%%     not accepted by is_compatible_past_map/3.
%%
%% Any other bucket type is not handled and fails with a case_clause error.
needs_rebalance(BucketConfig, Nodes) ->
    Servers = proplists:get_value(servers, BucketConfig, []),
    case proplists:get_value(type, BucketConfig) of
        memcached ->
            lists:sort(Nodes) =/= lists:sort(Servers);
        membase when Servers =:= [] ->
            false;
        membase ->
            Map = proplists:get_value(map, BucketConfig),
            Map =:= undefined
                orelse lists:sort(Nodes) =/= lists:sort(Servers)
                orelse ns_rebalancer:map_options_changed(BucketConfig)
                orelse (ns_rebalancer:unbalanced(Map, BucketConfig)
                        andalso
                        not is_compatible_past_map(Nodes, BucketConfig, Map))
    end.
847
%% @doc Return true when Map is among the maps that
%% mb_map:find_matching_past_maps/5 returns for Nodes, given the recorded
%% vbucket map history and the map options generated for Nodes and
%% BucketConfig by ns_rebalancer:generate_vbucket_map_options/2.
is_compatible_past_map(Nodes, BucketConfig, Map) ->
    PastMaps = ns_bucket:past_vbucket_maps(),
    Options = ns_rebalancer:generate_vbucket_map_options(Nodes, BucketConfig),
    Candidates = mb_map:find_matching_past_maps(Nodes, Map, Options,
                                                PastMaps, [trivial]),
    lists:member(Map, Candidates).
855
%% @doc A bucket can have views only when its storage mode, as reported by
%% storage_mode/1, is `couchstore'.
can_have_views(BucketConfig) ->
    Mode = storage_mode(BucketConfig),
    Mode =:= couchstore.
858
%% @doc View nodes of Bucket, looked up in the config returned by
%% ns_config:latest/0. See bucket_view_nodes/2.
bucket_view_nodes(Bucket) ->
    LatestConfig = ns_config:latest(),
    bucket_view_nodes(Bucket, LatestConfig).
861
%% @doc View nodes of Bucket as found in Config: the result of
%% bucket_config_view_nodes/1 on the bucket's config, or [] when the bucket
%% is not present in Config.
bucket_view_nodes(Bucket, Config) ->
    Lookup = ns_bucket:get_bucket(Bucket, Config),
    case Lookup of
        not_present ->
            [];
        {ok, BucketConfig} ->
            bucket_config_view_nodes(BucketConfig)
    end.
869
%% @doc Sorted list of the bucket's nodes (ns_bucket:bucket_nodes/1) when the
%% bucket can have views according to can_have_views/1; [] otherwise.
bucket_config_view_nodes(BucketConfig) ->
    case can_have_views(BucketConfig) of
        false ->
            [];
        true ->
            Nodes = ns_bucket:bucket_nodes(BucketConfig),
            lists:sort(Nodes)
    end.
877
%% @doc Config upgrade step for 5.0.
%%
%% For every bucket in Config, applies `auth_type = sasl' and a
%% `sasl_password' obtained from generate_sasl_password/0 (called once per
%% bucket) to the bucket's properties via misc:update_proplist/2. Returns the
%% change list that replaces the `buckets' key with the updated configs.
config_upgrade_to_50(Config) ->
    Upgrade =
        fun ({Name, Props}) ->
                Overrides = [{auth_type, sasl},
                             {sasl_password, generate_sasl_password()}],
                {Name, misc:update_proplist(Props, Overrides)}
        end,
    Upgraded = lists:map(Upgrade, get_buckets(Config)),
    [{set, buckets, [{configs, Upgraded}]}].
888
%% @doc Config upgrade step for 5.1.
%%
%% Only the bucket named "default" is touched: when its sasl_password/1 is
%% the empty string, the `sasl_password' property is replaced (or added) with
%% a value from generate_sasl_password/0. Every other bucket, and a "default"
%% bucket that already has a non-empty password, is passed through as is.
%% Returns the change list that replaces the `buckets' key.
config_upgrade_to_51(Config) ->
    %% fix for possible consequence of MB-27160
    FixDefault =
        fun ({"default" = Name, BucketConfig}) ->
                case sasl_password(BucketConfig) of
                    "" ->
                        NewPassword = {sasl_password, generate_sasl_password()},
                        {Name, lists:keystore(sasl_password, 1, BucketConfig,
                                              NewPassword)};
                    _ ->
                        {Name, BucketConfig}
                end;
            (Other) ->
                Other
        end,
    Fixed = lists:map(FixDefault, get_buckets(Config)),
    [{set, buckets, [{configs, Fixed}]}].
907
%% @doc Config upgrade step for 5.5.
%%
%% Sets `max_ttl' to 0 and `compression_mode' to `off' on every bucket in
%% Config, replacing any value already stored under those keys. Returns the
%% change list that replaces the `buckets' key with the updated configs.
config_upgrade_to_55(Config) ->
    NewProps = [{max_ttl, 0}, {compression_mode, off}],
    Store = fun ({Key, _Value} = Prop, Acc) ->
                    lists:keystore(Key, 1, Acc, Prop)
            end,
    Upgrade = fun ({Name, BCfg}) ->
                      {Name, lists:foldl(Store, BCfg, NewProps)}
              end,
    Upgraded = lists:map(Upgrade, get_buckets(Config)),
    [{set, buckets, [{configs, Upgraded}]}].
919
920%%
921%% Internal functions
922%%
923
924%%
925%% Tests
926%%
927
%% EUnit test for min_live_copies/2. All assertions use the
%% ?assertEqual(Expected, Actual) argument order so that a failure report
%% labels the expected and the actual value correctly.
min_live_copies_test() ->
    %% A bucket config without a map, or with an undefined map, yields
    %% undefined.
    ?assertEqual(undefined, min_live_copies([node1], [])),
    ?assertEqual(undefined, min_live_copies([node1], [{map, undefined}])),
    %% Every chain holds both nodes: the result depends on how many of them
    %% are in the live list.
    Map1 = [[node1, node2], [node2, node1]],
    ?assertEqual(2, min_live_copies([node1, node2], [{map, Map1}])),
    ?assertEqual(1, min_live_copies([node1], [{map, Map1}])),
    ?assertEqual(0, min_live_copies([node3], [{map, Map1}])),
    %% A chain with an undefined slot has one copy less.
    Map2 = [[undefined, node2], [node2, node1]],
    ?assertEqual(1, min_live_copies([node1, node2], [{map, Map2}])),
    ?assertEqual(0, min_live_copies([node1, node3], [{map, Map2}])).
938