1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_http).
16
17-export([handle_req/1]).
18
19-export([finish_view_fold/4, finish_view_fold/5]).
20-export([view_etag/2, view_etag/3]).
21-export([design_doc_view/6, parse_bool_param/1]).
22
23-import(couch_httpd,
24    [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,send_chunk/2,
25    start_json_response/2, start_json_response/3, end_json_response/1,
26    send_chunked_error/2]).
27
28-include("couch_db.hrl").
29-include_lib("couch_set_view/include/couch_set_view.hrl").
30
31
32
% Entry point for requests whose path starts with `_set_view`.
%
% `POST /_set_view/SetName/_cleanup` checks the content type, calls
% couch_set_view:cleanup_index_files/2 for the set and answers 202. Any other
% method on that path is answered through send_method_not_allowed/2.
% Every other request is expected to have the path shape
% `/_set_view/SetName/_design/DesignName/...` and is handed to
% route_request/4 together with the full design document id.
handle_req(#httpd{method = 'POST',
        path_parts = [<<"_set_view">>, SetName, <<"_cleanup">>]} = Req) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    ok = couch_set_view:cleanup_index_files(mapreduce_view, SetName),
    send_json(Req, 202, {[{ok, true}]});
handle_req(#httpd{path_parts = [<<"_set_view">>, _SetName, <<"_cleanup">>]} = Req) ->
    send_method_not_allowed(Req, "POST");
handle_req(#httpd{path_parts = PathParts} = Req) ->
    % A path that is not a design document path fails this match.
    [<<"_set_view">>, SetName, <<"_design">>, DesignName | Rest] = PathParts,
    DDocId = <<"_design/", DesignName/binary>>,
    route_request(Req, SetName, DDocId, Rest).
46
% Dispatches a request addressed to a design document of a set view.
%
% Req is the #httpd{} request, SetName the set name taken from the path,
% DDocId the full design document id (`_design/...`) and the last argument
% the remaining path segments. Each clause handles one method/endpoint pair
% and sends the HTTP response itself. Requests that match no clause fail with
% a function_clause error.
route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_define">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {Fields} = couch_httpd:json_body_obj(Req),
    SetViewParams = #set_view_params{
        max_partitions = couch_util:get_value(<<"number_partitions">>, Fields, 0),
        active_partitions = couch_util:get_value(<<"active_partitions">>, Fields, []),
        passive_partitions = couch_util:get_value(<<"passive_partitions">>, Fields, []),
        use_replica_index = couch_util:get_value(<<"use_replica_index">>, Fields, false)
    },
    ok = couch_set_view:define_group(
        mapreduce_view, SetName, DDocId, SetViewParams),
    couch_httpd:send_json(Req, 201, {[{ok, true}]});

% `_define` only supports POST.
route_request(Req, _SetName, _DDocId, [<<"_define">>]) ->
    send_method_not_allowed(Req, "POST");

route_request(#httpd{method = 'GET'} = Req, SetName, DDocId, [<<"_info">>]) ->
    {ok, Info} = couch_set_view:get_group_info(
        mapreduce_view, SetName, DDocId, prod),
    couch_httpd:send_json(Req, 200, {Info});

route_request(#httpd{method = 'GET'} = Req, SetName, DDocId, [<<"_btree_stats">>]) ->
    % `_type` comes straight from the query string, so it is matched against
    % the two known index types instead of being converted to an arbitrary
    % atom (which crashed with badarg on unknown values).
    Type = case couch_httpd:qs_value(Req, "_type", "main") of
    "main" ->
        main;
    "replica" ->
        replica;
    _ ->
        throw({bad_request, "`_type` parameter must be \"main\" or \"replica\"."})
    end,
    GroupReq = #set_view_group_req{
        type = Type,
        stale = ok,
        update_stats = false
    },
    {ok, Group} = couch_set_view:get_group(
        mapreduce_view, SetName, DDocId, GroupReq),
    #set_view_group{
        id_btree = IdBtree,
        views = Views
    } = Group,
    IdBtreeStats = couch_btree_stats:compute(IdBtree),
    ViewStats = lists:foldr(
        fun(SetView, Acc) ->
            #mapreduce_view{
                btree = Bt,
                reduce_funs = RedFuns,
                map_names = MapNames
            } = SetView#set_view.indexer,
            S = couch_btree_stats:compute(Bt),
            % The stats are labelled with the first reduce name when the view
            % has reduce functions, otherwise with its first map name.
            ViewName = case RedFuns of
            [{RedName, _} | _] ->
                RedName;
            [] ->
                [MapName | _] = MapNames,
                MapName
            end,
            [{ViewName, {S}} | Acc]
        end,
        [], Views),
    Stats = {[{<<"id_btree">>, {IdBtreeStats}} | ViewStats]},
    couch_httpd:send_json(Req, 200, Stats);

route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_reset_utilization_stats">>]) ->
    ok = couch_set_view:reset_utilization_stats(
        mapreduce_view, SetName, DDocId),
    couch_httpd:send_json(Req, 201, true);

route_request(#httpd{method = 'GET'} = Req, SetName, DDocId, [<<"_get_utilization_stats">>]) ->
    {ok, Stats} = couch_set_view:get_utilization_stats(
        mapreduce_view, SetName, DDocId),
    couch_httpd:send_json(Req, 200, {Stats});

route_request(#httpd{method = 'GET'} = Req, _SetName, DDocId, [<<"_get_query_stats">>]) ->
    DDocStats = ets:lookup(?QUERY_TIMING_STATS_ETS, DDocId),
    DefaultHeaders = [{"Content-Type", couch_httpd:negotiate_content_type(Req)},
                      {"Cache-Control", "must-revalidate"}],
    % using mochijson2:encode because ejson:encode doesn't work
    % with nested arrays
    Body = [mochijson2:encode([{query_timing_in_ms, DDocStats}])],
    couch_httpd:send_response(Req, 200, DefaultHeaders, Body);

% View query with `keys` and `partitions` given as JSON query string values.
route_request(#httpd{method = 'GET'} = Req, SetName, DDocId, [<<"_view">>, ViewName]) ->
    Keys = couch_httpd:qs_json_value(Req, "keys", nil),
    FilteredPartitions = couch_httpd:qs_json_value(Req, "partitions", []),
    validate_json_partition_list(FilteredPartitions),
    design_doc_view(Req, SetName, DDocId, ViewName, FilteredPartitions, Keys);

% View query with `keys` and `partitions` given in the JSON request body.
route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_view">>, ViewName]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {Fields} = couch_httpd:json_body_obj(Req),
    Keys = couch_util:get_value(<<"keys">>, Fields, nil),
    case Keys of
    nil ->
        ok;
    _ when is_list(Keys) ->
        ok;
    _ ->
        throw({bad_request, "`keys` member must be a array."})
    end,
    FilteredPartitions = couch_util:get_value(<<"partitions">>, Fields, []),
    validate_json_partition_list(FilteredPartitions),
    design_doc_view(Req, SetName, DDocId, ViewName, FilteredPartitions, Keys);

% `_compact` without a sub-path compacts the main index.
route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_compact">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {ok, _Pid} = couch_set_view_compactor:start_compact(
        mapreduce_view, SetName, DDocId, main),
    couch_httpd:send_json(Req, 202, {[{ok, true}]});

route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_compact">>, <<"main">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {ok, _Pid} = couch_set_view_compactor:start_compact(
        mapreduce_view, SetName, DDocId, main),
    couch_httpd:send_json(Req, 202, {[{ok, true}]});

route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_compact">>, <<"replica">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {ok, _Pid} = couch_set_view_compactor:start_compact(
        mapreduce_view, SetName, DDocId, replica),
    couch_httpd:send_json(Req, 202, {[{ok, true}]});

route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_set_partition_states">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    {Fields} = couch_httpd:json_body_obj(Req),
    Active = couch_util:get_value(<<"active">>, Fields, []),
    Passive = couch_util:get_value(<<"passive">>, Fields, []),
    Cleanup = couch_util:get_value(<<"cleanup">>, Fields, []),
    ok = couch_set_view:set_partition_states(
        mapreduce_view, SetName, DDocId, Active, Passive, Cleanup),
    couch_httpd:send_json(Req, 201, {[{ok, true}]});

route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_add_replica_partitions">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    % The body must be a non-empty JSON array.
    List = [_ | _] = couch_httpd:json_body(Req),
    ok = couch_set_view:add_replica_partitions(
        mapreduce_view, SetName, DDocId, List),
    couch_httpd:send_json(Req, 201, {[{ok, true}]});

route_request(#httpd{method = 'POST'} = Req, SetName, DDocId, [<<"_remove_replica_partitions">>]) ->
    couch_httpd:validate_ctype(Req, "application/json"),
    % The body must be a non-empty JSON array.
    List = [_ | _] = couch_httpd:json_body(Req),
    ok = couch_set_view:remove_replica_partitions(mapreduce_view,
        SetName, DDocId, List),
    couch_httpd:send_json(Req, 201, {[{ok, true}]}).
183
184
% Checks that the decoded JSON value is a list containing only numbers.
% Returns `ok` when it is; otherwise throws
% `{bad_request, "Expected a JSON array of partition IDs."}`.
validate_json_partition_list(Partitions) when is_list(Partitions) ->
    case lists:all(fun erlang:is_number/1, Partitions) of
    true ->
        ok;
    false ->
        throw({bad_request, "Expected a JSON array of partition IDs."})
    end;
validate_json_partition_list(_NotAList) ->
    throw({bad_request, "Expected a JSON array of partition IDs."}).
194
195
% Runs a view query against a set view and streams the result to the client.
%
% Req is the #httpd{} request, SetName/DDocId/ViewName identify the view,
% FilteredPartitions is the list of wanted partitions and Keys is either `nil`
% or the list of keys to look up.
%
% The view is first looked up as a map view. When that lookup reports
% `{not_found, Reason}`, it is looked up as a reduce view and, depending on
% the `reduce` query parameter, either its map part or its reduction is
% output. When neither lookup succeeds `{not_found, Reason}` is thrown.
%
% Returns whatever output_map_view/4 or output_reduce_view/4 returns.
design_doc_view(Req, SetName, DDocId, ViewName, FilteredPartitions, Keys) ->
    Stale = get_stale_type(Req),
    Reduce = get_reduce_type(Req),
    Debug = parse_bool_param(couch_httpd:qs_value(Req, "debug", "false")),
    % Either "main" (for the normal index) or "replica". The value comes from
    % the query string, so it is matched against the known types instead of
    % being converted to an arbitrary atom (which crashed with badarg on
    % unknown values).
    Type = case couch_httpd:qs_value(Req, "_type", "main") of
    "main" ->
        main;
    "replica" ->
        replica;
    _ ->
        throw({bad_request, "`_type` parameter must be \"main\" or \"replica\"."})
    end,
    GroupReq = #set_view_group_req{
        stale = Stale,
        update_stats = true,
        wanted_partitions = FilteredPartitions,
        debug = Debug,
        type = Type
    },
    case couch_set_view:get_map_view(SetName, DDocId, ViewName, GroupReq) of
    {ok, View, Group, _} ->
        % Parsing the query or writing the response may throw; the group
        % must be released in that case too.
        try
            QueryArgs = parse_view_params(Req, Keys, ViewName, map),
            output_map_view(Req, View, Group, QueryArgs)
        after
            couch_set_view:release_group(Group)
        end;
    {not_found, Reason} ->
        GroupReq2 = GroupReq#set_view_group_req{
            update_stats = false
        },
        case couch_set_view:get_reduce_view(SetName, DDocId, ViewName, GroupReq2) of
        {ok, ReduceView, Group, _} ->
            try
                case Reduce of
                false ->
                    % reduce=false: output the map rows of the reduce view
                    QueryArgs = parse_view_params(Req, Keys, ViewName, red_map),
                    MapView = couch_set_view:extract_map_view(ReduceView),
                    output_map_view(Req, MapView, Group, QueryArgs);
                _ ->
                    QueryArgs = parse_view_params(Req, Keys, ViewName, reduce),
                    output_reduce_view(Req, ReduceView, Group, QueryArgs)
                end
            after
                couch_set_view:release_group(Group)
            end;
        _ ->
            throw({not_found, Reason})
        end
    end.
236
% Sends the rows of a map view as an HTTP response.
%
% The response is produced through couch_httpd:etag_respond/3, using an etag
% derived from the group, the view and the queried keys. The response fun
% folds over the view with the `limit` and `skip` values of QueryArgs and then
% finishes the response with finish_view_fold/4.
output_map_view(Req, View, Group, QueryArgs) ->
    #view_query_args{
        limit = Limit,
        skip = SkipCount,
        keys = Keys
    } = QueryArgs,
    Etag = view_etag(Group, View, Keys),
    Respond = fun() ->
        TotalRows = couch_set_view:get_row_count(Group, View),
        CountFun = get_reduce_count_fun(Group),
        Helpers = #view_fold_helper_funs{reduce_count = CountFun},
        FoldFun = make_view_fold_fun(Req, QueryArgs, Etag, TotalRows, Helpers),
        % Accumulator: {rows left, rows to skip, response, row prefix}
        InitAcc = {Limit, SkipCount, undefined, []},
        {ok, LastReduce, FoldResult} =
            couch_set_view:fold(Group, View, FoldFun, InitAcc, QueryArgs),
        finish_view_fold(Req, TotalRows, CountFun(LastReduce), FoldResult)
    end,
    couch_httpd:etag_respond(Req, Etag, Respond).
252
% Sends the reduced rows of a reduce view as an HTTP response.
%
% The response is produced through couch_httpd:etag_respond/3, using an etag
% derived from the group, the view and the queried keys. The response fun
% folds over the reductions with the `limit` and `skip` values of QueryArgs
% and then finishes the response with finish_reduce_fold/2.
-spec output_reduce_view(term(),
                         {'reduce', non_neg_integer(), #set_view{}} | #set_view{},
                         #set_view_group{},
                         #view_query_args{})
                        -> no_return().
output_reduce_view(Req, View, Group, QueryArgs) ->
    #view_query_args{
        limit = Limit,
        skip = Skip,
        keys = Keys
    } = QueryArgs,
    Etag = view_etag(Group, View, Keys),
    Respond = fun() ->
        Helpers = #reduce_fold_helper_funs{},
        FoldFun = make_reduce_fold_fun(Req, QueryArgs, Etag, Helpers),
        % Accumulator: {rows left, rows to skip, response, row prefix}
        InitAcc = {Limit, Skip, undefined, []},
        {ok, {_, _, Resp, _}} = couch_set_view:fold_reduce(
            Group, View, FoldFun, InitAcc, QueryArgs),
        finish_reduce_fold(Req, Resp)
    end,
    couch_httpd:etag_respond(Req, Etag, Respond).
271
272
% Picks the fun used to turn a reduction into a row count.
% A group whose `replica_group` is `nil` uses
% couch_set_view:reduce_to_count/1; a group that carries a replica group gets
% a fun that always returns `nil`.
get_reduce_count_fun(#set_view_group{replica_group = Replica})
        when Replica =:= nil ->
    fun couch_set_view:reduce_to_count/1;
get_reduce_count_fun(#set_view_group{replica_group = Replica})
        when is_record(Replica, set_view_group) ->
    fun(_Reduction) -> nil end.
277
278
% Reads the `stale` query parameter (default "false") and returns it as an
% atom. The value comes from the client, so a string that does not name an
% existing atom is reported as a `bad_request` error instead of crashing the
% request with `badarg`.
get_stale_type(Req) ->
    Value = couch_httpd:qs_value(Req, "stale", "false"),
    try
        list_to_existing_atom(Value)
    catch
    error:badarg ->
        throw({bad_request, "Invalid value for `stale` parameter."})
    end.
281
% Reads the `reduce` query parameter (default "true") and returns it as an
% atom. The value comes from the client, so a string that does not name an
% existing atom is reported as a `bad_request` error instead of crashing the
% request with `badarg`.
get_reduce_type(Req) ->
    Value = couch_httpd:qs_value(Req, "reduce", "true"),
    try
        list_to_existing_atom(Value)
    catch
    error:badarg ->
        throw({bad_request, "Invalid value for `reduce` parameter."})
    end.
284
% Parses the view query parameters with couch_httpd_view:parse_view_params/3
% and stores the view name in the resulting #view_query_args{}.
parse_view_params(Req, Keys, ViewName, ViewType) ->
    QueryArgs = couch_httpd_view:parse_view_params(Req, Keys, ViewType),
    QueryArgs#view_query_args{view_name = ViewName}.
288
% Builds the fold fun used to stream the rows of a map view.
%
% The returned fun takes a row, the reductions preceding it and the
% accumulator `{LimitLeft, SkipLeft, Resp, RowAcc}`. `Resp` is `undefined`
% until the first row is sent. The fun returns `{stop, Acc}` once the limit
% is used up, `{ok, Acc}` while skipping, and otherwise whatever the row
% sender returns as the first element.
make_view_fold_fun(Req, QueryArgs, Etag, TotalViewCount, HelperFuns) ->
    #view_fold_helper_funs{
        start_response = StartResp,
        send_row = SendRow,
        reduce_count = ReduceCount
    } = apply_default_helper_funs(HelperFuns),
    #view_query_args{debug = DebugMode} = QueryArgs,

    fun(Row, OffsetReds, {LimitLeft, SkipLeft, Resp, RowAcc}) ->
        case {LimitLeft, SkipLeft, Resp} of
        {0, _, _} ->
            % "limit" rows have been sent, stop folding
            {stop, {0, 0, Resp, RowAcc}};
        {_, _, _} when SkipLeft > 0 ->
            % still inside the skipped prefix
            {ok, {LimitLeft, SkipLeft - 1, Resp, RowAcc}};
        {_, _, undefined} ->
            % first rendered row: start the response before sending it
            Offset = ReduceCount(OffsetReds),
            {ok, NewResp, RowAcc1} = StartResp(Req, Etag,
                TotalViewCount, Offset, RowAcc),
            {Go, RowAcc2} = SendRow(NewResp, Row, RowAcc1, DebugMode),
            {Go, {LimitLeft - 1, 0, NewResp, RowAcc2}};
        {_, _, _} when LimitLeft > 0 ->
            % every following row
            {Go, RowAcc1} = SendRow(Resp, Row, RowAcc, DebugMode),
            {Go, {LimitLeft - 1, 0, Resp, RowAcc1}}
        end
    end.
321
322
% Builds the fold fun used to stream the rows of a reduce view.
%
% The returned fun takes a grouped key, its reduction and the accumulator
% `{LimitLeft, SkipLeft, Resp, RowAcc}`. `Resp` is `undefined` until the
% first row is sent.
make_reduce_fold_fun(Req, _QueryArgs, Etag, HelperFuns) ->
    #reduce_fold_helper_funs{
        start_response = StartResp,
        send_row = SendRow
    } = apply_default_helper_funs(HelperFuns),

    fun
    (_Key, _Red, {LimitLeft, SkipLeft, Resp, RowAcc}) when SkipLeft > 0 ->
        % still inside the skipped prefix
        {ok, {LimitLeft, SkipLeft - 1, Resp, RowAcc}};
    (_Key, _Red, {0, _, _, _} = Acc) ->
        % the limit is used up, stop with the accumulator untouched
        {stop, Acc};
    (Key, Red, {LimitLeft, 0, undefined, RowAcc0}) ->
        % first row: the response has to be started before sending it
        {ok, NewResp, RowAcc1} = StartResp(Req, Etag, RowAcc0),
        {Go, RowAcc2} = SendRow(NewResp, {Key, Red}, RowAcc1),
        {Go, {LimitLeft - 1, 0, NewResp, RowAcc2}};
    (Key, Red, {LimitLeft, 0, Resp, RowAcc}) ->
        % the response is already started
        {Go, RowAcc1} = SendRow(Resp, {Key, Red}, RowAcc),
        {Go, {LimitLeft - 1, 0, Resp, RowAcc1}}
    end.
347
% Sets the `start_response` and `send_row` funs of a helper record to the
% JSON implementations of this module. Works on both
% #view_fold_helper_funs{} and #reduce_fold_helper_funs{}; all other fields
% are kept as passed in.
apply_default_helper_funs(ViewHelpers)
        when is_record(ViewHelpers, view_fold_helper_funs) ->
    StartResp = fun json_view_start_resp/5,
    SendRow = fun send_json_view_row/4,
    ViewHelpers#view_fold_helper_funs{
        start_response = StartResp,
        send_row = SendRow
    };
apply_default_helper_funs(ReduceHelpers)
        when is_record(ReduceHelpers, reduce_fold_helper_funs) ->
    StartResp = fun json_reduce_start_resp/3,
    SendRow = fun send_json_reduce_row/3,
    ReduceHelpers#reduce_fold_helper_funs{
        start_response = StartResp,
        send_row = SendRow
    }.
358
% Starts the JSON response of a map view query.
% Returns `{ok, Resp, Prefix}` where Prefix is the opening of the JSON body
% (`total_rows`, an `offset` member only when Offset is a number, and the
% start of the `rows` array). The prefix is not sent here; it is returned as
% the row accumulator.
json_view_start_resp(Req, Etag, TotalRowCount, Offset, _Acc) ->
    {ok, Resp} = start_json_response(Req, 200, [{"Etag", Etag}]),
    % TODO: likely, remove offset, won't make sense with passive partitions.
    %       Also, merged views don't have it.
    TotalRowsPart = io_lib:format("{\"total_rows\":~w,", [TotalRowCount]),
    Head = if
        is_number(Offset) ->
            [TotalRowsPart, io_lib:format("\"offset\":~w,", [Offset])];
        true ->
            TotalRowsPart
    end,
    {ok, Resp, [Head, "\"rows\":[\r\n"]}.
371
% Sends one map view row as a chunk, preceded by RowFront (the body prefix
% for the first row, the row separator afterwards). Returns the separator to
% put in front of the next row.
send_json_view_row(Resp, Kv, RowFront, DebugMode) ->
    EncodedRow = ?JSON_ENCODE(view_row_obj(Kv, DebugMode)),
    Chunk = RowFront ++ EncodedRow,
    send_chunk(Resp, Chunk),
    {ok, ",\r\n"}.
376
% Starts the JSON response of a reduce view query.
% Returns `{ok, Resp, Prefix}` where Prefix opens the `rows` array; the
% prefix is not sent here, it is returned as the row accumulator.
json_reduce_start_resp(Req, Etag, _Acc0) ->
    Headers = [{"Etag", Etag}],
    {ok, Resp} = start_json_response(Req, 200, Headers),
    RowsPrefix = "{\"rows\":[\r\n",
    {ok, Resp, RowsPrefix}.
380
% Sends one reduce row (`key`/`value` object) as a chunk, preceded by
% RowFront. Returns the separator to put in front of the next row.
send_json_reduce_row(Resp, {Key, Value}, RowFront) ->
    RowObj = {[{<<"key">>, Key}, {<<"value">>, Value}]},
    Chunk = RowFront ++ ?JSON_ENCODE(RowObj),
    send_chunk(Resp, Chunk),
    {ok, ",\r\n"}.
384
% Same as view_etag/3 with `nil` as the extra term.
view_etag(Group, View) ->
    NoExtra = nil,
    view_etag(Group, View, NoExtra).
387
% Computes the etag of a view response.
% A `{reduce, _, View}` wrapper is unwrapped first. The etag is made from the
% group signature, the update sequences, the Extra term, the number of
% partitions and the active partitions bitmask found in the index header.
view_etag(Group, {reduce, _Nth, InnerView}, Extra) ->
    view_etag(Group, InnerView, Extra);
view_etag(Group, #set_view{}, Extra) ->
    #set_view_group{sig = Sig, index_header = IndexHeader} = Group,
    #set_view_index_header{
        num_partitions = PartitionCount,
        abitmask = ActiveMask,
        seqs = Seqs
    } = IndexHeader,
    EtagTerm = {Sig, Seqs, Extra, PartitionCount, ActiveMask},
    couch_httpd:make_etag(EtagTerm).
401
% Converts a view row into the EJSON object sent to the client.
% A row whose id position holds the atom `error` becomes a `key`/`error`
% object. Any other row becomes an `id`/`key`/`value` object, with a
% `partition` member added before `value` when debug mode is `true`.
view_row_obj({{Key, error}, Reason}, _DebugMode) ->
    {[{<<"key">>, Key}, {<<"error">>, Reason}]};
view_row_obj({{Key, DocId}, {PartId, Value}}, DebugMode)
        when is_boolean(DebugMode) ->
    PartitionMember = case DebugMode of
    true ->
        [{<<"partition">>, PartId}];
    false ->
        []
    end,
    {[{<<"id">>, DocId}, {<<"key">>, Key}] ++ PartitionMember ++
        [{<<"value">>, Value}]}.
409
410
% Same as finish_view_fold/5 without extra fields.
finish_view_fold(Req, TotalRows, Offset, FoldResult) ->
    NoExtraFields = [],
    finish_view_fold(Req, TotalRows, Offset, FoldResult, NoExtraFields).
413
% Finishes a map view response after the fold.
% When the fold never started a response (third accumulator element is
% `undefined`) a complete JSON object with an empty `rows` list is sent,
% including `offset` only when Offset is a number, followed by Fields.
% Otherwise the `rows` array of the running chunked response is closed.
finish_view_fold(Req, TotalRows, Offset, FoldResult, Fields) ->
    case FoldResult of
    {_, _, undefined, _} ->
        % no row was sent: answer with an empty view
        OffsetProp = case is_number(Offset) of
        true ->
            [{offset, Offset}];
        false ->
            []
        end,
        Props = [{total_rows, TotalRows}] ++ OffsetProp ++ [{rows, []}],
        send_json(Req, 200, {Props ++ Fields});
    {_, _, StartedResp, _} ->
        % close the rows array and the response
        send_chunk(StartedResp, "\r\n]}"),
        end_json_response(StartedResp)
    end.
431
% Same as finish_reduce_fold/3 without extra fields.
finish_reduce_fold(Req, Resp) ->
    NoExtraFields = [],
    finish_reduce_fold(Req, Resp, NoExtraFields).
434
% Finishes a reduce view response after the fold.
% When no response was started (Resp is `undefined`) a JSON object with an
% empty `rows` list followed by Fields is sent. Otherwise the `rows` array of
% the running chunked response is closed.
finish_reduce_fold(Req, undefined, Fields) ->
    EmptyResult = {[{rows, []}] ++ Fields},
    send_json(Req, 200, EmptyResult);
finish_reduce_fold(_Req, Resp, _Fields) ->
    send_chunk(Resp, "\r\n]}"),
    end_json_response(Resp).
445
% Parses a boolean query parameter value by delegating to
% couch_httpd_view:parse_bool_param/1.
parse_bool_param(Value) ->
    couch_httpd_view:parse_bool_param(Value).
448