1%% @author Couchbase <info@couchbase.com>
2%% @copyright 2016 Couchbase, Inc.
3%%
4%% Licensed under the Apache License, Version 2.0 (the "License");
5%% you may not use this file except in compliance with the License.
6%% You may obtain a copy of the License at
7%%
8%%      http://www.apache.org/licenses/LICENSE-2.0
9%%
10%% Unless required by applicable law or agreed to in writing, software
11%% distributed under the License is distributed on an "AS IS" BASIS,
12%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13%% See the License for the specific language governing permissions and
14%% limitations under the License.
15%%
16-module(rebalance_progress).
17
18-export([init/1, init/2, get_progress/1, update/3]).
19
20-record(progress, {
21          per_service :: dict(),
22          aggregated  :: dict()
23         }).
24
25init(LiveNodes) ->
26    init(LiveNodes, [kv] ++ ns_cluster_membership:topology_aware_services()).
27
28init(LiveNodes, Services) ->
29    do_init([{S, ns_cluster_membership:service_nodes(LiveNodes, S)} ||
30                S <- Services]).
31
32do_init(Services) ->
33    aggregate(init_per_service(Services)).
34
35init_per_service(Services) ->
36    dict:from_list([{Service, init_service(Nodes)} ||
37                       {Service, Nodes} <- Services]).
38
39init_service(Nodes) ->
40    dict:from_list([{N, 0} || N <- Nodes]).
41
42get_progress(#progress{aggregated = Aggregated}) ->
43    Aggregated.
44
45update(Service, ServiceProgress, #progress{per_service = PerService}) ->
46    aggregate(do_update(Service, ServiceProgress, PerService)).
47
48do_update(Service, ServiceProgress, PerService) ->
49    dict:update(Service,
50                fun (OldServiceProgress) ->
51                        dict:merge(fun (_, _, New) ->
52                                           New
53                                   end, OldServiceProgress, ServiceProgress)
54                end, PerService).
55
56aggregate(PerService) ->
57    Aggregated0 =
58        dict:fold(
59          fun (_, ServiceProgress, AggAcc) ->
60                  dict:fold(
61                    fun (Node, NodeProgress, Acc) ->
62                            misc:dict_update(
63                              Node,
64                              fun ({Count, Sum}) ->
65                                      {Count + 1, Sum + NodeProgress}
66                              end, {0, 0}, Acc)
67                    end, AggAcc, ServiceProgress)
68          end, dict:new(), PerService),
69
70    Aggregated =
71        dict:map(fun (_, {Count, Sum}) ->
72                         Sum / Count
73                 end, Aggregated0),
74
75    #progress{per_service = PerService,
76              aggregated = Aggregated}.
77