1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_group).
16-behaviour(gen_server).
17
18%% API
19-export([start_link/1, request_group_info/1, get_data_size/1]).
20-export([open_set_group/3]).
21-export([request_group/2, release_group/1]).
22-export([is_view_defined/1, define_view/2]).
23-export([set_state/4]).
24-export([add_replica_partitions/2, remove_replica_partitions/2]).
25-export([mark_as_unindexable/2, mark_as_indexable/2]).
26-export([monitor_partition_update/4, demonitor_partition_update/2]).
27-export([reset_utilization_stats/1, get_utilization_stats/1]).
28-export([inc_access_stat/1, remove_duplicate_partitions/1]).
29
30%% gen_server callbacks
31-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
32         terminate/2, code_change/3]).
33
34-include("couch_db.hrl").
35-include_lib("couch_set_view/include/couch_set_view.hrl").
36
37-type init_args()   :: {string(), binary(), #set_view_group{}}.
38-type compact_fun() :: fun((#set_view_group{},
39                            #set_view_group{},
40                            string(),
41                            pid() | 'nil',
42                            pid()) -> no_return()).
43-type monitor_error() :: {'shutdown', any()} |
44                         'marked_for_cleanup' |
45                         {'updater_error', any()}.
46
47-define(DEFAULT_TIMEOUT, 3000).
48-define(GET_TIMEOUT(State), ((State)#state.timeout)).
49
50-define(root_dir(State), element(1, State#state.init_args)).
51-define(set_name(State), element(2, State#state.init_args)).
52-define(type(State), (element(3, State#state.init_args))#set_view_group.type).
53-define(group_sig(State), (element(3, State#state.init_args))#set_view_group.sig).
54-define(group_id(State), (State#state.group)#set_view_group.name).
55-define(dcp_pid(State), (State#state.group)#set_view_group.dcp_pid).
56-define(category(State), (State#state.group)#set_view_group.category).
57-define(is_defined(State),
58    (((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions > 0)).
59-define(replicas_on_transfer(State),
60        ((State#state.group)#set_view_group.index_header)#set_view_index_header.replicas_on_transfer).
61-define(have_pending_transition(State),
62        ((((State#state.group)#set_view_group.index_header)
63          #set_view_index_header.pending_transition) /= nil)).
64
65-define(MAX_HIST_SIZE, 10).
% Flow control buffer size: 20 MB (20971520 bytes)
-define(DCP_CONTROL_BUFFER_SIZE, "20971520").
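% The define above is only a fallback default; do_init/1 below reads the
% effective value from the config, e.g. (as done later in this module):
%
%   DcpBufferSize = list_to_integer(couch_config:get("dcp",
%       "flow_control_buffer_size", ?DCP_CONTROL_BUFFER_SIZE)),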
68
% Seqs cache TTL in microseconds (300000 us = 0.3 seconds)
-define(SEQS_CACHE_TTL, 300000).
71
72-record(util_stats, {
73    useful_indexing_time = 0.0  :: float(),
74    wasted_indexing_time = 0.0  :: float(),
75    updates = 0                 :: non_neg_integer(),
76    updater_interruptions = 0   :: non_neg_integer(),
77    compaction_time = 0.0       :: float(),
78    compactions = 0             :: non_neg_integer(),
79    compactor_interruptions = 0 :: non_neg_integer()
80}).
81
82-record(up_listener, {
83    pid,
84    monref,
85    partition,
86    seq
87}).
88
89-record(waiter, {
90    from,
91    debug = false :: boolean(),
92    % seqs for active partitions only
93    seqs = []     :: partition_seqs()
94}).
95
96-record(seqs_cache, {
    timestamp = {0, 0, 0}              :: erlang:timestamp(),
98    is_waiting = false                 :: boolean(),
99    seqs = []                          :: partition_seqs()
100}).
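% A minimal sketch of how a cache entry with ?SEQS_CACHE_TTL (microseconds)
% is meant to be consulted; the exact lookup in this module may differ, and
% `Cache' here is just an illustrative #seqs_cache{} value:
%
%   Age = timer:now_diff(os:timestamp(), Cache#seqs_cache.timestamp),
%   case (not Cache#seqs_cache.is_waiting) andalso (Age < ?SEQS_CACHE_TTL) of
%   true ->
%       Cache#seqs_cache.seqs;       % still fresh, reuse cached seqs
%   false ->
%       stale                        % caller should refresh via DCP
%   end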
101
102-record(state, {
103    init_args                          :: init_args(),
104    replica_group = nil                :: 'nil' | pid(),
105    group = #set_view_group{}          :: #set_view_group{},
106    updater_pid = nil                  :: 'nil' | pid(),
107    initial_build = false              :: boolean(),
108    updater_state = not_running        :: set_view_updater_state() | 'not_running' | 'starting',
109    compactor_pid = nil                :: 'nil' | pid(),
110    compactor_file = nil               :: 'nil' | pid(),
111    compactor_fun = nil                :: 'nil' | compact_fun(),
112    compactor_retry_number = 0         :: non_neg_integer(),
113    waiting_list = []                  :: [#waiter{}],
114    cleaner_pid = nil                  :: 'nil' | pid(),
115    shutdown = false                   :: boolean(),
116    shutdown_aliases                   :: [binary()],
117    auto_cleanup = true                :: boolean(),
118    auto_transfer_replicas = true      :: boolean(),
119    replica_partitions = []            :: ordsets:ordset(partition_id()),
120    pending_transition_waiters = []    :: [{From::{pid(), reference()}, #set_view_group_req{}}],
121    update_listeners = dict:new()      :: dict(),
122    compact_log_files = nil            :: 'nil' | {[[string()]], partition_seqs(), partition_versions()},
123    timeout = ?DEFAULT_TIMEOUT         :: non_neg_integer() | 'infinity'
124}).
125
126-define(inc_stat(Group, S),
127    ets:update_counter(
128        Group#set_view_group.stats_ets,
129        ?set_view_group_stats_key(Group),
130        {S, 1})).
131-define(inc_cleanup_stops(Group), ?inc_stat(Group, #set_view_group_stats.cleanup_stops)).
132-define(inc_updater_errors(Group), ?inc_stat(Group, #set_view_group_stats.update_errors)).
133-define(inc_accesses(Group), ?inc_stat(Group, #set_view_group_stats.accesses)).
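% Usage sketch: ?inc_accesses(Group) expands (via ?inc_stat/2) to a call like
% the following, bumping the `accesses' counter of this group's stats row:
%
%   ets:update_counter(Group#set_view_group.stats_ets,
%                      ?set_view_group_stats_key(Group),
%                      {#set_view_group_stats.accesses, 1})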
134
% Same as in couch_file. Headers are stored at file offsets that are
% multiples of this block size.
-define(SIZE_BLOCK, 4096).
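% Illustration only (hypothetical helper, the actual header scanning lives in
% couch_file): a candidate header offset must be block aligned.
%
%   is_header_candidate(Pos) when is_integer(Pos), Pos >= 0 ->
%       Pos rem ?SIZE_BLOCK =:= 0.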
138
139
140% api methods
141-spec request_group(pid(), #set_view_group_req{}) ->
142                   {'ok', #set_view_group{}} | {'error', term()}.
143request_group(Pid, Req) ->
144    #set_view_group_req{wanted_partitions = WantedPartitions} = Req,
145    Req2 = Req#set_view_group_req{
146        wanted_partitions = ordsets:from_list(WantedPartitions)
147    },
148    request_group(Pid, Req2, 1).
149
150-spec request_group(pid(), #set_view_group_req{}, non_neg_integer()) ->
151                   {'ok', #set_view_group{}} | {'error', term()}.
152request_group(Pid, Req, Retries) ->
153    case gen_server:call(Pid, Req, infinity) of
154    {ok, Group, ActiveReplicasBitmask} ->
155        #set_view_group{
156            ref_counter = RefCounter,
157            replica_pid = RepPid,
158            name = GroupName,
159            set_name = SetName,
160            category = Category
161        } = Group,
162        case request_replica_group(RepPid, ActiveReplicasBitmask, Req) of
163        {ok, RepGroup} ->
164            {ok, Group#set_view_group{replica_group = RepGroup}};
165        retry ->
166            couch_ref_counter:drop(RefCounter),
167            ?LOG_INFO("Retrying group `~s` (~s) request, stale=~s,"
168                      " set `~s`, retry attempt #~p",
169                      [GroupName, Category, Req#set_view_group_req.stale,
                       SetName, Retries]),
171            request_group(Pid, Req, Retries + 1)
172        end;
173    Error ->
174        Error
175    end.
176
177
178-spec request_replica_group(pid(), bitmask(), #set_view_group_req{}) ->
179                           {'ok', #set_view_group{} | 'nil'} | 'retry'.
180request_replica_group(_RepPid, 0, _Req) ->
181    {ok, nil};
182request_replica_group(RepPid, ActiveReplicasBitmask, Req) ->
183    {ok, RepGroup, 0} = gen_server:call(RepPid, Req, infinity),
184    case ?set_abitmask(RepGroup) =:= ActiveReplicasBitmask of
185    true ->
186        {ok, RepGroup};
187    false ->
188        couch_ref_counter:drop(RepGroup#set_view_group.ref_counter),
189        retry
190    end.
191
192
193-spec release_group(#set_view_group{}) -> ok.
194release_group(#set_view_group{ref_counter = RefCounter, replica_group = RepGroup}) ->
195    couch_ref_counter:drop(RefCounter),
196    case RepGroup of
197    #set_view_group{ref_counter = RepRefCounter} ->
198        couch_ref_counter:drop(RepRefCounter);
199    nil ->
200        ok
201    end.
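% Typical caller flow for the two functions above (a sketch; only request
% fields visible in this module are set, a real #set_view_group_req{} may
% carry more, and `GroupPid' is the pid of this gen_server):
%
%   Req = #set_view_group_req{stale = false, wanted_partitions = [0, 1, 2]},
%   case couch_set_view_group:request_group(GroupPid, Req) of
%   {ok, Group} ->
%       try
%           %% ... read from the group snapshot ...
%           ok
%       after
%           ok = couch_set_view_group:release_group(Group)
%       end;
%   {error, Reason} ->
%       {error, Reason}
%   end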
202
203
204-spec request_group_info(pid()) -> {'ok', [{term(), term()}]}.
205request_group_info(Pid) ->
206    case gen_server:call(Pid, request_group_info, infinity) of
207    {ok, GroupInfoList} ->
208        {ok, GroupInfoList};
209    Error ->
210        throw(Error)
211    end.
212
213
214-spec get_data_size(pid()) -> {'ok', [{term(), term()}]}.
215get_data_size(Pid) ->
216    case gen_server:call(Pid, get_data_size, infinity) of
217    {ok, _Info} = Ok ->
218        Ok;
219    Error ->
220        throw(Error)
221    end.
222
223
224-spec define_view(pid(), #set_view_params{}) -> 'ok' | {'error', term()}.
225define_view(Pid, Params) ->
226    #set_view_params{
227        max_partitions = NumPartitions,
228        active_partitions = ActivePartitionsList,
229        passive_partitions = PassivePartitionsList,
230        use_replica_index = UseReplicaIndex
231    } = Params,
232    ActiveList = lists:usort(ActivePartitionsList),
233    ActiveBitmask = couch_set_view_util:build_bitmask(ActiveList),
234    PassiveList = lists:usort(PassivePartitionsList),
235    PassiveBitmask = couch_set_view_util:build_bitmask(PassiveList),
236    case (ActiveBitmask band PassiveBitmask) /= 0 of
237    true ->
238        throw({bad_view_definition,
239            <<"Intersection between active and passive bitmasks">>});
240    false ->
241        ok
242    end,
243    gen_server:call(
244        Pid, {define_view, NumPartitions, ActiveList, ActiveBitmask,
245            PassiveList, PassiveBitmask, UseReplicaIndex}, infinity).
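% Usage sketch for define_view/2 (field names as destructured above; the
% partition numbers and counts are illustrative):
%
%   Params = #set_view_params{
%       max_partitions = 64,
%       active_partitions = [0, 1, 2, 3],
%       passive_partitions = [4, 5],
%       use_replica_index = true
%   },
%   ok = couch_set_view_group:define_view(GroupPid, Params)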
246
247
248-spec is_view_defined(pid()) -> boolean().
249is_view_defined(Pid) ->
250    gen_server:call(Pid, is_view_defined, infinity).
251
252
253-spec set_state(pid(),
254                ordsets:ordset(partition_id()),
255                ordsets:ordset(partition_id()),
256                ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
257set_state(_Pid, [], [], []) ->
258    ok;
259set_state(Pid, Active, Passive, Cleanup) ->
260    ordsets:is_set(Active) orelse throw({error, <<"Active list is not an ordset">>}),
261    ordsets:is_set(Passive) orelse throw({error, <<"Passive list is not an ordset">>}),
262    case ordsets:intersection(Active, Passive) of
263    [] ->
264        ordsets:is_set(Cleanup) orelse throw({error, <<"Cleanup list is not an ordset">>}),
265        case ordsets:intersection(Active, Cleanup) of
266        [] ->
267            case ordsets:intersection(Passive, Cleanup) of
268            [] ->
269                gen_server:call(
270                    Pid, {set_state, Active, Passive, Cleanup}, infinity);
271            _ ->
272                {error,
273                    <<"Intersection between passive and cleanup partition lists">>}
274            end;
275        _ ->
276            {error, <<"Intersection between active and cleanup partition lists">>}
277        end;
278    _ ->
279        {error, <<"Intersection between active and passive partition lists">>}
280    end.
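% The three lists must be pairwise disjoint ordsets; a sketch of a
% well-formed call:
%
%   Active  = ordsets:from_list([0, 1]),
%   Passive = ordsets:from_list([2]),
%   Cleanup = ordsets:from_list([3]),
%   ok = couch_set_view_group:set_state(GroupPid, Active, Passive, Cleanup)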
281
282
283-spec add_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
284add_replica_partitions(_Pid, []) ->
285    ok;
286add_replica_partitions(Pid, Partitions) ->
287    BitMask = couch_set_view_util:build_bitmask(Partitions),
288    gen_server:call(Pid, {add_replicas, BitMask}, infinity).
289
290
291-spec remove_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
292remove_replica_partitions(_Pid, []) ->
293    ok;
294remove_replica_partitions(Pid, Partitions) ->
295    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
296    gen_server:call(Pid, {remove_replicas, Partitions}, infinity).
297
298
299-spec mark_as_unindexable(pid(), ordsets:ordset(partition_id())) ->
300                                 'ok' | {'error', term()}.
301mark_as_unindexable(Pid, Partitions) ->
302    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
303    gen_server:call(Pid, {mark_as_unindexable, Partitions}, infinity).
304
305
306-spec mark_as_indexable(pid(), ordsets:ordset(partition_id())) ->
307                               'ok' | {'error', term()}.
308mark_as_indexable(Pid, Partitions) ->
309    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
310    gen_server:call(Pid, {mark_as_indexable, Partitions}, infinity).
311
312
313-spec monitor_partition_update(pid(), partition_id(), reference(), pid()) ->
314                               'ok' | {'error', term()}.
315monitor_partition_update(Pid, PartitionId, Ref, CallerPid) ->
316    gen_server:call(
317        Pid, {monitor_partition_update, PartitionId, Ref, CallerPid}, infinity).
318
319
320-spec demonitor_partition_update(pid(), reference()) -> 'ok'.
321demonitor_partition_update(Pid, Ref) ->
322    ok = gen_server:call(Pid, {demonitor_partition_update, Ref}, infinity).
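% Sketch of the monitor/demonitor flow above. The shape of the notification
% message delivered to the caller is an assumption here (it is produced by
% the update listener machinery elsewhere in this module), and `Timeout' is
% a placeholder:
%
%   Ref = make_ref(),
%   ok = couch_set_view_group:monitor_partition_update(GroupPid, PartId,
%                                                      Ref, self()),
%   receive
%   {Ref, Notification} ->
%       Notification
%   after Timeout ->
%       ok = couch_set_view_group:demonitor_partition_update(GroupPid, Ref)
%   end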
323
324
325-spec reset_utilization_stats(pid()) -> 'ok'.
326reset_utilization_stats(Pid) ->
327    ok = gen_server:call(Pid, reset_utilization_stats, infinity).
328
329
330-spec get_utilization_stats(pid()) -> {'ok', [{atom() | binary(), term()}]}.
331get_utilization_stats(Pid) ->
332    gen_server:call(Pid, get_utilization_stats, infinity).
333
334-spec inc_access_stat(pid()) -> 'ok'.
335inc_access_stat(Pid) ->
336    gen_server:call(Pid, increment_stat, infinity).
337
338start_link({RootDir, SetName, Group}) ->
339    Args = {RootDir, SetName, Group#set_view_group{type = main}},
340    proc_lib:start_link(?MODULE, init, [Args]).
341
342
343init({_, _, Group} = InitArgs) ->
344    process_flag(trap_exit, true),
345    {ok, State} = try
346        do_init(InitArgs)
347    catch
348    _:Error ->
        % An error might occur before the file handler is associated with the
        % group. Once the file handler is created, a {group_fd, Fd} message is
        % sent, so that we can close the file and keep the write guard happy.
        receive
        {group_fd, Fd} ->
            couch_file:close(Fd)
        after 0 ->
            ok
        end,
358        ?LOG_ERROR("~s error opening set view group `~s` (~s), signature `~s',"
359                   " from set `~s`: ~p",
360                   [?MODULE, Group#set_view_group.name,
361                    Group#set_view_group.category, hex_sig(Group),
362                    Group#set_view_group.set_name, Error]),
363        exit(Error)
364    end,
365    proc_lib:init_ack({ok, self()}),
366    gen_server:enter_loop(?MODULE, [], State, 1).
367
368
369do_init({_, SetName, _} = InitArgs) ->
370    case prepare_group(InitArgs, false) of
371    {ok, Group} ->
372        #set_view_group{
373            fd = Fd,
374            index_header = Header,
375            type = Type,
376            category = Category,
377            mod = Mod
378        } = Group,
        % Send the file handler to ourselves, so that in case of an error
        % we can clean up the write guard properly.
381        self() ! {group_fd, Fd},
382        RefCounter = new_fd_ref_counter(Fd),
383        case Header#set_view_index_header.has_replica of
384        false ->
385            ReplicaPid = nil,
386            ReplicaParts = [];
387        true ->
388            ReplicaPid = open_replica_group(InitArgs),
389            maybe_fix_replica_group(ReplicaPid, Group),
390            ReplicaParts = get_replica_partitions(ReplicaPid)
391        end,
392        ViewCount = length(Group#set_view_group.views),
393        case Header#set_view_index_header.num_partitions > 0 of
394        false ->
395            ?LOG_INFO("Started undefined ~s (~s) set view group `~s`,"
                      " group `~s`, signature `~s', view count: ~p",
397                      [Type, Category, SetName,
398                       Group#set_view_group.name, hex_sig(Group), ViewCount]);
399        true ->
400            ?LOG_INFO("Started ~s (~s) set view group `~s`, group `~s`,"
401                      " signature `~s', view count ~p~n"
402                      "active partitions:      ~w~n"
403                      "passive partitions:     ~w~n"
404                      "cleanup partitions:     ~w~n"
405                      "unindexable partitions: ~w~n"
406                      "~sreplica support~n" ++
407                      case Header#set_view_index_header.has_replica of
408                      true ->
409                          "replica partitions: ~w~n"
410                          "replica partitions on transfer: ~w~n";
411                      false ->
412                          ""
413                      end,
414                      [Type, Category, SetName, Group#set_view_group.name,
415                       hex_sig(Group), ViewCount,
416                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.abitmask),
417                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.pbitmask),
418                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.cbitmask),
419                       Header#set_view_index_header.unindexable_seqs,
420                       case Header#set_view_index_header.has_replica of
421                       true ->
422                           "";
423                       false ->
424                           "no "
425                       end] ++
426                       case Header#set_view_index_header.has_replica of
427                       true ->
428                           [ReplicaParts, ?set_replicas_on_transfer(Group)];
429                       false ->
430                           []
431                       end)
432        end,
433        DcpName = <<(atom_to_binary(Mod, latin1))/binary, ": ",
434            SetName/binary, " ", (Group#set_view_group.name)/binary,
435            " (", (atom_to_binary(Category, latin1))/binary, "/",
436            (atom_to_binary(Type, latin1))/binary, ")">>,
437        {User, Passwd} = get_auth(),
438        DcpBufferSize = list_to_integer(couch_config:get("dcp",
439            "flow_control_buffer_size", ?DCP_CONTROL_BUFFER_SIZE)),
440        ?LOG_INFO("Flow control buffer size is ~p bytes", [DcpBufferSize]),
441
442        case couch_dcp_client:start(DcpName, SetName, User, Passwd,
443            DcpBufferSize) of
444        {ok, DcpPid} ->
445            Group2 = maybe_upgrade_header(Group, DcpPid),
446            State = #state{
447                init_args = InitArgs,
448                replica_group = ReplicaPid,
449                replica_partitions = ReplicaParts,
450                group = Group2#set_view_group{
451                    ref_counter = RefCounter,
452                    replica_pid = ReplicaPid,
453                    dcp_pid = DcpPid
454                }
455            },
456            init_seqs_cache(),
457            true = ets:insert(
458                 Group#set_view_group.stats_ets,
459                 #set_view_group_stats{ets_key = ?set_view_group_stats_key(Group)}),
460            TmpDir = updater_tmp_dir(State),
461            ok = couch_set_view_util:delete_sort_files(TmpDir, all),
462            reset_util_stats(),
463            {ok, maybe_apply_pending_transition(State)};
464        Error ->
465            couch_file:close(Fd),
466            throw(Error)
467        end;
468    Error ->
469        throw(Error)
470    end.
471
472handle_call(get_sig, _From, #state{group = Group} = State) ->
473    {reply, {ok, Group#set_view_group.sig}, State, ?GET_TIMEOUT(State)};
474
475handle_call({set_auto_cleanup, Enabled}, _From, State) ->
476    % To be used only by unit tests.
477    {reply, ok, State#state{auto_cleanup = Enabled}, ?GET_TIMEOUT(State)};
478
479handle_call({set_timeout, T}, _From, State) ->
480    % To be used only by unit tests.
481    {reply, ok, State#state{timeout = T}, T};
482
483handle_call({set_auto_transfer_replicas, Enabled}, _From, State) ->
484    % To be used only by unit tests.
485    {reply, ok, State#state{auto_transfer_replicas = Enabled}, ?GET_TIMEOUT(State)};
486
487handle_call({define_view, NumPartitions, _, _, _, _, _}, _From, State)
488        when (not ?is_defined(State)), NumPartitions > ?MAX_NUM_PARTITIONS ->
489    {reply, {error, <<"Too high value for number of partitions">>}, State};
490
491handle_call({define_view, NumPartitions, ActiveList, ActiveBitmask,
492        PassiveList, PassiveBitmask, UseReplicaIndex}, _From, State) when not ?is_defined(State) ->
493    #state{init_args = InitArgs, group = Group} = State,
494    PartitionsList = lists:usort(ActiveList ++ PassiveList),
495    Seqs = lists:map(
496        fun(PartId) -> {PartId, 0} end, PartitionsList),
497    PartVersions = lists:map(
498        fun(PartId) -> {PartId, [{0, 0}]} end, PartitionsList),
499    #set_view_group{
500        name = DDocId,
501        index_header = Header
502    } = Group,
503    NewHeader = Header#set_view_index_header{
504        num_partitions = NumPartitions,
505        abitmask = ActiveBitmask,
506        pbitmask = PassiveBitmask,
507        seqs = Seqs,
508        has_replica = UseReplicaIndex,
509        partition_versions = PartVersions
510    },
511    case (?type(State) =:= main) andalso UseReplicaIndex of
512    false ->
513        ReplicaPid = nil;
514    true ->
515        ReplicaPid = open_replica_group(InitArgs),
516        ok = gen_server:call(ReplicaPid, {define_view, NumPartitions, [], 0, [], 0, false}, infinity)
517    end,
518    NewGroup = Group#set_view_group{
519        index_header = NewHeader,
520        replica_pid = ReplicaPid
521    },
522    State2 = State#state{
523        group = NewGroup,
524        replica_group = ReplicaPid
525    },
526    {ok, HeaderPos} = commit_header(NewGroup),
527    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
528              " configured with:~n"
529              "~p partitions~n"
530              "~sreplica support~n"
531              "initial active partitions ~w~n"
532              "initial passive partitions ~w",
533              [?set_name(State), ?type(State), ?category(State), DDocId,
534               hex_sig(Group), NumPartitions,
535               case UseReplicaIndex of
536               true ->  "";
537               false -> "no "
538               end,
539        ActiveList, PassiveList]),
540    NewGroup2 = (State2#state.group)#set_view_group{
541        header_pos = HeaderPos
542    },
543    State3 = State2#state{
544        group = NewGroup2
545    },
546    {reply, ok, State3, ?GET_TIMEOUT(State3)};
547
548handle_call({define_view, _, _, _, _, _, _}, _From, State) ->
549    {reply, view_already_defined, State, ?GET_TIMEOUT(State)};
550
551handle_call(is_view_defined, _From, State) ->
552    {reply, ?is_defined(State), State, ?GET_TIMEOUT(State)};
553
554handle_call(_Msg, _From, State) when not ?is_defined(State) ->
555    {reply, {error, view_undefined}, State};
556
557handle_call({set_state, ActiveList, PassiveList, CleanupList}, _From, State) ->
558    try
559        NewState = maybe_update_partition_states(
560            ActiveList, PassiveList, CleanupList, State),
561        {reply, ok, NewState, ?GET_TIMEOUT(NewState)}
562    catch
563    throw:Error ->
564        {reply, Error, State}
565    end;
566
567handle_call({add_replicas, BitMask}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
568    #state{
569        group = Group,
570        replica_partitions = ReplicaParts
571    } = State,
572    BitMask2 = case BitMask band ?set_abitmask(Group) of
573    0 ->
574        BitMask;
575    Common1 ->
576        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
577                  " set partitions ~w to replica state because they are"
578                  " currently marked as active",
579                  [?set_name(State), ?type(State), ?category(State),
580                   ?group_id(State),
581                   couch_set_view_util:decode_bitmask(Common1)]),
582        BitMask bxor Common1
583    end,
584    BitMask3 = case BitMask2 band ?set_pbitmask(Group) of
585    0 ->
586        BitMask2;
587    Common2 ->
588        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
                  " set partitions ~w to replica state because they are"
590                  " currently marked as passive",
591                  [?set_name(State), ?type(State), ?category(State),
592                   ?group_id(State),
593                   couch_set_view_util:decode_bitmask(Common2)]),
594        BitMask2 bxor Common2
595    end,
596    Parts = couch_set_view_util:decode_bitmask(BitMask3),
597    ok = set_state(ReplicaPid, [], Parts, []),
598    NewReplicaParts = ordsets:union(ReplicaParts, Parts),
599    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
600              " defined new replica partitions: ~w~n"
601              "New full set of replica partitions is: ~w~n",
602              [?set_name(State), ?type(State), ?category(State),
603               ?group_id(State), Parts, NewReplicaParts]),
604    State2 = State#state{
605        replica_partitions = NewReplicaParts
606    },
607    {reply, ok, State2, ?GET_TIMEOUT(State2)};
608
609handle_call({remove_replicas, Partitions}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
610    #state{
611        replica_partitions = ReplicaParts,
612        group = Group
613    } = State,
614    case ordsets:intersection(?set_replicas_on_transfer(Group), Partitions) of
615    [] ->
616        ok = set_state(ReplicaPid, [], [], Partitions),
617        NewState = State#state{
618            replica_partitions = ordsets:subtract(ReplicaParts, Partitions)
619        };
620    Common ->
621        UpdaterWasRunning = is_pid(State#state.updater_pid),
622        State2 = stop_cleaner(State),
623        #state{group = Group3} = State3 = stop_updater(State2),
624        {ok, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs, NewVersions} =
625            set_cleanup_partitions(
626                Common,
627                ?set_abitmask(Group3),
628                ?set_pbitmask(Group3),
629                ?set_cbitmask(Group3),
630                ?set_seqs(Group3),
631                ?set_partition_versions(Group)),
632        case NewCbitmask =/= ?set_cbitmask(Group3) of
633        true ->
634             State4 = stop_compactor(State3);
635        false ->
636             State4 = State3
637        end,
638        ReplicaPartitions2 = ordsets:subtract(ReplicaParts, Common),
639        ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group3), Common),
640        State5 = update_header(
641            State4,
642            NewAbitmask,
643            NewPbitmask,
644            NewCbitmask,
645            NewSeqs,
646            ?set_unindexable_seqs(State4#state.group),
647            ReplicasOnTransfer2,
648            ReplicaPartitions2,
649            ?set_pending_transition(State4#state.group),
650            NewVersions),
651        ok = set_state(ReplicaPid, [], [], Partitions),
652        case UpdaterWasRunning of
653        true ->
654            State6 = start_updater(State5);
655        false ->
656            State6 = State5
657        end,
658        NewState = maybe_start_cleaner(State6)
659    end,
660    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, marked the following"
661              " replica partitions for removal: ~w",
662              [?set_name(State), ?type(State), ?category(State),
663               ?group_id(State), Partitions]),
664    {reply, ok, NewState, ?GET_TIMEOUT(NewState)};
665
666handle_call({mark_as_unindexable, Partitions}, _From, State) ->
667    try
668        State2 = process_mark_as_unindexable(State, Partitions),
669        {reply, ok, State2, ?GET_TIMEOUT(State2)}
670    catch
671    throw:Error ->
672        {reply, Error, State, ?GET_TIMEOUT(State)}
673    end;
674
675handle_call({mark_as_indexable, Partitions}, _From, State) ->
676    try
677        State2 = process_mark_as_indexable(State, Partitions),
678        {reply, ok, State2, ?GET_TIMEOUT(State2)}
679    catch
680    throw:Error ->
681        {reply, Error, State, ?GET_TIMEOUT(State)}
682    end;
683
684handle_call(increment_stat, _From, State) ->
685    ?inc_accesses(State#state.group),
686    {reply, ok, State, ?GET_TIMEOUT(State)};
687
688handle_call(#set_view_group_req{} = Req, From, State) ->
689    #state{
690        group = Group,
691        pending_transition_waiters = Waiters
692    } = State,
693    State2 = case is_any_partition_pending(Req, Group) of
694    false ->
695        process_view_group_request(Req, From, State);
696    true ->
697        State#state{pending_transition_waiters = [{From, Req} | Waiters]}
698    end,
699    inc_view_group_access_stats(Req, State2#state.group),
700    {noreply, State2, ?GET_TIMEOUT(State2)};
701
702handle_call(request_group, _From, #state{group = Group} = State) ->
    % Meant to be called only by this module and the compactor module.
    % Callers aren't supposed to read from the group's fd, so we don't
    % increment the ref counter on behalf of the caller here.
706    {reply, {ok, Group}, State, ?GET_TIMEOUT(State)};
707
708handle_call(replica_pid, _From, #state{replica_group = Pid} = State) ->
709    % To be used only by unit tests.
710    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
711
712handle_call({start_updater, Options}, _From, State) ->
713    % To be used only by unit tests.
714    State2 = start_updater(State, Options),
715    {reply, {ok, State2#state.updater_pid}, State2, ?GET_TIMEOUT(State2)};
716
717handle_call(start_cleaner, _From, State) ->
718    % To be used only by unit tests.
719    State2 = maybe_start_cleaner(State#state{auto_cleanup = true}),
720    State3 = State2#state{auto_cleanup = State#state.auto_cleanup},
721    {reply, {ok, State2#state.cleaner_pid}, State3, ?GET_TIMEOUT(State3)};
722
723handle_call(updater_pid, _From, #state{updater_pid = Pid} = State) ->
724    % To be used only by unit tests.
725    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
726
727handle_call(cleaner_pid, _From, #state{cleaner_pid = Pid} = State) ->
728    % To be used only by unit tests.
729    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
730
731handle_call({test_rollback, RollbackSeqs}, _From, State0) ->
732    % To be used only by unit tests.
733    Response = case rollback(State0, RollbackSeqs) of
734    {ok, State} ->
735        ok;
736    {error, {cannot_rollback, State}} ->
737        cannot_rollback
738    end,
739    {reply, Response, State, ?GET_TIMEOUT(State)};
740
741handle_call(request_group_info, _From, State) ->
742    GroupInfo = get_group_info(State),
743    {reply, {ok, GroupInfo}, State, ?GET_TIMEOUT(State)};
744
745handle_call(get_data_size, _From, State) ->
746    DataSizeInfo = get_data_size_info(State),
747    {reply, {ok, DataSizeInfo}, State, ?GET_TIMEOUT(State)};
748
749handle_call({start_compact, _CompactFun}, _From,
750            #state{updater_pid = UpPid, initial_build = true} = State) when is_pid(UpPid) ->
751    {reply, {error, initial_build}, State, ?GET_TIMEOUT(State)};
752handle_call({start_compact, CompactFun}, _From, #state{compactor_pid = nil} = State) ->
753    #state{compactor_pid = Pid} = State2 = start_compactor(State, CompactFun),
754    {reply, {ok, Pid}, State2, ?GET_TIMEOUT(State2)};
755handle_call({start_compact, _}, _From, State) ->
756    %% compact already running, this is a no-op
757    {reply, {ok, State#state.compactor_pid}, State};
758
759handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = State) ->
760    #state{
761        update_listeners = Listeners,
762        group = Group,
763        updater_pid = UpdaterPid,
764        compactor_pid = CompactorPid
765    } = State,
766    #set_view_group{
767        fd = OldFd,
768        ref_counter = RefCounter,
769        filepath = OldFilepath
770    } = Group,
771    #set_view_compactor_result{
772        group = NewGroup0,
773        compact_time = Duration,
774        cleanup_kv_count = CleanupKVCount
775    } = Result,
776
777    MissingChangesCount = couch_set_view_util:missing_changes_count(
778        ?set_seqs(Group), ?set_seqs(NewGroup0)),
779    case MissingChangesCount == 0 of
780    true ->
781        % Compactor might have received a group snapshot from an updater.
782        NewGroup = fix_updater_group(NewGroup0, Group),
783        HeaderBin = couch_set_view_util:group_to_header_bin(NewGroup),
784        {ok, NewHeaderPos} = couch_file:write_header_bin(
785            NewGroup#set_view_group.fd, HeaderBin),
786        if is_pid(UpdaterPid) ->
787            ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compact group"
788                      " up to date - restarting updater",
789                      [?set_name(State), ?type(State), ?category(State),
790                       ?group_id(State)]),
            % The compactor has caught up, so we switch to the compacted file
            % and discard the running updater.
793            couch_set_view_util:shutdown_wait(UpdaterPid);
794        true ->
795            ok
796        end,
797        NewFilepath = increment_filepath(Group),
798        NewRefCounter = new_fd_ref_counter(NewGroup#set_view_group.fd),
799        case ?set_replicas_on_transfer(Group) /= ?set_replicas_on_transfer(NewGroup) of
800        true ->
801            % Set of replicas on transfer changed while compaction was running.
802            % Just write a new header with the new set of replicas on transfer and all the
803            % metadata that is updated when that set changes (active and passive bitmasks).
804            % This happens only during (or after, for a short period) a cluster rebalance or
805            % failover. This shouldn't take too long, as we are writing and fsync'ing only
806            % one header, all data was already fsync'ed by the compactor process.
807            NewGroup2 = NewGroup#set_view_group{
808                ref_counter = NewRefCounter,
809                filepath = NewFilepath,
810                index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
811                    replicas_on_transfer = ?set_replicas_on_transfer(Group),
812                    abitmask = ?set_abitmask(Group),
813                    pbitmask = ?set_pbitmask(Group)
814                }
815            },
816            {ok, NewHeaderPos2} = commit_header(NewGroup2);
817        false ->
            % The compactor process committed a header with up-to-date state
            % information and did an fsync before calling us. No need to
            % commit a new header here (and fsync).
            NewGroup2 = NewGroup#set_view_group{
                ref_counter = NewRefCounter,
                filepath = NewFilepath
            },
            NewHeaderPos2 = NewHeaderPos
825        end,
826        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compaction complete"
827                  " in ~.3f seconds, filtered ~p key-value pairs",
828                  [?set_name(State), ?type(State), ?category(State),
829                   ?group_id(State), Duration, CleanupKVCount]),
830        inc_util_stat(#util_stats.compaction_time, Duration),
831        ok = couch_file:only_snapshot_reads(OldFd),
        %% After the rename call we're sure the header was written to the file
        %% (no need for a couch_file:flush/1 call).
834        ok = couch_file:rename(NewGroup#set_view_group.fd, NewFilepath),
835        ok = couch_file:delete(?root_dir(State), OldFilepath),
836
837        %% cleanup old group
838        unlink(CompactorPid),
839        couch_ref_counter:drop(RefCounter),
840
841        NewUpdaterPid =
842        if is_pid(UpdaterPid) ->
843            CurSeqs = indexable_partition_seqs(State),
844            spawn_link(couch_set_view_updater,
845                       update,
846                       [self(), NewGroup2, CurSeqs, false, updater_tmp_dir(State), []]);
847        true ->
848            nil
849        end,
850
851        Listeners2 = notify_update_listeners(State, Listeners, NewGroup2),
852        State2 = State#state{
853            update_listeners = Listeners2,
854            compactor_pid = nil,
855            compactor_file = nil,
856            compactor_fun = nil,
857            compact_log_files = nil,
858            compactor_retry_number = 0,
859            updater_pid = NewUpdaterPid,
860            initial_build = is_pid(NewUpdaterPid) andalso
861                    couch_set_view_util:is_initial_build(NewGroup2),
862            updater_state = case is_pid(NewUpdaterPid) of
863                true -> starting;
864                false -> not_running
865            end,
866            group = NewGroup2#set_view_group{
867                header_pos = NewHeaderPos2
868            }
869        },
870        inc_compactions(Result),
871        {reply, ok, maybe_apply_pending_transition(State2), ?GET_TIMEOUT(State2)};
872    false ->
873        State2 = State#state{
874            compactor_retry_number = State#state.compactor_retry_number + 1
875        },
876        {reply, {update, MissingChangesCount}, State2, ?GET_TIMEOUT(State2)}
877    end;
878handle_call({compact_done, _Result}, _From, State) ->
879    % From a previous compactor that was killed/stopped, ignore.
880    {noreply, State, ?GET_TIMEOUT(State)};
881
882handle_call(cancel_compact, _From, #state{compactor_pid = nil} = State) ->
883    {reply, ok, State, ?GET_TIMEOUT(State)};
884handle_call(cancel_compact, _From, #state{compactor_pid = Pid} = State) ->
885    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
886              " canceling compaction (pid ~p)",
887              [?set_name(State), ?type(State), ?category(State),
888               ?group_id(State), Pid]),
889    State2 = stop_compactor(State),
890    State3 = maybe_start_cleaner(State2),
891    {reply, ok, State3, ?GET_TIMEOUT(State3)};
892
893handle_call({monitor_partition_update, PartId, _Ref, _Pid}, _From, State)
894        when PartId >= ?set_num_partitions(State#state.group) ->
895    Msg = io_lib:format("Invalid partition: ~p", [PartId]),
896    {reply, {error, iolist_to_binary(Msg)}, State, ?GET_TIMEOUT(State)};
897
898handle_call({monitor_partition_update, PartId, Ref, Pid}, _From, State) ->
899    try
900        State2 = process_monitor_partition_update(State, PartId, Ref, Pid),
901        {reply, ok, State2, ?GET_TIMEOUT(State2)}
902    catch
903    throw:Error ->
904        {reply, Error, State, ?GET_TIMEOUT(State)}
905    end;
906
907handle_call({demonitor_partition_update, Ref}, _From, State) ->
908    #state{update_listeners = Listeners} = State,
909    case dict:find(Ref, Listeners) of
910    error ->
911        {reply, ok, State, ?GET_TIMEOUT(State)};
912    {ok, #up_listener{monref = MonRef, partition = PartId}} ->
        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, removing partition ~p"
                   " update monitor, reference ~p",
915                   [?set_name(State), ?type(State), ?category(State),
916                    ?group_id(State), PartId, Ref]),
917        erlang:demonitor(MonRef, [flush]),
918        State2 = State#state{update_listeners = dict:erase(Ref, Listeners)},
919        {reply, ok, State2, ?GET_TIMEOUT(State2)}
920    end;
921
922handle_call(compact_log_files, _From, State) ->
923    {Files0, Seqs, PartVersions} = State#state.compact_log_files,
924    Files = lists:map(fun lists:reverse/1, Files0),
925    NewState = State#state{compact_log_files = nil},
926    {reply, {ok, {Files, Seqs, PartVersions}}, NewState, ?GET_TIMEOUT(NewState)};
927
928handle_call(reset_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
929    reset_util_stats(),
930    case is_pid(RepPid) of
931    true ->
932        ok = gen_server:call(RepPid, reset_utilization_stats, infinity);
933    false ->
934        ok
935    end,
936    {reply, ok, State, ?GET_TIMEOUT(State)};
937
938handle_call(get_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
939    Stats = erlang:get(util_stats),
940    UsefulIndexing = Stats#util_stats.useful_indexing_time,
941    WastedIndexing = Stats#util_stats.wasted_indexing_time,
942    StatNames = record_info(fields, util_stats),
943    StatPoses = lists:seq(2, record_info(size, util_stats)),
944    StatsList0 = lists:foldr(
945        fun({StatName, StatPos}, Acc) ->
946            Val = element(StatPos, Stats),
947            [{StatName, Val} | Acc]
948        end,
949        [], lists:zip(StatNames, StatPoses)),
950    StatsList1 = [{total_indexing_time, UsefulIndexing + WastedIndexing} | StatsList0],
951    case is_pid(RepPid) of
952    true ->
953        {ok, RepStats} = gen_server:call(RepPid, get_utilization_stats, infinity),
954        StatsList = StatsList1 ++ [{replica_utilization_stats, {RepStats}}];
955    false ->
956        StatsList = StatsList1
957    end,
958    {reply, {ok, StatsList}, State, ?GET_TIMEOUT(State)};
959
960handle_call(before_master_delete, _From, State) ->
961    Error = {error, {db_deleted, ?master_dbname((?set_name(State)))}},
962    State2 = reply_all(State, Error),
963    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, going to shutdown because "
964              "master database is being deleted",
965              [?set_name(State), ?type(State), ?category(State),
966               ?group_id(State)]),
967    {stop, shutdown, ok, State2}.
968
969
970handle_cast(_Msg, State) when not ?is_defined(State) ->
971    {noreply, State};
972
973handle_cast({compact_log_files, Files, Seqs, PartVersions, _Init},
974                                #state{compact_log_files = nil} = State) ->
975    LogList = lists:map(fun(F) -> [F] end, Files),
976    {noreply, State#state{compact_log_files = {LogList, Seqs, PartVersions}},
977        ?GET_TIMEOUT(State)};
978
979handle_cast({compact_log_files, Files, NewSeqs, NewPartVersions, Init}, State) ->
980    LogList = case Init of
981    true ->
982        lists:map(fun(F) -> [F] end, Files);
983    false ->
984        {OldL, _OldSeqs, _OldPartVersions} = State#state.compact_log_files,
985        lists:zipwith(
986            fun(F, Current) -> [F | Current] end,
987            Files, OldL)
988    end,
989    {noreply, State#state{compact_log_files = {LogList, NewSeqs, NewPartVersions}},
990        ?GET_TIMEOUT(State)};
991
992handle_cast({ddoc_updated, NewSig, Aliases}, State) ->
993    #state{
994        waiting_list = Waiters,
995        group = #set_view_group{sig = CurSig}
996    } = State,
997    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
998              " design document was updated~n"
999              "  new signature:   ~s~n"
1000              "  current aliases: ~p~n"
1001              "  shutdown flag:   ~s~n"
1002              "  waiting clients: ~p~n",
1003              [?set_name(State), ?type(State), ?category(State),
1004               ?group_id(State), hex_sig(CurSig), hex_sig(NewSig), Aliases,
1005               State#state.shutdown, length(Waiters)]),
1006    case NewSig of
1007    CurSig ->
1008        State2 = State#state{shutdown = false, shutdown_aliases = undefined},
1009        {noreply, State2, ?GET_TIMEOUT(State2)};
1010    <<>> ->
1011        remove_mapreduce_context_store(State#state.group),
1012        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
1013        Error = {error, <<"Design document was deleted">>},
1014        {stop, normal, reply_all(State2, Error)};
1015    _ ->
1016        remove_mapreduce_context_store(State#state.group),
1017        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
1018        case Waiters of
1019        [] ->
1020            {stop, normal, State2};
1021        _ ->
1022            {noreply, State2}
1023        end
1024    end;
1025
1026handle_cast({update, MinNumChanges}, #state{group = Group} = State) ->
1027    case is_pid(State#state.updater_pid) of
1028    true ->
1029        {noreply, State};
1030    false ->
1031        CurSeqs = indexable_partition_seqs(State),
1032        MissingCount = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
1033        case (MissingCount >= MinNumChanges) andalso (MissingCount > 0) of
1034        true ->
1035            {noreply, do_start_updater(State, CurSeqs, [])};
1036        false ->
1037            {noreply, State}
1038        end
1039    end;
1040
1041handle_cast({update_replica, _MinNumChanges}, #state{replica_group = nil} = State) ->
1042    {noreply, State};
1043
1044handle_cast({update_replica, MinNumChanges}, #state{replica_group = Pid} = State) ->
1045    ok = gen_server:cast(Pid, {update, MinNumChanges}),
1046    {noreply, State}.
1047
1048
1049handle_info(timeout, State) when not ?is_defined(State) ->
1050    {noreply, State};
1051
1052handle_info(timeout, #state{group = Group} = State) ->
1053    TransferReplicas = (?set_replicas_on_transfer(Group) /= []) andalso
1054        State#state.auto_transfer_replicas,
1055    case TransferReplicas orelse (dict:size(State#state.update_listeners) > 0) of
1056    true ->
1057        {noreply, start_updater(State)};
1058    false ->
1059        {noreply, maybe_start_cleaner(State)}
1060    end;
1061
1062handle_info({partial_update, Pid, AckTo, NewGroup}, #state{updater_pid = Pid} = State) ->
1063    case ?have_pending_transition(State) andalso
1064        (?set_cbitmask(NewGroup) =:= 0) andalso
1065        (?set_cbitmask(State#state.group) =/= 0) andalso
1066        (State#state.waiting_list =:= []) of
1067    true ->
1068        State2 = process_partial_update(State, NewGroup),
1069        AckTo ! update_processed,
1070        State3 = stop_updater(State2),
1071        NewState = maybe_apply_pending_transition(State3);
1072    false ->
1073        NewState = process_partial_update(State, NewGroup),
1074        AckTo ! update_processed
1075    end,
1076    {noreply, NewState};
1077handle_info({partial_update, _, AckTo, _}, State) ->
1078    %% message from an old (probably pre-compaction) updater; ignore
1079    AckTo ! update_processed,
1080    {noreply, State, ?GET_TIMEOUT(State)};
1081
1082handle_info({updater_info, Pid, {state, UpdaterState}}, #state{updater_pid = Pid} = State) ->
1083    #state{
1084        group = Group,
1085        waiting_list = WaitList,
1086        replica_partitions = RepParts
1087    } = State,
1088    State2 = State#state{updater_state = UpdaterState},
1089    case UpdaterState of
1090    updating_passive ->
1091        WaitList2 = reply_with_group(Group, RepParts, WaitList),
1092        State3 = State2#state{waiting_list = WaitList2},
1093        case State#state.shutdown of
1094        true ->
1095            State4 = stop_updater(State3),
1096            {stop, normal, State4};
1097        false ->
1098            State4 = maybe_apply_pending_transition(State3),
1099            {noreply, State4}
1100        end;
1101    _ ->
1102        {noreply, State2}
1103    end;
1104
1105handle_info({updater_info, _Pid, {state, _UpdaterState}}, State) ->
1106    % Message from an old updater, ignore.
1107    {noreply, State, ?GET_TIMEOUT(State)};
1108
1109handle_info({'EXIT', Pid, {clean_group, CleanGroup, Count, Time}}, #state{cleaner_pid = Pid} = State) ->
1110    #state{group = OldGroup} = State,
1111    case CleanGroup#set_view_group.mod of
    % The mapreduce view cleaner is a native C-based one that writes the
    % information to disk.
1114    mapreduce_view ->
1115        {ok, NewGroup0} =
1116             couch_set_view_util:refresh_viewgroup_header(CleanGroup);
1117    spatial_view ->
1118        NewGroup0 = CleanGroup
1119    end,
1120    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
1121    ?LOG_INFO("Cleanup finished for set view `~s`, ~s (~s) group `~s`~n"
1122              "Removed ~p values from the index in ~.3f seconds~n"
1123              "active partitions before:  ~w~n"
1124              "active partitions after:   ~w~n"
1125              "passive partitions before: ~w~n"
1126              "passive partitions after:  ~w~n"
1127              "cleanup partitions before: ~w~n"
1128              "cleanup partitions after:  ~w~n" ++
1129          case is_pid(State#state.replica_group) of
1130          true ->
1131              "Current set of replica partitions: ~w~n"
1132              "Current set of replicas on transfer: ~w~n";
1133          false ->
1134              []
1135          end,
1136          [?set_name(State), ?type(State), ?category(State), ?group_id(State),
1137           Count, Time,
1138           couch_set_view_util:decode_bitmask(?set_abitmask(OldGroup)),
1139           couch_set_view_util:decode_bitmask(?set_abitmask(NewGroup)),
1140           couch_set_view_util:decode_bitmask(?set_pbitmask(OldGroup)),
1141           couch_set_view_util:decode_bitmask(?set_pbitmask(NewGroup)),
1142           couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup)),
1143           couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup))] ++
1144              case is_pid(State#state.replica_group) of
1145              true ->
1146                  [State#state.replica_partitions, ?set_replicas_on_transfer(NewGroup)];
1147              false ->
1148                  []
1149              end),
1150    State2 = State#state{
1151        cleaner_pid = nil,
1152        group = NewGroup
1153    },
1154    inc_cleanups(State2#state.group, Time, Count, false),
1155    {noreply, maybe_apply_pending_transition(State2)};
1156
1157handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid, group = Group} = State) ->
1158    ok = couch_file:refresh_eof(Group#set_view_group.fd),
1159    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`, cleanup process ~p"
1160               " died with unexpected reason: ~p",
1161               [?set_name(State), ?type(State), ?category(State),
1162                ?group_id(State), Pid, Reason]),
1163    {noreply, State#state{cleaner_pid = nil}, ?GET_TIMEOUT(State)};
1164
1165handle_info({'EXIT', Pid, {updater_finished, Result}}, #state{updater_pid = Pid} = State) ->
1166    #set_view_updater_result{
1167        stats = Stats,
1168        group = #set_view_group{filepath = Path} = NewGroup0,
1169        tmp_file = BuildFile
1170    } = Result,
1171    #set_view_updater_stats{
1172        indexing_time = IndexingTime,
1173        blocked_time = BlockedTime,
1174        inserted_ids = InsertedIds,
1175        deleted_ids = DeletedIds,
1176        inserted_kvs = InsertedKVs,
1177        deleted_kvs = DeletedKVs,
1178        cleanup_kv_count = CleanupKVCount,
1179        seqs = SeqsDone
1180    } = Stats,
1181    case State#state.initial_build of
1182    true ->
1183        NewRefCounter = new_fd_ref_counter(BuildFile),
1184        ok = couch_file:only_snapshot_reads(NewGroup0#set_view_group.fd),
1185        ok = couch_file:delete(?root_dir(State), Path),
1186        ok = couch_file:rename(BuildFile, Path),
1187        couch_ref_counter:drop(NewGroup0#set_view_group.ref_counter),
1188        NewGroup = NewGroup0#set_view_group{
1189            ref_counter = NewRefCounter,
1190            fd = BuildFile
1191        };
1192    false ->
1193        ok = couch_file:refresh_eof(NewGroup0#set_view_group.fd),
1194        NewGroup = NewGroup0
1195    end,
1196    State2 = process_partial_update(State, NewGroup),
1197    #state{
1198        waiting_list = WaitList,
1199        replica_partitions = ReplicaParts,
1200        shutdown = Shutdown,
1201        group = NewGroup2,
1202        update_listeners = UpdateListeners2,
1203        initial_build = InitialBuild
1204    } = State2,
1205    inc_updates(NewGroup2, Result, false, false),
1206    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, updater finished~n"
1207              "Indexing time: ~.3f seconds~n"
1208              "Blocked time:  ~.3f seconds~n"
1209              "Inserted IDs:  ~p~n"
1210              "Deleted IDs:   ~p~n"
1211              "Inserted KVs:  ~p~n"
1212              "Deleted KVs:   ~p~n"
1213              "Cleaned KVs:   ~p~n"
1214              "# seqs done:   ~p~n",
1215              [?set_name(State), ?type(State), ?category(State),
1216               ?group_id(State), IndexingTime, BlockedTime, InsertedIds,
1217               DeletedIds, InsertedKVs, DeletedKVs, CleanupKVCount, SeqsDone]),
1218    UpdaterRestarted = case InitialBuild of
1219    true ->
1220        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
1221                  " initial index build of on-disk items is done,"
1222                  " restart updater to index in-memory items in case"
1223                  " there are any",
1224                  [?set_name(State2), ?type(State2), ?category(State2),
1225                   ?group_id(State2)]),
1226        StoppedUpdaterState = State2#state{
1227            updater_pid = nil,
1228            initial_build = false,
1229            updater_state = not_running
1230        },
1231        State3 = start_updater(StoppedUpdaterState),
1232        WaitList2 = State3#state.waiting_list,
1233        is_pid(State3#state.updater_pid);
1234    false ->
1235        WaitList2 = reply_with_group(NewGroup2, ReplicaParts, WaitList),
1236        State3 = State2,
1237        false
1238    end,
1239    case UpdaterRestarted of
1240    true ->
1241        {noreply, State3, ?GET_TIMEOUT(State3)};
1242    false ->
1243        case Shutdown andalso (WaitList2 == []) of
1244        true ->
1245            {stop, normal, State3#state{waiting_list = []}};
1246        false ->
1247            State4 = State3#state{
1248                updater_pid = nil,
1249                initial_build = false,
1250                updater_state = not_running,
1251                waiting_list = WaitList2
1252            },
1253            State5 = maybe_apply_pending_transition(State4),
1254            State6 = case (WaitList2 /= []) orelse (dict:size(UpdateListeners2) > 0) of
1255            true ->
1256                start_updater(State5);
1257            false ->
1258                State5
1259            end,
1260            State7 = maybe_start_cleaner(State6),
1261            {noreply, State7, ?GET_TIMEOUT(State7)}
1262        end
1263    end;
1264
1265handle_info({'EXIT', Pid, {updater_error, purge}}, #state{updater_pid = Pid} = State) ->
1266    State2 = reset_group_from_state(State),
1267    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because updater"
1268              " detected missed document deletes (purge)",
1269              [?set_name(State), ?type(State),
1270               ?category(State), ?group_id(State)]),
1271    State3 = start_updater(State2),
1272    {noreply, State3, ?GET_TIMEOUT(State3)};
1273
1274handle_info({'EXIT', Pid, {updater_error, {rollback, RollbackSeqs}}},
1275        #state{updater_pid = Pid} = State0) ->
1276    Rollback = rollback(State0, RollbackSeqs),
1277    State2 = case Rollback of
1278    {ok, State} ->
1279        ?LOG_INFO(
1280            "Set view `~s`, ~s (~s) group `~s`, group update because group"
1281            " needed to be rolled back",
1282            [?set_name(State), ?type(State),
1283             ?category(State), ?group_id(State)]),
1284        State#state{
1285            updater_pid = nil,
1286            initial_build = false,
1287            updater_state = not_running
1288        };
1289    {error, {cannot_rollback, State}} ->
1290        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because "
1291            "a rollback wasn't possible",
1292            [?set_name(State), ?type(State),
1293             ?category(State), ?group_id(State)]),
1294        reset_group_from_state(State)
1295    end,
1296    State3 = start_updater(State2),
1297    {noreply, State3, ?GET_TIMEOUT(State3)};
1298
1299handle_info({'EXIT', Pid, {updater_error, Error}}, #state{updater_pid = Pid, group = Group} = State) ->
1300    ok = couch_file:refresh_eof(Group#set_view_group.fd),
1301    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1302               " received error from updater: ~p",
1303               [?set_name(State), ?type(State), ?category(State),
1304                ?group_id(State), Error]),
1305    Listeners2 = error_notify_update_listeners(
1306        State, State#state.update_listeners, {updater_error, Error}),
1307    State2 = State#state{
1308        updater_pid = nil,
1309        initial_build = false,
1310        updater_state = not_running,
1311        update_listeners = Listeners2
1312    },
1313    ?inc_updater_errors(State2#state.group),
1314    case State#state.shutdown of
1315    true ->
1316        {stop, normal, reply_all(State2, {error, Error})};
1317    false ->
1318        Error2 = case Error of
1319        {_, 86, Msg} ->
1320            {error, <<"Reducer: ", Msg/binary>>};
1321        {_, 87, _} ->
1322            {error, <<"reduction too large">>};
1323        {_, _Reason} ->
1324            Error;
1325        _ ->
1326            {error, Error}
1327        end,
1328        State3 = reply_all(State2, Error2),
1329        {noreply, maybe_start_cleaner(State3), ?GET_TIMEOUT(State3)}
1330    end;
1331
1332handle_info({'EXIT', _Pid, {updater_error, _Error}}, State) ->
1333    % from old, shutdown updater, ignore
1334    {noreply, State, ?GET_TIMEOUT(State)};
1335
1336handle_info({'EXIT', UpPid, reset}, #state{updater_pid = UpPid} = State) ->
1337    % TODO: once purge support is properly added, this needs to take into
1338    % account the replica index.
1339    State2 = stop_cleaner(State),
1340    case prepare_group(State#state.init_args, true) of
1341    {ok, ResetGroup} ->
1342        {ok, start_updater(State2#state{group = ResetGroup})};
1343    Error ->
1344        {stop, normal, reply_all(State2, Error)}
1345    end;
1346
1347handle_info({'EXIT', Pid, normal}, State) ->
1348    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
1349              " linked PID ~p stopped normally",
1350              [?set_name(State), ?type(State), ?category(State),
1351               ?group_id(State), Pid]),
1352    {noreply, State, ?GET_TIMEOUT(State)};
1353
1354handle_info({'EXIT', Pid, Reason}, #state{compactor_pid = Pid} = State) ->
1355    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1356               " compactor process ~p died with unexpected reason: ~p",
1357               [?set_name(State), ?type(State), ?category(State),
1358                ?group_id(State), Pid, Reason]),
1359    couch_util:shutdown_sync(State#state.compactor_file),
1360    _ = couch_file:delete(?root_dir(State), compact_file_name(State)),
1361    State2 = State#state{
1362        compactor_pid = nil,
1363        compactor_file = nil,
1364        compact_log_files = nil
1365    },
1366    {noreply, State2, ?GET_TIMEOUT(State2)};
1367
1368handle_info({'EXIT', Pid, Reason},
1369        #state{group = #set_view_group{dcp_pid = Pid}} = State) ->
1370    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1371               " DCP process ~p died with unexpected reason: ~p",
1372               [?set_name(State), ?type(State), ?category(State),
1373                ?group_id(State), Pid, Reason]),
1374    {stop, {dcp_died, Reason}, State};
1375
1376handle_info({'EXIT', Pid, Reason}, State) ->
1377    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1378               " terminating because linked PID ~p died with reason: ~p",
1379               [?set_name(State), ?type(State), ?category(State),
1380                ?group_id(State), Pid, Reason]),
1381    {stop, Reason, State};
1382
1383handle_info({all_seqs, nil, StatsResponse}, State) ->
1384    #state{
1385       group = Group
1386    } = State,
1387    SeqsCache = erlang:get(seqs_cache),
1388    NewState = case StatsResponse of
1389    {ok, Seqs} ->
1390        Partitions = group_partitions(Group),
1391        Seqs2 = couch_set_view_util:filter_seqs(Partitions, Seqs),
1392        NewCacheVal = #seqs_cache{
1393            timestamp = os:timestamp(),
1394            is_waiting = false,
1395            seqs = Seqs2
1396        },
1397        State2 = case is_pid(State#state.updater_pid) of
1398        true ->
1399            State;
1400        false ->
1401            CurSeqs = indexable_partition_seqs(State, Seqs2),
1402            case CurSeqs > ?set_seqs(State#state.group) of
1403            true ->
1404                do_start_updater(State, CurSeqs, []);
1405            false ->
1406                State
1407            end
1408        end,
        % If a rollback happened between the async seqs update request and
        % its response, the is_waiting flag will have been reset. In that
        % case we should just discard the update seqs.
1412        case SeqsCache#seqs_cache.is_waiting of
1413        true ->
1414            erlang:put(seqs_cache, NewCacheVal);
1415        false ->
1416            ok
1417        end,
1418        State2;
1419    _ ->
1420        ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1421                              " received bad response for update seqs"
1422                              " request ~p",
1423                              [?set_name(State), ?type(State),
1424                               ?category(State), ?group_id(State),
1425                               StatsResponse]),
1426        SeqsCache2 = SeqsCache#seqs_cache{is_waiting = false},
1427        erlang:put(seqs_cache, SeqsCache2),
1428        State
1429    end,
1430    {noreply, NewState};
1431
1432handle_info({group_fd, _Fd}, State) ->
    % If no error occurred, we don't care about the fd message
1434    {noreply, State, ?GET_TIMEOUT(State)}.
1435
1436
1437terminate(Reason, #state{group = #set_view_group{sig = Sig} = Group} = State) ->
1438    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s`,"
1439              " terminating with reason: ~p",
1440              [?set_name(State), ?type(State), ?category(State),
1441               ?group_id(State), hex_sig(Sig), Reason]),
1442    Listeners2 = error_notify_update_listeners(
1443        State, State#state.update_listeners, {shutdown, Reason}),
1444    State2 = reply_all(State#state{update_listeners = Listeners2}, Reason),
1445    State3 = notify_pending_transition_waiters(State2, {shutdown, Reason}),
1446    couch_set_view_util:shutdown_wait(State3#state.cleaner_pid),
1447    couch_set_view_util:shutdown_wait(State3#state.updater_pid),
1448    couch_util:shutdown_sync(State3#state.compactor_pid),
1449    couch_util:shutdown_sync(State3#state.compactor_file),
1450    couch_util:shutdown_sync(State3#state.replica_group),
1451    Group = State#state.group,
1452    true = ets:delete(Group#set_view_group.stats_ets,
1453        ?set_view_group_stats_key(Group)),
1454    catch couch_file:only_snapshot_reads((State3#state.group)#set_view_group.fd),
1455    case State#state.shutdown of
1456    true when ?type(State) == main ->
        % It is important to delete the files here. A quick succession of ddoc
        % updates, reverting the ddoc to a previous version while we're here in
        % terminate, may lead to a case where the group should start clean but
        % doesn't, because couch_set_view:cleanup_index_files/1 was not invoked
        % right before the last reverting ddoc update, or it was invoked but did
        % not finish before the view group was spawned again. This matters during
        % rebalance, where one of the databases might have been deleted after the
        % last ddoc update and while or after we're here in terminate, so we must
        % ensure we don't leave old index files around.
        % See MB-6415 and MB-6517.
1466        case State#state.shutdown_aliases of
1467        [] ->
1468            delete_index_file(?root_dir(State), Group, main),
1469            delete_index_file(?root_dir(State), Group, replica);
1470        _ ->
1471            ok
1472        end;
1473    _ ->
1474        ok
1475    end,
1476    TmpDir = updater_tmp_dir(State),
1477    ok = couch_set_view_util:delete_sort_files(TmpDir, all),
1478    _ = file:del_dir(TmpDir),
1479    ok.
1480
1481
1482code_change(_OldVsn, State, _Extra) ->
1483    {ok, State}.
1484
1485
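% Reply to the blocked clients whose wanted update seqs are already covered
% by the group's current active partition seqs (indexable and unindexable).
% Clients that asked for debug info get a group with debug_info filled in.
% Returns the waiters that could not be replied to yet.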
1486-spec reply_with_group(#set_view_group{},
1487                       ordsets:ordset(partition_id()),
1488                       [#waiter{}]) -> [#waiter{}].
1489reply_with_group(_Group, _ReplicaPartitions, []) ->
1490    [];
1491reply_with_group(Group, ReplicaPartitions, WaitList) ->
1492    ActiveReplicasBitmask = couch_set_view_util:build_bitmask(
1493        ?set_replicas_on_transfer(Group)),
1494    ActiveIndexable = [{P, S} || {P, S} <- ?set_seqs(Group),
1495                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1496    ActiveUnindexable = [{P, S} || {P, S} <- ?set_unindexable_seqs(Group),
1497                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1498    GroupSeqs = ordsets:union(ActiveIndexable, ActiveUnindexable),
1499    WaitList2 = lists:foldr(
1500        fun(#waiter{debug = false} = Waiter, Acc) ->
1501            case maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) of
1502            true ->
1503                Acc;
1504            false ->
1505                [Waiter | Acc]
1506            end;
1507        (#waiter{debug = true} = Waiter, Acc) ->
1508            [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1509                ?set_view_group_stats_key(Group)),
1510            DebugGroup = Group#set_view_group{
1511                debug_info = #set_view_debug_info{
1512                    stats = Stats,
1513                    original_abitmask = ?set_abitmask(Group),
1514                    original_pbitmask = ?set_pbitmask(Group),
1515                    replica_partitions = ReplicaPartitions,
1516                    wanted_seqs = Waiter#waiter.seqs
1517                }
1518            },
1519            case maybe_reply_with_group(Waiter, DebugGroup, GroupSeqs, ActiveReplicasBitmask) of
1520            true ->
1521                Acc;
1522            false ->
1523                [Waiter | Acc]
1524            end
1525        end,
1526        [], WaitList),
1527    WaitList2.
1528
1529
1530-spec maybe_reply_with_group(#waiter{}, #set_view_group{}, partition_seqs(), bitmask()) -> boolean().
1531maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) ->
1532    #waiter{from = {Pid, _} = From, seqs = ClientSeqs} = Waiter,
1533    case (ClientSeqs == []) orelse (GroupSeqs >= ClientSeqs) of
1534    true ->
1535        couch_ref_counter:add(Group#set_view_group.ref_counter, Pid),
1536        gen_server:reply(From, {ok, Group, ActiveReplicasBitmask}),
1537        true;
1538    false ->
1539        false
1540    end.
1541
1542
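% Send the same reply to every client in the waiting list and clear the list.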
1543-spec reply_all(#state{}, term()) -> #state{}.
1544reply_all(#state{waiting_list = []} = State, _Reply) ->
1545    State;
1546reply_all(#state{waiting_list = WaitList} = State, Reply) ->
1547    lists:foreach(fun(#waiter{from = From}) ->
1548        catch gen_server:reply(From, Reply)
1549    end, WaitList),
1550    State#state{waiting_list = []}.
1551
1552
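% Open (or create) the group's index file and initialize the group from the
% header stored in it. When ForceReset is true, or the stored header's
% signature doesn't match the group's signature, the file is reset to an
% empty state instead.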
1553-spec prepare_group(init_args(), boolean()) -> {'ok', #set_view_group{}} |
1554                                               {'error', atom()}.
1555prepare_group({RootDir, SetName, Group0}, ForceReset)->
1556    #set_view_group{
1557        sig = Sig,
1558        type = Type
1559    } = Group0,
1560    Filepath = find_index_file(RootDir, Group0),
1561    Group = Group0#set_view_group{filepath = Filepath},
1562    case open_index_file(Filepath) of
1563    {ok, Fd} ->
1564        try
1565            if ForceReset ->
1566                % this can happen if we missed a purge
1567                {ok, reset_file(Fd, Group)};
1568            true ->
1569                case (catch couch_file:read_header_bin(Fd)) of
1570                {ok, HeaderBin, HeaderPos} ->
1571                    HeaderSig = couch_set_view_util:header_bin_sig(HeaderBin);
1572                _ ->
1573                    HeaderPos = 0,  % keep dialyzer happy
1574                    HeaderSig = <<>>,
1575                    HeaderBin = <<>>
1576                end,
1577                case HeaderSig == Sig of
1578                true ->
1579                    HeaderInfo = couch_set_view_util:header_bin_to_term(HeaderBin),
1580                    {ok, init_group(Fd, Group, HeaderInfo, HeaderPos)};
1581                _ ->
1582                    % this happens on a new file
1583                    case (not ForceReset) andalso (Type =:= main) of
1584                    true ->
1585                        % initializing main view group
1586                        ok = delete_index_file(RootDir, Group, replica);
1587                    false ->
1588                        ok
1589                    end,
1590                    {ok, reset_file(Fd, Group)}
1591                end
1592            end
1593        catch
1594        _:Error ->
1595            % In case of any error, we need to close the file as it isn't yet
1596            % associated with the group, hence can't be cleaned up during
1597            % group cleanup.
1598            couch_file:close(Fd),
1599            Error
1600        end;
1601    {error, emfile} = Error ->
1602        ?LOG_ERROR("Can't open set view `~s`, ~s (~s) group `~s`:"
1603                   " too many files open",
1604                   [SetName, Type, Group#set_view_group.category,
1605                    Group#set_view_group.name]),
1606        Error;
1607    Error ->
1608        ok = delete_index_file(RootDir, Group, Type),
1609        case (not ForceReset) andalso (Type =:= main) of
1610        true ->
1611            % initializing main view group
1612            ok = delete_index_file(RootDir, Group, replica);
1613        false ->
1614            ok
1615        end,
1616        Error
1617    end.
1618
1619
1620-spec hex_sig(#set_view_group{} | binary()) -> string().
1621hex_sig(#set_view_group{sig = Sig}) ->
1622    hex_sig(Sig);
1623hex_sig(GroupSig) ->
1624    couch_util:to_hex(GroupSig).
1625
1626
1627-spec base_index_file_name(#set_view_group{}, set_view_group_type()) -> string().
1628base_index_file_name(Group, Type) ->
1629    atom_to_list(Type) ++ "_" ++ hex_sig(Group#set_view_group.sig) ++
1630        Group#set_view_group.extension.
1631
1632
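% Return the path of the index file with the highest numeric suffix for this
% group (files are named "<type>_<hex signature><extension>.N"). If no such
% file exists yet, return the path with suffix ".1".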
1633-spec find_index_file(string(), #set_view_group{}) -> string().
1634find_index_file(RootDir, Group) ->
1635    #set_view_group{
1636        set_name = SetName,
1637        type = Type,
1638        category = Category
1639    } = Group,
1640    DesignRoot = couch_set_view:set_index_dir(RootDir, SetName, Category),
1641    BaseName = base_index_file_name(Group, Type),
1642    FullPath = filename:join([DesignRoot, BaseName]),
1643    case filelib:wildcard(FullPath ++ ".[0-9]*") of
1644    [] ->
1645        FullPath ++ ".1";
1646    Matching ->
1647        BaseNameSplitted = string:tokens(BaseName, "."),
1648        Matching2 = lists:filter(
1649            fun(Match) ->
1650                MatchBase = filename:basename(Match),
1651                [Suffix | Rest] = lists:reverse(string:tokens(MatchBase, ".")),
1652                (lists:reverse(Rest) =:= BaseNameSplitted) andalso
1653                    is_integer((catch list_to_integer(Suffix)))
1654            end,
1655            Matching),
1656        case Matching2 of
1657        [] ->
1658            FullPath ++ ".1";
1659        _ ->
1660            GetSuffix = fun(FileName) ->
1661                list_to_integer(lists:last(string:tokens(FileName, ".")))
1662            end,
1663            Matching3 = lists:sort(
1664                fun(A, B) -> GetSuffix(A) > GetSuffix(B) end,
1665                Matching2),
1666            hd(Matching3)
1667        end
1668    end.
1669
1670
1671-spec delete_index_file(string(), #set_view_group{}, set_view_group_type()) -> no_return().
1672delete_index_file(RootDir, Group, Type) ->
1673    #set_view_group{
1674        set_name = SetName,
1675        category = Category
1676    } = Group,
1677    SetDir = couch_set_view:set_index_dir(RootDir, SetName, Category),
1678    BaseName = filename:join([SetDir, base_index_file_name(Group, Type)]),
1679    lists:foreach(
1680        fun(F) -> ok = couch_file:delete(RootDir, F) end,
1681        filelib:wildcard(BaseName ++ ".[0-9]*")).
1682
1683
1684-spec compact_file_name(#state{} | #set_view_group{}) -> string().
1685compact_file_name(#state{group = Group}) ->
1686    compact_file_name(Group);
1687compact_file_name(#set_view_group{filepath = CurFilepath}) ->
1688    CurFilepath ++ ".compact".
1689
1690
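% Bump the numeric suffix of the group's current file path. Illustrative
% example: "main_abc123.view.2" becomes "main_abc123.view.3".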
1691-spec increment_filepath(#set_view_group{}) -> string().
1692increment_filepath(#set_view_group{filepath = CurFilepath}) ->
1693    [Suffix | Rest] = lists:reverse(string:tokens(CurFilepath, ".")),
1694    NewSuffix = integer_to_list(list_to_integer(Suffix) + 1),
1695    string:join(lists:reverse(Rest), ".") ++ "." ++ NewSuffix.
1696
1697
1698-spec open_index_file(string()) -> {'ok', pid()} | {'error', atom()}.
1699open_index_file(Filepath) ->
1700    case do_open_index_file(Filepath) of
1701    {ok, Fd} ->
1702        unlink(Fd),
1703        {ok, Fd};
1704    Error ->
1705        Error
1706    end.
1707
1708do_open_index_file(Filepath) ->
1709    case couch_file:open(Filepath) of
1710    {ok, Fd}        -> {ok, Fd};
1711    {error, enoent} -> couch_file:open(Filepath, [create]);
1712    Error           -> Error
1713    end.
1714
1715
1716open_set_group(Mod, SetName, GroupId) ->
1717    case couch_set_view_ddoc_cache:get_ddoc(SetName, GroupId) of
1718    {ok, DDoc} ->
1719        {ok, Mod:design_doc_to_set_view_group(SetName, DDoc)};
1720    {doc_open_error, Error} ->
1721        Error;
1722    {db_open_error, Error} ->
1723        Error
1724    end.
1725
1726
1727% To be used for debug/troubleshooting only (accessible via REST/HTTP API)
1728get_group_info(State) ->
1729    #state{
1730        group = Group,
1731        replica_group = ReplicaPid,
1732        updater_pid = UpdaterPid,
1733        updater_state = UpdaterState,
1734        compactor_pid = CompactorPid,
1735        waiting_list = WaitersList,
1736        cleaner_pid = CleanerPid,
1737        replica_partitions = ReplicaParts
1738    } = State,
1739    #set_view_group{
1740        fd = Fd,
1741        sig = GroupSig,
1742        id_btree = Btree,
1743        views = Views,
1744        mod = Mod
1745    } = Group,
1746    PendingTrans = get_pending_transition(State),
1747    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1748        ?set_view_group_stats_key(Group)),
1749    JsonStats = {[
1750        {full_updates, Stats#set_view_group_stats.full_updates},
1751        {partial_updates, Stats#set_view_group_stats.partial_updates},
1752        {stopped_updates, Stats#set_view_group_stats.stopped_updates},
1753        {updater_cleanups, Stats#set_view_group_stats.updater_cleanups},
1754        {compactions, Stats#set_view_group_stats.compactions},
1755        {cleanups, Stats#set_view_group_stats.cleanups},
1756        {waiting_clients, length(WaitersList)},
1757        {cleanup_interruptions, Stats#set_view_group_stats.cleanup_stops},
1758        {update_history, Stats#set_view_group_stats.update_history},
1759        {compaction_history, Stats#set_view_group_stats.compaction_history},
1760        {cleanup_history, Stats#set_view_group_stats.cleanup_history},
1761        {accesses, Stats#set_view_group_stats.accesses},
1762        {update_errors, Stats#set_view_group_stats.update_errors},
1763        {duplicated_partition_versions_seen,
1764         Stats#set_view_group_stats.dup_partitions_counter}
1765    ]},
1766    {ok, Size} = couch_file:bytes(Fd),
1767    GroupPartitions = ordsets:from_list(
1768        couch_set_view_util:decode_bitmask(?set_abitmask(Group) bor ?set_pbitmask(Group))),
1769    PartVersions = lists:map(fun({PartId, PartVersion}) ->
1770        {couch_util:to_binary(PartId), [tuple_to_list(V) || V <- PartVersion]}
1771    end, ?set_partition_versions(Group)),
1772
1773    IndexSeqs = ?set_seqs(Group),
1774    IndexPartitions = [PartId || {PartId, _} <- IndexSeqs],
    % Extract the seqnums from the KV store for all indexable partitions.
1776    {ok, GroupSeqs} = get_seqs(State, GroupPartitions),
1777    PartSeqs = couch_set_view_util:filter_seqs(IndexPartitions, GroupSeqs),
1778
    % Calculate the total sum of the seqnum differences between the KV
    % store and the index, across all indexed partitions.
1781    SeqDiffs = lists:zipwith(
1782        fun({PartId, Seq1}, {PartId, Seq2}) ->
1783            Seq1 - Seq2
1784        end, PartSeqs, IndexSeqs),
1785    TotalSeqDiff = lists:sum(SeqDiffs),
1786
1787    [
1788        {signature, ?l2b(hex_sig(GroupSig))},
1789        {disk_size, Size},
1790        {data_size, Mod:view_group_data_size(Btree, Views)},
1791        {updater_running, is_pid(UpdaterPid)},
1792        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build},
1793        {updater_state, couch_util:to_binary(UpdaterState)},
1794        {compact_running, CompactorPid /= nil},
1795        {cleanup_running, (CleanerPid /= nil) orelse
1796            ((CompactorPid /= nil) andalso (?set_cbitmask(Group) =/= 0))},
1797        {max_number_partitions, ?set_num_partitions(Group)},
1798        {update_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- IndexSeqs]}},
1799        {partition_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- GroupSeqs]}},
1800        {total_seqs_diff, TotalSeqDiff},
1801        {active_partitions, couch_set_view_util:decode_bitmask(?set_abitmask(Group))},
1802        {passive_partitions, couch_set_view_util:decode_bitmask(?set_pbitmask(Group))},
1803        {cleanup_partitions, couch_set_view_util:decode_bitmask(?set_cbitmask(Group))},
1804        {unindexable_partitions, {[{couch_util:to_binary(P), S} || {P, S} <- ?set_unindexable_seqs(Group)]}},
1805        {stats, JsonStats},
1806        {pending_transition, case PendingTrans of
1807            nil ->
1808                null;
1809            #set_view_transition{} ->
1810                {[
1811                    {active, PendingTrans#set_view_transition.active},
1812                    {passive, PendingTrans#set_view_transition.passive}
1813                ]}
1814            end
1815        },
1816        {partition_versions, {PartVersions}}
1817    ] ++
1818    case (?type(State) =:= main) andalso is_pid(ReplicaPid) of
1819    true ->
1820        [{replica_partitions, ReplicaParts}, {replicas_on_transfer, ?set_replicas_on_transfer(Group)}];
1821    false ->
1822        []
1823    end ++
1824    get_replica_group_info(ReplicaPid).
1825
1826get_replica_group_info(ReplicaPid) when is_pid(ReplicaPid) ->
1827    {ok, RepGroupInfo} = gen_server:call(ReplicaPid, request_group_info, infinity),
1828    [{replica_group_info, {RepGroupInfo}}];
1829get_replica_group_info(_) ->
1830    [].
1831
1832
1833get_data_size_info(State) ->
1834    #state{
1835        group = Group,
1836        replica_group = ReplicaPid,
1837        updater_pid = UpdaterPid
1838    } = State,
1839    #set_view_group{
1840        fd = Fd,
1841        id_btree = Btree,
1842        sig = GroupSig,
1843        views = Views,
1844        mod = Mod
1845    } = Group,
1846    {ok, FileSize} = couch_file:bytes(Fd),
1847    DataSize = Mod:view_group_data_size(Btree, Views),
1848    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1849        ?set_view_group_stats_key(Group)),
1850    Info = [
1851        {signature, hex_sig(GroupSig)},
1852        {disk_size, FileSize},
1853        {data_size, DataSize},
1854        {accesses, Stats#set_view_group_stats.accesses},
1855        {updater_running, is_pid(UpdaterPid)},
1856        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build}
1857    ],
1858    case is_pid(ReplicaPid) of
1859    false ->
1860        Info;
1861    true ->
1862        {ok, RepInfo} = gen_server:call(ReplicaPid, get_data_size, infinity),
1863        [{replica_group_info, RepInfo} | Info]
1864    end.
1865
1866
1867-spec reset_group(#set_view_group{}) -> #set_view_group{}.
1868reset_group(Group) ->
1869    #set_view_group{
1870        views = Views,
1871        mod = Mod
1872    } = Group,
1873    Views2 = lists:map(fun(View) ->
1874        Indexer = Mod:reset_view(View#set_view.indexer),
1875        View#set_view{indexer = Indexer}
1876    end, Views),
1877    Group#set_view_group{
1878        fd = nil,
1879        index_header = #set_view_index_header{},
1880        id_btree = nil,
1881        views = Views2
1882    }.
1883
1884
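% Truncate the index file and write a fresh header with all partition seqs
% reset to 0, empty btree states, no replicas on transfer and no pending
% transition, then re-initialize the group from that empty header.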
1885-spec reset_file(pid(), #set_view_group{}) -> #set_view_group{}.
1886reset_file(Fd, #set_view_group{views = Views, index_header = Header} = Group) ->
1887    ok = couch_file:truncate(Fd, 0),
1888    EmptyHeader = Header#set_view_index_header{
1889        seqs = [{PartId, 0} ||
1890            {PartId, _} <- Header#set_view_index_header.seqs],
1891        id_btree_state = nil,
1892        view_states = [nil || _ <- Views],
1893        replicas_on_transfer = [],
1894        pending_transition = nil,
1895        unindexable_seqs = [{PartId, 0} ||
1896            {PartId, _} <- Header#set_view_index_header.unindexable_seqs],
1897        partition_versions = [{PartId, [{0, 0}]} ||
1898            {PartId, _} <- Header#set_view_index_header.partition_versions]
1899    },
1900    EmptyGroup = Group#set_view_group{index_header = EmptyHeader},
1901    EmptyHeaderBin = couch_set_view_util:group_to_header_bin(EmptyGroup),
1902    {ok, Pos} = couch_file:write_header_bin(Fd, EmptyHeaderBin),
1903    init_group(Fd, reset_group(EmptyGroup), EmptyHeader, Pos).
1904
1905
1906-spec reset_group_from_state(#state{}) -> #state{}.
1907reset_group_from_state(State) ->
1908    Group = State#state.group,
1909    Group2 = reset_file(Group#set_view_group.fd, Group),
1910    State#state{
1911        updater_pid = nil,
1912        initial_build = false,
1913        updater_state = not_running,
1914        group = Group2
1915    }.
1916
1917
1918-spec init_group(pid(),
1919                 #set_view_group{},
1920                 #set_view_index_header{},
1921                 non_neg_integer()) -> #set_view_group{}.
1922init_group(Fd, Group, IndexHeader, HeaderPos) ->
1923    #set_view_group{
1924        views = Views0,
1925        mod = Mod
1926    } = Group,
1927    Views = [V#set_view{ref = make_ref()} || V <- Views0],
1928    #set_view_index_header{
1929        id_btree_state = IdBtreeState,
1930        view_states = ViewStates
1931    } = IndexHeader,
1932    IdTreeReduce = fun(reduce, KVs) ->
1933        <<(length(KVs)):40, (couch_set_view_util:partitions_map(KVs, 0)):?MAX_NUM_PARTITIONS>>;
1934    (rereduce, [First | Rest]) ->
1935        lists:foldl(
1936            fun(<<S:40, M:?MAX_NUM_PARTITIONS>>, <<T:40, A:?MAX_NUM_PARTITIONS>>) ->
1937                <<(S + T):40, (M bor A):?MAX_NUM_PARTITIONS>>
1938            end,
1939            First, Rest)
1940    end,
1941    KvChunkThreshold = couch_config:get("set_views", "btree_kv_node_threshold", "7168"),
1942    KpChunkThreshold = couch_config:get("set_views", "btree_kp_node_threshold", "6144"),
1943    BtreeOptions = [
1944        {kv_chunk_threshold, list_to_integer(KvChunkThreshold)},
1945        {kp_chunk_threshold, list_to_integer(KpChunkThreshold)},
1946        {binary_mode, true}
1947    ],
1948    {ok, IdBtree} = couch_btree:open(
1949        IdBtreeState, Fd, [{reduce, IdTreeReduce} | BtreeOptions]),
1950    Views2 = Mod:setup_views(Fd, BtreeOptions, Group, ViewStates, Views),
1951    Group#set_view_group{
1952        fd = Fd,
1953        id_btree = IdBtree,
1954        views = Views2,
1955        index_header = IndexHeader,
1956        header_pos = HeaderPos
1957    }.
1958
1959
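% Write the group's current header to the index file. When Fsync is true the
% file is synced to disk, otherwise it is only flushed.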
1960commit_header(Group) ->
1961    commit_header(Group, true).
1962
1963-spec commit_header(#set_view_group{}, boolean()) -> {'ok', non_neg_integer()}.
1964commit_header(Group, Fsync) ->
1965    HeaderBin = couch_set_view_util:group_to_header_bin(Group),
1966    {ok, Pos} = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
1967    case Fsync of
1968    true ->
1969        ok = couch_file:sync(Group#set_view_group.fd);
1970    false ->
1971        ok = couch_file:flush(Group#set_view_group.fd)
1972    end,
1973    {ok, Pos}.
1974
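% Drop from Partitions every partition whose bit is set in BMask.
% Illustrative example: filter_out_bitmask_partitions([0, 1, 2, 3], 2#0101)
% returns [1, 3].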
1975-spec filter_out_bitmask_partitions(ordsets:ordset(partition_id()),
1976                                    bitmask()) -> ordsets:ordset(partition_id()).
1977filter_out_bitmask_partitions(Partitions, BMask) ->
1978    [P || P <- Partitions, ((BMask bsr P) band 1) =/= 1].
1979
1980-spec maybe_update_partition_states(ordsets:ordset(partition_id()),
1981                                    ordsets:ordset(partition_id()),
1982                                    ordsets:ordset(partition_id()),
1983                                    #state{}) -> #state{}.
1984maybe_update_partition_states(ActiveList0, PassiveList0, CleanupList0, State) ->
1985    #state{group = Group} = State,
1986    PendingTrans = ?set_pending_transition(Group),
1987    PendingActive = ?pending_transition_active(PendingTrans),
1988    PendingPassive = ?pending_transition_passive(PendingTrans),
1989    PendingUnindexable = ?pending_transition_unindexable(PendingTrans),
1990    case (?set_unindexable_seqs(Group) == []) andalso (PendingUnindexable == []) of
1991    true ->
1992        ActiveList = ActiveList0,
1993        PassiveList = PassiveList0,
1994        CleanupList = CleanupList0;
1995    false ->
1996        AlreadyActive = couch_set_view_util:build_bitmask(PendingActive) bor ?set_abitmask(Group),
1997        AlreadyPassive = couch_set_view_util:build_bitmask(PendingPassive) bor ?set_pbitmask(Group),
1998        ActiveList = filter_out_bitmask_partitions(ActiveList0, AlreadyActive),
1999        PassiveList = filter_out_bitmask_partitions(PassiveList0, AlreadyPassive),
2000        CleanupList = filter_out_bitmask_partitions(CleanupList0, ?set_cbitmask(Group)),
2001        ActiveMarkedAsUnindexable0 = [
2002            P || P <- ActiveList, is_unindexable_part(P, Group)
2003        ],
2004
        % Replicas on transfer look like normal active partitions to the
        % caller, hence they can be marked as unindexable. The actual
        % transfer to a real active partition still needs to happen, so an
        % intersection between newly activated partitions and unindexable
        % ones is possible (MB-8677).
2010        ActiveMarkedAsUnindexable = ordsets:subtract(
2011            ActiveMarkedAsUnindexable0, ?set_replicas_on_transfer(Group)),
2012
2013        case ActiveMarkedAsUnindexable of
2014        [] ->
2015            ok;
2016        _ ->
2017            ErrorMsg1 = io_lib:format("Intersection between requested active list "
2018                "and current unindexable partitions: ~w", [ActiveMarkedAsUnindexable]),
2019            throw({error, iolist_to_binary(ErrorMsg1)})
2020        end,
2021        PassiveMarkedAsUnindexable = [
2022            P || P <- PassiveList, is_unindexable_part(P, Group)
2023        ],
2024        case PassiveMarkedAsUnindexable of
2025        [] ->
2026            ok;
2027        _ ->
2028            ErrorMsg2 = io_lib:format("Intersection between requested passive list "
2029                "and current unindexable partitions: ~w", [PassiveMarkedAsUnindexable]),
2030            throw({error, iolist_to_binary(ErrorMsg2)})
2031        end,
2032        CleanupMarkedAsUnindexable = [
2033            P || P <- CleanupList, is_unindexable_part(P, Group)
2034        ],
2035        case CleanupMarkedAsUnindexable of
2036        [] ->
2037            ok;
2038        _ ->
2039            ErrorMsg3 = io_lib:format("Intersection between requested cleanup list "
2040                "and current unindexable partitions: ~w", [CleanupMarkedAsUnindexable]),
2041            throw({error, iolist_to_binary(ErrorMsg3)})
2042        end
2043    end,
2044    ActiveMask = couch_set_view_util:build_bitmask(ActiveList),
2045    case ActiveMask >= (1 bsl ?set_num_partitions(Group)) of
2046    true ->
2047        throw({error, <<"Invalid active partitions list">>});
2048    false ->
2049        ok
2050    end,
2051    PassiveMask = couch_set_view_util:build_bitmask(PassiveList),
2052    case PassiveMask >= (1 bsl ?set_num_partitions(Group)) of
2053    true ->
2054        throw({error, <<"Invalid passive partitions list">>});
2055    false ->
2056        ok
2057    end,
2058    CleanupMask = couch_set_view_util:build_bitmask(CleanupList),
2059    case CleanupMask >= (1 bsl ?set_num_partitions(Group)) of
2060    true ->
2061        throw({error, <<"Invalid cleanup partitions list">>});
2062    false ->
2063        ok
2064    end,
2065
2066    ActivePending = ?pending_transition_active(PendingTrans),
2067    PassivePending = ?pending_transition_passive(PendingTrans),
2068    IsEffectlessTransition =
2069        (ActiveMask bor ?set_abitmask(Group)) == ?set_abitmask(Group) andalso
2070        (PassiveMask bor ?set_pbitmask(Group)) == ?set_pbitmask(Group) andalso
2071        ((CleanupMask band (?set_abitmask(Group) bor ?set_pbitmask(Group))) == 0) andalso
2072        ordsets:is_disjoint(CleanupList, ActivePending) andalso
2073        ordsets:is_disjoint(CleanupList, PassivePending) andalso
2074        ordsets:is_disjoint(ActiveList, PassivePending) andalso
2075        ordsets:is_disjoint(PassiveList, ActivePending),
2076
2077    case IsEffectlessTransition of
2078    true ->
2079        State;
2080    false ->
2081        RestartUpdater = updater_needs_restart(
2082            Group, ActiveMask, PassiveMask, CleanupMask),
2083        NewState = update_partition_states(
2084            ActiveList, PassiveList, CleanupList, State, RestartUpdater),
2085        #state{group = NewGroup, updater_pid = UpdaterPid} = NewState,
2086        case RestartUpdater of
2087        false when is_pid(UpdaterPid) ->
2088            case missing_partitions(Group, NewGroup) of
2089            [] ->
2090                ok;
2091            MissingPassive ->
2092                UpdaterPid ! {new_passive_partitions, MissingPassive}
2093            end;
2094        _ ->
2095            ok
2096        end,
2097        NewState
2098    end.
2099
2100
2101-spec update_partition_states(ordsets:ordset(partition_id()),
2102                              ordsets:ordset(partition_id()),
2103                              ordsets:ordset(partition_id()),
2104                              #state{},
2105                              boolean()) -> #state{}.
2106update_partition_states(ActiveList, PassiveList, CleanupList, State, RestartUpdater) ->
2107    State2 = stop_cleaner(State),
2108    case RestartUpdater of
2109    true ->
2110        #state{group = Group3} = State3 = stop_updater(State2);
2111    false ->
2112        #state{group = Group3} = State3 = State2
2113    end,
2114    UpdaterWasRunning = is_pid(State#state.updater_pid),
2115    ActiveInCleanup = partitions_still_in_cleanup(ActiveList, Group3),
2116    PassiveInCleanup = partitions_still_in_cleanup(PassiveList, Group3),
2117    NewPendingTrans = merge_into_pending_transition(
2118        Group3, ActiveInCleanup, PassiveInCleanup, CleanupList),
2119    ApplyActiveList = ordsets:subtract(ActiveList, ActiveInCleanup),
2120    ApplyPassiveList = ordsets:subtract(PassiveList, PassiveInCleanup),
2121    ApplyCleanupList = CleanupList,
2122    State4 = persist_partition_states(
2123               State3, ApplyActiveList, ApplyPassiveList,
2124               ApplyCleanupList, NewPendingTrans, []),
2125    State5 = notify_pending_transition_waiters(State4),
2126    after_partition_states_updated(State5, UpdaterWasRunning).
2127
2128
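% Compute the new pending transition. Partitions that were requested to
% become active or passive but are still in cleanup remain pending (each in
% at most one of the two sets), while partitions now requested for cleanup
% are dropped from the pending sets. Returns nil when nothing is pending.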
2129-spec merge_into_pending_transition(#set_view_group{},
2130                                    ordsets:ordset(partition_id()),
2131                                    ordsets:ordset(partition_id()),
2132                                    ordsets:ordset(partition_id())) ->
2133                                           #set_view_transition{} | 'nil'.
2134merge_into_pending_transition(Group, ActiveInCleanup, PassiveInCleanup, CleanupList) ->
2135    PendingTrans = ?set_pending_transition(Group),
2136    ActivePending = ?pending_transition_active(PendingTrans),
2137    PassivePending = ?pending_transition_passive(PendingTrans),
2138    case ordsets:intersection(PassivePending, ActiveInCleanup) of
2139    [] ->
2140        PassivePending2 = PassivePending;
2141    Int ->
2142        PassivePending2 = ordsets:subtract(PassivePending, Int)
2143    end,
2144    case ordsets:intersection(ActivePending, PassiveInCleanup) of
2145    [] ->
2146        ActivePending2 = ActivePending;
2147    Int2 ->
2148        ActivePending2 = ordsets:subtract(ActivePending, Int2)
2149    end,
2150    ActivePending3 = ordsets:subtract(ActivePending2, CleanupList),
2151    PassivePending3 = ordsets:subtract(PassivePending2, CleanupList),
2152    ActivePending4 = ordsets:union(ActivePending3, ActiveInCleanup),
2153    PassivePending4 = ordsets:union(PassivePending3, PassiveInCleanup),
2154    case (ActivePending4 == []) andalso (PassivePending4 == []) of
2155    true ->
2156        nil;
2157    false ->
2158        #set_view_transition{
2159            active = ActivePending4,
2160            passive = PassivePending4
2161        }
2162    end.
2163
2164
2165-spec after_partition_states_updated(#state{}, boolean()) -> #state{}.
2166after_partition_states_updated(State, UpdaterWasRunning) ->
2167    State2 = case UpdaterWasRunning of
2168    true ->
2169        % Updater was running, we stopped it, updated the group we received
2170        % from the updater, updated that group's bitmasks and update seqs,
2171        % and now restart the updater with this modified group.
2172        start_updater(State);
2173    false ->
2174        State
2175    end,
2176    State3 = stop_compactor(State2),
2177    maybe_start_cleaner(State3).
2178
2179
2180-spec persist_partition_states(#state{},
2181                               ordsets:ordset(partition_id()),
2182                               ordsets:ordset(partition_id()),
2183                               ordsets:ordset(partition_id()),
2184                               #set_view_transition{} | 'nil',
2185                               ordsets:ordset(partition_id())) -> #state{}.
2186persist_partition_states(State, ActiveList, PassiveList, CleanupList, PendingTrans, ToBeUnindexable) ->
2187    % There can never be intersection between given active, passive and cleanup lists.
2188    % This check is performed elsewhere, outside the gen_server.
2189    #state{
2190        group = Group,
2191        replica_partitions = ReplicaParts,
2192        replica_group = ReplicaPid,
2193        update_listeners = Listeners,
2194        waiting_list = WaitList
2195    } = State,
2196    case ordsets:intersection(ActiveList, ReplicaParts) of
2197    [] ->
2198         ActiveList2 = ActiveList,
2199         PassiveList2 = PassiveList,
2200         ReplicasOnTransfer2 = ?set_replicas_on_transfer(Group),
2201         ReplicasToMarkActive = [];
2202    CommonRep ->
2203         PassiveList2 = ordsets:union(PassiveList, CommonRep),
2204         ActiveList2 = ordsets:subtract(ActiveList, CommonRep),
2205         ReplicasOnTransfer2 = ordsets:union(?set_replicas_on_transfer(Group), CommonRep),
2206         ReplicasToMarkActive = CommonRep
2207    end,
2208    case ordsets:intersection(PassiveList, ReplicasOnTransfer2) of
2209    [] ->
2210        ReplicasToCleanup = [],
2211        PassiveList3 = PassiveList2,
2212        ReplicasOnTransfer3 = ReplicasOnTransfer2;
2213    CommonRep2 ->
2214        ReplicasToCleanup = CommonRep2,
2215        PassiveList3 = ordsets:subtract(PassiveList2, CommonRep2),
2216        ReplicasOnTransfer3 = ordsets:subtract(ReplicasOnTransfer2, CommonRep2)
2217    end,
2218    case ordsets:intersection(CleanupList, ReplicasOnTransfer3) of
2219    [] ->
2220        ReplicaParts2 = ReplicaParts,
2221        ReplicasOnTransfer4 = ReplicasOnTransfer3,
2222        ReplicasToCleanup2 = ReplicasToCleanup;
2223    CommonRep3 ->
2224        ReplicaParts2 = ordsets:subtract(ReplicaParts, CommonRep3),
2225        ReplicasOnTransfer4 = ordsets:subtract(ReplicasOnTransfer3, CommonRep3),
2226        ReplicasToCleanup2 = ordsets:union(ReplicasToCleanup, CommonRep3)
2227    end,
2228    {ok, NewAbitmask1, NewPbitmask1, NewSeqs1, NewVersions1} =
2229        set_active_partitions(
2230            ActiveList2,
2231            ?set_abitmask(Group),
2232            ?set_pbitmask(Group),
2233            ?set_seqs(Group),
2234            ?set_partition_versions(Group)),
2235    {ok, NewAbitmask2, NewPbitmask2, NewSeqs2, NewVersions2} =
2236        set_passive_partitions(
2237            PassiveList3,
2238            NewAbitmask1,
2239            NewPbitmask1,
2240            NewSeqs1,
2241            NewVersions1),
2242    {ok, NewAbitmask3, NewPbitmask3, NewCbitmask3, NewSeqs3, NewVersions3} =
2243        set_cleanup_partitions(
2244            CleanupList,
2245            NewAbitmask2,
2246            NewPbitmask2,
2247            ?set_cbitmask(Group),
2248            NewSeqs2,
2249            NewVersions2),
2250    {NewSeqs4, NewUnindexableSeqs, NewVersions4} = lists:foldl(
2251        fun(PartId, {AccSeqs, AccUnSeqs, AccVersions}) ->
2252            PartSeq = couch_set_view_util:get_part_seq(PartId, AccSeqs),
2253            AccSeqs2 = orddict:erase(PartId, AccSeqs),
2254            AccUnSeqs2 = orddict:store(PartId, PartSeq, AccUnSeqs),
2255            AccVersions2 = orddict:erase(PartId, AccVersions),
2256            {AccSeqs2, AccUnSeqs2, AccVersions2}
2257        end,
2258        {NewSeqs3, ?set_unindexable_seqs(Group), NewVersions3},
2259        ToBeUnindexable),
2260    State2 = update_header(
2261        State,
2262        NewAbitmask3,
2263        NewPbitmask3,
2264        NewCbitmask3,
2265        NewSeqs4,
2266        NewUnindexableSeqs,
2267        ReplicasOnTransfer4,
2268        ReplicaParts2,
2269        PendingTrans,
2270        NewVersions4),
2271    % A crash might happen between updating our header and updating the state of
2272    % replica view group. The init function must detect and correct this.
2273    ok = set_state(ReplicaPid, ReplicasToMarkActive, [], ReplicasToCleanup2),
2274    % Need to update list of active partition sequence numbers for every blocked client.
2275    WaitList2 = update_waiting_list(
2276        WaitList, State, ActiveList2, PassiveList3, CleanupList),
2277    State3 = State2#state{waiting_list = WaitList2},
2278    case (dict:size(Listeners) > 0) andalso (CleanupList /= []) of
2279    true ->
2280        Listeners2 = dict:filter(
2281            fun(Ref, Listener) ->
2282                #up_listener{
2283                    pid = Pid,
2284                    monref = MonRef,
2285                    partition = PartId
2286                } = Listener,
2287                case lists:member(PartId, CleanupList) of
2288                true ->
2289                    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2290                              " replying to partition ~p"
2291                              " update monitor, reference ~p,"
2292                              " error: marked_for_cleanup",
2293                              [?set_name(State), ?type(State),
2294                               ?category(State), ?group_id(State),
2295                               Ref, PartId]),
2296                    Pid ! {Ref, marked_for_cleanup},
2297                    erlang:demonitor(MonRef, [flush]),
2298                    false;
2299                false ->
2300                    true
2301                end
2302            end,
2303            Listeners),
2304        State3#state{update_listeners = Listeners2};
2305    false ->
2306        State3
2307    end.
2308
2309
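% Adjust the wanted seqs of every blocked client after a partition state
% change: newly active partitions are waited on at their current seqs, while
% partitions that became passive or went into cleanup are no longer waited on.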
2310-spec update_waiting_list([#waiter{}],
2311                          #state{},
2312                          ordsets:ordset(partition_id()),
2313                          ordsets:ordset(partition_id()),
2314                          ordsets:ordset(partition_id())) -> [#waiter{}].
2315update_waiting_list([], _State, _AddActiveList, _AddPassiveList, _AddCleanupList) ->
2316    [];
2317update_waiting_list(WaitList, State, AddActiveList, AddPassiveList, AddCleanupList) ->
2318    {ok, AddActiveSeqs} = get_seqs(State, AddActiveList),
2319    RemoveSet = ordsets:union(AddPassiveList, AddCleanupList),
2320    MapFun = fun(W) -> update_waiter_seqs(W, AddActiveSeqs, RemoveSet) end,
2321    [MapFun(W) || W <- WaitList].
2322
2323
2324-spec update_waiter_seqs(#waiter{},
2325                         partition_seqs(),
2326                         ordsets:ordset(partition_id())) -> #waiter{}.
2327update_waiter_seqs(Waiter, AddActiveSeqs, ToRemove) ->
2328    Seqs2 = lists:foldl(
2329        fun({PartId, Seq}, Acc) ->
2330            case couch_set_view_util:has_part_seq(PartId, Acc) of
2331            true ->
2332                Acc;
2333            false ->
2334                orddict:store(PartId, Seq, Acc)
2335            end
2336        end,
2337        Waiter#waiter.seqs, AddActiveSeqs),
2338    Seqs3 = lists:foldl(
2339        fun(PartId, Acc) -> orddict:erase(PartId, Acc) end,
2340        Seqs2, ToRemove),
2341    Waiter#waiter{seqs = Seqs3}.
2342
2343
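% Apply the pending transition, if there is one: pending partitions that left
% cleanup are moved to their target active/passive state, while partitions
% still in cleanup stay in the pending transition.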
2344-spec maybe_apply_pending_transition(#state{}) -> #state{}.
2345maybe_apply_pending_transition(State) when not ?have_pending_transition(State) ->
2346    State;
2347maybe_apply_pending_transition(State) ->
2348    State2 = stop_cleaner(State),
2349    #state{group = Group3} = State3 = stop_updater(State2),
2350    UpdaterWasRunning = is_pid(State#state.updater_pid),
2351    #set_view_transition{
2352        active = ActivePending,
2353        passive = PassivePending,
2354        unindexable = UnindexablePending
2355    } = get_pending_transition(State),
2356    ActiveInCleanup = partitions_still_in_cleanup(ActivePending, Group3),
2357    PassiveInCleanup = partitions_still_in_cleanup(PassivePending, Group3),
2358    ApplyActiveList = ordsets:subtract(ActivePending, ActiveInCleanup),
2359    ApplyPassiveList = ordsets:subtract(PassivePending, PassiveInCleanup),
2360    {ApplyUnindexableList, NewUnindexablePending} = lists:partition(
2361        fun(P) ->
2362            lists:member(P, ApplyActiveList) orelse
2363            lists:member(P, ApplyPassiveList)
2364        end,
2365        UnindexablePending),
2366    case (ApplyActiveList /= []) orelse (ApplyPassiveList /= []) of
2367    true ->
2368        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2369                  " applying state transitions from pending transition:~n"
2370                  "  Active partitions:  ~w~n"
2371                  "  Passive partitions: ~w~n"
2372                  "  Unindexable:        ~w~n",
2373                  [?set_name(State), ?type(State), ?category(State),
2374                   ?group_id(State), ApplyActiveList, ApplyPassiveList,
2375                   ApplyUnindexableList]),
2376        case (ActiveInCleanup == []) andalso (PassiveInCleanup == []) of
2377        true ->
2378            NewPendingTrans = nil;
2379        false ->
2380            NewPendingTrans = #set_view_transition{
2381                active = ActiveInCleanup,
2382                passive = PassiveInCleanup,
2383                unindexable = NewUnindexablePending
2384            }
2385        end,
2386        State4 = set_pending_transition(State3, NewPendingTrans),
2387        State5 = persist_partition_states(
2388            State4, ApplyActiveList, ApplyPassiveList, [], NewPendingTrans, ApplyUnindexableList),
2389        State6 = notify_pending_transition_waiters(State5),
2390        NewState = case dict:size(State6#state.update_listeners) > 0 of
2391        true ->
2392            start_updater(State6);
2393        false ->
2394            State6
2395        end;
2396    false ->
2397        NewState = State3
2398    end,
2399    after_partition_states_updated(NewState, UpdaterWasRunning).
2400
2401
2402-spec notify_pending_transition_waiters(#state{}) -> #state{}.
2403notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State) ->
2404    State;
2405notify_pending_transition_waiters(State) ->
2406    #state{
2407        pending_transition_waiters = TransWaiters,
2408        group = Group,
2409        replica_partitions = RepParts,
2410        waiting_list = WaitList
2411    } = State,
2412    CurSeqs = active_partition_seqs(State),
2413    {TransWaiters2, WaitList2, GroupReplyList, TriggerGroupUpdate} =
2414        lists:foldr(
2415            fun({From, Req} = TransWaiter, {AccTrans, AccWait, ReplyAcc, AccTriggerUp}) ->
2416                #set_view_group_req{
2417                    stale = Stale,
2418                    debug = Debug
2419                } = Req,
2420                case is_any_partition_pending(Req, Group) of
2421                true ->
2422                    {[TransWaiter | AccTrans], AccWait, ReplyAcc, AccTriggerUp};
2423                false when Stale == ok ->
2424                    Waiter = #waiter{from = From, debug = Debug},
2425                    {AccTrans, AccWait, [Waiter | ReplyAcc], AccTriggerUp};
2426                false when Stale == update_after ->
2427                    Waiter = #waiter{from = From, debug = Debug},
2428                    {AccTrans, AccWait, [Waiter | ReplyAcc], true};
2429                false when Stale == false ->
2430                    Waiter = #waiter{from = From, debug = Debug, seqs = CurSeqs},
2431                    {AccTrans, [Waiter | AccWait], ReplyAcc, true}
2432                end
2433            end,
2434            {[], WaitList, [], false},
2435            TransWaiters),
2436    [] = reply_with_group(Group, RepParts, GroupReplyList),
2437    WaitList3 = reply_with_group(Group, RepParts, WaitList2),
2438    State2 = State#state{
2439        pending_transition_waiters = TransWaiters2,
2440        waiting_list = WaitList3
2441    },
2442    case TriggerGroupUpdate of
2443    true ->
2444        start_updater(State2);
2445    false ->
2446        State2
2447    end.
2448
2449
2450-spec notify_pending_transition_waiters(#state{}, term()) -> #state{}.
2451notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State, _Reply) ->
2452    State;
2453notify_pending_transition_waiters(#state{pending_transition_waiters = Waiters} = State, Reply) ->
2454    lists:foreach(fun(F) -> catch gen_server:reply(F, Reply) end, Waiters),
2455    State#state{pending_transition_waiters = []}.
2456
2457
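% Move the given partitions into the passive set: their bit is cleared in the
% active bitmask (if set) and set in the passive bitmask. Partitions that are
% new to the group get an initial seq of 0 and partition version [{0, 0}].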
2458-spec set_passive_partitions(ordsets:ordset(partition_id()),
2459                             bitmask(),
2460                             bitmask(),
2461                             partition_seqs(),
2462                             partition_versions()) ->
2463                                    {'ok', bitmask(), bitmask(),
2464                                     partition_seqs(), partition_versions()}.
2465set_passive_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2466    {ok, Abitmask, Pbitmask, Seqs, Versions};
2467
2468set_passive_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2469    PartMask = 1 bsl PartId,
2470    case PartMask band Abitmask of
2471    0 ->
2472        case PartMask band Pbitmask of
2473        PartMask ->
2474            set_passive_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2475        0 ->
2476            NewSeqs = lists:ukeymerge(1, [{PartId, 0}], Seqs),
2477            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2478            set_passive_partitions(
2479                Rest, Abitmask, Pbitmask bor PartMask, NewSeqs, NewVersions)
2480        end;
2481    PartMask ->
2482        set_passive_partitions(
2483            Rest, Abitmask bxor PartMask, Pbitmask bor PartMask, Seqs,
2484            Versions)
2485    end.
2486
2487
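% Move the given partitions into the active set: their bit is cleared in the
% passive bitmask (if set) and set in the active bitmask. Partitions that are
% new to the group get an initial seq of 0 and partition version [{0, 0}].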
2488-spec set_active_partitions(ordsets:ordset(partition_id()),
2489                            bitmask(),
2490                            bitmask(),
2491                            partition_seqs(),
2492                            partition_versions()) ->
2493                                   {'ok', bitmask(), bitmask(),
2494                                    partition_seqs(), partition_versions()}.
2495set_active_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2496    {ok, Abitmask, Pbitmask, Seqs, Versions};
2497
2498set_active_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2499    PartMask = 1 bsl PartId,
2500    case PartMask band Pbitmask of
2501    0 ->
2502        case PartMask band Abitmask of
2503        PartMask ->
2504            set_active_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2505        0 ->
2506            NewSeqs = lists:ukeymerge(1, Seqs, [{PartId, 0}]),
2507            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2508            set_active_partitions(Rest, Abitmask bor PartMask, Pbitmask,
2509                NewSeqs, NewVersions)
2510        end;
2511    PartMask ->
2512        set_active_partitions(
2513            Rest, Abitmask bor PartMask, Pbitmask bxor PartMask, Seqs,
2514            Versions)
2515    end.
2516
2517
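% Mark the given partitions for cleanup: their active or passive bit is
% cleared, the cleanup bit is set and their seqs and partition versions are
% dropped. Partitions that are neither active nor passive are left untouched.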
2518-spec set_cleanup_partitions(ordsets:ordset(partition_id()),
2519                             bitmask(),
2520                             bitmask(),
2521                             bitmask(),
2522                             partition_seqs(),
2523                             partition_versions()) ->
2524                                    {'ok', bitmask(), bitmask(), bitmask(),
2525                                     partition_seqs(), partition_versions()}.
2526set_cleanup_partitions([], Abitmask, Pbitmask, Cbitmask, Seqs, Versions) ->
2527    {ok, Abitmask, Pbitmask, Cbitmask, Seqs, Versions};
2528
2529set_cleanup_partitions([PartId | Rest], Abitmask, Pbitmask, Cbitmask, Seqs,
2530        Versions) ->
2531    PartMask = 1 bsl PartId,
2532    case PartMask band Cbitmask of
2533    PartMask ->
2534        set_cleanup_partitions(
2535            Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions);
2536    0 ->
2537        Seqs2 = lists:keydelete(PartId, 1, Seqs),
2538        Versions2 = lists:keydelete(PartId, 1, Versions),
2539        Cbitmask2 = Cbitmask bor PartMask,
2540        case PartMask band Abitmask of
2541        PartMask ->
2542            set_cleanup_partitions(
2543                Rest, Abitmask bxor PartMask, Pbitmask, Cbitmask2, Seqs2,
2544                Versions2);
2545        0 ->
2546            case (PartMask band Pbitmask) of
2547            PartMask ->
2548                set_cleanup_partitions(
2549                    Rest, Abitmask, Pbitmask bxor PartMask, Cbitmask2, Seqs2,
2550                    Versions2);
2551            0 ->
2552                set_cleanup_partitions(
2553                    Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions)
2554            end
2555        end
2556    end.
2557
2558
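% Install the new bitmasks, seqs, replica state and pending transition into
% the group's header, run remove_duplicate_partitions/1 on the resulting
% group, commit the header to disk (fsynced when the cleanup bitmask changed)
% and log the state transition.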
2559-spec update_header(#state{},
2560                    bitmask(),
2561                    bitmask(),
2562                    bitmask(),
2563                    partition_seqs(),
2564                    partition_seqs(),
2565                    ordsets:ordset(partition_id()),
2566                    ordsets:ordset(partition_id()),
2567                    #set_view_transition{} | 'nil',
2568                    partition_versions()) -> #state{}.
2569update_header(State, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs,
              NewUnindexableSeqs, NewReplicasOnTransfer, NewReplicaParts,
2571              NewPendingTrans, NewPartVersions) ->
2572    #state{
2573        group = #set_view_group{
2574            index_header =
2575                #set_view_index_header{
2576                    abitmask = Abitmask,
2577                    pbitmask = Pbitmask,
2578                    cbitmask = Cbitmask,
2579                    replicas_on_transfer = ReplicasOnTransfer,
2580                    unindexable_seqs = UnindexableSeqs,
2581                    pending_transition = PendingTrans,
2582                    partition_versions = PartVersions
2583                } = Header
2584        } = Group,
2585        replica_partitions = ReplicaParts
2586    } = State,
2587    NewGroup = Group#set_view_group{
2588        index_header = Header#set_view_index_header{
2589            abitmask = NewAbitmask,
2590            pbitmask = NewPbitmask,
2591            cbitmask = NewCbitmask,
2592            seqs = NewSeqs,
2593            unindexable_seqs = NewUnindexableSeqs,
            replicas_on_transfer = NewReplicasOnTransfer,
2595            pending_transition = NewPendingTrans,
2596            partition_versions = NewPartVersions
2597        }
2598    },
2599    NewGroup2 = remove_duplicate_partitions(NewGroup),
2600    #set_view_group{
2601        index_header = #set_view_index_header{
2602            partition_versions = NewPartVersions2
2603        }
2604    } = NewGroup2,
2605    NewState = State#state{
2606        group = NewGroup2,
2607        replica_partitions = NewReplicaParts
2608    },
    FsyncHeader = (NewCbitmask /= Cbitmask),
    {ok, HeaderPos} = commit_header(NewState#state.group, FsyncHeader),
    NewGroup3 = (NewState#state.group)#set_view_group{
        header_pos = HeaderPos
    },
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, partition states updated~n"
              "active partitions before:    ~w~n"
              "active partitions after:     ~w~n"
              "passive partitions before:   ~w~n"
              "passive partitions after:    ~w~n"
              "cleanup partitions before:   ~w~n"
              "cleanup partitions after:    ~w~n"
              "unindexable partitions:      ~w~n"
              "replica partitions before:   ~w~n"
              "replica partitions after:    ~w~n"
              "replicas on transfer before: ~w~n"
              "replicas on transfer after:  ~w~n"
              "pending transition before:~n"
              "  active:      ~w~n"
              "  passive:     ~w~n"
              "  unindexable: ~w~n"
              "pending transition after:~n"
              "  active:      ~w~n"
              "  passive:     ~w~n"
              "  unindexable: ~w~n"
              "partition versions before:~n~w~n"
              "partition versions after:~n~w~n",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State),
               couch_set_view_util:decode_bitmask(Abitmask),
               couch_set_view_util:decode_bitmask(NewAbitmask),
               couch_set_view_util:decode_bitmask(Pbitmask),
               couch_set_view_util:decode_bitmask(NewPbitmask),
               couch_set_view_util:decode_bitmask(Cbitmask),
               couch_set_view_util:decode_bitmask(NewCbitmask),
               UnindexableSeqs,
               ReplicaParts,
               NewReplicaParts,
               ReplicasOnTransfer,
               NewReplicasOnTransfer,
               ?pending_transition_active(PendingTrans),
               ?pending_transition_passive(PendingTrans),
               ?pending_transition_unindexable(PendingTrans),
               ?pending_transition_active(NewPendingTrans),
               ?pending_transition_passive(NewPendingTrans),
               ?pending_transition_unindexable(NewPendingTrans),
               PartVersions,
               NewPartVersions2]),
    NewState#state{group = NewGroup3}.


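% Spawn a cleanup process unless one is already running, automatic cleanup is
% disabled, an updater or compactor is active, or nothing is marked for
% cleanup (cleanup bitmask is 0).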
-spec maybe_start_cleaner(#state{}) -> #state{}.
maybe_start_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
    State;
maybe_start_cleaner(#state{auto_cleanup = false} = State) ->
    State;
maybe_start_cleaner(#state{group = Group} = State) ->
    case is_pid(State#state.compactor_pid) orelse
        is_pid(State#state.updater_pid) orelse (?set_cbitmask(Group) == 0) of
    true ->
        State;
    false ->
        Cleaner = spawn_link(fun() -> exit(cleaner(State)) end),
        ?LOG_INFO("Started cleanup process ~p for"
                  " set view `~s`, ~s (~s) group `~s`",
                  [Cleaner, ?set_name(State), ?type(State), ?category(State),
                   ?group_id(State)]),
        State#state{cleaner_pid = Cleaner}
    end.


-spec stop_cleaner(#state{}) -> #state{}.
stop_cleaner(#state{cleaner_pid = nil} = State) ->
    State;
stop_cleaner(#state{cleaner_pid = Pid, group = Group} = State) when is_pid(Pid) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! stop,
    unlink(Pid),
    ?LOG_INFO("Stopping cleanup process for set view `~s`, group `~s` (~s)",
              [?set_name(State), ?group_id(State), ?category(State)]),
    % NOTE vmx 2015-06-29: There is currently no way to cleanly stop the
    % spatial view cleaner while it is running. Hence do a hard stop right away.
    case Group#set_view_group.mod of
    spatial_view ->
        couch_util:shutdown_sync(Pid);
    _ ->
        ok
    end,
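    % The cleaner was started with spawn_link/1 and is unlinked only after the
    % monitor is set up, so either a stale 'EXIT' or a 'DOWN' message may be
    % delivered; handle both and flush any leftover 'EXIT' in the 'DOWN' case.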
    NewState = receive
    {'EXIT', Pid, Reason} ->
        after_cleaner_stopped(State, Reason);
    {'DOWN', MRef, process, Pid, Reason} ->
        receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
        after_cleaner_stopped(State, Reason)
    after 5000 ->
        couch_set_view_util:shutdown_cleaner(Group, Pid),
        ok = couch_file:refresh_eof(Group#set_view_group.fd),
        ?LOG_ERROR("Timeout stopping cleanup process ~p for"
                   " set view `~s`, ~s (~s) group `~s`",
                   [Pid, ?set_name(State), ?type(State), ?category(State),
                    ?group_id(State)]),
        State#state{cleaner_pid = nil}
    end,
    erlang:demonitor(MRef, [flush]),
    NewState.


after_cleaner_stopped(State, {clean_group, CleanGroup, Count, Time}) ->
    #state{group = OldGroup} = State,
    {ok, NewGroup0} = couch_set_view_util:refresh_viewgroup_header(CleanGroup),
    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
    ?LOG_INFO("Stopped cleanup process for"
              " set view `~s`, ~s (~s) group `~s`.~n"
              "Removed ~p values from the index in ~.3f seconds~n"
              "New set of partitions to cleanup: ~w~n"
              "Old set of partitions to cleanup: ~w~n",
              [?set_name(State), ?type(State), ?category(State),
               ?group_id(State), Count, Time,
               couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup)),
               couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup))]),
    case ?set_cbitmask(NewGroup) of
    0 ->
        inc_cleanups(State#state.group, Time, Count, false);
    _ ->
        ?inc_cleanup_stops(State#state.group)
    end,
    State#state{
        group = NewGroup,
        cleaner_pid = nil
    };
after_cleaner_stopped(#state{cleaner_pid = Pid, group = Group} = State, Reason) ->
    ok = couch_file:refresh_eof(Group#set_view_group.fd),
    ?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s (~s) group `~s`,"
               " died with reason: ~p",
               [Pid, ?set_name(State), ?type(State), ?category(State),
                ?group_id(State), Reason]),
    State#state{cleaner_pid = nil}.


-spec cleaner(#state{}) -> {'clean_group', #set_view_group{}, non_neg_integer(), float()}.
cleaner(#state{group = Group}) ->
    StartTime = os:timestamp(),
    {ok, NewGroup, TotalPurgedCount} = couch_set_view_util:cleanup_group(Group),
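    % timer:now_diff/2 returns microseconds; report the duration in seconds.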
    Duration = timer:now_diff(os:timestamp(), StartTime) / 1000000,
    {clean_group, NewGroup, TotalPurgedCount, Duration}.


-spec indexable_partition_seqs(#state{}) -> partition_seqs().
indexable_partition_seqs(State) ->
    Partitions = group_partitions(State#state.group),
    {ok, Seqs} = get_seqs(State, Partitions),
    indexable_partition_seqs(State, Seqs).

-spec indexable_partition_seqs(#state{}, partition_seqs()) -> partition_seqs().
indexable_partition_seqs(#state{group = Group}, Seqs) ->
    case ?set_unindexable_seqs(Group) of
    [] ->
        Seqs;
    _ ->
        IndexSeqs = ?set_seqs(Group),
        CurPartitions = [P || {P, _} <- IndexSeqs],
        ReplicasOnTransfer = ?set_replicas_on_transfer(Group),
        Partitions = ordsets:union(CurPartitions, ReplicasOnTransfer),
        % Replicas on transfer are indexed even when they are currently marked
        % as unindexable (the purpose of the transfer is for them to become
        % active and indexable).
        couch_set_view_util:filter_seqs(Partitions, Seqs)
    end.
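% Illustrative example for indexable_partition_seqs/2 above (hypothetical
% values): with indexable seqs [{0, 10}, {1, 5}], unindexable seqs [{2, 7}]
% and replicas on transfer [2], partitions [0, 1, 2] are kept, since
% partition 2 is on transfer even though it is currently unindexable.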


-spec active_partition_seqs(#state{}) -> partition_seqs().
active_partition_seqs(#state{group = Group} = State) ->
    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
    {ok, CurSeqs} = get_seqs(State, ActiveParts),
    CurSeqs.

-spec active_partition_seqs(#state{}, partition_seqs()) -> partition_seqs().
active_partition_seqs(#state{group = Group}, Seqs) ->
    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
    couch_set_view_util:filter_seqs(ActiveParts, Seqs).
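% For example (assuming couch_set_view_util:decode_bitmask/1 returns the ids
% of the set bits in ascending order), an abitmask of 2#1010 corresponds to
% active partitions [1, 3].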


-spec start_compactor(#state{}, compact_fun()) -> #state{}.
start_compactor(State, CompactFun) ->
    #state{group = Group} = State2 = stop_cleaner(State),
    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compaction starting",
              [?set_name(State2), ?type(State), ?category(State),
               ?group_id(State2)]),
    #set_view_group{
        fd = CompactFd
    } = NewGroup = compact_group(State2),
    Owner = self(),
    TmpDir = updater_tmp_dir(State),