% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_set_view).
-behaviour(gen_server).

% public API
-export([start_link/2]).
-export([define_group/4]).
-export([cleanup_index_files/2, set_index_dir/3]).
-export([get_group_data_size/3, get_group_signature/3]).
-export([reset_utilization_stats/3, get_utilization_stats/3]).
-export([set_partition_states/6, add_replica_partitions/4, remove_replica_partitions/4]).
-export([mark_partitions_unindexable/4, mark_partitions_indexable/4]).
-export([monitor_partition_update/4, demonitor_partition_update/4]).
-export([trigger_update/4, trigger_replica_update/4]).
% Exported for ns_server
-export([delete_index_dir/2]).
-export([get_indexed_seqs/4]).

% Internal, not meant to be used by components other than the view engine.
-export([get_group_pid/4, get_group/4, release_group/1, get_group_info/4]).
-export([get_map_view/4, get_reduce_view/4]).
-export([fold/5, fold_reduce/5]).
-export([get_row_count/2, reduce_to_count/1, extract_map_view/1]).
-export([map_view_key_compare/2, reduce_view_key_compare/2]).
-export([get_map_view0/2, get_reduce_view0/2]).
-export([inc_group_access_stat/1]).

% Exported for spatial index
-export([modify_bitmasks/2]).

% gen_server callbacks
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).

-include("couch_db.hrl").
-include_lib("couch_index_merger/include/couch_index_merger.hrl").
-include_lib("couch_index_merger/include/couch_view_merger.hrl").
-include_lib("couch_set_view/include/couch_set_view.hrl").


-record(server, {
    root_dir = [],
    db_notifier,
    stats_ets       :: atom(),
    name_to_sig_ets :: atom(),
    sig_to_pid_ets  :: atom(),
    pid_to_sig_ets  :: atom(),
    name            :: atom(),
    indexer         :: mapreduce_view | spatial_view
}).


% For a "set view" we have multiple databases which are indexed.
% The set has a name which is a prefix common to all source databases.
% Each database is designated as a "partition" and internally identified
% with an integer between 0 and N - 1 (where N is the total number of partitions).
% For example, if the set name is "myset", and the number of partitions
% is 4, then the "set view" indexer will index the following 4 databases:
%
%    "myset/0", "myset/1", "myset/2" and "myset/3"
%
% Not all partitions are necessarily indexed, so when the set view is created,
% the caller should specify not only the set name but also:
% 1) Total number of partitions
% 2) A list of active partition IDs
%
% Once a view is created, the caller can (via other APIs):
% 1) Change the list of active partitions (add or remove)
% 2) Add several "passive" partitions - these are partitions that are
%    indexed but whose results are not included in queries
% 3) Define a list of partitions to clean up from the index. All
%    the view key/values that originated from any of these
%    partitions will eventually be removed from the index
%
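% As an illustrative sketch only (the set name, design document id and
% partition ids below are made up, and the #set_view_params{} fields - e.g.
% the total number of partitions and the initial active partitions - are
% filled in as documented in couch_set_view.hrl), a group is typically
% defined and then has its partition states adjusted like this:
%
%     Params = #set_view_params{},   % total/active partitions set as needed
%     ok = couch_set_view:define_group(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, Params),
%     ok = couch_set_view:set_partition_states(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, [0, 1], [2], []),
%     ok = couch_set_view:trigger_update(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, 0).
%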
-spec get_group(atom(),
                binary(),
                binary() | #doc{},
                #set_view_group_req{}) -> {'ok', #set_view_group{}}.
get_group(Mod, SetName, DDoc, #set_view_group_req{type = main} = Req) ->
    GroupPid = get_group_pid(Mod, SetName, DDoc,
        Req#set_view_group_req.category),
    case couch_set_view_group:request_group(GroupPid, Req) of
    {ok, Group} ->
        {ok, Group};
    {error, view_undefined} ->
        % caller must call ?MODULE:define_group/4
        throw(view_undefined);
    Error ->
        throw(Error)
    end;
get_group(Mod, SetName, DDoc, #set_view_group_req{type = replica} = Req) ->
    {ok, MainGroup} = get_group(
        Mod, SetName, DDoc, Req#set_view_group_req{type = main, stale = ok}),
    release_group(MainGroup),
    case MainGroup#set_view_group.replica_pid of
    nil ->
        throw({error, <<"Requested replica group doesn't exist">>});
    ReplicaPid ->
        case couch_set_view_group:request_group(ReplicaPid, Req) of
        {ok, Group} ->
            {ok, Group};
        {error, view_undefined} ->
            % caller must call ?MODULE:define_group/4
            throw(view_undefined);
        Error ->
            throw(Error)
        end
    end.


-spec get_group_pid(atom(), binary(), binary() | #doc{}, dev | prod) -> pid().
get_group_pid(Mod, SetName, #doc{} = DDoc, Category) ->
    Group = Mod:design_doc_to_set_view_group(SetName, DDoc),
    StatsEts = Mod:stats_ets(Category),
    get_group_server(SetName, Group#set_view_group{
        category = Category,
        stats_ets = StatsEts
    });
get_group_pid(Mod, SetName, DDocId, Category) when is_binary(DDocId) ->
    get_group_server(SetName, open_set_group(Mod, SetName, DDocId, Category)).


-spec release_group(#set_view_group{}) -> ok.
release_group(Group) ->
    couch_set_view_group:release_group(Group).


-spec define_group(atom(), binary(), binary(), #set_view_params{}) -> 'ok'.
define_group(Mod, SetName, DDocId, #set_view_params{} = Params) ->
    try
        GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:define_view(GroupPid, Params) of
        ok ->
            ok;
        Error ->
            throw(Error)
        end
    catch
        throw:{error, empty_group} ->
            ok;
        exit:{normal, _} ->
            ?LOG_INFO("Group process exited normally. Retrying...", []),
            timer:sleep(100),
            define_group(Mod, SetName, DDocId, Params)
    end.


% This is an incremental operation. That is, the following sequence of calls:
%
% set_partition_states(Mod, <<"myset">>, <<"_design/foo">>, [0, 1], [5], [8])
% set_partition_states(Mod, <<"myset">>, <<"_design/foo">>, [2, 3], [6, 7], [9])
% set_partition_states(Mod, <<"myset">>, <<"_design/foo">>, [], [], [10])
%
% Will cause the set view index to have the following state:
%
%   active partitions:   [0, 1, 2, 3]
%   passive partitions:  [5, 6, 7]
%   cleanup partitions:  [8, 9, 10]
%
% Also, to move partition(s) from one state to another, simply do a call
% where those partition(s) are listed in the new desired state. Example:
%
% set_partition_states(Mod, <<"myset">>, <<"_design/foo">>, [0, 1, 2], [3], [4])
% set_partition_states(Mod, <<"myset">>, <<"_design/foo">>, [], [2], [])
%
% This will result in the following set view index state:
%
%   active partitions:   [0, 1]
%   passive partitions:  [2, 3]
%   cleanup partitions:  [4]
%
% (partition 2 was first set to the active state and then moved into the passive state)
%
% New partitions are added by specifying them for the first time in the active
% or passive state lists.
%
% If a request asks to set to active a partition that is currently marked as a
% replica partition, data from that partition will start to be transferred from
% the replica index into the main index.
%
-spec set_partition_states(atom(),
                           binary(),
                           binary(),
                           ordsets:ordset(partition_id()),
                           ordsets:ordset(partition_id()),
                           ordsets:ordset(partition_id())) -> 'ok'.
set_partition_states(Mod, SetName, DDocId, ActivePartitions, PassivePartitions, CleanupPartitions) ->
    try
        GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:set_state(
            GroupPid, ActivePartitions, PassivePartitions, CleanupPartitions) of
        ok ->
            ok;
        Error ->
            throw(Error)
        end
    catch throw:{error, empty_group} ->
        ok
    end.


% Mark a set of partitions as replicas. They will be indexed in the replica index.
% This will only work if the view was defined with the option "use_replica_index".
%
% None of the given partitions may be in the active or passive state.
% Like set_partition_states, this is an incremental operation.
%
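% As an illustrative sketch only (set name, design document id and partition
% ids are made up), replica partitions are typically added, indexed via the
% replica updater, and later removed like this:
%
%     ok = couch_set_view:add_replica_partitions(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, [8, 9]),
%     ok = couch_set_view:trigger_replica_update(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, 0),
%     ok = couch_set_view:remove_replica_partitions(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, [8, 9]).
%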
-spec add_replica_partitions(atom(), binary(), binary(), ordsets:ordset(partition_id())) -> 'ok'.
add_replica_partitions(Mod, SetName, DDocId, Partitions) ->
    try
        GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:add_replica_partitions(GroupPid, Partitions) of
        ok ->
            ok;
        Error ->
            throw(Error)
        end
    catch throw:{error, empty_group} ->
        ok
    end.


% Unmark a set of partitions as replicas. Their data will be cleaned from the
% replica index. This will only work if the view was defined with the option
% "use_replica_index".
%
% This is a no-op for partitions not currently marked as replicas.
% Like set_partition_states, this is an incremental operation.
%
-spec remove_replica_partitions(atom(), binary(), binary(), ordsets:ordset(partition_id())) -> 'ok'.
remove_replica_partitions(Mod, SetName, DDocId, Partitions) ->
    try
        GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:remove_replica_partitions(GroupPid, Partitions) of
        ok ->
            ok;
        Error ->
            throw(Error)
        end
    catch throw:{error, empty_group} ->
        ok
    end.


% Mark a set of partitions, currently either in the active or passive states, as
% unindexable. This means future index updates will ignore new changes found in the
% corresponding partition databases. This operation doesn't remove any data from
% the index, nor does it start any cleanup operation. Queries will still see
% and get data from the corresponding partitions.
-spec mark_partitions_unindexable(atom(), binary(), binary(), ordsets:ordset(partition_id())) -> 'ok'.
mark_partitions_unindexable(_Mod, _SetName, _DDocId, []) ->
    ok;
mark_partitions_unindexable(Mod, SetName, DDocId, Partitions) ->
    try
        Pid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:mark_as_unindexable(Pid, Partitions) of
        ok ->
            ok;
        Error ->
            throw(Error)
        end
    catch throw:{error, empty_group} ->
        ok
    end.


% This is the counterpart of mark_partitions_unindexable/4. It marks a set of partitions
% as indexable again, meaning future index updates will process all new partition database
% changes (changes that happened since the last index update prior to the
% mark_partitions_unindexable/4 call). The given partitions must currently be in either
% the active or passive state and must have been marked as unindexable before.
-spec mark_partitions_indexable(atom(), binary(), binary(), ordsets:ordset(partition_id())) -> 'ok'.
mark_partitions_indexable(_Mod, _SetName, _DDocId, []) ->
    ok;
mark_partitions_indexable(Mod, SetName, DDocId, Partitions) ->
    try
        Pid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:mark_as_indexable(Pid, Partitions) of
        ok ->
            ok;
        Error ->
            throw(Error)
        end
    catch throw:{error, empty_group} ->
        ok
    end.


% Allow a caller to be notified, via a message, when a particular partition is
% up to date in the index (its current database sequence number matches the
% one in the index for that partition).
% When the partition is up to date, the caller will receive a message with the
% following shape:
%
%    {Ref::reference(), updated}
%
% Where the reference is the one returned when this function is called.
% If the underlying view group process dies before the partition is up to date,
% the caller will receive a message with the following shape:
%
%    {Ref::reference(), {shutdown, Reason::term()}}
%
% If the requested partition is marked for cleanup (because some process asked
% for that or the partition's database was deleted), the caller will receive a
% message with the following shape:
%
%    {Ref::reference(), marked_for_cleanup}
%
% If an error happens in the index updater, preventing the caller from ever getting
% a notification, the caller will receive a message with the following shape:
%
%    {Ref::reference(), {updater_error, Reason::term()}}
%
% The target partition must be either an active or passive partition.
% Replica partitions are not supported at the moment.
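%
% As a minimal sketch only (set name, design document id and partition id are
% made up), a caller would typically monitor a partition, trigger an update and
% then wait for one of the messages described above:
%
%     Ref = couch_set_view:monitor_partition_update(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, 3),
%     ok = couch_set_view:trigger_update(
%         mapreduce_view, <<"myset">>, <<"_design/foo">>, 0),
%     receive
%     {Ref, updated} ->
%         ok;
%     {Ref, Other} ->
%         % {shutdown, _}, marked_for_cleanup or {updater_error, _}
%         {error, Other}
%     end.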
-spec monitor_partition_update(atom(), binary(), binary(), partition_id()) -> reference().
monitor_partition_update(Mod, SetName, DDocId, PartitionId) ->
    Ref = make_ref(),
    try
        Pid = get_group_pid(Mod, SetName, DDocId, prod),
        case couch_set_view_group:monitor_partition_update(Pid, PartitionId, Ref, self()) of
        ok ->
            Ref;
        Error ->
            throw(Error)
        end
    catch throw:{error, empty_group} ->
        self() ! {Ref, updated},
        Ref
    end.


% Stop monitoring for notification of when a partition is fully indexed.
% This is the counterpart to monitor_partition_update/4. This call flushes
% any monitor messages from the caller's mailbox.
-spec demonitor_partition_update(atom(), binary(), binary(), reference()) -> 'ok'.
demonitor_partition_update(Mod, SetName, DDocId, Ref) ->
    receive
    {Ref, _} ->
        ok
    after 0 ->
        try
            Pid = get_group_pid(Mod, SetName, DDocId, prod),
            ok = couch_set_view_group:demonitor_partition_update(Pid, Ref),
            receive
            {Ref, _} ->
                ok
            after 0 ->
                ok
            end
        catch throw:{error, empty_group} ->
            ok
        end
    end.


% Trigger a view group index update if there are at least N new changes
% (from all the active/passive partitions) to index.
-spec trigger_update(atom(), binary(), binary(), non_neg_integer()) -> ok.
trigger_update(Mod, SetName, DDocId, MinNumChanges) ->
    try
        Pid = get_group_pid(Mod, SetName, DDocId, prod),
        ok = gen_server:cast(Pid, {update, MinNumChanges})
    catch throw:{error, empty_group} ->
        ok
    end.


% Trigger a replica view group index update if there are at least N new
% changes (from all the currently defined replica partitions) to index.
-spec trigger_replica_update(atom(), binary(), binary(), non_neg_integer()) -> ok.
trigger_replica_update(Mod, SetName, DDocId, MinNumChanges) ->
    try
        Pid = get_group_pid(Mod, SetName, DDocId, prod),
        ok = gen_server:cast(Pid, {update_replica, MinNumChanges})
    catch throw:{error, empty_group} ->
        ok
    end.


-spec get_indexed_seqs(atom(), binary(), binary(), dev | prod) ->
                              {ok, PartSeqs::partition_seqs()}.
get_indexed_seqs(Mod, SetName, DDocId, Category) ->
    Pid = couch_set_view:get_group_pid(Mod, SetName, DDocId, Category),
    {ok, Group} = gen_server:call(Pid, request_group, infinity),
    {ok, RepPid} = gen_server:call(Pid, replica_pid, infinity),
    MainSeqs = ordsets:union(?set_seqs(Group), ?set_unindexable_seqs(Group)),
    case is_pid(RepPid) of
    false ->
        {ok, MainSeqs};
    true ->
        {ok, RepGroup} = gen_server:call(RepPid, request_group, infinity),
        RepSeqs0 = ordsets:union(?set_seqs(RepGroup),
                                 ?set_unindexable_seqs(RepGroup)),
        RepSeqs = case ?set_replicas_on_transfer(Group) of
        [] ->
            RepSeqs0;
        OnTransfer ->
            [{P, S} || {P, S} <- RepSeqs0, not lists:member(P, OnTransfer)]
        end,
        {ok, ordsets:union(MainSeqs, RepSeqs)}
    end.


-spec get_group_server(binary(), #set_view_group{}) -> pid().
get_group_server(_SetName, #set_view_group{views = []}) ->
    throw({error, empty_group});
get_group_server(SetName, Group) ->
    #set_view_group{
        sig = Sig,
        category = Category,
        mod = Mod
    } = Group,
    ServerName = Mod:server_name(Category),
    SigToPidEts = Mod:sig_to_pid_ets(Category),
    case ets:lookup(SigToPidEts, {SetName, Sig}) of
    [{_, Pid}] when is_pid(Pid) ->
        Pid;
    _ ->
        case gen_server:call(ServerName, {get_group_server, SetName, Group},
                infinity) of
        {ok, Pid} ->
            Pid;
        Error ->
            throw(Error)
        end
    end.


-spec open_set_group(atom(), binary(), binary(), dev | prod) ->
                            #set_view_group{}.
open_set_group(Mod, SetName, GroupId, Category) ->
    case couch_set_view_group:open_set_group(Mod, SetName, GroupId) of
    {ok, Group} ->
        StatsEts = Mod:stats_ets(Category),
        Group#set_view_group{
            category = Category,
            stats_ets = StatsEts
        };
    Error ->
        throw(Error)
    end.

-spec start_link(dev | prod, mapreduce_view | spatial_view) ->
                        {ok, pid()} | ignore |
                        {error, {already_started, pid()} | term()}.
start_link(Category, Indexer) ->
    ServerName = Indexer:server_name(Category),
    gen_server:start_link({local, ServerName}, ?MODULE,
        {Category, Indexer}, []).


% To be used only for debugging. This is a very expensive call.
get_group_info(Mod, SetName, DDocId, Category) ->
    GroupPid = get_group_pid(Mod, SetName, DDocId, Category),
    {ok, _Info} = couch_set_view_group:request_group_info(GroupPid).


get_group_data_size(Mod, SetName, DDocId) ->
    try
        GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
        {ok, _Info} = couch_set_view_group:get_data_size(GroupPid)
    catch throw:{error, empty_group} ->
        {ok, Sig} = get_group_signature(Mod, SetName, DDocId),
        EmptyInfo = [
            {signature, ?b2l(Sig)},
            {disk_size, 0},
            {data_size, 0},
            {accesses, 0},
            {updater_running, false},
            {initial_build, false}
        ],
        {ok, EmptyInfo}
    end.


-spec reset_utilization_stats(atom(), binary(), binary()) -> 'ok'.
reset_utilization_stats(Mod, SetName, DDocId) ->
    GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
    ok = couch_set_view_group:reset_utilization_stats(GroupPid).


-spec get_utilization_stats(atom(), binary(), binary()) ->
                                   {'ok', [{atom() | binary(), term()}]}.
get_utilization_stats(Mod, SetName, DDocId) ->
    GroupPid = get_group_pid(Mod, SetName, DDocId, prod),
    {ok, _} = couch_set_view_group:get_utilization_stats(GroupPid).


-spec get_group_signature(atom(), binary(), binary()) -> {'ok', binary()}.
get_group_signature(Mod, SetName, DDocId) ->
    case couch_set_view_ddoc_cache:get_ddoc(SetName, DDocId) of
    {ok, DDoc} ->
        Group = Mod:design_doc_to_set_view_group(SetName, DDoc),
        {ok, ?l2b(couch_util:to_hex(Group#set_view_group.sig))};
    Error ->
        throw(Error)
    end.


cleanup_index_files(Mod, SetName) ->
    % load all ddocs
    {ok, Db} = couch_db:open_int(?master_dbname(SetName), []),
    {ok, DesignDocs} = couch_db:get_design_docs(Db),
    couch_db:close(Db),

    % compute the hex-encoded group signature of every non-deleted
    % design document
    Sigs = lists:map(fun(DDoc) ->
            #set_view_group{sig = Sig} =
                Mod:design_doc_to_set_view_group(SetName, DDoc),
            couch_util:to_hex(Sig)
        end,
        [DD || DD <- DesignDocs, not DD#doc.deleted]),

    Extension = Mod:index_extension(),
    FileList = list_index_files(SetName, Extension),

    % regex that matches any of the signatures still in use
    RegExp = "("++ string:join(Sigs, "|") ++")",

    % filter out the ones in use
    DeleteFiles = case Sigs of
    [] ->
        FileList;
    _ ->
        [FilePath || FilePath <- FileList,
            re:run(FilePath, RegExp, [{capture, none}]) =:= nomatch]
    end,
    % delete unused files
    case DeleteFiles of
    [] ->
        ok;
    _ ->
        ?LOG_INFO("Deleting unused (old) set view `~s` index files:~n~n~s",
            [?LOG_USERDATA(SetName), string:join(DeleteFiles, "\n")])
    end,
    RootDir = couch_config:get("couchdb", "view_index_dir"),
    lists:foreach(
        fun(File) -> couch_file:delete(RootDir, File, false) end,
        DeleteFiles).

list_index_files(SetName, Extension) ->
    % list the index files in both the production and development directories
    RootDir = couch_config:get("couchdb", "view_index_dir"),
    Wildcard = "*" ++ Extension ++ "*",
    ProdIndexDir = filename:join(
        set_index_dir(RootDir, SetName, prod), Wildcard),
    DevIndexDir = filename:join(
        set_index_dir(RootDir, SetName, dev), Wildcard),
    filelib:wildcard(ProdIndexDir) ++ filelib:wildcard(DevIndexDir).


-spec get_row_count(#set_view_group{}, #set_view{}) -> non_neg_integer().
get_row_count(#set_view_group{replica_group = nil} = Group, View) ->
    Mod = Group#set_view_group.mod,
    Mod:get_row_count(View);
get_row_count(#set_view_group{replica_group = RepGroup} = Group, View) ->
    Mod = Group#set_view_group.mod,
    RepView = lists:nth(View#set_view.id_num + 1, RepGroup#set_view_group.views),
    CountMain = Mod:get_row_count(View),
    CountRep = Mod:get_row_count(RepView),
    CountMain + CountRep.


extract_map_view({reduce, _N, View}) ->
    View.


% This case is triggered when at least one partition of the replica group is
% active. This happens during failover, when a replica index is transferred
% to the main index.
-spec fold_reduce(#set_view_group{},
                  {'reduce', non_neg_integer(), #set_view{}},
                  set_view_fold_reduce_fun(),
                  term(),
                  #view_query_args{}) -> {'ok', term()}.
fold_reduce(#set_view_group{replica_group = #set_view_group{} = RepGroup} = Group, View, FoldFun, FoldAcc, ViewQueryArgs) ->
    {reduce, NthRed, #set_view{id_num = Id}} = View,
    RepView = {reduce, NthRed, lists:nth(Id + 1, RepGroup#set_view_group.views)},
    ViewSpecs = [
        #set_view_spec{
            name = Group#set_view_group.set_name,
            ddoc_id = Group#set_view_group.name,
            view_name = ViewQueryArgs#view_query_args.view_name,
            partitions = [],  % not needed in this context
            group = Group#set_view_group{replica_group = nil},
            view = View
        },
        #set_view_spec{
            name = RepGroup#set_view_group.set_name,
            ddoc_id = RepGroup#set_view_group.name,
            view_name = ViewQueryArgs#view_query_args.view_name,
            partitions = [],  % not needed in this context
            % We want the partitions filtered as if it were a main group
            group = RepGroup#set_view_group{type = main},
            view = RepView
        }
    ],
    MergeParams = #index_merge{
        indexes = ViewSpecs,
        callback = fun couch_view_merger:reduce_view_merge_callback/2,
        user_acc = #merge_acc{fold_fun = FoldFun, acc = FoldAcc},
        user_ctx = #user_ctx{roles = [<<"_admin">>]},
        http_params = ViewQueryArgs,
        make_row_fun = fun(RowData) -> RowData end,
        extra = #view_merge{
            keys = ViewQueryArgs#view_query_args.keys
        }
    },
    #merge_acc{acc = FinalAcc} = couch_index_merger:query_index(couch_view_merger, MergeParams),
    {ok, FinalAcc};

fold_reduce(Group, View, FoldFun, FoldAcc, #view_query_args{keys = nil} = ViewQueryArgs) ->
    KeyGroupFun = make_reduce_group_keys_fun(ViewQueryArgs#view_query_args.group_level),
    Options = [{key_group_fun, KeyGroupFun} | mapreduce_view:make_key_options(ViewQueryArgs)],
    do_fold_reduce(Group, View, FoldFun, FoldAcc, Options, ViewQueryArgs);

fold_reduce(Group, View, FoldFun, FoldAcc, #view_query_args{keys = Keys} = ViewQueryArgs0) ->
    KeyGroupFun = make_reduce_group_keys_fun(ViewQueryArgs0#view_query_args.group_level),
    {_, FinalAcc} = lists:foldl(
        fun(Key, {_, Acc}) ->
            ViewQueryArgs = ViewQueryArgs0#view_query_args{start_key = Key, end_key = Key},
            Options = [{key_group_fun, KeyGroupFun} | mapreduce_view:make_key_options(ViewQueryArgs)],
            do_fold_reduce(Group, View, FoldFun, Acc, Options, ViewQueryArgs)
        end,
        {ok, FoldAcc},
        Keys),
    {ok, FinalAcc}.


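% Note on the reduction format handled below: as the reduce/rereduce funs and
% the final binary matches in do_fold_reduce/6 (and in reduce_to_count/1) show,
% every reduction value stored in the btree is a binary of the form
%
%     <<Count:40, PartitionsBitmap:?MAX_NUM_PARTITIONS,
%       LengthPrefixedUserReductions/binary>>
%
% where each user reduction is encoded as <<Size:16, Reduction/binary>>.
% The padding with empty binaries keeps the reduction of the requested
% reduce function (NthRed) at its correct position in that list.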
do_fold_reduce(Group, ViewInfo, Fun, Acc, Options0, ViewQueryArgs) ->
    {reduce, NthRed, View} = ViewInfo,
    #mapreduce_view{
        btree = Bt,
        reduce_funs = RedFuns
    } = View#set_view.indexer,
    #view_query_args{
        filter = DoFilter,
        group_level = GroupLevel
    } = ViewQueryArgs,
    Filter = case DoFilter of
        false ->
            false;
        true ->
            filter(Group)
    end,

    Options = case Filter of
    false ->
        Options0;
    {true, ExcludeBitmask, IncludeBitmask} ->
        FilterFun = fun(value, {_K, <<PartId:16, _/binary>>}) ->
            ((1 bsl PartId) band IncludeBitmask) =/= 0;
        (branch, <<_Count:40, PartsBitmap:?MAX_NUM_PARTITIONS, _/binary>>) ->
            case PartsBitmap band ExcludeBitmask of
            0 ->
                all;
            PartsBitmap ->
                none;
            _ ->
                partial
            end
        end,
        lists:keystore(filter_fun, 1, Options0, {filter_fun, FilterFun})
    end,
    PreResultPadding = lists:duplicate(NthRed - 1, <<>>),
    PostResultPadding = lists:duplicate(length(RedFuns) - NthRed, <<>>),
    ReduceFun =
        fun(reduce, KVs) ->
            KVs2 = couch_set_view_util:expand_dups(KVs, []),
            {ok, Reduced} = couch_set_view_mapreduce:reduce(View, NthRed, KVs2),
            Reduced2 = PreResultPadding ++ Reduced ++ PostResultPadding,
            LenReductions = [<<(size(R)):16, R/binary>> || R <- Reduced2],
            iolist_to_binary([<<0:40, 0:?MAX_NUM_PARTITIONS>> | LenReductions]);
        (rereduce, Reds) ->
            UserReds = lists:map(
                fun(<<_Count:40, _BitMap:?MAX_NUM_PARTITIONS, UserRedsList/binary>>) ->
                    [lists:nth(NthRed, couch_set_view_util:parse_reductions(UserRedsList))]
                end,
                Reds),
            {ok, Reduced} = couch_set_view_mapreduce:rereduce(View, NthRed, UserReds),
            Reduced2 = PreResultPadding ++ Reduced ++ PostResultPadding,
            LenReductions = [<<(size(R)):16, R/binary>> || R <- Reduced2],
            iolist_to_binary([<<0:40, 0:?MAX_NUM_PARTITIONS>> | LenReductions])
        end,
    WrapperFun = fun(KeyDocId, PartialReds, Acc0) ->
            GroupedKey = case GroupLevel of
            0 ->
                <<"null">>;
            _ when is_integer(GroupLevel) ->
                {KeyJson, _DocId} = mapreduce_view:decode_key_docid(KeyDocId),
                case is_array_key(KeyJson) of
                true ->
                    ?JSON_ENCODE(lists:sublist(?JSON_DECODE(KeyJson), GroupLevel));
                false ->
                    KeyJson
                end;
            _ ->
                {KeyJson, _DocId} = mapreduce_view:decode_key_docid(KeyDocId),
                KeyJson
            end,
            <<_Count:40, _BitMap:?MAX_NUM_PARTITIONS, Reds/binary>> =
                couch_btree:final_reduce(ReduceFun, PartialReds),
            UserRed = lists:nth(NthRed, couch_set_view_util:parse_reductions(Reds)),
            Fun({json, GroupedKey}, {json, UserRed}, Acc0)
        end,
    couch_set_view_util:open_raw_read_fd(Group),
    try
        couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options)
    after
        couch_set_view_util:close_raw_read_fd(Group)
    end.


get_key_pos(_Key, [], _N) ->
    0;
get_key_pos(Key, [{Key1,_Value}|_], N) when Key == Key1 ->
    N + 1;
get_key_pos(Key, [_|Rest], N) ->
    get_key_pos(Key, Rest, N+1).


get_map_view(SetName, DDoc, ViewName, Req) ->
    #set_view_group_req{wanted_partitions = WantedPartitions} = Req,
    try
        {ok, Group0} = get_group(mapreduce_view, SetName, DDoc, Req),
        {Group, Unindexed} = modify_bitmasks(Group0, WantedPartitions),
        case get_map_view0(ViewName, Group#set_view_group.views) of
        {ok, View} ->
            {ok, View, Group, Unindexed};
        Else ->
            release_group(Group0),
            Else
        end
    catch
    throw:{error, empty_group} ->
        {not_found, missing_named_view}
    end.

get_map_view0(_Name, []) ->
    {not_found, missing_named_view};
get_map_view0(Name, [#set_view{} = View|Rest]) ->
    MapNames = (View#set_view.indexer)#mapreduce_view.map_names,
    case lists:member(Name, MapNames) of
        true -> {ok, View};
        false -> get_map_view0(Name, Rest)
    end.


get_reduce_view(SetName, DDoc, ViewName, Req) ->
    #set_view_group_req{wanted_partitions = WantedPartitions} = Req,
    try
        {ok, Group0} = get_group(mapreduce_view, SetName, DDoc, Req),
        {Group, Unindexed} = modify_bitmasks(Group0, WantedPartitions),
        #set_view_group{
            views = Views
        } = Group,
        case get_reduce_view0(ViewName, Views) of
        {ok, View} ->
            {ok, View, Group, Unindexed};
        Else ->
            release_group(Group0),
            Else
        end
    catch
    throw:{error, empty_group} ->
        {not_found, missing_named_view}
    end.

get_reduce_view0(_Name, []) ->
    {not_found, missing_named_view};
get_reduce_view0(Name, [#set_view{} = View|Rest]) ->
    RedFuns = (View#set_view.indexer)#mapreduce_view.reduce_funs,
    case get_key_pos(Name, RedFuns, 0) of
        0 -> get_reduce_view0(Name, Rest);
        N -> {ok, {reduce, N, View}}
    end.


reduce_to_count(Reductions) ->
    <<Count:40, _/binary>> =
    couch_btree:final_reduce(
        fun(reduce, KVs) ->
            Count = length(couch_set_view_util:expand_dups(KVs, [])),
            <<Count:40>>;
        (rereduce, Reds) ->
            Count = lists:foldl(fun(<<C:40, _/binary>>, Acc) -> Acc + C end, 0, Reds),
            <<Count:40>>
        end, Reductions),
    Count.

-spec inc_group_access_stat(#set_view_group{}) -> 'ok'.
inc_group_access_stat(Group) ->
    GroupPid = get_group_server(Group#set_view_group.set_name, Group),
    ok = couch_set_view_group:inc_access_stat(GroupPid).

% This case is triggered when at least one partition of the replica group is
% active. This happens during failover, when a replica index is transferred
% to the main index.
% The "ors" in the spec are for the spatial views.
-spec fold(#set_view_group{},
           #set_view{},
           set_view_fold_fun() | fun((tuple(), term()) -> {ok, term()}),
           term(),
           #view_query_args{} | tuple()) -> {'ok', term(), term()}.
fold(#set_view_group{replica_group = #set_view_group{} = RepGroup} = Group, View, Fun, Acc, ViewQueryArgs) ->
    RepView = lists:nth(View#set_view.id_num + 1, RepGroup#set_view_group.views),
    case ViewQueryArgs of
    #view_query_args{keys = Keys} ->
        Extra = #view_merge{keys = Keys},
        Merger = couch_view_merger;
    _ ->
        Extra = nil,
        Merger = spatial_merger
    end,
    Mod = Group#set_view_group.mod,
    ViewName = Mod:query_args_view_name(ViewQueryArgs),
    ViewSpecs = [
        #set_view_spec{
            name = Group#set_view_group.set_name,
            ddoc_id = Group#set_view_group.name,
            view_name = ViewName,
            partitions = [],  % not needed in this context
            group = Group#set_view_group{replica_group = nil},
            view = View
        },
        #set_view_spec{
            name = RepGroup#set_view_group.set_name,
            ddoc_id = RepGroup#set_view_group.name,
            view_name = ViewName,
            partitions = [],  % not needed in this context
            % We want the partitions filtered as if it were a main group
            group = RepGroup#set_view_group{type = main},
            view = RepView
        }
    ],
    MergeParams = #index_merge{
        indexes = ViewSpecs,
        callback = fun Merger:map_view_merge_callback/2,
        user_acc = #merge_acc{fold_fun = Fun, acc = Acc},
        user_ctx = #user_ctx{roles = [<<"_admin">>]},
        http_params = ViewQueryArgs,
        make_row_fun = fun(RowData) -> RowData end,
        extra = Extra
    },
    #merge_acc{acc = FinalAcc} = couch_index_merger:query_index(
        Merger, MergeParams),
    {ok, nil, FinalAcc};


fold(Group, View, Fun, Acc, #view_query_args{keys = Keys} = ViewQueryArgs0)
        when Keys =/= nil ->
    lists:foldl(
        fun(Key, {ok, _, FoldAcc}) ->
            ViewQueryArgs = ViewQueryArgs0#view_query_args{start_key = Key, end_key = Key},
            do_fold(Group, View, Fun, FoldAcc, ViewQueryArgs)
        end,
        {ok, {[], []}, Acc},
        Keys);
fold(Group, View, Fun, Acc, ViewQueryArgs) ->
    do_fold(Group, View, Fun, Acc, ViewQueryArgs).


do_fold(Group, SetView, Fun, Acc, ViewQueryArgs) ->
    View = SetView#set_view.indexer,
    Mod = Group#set_view_group.mod,

    Filter = case Mod:should_filter(ViewQueryArgs) of
        false ->
            false;
        true ->
            filter(Group)
    end,

    WrapperFun = Mod:make_wrapper_fun(Fun, Filter),
    couch_set_view_util:open_raw_read_fd(Group),
    try
        Options = Mod:make_key_options(ViewQueryArgs),
        {ok, _LastReduce, _AccResult} =
            Mod:fold(View, WrapperFun, Acc, Options)
    after
        couch_set_view_util:close_raw_read_fd(Group)
    end.


-spec init({prod | dev, mapreduce_view | spatial_view}) -> {ok, #server{}}.
init({Category, Indexer}) ->
    % read configuration settings and register for configuration changes
    RootDir = couch_config:get("couchdb", "view_index_dir"),
    ok = couch_config:register(
        fun("mapreduce", "function_timeout", NewTimeout) ->
                ok = mapreduce:set_timeout(list_to_integer(NewTimeout));
            ("mapreduce", "max_kv_size_per_doc", NewMax) ->
                ok = mapreduce:set_max_kv_size_per_doc(list_to_integer(NewMax));
            ("mapreduce", "optimize_doc_load", NewFlag) ->
                ok = mapreduce:set_optimize_doc_load(list_to_atom(NewFlag))
        end),

    ok = mapreduce:set_timeout(list_to_integer(
        couch_config:get("mapreduce", "function_timeout", "10000"))),

    ok = mapreduce:set_optimize_doc_load(list_to_atom(
        couch_config:get("mapreduce", "optimize_doc_load", "true"))),

    Server = init_server(Category, Indexer),
    % {SetName, {DDocId, Signature}}
    ets:new(Server#server.name_to_sig_ets,
            [bag, protected, named_table, {read_concurrency, true}]),
    % {{SetName, Signature}, Pid | WaitListPids}
    ets:new(Server#server.sig_to_pid_ets,
            [set, protected, named_table, {read_concurrency, true}]),
    % {Pid, {SetName, Sig, DDocId}}
    ets:new(Server#server.pid_to_sig_ets, [set, private, named_table]),

    ets:new(Server#server.stats_ets,
        [set, public, named_table, {keypos, #set_view_group_stats.ets_key}]),

    {ok, Notifier} = couch_db_update_notifier:start_link(
        make_handle_db_event_fun(
            Indexer, Server#server.name, Server#server.sig_to_pid_ets,
            Server#server.name_to_sig_ets)),

    process_flag(trap_exit, true),
    ok = couch_file:init_delete_dir(RootDir),
    {ok, Server#server{root_dir = RootDir, db_notifier = Notifier}}.

init_server(Category, Indexer) ->
    #server{
        stats_ets = Indexer:stats_ets(Category),
        name_to_sig_ets = Indexer:name_to_sig_ets(Category),
        sig_to_pid_ets = Indexer:sig_to_pid_ets(Category),
        pid_to_sig_ets = Indexer:pid_to_sig_ets(Category),
        name = Indexer:server_name(Category),
        indexer = Indexer
    }.


terminate(_Reason, Server) ->
    [couch_util:shutdown_sync(Pid) || {Pid, _} <-
            ets:tab2list(Server#server.pid_to_sig_ets)],
    ok.


handle_call({get_group_server, SetName, Group}, From, Server) ->
    #set_view_group{sig = Sig} = Group,
    case ets:lookup(Server#server.sig_to_pid_ets, {SetName, Sig}) of
    [] ->
        WaitList = [From],
        _ = spawn_monitor(fun() ->
            exit(new_group(Server#server.root_dir, SetName, Group))
        end),
        ets:insert(Server#server.sig_to_pid_ets, {{SetName, Sig}, WaitList}),
        {noreply, Server};
    [{_, WaitList}] when is_list(WaitList) ->
        WaitList2 = [From | WaitList],
        ets:insert(Server#server.sig_to_pid_ets, {{SetName, Sig}, WaitList2}),
        {noreply, Server};
    [{_, ExistingPid}] ->
        {reply, {ok, ExistingPid}, Server}
    end;

handle_call({before_database_delete, SetName}, _From, Server) ->
    #server{root_dir = RootDir} = Server,
    lists:foreach(
        fun({_SetName, {_DDocId, Sig}}) ->
            case ets:lookup(Server#server.sig_to_pid_ets, {SetName, Sig}) of
            [{_, Pid}] when is_pid(Pid) ->
                try
                    gen_server:call(Pid, before_master_delete)
                catch
                    exit:{noproc, _Reason} -> ok
                end;
            _ ->
                ok
            end
        end,
        ets:lookup(Server#server.name_to_sig_ets, SetName)),
    true = ets:delete(Server#server.name_to_sig_ets, SetName),
    ?LOG_INFO("Deleting index files for set `~s` because master database "
              "is about to be deleted", [?LOG_USERDATA(SetName)]),
    try
        delete_index_dir(RootDir, SetName)
    catch _:Error ->
        Stack = erlang:get_stacktrace(),
        ?LOG_ERROR("Error deleting index files for set `~s`:~n"
                   "  error: ~p~n  stacktrace: ~s~n",
                   [?LOG_USERDATA(SetName), Error, ?LOG_USERDATA(Stack)])
    end,
    {reply, ok, Server};

handle_call({ddoc_updated, SetName, #doc{deleted = false} = DDoc0}, _From, Server) ->
    #doc{id = DDocId} = DDoc = couch_doc:with_ejson_body(DDoc0),
    Indexer = Server#server.indexer,
    #set_view_group{sig = Sig} = Indexer:design_doc_to_set_view_group(SetName, DDoc),
    true = ets:insert(Server#server.name_to_sig_ets, {SetName, {DDocId, Sig}}),
    {reply, ok, Server};

handle_call({ddoc_updated, SetName, #doc{id = DDocId, deleted = true}}, _From, Server) ->
    true = ets:match_delete(Server#server.name_to_sig_ets, {SetName, {DDocId, '$1'}}),
    {reply, ok, Server}.


handle_cast(Msg, Server) ->
    {stop, {unexpected_cast, Msg}, Server}.

new_group(Root, SetName, Group) ->
    #set_view_group{
        name = DDocId,
        sig = Sig,
        mod = Mod
    } = Group,
    process_flag(trap_exit, true),
    Reply = case (catch couch_set_view_group:start_link({Root, SetName, Group})) of
    {ok, NewPid} ->
        Aliases = get_ddoc_ids_with_sig(Mod, SetName, Sig),
        unlink(NewPid),
        {ok, NewPid, Aliases};
    {error, Reason} ->
        Reason;
    Error ->
        Error
    end,
    {SetName, DDocId, Sig, Reply}.

handle_info({'EXIT', Pid, Reason}, #server{db_notifier = Pid} = Server) ->
    ?LOG_ERROR("Database update notifier died with reason: ~p", [Reason]),
    {stop, Reason, Server};

handle_info({'EXIT', FromPid, Reason}, Server) ->
    case ets:lookup(Server#server.pid_to_sig_ets, FromPid) of
    [] ->
        if Reason /= normal ->
            % non-updater linked process died, we propagate the error
            ?LOG_ERROR("Exit on non-updater process: ~p", [Reason]),
            exit(Reason);
        true -> ok
        end;
    [{_, {SetName, Sig, DDocId}}] ->
        delete_from_ets(FromPid, SetName, DDocId, Sig, Server),
        true = ets:match_delete(Server#server.name_to_sig_ets, {SetName, {'$1', Sig}})
    end,
    {noreply, Server};

handle_info({'DOWN', _MonRef, _, _Pid, {SetName, DDocId, Sig, Reply}}, Server) ->
    Key = {SetName, Sig},
    [{_, WaitList}] = ets:lookup(Server#server.sig_to_pid_ets, Key),
    case Reply of
    {ok, NewPid, Aliases} ->
        lists:foreach(fun(From) -> gen_server:reply(From, {ok, NewPid}) end, WaitList),
        true = link(NewPid),
        add_to_ets(NewPid, SetName, DDocId, Sig, Server),
        lists:foreach(fun(AliasDDocId) ->
            true = ets:insert(Server#server.name_to_sig_ets, {SetName, {AliasDDocId, Sig}})
        end, Aliases);
    _ ->
        lists:foreach(fun(From) -> gen_server:reply(From, Reply) end, WaitList),
        ets:delete(Server#server.sig_to_pid_ets, Key)
    end,
    {noreply, Server}.

add_to_ets(Pid, SetName, DDocId, Sig, Server) ->
    true = ets:insert(Server#server.pid_to_sig_ets, {Pid, {SetName, Sig, DDocId}}),
    true = ets:insert(Server#server.sig_to_pid_ets, {{SetName, Sig}, Pid}),
    true = ets:insert(Server#server.name_to_sig_ets, {SetName, {DDocId, Sig}}).

delete_from_ets(Pid, SetName, DDocId, Sig, Server) ->
    true = ets:delete(Server#server.pid_to_sig_ets, Pid),
    true = ets:delete(Server#server.sig_to_pid_ets, {SetName, Sig}),
    true = ets:delete_object(Server#server.name_to_sig_ets, {SetName, {DDocId, Sig}}),
    true = ets:delete(Server#server.stats_ets, {SetName, DDocId, Sig, main}),
    true = ets:delete(Server#server.stats_ets, {SetName, DDocId, Sig, replica}).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


delete_index_dir(RootDir, SetName) ->
    ProdDirName = set_index_dir(RootDir, SetName, prod),
    DevDirName = set_index_dir(RootDir, SetName, dev),
    nuke_dir(RootDir, ProdDirName),
    nuke_dir(RootDir, DevDirName).

set_index_dir(RootDir, SetName, prod) ->
    filename:join([RootDir, "@indexes", ?b2l(SetName)]);
% development views must be stored in a different directory as they have
% the same signature/filename as the production views
set_index_dir(RootDir, SetName, dev) ->
    filename:join([RootDir, "@indexes_dev", ?b2l(SetName)]).


nuke_dir(RootDelDir, Dir) ->
    case file:list_dir(Dir) of
    {error, enoent} -> ok; % doesn't exist
    {ok, Files} ->
        lists:foreach(
            fun(File)->
                Full = Dir ++ "/" ++ File,
                case couch_file:delete(RootDelDir, Full, false) of
                ok -> ok;
                {error, enoent} -> ok;
                {error, eperm} ->
                    ok = nuke_dir(RootDelDir, Full)
                end
            end,
            Files),
        ok = file:del_dir(Dir)
    end.


map_view_key_compare({Key1, DocId1}, {Key2, DocId2}) ->
    case couch_ejson_compare:less_json(Key1, Key2) of
    0 ->
        DocId1 < DocId2;
    LessResult ->
        LessResult < 0
    end.


reduce_view_key_compare(A, B) ->
    couch_ejson_compare:less_json(A, B) < 0.


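% Restrict the group's active/passive bitmasks to the given list of wanted
% partitions, so that folds over the group only return rows from those
% partitions, and return the subset of wanted partitions that is not indexed
% yet (neither in the main group nor, when present, in the replica group).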
modify_bitmasks(Group, []) ->
    {Group, []};

modify_bitmasks(#set_view_group{replica_group = nil} = Group, Partitions) ->
    IndexedBitmask = ?set_abitmask(Group) bor ?set_pbitmask(Group),
    WantedBitmask = couch_set_view_util:build_bitmask(Partitions),
    UnindexedBitmask = WantedBitmask band (bnot IndexedBitmask),
    ABitmask2 = WantedBitmask band IndexedBitmask,
    PBitmask2 = (bnot ABitmask2) band IndexedBitmask,
    Header = (Group#set_view_group.index_header)#set_view_index_header{
        abitmask = ABitmask2 band (bnot ?set_cbitmask(Group)),
        pbitmask = PBitmask2
    },
    Unindexed = couch_set_view_util:decode_bitmask(UnindexedBitmask),
    {Group#set_view_group{index_header = Header}, Unindexed};

modify_bitmasks(#set_view_group{replica_group = RepGroup} = Group, Partitions) ->
    IndexedBitmaskMain = ?set_abitmask(Group) bor ?set_pbitmask(Group),
    IndexedBitmaskRep = ?set_abitmask(RepGroup) bor ?set_pbitmask(RepGroup),
    WantedBitmask = couch_set_view_util:build_bitmask(Partitions),

    UnindexedBitmaskMain = (WantedBitmask band (bnot IndexedBitmaskMain)) band (bnot IndexedBitmaskRep),
    UnindexedBitmaskRep = (WantedBitmask band (bnot IndexedBitmaskRep)) band (bnot IndexedBitmaskMain),

    ABitmaskRep2 = WantedBitmask band IndexedBitmaskRep,
    ABitmaskMain2 = (WantedBitmask band IndexedBitmaskMain) band (bnot ABitmaskRep2),

    PBitmaskMain2 = (bnot ABitmaskMain2) band IndexedBitmaskMain,
    PBitmaskRep2 = (bnot ABitmaskRep2) band IndexedBitmaskRep,

    HeaderMain = (Group#set_view_group.index_header)#set_view_index_header{
        abitmask = ABitmaskMain2 band (bnot ?set_cbitmask(Group)),
        pbitmask = PBitmaskMain2
    },
    HeaderRep = (RepGroup#set_view_group.index_header)#set_view_index_header{
        abitmask = ABitmaskRep2 band (bnot ?set_cbitmask(RepGroup)),
        pbitmask = PBitmaskRep2
    },
    Unindexed = couch_set_view_util:decode_bitmask(UnindexedBitmaskMain bor UnindexedBitmaskRep),
    Group2 = Group#set_view_group{
        index_header = HeaderMain,
        replica_group = RepGroup#set_view_group{index_header = HeaderRep}
    },
    {Group2, Unindexed}.


% Wrap the handle_db_event handler so that it works independently of the
% indexer and production/development views.
make_handle_db_event_fun(Mod, ServerName, SigToPidEts, NameToSigEts) ->
    fun
    ({before_delete, DbName}) ->
        case couch_set_view_util:split_set_db_name(DbName) of
        {ok, SetName, master} ->
            ok = gen_server:call(
                ServerName, {before_database_delete, SetName}, infinity);
        _ ->
            ok
        end;
    ({ddoc_updated, {DbName, #doc{id = DDocId} = DDoc}}) ->
        case couch_set_view_util:split_set_db_name(DbName) of
        {ok, SetName, master} ->
            case DDoc#doc.deleted of
            false ->
                DDoc2 = couch_doc:with_ejson_body(DDoc),
                #set_view_group{sig = NewSig} = Mod:design_doc_to_set_view_group(
                    SetName, DDoc2);
            true ->
                NewSig = <<>>
            end,

            lists:foreach(
                fun({_SetName, {_DDocId, Sig}}) ->
                    case ets:lookup(SigToPidEts, {SetName, Sig}) of
                    [{_, GroupPid}] when is_pid(GroupPid), Sig =/= NewSig ->
                        Aliases = [
                            AliasDDocId || {_SetName2, {AliasDDocId, _Sig2}} <-
                                ets:match_object(NameToSigEts, {SetName, {'$1', Sig}}),
                                AliasDDocId =/= DDocId
                        ],
                        ok = gen_server:cast(GroupPid, {ddoc_updated, NewSig, Aliases});
                    _ ->
                        ok
                    end
                end,
                ets:match_object(NameToSigEts, {SetName, {DDocId, '$1'}})),
            ok = gen_server:call(ServerName, {ddoc_updated, SetName, DDoc}, infinity);

        _ ->
            ok
        end;
    (_) ->
        ok
    end.


% Returns whether the results should be filtered based on a bitmask or not
-spec filter(#set_view_group{}) -> false | {true, bitmask(), bitmask()}.
filter(#set_view_group{type = main} = Group) ->
    case ?set_pbitmask(Group) bor ?set_cbitmask(Group) of
    0 ->
        false;
    ExcludeBitmask ->
        {true, ExcludeBitmask, ?set_abitmask(Group)}
    end;
filter(#set_view_group{type = replica} = Group) ->
    case ?set_abitmask(Group) bor ?set_cbitmask(Group) of
    0 ->
        false;
    ExcludeBitmask ->
        {true, ExcludeBitmask, ?set_pbitmask(Group)}
    end.



make_reduce_group_keys_fun(0) ->
    fun(_, _) -> true end;
make_reduce_group_keys_fun(GroupLevel) when is_integer(GroupLevel) ->
    fun(KeyDocId1, KeyDocId2) ->
        {Key1, _DocId1} = mapreduce_view:decode_key_docid(KeyDocId1),
        {Key2, _DocId2} = mapreduce_view:decode_key_docid(KeyDocId2),
        case is_array_key(Key1) andalso is_array_key(Key2) of
        true ->
            lists:sublist(?JSON_DECODE(Key1), GroupLevel) ==
                    lists:sublist(?JSON_DECODE(Key2), GroupLevel);
        false ->
            couch_ejson_compare:less_json(Key1, Key2) == 0
        end
    end;
make_reduce_group_keys_fun(_) ->
    fun(KeyDocId1, KeyDocId2) ->
        {Key1, _DocId1} = mapreduce_view:decode_key_docid(KeyDocId1),
        {Key2, _DocId2} = mapreduce_view:decode_key_docid(KeyDocId2),
        couch_ejson_compare:less_json(Key1, Key2) == 0
    end.

is_array_key(<<"[", _/binary>>) ->
    true;
is_array_key(K) when is_binary(K) ->
    false.


-spec get_ddoc_ids_with_sig(atom(), binary(), binary()) -> [binary()].
get_ddoc_ids_with_sig(Mod, SetName, ViewGroupSig) ->
    {ok, Db} = couch_db:open_int(?master_dbname(SetName), []),
    {ok, DDocList} = couch_db:get_design_docs(Db, no_deletes),
    ok = couch_db:close(Db),
    lists:foldl(
        fun(#doc{id = Id} = DDoc, Acc) ->
            case Mod:design_doc_to_set_view_group(SetName, DDoc) of
            #set_view_group{sig = ViewGroupSig} ->
                [Id | Acc];
            #set_view_group{sig = _OtherSig} ->
                Acc
            end
        end,
        [], DDocList).
