1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_group).
16-behaviour(gen_server).
17
18%% API
19-export([start_link/1, request_group_info/1, get_data_size/1]).
20-export([open_set_group/3]).
21-export([request_group/2, release_group/1]).
22-export([is_view_defined/1, define_view/2]).
23-export([set_state/4]).
24-export([add_replica_partitions/2, remove_replica_partitions/2]).
25-export([mark_as_unindexable/2, mark_as_indexable/2]).
26-export([monitor_partition_update/4, demonitor_partition_update/2]).
27-export([reset_utilization_stats/1, get_utilization_stats/1]).
28-export([inc_access_stat/1, remove_duplicate_partitions/1]).
29
30%% gen_server callbacks
31-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
32         terminate/2, code_change/3]).
33
34%% Imported for log cleanup
35-import(couch_set_view_util, [condense/1]).
36
37-include("couch_db.hrl").
38-include_lib("couch_set_view/include/couch_set_view.hrl").
39
40-type init_args()   :: {string(), binary(), #set_view_group{}}.
41-type compact_fun() :: fun((#set_view_group{},
42                            #set_view_group{},
43                            string(),
44                            pid() | 'nil',
45                            pid()) -> no_return()).
46-type monitor_error() :: {'shutdown', any()} |
47                         'marked_for_cleanup' |
48                         {'updater_error', any()}.
49
50-define(DEFAULT_TIMEOUT, 3000).
51-define(GET_TIMEOUT(State), ((State)#state.timeout)).
52
53-define(root_dir(State), element(1, State#state.init_args)).
54-define(set_name(State), element(2, State#state.init_args)).
55-define(type(State), (element(3, State#state.init_args))#set_view_group.type).
56-define(group_sig(State), (element(3, State#state.init_args))#set_view_group.sig).
57-define(group_id(State), (State#state.group)#set_view_group.name).
58-define(dcp_pid(State), (State#state.group)#set_view_group.dcp_pid).
59-define(category(State), (State#state.group)#set_view_group.category).
60-define(is_defined(State),
61    (((State#state.group)#set_view_group.index_header)#set_view_index_header.num_partitions > 0)).
62-define(replicas_on_transfer(State),
63        ((State#state.group)#set_view_group.index_header)#set_view_index_header.replicas_on_transfer).
64-define(have_pending_transition(State),
65        ((((State#state.group)#set_view_group.index_header)
66          #set_view_index_header.pending_transition) /= nil)).
67
68-define(MAX_HIST_SIZE, 10).
69% flow control buffer size 20 MB
70-define(DCP_CONTROL_BUFFER_SIZE, "20971520").
71
72% Seqs cache ttl in microseconds
73-define(SEQS_CACHE_TTL, 300000).
74
75-define(DCP_OPEN_INCLUDE_XATTRS, 16#04).
76-define(DCP_OPEN_NO_VALUE_WITH_UNDERLYING_DATATYPE, 16#40).
77
78-record(util_stats, {
79    useful_indexing_time = 0.0  :: float(),
80    wasted_indexing_time = 0.0  :: float(),
81    updates = 0                 :: non_neg_integer(),
82    updater_interruptions = 0   :: non_neg_integer(),
83    compaction_time = 0.0       :: float(),
84    compactions = 0             :: non_neg_integer(),
85    compactor_interruptions = 0 :: non_neg_integer()
86}).
87
88-record(up_listener, {
89    pid,
90    monref,
91    partition,
92    seq
93}).
94
95-record(waiter, {
96    from,
97    debug = false :: boolean(),
98    % seqs for active partitions only
99    seqs = []     :: partition_seqs()
100}).
101
102-record(seqs_cache, {
103    timestamp = {0, 0, 0}              :: timer:time(),
104    is_waiting = false                 :: boolean(),
105    seqs = []                          :: partition_seqs()
106}).
107
108-record(state, {
109    init_args                          :: init_args(),
110    replica_group = nil                :: 'nil' | pid(),
111    group = #set_view_group{}          :: #set_view_group{},
112    updater_pid = nil                  :: 'nil' | pid(),
113    initial_build = false              :: boolean(),
114    updater_state = not_running        :: set_view_updater_state() | 'not_running' | 'starting',
115    compactor_pid = nil                :: 'nil' | pid(),
116    compactor_file = nil               :: 'nil' | pid(),
117    compactor_fun = nil                :: 'nil' | compact_fun(),
118    compactor_retry_number = 0         :: non_neg_integer(),
119    waiting_list = []                  :: [#waiter{}],
120    cleaner_pid = nil                  :: 'nil' | pid(),
121    shutdown = false                   :: boolean(),
122    shutdown_aliases                   :: [binary()],
123    auto_cleanup = true                :: boolean(),
124    auto_transfer_replicas = true      :: boolean(),
125    replica_partitions = []            :: ordsets:ordset(partition_id()),
126    pending_transition_waiters = []    :: [{From::{pid(), reference()}, #set_view_group_req{}}],
127    update_listeners = dict:new()      :: dict(),
128    compact_log_files = nil            :: 'nil' | {[[string()]], partition_seqs(), partition_versions()},
129    timeout = ?DEFAULT_TIMEOUT         :: non_neg_integer() | 'infinity'
130}).
131
132-define(inc_stat(Group, S),
133    ets:update_counter(
134        Group#set_view_group.stats_ets,
135        ?set_view_group_stats_key(Group),
136        {S, 1})).
137-define(inc_cleanup_stops(Group), ?inc_stat(Group, #set_view_group_stats.cleanup_stops)).
138-define(inc_updater_errors(Group), ?inc_stat(Group, #set_view_group_stats.update_errors)).
139-define(inc_accesses(Group), ?inc_stat(Group, #set_view_group_stats.accesses)).
140
141% Same as in couch_file. That's the offset where headers
142% are stored
143-define(SIZE_BLOCK, 4096).
144
145
146% api methods
147-spec request_group(pid(), #set_view_group_req{}) ->
148                   {'ok', #set_view_group{}} | {'error', term()}.
149request_group(Pid, Req) ->
150    #set_view_group_req{wanted_partitions = WantedPartitions} = Req,
151    Req2 = Req#set_view_group_req{
152        wanted_partitions = ordsets:from_list(WantedPartitions)
153    },
154    request_group(Pid, Req2, 1).
155
156-spec request_group(pid(), #set_view_group_req{}, non_neg_integer()) ->
157                   {'ok', #set_view_group{}} | {'error', term()}.
158request_group(Pid, Req, Retries) ->
159    case gen_server:call(Pid, Req, infinity) of
160    {ok, Group, ActiveReplicasBitmask} ->
161        #set_view_group{
162            ref_counter = RefCounter,
163            replica_pid = RepPid,
164            name = GroupName,
165            set_name = SetName,
166            category = Category
167        } = Group,
168        case request_replica_group(RepPid, ActiveReplicasBitmask, Req) of
169        {ok, RepGroup} ->
170            {ok, Group#set_view_group{replica_group = RepGroup}};
171        retry ->
172            couch_ref_counter:drop(RefCounter),
173            ?LOG_INFO("Retrying group `~s` (~s) request, stale=~s,"
174                      " set `~s`, retry attempt #~p",
175                      [?LOG_USERDATA(GroupName), Category, Req#set_view_group_req.stale,
176                       ?LOG_USERDATA(SetName), Retries]),
177            request_group(Pid, Req, Retries + 1)
178        end;
179    Error ->
180        Error
181    end.
182
183
184-spec request_replica_group(pid(), bitmask(), #set_view_group_req{}) ->
185                           {'ok', #set_view_group{} | 'nil'} | 'retry'.
186request_replica_group(_RepPid, 0, _Req) ->
187    {ok, nil};
188request_replica_group(RepPid, ActiveReplicasBitmask, Req) ->
189    {ok, RepGroup, 0} = gen_server:call(RepPid, Req, infinity),
190    case ?set_abitmask(RepGroup) =:= ActiveReplicasBitmask of
191    true ->
192        {ok, RepGroup};
193    false ->
194        couch_ref_counter:drop(RepGroup#set_view_group.ref_counter),
195        retry
196    end.
197
198
199-spec release_group(#set_view_group{}) -> ok.
200release_group(#set_view_group{ref_counter = RefCounter, replica_group = RepGroup}) ->
201    couch_ref_counter:drop(RefCounter),
202    case RepGroup of
203    #set_view_group{ref_counter = RepRefCounter} ->
204        couch_ref_counter:drop(RepRefCounter);
205    nil ->
206        ok
207    end.
208
209
210-spec request_group_info(pid()) -> {'ok', [{term(), term()}]}.
211request_group_info(Pid) ->
212    case gen_server:call(Pid, request_group_info, infinity) of
213    {ok, GroupInfoList} ->
214        {ok, GroupInfoList};
215    Error ->
216        throw(Error)
217    end.
218
219
220-spec get_data_size(pid()) -> {'ok', [{term(), term()}]}.
221get_data_size(Pid) ->
222    case gen_server:call(Pid, get_data_size, infinity) of
223    {ok, _Info} = Ok ->
224        Ok;
225    Error ->
226        throw(Error)
227    end.
228
229
230-spec define_view(pid(), #set_view_params{}) -> 'ok' | {'error', term()}.
231define_view(Pid, Params) ->
232    #set_view_params{
233        max_partitions = NumPartitions,
234        active_partitions = ActivePartitionsList,
235        passive_partitions = PassivePartitionsList,
236        use_replica_index = UseReplicaIndex
237    } = Params,
238    ActiveList = lists:usort(ActivePartitionsList),
239    ActiveBitmask = couch_set_view_util:build_bitmask(ActiveList),
240    PassiveList = lists:usort(PassivePartitionsList),
241    PassiveBitmask = couch_set_view_util:build_bitmask(PassiveList),
242    case (ActiveBitmask band PassiveBitmask) /= 0 of
243    true ->
244        throw({bad_view_definition,
245            <<"Intersection between active and passive bitmasks">>});
246    false ->
247        ok
248    end,
249    gen_server:call(
250        Pid, {define_view, NumPartitions, ActiveList, ActiveBitmask,
251            PassiveList, PassiveBitmask, UseReplicaIndex}, infinity).
252
253
254-spec is_view_defined(pid()) -> boolean().
255is_view_defined(Pid) ->
256    gen_server:call(Pid, is_view_defined, infinity).
257
258
259-spec set_state(pid(),
260                ordsets:ordset(partition_id()),
261                ordsets:ordset(partition_id()),
262                ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
263set_state(_Pid, [], [], []) ->
264    ok;
265set_state(Pid, Active, Passive, Cleanup) ->
266    ordsets:is_set(Active) orelse throw({error, <<"Active list is not an ordset">>}),
267    ordsets:is_set(Passive) orelse throw({error, <<"Passive list is not an ordset">>}),
268    case ordsets:intersection(Active, Passive) of
269    [] ->
270        ordsets:is_set(Cleanup) orelse throw({error, <<"Cleanup list is not an ordset">>}),
271        case ordsets:intersection(Active, Cleanup) of
272        [] ->
273            case ordsets:intersection(Passive, Cleanup) of
274            [] ->
275                gen_server:call(
276                    Pid, {set_state, Active, Passive, Cleanup}, infinity);
277            _ ->
278                {error,
279                    <<"Intersection between passive and cleanup partition lists">>}
280            end;
281        _ ->
282            {error, <<"Intersection between active and cleanup partition lists">>}
283        end;
284    _ ->
285        {error, <<"Intersection between active and passive partition lists">>}
286    end.
287
288
289-spec add_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
290add_replica_partitions(_Pid, []) ->
291    ok;
292add_replica_partitions(Pid, Partitions) ->
293    BitMask = couch_set_view_util:build_bitmask(Partitions),
294    gen_server:call(Pid, {add_replicas, BitMask}, infinity).
295
296
297-spec remove_replica_partitions(pid(), ordsets:ordset(partition_id())) -> 'ok' | {'error', term()}.
298remove_replica_partitions(_Pid, []) ->
299    ok;
300remove_replica_partitions(Pid, Partitions) ->
301    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
302    gen_server:call(Pid, {remove_replicas, Partitions}, infinity).
303
304
305-spec mark_as_unindexable(pid(), ordsets:ordset(partition_id())) ->
306                                 'ok' | {'error', term()}.
307mark_as_unindexable(Pid, Partitions) ->
308    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
309    gen_server:call(Pid, {mark_as_unindexable, Partitions}, infinity).
310
311
312-spec mark_as_indexable(pid(), ordsets:ordset(partition_id())) ->
313                               'ok' | {'error', term()}.
314mark_as_indexable(Pid, Partitions) ->
315    ordsets:is_set(Partitions) orelse throw({error, <<"List is not an ordset">>}),
316    gen_server:call(Pid, {mark_as_indexable, Partitions}, infinity).
317
318
319-spec monitor_partition_update(pid(), partition_id(), reference(), pid()) ->
320                               'ok' | {'error', term()}.
321monitor_partition_update(Pid, PartitionId, Ref, CallerPid) ->
322    gen_server:call(
323        Pid, {monitor_partition_update, PartitionId, Ref, CallerPid}, infinity).
324
325
326-spec demonitor_partition_update(pid(), reference()) -> 'ok'.
327demonitor_partition_update(Pid, Ref) ->
328    ok = gen_server:call(Pid, {demonitor_partition_update, Ref}, infinity).
329
330
331-spec reset_utilization_stats(pid()) -> 'ok'.
332reset_utilization_stats(Pid) ->
333    ok = gen_server:call(Pid, reset_utilization_stats, infinity).
334
335
336-spec get_utilization_stats(pid()) -> {'ok', [{atom() | binary(), term()}]}.
337get_utilization_stats(Pid) ->
338    gen_server:call(Pid, get_utilization_stats, infinity).
339
340-spec inc_access_stat(pid()) -> 'ok'.
341inc_access_stat(Pid) ->
342    gen_server:call(Pid, increment_stat, infinity).
343
344start_link({RootDir, SetName, Group}) ->
345    Args = {RootDir, SetName, Group#set_view_group{type = main}},
346    proc_lib:start_link(?MODULE, init, [Args]).
347
348
349init({_, _, Group} = InitArgs) ->
350    process_flag(trap_exit, true),
351    {ok, State} = try
352        do_init(InitArgs)
353    catch
354    _:Error ->
355        % An error might occur before the file handler is associated with the
356        % group. There's a message once the file handler is created, so that
357        % we can close the file and make the write guard happy.
358        receive
359        {group_fd, Fd} ->
360                couch_file:close(Fd)
361        after 0 ->
362            ok
363        end,
364        ?LOG_ERROR("~s error opening set view group `~s` (~s), signature `~s',"
365                   " from set `~s`: ~p",
366                   [?MODULE, ?LOG_USERDATA(Group#set_view_group.name),
367                    Group#set_view_group.category, hex_sig(Group),
368                    ?LOG_USERDATA(Group#set_view_group.set_name), Error]),
369        exit(Error)
370    end,
371    proc_lib:init_ack({ok, self()}),
372    gen_server:enter_loop(?MODULE, [], State, 1).
373
374
375do_init({_, SetName, _} = InitArgs) ->
376    case prepare_group(InitArgs, false) of
377    {ok, Group} ->
378        #set_view_group{
379            fd = Fd,
380            index_header = Header,
381            type = Type,
382            category = Category,
383            mod = Mod
384        } = Group,
385        % Send the file handler to yourself, so that in case of an error
386        % we can clean up the write guard properly
387        self() ! {group_fd, Fd},
388        RefCounter = new_fd_ref_counter(Fd),
389        case Header#set_view_index_header.has_replica of
390        false ->
391            ReplicaPid = nil,
392            ReplicaParts = [];
393        true ->
394            ReplicaPid = open_replica_group(InitArgs),
395            maybe_fix_replica_group(ReplicaPid, Group),
396            ReplicaParts = get_replica_partitions(ReplicaPid)
397        end,
398        ViewCount = length(Group#set_view_group.views),
399        case Header#set_view_index_header.num_partitions > 0 of
400        false ->
401            ?LOG_INFO("Started undefined ~s (~s) set view group `~s`,"
402                      " group `~s`,  signature `~s', view count: ~p",
403                      [Type, Category, ?LOG_USERDATA(SetName),
404                       ?LOG_USERDATA(Group#set_view_group.name), hex_sig(Group), ViewCount]);
405        true ->
406            ?LOG_INFO("Started ~s (~s) set view group `~s`, group `~s`,"
407                      " signature `~s', view count ~p~n"
408                      "active partitions:      ~s~n"
409                      "passive partitions:     ~s~n"
410                      "cleanup partitions:     ~s~n"
411                      "unindexable partitions: ~w~n"
412                      "~sreplica support~n" ++
413                      case Header#set_view_index_header.has_replica of
414                      true ->
415                          "replica partitions: ~s~n"
416                          "replica partitions on transfer: ~s~n";
417                      false ->
418                          ""
419                      end,
420                      [Type, Category, ?LOG_USERDATA(SetName), ?LOG_USERDATA(Group#set_view_group.name),
421                       hex_sig(Group), ViewCount,
422                       condense(couch_set_view_util:decode_bitmask(
423                                  Header#set_view_index_header.abitmask)),
424                       condense(couch_set_view_util:decode_bitmask(
425                                  Header#set_view_index_header.pbitmask)),
426                       condense(couch_set_view_util:decode_bitmask(
427                                  Header#set_view_index_header.cbitmask)),
428                       Header#set_view_index_header.unindexable_seqs,
429                       case Header#set_view_index_header.has_replica of
430                       true ->
431                           "";
432                       false ->
433                           "no "
434                       end] ++
435                       case Header#set_view_index_header.has_replica of
436                       true ->
437                           [condense(ReplicaParts),
438                            condense(?set_replicas_on_transfer(Group))];
439                       false ->
440                           []
441                       end)
442        end,
443        couch_set_view_mapreduce:get_map_context(Group),
444        DcpFlags = case couch_set_view_mapreduce:is_doc_used(Group) of
445        {ok, doc_fields_unused} ->
446            ?LOG_INFO("~s set view group `~s`, set `~s` (~s), doc_fields_unused",
447              [Type, ?LOG_USERDATA(Group#set_view_group.name), ?LOG_USERDATA(SetName), Category]),
448            ?DCP_OPEN_NO_VALUE_WITH_UNDERLYING_DATATYPE bor ?DCP_OPEN_INCLUDE_XATTRS;
449        {ok, doc_fields_used} ->
450            ?LOG_INFO("~s set view group `~s`, set `~s` (~s), doc_fields_used",
451              [Type, ?LOG_USERDATA(Group#set_view_group.name), ?LOG_USERDATA(SetName), Category]),
452            ?DCP_OPEN_INCLUDE_XATTRS
453        end,
454        DcpName = <<(atom_to_binary(Mod, latin1))/binary, ": ",
455            SetName/binary, " ", (Group#set_view_group.name)/binary,
456            " (", (atom_to_binary(Category, latin1))/binary, "/",
457            (atom_to_binary(Type, latin1))/binary, ")">>,
458        {User, Passwd} = get_auth(),
459        DcpBufferSize = list_to_integer(couch_config:get("dcp",
460            "flow_control_buffer_size", ?DCP_CONTROL_BUFFER_SIZE)),
461        ?LOG_INFO("Flow control buffer size is ~p bytes", [DcpBufferSize]),
462
463        case couch_dcp_client:start(DcpName, SetName, User, Passwd,
464            DcpBufferSize, DcpFlags) of
465        {ok, DcpPid} ->
466            Group2 = maybe_upgrade_header(Group, DcpPid),
467            State = #state{
468                init_args = InitArgs,
469                replica_group = ReplicaPid,
470                replica_partitions = ReplicaParts,
471                group = Group2#set_view_group{
472                    ref_counter = RefCounter,
473                    replica_pid = ReplicaPid,
474                    dcp_pid = DcpPid
475                }
476            },
477            init_seqs_cache(),
478            true = ets:insert(
479                 Group#set_view_group.stats_ets,
480                 #set_view_group_stats{ets_key = ?set_view_group_stats_key(Group)}),
481            TmpDir = updater_tmp_dir(State),
482            ok = couch_set_view_util:delete_sort_files(TmpDir, all),
483            reset_util_stats(),
484            {ok, maybe_apply_pending_transition(State)};
485        Error ->
486            couch_file:close(Fd),
487            throw(Error)
488        end;
489    Error ->
490        throw(Error)
491    end.
492
493handle_call(get_sig, _From, #state{group = Group} = State) ->
494    {reply, {ok, Group#set_view_group.sig}, State, ?GET_TIMEOUT(State)};
495
496handle_call({set_auto_cleanup, Enabled}, _From, State) ->
497    % To be used only by unit tests.
498    {reply, ok, State#state{auto_cleanup = Enabled}, ?GET_TIMEOUT(State)};
499
500handle_call({set_timeout, T}, _From, State) ->
501    % To be used only by unit tests.
502    {reply, ok, State#state{timeout = T}, T};
503
504handle_call({set_auto_transfer_replicas, Enabled}, _From, State) ->
505    % To be used only by unit tests.
506    {reply, ok, State#state{auto_transfer_replicas = Enabled}, ?GET_TIMEOUT(State)};
507
508handle_call({define_view, NumPartitions, _, _, _, _, _}, _From, State)
509        when (not ?is_defined(State)), NumPartitions > ?MAX_NUM_PARTITIONS ->
510    {reply, {error, <<"Too high value for number of partitions">>}, State};
511
512handle_call({define_view, NumPartitions, ActiveList, ActiveBitmask,
513        PassiveList, PassiveBitmask, UseReplicaIndex}, _From, State) when not ?is_defined(State) ->
514    #state{init_args = InitArgs, group = Group} = State,
515    PartitionsList = lists:usort(ActiveList ++ PassiveList),
516    Seqs = lists:map(
517        fun(PartId) -> {PartId, 0} end, PartitionsList),
518    PartVersions = lists:map(
519        fun(PartId) -> {PartId, [{0, 0}]} end, PartitionsList),
520    #set_view_group{
521        name = DDocId,
522        index_header = Header
523    } = Group,
524    NewHeader = Header#set_view_index_header{
525        num_partitions = NumPartitions,
526        abitmask = ActiveBitmask,
527        pbitmask = PassiveBitmask,
528        seqs = Seqs,
529        has_replica = UseReplicaIndex,
530        partition_versions = PartVersions
531    },
532    case (?type(State) =:= main) andalso UseReplicaIndex of
533    false ->
534        ReplicaPid = nil;
535    true ->
536        ReplicaPid = open_replica_group(InitArgs),
537        ok = gen_server:call(ReplicaPid, {define_view, NumPartitions, [], 0, [], 0, false}, infinity)
538    end,
539    NewGroup = Group#set_view_group{
540        index_header = NewHeader,
541        replica_pid = ReplicaPid
542    },
543    State2 = State#state{
544        group = NewGroup,
545        replica_group = ReplicaPid
546    },
547    {ok, HeaderPos} = commit_header(NewGroup),
548    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
549              " configured with:~n"
550              "~p partitions~n"
551              "~sreplica support~n"
552              "initial active partitions ~s~n"
553              "initial passive partitions ~s",
554              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State), ?LOG_USERDATA(DDocId),
555               hex_sig(Group), NumPartitions,
556               case UseReplicaIndex of
557               true ->  "";
558               false -> "no "
559               end,
560        condense(ActiveList),condense(PassiveList)]),
561    NewGroup2 = (State2#state.group)#set_view_group{
562        header_pos = HeaderPos
563    },
564    State3 = State2#state{
565        group = NewGroup2
566    },
567    {reply, ok, State3, ?GET_TIMEOUT(State3)};
568
569handle_call({define_view, _, _, _, _, _, _}, _From, State) ->
570    {reply, view_already_defined, State, ?GET_TIMEOUT(State)};
571
572handle_call(is_view_defined, _From, State) ->
573    {reply, ?is_defined(State), State, ?GET_TIMEOUT(State)};
574
575handle_call(_Msg, _From, State) when not ?is_defined(State) ->
576    {reply, {error, view_undefined}, State};
577
578handle_call({set_state, ActiveList, PassiveList, CleanupList}, _From, State) ->
579    try
580        NewState = maybe_update_partition_states(
581            ActiveList, PassiveList, CleanupList, State),
582        {reply, ok, NewState, ?GET_TIMEOUT(NewState)}
583    catch
584    throw:Error ->
585        {reply, Error, State}
586    end;
587
588handle_call({add_replicas, BitMask}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
589    #state{
590        group = Group,
591        replica_partitions = ReplicaParts
592    } = State,
593    BitMask2 = case BitMask band ?set_abitmask(Group) of
594    0 ->
595        BitMask;
596    Common1 ->
597        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
598                  " set partitions ~s to replica state because they are"
599                  " currently marked as active",
600                  [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
601                   ?LOG_USERDATA(?group_id(State)),
602                   condense(couch_set_view_util:decode_bitmask(Common1))]),
603        BitMask bxor Common1
604    end,
605    BitMask3 = case BitMask2 band ?set_pbitmask(Group) of
606    0 ->
607        BitMask2;
608    Common2 ->
609        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, ignoring request to"
610                  " set partitions  ~s to replica state because they are"
611                  " currently marked as passive",
612                  [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
613                   ?LOG_USERDATA(?group_id(State)),
614                   condense(couch_set_view_util:decode_bitmask(Common2))]),
615        BitMask2 bxor Common2
616    end,
617    Parts = couch_set_view_util:decode_bitmask(BitMask3),
618    ok = set_state(ReplicaPid, [], Parts, []),
619    NewReplicaParts = ordsets:union(ReplicaParts, Parts),
620    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
621              " defined new replica partitions: ~s~n"
622              "New full set of replica partitions is: ~s~n",
623              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
624               ?LOG_USERDATA(?group_id(State)), condense(Parts), condense(NewReplicaParts)]),
625    State2 = State#state{
626        replica_partitions = NewReplicaParts
627    },
628    {reply, ok, State2, ?GET_TIMEOUT(State2)};
629
630handle_call({remove_replicas, Partitions}, _From, #state{replica_group = ReplicaPid} = State) when is_pid(ReplicaPid) ->
631    #state{
632        replica_partitions = ReplicaParts,
633        group = Group
634    } = State,
635    case ordsets:intersection(?set_replicas_on_transfer(Group), Partitions) of
636    [] ->
637        ok = set_state(ReplicaPid, [], [], Partitions),
638        NewState = State#state{
639            replica_partitions = ordsets:subtract(ReplicaParts, Partitions)
640        };
641    Common ->
642        UpdaterWasRunning = is_pid(State#state.updater_pid),
643        State2 = stop_cleaner(State),
644        #state{group = Group3} = State3 = stop_updater(State2),
645        {ok, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs, NewVersions} =
646            set_cleanup_partitions(
647                Common,
648                ?set_abitmask(Group3),
649                ?set_pbitmask(Group3),
650                ?set_cbitmask(Group3),
651                ?set_seqs(Group3),
652                ?set_partition_versions(Group)),
653        case NewCbitmask =/= ?set_cbitmask(Group3) of
654        true ->
655             State4 = stop_compactor(State3);
656        false ->
657             State4 = State3
658        end,
659        ReplicaPartitions2 = ordsets:subtract(ReplicaParts, Common),
660        ReplicasOnTransfer2 = ordsets:subtract(?set_replicas_on_transfer(Group3), Common),
661        State5 = update_header(
662            State4,
663            NewAbitmask,
664            NewPbitmask,
665            NewCbitmask,
666            NewSeqs,
667            ?set_unindexable_seqs(State4#state.group),
668            ReplicasOnTransfer2,
669            ReplicaPartitions2,
670            ?set_pending_transition(State4#state.group),
671            NewVersions),
672        ok = set_state(ReplicaPid, [], [], Partitions),
673        case UpdaterWasRunning of
674        true ->
675            State6 = start_updater(State5);
676        false ->
677            State6 = State5
678        end,
679        NewState = maybe_start_cleaner(State6)
680    end,
681    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, marked the following"
682              " replica partitions for removal: ~s",
683              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
684               ?LOG_USERDATA(?group_id(State)), condense(Partitions)]),
685    {reply, ok, NewState, ?GET_TIMEOUT(NewState)};
686
687handle_call({mark_as_unindexable, Partitions}, _From, State) ->
688    try
689        State2 = process_mark_as_unindexable(State, Partitions),
690        {reply, ok, State2, ?GET_TIMEOUT(State2)}
691    catch
692    throw:Error ->
693        {reply, Error, State, ?GET_TIMEOUT(State)}
694    end;
695
696handle_call({mark_as_indexable, Partitions}, _From, State) ->
697    try
698        State2 = process_mark_as_indexable(State, Partitions),
699        {reply, ok, State2, ?GET_TIMEOUT(State2)}
700    catch
701    throw:Error ->
702        {reply, Error, State, ?GET_TIMEOUT(State)}
703    end;
704
705handle_call(increment_stat, _From, State) ->
706    ?inc_accesses(State#state.group),
707    {reply, ok, State, ?GET_TIMEOUT(State)};
708
709handle_call(#set_view_group_req{} = Req, From, State) ->
710    #state{
711        group = Group,
712        pending_transition_waiters = Waiters
713    } = State,
714    State2 = case is_any_partition_pending(Req, Group) of
715    false ->
716        process_view_group_request(Req, From, State);
717    true ->
718        State#state{pending_transition_waiters = [{From, Req} | Waiters]}
719    end,
720    inc_view_group_access_stats(Req, State2#state.group),
721    {noreply, State2, ?GET_TIMEOUT(State2)};
722
723handle_call(request_group, _From, #state{group = Group} = State) ->
724    % Meant to be called only by this module and the compactor module.
725    % Callers aren't supposed to read from the group's fd, we don't
726    % increment here the ref counter on behalf of the caller.
727    {reply, {ok, Group}, State, ?GET_TIMEOUT(State)};
728
729handle_call(replica_pid, _From, #state{replica_group = Pid} = State) ->
730    % To be used only by unit tests.
731    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
732
733handle_call({start_updater, Options}, _From, State) ->
734    % To be used only by unit tests.
735    State2 = start_updater(State, Options),
736    {reply, {ok, State2#state.updater_pid}, State2, ?GET_TIMEOUT(State2)};
737
738handle_call(start_cleaner, _From, State) ->
739    % To be used only by unit tests.
740    State2 = maybe_start_cleaner(State#state{auto_cleanup = true}),
741    State3 = State2#state{auto_cleanup = State#state.auto_cleanup},
742    {reply, {ok, State2#state.cleaner_pid}, State3, ?GET_TIMEOUT(State3)};
743
744handle_call(updater_pid, _From, #state{updater_pid = Pid} = State) ->
745    % To be used only by unit tests.
746    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
747
748handle_call(cleaner_pid, _From, #state{cleaner_pid = Pid} = State) ->
749    % To be used only by unit tests.
750    {reply, {ok, Pid}, State, ?GET_TIMEOUT(State)};
751
752handle_call({test_rollback, RollbackSeqs}, _From, State0) ->
753    % To be used only by unit tests.
754    Response = case rollback(State0, RollbackSeqs) of
755    {ok, State} ->
756        ok;
757    {error, {cannot_rollback, State}} ->
758        cannot_rollback
759    end,
760    {reply, Response, State, ?GET_TIMEOUT(State)};
761
762handle_call(request_group_info, _From, State) ->
763    GroupInfo = get_group_info(State),
764    {reply, {ok, GroupInfo}, State, ?GET_TIMEOUT(State)};
765
766handle_call(get_data_size, _From, State) ->
767    DataSizeInfo = get_data_size_info(State),
768    {reply, {ok, DataSizeInfo}, State, ?GET_TIMEOUT(State)};
769
770handle_call({start_compact, _CompactFun}, _From,
771            #state{updater_pid = UpPid, initial_build = true} = State) when is_pid(UpPid) ->
772    {reply, {error, initial_build}, State, ?GET_TIMEOUT(State)};
773handle_call({start_compact, CompactFun}, _From, #state{compactor_pid = nil} = State) ->
774    #state{compactor_pid = Pid} = State2 = start_compactor(State, CompactFun),
775    {reply, {ok, Pid}, State2, ?GET_TIMEOUT(State2)};
776handle_call({start_compact, _}, _From, State) ->
777    %% compact already running, this is a no-op
778    {reply, {ok, State#state.compactor_pid}, State};
779
780handle_call({compact_done, Result}, {Pid, _}, #state{compactor_pid = Pid} = State) ->
781    #state{
782        update_listeners = Listeners,
783        group = Group,
784        updater_pid = UpdaterPid,
785        compactor_pid = CompactorPid
786    } = State,
787    #set_view_group{
788        fd = OldFd,
789        ref_counter = RefCounter,
790        filepath = OldFilepath
791    } = Group,
792    #set_view_compactor_result{
793        group = NewGroup0,
794        compact_time = Duration,
795        cleanup_kv_count = CleanupKVCount
796    } = Result,
797
798    MissingChangesCount = couch_set_view_util:missing_changes_count(
799        ?set_seqs(Group), ?set_seqs(NewGroup0)),
800    case MissingChangesCount == 0 of
801    true ->
802        % Compactor might have received a group snapshot from an updater.
803        NewGroup = fix_updater_group(NewGroup0, Group),
804        HeaderBin = couch_set_view_util:group_to_header_bin(NewGroup),
805        {ok, NewHeaderPos} = couch_file:write_header_bin(
806            NewGroup#set_view_group.fd, HeaderBin),
807        if is_pid(UpdaterPid) ->
808            ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compact group"
809                      " up to date - restarting updater",
810                      [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
811                       ?LOG_USERDATA(?group_id(State))]),
812            % Decided to switch to compacted file
813            % Compactor has caught up and hence discard the running updater
814            couch_set_view_util:shutdown_wait(UpdaterPid);
815        true ->
816            ok
817        end,
818        NewFilepath = increment_filepath(Group),
819        NewRefCounter = new_fd_ref_counter(NewGroup#set_view_group.fd),
820        case ?set_replicas_on_transfer(Group) /= ?set_replicas_on_transfer(NewGroup) of
821        true ->
822            % Set of replicas on transfer changed while compaction was running.
823            % Just write a new header with the new set of replicas on transfer and all the
824            % metadata that is updated when that set changes (active and passive bitmasks).
825            % This happens only during (or after, for a short period) a cluster rebalance or
826            % failover. This shouldn't take too long, as we are writing and fsync'ing only
827            % one header, all data was already fsync'ed by the compactor process.
828            NewGroup2 = NewGroup#set_view_group{
829                ref_counter = NewRefCounter,
830                filepath = NewFilepath,
831                index_header = (NewGroup#set_view_group.index_header)#set_view_index_header{
832                    replicas_on_transfer = ?set_replicas_on_transfer(Group),
833                    abitmask = ?set_abitmask(Group),
834                    pbitmask = ?set_pbitmask(Group)
835                }
836            },
837            {ok, NewHeaderPos2} = commit_header(NewGroup2);
838        false ->
839            % The compactor process committed an header with up to date state information and
840            % did an fsync before calling us. No need to commit a new header here (and fsync).
841            NewGroup2 = NewGroup#set_view_group{
842                ref_counter = NewRefCounter,
843                filepath = NewFilepath
844	    },
845	    NewHeaderPos2 = NewHeaderPos
846        end,
847        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, compaction complete"
848                  " in ~.3f seconds, filtered ~p key-value pairs",
849                  [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
850                   ?LOG_USERDATA(?group_id(State)), Duration, CleanupKVCount]),
851        inc_util_stat(#util_stats.compaction_time, Duration),
852        ok = couch_file:only_snapshot_reads(OldFd),
853        %% After rename call we're sure the header was written to the file
854        %% (no need for couch_file:flush/1 call).
855        ok = couch_file:rename(NewGroup#set_view_group.fd, NewFilepath),
856        ok = couch_file:delete(?root_dir(State), OldFilepath),
857
858        %% cleanup old group
859        unlink(CompactorPid),
860        couch_ref_counter:drop(RefCounter),
861
862        NewUpdaterPid =
863        if is_pid(UpdaterPid) ->
864            CurSeqs = indexable_partition_seqs(State),
865            spawn_link(couch_set_view_updater,
866                       update,
867                       [self(), NewGroup2, CurSeqs, false, updater_tmp_dir(State), []]);
868        true ->
869            nil
870        end,
871
872        Listeners2 = notify_update_listeners(State, Listeners, NewGroup2),
873        State2 = State#state{
874            update_listeners = Listeners2,
875            compactor_pid = nil,
876            compactor_file = nil,
877            compactor_fun = nil,
878            compact_log_files = nil,
879            compactor_retry_number = 0,
880            updater_pid = NewUpdaterPid,
881            initial_build = is_pid(NewUpdaterPid) andalso
882                    couch_set_view_util:is_initial_build(NewGroup2),
883            updater_state = case is_pid(NewUpdaterPid) of
884                true -> starting;
885                false -> not_running
886            end,
887            group = NewGroup2#set_view_group{
888                header_pos = NewHeaderPos2
889            }
890        },
891        inc_compactions(Result),
892        {reply, ok, maybe_apply_pending_transition(State2), ?GET_TIMEOUT(State2)};
893    false ->
894        State2 = State#state{
895            compactor_retry_number = State#state.compactor_retry_number + 1
896        },
897        {reply, {update, MissingChangesCount}, State2, ?GET_TIMEOUT(State2)}
898    end;
899handle_call({compact_done, _Result}, _From, State) ->
900    % From a previous compactor that was killed/stopped, ignore.
901    {noreply, State, ?GET_TIMEOUT(State)};
902
903handle_call(cancel_compact, _From, #state{compactor_pid = nil} = State) ->
904    {reply, ok, State, ?GET_TIMEOUT(State)};
905handle_call(cancel_compact, _From, #state{compactor_pid = Pid} = State) ->
906    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
907              " canceling compaction (pid ~p)",
908              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
909               ?LOG_USERDATA(?group_id(State)), Pid]),
910    State2 = stop_compactor(State),
911    State3 = maybe_start_cleaner(State2),
912    {reply, ok, State3, ?GET_TIMEOUT(State3)};
913
914handle_call({monitor_partition_update, PartId, _Ref, _Pid}, _From, State)
915        when PartId >= ?set_num_partitions(State#state.group) ->
916    Msg = io_lib:format("Invalid partition: ~p", [PartId]),
917    {reply, {error, iolist_to_binary(Msg)}, State, ?GET_TIMEOUT(State)};
918
919handle_call({monitor_partition_update, PartId, Ref, Pid}, _From, State) ->
920    try
921        State2 = process_monitor_partition_update(State, PartId, Ref, Pid),
922        {reply, ok, State2, ?GET_TIMEOUT(State2)}
923    catch
924    throw:Error ->
925        {reply, Error, State, ?GET_TIMEOUT(State)}
926    end;
927
928handle_call({demonitor_partition_update, Ref}, _From, State) ->
929    #state{update_listeners = Listeners} = State,
930    case dict:find(Ref, Listeners) of
931    error ->
932        {reply, ok, State, ?GET_TIMEOUT(State)};
933    {ok, #up_listener{monref = MonRef, partition = PartId}} ->
934        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, removing partition ~p"
935                   "update monitor, reference ~p",
936                   [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
937                    ?LOG_USERDATA(?group_id(State)), PartId, Ref]),
938        erlang:demonitor(MonRef, [flush]),
939        State2 = State#state{update_listeners = dict:erase(Ref, Listeners)},
940        {reply, ok, State2, ?GET_TIMEOUT(State2)}
941    end;
942
943handle_call(compact_log_files, _From, State) ->
944    {Files0, Seqs, PartVersions} = State#state.compact_log_files,
945    Files = lists:map(fun lists:reverse/1, Files0),
946    NewState = State#state{compact_log_files = nil},
947    {reply, {ok, {Files, Seqs, PartVersions}}, NewState, ?GET_TIMEOUT(NewState)};
948
949handle_call(reset_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
950    reset_util_stats(),
951    case is_pid(RepPid) of
952    true ->
953        ok = gen_server:call(RepPid, reset_utilization_stats, infinity);
954    false ->
955        ok
956    end,
957    {reply, ok, State, ?GET_TIMEOUT(State)};
958
959handle_call(get_utilization_stats, _From, #state{replica_group = RepPid} = State) ->
960    Stats = erlang:get(util_stats),
961    UsefulIndexing = Stats#util_stats.useful_indexing_time,
962    WastedIndexing = Stats#util_stats.wasted_indexing_time,
963    StatNames = record_info(fields, util_stats),
964    StatPoses = lists:seq(2, record_info(size, util_stats)),
965    StatsList0 = lists:foldr(
966        fun({StatName, StatPos}, Acc) ->
967            Val = element(StatPos, Stats),
968            [{StatName, Val} | Acc]
969        end,
970        [], lists:zip(StatNames, StatPoses)),
971    StatsList1 = [{total_indexing_time, UsefulIndexing + WastedIndexing} | StatsList0],
972    case is_pid(RepPid) of
973    true ->
974        {ok, RepStats} = gen_server:call(RepPid, get_utilization_stats, infinity),
975        StatsList = StatsList1 ++ [{replica_utilization_stats, {RepStats}}];
976    false ->
977        StatsList = StatsList1
978    end,
979    {reply, {ok, StatsList}, State, ?GET_TIMEOUT(State)};
980
981handle_call(before_master_delete, _From, State) ->
982    Error = {error, {db_deleted, ?master_dbname((?set_name(State)))}},
983    State2 = reply_all(State, Error),
984    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, going to shutdown because "
985              "master database is being deleted",
986              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
987               ?LOG_USERDATA(?group_id(State))]),
988    {stop, shutdown, ok, State2}.
989
990
991handle_cast(_Msg, State) when not ?is_defined(State) ->
992    {noreply, State};
993
994handle_cast({compact_log_files, Files, Seqs, PartVersions, _Init},
995                                #state{compact_log_files = nil} = State) ->
996    LogList = lists:map(fun(F) -> [F] end, Files),
997    {noreply, State#state{compact_log_files = {LogList, Seqs, PartVersions}},
998        ?GET_TIMEOUT(State)};
999
1000handle_cast({compact_log_files, Files, NewSeqs, NewPartVersions, Init}, State) ->
1001    LogList = case Init of
1002    true ->
1003        lists:map(fun(F) -> [F] end, Files);
1004    false ->
1005        {OldL, _OldSeqs, _OldPartVersions} = State#state.compact_log_files,
1006        lists:zipwith(
1007            fun(F, Current) -> [F | Current] end,
1008            Files, OldL)
1009    end,
1010    {noreply, State#state{compact_log_files = {LogList, NewSeqs, NewPartVersions}},
1011        ?GET_TIMEOUT(State)};
1012
1013handle_cast({ddoc_updated, NewSig, Aliases}, State) ->
1014    #state{
1015        waiting_list = Waiters,
1016        group = #set_view_group{sig = CurSig}
1017    } = State,
1018    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s',"
1019              " design document was updated~n"
1020              "  new signature:   ~s~n"
1021              "  current aliases: ~p~n"
1022              "  shutdown flag:   ~s~n"
1023              "  waiting clients: ~p~n",
1024              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1025               ?LOG_USERDATA(?group_id(State)), hex_sig(CurSig), hex_sig(NewSig), Aliases,
1026               State#state.shutdown, length(Waiters)]),
1027    case NewSig of
1028    CurSig ->
1029        State2 = State#state{shutdown = false, shutdown_aliases = undefined},
1030        {noreply, State2, ?GET_TIMEOUT(State2)};
1031    <<>> ->
1032        remove_mapreduce_context_store(State#state.group),
1033        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
1034        Error = {error, <<"Design document was deleted">>},
1035        {stop, normal, reply_all(State2, Error)};
1036    _ ->
1037        remove_mapreduce_context_store(State#state.group),
1038        State2 = State#state{shutdown = true, shutdown_aliases = Aliases},
1039        case Waiters of
1040        [] ->
1041            {stop, normal, State2};
1042        _ ->
1043            {noreply, State2}
1044        end
1045    end;
1046
1047handle_cast({update, MinNumChanges}, #state{group = Group} = State) ->
1048    case is_pid(State#state.updater_pid) of
1049    true ->
1050        {noreply, State};
1051    false ->
1052        CurSeqs = indexable_partition_seqs(State),
1053        MissingCount = couch_set_view_util:missing_changes_count(CurSeqs, ?set_seqs(Group)),
1054        case (MissingCount >= MinNumChanges) andalso (MissingCount > 0) of
1055        true ->
1056            {noreply, do_start_updater(State, CurSeqs, [])};
1057        false ->
1058            {noreply, State}
1059        end
1060    end;
1061
1062handle_cast({update_replica, _MinNumChanges}, #state{replica_group = nil} = State) ->
1063    {noreply, State};
1064
1065handle_cast({update_replica, MinNumChanges}, #state{replica_group = Pid} = State) ->
1066    ok = gen_server:cast(Pid, {update, MinNumChanges}),
1067    {noreply, State}.
1068
1069
1070handle_info({'DOWN', MRef, process, Pid, _Reason}, State) ->
1071    #state{update_listeners = Listeners} = State,
1072    case dict:find(MRef, Listeners) of
1073    error ->
1074        ?LOG_INFO("Listner ~p is down and its reference ~p is not stored in dictonary", [Pid, MRef]),
1075        {noreply, State};
1076    {ok, #up_listener{monref = MonRef, partition = PartId}} ->
1077        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, removing partition ~p"
1078                   "update monitor, reference ~p",
1079                   [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1080                    ?LOG_USERDATA(?group_id(State)), PartId, MRef]),
1081        erlang:demonitor(MonRef, [flush]),
1082        State2 = State#state{update_listeners = dict:erase(MRef, Listeners)},
1083        {noreply, State2}
1084    end;
1085
1086handle_info(timeout, State) when not ?is_defined(State) ->
1087    {noreply, State};
1088
1089handle_info(timeout, #state{group = Group} = State) ->
1090    TransferReplicas = (?set_replicas_on_transfer(Group) /= []) andalso
1091        State#state.auto_transfer_replicas,
1092    case TransferReplicas orelse (dict:size(State#state.update_listeners) > 0) of
1093    true ->
1094        {noreply, start_updater(State)};
1095    false ->
1096        {noreply, maybe_start_cleaner(State)}
1097    end;
1098
1099handle_info({partial_update, Pid, AckTo, NewGroup}, #state{updater_pid = Pid} = State) ->
1100    case ?have_pending_transition(State) andalso
1101        (?set_cbitmask(NewGroup) =:= 0) andalso
1102        (?set_cbitmask(State#state.group) =/= 0) andalso
1103        (State#state.waiting_list =:= []) of
1104    true ->
1105        State2 = process_partial_update(State, NewGroup),
1106        AckTo ! update_processed,
1107        State3 = stop_updater(State2),
1108        NewState = maybe_apply_pending_transition(State3);
1109    false ->
1110        NewState = process_partial_update(State, NewGroup),
1111        AckTo ! update_processed
1112    end,
1113    {noreply, NewState};
1114handle_info({partial_update, _, AckTo, _}, State) ->
1115    %% message from an old (probably pre-compaction) updater; ignore
1116    AckTo ! update_processed,
1117    {noreply, State, ?GET_TIMEOUT(State)};
1118
1119handle_info({updater_info, Pid, {state, UpdaterState}}, #state{updater_pid = Pid} = State) ->
1120    #state{
1121        group = Group,
1122        waiting_list = WaitList,
1123        replica_partitions = RepParts
1124    } = State,
1125    State2 = State#state{updater_state = UpdaterState},
1126    case UpdaterState of
1127    updating_passive ->
1128        WaitList2 = reply_with_group(Group, RepParts, WaitList),
1129        State3 = State2#state{waiting_list = WaitList2},
1130        case State#state.shutdown of
1131        true ->
1132            State4 = stop_updater(State3),
1133            {stop, normal, State4};
1134        false ->
1135            State4 = maybe_apply_pending_transition(State3),
1136            {noreply, State4}
1137        end;
1138    _ ->
1139        {noreply, State2}
1140    end;
1141
1142handle_info({updater_info, _Pid, {state, _UpdaterState}}, State) ->
1143    % Message from an old updater, ignore.
1144    {noreply, State, ?GET_TIMEOUT(State)};
1145
1146handle_info({'EXIT', Pid, {clean_group, CleanGroup, Count, Time}}, #state{cleaner_pid = Pid} = State) ->
1147    #state{group = OldGroup} = State,
1148    case CleanGroup#set_view_group.mod of
1149    % The mapreduce view cleaner is a native C based one that writes the
1150    % information to disk.
1151    mapreduce_view ->
1152        {ok, NewGroup0} =
1153             couch_set_view_util:refresh_viewgroup_header(CleanGroup);
1154    spatial_view ->
1155        NewGroup0 = CleanGroup
1156    end,
1157    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
1158    ?LOG_INFO("Cleanup finished for set view `~s`, ~s (~s) group `~s`~n"
1159              "Removed ~p values from the index in ~.3f seconds~n"
1160              "active partitions before:  ~s~n"
1161              "active partitions after:   ~s~n"
1162              "passive partitions before: ~s~n"
1163              "passive partitions after:  ~s~n"
1164              "cleanup partitions before: ~s~n"
1165              "cleanup partitions after:  ~s~n" ++
1166          case is_pid(State#state.replica_group) of
1167          true ->
1168              "Current set of replica partitions: ~s~n"
1169              "Current set of replicas on transfer: ~s~n";
1170          false ->
1171              []
1172          end,
1173          [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State), ?LOG_USERDATA(?group_id(State)),
1174           Count, Time,
1175           condense(couch_set_view_util:decode_bitmask(?set_abitmask(OldGroup))),
1176           condense(couch_set_view_util:decode_bitmask(?set_abitmask(NewGroup))),
1177           condense(couch_set_view_util:decode_bitmask(?set_pbitmask(OldGroup))),
1178           condense(couch_set_view_util:decode_bitmask(?set_pbitmask(NewGroup))),
1179           condense(couch_set_view_util:decode_bitmask(?set_cbitmask(OldGroup))),
1180           condense(couch_set_view_util:decode_bitmask(?set_cbitmask(NewGroup)))] ++
1181              case is_pid(State#state.replica_group) of
1182              true ->
1183                  [condense(State#state.replica_partitions),
1184                   condense(?set_replicas_on_transfer(NewGroup))];
1185              false ->
1186                  []
1187              end),
1188    State2 = State#state{
1189        cleaner_pid = nil,
1190        group = NewGroup
1191    },
1192    inc_cleanups(State2#state.group, Time, Count, false),
1193    {noreply, maybe_apply_pending_transition(State2)};
1194
1195handle_info({'EXIT', Pid, Reason}, #state{cleaner_pid = Pid, group = Group} = State) ->
1196    ok = couch_file:refresh_eof(Group#set_view_group.fd),
1197    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`, cleanup process ~p"
1198               " died with unexpected reason: ~p",
1199               [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1200                ?LOG_USERDATA(?group_id(State)), Pid, Reason]),
1201    {noreply, State#state{cleaner_pid = nil}, ?GET_TIMEOUT(State)};
1202
1203handle_info({'EXIT', Pid, {updater_finished, Result}}, #state{updater_pid = Pid} = State) ->
1204    #set_view_updater_result{
1205        stats = Stats,
1206        group = #set_view_group{filepath = Path} = NewGroup0,
1207        tmp_file = BuildFile
1208    } = Result,
1209    #set_view_updater_stats{
1210        indexing_time = IndexingTime,
1211        blocked_time = BlockedTime,
1212        inserted_ids = InsertedIds,
1213        deleted_ids = DeletedIds,
1214        inserted_kvs = InsertedKVs,
1215        deleted_kvs = DeletedKVs,
1216        cleanup_kv_count = CleanupKVCount,
1217        seqs = SeqsDone
1218    } = Stats,
1219    case State#state.initial_build of
1220    true ->
1221        NewRefCounter = new_fd_ref_counter(BuildFile),
1222        ok = couch_file:only_snapshot_reads(NewGroup0#set_view_group.fd),
1223        ok = couch_file:delete(?root_dir(State), Path),
1224        ok = couch_file:rename(BuildFile, Path),
1225        couch_ref_counter:drop(NewGroup0#set_view_group.ref_counter),
1226        NewGroup = NewGroup0#set_view_group{
1227            ref_counter = NewRefCounter,
1228            fd = BuildFile
1229        };
1230    false ->
1231        ok = couch_file:refresh_eof(NewGroup0#set_view_group.fd),
1232        NewGroup = NewGroup0
1233    end,
1234    State2 = process_partial_update(State, NewGroup),
1235    #state{
1236        waiting_list = WaitList,
1237        replica_partitions = ReplicaParts,
1238        shutdown = Shutdown,
1239        group = NewGroup2,
1240        update_listeners = UpdateListeners2,
1241        initial_build = InitialBuild
1242    } = State2,
1243    inc_updates(NewGroup2, Result, false, false),
1244    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, updater finished~n"
1245              "Indexing time: ~.3f seconds~n"
1246              "Blocked time:  ~.3f seconds~n"
1247              "Inserted IDs:  ~p~n"
1248              "Deleted IDs:   ~p~n"
1249              "Inserted KVs:  ~p~n"
1250              "Deleted KVs:   ~p~n"
1251              "Cleaned KVs:   ~p~n"
1252              "# seqs done:   ~p~n",
1253              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1254               ?LOG_USERDATA(?group_id(State)), IndexingTime, BlockedTime, InsertedIds,
1255               DeletedIds, InsertedKVs, DeletedKVs, CleanupKVCount, SeqsDone]),
1256    UpdaterRestarted = case InitialBuild of
1257    true ->
1258        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
1259                  " initial index build of on-disk items is done,"
1260                  " restart updater to index in-memory items in case"
1261                  " there are any",
1262                  [?LOG_USERDATA(?set_name(State2)), ?type(State2), ?category(State2),
1263                   ?LOG_USERDATA(?group_id(State2))]),
1264        StoppedUpdaterState = State2#state{
1265            updater_pid = nil,
1266            initial_build = false,
1267            updater_state = not_running
1268        },
1269        State3 = start_updater(StoppedUpdaterState),
1270        WaitList2 = State3#state.waiting_list,
1271        is_pid(State3#state.updater_pid);
1272    false ->
1273        WaitList2 = reply_with_group(NewGroup2, ReplicaParts, WaitList),
1274        State3 = State2,
1275        false
1276    end,
1277    case UpdaterRestarted of
1278    true ->
1279        {noreply, State3, ?GET_TIMEOUT(State3)};
1280    false ->
1281        case Shutdown andalso (WaitList2 == []) of
1282        true ->
1283            {stop, normal, State3#state{waiting_list = []}};
1284        false ->
1285            State4 = State3#state{
1286                updater_pid = nil,
1287                initial_build = false,
1288                updater_state = not_running,
1289                waiting_list = WaitList2
1290            },
1291            State5 = maybe_apply_pending_transition(State4),
1292            State6 = case (WaitList2 /= []) orelse (dict:size(UpdateListeners2) > 0) of
1293            true ->
1294                start_updater(State5);
1295            false ->
1296                State5
1297            end,
1298            State7 = maybe_start_cleaner(State6),
1299            {noreply, State7, ?GET_TIMEOUT(State7)}
1300        end
1301    end;
1302
1303handle_info({'EXIT', Pid, {updater_error, purge}}, #state{updater_pid = Pid} = State) ->
1304    State2 = reset_group_from_state(State),
1305    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because updater"
1306              " detected missed document deletes (purge)",
1307              [?LOG_USERDATA(?set_name(State)), ?type(State),
1308               ?category(State), ?LOG_USERDATA(?group_id(State))]),
1309    State3 = start_updater(State2),
1310    {noreply, State3, ?GET_TIMEOUT(State3)};
1311
1312handle_info({'EXIT', Pid, {updater_error, {rollback, RollbackSeqs}}},
1313        #state{updater_pid = Pid} = State0) ->
1314    Rollback = rollback(State0, RollbackSeqs),
1315    State2 = case Rollback of
1316    {ok, State} ->
1317        ?LOG_INFO(
1318            "Set view `~s`, ~s (~s) group `~s`, group update because group"
1319            " needed to be rolled back",
1320            [?LOG_USERDATA(?set_name(State)), ?type(State),
1321             ?category(State), ?LOG_USERDATA(?group_id(State))]),
1322        State#state{
1323            updater_pid = nil,
1324            initial_build = false,
1325            updater_state = not_running
1326        };
1327    {error, {cannot_rollback, State}} ->
1328        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, group reset because "
1329            "a rollback wasn't possible",
1330            [?LOG_USERDATA(?set_name(State)), ?type(State),
1331             ?category(State), ?LOG_USERDATA(?group_id(State))]),
1332        reset_group_from_state(State)
1333    end,
1334    State3 = start_updater(State2),
1335    {noreply, State3, ?GET_TIMEOUT(State3)};
1336
1337handle_info({'EXIT', Pid, {updater_error, Error}}, #state{updater_pid = Pid, group = Group} = State) ->
1338    ok = couch_file:refresh_eof(Group#set_view_group.fd),
1339    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1340               " received error from updater: ~s",
1341               [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1342                ?LOG_USERDATA(?group_id(State)), ?LOG_USERDATA(Error)]),
1343    Listeners2 = error_notify_update_listeners(
1344        State, State#state.update_listeners, {updater_error, Error}),
1345    State2 = State#state{
1346        updater_pid = nil,
1347        initial_build = false,
1348        updater_state = not_running,
1349        update_listeners = Listeners2
1350    },
1351    ?inc_updater_errors(State2#state.group),
1352    case State#state.shutdown of
1353    true ->
1354        {stop, normal, reply_all(State2, {error, Error})};
1355    false ->
1356        Error2 = case Error of
1357        {_, 86, Msg} ->
1358            {error, <<"Reducer: ", Msg/binary>>};
1359        {_, 87, _} ->
1360            {error, <<"reduction too large">>};
1361        {_, _Reason} ->
1362            Error;
1363        _ ->
1364            {error, Error}
1365        end,
1366        State3 = reply_all(State2, Error2),
1367        {noreply, maybe_start_cleaner(State3), ?GET_TIMEOUT(State3)}
1368    end;
1369
1370handle_info({'EXIT', _Pid, {updater_error, _Error}}, State) ->
1371    % from old, shutdown updater, ignore
1372    {noreply, State, ?GET_TIMEOUT(State)};
1373
1374handle_info({'EXIT', UpPid, reset}, #state{updater_pid = UpPid} = State) ->
1375    % TODO: once purge support is properly added, this needs to take into
1376    % account the replica index.
1377    State2 = stop_cleaner(State),
1378    case prepare_group(State#state.init_args, true) of
1379    {ok, ResetGroup} ->
1380        {ok, start_updater(State2#state{group = ResetGroup})};
1381    Error ->
1382        {stop, normal, reply_all(State2, Error)}
1383    end;
1384
1385handle_info({'EXIT', Pid, normal}, State) ->
1386    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
1387              " linked PID ~p stopped normally",
1388              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1389               ?LOG_USERDATA(?group_id(State)), Pid]),
1390    {noreply, State, ?GET_TIMEOUT(State)};
1391
1392handle_info({'EXIT', Pid, Reason}, #state{compactor_pid = Pid} = State) ->
1393    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1394               " compactor process ~p died with unexpected reason: ~p",
1395               [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1396                ?LOG_USERDATA(?group_id(State)), Pid, Reason]),
1397    couch_util:shutdown_sync(State#state.compactor_file),
1398    _ = couch_file:delete(?root_dir(State), compact_file_name(State)),
1399    State2 = State#state{
1400        compactor_pid = nil,
1401        compactor_file = nil,
1402        compact_log_files = nil
1403    },
1404    {noreply, State2, ?GET_TIMEOUT(State2)};
1405
1406handle_info({'EXIT', Pid, Reason},
1407        #state{group = #set_view_group{dcp_pid = Pid}} = State) ->
1408    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1409               " DCP process ~p died with unexpected reason: ~s",
1410               [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1411                ?LOG_USERDATA(?group_id(State)), Pid, ?LOG_USERDATA(Reason)]),
1412    {stop, {dcp_died, Reason}, State};
1413
1414handle_info({'EXIT', Pid, Reason}, State) ->
1415    ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1416               " terminating because linked PID ~p died with reason: ~p",
1417               [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1418                ?LOG_USERDATA(?group_id(State)), Pid, Reason]),
1419    {stop, Reason, State};
1420
1421handle_info({all_seqs, nil, StatsResponse}, State) ->
1422    #state{
1423       group = Group
1424    } = State,
1425    SeqsCache = erlang:get(seqs_cache),
1426    NewState = case StatsResponse of
1427    {ok, Seqs} ->
1428        Partitions = group_partitions(Group),
1429        Seqs2 = couch_set_view_util:filter_seqs(Partitions, Seqs),
1430        NewCacheVal = #seqs_cache{
1431            timestamp = os:timestamp(),
1432            is_waiting = false,
1433            seqs = Seqs2
1434        },
1435        State2 = case is_pid(State#state.updater_pid) of
1436        true ->
1437            State;
1438        false ->
1439            CurSeqs = indexable_partition_seqs(State, Seqs2),
1440            case CurSeqs > ?set_seqs(State#state.group) of
1441            true ->
1442                do_start_updater(State, CurSeqs, []);
1443            false ->
1444                State
1445            end
1446        end,
1447        % If a rollback happened in between async seqs update request and
1448        % its response, is_waiting flag will be resetted. In that case, we should
1449        % just discard the update seqs.
1450        case SeqsCache#seqs_cache.is_waiting of
1451        true ->
1452            erlang:put(seqs_cache, NewCacheVal);
1453        false ->
1454            ok
1455        end,
1456        State2;
1457    _ ->
1458        ?LOG_ERROR("Set view `~s`, ~s (~s) group `~s`,"
1459                              " received bad response for update seqs"
1460                              " request ~p",
1461                              [?LOG_USERDATA(?set_name(State)), ?type(State),
1462                               ?category(State), ?LOG_USERDATA(?group_id(State)),
1463                               StatsResponse]),
1464        SeqsCache2 = SeqsCache#seqs_cache{is_waiting = false},
1465        erlang:put(seqs_cache, SeqsCache2),
1466        State
1467    end,
1468    {noreply, NewState};
1469
1470handle_info({group_fd, _Fd}, State) ->
1471    % If no error occured, we don't care about the fd message
1472    {noreply, State, ?GET_TIMEOUT(State)}.
1473
1474
1475terminate(Reason, #state{group = #set_view_group{sig = Sig} = Group} = State) ->
1476    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, signature `~s`,"
1477              " terminating with reason: ~s",
1478              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
1479               ?LOG_USERDATA(?group_id(State)), hex_sig(Sig), ?LOG_USERDATA(Reason)]),
1480    Listeners2 = error_notify_update_listeners(
1481        State, State#state.update_listeners, {shutdown, Reason}),
1482    State2 = reply_all(State#state{update_listeners = Listeners2}, Reason),
1483    State3 = notify_pending_transition_waiters(State2, {shutdown, Reason}),
1484    couch_set_view_util:shutdown_wait(State3#state.cleaner_pid),
1485    couch_set_view_util:shutdown_wait(State3#state.updater_pid),
1486    couch_util:shutdown_sync(State3#state.compactor_pid),
1487    couch_util:shutdown_sync(State3#state.compactor_file),
1488    couch_util:shutdown_sync(State3#state.replica_group),
1489    Group = State#state.group,
1490    true = ets:delete(Group#set_view_group.stats_ets,
1491        ?set_view_group_stats_key(Group)),
1492    catch couch_file:only_snapshot_reads((State3#state.group)#set_view_group.fd),
1493    case State#state.shutdown of
1494    true when ?type(State) == main ->
1495        % Important to delete files here. A quick succession of ddoc updates, updating
1496        % the ddoc back to a previous version (while we're here in terminate), may lead
1497        % us to a case where it should start clean but it doesn't, because
1498        % couch_set_view:cleanup_index_files/1 was not invoked right before the last
1499        % reverting ddoc update, or it was invoked but did not finish before the view
1500        % group got spawned again. Problem here is during rebalance, where one of the
1501        % databases might have been deleted after the last ddoc update and while or after
1502        % we're here in terminate, so we must ensure we don't leave old index files around.
1503        % MB-6415 and MB-6517
1504        case State#state.shutdown_aliases of
1505        [] ->
1506            delete_index_file(?root_dir(State), Group, main),
1507            delete_index_file(?root_dir(State), Group, replica);
1508        _ ->
1509            ok
1510        end;
1511    _ ->
1512        ok
1513    end,
1514    TmpDir = updater_tmp_dir(State),
1515    ok = couch_set_view_util:delete_sort_files(TmpDir, all),
1516    _ = file:del_dir(TmpDir),
1517    ok.
1518
1519
1520code_change(_OldVsn, State, _Extra) ->
1521    {ok, State}.
1522
1523
1524-spec reply_with_group(#set_view_group{},
1525                       ordsets:ordset(partition_id()),
1526                       [#waiter{}]) -> [#waiter{}].
1527reply_with_group(_Group, _ReplicaPartitions, []) ->
1528    [];
1529reply_with_group(Group, ReplicaPartitions, WaitList) ->
1530    ActiveReplicasBitmask = couch_set_view_util:build_bitmask(
1531        ?set_replicas_on_transfer(Group)),
1532    ActiveIndexable = [{P, S} || {P, S} <- ?set_seqs(Group),
1533                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1534    ActiveUnindexable = [{P, S} || {P, S} <- ?set_unindexable_seqs(Group),
1535                          ((1 bsl P) band ?set_abitmask(Group) =/= 0)],
1536    GroupSeqs = ordsets:union(ActiveIndexable, ActiveUnindexable),
1537    WaitList2 = lists:foldr(
1538        fun(#waiter{debug = false} = Waiter, Acc) ->
1539            case maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) of
1540            true ->
1541                Acc;
1542            false ->
1543                [Waiter | Acc]
1544            end;
1545        (#waiter{debug = true} = Waiter, Acc) ->
1546            [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1547                ?set_view_group_stats_key(Group)),
1548            DebugGroup = Group#set_view_group{
1549                debug_info = #set_view_debug_info{
1550                    stats = Stats,
1551                    original_abitmask = ?set_abitmask(Group),
1552                    original_pbitmask = ?set_pbitmask(Group),
1553                    replica_partitions = ReplicaPartitions,
1554                    wanted_seqs = Waiter#waiter.seqs
1555                }
1556            },
1557            case maybe_reply_with_group(Waiter, DebugGroup, GroupSeqs, ActiveReplicasBitmask) of
1558            true ->
1559                Acc;
1560            false ->
1561                [Waiter | Acc]
1562            end
1563        end,
1564        [], WaitList),
1565    WaitList2.
1566
1567
1568-spec maybe_reply_with_group(#waiter{}, #set_view_group{}, partition_seqs(), bitmask()) -> boolean().
1569maybe_reply_with_group(Waiter, Group, GroupSeqs, ActiveReplicasBitmask) ->
1570    #waiter{from = {Pid, _} = From, seqs = ClientSeqs} = Waiter,
1571    case (ClientSeqs == []) orelse (GroupSeqs >= ClientSeqs) of
1572    true ->
1573        couch_ref_counter:add(Group#set_view_group.ref_counter, Pid),
1574        gen_server:reply(From, {ok, Group, ActiveReplicasBitmask}),
1575        true;
1576    false ->
1577        false
1578    end.
1579
1580
1581-spec reply_all(#state{}, term()) -> #state{}.
1582reply_all(#state{waiting_list = []} = State, _Reply) ->
1583    State;
1584reply_all(#state{waiting_list = WaitList} = State, Reply) ->
1585    lists:foreach(fun(#waiter{from = From}) ->
1586        catch gen_server:reply(From, Reply)
1587    end, WaitList),
1588    State#state{waiting_list = []}.
1589
1590
1591-spec prepare_group(init_args(), boolean()) -> {'ok', #set_view_group{}} |
1592                                               {'error', atom()}.
1593prepare_group({RootDir, SetName, Group0}, ForceReset)->
1594    #set_view_group{
1595        sig = Sig,
1596        type = Type
1597    } = Group0,
1598    Filepath = find_index_file(RootDir, Group0),
1599    Group = Group0#set_view_group{filepath = Filepath},
1600    case open_index_file(Filepath) of
1601    {ok, Fd} ->
1602        try
1603            if ForceReset ->
1604                % this can happen if we missed a purge
1605                {ok, reset_file(Fd, Group)};
1606            true ->
1607                case (catch couch_file:read_header_bin(Fd)) of
1608                {ok, HeaderBin, HeaderPos} ->
1609                    HeaderSig = couch_set_view_util:header_bin_sig(HeaderBin);
1610                _ ->
1611                    HeaderPos = 0,  % keep dialyzer happy
1612                    HeaderSig = <<>>,
1613                    HeaderBin = <<>>
1614                end,
1615                case HeaderSig == Sig of
1616                true ->
1617                    HeaderInfo = couch_set_view_util:header_bin_to_term(HeaderBin),
1618                    {ok, init_group(Fd, Group, HeaderInfo, HeaderPos)};
1619                _ ->
1620                    % this happens on a new file
1621                    case (not ForceReset) andalso (Type =:= main) of
1622                    true ->
1623                        % initializing main view group
1624                        ok = delete_index_file(RootDir, Group, replica);
1625                    false ->
1626                        ok
1627                    end,
1628                    {ok, reset_file(Fd, Group)}
1629                end
1630            end
1631        catch
1632        _:Error ->
1633            % In case of any error, we need to close the file as it isn't yet
1634            % associated with the group, hence can't be cleaned up during
1635            % group cleanup.
1636            couch_file:close(Fd),
1637            Error
1638        end;
1639    {error, emfile} = Error ->
1640        ?LOG_ERROR("Can't open set view `~s`, ~s (~s) group `~s`:"
1641                   " too many files open",
1642                   [?LOG_USERDATA(SetName), Type, Group#set_view_group.category,
1643                    ?LOG_USERDATA(Group#set_view_group.name)]),
1644        Error;
1645    Error ->
1646        ok = delete_index_file(RootDir, Group, Type),
1647        case (not ForceReset) andalso (Type =:= main) of
1648        true ->
1649            % initializing main view group
1650            ok = delete_index_file(RootDir, Group, replica);
1651        false ->
1652            ok
1653        end,
1654        Error
1655    end.
1656
1657
1658-spec hex_sig(#set_view_group{} | binary()) -> string().
1659hex_sig(#set_view_group{sig = Sig}) ->
1660    hex_sig(Sig);
1661hex_sig(GroupSig) ->
1662    couch_util:to_hex(GroupSig).
1663
1664
1665-spec base_index_file_name(#set_view_group{}, set_view_group_type()) -> string().
1666base_index_file_name(Group, Type) ->
1667    atom_to_list(Type) ++ "_" ++ hex_sig(Group#set_view_group.sig) ++
1668        Group#set_view_group.extension.
1669
1670
1671-spec find_index_file(string(), #set_view_group{}) -> string().
1672find_index_file(RootDir, Group) ->
1673    #set_view_group{
1674        set_name = SetName,
1675        type = Type,
1676        category = Category
1677    } = Group,
1678    DesignRoot = couch_set_view:set_index_dir(RootDir, SetName, Category),
1679    BaseName = base_index_file_name(Group, Type),
1680    FullPath = filename:join([DesignRoot, BaseName]),
1681    case filelib:wildcard(FullPath ++ ".[0-9]*") of
1682    [] ->
1683        FullPath ++ ".1";
1684    Matching ->
1685        BaseNameSplitted = string:tokens(BaseName, "."),
1686        Matching2 = lists:filter(
1687            fun(Match) ->
1688                MatchBase = filename:basename(Match),
1689                [Suffix | Rest] = lists:reverse(string:tokens(MatchBase, ".")),
1690                (lists:reverse(Rest) =:= BaseNameSplitted) andalso
1691                    is_integer((catch list_to_integer(Suffix)))
1692            end,
1693            Matching),
1694        case Matching2 of
1695        [] ->
1696            FullPath ++ ".1";
1697        _ ->
1698            GetSuffix = fun(FileName) ->
1699                list_to_integer(lists:last(string:tokens(FileName, ".")))
1700            end,
1701            Matching3 = lists:sort(
1702                fun(A, B) -> GetSuffix(A) > GetSuffix(B) end,
1703                Matching2),
1704            hd(Matching3)
1705        end
1706    end.
1707
1708
1709-spec delete_index_file(string(), #set_view_group{}, set_view_group_type()) -> no_return().
1710delete_index_file(RootDir, Group, Type) ->
1711    #set_view_group{
1712        set_name = SetName,
1713        category = Category
1714    } = Group,
1715    SetDir = couch_set_view:set_index_dir(RootDir, SetName, Category),
1716    BaseName = filename:join([SetDir, base_index_file_name(Group, Type)]),
1717    lists:foreach(
1718        fun(F) -> ok = couch_file:delete(RootDir, F) end,
1719        filelib:wildcard(BaseName ++ ".[0-9]*")).
1720
1721
1722-spec compact_file_name(#state{} | #set_view_group{}) -> string().
1723compact_file_name(#state{group = Group}) ->
1724    compact_file_name(Group);
1725compact_file_name(#set_view_group{filepath = CurFilepath}) ->
1726    CurFilepath ++ ".compact".
1727
1728
1729-spec increment_filepath(#set_view_group{}) -> string().
1730increment_filepath(#set_view_group{filepath = CurFilepath}) ->
1731    [Suffix | Rest] = lists:reverse(string:tokens(CurFilepath, ".")),
1732    NewSuffix = integer_to_list(list_to_integer(Suffix) + 1),
1733    string:join(lists:reverse(Rest), ".") ++ "." ++ NewSuffix.
1734
1735
1736-spec open_index_file(string()) -> {'ok', pid()} | {'error', atom()}.
1737open_index_file(Filepath) ->
1738    case do_open_index_file(Filepath) of
1739    {ok, Fd} ->
1740        unlink(Fd),
1741        {ok, Fd};
1742    Error ->
1743        Error
1744    end.
1745
1746do_open_index_file(Filepath) ->
1747    case couch_file:open(Filepath) of
1748    {ok, Fd}        -> {ok, Fd};
1749    {error, enoent} -> couch_file:open(Filepath, [create]);
1750    Error           -> Error
1751    end.
1752
1753
1754open_set_group(Mod, SetName, GroupId) ->
1755    case couch_set_view_ddoc_cache:get_ddoc(SetName, GroupId) of
1756    {ok, DDoc} ->
1757        {ok, Mod:design_doc_to_set_view_group(SetName, DDoc)};
1758    {doc_open_error, Error} ->
1759        Error;
1760    {db_open_error, Error} ->
1761        Error
1762    end.
1763
1764
1765% To be used for debug/troubleshooting only (accessible via REST/HTTP API)
1766get_group_info(State) ->
1767    #state{
1768        group = Group,
1769        replica_group = ReplicaPid,
1770        updater_pid = UpdaterPid,
1771        updater_state = UpdaterState,
1772        compactor_pid = CompactorPid,
1773        waiting_list = WaitersList,
1774        cleaner_pid = CleanerPid,
1775        replica_partitions = ReplicaParts
1776    } = State,
1777    #set_view_group{
1778        fd = Fd,
1779        sig = GroupSig,
1780        id_btree = Btree,
1781        views = Views,
1782        mod = Mod
1783    } = Group,
1784    PendingTrans = get_pending_transition(State),
1785    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1786        ?set_view_group_stats_key(Group)),
1787    JsonStats = {[
1788        {full_updates, Stats#set_view_group_stats.full_updates},
1789        {partial_updates, Stats#set_view_group_stats.partial_updates},
1790        {stopped_updates, Stats#set_view_group_stats.stopped_updates},
1791        {updater_cleanups, Stats#set_view_group_stats.updater_cleanups},
1792        {compactions, Stats#set_view_group_stats.compactions},
1793        {cleanups, Stats#set_view_group_stats.cleanups},
1794        {waiting_clients, length(WaitersList)},
1795        {cleanup_interruptions, Stats#set_view_group_stats.cleanup_stops},
1796        {update_history, Stats#set_view_group_stats.update_history},
1797        {compaction_history, Stats#set_view_group_stats.compaction_history},
1798        {cleanup_history, Stats#set_view_group_stats.cleanup_history},
1799        {accesses, Stats#set_view_group_stats.accesses},
1800        {update_errors, Stats#set_view_group_stats.update_errors},
1801        {duplicated_partition_versions_seen,
1802         Stats#set_view_group_stats.dup_partitions_counter}
1803    ]},
1804    {ok, Size} = couch_file:bytes(Fd),
1805    GroupPartitions = ordsets:from_list(
1806        couch_set_view_util:decode_bitmask(?set_abitmask(Group) bor ?set_pbitmask(Group))),
1807    PartVersions = lists:map(fun({PartId, PartVersion}) ->
1808        {couch_util:to_binary(PartId), [tuple_to_list(V) || V <- PartVersion]}
1809    end, ?set_partition_versions(Group)),
1810
1811    IndexSeqs = ?set_seqs(Group),
1812    IndexPartitions = [PartId || {PartId, _} <- IndexSeqs],
1813    % Extract the seqnum from KV store for all indexible partitions.
1814    {ok, GroupSeqs} = get_seqs(State, GroupPartitions),
1815    PartSeqs = couch_set_view_util:filter_seqs(IndexPartitions, GroupSeqs),
1816
1817    % Calculate the total sum over difference of Seqnum between KV
1818    % and Index partition.
1819    SeqDiffs = lists:zipwith(
1820        fun({PartId, Seq1}, {PartId, Seq2}) ->
1821            Seq1 - Seq2
1822        end, PartSeqs, IndexSeqs),
1823    TotalSeqDiff = lists:sum(SeqDiffs),
1824
1825    [
1826        {signature, ?l2b(hex_sig(GroupSig))},
1827        {disk_size, Size},
1828        {data_size, Mod:view_group_data_size(Btree, Views)},
1829        {updater_running, is_pid(UpdaterPid)},
1830        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build},
1831        {updater_state, couch_util:to_binary(UpdaterState)},
1832        {compact_running, CompactorPid /= nil},
1833        {cleanup_running, (CleanerPid /= nil) orelse
1834            ((CompactorPid /= nil) andalso (?set_cbitmask(Group) =/= 0))},
1835        {max_number_partitions, ?set_num_partitions(Group)},
1836        {update_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- IndexSeqs]}},
1837        {partition_seqs, {[{couch_util:to_binary(P), S} || {P, S} <- GroupSeqs]}},
1838        {total_seqs_diff, TotalSeqDiff},
1839        {active_partitions, couch_set_view_util:decode_bitmask(?set_abitmask(Group))},
1840        {passive_partitions, couch_set_view_util:decode_bitmask(?set_pbitmask(Group))},
1841        {cleanup_partitions, couch_set_view_util:decode_bitmask(?set_cbitmask(Group))},
1842        {unindexable_partitions, {[{couch_util:to_binary(P), S} || {P, S} <- ?set_unindexable_seqs(Group)]}},
1843        {stats, JsonStats},
1844        {pending_transition, case PendingTrans of
1845            nil ->
1846                null;
1847            #set_view_transition{} ->
1848                {[
1849                    {active, PendingTrans#set_view_transition.active},
1850                    {passive, PendingTrans#set_view_transition.passive}
1851                ]}
1852            end
1853        },
1854        {partition_versions, {PartVersions}}
1855    ] ++
1856    case (?type(State) =:= main) andalso is_pid(ReplicaPid) of
1857    true ->
1858        [{replica_partitions, ReplicaParts}, {replicas_on_transfer, ?set_replicas_on_transfer(Group)}];
1859    false ->
1860        []
1861    end ++
1862    get_replica_group_info(ReplicaPid).
1863
1864get_replica_group_info(ReplicaPid) when is_pid(ReplicaPid) ->
1865    {ok, RepGroupInfo} = gen_server:call(ReplicaPid, request_group_info, infinity),
1866    [{replica_group_info, {RepGroupInfo}}];
1867get_replica_group_info(_) ->
1868    [].
1869
1870
1871get_data_size_info(State) ->
1872    #state{
1873        group = Group,
1874        replica_group = ReplicaPid,
1875        updater_pid = UpdaterPid
1876    } = State,
1877    #set_view_group{
1878        fd = Fd,
1879        id_btree = Btree,
1880        sig = GroupSig,
1881        views = Views,
1882        mod = Mod
1883    } = Group,
1884    {ok, FileSize} = couch_file:bytes(Fd),
1885    DataSize = Mod:view_group_data_size(Btree, Views),
1886    [Stats] = ets:lookup(Group#set_view_group.stats_ets,
1887        ?set_view_group_stats_key(Group)),
1888    Info = [
1889        {signature, hex_sig(GroupSig)},
1890        {disk_size, FileSize},
1891        {data_size, DataSize},
1892        {accesses, Stats#set_view_group_stats.accesses},
1893        {updater_running, is_pid(UpdaterPid)},
1894        {initial_build, is_pid(UpdaterPid) andalso State#state.initial_build}
1895    ],
1896    case is_pid(ReplicaPid) of
1897    false ->
1898        Info;
1899    true ->
1900        {ok, RepInfo} = gen_server:call(ReplicaPid, get_data_size, infinity),
1901        [{replica_group_info, RepInfo} | Info]
1902    end.
1903
1904
1905-spec reset_group(#set_view_group{}) -> #set_view_group{}.
1906reset_group(Group) ->
1907    #set_view_group{
1908        views = Views,
1909        mod = Mod
1910    } = Group,
1911    Views2 = lists:map(fun(View) ->
1912        Indexer = Mod:reset_view(View#set_view.indexer),
1913        View#set_view{indexer = Indexer}
1914    end, Views),
1915    Group#set_view_group{
1916        fd = nil,
1917        index_header = #set_view_index_header{},
1918        id_btree = nil,
1919        views = Views2
1920    }.
1921
1922
1923-spec reset_file(pid(), #set_view_group{}) -> #set_view_group{}.
1924reset_file(Fd, #set_view_group{views = Views, index_header = Header} = Group) ->
1925    ok = couch_file:truncate(Fd, 0),
1926    EmptyHeader = Header#set_view_index_header{
1927        seqs = [{PartId, 0} ||
1928            {PartId, _} <- Header#set_view_index_header.seqs],
1929        id_btree_state = nil,
1930        view_states = [nil || _ <- Views],
1931        replicas_on_transfer = [],
1932        pending_transition = nil,
1933        unindexable_seqs = [{PartId, 0} ||
1934            {PartId, _} <- Header#set_view_index_header.unindexable_seqs],
1935        partition_versions = [{PartId, [{0, 0}]} ||
1936            {PartId, _} <- Header#set_view_index_header.partition_versions]
1937    },
1938    EmptyGroup = Group#set_view_group{index_header = EmptyHeader},
1939    EmptyHeaderBin = couch_set_view_util:group_to_header_bin(EmptyGroup),
1940    {ok, Pos} = couch_file:write_header_bin(Fd, EmptyHeaderBin),
1941    init_group(Fd, reset_group(EmptyGroup), EmptyHeader, Pos).
1942
1943
1944-spec reset_group_from_state(#state{}) -> #state{}.
1945reset_group_from_state(State) ->
1946    Group = State#state.group,
1947    Group2 = reset_file(Group#set_view_group.fd, Group),
1948    State#state{
1949        updater_pid = nil,
1950        initial_build = false,
1951        updater_state = not_running,
1952        group = Group2
1953    }.
1954
1955
1956-spec init_group(pid(),
1957                 #set_view_group{},
1958                 #set_view_index_header{},
1959                 non_neg_integer()) -> #set_view_group{}.
1960init_group(Fd, Group, IndexHeader, HeaderPos) ->
1961    #set_view_group{
1962        views = Views0,
1963        mod = Mod
1964    } = Group,
1965    Views = [V#set_view{ref = make_ref()} || V <- Views0],
1966    #set_view_index_header{
1967        id_btree_state = IdBtreeState,
1968        view_states = ViewStates
1969    } = IndexHeader,
1970    IdTreeReduce = fun(reduce, KVs) ->
1971        <<(length(KVs)):40, (couch_set_view_util:partitions_map(KVs, 0)):?MAX_NUM_PARTITIONS>>;
1972    (rereduce, [First | Rest]) ->
1973        lists:foldl(
1974            fun(<<S:40, M:?MAX_NUM_PARTITIONS>>, <<T:40, A:?MAX_NUM_PARTITIONS>>) ->
1975                <<(S + T):40, (M bor A):?MAX_NUM_PARTITIONS>>
1976            end,
1977            First, Rest)
1978    end,
1979    KvChunkThreshold = couch_config:get("set_views", "btree_kv_node_threshold", "7168"),
1980    KpChunkThreshold = couch_config:get("set_views", "btree_kp_node_threshold", "6144"),
1981    BtreeOptions = [
1982        {kv_chunk_threshold, list_to_integer(KvChunkThreshold)},
1983        {kp_chunk_threshold, list_to_integer(KpChunkThreshold)},
1984        {binary_mode, true}
1985    ],
1986    {ok, IdBtree} = couch_btree:open(
1987        IdBtreeState, Fd, [{reduce, IdTreeReduce} | BtreeOptions]),
1988    Views2 = Mod:setup_views(Fd, BtreeOptions, Group, ViewStates, Views),
1989    Group#set_view_group{
1990        fd = Fd,
1991        id_btree = IdBtree,
1992        views = Views2,
1993        index_header = IndexHeader,
1994        header_pos = HeaderPos
1995    }.
1996
1997
1998commit_header(Group) ->
1999    commit_header(Group, true).
2000
2001-spec commit_header(#set_view_group{}, boolean()) -> {'ok', non_neg_integer()}.
2002commit_header(Group, Fsync) ->
2003    HeaderBin = couch_set_view_util:group_to_header_bin(Group),
2004    {ok, Pos} = couch_file:write_header_bin(Group#set_view_group.fd, HeaderBin),
2005    case Fsync of
2006    true ->
2007        ok = couch_file:sync(Group#set_view_group.fd);
2008    false ->
2009        ok = couch_file:flush(Group#set_view_group.fd)
2010    end,
2011    {ok, Pos}.
2012
2013-spec filter_out_bitmask_partitions(ordsets:ordset(partition_id()),
2014                                    bitmask()) -> ordsets:ordset(partition_id()).
2015filter_out_bitmask_partitions(Partitions, BMask) ->
2016    [P || P <- Partitions, ((BMask bsr P) band 1) =/= 1].
2017
2018-spec maybe_update_partition_states(ordsets:ordset(partition_id()),
2019                                    ordsets:ordset(partition_id()),
2020                                    ordsets:ordset(partition_id()),
2021                                    #state{}) -> #state{}.
2022maybe_update_partition_states(ActiveList0, PassiveList0, CleanupList0, State) ->
2023    #state{group = Group} = State,
2024    PendingTrans = ?set_pending_transition(Group),
2025    PendingActive = ?pending_transition_active(PendingTrans),
2026    PendingPassive = ?pending_transition_passive(PendingTrans),
2027    PendingUnindexable = ?pending_transition_unindexable(PendingTrans),
2028    case (?set_unindexable_seqs(Group) == []) andalso (PendingUnindexable == []) of
2029    true ->
2030        ActiveList = ActiveList0,
2031        PassiveList = PassiveList0,
2032        CleanupList = CleanupList0;
2033    false ->
2034        AlreadyActive = couch_set_view_util:build_bitmask(PendingActive) bor ?set_abitmask(Group),
2035        AlreadyPassive = couch_set_view_util:build_bitmask(PendingPassive) bor ?set_pbitmask(Group),
2036        ActiveList = filter_out_bitmask_partitions(ActiveList0, AlreadyActive),
2037        PassiveList = filter_out_bitmask_partitions(PassiveList0, AlreadyPassive),
2038        CleanupList = filter_out_bitmask_partitions(CleanupList0, ?set_cbitmask(Group)),
2039        ActiveMarkedAsUnindexable0 = [
2040            P || P <- ActiveList, is_unindexable_part(P, Group)
2041        ],
2042
2043        % Replicas on transfer look like normal active partitions for the
2044        % caller. Hence they can be marked as unindexable. The actual
2045        % transfer to a real active partition needs to happen though. Thus
2046        % an intersection between newly activated partitions and unindexable
2047        % ones is possible (MB-8677).
2048        ActiveMarkedAsUnindexable = ordsets:subtract(
2049            ActiveMarkedAsUnindexable0, ?set_replicas_on_transfer(Group)),
2050
2051        case ActiveMarkedAsUnindexable of
2052        [] ->
2053            ok;
2054        _ ->
2055            ErrorMsg1 = io_lib:format("Intersection between requested active list "
2056                "and current unindexable partitions: ~s",
2057                [condense(ActiveMarkedAsUnindexable)]),
2058            throw({error, iolist_to_binary(ErrorMsg1)})
2059        end,
2060        PassiveMarkedAsUnindexable = [
2061            P || P <- PassiveList, is_unindexable_part(P, Group)
2062        ],
2063        case PassiveMarkedAsUnindexable of
2064        [] ->
2065            ok;
2066        _ ->
2067            ErrorMsg2 = io_lib:format("Intersection between requested passive list "
2068                "and current unindexable partitions: ~s",
2069                [condense(PassiveMarkedAsUnindexable)]),
2070            throw({error, iolist_to_binary(ErrorMsg2)})
2071        end,
2072        CleanupMarkedAsUnindexable = [
2073            P || P <- CleanupList, is_unindexable_part(P, Group)
2074        ],
2075        case CleanupMarkedAsUnindexable of
2076        [] ->
2077            ok;
2078        _ ->
2079            ErrorMsg3 = io_lib:format("Intersection between requested cleanup list "
2080                "and current unindexable partitions: ~s", [condense(CleanupMarkedAsUnindexable)]),
2081            throw({error, iolist_to_binary(ErrorMsg3)})
2082        end
2083    end,
2084    ActiveMask = couch_set_view_util:build_bitmask(ActiveList),
2085    case ActiveMask >= (1 bsl ?set_num_partitions(Group)) of
2086    true ->
2087        throw({error, <<"Invalid active partitions list">>});
2088    false ->
2089        ok
2090    end,
2091    PassiveMask = couch_set_view_util:build_bitmask(PassiveList),
2092    case PassiveMask >= (1 bsl ?set_num_partitions(Group)) of
2093    true ->
2094        throw({error, <<"Invalid passive partitions list">>});
2095    false ->
2096        ok
2097    end,
2098    CleanupMask = couch_set_view_util:build_bitmask(CleanupList),
2099    case CleanupMask >= (1 bsl ?set_num_partitions(Group)) of
2100    true ->
2101        throw({error, <<"Invalid cleanup partitions list">>});
2102    false ->
2103        ok
2104    end,
2105
2106    ActivePending = ?pending_transition_active(PendingTrans),
2107    PassivePending = ?pending_transition_passive(PendingTrans),
2108    IsEffectlessTransition =
2109        (ActiveMask bor ?set_abitmask(Group)) == ?set_abitmask(Group) andalso
2110        (PassiveMask bor ?set_pbitmask(Group)) == ?set_pbitmask(Group) andalso
2111        ((CleanupMask band (?set_abitmask(Group) bor ?set_pbitmask(Group))) == 0) andalso
2112        ordsets:is_disjoint(CleanupList, ActivePending) andalso
2113        ordsets:is_disjoint(CleanupList, PassivePending) andalso
2114        ordsets:is_disjoint(ActiveList, PassivePending) andalso
2115        ordsets:is_disjoint(PassiveList, ActivePending),
2116
2117    case IsEffectlessTransition of
2118    true ->
2119        State;
2120    false ->
2121        RestartUpdater = updater_needs_restart(
2122            Group, ActiveMask, PassiveMask, CleanupMask),
2123        NewState = update_partition_states(
2124            ActiveList, PassiveList, CleanupList, State, RestartUpdater),
2125        #state{group = NewGroup, updater_pid = UpdaterPid} = NewState,
2126        case RestartUpdater of
2127        false when is_pid(UpdaterPid) ->
2128            case missing_partitions(Group, NewGroup) of
2129            [] ->
2130                ok;
2131            MissingPassive ->
2132                UpdaterPid ! {new_passive_partitions, MissingPassive}
2133            end;
2134        _ ->
2135            ok
2136        end,
2137        NewState
2138    end.
2139
2140
2141-spec update_partition_states(ordsets:ordset(partition_id()),
2142                              ordsets:ordset(partition_id()),
2143                              ordsets:ordset(partition_id()),
2144                              #state{},
2145                              boolean()) -> #state{}.
2146update_partition_states(ActiveList, PassiveList, CleanupList, State, RestartUpdater) ->
2147    State2 = stop_cleaner(State),
2148    case RestartUpdater of
2149    true ->
2150        #state{group = Group3} = State3 = stop_updater(State2);
2151    false ->
2152        #state{group = Group3} = State3 = State2
2153    end,
2154    UpdaterWasRunning = is_pid(State#state.updater_pid),
2155    ActiveInCleanup = partitions_still_in_cleanup(ActiveList, Group3),
2156    PassiveInCleanup = partitions_still_in_cleanup(PassiveList, Group3),
2157    NewPendingTrans = merge_into_pending_transition(
2158        Group3, ActiveInCleanup, PassiveInCleanup, CleanupList),
2159    ApplyActiveList = ordsets:subtract(ActiveList, ActiveInCleanup),
2160    ApplyPassiveList = ordsets:subtract(PassiveList, PassiveInCleanup),
2161    ApplyCleanupList = CleanupList,
2162    State4 = persist_partition_states(
2163               State3, ApplyActiveList, ApplyPassiveList,
2164               ApplyCleanupList, NewPendingTrans, []),
2165    State5 = notify_pending_transition_waiters(State4),
2166    after_partition_states_updated(State5, UpdaterWasRunning).
2167
2168
2169-spec merge_into_pending_transition(#set_view_group{},
2170                                    ordsets:ordset(partition_id()),
2171                                    ordsets:ordset(partition_id()),
2172                                    ordsets:ordset(partition_id())) ->
2173                                           #set_view_transition{} | 'nil'.
2174merge_into_pending_transition(Group, ActiveInCleanup, PassiveInCleanup, CleanupList) ->
2175    PendingTrans = ?set_pending_transition(Group),
2176    ActivePending = ?pending_transition_active(PendingTrans),
2177    PassivePending = ?pending_transition_passive(PendingTrans),
2178    case ordsets:intersection(PassivePending, ActiveInCleanup) of
2179    [] ->
2180        PassivePending2 = PassivePending;
2181    Int ->
2182        PassivePending2 = ordsets:subtract(PassivePending, Int)
2183    end,
2184    case ordsets:intersection(ActivePending, PassiveInCleanup) of
2185    [] ->
2186        ActivePending2 = ActivePending;
2187    Int2 ->
2188        ActivePending2 = ordsets:subtract(ActivePending, Int2)
2189    end,
2190    ActivePending3 = ordsets:subtract(ActivePending2, CleanupList),
2191    PassivePending3 = ordsets:subtract(PassivePending2, CleanupList),
2192    ActivePending4 = ordsets:union(ActivePending3, ActiveInCleanup),
2193    PassivePending4 = ordsets:union(PassivePending3, PassiveInCleanup),
2194    case (ActivePending4 == []) andalso (PassivePending4 == []) of
2195    true ->
2196        nil;
2197    false ->
2198        #set_view_transition{
2199            active = ActivePending4,
2200            passive = PassivePending4
2201        }
2202    end.
2203
2204
2205-spec after_partition_states_updated(#state{}, boolean()) -> #state{}.
2206after_partition_states_updated(State, UpdaterWasRunning) ->
2207    State2 = case UpdaterWasRunning of
2208    true ->
2209        % Updater was running, we stopped it, updated the group we received
2210        % from the updater, updated that group's bitmasks and update seqs,
2211        % and now restart the updater with this modified group.
2212        start_updater(State);
2213    false ->
2214        State
2215    end,
2216    State3 = stop_compactor(State2),
2217    maybe_start_cleaner(State3).
2218
2219
2220-spec persist_partition_states(#state{},
2221                               ordsets:ordset(partition_id()),
2222                               ordsets:ordset(partition_id()),
2223                               ordsets:ordset(partition_id()),
2224                               #set_view_transition{} | 'nil',
2225                               ordsets:ordset(partition_id())) -> #state{}.
2226persist_partition_states(State, ActiveList, PassiveList, CleanupList, PendingTrans, ToBeUnindexable) ->
2227    % There can never be intersection between given active, passive and cleanup lists.
2228    % This check is performed elsewhere, outside the gen_server.
2229    #state{
2230        group = Group,
2231        replica_partitions = ReplicaParts,
2232        replica_group = ReplicaPid,
2233        update_listeners = Listeners,
2234        waiting_list = WaitList
2235    } = State,
2236    case ordsets:intersection(ActiveList, ReplicaParts) of
2237    [] ->
2238         ActiveList2 = ActiveList,
2239         PassiveList2 = PassiveList,
2240         ReplicasOnTransfer2 = ?set_replicas_on_transfer(Group),
2241         ReplicasToMarkActive = [];
2242    CommonRep ->
2243         PassiveList2 = ordsets:union(PassiveList, CommonRep),
2244         ActiveList2 = ordsets:subtract(ActiveList, CommonRep),
2245         ReplicasOnTransfer2 = ordsets:union(?set_replicas_on_transfer(Group), CommonRep),
2246         ReplicasToMarkActive = CommonRep
2247    end,
2248    case ordsets:intersection(PassiveList, ReplicasOnTransfer2) of
2249    [] ->
2250        ReplicasToCleanup = [],
2251        PassiveList3 = PassiveList2,
2252        ReplicasOnTransfer3 = ReplicasOnTransfer2;
2253    CommonRep2 ->
2254        ReplicasToCleanup = CommonRep2,
2255        PassiveList3 = ordsets:subtract(PassiveList2, CommonRep2),
2256        ReplicasOnTransfer3 = ordsets:subtract(ReplicasOnTransfer2, CommonRep2)
2257    end,
2258    case ordsets:intersection(CleanupList, ReplicasOnTransfer3) of
2259    [] ->
2260        ReplicaParts2 = ReplicaParts,
2261        ReplicasOnTransfer4 = ReplicasOnTransfer3,
2262        ReplicasToCleanup2 = ReplicasToCleanup;
2263    CommonRep3 ->
2264        ReplicaParts2 = ordsets:subtract(ReplicaParts, CommonRep3),
2265        ReplicasOnTransfer4 = ordsets:subtract(ReplicasOnTransfer3, CommonRep3),
2266        ReplicasToCleanup2 = ordsets:union(ReplicasToCleanup, CommonRep3)
2267    end,
2268    {ok, NewAbitmask1, NewPbitmask1, NewSeqs1, NewVersions1} =
2269        set_active_partitions(
2270            ActiveList2,
2271            ?set_abitmask(Group),
2272            ?set_pbitmask(Group),
2273            ?set_seqs(Group),
2274            ?set_partition_versions(Group)),
2275    {ok, NewAbitmask2, NewPbitmask2, NewSeqs2, NewVersions2} =
2276        set_passive_partitions(
2277            PassiveList3,
2278            NewAbitmask1,
2279            NewPbitmask1,
2280            NewSeqs1,
2281            NewVersions1),
2282    {ok, NewAbitmask3, NewPbitmask3, NewCbitmask3, NewSeqs3, NewVersions3} =
2283        set_cleanup_partitions(
2284            CleanupList,
2285            NewAbitmask2,
2286            NewPbitmask2,
2287            ?set_cbitmask(Group),
2288            NewSeqs2,
2289            NewVersions2),
2290    {NewSeqs4, NewUnindexableSeqs, NewVersions4} = lists:foldl(
2291        fun(PartId, {AccSeqs, AccUnSeqs, AccVersions}) ->
2292            PartSeq = couch_set_view_util:get_part_seq(PartId, AccSeqs),
2293            AccSeqs2 = orddict:erase(PartId, AccSeqs),
2294            AccUnSeqs2 = orddict:store(PartId, PartSeq, AccUnSeqs),
2295            AccVersions2 = orddict:erase(PartId, AccVersions),
2296            {AccSeqs2, AccUnSeqs2, AccVersions2}
2297        end,
2298        {NewSeqs3, ?set_unindexable_seqs(Group), NewVersions3},
2299        ToBeUnindexable),
2300    State2 = update_header(
2301        State,
2302        NewAbitmask3,
2303        NewPbitmask3,
2304        NewCbitmask3,
2305        NewSeqs4,
2306        NewUnindexableSeqs,
2307        ReplicasOnTransfer4,
2308        ReplicaParts2,
2309        PendingTrans,
2310        NewVersions4),
2311    % A crash might happen between updating our header and updating the state of
2312    % replica view group. The init function must detect and correct this.
2313    ok = set_state(ReplicaPid, ReplicasToMarkActive, [], ReplicasToCleanup2),
2314    % Need to update list of active partition sequence numbers for every blocked client.
2315    WaitList2 = update_waiting_list(
2316        WaitList, State, ActiveList2, PassiveList3, CleanupList),
2317    State3 = State2#state{waiting_list = WaitList2},
2318    case (dict:size(Listeners) > 0) andalso (CleanupList /= []) of
2319    true ->
2320        Listeners2 = dict:filter(
2321            fun(Ref, Listener) ->
2322                #up_listener{
2323                    pid = Pid,
2324                    monref = MonRef,
2325                    partition = PartId
2326                } = Listener,
2327                case lists:member(PartId, CleanupList) of
2328                true ->
2329                    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2330                              " replying to partition ~p"
2331                              " update monitor, reference ~p,"
2332                              " error: marked_for_cleanup",
2333                              [?LOG_USERDATA(?set_name(State)), ?type(State),
2334                               ?category(State), ?LOG_USERDATA(?group_id(State)),
2335                               Ref, PartId]),
2336                    Pid ! {Ref, marked_for_cleanup},
2337                    erlang:demonitor(MonRef, [flush]),
2338                    false;
2339                false ->
2340                    true
2341                end
2342            end,
2343            Listeners),
2344        State3#state{update_listeners = Listeners2};
2345    false ->
2346        State3
2347    end.
2348
2349
2350-spec update_waiting_list([#waiter{}],
2351                          #state{},
2352                          ordsets:ordset(partition_id()),
2353                          ordsets:ordset(partition_id()),
2354                          ordsets:ordset(partition_id())) -> [#waiter{}].
2355update_waiting_list([], _State, _AddActiveList, _AddPassiveList, _AddCleanupList) ->
2356    [];
2357update_waiting_list(WaitList, State, AddActiveList, AddPassiveList, AddCleanupList) ->
2358    {ok, AddActiveSeqs} = get_seqs(State, AddActiveList),
2359    RemoveSet = ordsets:union(AddPassiveList, AddCleanupList),
2360    MapFun = fun(W) -> update_waiter_seqs(W, AddActiveSeqs, RemoveSet) end,
2361    [MapFun(W) || W <- WaitList].
2362
2363
2364-spec update_waiter_seqs(#waiter{},
2365                         partition_seqs(),
2366                         ordsets:ordset(partition_id())) -> #waiter{}.
2367update_waiter_seqs(Waiter, AddActiveSeqs, ToRemove) ->
2368    Seqs2 = lists:foldl(
2369        fun({PartId, Seq}, Acc) ->
2370            case couch_set_view_util:has_part_seq(PartId, Acc) of
2371            true ->
2372                Acc;
2373            false ->
2374                orddict:store(PartId, Seq, Acc)
2375            end
2376        end,
2377        Waiter#waiter.seqs, AddActiveSeqs),
2378    Seqs3 = lists:foldl(
2379        fun(PartId, Acc) -> orddict:erase(PartId, Acc) end,
2380        Seqs2, ToRemove),
2381    Waiter#waiter{seqs = Seqs3}.
2382
2383
2384-spec maybe_apply_pending_transition(#state{}) -> #state{}.
2385maybe_apply_pending_transition(State) when not ?have_pending_transition(State) ->
2386    State;
2387maybe_apply_pending_transition(State) ->
2388    State2 = stop_cleaner(State),
2389    #state{group = Group3} = State3 = stop_updater(State2),
2390    UpdaterWasRunning = is_pid(State#state.updater_pid),
2391    #set_view_transition{
2392        active = ActivePending,
2393        passive = PassivePending,
2394        unindexable = UnindexablePending
2395    } = get_pending_transition(State),
2396    ActiveInCleanup = partitions_still_in_cleanup(ActivePending, Group3),
2397    PassiveInCleanup = partitions_still_in_cleanup(PassivePending, Group3),
2398    ApplyActiveList = ordsets:subtract(ActivePending, ActiveInCleanup),
2399    ApplyPassiveList = ordsets:subtract(PassivePending, PassiveInCleanup),
2400    {ApplyUnindexableList, NewUnindexablePending} = lists:partition(
2401        fun(P) ->
2402            lists:member(P, ApplyActiveList) orelse
2403            lists:member(P, ApplyPassiveList)
2404        end,
2405        UnindexablePending),
2406    case (ApplyActiveList /= []) orelse (ApplyPassiveList /= []) of
2407    true ->
2408        ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`,"
2409                  " applying state transitions from pending transition:~n"
2410                  "  Active partitions:  ~s~n"
2411                  "  Passive partitions: ~s~n"
2412                  "  Unindexable:        ~s~n",
2413                  [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
2414                   ?LOG_USERDATA(?group_id(State)), condense(ApplyActiveList),
2415                   condense(ApplyPassiveList), condense(ApplyUnindexableList)]),
2416        case (ActiveInCleanup == []) andalso (PassiveInCleanup == []) of
2417        true ->
2418            NewPendingTrans = nil;
2419        false ->
2420            NewPendingTrans = #set_view_transition{
2421                active = ActiveInCleanup,
2422                passive = PassiveInCleanup,
2423                unindexable = NewUnindexablePending
2424            }
2425        end,
2426        State4 = set_pending_transition(State3, NewPendingTrans),
2427        State5 = persist_partition_states(
2428            State4, ApplyActiveList, ApplyPassiveList, [], NewPendingTrans, ApplyUnindexableList),
2429        State6 = notify_pending_transition_waiters(State5),
2430        NewState = case dict:size(State6#state.update_listeners) > 0 of
2431        true ->
2432            start_updater(State6);
2433        false ->
2434            State6
2435        end;
2436    false ->
2437        NewState = State3
2438    end,
2439    after_partition_states_updated(NewState, UpdaterWasRunning).
2440
2441
2442-spec notify_pending_transition_waiters(#state{}) -> #state{}.
2443notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State) ->
2444    State;
2445notify_pending_transition_waiters(State) ->
2446    #state{
2447        pending_transition_waiters = TransWaiters,
2448        group = Group,
2449        replica_partitions = RepParts,
2450        waiting_list = WaitList
2451    } = State,
2452    CurSeqs = active_partition_seqs(State),
2453    {TransWaiters2, WaitList2, GroupReplyList, TriggerGroupUpdate} =
2454        lists:foldr(
2455            fun({From, Req} = TransWaiter, {AccTrans, AccWait, ReplyAcc, AccTriggerUp}) ->
2456                #set_view_group_req{
2457                    stale = Stale,
2458                    debug = Debug
2459                } = Req,
2460                case is_any_partition_pending(Req, Group) of
2461                true ->
2462                    {[TransWaiter | AccTrans], AccWait, ReplyAcc, AccTriggerUp};
2463                false when Stale == ok ->
2464                    Waiter = #waiter{from = From, debug = Debug},
2465                    {AccTrans, AccWait, [Waiter | ReplyAcc], AccTriggerUp};
2466                false when Stale == update_after ->
2467                    Waiter = #waiter{from = From, debug = Debug},
2468                    {AccTrans, AccWait, [Waiter | ReplyAcc], true};
2469                false when Stale == false ->
2470                    Waiter = #waiter{from = From, debug = Debug, seqs = CurSeqs},
2471                    {AccTrans, [Waiter | AccWait], ReplyAcc, true}
2472                end
2473            end,
2474            {[], WaitList, [], false},
2475            TransWaiters),
2476    [] = reply_with_group(Group, RepParts, GroupReplyList),
2477    WaitList3 = reply_with_group(Group, RepParts, WaitList2),
2478    State2 = State#state{
2479        pending_transition_waiters = TransWaiters2,
2480        waiting_list = WaitList3
2481    },
2482    case TriggerGroupUpdate of
2483    true ->
2484        start_updater(State2);
2485    false ->
2486        State2
2487    end.
2488
2489
2490-spec notify_pending_transition_waiters(#state{}, term()) -> #state{}.
2491notify_pending_transition_waiters(#state{pending_transition_waiters = []} = State, _Reply) ->
2492    State;
2493notify_pending_transition_waiters(#state{pending_transition_waiters = Waiters} = State, Reply) ->
2494    lists:foreach(fun(F) -> catch gen_server:reply(F, Reply) end, Waiters),
2495    State#state{pending_transition_waiters = []}.
2496
2497
2498-spec set_passive_partitions(ordsets:ordset(partition_id()),
2499                             bitmask(),
2500                             bitmask(),
2501                             partition_seqs(),
2502                             partition_versions()) ->
2503                                    {'ok', bitmask(), bitmask(),
2504                                     partition_seqs(), partition_versions()}.
2505set_passive_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2506    {ok, Abitmask, Pbitmask, Seqs, Versions};
2507
2508set_passive_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2509    PartMask = 1 bsl PartId,
2510    case PartMask band Abitmask of
2511    0 ->
2512        case PartMask band Pbitmask of
2513        PartMask ->
2514            set_passive_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2515        0 ->
2516            NewSeqs = lists:ukeymerge(1, [{PartId, 0}], Seqs),
2517            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2518            set_passive_partitions(
2519                Rest, Abitmask, Pbitmask bor PartMask, NewSeqs, NewVersions)
2520        end;
2521    PartMask ->
2522        set_passive_partitions(
2523            Rest, Abitmask bxor PartMask, Pbitmask bor PartMask, Seqs,
2524            Versions)
2525    end.
2526
2527
2528-spec set_active_partitions(ordsets:ordset(partition_id()),
2529                            bitmask(),
2530                            bitmask(),
2531                            partition_seqs(),
2532                            partition_versions()) ->
2533                                   {'ok', bitmask(), bitmask(),
2534                                    partition_seqs(), partition_versions()}.
2535set_active_partitions([], Abitmask, Pbitmask, Seqs, Versions) ->
2536    {ok, Abitmask, Pbitmask, Seqs, Versions};
2537
2538set_active_partitions([PartId | Rest], Abitmask, Pbitmask, Seqs, Versions) ->
2539    PartMask = 1 bsl PartId,
2540    case PartMask band Pbitmask of
2541    0 ->
2542        case PartMask band Abitmask of
2543        PartMask ->
2544            set_active_partitions(Rest, Abitmask, Pbitmask, Seqs, Versions);
2545        0 ->
2546            NewSeqs = lists:ukeymerge(1, Seqs, [{PartId, 0}]),
2547            NewVersions = lists:ukeymerge(1, [{PartId, [{0, 0}]}], Versions),
2548            set_active_partitions(Rest, Abitmask bor PartMask, Pbitmask,
2549                NewSeqs, NewVersions)
2550        end;
2551    PartMask ->
2552        set_active_partitions(
2553            Rest, Abitmask bor PartMask, Pbitmask bxor PartMask, Seqs,
2554            Versions)
2555    end.
2556
2557
2558-spec set_cleanup_partitions(ordsets:ordset(partition_id()),
2559                             bitmask(),
2560                             bitmask(),
2561                             bitmask(),
2562                             partition_seqs(),
2563                             partition_versions()) ->
2564                                    {'ok', bitmask(), bitmask(), bitmask(),
2565                                     partition_seqs(), partition_versions()}.
2566set_cleanup_partitions([], Abitmask, Pbitmask, Cbitmask, Seqs, Versions) ->
2567    {ok, Abitmask, Pbitmask, Cbitmask, Seqs, Versions};
2568
2569set_cleanup_partitions([PartId | Rest], Abitmask, Pbitmask, Cbitmask, Seqs,
2570        Versions) ->
2571    PartMask = 1 bsl PartId,
2572    case PartMask band Cbitmask of
2573    PartMask ->
2574        set_cleanup_partitions(
2575            Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions);
2576    0 ->
2577        Seqs2 = lists:keydelete(PartId, 1, Seqs),
2578        Versions2 = lists:keydelete(PartId, 1, Versions),
2579        Cbitmask2 = Cbitmask bor PartMask,
2580        case PartMask band Abitmask of
2581        PartMask ->
2582            set_cleanup_partitions(
2583                Rest, Abitmask bxor PartMask, Pbitmask, Cbitmask2, Seqs2,
2584                Versions2);
2585        0 ->
2586            case (PartMask band Pbitmask) of
2587            PartMask ->
2588                set_cleanup_partitions(
2589                    Rest, Abitmask, Pbitmask bxor PartMask, Cbitmask2, Seqs2,
2590                    Versions2);
2591            0 ->
2592                set_cleanup_partitions(
2593                    Rest, Abitmask, Pbitmask, Cbitmask, Seqs, Versions)
2594            end
2595        end
2596    end.
2597
2598
2599-spec update_header(#state{},
2600                    bitmask(),
2601                    bitmask(),
2602                    bitmask(),
2603                    partition_seqs(),
2604                    partition_seqs(),
2605                    ordsets:ordset(partition_id()),
2606                    ordsets:ordset(partition_id()),
2607                    #set_view_transition{} | 'nil',
2608                    partition_versions()) -> #state{}.
2609update_header(State, NewAbitmask, NewPbitmask, NewCbitmask, NewSeqs,
2610              NewUnindexableSeqs, NewRelicasOnTransfer, NewReplicaParts,
2611              NewPendingTrans, NewPartVersions) ->
2612    #state{
2613        group = #set_view_group{
2614            index_header =
2615                #set_view_index_header{
2616                    abitmask = Abitmask,
2617                    pbitmask = Pbitmask,
2618                    cbitmask = Cbitmask,
2619                    replicas_on_transfer = ReplicasOnTransfer,
2620                    unindexable_seqs = UnindexableSeqs,
2621                    pending_transition = PendingTrans,
2622                    partition_versions = PartVersions
2623                } = Header
2624        } = Group,
2625        replica_partitions = ReplicaParts
2626    } = State,
2627    NewGroup = Group#set_view_group{
2628        index_header = Header#set_view_index_header{
2629            abitmask = NewAbitmask,
2630            pbitmask = NewPbitmask,
2631            cbitmask = NewCbitmask,
2632            seqs = NewSeqs,
2633            unindexable_seqs = NewUnindexableSeqs,
2634            replicas_on_transfer = NewRelicasOnTransfer,
2635            pending_transition = NewPendingTrans,
2636            partition_versions = NewPartVersions
2637        }
2638    },
2639    NewGroup2 = remove_duplicate_partitions(NewGroup),
2640    #set_view_group{
2641        index_header = #set_view_index_header{
2642            partition_versions = NewPartVersions2
2643        }
2644    } = NewGroup2,
2645    NewState = State#state{
2646        group = NewGroup2,
2647        replica_partitions = NewReplicaParts
2648    },
2649    FsyncHeader = (NewCbitmask /= Cbitmask),
2650    {ok, HeaderPos} = commit_header(NewState#state.group, FsyncHeader),
2651    NewGroup3 = (NewState#state.group)#set_view_group{
2652        header_pos = HeaderPos
2653    },
2654    ?LOG_INFO("Set view `~s`, ~s (~s) group `~s`, partition states updated~n"
2655              "active partitions before:    ~s~n"
2656              "active partitions after:     ~s~n"
2657              "passive partitions before:   ~s~n"
2658              "passive partitions after:    ~s~n"
2659              "cleanup partitions before:   ~s~n"
2660              "cleanup partitions after:    ~s~n"
2661              "unindexable partitions:      ~w~n"
2662              "replica partitions before:   ~s~n"
2663              "replica partitions after:    ~s~n"
2664              "replicas on transfer before: ~s~n"
2665              "replicas on transfer after:  ~s~n"
2666              "pending transition before:~n"
2667              "  active:      ~s~n"
2668              "  passive:     ~s~n"
2669              "  unindexable: ~s~n"
2670              "pending transition after:~n"
2671              "  active:      ~s~n"
2672              "  passive:     ~s~n"
2673              "  unindexable: ~s~n",
2674              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
2675               ?LOG_USERDATA(?group_id(State)),
2676               condense(couch_set_view_util:decode_bitmask(Abitmask)),
2677               condense(couch_set_view_util:decode_bitmask(NewAbitmask)),
2678               condense(couch_set_view_util:decode_bitmask(Pbitmask)),
2679               condense(couch_set_view_util:decode_bitmask(NewPbitmask)),
2680               condense(couch_set_view_util:decode_bitmask(Cbitmask)),
2681               condense(couch_set_view_util:decode_bitmask(NewCbitmask)),
2682               UnindexableSeqs,
2683               condense(ReplicaParts),
2684               condense(NewReplicaParts),
2685               condense(ReplicasOnTransfer),
2686               condense(NewRelicasOnTransfer),
2687               condense(?pending_transition_active(PendingTrans)),
2688               condense(?pending_transition_passive(PendingTrans)),
2689               condense(?pending_transition_unindexable(PendingTrans)),
2690               condense(?pending_transition_active(NewPendingTrans)),
2691               condense(?pending_transition_passive(NewPendingTrans)),
2692               condense(?pending_transition_unindexable(NewPendingTrans))]),
2693    ?LOG_DEBUG("Set view `~s`, ~s (~s) group `~s`, partition states updated~n"
2694               "partition versions before:~n~w~n"
2695               "partition versions after:~n~w~n",
2696               [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
2697                ?LOG_USERDATA(?group_id(State)),
2698                PartVersions,
2699                NewPartVersions2]),
2700    NewState#state{group = NewGroup3}.
2701
2702
2703-spec maybe_start_cleaner(#state{}) -> #state{}.
2704maybe_start_cleaner(#state{cleaner_pid = Pid} = State) when is_pid(Pid) ->
2705    State;
2706maybe_start_cleaner(#state{auto_cleanup = false} = State) ->
2707    State;
2708maybe_start_cleaner(#state{group = Group} = State) ->
2709    case is_pid(State#state.compactor_pid) orelse
2710        is_pid(State#state.updater_pid) orelse (?set_cbitmask(Group) == 0) of
2711    true ->
2712        State;
2713    false ->
2714        Cleaner = spawn_link(fun() -> exit(cleaner(State)) end),
2715        ?LOG_INFO("Started cleanup process ~p for"
2716                  " set view `~s`, ~s (~s) group `~s`",
2717                  [Cleaner, ?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
2718                   ?LOG_USERDATA(?group_id(State))]),
2719        State#state{cleaner_pid = Cleaner}
2720    end.
2721
2722
2723-spec stop_cleaner(#state{}) -> #state{}.
2724stop_cleaner(#state{cleaner_pid = nil} = State) ->
2725    State;
2726stop_cleaner(#state{cleaner_pid = Pid, group = Group} = State) when is_pid(Pid) ->
2727    MRef = erlang:monitor(process, Pid),
2728    Pid ! stop,
2729    unlink(Pid),
2730    ?LOG_INFO("Stopping cleanup process for set view `~s`, group `~s` (~s)",
2731              [?LOG_USERDATA(?set_name(State)), ?LOG_USERDATA(?group_id(State)), ?category(State)]),
2732    % NOTE vmx 2015-06-29: There is currently no way to cleanly stop the
2733    % spatial view cleaner while it is running. Hence do a hard stop right way.
2734    case Group#set_view_group.mod of
2735    spatial_view ->
2736        couch_util:shutdown_sync(Pid);
2737    _ ->
2738        ok
2739    end,
2740    NewState = receive
2741    {'EXIT', Pid, Reason} ->
2742        after_cleaner_stopped(State, Reason);
2743    {'DOWN', MRef, process, Pid, Reason} ->
2744        receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
2745        after_cleaner_stopped(State, Reason)
2746    after 5000 ->
2747        couch_set_view_util:shutdown_cleaner(Group, Pid),
2748        ok = couch_file:refresh_eof(Group#set_view_group.fd),
2749        ?LOG_ERROR("Timeout stopping cleanup process ~p for"
2750                   " set view `~s`, ~s (~s) group `~s`",
2751                   [Pid, ?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
2752                    ?LOG_USERDATA(?group_id(State))]),
2753        State#state{cleaner_pid = nil}
2754    end,
2755    erlang:demonitor(MRef, [flush]),
2756    NewState.
2757
2758
2759after_cleaner_stopped(State, {clean_group, CleanGroup, Count, Time}) ->
2760    #state{group = OldGroup} = State,
2761    {ok, NewGroup0} = couch_set_view_util:refresh_viewgroup_header(CleanGroup),
2762    NewGroup = update_clean_group_seqs(OldGroup, NewGroup0),
2763    ?LOG_INFO("Stopped cleanup process for"
2764              " set view `~s`, ~s (~s) group `~s`.~n"
2765              "Removed ~p values from the index in ~.3f seconds~n"
2766              "New set of partitions to cleanup: ~s~n"
2767              "Old set of partitions to cleanup: ~s~n",
2768              [?LOG_USERDATA(?set_name(State)), ?type(State), ?category(State),
2769               ?LOG_USERDATA(?group_id(State)), Count, Time,
2770               condense(couch_set_view_util:decode_bitmask(
2771                          ?set_cbitmask(NewGroup))),
2772               condense(couch_set_view_util:decode_bitmask(
2773                          ?set_cbitmask(OldGroup)))]),
2774    case ?set_cbitmask(NewGroup) of
2775    0 ->
2776        inc_cleanups(State#state.group, Time, Count, false);
2777    _ ->
2778        ?inc_cleanup_stops(State#state.group)
2779    end,
2780    State#state{
2781        group = NewGroup,
2782        cleaner_pid = nil
2783    };
2784after_cleaner_stopped(#state{cleaner_pid = Pid, group = Group} = State, Reason) ->
2785    ok = couch_file:refresh_eof(Group#set_view_group.fd),
2786    ?LOG_ERROR("Cleanup process ~p for set view `~s`, ~s (~s) group `~s`,"
2787               " died with reason: ~p",
2788               [Pid, ?LOG_USERDATA(?set_name(State)), ?type(