xref: /5.5.2/ns_server/src/menelaus_users.erl (revision d1858ba3)
1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2016-2018 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16%% @doc implementation of local and external users
17
18-module(menelaus_users).
19
20-include("ns_common.hrl").
21-include("ns_config.hrl").
22-include("rbac.hrl").
23-include("pipes.hrl").
24-include("cut.hrl").
25
26-include_lib("eunit/include/eunit.hrl").
27
28-export([get_users_45/1,
29         select_users/1,
30         select_auth_infos/1,
31         store_user/4,
32         delete_user/1,
33         change_password/2,
34         authenticate/2,
35         get_roles/1,
36         user_exists/1,
37         get_user_name/1,
38         upgrade_to_4_5/1,
39         get_password_change_timestamp/1,
40         get_salt_and_mac/1,
41         user_auth_info/2,
42         build_scram_auth/1,
43         build_scram_auth_info/1,
44         build_plain_auth/1,
45         build_plain_auth/2,
46         get_users_version/0,
47         get_auth_version/0,
48         empty_storage/0,
49         upgrade_to_50/2,
50         upgrade_to_55/2,
51         config_upgrade_to_50/0,
52         config_upgrade_to_55/0,
53         upgrade_status/0,
54         get_passwordless/0,
55         filter_out_invalid_roles/3,
56         cleanup_bucket_roles/1,
57         get_auth_info/1]).
58
59%% callbacks for replicated_dets
60-export([init/1, on_save/2, on_empty/1, handle_call/4, handle_info/2]).
61
62-export([start_storage/0, start_replicator/0, start_auth_cache/0]).
63
64%% RPC'd from ns_couchdb node
65-export([get_auth_info_on_ns_server/1]).
66
67-define(MAX_USERS_ON_CE, 20).
68
69-record(state, {base, passwordless}).
70
%% Registered name of the users replicator process (looked up with
%% erlang:whereis/1 in start_storage/0 and passed to doc_replicator in
%% start_replicator/0).
replicator_name() ->
    users_replicator.
73
%% Name under which the replicated dets storage holding user and auth
%% documents is started and addressed.
storage_name() ->
    users_storage.
76
%% Name of the ets table that keeps the user_version and auth_version
%% counters.
versions_name() ->
    menelaus_users_versions.
79
%% Name of the versioned cache of auth infos (started by
%% start_auth_cache/0 and read by get_auth_info/1).
auth_cache_name() ->
    menelaus_users_cache.
82
%% Starts the replicated dets storage for users with this module as the
%% callback module. The table file is "users.dets" inside the "config"
%% data component directory. Whatever is currently registered under
%% replicator_name() (possibly undefined) is handed over as the replicator.
start_storage() ->
    ReplicatorPid = erlang:whereis(replicator_name()),
    ConfigDir = path_config:component_path(data, "config"),
    DetsPath = filename:join(ConfigDir, "users.dets"),
    CacheSize = ns_config:read_key_fast(menelaus_users_cache_size, 256),
    replicated_dets:start_link(?MODULE, [], storage_name(), DetsPath,
                               ReplicatorPid, CacheSize).
88
%% Returns {Version, Base} describing the current generation of user
%% documents. When running on the couchdb node the answer is obtained from
%% the ns_server node via rpc; otherwise it is read from the local versions
%% ets table.
get_users_version() ->
    ThisNode = node(),
    case ns_node_disco:couchdb_node() of
        ThisNode ->
            rpc:call(ns_node_disco:ns_server_node(), ?MODULE, get_users_version, []);
        _ ->
            [{user_version, Version, Base}] =
                ets:lookup(versions_name(), user_version),
            {Version, Base}
    end.
97
%% Returns {Version, Base} describing the current generation of auth
%% documents. When running on the couchdb node the answer is obtained from
%% the ns_server node via rpc; otherwise it is read from the local versions
%% ets table.
get_auth_version() ->
    ThisNode = node(),
    case ns_node_disco:couchdb_node() of
        ThisNode ->
            rpc:call(ns_node_disco:ns_server_node(), ?MODULE, get_auth_version, []);
        _ ->
            [{auth_version, Version, Base}] =
                ets:lookup(versions_name(), auth_version),
            {Version, Base}
    end.
106
%% Starts the document replicator for the users storage. The replicator is
%% given a fun that returns ns_node_disco:nodes_actual_other() each time it
%% is called.
start_replicator() ->
    OtherNodes = fun () -> ns_node_disco:nodes_actual_other() end,
    doc_replicator:start_link(replicator_name(), OtherNodes, storage_name()).
114
%% Starts the versioned cache of auth infos. Three funs are supplied:
%%   * one that fetches the auth info of an identity from the ns_server
%%     node via rpc,
%%   * one that waits for the ns_server node and then returns the
%%     user_storage_events subscription on that node with an accept-all
%%     filter,
%%   * one that returns the current {AuthVersion, UsersVersion} pair.
%% The literal 200 is passed through to versioned_cache unchanged
%% (presumably the cache size -- confirm against versioned_cache).
start_auth_cache() ->
    FetchAuthInfo =
        fun (Identity) ->
                ?log_debug("Retrieve user ~p from ns_server node",
                           [ns_config_log:tag_user_data(Identity)]),
                rpc:call(ns_node_disco:ns_server_node(), ?MODULE,
                         get_auth_info_on_ns_server, [Identity])
        end,
    Subscriptions =
        fun () ->
                dist_manager:wait_for_node(fun ns_node_disco:ns_server_node/0),
                AcceptAll = fun (_) -> true end,
                [{{user_storage_events, ns_node_disco:ns_server_node()}, AcceptAll}]
        end,
    CurrentVersions =
        fun () ->
                {get_auth_version(), get_users_version()}
        end,
    versioned_cache:start_link(auth_cache_name(), 200,
                               FetchAuthInfo, Subscriptions, CurrentVersions).
128
%% Asks replicated_dets to empty the users storage.
empty_storage() ->
    Storage = storage_name(),
    replicated_dets:empty(Storage).
131
%% Returns the identities whose stored auth info accepts the empty
%% password. Served by the storage process (see handle_call/4); the call
%% has no timeout.
get_passwordless() ->
    Storage = storage_name(),
    gen_server:call(Storage, get_passwordless, infinity).
134
%% replicated_dets callback: creates the named, protected versions table
%% and returns the initial state. The passwordless list starts out
%% undefined and is computed on first request.
init([]) ->
    Table = versions_name(),
    _ = ets:new(Table, [protected, named_table]),
    Base = init_versions(),
    #state{base = Base}.
138
%% Picks a fresh random base in [0, 16#100000000), stores version 0 with
%% that base for both user_version and auth_version, announces both on
%% user_storage_events and returns the base.
init_versions() ->
    Base = crypto:rand_uniform(0, 16#100000000),
    Keys = [user_version, auth_version],
    ets:insert_new(versions_name(), [{Key, 0, Base} || Key <- Keys]),
    lists:foreach(
      fun (Key) ->
              gen_event:notify(user_storage_events, {Key, {0, Base}})
      end, Keys),
    Base.
145
%% replicated_dets callback invoked with the documents being saved.
%% Collects at most one {change_version, Key} message per kind of document
%% touched (a set is used to deduplicate), keeps the passwordless list in
%% sync for auth documents, then posts the collected messages to self().
on_save(Docs, State) ->
    {Pending, FinalState} =
        lists:foldl(fun collect_save_effects/2, {sets:new(), State}, Docs),
    lists:foreach(fun (Notification) ->
                          self() ! Notification
                  end, sets:to_list(Pending)),
    FinalState.

%% Folds one saved document into the {PendingMessages, State} accumulator.
collect_save_effects(Doc, {Pending, State}) ->
    case replicated_dets:get_id(Doc) of
        {user, _} ->
            {sets:add_element({change_version, user_version}, Pending), State};
        {auth, Identity} ->
            NewState = maybe_update_passwordless(Identity,
                                                 replicated_dets:get_value(Doc),
                                                 replicated_dets:is_deleted(Doc),
                                                 State),
            {sets:add_element({change_version, auth_version}, Pending), NewState}
    end.
165
%% replicated_dets callback: handles the {change_version, Key} message
%% posted by on_save/2. Calls misc:flush/1 with the message first
%% (presumably dropping queued duplicates -- confirm against misc), bumps
%% the counter for Key by one and publishes the new {Version, Base}.
handle_info({change_version, Key} = Msg, #state{base = Base} = State) ->
    misc:flush(Msg),
    NewVersion = ets:update_counter(versions_name(), Key, 1),
    Event = {Key, {NewVersion, Base}},
    gen_event:notify(user_storage_events, Event),
    {noreply, State}.
171
%% replicated_dets callback invoked when the storage is emptied: wipes the
%% versions table and starts over with a new random base. The passwordless
%% list is reset to undefined as a consequence of building a fresh state.
on_empty(_State) ->
    true = ets:delete_all_objects(versions_name()),
    NewBase = init_versions(),
    #state{base = NewBase}.
175
%% Keeps the cached list of passwordless identities in sync with a saved
%% auth document.
%%   * While the list has not been computed yet (undefined), nothing is done.
%%   * A deleted document removes the identity from the list.
%%   * Otherwise the identity is added (once) when its auth info accepts the
%%     empty password, and removed when it does not.
maybe_update_passwordless(_Identity, _Value, _Deleted,
                          #state{passwordless = undefined} = State) ->
    State;
maybe_update_passwordless(Identity, _Value, true,
                          #state{passwordless = Known} = State) ->
    State#state{passwordless = lists:delete(Identity, Known)};
maybe_update_passwordless(Identity, Auth, false,
                          #state{passwordless = Known} = State) ->
    AcceptsEmptyPassword = authenticate_with_info(Auth, ""),
    AlreadyListed = lists:member(Identity, Known),
    Updated =
        case {AcceptsEmptyPassword, AlreadyListed} of
            {true, true} ->
                Known;
            {true, false} ->
                [Identity | Known];
            {false, _} ->
                lists:delete(Identity, Known)
        end,
    State#state{passwordless = Updated}.
194
%% replicated_dets callback serving get_passwordless/0.
%% On the first request the whole set of auth documents is scanned and
%% every identity whose auth info accepts the empty password is collected;
%% the result is remembered in the state. Later requests are answered from
%% the remembered list, which on_save/2 keeps up to date.
handle_call(get_passwordless, _From, TableName,
            #state{passwordless = undefined} = State) ->
    KeepPasswordless =
        fun ({{auth, Identity}, Auth}, Acc) ->
                case authenticate_with_info(Auth, "") of
                    true ->
                        [Identity | Acc];
                    false ->
                        Acc
                end
        end,
    Found =
        pipes:run(
          replicated_dets:select(TableName, {auth, '_'}, 100, true),
          ?make_consumer(pipes:fold(?producer(), KeepPasswordless, []))),
    {reply, Found, State#state{passwordless = Found}};
handle_call(get_passwordless, _From, _TableName,
            #state{passwordless = Cached} = State) ->
    {reply, Cached, State}.
212
%% Returns the 4.5-style user list stored under the user_roles config key,
%% or [] when the key is absent. Each element pairs an identity with its
%% property list (store_user_45/3 writes [{roles, Roles} | Props] there),
%% so the props are typed as a list of tagged pairs rather than as the
%% empty list.
-spec get_users_45(ns_config()) -> [{rbac_identity(), [{atom(), term()}]}].
get_users_45(Config) ->
    ns_config:search(Config, user_roles, []).
216
%% Returns the result of replicated_dets:select/3 over user documents
%% whose identity matches KeySpec, requested with a batch argument of 100.
select_users(KeySpec) ->
    Pattern = {user, KeySpec},
    replicated_dets:select(storage_name(), Pattern, 100).
219
%% Returns the result of replicated_dets:select/3 over auth documents
%% whose identity matches KeySpec, requested with a batch argument of 100.
select_auth_infos(KeySpec) ->
    Pattern = {auth, KeySpec},
    replicated_dets:select(storage_name(), Pattern, 100).
222
%% Decides what auth info has to be stored for a user.
%%   CurrentAuthDoc - false when no auth document exists yet, otherwise the
%%                    {Key, Auth} pair read from storage
%%   Password       - the new password, or undefined when none was supplied
%% Returns:
%%   password_required - new user and no password given
%%   same              - nothing to store: either no password was given for
%%                       an existing user, or the password is unchanged and
%%                       scram hashes are already present
%%   Auth              - freshly built auth info otherwise
build_auth(false, undefined) ->
    password_required;
build_auth(false, Password) ->
    build_scram_auth(Password);
build_auth({_, _}, undefined) ->
    same;
build_auth({_, CurrentAuth}, Password) ->
    {Salt, Mac} = get_salt_and_mac(CurrentAuth),
    Unchanged = (ns_config_auth:hash_password(Salt, Password) =:= Mac),
    %% an unchanged password is still re-hashed when the stored auth info
    %% lacks scram hashes
    case Unchanged andalso (has_scram_hashes(CurrentAuth) =/= false) of
        true ->
            same;
        false ->
            build_scram_auth(Password)
    end.
242
%% Creates or updates a user. The display name is stored as a {name, Name}
%% property unless it is undefined. Clusters in 5.0 compat mode use the
%% replicated users storage; older ones keep users in ns_config.
-spec store_user(rbac_identity(), rbac_user_name(), rbac_password(), [rbac_role()]) -> run_txn_return().
store_user(Identity, Name, Password, Roles) ->
    Props = [{name, Name} || Name =/= undefined],
    case cluster_compat_mode:is_cluster_50() of
        false ->
            store_user_45(Identity, Props, Roles);
        true ->
            store_user_50(Identity, Props, Password, Roles, ns_config:get())
    end.
257
%% 4.5-style store: only external users are supported, and they are kept
%% under the user_roles config key with the saslauthd domain. The update
%% runs inside an ns_config transaction and is aborted when any of the
%% roles fails validation.
store_user_45({UserName, external}, Props, Roles) ->
    Identity = {UserName, saslauthd},
    Entry = {Identity, [{roles, Roles} | Props]},
    Txn =
        fun (Config, SetFn) ->
                case menelaus_roles:validate_roles(Roles, Config) of
                    {_, []} ->
                        Updated = lists:keystore(Identity, 1,
                                                 get_users_45(Config), Entry),
                        {commit, SetFn(user_roles, Updated, Config)};
                    {_, Invalid} ->
                        {abort, {error, roles_validation, Invalid}}
                end
        end,
    ns_config:run_txn(Txn).
272
%% Counts all user documents by folding over the select_users/1 stream.
count_users() ->
    Increment = fun (_User, Count) -> Count + 1 end,
    pipes:run(menelaus_users:select_users('_'),
              ?make_consumer(pipes:fold(?producer(), Increment, 0))).
280
%% Returns true when storing Identity is allowed with respect to the user
%% limit. Enterprise edition has no limit. Otherwise, once
%% ?MAX_USERS_ON_CE users exist, only already existing users may be stored
%% (i.e. updated).
check_limit(Identity) ->
    cluster_compat_mode:is_enterprise()
        orelse count_users() < ?MAX_USERS_ON_CE
        orelse user_exists(Identity).
293
%% 5.0-style store of a user into the replicated users storage.
%% Returns {abort, too_many} when the user limit forbids the store,
%% {abort, password_required} for a new local user without a password,
%% and otherwise whatever store_user_50_with_auth/5 returns.
%%
%% External users never carry auth info, so `same' is passed for them.
%% The current auth document is read only for local users that passed the
%% limit check, because that is the only path that needs it.
store_user_50({_UserName, Domain} = Identity, Props, Password, Roles, Config) ->
    case check_limit(Identity) of
        true ->
            case Domain of
                external ->
                    store_user_50_with_auth(Identity, Props, same, Roles, Config);
                local ->
                    CurrentAuth = replicated_dets:get(storage_name(), {auth, Identity}),
                    case build_auth(CurrentAuth, Password) of
                        password_required ->
                            {abort, password_required};
                        Auth ->
                            store_user_50_with_auth(Identity, Props, Auth, Roles, Config)
                    end
            end;
        false ->
            {abort, too_many}
    end.
312
%% Validates Roles against Config and, when all of them are valid, stores
%% the user with the validated roles prepended to Props. Returns
%% {commit, ok} on success or {abort, {error, roles_validation, BadRoles}}.
store_user_50_with_auth(Identity, Props, Auth, Roles, Config) ->
    {Validated, Invalid} = menelaus_roles:validate_roles(Roles, Config),
    case Invalid of
        [] ->
            ok = store_user_50_validated(Identity,
                                         [{roles, Validated} | Props], Auth),
            {commit, ok};
        _ ->
            {abort, {error, roles_validation, Invalid}}
    end.
321
%% Writes the user document and then the auth document (unless Auth is
%% `same'). No validation is performed here. Always returns ok.
store_user_50_validated(Identity, Props, Auth) ->
    ok = replicated_dets:set(storage_name(), {user, Identity}, Props),
    case store_auth(Identity, Auth) of
        Outcome when Outcome =:= ok; Outcome =:= unchanged ->
            ok
    end.
330
%% Stores the auth document of Identity. The atom `same' means there is
%% nothing to write and yields `unchanged'; a list is written as is and
%% yields ok.
store_auth(Identity, Auth) when is_list(Auth) ->
    Key = {auth, Identity},
    ok = replicated_dets:set(storage_name(), Key, Auth);
store_auth(_Identity, same) ->
    unchanged.
335
%% Changes the password of an existing local user.
%% Returns user_not_found when there is no user document, otherwise the
%% result of store_auth/2 (ok, or unchanged when build_auth/2 decides that
%% nothing needs to be rewritten).
change_password({_UserName, local} = Identity, Password) when is_list(Password) ->
    Storage = storage_name(),
    case replicated_dets:get(Storage, {user, Identity}) of
        false ->
            user_not_found;
        _Found ->
            Existing = replicated_dets:get(Storage, {auth, Identity}),
            store_auth(Identity, build_auth(Existing, Password))
    end.
345
%% Deletes a user, using the replicated users storage on 5.0 clusters and
%% the ns_config based implementation otherwise.
-spec delete_user(rbac_identity()) -> run_txn_return().
delete_user(Identity) ->
    case cluster_compat_mode:is_cluster_50() of
        false ->
            delete_user_45(Identity);
        true ->
            delete_user_50(Identity)
    end.
354
%% 4.5-style delete: removes the saslauthd entry of an external user from
%% the user_roles config key inside an ns_config transaction. Aborts with
%% {error, not_found} when the key or the user is missing.
delete_user_45({UserName, external}) ->
    Identity = {UserName, saslauthd},
    NotFound = {abort, {error, not_found}},
    Txn =
        fun (Config, SetFn) ->
                case ns_config:search(Config, user_roles) of
                    {value, Users} ->
                        case lists:keytake(Identity, 1, Users) of
                            {value, _Removed, Remaining} ->
                                {commit, SetFn(user_roles, Remaining, Config)};
                            false ->
                                NotFound
                        end;
                    false ->
                        NotFound
                end
        end,
    ns_config:run_txn(Txn).
371
%% 5.0-style delete. For local users the auth document is removed first
%% (its result is deliberately ignored); external users have none. The
%% outcome is determined by the deletion of the user document.
delete_user_50({_, Domain} = Identity) ->
    Storage = storage_name(),
    case Domain of
        external ->
            ok;
        local ->
            _ = replicated_dets:delete(Storage, {auth, Identity})
    end,
    case replicated_dets:delete(Storage, {user, Identity}) of
        ok ->
            {commit, ok};
        {not_found, _} ->
            {abort, {error, not_found}}
    end.
385
%% Extracts {Salt, Mac} from the <<"plain">> entry of an auth info list.
%% The entry is a base64 binary that decodes to exactly 36 bytes: a
%% 16 byte salt followed by a 20 byte mac.
get_salt_and_mac(Auth) ->
    Encoded = proplists:get_value(<<"plain">>, Auth),
    Decoded = base64:decode(binary_to_list(Encoded)),
    <<Salt:16/binary, Mac:20/binary>> = Decoded,
    {Salt, Mac}.
390
%% Tells whether the auth info has a <<"sha1">> entry, which is taken as
%% the sign that scram hashes were generated for it.
has_scram_hashes(Auth) ->
    ScramKey = <<"sha1">>,
    proplists:is_defined(ScramKey, Auth).
393
%% Checks Password against the stored auth info of the local user
%% Username. Always false on pre-5.0 clusters and for unknown users.
-spec authenticate(rbac_user_id(), rbac_password()) -> boolean().
authenticate(Username, Password) ->
    %% andalso yields false for a pre-5.0 cluster, otherwise the result of
    %% get_auth_info/1 (false or the auth info)
    case cluster_compat_mode:is_cluster_50()
        andalso get_auth_info({Username, local}) of
        false ->
            false;
        Auth ->
            authenticate_with_info(Auth, Password)
    end.
408
%% Returns the auth info of Identity or false. On the couchdb node the
%% versioned auth cache is consulted; on any other node the storage is
%% read directly.
get_auth_info(Identity) ->
    ThisNode = node(),
    case ns_node_disco:couchdb_node() of
        ThisNode ->
            versioned_cache:get(auth_cache_name(), Identity);
        _ ->
            get_auth_info_on_ns_server(Identity)
    end.
416
%% Reads the auth info of Identity straight from the users storage.
%% Returns false when either the user document or the auth document is
%% missing; the auth document is not even read when the user is missing.
get_auth_info_on_ns_server(Identity) ->
    Storage = storage_name(),
    UserKnown = (replicated_dets:get(Storage, {user, Identity}) =/= false),
    case UserKnown andalso replicated_dets:get(Storage, {auth, Identity}) of
        false ->
            false;
        {_, Auth} ->
            Auth
    end.
429
%% Hashes Password with the salt stored in Auth and compares the result
%% with the stored mac using misc:compare_secure/2.
-spec authenticate_with_info(list(), rbac_password()) -> boolean().
authenticate_with_info(Auth, Password) ->
    {Salt, StoredMac} = get_salt_and_mac(Auth),
    Candidate = ns_config_auth:hash_password(Salt, Password),
    misc:compare_secure(Candidate, StoredMac).
434
%% 4.5-style lookup of the props of an external user in the user_roles
%% config key (where the domain is saslauthd). Yields [] when absent.
get_user_props_45({User, external}) ->
    Identity = {User, saslauthd},
    ns_config:search_prop(ns_config:latest(), user_roles, Identity, []).
437
%% Returns the property list of a user, or [] when the user is unknown.
%% The source depends on the cluster compat mode.
get_user_props(Identity) ->
    case cluster_compat_mode:is_cluster_50() of
        false ->
            get_user_props_45(Identity);
        true ->
            replicated_dets:get(storage_name(), {user, Identity}, [])
    end.
445
%% A user is considered to exist when its property list is not empty.
-spec user_exists(rbac_identity()) -> boolean().
user_exists(Identity) ->
    case get_user_props(Identity) of
        [] ->
            false;
        _ ->
            true
    end.
449
%% Returns the roles stored for Identity, [] when there are none.
-spec get_roles(rbac_identity()) -> [rbac_role()].
get_roles(Identity) ->
    Props = get_user_props(Identity),
    proplists:get_value(roles, Props, []).
453
%% Returns the stored display name of a local or external user. Yields
%% undefined when no name is stored and for identities of any other shape.
-spec get_user_name(rbac_identity()) -> rbac_user_name().
get_user_name({_, Domain} = Identity) when Domain =:= local; Domain =:= external ->
    Props = get_user_props(Identity),
    proplists:get_value(name, Props);
get_user_name(_) ->
    undefined.
459
%% Returns the last modification time of the user's auth document, which
%% serves as the password change timestamp. External users have no auth
%% document, hence undefined; undefined is also the default handed to
%% replicated_dets for a missing document.
-spec get_password_change_timestamp(rbac_identity()) -> undefined | integer().
get_password_change_timestamp({_, external}) ->
    undefined;
get_password_change_timestamp(Identity) ->
    AuthKey = {auth, Identity},
    replicated_dets:get_last_modified(storage_name(), AuthKey, undefined).
467
%% Wraps the auth info of a user into a one-element tuple holding a
%% property list, with the user name (as a binary) under <<"n">> in front.
user_auth_info(User, Auth) ->
    NameEntry = {<<"n">>, list_to_binary(User)},
    {[NameEntry | Auth]}.
470
%% Builds the user_auth_info/2 structure with freshly generated auth info
%% for every {User, Password} pair of the list.
build_scram_auth_info(UserPasswords) ->
    [begin
         Auth = build_scram_auth(Password),
         user_auth_info(User, Auth)
     end || {User, Password} <- UserPasswords].
473
%% Builds complete auth info for Password: the plain entry followed by one
%% entry per type returned by scram_sha:supported_types().
build_scram_auth(Password) ->
    Plain = build_plain_auth(Password),
    Scram = [scram_auth_entry(Type, Password)
             || Type <- scram_sha:supported_types()],
    Plain ++ Scram.

%% Builds the entry for one scram type: the key given by
%% scram_sha:auth_info_key/1 mapped to the base64 encoded hash and salt
%% plus the third element of scram_sha:hash_password/2's result under
%% <<"i">> (presumably the iteration count -- confirm against scram_sha).
scram_auth_entry(Type, Password) ->
    {Salt, Hash, Iterations} = scram_sha:hash_password(Type, Password),
    {scram_sha:auth_info_key(Type),
     {[{<<"h">>, base64:encode(Hash)},
       {<<"s">>, base64:encode(Salt)},
       {<<"i">>, Iterations}]}}.
485
%% Hashes Password with ns_config_auth:hash_password/1 and returns the
%% resulting plain auth entry.
build_plain_auth(Password) ->
    {NewSalt, NewMac} = ns_config_auth:hash_password(Password),
    build_plain_auth(NewSalt, NewMac).
489
%% Returns the plain auth entry for an already computed salt and mac:
%% both binaries are concatenated (salt first) and base64 encoded.
build_plain_auth(Salt, Mac) ->
    Encoded = base64:encode(<<Salt/binary, Mac/binary>>),
    [{<<"plain">>, Encoded}].
493
%% Adds Role to the ordered role set of every user in the list, creating
%% the set for users not yet present in Dict. The atom `asterisk' in place
%% of a user list contributes nothing.
collect_users(asterisk, _Role, Dict) ->
    Dict;
collect_users(Users, Role, Dict) when is_list(Users) ->
    AddRole = fun (Roles) -> ordsets:add_element(Role, Roles) end,
    lists:foldl(
      fun (User, Acc) ->
              dict:update(User, AddRole, ordsets:from_list([Role]), Acc)
      end, Dict, Users).
503
%% Config upgrade to 4.5: converts the admins and roAdmins lists of enabled
%% saslauthd settings into user_roles entries. Produces no changes when
%% the settings are absent or disabled.
-spec upgrade_to_4_5(ns_config()) -> [{set, user_roles, _}].
upgrade_to_4_5(Config) ->
    case ns_config:search(Config, saslauthd_auth_settings) of
        false ->
            [];
        {value, Props} ->
            case proplists:get_value(enabled, Props, false) of
                false ->
                    [];
                true ->
                    [{set, user_roles, saslauthd_user_roles(Props)}]
            end
    end.

%% Builds the user_roles value from saslauthd settings: every listed user
%% gets the admin and/or ro_admin role depending on which list names it.
saslauthd_user_roles(Props) ->
    Admins = collect_users(proplists:get_value(admins, Props, []),
                           admin, dict:new()),
    All = collect_users(proplists:get_value(roAdmins, Props, []),
                        ro_admin, Admins),
    [{{binary_to_list(User), saslauthd}, [{roles, ordsets:to_list(Roles)}]}
     || {User, Roles} <- dict:to_list(All)].
524
%% Users named in both lists get both roles; duplicates inside a list are
%% collapsed.
upgrade_to_4_5_test() ->
    Settings = [{enabled,true},
                {admins,[<<"user1">>, <<"user2">>, <<"user1">>, <<"user3">>]},
                {roAdmins,[<<"user4">>, <<"user1">>]}],
    Config = [[{saslauthd_auth_settings, Settings}]],
    Expected = [{{"user1", saslauthd}, [{roles, [admin, ro_admin]}]},
                {{"user2", saslauthd}, [{roles, [admin]}]},
                {{"user3", saslauthd}, [{roles, [admin]}]},
                {{"user4", saslauthd}, [{roles, [ro_admin]}]}],
    Result = upgrade_to_4_5(Config),
    ?assertMatch([{set, user_roles, _}], Result),
    [{set, user_roles, Actual}] = Result,
    ?assertMatch(Expected, lists:sort(Actual)).
538
%% An `asterisk' admins setting contributes no users of its own.
upgrade_to_4_5_asterisk_test() ->
    Settings = [{enabled,true},
                {admins, asterisk},
                {roAdmins,[<<"user1">>]}],
    Config = [[{saslauthd_auth_settings, Settings}]],
    Expected = [{{"user1", saslauthd}, [{roles, [ro_admin]}]}],
    Result = upgrade_to_4_5(Config),
    ?assertMatch([{set, user_roles, _}], Result),
    [{set, user_roles, Actual}] = Result,
    ?assertMatch(Expected, lists:sort(Actual)).
549
%% Runs the users upgrade to 5.0. The users_upgrade config key marks an
%% upgrade in progress: when it is absent it is set to `started'; when it
%% is already `started' a previous attempt did not finish and the upgrade
%% runs in repair mode. Returns ok, or error after logging whatever was
%% raised.
upgrade_to_50(Config, Nodes) ->
    try
        Marker = ns_config:search(Config, users_upgrade),
        Repair =
            case Marker of
                false ->
                    ns_config:set(users_upgrade, started),
                    false;
                {value, started} ->
                    ?log_debug("Found unfinished users upgrade. Continue."),
                    true
            end,
        do_upgrade_to_50(Nodes, Repair),
        ok
    catch Class:Reason ->
            ale:error(?USER_LOGGER, "Unsuccessful user storage upgrade.~n~p",
                      [{Class,Reason,erlang:get_stacktrace()}]),
            error
    end.
568
%% Performs the actual users upgrade to 5.0:
%%   1. makes sure all nodes saw the users_upgrade marker and pulls the
%%      latest config from them,
%%   2. in repair mode, syncs the users storage from the nodes and wipes it,
%%   3. creates a local user for the read only admin, if one is configured,
%%   4. creates a local user for every bucket (except one named like the
%%      administrator),
%%   5. copies the 4.5 external (saslauthd) users.
%% Throws {push_config, BadNodes} or {pull_config, Error} when step 1
%% fails; any other failure surfaces as a badmatch/case_clause error. The
%% caller (upgrade_to_50/2) catches and logs all of them.
do_upgrade_to_50(Nodes, Repair) ->
    ok = sync_config_for_50_upgrade(Nodes),

    case Repair of
        true ->
            %% in case if aborted upgrade left some junk
            replicated_storage:sync_to_me(storage_name(), Nodes,
                                          ?get_timeout(users_upgrade, 60000)),
            replicated_dets:delete_all(storage_name());
        false ->
            ok
    end,

    Config = ns_config:get(),
    AdminName = admin_name_for_50_upgrade(Config),
    ok = create_ro_admin_user_50(Config),
    ok = create_bucket_users_50(AdminName, Config),
    copy_external_users_to_50(Config).

%% Pushes the local config to Nodes and pulls theirs. Returns ok or throws.
sync_config_for_50_upgrade(Nodes) ->
    %% propagate users_upgrade to nodes
    case ns_config_rep:ensure_config_seen_by_nodes(Nodes) of
        ok ->
            ok;
        {error, BadNodes} ->
            throw({push_config, BadNodes})
    end,
    %% pull latest user information from nodes
    case ns_config_rep:pull_remotes(Nodes) of
        ok ->
            ok;
        Error ->
            throw({pull_config, Error})
    end.

%% Name of the configured administrator, undefined when there is none.
admin_name_for_50_upgrade(Config) ->
    case ns_config_auth:get_creds(Config, admin) of
        undefined ->
            undefined;
        {AdminName, _} ->
            AdminName
    end.

%% Stores the read only admin (if configured) as a local user with the
%% ro_admin role, reusing its existing salt and mac.
create_ro_admin_user_50(Config) ->
    case ns_config_auth:get_creds(Config, ro_admin) of
        undefined ->
            ok;
        {ROAdmin, {Salt, Mac}} ->
            Auth = build_plain_auth(Salt, Mac),
            {commit, ok} =
                store_user_50_with_auth({ROAdmin, local}, [{name, "Read Only User"}],
                                        Auth, [ro_admin], Config),
            ok
    end.

%% Stores one local user per bucket, named after the bucket, with the
%% bucket's sasl password ("" when unset) and full access to that bucket.
%% A bucket whose name equals AdminName is skipped with a warning.
create_bucket_users_50(AdminName, Config) ->
    lists:foreach(
      fun ({BucketName, _}) when BucketName =:= AdminName ->
              ?log_warning("Not creating user for bucket ~p, because the name matches administrators id",
                           [AdminName]);
          ({BucketName, BucketConfig}) ->
              Password = proplists:get_value(sasl_password, BucketConfig, ""),
              UUID = proplists:get_value(uuid, BucketConfig),
              Name = "Generated user for bucket " ++ BucketName,
              ok = store_user_50_validated(
                     {BucketName, local},
                     [{name, Name}, {roles, [{bucket_full_access, [{BucketName, UUID}]}]}],
                     build_scram_auth(Password))
      end, ns_bucket:get_buckets(Config)).

%% Copies the 4.5 saslauthd users into the users storage as external users,
%% keeping only the roles that menelaus_roles:validate_roles/2 returns as
%% valid.
copy_external_users_to_50(Config) ->
    lists:foreach(
      fun ({{LdapUser, saslauthd}, Props}) ->
              Roles = proplists:get_value(roles, Props),
              {ValidatedRoles, _} = menelaus_roles:validate_roles(Roles, Config),
              NewProps = lists:keystore(roles, 1, Props, {roles, ValidatedRoles}),
              ok = store_user_50_validated({LdapUser, external}, NewProps, same)
      end, get_users_45(Config)).
635
%% Config changes that conclude the 5.0 upgrade: the users_upgrade marker
%% and the read_only_user_creds key are deleted.
config_upgrade_to_50() ->
    [{delete, Key} || Key <- [users_upgrade, read_only_user_creds]].
638
%% Config key that marks a 5.5 roles upgrade in progress.
rbac_55_upgrade_key() ->
    Version = ?VERSION_55,
    {rbac_upgrade, Version}.
641
%% Config changes that conclude the 5.5 upgrade: the roles upgrade marker
%% is deleted.
config_upgrade_to_55() ->
    Marker = rbac_55_upgrade_key(),
    [{delete, Marker}].
644
%% Reports upgrade_in_progress while either the users upgrade marker or
%% the 5.5 roles upgrade marker is present in the config, no_upgrade
%% otherwise.
upgrade_status() ->
    UsersMarker = ns_config:read_key_fast(users_upgrade, undefined),
    RolesMarker = ns_config:read_key_fast(rbac_55_upgrade_key(), undefined),
    case UsersMarker =:= undefined andalso RolesMarker =:= undefined of
        true ->
            no_upgrade;
        false ->
            upgrade_in_progress
    end.
652
%% Returns Props with its roles entry replaced by the roles that
%% menelaus_roles:filter_out_invalid_roles/3 keeps. A roles entry is
%% stored even when Props had none.
filter_out_invalid_roles(Props, Definitions, AllPossibleValues) ->
    Current = proplists:get_value(roles, Props, []),
    Kept = menelaus_roles:filter_out_invalid_roles(Current, Definitions,
                                                   AllPossibleValues),
    lists:keystore(roles, 1, Props, {roles, Kept}).
657
%% Removes from all users the roles that become invalid once the bucket
%% BucketName is gone. The possible role parameter values are computed
%% from the bucket list without BucketName; users whose props are not
%% affected are skipped. Failures are logged, the result is always ok.
cleanup_bucket_roles(BucketName) ->
    ?log_debug("Delete all roles for bucket ~p", [BucketName]),
    RemainingBuckets = lists:keydelete(BucketName, 1, ns_bucket:get_buckets()),
    Definitions = menelaus_roles:get_definitions(),
    PossibleValues = menelaus_roles:calculate_possible_param_values(RemainingBuckets),

    DropStaleRoles =
        fun ({user, Key}, Props) ->
                Filtered = menelaus_users:filter_out_invalid_roles(
                             Props, Definitions, PossibleValues),
                case Filtered =:= Props of
                    true ->
                        skip;
                    false ->
                        ?log_debug("Changing properties of ~p from ~p to ~p due to deletion of ~p",
                                   [Key, Props, Filtered, BucketName]),
                        {update, Filtered}
                end
        end,
    Errors = replicated_dets:select_with_update(storage_name(), {user, '_'},
                                                100, DropStaleRoles),
    case Errors of
        [] ->
            ok;
        _ ->
            ?log_warning("Failed to cleanup some roles: ~p", [Errors]),
            ok
    end.
683
%% Runs the roles upgrade to 5.5. The marker key is set to `started' when
%% absent; finding it already set means an earlier attempt did not finish,
%% which is only logged. Returns ok, or error after logging whatever was
%% raised.
upgrade_to_55(Config, Nodes) ->
    try
        Marker = rbac_55_upgrade_key(),
        case ns_config:search(Config, Marker) of
            {value, started} ->
                ?log_debug("Found unfinished roles upgrade. Continue.");
            false ->
                ns_config:set(rbac_55_upgrade_key(), started)
        end,
        do_upgrade_to_55(Nodes),
        ok
    catch Class:Reason ->
              ale:error(?USER_LOGGER, "Unsuccessful user storage upgrade.~n~p",
                        [{Class, Reason, erlang:get_stacktrace()}]),
              error
    end.
699
%% Converts a list of pre-5.5 roles, replacing every role with the list
%% of roles it maps to. Order is preserved; duplicates are not removed.
upgrade_roles_to_55(OldRoles) ->
    lists:append([maybe_upgrade_role_to_55(Role) || Role <- OldRoles]).
702
%% Maps one pre-5.5 role to its 5.5 equivalent(s): cluster_admin and
%% bucket_admin additionally receive bucket_full_access (on any bucket and
%% on the same buckets respectively); every other role stays as it is.
maybe_upgrade_role_to_55(cluster_admin = Role) ->
    [Role, {bucket_full_access, [any]}];
maybe_upgrade_role_to_55({bucket_admin, Buckets} = Role) ->
    [Role, {bucket_full_access, Buckets}];
maybe_upgrade_role_to_55(Other) ->
    [Other].
709
%% Returns the user props with the roles converted to 5.5. The new roles
%% entry is placed in front and the first old one is dropped.
upgrade_user_roles_to_55(Props) ->
    Current = proplists:get_value(roles, Props),
    %% usort both orders the converted roles and removes duplicates
    Upgraded = lists:usort(upgrade_roles_to_55(Current)),
    OtherProps = lists:keydelete(roles, 1, Props),
    [{roles, Upgraded} | OtherProps].
715
%% True when at least one role of the user document would be changed by
%% maybe_upgrade_role_to_55/1.
user_roles_require_upgrade({{user, _Identity}, Props}) ->
    Changes =
        fun (Role) ->
                maybe_upgrade_role_to_55(Role) =/= [Role]
        end,
    lists:any(Changes, proplists:get_value(roles, Props)).
719
%% Collects the identities of all users whose roles need the 5.5
%% conversion.
fetch_users_for_55_upgrade() ->
    AllUsers = menelaus_users:select_users({'_', '_'}),
    Stages = [pipes:filter(fun user_roles_require_upgrade/1),
              pipes:map(fun ({{user, Identity}, _Props}) -> Identity end)],
    pipes:run(AllUsers, Stages, pipes:collect()).
725
%% Performs the actual roles upgrade to 5.5: makes sure all nodes saw the
%% upgrade marker, syncs the users storage from them, then rewrites every
%% user whose roles need conversion. Auth documents are left untouched
%% (`same').
do_upgrade_to_55(Nodes) ->
    %% propagate rbac_55_upgrade_key to nodes
    ok = ns_config_rep:ensure_config_seen_by_nodes(Nodes),

    SyncTimeout = ?get_timeout(rbac_55_upgrade_key(), 60000),
    replicated_storage:sync_to_me(storage_name(), Nodes, SyncTimeout),

    UpgradeUser =
        fun (Identity) ->
                Upgraded = upgrade_user_roles_to_55(get_user_props(Identity)),
                store_user_50_validated(Identity, Upgraded, same)
        end,
    lists:foreach(UpgradeUser, fetch_users_for_55_upgrade()).
739