1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_spatial_updater).
14
15-ifdef(makecheck).
16-compile(export_all).
17-endif.
18
19-export([start_update/3, process_doc/3, finish_update/1]).
20
21% for output (couch_http_spatial, couch_http_spatial_list)
22-export([geocouch_to_geojsongeom/1]).
23
24% for polygon search
25-export([extract_bbox/2, geojsongeom_to_geocouch/1]).
26
27-include("couch_db.hrl").
28-include("couch_spatial.hrl").
29-include_lib("vtree/include/vtree.hrl").
30
31
% @doc Starts an asynchronous index update run.
%
% Creates the document queue and the write queue that connect the updater
% pipeline, derives the per-run state from State and spawns two processes
% linked to the caller: the mapper (map_docs/2), which also registers the
% indexer task, and the writer (write_results/2).
%
% Partial is stored as `partial_resp_pid' (write_results/2 hands it to
% send_partial/2) and NumChanges is reported as the task's `total_changes'.
% Returns {ok, InitState}; process_doc/3 feeds this run's doc queue and
% finish_update/1 closes it.
start_update(Partial, State, NumChanges) ->
    QueueOpts = [{max_size, 100000}, {max_items, 500}],
    {ok, DocQueue} = couch_work_queue:new(QueueOpts),
    {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
    #spatial_state{
        update_seq = UpdateSeq,
        db_name = DbName,
        idx_name = IdxName
    } = State,
    InitState = State#spatial_state{
        first_build = (UpdateSeq == 0),
        partial_resp_pid = Partial,
        doc_acc = [],
        doc_queue = DocQueue,
        write_queue = WriteQueue
    },
    Updater = self(),
    TaskProps = [
        {type, indexer},
        {database, DbName},
        {design_document, IdxName},
        {progress, 0},
        {changes_done, 0},
        {total_changes, NumChanges}
    ],
    Mapper = fun() ->
        % The task is registered from inside the mapper process, which is
        % also where map_docs/2 calls update_task/1.
        couch_task_status:add_task(TaskProps),
        couch_task_status:set_update_frequency(500),
        map_docs(Updater, InitState)
    end,
    Writer = fun() -> write_results(Updater, InitState) end,
    spawn_link(Mapper),
    spawn_link(Writer),
    {ok, InitState}.
70
71
% Buffers one change in `doc_acc'. Doc is nil, a deleted #doc{} or a live
% #doc{}; it is stored as {nil, Seq, nil}, {Id, Seq, deleted} or
% {Id, Seq, Doc} respectively, which is the shape map_docs/2 consumes.
% The buffer is built by prepending. Once it holds more than 100 entries,
% it is handed to the doc queue oldest-first and emptied before Doc is
% added. Returns {ok, NewState}.
process_doc(Doc, Seq, #spatial_state{doc_acc = Buffered} = State)
        when length(Buffered) > 100 ->
    #spatial_state{doc_queue = DocQueue} = State,
    couch_work_queue:queue(DocQueue, lists:reverse(Buffered)),
    process_doc(Doc, Seq, State#spatial_state{doc_acc = []});
process_doc(Doc, Seq, #spatial_state{doc_acc = Buffered} = State) ->
    Entry = doc_acc_entry(Doc, Seq),
    {ok, State#spatial_state{doc_acc = [Entry | Buffered]}}.

% Builds the doc_acc entry for a single change. A nil change only carries
% its sequence number (map_docs/2 just folds that Seq into its maximum).
doc_acc_entry(nil, Seq) ->
    {nil, Seq, nil};
doc_acc_entry(#doc{id = Id, deleted = true}, Seq) ->
    {Id, Seq, deleted};
doc_acc_entry(#doc{id = Id} = Doc, Seq) ->
    {Id, Seq, Doc}.
83
84
% Completes an update run started by start_update/3.
%
% Steps:
%   1. Queues the entries still buffered in `doc_acc'.
%   2. Closes the doc queue. map_docs/2 then stops its query server and
%      closes the write queue.
%   3. Waits, without a timeout, for the writer's {new_state, NewState}
%      message.
% The per-run fields are cleared and `query_server' is reset to nil
% before {ok, NewState} is returned.
finish_update(State) ->
    #spatial_state{
        doc_acc = Acc,
        doc_queue = DocQueue
    } = State,
    case Acc of
        [] ->
            ok;
        _ ->
            % doc_acc is built by prepending; reverse it so this last batch
            % is queued oldest-first, exactly like the batches flushed by
            % process_doc/3.
            couch_work_queue:queue(DocQueue, lists:reverse(Acc))
    end,
    couch_work_queue:close(DocQueue),
    receive
        {new_state, NewState} ->
            {ok, NewState#spatial_state{
                first_build = undefined,
                partial_resp_pid = undefined,
                doc_acc = undefined,
                doc_queue = undefined,
                write_queue = undefined,
                query_server = nil
            }}
    end.
106
107
% Mapper process loop, spawned by start_update/3.
%
% For each dequeue from the doc queue:
%   * Every live document is run through the query server. The server is
%     started lazily on the first dequeue.
%   * {MaxSeq, [{DocId, MapResults}]} is queued on the write queue.
%   * Deleted documents contribute {DocId, []}; nil entries only advance
%     MaxSeq.
%   * The task progress is bumped by the size of each dequeued batch.
% When the doc queue is closed, the query server handle is passed to
% stop_doc_map and the write queue is closed. Parent is only threaded
% through the recursion.
map_docs(Parent, State0) ->
    #spatial_state{
        doc_queue = DocQueue,
        write_queue = WriteQueue,
        query_server = QueryServer0
    } = State0,
    case couch_work_queue:dequeue(DocQueue) of
        closed ->
            couch_query_servers:stop_doc_map(QueryServer0),
            couch_work_queue:close(WriteQueue);
        {ok, Batches} ->
            State1 = if
                QueryServer0 =:= nil -> start_query_server(State0);
                true -> State0
            end,
            QueryServer = State1#spatial_state.query_server,
            MapBatch = fun(Docs, BatchAcc) ->
                update_task(length(Docs)),
                lists:foldl(
                    fun(Entry, EntryAcc) ->
                        map_doc_entry(QueryServer, Entry, EntryAcc)
                    end,
                    BatchAcc, Docs)
            end,
            SeqAndResults = lists:foldl(MapBatch, {0, []}, Batches),
            couch_work_queue:queue(WriteQueue, SeqAndResults),
            map_docs(Parent, State1)
    end.

% Folds one {Id, Seq, Doc} doc_acc entry into {MaxSeq, Results}: the
% sequence number always raises MaxSeq, deleted docs add {Id, []} and live
% docs add {Id, RawMapResults} from the query server.
map_doc_entry(_QueryServer, {nil, Seq, _}, {MaxSeq, Results}) ->
    {erlang:max(Seq, MaxSeq), Results};
map_doc_entry(_QueryServer, {Id, Seq, deleted}, {MaxSeq, Results}) ->
    {erlang:max(Seq, MaxSeq), [{Id, []} | Results]};
map_doc_entry(QueryServer, {Id, Seq, Doc}, {MaxSeq, Results}) ->
    {ok, MapResult} = couch_query_servers:map_doc_raw(QueryServer, Doc),
    {erlang:max(Seq, MaxSeq), [{Id, MapResult} | Results]}.
144
145
% Writer process loop, spawned by start_update/3.
%
% Every item list dequeued from the write queue is merged by
% merge_results/4 (starting from an empty KV list per view) and written by
% write_kvs/4. The resulting state is offered to the partial-response pid
% through send_partial/2. When the write queue is closed, the final state
% is sent to Parent as {new_state, State}, which finish_update/1 waits for.
write_results(Parent, State) ->
    #spatial_state{write_queue = WriteQueue, views = Views} = State,
    case couch_work_queue:dequeue(WriteQueue) of
        {ok, Info} ->
            EmptyKVs = [{View#spatial.id_num, []} || View <- Views],
            {MaxSeq, ViewKVs, DocIdKeys} =
                merge_results(Info, 0, EmptyKVs, []),
            Written = write_kvs(State, MaxSeq, ViewKVs, DocIdKeys),
            send_partial(Written#spatial_state.partial_resp_pid, Written),
            write_results(Parent, Written);
        closed ->
            Parent ! {new_state, State}
    end.
161
162
% Starts a map query server for the index's language and library, loaded
% with the definition of every view, and stores its handle in
% `query_server'.
start_query_server(State) ->
    #spatial_state{language = Lang, lib = Lib, views = Views} = State,
    MapDefs = lists:map(fun(View) -> View#spatial.def end, Views),
    {ok, QueryServer} =
        couch_query_servers:start_doc_map(Lang, MapDefs, Lib),
    State#spatial_state{query_server = QueryServer}.
172
173
% Merges the items the writer dequeued. Each item is a {Seq, DocResults}
% pair as queued by map_docs/2. Every per-document result goes through
% merge_results/3, and the highest Seq (starting from SeqAcc) is kept.
% Returns {MaxSeq, ViewKVs, DocIdKeys}. Same logic as
% couch_mrview_updater:merge_results/4.
merge_results(Items, SeqAcc, ViewKVs, DocIdKeys) ->
    lists:foldl(
        fun({Seq, DocResults}, {MaxSeq, VKVs, DIKs}) ->
            {VKVs2, DIKs2} = lists:foldl(
                fun(DocResult, {VKVAcc, DIKAcc}) ->
                    merge_results(DocResult, VKVAcc, DIKAcc)
                end,
                {VKVs, DIKs}, DocResults),
            {erlang:max(Seq, MaxSeq), VKVs2, DIKs2}
        end,
        {SeqAcc, ViewKVs, DocIdKeys}, Items).
183
184
% Merges the map output of one document into the accumulators; this step
% is what differs between indexers.
%
% {DocId, []} (e.g. a deleted document, see map_docs/2) only records an
% empty key list for DocId, which update_id_btree/3 treats as a removal.
% Otherwise the raw results are decoded, each emitted row is normalised by
% process_result/1, and the rows are spread over the views by
% insert_results/5.
merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
    {ViewKVs, [{DocId, []} | DocIdKeys]};
merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
    PerView = lists:map(
        fun(Rows) -> lists:map(fun process_result/1, Rows) end,
        couch_query_servers:raw_to_ejson(RawResults)),
    {NewViewKVs, IdKeys} = insert_results(DocId, PerView, ViewKVs, [], []),
    {NewViewKVs, [IdKeys | DocIdKeys]}.
193
194
% Turns the per-view emitted rows of DocId into #kv_node{} records.
%
% The second argument holds one list of {Key, {Geom, Value}} rows per view
% and is walked in lockstep with the {ViewId, KVNodes} list. Within a view:
%   * Rows are sorted.
%   * Rows with equal Key and Geom collapse into one node whose body is
%     {dups, Values}.
%   * Every row that starts a new node records {ViewId, Key} for the id
%     btree.
% NOTE(review): equal keys with different geometries stay separate nodes,
% so their {ViewId, Key} pair is recorded once per node; confirm vtree
% deletion copes with such repeated keys.
% Returns {UpdatedViewKVs, {DocId, ViewIdKeys}}.
insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
    Combine = fun(KV, Acc) -> combine_dupes(Id, KV, Acc) end,
    {Collapsed, VIdKeys1} =
        lists:foldl(Combine, {[], VIdKeys}, lists:sort(KVs)),
    Nodes = [to_kv_node(DocId, KV) || KV <- Collapsed],
    insert_results(DocId, RKVs, RVKVs, [{Id, Nodes ++ VKVs} | VKVAcc],
                   VIdKeys1).

% Folds one sorted row into {Rows, IdKeys}, merging it with the previous
% row when both share Key and Geom.
combine_dupes(_ViewId, {Key, {Geom, Val}},
              {[{Key, {dups, {Geom, Vals}}} | Rest], IdKeys}) ->
    {[{Key, {dups, {Geom, [Val | Vals]}}} | Rest], IdKeys};
combine_dupes(_ViewId, {Key, {Geom, Val1}},
              {[{Key, {Geom, Val2}} | Rest], IdKeys}) ->
    {[{Key, {dups, {Geom, [Val1, Val2]}}} | Rest], IdKeys};
combine_dupes(ViewId, {Key, _} = KV, {Rest, IdKeys}) ->
    {[KV | Rest], [{ViewId, Key} | IdKeys]}.

% Builds the vtree node for a (possibly collapsed) row of DocId.
to_kv_node(DocId, {Key, {dups, {Geom, Vals}}}) ->
    #kv_node{
        key = Key,
        docid = DocId,
        geometry = Geom,
        body = ?term_to_bin({dups, Vals})
    };
to_kv_node(DocId, {Key, {Geom, Val}}) ->
    #kv_node{
        key = Key,
        docid = DocId,
        geometry = Geom,
        body = ?term_to_bin(Val)
    }.
226
227
% Writes one merged batch to the index.
%
% Steps:
%   1. Updates the id btree with DocIdKeys.
%   2. Groups the nodes to remove per view (collapse_rem_keys/2).
%   3. For every view, deletes those nodes from its vtree and inserts the
%      view's new KVs.
% Views and ViewKVs are walked in lockstep and must carry matching view
% ids. A view's update_seq moves to UpdateSeq only when its vtree root
% changed; the state's update_seq is always set to UpdateSeq.
write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
    #spatial_state{
        id_btree = IdBtree,
        first_build = FirstBuild,
        views = Views
    } = State,
    {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
    RemovalsByView = collapse_rem_keys(ToRemove, dict:new()),
    ApplyToView = fun(#spatial{id_num = ViewId, vtree = OldVtree} = View,
                      {ViewId, KVs}) ->
        Stale = couch_util:dict_find(ViewId, RemovalsByView, []),
        NewVtree = vtree_insert:insert(vtree_delete:delete(OldVtree, Stale),
                                       KVs),
        ViewSeq = case NewVtree#vtree.root =:= OldVtree#vtree.root of
            true -> View#spatial.update_seq;
            false -> UpdateSeq
        end,
        View#spatial{vtree = NewVtree, update_seq = ViewSeq}
    end,
    NewViews = lists:zipwith(ApplyToView, Views, ViewKVs),
    State#spatial_state{
        views = NewViews,
        update_seq = UpdateSeq,
        id_btree = IdBtree2
    }.
255
256
% Applies DocIdKeys ({DocId, [{ViewId, Key}]} pairs) to the id btree.
% Pairs with a non-empty key list are added. On a first build (FirstBuild
% =:= true) nothing is looked up or removed. Otherwise every DocId is
% looked up, so its previous keys come back in the query results, and
% DocIds with an empty key list are removed.
% Same logic as couch_mrview_updater.
update_id_btree(Btree, DocIdKeys, FirstBuild) ->
    ToAdd = [Entry || {_Id, Keys} = Entry <- DocIdKeys, Keys /= []],
    case FirstBuild of
        true ->
            couch_btree:query_modify(Btree, [], ToAdd, []);
        _ ->
            ToFind = [Id || {Id, _Keys} <- DocIdKeys],
            ToRem = [Id || {Id, []} <- DocIdKeys],
            couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem)
    end.
266
267
% Converts the lookup results of update_id_btree/3 into a dict (starting
% from Acc) mapping each view id to the #kv_node{}s to delete from that
% view's vtree. Only `docid' and `key' are set, which is enough for
% deleting them from the tree. Ids that were not found contribute nothing.
collapse_rem_keys(LookupResults, Acc) ->
    lists:foldl(fun
        ({ok, {DocId, ViewIdKeys}}, DictAcc) ->
            lists:foldl(fun({ViewId, Key}, Inner) ->
                dict:append(ViewId, #kv_node{docid = DocId, key = Key}, Inner)
            end, DictAcc, ViewIdKeys);
        ({not_found, _}, DictAcc) ->
            DictAcc
    end, Acc, LookupResults).
283
284
% Casts {new_state, State} to Pid when it is a pid and does nothing
% otherwise. Always returns ok. Same logic as couch_mrview_updater.
send_partial(Pid, State) ->
    case is_pid(Pid) of
        true -> gen_server:cast(Pid, {new_state, State});
        false -> ok
    end.
290
291
% Adds NumChanges to the task's `changes_done' counter and recomputes the
% `progress' percentage against `total_changes'.
% Same logic as couch_mrview_updater.
update_task(NumChanges) ->
    [Done, Total] = couch_task_status:get([changes_done, total_changes]),
    NewDone = Done + NumChanges,
    couch_task_status:update([
        {progress, progress_percent(NewDone, Total)},
        {changes_done, NewDone}
    ]).

% Integer percentage of Done over Total. A Total of 0 (updater restart
% after compaction finishes) reports 0 instead of dividing by zero.
progress_percent(_Done, 0) ->
    0;
progress_percent(Done, Total) ->
    (Done * 100) div Total.
304
305
% Normalises one emitted [Key, Value] row into {Ranges, {Geom, Value}}.
% Supported key shapes:
%   [GeoJSON | Dims]  the geometry's bounding box followed by extra
%                     dimensions. XXX NOTE vmx 2012-11-29: currently the
%                     geometry is expected to be the first value of the emit.
%   Dims (a list)     dimensions only; Geom is nil.
%   GeoJSON           the old case, when only two dimensions were supported.
% Each Dim is validated by process_range/1, and each geometry by
% process_geometry/1. Both throw {emit_key, Reason} on bad input.
process_result([[{Geo} | Dims], Value]) ->
    Ranges = process_range(Dims),
    {Bbox, Geom} = process_geometry(Geo),
    {Bbox ++ Ranges, {Geom, Value}};
process_result([Dims, Value]) when is_list(Dims) ->
    {process_range(Dims), {nil, Value}};
process_result([{Geo}, Value]) ->
    {Bbox, Geom} = process_geometry(Geo),
    {Bbox, {Geom, Value}}.
321
322
% Validates the non-geometry dimensions of an emitted key and normalises
% each one into a [Min, Max] list, suitable for querying:
%   * a [Min, Max] pair of numbers with Min =< Max is kept as is;
%   * a single number N becomes [N, N], since minimum and maximum are the
%     same.
% Anything else throws {emit_key, Reason}.
process_range(Range) ->
    lists:map(fun range_to_interval/1, Range).

% Validates and normalises one dimension (see process_range/1).
range_to_interval([]) ->
    throw({emit_key, <<"A range cannot be an empty array.">>});
range_to_interval([_OnlyElement]) ->
    throw({emit_key, <<"A range cannot be single element array.">>});
range_to_interval([Min, Max]) when not is_number(Min); not is_number(Max) ->
    throw({emit_key, <<"Ranges must be numbers.">>});
range_to_interval([Min, Max]) when Min > Max ->
    throw({emit_key, <<"The minimum of a range must be smaller than "
                    "the maximum.">>});
range_to_interval([_Min, _Max] = Interval) ->
    Interval;
range_to_interval(Geometry) when is_tuple(Geometry) ->
    throw({emit_key, <<"A geometry is only allowed as the first "
                    "element in the array.">>});
range_to_interval(Value) when is_number(Value) ->
    [Value, Value];
range_to_interval(_NotANumber) ->
    throw({emit_key, <<"The values of the key must be numbers or "
                    "a GeoJSON geometry.">>}).
348
349
% Returns {Bbox, Geom} for an emitted GeoJSON geometry object, given as the
% proplist inside its ejson {...} wrapper.
%   * Bbox is a list with one [Min, Max] range per axis.
%   * Geom is the internal form built by geojsongeom_to_geocouch/1.
% When the geometry carries a `bbox' member, that box is used instead of
% one computed from the coordinates. For a GeometryCollection, only the
% collection's own `bbox' member is considered.
% Throws {emit_key, Reason} when the geometry cannot be interpreted.
process_geometry(Geo) ->
    Bbox = try
        geometry_bbox(Geo)
    catch _:badarg ->
        throw({emit_key, <<"The supplied geometry must be valid GeoJSON.">>})
    end,
    Geom = geojsongeom_to_geocouch(Geo),
    {Bbox, Geom}.

% Computes the bounding box of a GeoJSON geometry object; raises badarg for
% input it cannot interpret.
geometry_bbox(Geo) ->
    % Resolve the type first so that it is validated even when a `bbox'
    % member makes the coordinates irrelevant.
    Type = geojson_type(Geo),
    case proplists:get_value(<<"bbox">>, Geo) of
        undefined ->
            computed_bbox(Type, Geo);
        GeoJsonBbox ->
            geojson_bbox_to_ranges(GeoJsonBbox)
    end.

% Derives the bounding box from the coordinates. The members of a
% GeometryCollection are folded into a single box.
computed_bbox('GeometryCollection', Geo) ->
    Geometries = proplists:get_value(<<"geometries">>, Geo),
    lists:foldl(
        fun({Geometry}, CurBbox) ->
            Coords = proplists:get_value(<<"coordinates">>, Geometry),
            extract_bbox(geojson_type(Geometry), Coords, CurBbox)
        end,
        nil, Geometries);
computed_bbox(Type, Geo) ->
    extract_bbox(Type, proplists:get_value(<<"coordinates">>, Geo)).

% Maps the `type' member to an atom. The value comes from user-emitted
% JSON, so binary_to_existing_atom/2 is used: unknown type names (or a
% missing type) raise badarg instead of growing the atom table. All valid
% GeoJSON type atoms already exist because this module references them.
geojson_type(Geo) ->
    binary_to_existing_atom(proplists:get_value(<<"type">>, Geo), utf8).

% Converts a GeoJSON `bbox' member into the [[Min, Max], ...] layout
% produced by extract_bbox/2. Per RFC 7946, the member lists the minimums
% of all axes followed by the maximums, e.g. [W, S, E, N] becomes
% [[W, E], [S, N]]. Raises badarg unless it is a non-empty, even-length
% list of numbers.
geojson_bbox_to_ranges(GeoJsonBbox) ->
    Len = length(GeoJsonBbox),
    IsValid = Len > 0 andalso Len rem 2 =:= 0
        andalso lists:all(fun erlang:is_number/1, GeoJsonBbox),
    case IsValid of
        true ->
            {Mins, Maxs} = lists:split(Len div 2, GeoJsonBbox),
            lists:zipwith(fun(Min, Max) -> [Min, Max] end, Mins, Maxs);
        false ->
            erlang:error(badarg)
    end.
382
383
% @doc Computes the bounding box of a GeoJSON geometry from its type atom
% (e.g. 'Point') and its `coordinates' member. The result holds one
% [Min, Max] range per axis: [[MinX, MaxX], [MinY, MaxY]]. Positions are
% expected to be [X, Y] pairs. Exported for polygon search.
extract_bbox(Type, Coords) ->
    extract_bbox(Type, Coords, nil).

% Same as extract_bbox/2, but grows InitBbox (nil when nothing has been
% seen yet), so several geometries can share one box.
% Raises badarg for any other type, including a nested 'GeometryCollection'.
% process_geometry/1 then reports invalid GeoJSON instead of crashing with
% case_clause.
extract_bbox('Point', Coords, InitBbox) ->
    bbox([Coords], InitBbox);
extract_bbox('LineString', Coords, InitBbox) ->
    bbox(Coords, InitBbox);
extract_bbox('Polygon', Coords, InitBbox) ->
    % Holes don't matter for the bounding box, only the outer ring does.
    bbox(hd(Coords), InitBbox);
extract_bbox('MultiPoint', Coords, InitBbox) ->
    bbox(Coords, InitBbox);
extract_bbox('MultiLineString', Coords, InitBbox) ->
    lists:foldl(fun bbox/2, InitBbox, Coords);
extract_bbox('MultiPolygon', Coords, InitBbox) ->
    lists:foldl(fun(Polygon, CurBbox) ->
        bbox(hd(Polygon), CurBbox)
    end, InitBbox, Coords);
extract_bbox(_Type, _Coords, _InitBbox) ->
    erlang:error(badarg).

% Extends Range with every [X, Y] position in Positions. The first
% position initialises an empty (nil) box.
bbox([], Range) ->
    Range;
bbox([[X, Y] | Rest], nil) ->
    bbox(Rest, [[X, X], [Y, Y]]);
bbox([Position | Rest], Range) ->
    % Build [Min, Max] lists (previously tuples), so the next position
    % still matches the [Min, Max] pattern. This also gives the result the
    % same shape as the single-position case.
    Range2 = lists:zipwith(
        fun(Coord, [Min, Max]) ->
            [erlang:min(Coord, Min), erlang:max(Coord, Max)]
        end, Position, Range),
    bbox(Rest, Range2).
418
419
% @doc Converts a GeoJSON geometry, given as the proplist inside its ejson
% {...} wrapper, into the internal {TypeAtom, Payload} form. Payload is
% the `coordinates' member, or for a GeometryCollection the list of
% converted member geometries.
% NOTE(review): binary_to_atom/2 creates an atom from the `type' value. If
% this is reachable with unvalidated query input (polygon search), consider
% binary_to_existing_atom/2 to avoid growing the atom table.
geojsongeom_to_geocouch(Geom) ->
    TypeBin = proplists:get_value(<<"type">>, Geom),
    Payload = geocouch_payload(TypeBin, Geom),
    {binary_to_atom(TypeBin, utf8), Payload}.

% Extracts the internal payload for a geometry of the given (binary) type.
geocouch_payload(<<"GeometryCollection">>, Geom) ->
    Members = proplists:get_value(<<"geometries">>, Geom),
    [geojsongeom_to_geocouch(Member) || {Member} <- Members];
geocouch_payload(_TypeBin, Geom) ->
    proplists:get_value(<<"coordinates">>, Geom).
432
% @doc Converts the internal {TypeAtom, Payload} form back into a GeoJSON
% geometry as ejson: {[{<<"type">>, TypeAtom}, Member]}. Member is
% {<<"geometries">>, ConvertedMembers} for a GeometryCollection and
% {<<"coordinates">>, Payload} otherwise.
geocouch_to_geojsongeom({'GeometryCollection' = Type, Members}) ->
    Converted = [geocouch_to_geojsongeom(Member) || Member <- Members],
    {[{<<"type">>, Type}, {<<"geometries">>, Converted}]};
geocouch_to_geojsongeom({Type, Coords}) ->
    {[{<<"type">>, Type}, {<<"coordinates">>, Coords}]}.
443