% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_set_view_group).
-behaviour(gen_server).

%% API
-export([start_link/1, request_group_info/1, get_data_size/1]).
-export([open_set_group/3]).
-export([request_group/2, release_group/1]).
-export([is_view_defined/1, define_view/2]).
-export([set_state/4]).
-export([add_replica_partitions/2, remove_replica_partitions/2]).
-export([mark_as_unindexable/2, mark_as_indexable/2]).
-export([monitor_partition_update/4, demonitor_partition_update/2]).
-export([reset_utilization_stats/1, get_utilization_stats/1]).
-export([inc_access_stat/1, remove_duplicate_partitions/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-include("couch_db.hrl").
-include_lib("couch_set_view/include/couch_set_view.hrl").

-type init_args()   :: {string(), binary(), #set_view_group{}}.
-type compact_fun() :: fun((#set_view_group{},
                            #set_view_group{},
                            string(),
                            pid() | 'nil',
                            pid()) -> no_return()).
-type monitor_error() :: {'shutdown', any()} |
                         'marked_for_cleanup' |
                         {'updater_error', any()}.

-define(DEFAULT_TIMEOUT, 3000).
-define(GET_TIMEOUT(State), ((State)#state.timeout)).

-define(root_dir(State), element(1, State#state.init_args)).
-define(set_name(State), element(2, State#state.init_args)).
-define(type(State), (element(3, State#state.init_args))#set_view_group.type).
-define(group_sig(State), (element(3, State#state.init_args))#set_view_group.sig).
-define(group_id(State), (State#state.group)#set_view_group.name).
-define(dcp_pid(State), (State#state.group)#set_view_group.dcp_pid).
-define(category(State), (State#state.group)#set_view_group.category).
-define(is_defined(State),
    (((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions > 0)).
-define(replicas_on_transfer(State),
        ((State#state.group)#set_view_group.index_header)#set_view_index_header.replicas_on_transfer).
-define(have_pending_transition(State),
        ((((State#state.group)#set_view_group.index_header)
          #set_view_index_header.pending_transition) /= nil)).

-define(MAX_HIST_SIZE, 10).
% flow control buffer size 20 MB
-define(DCP_CONTROL_BUFFER_SIZE, "20971520").

% Seqs cache ttl in microseconds
-define(SEQS_CACHE_TTL, 300000).
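% (300000 microseconds = 300 milliseconds.)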

-record(util_stats, {
    useful_indexing_time = 0.0  :: float(),
    wasted_indexing_time = 0.0  :: float(),
    updates = 0                 :: non_neg_integer(),
    updater_interruptions = 0   :: non_neg_integer(),
    compaction_time = 0.0       :: float(),
    compactions = 0             :: non_neg_integer(),
    compactor_interruptions = 0 :: non_neg_integer()
}).

-record(up_listener, {
    pid,
    monref,
    partition,
    seq
}).

-record(waiter, {
    from,
    debug = false :: boolean(),
    % seqs for active partitions only
    seqs = []     :: partition_seqs()
}).

-record(seqs_cache, {
    timestamp = {0, 0, 0}              :: timer:time(),
    is_waiting = false                 :: boolean(),
    seqs = []                          :: partition_seqs()
}).

-record(state, {
    init_args                          :: init_args(),
    replica_group = nil                :: 'nil' | pid(),
    group = #set_view_group{}          :: #set_view_group{},
    updater_pid = nil                  :: 'nil' | pid(),
    initial_build = false              :: boolean(),
    updater_state = not_running        :: set_view_updater_state() | 'not_running' | 'starting',
    compactor_pid = nil                :: 'nil' | pid(),
    compactor_file = nil               :: 'nil' | pid(),
    compactor_fun = nil                :: 'nil' | compact_fun(),
    compactor_retry_number = 0         :: non_neg_integer(),
    waiting_list = []                  :: [#waiter{}],
    cleaner_pid = nil                  :: 'nil' | pid(),
    shutdown = false                   :: boolean(),
    shutdown_aliases                   :: [binary()],
    auto_cleanup = true                :: boolean(),
    auto_transfer_replicas = true      :: boolean(),
    replica_partitions = []            :: ordsets:ordset(partition_id()),
    pending_transition_waiters = []    :: [{From::{pid(), reference()}, #set_view_group_req{}}],
    update_listeners = dict:new()      :: dict(),
    compact_log_files = nil            :: 'nil' | {[[string()]], partition_seqs(), partition_versions()},
    timeout = ?DEFAULT_TIMEOUT         :: non_neg_integer() | 'infinity'
}).

-define(inc_stat(Group, S),
    ets:update_counter(
        Group#set_view_group.stats_ets,
        ?set_view_group_stats_key(Group),
        {S, 1})).
-define(inc_cleanup_stops(Group), ?inc_stat(Group, #set_view_group_stats.cleanup_stops)).
-define(inc_updater_errors(Group), ?inc_stat(Group, #set_view_group_stats.update_errors)).
-define(inc_accesses(Group), ?inc_stat(Group, #set_view_group_stats.accesses)).

% Same as in couch_file. Headers are stored at offsets that are
% multiples of this block size.
-define(SIZE_BLOCK, 4096).


% api methods
-spec request_group(pid(), #set_view_group_req{}) ->
                   {'ok', #set_view_group{}} | {'error', term()}.
request_group(Pid, Req) ->
    #set_view_group_req{wanted_partitions = WantedPartitions} = Req,
    Req2 = Req#set_view_group_req{
        wanted_partitions = ordsets:from_list(WantedPartitions)
    },
    request_group(Pid, Req2, 1).

-spec request_group(pid(), #set_view_group_req{}, non_neg_integer()) ->
                   {'ok', #set_view_group{}} | {'error', term()}.
request_group(Pid, Req, Retries) ->
    case gen_server:call(Pid, Req, infinity) of
    {ok, Group, ActiveReplicasBitmask} ->
        #set_view_group{
            ref_counter = RefCounter,
            replica_pid = RepPid,
            name = GroupName,
            set_name = SetName,
            category = Category
        } = Group,
        case request_replica_group(RepPid, ActiveReplicasBitmask, Req) of
        {ok, RepGroup} ->
            {ok, Group#set_view_group{replica_group = RepGroup}};
        retry ->
            couch_ref_counter:drop(RefCounter),
            ?LOG_INFO("Retrying group `~s` (~s) request, stale=~s,"
                      " set `~s`, retry attempt #~p",
                      [GroupName, Category, Req#set_view_group_req.stale,
                       SetName, Retries]),
            request_group(Pid, Req, Retries + 1)
        end;
    Error ->
        Error
    end.


-spec request_replica_group(pid(), bitmask(), #set_view_group_req{}) ->
                           {'ok', #set_view_group{} | 'nil'} | 'retry'.
request_replica_group(_RepPid, 0, _Req) ->
    {ok, nil};
request_replica_group(RepPid, ActiveReplicasBitmask, Req) ->
    {ok, RepGroup, 0} = gen_server:call(RepPid, Req, infinity),
    case ?set_abitmask(RepGroup) =:= ActiveReplicasBitmask of
    true ->
        {ok, RepGroup};
    false ->
        couch_ref_counter:drop(RepGroup#set_view_group.ref_counter),
        retry
    end.


-spec release_group(#set_view_group{}) -> ok.
release_group(#set_view_group{ref_counter = RefCounter, replica_group = RepGroup}) ->
    couch_ref_counter:drop(RefCounter),
    case RepGroup of
    #set_view_group{ref_counter = RepRefCounter} ->
        couch_ref_counter:drop(RepRefCounter);
    nil ->
        ok
    end.
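
%% Illustrative usage sketch (not part of the module). Assumes `GroupPid` is a
%% running group server and that the remaining #set_view_group_req{} fields
%% keep their defaults:
%%
%%     Req = #set_view_group_req{
%%         stale = false,
%%         wanted_partitions = [0, 1, 2]
%%     },
%%     {ok, Group} = couch_set_view_group:request_group(GroupPid, Req),
%%     %% ... query the group snapshot ...
%%     ok = couch_set_view_group:release_group(Group).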


-spec request_group_info(pid()) -> {'ok', [{term(), term()}]}.
request_group_info(Pid) ->
    case gen_server:call(Pid, request_group_info, infinity) of
    {ok, GroupInfoList} ->
        {ok, GroupInfoList};
    Error ->
        throw(Error)
    end.


-spec get_data_size(pid()) -> {'ok', [{term(), term()}]}.
get_data_size(Pid) ->
    case gen_server:call(Pid, get_data_size, infinity) of
    {ok, _Info} = Ok ->
        Ok;
    Error ->
        throw(Error)
    end.


-spec define_view(pid(), #set_view_params{}) -> 'ok' | {'error', term()}.
define_view(Pid, Params) ->
    #set_view_params{
        max_partitions = NumPartitions,
        active_partitions = ActivePartitionsList,
        passive_partitions = PassivePartitionsList,
        use_replica_index = UseReplicaIndex
    } = Params,
    ActiveList = lists:usort(ActivePartitionsList),
    ActiveBitmask = couch_set_view_util:build_bitmask(ActiveList),
    PassiveList = lists:usort(PassivePartitionsList),
    PassiveBitmask = couch_set_view_util:build_bitmask(PassiveList),
    case (ActiveBitmask band PassiveBitmask) /= 0 of
    true ->
        throw({bad_view_definition,
            <<"Intersection between active and passive bitmasks">>});
    false ->
        ok
    end,
    gen_server:call(
        Pid, {define_view, NumPartitions, ActiveList, ActiveBitmask,
            PassiveList, PassiveBitmask, UseReplicaIndex}, infinity).
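
%% Illustrative usage sketch (not part of the module); the partition counts
%% and ids below are hypothetical:
%%
%%     Params = #set_view_params{
%%         max_partitions = 64,
%%         active_partitions = [0, 1, 2, 3],
%%         passive_partitions = [4, 5],
%%         use_replica_index = true
%%     },
%%     ok = couch_set_view_group:define_view(GroupPid, Params).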


-spec is_view_defined(pid()) -> boolean().
is_view_defined(Pid) ->
    gen_server:call(Pid, is_view_defined, infinity).


-spec set_state(pid(),
                ordsets:ordset(partition_id()),
                ordsets:ordset(partition_id()),
                ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
set_state(_Pid, [], [], []) ->
    ok;
set_state(Pid, Active, Passive, Cleanup) ->
    ordsets:is_set(Active) orelse throw({error, <<"Active list is not an ordset">>}),
    ordsets:is_set(Passive) orelse throw({error, <<"Passive list is not an ordset">>}),
    case ordsets:intersection(Active, Passive) of
    [] ->
        ordsets:is_set(Cleanup) orelse throw({error, <<"Cleanup list is not an ordset">>}),
        case ordsets:intersection(Active, Cleanup) of
        [] ->
            case ordsets:intersection(Passive, Cleanup) of
            [] ->
                gen_server:call(
                    Pid, {set_state, Active, Passive, Cleanup}, infinity);
            _ ->
                {error,
                    <<"Intersection between passive and cleanup partition lists">>}
            end;
        _ ->
            {error, <<"Intersection between active and cleanup partition lists">>}
        end;
    _ ->
        {error, <<"Intersection between active and passive partition lists">>}
    end.
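
%% Illustrative usage sketch (not part of the module); all three lists must
%% already be ordsets and pairwise disjoint (partition ids are hypothetical):
%%
%%     Active  = ordsets:from_list([0, 1, 2]),
%%     Passive = ordsets:from_list([3]),
%%     Cleanup = ordsets:from_list([7]),
%%     ok = couch_set_view_group:set_state(GroupPid, Active, Passive, Cleanup).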


-spec add_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
add_replica_partitions(_Pid, []) ->
    ok;
add_replica_partitions(Pid, Partitions) ->
    BitMask = couch_set_view_util:build_bitmask(Partitions),
    gen_server:call(Pid, {add_replicas, BitMask}, infinity).


-spec remove_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
remove_replica_partitions(_Pid, []) ->
    ok;
remove_replica_partitions(Pid, Partitions) ->
    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
    gen_server:call(Pid, {remove_replicas, Partitions}, infinity).


-spec mark_as_unindexable(pid(), ordsets:ordset(partition_id())) ->
                                 'ok' | {'error', term()}.
mark_as_unindexable(Pid, Partitions) ->
    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
    gen_server:call(Pid, {mark_as_unindexable, Partitions}, infinity).


-spec mark_as_indexable(pid(), ordsets:ordset(partition_id())) ->
                               'ok' | {'error', term()}.
mark_as_indexable(Pid, Partitions) ->
    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
    gen_server:call(Pid, {mark_as_indexable, Partitions}, infinity).


-spec monitor_partition_update(pid(), partition_id(), reference(), pid()) ->
                               'ok' | {'error', term()}.
monitor_partition_update(Pid, PartitionId, Ref, CallerPid) ->
    gen_server:call(
        Pid, {monitor_partition_update, PartitionId, Ref, CallerPid}, infinity).


-spec demonitor_partition_update(pid(), reference()) -> 'ok'.
demonitor_partition_update(Pid, Ref) ->
    ok = gen_server:call(Pid, {demonitor_partition_update, Ref}, infinity).
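
%% Illustrative usage sketch (not part of the module): register interest in
%% partition 3 being indexed. The caller is later sent a message carrying Ref
%% (see the #up_listener{} handling in this module for the exact message
%% shape), or it can cancel with demonitor_partition_update/2:
%%
%%     Ref = make_ref(),
%%     ok = couch_set_view_group:monitor_partition_update(GroupPid, 3, Ref, self()),
%%     %% ... later, if no longer interested ...
%%     ok = couch_set_view_group:demonitor_partition_update(GroupPid, Ref).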


-spec reset_utilization_stats(pid()) -> 'ok'.
reset_utilization_stats(Pid) ->
    ok = gen_server:call(Pid, reset_utilization_stats, infinity).


-spec get_utilization_stats(pid()) -> {'ok', [{atom() | binary(), term()}]}.
get_utilization_stats(Pid) ->
    gen_server:call(Pid, get_utilization_stats, infinity).

-spec inc_access_stat(pid()) -> 'ok'.
inc_access_stat(Pid) ->
    gen_server:call(Pid, increment_stat, infinity).

start_link({RootDir, SetName, Group}) ->
    Args = {RootDir, SetName, Group#set_view_group{type = main}},
    proc_lib:start_link(?MODULE, init, [Args]).


init({_, _, Group} = InitArgs) ->
    process_flag(trap_exit, true),
    {ok, State} = try
        do_init(InitArgs)
    catch
    _:Error ->
        ?LOG_ERROR("~s error opening set view group `~s` (~s), signature `~s',"
                   " from set `~s`: ~p",
                   [?MODULE, Group#set_view_group.name,
                    Group#set_view_group.category, hex_sig(Group),
                    Group#set_view_group.set_name, Error]),
        exit(Error)
    end,
    proc_lib:init_ack({ok, self()}),
    gen_server:enter_loop(?MODULE, [], State, 1).


do_init({_, SetName, _} = InitArgs) ->
    case prepare_group(InitArgs, false) of
    {ok, Group} ->
        #set_view_group{
            fd = Fd,
            index_header = Header,
            type = Type,
            category = Category,
            mod = Mod
        } = Group,
        RefCounter = new_fd_ref_counter(Fd),
        case Header#set_view_index_header.has_replica of
        false ->
            ReplicaPid = nil,
            ReplicaParts = [];
        true ->
            ReplicaPid = open_replica_group(InitArgs),
            maybe_fix_replica_group(ReplicaPid, Group),
            ReplicaParts = get_replica_partitions(ReplicaPid)
        end,
        ViewCount = length(Group#set_view_group.views),
        case Header#set_view_index_header.num_partitions > 0 of
        false ->
            ?LOG_INFO("Started undefined ~s (~s) set view group `~s`,"
                      " group `~s`,  signature `~s', view count: ~p",
                      [Type, Category, SetName,
                       Group#set_view_group.name, hex_sig(Group), ViewCount]);
        true ->
            ?LOG_INFO("Started ~s (~s) set view group `~s`, group `~s`,"
                      " signature `~s', view count ~p~n"
                      "active partitions:      ~w~n"
                      "passive partitions:     ~w~n"
                      "cleanup partitions:     ~w~n"
                      "unindexable partitions: ~w~n"
                      "~sreplica support~n" ++
                      case Header#set_view_index_header.has_replica of
                      true ->
                          "replica partitions: ~w~n"
                          "replica partitions on transfer: ~w~n";
                      false ->
                          ""
                      end,
                      [Type, Category, SetName, Group#set_view_group.name,
                       hex_sig(Group), ViewCount,
                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.abitmask),
                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.pbitmask),
                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.cbitmask),
                       Header#set_view_index_header.unindexable_seqs,
                       case Header#set_view_index_header.has_replica of
                       true ->
                           "";
                       false ->
                           "no "
                       end] ++
                       case Header#set_view_index_header.has_replica of
                       true ->
                           [ReplicaParts, ?set_replicas_on_transfer(Group)];
                       false ->
                           []
                       end)
        end,
        DcpName = <<(atom_to_binary(Mod, latin1))/binary, ": ",
            SetName/binary, " ", (Group#set_view_group.name)/binary,
            " (", (atom_to_binary(Category, latin1))/binary, "/",
            (atom_to_binary(Type, latin1))/binary, ")">>,
        {User, Passwd} = get_auth(),
        DcpBufferSize = list_to_integer(couch_config:get("dcp",
            "flow_control_buffer_size", ?DCP_CONTROL_BUFFER_SIZE)),
        ?LOG_INFO("Flow control buffer size is ~p bytes", [DcpBufferSize]),

        case couch_dcp_client:start(DcpName, SetName, User, Passwd,
            DcpBufferSize) of
        {ok, DcpPid} ->
            Group2 = maybe_upgrade_header(Group, DcpPid),
            State = #state{
                init_args = InitArgs,
                replica_group = ReplicaPid,
                replica_partitions = ReplicaParts,
                group = Group2#set_view_group{
                    ref_counter = RefCounter,
                    replica_pid = ReplicaPid,
                    dcp_pid = DcpPid
                }
            },
            init_seqs_cache(),
            true = ets:insert(
                 Group#set_view_group.stats_ets,
                 #set_view_group_stats{ets_key = ?set_view_group_stats_key(Group)}),
            TmpDir = updater_tmp_dir(State),
            ok = couch_set_view_util:delete_sort_files(TmpDir, all),
            reset_util_stats(),
            {ok, maybe_apply_pending_transition(State)};
        Error ->
            couch_file:close(Fd),
            throw(Error)
        end;
    Error ->
        throw(Error)
    end.

handle_call(get_sig, _From, #state{group = Group} = State) ->
    {reply, {ok, Group#set_view_group.sig}, State, ?GET_TIMEOUT(State)};

handle_call({set_auto_cleanup, Enabled}, _From, State) ->
    % To be used only by unit tests.
    {reply, ok, State#state{auto_cleanup = Enabled}, ?GET_TIMEOUT(State)};

handle_call({set_timeout, T}, _From, State) ->
    % To be used only by unit tests.
    {reply, ok, State#state{timeout = T}, T};

handle_call({set_auto_transfer_replicas, Enabled}, _From, State) ->
    % To be used only by unit tests.
    {reply, ok, State#state{auto_transfer_replicas = Enabled}, ?GET_TIMEOUT(State)};

handle_call({define_view, NumPartitions, _, _, _, _, _}, _From, State)
        when (not ?is_defined(State)), NumPartitions > ?MAX_NUM_PARTITIONS ->
    {reply, {error, <<"Too high value for number of partitions">>}, State};

handle_call({define_view, NumPartitions, ActiveList, ActiveBitmask,
        PassiveList, PassiveBitmask, UseReplicaIndex}, _From, State) when not ?is_defined(State) ->
    #state{init_args = InitArgs, group = Group} = State,
    PartitionsList = lists:usort(ActiveList ++ PassiveList),
    Seqs = lists:map(
        fun(PartId) -> {PartId, 0} end, PartitionsList),
    PartVersions = lists:map(
        fun(PartId) -> {PartId, [{0, 0}]} end, PartitionsList),
    #set_view_group{
        name = DDocId,
        index_header = Header
    } = Group,
    NewHeader = Header#set_view_index_header{
        num_partitions = NumPartitions,
        abitmask = ActiveBitmask,
        pbitmask = PassiveBitmask,
        seqs = Seqs,
        has_replica = UseReplicaIndex,
        partition_versions = PartVersions
    },
    case (?type(State) =:= main) andalso UseReplicaIndex of
    false ->
        ReplicaPid = nil;
    true ->
        ReplicaPid = open_replica_group(InitArgs),
        ok = gen_server:call(ReplicaPid, {define_view, NumPartitions, [], 0, [], 0, false}, infinity)
    end,
    NewGroup = Group#set_view_group{
        index_header = NewHeader,
        replica_pid = ReplicaPid
    },
    State2 = State#state{
        group = NewGroup,
        replica_group = ReplicaPid
    },
    {ok, HeaderPos} = commit_header(NewGroup),
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
              " configured with:~n"
              "~p partitions~n"
              "~sreplica support~n"
              "initial active partitions ~w~n"
              "initial passive partitions ~w",
              [?set_name(State), ?type(State), ?category(State), DDocId,
               hex_sig(Group), NumPartitions,
               case UseReplicaIndex of
               true ->  "";
               false -> "no "
               end,
        ActiveList, PassiveList]),
    NewGroup2 = (State2#state.group)#set_view_group{
        header_pos = HeaderPos
    },
    State3 = State2#state{
        group = NewGroup2
    },
    {reply, ok, State3, ?GET_TIMEOUT(State3)};

handle_call({define_view, _, _, _, _, _, _}, _From, State) ->
    {reply, view_already_defined, State, ?GET_TIMEOUT(State)};

handle_call(is_view_defined, _From, State) ->
    {reply, ?is_defined(State), State, ?GET_TIMEOUT(State)};

handle_call(_Msg, _From, State) when not ?is_defined(State) ->
    {reply, {error, view_undefined}, State};

handle_call({set_state, ActiveList, PassiveList, CleanupList}, _From, State) ->
    try
        NewState = maybe_update_partition_states(
            ActiveList, PassiveList, CleanupList, State),
        {reply, ok, NewState, ?GET_TIMEOUT(NewState)}
    catch
    throw:Error ->
        {reply, Error, State}
    end;

handle_call({add_replicas, BitMask}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
    #state{
        group = Group,
        replica_partitions = ReplicaParts
    } = State,
    BitMask2 = case BitMask band ?set_abitmask(Group) of
    0 ->
        BitMask;
    Common1 ->
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
                  " set partitions ~w to replica state because they are"
                  " currently marked as active",
                  [?set_name(State), ?type(State), ?category(State),
                   ?group_id(State),
                   couch_set_view_util:decode_bitmask(Common1)]),
        BitMask bxor Common1
    end,
    BitMask3 = case BitMask2 band ?set_pbitmask(Group) of
    0 ->
        BitMask2;
    Common2 ->
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
                  " set partitions ~w to replica state because they are"
                  " currently marked as passive",
                  [?set_name(State), ?type(State), ?category(State),
                   ?group_id(State),
                   couch_set_view_util:decode_bitmask(Common2)]),
        BitMask2 bxor Common2
    end,
    Parts = couch_set_view_util:decode_bitmask(BitMask3),
    ok = set_state(ReplicaPid, [], Parts, []),
    NewReplicaParts = ordsets:union(ReplicaParts, Parts),
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
              " defined new replica partitions: ~w~n"
              "New full set of replica partitions is: ~w~n",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), Parts, NewReplicaParts]),
    State2 = State#state{
        replica_partitions = NewReplicaParts
    },
    {reply, ok, State2, ?GET_TIMEOUT(State2)};

handle_call({remove_replicas, Partitions}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
    #state{
        replica_partitions = ReplicaParts,
        group = Group
    } = State,
    case ordsets:intersection(?set_replicas_on_transfer(Group), Partitions) of
    [] ->
        ok = set_state(ReplicaPid, [], [], Partitions),
        NewState = State#state{
            replica_partitions = ordsets:subtract(ReplicaParts, Partitions)
        };
    Common ->
        UpdaterWasRunning = is_pid(State#state.updater_pid),
        State2 = stop_cleaner(State),
        #state{group = Group3} = State3 = stop_updater(State2),
        {ok, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs, NewVersions} =
            set_cleanup_partitions(
                Common,
                ?set_abitmask(Group3),
                ?set_pbitmask(Group3),
                ?set_cbitmask(Group3),
                ?set_seqs(Group3),
                ?set_partition_versions(Group)),
        case NewCbitmask =/= ?set_cbitmask(Group3) of
        true ->
             State4 = stop_compactor(State3);
        false ->
             State4 = State3
        end,
        ReplicaPartitions2 = ordsets:subtract(ReplicaParts, Common),
        ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group3), Common),
        State5 = update_header(
            State4,
            NewAbitmask,
            NewPbitmask,
            NewCbitmask,
            NewSeqs,
            ?set_unindexable_seqs(State4#state.group),
            ReplicasOnTransfer2,
            ReplicaPartitions2,
            ?set_pending_transition(State4#state.group),
            NewVersions),
        ok = set_state(ReplicaPid, [], [], Partitions),
        case UpdaterWasRunning of
        true ->
            State6 = start_updater(State5);
        false ->
            State6 = State5
        end,
        NewState = maybe_start_cleaner(State6)
    end,
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, marked the following"
              " replica partitions for removal: ~w",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), Partitions]),
    {reply, ok, NewState, ?GET_TIMEOUT(NewState)};

handle_call({mark_as_unindexable, Partitions}, _From, State) ->
    try
        State2 = process_mark_as_unindexable(State, Partitions),
        {reply, ok, State2, ?GET_TIMEOUT(State2)}
    catch
    throw:Error ->
        {reply, Error, State, ?GET_TIMEOUT(State)}
    end;

handle_call({mark_as_indexable, Partitions}, _From, State) ->
    try
        State2 = process_mark_as_indexable(State, Partitions),
        {reply, ok, State2, ?GET_TIMEOUT(State2)}
    catch
    throw:Error ->
        {reply, Error, State, ?GET_TIMEOUT(State)}
    end;

handle_call(increment_stat, _From, State) ->
    ?inc_accesses(State#state.group),
    {reply, ok, State, ?GET_TIMEOUT(State)};

handle_call(#set_view_group_req{} = Req, From, State) ->
    #state{
        group = Group,
        pending_transition_waiters = Waiters
    } = State,
    State2 = case is_any_partition_pending(Req, Group) of
    false ->
        process_view_group_request(Req, From, State);
    true ->
        State#state{pending_transition_waiters = [{From, Req} | Waiters]}
    end,
    inc_view_group_access_stats(Req, State2#state.group),
    {noreply, State2, ?GET_TIMEOUT(State2)};

handle_call(request_group, _From, #state{group = Group} = State) ->
    % Meant to be called only by this module and the compactor module.
    % Callers aren't supposed to read from the group's fd, so we don't
    % increment the ref counter here on behalf of the caller.
    {reply, {ok, Group}, State, ?GET_TIMEOUT(State)};

handle_call(replica_pid, _From, #state{replica_group = Pid} = State) ->
    % To be used only by unit tests.
    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};

handle_call({start_updater, Options}, _From, State) ->
    % To be used only by unit tests.
    State2 = start_updater(State, Options),
    {reply, {ok, State2#state.updater_pid}, State2, ?GET_TIMEOUT(State2)};

handle_call(start_cleaner, _From, State) ->
    % To be used only by unit tests.
    State2 = maybe_start_cleaner(State#state{auto_cleanup = true}),
    State3 = State2#state{auto_cleanup = State#state.auto_cleanup},
    {reply, {ok, State2#state.cleaner_pid}, State3, ?GET_TIMEOUT(State3)};

handle_call(updater_pid, _From, #state{updater_pid = Pid} = State) ->
    % To be used only by unit tests.
    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};

handle_call(cleaner_pid, _From, #state{cleaner_pid = Pid} = State) ->
    % To be used only by unit tests.
    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};

handle_call({test_rollback, RollbackSeqs}, _From, State0) ->
    % To be used only by unit tests.
    Response = case rollback(State0, RollbackSeqs) of
    {ok, State} ->
        ok;
    {error, {cannot_rollback, State}} ->
        cannot_rollback
    end,
    {reply, Response, State, ?GET_TIMEOUT(State)};

handle_call(request_group_info, _From, State) ->
    GroupInfo = get_group_info(State),
    {reply, {ok, GroupInfo}, State, ?GET_TIMEOUT(State)};

handle_call(get_data_size, _From, State) ->
    DataSizeInfo = get_data_size_info(State),
    {reply, {ok, DataSizeInfo}, State, ?GET_TIMEOUT(State)};

handle_call({start_compact, _CompactFun}, _From,
            #state{updater_pid = UpPid, initial_build = true} = State) when is_pid(UpPid) ->
    {reply, {error, initial_build}, State, ?GET_TIMEOUT(State)};
handle_call({start_compact, CompactFun}, _From, #state{compactor_pid = nil} = State) ->
    #state{compactor_pid = Pid} = State2 = start_compactor(State, CompactFun),
    {reply, {ok, Pid}, State2, ?GET_TIMEOUT(State2)};
handle_call({start_compact, _}, _From, State) ->
    %% compact already running, this is a no-op
    {reply, {ok, State#state.compactor_pid}, State};

handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = State) ->
    #state{
        update_listeners = Listeners,
        group = Group,
        updater_pid = UpdaterPid,
        compactor_pid = CompactorPid
    } = State,
    #set_view_group{
        fd = OldFd,
        ref_counter = RefCounter,
        filepath = OldFilepath
    } = Group,
    #set_view_compactor_result{
        group = NewGroup0,
        compact_time = Duration,
        cleanup_kv_count = CleanupKVCount
    } = Result,

    MissingChangesCount = couch_set_view_util:missing_changes_count(
        ?set_seqs(Group), ?set_seqs(NewGroup0)),
    case MissingChangesCount == 0 of
    true ->
        % Compactor might have received a group snapshot from an updater.
        NewGroup = fix_updater_group(NewGroup0, Group),
        HeaderBin = couch_set_view_util:group_to_header_bin(NewGroup),
        {ok, NewHeaderPos} = couch_file:write_header_bin(
            NewGroup#set_view_group.fd, HeaderBin),
        if is_pid(UpdaterPid) ->
            ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compact group"
                      " up to date - restarting updater",
                      [?set_name(State), ?type(State), ?category(State),
                       ?group_id(State)]),
            % The compactor has caught up, so switch to the compacted file
            % and discard the running updater.
            couch_set_view_util:shutdown_wait(UpdaterPid),
            stop_dcp_streams(State);
        true ->
            ok
        end,
        NewFilepath = increment_filepath(Group),
        NewRefCounter = new_fd_ref_counter(NewGroup#set_view_group.fd),
        case ?set_replicas_on_transfer(Group) /= ?set_replicas_on_transfer(NewGroup) of
        true ->
            % Set of replicas on transfer changed while compaction was running.
            % Just write a new header with the new set of replicas on transfer and all the
            % metadata that is updated when that set changes (active and passive bitmasks).
            % This happens only during (or after, for a short period) a cluster rebalance or
            % failover. This shouldn't take too long, as we are writing and fsync'ing only
            % one header, all data was already fsync'ed by the compactor process.
            NewGroup2 = NewGroup#set_view_group{
                ref_counter = NewRefCounter,
                filepath = NewFilepath,
                index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
                    replicas_on_transfer = ?set_replicas_on_transfer(Group),
                    abitmask = ?set_abitmask(Group),
                    pbitmask = ?set_pbitmask(Group)
                }
            },
            {ok, NewHeaderPos2} = commit_header(NewGroup2);
        false ->
            % The compactor process committed a header with up-to-date state
            % information and did an fsync before calling us. No need to commit
            % a new header here (and fsync).
            NewGroup2 = NewGroup#set_view_group{
                ref_counter = NewRefCounter,
                filepath = NewFilepath
            },
            NewHeaderPos2 = NewHeaderPos
        end,
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compaction complete"
                  " in ~.3f seconds, filtered ~p key-value pairs",
                  [?set_name(State), ?type(State), ?category(State),
                   ?group_id(State), Duration, CleanupKVCount]),
        inc_util_stat(#util_stats.compaction_time, Duration),
        ok = couch_file:only_snapshot_reads(OldFd),
        ok = couch_file:delete(?root_dir(State), OldFilepath),
        %% After the rename call we're sure the header was written to the file
        %% (no need for a couch_file:flush/1 call).
        ok = couch_file:rename(NewGroup#set_view_group.fd, NewFilepath),

        %% cleanup old group
        unlink(CompactorPid),
        couch_ref_counter:drop(RefCounter),

        NewUpdaterPid =
        if is_pid(UpdaterPid) ->
            CurSeqs = indexable_partition_seqs(State),
            spawn_link(couch_set_view_updater,
                       update,
                       [self(), NewGroup2, CurSeqs, false, updater_tmp_dir(State), []]);
        true ->
            nil
        end,

        Listeners2 = notify_update_listeners(State, Listeners, NewGroup2),
        State2 = State#state{
            update_listeners = Listeners2,
            compactor_pid = nil,
            compactor_file = nil,
            compactor_fun = nil,
            compact_log_files = nil,
            compactor_retry_number = 0,
            updater_pid = NewUpdaterPid,
            initial_build = is_pid(NewUpdaterPid) andalso
                    couch_set_view_util:is_initial_build(NewGroup2),
            updater_state = case is_pid(NewUpdaterPid) of
                true -> starting;
                false -> not_running
            end,
            group = NewGroup2#set_view_group{
                header_pos = NewHeaderPos2
            }
        },
        inc_compactions(Result),
        {reply, ok, maybe_apply_pending_transition(State2), ?GET_TIMEOUT(State2)};
    false ->
        State2 = State#state{
            compactor_retry_number = State#state.compactor_retry_number + 1
        },
        {reply, {update, MissingChangesCount}, State2, ?GET_TIMEOUT(State2)}
    end;
handle_call({compact_done, _Result}, _From, State) ->
    % From a previous compactor that was killed/stopped, ignore.
    {noreply, State, ?GET_TIMEOUT(State)};

handle_call(cancel_compact, _From, #state{compactor_pid = nil} = State) ->
    {reply, ok, State, ?GET_TIMEOUT(State)};
handle_call(cancel_compact, _From, #state{compactor_pid = Pid} = State) ->
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
              " canceling compaction (pid ~p)",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), Pid]),
    State2 = stop_compactor(State),
    State3 = maybe_start_cleaner(State2),
    {reply, ok, State3, ?GET_TIMEOUT(State3)};

handle_call({monitor_partition_update, PartId, _Ref, _Pid}, _From, State)
        when PartId >= ?set_num_partitions(State#state.group) ->
    Msg = io_lib:format("Invalid partition: ~p", [PartId]),
    {reply, {error, iolist_to_binary(Msg)}, State, ?GET_TIMEOUT(State)};

handle_call({monitor_partition_update, PartId, Ref, Pid}, _From, State) ->
    try
        State2 = process_monitor_partition_update(State, PartId, Ref, Pid),
        {reply, ok, State2, ?GET_TIMEOUT(State2)}
    catch
    throw:Error ->
        {reply, Error, State, ?GET_TIMEOUT(State)}
    end;

handle_call({demonitor_partition_update, Ref}, _From, State) ->
    #state{update_listeners = Listeners} = State,
    case dict:find(Ref, Listeners) of
    error ->
        {reply, ok, State, ?GET_TIMEOUT(State)};
    {ok, #up_listener{monref = MonRef, partition = PartId}} ->
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, removing partition ~p"
                   " update monitor, reference ~p",
                   [?set_name(State), ?type(State), ?category(State),
                    ?group_id(State), PartId, Ref]),
        erlang:demonitor(MonRef, [flush]),
        State2 = State#state{update_listeners = dict:erase(Ref, Listeners)},
        {reply, ok, State2, ?GET_TIMEOUT(State2)}
    end;

handle_call(compact_log_files, _From, State) ->
    {Files0, Seqs, PartVersions} = State#state.compact_log_files,
    Files = lists:map(fun lists:reverse/1, Files0),
    NewState = State#state{compact_log_files = nil},
    {reply, {ok, {Files, Seqs, PartVersions}}, NewState, ?GET_TIMEOUT(NewState)};

handle_call(reset_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
    reset_util_stats(),
    case is_pid(RepPid) of
    true ->
        ok = gen_server:call(RepPid, reset_utilization_stats, infinity);
    false ->
        ok
    end,
    {reply, ok, State, ?GET_TIMEOUT(State)};

handle_call(get_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
    Stats = erlang:get(util_stats),
    UsefulIndexing = Stats#util_stats.useful_indexing_time,
    WastedIndexing = Stats#util_stats.wasted_indexing_time,
    StatNames = record_info(fields, util_stats),
    StatPoses = lists:seq(2, record_info(size, util_stats)),
    StatsList0 = lists:foldr(
        fun({StatName, StatPos}, Acc) ->
            Val = element(StatPos, Stats),
            [{StatName, Val} | Acc]
        end,
        [], lists:zip(StatNames, StatPoses)),
    StatsList1 = [{total_indexing_time, UsefulIndexing + WastedIndexing} | StatsList0],
    case is_pid(RepPid) of
    true ->
        {ok, RepStats} = gen_server:call(RepPid, get_utilization_stats, infinity),
        StatsList = StatsList1 ++ [{replica_utilization_stats, {RepStats}}];
    false ->
        StatsList = StatsList1
    end,
    {reply, {ok, StatsList}, State, ?GET_TIMEOUT(State)}.


handle_cast(_Msg, State) when not ?is_defined(State) ->
    {noreply, State};

handle_cast({compact_log_files, Files, Seqs, PartVersions, _Init},
                                #state{compact_log_files = nil} = State) ->
    LogList = lists:map(fun(F) -> [F] end, Files),
    {noreply, State#state{compact_log_files = {LogList, Seqs, PartVersions}},
        ?GET_TIMEOUT(State)};

handle_cast({compact_log_files, Files, NewSeqs, NewPartVersions, Init}, State) ->
    LogList = case Init of
    true ->
        lists:map(fun(F) -> [F] end, Files);
    false ->
        {OldL, _OldSeqs, _OldPartVersions} = State#state.compact_log_files,
        lists:zipwith(
            fun(F, Current) -> [F | Current] end,
            Files, OldL)
    end,
    {noreply, State#state{compact_log_files = {LogList, NewSeqs, NewPartVersions}},
        ?GET_TIMEOUT(State)};

handle_cast({ddoc_updated, NewSig, Aliases}, State) ->
    #state{
        waiting_list = Waiters,
        group = #set_view_group{sig = CurSig}
    } = State,
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
              " design document was updated~n"
              "  new signature:   ~s~n"
              "  current aliases: ~p~n"
              "  shutdown flag:   ~s~n"
              "  waiting clients: ~p~n",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), hex_sig(CurSig), hex_sig(NewSig), Aliases,
               State#state.shutdown, length(Waiters)]),
    case NewSig of
    CurSig ->
        State2 = State#state{shutdown = false, shutdown_aliases = undefined},
        {noreply, State2, ?GET_TIMEOUT(State2)};
    _ ->
        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
        case Waiters of
        [] ->
            {stop, normal, State2};
        _ ->
            {noreply, State2}
        end
    end;

handle_cast(before_master_delete, State) ->
    Error = {error, {db_deleted, ?master_dbname((?set_name(State)))}},
    State2 = reply_all(State, Error),
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, going to shutdown because "
              "master database is being deleted",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State)]),
    {stop, shutdown, State2};

handle_cast({update, MinNumChanges}, #state{group = Group} = State) ->
    case is_pid(State#state.updater_pid) of
    true ->
        {noreply, State};
    false ->
        CurSeqs = indexable_partition_seqs(State),
        MissingCount = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
        case (MissingCount >= MinNumChanges) andalso (MissingCount > 0) of
        true ->
            {noreply, do_start_updater(State, CurSeqs, [])};
        false ->
            {noreply, State}
        end
    end;

handle_cast({update_replica, _MinNumChanges}, #state{replica_group = nil} = State) ->
    {noreply, State};

handle_cast({update_replica, MinNumChanges}, #state{replica_group = Pid} = State) ->
    ok = gen_server:cast(Pid, {update, MinNumChanges}),
    {noreply, State}.


handle_info(timeout, State) when not ?is_defined(State) ->
    {noreply, State};

handle_info(timeout, #state{group = Group} = State) ->
    TransferReplicas = (?set_replicas_on_transfer(Group) /= []) andalso
        State#state.auto_transfer_replicas,
    case TransferReplicas orelse (dict:size(State#state.update_listeners) > 0) of
    true ->
        {noreply, start_updater(State)};
    false ->
        {noreply, maybe_start_cleaner(State)}
    end;

handle_info({partial_update, Pid, AckTo, NewGroup}, #state{updater_pid = Pid} = State) ->
    case ?have_pending_transition(State) andalso
        (?set_cbitmask(NewGroup) =:= 0) andalso
        (?set_cbitmask(State#state.group) =/= 0) andalso
        (State#state.waiting_list =:= []) of
    true ->
        State2 = process_partial_update(State, NewGroup),
        AckTo ! update_processed,
        State3 = stop_updater(State2),
        NewState = maybe_apply_pending_transition(State3);
    false ->
        NewState = process_partial_update(State, NewGroup),
        AckTo ! update_processed
    end,
    {noreply, NewState};
handle_info({partial_update, _, AckTo, _}, State) ->
    %% message from an old (probably pre-compaction) updater; ignore
    AckTo ! update_processed,
    {noreply, State, ?GET_TIMEOUT(State)};

handle_info({updater_info, Pid, {state, UpdaterState}}, #state{updater_pid = Pid} = State) ->
    #state{
        group = Group,
        waiting_list = WaitList,
        replica_partitions = RepParts
    } = State,
    State2 = State#state{updater_state = UpdaterState},
    case UpdaterState of
    updating_passive ->
        WaitList2 = reply_with_group(Group, RepParts, WaitList),
        State3 = State2#state{waiting_list = WaitList2},
        case State#state.shutdown of
        true ->
            State4 = stop_updater(State3),
            {stop, normal, State4};
        false ->
            State4 = maybe_apply_pending_transition(State3),
            {noreply, State4}
        end;
    _ ->
        {noreply, State2}
    end;

handle_info({updater_info, _Pid, {state, _UpdaterState}}, State) ->
    % Message from an old updater, ignore.
    {noreply, State, ?GET_TIMEOUT(State)};

handle_info({'EXIT', Pid, {clean_group, CleanGroup, Count, Time}}, #state{cleaner_pid = Pid} = State) ->
    #state{group = OldGroup} = State,
    case CleanGroup#set_view_group.mod of
    % The mapreduce view cleaner is a native C based one that writes the
    % information to disk.
    mapreduce_view ->
        {ok, NewGroup0} =
             couch_set_view_util:refresh_viewgroup_header(CleanGroup);
    spatial_view ->
        NewGroup0 = CleanGroup
    end,
    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
    ?LOG_INFO("Cleanup finished for set view `~s`, ~s (~s) group `~s`~n"
              "Removed ~p values from the index in ~.3f seconds~n"
              "active partitions before:  ~w~n"
              "active partitions after:   ~w~n"
              "passive partitions before: ~w~n"
              "passive partitions after:  ~w~n"
              "cleanup partitions before: ~w~n"
              "cleanup partitions after:  ~w~n" ++
          case is_pid(State#state.replica_group) of
          true ->
              "Current set of replica partitions: ~w~n"
              "Current set of replicas on transfer: ~w~n";
          false ->
              []
          end,
          [?set_name(State), ?type(State), ?category(State), ?group_id(State),
           Count, Time,
           couch_set_view_util:decode_bitmask(?set_abitmask(OldGroup)),
           couch_set_view_util:decode_bitmask(?set_abitmask(NewGroup)),
           couch_set_view_util:decode_bitmask(?set_pbitmask(OldGroup)),
           couch_set_view_util:decode_bitmask(?set_pbitmask(NewGroup)),
           couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup)),
           couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup))] ++
              case is_pid(State#state.replica_group) of
              true ->
                  [State#state.replica_partitions, ?set_replicas_on_transfer(NewGroup)];
              false ->
                  []
              end),
    State2 = State#state{
        cleaner_pid = nil,
        group = NewGroup
    },
    inc_cleanups(State2#state.group, Time, Count, false),
    {noreply, maybe_apply_pending_transition(State2)};

handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid, group = Group} = State) ->
    ok = couch_file:refresh_eof(Group#set_view_group.fd),
    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`, cleanup process ~p"
               " died with unexpected reason: ~p",
               [?set_name(State), ?type(State), ?category(State),
                ?group_id(State), Pid, Reason]),
    {noreply, State#state{cleaner_pid = nil}, ?GET_TIMEOUT(State)};

handle_info({'EXIT', Pid, {updater_finished, Result}}, #state{updater_pid = Pid} = State) ->
    #set_view_updater_result{
        stats = Stats,
        group = #set_view_group{filepath = Path} = NewGroup0,
        tmp_file = BuildFile
    } = Result,
    #set_view_updater_stats{
        indexing_time = IndexingTime,
        blocked_time = BlockedTime,
        inserted_ids = InsertedIds,
        deleted_ids = DeletedIds,
        inserted_kvs = InsertedKVs,
        deleted_kvs = DeletedKVs,
        cleanup_kv_count = CleanupKVCount,
        seqs = SeqsDone
    } = Stats,
    case State#state.initial_build of
    true ->
        NewRefCounter = new_fd_ref_counter(BuildFile),
        ok = couch_file:only_snapshot_reads(NewGroup0#set_view_group.fd),
        ok = couch_file:delete(?root_dir(State), Path),
        ok = couch_file:rename(BuildFile, Path),
        couch_ref_counter:drop(NewGroup0#set_view_group.ref_counter),
        NewGroup = NewGroup0#set_view_group{
            ref_counter = NewRefCounter,
            fd = BuildFile
        };
    false ->
        ok = couch_file:refresh_eof(NewGroup0#set_view_group.fd),
        NewGroup = NewGroup0
    end,
    State2 = process_partial_update(State, NewGroup),
    #state{
        waiting_list = WaitList,
        replica_partitions = ReplicaParts,
        shutdown = Shutdown,
        group = NewGroup2,
        update_listeners = UpdateListeners2,
        initial_build = InitialBuild
    } = State2,
    inc_updates(NewGroup2, Result, false, false),
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, updater finished~n"
              "Indexing time: ~.3f seconds~n"
              "Blocked time:  ~.3f seconds~n"
              "Inserted IDs:  ~p~n"
              "Deleted IDs:   ~p~n"
              "Inserted KVs:  ~p~n"
              "Deleted KVs:   ~p~n"
              "Cleaned KVs:   ~p~n"
              "# seqs done:   ~p~n",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), IndexingTime, BlockedTime, InsertedIds,
               DeletedIds, InsertedKVs, DeletedKVs, CleanupKVCount, SeqsDone]),
    UpdaterRestarted = case InitialBuild of
    true ->
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
                  " initial index build of on-disk items is done,"
                  " restart updater to index in-memory items in case"
                  " there are any",
                  [?set_name(State2), ?type(State2), ?category(State2),
                   ?group_id(State2)]),
        StoppedUpdaterState = State2#state{
            updater_pid = nil,
            initial_build = false,
            updater_state = not_running
        },
        State3 = start_updater(StoppedUpdaterState),
        WaitList2 = State3#state.waiting_list,
        is_pid(State3#state.updater_pid);
    false ->
        WaitList2 = reply_with_group(NewGroup2, ReplicaParts, WaitList),
        State3 = State2,
        false
    end,
    case UpdaterRestarted of
    true ->
        {noreply, State3, ?GET_TIMEOUT(State3)};
    false ->
        case Shutdown andalso (WaitList2 == []) of
        true ->
            {stop, normal, State3#state{waiting_list = []}};
        false ->
            State4 = State3#state{
                updater_pid = nil,
                initial_build = false,
                updater_state = not_running,
                waiting_list = WaitList2
            },
            State5 = maybe_apply_pending_transition(State4),
            State6 = case (WaitList2 /= []) orelse (dict:size(UpdateListeners2) > 0) of
            true ->
                start_updater(State5);
            false ->
                State5
            end,
            State7 = maybe_start_cleaner(State6),
            {noreply, State7, ?GET_TIMEOUT(State7)}
        end
    end;

handle_info({'EXIT', Pid, {updater_error, purge}}, #state{updater_pid = Pid} = State) ->
    State2 = reset_group_from_state(State),
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because updater"
              " detected missed document deletes (purge)",
              [?set_name(State), ?type(State),
               ?category(State), ?group_id(State)]),
    State3 = start_updater(State2),
    {noreply, State3, ?GET_TIMEOUT(State3)};

handle_info({'EXIT', Pid, {updater_error, {rollback, RollbackSeqs}}},
        #state{updater_pid = Pid} = State0) ->
    Rollback = rollback(State0, RollbackSeqs),
    State2 = case Rollback of
    {ok, State} ->
        ?LOG_INFO(
            "Set view `~s`, ~s (~s) group `~s`, group update because group"
            " needed to be rolled back",
            [?set_name(State), ?type(State),
             ?category(State), ?group_id(State)]),
        State#state{
            updater_pid = nil,
            initial_build = false,
            updater_state = not_running
        };
    {error, {cannot_rollback, State}} ->
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because "
            "a rollback wasn't possible",
            [?set_name(State), ?type(State),
             ?category(State), ?group_id(State)]),
        reset_group_from_state(State)
    end,
    State3 = start_updater(State2),
    {noreply, State3, ?GET_TIMEOUT(State3)};

handle_info({'EXIT', Pid, {updater_error, Error}}, #state{updater_pid = Pid, group = Group} = State) ->
    ok = couch_file:refresh_eof(Group#set_view_group.fd),
    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
               " received error from updater: ~p",
               [?set_name(State), ?type(State), ?category(State),
                ?group_id(State), Error]),
    Listeners2 = error_notify_update_listeners(
        State, State#state.update_listeners, {updater_error, Error}),
    State2 = State#state{
        updater_pid = nil,
        initial_build = false,
        updater_state = not_running,
        update_listeners = Listeners2
    },
    stop_dcp_streams(State),
    ?inc_updater_errors(State2#state.group),
    case State#state.shutdown of
    true ->
        {stop, normal, reply_all(State2, {error, Error})};
    false ->
        Error2 = case Error of
        {_, 86, Msg} ->
            {error, <<"Reducer: ", Msg/binary>>};
        {_, 87, _} ->
            {error, <<"reduction too large">>};
        {_, _Reason} ->
            Error;
        _ ->
            {error, Error}
        end,
        State3 = reply_all(State2, Error2),
        {noreply, maybe_start_cleaner(State3), ?GET_TIMEOUT(State3)}
    end;

handle_info({'EXIT', _Pid, {updater_error, _Error}}, State) ->
    % From an old, already shut down updater; ignore.
    {noreply, State, ?GET_TIMEOUT(State)};

handle_info({'EXIT', UpPid, reset}, #state{updater_pid = UpPid} = State) ->
    % TODO: once purge support is properly added, this needs to take into
    % account the replica index.
    State2 = stop_cleaner(State),
    case prepare_group(State#state.init_args, true) of
    {ok, ResetGroup} ->
        {ok, start_updater(State2#state{group = ResetGroup})};
    Error ->
        {stop, normal, reply_all(State2, Error)}
    end;

handle_info({'EXIT', Pid, normal}, State) ->
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
              " linked PID ~p stopped normally",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), Pid]),
    {noreply, State, ?GET_TIMEOUT(State)};

handle_info({'EXIT', Pid, Reason}, #state{compactor_pid = Pid} = State) ->
    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
               " compactor process ~p died with unexpected reason: ~p",
               [?set_name(State), ?type(State), ?category(State),
                ?group_id(State), Pid, Reason]),
    couch_util:shutdown_sync(State#state.compactor_file),
    _ = couch_file:delete(?root_dir(State), compact_file_name(State)),
    State2 = State#state{
        compactor_pid = nil,
        compactor_file = nil,
        compact_log_files = nil
    },
    {noreply, State2, ?GET_TIMEOUT(State2)};

handle_info({'EXIT', Pid, Reason},
        #state{group = #set_view_group{dcp_pid = Pid}} = State) ->
    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
               " DCP process ~p died with unexpected reason: ~p",
               [?set_name(State), ?type(State), ?category(State),
                ?group_id(State), Pid, Reason]),
    {stop, {dcp_died, Reason}, State};

handle_info({'EXIT', Pid, Reason}, State) ->
    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
               " terminating because linked PID ~p died with reason: ~p",
               [?set_name(State), ?type(State), ?category(State),
                ?group_id(State), Pid, Reason]),
    {stop, Reason, State};

handle_info({all_seqs, nil, StatsResponse}, State) ->
    #state{
       group = Group
    } = State,
    SeqsCache = erlang:get(seqs_cache),
    NewState = case StatsResponse of
    {ok, Seqs} ->
        Partitions = group_partitions(Group),
        Seqs2 = couch_set_view_util:filter_seqs(Partitions, Seqs),
        NewCacheVal = #seqs_cache{
            timestamp = os:timestamp(),
            is_waiting = false,
            seqs = Seqs2
        },
        State2 = case is_pid(State#state.updater_pid) of
        true ->
            State;
        false ->
            CurSeqs = indexable_partition_seqs(State, Seqs2),
            case CurSeqs > ?set_seqs(State#state.group) of
            true ->
                do_start_updater(State, CurSeqs, []);
            false ->
                State
            end
        end,
        % If a rollback happened in between the async seqs update request and
        % its response, the is_waiting flag will have been reset. In that case
        % we should just discard the updated seqs.
1396        case SeqsCache#seqs_cache.is_waiting of
1397        true ->
1398            erlang:put(seqs_cache, NewCacheVal);
1399        false ->
1400            ok
1401        end,
1402        State2;
1403    _ ->
1404        ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1405                              " received bad response for update seqs"
1406                              " request ~p",
1407                              [?set_name(State), ?type(State),
1408                               ?category(State), ?group_id(State),
1409                               StatsResponse]),
1410        SeqsCache2 = SeqsCache#seqs_cache{is_waiting = false},
1411        erlang:put(seqs_cache, SeqsCache2),
1412        State
1413    end,
1414    {noreply, NewState}.
1415
1416
1417terminate(Reason, #state{group = #set_view_group{sig = Sig} = Group} = State) ->
1418    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s`,"
1419              " terminating with reason: ~p",
1420              [?set_name(State), ?type(State), ?category(State),
1421               ?group_id(State), hex_sig(Sig), Reason]),
1422    Listeners2 = error_notify_update_listeners(
1423        State, State#state.update_listeners, {shutdown, Reason}),
1424    State2 = reply_all(State#state{update_listeners = Listeners2}, Reason),
1425    State3 = notify_pending_transition_waiters(State2, {shutdown, Reason}),
1426    couch_set_view_util:shutdown_wait(State3#state.cleaner_pid),
1427    couch_set_view_util:shutdown_wait(State3#state.updater_pid),
1428    couch_util:shutdown_sync(State3#state.compactor_pid),
1429    couch_util:shutdown_sync(State3#state.compactor_file),
1430    couch_util:shutdown_sync(State3#state.replica_group),
1431    Group = State#state.group,
1432    true = ets:delete(Group#set_view_group.stats_ets,
1433        ?set_view_group_stats_key(Group)),
1434    catch couch_file:only_snapshot_reads((State3#state.group)#set_view_group.fd),
1435    case State#state.shutdown of
1436    true when ?type(State) == main ->
1437        % Important to delete files here. A quick succession of ddoc updates, updating
1438        % the ddoc back to a previous version (while we're here in terminate), may lead
1439        % us to a case where it should start clean but it doesn't, because
1440        % couch_set_view:cleanup_index_files/1 was not invoked right before the last
1441        % reverting ddoc update, or it was invoked but did not finish before the view
        % group got spawned again. The problem arises during rebalance, where one of the
1443        % databases might have been deleted after the last ddoc update and while or after
1444        % we're here in terminate, so we must ensure we don't leave old index files around.
1445        % MB-6415 and MB-6517
1446        case State#state.shutdown_aliases of
1447        [] ->
1448            delete_index_file(?root_dir(State), Group, main),
1449            delete_index_file(?root_dir(State), Group, replica);
1450        _ ->
1451            ok
1452        end;
1453    _ ->
1454        ok
1455    end,
1456    TmpDir = updater_tmp_dir(State),
1457    ok = couch_set_view_util:delete_sort_files(TmpDir, all),
1458    _ = file:del_dir(TmpDir),
1459    ok.
1460
1461
1462code_change(_OldVsn, State, _Extra) ->
1463    {ok, State}.
1464
1465
1466-spec reply_with_group(#set_view_group{},
1467                       ordsets:ordset(partition_id()),
1468                       [#waiter{}]) -> [#waiter{}].
1469reply_with_group(_Group, _ReplicaPartitions, []) ->
1470    [];
1471reply_with_group(Group, ReplicaPartitions, WaitList) ->
1472    ActiveReplicasBitmask = couch_set_view_util:build_bitmask(
1473        ?set_replicas_on_transfer(Group)),
1474    ActiveIndexable = [{P, S} || {P, S} <- ?set_seqs(Group),
1475                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1476    ActiveUnindexable = [{P, S} || {P, S} <- ?set_unindexable_seqs(Group),
1477                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1478    GroupSeqs = ordsets:union(ActiveIndexable, ActiveUnindexable),
1479    WaitList2 = lists:foldr(
1480        fun(#waiter{debug = false} = Waiter, Acc) ->
1481            case maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) of
1482            true ->
1483                Acc;
1484            false ->
1485                [Waiter | Acc]
1486            end;
1487        (#waiter{debug = true} = Waiter, Acc) ->
1488            [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1489                ?set_view_group_stats_key(Group)),
1490            DebugGroup = Group#set_view_group{
1491                debug_info = #set_view_debug_info{
1492                    stats = Stats,
1493                    original_abitmask = ?set_abitmask(Group),
1494                    original_pbitmask = ?set_pbitmask(Group),
1495                    replica_partitions = ReplicaPartitions,
1496                    wanted_seqs = Waiter#waiter.seqs
1497                }
1498            },
1499            case maybe_reply_with_group(Waiter, DebugGroup, GroupSeqs, ActiveReplicasBitmask) of
1500            true ->
1501                Acc;
1502            false ->
1503                [Waiter | Acc]
1504            end
1505        end,
1506        [], WaitList),
1507    WaitList2.
1508
1509
1510-spec maybe_reply_with_group(#waiter{}, #set_view_group{}, partition_seqs(), bitmask()) -> boolean().
1511maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) ->
1512    #waiter{from = {Pid, _} = From, seqs = ClientSeqs} = Waiter,
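    % ClientSeqs and GroupSeqs are orddicts of {PartId, Seq} that are expected
    % to cover the same set of active partitions; the plain term comparison
    % below therefore acts as a "has the group caught up" check, decided at
    % the first partition entry where the two lists differ.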
1513    case (ClientSeqs == []) orelse (GroupSeqs >= ClientSeqs) of
1514    true ->
1515        couch_ref_counter:add(Group#set_view_group.ref_counter, Pid),
1516        gen_server:reply(From, {ok, Group, ActiveReplicasBitmask}),
1517        true;
1518    false ->
1519        false
1520    end.
1521
1522
1523-spec reply_all(#state{}, term()) -> #state{}.
1524reply_all(#state{waiting_list = []} = State, _Reply) ->
1525    State;
1526reply_all(#state{waiting_list = WaitList} = State, Reply) ->
1527    lists:foreach(fun(#waiter{from = From}) ->
1528        catch gen_server:reply(From, Reply)
1529    end, WaitList),
1530    State#state{waiting_list = []}.
1531
1532
1533-spec prepare_group(init_args(), boolean()) -> {'ok', #set_view_group{}} |
1534                                               {'error', atom()}.
prepare_group({RootDir, SetName, Group0}, ForceReset) ->
1536    #set_view_group{
1537        sig = Sig,
1538        type = Type
1539    } = Group0,
1540    Filepath = find_index_file(RootDir, Group0),
1541    Group = Group0#set_view_group{filepath = Filepath},
1542    case open_index_file(Filepath) of
1543    {ok, Fd} ->
1544        if ForceReset ->
1545            % this can happen if we missed a purge
1546            {ok, reset_file(Fd, Group)};
1547        true ->
1548            case (catch couch_file:read_header_bin(Fd)) of
1549            {ok, HeaderBin, HeaderPos} ->
1550                HeaderSig = couch_set_view_util:header_bin_sig(HeaderBin);
1551            _ ->
1552                HeaderPos = 0,  % keep dialyzer happy
1553                HeaderSig = <<>>,
1554                HeaderBin = <<>>
1555            end,
1556            case HeaderSig == Sig of
1557            true ->
1558                HeaderInfo = couch_set_view_util:header_bin_to_term(HeaderBin),
1559                {ok, init_group(Fd, Group, HeaderInfo, HeaderPos)};
1560            _ ->
1561                % this happens on a new file
1562                case (not ForceReset) andalso (Type =:= main) of
1563                true ->
1564                    % initializing main view group
1565                    ok = delete_index_file(RootDir, Group, replica);
1566                false ->
1567                    ok
1568                end,
1569                {ok, reset_file(Fd, Group)}
1570            end
1571        end;
1572    {error, emfile} = Error ->
1573        ?LOG_ERROR("Can't open set view `~s`, ~s (~s) group `~s`:"
1574                   " too many files open",
1575                   [SetName, Type, Group#set_view_group.category,
1576                    Group#set_view_group.name]),
1577        Error;
1578    Error ->
1579        ok = delete_index_file(RootDir, Group, Type),
1580        case (not ForceReset) andalso (Type =:= main) of
1581        true ->
1582            % initializing main view group
1583            ok = delete_index_file(RootDir, Group, replica);
1584        false ->
1585            ok
1586        end,
1587        Error
1588    end.
1589
1590
1591-spec hex_sig(#set_view_group{} | binary()) -> string().
1592hex_sig(#set_view_group{sig = Sig}) ->
1593    hex_sig(Sig);
1594hex_sig(GroupSig) ->
1595    couch_util:to_hex(GroupSig).
1596
1597
1598-spec base_index_file_name(#set_view_group{}, set_view_group_type()) -> string().
1599base_index_file_name(Group, Type) ->
1600    atom_to_list(Type) ++ "_" ++ hex_sig(Group#set_view_group.sig) ++
1601        Group#set_view_group.extension.
1602
1603
1604-spec find_index_file(string(), #set_view_group{}) -> string().
1605find_index_file(RootDir, Group) ->
1606    #set_view_group{
1607        set_name = SetName,
1608        type = Type,
1609        category = Category
1610    } = Group,
1611    DesignRoot = couch_set_view:set_index_dir(RootDir, SetName, Category),
1612    BaseName = base_index_file_name(Group, Type),
1613    FullPath = filename:join([DesignRoot, BaseName]),
1614    case filelib:wildcard(FullPath ++ ".[0-9]*") of
1615    [] ->
1616        FullPath ++ ".1";
1617    Matching ->
1618        BaseNameSplitted = string:tokens(BaseName, "."),
1619        Matching2 = lists:filter(
1620            fun(Match) ->
1621                MatchBase = filename:basename(Match),
1622                [Suffix | Rest] = lists:reverse(string:tokens(MatchBase, ".")),
1623                (lists:reverse(Rest) =:= BaseNameSplitted) andalso
1624                    is_integer((catch list_to_integer(Suffix)))
1625            end,
1626            Matching),
1627        case Matching2 of
1628        [] ->
1629            FullPath ++ ".1";
1630        _ ->
1631            GetSuffix = fun(FileName) ->
1632                list_to_integer(lists:last(string:tokens(FileName, ".")))
1633            end,
1634            Matching3 = lists:sort(
1635                fun(A, B) -> GetSuffix(A) > GetSuffix(B) end,
1636                Matching2),
1637            hd(Matching3)
1638        end
1639    end.
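% Illustrative example for find_index_file/2 (file names are made up): given
% existing files "main_ab12.view.1" and "main_ab12.view.3" for this group, the
% ".3" path (highest numeric suffix) is returned; when no matching file exists
% yet, "<base name>.1" is returned.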
1640
1641
-spec delete_index_file(string(), #set_view_group{}, set_view_group_type()) -> 'ok'.
1643delete_index_file(RootDir, Group, Type) ->
1644    #set_view_group{
1645        set_name = SetName,
1646        category = Category
1647    } = Group,
1648    SetDir = couch_set_view:set_index_dir(RootDir, SetName, Category),
1649    BaseName = filename:join([SetDir, base_index_file_name(Group, Type)]),
1650    lists:foreach(
1651        fun(F) -> ok = couch_file:delete(RootDir, F) end,
1652        filelib:wildcard(BaseName ++ ".[0-9]*")).
1653
1654
1655-spec compact_file_name(#state{} | #set_view_group{}) -> string().
1656compact_file_name(#state{group = Group}) ->
1657    compact_file_name(Group);
1658compact_file_name(#set_view_group{filepath = CurFilepath}) ->
1659    CurFilepath ++ ".compact".
1660
1661
1662-spec increment_filepath(#set_view_group{}) -> string().
1663increment_filepath(#set_view_group{filepath = CurFilepath}) ->
1664    [Suffix | Rest] = lists:reverse(string:tokens(CurFilepath, ".")),
1665    NewSuffix = integer_to_list(list_to_integer(Suffix) + 1),
1666    string:join(lists:reverse(Rest), ".") ++ "." ++ NewSuffix.
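% Illustrative example for increment_filepath/1 (path is made up):
% "/indexes/main_ab12.view.2" becomes "/indexes/main_ab12.view.3".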
1667
1668
1669-spec open_index_file(string()) -> {'ok', pid()} | {'error', atom()}.
1670open_index_file(Filepath) ->
1671    case do_open_index_file(Filepath) of
1672    {ok, Fd} ->
1673        unlink(Fd),
1674        {ok, Fd};
1675    Error ->
1676        Error
1677    end.
1678
1679do_open_index_file(Filepath) ->
1680    case couch_file:open(Filepath) of
1681    {ok, Fd}        -> {ok, Fd};
1682    {error, enoent} -> couch_file:open(Filepath, [create]);
1683    Error           -> Error
1684    end.
1685
1686
1687open_set_group(Mod, SetName, GroupId) ->
1688    case couch_set_view_ddoc_cache:get_ddoc(SetName, GroupId) of
1689    {ok, DDoc} ->
1690        {ok, Mod:design_doc_to_set_view_group(SetName, DDoc)};
1691    {doc_open_error, Error} ->
1692        Error;
1693    {db_open_error, Error} ->
1694        Error
1695    end.
1696
1697
1698% To be used for debug/troubleshooting only (accessible via REST/HTTP API)
1699get_group_info(State) ->
1700    #state{
1701        group = Group,
1702        replica_group = ReplicaPid,
1703        updater_pid = UpdaterPid,
1704        updater_state = UpdaterState,
1705        compactor_pid = CompactorPid,
1706        waiting_list = WaitersList,
1707        cleaner_pid = CleanerPid,
1708        replica_partitions = ReplicaParts
1709    } = State,
1710    #set_view_group{
1711        fd = Fd,
1712        sig = GroupSig,
1713        id_btree = Btree,
1714        views = Views,
1715        mod = Mod
1716    } = Group,
1717    PendingTrans = get_pending_transition(State),
1718    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1719        ?set_view_group_stats_key(Group)),
1720    JsonStats = {[
1721        {full_updates, Stats#set_view_group_stats.full_updates},
1722        {partial_updates, Stats#set_view_group_stats.partial_updates},
1723        {stopped_updates, Stats#set_view_group_stats.stopped_updates},
1724        {updater_cleanups, Stats#set_view_group_stats.updater_cleanups},
1725        {compactions, Stats#set_view_group_stats.compactions},
1726        {cleanups, Stats#set_view_group_stats.cleanups},
1727        {waiting_clients, length(WaitersList)},
1728        {cleanup_interruptions, Stats#set_view_group_stats.cleanup_stops},
1729        {update_history, Stats#set_view_group_stats.update_history},
1730        {compaction_history, Stats#set_view_group_stats.compaction_history},
1731        {cleanup_history, Stats#set_view_group_stats.cleanup_history}
1732    ]},
1733    {ok, Size} = couch_file:bytes(Fd),
1734    GroupPartitions = ordsets:from_list(
1735        couch_set_view_util:decode_bitmask(?set_abitmask(Group) bor ?set_pbitmask(Group))),
1736    PartVersions = lists:map(fun({PartId, PartVersion}) ->
1737        {couch_util:to_binary(PartId), [tuple_to_list(V) || V <- PartVersion]}
1738    end, ?set_partition_versions(Group)),
1739
1740    IndexSeqs = ?set_seqs(Group),
1741    IndexPartitions = [PartId || {PartId, _} <- IndexSeqs],
    % Extract the seqnum from the KV store for all indexable partitions.
1743    {ok, GroupSeqs} = get_seqs(State, GroupPartitions),
1744    PartSeqs = couch_set_view_util:filter_seqs(IndexPartitions, GroupSeqs),
1745
    % Calculate the total sum of the seqnum differences between the KV
    % store and the index, per partition.
1748    SeqDiffs = lists:zipwith(
1749        fun({PartId, Seq1}, {PartId, Seq2}) ->
1750            Seq1 - Seq2
1751        end, PartSeqs, IndexSeqs),
1752    TotalSeqDiff = lists:sum(SeqDiffs),
1753
1754    [
1755        {signature, ?l2b(hex_sig(GroupSig))},
1756        {disk_size, Size},
1757        {data_size, Mod:view_group_data_size(Btree, Views)},
1758        {updater_running, is_pid(UpdaterPid)},
1759        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build},
1760        {updater_state, couch_util:to_binary(UpdaterState)},
1761        {compact_running, CompactorPid /= nil},
1762        {cleanup_running, (CleanerPid /= nil) orelse
1763            ((CompactorPid /= nil) andalso (?set_cbitmask(Group) =/= 0))},
1764        {max_number_partitions, ?set_num_partitions(Group)},
1765        {update_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- IndexSeqs]}},
1766        {partition_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- GroupSeqs]}},
1767        {total_seqs_diff, TotalSeqDiff},
1768        {active_partitions, couch_set_view_util:decode_bitmask(?set_abitmask(Group))},
1769        {passive_partitions, couch_set_view_util:decode_bitmask(?set_pbitmask(Group))},
1770        {cleanup_partitions, couch_set_view_util:decode_bitmask(?set_cbitmask(Group))},
1771        {unindexable_partitions, {[{couch_util:to_binary(P), S} || {P, S} <- ?set_unindexable_seqs(Group)]}},
1772        {stats, JsonStats},
1773        {pending_transition, case PendingTrans of
1774            nil ->
1775                null;
1776            #set_view_transition{} ->
1777                {[
1778                    {active, PendingTrans#set_view_transition.active},
1779                    {passive, PendingTrans#set_view_transition.passive}
1780                ]}
1781            end
1782        },
1783        {partition_versions, {PartVersions}}
1784    ] ++
1785    case (?type(State) =:= main) andalso is_pid(ReplicaPid) of
1786    true ->
1787        [{replica_partitions, ReplicaParts}, {replicas_on_transfer, ?set_replicas_on_transfer(Group)}];
1788    false ->
1789        []
1790    end ++
1791    get_replica_group_info(ReplicaPid).
1792
1793get_replica_group_info(ReplicaPid) when is_pid(ReplicaPid) ->
1794    {ok, RepGroupInfo} = gen_server:call(ReplicaPid, request_group_info, infinity),
1795    [{replica_group_info, {RepGroupInfo}}];
1796get_replica_group_info(_) ->
1797    [].
1798
1799
1800get_data_size_info(State) ->
1801    #state{
1802        group = Group,
1803        replica_group = ReplicaPid,
1804        updater_pid = UpdaterPid
1805    } = State,
1806    #set_view_group{
1807        fd = Fd,
1808        id_btree = Btree,
1809        sig = GroupSig,
1810        views = Views,
1811        mod = Mod
1812    } = Group,
1813    {ok, FileSize} = couch_file:bytes(Fd),
1814    DataSize = Mod:view_group_data_size(Btree, Views),
1815    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1816        ?set_view_group_stats_key(Group)),
1817    Info = [
1818        {signature, hex_sig(GroupSig)},
1819        {disk_size, FileSize},
1820        {data_size, DataSize},
1821        {accesses, Stats#set_view_group_stats.accesses},
1822        {updater_running, is_pid(UpdaterPid)},
1823        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build}
1824    ],
1825    case is_pid(ReplicaPid) of
1826    false ->
1827        Info;
1828    true ->
1829        {ok, RepInfo} = gen_server:call(ReplicaPid, get_data_size, infinity),
1830        [{replica_group_info, RepInfo} | Info]
1831    end.
1832
1833
1834-spec reset_group(#set_view_group{}) -> #set_view_group{}.
1835reset_group(Group) ->
1836    #set_view_group{
1837        views = Views,
1838        mod = Mod
1839    } = Group,
1840    Views2 = lists:map(fun(View) ->
1841        Indexer = Mod:reset_view(View#set_view.indexer),
1842        View#set_view{indexer = Indexer}
1843    end, Views),
1844    Group#set_view_group{
1845        fd = nil,
1846        index_header = #set_view_index_header{},
1847        id_btree = nil,
1848        views = Views2
1849    }.
1850
1851
1852-spec reset_file(pid(), #set_view_group{}) -> #set_view_group{}.
1853reset_file(Fd, #set_view_group{views = Views, index_header = Header} = Group) ->
1854    ok = couch_file:truncate(Fd, 0),
1855    EmptyHeader = Header#set_view_index_header{
1856        seqs = [{PartId, 0} ||
1857            {PartId, _} <- Header#set_view_index_header.seqs],
1858        id_btree_state = nil,
1859        view_states = [nil || _ <- Views],
1860        replicas_on_transfer = [],
1861        pending_transition = nil,
1862        unindexable_seqs = [{PartId, 0} ||
1863            {PartId, _} <- Header#set_view_index_header.unindexable_seqs],
1864        partition_versions = [{PartId, [{0, 0}]} ||
1865            {PartId, _} <- Header#set_view_index_header.partition_versions]
1866    },
1867    EmptyGroup = Group#set_view_group{index_header = EmptyHeader},
1868    EmptyHeaderBin = couch_set_view_util:group_to_header_bin(EmptyGroup),
1869    {ok, Pos} = couch_file:write_header_bin(Fd, EmptyHeaderBin),
1870    init_group(Fd, reset_group(EmptyGroup), EmptyHeader, Pos).
1871
1872
1873-spec reset_group_from_state(#state{}) -> #state{}.
1874reset_group_from_state(State) ->
1875    Group = State#state.group,
1876    Group2 = reset_file(Group#set_view_group.fd, Group),
1877    State#state{
1878        updater_pid = nil,
1879        initial_build = false,
1880        updater_state = not_running,
1881        group = Group2
1882    }.
1883
1884
1885-spec init_group(pid(),
1886                 #set_view_group{},
1887                 #set_view_index_header{},
1888                 non_neg_integer()) -> #set_view_group{}.
1889init_group(Fd, Group, IndexHeader, HeaderPos) ->
1890    #set_view_group{
1891        views = Views0,
1892        mod = Mod
1893    } = Group,
1894    Views = [V#set_view{ref = make_ref()} || V <- Views0],
1895    #set_view_index_header{
1896        id_btree_state = IdBtreeState,
1897        view_states = ViewStates
1898    } = IndexHeader,
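    % The id btree reduce value is a binary packing the number of KVs in the
    % subtree (40 bits) followed by a bitmap, one bit per partition, of the
    % partitions that contribute at least one entry to that subtree.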
1899    IdTreeReduce = fun(reduce, KVs) ->
1900        <<(length(KVs)):40, (couch_set_view_util:partitions_map(KVs, 0)):?MAX_NUM_PARTITIONS>>;
1901    (rereduce, [First | Rest]) ->
1902        lists:foldl(
1903            fun(<<S:40, M:?MAX_NUM_PARTITIONS>>, <<T:40, A:?MAX_NUM_PARTITIONS>>) ->
1904                <<(S + T):40, (M bor A):?MAX_NUM_PARTITIONS>>
1905            end,
1906            First, Rest)
1907    end,
1908    KvChunkThreshold = couch_config:get("set_views", "btree_kv_node_threshold", "7168"),
1909    KpChunkThreshold = couch_config:get("set_views", "btree_kp_node_threshold", "6144"),
1910    BtreeOptions = [
1911        {kv_chunk_threshold, list_to_integer(KvChunkThreshold)},
1912        {kp_chunk_threshold, list_to_integer(KpChunkThreshold)},
1913        {binary_mode, true}
1914    ],
1915    {ok, IdBtree} = couch_btree:open(
1916        IdBtreeState, Fd, [{reduce, IdTreeReduce} | BtreeOptions]),
1917    Views2 = Mod:setup_views(Fd, BtreeOptions, Group, ViewStates, Views),
1918    Group#set_view_group{
1919        fd = Fd,
1920        id_btree = IdBtree,
1921        views = Views2,
1922        index_header = IndexHeader,
1923        header_pos = HeaderPos
1924    }.
1925
1926
1927commit_header(Group) ->
1928    commit_header(Group, true).
1929
1930-spec commit_header(#set_view_group{}, boolean()) -> {'ok', non_neg_integer()}.
1931commit_header(Group, Fsync) ->
1932    HeaderBin = couch_set_view_util:group_to_header_bin(Group),
1933    {ok, Pos} = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
1934    case Fsync of
1935    true ->
1936        ok = couch_file:sync(Group#set_view_group.fd);
1937    false ->
1938        ok = couch_file:flush(Group#set_view_group.fd)
1939    end,
1940    {ok, Pos}.
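% commit_header/2: callers decide whether an fsync is needed; e.g.
% update_header/10 below forces one whenever the cleanup bitmask changed.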
1941
1942-spec filter_out_bitmask_partitions(ordsets:ordset(partition_id()),
1943                                    bitmask()) -> ordsets:ordset(partition_id()).
1944filter_out_bitmask_partitions(Partitions, BMask) ->
1945    [P || P <- Partitions, ((BMask bsr P) band 1) =/= 1].
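% Illustrative example: filter_out_bitmask_partitions([0, 1, 2, 3], 2#0110)
% returns [0, 3], i.e. partitions whose bit is set in the mask are dropped.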
1946
1947-spec maybe_update_partition_states(ordsets:ordset(partition_id()),
1948                                    ordsets:ordset(partition_id()),
1949                                    ordsets:ordset(partition_id()),
1950                                    #state{}) -> #state{}.
1951maybe_update_partition_states(ActiveList0, PassiveList0, CleanupList0, State) ->
1952    #state{group = Group} = State,
1953    PendingTrans = ?set_pending_transition(Group),
1954    PendingActive = ?pending_transition_active(PendingTrans),
1955    PendingPassive = ?pending_transition_passive(PendingTrans),
1956    PendingUnindexable = ?pending_transition_unindexable(PendingTrans),
1957    case (?set_unindexable_seqs(Group) == []) andalso (PendingUnindexable == []) of
1958    true ->
1959        ActiveList = ActiveList0,
1960        PassiveList = PassiveList0,
1961        CleanupList = CleanupList0;
1962    false ->
1963        AlreadyActive = couch_set_view_util:build_bitmask(PendingActive) bor ?set_abitmask(Group),
1964        AlreadyPassive = couch_set_view_util:build_bitmask(PendingPassive) bor ?set_pbitmask(Group),
1965        ActiveList = filter_out_bitmask_partitions(ActiveList0, AlreadyActive),
1966        PassiveList = filter_out_bitmask_partitions(PassiveList0, AlreadyPassive),
1967        CleanupList = filter_out_bitmask_partitions(CleanupList0, ?set_cbitmask(Group)),
1968        ActiveMarkedAsUnindexable0 = [
1969            P || P <- ActiveList, is_unindexable_part(P, Group)
1970        ],
1971
1972        % Replicas on transfer look like normal active partitions for the
1973        % caller. Hence they can be marked as unindexable. The actual
1974        % transfer to a real active partition needs to happen though. Thus
1975        % an intersection between newly activated partitions and unindexable
1976        % ones is possible (MB-8677).
1977        ActiveMarkedAsUnindexable = ordsets:subtract(
1978            ActiveMarkedAsUnindexable0, ?set_replicas_on_transfer(Group)),
1979
1980        case ActiveMarkedAsUnindexable of
1981        [] ->
1982            ok;
1983        _ ->
1984            ErrorMsg1 = io_lib:format("Intersection between requested active list "
1985                "and current unindexable partitions: ~w", [ActiveMarkedAsUnindexable]),
1986            throw({error, iolist_to_binary(ErrorMsg1)})
1987        end,
1988        PassiveMarkedAsUnindexable = [
1989            P || P <- PassiveList, is_unindexable_part(P, Group)
1990        ],
1991        case PassiveMarkedAsUnindexable of
1992        [] ->
1993            ok;
1994        _ ->
1995            ErrorMsg2 = io_lib:format("Intersection between requested passive list "
1996                "and current unindexable partitions: ~w", [PassiveMarkedAsUnindexable]),
1997            throw({error, iolist_to_binary(ErrorMsg2)})
1998        end,
1999        CleanupMarkedAsUnindexable = [
2000            P || P <- CleanupList, is_unindexable_part(P, Group)
2001        ],
2002        case CleanupMarkedAsUnindexable of
2003        [] ->
2004            ok;
2005        _ ->
2006            ErrorMsg3 = io_lib:format("Intersection between requested cleanup list "
2007                "and current unindexable partitions: ~w", [CleanupMarkedAsUnindexable]),
2008            throw({error, iolist_to_binary(ErrorMsg3)})
2009        end
2010    end,
2011    ActiveMask = couch_set_view_util:build_bitmask(ActiveList),
2012    case ActiveMask >= (1 bsl ?set_num_partitions(Group)) of
2013    true ->
2014        throw({error, <<"Invalid active partitions list">>});
2015    false ->
2016        ok
2017    end,
2018    PassiveMask = couch_set_view_util:build_bitmask(PassiveList),
2019    case PassiveMask >= (1 bsl ?set_num_partitions(Group)) of
2020    true ->
2021        throw({error, <<"Invalid passive partitions list">>});
2022    false ->
2023        ok
2024    end,
2025    CleanupMask = couch_set_view_util:build_bitmask(CleanupList),
2026    case CleanupMask >= (1 bsl ?set_num_partitions(Group)) of
2027    true ->
2028        throw({error, <<"Invalid cleanup partitions list">>});
2029    false ->
2030        ok
2031    end,
2032
2033    ActivePending = ?pending_transition_active(PendingTrans),
2034    PassivePending = ?pending_transition_passive(PendingTrans),
2035    IsEffectlessTransition =
2036        (ActiveMask bor ?set_abitmask(Group)) == ?set_abitmask(Group) andalso
2037        (PassiveMask bor ?set_pbitmask(Group)) == ?set_pbitmask(Group) andalso
2038        ((CleanupMask band (?set_abitmask(Group) bor ?set_pbitmask(Group))) == 0) andalso
2039        ordsets:is_disjoint(CleanupList, ActivePending) andalso
2040        ordsets:is_disjoint(CleanupList, PassivePending) andalso
2041        ordsets:is_disjoint(ActiveList, PassivePending) andalso
2042        ordsets:is_disjoint(PassiveList, ActivePending),
2043
2044    case IsEffectlessTransition of
2045    true ->
2046        State;
2047    false ->
2048        RestartUpdater = updater_needs_restart(
2049            Group, ActiveMask, PassiveMask, CleanupMask),
2050        NewState = update_partition_states(
2051            ActiveList, PassiveList, CleanupList, State, RestartUpdater),
2052        #state{group = NewGroup, updater_pid = UpdaterPid} = NewState,
2053        case RestartUpdater of
2054        false when is_pid(UpdaterPid) ->
2055            case missing_partitions(Group, NewGroup) of
2056            [] ->
2057                ok;
2058            MissingPassive ->
2059                UpdaterPid ! {new_passive_partitions, MissingPassive}
2060            end;
2061        _ ->
2062            ok
2063        end,
2064        NewState
2065    end.
2066
2067
2068-spec update_partition_states(ordsets:ordset(partition_id()),
2069                              ordsets:ordset(partition_id()),
2070                              ordsets:ordset(partition_id()),
2071                              #state{},
2072                              boolean()) -> #state{}.
2073update_partition_states(ActiveList, PassiveList, CleanupList, State, RestartUpdater) ->
2074    State2 = stop_cleaner(State),
2075    case RestartUpdater of
2076    true ->
2077        #state{group = Group3} = State3 = stop_updater(State2);
2078    false ->
2079        #state{group = Group3} = State3 = State2
2080    end,
2081    UpdaterWasRunning = is_pid(State#state.updater_pid),
2082    ActiveInCleanup = partitions_still_in_cleanup(ActiveList, Group3),
2083    PassiveInCleanup = partitions_still_in_cleanup(PassiveList, Group3),
2084    NewPendingTrans = merge_into_pending_transition(
2085        Group3, ActiveInCleanup, PassiveInCleanup, CleanupList),
2086    ApplyActiveList = ordsets:subtract(ActiveList, ActiveInCleanup),
2087    ApplyPassiveList = ordsets:subtract(PassiveList, PassiveInCleanup),
2088    ApplyCleanupList = CleanupList,
2089    State4 = persist_partition_states(
2090               State3, ApplyActiveList, ApplyPassiveList,
2091               ApplyCleanupList, NewPendingTrans, []),
2092    State5 = notify_pending_transition_waiters(State4),
2093    after_partition_states_updated(State5, UpdaterWasRunning).
2094
2095
2096-spec merge_into_pending_transition(#set_view_group{},
2097                                    ordsets:ordset(partition_id()),
2098                                    ordsets:ordset(partition_id()),
2099                                    ordsets:ordset(partition_id())) ->
2100                                           #set_view_transition{} | 'nil'.
2101merge_into_pending_transition(Group, ActiveInCleanup, PassiveInCleanup, CleanupList) ->
2102    PendingTrans = ?set_pending_transition(Group),
2103    ActivePending = ?pending_transition_active(PendingTrans),
2104    PassivePending = ?pending_transition_passive(PendingTrans),
2105    case ordsets:intersection(PassivePending, ActiveInCleanup) of
2106    [] ->
2107        PassivePending2 = PassivePending;
2108    Int ->
2109        PassivePending2 = ordsets:subtract(PassivePending, Int)
2110    end,
2111    case ordsets:intersection(ActivePending, PassiveInCleanup) of
2112    [] ->
2113        ActivePending2 = ActivePending;
2114    Int2 ->
2115        ActivePending2 = ordsets:subtract(ActivePending, Int2)
2116    end,
2117    ActivePending3 = ordsets:subtract(ActivePending2, CleanupList),
2118    PassivePending3 = ordsets:subtract(PassivePending2, CleanupList),
2119    ActivePending4 = ordsets:union(ActivePending3, ActiveInCleanup),
2120    PassivePending4 = ordsets:union(PassivePending3, PassiveInCleanup),
2121    case (ActivePending4 == []) andalso (PassivePending4 == []) of
2122    true ->
2123        nil;
2124    false ->
2125        #set_view_transition{
2126            active = ActivePending4,
2127            passive = PassivePending4
2128        }
2129    end.
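% merge_into_pending_transition/4, in short: partitions requested as active or
% passive that are still undergoing cleanup are kept (or re-added) in the
% pending transition, anything now requested for cleanup is dropped from it,
% and 'nil' is returned when the resulting transition would be empty.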
2130
2131
2132-spec after_partition_states_updated(#state{}, boolean()) -> #state{}.
2133after_partition_states_updated(State, UpdaterWasRunning) ->
2134    State2 = case UpdaterWasRunning of
2135    true ->
2136        % Updater was running, we stopped it, updated the group we received
2137        % from the updater, updated that group's bitmasks and update seqs,
2138        % and now restart the updater with this modified group.
2139        start_updater(State);
2140    false ->
2141        State
2142    end,
2143    State3 = stop_compactor(State2),
2144    maybe_start_cleaner(State3).
2145
2146
2147-spec persist_partition_states(#state{},
2148                               ordsets:ordset(partition_id()),
2149                               ordsets:ordset(partition_id()),
2150                               ordsets:ordset(partition_id()),
2151                               #set_view_transition{} | 'nil',
2152                               ordsets:ordset(partition_id())) -> #state{}.
2153persist_partition_states(State, ActiveList, PassiveList, CleanupList, PendingTrans, ToBeUnindexable) ->
    % There can never be an intersection between the given active, passive and
    % cleanup lists. This check is performed elsewhere, outside the gen_server.
2156    #state{
2157        group = Group,
2158        replica_partitions = ReplicaParts,
2159        replica_group = ReplicaPid,
2160        update_listeners = Listeners,
2161        waiting_list = WaitList
2162    } = State,
2163    case ordsets:intersection(ActiveList, ReplicaParts) of
2164    [] ->
2165         ActiveList2 = ActiveList,
2166         PassiveList2 = PassiveList,
2167         ReplicasOnTransfer2 = ?set_replicas_on_transfer(Group),
2168         ReplicasToMarkActive = [];
2169    CommonRep ->
2170         PassiveList2 = ordsets:union(PassiveList, CommonRep),
2171         ActiveList2 = ordsets:subtract(ActiveList, CommonRep),
2172         ReplicasOnTransfer2 = ordsets:union(?set_replicas_on_transfer(Group), CommonRep),
2173         ReplicasToMarkActive = CommonRep
2174    end,
2175    case ordsets:intersection(PassiveList, ReplicasOnTransfer2) of
2176    [] ->
2177        ReplicasToCleanup = [],
2178        PassiveList3 = PassiveList2,
2179        ReplicasOnTransfer3 = ReplicasOnTransfer2;
2180    CommonRep2 ->
2181        ReplicasToCleanup = CommonRep2,
2182        PassiveList3 = ordsets:subtract(PassiveList2, CommonRep2),
2183        ReplicasOnTransfer3 = ordsets:subtract(ReplicasOnTransfer2, CommonRep2)
2184    end,
2185    case ordsets:intersection(CleanupList, ReplicasOnTransfer3) of
2186    [] ->
2187        ReplicaParts2 = ReplicaParts,
2188        ReplicasOnTransfer4 = ReplicasOnTransfer3,
2189        ReplicasToCleanup2 = ReplicasToCleanup;
2190    CommonRep3 ->
2191        ReplicaParts2 = ordsets:subtract(ReplicaParts, CommonRep3),
2192        ReplicasOnTransfer4 = ordsets:subtract(ReplicasOnTransfer3, CommonRep3),
2193        ReplicasToCleanup2 = ordsets:union(ReplicasToCleanup, CommonRep3)
2194    end,
2195    {ok, NewAbitmask1, NewPbitmask1, NewSeqs1, NewVersions1} =
2196        set_active_partitions(
2197            ActiveList2,
2198            ?set_abitmask(Group),
2199            ?set_pbitmask(Group),
2200            ?set_seqs(Group),
2201            ?set_partition_versions(Group)),
2202    {ok, NewAbitmask2, NewPbitmask2, NewSeqs2, NewVersions2} =
2203        set_passive_partitions(
2204            PassiveList3,
2205            NewAbitmask1,
2206            NewPbitmask1,
2207            NewSeqs1,
2208            NewVersions1),
2209    {ok, NewAbitmask3, NewPbitmask3, NewCbitmask3, NewSeqs3, NewVersions3} =
2210        set_cleanup_partitions(
2211            CleanupList,
2212            NewAbitmask2,
2213            NewPbitmask2,
2214            ?set_cbitmask(Group),
2215            NewSeqs2,
2216            NewVersions2),
2217    {NewSeqs4, NewUnindexableSeqs, NewVersions4} = lists:foldl(
2218        fun(PartId, {AccSeqs, AccUnSeqs, AccVersions}) ->
2219            PartSeq = couch_set_view_util:get_part_seq(PartId, AccSeqs),
2220            AccSeqs2 = orddict:erase(PartId, AccSeqs),
2221            AccUnSeqs2 = orddict:store(PartId, PartSeq, AccUnSeqs),
2222            AccVersions2 = orddict:erase(PartId, AccVersions),
2223            {AccSeqs2, AccUnSeqs2, AccVersions2}
2224        end,
2225        {NewSeqs3, ?set_unindexable_seqs(Group), NewVersions3},
2226        ToBeUnindexable),
2227    State2 = update_header(
2228        State,
2229        NewAbitmask3,
2230        NewPbitmask3,
2231        NewCbitmask3,
2232        NewSeqs4,
2233        NewUnindexableSeqs,
2234        ReplicasOnTransfer4,
2235        ReplicaParts2,
2236        PendingTrans,
2237        NewVersions4),
    % A crash might happen between updating our header and updating the state
    % of the replica view group. The init function must detect and correct this.
2240    ok = set_state(ReplicaPid, ReplicasToMarkActive, [], ReplicasToCleanup2),
    % Need to update the list of active partition sequence numbers for every blocked client.
2242    WaitList2 = update_waiting_list(
2243        WaitList, State, ActiveList2, PassiveList3, CleanupList),
2244    State3 = State2#state{waiting_list = WaitList2},
2245    case (dict:size(Listeners) > 0) andalso (CleanupList /= []) of
2246    true ->
2247        Listeners2 = dict:filter(
2248            fun(Ref, Listener) ->
2249                #up_listener{
2250                    pid = Pid,
2251                    monref = MonRef,
2252                    partition = PartId
2253                } = Listener,
2254                case lists:member(PartId, CleanupList) of
2255                true ->
2256                    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2257                              " replying to partition ~p"
2258                              " update monitor, reference ~p,"
2259                              " error: marked_for_cleanup",
2260                              [?set_name(State), ?type(State),
2261                               ?category(State), ?group_id(State),
2262                               Ref, PartId]),
2263                    Pid ! {Ref, marked_for_cleanup},
2264                    erlang:demonitor(MonRef, [flush]),
2265                    false;
2266                false ->
2267                    true
2268                end
2269            end,
2270            Listeners),
2271        State3#state{update_listeners = Listeners2};
2272    false ->
2273        State3
2274    end.
2275
2276
2277-spec update_waiting_list([#waiter{}],
2278                          #state{},
2279                          ordsets:ordset(partition_id()),
2280                          ordsets:ordset(partition_id()),
2281                          ordsets:ordset(partition_id())) -> [#waiter{}].
2282update_waiting_list([], _State, _AddActiveList, _AddPassiveList, _AddCleanupList) ->
2283    [];
2284update_waiting_list(WaitList, State, AddActiveList, AddPassiveList, AddCleanupList) ->
2285    {ok, AddActiveSeqs} = get_seqs(State, AddActiveList),
2286    RemoveSet = ordsets:union(AddPassiveList, AddCleanupList),
2287    MapFun = fun(W) -> update_waiter_seqs(W, AddActiveSeqs, RemoveSet) end,
2288    [MapFun(W) || W <- WaitList].
2289
2290
2291-spec update_waiter_seqs(#waiter{},
2292                         partition_seqs(),
2293                         ordsets:ordset(partition_id())) -> #waiter{}.
2294update_waiter_seqs(Waiter, AddActiveSeqs, ToRemove) ->
2295    Seqs2 = lists:foldl(
2296        fun({PartId, Seq}, Acc) ->
2297            case couch_set_view_util:has_part_seq(PartId, Acc) of
2298            true ->
2299                Acc;
2300            false ->
2301                orddict:store(PartId, Seq, Acc)
2302            end
2303        end,
2304        Waiter#waiter.seqs, AddActiveSeqs),
2305    Seqs3 = lists:foldl(
2306        fun(PartId, Acc) -> orddict:erase(PartId, Acc) end,
2307        Seqs2, ToRemove),
2308    Waiter#waiter{seqs = Seqs3}.
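% update_waiter_seqs/3: newly activated partitions are added to a waiter's
% wanted seqs only if the waiter isn't already tracking them; partitions moved
% to passive or cleanup are no longer waited on and are removed from its seqs.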
2309
2310
2311-spec maybe_apply_pending_transition(#state{}) -> #state{}.
2312maybe_apply_pending_transition(State) when not ?have_pending_transition(State) ->
2313    State;
2314maybe_apply_pending_transition(State) ->
2315    State2 = stop_cleaner(State),
2316    #state{group = Group3} = State3 = stop_updater(State2),
2317    UpdaterWasRunning = is_pid(State#state.updater_pid),
2318    #set_view_transition{
2319        active = ActivePending,
2320        passive = PassivePending,
2321        unindexable = UnindexablePending
2322    } = get_pending_transition(State),
2323    ActiveInCleanup = partitions_still_in_cleanup(ActivePending, Group3),
2324    PassiveInCleanup = partitions_still_in_cleanup(PassivePending, Group3),
2325    ApplyActiveList = ordsets:subtract(ActivePending, ActiveInCleanup),
2326    ApplyPassiveList = ordsets:subtract(PassivePending, PassiveInCleanup),
2327    {ApplyUnindexableList, NewUnindexablePending} = lists:partition(
2328        fun(P) ->
2329            lists:member(P, ApplyActiveList) orelse
2330            lists:member(P, ApplyPassiveList)
2331        end,
2332        UnindexablePending),
2333    case (ApplyActiveList /= []) orelse (ApplyPassiveList /= []) of
2334    true ->
2335        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2336                  " applying state transitions from pending transition:~n"
2337                  "  Active partitions:  ~w~n"
2338                  "  Passive partitions: ~w~n"
2339                  "  Unindexable:        ~w~n",
2340                  [?set_name(State), ?type(State), ?category(State),
2341                   ?group_id(State), ApplyActiveList, ApplyPassiveList,
2342                   ApplyUnindexableList]),
2343        case (ActiveInCleanup == []) andalso (PassiveInCleanup == []) of
2344        true ->
2345            NewPendingTrans = nil;
2346        false ->
2347            NewPendingTrans = #set_view_transition{
2348                active = ActiveInCleanup,
2349                passive = PassiveInCleanup,
2350                unindexable = NewUnindexablePending
2351            }
2352        end,
2353        State4 = set_pending_transition(State3, NewPendingTrans),
2354        State5 = persist_partition_states(
2355            State4, ApplyActiveList, ApplyPassiveList, [], NewPendingTrans, ApplyUnindexableList),
2356        State6 = notify_pending_transition_waiters(State5),
2357        NewState = case dict:size(State6#state.update_listeners) > 0 of
2358        true ->
2359            start_updater(State6);
2360        false ->
2361            State6
2362        end;
2363    false ->
2364        NewState = State3
2365    end,
2366    after_partition_states_updated(NewState, UpdaterWasRunning).
2367
2368
2369-spec notify_pending_transition_waiters(#state{}) -> #state{}.
2370notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State) ->
2371    State;
2372notify_pending_transition_waiters(State) ->
2373    #state{
2374        pending_transition_waiters = TransWaiters,
2375        group = Group,
2376        replica_partitions = RepParts,
2377        waiting_list = WaitList
2378    } = State,
2379    CurSeqs = active_partition_seqs(State),
2380    {TransWaiters2, WaitList2, GroupReplyList, TriggerGroupUpdate} =
2381        lists:foldr(
2382            fun({From, Req} = TransWaiter, {AccTrans, AccWait, ReplyAcc, AccTriggerUp}) ->
2383                #set_view_group_req{
2384                    stale = Stale,
2385                    debug = Debug
2386                } = Req,
2387                case is_any_partition_pending(Req, Group) of
2388                true ->
2389                    {[TransWaiter | AccTrans], AccWait, ReplyAcc, AccTriggerUp};
2390                false when Stale == ok ->
2391                    Waiter = #waiter{from = From, debug = Debug},
2392                    {AccTrans, AccWait, [Waiter | ReplyAcc], AccTriggerUp};
2393                false when Stale == update_after ->
2394                    Waiter = #waiter{from = From, debug = Debug},
2395                    {AccTrans, AccWait, [Waiter | ReplyAcc], true};
2396                false when Stale == false ->
2397                    Waiter = #waiter{from = From, debug = Debug, seqs = CurSeqs},
2398                    {AccTrans, [Waiter | AccWait], ReplyAcc, true}
2399                end
2400            end,
2401            {[], WaitList, [], false},
2402            TransWaiters),
2403    [] = reply_with_group(Group, RepParts, GroupReplyList),
2404    WaitList3 = reply_with_group(Group, RepParts, WaitList2),
2405    State2 = State#state{
2406        pending_transition_waiters = TransWaiters2,
2407        waiting_list = WaitList3
2408    },
2409    case TriggerGroupUpdate of
2410    true ->
2411        start_updater(State2);
2412    false ->
2413        State2
2414    end.
2415
2416
2417-spec notify_pending_transition_waiters(#state{}, term()) -> #state{}.
2418notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State, _Reply) ->
2419    State;
2420notify_pending_transition_waiters(#state{pending_transition_waiters = Waiters} = State, Reply) ->
2421    lists:foreach(fun(F) -> catch gen_server:reply(F, Reply) end, Waiters),
2422    State#state{pending_transition_waiters = []}.
2423
2424
2425-spec set_passive_partitions(ordsets:ordset(partition_id()),
2426                             bitmask(),
2427                             bitmask(),
2428                             partition_seqs(),
2429                             partition_versions()) ->
2430                                    {'ok', bitmask(), bitmask(),
2431                                     partition_seqs(), partition_versions()}.
2432set_passive_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2433    {ok, Abitmask, Pbitmask, Seqs, Versions};
2434
2435set_passive_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2436    PartMask = 1 bsl PartId,
2437    case PartMask band Abitmask of
2438    0 ->
2439        case PartMask band Pbitmask of
2440        PartMask ->
2441            set_passive_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2442        0 ->
2443            NewSeqs = lists:ukeymerge(1, [{PartId, 0}], Seqs),
2444            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2445            set_passive_partitions(
2446                Rest, Abitmask, Pbitmask bor PartMask, NewSeqs, NewVersions)
2447        end;
2448    PartMask ->
2449        set_passive_partitions(
2450            Rest, Abitmask bxor PartMask, Pbitmask bor PartMask, Seqs,
2451            Versions)
2452    end.
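% Illustrative example (bitmasks and lists are made up): moving partition 2
% from active to passive,
%   set_passive_partitions([2], 2#0100, 2#0000, Seqs, Versions)
% returns {ok, 2#0000, 2#0100, Seqs, Versions}; seqs and versions stay
% unchanged because the partition was already tracked.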
2453
2454
2455-spec set_active_partitions(ordsets:ordset(partition_id()),
2456                            bitmask(),
2457                            bitmask(),
2458                            partition_seqs(),
2459                            partition_versions()) ->
2460                                   {'ok', bitmask(), bitmask(),
2461                                    partition_seqs(), partition_versions()}.
2462set_active_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2463    {ok, Abitmask, Pbitmask, Seqs, Versions};
2464
2465set_active_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2466    PartMask = 1 bsl PartId,
2467    case PartMask band Pbitmask of
2468    0 ->
2469        case PartMask band Abitmask of
2470        PartMask ->
2471            set_active_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2472        0 ->
2473            NewSeqs = lists:ukeymerge(1, Seqs, [{PartId, 0}]),
2474            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2475            set_active_partitions(Rest, Abitmask bor PartMask, Pbitmask,
2476                NewSeqs, NewVersions)
2477        end;
2478    PartMask ->
2479        set_active_partitions(
2480            Rest, Abitmask bor PartMask, Pbitmask bxor PartMask, Seqs,
2481            Versions)
2482    end.
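% Illustrative example: activating partition 1 that was not tracked before,
%   set_active_partitions([1], 2#0000, 2#0000, [], [])
% returns {ok, 2#0010, 2#0000, [{1, 0}], [{1, [{0, 0}]}]}, i.e. the partition
% starts from sequence 0 with an initial partition version of {0, 0}.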
2483
2484
2485-spec set_cleanup_partitions(ordsets:ordset(partition_id()),
2486                             bitmask(),
2487                             bitmask(),
2488                             bitmask(),
2489                             partition_seqs(),
2490                             partition_versions()) ->
2491                                    {'ok', bitmask(), bitmask(), bitmask(),
2492                                     partition_seqs(), partition_versions()}.
2493set_cleanup_partitions([], Abitmask, Pbitmask, Cbitmask, Seqs, Versions) ->
2494    {ok, Abitmask, Pbitmask, Cbitmask, Seqs, Versions};
2495
2496set_cleanup_partitions([PartId | Rest], Abitmask, Pbitmask, Cbitmask, Seqs,
2497        Versions) ->
2498    PartMask = 1 bsl PartId,
2499    case PartMask band Cbitmask of
2500    PartMask ->
2501        set_cleanup_partitions(
2502            Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions);
2503    0 ->
2504        Seqs2 = lists:keydelete(PartId, 1, Seqs),
2505        Versions2 = lists:keydelete(PartId, 1, Versions),
2506        Cbitmask2 = Cbitmask bor PartMask,
2507        case PartMask band Abitmask of
2508        PartMask ->
2509            set_cleanup_partitions(
2510                Rest, Abitmask bxor PartMask, Pbitmask, Cbitmask2, Seqs2,
2511                Versions2);
2512        0 ->
2513            case (PartMask band Pbitmask) of
2514            PartMask ->
2515                set_cleanup_partitions(
2516                    Rest, Abitmask, Pbitmask bxor PartMask, Cbitmask2, Seqs2,
2517                    Versions2);
2518            0 ->
2519                set_cleanup_partitions(
                    Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions)
2521            end
2522        end
2523    end.
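% Illustrative example: marking the currently active partition 0 for cleanup,
%   set_cleanup_partitions([0], 2#0001, 2#0000, 2#0000, [{0, 42}], [{0, [{0, 42}]}])
% returns {ok, 2#0000, 2#0000, 2#0001, [], []}; the partition is removed from
% the seqs and versions and its bit moves to the cleanup bitmask.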
2524
2525
2526-spec update_header(#state{},
2527                    bitmask(),
2528                    bitmask(),
2529                    bitmask(),
2530                    partition_seqs(),
2531                    partition_seqs(),
2532                    ordsets:ordset(partition_id()),
2533                    ordsets:ordset(partition_id()),
2534                    #set_view_transition{} | 'nil',
2535                    partition_versions()) -> #state{}.
2536update_header(State, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs,
              NewUnindexableSeqs, NewReplicasOnTransfer, NewReplicaParts,
2538              NewPendingTrans, NewPartVersions) ->
2539    #state{
2540        group = #set_view_group{
2541            index_header =
2542                #set_view_index_header{
2543                    abitmask = Abitmask,
2544                    pbitmask = Pbitmask,
2545                    cbitmask = Cbitmask,
2546                    replicas_on_transfer = ReplicasOnTransfer,
2547                    unindexable_seqs = UnindexableSeqs,
2548                    pending_transition = PendingTrans,
2549                    partition_versions = PartVersions
2550                } = Header
2551        } = Group,
2552        replica_partitions = ReplicaParts
2553    } = State,
2554    NewGroup = Group#set_view_group{
2555        index_header = Header#set_view_index_header{
2556            abitmask = NewAbitmask,
2557            pbitmask = NewPbitmask,
2558            cbitmask = NewCbitmask,
2559            seqs = NewSeqs,
2560            unindexable_seqs = NewUnindexableSeqs,
            replicas_on_transfer = NewReplicasOnTransfer,
2562            pending_transition = NewPendingTrans,
2563            partition_versions = NewPartVersions
2564        }
2565    },
2566    NewGroup2 = remove_duplicate_partitions(NewGroup),
2567    #set_view_group{
2568        index_header = #set_view_index_header{
2569            partition_versions = NewPartVersions2
2570        }
2571    } = NewGroup2,
2572    NewState = State#state{
2573        group = NewGroup2,
2574        replica_partitions = NewReplicaParts
2575    },
2576    FsyncHeader = (NewCbitmask /= Cbitmask),
2577    {ok, HeaderPos} = commit_header(NewState#state.group, FsyncHeader),
2578    NewGroup3 = (NewState#state.group)#set_view_group{
2579        header_pos = HeaderPos
2580    },
2581    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, partition states updated~n"
2582              "active partitions before:    ~w~n"
2583              "active partitions after:     ~w~n"
2584              "passive partitions before:   ~w~n"
2585              "passive partitions after:    ~w~n"
2586              "cleanup partitions before:   ~w~n"
2587              "cleanup partitions after:    ~w~n"
2588              "unindexable partitions:      ~w~n"
2589              "replica partitions before:   ~w~n"
2590              "replica partitions after:    ~w~n"
2591              "replicas on transfer before: ~w~n"
2592              "replicas on transfer after:  ~w~n"
2593              "pending transition before:~n"
2594              "  active:      ~w~n"
2595              "  passive:     ~w~n"
2596              "  unindexable: ~w~n"
2597              "pending transition after:~n"
2598              "  active:      ~w~n"
2599              "  passive:     ~w~n"
2600              "  unindexable: ~w~n"
2601              "partition versions before:~n~w~n"
2602              "partition versions after:~n~w~n",
2603              [?set_name(State), ?type(State), ?category(State),
2604               ?group_id(State),
2605               couch_set_view_util:decode_bitmask(Abitmask),
2606               couch_set_view_util:decode_bitmask(NewAbitmask),
2607               couch_set_view_util:decode_bitmask(Pbitmask),
2608               couch_set_view_util:decode_bitmask(NewPbitmask),
2609               couch_set_view_util:decode_bitmask(Cbitmask),
2610               couch_set_view_util:decode_bitmask(NewCbitmask),
2611               UnindexableSeqs,
2612               ReplicaParts,
2613               NewReplicaParts,
2614               ReplicasOnTransfer,
               NewReplicasOnTransfer,
2616               ?pending_transition_active(PendingTrans),
2617               ?pending_transition_passive(PendingTrans),
2618               ?pending_transition_unindexable(PendingTrans),
2619               ?pending_transition_active(NewPendingTrans),
2620               ?pending_transition_passive(NewPendingTrans),
2621               ?pending_transition_unindexable(NewPendingTrans),
2622               PartVersions,
2623               NewPartVersions2]),
2624    NewState#state{group = NewGroup3}.
2625
2626
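% Spawn a cleanup process, unless one is already running, automatic cleanup
% is disabled, an updater or compactor is currently running, or there are no
% partitions marked for cleanup (cleanup bitmask is 0).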
2627-spec maybe_start_cleaner(#state{}) -> #state{}.
2628maybe_start_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
2629    State;
2630maybe_start_cleaner(#state{auto_cleanup = false} = State) ->
2631    State;
2632maybe_start_cleaner(#state{group = Group} = State) ->
2633    case is_pid(State#state.compactor_pid) orelse
2634        is_pid(State#state.updater_pid) orelse (?set_cbitmask(Group) == 0) of
2635    true ->
2636        State;
2637    false ->
2638        Cleaner = spawn_link(fun() -> exit(cleaner(State)) end),
2639        ?LOG_INFO("Started cleanup process ~p for"
2640                  " set view `~s`, ~s (~s) group `~s`",
2641                  [Cleaner, ?set_name(State), ?type(State), ?category(State),
2642                   ?group_id(State)]),
2643        State#state{cleaner_pid = Cleaner}
2644    end.
2645
2646
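% Synchronously stop the cleanup process, if one is running. The cleaner is
% asked to stop and its result is folded into the state; if it does not exit
% within 5 seconds it is shut down forcefully and the group file's EOF is
% refreshed.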
2647-spec stop_cleaner(#state{}) -> #state{}.
2648stop_cleaner(#state{cleaner_pid = nil} = State) ->
2649    State;
2650stop_cleaner(#state{cleaner_pid = Pid, group = Group} = State) when is_pid(Pid) ->
2651    MRef = erlang:monitor(process, Pid),
2652    Pid ! stop,
2653    unlink(Pid),
2654    ?LOG_INFO("Stopping cleanup process for set view `~s`, group `~s` (~s)",
2655              [?set_name(State), ?group_id(State), ?category(State)]),
2656    NewState = receive
2657    {'EXIT', Pid, Reason} ->
2658        after_cleaner_stopped(State, Reason);
2659    {'DOWN', MRef, process, Pid, Reason} ->
2660        receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
2661        after_cleaner_stopped(State, Reason)
2662    after 5000 ->
2663        couch_set_view_util:shutdown_cleaner(Group, Pid),
2664        ok = couch_file:refresh_eof(Group#set_view_group.fd),
2665        ?LOG_ERROR("Timeout stopping cleanup process ~p for"
2666                   " set view `~s`, ~s (~s) group `~s`",
2667                   [Pid, ?set_name(State), ?type(State), ?category(State),
2668                    ?group_id(State)]),
2669        State#state{cleaner_pid = nil}
2670    end,
2671    erlang:demonitor(MRef, [flush]),
2672    NewState.
2673
2674
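% Fold the cleaner's exit reason into the state. A {clean_group, ...} reason
% means the cleaner terminated normally: the group header is refreshed, the
% old group's sequence numbers are merged back in via update_clean_group_seqs/2
% and the cleanup stats are updated (a completed cleanup if the cleanup bitmask
% is now 0, otherwise a cleanup stop). Any other exit reason is logged as an
% error.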
2675after_cleaner_stopped(State, {clean_group, CleanGroup, Count, Time}) ->
2676    #state{group = OldGroup} = State,
2677    {ok, NewGroup0} = couch_set_view_util:refresh_viewgroup_header(CleanGroup),
2678    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
2679    ?LOG_INFO("Stopped cleanup process for"
2680              " set view `~s`, ~s (~s) group `~s`.~n"
2681              "Removed ~p values from the index in ~.3f seconds~n"
2682              "New set of partitions to cleanup: ~w~n"
2683              "Old set of partitions to cleanup: ~w~n",
2684              [?set_name(State), ?type(State), ?category(State),
2685               ?group_id(State), Count, Time,
2686               couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup)),
2687               couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup))]),
2688    case ?set_cbitmask(NewGroup) of
2689    0 ->
2690        inc_cleanups(State#state.group, Time, Count, false);
2691    _ ->
2692        ?inc_cleanup_stops(State#state.group)
2693    end,
2694    State#state{
2695        group = NewGroup,
2696        cleaner_pid = nil
2697    };
2698after_cleaner_stopped(#state{cleaner_pid = Pid, group = Group} = State, Reason) ->
2699    ok = couch_file:refresh_eof(Group#set_view_group.fd),
2700    ?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s (~s) group `~s`,"
2701               " died with reason: ~p",
2702               [Pid, ?set_name(State), ?type(State), ?category(State),
2703                ?group_id(State), Reason]),
2704    State#state{cleaner_pid = nil}.
2705
2706
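% Executed in the spawned cleanup process: purges index data belonging to
% partitions marked for cleanup and returns the resulting group, the number
% of purged values and the time taken in seconds. The caller wraps this
% result in exit/1, so it becomes the process exit reason.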
2707-spec cleaner(#state{}) -> {'clean_group', #set_view_group{}, non_neg_integer(), float()}.
2708cleaner(#state{group = Group}) ->
2709    StartTime = os:timestamp(),
2710    {ok, NewGroup, TotalPurgedCount} = couch_set_view_util:cleanup_group(Group),
2711    Duration = timer:now_diff(os:timestamp(), StartTime) / 1000000,
2712    {clean_group, NewGroup, TotalPurgedCount, Duration}.
2713
2714
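% Latest sequence numbers for the partitions the updater may index. When some
% partitions are marked unindexable, only the partitions present in the
% group's seqs plus the replicas on transfer are kept.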
2715-spec indexable_partition_seqs(#state{}) -> partition_seqs().
2716indexable_partition_seqs(State) ->
2717    Partitions = group_partitions(State#state.group),
2718    {ok, Seqs} = get_seqs(State, Partitions),
2719    indexable_partition_seqs(State, Seqs).
2720
2721-spec indexable_partition_seqs(#state{}, partition_seqs()) -> partition_seqs().
2722indexable_partition_seqs(#state{group = Group}, Seqs) ->
2723    case ?set_unindexable_seqs(Group) of
2724    [] ->
2725        Seqs;
2726    _ ->
2727        IndexSeqs = ?set_seqs(Group),
2728        CurPartitions = [P || {P, _} <- IndexSeqs],
2729        ReplicasOnTransfer = ?set_replicas_on_transfer(Group),
2730        Partitions = ordsets:union(CurPartitions, ReplicasOnTransfer),
        % Note: replicas on transfer are indexed even while unindexable, as
        % the purpose of the transfer is for them to become active and
        % indexable.
2733        couch_set_view_util:filter_seqs(Partitions, Seqs)
2734    end.
2735
2736
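% Latest sequence numbers for the partitions currently in the active bitmask.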
2737-spec active_partition_seqs(#state{}) -> partition_seqs().
2738active_partition_seqs(#state{group = Group} = State) ->
2739    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
2740    {ok, CurSeqs} = get_seqs(State, ActiveParts),
2741    CurSeqs.
2742
2743-spec active_partition_seqs(#state{}, partition_seqs()) -> partition_seqs().
2744active_partition_seqs(#state{group = Group}, Seqs) ->
2745    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
2746    couch_set_view_util:filter_seqs(ActiveParts, Seqs).
2747
2748
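% Start compaction: stop any running cleaner first, create the compaction
% target group (and file), and spawn a process running CompactFun against the
% current and target groups. The compactor pid, file and retry counter are
% tracked in the returned state.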
2749-spec start_compactor(#state{}, compact_fun()) -> #state{}.
2750start_compactor(State, CompactFun) ->
2751    #state{group = Group} = State2 = stop_cleaner(State),
2752    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compaction starting",
              [?set_name(State2), ?type(State2), ?category(State2),
2754               ?group_id(State2)]),
2755    #set_view_group{
2756        fd = CompactFd
2757    } = NewGroup = compact_group(State2),
2758    Owner = self(),
2759    TmpDir = updater_tmp_dir(State2),
2760    Pid = spawn_link(fun() ->
2761        CompactFun(Group,
2762                   NewGroup,
2763                   TmpDir,
2764                   State#state.updater_pid,
2765                   Owner)
2766    end),
2767    State2#state{
2768        compactor_pid = Pid,
2769        compactor_fun = CompactFun,
2770        compactor_file = CompactFd,
2771        compact_log_files = nil,
2772        compactor_retry_number = 0
2773    }.
2774
2775
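% Abort a running compaction: kill the compactor process and its file
% process, delete the compaction file from disk and account for the
% interruption in the group's utilization stats.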
2776-spec stop_compactor(#state{}) -> #state{}.
2777stop_compactor(#state{compactor_pid = nil} = State) ->
2778    State;
2779stop_compactor(#state{compactor_pid = Pid, compactor_file = CompactFd} = State) ->
2780    couch_util:shutdown_sync(Pid),
2781    couch_util:shutdown_sync(CompactFd),
2782    CompactFile = compact_file_name(State),
2783    ok = couch_file:delete(?root_dir(State), CompactFile),
2784    case ?set_cbitmask(State#state.group) of
2785    0 ->
2786        ok;
2787    _ ->
2788        ?inc_cleanup_stops(State#state.group)
2789    end,
2790    inc_util_stat(#util_stats.compactor_interruptions, 1),
2791    State#state{
2792        compactor_pid = nil,
2793        compactor_file = nil,
2794        compact_log_files =