1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13% This is where all request parameter processing happens. The
14% `capi_spatial`/`capi_indexer` just pass on the request (hence the query
15% arguments) into this module.
16
17-module(spatial_merger).
18
19-export([parse_http_params/4, make_funs/3, get_skip_and_limit/1,
20    make_event_fun/2, view_qs/2, process_extra_params/2,
21    map_view_merge_callback/2]).
22-export([simple_set_view_query/3]).
23
24-include("couch_db.hrl").
25-include_lib("couch_spatial.hrl").
26-include_lib("vtree/include/vtree.hrl").
27-include_lib("couch_index_merger/include/couch_index_merger.hrl").
28-include_lib("couch_index_merger/include/couch_view_merger.hrl").
29-include_lib("couch_set_view/include/couch_set_view.hrl").
30
31-define(LOCAL, <<"local">>).
32
33
34% callback!
% Only the request is used: its query string is handed to
% `spatial_http:parse_qs/1` and the parsed result is returned as is.
parse_http_params(Req, _DDoc, _IndexName, _Extra) ->
    SpatialArgs = spatial_http:parse_qs(Req),
    SpatialArgs.
37
38% callback!
% Returns a five element tuple: the row comparison fun, the view folder fun,
% the merge fun, a factory for the row collector fun and `nil`.
% The design document and the index name are ignored.
make_funs(_DDoc, _IndexName, IndexMergeParams) ->
    #index_merge{
        http_params = #spatial_query_args{debug = DebugMode},
        make_row_fun = CustomRowFun
    } = IndexMergeParams,
    % A row formatter supplied through the merge parameters wins, else rows
    % are serialised by spatial_row_obj/2 using the debug flag of the query.
    RowFun = case is_function(CustomRowFun) of
    false ->
        fun(RowDetails) -> spatial_row_obj(RowDetails, DebugMode) end;
    true ->
        CustomRowFun
    end,
    CollectorFactory = fun(NumFolders, Callback, UserAcc) ->
        fun(Item) ->
            couch_index_merger:collect_row_count(
                NumFolders, 0, RowFun, Callback, UserAcc, Item)
        end
    end,
    {fun spatial_less_fun/2,
     fun spatial_view_folder/6,
     fun merge_spatial/1,
     CollectorFactory,
     nil}.
64
65% callback!
% Extracts the `skip` and `limit` values of the spatial query arguments
% and returns them as `{Skip, Limit}`.
get_skip_and_limit(#spatial_query_args{} = SpatialArgs) ->
    {SpatialArgs#spatial_query_args.skip,
     SpatialArgs#spatial_query_args.limit}.
68
69% callback!
% Returns the initial event handler for a streamed JSON response; every
% event it receives is passed on to http_spatial_fold/2 together with
% `Queue`. The spatial query arguments are not used.
make_event_fun(_SpatialArgs, Queue) ->
    fun(Event) -> http_spatial_fold(Event, Queue) end.
74
75% callback!
% There is no extra parameter processing for spatial merges: the first
% argument is ignored and the EJSON term is returned untouched.
process_extra_params(_Ignored, EJson) ->
    EJson.
78
79% callback!
% Rows are passed to the fold fun stored in the `#merge_acc{}` accumulator
% and the result (`ok` or `stop`) is propagated with the updated inner
% accumulator. All other events leave the accumulator unchanged.
map_view_merge_callback({row, Row}, MergeAcc) ->
    #merge_acc{
        fold_fun = FoldFun,
        acc = FoldAcc
    } = MergeAcc,
    % The difference between spatial and mapreduce is the arity of the
    % fold fun
    case FoldFun(Row, FoldAcc) of
    {stop, FoldAcc2} ->
        {stop, MergeAcc#merge_acc{acc = FoldAcc2}};
    {ok, FoldAcc2} ->
        {ok, MergeAcc#merge_acc{acc = FoldAcc2}}
    end;
map_view_merge_callback(start, MergeAcc) ->
    {ok, MergeAcc};
map_view_merge_callback({start, _RowCount}, MergeAcc) ->
    {ok, MergeAcc};
map_view_merge_callback(stop, MergeAcc) ->
    {ok, MergeAcc};
map_view_merge_callback({debug_info, _From, _Info}, MergeAcc) ->
    {ok, MergeAcc}.
100
101
% Serialises a single row into a JSON object (returned as a binary). The
% second argument is the debug flag of the query; with `true` the partition
% and the node are part of the output.
%
% Optimized path, row assembled by couch_http_view_streamer
% NOTE vmx 2013-07-19: Not sure if that case is ever used by the spatial
%     indexer
spatial_row_obj({_KeyDocId, {row_json, RowJson}}, _Debug) ->
    RowJson;
% Error row, the reason is converted to a binary before it gets encoded
spatial_row_obj({{Key, error}, Reason}, _Debug) ->
    KeyJson = ?JSON_ENCODE(Key),
    ReasonJson = ?JSON_ENCODE(couch_util:to_binary(Reason)),
    <<"{\"key\":", KeyJson/binary,
      ",\"error\":", ReasonJson/binary, "}">>;
% NOTE vmx 2013-07-10: Those parameters are the ones the function in
%     spatial_view:fold_fun() gets. It might be pre-processed by the
%     `FoldFun` (in e.g. simple_spatial_view_query/4).
% Row from local node, query with ?debug=true, with or without a geometry.
% The value is inserted verbatim, i.e. it must be a binary already.
spatial_row_obj({{Mbb, DocId}, {PartId, Value, Geom}}, true) ->
    Head = spatial_row_head(DocId, Mbb),
    Partition = ?l2b(integer_to_list(PartId)),
    Body = <<Head/binary,
             ",\"partition\":", Partition/binary,
             ",\"node\":\"", (?LOCAL)/binary, "\"",
             ",\"value\":", Value/binary>>,
    spatial_row_tail(Body, Geom);
% Row from remote node, using Erlang based stream JSON parser, query with
% ?debug=true, with or without a geometry
spatial_row_obj({{Mbb, DocId}, {{PartId, Node, Value}, Geom}}, true) ->
    Head = spatial_row_head(DocId, Mbb),
    Partition = ?l2b(integer_to_list(PartId)),
    NodeJson = ?JSON_ENCODE(Node),
    ValueJson = ?JSON_ENCODE(Value),
    Body = <<Head/binary,
             ",\"partition\":", Partition/binary,
             ",\"node\":", NodeJson/binary,
             ",\"value\":", ValueJson/binary>>,
    spatial_row_tail(Body, Geom);
% Row from local node, query with ?debug=false, with or without a geometry
spatial_row_obj({{Mbb, DocId}, {_PartId, Value, Geom}}, false) ->
    Head = spatial_row_head(DocId, Mbb),
    Body = <<Head/binary, ",\"value\":", Value/binary>>,
    spatial_row_tail(Body, Geom);
% Row from remote node, query with ?debug=false, with or without a geometry
spatial_row_obj({{Mbb, DocId}, {Value, Geom}}, false) ->
    Head = spatial_row_head(DocId, Mbb),
    ValueJson = ?JSON_ENCODE(Value),
    Body = <<Head/binary, ",\"value\":", ValueJson/binary>>,
    spatial_row_tail(Body, Geom).

% Opening part shared by all non-error rows: `{"id":<DocId>,"key":<Mbb>`
spatial_row_head(DocId, Mbb) ->
    IdJson = ?JSON_ENCODE(DocId),
    KeyJson = ?JSON_ENCODE(Mbb),
    <<"{\"id\":", IdJson/binary, ",\"key\":", KeyJson/binary>>.

% Closes the JSON object, a geometry other than `nil` is appended first
spatial_row_tail(Body, nil) ->
    <<Body/binary, "}">>;
spatial_row_tail(Body, Geom) ->
    GeomJson = ?JSON_ENCODE(Geom),
    <<Body/binary, ",\"geometry\":", GeomJson/binary, "}">>.
175
% Row ordering used by the merger: plain Erlang term order, `true` if
% `A` sorts strictly before `B`.
spatial_less_fun(A, B) ->
    B > A.
178
179
% This wrapper is needed to be compatible with the mapreduce views, which
% also have the case for _all_docs. The database and the user context are
% ignored.
spatial_view_folder(_Db, ViewSpec, MergeParams, _UserCtx, DDoc, Queue) ->
    spatial_view_folder(ViewSpec, MergeParams, DDoc, Queue).

% Folds over a local spatial view and puts the row count followed by every
% row into `Queue`. Problems are reported through the queue as well
% (`revision_mismatch` or `{error, From, Reason}` items). Whenever a group
% was obtained, it is released and the queue is marked as done at the end.
spatial_view_folder(ViewSpec, MergeParams, DDoc, Queue) ->
    #set_view_spec{
        name = SetName,
        ddoc_id = DDocId
    } = ViewSpec,
    #index_merge{http_params = ViewArgs} = MergeParams,
    #spatial_query_args{debug = Debug} = ViewArgs,
    DDocDbName = ?master_dbname(SetName),

    case prepare_spatial_view(ViewSpec, ViewArgs, DDoc, Queue) of
    {error, _GroupReq} ->
        %%  handled by prepare_set_view
        ok;
    {{View, Group}, GroupReq} ->
        couch_view_merger:queue_debug_info(Debug, Group, GroupReq, Queue),
        try
            % No include_docs for now
            FoldFun = make_spatial_fold_fun(Queue),
            % The design document only gets compared if a revision check
            % was asked for
            RevisionOk =
                not(couch_index_merger:should_check_rev(MergeParams, DDoc))
                orelse couch_index_merger:ddoc_unchanged(DDocDbName, DDoc),
            case RevisionOk of
            false ->
                ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
            true ->
                RowCount = couch_set_view:get_row_count(Group, View),
                ok = couch_view_merger_queue:queue(
                    Queue, {row_count, RowCount}),
                {ok, _, _} = couch_set_view:fold(
                    Group, View, FoldFun, [], ViewArgs)
            end
        catch
        ddoc_db_not_found ->
            NotFoundMsg = couch_index_merger:ddoc_not_found_msg(
                DDocDbName, DDocId),
            ok = couch_view_merger_queue:queue(
                Queue, {error, ?LOCAL, NotFoundMsg});
        _Tag:Error ->
            Stack = erlang:get_stacktrace(),
            ?LOG_ERROR("Caught unexpected error "
                       "while serving view query ~s/~s: ~p~n~s",
                       [?LOG_USERDATA(SetName), ?LOG_USERDATA(DDocId), Error, ?LOG_USERDATA(Stack)]),
            couch_view_merger_queue:queue(
                Queue, {error, ?LOCAL, couch_util:to_binary(Error)})
        after
            couch_set_view:release_group(Group),
            ok = couch_view_merger_queue:done(Queue)
        end
    end.

% Returns `{PrepareResult, GroupReq}`. If the view spec already carries a
% view and a group, those are used and the group request is `nil`. Else a
% group request is built from the query arguments (a replica index asks for
% no partitions) and the result of couch_view_merger:prepare_set_view/5 is
% returned together with that request.
prepare_spatial_view(ViewSpec, ViewArgs, DDoc, Queue) ->
    #set_view_spec{
        view = SpecView,
        group = SpecGroup,
        partitions = SpecPartitions
    } = ViewSpec,
    case (SpecView =/= nil) andalso (SpecGroup =/= nil) of
    true ->
        {{SpecView, SpecGroup}, nil};
    false ->
        #spatial_query_args{
            stale = Stale,
            debug = Debug,
            type = IndexType
        } = ViewArgs,
        WantedPartitions = case IndexType of
        main ->
            SpecPartitions;
        replica ->
            []
        end,
        GroupReq = #set_view_group_req{
            stale = Stale,
            update_stats = true,
            wanted_partitions = WantedPartitions,
            debug = Debug,
            type = IndexType
        },
        Prepared = couch_view_merger:prepare_set_view(
            ViewSpec, GroupReq, DDoc, Queue,
            fun spatial_view:get_spatial_view/4),
        {Prepared, GroupReq}
    end.
261
262
263% Counterpart to http_view_fold/3 in couch_view_merger
% First state of the event handler for a streamed response: on the opening
% `object_start` a row count of 0 is queued, the returned fun passes the
% next event to http_spatial_fold_rows_1/2.
http_spatial_fold(object_start, Queue) ->
    ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
    NextState = fun(Event) -> http_spatial_fold_rows_1(Event, Queue) end,
    NextState.
267
268% Counterpart to http_view_fold_rows_1/2 in couch_view_merger
% Skips events until the `rows` key shows up. The handler returned for that
% key accepts `array_start` only and then continues with
% http_spatial_fold_rows_2/2.
http_spatial_fold_rows_1({key, <<"rows">>}, Queue) ->
    fun(array_start) ->
        fun(Event) -> http_spatial_fold_rows_2(Event, Queue) end
    end;
http_spatial_fold_rows_1(_Skipped, Queue) ->
    fun(Event) -> http_spatial_fold_rows_1(Event, Queue) end.
273
274% Counterpart to http_view_fold_fold_rows_2/2 in couch_view_merger
% Handles the items of the `rows` array. Every object is collected with
% json_stream_parse:collect_object/2 and queued as a row; the end of the
% array switches over to http_spatial_fold_errors_1/2.
http_spatial_fold_rows_2(object_start, Queue) ->
    QueueRow = fun(Row) ->
        http_spatial_fold_queue_row(Row, Queue),
        fun(NextEvent) -> http_spatial_fold_rows_2(NextEvent, Queue) end
    end,
    fun(Event) ->
        json_stream_parse:collect_object(Event, QueueRow)
    end;
http_spatial_fold_rows_2(array_end, Queue) ->
    fun(Event) -> http_spatial_fold_errors_1(Event, Queue) end.
286
287% Counterpart to http_view_fold_errors_1/2 in couch_view_merger
% Event right after the rows: an `errors` key leads (after `array_start`)
% to http_spatial_fold_errors_2/2, for any other event the remaining input
% is handed to couch_index_merger:void_event/1.
http_spatial_fold_errors_1({key, <<"errors">>}, Queue) ->
    fun(array_start) ->
        fun(Event) -> http_spatial_fold_errors_2(Event, Queue) end
    end;
http_spatial_fold_errors_1(_Other, _Queue) ->
    fun couch_index_merger:void_event/1.
292
293% Counterpart to http_view_fold_errors_2/2 in couch_view_merger
% Handles the items of the `errors` array. Every object is collected with
% json_stream_parse:collect_object/2 and queued as an error; after the end
% of the array couch_index_merger:void_event/1 takes over.
http_spatial_fold_errors_2(object_start, Queue) ->
    QueueError = fun(Error) ->
        http_view_fold_queue_error(Error, Queue),
        fun(NextEvent) -> http_spatial_fold_errors_2(NextEvent, Queue) end
    end,
    fun(Event) ->
        json_stream_parse:collect_object(Event, QueueError)
    end;
http_spatial_fold_errors_2(array_end, _Queue) ->
    fun couch_index_merger:void_event/1.
305
306% Carbon copy of http_view_fold_queue_error/2 in couch_view_merger
% Queues `{error, From, Reason}` for an error object of a streamed response.
% A missing `from` (or one that equals ?LOCAL) is replaced by the `from_url`
% entry of the process dictionary; a missing `reason` becomes `null`.
http_view_fold_queue_error({Props}, Queue) ->
    From = case couch_util:get_value(<<"from">>, Props, ?LOCAL) of
    ?LOCAL ->
        get(from_url);
    RemoteFrom ->
        RemoteFrom
    end,
    Reason = couch_util:get_value(<<"reason">>, Props, null),
    ErrorItem = {error, From, Reason},
    ok = couch_view_merger_queue:queue(Queue, ErrorItem).
317
318% Counterpart to http_view_fold_queue_row/2 in couch_view_merger
319% Used for merges of remote DBs
% Turns the properties of a streamed row object into a row term and queues
% it. The shape of the row depends on the properties that are present:
%   - `error`: `{{Mbb, error}, Error}`
%   - `doc`:   `{{Mbb, Id}, Value, Doc}`
%   - else:    `{{Mbb, Id}, {Value, Geom}}`
% If there is a `partition` property, `Value` is
% `{PartId, FromUrl, Val}` with `FromUrl` taken from the process dictionary.
http_spatial_fold_queue_row({Props}, Queue) ->
    Lookup = fun(Key, Default) ->
        couch_util:get_value(Key, Props, Default)
    end,
    Id = Lookup(<<"id">>, nil),
    Mbb = Lookup(<<"key">>, null),
    Val = couch_util:get_value(<<"value">>, Props),
    Geom = Lookup(<<"geometry">>, nil),
    Value = case Lookup(<<"partition">>, nil) of
    nil ->
        Val;
    PartId ->
        % we're in debug mode, add node info
        {PartId, get(from_url), Val}
    end,
    Row = case {Lookup(<<"error">>, nil), Lookup(<<"doc">>, nil)} of
    {nil, nil} ->
        {{Mbb, Id}, {Value, Geom}};
    % NOTE vmx 20110818: GeoCouch doesn't support include_docs atm,
    %     but I'll just leave the code here
    {nil, Doc} ->
        {{Mbb, Id}, Value, Doc};
    {Error, _Doc} ->
        % error in a map row
        {{Mbb, error}, Error}
    end,
    ok = couch_view_merger_queue:queue(Queue, Row).
347
348
349% Counterpart to merge_map_views/6 in couch_view_merger
% Merge loop of the spatial index. The clause order matters, a limit of 0 is
% checked before the row accumulator is looked at:
%   - limit is 0: finish through couch_index_merger:merge_indexes_no_limit/1
%   - no accumulated rows: let couch_index_merger:merge_indexes_no_acc/2
%     fetch the next row and loop as long as it returns `{params, _}`
%   - else: apply couch_index_merger:handle_skip/1 and loop
merge_spatial(#merge_params{limit = 0} = Params) ->
    couch_index_merger:merge_indexes_no_limit(Params);

merge_spatial(#merge_params{row_acc = []} = Params) ->
    MinRowFun = fun merge_spatial_min_row/2,
    case couch_index_merger:merge_indexes_no_acc(Params, MinRowFun) of
    {params, NextParams} ->
        merge_spatial(NextParams);
    Result ->
        Result
    end;

% ??? vmx 20110805: Does this case ever happen in the spatial index?
merge_spatial(Params) ->
    merge_spatial(couch_index_merger:handle_skip(Params)).
366
367% Counterpart to merge_map_min_row/2 in couch_view_merger
% Flushes the merger queue, then makes `MinRow` the only accumulated row
% and hands the parameters to couch_index_merger:handle_skip/1.
merge_spatial_min_row(Params, MinRow) ->
    Queue = Params#merge_params.queue,
    ok = couch_view_merger_queue:flush(Queue),
    WithRow = Params#merge_params{row_acc = [MinRow]},
    couch_index_merger:handle_skip(WithRow).
371
372
373% Counterpart to make_map_fold_fun/4 in couch_view_merger
374% Used for merges of local DBs
% Returns a fold fun that puts every row of the shape
% `{{Mbb, DocId}, {PartId, Value, Geom}}` unchanged into `Queue`. The fold
% accumulator is passed through as is.
make_spatial_fold_fun(Queue) ->
    fun({{_Mbb, _DocId}, {_PartId, _Value, _Geom}} = Kv, FoldAcc) ->
        ok = couch_view_merger_queue:queue(Queue, Kv),
        {ok, FoldAcc}
    end.
380
381% Counterpart to view_qs/2 in couch_view_merger
% Builds the query string for a request to another node. Only parameters
% that differ from their default end up in the result, in this order:
% start_range/end_range, stale, limit, on_error, debug. The limit that is
% sent is `Limit + Skip`. Returns `[]` if there is nothing to send,
% otherwise the parameters joined by "&" with a leading "?".
view_qs(SpatialArgs, MergeParams) ->
    #spatial_query_args{
        range = DefRange,
        stale = DefStale,
        limit = DefLimit,
        debug = DefDebug
    } = #spatial_query_args{},
    #spatial_query_args{
        range = Range,
        stale = Stale,
        limit = Limit,
        debug = Debug,
        skip = Skip
    } = SpatialArgs,
    #index_merge{on_error = OnError} = MergeParams,
    DefOnError = ?ON_ERROR_DEFAULT,

    % Matching against the already bound default is an exact comparison
    RangeQs = case Range of
    DefRange ->
        [];
    _ ->
        {Start, End} = lists:unzip(Range),
        ["start_range=" ++ range_to_json(Start),
         "end_range=" ++ range_to_json(End)]
    end,
    StaleQs = case Stale of
    DefStale ->
        [];
    _ ->
        ["stale=" ++ atom_to_list(Stale)]
    end,
    LimitQs = case Limit of
    DefLimit ->
        [];
    _ ->
        ["limit=" ++ integer_to_list(Limit + Skip)]
    end,
    OnErrorQs = case OnError of
    DefOnError ->
        [];
    _ ->
        ["on_error=" ++ atom_to_list(OnError)]
    end,
    DebugQs = case Debug of
    DefDebug ->
        [];
    _ ->
        ["debug=" ++ atom_to_list(Debug)]
    end,

    case lists:append([RangeQs, StaleQs, LimitQs, OnErrorQs, DebugQs]) of
    [] ->
        [];
    QsList ->
        "?" ++ string:join(QsList, "&")
    end.
433
434
% Convert one side of the erlang range (a list of numbers, `nil` for an
% open bound) into a JSON array string, e.g. `[1, nil, 2.5]` becomes
% "[1,null,2.5]".
% io_lib:format/2 returns nested character data, hence every formatted
% number is flattened to make the result the flat string the spec promises.
-spec range_to_json([number() | nil]) -> string().
range_to_json(Range) ->
    Strings = lists:map(fun(nil) ->
        "null";
    (Number) ->
        lists:flatten(io_lib:format("~w", [Number]))
    end, Range),
    "[" ++ string:join(Strings, ",") ++ "]".
444
445
% Query with a single view to merge, trigger a simpler code path
% (no queue, no child processes, etc).
%
% The query arguments are parsed from `Req`, the index type is taken from
% the `_type` query parameter (defaults to "main"; a replica index asks for
% no partitions). Throws `{not_found, Msg}` if the view can't be opened and
% `{error, set_view_outdated}` if partitions are missing.
%
% Everything that happens after the group was obtained runs within
% `try ... after`, so that the group gets released exactly once, no matter
% whether the missing partitions check, the debug info callback or the
% query itself fails.
simple_set_view_query(Params, DDoc, Req) ->
    #index_merge{
        callback = Callback,
        user_acc = UserAcc,
        indexes = [SetViewSpec]
    } = Params,
    #set_view_spec{
        name = SetName,
        partitions = Partitions0,
        ddoc_id = DDocId,
        view_name = ViewName,
        category = Category
    } = SetViewSpec,

    QueryArgs = spatial_http:parse_qs(Req),
    Debug = QueryArgs#spatial_query_args.debug,
    % XXX vmx 2014-10-31: support the _type parameter properly
    IndexType = list_to_existing_atom(
        couch_httpd:qs_value(Req, "_type", "main")),
    Partitions = case IndexType of
    main ->
        Partitions0;
    replica ->
        []
    end,
    GroupReq = #set_view_group_req{
        stale = QueryArgs#spatial_query_args.stale,
        update_stats = true,
        wanted_partitions = Partitions,
        debug = Debug,
        type = IndexType,
        category = Category
    },

    {View, Group, MissingPartitions} = case couch_view_merger:get_set_view(
        fun spatial_view:get_spatial_view/4, SetName, DDoc, ViewName, GroupReq) of
    {ok, View0, Group0, MissingPartitions0} ->
        {View0, Group0, MissingPartitions0};
    Error ->
        % No group was obtained, hence there is nothing to release
        ErrorMsg = io_lib:format("Error opening view `~s`, from set `~s`, "
            "design document `~s`: ~p", [ViewName, SetName, DDocId, Error]),
        throw({not_found, iolist_to_binary(ErrorMsg)})
    end,

    try
        case MissingPartitions of
        [] ->
            ok;
        _ ->
            ?LOG_INFO("Set view `~s`, group `~s`, missing partitions: ~w",
                      [?LOG_USERDATA(SetName), ?LOG_USERDATA(DDocId), MissingPartitions]),
            throw({error, set_view_outdated})
        end,

        QueryArgs2 = QueryArgs#spatial_query_args{
            view_name = ViewName
        },

        Params2 = case couch_view_merger:debug_info(Debug, Group, GroupReq) of
        nil ->
            Params#index_merge{user_ctx = Req#httpd.user_ctx};
        DebugInfo ->
            % The debug info is passed to the callback before any row
            {ok, UserAcc2} = Callback(DebugInfo, UserAcc),
            Params#index_merge{
                user_ctx = Req#httpd.user_ctx,
                user_acc = UserAcc2
            }
        end,

        simple_spatial_view_query(Params2, Group, View, QueryArgs2)
    after
        couch_set_view:release_group(Group)
    end.
523
524
% Runs the query on a single view without a queue. The callback of the
% merge parameters gets `{start, RowCount}`, then `{row, Row}` for every
% row that is left after skip and limit were applied, and finally `stop`;
% the result of that last call is returned.
% Throws `{query_parse_error, Msg}` if the query range is not empty and its
% length differs from the length of the original MBB of the root node.
simple_spatial_view_query(Params, Group, View, ViewArgs) ->
    #index_merge{
        callback = Callback,
        user_acc = UserAcc0
    } = Params,
    #spatial_query_args{
        limit = Limit,
        skip = Skip,
        debug = DebugMode
    } = ViewArgs,
    RowFoldFun = simple_spatial_fold_fun(Callback, DebugMode),

    Vtree = (View#set_view.indexer)#spatial_view.vtree,
    case Vtree#vtree.root of
    nil ->
        ok;
    % Use the original MBB for comparison as the key is not necessarily
    % set. The original MBB is good enough for this check as it will have
    % the same dimensionality.
    #kp_node{mbb_orig = MbbOrig} ->
        Range = ViewArgs#spatial_query_args.range,
        DimensionsOk =
            (Range =:= []) orelse (length(Range) =:= length(MbbOrig)),
        case DimensionsOk of
        false ->
            Msg = io_lib:format(
                "The query range must have the same dimensionality as "
                "the index. Your range was `~10000p`, but the index has a "
                "dimensionality of `~p`.", [Range, length(MbbOrig)]),
            throw({query_parse_error, list_to_binary(Msg)});
        true ->
            ok
        end
    end,

    RowCount = couch_set_view:get_row_count(Group, View),
    {ok, UserAcc1} = Callback({start, RowCount}, UserAcc0),

    InitialAcc = {Limit, Skip, UserAcc1},
    {ok, _, {_, _, UserAcc2}} = couch_set_view:fold(
        Group, View, RowFoldFun, InitialAcc, ViewArgs),
    Callback(stop, UserAcc2).

% Fold fun working on a `{Limit, Skip, UserAcc}` accumulator: the fold is
% stopped once the limit is down to 0, rows are dropped while there is
% something left to skip, all other rows are serialised and passed to the
% callback as `{row, Row}`.
simple_spatial_fold_fun(Callback, DebugMode) ->
    fun(_Kv, {0, _, _} = FoldAcc) ->
            {stop, FoldAcc};
        (_Kv, {LimitLeft, SkipLeft, UserAcc}) when SkipLeft > 0 ->
            {ok, {LimitLeft, SkipLeft - 1, UserAcc}};
        ({{_Key, _DocId}, {_PartId, _Value, _Geom}} = Kv,
                {LimitLeft, 0, UserAcc}) ->
            Row = spatial_row_obj(Kv, DebugMode),
            {ok, UserAcc2} = Callback({row, Row}, UserAcc),
            {ok, {LimitLeft - 1, 0, UserAcc2}}
    end.
572