1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_spatial_merger).
14
15%-export([query_spatial/2]).
16-export([parse_http_params/4, make_funs/3, get_skip_and_limit/1,
17    http_index_folder_req_details/3, make_event_fun/2]).
18
19-include("couch_db.hrl").
20-include_lib("couch_index_merger/include/couch_index_merger.hrl").
21-include("couch_spatial.hrl").
22
23-define(LOCAL, <<"local">>).
24
% Index merger callback: parse the query parameters of an HTTP request.
% Only the request itself is inspected; the design document, index name and
% extra argument are ignored. The result is whatever
% couch_httpd_spatial:parse_spatial_params/1 returns for the request.
parse_http_params(Req, _DDoc, _IndexName, _Extra) ->
    SpatialArgs = couch_httpd_spatial:parse_spatial_params(Req),
    SpatialArgs.
28
% Index merger callback: returns the 5-tuple of functions used for merging.
% None of the arguments influence the result. The members are, in order:
%   1. the row ordering function (spatial_less_fun/2)
%   2. the folder for local indexes (spatial_folder/6)
%   3. the merge function (merge_spatial/1)
%   4. a fun that, given (NumFolders, Callback, UserAcc), returns a one
%      argument fun forwarding every item to
%      couch_index_merger:collect_row_count/6 with an initial count of 0 and
%      spatial_row_obj/1 as row formatter
%   5. nil
make_funs(_DDoc, _IndexName, _IndexMergeParams) ->
    LessFun = fun spatial_less_fun/2,
    FolderFun = fun spatial_folder/6,
    MergeFun = fun merge_spatial/1,
    CollectorFun = fun(NumFolders, Callback, UserAcc) ->
        fun(Item) ->
            couch_index_merger:collect_row_count(
                NumFolders, 0, fun spatial_row_obj/1, Callback, UserAcc, Item)
        end
    end,
    {LessFun, FolderFun, MergeFun, CollectorFun, nil}.
41
% Index merger callback: returns the `skip` and `limit` fields of the
% spatial query arguments as a {Skip, Limit} pair.
get_skip_and_limit(#spatial_query_args{} = SpatialArgs) ->
    Skip = SpatialArgs#spatial_query_args.skip,
    Limit = SpatialArgs#spatial_query_args.limit,
    {Skip, Limit}.
45
% Index merger callback: returns the initial handler for the JSON stream
% events of a remote response. Each event is passed to http_spatial_fold/2
% together with the queue; the spatial query arguments are not needed.
make_event_fun(_SpatialArgs, Queue) ->
    fun(Event) -> http_spatial_fold(Event, Queue) end.
51
% Index merger callback: work out the HTTP request needed to query a remote
% index. Returns {Url, Method, Headers, Body, LhttpcOptions}.
%
% Side effect: the source URL is stored in the process dictionary under
% `from_url`. http_view_fold_queue_error/2 reads it back to label errors
% whose "from" member is missing or "local".

% Merged index spec: POST the (possibly amended) EJSON spec to the merge URL.
http_index_folder_req_details(#merged_index_spec{} = Spec, MergeParams, DDoc) ->
    #merged_index_spec{ejson_spec = {Props}, url = SpecUrl} = Spec,
    #index_merge{http_params = SpatialArgs, conn_timeout = Timeout} =
        MergeParams,
    {ok, #httpdb{url = Url, lhttpc_options = Options} = HttpDb} =
        couch_index_merger:open_db(SpecUrl, nil, Timeout),
    MergeUrl = Url ++ spatial_qs(SpatialArgs),
    Headers = [{"Content-Type", "application/json"} | HttpDb#httpdb.headers],

    % When the design document revision has to be checked, any existing
    % "ddoc_revision" member of the spec is replaced by the revision string
    % of DDoc.
    Props2 = case couch_index_merger:should_check_rev(MergeParams, DDoc) of
    false ->
        Props;
    true ->
        RevStr = couch_index_merger:ddoc_rev_str(DDoc),
        NotRev = fun(Member) ->
            element(1, Member) =/= <<"ddoc_revision">>
        end,
        [{<<"ddoc_revision">>, RevStr} | lists:filter(NotRev, Props)]
    end,

    put(from_url, Url),
    {MergeUrl, post, Headers, ?JSON_ENCODE({Props2}), Options};

% Simple index spec: GET <db url><ddoc id>/_spatial/<index name>[?query].
http_index_folder_req_details(#simple_index_spec{} = Spec, MergeParams, _DDoc) ->
    #simple_index_spec{
        index_name = IndexName,
        ddoc_id = DDocId,
        database = DbUrl
    } = Spec,
    #index_merge{http_params = SpatialArgs, conn_timeout = Timeout} =
        MergeParams,
    {ok, #httpdb{url = Url, lhttpc_options = Options}} =
        couch_index_merger:open_db(DbUrl, nil, Timeout),
    IndexPath = ?b2l(DDocId) ++ "/_spatial/" ++ ?b2l(IndexName),
    SpatialUrl = Url ++ IndexPath ++ spatial_qs(SpatialArgs),
    put(from_url, DbUrl),
    {SpatialUrl, get, [], [], Options}.
96
% Serialises one merged row to a JSON object (as a binary).
%   error row: {{Key, error}, Reason}
%              -> {"key":..,"error":..}
%   data row:  {{Bbox, DocId}, {{Type, Coords}, Value}}
%              -> {"id":..,"bbox":..,"geometry":..,"value":..}
% Bbox is a tuple and is emitted as a JSON array.
spatial_row_obj({{Key, error}, Reason}) ->
    KeyJson = ?JSON_ENCODE(Key),
    ReasonJson = ?JSON_ENCODE(couch_util:to_binary(Reason)),
    <<"{\"key\":", KeyJson/binary,
      ",\"error\":", ReasonJson/binary, "}">>;
spatial_row_obj({{Bbox, DocId}, {{Type, Coords}, Value}}) ->
    IdJson = ?JSON_ENCODE(DocId),
    BboxJson = ?JSON_ENCODE(tuple_to_list(Bbox)),
    GeomJson = ?JSON_ENCODE({[{type, Type}, {coordinates, Coords}]}),
    ValueJson = ?JSON_ENCODE(Value),
    <<"{\"id\":", IdJson/binary,
      ",\"bbox\":", BboxJson/binary,
      ",\"geometry\":", GeomJson/binary,
      ",\"value\":", ValueJson/binary, "}">>.
107
% Row ordering used by the merger: true when A sorts strictly before B in
% standard Erlang term order.
spatial_less_fun(A, B) ->
    B > A.
110
% Counterpart to map_view_folder/6 in couch_view_merger
%
% Folds over a local spatial index and pushes every row onto Queue. If the
% design document revision has to be checked and it changed, only
% `revision_mismatch` is queued. The design document database handle is
% closed at the end; failures while closing are swallowed by `catch`, and
% the outcome of that `catch` is the return value.
spatial_folder(Db, SpatialSpec, MergeParams, _UserCtx, DDoc, Queue) ->
    #simple_index_spec{
        index_name = IndexName,
        ddoc_id = DDocId,
        ddoc_database = DDocDbName
    } = SpatialSpec,
    #spatial_query_args{stale = Stale, bounds = Bounds, bbox = Bbox} =
        MergeParams#index_merge.http_params,
    RowFun = make_spatial_fold_fun(Queue),
    {DDocDb, Index} = get_spatial_index(
        Db, DDocDbName, DDocId, IndexName, Stale),

    RevisionOk =
        not couch_index_merger:should_check_rev(MergeParams, DDoc) orelse
        couch_index_merger:ddoc_unchanged(DDocDb, DDoc),
    case RevisionOk of
    false ->
        ok = couch_view_merger_queue:queue(Queue, revision_mismatch);
    true ->
        % The spatial index doesn't output a total_rows property, so the
        % count itself is meaningless; the merger still needs a row_count
        % item in the queue to work correctly.
        ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
        couch_spatial:fold(Index, RowFun, nil, Bbox, Bounds)
    end,
    catch couch_db:close(DDocDb).
137
% Counterpart to get_map_view/5 in couch_view_merger
%
% Looks up the spatial index and returns {DDocDb, Index}. DDocDb is the
% first element of the group id when couch_index_merger:get_group_id/2
% returns a {DDocDb, DDocId} pair, and nil when it returns the bare DDocId.
get_spatial_index(Db, DDocDbName, DDocId, SpatialName, Stale) ->
    GroupId = couch_index_merger:get_group_id(DDocDbName, DDocId),
    {ok, Index, _Group} = couch_spatial:get_spatial_index(
        Db, GroupId, SpatialName, Stale),
    DDocDb = case GroupId of
        {GroupDb, DDocId} -> GroupDb;
        DDocId -> nil
    end,
    {DDocDb, Index}.
147
% Counterpart to http_view_fold/3 in couch_view_merger
%
% First handler for the JSON stream events of a remote response. On the
% opening object it queues a {row_count, 0} item and hands over to
% http_spatial_fold_rows_1/2 for the following events.
http_spatial_fold(object_start, Queue) ->
    ok = couch_view_merger_queue:queue(Queue, {row_count, 0}),
    NextHandler = fun(Event) -> http_spatial_fold_rows_1(Event, Queue) end,
    NextHandler.
152
% Counterpart to http_view_fold_rows_1/2 in couch_view_merger
%
% Skips events until the "rows" key shows up. The event after that key must
% be array_start; the rows are then handled by http_spatial_fold_rows_2/2.
http_spatial_fold_rows_1({key, <<"rows">>}, Queue) ->
    RowsHandler = fun(Event) -> http_spatial_fold_rows_2(Event, Queue) end,
    fun(array_start) -> RowsHandler end;
http_spatial_fold_rows_1(_Event, Queue) ->
    fun(Next) -> http_spatial_fold_rows_1(Next, Queue) end.
158
% Counterpart to http_view_fold_rows_2/2 in couch_view_merger
%
% Handles the elements of the "rows" array. Every row object is collected
% with json_stream_parse:collect_object/2 and queued through
% http_spatial_fold_queue_row/2; once the array ends, the next events go to
% http_spatial_fold_errors_1/2.
http_spatial_fold_rows_2(array_end, Queue) ->
    fun(Event) -> http_spatial_fold_errors_1(Event, Queue) end;
http_spatial_fold_rows_2(object_start, Queue) ->
    OnRow = fun(Row) ->
        http_spatial_fold_queue_row(Row, Queue),
        fun(Next) -> http_spatial_fold_rows_2(Next, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnRow) end.
171
% Counterpart to http_view_fold_errors_1/2 in couch_view_merger
%
% Looks at the event following the "rows" array. If it is the "errors" key,
% the next event must be array_start and the errors are handled by
% http_spatial_fold_errors_2/2; any other event ends the processing by
% switching to couch_index_merger:void_event/1.
http_spatial_fold_errors_1({key, <<"errors">>}, Queue) ->
    ErrorsHandler = fun(Event) -> http_spatial_fold_errors_2(Event, Queue) end,
    fun(array_start) -> ErrorsHandler end;
http_spatial_fold_errors_1(_Event, _Queue) ->
    fun couch_index_merger:void_event/1.
177
% Counterpart to http_view_fold_errors_2/2 in couch_view_merger
%
% Handles the elements of the "errors" array. Every error object is
% collected with json_stream_parse:collect_object/2 and queued through
% http_view_fold_queue_error/2; once the array ends, the remaining events
% go to couch_index_merger:void_event/1.
http_spatial_fold_errors_2(array_end, _Queue) ->
    fun couch_index_merger:void_event/1;
http_spatial_fold_errors_2(object_start, Queue) ->
    OnError = fun(Error) ->
        http_view_fold_queue_error(Error, Queue),
        fun(Next) -> http_spatial_fold_errors_2(Next, Queue) end
    end,
    fun(Event) -> json_stream_parse:collect_object(Event, OnError) end.
190
% Carbon copy of http_view_fold_queue_error/2 in couch_view_merger
%
% Queues {error, From, Reason} for one decoded error object. When the
% object has no "from" member, or it is "local", the URL stored in the
% process dictionary under `from_url` is used as origin instead. A missing
% "reason" becomes null.
http_view_fold_queue_error({Props}, Queue) ->
    Origin = case couch_util:get_value(<<"from">>, Props, ?LOCAL) of
        ?LOCAL -> get(from_url);
        Remote -> Remote
    end,
    Reason = couch_util:get_value(<<"reason">>, Props, null),
    ok = couch_view_merger_queue:queue(Queue, {error, Origin, Reason}).
202
% Counterpart to http_view_fold_queue_row/2 in couch_view_merger
% Used for merges of remote DBs
%
% Converts one decoded row object ({Props}) of a remote response into the
% internal row term and puts it on the queue:
%   data row:  {{BboxTuple, Id}, {{GeomType, Coords}, Value}}
%              (with a trailing {doc, Doc} element if a "doc" is present)
%   error row: {{Key, error}, Error}
%
% Only data rows need the "bbox" and "geometry" members. Error rows as
% written by spatial_row_obj/1 carry just "key" and "error", so the error
% check happens first and such rows are queued instead of crashing on the
% missing geometry.
http_spatial_fold_queue_row({Props}, Queue) ->
    Row = case couch_util:get_value(<<"error">>, Props, nil) of
    nil ->
        http_spatial_fold_data_row(Props);
    Error ->
        % error in a map row
        {{http_spatial_fold_error_key(Props), error}, Error}
    end,
    ok = couch_view_merger_queue:queue(Queue, Row).

% Builds the row term of a regular (non-error) row. A row without a "bbox"
% list or without a "geometry" object is malformed and makes this crash.
http_spatial_fold_data_row(Props) ->
    Id = couch_util:get_value(<<"id">>, Props, nil),
    Bbox = list_to_tuple(couch_util:get_value(<<"bbox">>, Props, null)),
    {Geom} = couch_util:get_value(<<"geometry">>, Props, null),
    Val = couch_util:get_value(<<"value">>, Props),
    GeomType = couch_util:get_value(<<"type">>, Geom),
    Coords = couch_util:get_value(<<"coordinates">>, Geom),
    case couch_util:get_value(<<"doc">>, Props, nil) of
    nil ->
        {{Bbox, Id}, {{GeomType, Coords}, Val}};
    % NOTE vmx 20110818: GeoCouch doesn't support include_docs atm,
    %     but I'll just leave the code here
    Doc ->
        {{Bbox, Id}, {{GeomType, Coords}, Val}, {doc, Doc}}
    end.

% Key of an error row: the bounding box as a tuple when the row has a
% "bbox" list, otherwise the value of its "key" member (null if absent).
http_spatial_fold_error_key(Props) ->
    case couch_util:get_value(<<"bbox">>, Props, null) of
    Bbox when is_list(Bbox) ->
        list_to_tuple(Bbox);
    _ ->
        couch_util:get_value(<<"key">>, Props, null)
    end.
227
228
229
% Counterpart to make_map_fold_fun/4 in couch_view_merger
% Used for merges of local DBs
%
% Returns the fold fun handed to couch_spatial:fold/5 by spatial_folder/6:
% every {{Bbox, DocId}, {Geom, Value}} row is queued unchanged and the
% accumulator is passed through untouched.
make_spatial_fold_fun(Queue) ->
    fun({{_Bbox, _DocId}, {_Geom, _Value}} = SpatialRow, FoldAcc) ->
        ok = couch_view_merger_queue:queue(Queue, SpatialRow),
        {ok, FoldAcc}
    end.
237
% Counterpart to merge_map_views/6 in couch_view_merger
%
% Merge loop, driven by the state in #merge_params{}:
%   - limit is 0: delegate to couch_index_merger:merge_indexes_no_limit/1
%   - row accumulator empty: delegate to
%     couch_index_merger:merge_indexes_no_acc/2; a {params, NewParams}
%     answer continues the loop, anything else is the final result
%   - otherwise: apply couch_index_merger:handle_skip/1 and loop again

% Limit reached
merge_spatial(#merge_params{limit = 0} = MergeParams) ->
    couch_index_merger:merge_indexes_no_limit(MergeParams);

% No row accumulated yet
merge_spatial(#merge_params{row_acc = []} = MergeParams) ->
    MinRowFun = fun merge_spatial_min_row/2,
    case couch_index_merger:merge_indexes_no_acc(MergeParams, MinRowFun) of
    {params, NextParams} ->
        merge_spatial(NextParams);
    Result ->
        Result
    end;

% ??? vmx 20110805: Does this case ever happen in the spatial index?
merge_spatial(MergeParams) ->
    merge_spatial(couch_index_merger:handle_skip(MergeParams)).
255
% Counterpart to merge_map_min_row/2 in couch_view_merger
%
% Passed to couch_index_merger:merge_indexes_no_acc/2 by merge_spatial/1.
% Flushes the queue, stores MinRow as the only accumulated row and lets
% couch_index_merger:handle_skip/1 process the updated parameters.
merge_spatial_min_row(Params, MinRow) ->
    Queue = Params#merge_params.queue,
    ok = couch_view_merger_queue:flush(Queue),
    WithMinRow = Params#merge_params{row_acc = [MinRow]},
    couch_index_merger:handle_skip(WithMinRow).
260
% Counterpart to view_qs/1 in couch_view_merger
%
% Builds the query string for a request to a remote spatial index. Only
% the fields bbox, stale, count and bounds are considered (in that order),
% and only those whose value differs from the record default are emitted.
% Returns "" when nothing has to be sent, otherwise "?k1=v1&k2=v2...".
spatial_qs(SpatialArgs) ->
    Defaults = #spatial_query_args{},
    #spatial_query_args{
        bbox = Bbox,
        stale = Stale,
        count = Count,
        bounds = Bounds
    } = SpatialArgs,
    % {Name, Value, Default, Formatter}
    Fields = [
        {"bbox", Bbox, Defaults#spatial_query_args.bbox,
            fun spatial_qs_coords/1},
        {"stale", Stale, Defaults#spatial_query_args.stale,
            fun erlang:atom_to_list/1},
        {"count", Count, Defaults#spatial_query_args.count,
            fun erlang:atom_to_list/1},
        {"bounds", Bounds, Defaults#spatial_query_args.bounds,
            fun spatial_qs_coords/1}
    ],
    QsList = [Name ++ "=" ++ Format(Value) ||
        {Name, Value, Default, Format} <- Fields, Value =/= Default],
    case QsList of
    [] ->
        [];
    _ ->
        "?" ++ string:join(QsList, "&")
    end.

% Formats a tuple (or list) of coordinates as "C1,C2,...".
% Each coordinate is printed on its own. Printing the whole term with ~p
% and cutting off the surrounding braces depends on the internal structure
% of the io_lib:format/2 result, and ~p breaks long terms across several
% lines, which would put newlines into the URL.
spatial_qs_coords(Coords) when is_tuple(Coords) ->
    spatial_qs_coords(tuple_to_list(Coords));
spatial_qs_coords(Coords) when is_list(Coords) ->
    Parts = [lists:flatten(io_lib:format("~p", [C])) || C <- Coords],
    string:join(Parts, ",").
302