1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_group).
16-behaviour(gen_server).
17
18%% API
19-export([start_link/1, request_group_info/1, get_data_size/1]).
20-export([open_set_group/3]).
21-export([request_group/2, release_group/1]).
22-export([is_view_defined/1, define_view/2]).
23-export([set_state/4]).
24-export([add_replica_partitions/2, remove_replica_partitions/2]).
25-export([mark_as_unindexable/2, mark_as_indexable/2]).
26-export([monitor_partition_update/4, demonitor_partition_update/2]).
27-export([reset_utilization_stats/1, get_utilization_stats/1]).
28-export([inc_access_stat/1, remove_duplicate_partitions/1]).
29
30%% gen_server callbacks
31-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
32         terminate/2, code_change/3]).
33
34-include("couch_db.hrl").
35-include_lib("couch_set_view/include/couch_set_view.hrl").
36
37-type init_args()   :: {string(), binary(), #set_view_group{}}.
38-type compact_fun() :: fun((#set_view_group{},
39                            #set_view_group{},
40                            string(),
41                            pid() | 'nil',
42                            pid()) -> no_return()).
43-type monitor_error() :: {'shutdown', any()} |
44                         'marked_for_cleanup' |
45                         {'updater_error', any()}.
46
47-define(DEFAULT_TIMEOUT, 3000).
48-define(GET_TIMEOUT(State), ((State)#state.timeout)).
49
50-define(root_dir(State), element(1, State#state.init_args)).
51-define(set_name(State), element(2, State#state.init_args)).
52-define(type(State), (element(3, State#state.init_args))#set_view_group.type).
53-define(group_sig(State), (element(3, State#state.init_args))#set_view_group.sig).
54-define(group_id(State), (State#state.group)#set_view_group.name).
55-define(dcp_pid(State), (State#state.group)#set_view_group.dcp_pid).
56-define(category(State), (State#state.group)#set_view_group.category).
57-define(is_defined(State),
58    (((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions > 0)).
59-define(replicas_on_transfer(State),
60        ((State#state.group)#set_view_group.index_header)#set_view_index_header.replicas_on_transfer).
61-define(have_pending_transition(State),
62        ((((State#state.group)#set_view_group.index_header)
63          #set_view_index_header.pending_transition) /= nil)).
64
65-define(MAX_HIST_SIZE, 10).
66% flow control buffer size 20 MB
67-define(DCP_CONTROL_BUFFER_SIZE, "20971520").
68
69% Seqs cache ttl in microseconds
70-define(SEQS_CACHE_TTL, 300000).
71
72-define(DCP_OPEN_NO_VALUE, 16#08).
73
74-record(util_stats, {
75    useful_indexing_time = 0.0  :: float(),
76    wasted_indexing_time = 0.0  :: float(),
77    updates = 0                 :: non_neg_integer(),
78    updater_interruptions = 0   :: non_neg_integer(),
79    compaction_time = 0.0       :: float(),
80    compactions = 0             :: non_neg_integer(),
81    compactor_interruptions = 0 :: non_neg_integer()
82}).
83
84-record(up_listener, {
85    pid,
86    monref,
87    partition,
88    seq
89}).
90
91-record(waiter, {
92    from,
93    debug = false :: boolean(),
94    % seqs for active partitions only
95    seqs = []     :: partition_seqs()
96}).
97
98-record(seqs_cache, {
99    timestamp = {0, 0, 0}              :: timer:time(),
100    is_waiting = false                 :: boolean(),
101    seqs = []                          :: partition_seqs()
102}).
103
104-record(state, {
105    init_args                          :: init_args(),
106    replica_group = nil                :: 'nil' | pid(),
107    group = #set_view_group{}          :: #set_view_group{},
108    updater_pid = nil                  :: 'nil' | pid(),
109    initial_build = false              :: boolean(),
110    updater_state = not_running        :: set_view_updater_state() | 'not_running' | 'starting',
111    compactor_pid = nil                :: 'nil' | pid(),
112    compactor_file = nil               :: 'nil' | pid(),
113    compactor_fun = nil                :: 'nil' | compact_fun(),
114    compactor_retry_number = 0         :: non_neg_integer(),
115    waiting_list = []                  :: [#waiter{}],
116    cleaner_pid = nil                  :: 'nil' | pid(),
117    shutdown = false                   :: boolean(),
118    shutdown_aliases                   :: [binary()],
119    auto_cleanup = true                :: boolean(),
120    auto_transfer_replicas = true      :: boolean(),
121    replica_partitions = []            :: ordsets:ordset(partition_id()),
122    pending_transition_waiters = []    :: [{From::{pid(), reference()}, #set_view_group_req{}}],
123    update_listeners = dict:new()      :: dict(),
124    compact_log_files = nil            :: 'nil' | {[[string()]], partition_seqs(), partition_versions()},
125    timeout = ?DEFAULT_TIMEOUT         :: non_neg_integer() | 'infinity'
126}).
127
128-define(inc_stat(Group, S),
129    ets:update_counter(
130        Group#set_view_group.stats_ets,
131        ?set_view_group_stats_key(Group),
132        {S, 1})).
133-define(inc_cleanup_stops(Group), ?inc_stat(Group, #set_view_group_stats.cleanup_stops)).
134-define(inc_updater_errors(Group), ?inc_stat(Group, #set_view_group_stats.update_errors)).
135-define(inc_accesses(Group), ?inc_stat(Group, #set_view_group_stats.accesses)).
136
137% Same as in couch_file. That's the offset where headers
138% are stored
139-define(SIZE_BLOCK, 4096).
140
141
142% api methods
143-spec request_group(pid(), #set_view_group_req{}) ->
144                   {'ok', #set_view_group{}} | {'error', term()}.
145request_group(Pid, Req) ->
146    #set_view_group_req{wanted_partitions = WantedPartitions} = Req,
147    Req2 = Req#set_view_group_req{
148        wanted_partitions = ordsets:from_list(WantedPartitions)
149    },
150    request_group(Pid, Req2, 1).
151
152-spec request_group(pid(), #set_view_group_req{}, non_neg_integer()) ->
153                   {'ok', #set_view_group{}} | {'error', term()}.
154request_group(Pid, Req, Retries) ->
155    case gen_server:call(Pid, Req, infinity) of
156    {ok, Group, ActiveReplicasBitmask} ->
157        #set_view_group{
158            ref_counter = RefCounter,
159            replica_pid = RepPid,
160            name = GroupName,
161            set_name = SetName,
162            category = Category
163        } = Group,
164        case request_replica_group(RepPid, ActiveReplicasBitmask, Req) of
165        {ok, RepGroup} ->
166            {ok, Group#set_view_group{replica_group = RepGroup}};
167        retry ->
168            couch_ref_counter:drop(RefCounter),
169            ?LOG_INFO("Retrying group `~s` (~s) request, stale=~s,"
170                      " set `~s`, retry attempt #~p",
171                      [GroupName, Category, Req#set_view_group_req.stale,
172                       SetName,Retries]),
173            request_group(Pid, Req, Retries + 1)
174        end;
175    Error ->
176        Error
177    end.
178
179
180-spec request_replica_group(pid(), bitmask(), #set_view_group_req{}) ->
181                           {'ok', #set_view_group{} | 'nil'} | 'retry'.
182request_replica_group(_RepPid, 0, _Req) ->
183    {ok, nil};
184request_replica_group(RepPid, ActiveReplicasBitmask, Req) ->
185    {ok, RepGroup, 0} = gen_server:call(RepPid, Req, infinity),
186    case ?set_abitmask(RepGroup) =:= ActiveReplicasBitmask of
187    true ->
188        {ok, RepGroup};
189    false ->
190        couch_ref_counter:drop(RepGroup#set_view_group.ref_counter),
191        retry
192    end.
193
194
195-spec release_group(#set_view_group{}) -> ok.
196release_group(#set_view_group{ref_counter = RefCounter, replica_group = RepGroup}) ->
197    couch_ref_counter:drop(RefCounter),
198    case RepGroup of
199    #set_view_group{ref_counter = RepRefCounter} ->
200        couch_ref_counter:drop(RepRefCounter);
201    nil ->
202        ok
203    end.
204
205
206-spec request_group_info(pid()) -> {'ok', [{term(), term()}]}.
207request_group_info(Pid) ->
208    case gen_server:call(Pid, request_group_info, infinity) of
209    {ok, GroupInfoList} ->
210        {ok, GroupInfoList};
211    Error ->
212        throw(Error)
213    end.
214
215
216-spec get_data_size(pid()) -> {'ok', [{term(), term()}]}.
217get_data_size(Pid) ->
218    case gen_server:call(Pid, get_data_size, infinity) of
219    {ok, _Info} = Ok ->
220        Ok;
221    Error ->
222        throw(Error)
223    end.
224
225
226-spec define_view(pid(), #set_view_params{}) -> 'ok' | {'error', term()}.
227define_view(Pid, Params) ->
228    #set_view_params{
229        max_partitions = NumPartitions,
230        active_partitions = ActivePartitionsList,
231        passive_partitions = PassivePartitionsList,
232        use_replica_index = UseReplicaIndex
233    } = Params,
234    ActiveList = lists:usort(ActivePartitionsList),
235    ActiveBitmask = couch_set_view_util:build_bitmask(ActiveList),
236    PassiveList = lists:usort(PassivePartitionsList),
237    PassiveBitmask = couch_set_view_util:build_bitmask(PassiveList),
238    case (ActiveBitmask band PassiveBitmask) /= 0 of
239    true ->
240        throw({bad_view_definition,
241            <<"Intersection between active and passive bitmasks">>});
242    false ->
243        ok
244    end,
245    gen_server:call(
246        Pid, {define_view, NumPartitions, ActiveList, ActiveBitmask,
247            PassiveList, PassiveBitmask, UseReplicaIndex}, infinity).
248
249
250-spec is_view_defined(pid()) -> boolean().
251is_view_defined(Pid) ->
252    gen_server:call(Pid, is_view_defined, infinity).
253
254
255-spec set_state(pid(),
256                ordsets:ordset(partition_id()),
257                ordsets:ordset(partition_id()),
258                ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
259set_state(_Pid, [], [], []) ->
260    ok;
261set_state(Pid, Active, Passive, Cleanup) ->
262    ordsets:is_set(Active) orelse throw({error, <<"Active list is not an ordset">>}),
263    ordsets:is_set(Passive) orelse throw({error, <<"Passive list is not an ordset">>}),
264    case ordsets:intersection(Active, Passive) of
265    [] ->
266        ordsets:is_set(Cleanup) orelse throw({error, <<"Cleanup list is not an ordset">>}),
267        case ordsets:intersection(Active, Cleanup) of
268        [] ->
269            case ordsets:intersection(Passive, Cleanup) of
270            [] ->
271                gen_server:call(
272                    Pid, {set_state, Active, Passive, Cleanup}, infinity);
273            _ ->
274                {error,
275                    <<"Intersection between passive and cleanup partition lists">>}
276            end;
277        _ ->
278            {error, <<"Intersection between active and cleanup partition lists">>}
279        end;
280    _ ->
281        {error, <<"Intersection between active and passive partition lists">>}
282    end.
283
284
285-spec add_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
286add_replica_partitions(_Pid, []) ->
287    ok;
288add_replica_partitions(Pid, Partitions) ->
289    BitMask = couch_set_view_util:build_bitmask(Partitions),
290    gen_server:call(Pid, {add_replicas, BitMask}, infinity).
291
292
293-spec remove_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
294remove_replica_partitions(_Pid, []) ->
295    ok;
296remove_replica_partitions(Pid, Partitions) ->
297    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
298    gen_server:call(Pid, {remove_replicas, Partitions}, infinity).
299
300
301-spec mark_as_unindexable(pid(), ordsets:ordset(partition_id())) ->
302                                 'ok' | {'error', term()}.
303mark_as_unindexable(Pid, Partitions) ->
304    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
305    gen_server:call(Pid, {mark_as_unindexable, Partitions}, infinity).
306
307
308-spec mark_as_indexable(pid(), ordsets:ordset(partition_id())) ->
309                               'ok' | {'error', term()}.
310mark_as_indexable(Pid, Partitions) ->
311    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
312    gen_server:call(Pid, {mark_as_indexable, Partitions}, infinity).
313
314
315-spec monitor_partition_update(pid(), partition_id(), reference(), pid()) ->
316                               'ok' | {'error', term()}.
317monitor_partition_update(Pid, PartitionId, Ref, CallerPid) ->
318    gen_server:call(
319        Pid, {monitor_partition_update, PartitionId, Ref, CallerPid}, infinity).
320
321
322-spec demonitor_partition_update(pid(), reference()) -> 'ok'.
323demonitor_partition_update(Pid, Ref) ->
324    ok = gen_server:call(Pid, {demonitor_partition_update, Ref}, infinity).
325
326
327-spec reset_utilization_stats(pid()) -> 'ok'.
328reset_utilization_stats(Pid) ->
329    ok = gen_server:call(Pid, reset_utilization_stats, infinity).
330
331
332-spec get_utilization_stats(pid()) -> {'ok', [{atom() | binary(), term()}]}.
333get_utilization_stats(Pid) ->
334    gen_server:call(Pid, get_utilization_stats, infinity).
335
336-spec inc_access_stat(pid()) -> 'ok'.
337inc_access_stat(Pid) ->
338    gen_server:call(Pid, increment_stat, infinity).
339
340start_link({RootDir, SetName, Group}) ->
341    Args = {RootDir, SetName, Group#set_view_group{type = main}},
342    proc_lib:start_link(?MODULE, init, [Args]).
343
344
345init({_, _, Group} = InitArgs) ->
346    process_flag(trap_exit, true),
347    {ok, State} = try
348        do_init(InitArgs)
349    catch
350    _:Error ->
351        % An error might occur before the file handler is associated with the
352        % group. There's a message once the file handler is created, so that
353        % we can close the file and make the write guard happy.
354        receive
355        {group_fd, Fd} ->
356                couch_file:close(Fd)
357        after 0 ->
358            ok
359        end,
360        ?LOG_ERROR("~s error opening set view group `~s` (~s), signature `~s',"
361                   " from set `~s`: ~p",
362                   [?MODULE, Group#set_view_group.name,
363                    Group#set_view_group.category, hex_sig(Group),
364                    Group#set_view_group.set_name, Error]),
365        exit(Error)
366    end,
367    proc_lib:init_ack({ok, self()}),
368    gen_server:enter_loop(?MODULE, [], State, 1).
369
370
371do_init({_, SetName, _} = InitArgs) ->
372    case prepare_group(InitArgs, false) of
373    {ok, Group} ->
374        #set_view_group{
375            fd = Fd,
376            index_header = Header,
377            type = Type,
378            category = Category,
379            mod = Mod
380        } = Group,
381        % Send the file handler to yourself, so that in case of an error
382        % we can clean up the write guard properly
383        self() ! {group_fd, Fd},
384        RefCounter = new_fd_ref_counter(Fd),
385        case Header#set_view_index_header.has_replica of
386        false ->
387            ReplicaPid = nil,
388            ReplicaParts = [];
389        true ->
390            ReplicaPid = open_replica_group(InitArgs),
391            maybe_fix_replica_group(ReplicaPid, Group),
392            ReplicaParts = get_replica_partitions(ReplicaPid)
393        end,
394        ViewCount = length(Group#set_view_group.views),
395        case Header#set_view_index_header.num_partitions > 0 of
396        false ->
397            ?LOG_INFO("Started undefined ~s (~s) set view group `~s`,"
398                      " group `~s`,  signature `~s', view count: ~p",
399                      [Type, Category, SetName,
400                       Group#set_view_group.name, hex_sig(Group), ViewCount]);
401        true ->
402            ?LOG_INFO("Started ~s (~s) set view group `~s`, group `~s`,"
403                      " signature `~s', view count ~p~n"
404                      "active partitions:      ~w~n"
405                      "passive partitions:     ~w~n"
406                      "cleanup partitions:     ~w~n"
407                      "unindexable partitions: ~w~n"
408                      "~sreplica support~n" ++
409                      case Header#set_view_index_header.has_replica of
410                      true ->
411                          "replica partitions: ~w~n"
412                          "replica partitions on transfer: ~w~n";
413                      false ->
414                          ""
415                      end,
416                      [Type, Category, SetName, Group#set_view_group.name,
417                       hex_sig(Group), ViewCount,
418                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.abitmask),
419                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.pbitmask),
420                       couch_set_view_util:decode_bitmask(Header#set_view_index_header.cbitmask),
421                       Header#set_view_index_header.unindexable_seqs,
422                       case Header#set_view_index_header.has_replica of
423                       true ->
424                           "";
425                       false ->
426                           "no "
427                       end] ++
428                       case Header#set_view_index_header.has_replica of
429                       true ->
430                           [ReplicaParts, ?set_replicas_on_transfer(Group)];
431                       false ->
432                           []
433                       end)
434        end,
435        couch_set_view_mapreduce:get_map_context(Group),
436        DcpFlags = case couch_set_view_mapreduce:is_doc_used(Group) of
437        {ok, doc_fields_unused} ->
438            ?LOG_INFO("~s set view group `~s`, set `~s` (~s), doc_fields_unused",
439              [Type, Group#set_view_group.name, SetName, Category]),
440            ?DCP_OPEN_NO_VALUE;
441        {ok, doc_fields_used} ->
442            ?LOG_INFO("~s set view group `~s`, set `~s` (~s), doc_fields_used",
443              [Type, Group#set_view_group.name, SetName, Category]),
444            0
445        end,
446        DcpName = <<(atom_to_binary(Mod, latin1))/binary, ": ",
447            SetName/binary, " ", (Group#set_view_group.name)/binary,
448            " (", (atom_to_binary(Category, latin1))/binary, "/",
449            (atom_to_binary(Type, latin1))/binary, ")">>,
450        {User, Passwd} = get_auth(),
451        DcpBufferSize = list_to_integer(couch_config:get("dcp",
452            "flow_control_buffer_size", ?DCP_CONTROL_BUFFER_SIZE)),
453        ?LOG_INFO("Flow control buffer size is ~p bytes", [DcpBufferSize]),
454
455        case couch_dcp_client:start(DcpName, SetName, User, Passwd,
456            DcpBufferSize, DcpFlags) of
457        {ok, DcpPid} ->
458            Group2 = maybe_upgrade_header(Group, DcpPid),
459            State = #state{
460                init_args = InitArgs,
461                replica_group = ReplicaPid,
462                replica_partitions = ReplicaParts,
463                group = Group2#set_view_group{
464                    ref_counter = RefCounter,
465                    replica_pid = ReplicaPid,
466                    dcp_pid = DcpPid
467                }
468            },
469            init_seqs_cache(),
470            true = ets:insert(
471                 Group#set_view_group.stats_ets,
472                 #set_view_group_stats{ets_key = ?set_view_group_stats_key(Group)}),
473            TmpDir = updater_tmp_dir(State),
474            ok = couch_set_view_util:delete_sort_files(TmpDir, all),
475            reset_util_stats(),
476            {ok, maybe_apply_pending_transition(State)};
477        Error ->
478            couch_file:close(Fd),
479            throw(Error)
480        end;
481    Error ->
482        throw(Error)
483    end.
484
485handle_call(get_sig, _From, #state{group = Group} = State) ->
486    {reply, {ok, Group#set_view_group.sig}, State, ?GET_TIMEOUT(State)};
487
488handle_call({set_auto_cleanup, Enabled}, _From, State) ->
489    % To be used only by unit tests.
490    {reply, ok, State#state{auto_cleanup = Enabled}, ?GET_TIMEOUT(State)};
491
492handle_call({set_timeout, T}, _From, State) ->
493    % To be used only by unit tests.
494    {reply, ok, State#state{timeout = T}, T};
495
496handle_call({set_auto_transfer_replicas, Enabled}, _From, State) ->
497    % To be used only by unit tests.
498    {reply, ok, State#state{auto_transfer_replicas = Enabled}, ?GET_TIMEOUT(State)};
499
500handle_call({define_view, NumPartitions, _, _, _, _, _}, _From, State)
501        when (not ?is_defined(State)), NumPartitions > ?MAX_NUM_PARTITIONS ->
502    {reply, {error, <<"Too high value for number of partitions">>}, State};
503
504handle_call({define_view, NumPartitions, ActiveList, ActiveBitmask,
505        PassiveList, PassiveBitmask, UseReplicaIndex}, _From, State) when not ?is_defined(State) ->
506    #state{init_args = InitArgs, group = Group} = State,
507    PartitionsList = lists:usort(ActiveList ++ PassiveList),
508    Seqs = lists:map(
509        fun(PartId) -> {PartId, 0} end, PartitionsList),
510    PartVersions = lists:map(
511        fun(PartId) -> {PartId, [{0, 0}]} end, PartitionsList),
512    #set_view_group{
513        name = DDocId,
514        index_header = Header
515    } = Group,
516    NewHeader = Header#set_view_index_header{
517        num_partitions = NumPartitions,
518        abitmask = ActiveBitmask,
519        pbitmask = PassiveBitmask,
520        seqs = Seqs,
521        has_replica = UseReplicaIndex,
522        partition_versions = PartVersions
523    },
524    case (?type(State) =:= main) andalso UseReplicaIndex of
525    false ->
526        ReplicaPid = nil;
527    true ->
528        ReplicaPid = open_replica_group(InitArgs),
529        ok = gen_server:call(ReplicaPid, {define_view, NumPartitions, [], 0, [], 0, false}, infinity)
530    end,
531    NewGroup = Group#set_view_group{
532        index_header = NewHeader,
533        replica_pid = ReplicaPid
534    },
535    State2 = State#state{
536        group = NewGroup,
537        replica_group = ReplicaPid
538    },
539    {ok, HeaderPos} = commit_header(NewGroup),
540    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
541              " configured with:~n"
542              "~p partitions~n"
543              "~sreplica support~n"
544              "initial active partitions ~w~n"
545              "initial passive partitions ~w",
546              [?set_name(State), ?type(State), ?category(State), DDocId,
547               hex_sig(Group), NumPartitions,
548               case UseReplicaIndex of
549               true ->  "";
550               false -> "no "
551               end,
552        ActiveList, PassiveList]),
553    NewGroup2 = (State2#state.group)#set_view_group{
554        header_pos = HeaderPos
555    },
556    State3 = State2#state{
557        group = NewGroup2
558    },
559    {reply, ok, State3, ?GET_TIMEOUT(State3)};
560
561handle_call({define_view, _, _, _, _, _, _}, _From, State) ->
562    {reply, view_already_defined, State, ?GET_TIMEOUT(State)};
563
564handle_call(is_view_defined, _From, State) ->
565    {reply, ?is_defined(State), State, ?GET_TIMEOUT(State)};
566
567handle_call(_Msg, _From, State) when not ?is_defined(State) ->
568    {reply, {error, view_undefined}, State};
569
570handle_call({set_state, ActiveList, PassiveList, CleanupList}, _From, State) ->
571    try
572        NewState = maybe_update_partition_states(
573            ActiveList, PassiveList, CleanupList, State),
574        {reply, ok, NewState, ?GET_TIMEOUT(NewState)}
575    catch
576    throw:Error ->
577        {reply, Error, State}
578    end;
579
580handle_call({add_replicas, BitMask}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
581    #state{
582        group = Group,
583        replica_partitions = ReplicaParts
584    } = State,
585    BitMask2 = case BitMask band ?set_abitmask(Group) of
586    0 ->
587        BitMask;
588    Common1 ->
589        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
590                  " set partitions ~w to replica state because they are"
591                  " currently marked as active",
592                  [?set_name(State), ?type(State), ?category(State),
593                   ?group_id(State),
594                   couch_set_view_util:decode_bitmask(Common1)]),
595        BitMask bxor Common1
596    end,
597    BitMask3 = case BitMask2 band ?set_pbitmask(Group) of
598    0 ->
599        BitMask2;
600    Common2 ->
601        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
602                  " set partitions  ~w to replica state because they are"
603                  " currently marked as passive",
604                  [?set_name(State), ?type(State), ?category(State),
605                   ?group_id(State),
606                   couch_set_view_util:decode_bitmask(Common2)]),
607        BitMask2 bxor Common2
608    end,
609    Parts = couch_set_view_util:decode_bitmask(BitMask3),
610    ok = set_state(ReplicaPid, [], Parts, []),
611    NewReplicaParts = ordsets:union(ReplicaParts, Parts),
612    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
613              " defined new replica partitions: ~w~n"
614              "New full set of replica partitions is: ~w~n",
615              [?set_name(State), ?type(State), ?category(State),
616               ?group_id(State), Parts, NewReplicaParts]),
617    State2 = State#state{
618        replica_partitions = NewReplicaParts
619    },
620    {reply, ok, State2, ?GET_TIMEOUT(State2)};
621
622handle_call({remove_replicas, Partitions}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
623    #state{
624        replica_partitions = ReplicaParts,
625        group = Group
626    } = State,
627    case ordsets:intersection(?set_replicas_on_transfer(Group), Partitions) of
628    [] ->
629        ok = set_state(ReplicaPid, [], [], Partitions),
630        NewState = State#state{
631            replica_partitions = ordsets:subtract(ReplicaParts, Partitions)
632        };
633    Common ->
634        UpdaterWasRunning = is_pid(State#state.updater_pid),
635        State2 = stop_cleaner(State),
636        #state{group = Group3} = State3 = stop_updater(State2),
637        {ok, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs, NewVersions} =
638            set_cleanup_partitions(
639                Common,
640                ?set_abitmask(Group3),
641                ?set_pbitmask(Group3),
642                ?set_cbitmask(Group3),
643                ?set_seqs(Group3),
644                ?set_partition_versions(Group)),
645        case NewCbitmask =/= ?set_cbitmask(Group3) of
646        true ->
647             State4 = stop_compactor(State3);
648        false ->
649             State4 = State3
650        end,
651        ReplicaPartitions2 = ordsets:subtract(ReplicaParts, Common),
652        ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group3), Common),
653        State5 = update_header(
654            State4,
655            NewAbitmask,
656            NewPbitmask,
657            NewCbitmask,
658            NewSeqs,
659            ?set_unindexable_seqs(State4#state.group),
660            ReplicasOnTransfer2,
661            ReplicaPartitions2,
662            ?set_pending_transition(State4#state.group),
663            NewVersions),
664        ok = set_state(ReplicaPid, [], [], Partitions),
665        case UpdaterWasRunning of
666        true ->
667            State6 = start_updater(State5);
668        false ->
669            State6 = State5
670        end,
671        NewState = maybe_start_cleaner(State6)
672    end,
673    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, marked the following"
674              " replica partitions for removal: ~w",
675              [?set_name(State), ?type(State), ?category(State),
676               ?group_id(State), Partitions]),
677    {reply, ok, NewState, ?GET_TIMEOUT(NewState)};
678
679handle_call({mark_as_unindexable, Partitions}, _From, State) ->
680    try
681        State2 = process_mark_as_unindexable(State, Partitions),
682        {reply, ok, State2, ?GET_TIMEOUT(State2)}
683    catch
684    throw:Error ->
685        {reply, Error, State, ?GET_TIMEOUT(State)}
686    end;
687
688handle_call({mark_as_indexable, Partitions}, _From, State) ->
689    try
690        State2 = process_mark_as_indexable(State, Partitions),
691        {reply, ok, State2, ?GET_TIMEOUT(State2)}
692    catch
693    throw:Error ->
694        {reply, Error, State, ?GET_TIMEOUT(State)}
695    end;
696
697handle_call(increment_stat, _From, State) ->
698    ?inc_accesses(State#state.group),
699    {reply, ok, State, ?GET_TIMEOUT(State)};
700
701handle_call(#set_view_group_req{} = Req, From, State) ->
702    #state{
703        group = Group,
704        pending_transition_waiters = Waiters
705    } = State,
706    State2 = case is_any_partition_pending(Req, Group) of
707    false ->
708        process_view_group_request(Req, From, State);
709    true ->
710        State#state{pending_transition_waiters = [{From, Req} | Waiters]}
711    end,
712    inc_view_group_access_stats(Req, State2#state.group),
713    {noreply, State2, ?GET_TIMEOUT(State2)};
714
715handle_call(request_group, _From, #state{group = Group} = State) ->
716    % Meant to be called only by this module and the compactor module.
717    % Callers aren't supposed to read from the group's fd, we don't
718    % increment here the ref counter on behalf of the caller.
719    {reply, {ok, Group}, State, ?GET_TIMEOUT(State)};
720
721handle_call(replica_pid, _From, #state{replica_group = Pid} = State) ->
722    % To be used only by unit tests.
723    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
724
725handle_call({start_updater, Options}, _From, State) ->
726    % To be used only by unit tests.
727    State2 = start_updater(State, Options),
728    {reply, {ok, State2#state.updater_pid}, State2, ?GET_TIMEOUT(State2)};
729
730handle_call(start_cleaner, _From, State) ->
731    % To be used only by unit tests.
732    State2 = maybe_start_cleaner(State#state{auto_cleanup = true}),
733    State3 = State2#state{auto_cleanup = State#state.auto_cleanup},
734    {reply, {ok, State2#state.cleaner_pid}, State3, ?GET_TIMEOUT(State3)};
735
736handle_call(updater_pid, _From, #state{updater_pid = Pid} = State) ->
737    % To be used only by unit tests.
738    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
739
740handle_call(cleaner_pid, _From, #state{cleaner_pid = Pid} = State) ->
741    % To be used only by unit tests.
742    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
743
744handle_call({test_rollback, RollbackSeqs}, _From, State0) ->
745    % To be used only by unit tests.
746    Response = case rollback(State0, RollbackSeqs) of
747    {ok, State} ->
748        ok;
749    {error, {cannot_rollback, State}} ->
750        cannot_rollback
751    end,
752    {reply, Response, State, ?GET_TIMEOUT(State)};
753
754handle_call(request_group_info, _From, State) ->
755    GroupInfo = get_group_info(State),
756    {reply, {ok, GroupInfo}, State, ?GET_TIMEOUT(State)};
757
758handle_call(get_data_size, _From, State) ->
759    DataSizeInfo = get_data_size_info(State),
760    {reply, {ok, DataSizeInfo}, State, ?GET_TIMEOUT(State)};
761
762handle_call({start_compact, _CompactFun}, _From,
763            #state{updater_pid = UpPid, initial_build = true} = State) when is_pid(UpPid) ->
764    {reply, {error, initial_build}, State, ?GET_TIMEOUT(State)};
765handle_call({start_compact, CompactFun}, _From, #state{compactor_pid = nil} = State) ->
766    #state{compactor_pid = Pid} = State2 = start_compactor(State, CompactFun),
767    {reply, {ok, Pid}, State2, ?GET_TIMEOUT(State2)};
768handle_call({start_compact, _}, _From, State) ->
769    %% compact already running, this is a no-op
770    {reply, {ok, State#state.compactor_pid}, State};
771
772handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = State) ->
773    #state{
774        update_listeners = Listeners,
775        group = Group,
776        updater_pid = UpdaterPid,
777        compactor_pid = CompactorPid
778    } = State,
779    #set_view_group{
780        fd = OldFd,
781        ref_counter = RefCounter,
782        filepath = OldFilepath
783    } = Group,
784    #set_view_compactor_result{
785        group = NewGroup0,
786        compact_time = Duration,
787        cleanup_kv_count = CleanupKVCount
788    } = Result,
789
790    MissingChangesCount = couch_set_view_util:missing_changes_count(
791        ?set_seqs(Group), ?set_seqs(NewGroup0)),
792    case MissingChangesCount == 0 of
793    true ->
794        % Compactor might have received a group snapshot from an updater.
795        NewGroup = fix_updater_group(NewGroup0, Group),
796        HeaderBin = couch_set_view_util:group_to_header_bin(NewGroup),
797        {ok, NewHeaderPos} = couch_file:write_header_bin(
798            NewGroup#set_view_group.fd, HeaderBin),
799        if is_pid(UpdaterPid) ->
800            ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compact group"
801                      " up to date - restarting updater",
802                      [?set_name(State), ?type(State), ?category(State),
803                       ?group_id(State)]),
804            % Decided to switch to compacted file
805            % Compactor has caught up and hence discard the running updater
806            couch_set_view_util:shutdown_wait(UpdaterPid);
807        true ->
808            ok
809        end,
810        NewFilepath = increment_filepath(Group),
811        NewRefCounter = new_fd_ref_counter(NewGroup#set_view_group.fd),
812        case ?set_replicas_on_transfer(Group) /= ?set_replicas_on_transfer(NewGroup) of
813        true ->
814            % Set of replicas on transfer changed while compaction was running.
815            % Just write a new header with the new set of replicas on transfer and all the
816            % metadata that is updated when that set changes (active and passive bitmasks).
817            % This happens only during (or after, for a short period) a cluster rebalance or
818            % failover. This shouldn't take too long, as we are writing and fsync'ing only
819            % one header, all data was already fsync'ed by the compactor process.
820            NewGroup2 = NewGroup#set_view_group{
821                ref_counter = NewRefCounter,
822                filepath = NewFilepath,
823                index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
824                    replicas_on_transfer = ?set_replicas_on_transfer(Group),
825                    abitmask = ?set_abitmask(Group),
826                    pbitmask = ?set_pbitmask(Group)
827                }
828            },
829            {ok, NewHeaderPos2} = commit_header(NewGroup2);
830        false ->
831            % The compactor process committed an header with up to date state information and
832            % did an fsync before calling us. No need to commit a new header here (and fsync).
833            NewGroup2 = NewGroup#set_view_group{
834                ref_counter = NewRefCounter,
835                filepath = NewFilepath
836	    },
837	    NewHeaderPos2 = NewHeaderPos
838        end,
839        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compaction complete"
840                  " in ~.3f seconds, filtered ~p key-value pairs",
841                  [?set_name(State), ?type(State), ?category(State),
842                   ?group_id(State), Duration, CleanupKVCount]),
843        inc_util_stat(#util_stats.compaction_time, Duration),
844        ok = couch_file:only_snapshot_reads(OldFd),
845        %% After rename call we're sure the header was written to the file
846        %% (no need for couch_file:flush/1 call).
847        ok = couch_file:rename(NewGroup#set_view_group.fd, NewFilepath),
848        ok = couch_file:delete(?root_dir(State), OldFilepath),
849
850        %% cleanup old group
851        unlink(CompactorPid),
852        couch_ref_counter:drop(RefCounter),
853
854        NewUpdaterPid =
855        if is_pid(UpdaterPid) ->
856            CurSeqs = indexable_partition_seqs(State),
857            spawn_link(couch_set_view_updater,
858                       update,
859                       [self(), NewGroup2, CurSeqs, false, updater_tmp_dir(State), []]);
860        true ->
861            nil
862        end,
863
864        Listeners2 = notify_update_listeners(State, Listeners, NewGroup2),
865        State2 = State#state{
866            update_listeners = Listeners2,
867            compactor_pid = nil,
868            compactor_file = nil,
869            compactor_fun = nil,
870            compact_log_files = nil,
871            compactor_retry_number = 0,
872            updater_pid = NewUpdaterPid,
873            initial_build = is_pid(NewUpdaterPid) andalso
874                    couch_set_view_util:is_initial_build(NewGroup2),
875            updater_state = case is_pid(NewUpdaterPid) of
876                true -> starting;
877                false -> not_running
878            end,
879            group = NewGroup2#set_view_group{
880                header_pos = NewHeaderPos2
881            }
882        },
883        inc_compactions(Result),
884        {reply, ok, maybe_apply_pending_transition(State2), ?GET_TIMEOUT(State2)};
885    false ->
886        State2 = State#state{
887            compactor_retry_number = State#state.compactor_retry_number + 1
888        },
889        {reply, {update, MissingChangesCount}, State2, ?GET_TIMEOUT(State2)}
890    end;
891handle_call({compact_done, _Result}, _From, State) ->
892    % From a previous compactor that was killed/stopped, ignore.
893    {noreply, State, ?GET_TIMEOUT(State)};
894
895handle_call(cancel_compact, _From, #state{compactor_pid = nil} = State) ->
896    {reply, ok, State, ?GET_TIMEOUT(State)};
897handle_call(cancel_compact, _From, #state{compactor_pid = Pid} = State) ->
898    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
899              " canceling compaction (pid ~p)",
900              [?set_name(State), ?type(State), ?category(State),
901               ?group_id(State), Pid]),
902    State2 = stop_compactor(State),
903    State3 = maybe_start_cleaner(State2),
904    {reply, ok, State3, ?GET_TIMEOUT(State3)};
905
906handle_call({monitor_partition_update, PartId, _Ref, _Pid}, _From, State)
907        when PartId >= ?set_num_partitions(State#state.group) ->
908    Msg = io_lib:format("Invalid partition: ~p", [PartId]),
909    {reply, {error, iolist_to_binary(Msg)}, State, ?GET_TIMEOUT(State)};
910
911handle_call({monitor_partition_update, PartId, Ref, Pid}, _From, State) ->
912    try
913        State2 = process_monitor_partition_update(State, PartId, Ref, Pid),
914        {reply, ok, State2, ?GET_TIMEOUT(State2)}
915    catch
916    throw:Error ->
917        {reply, Error, State, ?GET_TIMEOUT(State)}
918    end;
919
920handle_call({demonitor_partition_update, Ref}, _From, State) ->
921    #state{update_listeners = Listeners} = State,
922    case dict:find(Ref, Listeners) of
923    error ->
924        {reply, ok, State, ?GET_TIMEOUT(State)};
925    {ok, #up_listener{monref = MonRef, partition = PartId}} ->
926        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, removing partition ~p"
927                   "update monitor, reference ~p",
928                   [?set_name(State), ?type(State), ?category(State),
929                    ?group_id(State), PartId, Ref]),
930        erlang:demonitor(MonRef, [flush]),
931        State2 = State#state{update_listeners = dict:erase(Ref, Listeners)},
932        {reply, ok, State2, ?GET_TIMEOUT(State2)}
933    end;
934
935handle_call(compact_log_files, _From, State) ->
936    {Files0, Seqs, PartVersions} = State#state.compact_log_files,
937    Files = lists:map(fun lists:reverse/1, Files0),
938    NewState = State#state{compact_log_files = nil},
939    {reply, {ok, {Files, Seqs, PartVersions}}, NewState, ?GET_TIMEOUT(NewState)};
940
941handle_call(reset_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
942    reset_util_stats(),
943    case is_pid(RepPid) of
944    true ->
945        ok = gen_server:call(RepPid, reset_utilization_stats, infinity);
946    false ->
947        ok
948    end,
949    {reply, ok, State, ?GET_TIMEOUT(State)};
950
951handle_call(get_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
952    Stats = erlang:get(util_stats),
953    UsefulIndexing = Stats#util_stats.useful_indexing_time,
954    WastedIndexing = Stats#util_stats.wasted_indexing_time,
955    StatNames = record_info(fields, util_stats),
956    StatPoses = lists:seq(2, record_info(size, util_stats)),
957    StatsList0 = lists:foldr(
958        fun({StatName, StatPos}, Acc) ->
959            Val = element(StatPos, Stats),
960            [{StatName, Val} | Acc]
961        end,
962        [], lists:zip(StatNames, StatPoses)),
963    StatsList1 = [{total_indexing_time, UsefulIndexing + WastedIndexing} | StatsList0],
964    case is_pid(RepPid) of
965    true ->
966        {ok, RepStats} = gen_server:call(RepPid, get_utilization_stats, infinity),
967        StatsList = StatsList1 ++ [{replica_utilization_stats, {RepStats}}];
968    false ->
969        StatsList = StatsList1
970    end,
971    {reply, {ok, StatsList}, State, ?GET_TIMEOUT(State)};
972
973handle_call(before_master_delete, _From, State) ->
974    Error = {error, {db_deleted, ?master_dbname((?set_name(State)))}},
975    State2 = reply_all(State, Error),
976    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, going to shutdown because "
977              "master database is being deleted",
978              [?set_name(State), ?type(State), ?category(State),
979               ?group_id(State)]),
980    {stop, shutdown, ok, State2}.
981
982
983handle_cast(_Msg, State) when not ?is_defined(State) ->
984    {noreply, State};
985
986handle_cast({compact_log_files, Files, Seqs, PartVersions, _Init},
987                                #state{compact_log_files = nil} = State) ->
988    LogList = lists:map(fun(F) -> [F] end, Files),
989    {noreply, State#state{compact_log_files = {LogList, Seqs, PartVersions}},
990        ?GET_TIMEOUT(State)};
991
992handle_cast({compact_log_files, Files, NewSeqs, NewPartVersions, Init}, State) ->
993    LogList = case Init of
994    true ->
995        lists:map(fun(F) -> [F] end, Files);
996    false ->
997        {OldL, _OldSeqs, _OldPartVersions} = State#state.compact_log_files,
998        lists:zipwith(
999            fun(F, Current) -> [F | Current] end,
1000            Files, OldL)
1001    end,
1002    {noreply, State#state{compact_log_files = {LogList, NewSeqs, NewPartVersions}},
1003        ?GET_TIMEOUT(State)};
1004
1005handle_cast({ddoc_updated, NewSig, Aliases}, State) ->
1006    #state{
1007        waiting_list = Waiters,
1008        group = #set_view_group{sig = CurSig}
1009    } = State,
1010    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
1011              " design document was updated~n"
1012              "  new signature:   ~s~n"
1013              "  current aliases: ~p~n"
1014              "  shutdown flag:   ~s~n"
1015              "  waiting clients: ~p~n",
1016              [?set_name(State), ?type(State), ?category(State),
1017               ?group_id(State), hex_sig(CurSig), hex_sig(NewSig), Aliases,
1018               State#state.shutdown, length(Waiters)]),
1019    case NewSig of
1020    CurSig ->
1021        State2 = State#state{shutdown = false, shutdown_aliases = undefined},
1022        {noreply, State2, ?GET_TIMEOUT(State2)};
1023    <<>> ->
1024        remove_mapreduce_context_store(State#state.group),
1025        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
1026        Error = {error, <<"Design document was deleted">>},
1027        {stop, normal, reply_all(State2, Error)};
1028    _ ->
1029        remove_mapreduce_context_store(State#state.group),
1030        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
1031        case Waiters of
1032        [] ->
1033            {stop, normal, State2};
1034        _ ->
1035            {noreply, State2}
1036        end
1037    end;
1038
1039handle_cast({update, MinNumChanges}, #state{group = Group} = State) ->
1040    case is_pid(State#state.updater_pid) of
1041    true ->
1042        {noreply, State};
1043    false ->
1044        CurSeqs = indexable_partition_seqs(State),
1045        MissingCount = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
1046        case (MissingCount >= MinNumChanges) andalso (MissingCount > 0) of
1047        true ->
1048            {noreply, do_start_updater(State, CurSeqs, [])};
1049        false ->
1050            {noreply, State}
1051        end
1052    end;
1053
1054handle_cast({update_replica, _MinNumChanges}, #state{replica_group = nil} = State) ->
1055    {noreply, State};
1056
1057handle_cast({update_replica, MinNumChanges}, #state{replica_group = Pid} = State) ->
1058    ok = gen_server:cast(Pid, {update, MinNumChanges}),
1059    {noreply, State}.
1060
1061
1062handle_info(timeout, State) when not ?is_defined(State) ->
1063    {noreply, State};
1064
1065handle_info(timeout, #state{group = Group} = State) ->
1066    TransferReplicas = (?set_replicas_on_transfer(Group) /= []) andalso
1067        State#state.auto_transfer_replicas,
1068    case TransferReplicas orelse (dict:size(State#state.update_listeners) > 0) of
1069    true ->
1070        {noreply, start_updater(State)};
1071    false ->
1072        {noreply, maybe_start_cleaner(State)}
1073    end;
1074
1075handle_info({partial_update, Pid, AckTo, NewGroup}, #state{updater_pid = Pid} = State) ->
1076    case ?have_pending_transition(State) andalso
1077        (?set_cbitmask(NewGroup) =:= 0) andalso
1078        (?set_cbitmask(State#state.group) =/= 0) andalso
1079        (State#state.waiting_list =:= []) of
1080    true ->
1081        State2 = process_partial_update(State, NewGroup),
1082        AckTo ! update_processed,
1083        State3 = stop_updater(State2),
1084        NewState = maybe_apply_pending_transition(State3);
1085    false ->
1086        NewState = process_partial_update(State, NewGroup),
1087        AckTo ! update_processed
1088    end,
1089    {noreply, NewState};
1090handle_info({partial_update, _, AckTo, _}, State) ->
1091    %% message from an old (probably pre-compaction) updater; ignore
1092    AckTo ! update_processed,
1093    {noreply, State, ?GET_TIMEOUT(State)};
1094
1095handle_info({updater_info, Pid, {state, UpdaterState}}, #state{updater_pid = Pid} = State) ->
1096    #state{
1097        group = Group,
1098        waiting_list = WaitList,
1099        replica_partitions = RepParts
1100    } = State,
1101    State2 = State#state{updater_state = UpdaterState},
1102    case UpdaterState of
1103    updating_passive ->
1104        WaitList2 = reply_with_group(Group, RepParts, WaitList),
1105        State3 = State2#state{waiting_list = WaitList2},
1106        case State#state.shutdown of
1107        true ->
1108            State4 = stop_updater(State3),
1109            {stop, normal, State4};
1110        false ->
1111            State4 = maybe_apply_pending_transition(State3),
1112            {noreply, State4}
1113        end;
1114    _ ->
1115        {noreply, State2}
1116    end;
1117
1118handle_info({updater_info, _Pid, {state, _UpdaterState}}, State) ->
1119    % Message from an old updater, ignore.
1120    {noreply, State, ?GET_TIMEOUT(State)};
1121
1122handle_info({'EXIT', Pid, {clean_group, CleanGroup, Count, Time}}, #state{cleaner_pid = Pid} = State) ->
1123    #state{group = OldGroup} = State,
1124    case CleanGroup#set_view_group.mod of
1125    % The mapreduce view cleaner is a native C based one that writes the
1126    % information to disk.
1127    mapreduce_view ->
1128        {ok, NewGroup0} =
1129             couch_set_view_util:refresh_viewgroup_header(CleanGroup);
1130    spatial_view ->
1131        NewGroup0 = CleanGroup
1132    end,
1133    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
1134    ?LOG_INFO("Cleanup finished for set view `~s`, ~s (~s) group `~s`~n"
1135              "Removed ~p values from the index in ~.3f seconds~n"
1136              "active partitions before:  ~w~n"
1137              "active partitions after:   ~w~n"
1138              "passive partitions before: ~w~n"
1139              "passive partitions after:  ~w~n"
1140              "cleanup partitions before: ~w~n"
1141              "cleanup partitions after:  ~w~n" ++
1142          case is_pid(State#state.replica_group) of
1143          true ->
1144              "Current set of replica partitions: ~w~n"
1145              "Current set of replicas on transfer: ~w~n";
1146          false ->
1147              []
1148          end,
1149          [?set_name(State), ?type(State), ?category(State), ?group_id(State),
1150           Count, Time,
1151           couch_set_view_util:decode_bitmask(?set_abitmask(OldGroup)),
1152           couch_set_view_util:decode_bitmask(?set_abitmask(NewGroup)),
1153           couch_set_view_util:decode_bitmask(?set_pbitmask(OldGroup)),
1154           couch_set_view_util:decode_bitmask(?set_pbitmask(NewGroup)),
1155           couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup)),
1156           couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup))] ++
1157              case is_pid(State#state.replica_group) of
1158              true ->
1159                  [State#state.replica_partitions, ?set_replicas_on_transfer(NewGroup)];
1160              false ->
1161                  []
1162              end),
1163    State2 = State#state{
1164        cleaner_pid = nil,
1165        group = NewGroup
1166    },
1167    inc_cleanups(State2#state.group, Time, Count, false),
1168    {noreply, maybe_apply_pending_transition(State2)};
1169
1170handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid, group = Group} = State) ->
1171    ok = couch_file:refresh_eof(Group#set_view_group.fd),
1172    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`, cleanup process ~p"
1173               " died with unexpected reason: ~p",
1174               [?set_name(State), ?type(State), ?category(State),
1175                ?group_id(State), Pid, Reason]),
1176    {noreply, State#state{cleaner_pid = nil}, ?GET_TIMEOUT(State)};
1177
1178handle_info({'EXIT', Pid, {updater_finished, Result}}, #state{updater_pid = Pid} = State) ->
1179    #set_view_updater_result{
1180        stats = Stats,
1181        group = #set_view_group{filepath = Path} = NewGroup0,
1182        tmp_file = BuildFile
1183    } = Result,
1184    #set_view_updater_stats{
1185        indexing_time = IndexingTime,
1186        blocked_time = BlockedTime,
1187        inserted_ids = InsertedIds,
1188        deleted_ids = DeletedIds,
1189        inserted_kvs = InsertedKVs,
1190        deleted_kvs = DeletedKVs,
1191        cleanup_kv_count = CleanupKVCount,
1192        seqs = SeqsDone
1193    } = Stats,
1194    case State#state.initial_build of
1195    true ->
1196        NewRefCounter = new_fd_ref_counter(BuildFile),
1197        ok = couch_file:only_snapshot_reads(NewGroup0#set_view_group.fd),
1198        ok = couch_file:delete(?root_dir(State), Path),
1199        ok = couch_file:rename(BuildFile, Path),
1200        couch_ref_counter:drop(NewGroup0#set_view_group.ref_counter),
1201        NewGroup = NewGroup0#set_view_group{
1202            ref_counter = NewRefCounter,
1203            fd = BuildFile
1204        };
1205    false ->
1206        ok = couch_file:refresh_eof(NewGroup0#set_view_group.fd),
1207        NewGroup = NewGroup0
1208    end,
1209    State2 = process_partial_update(State, NewGroup),
1210    #state{
1211        waiting_list = WaitList,
1212        replica_partitions = ReplicaParts,
1213        shutdown = Shutdown,
1214        group = NewGroup2,
1215        update_listeners = UpdateListeners2,
1216        initial_build = InitialBuild
1217    } = State2,
1218    inc_updates(NewGroup2, Result, false, false),
1219    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, updater finished~n"
1220              "Indexing time: ~.3f seconds~n"
1221              "Blocked time:  ~.3f seconds~n"
1222              "Inserted IDs:  ~p~n"
1223              "Deleted IDs:   ~p~n"
1224              "Inserted KVs:  ~p~n"
1225              "Deleted KVs:   ~p~n"
1226              "Cleaned KVs:   ~p~n"
1227              "# seqs done:   ~p~n",
1228              [?set_name(State), ?type(State), ?category(State),
1229               ?group_id(State), IndexingTime, BlockedTime, InsertedIds,
1230               DeletedIds, InsertedKVs, DeletedKVs, CleanupKVCount, SeqsDone]),
1231    UpdaterRestarted = case InitialBuild of
1232    true ->
1233        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
1234                  " initial index build of on-disk items is done,"
1235                  " restart updater to index in-memory items in case"
1236                  " there are any",
1237                  [?set_name(State2), ?type(State2), ?category(State2),
1238                   ?group_id(State2)]),
1239        StoppedUpdaterState = State2#state{
1240            updater_pid = nil,
1241            initial_build = false,
1242            updater_state = not_running
1243        },
1244        State3 = start_updater(StoppedUpdaterState),
1245        WaitList2 = State3#state.waiting_list,
1246        is_pid(State3#state.updater_pid);
1247    false ->
1248        WaitList2 = reply_with_group(NewGroup2, ReplicaParts, WaitList),
1249        State3 = State2,
1250        false
1251    end,
1252    case UpdaterRestarted of
1253    true ->
1254        {noreply, State3, ?GET_TIMEOUT(State3)};
1255    false ->
1256        case Shutdown andalso (WaitList2 == []) of
1257        true ->
1258            {stop, normal, State3#state{waiting_list = []}};
1259        false ->
1260            State4 = State3#state{
1261                updater_pid = nil,
1262                initial_build = false,
1263                updater_state = not_running,
1264                waiting_list = WaitList2
1265            },
1266            State5 = maybe_apply_pending_transition(State4),
1267            State6 = case (WaitList2 /= []) orelse (dict:size(UpdateListeners2) > 0) of
1268            true ->
1269                start_updater(State5);
1270            false ->
1271                State5
1272            end,
1273            State7 = maybe_start_cleaner(State6),
1274            {noreply, State7, ?GET_TIMEOUT(State7)}
1275        end
1276    end;
1277
1278handle_info({'EXIT', Pid, {updater_error, purge}}, #state{updater_pid = Pid} = State) ->
1279    State2 = reset_group_from_state(State),
1280    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because updater"
1281              " detected missed document deletes (purge)",
1282              [?set_name(State), ?type(State),
1283               ?category(State), ?group_id(State)]),
1284    State3 = start_updater(State2),
1285    {noreply, State3, ?GET_TIMEOUT(State3)};
1286
1287handle_info({'EXIT', Pid, {updater_error, {rollback, RollbackSeqs}}},
1288        #state{updater_pid = Pid} = State0) ->
1289    Rollback = rollback(State0, RollbackSeqs),
1290    State2 = case Rollback of
1291    {ok, State} ->
1292        ?LOG_INFO(
1293            "Set view `~s`, ~s (~s) group `~s`, group update because group"
1294            " needed to be rolled back",
1295            [?set_name(State), ?type(State),
1296             ?category(State), ?group_id(State)]),
1297        State#state{
1298            updater_pid = nil,
1299            initial_build = false,
1300            updater_state = not_running
1301        };
1302    {error, {cannot_rollback, State}} ->
1303        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because "
1304            "a rollback wasn't possible",
1305            [?set_name(State), ?type(State),
1306             ?category(State), ?group_id(State)]),
1307        reset_group_from_state(State)
1308    end,
1309    State3 = start_updater(State2),
1310    {noreply, State3, ?GET_TIMEOUT(State3)};
1311
1312handle_info({'EXIT', Pid, {updater_error, Error}}, #state{updater_pid = Pid, group = Group} = State) ->
1313    ok = couch_file:refresh_eof(Group#set_view_group.fd),
1314    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1315               " received error from updater: ~p",
1316               [?set_name(State), ?type(State), ?category(State),
1317                ?group_id(State), Error]),
1318    Listeners2 = error_notify_update_listeners(
1319        State, State#state.update_listeners, {updater_error, Error}),
1320    State2 = State#state{
1321        updater_pid = nil,
1322        initial_build = false,
1323        updater_state = not_running,
1324        update_listeners = Listeners2
1325    },
1326    ?inc_updater_errors(State2#state.group),
1327    case State#state.shutdown of
1328    true ->
1329        {stop, normal, reply_all(State2, {error, Error})};
1330    false ->
1331        Error2 = case Error of
1332        {_, 86, Msg} ->
1333            {error, <<"Reducer: ", Msg/binary>>};
1334        {_, 87, _} ->
1335            {error, <<"reduction too large">>};
1336        {_, _Reason} ->
1337            Error;
1338        _ ->
1339            {error, Error}
1340        end,
1341        State3 = reply_all(State2, Error2),
1342        {noreply, maybe_start_cleaner(State3), ?GET_TIMEOUT(State3)}
1343    end;
1344
1345handle_info({'EXIT', _Pid, {updater_error, _Error}}, State) ->
1346    % from old, shutdown updater, ignore
1347    {noreply, State, ?GET_TIMEOUT(State)};
1348
1349handle_info({'EXIT', UpPid, reset}, #state{updater_pid = UpPid} = State) ->
1350    % TODO: once purge support is properly added, this needs to take into
1351    % account the replica index.
1352    State2 = stop_cleaner(State),
1353    case prepare_group(State#state.init_args, true) of
1354    {ok, ResetGroup} ->
1355        {ok, start_updater(State2#state{group = ResetGroup})};
1356    Error ->
1357        {stop, normal, reply_all(State2, Error)}
1358    end;
1359
1360handle_info({'EXIT', Pid, normal}, State) ->
1361    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
1362              " linked PID ~p stopped normally",
1363              [?set_name(State), ?type(State), ?category(State),
1364               ?group_id(State), Pid]),
1365    {noreply, State, ?GET_TIMEOUT(State)};
1366
1367handle_info({'EXIT', Pid, Reason}, #state{compactor_pid = Pid} = State) ->
1368    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1369               " compactor process ~p died with unexpected reason: ~p",
1370               [?set_name(State), ?type(State), ?category(State),
1371                ?group_id(State), Pid, Reason]),
1372    couch_util:shutdown_sync(State#state.compactor_file),
1373    _ = couch_file:delete(?root_dir(State), compact_file_name(State)),
1374    State2 = State#state{
1375        compactor_pid = nil,
1376        compactor_file = nil,
1377        compact_log_files = nil
1378    },
1379    {noreply, State2, ?GET_TIMEOUT(State2)};
1380
1381handle_info({'EXIT', Pid, Reason},
1382        #state{group = #set_view_group{dcp_pid = Pid}} = State) ->
1383    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1384               " DCP process ~p died with unexpected reason: ~p",
1385               [?set_name(State), ?type(State), ?category(State),
1386                ?group_id(State), Pid, Reason]),
1387    {stop, {dcp_died, Reason}, State};
1388
1389handle_info({'EXIT', Pid, Reason}, State) ->
1390    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1391               " terminating because linked PID ~p died with reason: ~p",
1392               [?set_name(State), ?type(State), ?category(State),
1393                ?group_id(State), Pid, Reason]),
1394    {stop, Reason, State};
1395
1396handle_info({all_seqs, nil, StatsResponse}, State) ->
1397    #state{
1398       group = Group
1399    } = State,
1400    SeqsCache = erlang:get(seqs_cache),
1401    NewState = case StatsResponse of
1402    {ok, Seqs} ->
1403        Partitions = group_partitions(Group),
1404        Seqs2 = couch_set_view_util:filter_seqs(Partitions, Seqs),
1405        NewCacheVal = #seqs_cache{
1406            timestamp = os:timestamp(),
1407            is_waiting = false,
1408            seqs = Seqs2
1409        },
1410        State2 = case is_pid(State#state.updater_pid) of
1411        true ->
1412            State;
1413        false ->
1414            CurSeqs = indexable_partition_seqs(State, Seqs2),
1415            case CurSeqs > ?set_seqs(State#state.group) of
1416            true ->
1417                do_start_updater(State, CurSeqs, []);
1418            false ->
1419                State
1420            end
1421        end,
1422        % If a rollback happened in between async seqs update request and
1423        % its response, is_waiting flag will be resetted. In that case, we should
1424        % just discard the update seqs.
1425        case SeqsCache#seqs_cache.is_waiting of
1426        true ->
1427            erlang:put(seqs_cache, NewCacheVal);
1428        false ->
1429            ok
1430        end,
1431        State2;
1432    _ ->
1433        ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1434                              " received bad response for update seqs"
1435                              " request ~p",
1436                              [?set_name(State), ?type(State),
1437                               ?category(State), ?group_id(State),
1438                               StatsResponse]),
1439        SeqsCache2 = SeqsCache#seqs_cache{is_waiting = false},
1440        erlang:put(seqs_cache, SeqsCache2),
1441        State
1442    end,
1443    {noreply, NewState};
1444
1445handle_info({group_fd, _Fd}, State) ->
1446    % If no error occured, we don't care about the fd message
1447    {noreply, State, ?GET_TIMEOUT(State)}.
1448
1449
1450terminate(Reason, #state{group = #set_view_group{sig = Sig} = Group} = State) ->
1451    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s`,"
1452              " terminating with reason: ~p",
1453              [?set_name(State), ?type(State), ?category(State),
1454               ?group_id(State), hex_sig(Sig), Reason]),
1455    Listeners2 = error_notify_update_listeners(
1456        State, State#state.update_listeners, {shutdown, Reason}),
1457    State2 = reply_all(State#state{update_listeners = Listeners2}, Reason),
1458    State3 = notify_pending_transition_waiters(State2, {shutdown, Reason}),
1459    couch_set_view_util:shutdown_wait(State3#state.cleaner_pid),
1460    couch_set_view_util:shutdown_wait(State3#state.updater_pid),
1461    couch_util:shutdown_sync(State3#state.compactor_pid),
1462    couch_util:shutdown_sync(State3#state.compactor_file),
1463    couch_util:shutdown_sync(State3#state.replica_group),
1464    Group = State#state.group,
1465    true = ets:delete(Group#set_view_group.stats_ets,
1466        ?set_view_group_stats_key(Group)),
1467    catch couch_file:only_snapshot_reads((State3#state.group)#set_view_group.fd),
1468    case State#state.shutdown of
1469    true when ?type(State) == main ->
1470        % Important to delete files here. A quick succession of ddoc updates, updating
1471        % the ddoc back to a previous version (while we're here in terminate), may lead
1472        % us to a case where it should start clean but it doesn't, because
1473        % couch_set_view:cleanup_index_files/1 was not invoked right before the last
1474        % reverting ddoc update, or it was invoked but did not finish before the view
1475        % group got spawned again. Problem here is during rebalance, where one of the
1476        % databases might have been deleted after the last ddoc update and while or after
1477        % we're here in terminate, so we must ensure we don't leave old index files around.
1478        % MB-6415 and MB-6517
1479        case State#state.shutdown_aliases of
1480        [] ->
1481            delete_index_file(?root_dir(State), Group, main),
1482            delete_index_file(?root_dir(State), Group, replica);
1483        _ ->
1484            ok
1485        end;
1486    _ ->
1487        ok
1488    end,
1489    TmpDir = updater_tmp_dir(State),
1490    ok = couch_set_view_util:delete_sort_files(TmpDir, all),
1491    _ = file:del_dir(TmpDir),
1492    ok.
1493
1494
1495code_change(_OldVsn, State, _Extra) ->
1496    {ok, State}.
1497
1498
1499-spec reply_with_group(#set_view_group{},
1500                       ordsets:ordset(partition_id()),
1501                       [#waiter{}]) -> [#waiter{}].
1502reply_with_group(_Group, _ReplicaPartitions, []) ->
1503    [];
1504reply_with_group(Group, ReplicaPartitions, WaitList) ->
1505    ActiveReplicasBitmask = couch_set_view_util:build_bitmask(
1506        ?set_replicas_on_transfer(Group)),
1507    ActiveIndexable = [{P, S} || {P, S} <- ?set_seqs(Group),
1508                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1509    ActiveUnindexable = [{P, S} || {P, S} <- ?set_unindexable_seqs(Group),
1510                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1511    GroupSeqs = ordsets:union(ActiveIndexable, ActiveUnindexable),
1512    WaitList2 = lists:foldr(
1513        fun(#waiter{debug = false} = Waiter, Acc) ->
1514            case maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) of
1515            true ->
1516                Acc;
1517            false ->
1518                [Waiter | Acc]
1519            end;
1520        (#waiter{debug = true} = Waiter, Acc) ->
1521            [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1522                ?set_view_group_stats_key(Group)),
1523            DebugGroup = Group#set_view_group{
1524                debug_info = #set_view_debug_info{
1525                    stats = Stats,
1526                    original_abitmask = ?set_abitmask(Group),
1527                    original_pbitmask = ?set_pbitmask(Group),
1528                    replica_partitions = ReplicaPartitions,
1529                    wanted_seqs = Waiter#waiter.seqs
1530                }
1531            },
1532            case maybe_reply_with_group(Waiter, DebugGroup, GroupSeqs, ActiveReplicasBitmask) of
1533            true ->
1534                Acc;
1535            false ->
1536                [Waiter | Acc]
1537            end
1538        end,
1539        [], WaitList),
1540    WaitList2.
1541
1542
1543-spec maybe_reply_with_group(#waiter{}, #set_view_group{}, partition_seqs(), bitmask()) -> boolean().
1544maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) ->
1545    #waiter{from = {Pid, _} = From, seqs = ClientSeqs} = Waiter,
1546    case (ClientSeqs == []) orelse (GroupSeqs >= ClientSeqs) of
1547    true ->
1548        couch_ref_counter:add(Group#set_view_group.ref_counter, Pid),
1549        gen_server:reply(From, {ok, Group, ActiveReplicasBitmask}),
1550        true;
1551    false ->
1552        false
1553    end.
1554
1555
1556-spec reply_all(#state{}, term()) -> #state{}.
1557reply_all(#state{waiting_list = []} = State, _Reply) ->
1558    State;
1559reply_all(#state{waiting_list = WaitList} = State, Reply) ->
1560    lists:foreach(fun(#waiter{from = From}) ->
1561        catch gen_server:reply(From, Reply)
1562    end, WaitList),
1563    State#state{waiting_list = []}.
1564
1565
1566-spec prepare_group(init_args(), boolean()) -> {'ok', #set_view_group{}} |
1567                                               {'error', atom()}.
1568prepare_group({RootDir, SetName, Group0}, ForceReset)->
1569    #set_view_group{
1570        sig = Sig,
1571        type = Type
1572    } = Group0,
1573    Filepath = find_index_file(RootDir, Group0),
1574    Group = Group0#set_view_group{filepath = Filepath},
1575    case open_index_file(Filepath) of
1576    {ok, Fd} ->
1577        try
1578            if ForceReset ->
1579                % this can happen if we missed a purge
1580                {ok, reset_file(Fd, Group)};
1581            true ->
1582                case (catch couch_file:read_header_bin(Fd)) of
1583                {ok, HeaderBin, HeaderPos} ->
1584                    HeaderSig = couch_set_view_util:header_bin_sig(HeaderBin);
1585                _ ->
1586                    HeaderPos = 0,  % keep dialyzer happy
1587                    HeaderSig = <<>>,
1588                    HeaderBin = <<>>
1589                end,
1590                case HeaderSig == Sig of
1591                true ->
1592                    HeaderInfo = couch_set_view_util:header_bin_to_term(HeaderBin),
1593                    {ok, init_group(Fd, Group, HeaderInfo, HeaderPos)};
1594                _ ->
1595                    % this happens on a new file
1596                    case (not ForceReset) andalso (Type =:= main) of
1597                    true ->
1598                        % initializing main view group
1599                        ok = delete_index_file(RootDir, Group, replica);
1600                    false ->
1601                        ok
1602                    end,
1603                    {ok, reset_file(Fd, Group)}
1604                end
1605            end
1606        catch
1607        _:Error ->
1608            % In case of any error, we need to close the file as it isn't yet
1609            % associated with the group, hence can't be cleaned up during
1610            % group cleanup.
1611            couch_file:close(Fd),
1612            Error
1613        end;
1614    {error, emfile} = Error ->
1615        ?LOG_ERROR("Can't open set view `~s`, ~s (~s) group `~s`:"
1616                   " too many files open",
1617                   [SetName, Type, Group#set_view_group.category,
1618                    Group#set_view_group.name]),
1619        Error;
1620    Error ->
1621        ok = delete_index_file(RootDir, Group, Type),
1622        case (not ForceReset) andalso (Type =:= main) of
1623        true ->
1624            % initializing main view group
1625            ok = delete_index_file(RootDir, Group, replica);
1626        false ->
1627            ok
1628        end,
1629        Error
1630    end.
1631
1632
1633-spec hex_sig(#set_view_group{} | binary()) -> string().
1634hex_sig(#set_view_group{sig = Sig}) ->
1635    hex_sig(Sig);
1636hex_sig(GroupSig) ->
1637    couch_util:to_hex(GroupSig).
1638
1639
1640-spec base_index_file_name(#set_view_group{}, set_view_group_type()) -> string().
1641base_index_file_name(Group, Type) ->
1642    atom_to_list(Type) ++ "_" ++ hex_sig(Group#set_view_group.sig) ++
1643        Group#set_view_group.extension.
1644
1645
1646-spec find_index_file(string(), #set_view_group{}) -> string().
1647find_index_file(RootDir, Group) ->
1648    #set_view_group{
1649        set_name = SetName,
1650        type = Type,
1651        category = Category
1652    } = Group,
1653    DesignRoot = couch_set_view:set_index_dir(RootDir, SetName, Category),
1654    BaseName = base_index_file_name(Group, Type),
1655    FullPath = filename:join([DesignRoot, BaseName]),
1656    case filelib:wildcard(FullPath ++ ".[0-9]*") of
1657    [] ->
1658        FullPath ++ ".1";
1659    Matching ->
1660        BaseNameSplitted = string:tokens(BaseName, "."),
1661        Matching2 = lists:filter(
1662            fun(Match) ->
1663                MatchBase = filename:basename(Match),
1664                [Suffix | Rest] = lists:reverse(string:tokens(MatchBase, ".")),
1665                (lists:reverse(Rest) =:= BaseNameSplitted) andalso
1666                    is_integer((catch list_to_integer(Suffix)))
1667            end,
1668            Matching),
1669        case Matching2 of
1670        [] ->
1671            FullPath ++ ".1";
1672        _ ->
1673            GetSuffix = fun(FileName) ->
1674                list_to_integer(lists:last(string:tokens(FileName, ".")))
1675            end,
1676            Matching3 = lists:sort(
1677                fun(A, B) -> GetSuffix(A) > GetSuffix(B) end,
1678                Matching2),
1679            hd(Matching3)
1680        end
1681    end.
1682
1683
1684-spec delete_index_file(string(), #set_view_group{}, set_view_group_type()) -> no_return().
1685delete_index_file(RootDir, Group, Type) ->
1686    #set_view_group{
1687        set_name = SetName,
1688        category = Category
1689    } = Group,
1690    SetDir = couch_set_view:set_index_dir(RootDir, SetName, Category),
1691    BaseName = filename:join([SetDir, base_index_file_name(Group, Type)]),
1692    lists:foreach(
1693        fun(F) -> ok = couch_file:delete(RootDir, F) end,
1694        filelib:wildcard(BaseName ++ ".[0-9]*")).
1695
1696
1697-spec compact_file_name(#state{} | #set_view_group{}) -> string().
1698compact_file_name(#state{group = Group}) ->
1699    compact_file_name(Group);
1700compact_file_name(#set_view_group{filepath = CurFilepath}) ->
1701    CurFilepath ++ ".compact".
1702
1703
1704-spec increment_filepath(#set_view_group{}) -> string().
1705increment_filepath(#set_view_group{filepath = CurFilepath}) ->
1706    [Suffix | Rest] = lists:reverse(string:tokens(CurFilepath, ".")),
1707    NewSuffix = integer_to_list(list_to_integer(Suffix) + 1),
1708    string:join(lists:reverse(Rest), ".") ++ "." ++ NewSuffix.
1709
1710
1711-spec open_index_file(string()) -> {'ok', pid()} | {'error', atom()}.
1712open_index_file(Filepath) ->
1713    case do_open_index_file(Filepath) of
1714    {ok, Fd} ->
1715        unlink(Fd),
1716        {ok, Fd};
1717    Error ->
1718        Error
1719    end.
1720
1721do_open_index_file(Filepath) ->
1722    case couch_file:open(Filepath) of
1723    {ok, Fd}        -> {ok, Fd};
1724    {error, enoent} -> couch_file:open(Filepath, [create]);
1725    Error           -> Error
1726    end.
1727
1728
1729open_set_group(Mod, SetName, GroupId) ->
1730    case couch_set_view_ddoc_cache:get_ddoc(SetName, GroupId) of
1731    {ok, DDoc} ->
1732        {ok, Mod:design_doc_to_set_view_group(SetName, DDoc)};
1733    {doc_open_error, Error} ->
1734        Error;
1735    {db_open_error, Error} ->
1736        Error
1737    end.
1738
1739
1740% To be used for debug/troubleshooting only (accessible via REST/HTTP API)
1741get_group_info(State) ->
1742    #state{
1743        group = Group,
1744        replica_group = ReplicaPid,
1745        updater_pid = UpdaterPid,
1746        updater_state = UpdaterState,
1747        compactor_pid = CompactorPid,
1748        waiting_list = WaitersList,
1749        cleaner_pid = CleanerPid,
1750        replica_partitions = ReplicaParts
1751    } = State,
1752    #set_view_group{
1753        fd = Fd,
1754        sig = GroupSig,
1755        id_btree = Btree,
1756        views = Views,
1757        mod = Mod
1758    } = Group,
1759    PendingTrans = get_pending_transition(State),
1760    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1761        ?set_view_group_stats_key(Group)),
1762    JsonStats = {[
1763        {full_updates, Stats#set_view_group_stats.full_updates},
1764        {partial_updates, Stats#set_view_group_stats.partial_updates},
1765        {stopped_updates, Stats#set_view_group_stats.stopped_updates},
1766        {updater_cleanups, Stats#set_view_group_stats.updater_cleanups},
1767        {compactions, Stats#set_view_group_stats.compactions},
1768        {cleanups, Stats#set_view_group_stats.cleanups},
1769        {waiting_clients, length(WaitersList)},
1770        {cleanup_interruptions, Stats#set_view_group_stats.cleanup_stops},
1771        {update_history, Stats#set_view_group_stats.update_history},
1772        {compaction_history, Stats#set_view_group_stats.compaction_history},
1773        {cleanup_history, Stats#set_view_group_stats.cleanup_history},
1774        {accesses, Stats#set_view_group_stats.accesses},
1775        {update_errors, Stats#set_view_group_stats.update_errors},
1776        {duplicated_partition_versions_seen,
1777         Stats#set_view_group_stats.dup_partitions_counter}
1778    ]},
1779    {ok, Size} = couch_file:bytes(Fd),
1780    GroupPartitions = ordsets:from_list(
1781        couch_set_view_util:decode_bitmask(?set_abitmask(Group) bor ?set_pbitmask(Group))),
1782    PartVersions = lists:map(fun({PartId, PartVersion}) ->
1783        {couch_util:to_binary(PartId), [tuple_to_list(V) || V <- PartVersion]}
1784    end, ?set_partition_versions(Group)),
1785
1786    IndexSeqs = ?set_seqs(Group),
1787    IndexPartitions = [PartId || {PartId, _} <- IndexSeqs],
1788    % Extract the seqnum from KV store for all indexible partitions.
1789    {ok, GroupSeqs} = get_seqs(State, GroupPartitions),
1790    PartSeqs = couch_set_view_util:filter_seqs(IndexPartitions, GroupSeqs),
1791
1792    % Calculate the total sum over difference of Seqnum between KV
1793    % and Index partition.
1794    SeqDiffs = lists:zipwith(
1795        fun({PartId, Seq1}, {PartId, Seq2}) ->
1796            Seq1 - Seq2
1797        end, PartSeqs, IndexSeqs),
1798    TotalSeqDiff = lists:sum(SeqDiffs),
1799
1800    [
1801        {signature, ?l2b(hex_sig(GroupSig))},
1802        {disk_size, Size},
1803        {data_size, Mod:view_group_data_size(Btree, Views)},
1804        {updater_running, is_pid(UpdaterPid)},
1805        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build},
1806        {updater_state, couch_util:to_binary(UpdaterState)},
1807        {compact_running, CompactorPid /= nil},
1808        {cleanup_running, (CleanerPid /= nil) orelse
1809            ((CompactorPid /= nil) andalso (?set_cbitmask(Group) =/= 0))},
1810        {max_number_partitions, ?set_num_partitions(Group)},
1811        {update_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- IndexSeqs]}},
1812        {partition_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- GroupSeqs]}},
1813        {total_seqs_diff, TotalSeqDiff},
1814        {active_partitions, couch_set_view_util:decode_bitmask(?set_abitmask(Group))},
1815        {passive_partitions, couch_set_view_util:decode_bitmask(?set_pbitmask(Group))},
1816        {cleanup_partitions, couch_set_view_util:decode_bitmask(?set_cbitmask(Group))},
1817        {unindexable_partitions, {[{couch_util:to_binary(P), S} || {P, S} <- ?set_unindexable_seqs(Group)]}},
1818        {stats, JsonStats},
1819        {pending_transition, case PendingTrans of
1820            nil ->
1821                null;
1822            #set_view_transition{} ->
1823                {[
1824                    {active, PendingTrans#set_view_transition.active},
1825                    {passive, PendingTrans#set_view_transition.passive}
1826                ]}
1827            end
1828        },
1829        {partition_versions, {PartVersions}}
1830    ] ++
1831    case (?type(State) =:= main) andalso is_pid(ReplicaPid) of
1832    true ->
1833        [{replica_partitions, ReplicaParts}, {replicas_on_transfer, ?set_replicas_on_transfer(Group)}];
1834    false ->
1835        []
1836    end ++
1837    get_replica_group_info(ReplicaPid).
1838
1839get_replica_group_info(ReplicaPid) when is_pid(ReplicaPid) ->
1840    {ok, RepGroupInfo} = gen_server:call(ReplicaPid, request_group_info, infinity),
1841    [{replica_group_info, {RepGroupInfo}}];
1842get_replica_group_info(_) ->
1843    [].
1844
1845
1846get_data_size_info(State) ->
1847    #state{
1848        group = Group,
1849        replica_group = ReplicaPid,
1850        updater_pid = UpdaterPid
1851    } = State,
1852    #set_view_group{
1853        fd = Fd,
1854        id_btree = Btree,
1855        sig = GroupSig,
1856        views = Views,
1857        mod = Mod
1858    } = Group,
1859    {ok, FileSize} = couch_file:bytes(Fd),
1860    DataSize = Mod:view_group_data_size(Btree, Views),
1861    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1862        ?set_view_group_stats_key(Group)),
1863    Info = [
1864        {signature, hex_sig(GroupSig)},
1865        {disk_size, FileSize},
1866        {data_size, DataSize},
1867        {accesses, Stats#set_view_group_stats.accesses},
1868        {updater_running, is_pid(UpdaterPid)},
1869        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build}
1870    ],
1871    case is_pid(ReplicaPid) of
1872    false ->
1873        Info;
1874    true ->
1875        {ok, RepInfo} = gen_server:call(ReplicaPid, get_data_size, infinity),
1876        [{replica_group_info, RepInfo} | Info]
1877    end.
1878
1879
1880-spec reset_group(#set_view_group{}) -> #set_view_group{}.
1881reset_group(Group) ->
1882    #set_view_group{
1883        views = Views,
1884        mod = Mod
1885    } = Group,
1886    Views2 = lists:map(fun(View) ->
1887        Indexer = Mod:reset_view(View#set_view.indexer),
1888        View#set_view{indexer = Indexer}
1889    end, Views),
1890    Group#set_view_group{
1891        fd = nil,
1892        index_header = #set_view_index_header{},
1893        id_btree = nil,
1894        views = Views2
1895    }.
1896
1897
1898-spec reset_file(pid(), #set_view_group{}) -> #set_view_group{}.
1899reset_file(Fd, #set_view_group{views = Views, index_header = Header} = Group) ->
1900    ok = couch_file:truncate(Fd, 0),
1901    EmptyHeader = Header#set_view_index_header{
1902        seqs = [{PartId, 0} ||
1903            {PartId, _} <- Header#set_view_index_header.seqs],
1904        id_btree_state = nil,
1905        view_states = [nil || _ <- Views],
1906        replicas_on_transfer = [],
1907        pending_transition = nil,
1908        unindexable_seqs = [{PartId, 0} ||
1909            {PartId, _} <- Header#set_view_index_header.unindexable_seqs],
1910        partition_versions = [{PartId, [{0, 0}]} ||
1911            {PartId, _} <- Header#set_view_index_header.partition_versions]
1912    },
1913    EmptyGroup = Group#set_view_group{index_header = EmptyHeader},
1914    EmptyHeaderBin = couch_set_view_util:group_to_header_bin(EmptyGroup),
1915    {ok, Pos} = couch_file:write_header_bin(Fd, EmptyHeaderBin),
1916    init_group(Fd, reset_group(EmptyGroup), EmptyHeader, Pos).
1917
1918
1919-spec reset_group_from_state(#state{}) -> #state{}.
1920reset_group_from_state(State) ->
1921    Group = State#state.group,
1922    Group2 = reset_file(Group#set_view_group.fd, Group),
1923    State#state{
1924        updater_pid = nil,
1925        initial_build = false,
1926        updater_state = not_running,
1927        group = Group2
1928    }.
1929
1930
1931-spec init_group(pid(),
1932                 #set_view_group{},
1933                 #set_view_index_header{},
1934                 non_neg_integer()) -> #set_view_group{}.
1935init_group(Fd, Group, IndexHeader, HeaderPos) ->
1936    #set_view_group{
1937        views = Views0,
1938        mod = Mod
1939    } = Group,
1940    Views = [V#set_view{ref = make_ref()} || V <- Views0],
1941    #set_view_index_header{
1942        id_btree_state = IdBtreeState,
1943        view_states = ViewStates
1944    } = IndexHeader,
1945    IdTreeReduce = fun(reduce, KVs) ->
1946        <<(length(KVs)):40, (couch_set_view_util:partitions_map(KVs, 0)):?MAX_NUM_PARTITIONS>>;
1947    (rereduce, [First | Rest]) ->
1948        lists:foldl(
1949            fun(<<S:40, M:?MAX_NUM_PARTITIONS>>, <<T:40, A:?MAX_NUM_PARTITIONS>>) ->
1950                <<(S + T):40, (M bor A):?MAX_NUM_PARTITIONS>>
1951            end,
1952            First, Rest)
1953    end,
1954    KvChunkThreshold = couch_config:get("set_views", "btree_kv_node_threshold", "7168"),
1955    KpChunkThreshold = couch_config:get("set_views", "btree_kp_node_threshold", "6144"),
1956    BtreeOptions = [
1957        {kv_chunk_threshold, list_to_integer(KvChunkThreshold)},
1958        {kp_chunk_threshold, list_to_integer(KpChunkThreshold)},
1959        {binary_mode, true}
1960    ],
1961    {ok, IdBtree} = couch_btree:open(
1962        IdBtreeState, Fd, [{reduce, IdTreeReduce} | BtreeOptions]),
1963    Views2 = Mod:setup_views(Fd, BtreeOptions, Group, ViewStates, Views),
1964    Group#set_view_group{
1965        fd = Fd,
1966        id_btree = IdBtree,
1967        views = Views2,
1968        index_header = IndexHeader,
1969        header_pos = HeaderPos
1970    }.
1971
1972
1973commit_header(Group) ->
1974    commit_header(Group, true).
1975
1976-spec commit_header(#set_view_group{}, boolean()) -> {'ok', non_neg_integer()}.
1977commit_header(Group, Fsync) ->
1978    HeaderBin = couch_set_view_util:group_to_header_bin(Group),
1979    {ok, Pos} = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
1980    case Fsync of
1981    true ->
1982        ok = couch_file:sync(Group#set_view_group.fd);
1983    false ->
1984        ok = couch_file:flush(Group#set_view_group.fd)
1985    end,
1986    {ok, Pos}.
1987
1988-spec filter_out_bitmask_partitions(ordsets:ordset(partition_id()),
1989                                    bitmask()) -> ordsets:ordset(partition_id()).
1990filter_out_bitmask_partitions(Partitions, BMask) ->
1991    [P || P <- Partitions, ((BMask bsr P) band 1) =/= 1].
1992
1993-spec maybe_update_partition_states(ordsets:ordset(partition_id()),
1994                                    ordsets:ordset(partition_id()),
1995                                    ordsets:ordset(partition_id()),
1996                                    #state{}) -> #state{}.
1997maybe_update_partition_states(ActiveList0, PassiveList0, CleanupList0, State) ->
1998    #state{group = Group} = State,
1999    PendingTrans = ?set_pending_transition(Group),
2000    PendingActive = ?pending_transition_active(PendingTrans),
2001    PendingPassive = ?pending_transition_passive(PendingTrans),
2002    PendingUnindexable = ?pending_transition_unindexable(PendingTrans),
2003    case (?set_unindexable_seqs(Group) == []) andalso (PendingUnindexable == []) of
2004    true ->
2005        ActiveList = ActiveList0,
2006        PassiveList = PassiveList0,
2007        CleanupList = CleanupList0;
2008    false ->
2009        AlreadyActive = couch_set_view_util:build_bitmask(PendingActive) bor ?set_abitmask(Group),
2010        AlreadyPassive = couch_set_view_util:build_bitmask(PendingPassive) bor ?set_pbitmask(Group),
2011        ActiveList = filter_out_bitmask_partitions(ActiveList0, AlreadyActive),
2012        PassiveList = filter_out_bitmask_partitions(PassiveList0, AlreadyPassive),
2013        CleanupList = filter_out_bitmask_partitions(CleanupList0, ?set_cbitmask(Group)),
2014        ActiveMarkedAsUnindexable0 = [
2015            P || P <- ActiveList, is_unindexable_part(P, Group)
2016        ],
2017
2018        % Replicas on transfer look like normal active partitions for the
2019        % caller. Hence they can be marked as unindexable. The actual
2020        % transfer to a real active partition needs to happen though. Thus
2021        % an intersection between newly activated partitions and unindexable
2022        % ones is possible (MB-8677).
2023        ActiveMarkedAsUnindexable = ordsets:subtract(
2024            ActiveMarkedAsUnindexable0, ?set_replicas_on_transfer(Group)),
2025
2026        case ActiveMarkedAsUnindexable of
2027        [] ->
2028            ok;
2029        _ ->
2030            ErrorMsg1 = io_lib:format("Intersection between requested active list "
2031                "and current unindexable partitions: ~w", [ActiveMarkedAsUnindexable]),
2032            throw({error, iolist_to_binary(ErrorMsg1)})
2033        end,
2034        PassiveMarkedAsUnindexable = [
2035            P || P <- PassiveList, is_unindexable_part(P, Group)
2036        ],
2037        case PassiveMarkedAsUnindexable of
2038        [] ->
2039            ok;
2040        _ ->
2041            ErrorMsg2 = io_lib:format("Intersection between requested passive list "
2042                "and current unindexable partitions: ~w", [PassiveMarkedAsUnindexable]),
2043            throw({error, iolist_to_binary(ErrorMsg2)})
2044        end,
2045        CleanupMarkedAsUnindexable = [
2046            P || P <- CleanupList, is_unindexable_part(P, Group)
2047        ],
2048        case CleanupMarkedAsUnindexable of
2049        [] ->
2050            ok;
2051        _ ->
2052            ErrorMsg3 = io_lib:format("Intersection between requested cleanup list "
2053                "and current unindexable partitions: ~w", [CleanupMarkedAsUnindexable]),
2054            throw({error, iolist_to_binary(ErrorMsg3)})
2055        end
2056    end,
2057    ActiveMask = couch_set_view_util:build_bitmask(ActiveList),
2058    case ActiveMask >= (1 bsl ?set_num_partitions(Group)) of
2059    true ->
2060        throw({error, <<"Invalid active partitions list">>});
2061    false ->
2062        ok
2063    end,
2064    PassiveMask = couch_set_view_util:build_bitmask(PassiveList),
2065    case PassiveMask >= (1 bsl ?set_num_partitions(Group)) of
2066    true ->
2067        throw({error, <<"Invalid passive partitions list">>});
2068    false ->
2069        ok
2070    end,
2071    CleanupMask = couch_set_view_util:build_bitmask(CleanupList),
2072    case CleanupMask >= (1 bsl ?set_num_partitions(Group)) of
2073    true ->
2074        throw({error, <<"Invalid cleanup partitions list">>});
2075    false ->
2076        ok
2077    end,
2078
2079    ActivePending = ?pending_transition_active(PendingTrans),
2080    PassivePending = ?pending_transition_passive(PendingTrans),
2081    IsEffectlessTransition =
2082        (ActiveMask bor ?set_abitmask(Group)) == ?set_abitmask(Group) andalso
2083        (PassiveMask bor ?set_pbitmask(Group)) == ?set_pbitmask(Group) andalso
2084        ((CleanupMask band (?set_abitmask(Group) bor ?set_pbitmask(Group))) == 0) andalso
2085        ordsets:is_disjoint(CleanupList, ActivePending) andalso
2086        ordsets:is_disjoint(CleanupList, PassivePending) andalso
2087        ordsets:is_disjoint(ActiveList, PassivePending) andalso
2088        ordsets:is_disjoint(PassiveList, ActivePending),
2089
2090    case IsEffectlessTransition of
2091    true ->
2092        State;
2093    false ->
2094        RestartUpdater = updater_needs_restart(
2095            Group, ActiveMask, PassiveMask, CleanupMask),
2096        NewState = update_partition_states(
2097            ActiveList, PassiveList, CleanupList, State, RestartUpdater),
2098        #state{group = NewGroup, updater_pid = UpdaterPid} = NewState,
2099        case RestartUpdater of
2100        false when is_pid(UpdaterPid) ->
2101            case missing_partitions(Group, NewGroup) of
2102            [] ->
2103                ok;
2104            MissingPassive ->
2105                UpdaterPid ! {new_passive_partitions, MissingPassive}
2106            end;
2107        _ ->
2108            ok
2109        end,
2110        NewState
2111    end.
2112
2113
2114-spec update_partition_states(ordsets:ordset(partition_id()),
2115                              ordsets:ordset(partition_id()),
2116                              ordsets:ordset(partition_id()),
2117                              #state{},
2118                              boolean()) -> #state{}.
2119update_partition_states(ActiveList, PassiveList, CleanupList, State, RestartUpdater) ->
2120    State2 = stop_cleaner(State),
2121    case RestartUpdater of
2122    true ->
2123        #state{group = Group3} = State3 = stop_updater(State2);
2124    false ->
2125        #state{group = Group3} = State3 = State2
2126    end,
2127    UpdaterWasRunning = is_pid(State#state.updater_pid),
2128    ActiveInCleanup = partitions_still_in_cleanup(ActiveList, Group3),
2129    PassiveInCleanup = partitions_still_in_cleanup(PassiveList, Group3),
2130    NewPendingTrans = merge_into_pending_transition(
2131        Group3, ActiveInCleanup, PassiveInCleanup, CleanupList),
2132    ApplyActiveList = ordsets:subtract(ActiveList, ActiveInCleanup),
2133    ApplyPassiveList = ordsets:subtract(PassiveList, PassiveInCleanup),
2134    ApplyCleanupList = CleanupList,
2135    State4 = persist_partition_states(
2136               State3, ApplyActiveList, ApplyPassiveList,
2137               ApplyCleanupList, NewPendingTrans, []),
2138    State5 = notify_pending_transition_waiters(State4),
2139    after_partition_states_updated(State5, UpdaterWasRunning).
2140
2141
2142-spec merge_into_pending_transition(#set_view_group{},
2143                                    ordsets:ordset(partition_id()),
2144                                    ordsets:ordset(partition_id()),
2145                                    ordsets:ordset(partition_id())) ->
2146                                           #set_view_transition{} | 'nil'.
2147merge_into_pending_transition(Group, ActiveInCleanup, PassiveInCleanup, CleanupList) ->
2148    PendingTrans = ?set_pending_transition(Group),
2149    ActivePending = ?pending_transition_active(PendingTrans),
2150    PassivePending = ?pending_transition_passive(PendingTrans),
2151    case ordsets:intersection(PassivePending, ActiveInCleanup) of
2152    [] ->
2153        PassivePending2 = PassivePending;
2154    Int ->
2155        PassivePending2 = ordsets:subtract(PassivePending, Int)
2156    end,
2157    case ordsets:intersection(ActivePending, PassiveInCleanup) of
2158    [] ->
2159        ActivePending2 = ActivePending;
2160    Int2 ->
2161        ActivePending2 = ordsets:subtract(ActivePending, Int2)
2162    end,
2163    ActivePending3 = ordsets:subtract(ActivePending2, CleanupList),
2164    PassivePending3 = ordsets:subtract(PassivePending2, CleanupList),
2165    ActivePending4 = ordsets:union(ActivePending3, ActiveInCleanup),
2166    PassivePending4 = ordsets:union(PassivePending3, PassiveInCleanup),
2167    case (ActivePending4 == []) andalso (PassivePending4 == []) of
2168    true ->
2169        nil;
2170    false ->
2171        #set_view_transition{
2172            active = ActivePending4,
2173            passive = PassivePending4
2174        }
2175    end.
2176
2177
2178-spec after_partition_states_updated(#state{}, boolean()) -> #state{}.
2179after_partition_states_updated(State, UpdaterWasRunning) ->
2180    State2 = case UpdaterWasRunning of
2181    true ->
2182        % Updater was running, we stopped it, updated the group we received
2183        % from the updater, updated that group's bitmasks and update seqs,
2184        % and now restart the updater with this modified group.
2185        start_updater(State);
2186    false ->
2187        State
2188    end,
2189    State3 = stop_compactor(State2),
2190    maybe_start_cleaner(State3).
2191
2192
2193-spec persist_partition_states(#state{},
2194                               ordsets:ordset(partition_id()),
2195                               ordsets:ordset(partition_id()),
2196                               ordsets:ordset(partition_id()),
2197                               #set_view_transition{} | 'nil',
2198                               ordsets:ordset(partition_id())) -> #state{}.
2199persist_partition_states(State, ActiveList, PassiveList, CleanupList, PendingTrans, ToBeUnindexable) ->
2200    % There can never be intersection between given active, passive and cleanup lists.
2201    % This check is performed elsewhere, outside the gen_server.
2202    #state{
2203        group = Group,
2204        replica_partitions = ReplicaParts,
2205        replica_group = ReplicaPid,
2206        update_listeners = Listeners,
2207        waiting_list = WaitList
2208    } = State,
2209    case ordsets:intersection(ActiveList, ReplicaParts) of
2210    [] ->
2211         ActiveList2 = ActiveList,
2212         PassiveList2 = PassiveList,
2213         ReplicasOnTransfer2 = ?set_replicas_on_transfer(Group),
2214         ReplicasToMarkActive = [];
2215    CommonRep ->
2216         PassiveList2 = ordsets:union(PassiveList, CommonRep),
2217         ActiveList2 = ordsets:subtract(ActiveList, CommonRep),
2218         ReplicasOnTransfer2 = ordsets:union(?set_replicas_on_transfer(Group), CommonRep),
2219         ReplicasToMarkActive = CommonRep
2220    end,
2221    case ordsets:intersection(PassiveList, ReplicasOnTransfer2) of
2222    [] ->
2223        ReplicasToCleanup = [],
2224        PassiveList3 = PassiveList2,
2225        ReplicasOnTransfer3 = ReplicasOnTransfer2;
2226    CommonRep2 ->
2227        ReplicasToCleanup = CommonRep2,
2228        PassiveList3 = ordsets:subtract(PassiveList2, CommonRep2),
2229        ReplicasOnTransfer3 = ordsets:subtract(ReplicasOnTransfer2, CommonRep2)
2230    end,
2231    case ordsets:intersection(CleanupList, ReplicasOnTransfer3) of
2232    [] ->
2233        ReplicaParts2 = ReplicaParts,
2234        ReplicasOnTransfer4 = ReplicasOnTransfer3,
2235        ReplicasToCleanup2 = ReplicasToCleanup;
2236    CommonRep3 ->
2237        ReplicaParts2 = ordsets:subtract(ReplicaParts, CommonRep3),
2238        ReplicasOnTransfer4 = ordsets:subtract(ReplicasOnTransfer3, CommonRep3),
2239        ReplicasToCleanup2 = ordsets:union(ReplicasToCleanup, CommonRep3)
2240    end,
2241    {ok, NewAbitmask1, NewPbitmask1, NewSeqs1, NewVersions1} =
2242        set_active_partitions(
2243            ActiveList2,
2244            ?set_abitmask(Group),
2245            ?set_pbitmask(Group),
2246            ?set_seqs(Group),
2247            ?set_partition_versions(Group)),
2248    {ok, NewAbitmask2, NewPbitmask2, NewSeqs2, NewVersions2} =
2249        set_passive_partitions(
2250            PassiveList3,
2251            NewAbitmask1,
2252            NewPbitmask1,
2253            NewSeqs1,
2254            NewVersions1),
2255    {ok, NewAbitmask3, NewPbitmask3, NewCbitmask3, NewSeqs3, NewVersions3} =
2256        set_cleanup_partitions(
2257            CleanupList,
2258            NewAbitmask2,
2259            NewPbitmask2,
2260            ?set_cbitmask(Group),
2261            NewSeqs2,
2262            NewVersions2),
2263    {NewSeqs4, NewUnindexableSeqs, NewVersions4} = lists:foldl(
2264        fun(PartId, {AccSeqs, AccUnSeqs, AccVersions}) ->
2265            PartSeq = couch_set_view_util:get_part_seq(PartId, AccSeqs),
2266            AccSeqs2 = orddict:erase(PartId, AccSeqs),
2267            AccUnSeqs2 = orddict:store(PartId, PartSeq, AccUnSeqs),
2268            AccVersions2 = orddict:erase(PartId, AccVersions),
2269            {AccSeqs2, AccUnSeqs2, AccVersions2}
2270        end,
2271        {NewSeqs3, ?set_unindexable_seqs(Group), NewVersions3},
2272        ToBeUnindexable),
2273    State2 = update_header(
2274        State,
2275        NewAbitmask3,
2276        NewPbitmask3,
2277        NewCbitmask3,
2278        NewSeqs4,
2279        NewUnindexableSeqs,
2280        ReplicasOnTransfer4,
2281        ReplicaParts2,
2282        PendingTrans,
2283        NewVersions4),
2284    % A crash might happen between updating our header and updating the state of
2285    % replica view group. The init function must detect and correct this.
2286    ok = set_state(ReplicaPid, ReplicasToMarkActive, [], ReplicasToCleanup2),
2287    % Need to update list of active partition sequence numbers for every blocked client.
2288    WaitList2 = update_waiting_list(
2289        WaitList, State, ActiveList2, PassiveList3, CleanupList),
2290    State3 = State2#state{waiting_list = WaitList2},
2291    case (dict:size(Listeners) > 0) andalso (CleanupList /= []) of
2292    true ->
2293        Listeners2 = dict:filter(
2294            fun(Ref, Listener) ->
2295                #up_listener{
2296                    pid = Pid,
2297                    monref = MonRef,
2298                    partition = PartId
2299                } = Listener,
2300                case lists:member(PartId, CleanupList) of
2301                true ->
2302                    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2303                              " replying to partition ~p"
2304                              " update monitor, reference ~p,"
2305                              " error: marked_for_cleanup",
2306                              [?set_name(State), ?type(State),
2307                               ?category(State), ?group_id(State),
2308                               Ref, PartId]),
2309                    Pid ! {Ref, marked_for_cleanup},
2310                    erlang:demonitor(MonRef, [flush]),
2311                    false;
2312                false ->
2313                    true
2314                end
2315            end,
2316            Listeners),
2317        State3#state{update_listeners = Listeners2};
2318    false ->
2319        State3
2320    end.
2321
2322
2323-spec update_waiting_list([#waiter{}],
2324                          #state{},
2325                          ordsets:ordset(partition_id()),
2326                          ordsets:ordset(partition_id()),
2327                          ordsets:ordset(partition_id())) -> [#waiter{}].
2328update_waiting_list([], _State, _AddActiveList, _AddPassiveList, _AddCleanupList) ->
2329    [];
2330update_waiting_list(WaitList, State, AddActiveList, AddPassiveList, AddCleanupList) ->
2331    {ok, AddActiveSeqs} = get_seqs(State, AddActiveList),
2332    RemoveSet = ordsets:union(AddPassiveList, AddCleanupList),
2333    MapFun = fun(W) -> update_waiter_seqs(W, AddActiveSeqs, RemoveSet) end,
2334    [MapFun(W) || W <- WaitList].
2335
2336
2337-spec update_waiter_seqs(#waiter{},
2338                         partition_seqs(),
2339                         ordsets:ordset(partition_id())) -> #waiter{}.
2340update_waiter_seqs(Waiter, AddActiveSeqs, ToRemove) ->
2341    Seqs2 = lists:foldl(
2342        fun({PartId, Seq}, Acc) ->
2343            case couch_set_view_util:has_part_seq(PartId, Acc) of
2344            true ->
2345                Acc;
2346            false ->
2347                orddict:store(PartId, Seq, Acc)
2348            end
2349        end,
2350        Waiter#waiter.seqs, AddActiveSeqs),
2351    Seqs3 = lists:foldl(
2352        fun(PartId, Acc) -> orddict:erase(PartId, Acc) end,
2353        Seqs2, ToRemove),
2354    Waiter#waiter{seqs = Seqs3}.
2355
2356
2357-spec maybe_apply_pending_transition(#state{}) -> #state{}.
2358maybe_apply_pending_transition(State) when not ?have_pending_transition(State) ->
2359    State;
2360maybe_apply_pending_transition(State) ->
2361    State2 = stop_cleaner(State),
2362    #state{group = Group3} = State3 = stop_updater(State2),
2363    UpdaterWasRunning = is_pid(State#state.updater_pid),
2364    #set_view_transition{
2365        active = ActivePending,
2366        passive = PassivePending,
2367        unindexable = UnindexablePending
2368    } = get_pending_transition(State),
2369    ActiveInCleanup = partitions_still_in_cleanup(ActivePending, Group3),
2370    PassiveInCleanup = partitions_still_in_cleanup(PassivePending, Group3),
2371    ApplyActiveList = ordsets:subtract(ActivePending, ActiveInCleanup),
2372    ApplyPassiveList = ordsets:subtract(PassivePending, PassiveInCleanup),
2373    {ApplyUnindexableList, NewUnindexablePending} = lists:partition(
2374        fun(P) ->
2375            lists:member(P, ApplyActiveList) orelse
2376            lists:member(P, ApplyPassiveList)
2377        end,
2378        UnindexablePending),
2379    case (ApplyActiveList /= []) orelse (ApplyPassiveList /= []) of
2380    true ->
2381        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2382                  " applying state transitions from pending transition:~n"
2383                  "  Active partitions:  ~w~n"
2384                  "  Passive partitions: ~w~n"
2385                  "  Unindexable:        ~w~n",
2386                  [?set_name(State), ?type(State), ?category(State),
2387                   ?group_id(State), ApplyActiveList, ApplyPassiveList,
2388                   ApplyUnindexableList]),
2389        case (ActiveInCleanup == []) andalso (PassiveInCleanup == []) of
2390        true ->
2391            NewPendingTrans = nil;
2392        false ->
2393            NewPendingTrans = #set_view_transition{
2394                active = ActiveInCleanup,
2395                passive = PassiveInCleanup,
2396                unindexable = NewUnindexablePending
2397            }
2398        end,
2399        State4 = set_pending_transition(State3, NewPendingTrans),
2400        State5 = persist_partition_states(
2401            State4, ApplyActiveList, ApplyPassiveList, [], NewPendingTrans, ApplyUnindexableList),
2402        State6 = notify_pending_transition_waiters(State5),
2403        NewState = case dict:size(State6#state.update_listeners) > 0 of
2404        true ->
2405            start_updater(State6);
2406        false ->
2407            State6
2408        end;
2409    false ->
2410        NewState = State3
2411    end,
2412    after_partition_states_updated(NewState, UpdaterWasRunning).
2413
2414
2415-spec notify_pending_transition_waiters(#state{}) -> #state{}.
2416notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State) ->
2417    State;
2418notify_pending_transition_waiters(State) ->
2419    #state{
2420        pending_transition_waiters = TransWaiters,
2421        group = Group,
2422        replica_partitions = RepParts,
2423        waiting_list = WaitList
2424    } = State,
2425    CurSeqs = active_partition_seqs(State),
2426    {TransWaiters2, WaitList2, GroupReplyList, TriggerGroupUpdate} =
2427        lists:foldr(
2428            fun({From, Req} = TransWaiter, {AccTrans, AccWait, ReplyAcc, AccTriggerUp}) ->
2429                #set_view_group_req{
2430                    stale = Stale,
2431                    debug = Debug
2432                } = Req,
2433                case is_any_partition_pending(Req, Group) of
2434                true ->
2435                    {[TransWaiter | AccTrans], AccWait, ReplyAcc, AccTriggerUp};
2436                false when Stale == ok ->
2437                    Waiter = #waiter{from = From, debug = Debug},
2438                    {AccTrans, AccWait, [Waiter | ReplyAcc], AccTriggerUp};
2439                false when Stale == update_after ->
2440                    Waiter = #waiter{from = From, debug = Debug},
2441                    {AccTrans, AccWait, [Waiter | ReplyAcc], true};
2442                false when Stale == false ->
2443                    Waiter = #waiter{from = From, debug = Debug, seqs = CurSeqs},
2444                    {AccTrans, [Waiter | AccWait], ReplyAcc, true}
2445                end
2446            end,
2447            {[], WaitList, [], false},
2448            TransWaiters),
2449    [] = reply_with_group(Group, RepParts, GroupReplyList),
2450    WaitList3 = reply_with_group(Group, RepParts, WaitList2),
2451    State2 = State#state{
2452        pending_transition_waiters = TransWaiters2,
2453        waiting_list = WaitList3
2454    },
2455    case TriggerGroupUpdate of
2456    true ->
2457        start_updater(State2);
2458    false ->
2459        State2
2460    end.
2461
2462
2463-spec notify_pending_transition_waiters(#state{}, term()) -> #state{}.
2464notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State, _Reply) ->
2465    State;
2466notify_pending_transition_waiters(#state{pending_transition_waiters = Waiters} = State, Reply) ->
2467    lists:foreach(fun(F) -> catch gen_server:reply(F, Reply) end, Waiters),
2468    State#state{pending_transition_waiters = []}.
2469
2470
2471-spec set_passive_partitions(ordsets:ordset(partition_id()),
2472                             bitmask(),
2473                             bitmask(),
2474                             partition_seqs(),
2475                             partition_versions()) ->
2476                                    {'ok', bitmask(), bitmask(),
2477                                     partition_seqs(), partition_versions()}.
2478set_passive_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2479    {ok, Abitmask, Pbitmask, Seqs, Versions};
2480
2481set_passive_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2482    PartMask = 1 bsl PartId,
2483    case PartMask band Abitmask of
2484    0 ->
2485        case PartMask band Pbitmask of
2486        PartMask ->
2487            set_passive_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2488        0 ->
2489            NewSeqs = lists:ukeymerge(1, [{PartId, 0}], Seqs),
2490            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2491            set_passive_partitions(
2492                Rest, Abitmask, Pbitmask bor PartMask, NewSeqs, NewVersions)
2493        end;
2494    PartMask ->
2495        set_passive_partitions(
2496            Rest, Abitmask bxor PartMask, Pbitmask bor PartMask, Seqs,
2497            Versions)
2498    end.
2499
2500
2501-spec set_active_partitions(ordsets:ordset(partition_id()),
2502                            bitmask(),
2503                            bitmask(),
2504                            partition_seqs(),
2505                            partition_versions()) ->
2506                                   {'ok', bitmask(), bitmask(),
2507                                    partition_seqs(), partition_versions()}.
2508set_active_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2509    {ok, Abitmask, Pbitmask, Seqs, Versions};
2510
2511set_active_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2512    PartMask = 1 bsl PartId,
2513    case PartMask band Pbitmask of
2514    0 ->
2515        case PartMask band Abitmask of
2516        PartMask ->
2517            set_active_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2518        0 ->
2519            NewSeqs = lists:ukeymerge(1, Seqs, [{PartId, 0}]),
2520            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2521            set_active_partitions(Rest, Abitmask bor PartMask, Pbitmask,
2522                NewSeqs, NewVersions)
2523        end;
2524    PartMask ->
2525        set_active_partitions(
2526            Rest, Abitmask bor PartMask, Pbitmask bxor PartMask, Seqs,
2527            Versions)
2528    end.
2529
2530
2531-spec set_cleanup_partitions(ordsets:ordset(partition_id()),
2532                             bitmask(),
2533                             bitmask(),
2534                             bitmask(),
2535                             partition_seqs(),
2536                             partition_versions()) ->
2537                                    {'ok', bitmask(), bitmask(), bitmask(),
2538                                     partition_seqs(), partition_versions()}.
2539set_cleanup_partitions([], Abitmask, Pbitmask, Cbitmask, Seqs, Versions) ->
2540    {ok, Abitmask, Pbitmask, Cbitmask, Seqs, Versions};
2541
2542set_cleanup_partitions([PartId | Rest], Abitmask, Pbitmask, Cbitmask, Seqs,
2543        Versions) ->
2544    PartMask = 1 bsl PartId,
2545    case PartMask band Cbitmask of
2546    PartMask ->
2547        set_cleanup_partitions(
2548            Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions);
2549    0 ->
2550        Seqs2 = lists:keydelete(PartId, 1, Seqs),
2551        Versions2 = lists:keydelete(PartId, 1, Versions),
2552        Cbitmask2 = Cbitmask bor PartMask,
2553        case PartMask band Abitmask of
2554        PartMask ->
2555            set_cleanup_partitions(
2556                Rest, Abitmask bxor PartMask, Pbitmask, Cbitmask2, Seqs2,
2557                Versions2);
2558        0 ->
2559            case (PartMask band Pbitmask) of
2560            PartMask ->
2561                set_cleanup_partitions(
2562                    Rest, Abitmask, Pbitmask bxor PartMask, Cbitmask2, Seqs2,
2563                    Versions2);
2564            0 ->
2565                set_cleanup_partitions(
2566                    Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions)
2567            end
2568        end
2569    end.
2570
2571
2572-spec update_header(#state{},
2573                    bitmask(),
2574                    bitmask(),
2575                    bitmask(),
2576                    partition_seqs(),
2577                    partition_seqs(),
2578                    ordsets:ordset(partition_id()),
2579                    ordsets:ordset(partition_id()),
2580                    #set_view_transition{} | 'nil',
2581                    partition_versions()) -> #state{}.
2582update_header(State, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs,
2583              NewUnindexableSeqs, NewRelicasOnTransfer, NewReplicaParts,
2584              NewPendingTrans, NewPartVersions) ->
2585    #state{
2586        group = #set_view_group{
2587            index_header =
2588                #set_view_index_header{
2589                    abitmask = Abitmask,
2590                    pbitmask = Pbitmask,
2591                    cbitmask = Cbitmask,
2592                    replicas_on_transfer = ReplicasOnTransfer,
2593                    unindexable_seqs = UnindexableSeqs,
2594                    pending_transition = PendingTrans,
2595                    partition_versions = PartVersions
2596                } = Header
2597        } = Group,
2598        replica_partitions = ReplicaParts
2599    } = State,
2600    NewGroup = Group#set_view_group{
2601        index_header = Header#set_view_index_header{
2602            abitmask = NewAbitmask,
2603            pbitmask = NewPbitmask,
2604            cbitmask = NewCbitmask,
2605            seqs = NewSeqs,
2606            unindexable_seqs = NewUnindexableSeqs,
2607            replicas_on_transfer = NewRelicasOnTransfer,
2608            pending_transition = NewPendingTrans,
2609            partition_versions = NewPartVersions
2610        }
2611    },
2612    NewGroup2 = remove_duplicate_partitions(NewGroup),
2613    #set_view_group{
2614        index_header = #set_view_index_header{
2615            partition_versions = NewPartVersions2
2616        }
2617    } = NewGroup2,
2618    NewState = State#state{
2619        group = NewGroup2,
2620        replica_partitions = NewReplicaParts
2621    },
2622    FsyncHeader = (NewCbitmask /= Cbitmask),
2623    {ok, HeaderPos} = commit_header(NewState#state.group, FsyncHeader),
2624    NewGroup3 = (NewState#state.group)#set_view_group{
2625        header_pos = HeaderPos
2626    },
2627    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, partition states updated~n"
2628              "active partitions before:    ~w~n"
2629              "active partitions after:     ~w~n"
2630              "passive partitions before:   ~w~n"
2631              "passive partitions after:    ~w~n"
2632              "cleanup partitions before:   ~w~n"
2633              "cleanup partitions after:    ~w~n"
2634              "unindexable partitions:      ~w~n"
2635              "replica partitions before:   ~w~n"
2636              "replica partitions after:    ~w~n"
2637              "replicas on transfer before: ~w~n"
2638              "replicas on transfer after:  ~w~n"
2639              "pending transition before:~n"
2640              "  active:      ~w~n"
2641              "  passive:     ~w~n"
2642              "  unindexable: ~w~n"
2643              "pending transition after:~n"
2644              "  active:      ~w~n"
2645              "  passive:     ~w~n"
2646              "  unindexable: ~w~n"
2647              "partition versions before:~n~w~n"
2648              "partition versions after:~n~w~n",
2649              [?set_name(State), ?type(State), ?category(State),
2650               ?group_id(State),
2651               couch_set_view_util:decode_bitmask(Abitmask),
2652               couch_set_view_util:decode_bitmask(NewAbitmask),
2653               couch_set_view_util:decode_bitmask(Pbitmask),
2654               couch_set_view_util:decode_bitmask(NewPbitmask),
2655               couch_set_view_util:decode_bitmask(Cbitmask),
2656               couch_set_view_util:decode_bitmask(NewCbitmask),
2657               UnindexableSeqs,
2658               ReplicaParts,
2659               NewReplicaParts,
2660               ReplicasOnTransfer,
2661               NewRelicasOnTransfer,
2662               ?pending_transition_active(PendingTrans),
2663               ?pending_transition_passive(PendingTrans),
2664               ?pending_transition_unindexable(PendingTrans),
2665               ?pending_transition_active(NewPendingTrans),
2666               ?pending_transition_passive(NewPendingTrans),
2667               ?pending_transition_unindexable(NewPendingTrans),
2668               PartVersions,
2669               NewPartVersions2]),
2670    NewState#state{group = NewGroup3}.
2671
2672
2673-spec maybe_start_cleaner(#state{}) -> #state{}.
2674maybe_start_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
2675    State;
2676maybe_start_cleaner(#state{auto_cleanup = false} = State) ->
2677    State;
2678maybe_start_cleaner(#state{group = Group} = State) ->
2679    case is_pid(State#state.compactor_pid) orelse
2680        is_pid(State#state.updater_pid) orelse (?set_cbitmask(Group) == 0) of
2681    true ->
2682        State;
2683    false ->
2684        Cleaner = spawn_link(fun() -> exit(cleaner(State)) end),
2685        ?LOG_INFO("Started cleanup process ~p for"
2686                  " set view `~s`, ~s (~s) group `~s`",
2687                  [Cleaner, ?set_name(State), ?type(State), ?category(State),
2688                   ?group_id(State)]),
2689        State#state{cleaner_pid = Cleaner}
2690    end.
2691
2692
2693-spec stop_cleaner(#state{}) -> #state{}.
2694stop_cleaner(#state{cleaner_pid = nil} = State) ->
2695    State;
2696stop_cleaner(#state{cleaner_pid = Pid, group = Group} = State) when is_pid(Pid) ->
2697    MRef = erlang:monitor(process, Pid),
2698    Pid ! stop,
2699    unlink(Pid),
2700    ?LOG_INFO("Stopping cleanup process for set view `~s`, group `~s` (~s)",
2701              [?set_name(State), ?group_id(State), ?category(State)]),
2702    % NOTE vmx 2015-06-29: There is currently no way to cleanly stop the
2703    % spatial view cleaner while it is running. Hence do a hard stop right way.
2704    case Group#set_view_group.mod of
2705    spatial_view ->
2706        couch_util:shutdown_sync(Pid);
2707    _ ->
2708        ok
2709    end,
2710    NewState = receive
2711    {'EXIT', Pid, Reason} ->
2712        after_cleaner_stopped(State, Reason);
2713    {'DOWN', MRef, process, Pid, Reason} ->
2714        receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
2715        after_cleaner_stopped(State, Reason)
2716    after 5000 ->
2717        couch_set_view_util:shutdown_cleaner(Group, Pid),
2718        ok = couch_file:refresh_eof(Group#set_view_group.fd),
2719        ?LOG_ERROR("Timeout stopping cleanup process ~p for"
2720                   " set view `~s`, ~s (~s) group `~s`",
2721                   [Pid, ?set_name(State), ?type(State), ?category(State),
2722                    ?group_id(State)]),
2723        State#state{cleaner_pid = nil}
2724    end,
2725    erlang:demonitor(MRef, [flush]),
2726    NewState.
2727
2728
2729after_cleaner_stopped(State, {clean_group, CleanGroup, Count, Time}) ->
2730    #state{group = OldGroup} = State,
2731    {ok, NewGroup0} = couch_set_view_util:refresh_viewgroup_header(CleanGroup),
2732    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
2733    ?LOG_INFO("Stopped cleanup process for"
2734              " set view `~s`, ~s (~s) group `~s`.~n"
2735              "Removed ~p values from the index in ~.3f seconds~n"
2736              "New set of partitions to cleanup: ~w~n"
2737              "Old set of partitions to cleanup: ~w~n",
2738              [?set_name(State), ?type(State), ?category(State),
2739               ?group_id(State), Count, Time,
2740               couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup)),
2741               couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup))]),
2742    case ?set_cbitmask(NewGroup) of
2743    0 ->
2744        inc_cleanups(State#state.group, Time, Count, false);
2745    _ ->
2746        ?inc_cleanup_stops(State#state.group)
2747    end,
2748    State#state{
2749        group = NewGroup,
2750        cleaner_pid = nil
2751    };
2752after_cleaner_stopped(#state{cleaner_pid = Pid, group = Group} = State, Reason) ->
2753    ok = couch_file:refresh_eof(Group#set_view_group.fd),
2754    ?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s (~s) group `~s`,"
2755               " died with reason: ~p",
2756               [Pid, ?set_name(State), ?type(State), ?category(State),
2757                ?group_id(State), Reason]),
2758    State#state{cleaner_pid = nil}.
2759
2760
2761-spec cleaner(#state{}) -> {'clean_group', #set_view_group{}, non_neg_integer(), float()}.
2762cleaner(#state{group = Group}) ->
2763    StartTime = os:timestamp(),
2764    {ok, NewGroup, TotalPurgedCount} = couch_set_view_util:cleanup_group(Group),
2765    Duration = timer:now_diff(os:timestamp(), StartTime) / 1000000,
2766    {clean_group, NewGroup, TotalPurgedCount, Duration}.
2767
2768
2769-spec indexable_partition_seqs(#state{}) -> partition_seqs().
2770indexable_partition_seqs(State) ->
2771    Partitions = group_partitions(State#state.group),
2772    {ok, Seqs} = get_seqs(State, Partitions),
2773    indexable_partition_seqs(State, Seqs).
2774
2775-spec indexable_partition_seqs(#state{}, partition_seqs()) -> partition_seqs().
2776indexable_partition_seqs(#state{group = Group}, Seqs) ->
2777    case ?set_unindexable_seqs(Group) of
2778    [] ->
2779        Seqs;
2780    _ ->
2781        IndexSeqs = ?set_seqs(Group),
2782        CurPartitions = [P || {P, _} <- IndexSeqs],
2783        ReplicasOnTransfer = ?set_replicas_on_transfer(Group),
2784        Partitions = ordsets:union(CurPartitions, ReplicasOnTransfer),
2785        % Index unindexable replicas on transfer though (as the reason for the
2786        % transfer is to become active and indexable).
2787        couch_set_view_util:filter_seqs(Partitions, Seqs)
2788    end.
2789
2790
2791-spec active_partition_seqs(#state{}) -> partition_seqs().
2792active_partition_seqs(#state{group = Group} = State) ->
2793    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
2794    {ok, CurSeqs} = get_seqs(State, ActiveParts),
2795    CurSeqs.
2796
2797-spec active_partition_seqs(#state{}, partition_seqs()) -> partition_seqs().
2798active_partition_seqs(#state{group = Group}, Seqs) ->
2799    ActiveParts = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
2800    couch_set_view_util:filter_seqs(ActiveParts, Seqs).