1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_view_merger).
16
17% export callbacks
18-export([parse_http_params/4, make_funs/3, get_skip_and_limit/1]).
19-export([make_event_fun/2, view_qs/2, process_extra_params/2]).
20-export([map_view_merge_callback/2, reduce_view_merge_callback/2]).
21-export([simple_set_view_query/3]).
22
23% exports for spatial_merger
24-export([queue_debug_info/4, debug_info/3, get_set_view/5,
25         prepare_set_view/5]).
26
27-include("couch_db.hrl").
28-include_lib("couch_index_merger/include/couch_index_merger.hrl").
29-include_lib("couch_index_merger/include/couch_view_merger.hrl").
30-include_lib("couch_set_view/include/couch_set_view.hrl").
31
32-define(LOCAL, <<"local">>).
33
34-import(couch_util, [
35    get_value/2,
36    get_value/3,
37    to_binary/1,
38    get_nested_json_value/2
39]).
40
41-define(DEFAULT_STALENESS, update_after).
42
43
44% callback!
% Builds the #view_query_args{} record for a merged view request.
%
% The view type comes from the design document; a view that defines a reduce
% function is queried as `red_map` when the request carries `reduce=false`.
% When the request has no `stale` parameter, ?DEFAULT_STALENESS is applied.
parse_http_params(Req, DDoc, ViewName, #view_merge{keys = Keys}) ->
    DeclaredType = view_type(DDoc, ViewName),
    ReduceParam = couch_httpd:qs_value(Req, "reduce", "true"),
    ViewType = if
        DeclaredType =:= reduce, ReduceParam =:= "false" ->
            red_map;
        true ->
            DeclaredType
    end,

    StaleParam = couch_httpd:qs_value(Req, "stale"),
    Parsed = couch_httpd_view:parse_view_params(Req, Keys, ViewType),
    Named = Parsed#view_query_args{view_name = ViewName},

    case StaleParam of
        undefined ->
            % No explicit staleness requested: use this module's default.
            Named#view_query_args{stale = ?DEFAULT_STALENESS};
        _ ->
            Named
    end.
65
66% callback!
% Returns {LessFun, FoldFun, MergeFun, CollectorFun, Extra} for a merge.
%
% - LessFun orders rows coming from different folders.
% - FoldFun / MergeFun are the map or reduce flavours of the folder and merger.
% - CollectorFun builds the function that turns merged items into response
%   rows; a custom `make_row_fun` from the merge params takes precedence over
%   the default row builders of this module.
% - Extra is a fresh #view_merge{} carrying only the rereduce function.
make_funs(DDoc, ViewName, IndexMergeParams) ->
    #index_merge{
        extra = Extra,
        http_params = ViewArgs,
        make_row_fun = CustomRowFun
    } = IndexMergeParams,
    #view_merge{rereduce_fun = GivenRedFun, keys = Keys} = Extra,
    #view_query_args{
        debug = DebugMode,
        view_type = RequestedType,
        direction = Dir
    } = ViewArgs,

    % An unset view type is resolved from the design document.
    ViewType = case RequestedType of
        nil -> view_type(DDoc, ViewName);
        map -> map;
        red_map -> red_map;
        reduce -> reduce
    end,

    % Only reduce queries need a rereduce function.
    RedFun = case ViewType of
        reduce when GivenRedFun =:= nil ->
            reduce_function(DDoc, ViewName);
        reduce when is_binary(GivenRedFun) ->
            GivenRedFun;
        _ ->
            nil
    end,

    LessFun = view_less_fun(Dir, ViewType, Keys),

    {FoldFun, MergeFun} = case ViewType of
        reduce ->
            {fun reduce_view_folder/6, fun merge_reduce_views/1};
        map ->
            {fun map_view_folder/6, fun merge_map_views/1};
        red_map ->
            {fun map_view_folder/6, fun merge_map_views/1}
    end,

    CollectorFun = case ViewType of
        reduce ->
            ReduceRowFun = case is_function(CustomRowFun) of
                true ->
                    CustomRowFun;
                false ->
                    fun(Details) -> view_row_obj_reduce(Details, DebugMode) end
            end,
            fun(_NumFolders, Callback, UserAcc) ->
                fun(Item) ->
                    {ok, StartedAcc} = Callback(start, UserAcc),
                    couch_index_merger:collect_rows(
                        ReduceRowFun, Callback, StartedAcc, Item)
                end
            end;
        % red_map | map
        _ ->
            MapRowFun = case is_function(CustomRowFun) of
                true ->
                    CustomRowFun;
                false ->
                    fun(Details) -> view_row_obj_map(Details, DebugMode) end
            end,
            fun(NumFolders, Callback, UserAcc) ->
                fun(Item) ->
                    couch_index_merger:collect_row_count(
                        NumFolders, 0, MapRowFun, Callback, UserAcc, Item)
                end
            end
    end,

    {LessFun, FoldFun, MergeFun, CollectorFun, #view_merge{rereduce_fun = RedFun}}.
136
137% callback!
% Extracts the {Skip, Limit} pair from the query arguments record.
get_skip_and_limit(#view_query_args{} = Args) ->
    {Args#view_query_args.skip, Args#view_query_args.limit}.
140
141% callback!
% Returns the initial JSON stream event handler for a remote view response.
% Every event is handed to http_view_fold/3 together with the view type of
% the query and the merger queue.
make_event_fun(ViewArgs, Queue) ->
    fun(Event) ->
        ViewType = ViewArgs#view_query_args.view_type,
        http_view_fold(Event, ViewType, Queue)
    end.
146
147% callback!
% Prepends a `keys` member to the EJSON property list when the merge request
% carries an explicit key list; otherwise the list is returned untouched.
process_extra_params(#view_merge{keys = Keys}, EJson) ->
    case Keys of
        nil ->
            EJson;
        _ ->
            [{<<"keys">>, Keys} | EJson]
    end.
152
153% callback!
% Merge callback for map views. Row events are passed to the fold function
% stored in the #merge_acc{} accumulator (called as Fun(Row, nil, Acc)); its
% ok/stop verdict is propagated with the updated accumulator. All other
% recognised events leave the accumulator unchanged.
map_view_merge_callback({row, Row}, Macc) ->
    #merge_acc{fold_fun = FoldFun, acc = FoldAcc} = Macc,
    case FoldFun(Row, nil, FoldAcc) of
        {Verdict, NewAcc} when Verdict =:= ok; Verdict =:= stop ->
            {Verdict, Macc#merge_acc{acc = NewAcc}}
    end;
map_view_merge_callback(Event, Acc) when Event =:= start; Event =:= stop ->
    {ok, Acc};
map_view_merge_callback({start, _}, Acc) ->
    {ok, Acc};
map_view_merge_callback({debug_info, _From, _Info}, Acc) ->
    {ok, Acc}.
173
174
% Merge callback for reduce views. Row events carry a {Key, Reduction} pair
% that is passed to the fold function stored in the #merge_acc{} accumulator;
% its ok/stop verdict is propagated with the updated accumulator. All other
% recognised events leave the accumulator unchanged.
reduce_view_merge_callback({row, {Key, Red}}, Macc) ->
    #merge_acc{fold_fun = FoldFun, acc = FoldAcc} = Macc,
    case FoldFun(Key, Red, FoldAcc) of
        {Verdict, NewAcc} when Verdict =:= ok; Verdict =:= stop ->
            {Verdict, Macc#merge_acc{acc = NewAcc}}
    end;
reduce_view_merge_callback(Event, Acc) when Event =:= start; Event =:= stop ->
    {ok, Acc};
reduce_view_merge_callback({start, _}, Acc) ->
    {ok, Acc};
reduce_view_merge_callback({debug_info, _From, _Info}, Acc) ->
    {ok, Acc}.
194
195
% Classifies a view of the design document: `reduce` when its definition has
% a binary `reduce` member, `map` when that member is absent.
view_type(DDoc, ViewName) ->
    {Props} = get_view_def(DDoc, ViewName),
    case get_value(<<"reduce">>, Props) of
        Source when is_binary(Source) ->
            reduce;
        undefined ->
            map
    end.
204
205
% Returns the source of the view's reduce function.
% Throws {error, Message} when the `reduce` member of the view definition is
% missing or is not a string.
reduce_function(#doc{id = DDocId} = DDoc, ViewName) ->
    {Props} = get_view_def(DDoc, ViewName),
    Source = get_value(<<"reduce">>, Props),
    if
        is_binary(Source) ->
            Source;
        true ->
            Msg = io_lib:format(
                "Reduce field for view `~s`, local design document `~s`, "
                "is missing or is not a string.",
                [ViewName, DDocId]),
            throw({error, iolist_to_binary(Msg)})
    end.
217
218
% Looks up the definition of ViewName under the `views` member of the design
% document body. A {not_found, _} throw from the lookup is re-thrown as
% {not_found, Message} naming the view and the design document.
get_view_def(#doc{body = Body, id = DDocId}, ViewName) ->
    try get_nested_json_value(Body, [<<"views">>, ViewName]) of
        ViewDef ->
            ViewDef
    catch
        throw:{not_found, _} ->
            Msg = io_lib:format(
                "View `~s` not defined in local design document `~s`.",
                [ViewName, DDocId]),
            throw({not_found, iolist_to_binary(Msg)})
    end.
227
228
% Builds the row ordering function used when merging rows.
%
% Without an explicit key list, rows are ordered by the set view key
% comparator for the view type, negated for descending queries. With a key
% list, rows with different keys are ordered by which key appears first in
% the list; rows with the same key fall back to the regular comparator.
-spec view_less_fun(fwd | rev, map | red_map | reduce, nil | list()) ->
                           fun((term(), term()) -> boolean()).
view_less_fun(Dir, ViewType, nil) ->
    Compare = case ViewType of
        reduce ->
            fun couch_set_view:reduce_view_key_compare/2;
        _ ->
            fun couch_set_view:map_view_key_compare/2
    end,
    % The key is always the first element of the row tuple.
    KeyLess = fun(RowA, RowB) ->
        Compare(element(1, RowA), element(1, RowB))
    end,
    case Dir of
        fwd ->
            KeyLess;
        rev ->
            fun(RowA, RowB) -> not KeyLess(RowA, RowB) end
    end;
view_less_fun(_Dir, ViewType, Keys) ->
    EncodedKeys = [?JSON_ENCODE(Key) || Key <- Keys],
    if
        ViewType =:= reduce ->
            fun(RowA, RowB) ->
                % The row tuple can have 2 or 3 elements
                {json, RawA} = FullKeyA = element(1, RowA),
                {json, RawB} = FullKeyB = element(1, RowB),
                if
                    RawA =:= RawB ->
                        couch_set_view:reduce_view_key_compare(FullKeyA, FullKeyB);
                    true ->
                        first_key_matches(EncodedKeys, RawA, RawB)
                end
            end;
        true ->
            fun({{{json, RawA}, _} = KeyDocIdA, _},
                {{{json, RawB}, _} = KeyDocIdB, _}) ->
                if
                    RawA =:= RawB ->
                        couch_set_view:map_view_key_compare(KeyDocIdA, KeyDocIdB);
                    true ->
                        first_key_matches(EncodedKeys, RawA, RawB)
                end
            end
    end.
270
271
% Walks the list of requested keys and reports which of the two row keys is
% met first: `true` when KeyA comes first (or both are equal), `false` when
% KeyB comes first.
% The case of an empty list never happens, as result keys always match one
% of the supplied keys.
-spec first_key_matches([binary()], binary(), binary()) -> boolean().
first_key_matches([Candidate | Rest], KeyA, KeyB) ->
    if
        Candidate =:= KeyA ->
            true;
        Candidate =:= KeyB ->
            false;
        true ->
            first_key_matches(Rest, KeyA, KeyB)
    end.
281
282
% Serializes a map view row into its JSON object, as a binary.
% The clause order matters: the more specific row shapes are tried first.

% Optimized path, row assembled by couch_http_view_streamer
view_row_obj_map({_KeyDocId, {row_json, RowJson}}, _Debug) ->
    RowJson;

% Row from local node, query with ?debug=true
view_row_obj_map({{Key, DocId}, {PartId, Value}}, true) when is_integer(PartId) ->
    {json, ValueJson} = Value,
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    PartJson = ?l2b(integer_to_list(PartId)),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"partition\":", PartJson/binary,
      ",\"node\":\"", (?LOCAL)/binary, "\"",
      ",\"value\":", ValueJson/binary, "}">>;

% Row from remote node, using Erlang based stream JSON parser, query with ?debug=true
view_row_obj_map({{Key, DocId}, {PartId, Node, Value}}, true) when is_integer(PartId) ->
    {json, ValueJson} = Value,
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    PartJson = ?l2b(integer_to_list(PartId)),
    NodeJson = ?JSON_ENCODE(Node),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"partition\":", PartJson/binary,
      ",\"node\":", NodeJson/binary,
      ",\"value\":", ValueJson/binary, "}">>;

% Row from local node, query with ?debug=false
view_row_obj_map({{Key, DocId}, {PartId, Value}}, false) when is_integer(PartId) ->
    {json, ValueJson} = Value,
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"value\":", ValueJson/binary, "}">>;

% Row from local node, old couchdb views
view_row_obj_map({{Key, DocId}, Value}, _DebugMode) ->
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    ValueJson = ?JSON_ENCODE(Value),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"value\":", ValueJson/binary, "}">>;

% Row from local node, old couchdb views (no partition id), with a document
view_row_obj_map({{Key, DocId}, Value, Doc}, _DebugMode) ->
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    ValueJson = ?JSON_ENCODE(Value),
    DocJson = ?JSON_ENCODE(Doc),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"value\":", ValueJson/binary,
      ",\"doc\":", DocJson/binary, "}">>.
324
% Serializes a reduce view row into its JSON object, as a binary.

% Optimized path, reduce row assembled by couch_http_view_streamer
view_row_obj_reduce({_Key, {row_json, RowJson}, _ValueJson}, _DebugMode) ->
    RowJson;
% Reduce row from local node
view_row_obj_reduce({Key, Value}, _DebugMode) ->
    KeyJson = ?JSON_ENCODE(Key),
    ValueJson = ?JSON_ENCODE(Value),
    <<"{\"key\":", KeyJson/binary, ",\"value\":", ValueJson/binary, "}">>.
332
333
% Merge loop for map views.
% - limit 0: delegate to couch_index_merger:merge_indexes_no_limit/1.
% - empty row accumulator: fetch the next minimum row and loop for as long
%   as the merger hands back {params, NewParams}.
% - otherwise: apply skip handling to the accumulated row and loop.
merge_map_views(Params) ->
    case Params of
        #merge_params{limit = 0} ->
            couch_index_merger:merge_indexes_no_limit(Params);
        #merge_params{row_acc = []} ->
            Outcome = couch_index_merger:merge_indexes_no_acc(
                Params, fun merge_map_min_row/2),
            case Outcome of
                {params, NextParams} ->
                    merge_map_views(NextParams);
                _ ->
                    Outcome
            end;
        _ ->
            merge_map_views(couch_index_merger:handle_skip(Params))
    end.
349
350
% Flushes the merger queue, stores the minimum row as the sole accumulated
% row and applies skip handling. A new Params record is returned.
merge_map_min_row(Params, MinRow) ->
    Queue = Params#merge_params.queue,
    ok = couch_view_merger_queue:flush(Queue),
    WithRow = Params#merge_params{row_acc = [MinRow]},
    couch_index_merger:handle_skip(WithRow).
355
356
% Merge loop for reduce views. With limit 0 it delegates to
% couch_index_merger:merge_indexes_no_limit/1; otherwise it keeps processing
% minimum rows for as long as the merger hands back {params, NewParams}.
merge_reduce_views(Params) ->
    case Params of
        #merge_params{limit = 0} ->
            couch_index_merger:merge_indexes_no_limit(Params);
        _ ->
            Outcome = couch_index_merger:merge_indexes_no_acc(
                Params, fun merge_reduce_min_row/2),
            case Outcome of
                {params, NextParams} ->
                    merge_reduce_views(NextParams);
                _ ->
                    Outcome
            end
    end.
368
% Handles the minimum row of a reduce merge.
%
% All queued rows sharing the key of MinRow are grouped; a group of more
% than one row is rereduced into a single row. The resulting row is handed
% to the collector unless it is still being skipped. Returns the updated
% #merge_params{}, `revision_mismatch`, or {stop, Resp} when the collector
% asked to stop after a rereduce error.
merge_reduce_min_row(Params, MinRow) ->
    #merge_params{
        queue = Queue, limit = Limit, skip = Skip, collector = Col
    } = Params,
    case group_keys_for_rereduce(Queue, [MinRow]) of
        revision_mismatch ->
            revision_mismatch;
        RowGroup ->
            ok = couch_view_merger_queue:flush(Queue),
            {Row, Col2} = case RowGroup of
                [Single] ->
                    % Nothing to rereduce, the row is emitted as is.
                    {{row, Single}, Col};
                [FirstRow, _ | _] ->
                    try
                        Reduced = rereduce(RowGroup, Params),
                        {{row, {element(1, FirstRow), {json, Reduced}}}, Col}
                    catch
                        _Tag:Error ->
                            Stack = erlang:get_stacktrace(),
                            ?LOG_ERROR("Caught unexpected error while "
                                       "merging reduce view: ~p~n~p",
                                       [Error, Stack]),
                            on_rereduce_error(Col, Error)
                    end
            end,
            case Row of
                {stop, _Resp} ->
                    Row;
                _ ->
                    {Limit2, Col3} = case Skip > 0 of
                        true ->
                            % Row is skipped: limit and collector stay as they are.
                            {Limit, Col2};
                        false ->
                            NextCol = case Row of
                                {row, _} ->
                                    {ok, Collected} = Col2(Row),
                                    Collected;
                                _ ->
                                    Col2
                            end,
                            {couch_index_merger:dec_counter(Limit), NextCol}
                    end,
                    Params#merge_params{
                        skip = couch_index_merger:dec_counter(Skip),
                        limit = Limit2,
                        collector = Col3
                    }
            end
    end.
415
416
% Reports a rereduce error to the collector. A {stop, Resp} answer is
% returned as {{stop, Resp}, undefined}; any other answer is returned as is.
on_rereduce_error(Col, Error) ->
    Answer = Col(reduce_error(Error)),
    case Answer of
        {stop, _Resp} ->
            {Answer, undefined};
        _ ->
            Answer
    end.
424
% Wraps a rereduce failure into an {error, ?LOCAL, Binary} item. The reason
% of an {error, Reason} tuple is unwrapped before conversion to a binary.
reduce_error(Error) ->
    Reason = case Error of
        {error, Inner} ->
            Inner;
        _ ->
            Error
    end,
    {error, ?LOCAL, to_binary(Reason)}.
429
430
% Pops from the queue every row whose key equals the key of the most
% recently grouped row and returns the group (latest row first). Returns
% `revision_mismatch` as soon as that marker is found at the head of the
% queue.
group_keys_for_rereduce(Queue, [Newest | _] = Group) ->
    GroupKey = element(1, Newest),
    case couch_view_merger_queue:peek(Queue) of
        empty ->
            Group;
        {ok, revision_mismatch} ->
            revision_mismatch;
        {ok, Next} when element(1, Next) == GroupKey ->
            {ok, Next} = couch_view_merger_queue:pop_next(Queue),
            group_keys_for_rereduce(Queue, [Next | Group]);
        {ok, _} ->
            Group
    end.
444
445
% Rereduces a group of rows that share the same key.
%
% Reduce functions whose source starts with "_" go through
% couch_set_view_mapreduce:builtin_reduce/3. Any other function is run
% through a mapreduce reduce context that is created on first use and kept
% in the process dictionary under `reduce_context`.
% Throws whatever mapreduce:rereduce/3 returns when it is not {ok, Value}.
rereduce(Reds0, #merge_params{extra = #view_merge{rereduce_fun = <<"_", _/binary>> = FunSrc}}) ->
    % Builtin reduces take {Key, JsonValue} pairs.
    ToPair = fun
        ({Key, _RowJson, {value_json, ValueJson}}) ->
            {Key, ValueJson};
        ({Key, {json, ValueJson}}) ->
            {Key, ValueJson};
        ({Key, Ejson}) ->
            {Key, ?JSON_ENCODE(Ejson)}
    end,
    Reds = lists:map(ToPair, Reds0),
    {ok, [Value]} = couch_set_view_mapreduce:builtin_reduce(rereduce, [FunSrc], Reds),
    Value;

rereduce(Rows, #merge_params{extra = #view_merge{rereduce_fun = FunSrc}}) ->
    % Custom reduces only take the JSON values.
    ToValue = fun
        ({_Key, _RowJson, {value_json, ValueJson}}) ->
            ValueJson;
        ({_Key, {json, ValueJson}}) ->
            ValueJson;
        ({_Key, Val}) ->
            ?JSON_ENCODE(Val)
    end,
    Reds = lists:map(ToValue, Rows),
    Ctx = case get(reduce_context) of
        undefined ->
            {ok, NewCtx} = mapreduce:start_reduce_context([FunSrc]),
            erlang:put(reduce_context, NewCtx),
            NewCtx;
        CachedCtx ->
            CachedCtx
    end,
    case mapreduce:rereduce(Ctx, 1, Reds) of
        {ok, Value} ->
            Value;
        Failure ->
            throw(Failure)
    end.
480
% Fetches a set view through GetSetViewFn.
%
% For `stale = false` requests the group is asked not to update its stats;
% the access stat is incremented here instead, and only when the reply
% reports no missing partitions. Every other request is passed through
% unchanged.
get_set_view(GetSetViewFn, SetName, DDoc, ViewName, ViewGroupReq) ->
    case ViewGroupReq of
        #set_view_group_req{stale = false} ->
            NoStatsReq = ViewGroupReq#set_view_group_req{update_stats = false},
            Reply = GetSetViewFn(SetName, DDoc, ViewName, NoStatsReq),
            case Reply of
                {ok, _View, Group, []} ->
                    couch_set_view:inc_group_access_stat(Group),
                    Reply;
                _ ->
                    Reply
            end;
        _ ->
            GetSetViewFn(SetName, DDoc, ViewName, ViewGroupReq)
    end.
494
% Obtains the view and group for a set view spec.
%
% Returns {View, Group} on success, `not_found` when the named view is
% missing, or `error` after an error item has been queued and the queue has
% been marked as done (missing partitions, or any exception raised while
% fetching the view).
prepare_set_view(ViewSpec, ViewGroupReq, DDoc, Queue, GetSetViewFn) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId,
        view_name = ViewName
    } = ViewSpec,
    try
        Reply = get_set_view(GetSetViewFn, SetName, DDoc, ViewName, ViewGroupReq),
        case Reply of
            {ok, View, Group, []} ->
                {View, Group};
            {ok, _, Group, MissingPartitions} ->
                ?LOG_INFO("Set view `~s`, group `~s`, missing partitions: ~w",
                          [SetName, DDocId, MissingPartitions]),
                couch_set_view:release_group(Group),
                couch_view_merger_queue:queue(Queue, set_view_outdated),
                couch_view_merger_queue:done(Queue),
                error;
            {not_found, missing_named_view} ->
                not_found
        end
    catch
        _:Error ->
            Item = queue_get_view_group_error(Error, SetName, DDocId),
            couch_view_merger_queue:queue(Queue, Item),
            couch_view_merger_queue:done(Queue),
            error
    end.
521
522
% Converts an exception raised while fetching a view group into the
% {error, ?LOCAL, Reason} item that is put on the merger queue. Nested
% {error, _} wrappers are removed and `view_undefined` is replaced by a
% readable message.
queue_get_view_group_error(Error, SetName, DDocId) ->
    Reason = case Error of
        {error, {error, Inner}} ->
            Inner;
        {error, Inner} ->
            Inner;
        view_undefined ->
            view_undefined_msg(SetName, DDocId);
        _ ->
            Error
    end,
    {error, ?LOCAL, Reason}.
531
532
% Folder entry point for map views; only set view specs are accepted. The
% database and user context arguments are not used.
map_view_folder(_Db, ViewSpec, MergeParams, _UserCtx, DDoc, Queue)
        when is_record(ViewSpec, set_view_spec) ->
    map_set_view_folder(ViewSpec, MergeParams, DDoc, Queue).
535
536
% Folds a local map set view and queues its rows on the merger queue.
%
% The view and group come from the spec when both are already set there.
% Otherwise the map view is fetched and, if no map view has that name, the
% reduce view of the same name is fetched and its map view extracted.
% When the design document revision has to be checked and has changed,
% `revision_mismatch` is queued instead of rows. Once a view and group have
% been obtained, the group is released and the queue marked as done no
% matter how the fold ends.
map_set_view_folder(ViewSpec, MergeParams, DDoc, Queue) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId,
        partitions = SpecPartitions
    } = ViewSpec,
    #index_merge{http_params = ViewArgs} = MergeParams,
    #view_query_args{
        stale = Stale,
        debug = Debug,
        type = IndexType
    } = ViewArgs,
    DDocDbName = ?master_dbname(SetName),

    HasViewAndGroup = (ViewSpec#set_view_spec.view =/= nil) andalso
        (ViewSpec#set_view_spec.group =/= nil),
    % Prepared is the outcome of fetching the view; GroupReq is the request
    % that produced it (nil when the spec already carried view and group).
    {Prepared, GroupReq} = case HasViewAndGroup of
        true ->
            {{ViewSpec#set_view_spec.view, ViewSpec#set_view_spec.group}, nil};
        false ->
            Partitions = case IndexType of
                main ->
                    SpecPartitions;
                replica ->
                    []
            end,
            MapReq = #set_view_group_req{
                stale = Stale,
                update_stats = true,
                wanted_partitions = Partitions,
                debug = Debug,
                type = IndexType
            },
            case prepare_set_view(ViewSpec, MapReq, DDoc, Queue,
                                  fun couch_set_view:get_map_view/4) of
                not_found ->
                    % No map view with that name: try the reduce view and
                    % use its underlying map view.
                    RedReq = MapReq#set_view_group_req{update_stats = false},
                    RedOutcome = case prepare_set_view(
                            ViewSpec, RedReq, DDoc, Queue,
                            fun couch_set_view:get_reduce_view/4) of
                        {RedView, RedGroup} ->
                            {couch_set_view:extract_map_view(RedView), RedGroup};
                        Other ->
                            Other
                    end,
                    {RedOutcome, RedReq};
                MapOutcome ->
                    {MapOutcome, MapReq}
            end
    end,

    case Prepared of
        error ->
            %%  handled by prepare_set_view
            ok;
        {View, Group} ->
            queue_debug_info(Debug, Group, GroupReq, Queue),
            try
                FoldFun = make_map_set_fold_fun(Queue),
                RevIsCurrent =
                    not(couch_index_merger:should_check_rev(MergeParams, DDoc)) orelse
                    couch_index_merger:ddoc_unchanged(DDocDbName, DDoc),
                case RevIsCurrent of
                    true ->
                        RowCount = couch_set_view:get_row_count(Group, View),
                        ok = couch_view_merger_queue:queue(Queue, {row_count, RowCount}),
                        {ok, _, _} = couch_set_view:fold(Group, View, FoldFun, [], ViewArgs);
                    false ->
                        ok = couch_view_merger_queue:queue(Queue, revision_mismatch)
                end
            catch
                throw:ddoc_db_not_found ->
                    NotFoundMsg = couch_index_merger:ddoc_not_found_msg(DDocDbName, DDocId),
                    ok = couch_view_merger_queue:queue(Queue, {error, ?LOCAL, NotFoundMsg});
                throw:queue_shutdown ->
                    % The merger process shut down our queue: the limit was
                    % reached and this is expected, so don't log an
                    % unnecessary error message and stack trace.
                    ok;
                _Tag:Error ->
                    Stack = erlang:get_stacktrace(),
                    ?LOG_ERROR("Caught unexpected error "
                               "while serving view query ~s/~s: ~p~n~p",
                               [SetName, DDocId, Error, Stack]),
                    couch_view_merger_queue:queue(Queue, {error, ?LOCAL, to_binary(Error)})
            after
                couch_set_view:release_group(Group),
                ok = couch_view_merger_queue:done(Queue)
            end
    end.
629
630
% First step of the remote response parser: on the opening of the top level
% JSON object, map and red_map responses continue with the row count parser
% while reduce responses go straight to the rows parser.
http_view_fold(object_start, ViewType, Queue)
        when ViewType =:= map; ViewType =:= red_map ->
    fun(Event) -> http_view_fold_rc_1(Event, Queue) end;
http_view_fold(object_start, reduce, Queue) ->
    fun(Event) -> http_view_fold_rows_1(Event, Queue) end.
637
% Scans the top level object for the `total_rows` or `debug_info` members;
% any other event is ignored and scanning continues. After a `debug_info`
% object has been collected, parsing resumes in this state.
http_view_fold_rc_1(Event, Queue) ->
    case Event of
        {key, <<"total_rows">>} ->
            fun(Next) -> http_view_fold_rc_2(Next, Queue) end;
        {key, <<"debug_info">>} ->
            Resume = fun http_view_fold_rc_1/2,
            fun(object_start) ->
                fun(Next) ->
                    http_view_fold_debug_info(Next, Queue, [], Resume)
                end
            end;
        _ ->
            fun(Next) -> http_view_fold_rc_1(Next, Queue) end
    end.
648
% Receives the numeric value of `total_rows`, queues it as a row count and
% moves on to the rows parser.
http_view_fold_rc_2(RowCount, Queue) when is_number(RowCount) ->
    RowCountItem = {row_count, RowCount},
    ok = couch_view_merger_queue:queue(Queue, RowCountItem),
    fun(Next) -> http_view_fold_rows_1(Next, Queue) end.
652
% Scans the top level object for the `rows` array or a `debug_info` object;
% any other event is ignored and scanning continues. After a `debug_info`
% object has been collected, parsing resumes in this state.
http_view_fold_rows_1(Event, Queue) ->
    case Event of
        {key, <<"rows">>} ->
            fun(array_start) ->
                fun(Next) -> http_view_fold_rows_2(Next, Queue) end
            end;
        {key, <<"debug_info">>} ->
            Resume = fun http_view_fold_rows_1/2,
            fun(object_start) ->
                fun(Next) ->
                    http_view_fold_debug_info(Next, Queue, [], Resume)
                end
            end;
        _ ->
            fun(Next) -> http_view_fold_rows_1(Next, Queue) end
    end.
663
% Parses the elements of the `rows` array: each row object is collected and
% queued, and the end of the array hands over to the parser for the members
% that may follow the rows.
http_view_fold_rows_2(array_end, Queue) ->
    fun(Next) -> http_view_fold_extra(Next, Queue) end;
http_view_fold_rows_2(object_start, Queue) ->
    OnRow = fun(Row) ->
        http_view_fold_queue_row(Row, Queue),
        fun(Next) -> http_view_fold_rows_2(Next, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnRow) end.
675
% Handles what follows the `rows` array: an `errors` array is parsed, any
% other event switches to couch_index_merger:void_event/1.
http_view_fold_extra(Event, Queue) ->
    case Event of
        {key, <<"errors">>} ->
            fun(array_start) ->
                fun(Next) -> http_view_fold_errors(Next, Queue) end
            end;
        _ ->
            fun couch_index_merger:void_event/1
    end.
680
% Parses the elements of the `errors` array: each error object is collected
% and queued; the end of the array switches to
% couch_index_merger:void_event/1.
http_view_fold_errors(array_end, _Queue) ->
    fun couch_index_merger:void_event/1;
http_view_fold_errors(object_start, Queue) ->
    OnError = fun(Error) ->
        http_view_fold_queue_error(Error, Queue),
        fun(Next) -> http_view_fold_errors(Next, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnError) end.
692
% Parses the members of a `debug_info` object, accumulating {Key, Info}
% pairs. At the end of the object a {debug_info, FromUrl, Info} item is
% queued (FromUrl is read from the process dictionary) and parsing resumes
% with RetFun. A lone entry keyed by ?LOCAL is queued unwrapped; anything
% else is queued as an EJSON object in the original member order.
http_view_fold_debug_info({key, Key}, Queue, Acc, RetFun) ->
    OnInfo = fun(DebugInfo) ->
        Acc2 = [{Key, DebugInfo} | Acc],
        fun(Next) -> http_view_fold_debug_info(Next, Queue, Acc2, RetFun) end
    end,
    fun(object_start) ->
        fun(Event) -> json_stream_parse:collect_object(Event, OnInfo) end
    end;
http_view_fold_debug_info(object_end, Queue, Acc, RetFun) ->
    Info = case Acc of
        [{?LOCAL, LocalInfo}] ->
            LocalInfo;
        _ ->
            {lists:reverse(Acc)}
    end,
    ok = couch_view_merger_queue:queue(Queue, {debug_info, get(from_url), Info}),
    fun(Next) -> RetFun(Next, Queue) end.
713
714
% Queues an error reported by a remote node, tagged with the URL stored in
% the process dictionary under `from_url`. A missing reason becomes `null`.
http_view_fold_queue_error({Props}, Queue) ->
    Reason = get_value(<<"reason">>, Props, null),
    ErrorItem = {error, get(from_url), Reason},
    ok = couch_view_merger_queue:queue(Queue, ErrorItem).
718
719
% Converts a row object received from a remote node into the tuple used by
% the merger and queues it:
%   - rows with an `error` member become {{Key, error}, Error};
%   - rows without `id` are reduce rows, {Key, Value};
%   - other rows are map rows, {{Key, Id}, Value} or {{Key, Id}, Value, Doc}.
% Keys and values are kept as {json, EncodedBinary}.
http_view_fold_queue_row({Props}, Queue) ->
    Key = {json, ?JSON_ENCODE(get_value(<<"key">>, Props, null))},
    Id = get_value(<<"id">>, Props, nil),
    JsonValue = {json, ?JSON_ENCODE(get_value(<<"value">>, Props))},
    MapValue = case get_value(<<"partition">>, Props, nil) of
        nil ->
            JsonValue;
        PartId ->
            % we're in debug mode, add node info
            {PartId, get(from_url), JsonValue}
    end,
    Row = case {get_value(<<"error">>, Props, nil), Id} of
        {nil, nil} ->
            % reduce row
            {Key, JsonValue};
        {nil, _} ->
            % map row
            case get_value(<<"doc">>, Props, nil) of
                nil ->
                    {{Key, Id}, MapValue};
                Doc ->
                    {{Key, Id}, MapValue, Doc}
            end;
        {Error, _} ->
            % error in a map row
            {{Key, error}, Error}
    end,
    ok = couch_view_merger_queue:queue(Queue, Row).
751
% Folder entry point for reduce views; only set view specs are accepted.
% The database and user context arguments are not used.
reduce_view_folder(_Db, ViewSpec, MergeParams, _UserCtx, DDoc, Queue)
        when is_record(ViewSpec, set_view_spec) ->
    reduce_set_view_folder(ViewSpec, MergeParams, DDoc, Queue).
755
756
% Folds a local reduce set view and queues its {GroupedKey, Reduction} rows
% on the merger queue.
%
% The view and group come from the spec when both are already set there,
% otherwise the reduce view is fetched. When the design document revision
% has to be checked and has changed, `revision_mismatch` is queued instead
% of rows. Once a view and group have been obtained, the group is released
% and the queue marked as done no matter how the fold ends.
reduce_set_view_folder(ViewSpec, MergeParams, DDoc, Queue) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId,
        partitions = SpecPartitions
    } = ViewSpec,
    #index_merge{http_params = ViewArgs} = MergeParams,
    #view_query_args{
        stale = Stale,
        debug = Debug,
        type = IndexType
    } = ViewArgs,

    DDocDbName = ?master_dbname(SetName),
    HasViewAndGroup = (ViewSpec#set_view_spec.view =/= nil) andalso
        (ViewSpec#set_view_spec.group =/= nil),
    % Prepared is the outcome of fetching the view; GroupReq is the request
    % that produced it (nil when the spec already carried view and group).
    {Prepared, GroupReq} = case HasViewAndGroup of
        true ->
            {{ViewSpec#set_view_spec.view, ViewSpec#set_view_spec.group}, nil};
        false ->
            Partitions = case IndexType of
                main ->
                    SpecPartitions;
                replica ->
                    []
            end,
            Req = #set_view_group_req{
                stale = Stale,
                update_stats = true,
                wanted_partitions = Partitions,
                debug = Debug,
                type = IndexType
            },
            Outcome = prepare_set_view(
                ViewSpec, Req, DDoc, Queue, fun couch_set_view:get_reduce_view/4),
            {Outcome, Req}
    end,

    case Prepared of
        error ->
            %%  handled by prepare_set_view
            ok;
        {View, Group} ->
            queue_debug_info(Debug, Group, GroupReq, Queue),
            try
                FoldFun = fun(GroupedKey, Red, Acc) ->
                    ok = couch_view_merger_queue:queue(Queue, {GroupedKey, Red}),
                    {ok, Acc}
                end,
                RevIsCurrent =
                    not(couch_index_merger:should_check_rev(MergeParams, DDoc)) orelse
                    couch_index_merger:ddoc_unchanged(DDocDbName, DDoc),
                case RevIsCurrent of
                    true ->
                        {ok, _} = couch_set_view:fold_reduce(
                            Group, View, FoldFun, [], ViewArgs);
                    false ->
                        ok = couch_view_merger_queue:queue(Queue, revision_mismatch)
                end
            catch
                throw:ddoc_db_not_found ->
                    NotFoundMsg = couch_index_merger:ddoc_not_found_msg(DDocDbName, DDocId),
                    ok = couch_view_merger_queue:queue(Queue, {error, ?LOCAL, NotFoundMsg});
                throw:queue_shutdown ->
                    % The merger process shut down our queue: the limit was
                    % reached and this is expected, so don't log an
                    % unnecessary error message and stack trace.
                    ok;
                _Tag:Error ->
                    Stack = erlang:get_stacktrace(),
                    ?LOG_ERROR("Caught unexpected error "
                               "while serving view query ~s/~s: ~p~n~p",
                               [SetName, DDocId, Error, Stack]),
                    couch_view_merger_queue:queue(Queue, {error, ?LOCAL, to_binary(Error)})
            after
                couch_set_view:release_group(Group),
                ok = couch_view_merger_queue:done(Queue)
            end
    end.
834
835
% Returns the 3-arity fold function used for map set views: every key/value
% row is queued on the merger queue and the accumulator is passed through
% untouched. The second argument of the fold function is ignored.
make_map_set_fold_fun(Queue) ->
    fun(Row, _Ignored, FoldAcc) ->
        ok = couch_view_merger_queue:queue(Queue, Row),
        {ok, FoldAcc}
    end.
841
842
% Builds the binary error message reported when the set view is not defined
% for the given design document.
view_undefined_msg(SetName, DDocId) ->
    iolist_to_binary(
        io_lib:format("Undefined set view `~s` for `~s` design document.",
                      [SetName, DDocId])).
847
% Serializes the view query arguments that differ from their defaults (plus
% the merge `on_error` policy) into a URL query string.
%
% Returns [] when there is nothing to send, otherwise a string of the form
% "?name=value&name=value".
%
% Notes:
%  - the `limit` sent is Limit + Skip, and `skip` itself is never sent;
%  - for descending queries the doc id bounds go through
%    reverse_key_default/1 before being compared against the defaults;
%  - `stale` is compared against ?DEFAULT_STALENESS rather than the record
%    default.
view_qs(ViewArgs, MergeParams) ->
    Defaults = #view_query_args{},
    #view_query_args{
        start_key = StartKey, end_key = EndKey,
        start_docid = StartDocId, end_docid = EndDocId,
        direction = Dir,
        inclusive_end = IncEnd,
        group_level = GroupLevel,
        view_type = ViewType,
        conflicts = Conflicts,
        stale = Stale,
        limit = Limit,
        debug = Debug,
        filter = Filter,
        type = IndexType,
        skip = Skip
    } = ViewArgs,
    #index_merge{on_error = OnError} = MergeParams,

    % Unless(IsDefault, Build) is [] when the value is at its default,
    % otherwise a single element list with the lazily built "name=value".
    Unless = fun(true, _Build) ->
            [];
        (false, Build) ->
            [Build()]
    end,

    QsList = lists:append([
        Unless(StartKey =:= Defaults#view_query_args.start_key,
            fun() -> "startkey=" ++ json_qs_val(StartKey) end),
        Unless(EndKey =:= Defaults#view_query_args.end_key,
            fun() -> "endkey=" ++ json_qs_val(EndKey) end),
        % ascending queries: doc id bounds are sent as they are
        Unless(Dir =/= fwd orelse
                StartDocId =:= Defaults#view_query_args.start_docid,
            fun() -> "startkey_docid=" ++ qs_val(StartDocId) end),
        Unless(Dir =/= fwd orelse
                EndDocId =:= Defaults#view_query_args.end_docid,
            fun() -> "endkey_docid=" ++ qs_val(EndDocId) end),
        case Dir of
        fwd ->
            [];
        rev ->
            RevStartDocId = reverse_key_default(StartDocId),
            RevEndDocId = reverse_key_default(EndDocId),
            ["descending=true"] ++
            Unless(RevStartDocId =:= Defaults#view_query_args.start_docid,
                fun() -> "startkey_docid=" ++ qs_val(RevStartDocId) end) ++
            Unless(RevEndDocId =:= Defaults#view_query_args.end_docid,
                fun() -> "endkey_docid=" ++ qs_val(RevEndDocId) end)
        end,
        Unless(IncEnd =:= Defaults#view_query_args.inclusive_end,
            fun() -> "inclusive_end=" ++ atom_to_list(IncEnd) end),
        Unless(GroupLevel =:= Defaults#view_query_args.group_level,
            fun() ->
                case GroupLevel of
                exact ->
                    "group=true";
                _ when is_number(GroupLevel) ->
                    "group_level=" ++ integer_to_list(GroupLevel)
                end
            end),
        Unless(ViewType =/= red_map,
            fun() -> "reduce=false" end),
        Unless(Conflicts =:= Defaults#view_query_args.conflicts,
            fun() -> "conflicts=" ++ atom_to_list(Conflicts) end),
        %% we now have different default
        Unless(Stale =:= ?DEFAULT_STALENESS,
            fun() -> "stale=" ++ atom_to_list(Stale) end),
        Unless(OnError =:= ?ON_ERROR_DEFAULT,
            fun() -> "on_error=" ++ atom_to_list(OnError) end),
        Unless(Limit =:= Defaults#view_query_args.limit,
            fun() -> "limit=" ++ integer_to_list(Limit + Skip) end),
        Unless(Debug =:= Defaults#view_query_args.debug,
            fun() -> "debug=" ++ atom_to_list(Debug) end),
        Unless(Filter =:= Defaults#view_query_args.filter,
            fun() -> "_filter=" ++ atom_to_list(Filter) end),
        Unless(IndexType =:= Defaults#view_query_args.type,
            fun() -> "_type=" ++ atom_to_list(IndexType) end)
    ]),

    case QsList of
    [] ->
        [];
    [_ | _] ->
        [$? | string:join(QsList, "&")]
    end.
984
% JSON encodes Value and passes the resulting string through
% couch_httpd:quote/1, for use as a query string value.
json_qs_val(Value) ->
    Json = iolist_to_binary(?JSON_ENCODE(Value)),
    couch_httpd:quote(?b2l(Json)).
987
% Converts Value to a list with couch_util:to_list/1 and passes it through
% couch_httpd:quote/1, for use as a query string value.
qs_val(Value) ->
    AsList = couch_util:to_list(Value),
    couch_httpd:quote(AsList).
990
% Swaps the two sentinel keys ?MIN_STR and ?MAX_STR; any other key is
% returned unchanged.
reverse_key_default(Key) ->
    case Key of
    ?MIN_STR ->
        ?MAX_STR;
    ?MAX_STR ->
        ?MIN_STR;
    _ ->
        Key
    end.
994
995
% Computes the debug info for the group and, unless it is `nil`, pushes it
% into the merger queue. Always returns `ok`.
queue_debug_info(Debug, Group, GroupReq, Queue) ->
    Info = debug_info(Debug, Group, GroupReq),
    case Info =:= nil of
    true ->
        ok;
    false ->
        ok = couch_view_merger_queue:queue(Queue, Info)
    end.
1003
% Builds the debug information term for a set view group.
%
% Returns `nil` when debugging is disabled. Otherwise returns
% {debug_info, ?LOCAL, Info}, where Info is an EJSON object with a
% "main_group" member and, when replica_group_debug_info/1 returns a
% non-empty list, a "replica_group" member as well.
% GroupReq may be `nil`, in which case "wanted_partitions" is null.
debug_info(false, _Group, _GroupReq) ->
    nil;
debug_info(true, #set_view_group{debug_info = DebugRec} = Group, GroupReq) ->
    #set_view_debug_info{
        original_abitmask = OrigAbitmask,
        original_pbitmask = OrigPbitmask,
        stats = Stats,
        replica_partitions = ReplicaPartitions,
        wanted_seqs = WantedSeqs
    } = DebugRec,
    % bitmask -> ordered set of partition ids
    PartitionSet = fun(Bitmask) ->
        ordsets:from_list(couch_set_view_util:decode_bitmask(Bitmask))
    end,
    % 0 padded so that a pretty print JSON can sanely sort the keys (partition IDs)
    SeqsObj = fun(Seqs) ->
        {[{?l2b(io_lib:format("~4..0b", [PartId])), Seq} || {PartId, Seq} <- Seqs]}
    end,
    WantedPartitions = case GroupReq of
    nil ->
        null;
    #set_view_group_req{wanted_partitions = Wanted} ->
        Wanted
    end,
    MainInfo = [
        {<<"active_partitions">>, PartitionSet(?set_abitmask(Group))},
        {<<"original_active_partitions">>, PartitionSet(OrigAbitmask)},
        {<<"passive_partitions">>, PartitionSet(?set_pbitmask(Group))},
        {<<"original_passive_partitions">>, PartitionSet(OrigPbitmask)},
        {<<"cleanup_partitions">>, PartitionSet(?set_cbitmask(Group))},
        {<<"replica_partitions">>, ordsets:from_list(ReplicaPartitions)},
        {<<"replicas_on_transfer">>, ?set_replicas_on_transfer(Group)},
        {<<"indexable_seqs">>, SeqsObj(?set_seqs(Group))},
        {<<"unindexeable_seqs">>, SeqsObj(?set_unindexable_seqs(Group))},
        {<<"wanted_seqs">>, SeqsObj(WantedSeqs)},
        {<<"wanted_partitions">>, WantedPartitions},
        pending_transition_debug_info(Group),
        {<<"stats">>, set_view_group_stats_ejson(Stats)}
    ],
    MainMember = {<<"main_group">>, {MainInfo}},
    Info = case replica_group_debug_info(Group) of
    [] ->
        {[MainMember]};
    RepInfo ->
        {[MainMember, {<<"replica_group">>, {RepInfo}}]}
    end,
    {debug_info, ?LOCAL, Info}.
1052
% Builds the list of EJSON members describing the replica group of a set
% view group. Returns [] when the group has no replica group (`nil`).
replica_group_debug_info(#set_view_group{replica_group = nil}) ->
    [];
replica_group_debug_info(#set_view_group{replica_group = RepGroup}) ->
    #set_view_debug_info{
        original_abitmask = OrigAbitmask,
        original_pbitmask = OrigPbitmask,
        stats = Stats,
        wanted_seqs = WantedSeqs
    } = RepGroup#set_view_group.debug_info,
    % bitmask -> ordered set of partition ids
    PartitionSet = fun(Bitmask) ->
        ordsets:from_list(couch_set_view_util:decode_bitmask(Bitmask))
    end,
    % 0 padded so that a pretty print JSON can sanely sort the keys (partition IDs)
    SeqsObj = fun(Seqs) ->
        {[{?l2b(io_lib:format("~4..0b", [PartId])), Seq} || {PartId, Seq} <- Seqs]}
    end,
    [
        {<<"replica_active_partitions">>, PartitionSet(?set_abitmask(RepGroup))},
        {<<"replica_original_active_partitions">>, PartitionSet(OrigAbitmask)},
        {<<"replica_passive_partitions">>, PartitionSet(?set_pbitmask(RepGroup))},
        {<<"replica_original_passive_partitions">>, PartitionSet(OrigPbitmask)},
        {<<"replica_cleanup_partitions">>, PartitionSet(?set_cbitmask(RepGroup))},
        {<<"replica_indexable_seqs">>, SeqsObj(?set_seqs(RepGroup))},
        {<<"replica_unindexable_seqs">>, SeqsObj(?set_unindexable_seqs(RepGroup))},
        {<<"replica_wanted_seqs">>, SeqsObj(WantedSeqs)},
        pending_transition_debug_info(RepGroup),
        {<<"replica_stats">>, set_view_group_stats_ejson(Stats)}
    ].
1086
1087
% Returns the {<<"pending_transition">>, Value} EJSON member for a group.
% Value is null when the index header has no pending transition, otherwise
% an object with the transition's active, passive and unindexable fields.
pending_transition_debug_info(#set_view_group{index_header = Header}) ->
    Value = case Header#set_view_index_header.pending_transition of
    nil ->
        null;
    #set_view_transition{
        active = Active,
        passive = Passive,
        unindexable = Unindexable
    } ->
        {[
            {<<"active">>, Active},
            {<<"passive">>, Passive},
            {<<"unindexable">>, Unindexable}
        ]}
    end,
    {<<"pending_transition">>, Value}.
1102
1103
% Converts a #set_view_group_stats{} record into an EJSON object, with one
% member per record field except `ets_key`. Members come out in reverse
% field order.
set_view_group_stats_ejson(Stats) ->
    FieldNames = record_info(fields, set_view_group_stats),
    % element 1 of the tuple is the record tag, fields start at position 2
    LastPos = record_info(size, set_view_group_stats),
    Indexed = lists:zip(FieldNames, lists:seq(2, LastPos)),
    Members = [{Name, element(Pos, Stats)} || {Name, Pos} <- Indexed,
        Name =/= ets_key],
    {lists:reverse(Members)}.
1115
1116
% Query with a single view to merge, trigger a simpler code path
% (no queue, no child processes, etc).
%
% Params is the #index_merge{} record of the request; it must hold exactly
% one #set_view_spec{} in `indexes`. DDoc is the design document and Req the
% #httpd{} request, from which `stale`, `debug`, `_type` and `reduce` are
% read.
%
% The view is first looked up as a map view and, if not found, as a reduce
% view (`reduce=false` then selects the map part of the reduce view).
% Results are streamed through the `callback` of Params and the return value
% is the one of the final `Callback(stop, Acc)` call.
%
% Throws {not_found, Msg} when the view can not be opened and
% {error, set_view_outdated} when the group misses wanted partitions.
%
% Once the group has been obtained, everything that may fail (query
% parameter parsing, debug info, user callback, the query itself) runs
% inside the try ... after, so the group is always released.
simple_set_view_query(Params, DDoc, Req) ->
    #index_merge{
        callback = Callback,
        user_acc = UserAcc,
        indexes = [SetViewSpec],
        extra = #view_merge{keys = Keys}
    } = Params,
    #set_view_spec{
        name = SetName,
        partitions = Partitions0,
        ddoc_id = DDocId,
        view_name = ViewName,
        category = Category
    } = SetViewSpec,

    Stale = list_to_existing_atom(string:to_lower(
        couch_httpd:qs_value(Req, "stale", "update_after"))),
    Debug = couch_set_view_http:parse_bool_param(
        couch_httpd:qs_value(Req, "debug", "false")),
    IndexType = list_to_existing_atom(
        couch_httpd:qs_value(Req, "_type", "main")),
    Partitions = case IndexType of
    main ->
        Partitions0;
    replica ->
        []
    end,
    GroupReq = #set_view_group_req{
        stale = Stale,
        update_stats = true,
        wanted_partitions = Partitions,
        debug = Debug,
        type = IndexType,
        category = Category
    },

    % Only open the view here; anything else that can fail while the group
    % is held is deferred to the protected section below.
    {ViewKind, View0, Group, MissingPartitions} = case get_set_view(
        fun couch_set_view:get_map_view/4, SetName, DDoc, ViewName, GroupReq) of
    {ok, MapView, MapGroup, MapMissing} ->
        {map, MapView, MapGroup, MapMissing};
    {not_found, _} ->
        GroupReq2 = GroupReq#set_view_group_req{
            update_stats = false
        },
        case get_set_view(
            fun couch_set_view:get_reduce_view/4, SetName, DDoc, ViewName, GroupReq2) of
        {ok, RedView, RedGroup, RedMissing} ->
            {reduce, RedView, RedGroup, RedMissing};
        Error ->
            ErrorMsg = io_lib:format("Error opening view `~s`, from set `~s`, "
                "design document `~s`: ~p", [ViewName, SetName, DDocId, Error]),
            throw({not_found, iolist_to_binary(ErrorMsg)})
        end
    end,

    case MissingPartitions of
    [] ->
        ok;
    _ ->
        couch_set_view:release_group(Group),
        ?LOG_INFO("Set view `~s`, group `~s`, missing partitions: ~w",
                  [SetName, DDocId, MissingPartitions]),
        throw({error, set_view_outdated})
    end,

    try
        {ViewType, View} = case ViewKind of
        map ->
            {map, View0};
        reduce ->
            % may fail on an invalid `reduce` value
            Reduce = list_to_existing_atom(
                string:to_lower(couch_httpd:qs_value(Req, "reduce", "true"))),
            case Reduce of
            false ->
                {red_map, couch_set_view:extract_map_view(View0)};
            true ->
                {reduce, View0}
            end
        end,

        QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, ViewType),
        QueryArgs2 = QueryArgs#view_query_args{
            view_name = ViewName,
            stale = Stale
        },

        Params2 = case debug_info(Debug, Group, GroupReq) of
        nil ->
            Params#index_merge{user_ctx = Req#httpd.user_ctx};
        DebugInfo ->
            {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
            Params#index_merge{
                user_ctx = Req#httpd.user_ctx,
                user_acc = UserAcc2
            }
        end,

        case ViewType of
        reduce ->
            simple_set_view_reduce_query(Params2, Group, View, QueryArgs2);
        _ ->
            simple_set_view_map_query(Params2, Group, View, QueryArgs2)
        end
    after
        couch_set_view:release_group(Group)
    end.
1221
1222
% Runs a map query against a single view and streams the result through the
% `callback` of the #index_merge{} record:
%   Callback({start, RowCount}, Acc), then Callback({row, Row}, Acc) for each
%   emitted row, and finally Callback(stop, Acc), whose result is returned.
% The first `skip` rows are dropped and at most `limit` rows are emitted.
simple_set_view_map_query(
        #index_merge{callback = Callback, user_acc = UserAcc0},
        Group,
        View,
        #view_query_args{
            limit = Limit,
            skip = Skip,
            debug = DebugMode
        } = ViewArgs) ->
    {ok, UserAcc1} = Callback(
        {start, couch_set_view:get_row_count(Group, View)}, UserAcc0),

    % fold accumulator: {rows still allowed, rows still to skip, callback acc}
    EmitRows = fun
        (_Kv, _, {0, _, _} = Done) ->
            % limit reached
            {stop, Done};
        (_Kv, _, {Remaining, ToSkip, CbAcc}) when ToSkip > 0 ->
            {ok, {Remaining, ToSkip - 1, CbAcc}};
        (Kv, _, {Remaining, 0, CbAcc}) ->
            {ok, CbAcc2} = Callback(
                {row, view_row_obj_map(Kv, DebugMode)}, CbAcc),
            {ok, {Remaining - 1, 0, CbAcc2}}
    end,

    {ok, _, {_, _, UserAcc2}} = couch_set_view:fold(
        Group, View, EmitRows, {Limit, Skip, UserAcc1}, ViewArgs),
    Callback(stop, UserAcc2).
1250
1251
% Runs a reduce query against a single view and streams the result through
% the `callback` of the #index_merge{} record:
%   Callback(start, Acc), then Callback({row, Row}, Acc) for each emitted
%   row, and finally Callback(stop, Acc), whose result is returned.
% The first `skip` rows are dropped and at most `limit` rows are emitted.
simple_set_view_reduce_query(
        #index_merge{callback = Callback, user_acc = UserAcc0},
        Group,
        View,
        #view_query_args{
            limit = Limit,
            skip = Skip,
            debug = DebugMode
        } = ViewArgs) ->
    {ok, UserAcc1} = Callback(start, UserAcc0),

    % fold accumulator: {rows still allowed, rows still to skip, callback acc}
    EmitRows = fun
        (_GroupedKey, _Red, {0, _, _} = Done) ->
            % limit reached
            {stop, Done};
        (_GroupedKey, _Red, {Remaining, ToSkip, CbAcc}) when ToSkip > 0 ->
            {ok, {Remaining, ToSkip - 1, CbAcc}};
        (GroupedKey, Red, {Remaining, 0, CbAcc}) ->
            {ok, CbAcc2} = Callback(
                {row, view_row_obj_reduce({GroupedKey, Red}, DebugMode)}, CbAcc),
            {ok, {Remaining - 1, 0, CbAcc2}}
    end,

    {ok, {_, _, UserAcc2}} = couch_set_view:fold_reduce(
        Group, View, EmitRows, {Limit, Skip, UserAcc1}, ViewArgs),
    Callback(stop, UserAcc2).
1277