1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_spatial_merger).
14
15%-export([query_spatial/2]).
16-export([parse_http_params/4, make_funs/3, get_skip_and_limit/1,
17    http_index_folder_req_details/3, make_event_fun/2]).
18
19-include("couch_db.hrl").
20-include("couch_index_merger.hrl").
21-include("couch_spatial.hrl").
22
23-define(LOCAL, <<"local">>).
24
% Index merger callback: parse the HTTP request parameters of a spatial
% query. The work is delegated to couch_httpd_spatial:parse_spatial_params/1;
% the design document, index name and extra argument are not used here.
parse_http_params(Req, _DDoc, _IndexName, _Extra) ->
    SpatialArgs = couch_httpd_spatial:parse_spatial_params(Req),
    SpatialArgs.
28
% Index merger callback: return the functions the generic merger needs for
% spatial indexes, as a 5-tuple:
%   1. the row comparison function,
%   2. the folder for local indexes,
%   3. the merge function,
%   4. a factory that, given the number of folders, a callback and a user
%      accumulator, returns the fun that collects the row counts,
%   5. nil (no fifth function is supplied for spatial indexes).
make_funs(_DDoc, _IndexName, _IndexMergeParams) ->
    LessFun = fun spatial_less_fun/2,
    FolderFun = fun spatial_folder/6,
    MergeFun = fun merge_spatial/1,
    CollectorFactory = fun(NumFolders, Callback, UserAcc) ->
        fun(Item) ->
            % The initial row count is always 0 for spatial indexes.
            couch_index_merger:collect_row_count(
                NumFolders, 0, fun spatial_row_obj/1, Callback, UserAcc, Item)
        end
    end,
    {LessFun, FolderFun, MergeFun, CollectorFactory, nil}.
41
% Index merger callback: extract the skip and limit settings from the
% spatial query arguments and return them as {Skip, Limit}.
get_skip_and_limit(#spatial_query_args{} = SpatialArgs) ->
    {SpatialArgs#spatial_query_args.skip,
     SpatialArgs#spatial_query_args.limit}.
45
% Index merger callback: return the initial event handler used to consume
% the streamed JSON response of a remote spatial index. Every event is
% handed to http_spatial_fold/2 together with the merge queue.
make_event_fun(_SpatialArgs, Queue) ->
    fun(Event) -> http_spatial_fold(Event, Queue) end.
51
% Index merger callback: build the details of the HTTP request that queries
% a remote index. Returns {Url, Method, Headers, Body, LhttpcOptions} and
% stores the origin of the request in the process dictionary under the key
% from_url (read again by http_view_fold_queue_error/2).
%
% #merged_index_spec{}: POST the EJSON spec to the spec's URL. When
% couch_index_merger:should_check_rev/2 returns true, every existing
% "ddoc_revision" property is dropped and replaced by the revision string
% of DDoc.
http_index_folder_req_details(#merged_index_spec{} = Spec, MergeParams, DDoc) ->
    #merged_index_spec{url = SpecUrl, ejson_spec = {SpecProps}} = Spec,
    #index_merge{conn_timeout = ConnTimeout, http_params = QueryArgs} =
        MergeParams,
    {ok, #httpdb{
        url = BaseUrl,
        lhttpc_options = LhttpcOptions,
        headers = DbHeaders
    }} = couch_index_merger:open_db(SpecUrl, nil, ConnTimeout),
    RequestUrl = BaseUrl ++ spatial_qs(QueryArgs),
    RequestHeaders = [{"Content-Type", "application/json"} | DbHeaders],
    BodyProps = case couch_index_merger:should_check_rev(MergeParams, DDoc) of
    false ->
        SpecProps;
    true ->
        RevStr = couch_index_merger:ddoc_rev_str(DDoc),
        OtherProps = [Prop || Prop <- SpecProps,
            element(1, Prop) =/= <<"ddoc_revision">>],
        [{<<"ddoc_revision">>, RevStr} | OtherProps]
    end,
    put(from_url, BaseUrl),
    {RequestUrl, post, RequestHeaders, ?JSON_ENCODE({BodyProps}),
        LhttpcOptions};

% #simple_index_spec{}: GET <db url><ddoc id>/_spatial/<index name> with the
% query string derived from the spatial query arguments. No headers and no
% body are sent.
http_index_folder_req_details(#simple_index_spec{} = Spec, MergeParams, _DDoc) ->
    #simple_index_spec{
        database = DbUrl,
        ddoc_id = DDocId,
        index_name = IndexName
    } = Spec,
    #index_merge{conn_timeout = ConnTimeout, http_params = QueryArgs} =
        MergeParams,
    {ok, #httpdb{url = BaseUrl, lhttpc_options = LhttpcOptions}} =
        couch_index_merger:open_db(DbUrl, nil, ConnTimeout),
    IndexPath = ?b2l(DDocId) ++ "/_spatial/" ++ ?b2l(IndexName),
    RequestUrl = BaseUrl ++ IndexPath ++ spatial_qs(QueryArgs),
    % Note: this clause records the URL from the spec, not the one returned
    % by open_db.
    put(from_url, DbUrl),
    {RequestUrl, get, [], [], LhttpcOptions}.
96
% Convert a merged row into the EJSON object that is sent to the client.
%
% Error rows ({{Key, error}, Reason}) become an object with the members
% key and error. Data rows ({{Bbox, DocId}, {{Type, Coords}, Value}})
% become an object with the members id, bbox (the bounding box tuple turned
% into a list), geometry (type and coordinates) and value.
spatial_row_obj({{Key, error}, Reason}) ->
    ErrorProps = [{key, Key}, {error, Reason}],
    {ErrorProps};

spatial_row_obj({{Bbox, DocId}, {{GeomType, Coordinates}, Value}}) ->
    Geometry = {[{type, GeomType}, {coordinates, Coordinates}]},
    BboxList = tuple_to_list(Bbox),
    {[{id, DocId},
      {bbox, BboxList},
      {geometry, Geometry},
      {value, Value}]}.
105
% Row ordering used while merging: true when A sorts strictly before B in
% the standard Erlang term order.
spatial_less_fun(A, B) ->
    B > A.
108
% Counterpart to map_view_folder/6 in couch_view_merger.
%
% Folds over a local spatial index and pushes its rows into Queue. If the
% design document revision has to be checked and has changed, a
% revision_mismatch item is queued instead of any rows. The result is
% whatever the final (caught) couch_db:close/1 call evaluates to.
spatial_folder(Db, SpatialSpec, MergeParams, _UserCtx, DDoc, Queue) ->
    #simple_index_spec{
        ddoc_database = DDocDbName,
        ddoc_id = DDocId,
        index_name = IndexName
    } = SpatialSpec,
    QueryArgs = MergeParams#index_merge.http_params,
    #spatial_query_args{
        bbox = Bbox,
        bounds = Bounds,
        stale = Stale,
        geometry = QueryGeom
    } = QueryArgs,
    RowFun = make_spatial_fold_fun(Queue, QueryGeom),
    {DDocDb, Index} = get_spatial_index(Db, DDocDbName, DDocId, IndexName,
        Stale),

    % The design document only needs to be compared when the merger asks
    % for a revision check.
    RevisionOk =
        not(couch_index_merger:should_check_rev(MergeParams, DDoc)) orelse
        couch_index_merger:ddoc_unchanged(DDocDb, DDoc),
    case RevisionOk of
    false ->
        ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
    true ->
        % The spatial index doesn't output a total_rows property, hence
        % no real row count is needed; a row_count item must still be
        % queued to make the index merging work correctly.
        ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
        couch_spatial:fold(Index, RowFun, nil, Bbox, Bounds)
    end,
    % DDocDb is nil when get_spatial_index/5 did not return a separate
    % design document database; the catch keeps that case from failing.
    catch couch_db:close(DDocDb).
136
% Counterpart to get_map_view/5 in couch_view_merger.
%
% Looks up the spatial index SpatialName and returns {DDocDb, Index}.
% DDocDb is the database handle contained in the group id when
% couch_index_merger:get_group_id/2 returned a {DDocDb, DDocId} pair, and
% nil when it returned the bare DDocId.
get_spatial_index(Db, DDocDbName, DDocId, SpatialName, Stale) ->
    GroupId = couch_index_merger:get_group_id(DDocDbName, DDocId),
    {ok, Index, _Group} =
        couch_spatial:get_spatial_index(Db, GroupId, SpatialName, Stale),
    DDocDb = case GroupId of
        DDocId -> nil;
        {GroupDDocDb, DDocId} -> GroupDDocDb
    end,
    {DDocDb, Index}.
146
% Counterpart to http_view_fold/3 in couch_view_merger.
%
% First state of the streaming JSON handler for a remote response: on the
% start of the top-level object a {row_count, 0} item is queued, then the
% handler that looks for the "rows" member takes over.
http_spatial_fold(object_start, Queue) ->
    ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
    NextState = fun(Event) -> http_spatial_fold_rows_1(Event, Queue) end,
    NextState.
151
% Counterpart to http_view_fold_rows_1/2 in couch_view_merger.
%
% Skips events until the "rows" key shows up. The next event must then be
% array_start, after which http_spatial_fold_rows_2/2 handles the rows.
http_spatial_fold_rows_1({key, <<"rows">>}, Queue) ->
    fun(array_start) ->
        fun(Event) -> http_spatial_fold_rows_2(Event, Queue) end
    end;
http_spatial_fold_rows_1(_Skipped, Queue) ->
    fun(Event) -> http_spatial_fold_rows_1(Event, Queue) end.
157
% Counterpart to http_view_fold_rows_2/2 in couch_view_merger.
%
% Handles the elements of the "rows" array. Each row object is collected
% with json_stream_parse:collect_object/2 and queued; the end of the array
% hands over to the handler that looks for the "errors" member.
http_spatial_fold_rows_2(array_end, Queue) ->
    fun(Event) -> http_spatial_fold_errors_1(Event, Queue) end;
http_spatial_fold_rows_2(object_start, Queue) ->
    OnRow = fun(Row) ->
        http_spatial_fold_queue_row(Row, Queue),
        fun(Event) -> http_spatial_fold_rows_2(Event, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnRow) end.
170
% Counterpart to http_view_fold_errors_1/2 in couch_view_merger.
%
% Inspects the event that follows the "rows" array. If it is the "errors"
% key, the next event must be array_start and the error objects are then
% handled by http_spatial_fold_errors_2/2. For any other event, all further
% events go to couch_index_merger:void_event/1.
http_spatial_fold_errors_1({key, <<"errors">>}, Queue) ->
    fun(array_start) ->
        fun(Event) -> http_spatial_fold_errors_2(Event, Queue) end
    end;
http_spatial_fold_errors_1(_Other, _Queue) ->
    fun couch_index_merger:void_event/1.
176
% Counterpart to http_view_fold_errors_2/2 in couch_view_merger.
%
% Handles the elements of the "errors" array. Each error object is
% collected with json_stream_parse:collect_object/2 and queued; after the
% end of the array all further events go to
% couch_index_merger:void_event/1.
http_spatial_fold_errors_2(array_end, _Queue) ->
    fun couch_index_merger:void_event/1;
http_spatial_fold_errors_2(object_start, Queue) ->
    OnError = fun(Error) ->
        http_view_fold_queue_error(Error, Queue),
        fun(Event) -> http_spatial_fold_errors_2(Event, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnError) end.
189
% Carbon copy of http_view_fold_queue_error/2 in couch_view_merger.
%
% Queues an {error, From, Reason} item for an error object received from a
% remote node. When the object has no "from" member, or it is "local", the
% URL stored in the process dictionary under from_url is used as the
% origin. A missing "reason" member is reported as null.
http_view_fold_queue_error({Props}, Queue) ->
    Origin = case couch_util:get_value(<<"from">>, Props, ?LOCAL) of
    ?LOCAL ->
        get(from_url);
    RemoteOrigin ->
        RemoteOrigin
    end,
    Reason = couch_util:get_value(<<"reason">>, Props, null),
    ok = couch_view_merger_queue:queue(Queue, {error, Origin, Reason}).
201
% Counterpart to http_view_fold_queue_row/2 in couch_view_merger.
% Used for merges of remote DBs.
%
% Converts one decoded JSON row object ({Props}) into the internal row
% format and puts it into Queue:
%   - data rows become {{Bbox, Id}, {{GeomType, Coords}, Value}} (plus a
%     trailing {doc, Doc} element when the row carries a "doc" member),
%   - rows with an "error" member become {{Key, error}, Error}.
%
% The geometry and bounding box are only decoded for data rows. Error rows,
% as emitted by spatial_row_obj/1, consist of just "key" and "error", so
% decoding those members unconditionally would crash with a badmatch/badarg
% instead of reporting the error.
http_spatial_fold_queue_row({Props}, Queue) ->
    Row = case couch_util:get_value(<<"error">>, Props, nil) of
    nil ->
        http_spatial_data_row(Props);
    Error ->
        % error in a map row
        {{http_spatial_error_key(Props), error}, Error}
    end,
    ok = couch_view_merger_queue:queue(Queue, Row).

% Build the internal representation of a data row. A data row must carry a
% "bbox" list and a "geometry" object; anything else fails the match.
http_spatial_data_row(Props) ->
    Id = couch_util:get_value(<<"id">>, Props, nil),
    Bbox = couch_util:get_value(<<"bbox">>, Props, null),
    {Geom} = couch_util:get_value(<<"geometry">>, Props, null),
    Val = couch_util:get_value(<<"value">>, Props),
    GeomType = couch_util:get_value(<<"type">>, Geom),
    Coords = couch_util:get_value(<<"coordinates">>, Geom),
    case couch_util:get_value(<<"doc">>, Props, nil) of
    nil ->
        {{list_to_tuple(Bbox), Id}, {{GeomType, Coords}, Val}};
    % NOTE vmx 20110818: GeoCouch doesn't support include_docs atm,
    %     but I'll just leave the code here
    Doc ->
        {{list_to_tuple(Bbox), Id}, {{GeomType, Coords}, Val}, {doc, Doc}}
    end.

% Key of an error row: the bounding box as a tuple when the row has a
% "bbox" list, otherwise the value of its "key" member (null if absent).
http_spatial_error_key(Props) ->
    case couch_util:get_value(<<"bbox">>, Props, null) of
    Bbox when is_list(Bbox) ->
        list_to_tuple(Bbox);
    _NoBbox ->
        couch_util:get_value(<<"key">>, Props, null)
    end.
226
227
228
% Counterpart to make_map_fold_fun/4 in couch_view_merger.
% Used for merges of local DBs.
%
% Returns the fold function that feeds the rows of a local spatial index
% into Queue. Without a query geometry (nil) every row is queued. With a
% query geometry, it is converted once via erlgeom:to_geom/1 and a row is
% only queued when couch_httpd_spatial:condition_disjoint/2 returns true
% for the converted geometry and the row's geometry. The accumulator is
% passed through unchanged.
make_spatial_fold_fun(Queue, nil) ->
    fun({{_Bbox, _DocId}, {_Geom, _Value}} = Row, Acc) ->
        ok = couch_view_merger_queue:queue(Queue, Row),
        {ok, Acc}
    end;
make_spatial_fold_fun(Queue, QueryGeom) ->
    FilterGeom = erlgeom:to_geom(QueryGeom),
    fun({{_Bbox, _DocId}, {RowGeom, _Value}} = Row, Acc) ->
        case couch_httpd_spatial:condition_disjoint(FilterGeom, RowGeom) of
        false ->
            ok;
        true ->
            ok = couch_view_merger_queue:queue(Queue, Row)
        end,
        {ok, Acc}
    end.
245
% Counterpart to merge_map_views/6 in couch_view_merger.
%
% Merge loop for spatial indexes:
%   - with a limit of 0 the merge is finished through
%     couch_index_merger:merge_indexes_no_limit/1,
%   - with an empty row accumulator the next minimal row is requested via
%     couch_index_merger:merge_indexes_no_acc/2; a {params, _} result
%     continues the loop, any other result is returned as is,
%   - otherwise couch_index_merger:handle_skip/1 is applied and the loop
%     continues.
merge_spatial(#merge_params{limit = 0} = Params) ->
    couch_index_merger:merge_indexes_no_limit(Params);

merge_spatial(#merge_params{row_acc = []} = Params) ->
    Result = couch_index_merger:merge_indexes_no_acc(
        Params, fun merge_spatial_min_row/2),
    merge_spatial_continue(Result);

% ??? vmx 20110805: Does this case ever happen in the spatial index?
merge_spatial(Params) ->
    merge_spatial(couch_index_merger:handle_skip(Params)).

% Continue the merge loop with updated parameters, or return the final
% result untouched.
merge_spatial_continue({params, NextParams}) ->
    merge_spatial(NextParams);
merge_spatial_continue(Final) ->
    Final.
263
% Counterpart to merge_map_min_row/2 in couch_view_merger.
%
% Called with the current minimal row: flushes the merge queue, makes
% MinRow the sole content of the row accumulator and lets
% couch_index_merger:handle_skip/1 process the updated parameters.
merge_spatial_min_row(Params, MinRow) ->
    Queue = Params#merge_params.queue,
    ok = couch_view_merger_queue:flush(Queue),
    WithMinRow = Params#merge_params{row_acc = [MinRow]},
    couch_index_merger:handle_skip(WithMinRow).
268
% Counterpart to view_qs/1 in couch_view_merger.
%
% Builds the query string that forwards the spatial query arguments to a
% remote node. Only the arguments bbox, stale, count and bounds are
% considered, and each one is only emitted when it differs from its default
% in #spatial_query_args{}. Returns [] when nothing has to be forwarded,
% otherwise a flat string starting with "?" whose parameters are joined by
% "&".
spatial_qs(SpatialArgs) ->
    Defaults = #spatial_query_args{},
    #spatial_query_args{
        bbox = Bbox,
        stale = Stale,
        count = Count,
        bounds = Bounds
    } = SpatialArgs,
    QsList = lists:append([
        spatial_qs_param("bbox", Bbox,
            Defaults#spatial_query_args.bbox, fun spatial_qs_coords/1),
        spatial_qs_param("stale", Stale,
            Defaults#spatial_query_args.stale, fun erlang:atom_to_list/1),
        spatial_qs_param("count", Count,
            Defaults#spatial_query_args.count, fun erlang:atom_to_list/1),
        spatial_qs_param("bounds", Bounds,
            Defaults#spatial_query_args.bounds, fun spatial_qs_coords/1)
    ]),
    case QsList of
    [] ->
        [];
    _ ->
        "?" ++ string:join(QsList, "&")
    end.

% Returns ["Name=Value"] when Value differs (exact comparison) from its
% default, and [] otherwise. ToString is only called for non-default values.
spatial_qs_param(_Name, Value, Value, _ToString) ->
    [];
spatial_qs_param(Name, Value, _Default, ToString) ->
    [Name ++ "=" ++ ToString(Value)].

% Render a tuple (or list) of coordinates as a comma separated string, e.g.
% {1,2.5,3,4} -> "1,2.5,3,4". Every coordinate is formatted on its own, so
% the result neither depends on the internal structure of the iolist that
% io_lib:format/2 returns nor picks up the line breaks that "~p" inserts
% into long terms.
spatial_qs_coords(Coords) when is_tuple(Coords) ->
    spatial_qs_coords(tuple_to_list(Coords));
spatial_qs_coords(Coords) when is_list(Coords) ->
    string:join(
        [lists:flatten(io_lib:format("~p", [Coord])) || Coord <- Coords],
        ",").
310