1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_view_merger).
16
17% export callbacks
18-export([parse_http_params/4, make_funs/3, get_skip_and_limit/1]).
19-export([make_event_fun/2, view_qs/2, process_extra_params/2]).
20-export([map_view_merge_callback/2, reduce_view_merge_callback/2]).
21-export([simple_set_view_query/3]).
22
23% exports for spatial_merger
24-export([queue_debug_info/4, debug_info/3, get_set_view/5,
25         prepare_set_view/5]).
26
27-export([update_timing_stat/3]).
28
29-include("couch_db.hrl").
30-include_lib("couch_index_merger/include/couch_index_merger.hrl").
31-include_lib("couch_index_merger/include/couch_view_merger.hrl").
32-include_lib("couch_set_view/include/couch_set_view.hrl").
33
34-define(LOCAL, <<"local">>).
35
36-import(couch_util, [
37    get_value/2,
38    get_value/3,
39    to_binary/1,
40    get_nested_json_value/2
41]).
42
43-define(DEFAULT_STALENESS, update_after).
44
45
% callback!
% Builds the #view_query_args{} for a merged view query from the HTTP request.
%
% The view type comes from the design document, except that a reduce view
% queried with `reduce=false` is treated as `red_map`. When the request has no
% `stale` query parameter, the stale mode is set to ?DEFAULT_STALENESS.
parse_http_params(Req, DDoc, ViewName, #view_merge{keys = Keys}) ->
    DeclaredType = view_type(DDoc, ViewName),
    ReduceParam = couch_httpd:qs_value(Req, "reduce", "true"),
    ViewType = case DeclaredType =:= reduce andalso ReduceParam =:= "false" of
    true ->
        red_map;
    false ->
        DeclaredType
    end,

    StaleParam = couch_httpd:qs_value(Req, "stale"),
    ParsedArgs = couch_httpd_view:parse_view_params(Req, Keys, ViewType),
    NamedArgs = ParsedArgs#view_query_args{view_name = ViewName},

    case StaleParam of
    undefined ->
        % No explicit staleness requested: apply this module's default.
        NamedArgs#view_query_args{stale = ?DEFAULT_STALENESS};
    _ ->
        NamedArgs
    end.
67
% callback!
% Assembles the functions used to merge a view.
%
% Returns {LessFun, FoldFun, MergeFun, CollectorFun, Extra} where:
%   LessFun      - row ordering function (see view_less_fun/3)
%   FoldFun      - folder for a single index (map or reduce flavour)
%   MergeFun     - function that merges the queued rows
%   CollectorFun - fun(NumFolders, Callback, UserAcc) returning the fun that
%                  hands collected items to couch_index_merger
%   Extra        - a fresh #view_merge{} carrying only the rereduce function
make_funs(DDoc, ViewName, IndexMergeParams) ->
    #index_merge{
        extra = Extra,
        http_params = ViewArgs,
        make_row_fun = MakeRowFun0
    } = IndexMergeParams,
    #view_merge{rereduce_fun = InRedFun, keys = Keys} = Extra,
    #view_query_args{
        debug = DebugMode,
        view_type = ViewType0,
        direction = Dir
    } = ViewArgs,

    % An unset view type is resolved from the design document.
    ViewType = case ViewType0 of
    nil ->
        view_type(DDoc, ViewName);
    map ->
        map;
    red_map ->
        red_map;
    reduce ->
        reduce
    end,

    % Only reduce queries need a rereduce function; an explicitly supplied
    % one takes precedence over the one in the design document.
    RedFun = case ViewType of
    reduce when InRedFun =:= nil ->
        reduce_function(DDoc, ViewName);
    reduce when is_binary(InRedFun) ->
        InRedFun;
    _ ->
        nil
    end,

    LessFun = view_less_fun(Dir, ViewType, Keys),

    {FoldFun, MergeFun} = case ViewType of
    reduce ->
        {fun reduce_view_folder/6, fun merge_reduce_views/1};
    map ->
        {fun map_view_folder/6, fun merge_map_views/1};
    red_map ->
        {fun map_view_folder/6, fun merge_map_views/1}
    end,

    HasCustomRowFun = is_function(MakeRowFun0),
    CollectorFun = case ViewType of
    reduce ->
        ReduceRowFun = case HasCustomRowFun of
        true ->
            MakeRowFun0;
        false ->
            fun(RowDetails) -> view_row_obj_reduce(RowDetails, DebugMode) end
        end,
        fun(_NumFolders, Callback, UserAcc) ->
            fun(Item) ->
                {ok, StartedAcc} = Callback(start, UserAcc),
                couch_index_merger:collect_rows(
                    ReduceRowFun, Callback, StartedAcc, Item)
            end
        end;
    % red_map|map
    _ ->
        MapRowFun = case HasCustomRowFun of
        true ->
            MakeRowFun0;
        false ->
            fun(RowDetails) -> view_row_obj_map(RowDetails, DebugMode) end
        end,
        fun(NumFolders, Callback, UserAcc) ->
            fun(Item) ->
                couch_index_merger:collect_row_count(
                    NumFolders, 0, MapRowFun, Callback, UserAcc, Item)
            end
        end
    end,

    {LessFun, FoldFun, MergeFun, CollectorFun,
        #view_merge{rereduce_fun = RedFun}}.
138
% callback!
% Returns the {Skip, Limit} pair stored in the view query arguments.
get_skip_and_limit(#view_query_args{} = ViewArgs) ->
    {ViewArgs#view_query_args.skip, ViewArgs#view_query_args.limit}.
142
% callback!
% Returns the initial event handler of the streaming JSON state machine that
% feeds rows of an HTTP view response into Queue (see http_view_fold/3).
make_event_fun(ViewArgs, Queue) ->
    fun(Event) ->
        ViewType = ViewArgs#view_query_args.view_type,
        http_view_fold(Event, ViewType, Queue)
    end.
148
% callback!
% Prepends a `keys` member to the EJSON property list when the merge request
% carries an explicit key list; otherwise returns EJson untouched.
process_extra_params(#view_merge{keys = Keys}, EJson) ->
    case Keys of
    nil ->
        EJson;
    _ ->
        [{<<"keys">>, Keys} | EJson]
    end.
154
% callback!
% Merge callback for map views. Only `{row, Row}` events do any work: the row
% is passed to the fold function held in the #merge_acc{} and the updated
% accumulator is stored back. All other recognised events are ignored.
map_view_merge_callback({row, Row}, Macc) ->
    #merge_acc{fold_fun = Fun, acc = Acc} = Macc,
    case Fun(Row, nil, Acc) of
    {Tag, NextAcc} when Tag =:= ok; Tag =:= stop ->
        {Tag, Macc#merge_acc{acc = NextAcc}}
    end;
map_view_merge_callback(start, Acc) ->
    {ok, Acc};
map_view_merge_callback({start, _}, Acc) ->
    {ok, Acc};
map_view_merge_callback(stop, Acc) ->
    {ok, Acc};
map_view_merge_callback({debug_info, _From, _Info}, Acc) ->
    {ok, Acc}.
175
176
% Merge callback for reduce views. Only `{row, {Key, Red}}` events do any
% work: key and reduction are passed to the fold function held in the
% #merge_acc{} and the updated accumulator is stored back. All other
% recognised events are ignored.
reduce_view_merge_callback({row, {Key, Red}}, Macc) ->
    #merge_acc{fold_fun = Fun, acc = Acc} = Macc,
    case Fun(Key, Red, Acc) of
    {Tag, NextAcc} when Tag =:= ok; Tag =:= stop ->
        {Tag, Macc#merge_acc{acc = NextAcc}}
    end;
reduce_view_merge_callback(start, Acc) ->
    {ok, Acc};
reduce_view_merge_callback({start, _}, Acc) ->
    {ok, Acc};
reduce_view_merge_callback(stop, Acc) ->
    {ok, Acc};
reduce_view_merge_callback({debug_info, _From, _Info}, Acc) ->
    {ok, Acc}.
196
197
% Determines the type of a view from its definition in the design document.
%
% Returns `map` when the view has no `reduce` field and `reduce` when the
% field holds a function source (a binary).
%
% Throws {not_found, Msg} (via get_view_def/2) when the view is not defined,
% and {error, Msg} when the `reduce` field is present but is not a string.
% The latter mirrors the error raised by reduce_function/2, so a malformed
% design document yields a readable message instead of a `case_clause` crash.
view_type(#doc{id = DDocId} = DDoc, ViewName) ->
    {ViewDef} = get_view_def(DDoc, ViewName),
    case get_value(<<"reduce">>, ViewDef) of
    undefined ->
        map;
    RedFun when is_binary(RedFun) ->
        reduce;
    _ ->
        InvalidMsg = io_lib:format("Reduce field for view `~s`, local "
            "design document `~s`, is not a string.",
            [ViewName, DDocId]),
        throw({error, iolist_to_binary(InvalidMsg)})
    end.
206
207
% Returns the source of the reduce function of a view, as found in the design
% document. Throws {error, Msg} when the `reduce` field is absent or is not a
% binary, and {not_found, Msg} (via get_view_def/2) when the view is missing.
reduce_function(#doc{id = DDocId} = DDoc, ViewName) ->
    {ViewProps} = get_view_def(DDoc, ViewName),
    RedSrc = couch_util:get_value(<<"reduce">>, ViewProps),
    case is_binary(RedSrc) of
    true ->
        RedSrc;
    false ->
        Msg = io_lib:format("Reduce field for view `~s`, local "
            "design document `~s`, is missing or is not a string.",
            [ViewName, DDocId]),
        throw({error, iolist_to_binary(Msg)})
    end.
219
220
% Looks up the definition of ViewName under the `views` member of the design
% document body. A {not_found, _} throw from the lookup is converted into a
% {not_found, Msg} throw naming the view and the design document.
get_view_def(#doc{body = Body, id = DDocId}, ViewName) ->
    Path = [<<"views">>, ViewName],
    try
        couch_util:get_nested_json_value(Body, Path)
    catch throw:{not_found, _} ->
        Msg = io_lib:format("View `~s` not defined in local "
            "design document `~s`.", [ViewName, DDocId]),
        throw({not_found, iolist_to_binary(Msg)})
    end.
229
230
% Builds the row ordering function used while merging.
%
% Without an explicit key list (Keys = nil) rows are ordered by the set view
% key comparison for the view type, negated for the `rev` direction.
%
% With a key list, rows with different keys are ordered by whichever key
% appears first in Keys (compared in JSON-encoded form); rows with equal keys
% fall back to the set view key comparison. The direction is not consulted.
-spec view_less_fun(fwd | rev, map | red_map | reduce, nil | list()) ->
                           fun((term(), term()) -> boolean()).
view_less_fun(Dir, ViewType, nil) ->
    KeyLess = case ViewType of
    reduce ->
        fun couch_set_view:reduce_view_key_compare/2;
    _ ->
        fun couch_set_view:map_view_key_compare/2
    end,
    % Rows are compared on their first element only.
    RowLess = fun(RowA, RowB) ->
        KeyLess(element(1, RowA), element(1, RowB))
    end,
    case Dir of
    fwd ->
        RowLess;
    rev ->
        fun(RowA, RowB) -> not RowLess(RowA, RowB) end
    end;
view_less_fun(_Dir, ViewType, Keys) ->
    KeysEncoded = lists:map(fun(K) -> ?JSON_ENCODE(K) end, Keys),
    case ViewType of
    reduce ->
        fun(RowA, RowB) ->
            % The row tuple can have 2 or 3 elements
            {json, KeyA} = RowKeyA = element(1, RowA),
            {json, KeyB} = RowKeyB = element(1, RowB),
            if
                KeyA =:= KeyB ->
                    couch_set_view:reduce_view_key_compare(RowKeyA, RowKeyB);
                true ->
                    first_key_matches(KeysEncoded, KeyA, KeyB)
            end
        end;
    _ ->
        fun({{{json, KeyA}, _} = KeyDocIdA, _},
            {{{json, KeyB}, _} = KeyDocIdB, _}) ->
            if
                KeyA =:= KeyB ->
                    couch_set_view:map_view_key_compare(KeyDocIdA, KeyDocIdB);
                true ->
                    first_key_matches(KeysEncoded, KeyA, KeyB)
            end
        end
    end.
272
273
% Walks the list of encoded keys and reports which of KeyA / KeyB is met
% first: `true` when it is KeyA, `false` when it is KeyB (KeyA wins a tie).
%
% There is deliberately no clause for the empty list: result keys always
% match one of the supplied keys, so the list is never exhausted.
-spec first_key_matches([binary()], binary(), binary()) -> boolean().
first_key_matches([Key | Rest], KeyA, KeyB) ->
    if
        Key =:= KeyA ->
            true;
        Key =:= KeyB ->
            false;
        true ->
            first_key_matches(Rest, KeyA, KeyB)
    end.
283
284
% Renders a map view row as a JSON object binary. The clause chosen depends
% on the shape of the row and on whether the query runs in debug mode.

% Optimized path, row assembled by couch_http_view_streamer
view_row_obj_map({_KeyDocId, {row_json, RowJson}}, _Debug) ->
    RowJson;

% Row from local node, query with ?debug=true
view_row_obj_map({{Key, DocId}, {PartId, Value}}, true) when is_integer(PartId) ->
    {json, RawValue} = Value,
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    PartBin = ?l2b(integer_to_list(PartId)),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"partition\":", PartBin/binary,
      ",\"node\":\"", (?LOCAL)/binary, "\"",
      ",\"value\":", RawValue/binary, "}">>;

% Row from remote node, using Erlang based stream JSON parser, query with ?debug=true
view_row_obj_map({{Key, DocId}, {PartId, Node, Value}}, true) when is_integer(PartId) ->
    {json, RawValue} = Value,
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    PartBin = ?l2b(integer_to_list(PartId)),
    NodeJson = ?JSON_ENCODE(Node),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"partition\":", PartBin/binary,
      ",\"node\":", NodeJson/binary,
      ",\"value\":", RawValue/binary, "}">>;

% Row from local node, query with ?debug=false
view_row_obj_map({{Key, DocId}, {PartId, Value}}, false) when is_integer(PartId) ->
    {json, RawValue} = Value,
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"value\":", RawValue/binary, "}">>;

% Row from local node, old couchdb views
view_row_obj_map({{Key, DocId}, Value}, _DebugMode) ->
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    ValueJson = ?JSON_ENCODE(Value),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"value\":", ValueJson/binary, "}">>;

% Row from local node, old couchdb views (no partition id), with the document
view_row_obj_map({{Key, DocId}, Value, Doc}, _DebugMode) ->
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Key),
    ValueJson = ?JSON_ENCODE(Value),
    DocJson = ?JSON_ENCODE(Doc),
    <<"{\"id\":", IdJson/binary,
      ",\"key\":", KeyJson/binary,
      ",\"value\":", ValueJson/binary,
      ",\"doc\":", DocJson/binary, "}">>.
326
% Renders a reduce view row as a JSON object binary.

% Optimized path, reduce row assembled by couch_http_view_streamer
view_row_obj_reduce({_Key, {row_json, RowJson}, _ValueJson}, _DebugMode) ->
    RowJson;
% Reduce row from local node
view_row_obj_reduce({Key, Value}, _DebugMode) ->
    KeyJson = ?JSON_ENCODE(Key),
    ValueJson = ?JSON_ENCODE(Value),
    <<"{\"key\":", KeyJson/binary,
      ",\"value\":", ValueJson/binary, "}">>.
334
335
% Merge loop for map views.
%
% - limit 0: delegates to couch_index_merger:merge_indexes_no_limit/1.
% - empty row accumulator: asks couch_index_merger for the next minimum row
%   (processed by merge_map_min_row/2) and loops while it keeps returning
%   {params, NewParams}; any other result is returned as is.
% - otherwise: lets couch_index_merger:handle_skip/1 process the pending
%   accumulator and loops.
merge_map_views(#merge_params{limit = 0} = Params) ->
    couch_index_merger:merge_indexes_no_limit(Params);

merge_map_views(#merge_params{row_acc = []} = Params) ->
    MinRowFun = fun merge_map_min_row/2,
    case couch_index_merger:merge_indexes_no_acc(Params, MinRowFun) of
    {params, NextParams} ->
        merge_map_views(NextParams);
    Result ->
        Result
    end;

merge_map_views(Params) ->
    merge_map_views(couch_index_merger:handle_skip(Params)).
351
352
% Flushes the merger queue, stores MinRow as the only accumulated row and
% hands the parameters to couch_index_merger:handle_skip/1.
% A new Params record is returned
merge_map_min_row(Params, MinRow) ->
    Queue = Params#merge_params.queue,
    ok = couch_view_merger_queue:flush(Queue),
    WithMinRow = Params#merge_params{row_acc = [MinRow]},
    couch_index_merger:handle_skip(WithMinRow).
357
358
% Merge loop for reduce views.
%
% With limit 0 the work is delegated to
% couch_index_merger:merge_indexes_no_limit/1. Otherwise the next minimum row
% is processed by merge_reduce_min_row/2 and the loop continues for as long as
% {params, NewParams} is returned; any other result is returned as is.
merge_reduce_views(#merge_params{limit = 0} = Params) ->
    couch_index_merger:merge_indexes_no_limit(Params);

merge_reduce_views(Params) ->
    MinRowFun = fun merge_reduce_min_row/2,
    case couch_index_merger:merge_indexes_no_acc(Params, MinRowFun) of
    {params, NextParams} ->
        merge_reduce_views(NextParams);
    Result ->
        Result
    end.
370
% Processes the minimum row of a reduce merge.
%
% All queued rows sharing the key of MinRow are grouped; a group of more than
% one row is rereduced into a single row. The resulting row is handed to the
% collector unless it is being skipped.
%
% Returns one of:
%   revision_mismatch   - a revision mismatch marker was found in the queue
%   {stop, Resp}        - the collector stopped after a rereduce error
%   #merge_params{}     - updated skip, limit and collector
merge_reduce_min_row(Params, MinRow) ->
    #merge_params{
        queue = Queue,
        limit = Limit,
        skip = Skip,
        collector = Col
    } = Params,
    case group_keys_for_rereduce(Queue, [MinRow]) of
    revision_mismatch ->
        revision_mismatch;
    RowGroup ->
        ok = couch_view_merger_queue:flush(Queue),
        {Row, Col2} = case RowGroup of
        [OnlyRow] ->
            % A single row for this key needs no rereduce.
            {{row, OnlyRow}, Col};
        [FirstRow, _ | _] ->
            try
                RedVal = rereduce(RowGroup, Params),
                {{row, {element(1, FirstRow), {json, RedVal}}}, Col}
            catch
            _Tag:Error ->
                Stack = erlang:get_stacktrace(),
                ?LOG_ERROR("Caught unexpected error while "
                           "merging reduce view: ~p~n~s", [Error, ?LOG_USERDATA(Stack)]),
                on_rereduce_error(Col, Error)
            end
        end,
        case Row of
        {stop, _Resp} ->
            Row;
        _ ->
            {Limit2, Col3} = case {Skip > 0, Row} of
            {true, _} ->
                % Row is skipped: neither emitted nor counted against limit.
                {Limit, Col2};
            {false, {row, _}} ->
                {ok, NextCol} = Col2(Row),
                {couch_index_merger:dec_counter(Limit), NextCol};
            {false, _} ->
                % Rereduce failed and the error went to the collector already.
                {couch_index_merger:dec_counter(Limit), Col2}
            end,
            Params#merge_params{
                skip = couch_index_merger:dec_counter(Skip),
                limit = Limit2,
                collector = Col3
            }
        end
    end.
417
418
% Reports a rereduce failure to the collector. When the collector answers
% {stop, Resp}, returns {{stop, Resp}, undefined}; any other answer is
% returned unchanged.
on_rereduce_error(Col, Error) ->
    Answer = Col(reduce_error(Error)),
    case Answer of
    {stop, _Resp} ->
        {Answer, undefined};
    _ ->
        Answer
    end.
426
% Wraps an error term into {error, ?LOCAL, ReasonBinary}. The reason of an
% {error, Reason} tuple is unwrapped first; any other term is used whole.
reduce_error(Error) ->
    Reason = case Error of
    {error, Inner} ->
        Inner;
    _ ->
        Error
    end,
    {error, ?LOCAL, couch_util:to_binary(Reason)}.
431
432
% Pops rows from the queue for as long as their key (first tuple element)
% equals the key of the row at the head of Acc, prepending each to Acc.
%
% Returns the accumulated rows once the queue is empty or its next row has a
% different key, or `revision_mismatch` when that marker is the next item.
group_keys_for_rereduce(Queue, [HeadRow | _] = Acc) ->
    Key = element(1, HeadRow),
    case couch_view_merger_queue:peek(Queue) of
    {ok, revision_mismatch} ->
        revision_mismatch;
    {ok, Next} when element(1, Next) == Key ->
        % Same key: take the row off the queue and keep grouping.
        {ok, Next} = couch_view_merger_queue:pop_next(Queue),
        group_keys_for_rereduce(Queue, [Next | Acc]);
    {ok, _} ->
        Acc;
    empty ->
        Acc
    end.
446
447
% Rereduces a group of rows sharing the same key and returns the new value.
%
% First clause: the reduce source starts with `_`, so the work is done by
% couch_set_view_mapreduce:builtin_reduce/3 on {Key, ValueJson} pairs.
%
% Second clause: any other source is run through a mapreduce reduce context,
% which is created on first use and cached in the process dictionary under
% `reduce_context`. A failed rereduce is thrown.
rereduce(Reds0, #merge_params{extra = #view_merge{rereduce_fun = <<"_", _/binary>> = FunSrc}}) ->
    ToKeyValue =
        fun({Key, _RowJson, {value_json, ValueJson}}) ->
            {Key, ValueJson};
        ({Key, {json, ValueJson}}) ->
            {Key, ValueJson};
        ({Key, Ejson}) ->
            {Key, ?JSON_ENCODE(Ejson)}
        end,
    KeyValues = lists:map(ToKeyValue, Reds0),
    {ok, [Value]} = couch_set_view_mapreduce:builtin_reduce(
        rereduce, [FunSrc], KeyValues),
    Value;

rereduce(Rows, #merge_params{extra = #view_merge{rereduce_fun = FunSrc}}) ->
    ToValue =
        fun({_Key, _RowJson, {value_json, ValueJson}}) ->
            ValueJson;
        ({_Key, {json, ValueJson}}) ->
            ValueJson;
        ({_Key, Val}) ->
            ?JSON_ENCODE(Val)
        end,
    Values = lists:map(ToValue, Rows),
    Ctx = case erlang:get(reduce_context) of
    undefined ->
        {ok, NewCtx} = mapreduce:start_reduce_context([FunSrc]),
        erlang:put(reduce_context, NewCtx),
        NewCtx;
    CachedCtx ->
        CachedCtx
    end,
    case mapreduce:rereduce(Ctx, 1, Values) of
    {ok, Value} ->
        Value;
    Error ->
        throw(Error)
    end.
482
% Fetches a set view through GetSetViewFn.
%
% For `stale = false` requests the group request is issued with
% `update_stats = false`, and the group access stat is incremented here, but
% only when the reply reports no missing partitions. All other requests are
% passed to GetSetViewFn unchanged.
get_set_view(GetSetViewFn, SetName, DDoc, ViewName,
        #set_view_group_req{stale = false} = ViewGroupReq) ->
    NoStatsReq = ViewGroupReq#set_view_group_req{update_stats = false},
    Reply = GetSetViewFn(SetName, DDoc, ViewName, NoStatsReq),
    case Reply of
    {ok, _View, Group, []} ->
        couch_set_view:inc_group_access_stat(Group),
        Reply;
    _ ->
        Reply
    end;

get_set_view(GetSetViewFn, SetName, DDoc, ViewName, ViewGroupReq) ->
    GetSetViewFn(SetName, DDoc, ViewName, ViewGroupReq).
496
% Obtains the view and group needed to serve a set view query.
%
% Returns:
%   {View, Group} - the view is available and no partitions are missing
%   not_found     - GetSetViewFn reported {not_found, missing_named_view}
%   error         - partitions are missing or an exception was raised; in
%                   both cases the problem has been queued and the queue
%                   marked as done, so the caller has nothing left to report
prepare_set_view(ViewSpec, ViewGroupReq, DDoc, Queue, GetSetViewFn) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId,
        view_name = ViewName
    } = ViewSpec,
    try
        Reply = get_set_view(
            GetSetViewFn, SetName, DDoc, ViewName, ViewGroupReq),
        case Reply of
        {ok, View, Group, []} ->
            {View, Group};
        {ok, _, Group, MissingPartitions} ->
            ?LOG_INFO("Set view `~s`, group `~s`, missing partitions: ~w",
                      [?LOG_USERDATA(SetName), ?LOG_USERDATA(DDocId), MissingPartitions]),
            couch_set_view:release_group(Group),
            couch_view_merger_queue:queue(Queue, set_view_outdated),
            couch_view_merger_queue:done(Queue),
            error;
        {not_found, missing_named_view} ->
            not_found
        end
    catch _:Error ->
        % Any failure (including an unexpected reply shape) is reported
        % through the queue rather than propagated.
        couch_view_merger_queue:queue(
            Queue, queue_get_view_group_error(Error, SetName, DDocId)),
        couch_view_merger_queue:done(Queue),
        error
    end.
523
524
% Converts an exception caught while fetching a view group into the
% {error, ?LOCAL, Reason} item that gets queued. One or two levels of
% {error, _} wrapping are removed, and `view_undefined` is replaced by a
% readable message.
queue_get_view_group_error(Error, SetName, DDocId) ->
    Reason = case Error of
    {error, {error, Inner}} ->
        Inner;
    {error, Inner} ->
        Inner;
    view_undefined ->
        view_undefined_msg(SetName, DDocId);
    _ ->
        Error
    end,
    {error, ?LOCAL, Reason}.
533
534
% Folder entry point for map views. Only set view specs are accepted; the
% database handle and user context arguments are not used.
map_view_folder(_Db, ViewSpec, MergeParams, _UserCtx, DDoc, Queue)
        when is_record(ViewSpec, set_view_spec) ->
    map_set_view_folder(ViewSpec, MergeParams, DDoc, Queue).
537
538
% Folds a local set view for a map query and pushes the results to Queue.
%
% When the view spec already carries a view and a group they are used
% directly. Otherwise the map view is requested; if it is not found, the
% reduce view is requested instead (with `update_stats = false`) and its map
% view extracted.
%
% On success the row count is queued, followed by every row of the fold. If
% the design document revision check fails, `revision_mismatch` is queued
% instead. Errors are queued as {error, ?LOCAL, Reason}. The group is always
% released and the queue always marked as done once the fold was attempted.
%
% NOTE(review): a `not_found` result from the second prepare_set_view/5 call
% is not matched below and would raise `case_clause` - confirm whether that
% can happen in practice.
map_set_view_folder(ViewSpec, MergeParams, DDoc, Queue) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId,
        partitions = WantedPartitions0
    } = ViewSpec,
    #index_merge{http_params = ViewArgs} = MergeParams,
    #view_query_args{
        stale = Stale,
        debug = Debug,
        type = IndexType
    } = ViewArgs,
    DDocDbName = ?master_dbname(SetName),

    % DebugGroupReq is the group request passed on to queue_debug_info/4.
    {PrepareResult, DebugGroupReq} = case ViewSpec of
    #set_view_spec{view = ReadyView, group = ReadyGroup}
            when ReadyView =/= nil, ReadyGroup =/= nil ->
        {{ReadyView, ReadyGroup}, nil};
    _ ->
        WantedPartitions = case IndexType of
        main ->
            WantedPartitions0;
        replica ->
            []
        end,
        MapGroupReq = #set_view_group_req{
            stale = Stale,
            update_stats = true,
            wanted_partitions = WantedPartitions,
            debug = Debug,
            type = IndexType
        },
        case prepare_set_view(
            ViewSpec, MapGroupReq, DDoc, Queue, fun couch_set_view:get_map_view/4) of
        not_found ->
            RedGroupReq = MapGroupReq#set_view_group_req{
                update_stats = false
            },
            RedResult = case prepare_set_view(
                ViewSpec, RedGroupReq, DDoc, Queue, fun couch_set_view:get_reduce_view/4) of
            {RedView, RedGroup} ->
                {couch_set_view:extract_map_view(RedView), RedGroup};
            OtherRed ->
                OtherRed
            end,
            {RedResult, RedGroupReq};
        MapResult ->
            {MapResult, MapGroupReq}
        end
    end,

    case PrepareResult of
    error ->
        %%  handled by prepare_set_view
        ok;
    {View, Group} ->
        queue_debug_info(Debug, Group, DebugGroupReq, Queue),
        try
            FoldFun = make_map_set_fold_fun(Queue),
            RevisionOk =
                (not couch_index_merger:should_check_rev(MergeParams, DDoc))
                orelse couch_index_merger:ddoc_unchanged(DDocDbName, DDoc),
            case RevisionOk of
            false ->
                ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
            true ->
                RowCount = couch_set_view:get_row_count(Group, View),
                ok = couch_view_merger_queue:queue(Queue, {row_count, RowCount}),
                {ok, _, _} = couch_set_view:fold(Group, View, FoldFun, [], ViewArgs)
            end
        catch
        throw:ddoc_db_not_found ->
            NotFoundMsg = couch_index_merger:ddoc_not_found_msg(DDocDbName, DDocId),
            ok = couch_view_merger_queue:queue(
                Queue, {error, ?LOCAL, NotFoundMsg});
        throw:queue_shutdown ->
            % The merger process shut down our queue because the limit was
            % reached. This is expected, so don't log an unnecessary error
            % message and stack trace.
            ok;
        _Tag:Error ->
            Stack = erlang:get_stacktrace(),
            ?LOG_ERROR("Caught unexpected error "
                       "while serving view query ~s/~s: ~p~n~s",
                       [?LOG_USERDATA(SetName), ?LOG_USERDATA(DDocId), Error, ?LOG_USERDATA(Stack)]),
            couch_view_merger_queue:queue(Queue, {error, ?LOCAL, to_binary(Error)})
        after
            couch_set_view:release_group(Group),
            ok = couch_view_merger_queue:done(Queue)
        end
    end.
631
632
% First state of the streaming parser for an HTTP view response: consumes the
% opening `object_start` event. Map style responses continue by looking for
% the row count, reduce responses go straight to looking for the rows.
http_view_fold(object_start, ViewType, Queue)
        when ViewType =:= map; ViewType =:= red_map ->
    fun(Ev) -> http_view_fold_rc_1(Ev, Queue) end;
http_view_fold(object_start, reduce, Queue) ->
    fun(Ev) -> http_view_fold_rows_1(Ev, Queue) end.
639
% Parser state that waits for the `total_rows` key. A `debug_info` object met
% on the way is collected by http_view_fold_debug_info/4, which comes back to
% this state afterwards. Every other event is ignored.
http_view_fold_rc_1(Event, Queue) ->
    case Event of
    {key, <<"total_rows">>} ->
        fun(Ev) -> http_view_fold_rc_2(Ev, Queue) end;
    {key, <<"debug_info">>} ->
        fun(object_start) ->
            fun(Ev) ->
                http_view_fold_debug_info(Ev, Queue, [], fun http_view_fold_rc_1/2)
            end
        end;
    _ ->
        fun(Ev) -> http_view_fold_rc_1(Ev, Queue) end
    end.
650
% Parser state that receives the numeric value of `total_rows`, queues it as
% {row_count, N} and moves on to looking for the rows.
http_view_fold_rc_2(RowCount, Queue) when is_number(RowCount) ->
    Item = {row_count, RowCount},
    ok = couch_view_merger_queue:queue(Queue, Item),
    fun(Ev) -> http_view_fold_rows_1(Ev, Queue) end.
654
% Parser state that waits for the `rows` key and then expects the start of
% the rows array. A `debug_info` object met on the way is collected by
% http_view_fold_debug_info/4, which comes back to this state afterwards.
% Every other event is ignored.
http_view_fold_rows_1(Event, Queue) ->
    case Event of
    {key, <<"rows">>} ->
        fun(array_start) ->
            fun(Ev) -> http_view_fold_rows_2(Ev, Queue) end
        end;
    {key, <<"debug_info">>} ->
        fun(object_start) ->
            fun(Ev) ->
                http_view_fold_debug_info(Ev, Queue, [], fun http_view_fold_rows_1/2)
            end
        end;
    _ ->
        fun(Ev) -> http_view_fold_rows_1(Ev, Queue) end
    end.
665
% Parser state inside the rows array. Each row object is collected in full,
% queued, and the state repeats; the end of the array leads to the state
% handling what follows the rows.
http_view_fold_rows_2(array_end, Queue) ->
    fun(Ev) -> http_view_fold_extra(Ev, Queue) end;
http_view_fold_rows_2(object_start, Queue) ->
    OnRow = fun(Row) ->
        http_view_fold_queue_row(Row, Queue),
        fun(NextEv) -> http_view_fold_rows_2(NextEv, Queue) end
    end,
    fun(Ev) -> json_stream_parse:collect_object(Ev, OnRow) end.
677
% Parser state after the rows array. An `errors` key starts the processing of
% the errors array; on any other event the remaining events are handed to
% couch_index_merger:void_event/1.
http_view_fold_extra(Event, Queue) ->
    case Event of
    {key, <<"errors">>} ->
        fun(array_start) ->
            fun(Ev) -> http_view_fold_errors(Ev, Queue) end
        end;
    _ ->
        fun couch_index_merger:void_event/1
    end.
682
% Parser state inside the errors array. Each error object is collected in
% full, queued, and the state repeats; after the end of the array the
% remaining events are handed to couch_index_merger:void_event/1.
http_view_fold_errors(array_end, _Queue) ->
    fun couch_index_merger:void_event/1;
http_view_fold_errors(object_start, Queue) ->
    OnError = fun(Error) ->
        http_view_fold_queue_error(Error, Queue),
        fun(NextEv) -> http_view_fold_errors(NextEv, Queue) end
    end,
    fun(Ev) -> json_stream_parse:collect_object(Ev, OnError) end.
694
% Parser state inside a `debug_info` object.
%
% Each member (a key followed by an object) is collected and accumulated as
% {Key, Object}. At the end of the object a {debug_info, FromUrl, Info} item
% is queued - Info being the single ?LOCAL entry unwrapped, or otherwise all
% entries in their original order - and parsing resumes with RetFun.
http_view_fold_debug_info({key, Key}, Queue, Acc, RetFun) ->
    OnObject = fun(DebugInfo) ->
        NewAcc = [{Key, DebugInfo} | Acc],
        fun(NextEv) ->
            http_view_fold_debug_info(NextEv, Queue, NewAcc, RetFun)
        end
    end,
    fun(object_start) ->
        fun(Ev) -> json_stream_parse:collect_object(Ev, OnObject) end
    end;
http_view_fold_debug_info(object_end, Queue, Acc, RetFun) ->
    Info = case Acc of
    [{?LOCAL, LocalInfo}] ->
        LocalInfo;
    _ ->
        {lists:reverse(Acc)}
    end,
    FromUrl = erlang:get(from_url),
    ok = couch_view_merger_queue:queue(Queue, {debug_info, FromUrl, Info}),
    fun(NextEv) -> RetFun(NextEv, Queue) end.
715
716
% Queues an error object of an HTTP view response as
% {error, FromUrl, Reason}. FromUrl is read from the process dictionary and
% Reason defaults to `null` when the object has no `reason` member.
http_view_fold_queue_error({Props}, Queue) ->
    Reason = couch_util:get_value(<<"reason">>, Props, null),
    FromUrl = erlang:get(from_url),
    ok = couch_view_merger_queue:queue(Queue, {error, FromUrl, Reason}).
720
721
% Converts a row object of an HTTP view response into the merger's internal
% row format and queues it.
%
% Queued shapes:
%   {Key, Val}                - reduce row (the object has no `id`)
%   {{Key, Id}, Value}        - map row
%   {{Key, Id}, Value, Doc}   - map row carrying a `doc` member
%   {{Key, error}, Error}     - row carrying an `error` member
% Key and Val are {json, EncodedBinary}; Value is Val, or
% {PartId, FromUrl, Val} when the row has a `partition` member.
http_view_fold_queue_row({Props}, Queue) ->
    Key = {json, ?JSON_ENCODE(couch_util:get_value(<<"key">>, Props, null))},
    Id = couch_util:get_value(<<"id">>, Props, nil),
    Val = {json, ?JSON_ENCODE(couch_util:get_value(<<"value">>, Props))},
    Value = case couch_util:get_value(<<"partition">>, Props, nil) of
    nil ->
        Val;
    PartId ->
        % we're in debug mode, add node info
        {PartId, erlang:get(from_url), Val}
    end,
    Row = case {couch_util:get_value(<<"error">>, Props, nil), Id} of
    {nil, nil} ->
        % reduce row
        {Key, Val};
    {nil, _} ->
        % map row
        case couch_util:get_value(<<"doc">>, Props, nil) of
        nil ->
            {{Key, Id}, Value};
        Doc ->
            {{Key, Id}, Value, Doc}
        end;
    {Error, _} ->
        % error in a map row
        {{Key, error}, Error}
    end,
    ok = couch_view_merger_queue:queue(Queue, Row).
753
% Folder entry point for reduce views. Only set view specs are accepted; the
% database handle and user context arguments are not used.
reduce_view_folder(_Db, ViewSpec, MergeParams, _UserCtx, DDoc, Queue)
        when is_record(ViewSpec, set_view_spec) ->
    reduce_set_view_folder(ViewSpec, MergeParams, DDoc, Queue).
757
758
% Folds a local set view for a reduce query and pushes the results to Queue.
%
% When the view spec already carries a view and a group they are used
% directly; otherwise the reduce view is requested through
% prepare_set_view/5.
%
% On success every {GroupedKey, Reduction} pair of the fold is queued. If the
% design document revision check fails, `revision_mismatch` is queued
% instead. Errors are queued as {error, ?LOCAL, Reason}. The group is always
% released and the queue always marked as done once the fold was attempted.
%
% NOTE(review): a `not_found` result from prepare_set_view/5 is not matched
% below and would raise `case_clause` - confirm whether that can happen in
% practice.
reduce_set_view_folder(ViewSpec, MergeParams, DDoc, Queue) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId,
        partitions = WantedPartitions0
    } = ViewSpec,
    #index_merge{http_params = ViewArgs} = MergeParams,
    #view_query_args{
        stale = Stale,
        debug = Debug,
        type = IndexType
    } = ViewArgs,
    DDocDbName = ?master_dbname(SetName),

    % DebugGroupReq is the group request passed on to queue_debug_info/4.
    {PrepareResult, DebugGroupReq} = case ViewSpec of
    #set_view_spec{view = ReadyView, group = ReadyGroup}
            when ReadyView =/= nil, ReadyGroup =/= nil ->
        {{ReadyView, ReadyGroup}, nil};
    _ ->
        WantedPartitions = case IndexType of
        main ->
            WantedPartitions0;
        replica ->
            []
        end,
        GroupReq = #set_view_group_req{
            stale = Stale,
            update_stats = true,
            wanted_partitions = WantedPartitions,
            debug = Debug,
            type = IndexType
        },
        Prepared = prepare_set_view(
            ViewSpec, GroupReq, DDoc, Queue, fun couch_set_view:get_reduce_view/4),
        {Prepared, GroupReq}
    end,

    case PrepareResult of
    error ->
        %%  handled by prepare_set_view
        ok;
    {View, Group} ->
        queue_debug_info(Debug, Group, DebugGroupReq, Queue),
        try
            FoldFun = fun(GroupedKey, Red, Acc) ->
                ok = couch_view_merger_queue:queue(Queue, {GroupedKey, Red}),
                {ok, Acc}
            end,
            RevisionOk =
                (not couch_index_merger:should_check_rev(MergeParams, DDoc))
                orelse couch_index_merger:ddoc_unchanged(DDocDbName, DDoc),
            case RevisionOk of
            false ->
                ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
            true ->
                {ok, _} = couch_set_view:fold_reduce(Group, View, FoldFun, [], ViewArgs)
            end
        catch
        throw:ddoc_db_not_found ->
            NotFoundMsg = couch_index_merger:ddoc_not_found_msg(DDocDbName, DDocId),
            ok = couch_view_merger_queue:queue(
                Queue, {error, ?LOCAL, NotFoundMsg});
        throw:queue_shutdown ->
            % The merger process shut down our queue because the limit was
            % reached. This is expected, so don't log an unnecessary error
            % message and stack trace.
            ok;
        _Tag:Error ->
            Stack = erlang:get_stacktrace(),
            ?LOG_ERROR("Caught unexpected error "
                       "while serving view query ~s/~s: ~p~n~s",
                       [?LOG_USERDATA(SetName), ?LOG_USERDATA(DDocId), Error, ?LOG_USERDATA(Stack)]),
            couch_view_merger_queue:queue(Queue, {error, ?LOCAL, to_binary(Error)})
        after
            couch_set_view:release_group(Group),
            ok = couch_view_merger_queue:done(Queue)
        end
    end.
836
837
% Returns the fold function used for map set views: every key/value row is
% queued and the accumulator is passed through unchanged. The second argument
% of the fold function is ignored.
make_map_set_fold_fun(Queue) ->
    Enqueue = fun(Row) ->
        ok = couch_view_merger_queue:queue(Queue, Row)
    end,
    fun(Row, _Ignored, Acc) ->
        Enqueue(Row),
        {ok, Acc}
    end.
843
844
% Builds, as a binary, the message reported when a set view is not defined
% for a design document.
view_undefined_msg(SetName, DDocId) ->
    iolist_to_binary(io_lib:format(
        "Undefined set view `~s` for `~s` design document.",
        [SetName, DDocId])).
849
% callback!
% Serializes the view query arguments and merge parameters into a URL
% query string ("?a=b&c=d").
%
% A parameter is only emitted when its value differs from its default
% (the defaults of #view_query_args{}, ?DEFAULT_STALENESS for `stale` and
% ?ON_ERROR_DEFAULT for `on_error`). `connection_timeout` is always
% emitted. No `skip` parameter is produced; instead `limit` is sent as
% Limit + Skip.
% NOTE(review): presumably the skip is applied by the node doing the
% merge, which is why remote nodes are asked for Limit + Skip rows -
% confirm against the merger code.
view_qs(ViewArgs, MergeParams) ->
    Defaults = #view_query_args{},
    #view_query_args{
        start_key = StartKey, end_key = EndKey,
        start_docid = StartDocId, end_docid = EndDocId,
        direction = Dir,
        inclusive_end = IncEnd,
        group_level = GroupLevel,
        view_type = ViewType,
        conflicts = Conflicts,
        stale = Stale,
        limit = Limit,
        debug = Debug,
        filter = Filter,
        type = IndexType,
        skip = Skip
    } = ViewArgs,
    #index_merge{
        on_error = OnError,
        conn_timeout = Timeout
    } = MergeParams,

    % Yields [] when the value is exactly the default, otherwise a
    % single-element list with the lazily built "name=value" string.
    Param = fun
        (Value, Value, _MakeParam) ->
            [];
        (_Value, _Default, MakeParam) ->
            [MakeParam()]
    end,
    % Same as above, for parameters whose value is an atom.
    AtomParam = fun(Prefix, Value, Default) ->
        Param(Value, Default, fun() -> Prefix ++ atom_to_list(Value) end)
    end,

    StartKeyQs = Param(StartKey, Defaults#view_query_args.start_key,
        fun() -> "startkey=" ++ json_qs_val(StartKey) end),
    EndKeyQs = Param(EndKey, Defaults#view_query_args.end_key,
        fun() -> "endkey=" ++ json_qs_val(EndKey) end),
    DocIdQs = case Dir of
    fwd ->
        Param(StartDocId, Defaults#view_query_args.start_docid,
            fun() -> "startkey_docid=" ++ qs_val(StartDocId) end) ++
        Param(EndDocId, Defaults#view_query_args.end_docid,
            fun() -> "endkey_docid=" ++ qs_val(EndDocId) end);
    rev ->
        % For descending queries the min/max doc id sentinels are swapped
        % before being compared against the (forward) defaults.
        RevStartDocId = reverse_key_default(StartDocId),
        RevEndDocId = reverse_key_default(EndDocId),
        ["descending=true"] ++
        Param(RevStartDocId, Defaults#view_query_args.start_docid,
            fun() -> "startkey_docid=" ++ qs_val(RevStartDocId) end) ++
        Param(RevEndDocId, Defaults#view_query_args.end_docid,
            fun() -> "endkey_docid=" ++ qs_val(RevEndDocId) end)
    end,
    IncEndQs = AtomParam(
        "inclusive_end=", IncEnd, Defaults#view_query_args.inclusive_end),
    GroupQs = Param(GroupLevel, Defaults#view_query_args.group_level,
        fun() ->
            case GroupLevel of
            exact ->
                "group=true";
            _ when is_number(GroupLevel) ->
                "group_level=" ++ integer_to_list(GroupLevel)
            end
        end),
    ReduceQs = case ViewType of
    red_map ->
        ["reduce=false"];
    _ ->
        []
    end,
    ConflictsQs = AtomParam(
        "conflicts=", Conflicts, Defaults#view_query_args.conflicts),
    % stale is compared against this module's own default rather than
    % the record's default
    StaleQs = AtomParam("stale=", Stale, ?DEFAULT_STALENESS),
    OnErrorQs = AtomParam("on_error=", OnError, ?ON_ERROR_DEFAULT),
    LimitQs = Param(Limit, Defaults#view_query_args.limit,
        fun() -> "limit=" ++ integer_to_list(Limit + Skip) end),
    DebugQs = AtomParam("debug=", Debug, Defaults#view_query_args.debug),
    FilterQs = AtomParam("_filter=", Filter, Defaults#view_query_args.filter),
    TypeQs = AtomParam("_type=", IndexType, Defaults#view_query_args.type),
    TimeoutQs = ["connection_timeout=" ++ integer_to_list(Timeout)],

    QsList = lists:append([
        StartKeyQs, EndKeyQs, DocIdQs, IncEndQs, GroupQs, ReduceQs,
        ConflictsQs, StaleQs, OnErrorQs, LimitQs, DebugQs, FilterQs,
        TypeQs, TimeoutQs
    ]),

    case QsList of
    [] ->
        [];
    _ ->
        "?" ++ string:join(QsList, "&")
    end.
990
% JSON-encodes `Value` and passes the resulting string through
% couch_httpd:quote/1 so it can be used as a query string value.
json_qs_val(Value) ->
    Json = iolist_to_binary(?JSON_ENCODE(Value)),
    couch_httpd:quote(?b2l(Json)).
993
% Converts `Value` to a list with couch_util:to_list/1 and passes it
% through couch_httpd:quote/1 so it can be used as a query string value.
qs_val(Value) ->
    AsList = couch_util:to_list(Value),
    couch_httpd:quote(AsList).
996
% Swaps the ?MIN_STR and ?MAX_STR sentinels with each other; any other
% key is returned unchanged. Used by view_qs/2 for descending queries.
reverse_key_default(Key) ->
    case Key of
    ?MIN_STR ->
        ?MAX_STR;
    ?MAX_STR ->
        ?MIN_STR;
    _ ->
        Key
    end.
1000
1001
% Computes the debug information of `Group` (see debug_info/3) and, when
% there is any, pushes it into the merger queue `Queue`. Returns `ok` in
% both cases.
queue_debug_info(Debug, Group, GroupReq, Queue) ->
    MaybeInfo = debug_info(Debug, Group, GroupReq),
    case MaybeInfo of
    nil ->
        ok;
    _ ->
        ok = couch_view_merger_queue:queue(Queue, MaybeInfo)
    end.
1009
% Builds the debug information item of a set view group.
%
% Returns `nil` when debugging is disabled. Otherwise returns
% `{debug_info, ?LOCAL, EJson}` where EJson always has a "main_group"
% object and, when the group has a replica group, a "replica_group"
% object as well. `GroupReq` is either `nil` or the #set_view_group_req{}
% used to open the group; it only contributes "wanted_partitions".
%
% NOTE(review): the key "unindexeable_seqs" is misspelled (the replica
% variant uses "replica_unindexable_seqs"); it is left as is because it
% is part of the emitted JSON.
debug_info(false, _Group, _GroupReq) ->
    nil;
debug_info(true, #set_view_group{} = Group, GroupReq) ->
    #set_view_debug_info{
        original_abitmask = OrigAbitmask,
        original_pbitmask = OrigPbitmask,
        stats = GroupStats,
        replica_partitions = ReplicaParts,
        wanted_seqs = RawWantedSeqs
    } = Group#set_view_group.debug_info,
    OrigActive = couch_set_view_util:decode_bitmask(OrigAbitmask),
    CurActive = couch_set_view_util:decode_bitmask(?set_abitmask(Group)),
    OrigPassive = couch_set_view_util:decode_bitmask(OrigPbitmask),
    CurPassive = couch_set_view_util:decode_bitmask(?set_pbitmask(Group)),
    Cleanup = couch_set_view_util:decode_bitmask(?set_cbitmask(Group)),
    % Partition IDs become 0 padded binaries so that a pretty printed JSON
    % can sanely sort the keys.
    PadPartIds = fun(Seqs) ->
        [{?l2b(io_lib:format("~4..0b", [PartId])), Seq} || {PartId, Seq} <- Seqs]
    end,
    IndexableSeqs = PadPartIds(?set_seqs(Group)),
    UnindexableSeqs = PadPartIds(?set_unindexable_seqs(Group)),
    WantedSeqs = PadPartIds(RawWantedSeqs),
    MainInfo = [
        {<<"active_partitions">>, ordsets:from_list(CurActive)},
        {<<"original_active_partitions">>, ordsets:from_list(OrigActive)},
        {<<"passive_partitions">>, ordsets:from_list(CurPassive)},
        {<<"original_passive_partitions">>, ordsets:from_list(OrigPassive)},
        {<<"cleanup_partitions">>, ordsets:from_list(Cleanup)},
        {<<"replica_partitions">>, ordsets:from_list(ReplicaParts)},
        {<<"replicas_on_transfer">>, ?set_replicas_on_transfer(Group)},
        {<<"indexable_seqs">>, {IndexableSeqs}},
        {<<"unindexeable_seqs">>, {UnindexableSeqs}},
        {<<"wanted_seqs">>, {WantedSeqs}},
        case GroupReq of
        nil ->
            {<<"wanted_partitions">>, null};
        #set_view_group_req{wanted_partitions = WantedParts} ->
            {<<"wanted_partitions">>, WantedParts}
        end,
        pending_transition_debug_info(Group),
        {<<"stats">>, set_view_group_stats_ejson(GroupStats)}
    ],
    MainEntry = {<<"main_group">>, {MainInfo}},
    Entries = case replica_group_debug_info(Group) of
    [] ->
        [MainEntry];
    ReplicaInfo ->
        [MainEntry, {<<"replica_group">>, {ReplicaInfo}}]
    end,
    {debug_info, ?LOCAL, {Entries}}.
1058
% Builds the list of EJSON properties describing the replica group of a
% set view group. Returns the empty list when the group has no replica
% group (replica_group = nil).
replica_group_debug_info(#set_view_group{replica_group = nil}) ->
    [];
replica_group_debug_info(#set_view_group{replica_group = Replica}) ->
    #set_view_group{
        debug_info = #set_view_debug_info{
            original_abitmask = OrigAbitmask,
            original_pbitmask = OrigPbitmask,
            stats = ReplicaStats,
            wanted_seqs = RawWantedSeqs
        }
    } = Replica,
    OrigActive = couch_set_view_util:decode_bitmask(OrigAbitmask),
    CurActive = couch_set_view_util:decode_bitmask(?set_abitmask(Replica)),
    OrigPassive = couch_set_view_util:decode_bitmask(OrigPbitmask),
    CurPassive = couch_set_view_util:decode_bitmask(?set_pbitmask(Replica)),
    Cleanup = couch_set_view_util:decode_bitmask(?set_cbitmask(Replica)),
    % Partition IDs become 0 padded binaries so that a pretty printed JSON
    % can sanely sort the keys.
    PadPartIds = fun(Seqs) ->
        [{?l2b(io_lib:format("~4..0b", [PartId])), Seq} || {PartId, Seq} <- Seqs]
    end,
    IndexableSeqs = PadPartIds(?set_seqs(Replica)),
    UnindexableSeqs = PadPartIds(?set_unindexable_seqs(Replica)),
    WantedSeqs = PadPartIds(RawWantedSeqs),
    [
        {<<"replica_active_partitions">>, ordsets:from_list(CurActive)},
        {<<"replica_original_active_partitions">>, ordsets:from_list(OrigActive)},
        {<<"replica_passive_partitions">>, ordsets:from_list(CurPassive)},
        {<<"replica_original_passive_partitions">>, ordsets:from_list(OrigPassive)},
        {<<"replica_cleanup_partitions">>, ordsets:from_list(Cleanup)},
        {<<"replica_indexable_seqs">>, {IndexableSeqs}},
        {<<"replica_unindexable_seqs">>, {UnindexableSeqs}},
        {<<"replica_wanted_seqs">>, {WantedSeqs}},
        pending_transition_debug_info(Replica),
        {<<"replica_stats">>, set_view_group_stats_ejson(ReplicaStats)}
    ].
1092
1093
% Returns the `{<<"pending_transition">>, Value}` EJSON property of a set
% view group. Value is `null` when the index header has no pending
% transition, otherwise an object with the transition's "active",
% "passive" and "unindexable" fields.
pending_transition_debug_info(#set_view_group{index_header = Header}) ->
    Value = case Header#set_view_index_header.pending_transition of
    nil ->
        null;
    #set_view_transition{
        active = Active,
        passive = Passive,
        unindexable = Unindexable
    } ->
        {[
            {<<"active">>, Active},
            {<<"passive">>, Passive},
            {<<"unindexable">>, Unindexable}
        ]}
    end,
    {<<"pending_transition">>, Value}.
1108
1109
% Converts a #set_view_group_stats{} record into an EJSON object with one
% `{FieldName, Value}` property per record field, except for the
% `ets_key` field which is left out. Properties end up in reverse record
% field order.
set_view_group_stats_ejson(Stats) ->
    Fields = record_info(fields, set_view_group_stats),
    % Field values start at tuple position 2 (position 1 is the record tag).
    Positions = lists:seq(2, record_info(size, set_view_group_stats)),
    Props = [
        {Field, element(Pos, Stats)} ||
        {Field, Pos} <- lists:zip(Fields, Positions),
        Field =/= ets_key
    ],
    {lists:reverse(Props)}.
1121
% Registers the view `ViewName` of design document `DDocId` in the query
% timing stats table with all its latency counters set to zero. If the
% design document already has an entry, the new view is appended to it.
% Only ?QUERY_TIMING_STATS_ETS is accepted as `EtsTable`.
-spec stat_ets_initial_insert(atom(), binary(), binary()) -> boolean().
stat_ets_initial_insert(EtsTable, DDocId, ViewName) ->
    % One counter per time slab, from 1ms to 100sec
    Slabs = [<<"1">>, <<"10">>, <<"100">>, <<"1000">>, <<"10000">>,
        <<"100000">>],
    NewViewEntry = {ViewName, [{Slab, 0} || Slab <- Slabs]},
    UpdatedStats = case EtsTable of
    ?QUERY_TIMING_STATS_ETS ->
        case ets:lookup(?QUERY_TIMING_STATS_ETS, DDocId) of
        [] ->
            % a new ddoc has been created
            [NewViewEntry];
        [{DDocId, ExistingStats}] ->
            % the ddoc already exists but a new view has been added to it
            ExistingStats ++ [NewViewEntry]
        end
    end,
    ets:insert(?QUERY_TIMING_STATS_ETS, {DDocId, UpdatedStats}).
1144
% Tells whether the ETS table `EtsTable` has an entry for `DDocId` whose
% stats list contains a `{ViewName, _}` pair.
check_key_in_ets(EtsTable, DDocId, ViewName) ->
    case ets:lookup(EtsTable, DDocId) of
    [] ->
        false;
    [{DDocId, ViewEntries}] ->
        case lists:keyfind(ViewName, 1, ViewEntries) of
        {ViewName, _} ->
            true;
        _ ->
            false
        end
    end.
1157
% Makes sure the stats table has an entry for the given view: returns
% `already_exists` when it is there, otherwise creates it through
% stat_ets_initial_insert/3 and returns that call's result.
check_and_set_key(EtsTable, DDocId, ViewName) ->
    KeyExists = check_key_in_ets(EtsTable, DDocId, ViewName),
    case KeyExists of
    true ->
        already_exists;
    false ->
        stat_ets_initial_insert(EtsTable, DDocId, ViewName)
    end.
1165
% Increments by one the counter of the time slab at position `Exp0`
% (1-based) for the view `ViewName` of design document `DDocId`, and
% writes the updated stats back into the table. Positions beyond the
% last slab are counted in the last slab.
% Only ?QUERY_TIMING_STATS_ETS is accepted as `EtsTable`, and both the
% design document and the view must already have an entry.
update_ets_key(EtsTable, DDocId, ViewName, Exp0) ->
    case EtsTable of
    ?QUERY_TIMING_STATS_ETS ->
        [{DDocId, DDocStats0}] = ets:lookup(?QUERY_TIMING_STATS_ETS, DDocId),
        case lists:keyfind(ViewName, 1, DDocStats0) of
        {ViewName, ViewStats0} ->
            SlabCount = length(ViewStats0),
            % If query latency is too high then put
            % it in last timeslab i.e. 100s
            SlabPos = if
                Exp0 < SlabCount + 1 -> Exp0;
                true -> SlabCount
            end,
            {TimeSlab, Counter} = lists:nth(SlabPos, ViewStats0),

            % bump the slab counter within the view stats, then put the
            % updated view stats back into the ddoc stats
            ViewStats = lists:keyreplace(
                TimeSlab, 1, ViewStats0, {TimeSlab, Counter + 1}),
            DDocStats = lists:keyreplace(
                ViewName, 1, DDocStats0, {ViewName, ViewStats}),

            ets:insert(?QUERY_TIMING_STATS_ETS, {DDocId, DDocStats})
        end
    end.
1194
% Records one query of duration `TimeElapsed` in the query timing stats
% of the view `ViewName` of design document `DDocId`, creating the stats
% entry of the view first when it does not exist yet.
% The time slab is picked from the rounded base 10 logarithm of
% `TimeElapsed`; durations not greater than 1 go to the first slab.
update_timing_stat(DDocId, ViewName, TimeElapsed) ->
    SlabPos = if
        TimeElapsed > 1 ->
            % The exponent is incremented by 1 because lists:nth starts
            % indexing from 1
            round(math:log10(TimeElapsed)) + 1;
        true ->
            1
    end,
    check_and_set_key(?QUERY_TIMING_STATS_ETS, DDocId, ViewName),
    update_ets_key(?QUERY_TIMING_STATS_ETS, DDocId, ViewName, SlabPos).
1205
% Query with a single view to merge, trigger a simpler code path
% (no queue, no child processes, etc).
%
% `Params` is the #index_merge{} of the request; it must hold exactly one
% #set_view_spec{} in `indexes`. The `stale`, `debug`, `_type` and
% `reduce` options are read straight from the query string of `Req`.
%
% The view is first looked up as a map view and then, if not found, as a
% reduce view (queried as a map view when `reduce=false` was given).
%
% Throws `{not_found, Msg}` when the view cannot be opened and
% `{error, set_view_outdated}` when the group misses some of the wanted
% partitions. Otherwise returns the result of the last `Callback` call.
%
% The set view group is released on every exit path once it has been
% successfully opened.
simple_set_view_query(Params, DDoc, Req) ->
    #index_merge{
        callback = Callback,
        user_acc = UserAcc,
        indexes = [SetViewSpec],
        extra = #view_merge{keys = Keys},
        start_timer = StartTimer
    } = Params,
    #set_view_spec{
        name = SetName,
        partitions = Partitions0,
        ddoc_id = DDocId,
        view_name = ViewName,
        category = Category
    } = SetViewSpec,

    Stale = list_to_existing_atom(string:to_lower(
        couch_httpd:qs_value(Req, "stale", "update_after"))),
    Debug = couch_set_view_http:parse_bool_param(
        couch_httpd:qs_value(Req, "debug", "false")),
    IndexType = list_to_existing_atom(
        couch_httpd:qs_value(Req, "_type", "main")),
    Partitions = case IndexType of
    main ->
        Partitions0;
    replica ->
        []
    end,
    GroupReq = #set_view_group_req{
        stale = Stale,
        update_stats = true,
        wanted_partitions = Partitions,
        debug = Debug,
        type = IndexType,
        category = Category
    },

    case get_set_view(
        fun couch_set_view:get_map_view/4, SetName, DDoc, ViewName, GroupReq) of
    {ok, View, Group, MissingPartitions} ->
        ViewType = map;
    {not_found, _} ->
        GroupReq2 = GroupReq#set_view_group_req{
            update_stats = false
        },
        case get_set_view(
            fun couch_set_view:get_reduce_view/4, SetName, DDoc, ViewName, GroupReq2) of
        {ok, ReduceView, Group, MissingPartitions} ->
            Reduce = list_to_existing_atom(
                string:to_lower(couch_httpd:qs_value(Req, "reduce", "true"))),
            case Reduce of
            false ->
                ViewType = red_map,
                View = couch_set_view:extract_map_view(ReduceView);
            true ->
                ViewType = reduce,
                View = ReduceView
            end;
        Error ->
            % Dummy bindings, so that these variables are bound in every
            % branch of the case and can be used after it.
            MissingPartitions = Group = View = ViewType = nil,
            ErrorMsg = io_lib:format("Error opening view `~s`, from set `~s`, "
                "design document `~s`: ~p", [ViewName, SetName, DDocId, Error]),
            throw({not_found, iolist_to_binary(ErrorMsg)})
        end
    end,

    case MissingPartitions of
    [] ->
        ok;
    _ ->
        couch_set_view:release_group(Group),
        ?LOG_INFO("Set view `~s`, group `~s`, missing partitions: ~w",
                  [?LOG_USERDATA(SetName), ?LOG_USERDATA(DDocId), MissingPartitions]),
        throw({error, set_view_outdated})
    end,

    % From here on the group is held by this request. Everything that can
    % fail (query parameter parsing, the debug info callback, the query
    % itself and the timing stat update) runs inside this try so that the
    % group is always released.
    try
        QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, ViewType),
        QueryArgs2 = QueryArgs#view_query_args{
            view_name = ViewName,
            stale = Stale
        },

        Params2 = case debug_info(Debug, Group, GroupReq) of
        nil ->
            Params#index_merge{user_ctx = Req#httpd.user_ctx};
        DebugInfo ->
            {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
            Params#index_merge{
                user_ctx = Req#httpd.user_ctx,
                user_acc = UserAcc2
            }
        end,

        try
            case ViewType of
            reduce ->
                simple_set_view_reduce_query(Params2, Group, View, QueryArgs2);
            _ ->
                simple_set_view_map_query(Params2, Group, View, QueryArgs2)
            end
        after
            % Query latency is only tracked for GET requests, and only
            % when the request carries a start timestamp.
            case StartTimer of
            nil ->
                start_timer_not_set;
            _ ->
                case Req#httpd.method of
                'GET' ->
                    % now_diff is in microseconds, the stat in milliseconds
                    TimeElapsed = timer:now_diff(
                        os:timestamp(), StartTimer) / 1000,
                    update_timing_stat(DDocId, ViewName, TimeElapsed);
                'POST' ->
                    ignore_subquery_stat
                end
            end
        end
    after
        couch_set_view:release_group(Group)
    end.
1324
1325
% Runs a map query against a single set view and feeds the result to the
% user callback: `{start, RowCount}` first, then one `{row, Row}` per
% emitted row and finally `stop`, whose result is returned.
% The first `skip` rows are dropped and at most `limit` rows are emitted;
% the fold is stopped as soon as the limit is exhausted.
simple_set_view_map_query(Params, Group, View, ViewArgs) ->
    #index_merge{
        callback = Callback,
        user_acc = UserAcc
    } = Params,
    #view_query_args{
        limit = Limit,
        skip = Skip,
        debug = DebugMode
    } = ViewArgs,

    % Fold accumulator: {rows still allowed, rows still to skip, user acc}
    RowFun = fun
        (_Kv, _, {0, _, _} = FoldAcc) ->
            {stop, FoldAcc};
        (_Kv, _, {LimitLeft, SkipLeft, CbAcc}) when SkipLeft > 0 ->
            {ok, {LimitLeft, SkipLeft - 1, CbAcc}};
        (Kv, _, {LimitLeft, 0, CbAcc}) ->
            {ok, CbAcc2} = Callback(
                {row, view_row_obj_map(Kv, DebugMode)}, CbAcc),
            {ok, {LimitLeft - 1, 0, CbAcc2}}
    end,

    TotalRows = couch_set_view:get_row_count(Group, View),
    {ok, StartedAcc} = Callback({start, TotalRows}, UserAcc),

    InitialAcc = {Limit, Skip, StartedAcc},
    {ok, _, {_, _, FinalAcc}} = couch_set_view:fold(
        Group, View, RowFun, InitialAcc, ViewArgs),
    Callback(stop, FinalAcc).
1353
1354
% Runs a reduce query against a single set view and feeds the result to
% the user callback: `start` first, then one `{row, Row}` per emitted
% reduction and finally `stop`, whose result is returned.
% The first `skip` rows are dropped and at most `limit` rows are emitted;
% the fold is stopped as soon as the limit is exhausted.
simple_set_view_reduce_query(Params, Group, View, ViewArgs) ->
    #index_merge{
        callback = Callback,
        user_acc = UserAcc
    } = Params,
    #view_query_args{
        limit = Limit,
        skip = Skip,
        debug = DebugMode
    } = ViewArgs,

    % Fold accumulator: {rows still allowed, rows still to skip, user acc}
    RowFun = fun
        (_Key, _Red, {0, _, _} = FoldAcc) ->
            {stop, FoldAcc};
        (_Key, _Red, {LimitLeft, SkipLeft, CbAcc}) when SkipLeft > 0 ->
            {ok, {LimitLeft, SkipLeft - 1, CbAcc}};
        (Key, Red, {LimitLeft, 0, CbAcc}) ->
            {ok, CbAcc2} = Callback(
                {row, view_row_obj_reduce({Key, Red}, DebugMode)}, CbAcc),
            {ok, {LimitLeft - 1, 0, CbAcc2}}
    end,

    {ok, StartedAcc} = Callback(start, UserAcc),
    InitialAcc = {Limit, Skip, StartedAcc},
    {ok, {_, _, FinalAcc}} = couch_set_view:fold_reduce(
        Group, View, RowFun, InitialAcc, ViewArgs),
    Callback(stop, FinalAcc).
1380