1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_spatial_merger).
14
15%-export([query_spatial/2]).
16-export([parse_http_params/4, make_funs/3, get_skip_and_limit/1,
17    http_index_folder_req_details/3, make_event_fun/2]).
18
19-include("couch_db.hrl").
20-include("couch_index_merger.hrl").
21-include("couch_spatial.hrl").
22
23-define(LOCAL, <<"local">>).
24
25% callback!
% Index-merger callback that parses the query parameters of an incoming
% HTTP request. Only the request is used; the design document, index name
% and extra argument are ignored. The result is whatever
% couch_httpd_spatial:parse_spatial_params/1 returns (presumably a
% #spatial_query_args{} record -- confirm against that module).
parse_http_params(Req, _DDoc, _IndexName, _Extra) ->
    SpatialArgs = couch_httpd_spatial:parse_spatial_params(Req),
    SpatialArgs.
28
29% callback!
% Index-merger callback returning the functions that drive a spatial merge.
% All three arguments are ignored. The result is the 5-tuple
%   {LessFun, FolderFun, MergeFun, CollectorFun, nil}
% where CollectorFun(NumFolders, Callback, UserAcc) returns a one-argument
% fun that hands every item to couch_index_merger:collect_row_count/6,
% using spatial_row_obj/1 to turn rows into EJSON.
make_funs(_DDoc, _IndexName, _IndexMergeParams) ->
    LessFun = fun spatial_less_fun/2,
    FolderFun = fun spatial_folder/6,
    MergeFun = fun merge_spatial/1,
    CollectorFun = fun(NumFolders, Callback, UserAcc) ->
        fun(Item) ->
            couch_index_merger:collect_row_count(
                NumFolders, 0, fun spatial_row_obj/1, Callback, UserAcc, Item)
        end
    end,
    {LessFun, FolderFun, MergeFun, CollectorFun, nil}.
41
42% callback!
% Index-merger callback that extracts the paging settings from the spatial
% query arguments. Returns {Skip, Limit}; fails with function_clause when
% the argument is not a #spatial_query_args{} record.
get_skip_and_limit(#spatial_query_args{} = SpatialArgs) ->
    Skip = SpatialArgs#spatial_query_args.skip,
    Limit = SpatialArgs#spatial_query_args.limit,
    {Skip, Limit}.
45
46% callback!
% Index-merger callback returning the initial event handler for a streamed
% JSON response. The query arguments are not needed; the returned fun
% forwards each event, together with Queue, to http_spatial_fold/2.
make_event_fun(_SpatialArgs, Queue) ->
    fun(Event) -> http_spatial_fold(Event, Queue) end.
51
52% callback!
% Index-merger callback that builds the HTTP request for a remote index.
%
% Returns {Url, Method, Headers, Body, LhttpcOptions} and, as a side
% effect, stores the source URL in the process dictionary under from_url
% (read again by http_view_fold_queue_error/2).
%
% #merged_index_spec{}: POST to the spec's URL with the spec's EJSON
%   properties as JSON body. When couch_index_merger:should_check_rev/2
%   says so, any existing <<"ddoc_revision">> property is replaced by the
%   revision string of DDoc.
% #simple_index_spec{}: GET on "<db url><ddoc id>/_spatial/<index name>",
%   with no headers and an empty body.
% In both cases the non-default query arguments are appended as a query
% string.
http_index_folder_req_details(#merged_index_spec{} = Spec, MergeParams, DDoc) ->
    #merged_index_spec{url = SpecUrl, ejson_spec = {Props}} = Spec,
    #index_merge{conn_timeout = ConnTimeout, http_params = QueryArgs} =
        MergeParams,
    {ok, HttpDb} = couch_index_merger:open_db(SpecUrl, nil, ConnTimeout),
    #httpdb{
        url = BaseUrl,
        lhttpc_options = LhttpcOptions,
        headers = DbHeaders
    } = HttpDb,
    RequestUrl = BaseUrl ++ spatial_qs(QueryArgs),
    RequestHeaders = [{"Content-Type", "application/json"} | DbHeaders],
    RevKey = <<"ddoc_revision">>,
    BodyProps = case couch_index_merger:should_check_rev(MergeParams, DDoc) of
        true ->
            % Drop every stale revision entry before adding the current one
            IsOtherProp = fun(Prop) -> element(1, Prop) =/= RevKey end,
            [{RevKey, couch_index_merger:ddoc_rev_str(DDoc)} |
                lists:filter(IsOtherProp, Props)];
        false ->
            Props
    end,
    put(from_url, BaseUrl),
    {RequestUrl, post, RequestHeaders, ?JSON_ENCODE({BodyProps}), LhttpcOptions};

http_index_folder_req_details(#simple_index_spec{} = Spec, MergeParams, _DDoc) ->
    #simple_index_spec{
        database = DbUrl,
        ddoc_id = DDocId,
        index_name = IndexName
    } = Spec,
    #index_merge{conn_timeout = ConnTimeout, http_params = QueryArgs} =
        MergeParams,
    {ok, HttpDb} = couch_index_merger:open_db(DbUrl, nil, ConnTimeout),
    #httpdb{url = BaseUrl, lhttpc_options = LhttpcOptions} = HttpDb,
    IndexPath = ?b2l(DDocId) ++ "/_spatial/" ++ ?b2l(IndexName),
    RequestUrl = BaseUrl ++ IndexPath ++ spatial_qs(QueryArgs),
    % Note: this clause records the URL from the spec, not the one
    % reported by the opened database handle.
    put(from_url, DbUrl),
    {RequestUrl, get, [], [], LhttpcOptions}.
96
% Converts an internal merge row into its EJSON object representation.
%
% {{Key, error}, Reason} becomes an object with `key` and `error`.
% {{Bbox, DocId}, {{Type, Coords}, Value}} becomes an object with `id`,
% `bbox` (the Bbox tuple turned into a list), `geometry` (an object with
% `type` and `coordinates`) and `value`.
spatial_row_obj({{Key, error}, Reason}) ->
    {[{key, Key}, {error, Reason}]};
spatial_row_obj({{Bbox, DocId}, {{GeomType, Coordinates}, Value}}) ->
    Geometry = {[{type, GeomType}, {coordinates, Coordinates}]},
    Fields = [
        {id, DocId},
        {bbox, tuple_to_list(Bbox)},
        {geometry, Geometry},
        {value, Value}
    ],
    {Fields}.
105
% Ordering predicate used when merging rows: true when A sorts strictly
% before B in the standard Erlang term order.
spatial_less_fun(A, B) ->
    B > A.
108
109% Counterpart to map_view_folder/6 in couch_view_merger
% Folds over a local spatial index and pushes its rows onto Queue.
%
% The index is looked up through get_spatial_index/5 using the design
% document database, design document id and index name from SpatialSpec
% and the `stale` setting from the query arguments. If the design document
% revision has to be checked and has changed, `revision_mismatch` is
% queued instead of any rows. The design document database handle is
% closed at the end; the value of that close attempt is returned.
spatial_folder(Db, SpatialSpec, MergeParams, _UserCtx, DDoc, Queue) ->
    #simple_index_spec{
        ddoc_database = DDocDbName,
        ddoc_id = DDocId,
        index_name = IndexName
    } = SpatialSpec,
    QueryArgs = MergeParams#index_merge.http_params,
    #spatial_query_args{bbox = Bbox, bounds = Bounds, stale = Stale} =
        QueryArgs,
    RowFun = make_spatial_fold_fun(Queue),
    {DDocDb, Index} = get_spatial_index(Db, DDocDbName, DDocId, IndexName,
        Stale),
    RevisionOk =
        not(couch_index_merger:should_check_rev(MergeParams, DDoc)) orelse
        couch_index_merger:ddoc_unchanged(DDocDb, DDoc),
    case RevisionOk of
        true ->
            % A spatial index reports no total_rows, so the count itself
            % is meaningless; the merger still expects a row_count item
            % from every folder before the rows.
            ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
            couch_spatial:fold(Index, RowFun, nil, Bbox, Bounds);
        false ->
            ok = couch_view_merger_queue:queue(Queue, revision_mismatch)
    end,
    % DDocDb is nil when get_spatial_index/5 found no separate design
    % document database, hence the catch around the close.
    catch couch_db:close(DDocDb).
135
136% Counterpart to get_map_view/5 in couch_view_merger
% Looks up the spatial index SpatialName of design document DDocId.
%
% Returns {DDocDb, Index}. DDocDb is the database handle contained in the
% group id when couch_index_merger:get_group_id/2 returns a
% {DDocDb, DDocId} pair, and nil when it returns the bare DDocId.
get_spatial_index(Db, DDocDbName, DDocId, SpatialName, Stale) ->
    GroupId = couch_index_merger:get_group_id(DDocDbName, DDocId),
    {ok, Index, _Group} =
        couch_spatial:get_spatial_index(Db, GroupId, SpatialName, Stale),
    DDocDb = case GroupId of
        {GroupDb, DDocId} -> GroupDb;
        DDocId -> nil
    end,
    {DDocDb, Index}.
145
146% Counterpart to http_view_fold/3 in couch_view_merger
% First state of the streaming JSON parser for a remote response. It only
% accepts the opening of the top-level object, queues the placeholder
% {row_count, 0} and continues with http_spatial_fold_rows_1/2.
http_spatial_fold(object_start, Queue) ->
    ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
    fun(NextEvent) -> http_spatial_fold_rows_1(NextEvent, Queue) end.
150
151% Counterpart to http_view_fold_rows_1/2 in couch_view_merger
% Parser state that skips events until the "rows" key is seen. After that
% key the next event must be array_start, which switches to
% http_spatial_fold_rows_2/2. Any other event keeps the parser in this
% state.
http_spatial_fold_rows_1({key, <<"rows">>}, Queue) ->
    RowsState = fun(Event) -> http_spatial_fold_rows_2(Event, Queue) end,
    fun(array_start) -> RowsState end;
http_spatial_fold_rows_1(_Skipped, Queue) ->
    fun(Event) -> http_spatial_fold_rows_1(Event, Queue) end.
156
% Counterpart to http_view_fold_rows_2/2 in couch_view_merger
% Parser state inside the "rows" array. On object_start the complete row
% object is collected with json_stream_parse:collect_object/2, queued via
% http_spatial_fold_queue_row/2, and the parser stays in this state. On
% array_end it moves on to http_spatial_fold_errors_1/2.
http_spatial_fold_rows_2(array_end, Queue) ->
    fun(Event) -> http_spatial_fold_errors_1(Event, Queue) end;
http_spatial_fold_rows_2(object_start, Queue) ->
    OnRow = fun(Row) ->
        http_spatial_fold_queue_row(Row, Queue),
        fun(Event) -> http_spatial_fold_rows_2(Event, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnRow) end.
169
170% Counterpart to http_view_fold_errors_1/2 in couch_view_merger
% Parser state directly after the "rows" array. If the next event is the
% "errors" key, the following event must be array_start and parsing
% continues with http_spatial_fold_errors_2/2. For any other event the
% remaining input is handed to couch_index_merger:void_event/1.
http_spatial_fold_errors_1({key, <<"errors">>}, Queue) ->
    ErrorsState = fun(Event) -> http_spatial_fold_errors_2(Event, Queue) end,
    fun(array_start) -> ErrorsState end;
http_spatial_fold_errors_1(_Other, _Queue) ->
    fun couch_index_merger:void_event/1.
175
176% Counterpart to http_view_fold_errors_2/2 in couch_view_merger
% Parser state inside the "errors" array. On object_start the complete
% error object is collected, queued via http_view_fold_queue_error/2, and
% the parser stays in this state. On array_end the remaining input is
% handed to couch_index_merger:void_event/1.
http_spatial_fold_errors_2(array_end, _Queue) ->
    fun couch_index_merger:void_event/1;
http_spatial_fold_errors_2(object_start, Queue) ->
    OnError = fun(Error) ->
        http_view_fold_queue_error(Error, Queue),
        fun(Event) -> http_spatial_fold_errors_2(Event, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnError) end.
188
189% Carbon copy of http_view_fold_queue_error/2 in couch_view_merger
% Queues one entry of a remote "errors" array as {error, From, Reason}.
%
% From is the entry's "from" property. When that property is missing or
% equals <<"local">>, the URL stored in the process dictionary under
% from_url is used instead. Reason is the "reason" property, defaulting
% to null.
http_view_fold_queue_error({Props}, Queue) ->
    Source = case couch_util:get_value(<<"from">>, Props, ?LOCAL) of
        ?LOCAL ->
            get(from_url);
        Remote ->
            Remote
    end,
    Reason = couch_util:get_value(<<"reason">>, Props, null),
    ok = couch_view_merger_queue:queue(Queue, {error, Source, Reason}).
200
201% Counterpart to http_view_fold_queue_row/2 in couch_view_merger
202% Used for merges of remote DBs
% Converts one row object of a remote response into the internal row
% format and puts it on Queue.
%
% Regular rows become {{BboxTuple, Id}, {{GeomType, Coords}, Value}}, with
% {doc, Doc} appended as a third element when the row carries a "doc"
% property.
% Rows with an "error" property become {{Key, error}, Error}. The error
% check happens before the geometry is touched, because error rows as
% produced by spatial_row_obj/1 only have "key" and "error" and would
% otherwise fail on the missing "geometry"/"bbox" properties.
http_spatial_fold_queue_row({Props}, Queue) ->
    Row = case couch_util:get_value(<<"error">>, Props, nil) of
    nil ->
        spatial_row_from_props(Props);
    Error ->
        % error in a map row
        {{spatial_error_row_key(Props), error}, Error}
    end,
    ok = couch_view_merger_queue:queue(Queue, Row).

% Builds the internal representation of a regular (non-error) row.
spatial_row_from_props(Props) ->
    Id = couch_util:get_value(<<"id">>, Props, nil),
    Bbox = couch_util:get_value(<<"bbox">>, Props, null),
    {Geom} = couch_util:get_value(<<"geometry">>, Props, null),
    Val = couch_util:get_value(<<"value">>, Props),
    GeomType = couch_util:get_value(<<"type">>, Geom),
    Coords = couch_util:get_value(<<"coordinates">>, Geom),
    case couch_util:get_value(<<"doc">>, Props, nil) of
    nil ->
        {{list_to_tuple(Bbox), Id}, {{GeomType, Coords}, Val}};
    % NOTE vmx 20110818: GeoCouch doesn't support include_docs atm,
    %     but I'll just leave the code here
    Doc ->
        {{list_to_tuple(Bbox), Id}, {{GeomType, Coords}, Val}, {doc, Doc}}
    end.

% Picks the key of an error row: the bounding box as a tuple when the row
% has one (the previous behaviour), otherwise the row's "key" property
% (null when that is missing as well).
spatial_error_row_key(Props) ->
    case couch_util:get_value(<<"bbox">>, Props, null) of
    Bbox when is_list(Bbox) ->
        list_to_tuple(Bbox);
    _ ->
        couch_util:get_value(<<"key">>, Props, null)
    end.
225
226
227
228% Counterpart to make_map_fold_fun/4 in couch_view_merger
229% Used for merges of local DBs
% Returns the fold function handed to couch_spatial:fold/5 for local
% indexes. The fun accepts only rows shaped {{Bbox, DocId}, {Geom, Value}},
% puts each row unchanged on Queue and returns {ok, Acc} with the
% accumulator untouched.
make_spatial_fold_fun(Queue) ->
    fun(Row = {{_Bbox, _DocId}, {_Geom, _Value}}, Acc) ->
        ok = couch_view_merger_queue:queue(Queue, Row),
        {ok, Acc}
    end.
235
236% Counterpart to merge_map_views/6 in couch_view_merger
% Merge loop for spatial rows.
%
% limit = 0: delegates to couch_index_merger:merge_indexes_no_limit/1.
% empty row_acc: delegates to couch_index_merger:merge_indexes_no_acc/2
%   with merge_spatial_min_row/2 as row handler; a {params, NewParams}
%   result continues the loop, anything else is returned as is.
% otherwise: applies couch_index_merger:handle_skip/1 and loops again.
merge_spatial(#merge_params{limit = 0} = Params) ->
    couch_index_merger:merge_indexes_no_limit(Params);

merge_spatial(#merge_params{row_acc = []} = Params) ->
    MinRowFun = fun merge_spatial_min_row/2,
    case couch_index_merger:merge_indexes_no_acc(Params, MinRowFun) of
        {params, NextParams} ->
            merge_spatial(NextParams);
        Result ->
            Result
    end;

% ??? vmx 20110805: Does this case ever happen in the spatial index?
merge_spatial(Params) ->
    merge_spatial(couch_index_merger:handle_skip(Params)).
253
254% Counterpart to merge_map_min_row/2 in couch_view_merger
% Row handler passed to couch_index_merger:merge_indexes_no_acc/2. It
% flushes the merge queue, stores MinRow as the only element of row_acc
% and returns the result of couch_index_merger:handle_skip/1.
merge_spatial_min_row(Params, MinRow) ->
    Queue = Params#merge_params.queue,
    ok = couch_view_merger_queue:flush(Queue),
    WithMinRow = Params#merge_params{row_acc = [MinRow]},
    couch_index_merger:handle_skip(WithMinRow).
258
259% Counterpart to view_qs/1 in couch_view_merger
% Serialises the spatial query arguments into a URL query string.
%
% Only `bbox`, `stale`, `count` and `bounds` are considered, and each one
% is emitted only when it differs (exact comparison) from the default of
% #spatial_query_args{}. Returns [] when nothing has to be sent, otherwise
% a string of the form "?name=value&name=value".
%
% `bbox` and `bounds` are written as comma separated coordinates. Each
% coordinate is formatted on its own with ~w, so the result neither depends
% on the internal nesting of io_lib's pretty-printer output nor contains
% the line breaks ~p inserts into long terms.
spatial_qs(SpatialArgs) ->
    Defaults = #spatial_query_args{},
    #spatial_query_args{
        bbox = Bbox,
        stale = Stale,
        count = Count,
        bounds = Bounds
    } = SpatialArgs,
    QsList = lists:append([
        spatial_qs_param("bbox", Bbox,
            Defaults#spatial_query_args.bbox, fun spatial_qs_coords/1),
        spatial_qs_param("stale", Stale,
            Defaults#spatial_query_args.stale, fun erlang:atom_to_list/1),
        spatial_qs_param("count", Count,
            Defaults#spatial_query_args.count, fun erlang:atom_to_list/1),
        spatial_qs_param("bounds", Bounds,
            Defaults#spatial_query_args.bounds, fun spatial_qs_coords/1)
    ]),
    case QsList of
    [] ->
        [];
    _ ->
        "?" ++ string:join(QsList, "&")
    end.

% Returns ["Name=Value"] for a non-default value and [] for the default.
% ToString is only called for values that are actually emitted.
spatial_qs_param(_Name, Default, Default, _ToString) ->
    [];
spatial_qs_param(Name, Value, _Default, ToString) ->
    [Name ++ "=" ++ ToString(Value)].

% Formats a tuple or list of coordinates as "C1,C2,...".
% NOTE(review): assumes the elements are plain numbers -- confirm against
% couch_httpd_spatial:parse_spatial_params/1.
spatial_qs_coords(Coords) when is_tuple(Coords) ->
    spatial_qs_coords(tuple_to_list(Coords));
spatial_qs_coords(Coords) when is_list(Coords) ->
    string:join(
        [lists:flatten(io_lib:format("~w", [C])) || C <- Coords], ",").
300