% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_spatial_updater).

-ifdef(makecheck).
-compile(export_all).
-endif.

-export([start_update/3, process_doc/3, finish_update/1]).

% for polygon search
-export([extract_bbox/2]).

-include("couch_db.hrl").
-include("couch_spatial.hrl").
-include_lib("vtree/include/vtree.hrl").


start_update(Partial, State, NumChanges) ->
    QueueOpts = [{max_size, 100000}, {max_items, 500}],
    {ok, DocQueue} = couch_work_queue:new(QueueOpts),
    {ok, WriteQueue} = couch_work_queue:new(QueueOpts),

    #spatial_state{
        update_seq = UpdateSeq,
        db_name = DbName,
        idx_name = IdxName
    } = State,

    InitState = State#spatial_state{
        first_build = UpdateSeq == 0,
        partial_resp_pid = Partial,
        doc_acc = [],
        doc_queue = DocQueue,
        write_queue = WriteQueue
    },

    Self = self(),
    SpatialFun = fun() ->
        couch_task_status:add_task([
            {type, indexer},
            {database, DbName},
            {design_document, IdxName},
            {progress, 0},
            {changes_done, 0},
            {total_changes, NumChanges}
        ]),
        couch_task_status:set_update_frequency(500),
        map_docs(Self, InitState)
    end,
    WriteFun = fun() -> write_results(Self, InitState) end,

    spawn_link(SpatialFun),
    spawn_link(WriteFun),

    {ok, InitState}.


process_doc(Doc, Seq, #spatial_state{doc_acc=Acc}=State) when
        length(Acc) > 100 ->
    couch_work_queue:queue(State#spatial_state.doc_queue, lists:reverse(Acc)),
    process_doc(Doc, Seq, State#spatial_state{doc_acc=[]});
process_doc(nil, Seq, #spatial_state{doc_acc=Acc}=State) ->
    {ok, State#spatial_state{doc_acc=[{nil, Seq, nil} | Acc]}};
process_doc(#doc{id=Id, deleted=true}, Seq,
        #spatial_state{doc_acc=Acc}=State) ->
    {ok, State#spatial_state{doc_acc=[{Id, Seq, deleted} | Acc]}};
process_doc(#doc{id=Id}=Doc, Seq, #spatial_state{doc_acc=Acc}=State) ->
    {ok, State#spatial_state{doc_acc=[{Id, Seq, Doc} | Acc]}}.


finish_update(State) ->
    #spatial_state{
        doc_acc = Acc,
        doc_queue = DocQueue
    } = State,
    if Acc /= [] ->
        couch_work_queue:queue(DocQueue, Acc);
        true -> ok
    end,
    couch_work_queue:close(DocQueue),
    receive
        {new_state, NewState} ->
            {ok, NewState#spatial_state{
                first_build = undefined,
                partial_resp_pid = undefined,
                doc_acc = undefined,
                doc_queue = undefined,
                write_queue = undefined,
                query_server = nil
            }}
    end.


map_docs(Parent, State0) ->
    #spatial_state{
        doc_queue = DocQueue,
        write_queue = WriteQueue,
        query_server = QueryServer0
    } = State0,
    case couch_work_queue:dequeue(DocQueue) of
        closed ->
            couch_query_servers:stop_doc_map(QueryServer0),
            couch_work_queue:close(WriteQueue);
        {ok, Dequeued} ->
            % Run all the non-deleted docs through the view engine and
            % then pass the results on to the writer process.
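            % `Dequeued` is a list of batches; each batch is a list of
            % {Id, Seq, Doc} tuples (where Doc may also be the atom `deleted`
            % or `nil`) as accumulated by process_doc/3.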
            State1 = case QueryServer0 of
                nil -> start_query_server(State0);
                _ -> State0
            end,
            QueryServer = State1#spatial_state.query_server,
            DocFun = fun
                ({nil, Seq, _}, {SeqAcc, Results}) ->
                    {erlang:max(Seq, SeqAcc), Results};
                ({Id, Seq, deleted}, {SeqAcc, Results}) ->
                    {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
                ({Id, Seq, Doc}, {SeqAcc, Results}) ->
                    {ok, Res} = couch_query_servers:map_doc_raw(
                        QueryServer, Doc),
                    {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
            end,
            FoldFun = fun(Docs, Acc) ->
                update_task(length(Docs)),
                lists:foldl(DocFun, Acc, Docs)
            end,
            Results = lists:foldl(FoldFun, {0, []}, Dequeued),
            couch_work_queue:queue(WriteQueue, Results),
            map_docs(Parent, State1)
    end.


write_results(Parent, State) ->
    #spatial_state{
        write_queue = WriteQueue,
        views = Views
    } = State,
    case couch_work_queue:dequeue(WriteQueue) of
        closed ->
            Parent ! {new_state, State};
        {ok, Info} ->
            EmptyKVs = [{View#spatial.id_num, []} || View <- Views],
            {Seq, ViewKVs, DocIdKeys} = merge_results(Info, 0, EmptyKVs, []),
            NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys),
            send_partial(NewState#spatial_state.partial_resp_pid, NewState),
            write_results(Parent, NewState)
    end.


start_query_server(State) ->
    #spatial_state{
        language = Language,
        lib = Lib,
        views = Views
    } = State,
    Defs = [View#spatial.def || View <- Views],
    {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
    State#spatial_state{query_server=QServer}.


% This is a verbatim copy from couch_mrview_updater
merge_results([], SeqAcc, ViewKVs, DocIdKeys) ->
    {SeqAcc, ViewKVs, DocIdKeys};
merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) ->
    Fun = fun(RawResults, {VKV, DIK}) ->
        merge_results(RawResults, VKV, DIK)
    end,
    {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results),
    merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1).


% The processing of the results is different for each indexer
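% Each raw result is a {DocId, RawResults} pair, where RawResults is the raw
% JSON returned by couch_query_servers:map_doc_raw/2; raw_to_ejson/1 below is
% assumed to yield one list of emitted [Key, Value] pairs per spatial function.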
merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
    {ViewKVs, [{DocId, []} | DocIdKeys]};
merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
    JsonResults = couch_query_servers:raw_to_ejson(RawResults),
    Results = [[process_result(Res) || Res <- FunRs] || FunRs <- JsonResults],
    {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []),
    {ViewKVs1, [ViewIdKeys | DocIdKeys]}.


insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
    CombineDupesFun = fun
        ({Key, {Geom, Val}}, {[{Key, {dups, {Geom, Vals}}} | Rest], IdKeys}) ->
            {[{Key, {dups, {Geom, [Val | Vals]}}} | Rest], IdKeys};
        ({Key, {Geom, Val1}}, {[{Key, {Geom, Val2}} | Rest], IdKeys}) ->
            {[{Key, {dups, {Geom, [Val1, Val2]}}} | Rest], IdKeys};
        ({Key, _}=KV, {Rest, IdKeys}) ->
            {[KV | Rest], [{Id, Key} | IdKeys]}
    end,
    InitAcc = {[], VIdKeys},
    {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc, lists:sort(KVs)),

    FinalKVs = lists:map(fun
        ({Key, {dups, {Geom, Vals}}}) ->
            #kv_node{
                key = Key,
                docid = DocId,
                geometry = Geom,
                body = ?term_to_bin({dups, Vals})
            };
        ({Key, {Geom, Val}}) ->
            #kv_node{
                key = Key,
                docid = DocId,
                geometry = Geom,
                body = ?term_to_bin(Val)
            }
        end, Duped) ++ VKVs,
    insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0).


write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
    #spatial_state{
        id_btree=IdBtree,
        first_build=FirstBuild,
        views = Views
    } = State,

    {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
    ToRemByView = collapse_rem_keys(ToRemove, dict:new()),

    UpdateView = fun(#spatial{id_num = ViewId} = View, {ViewId, KVs}) ->
        ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
        Vtree = vtree_delete:delete(View#spatial.vtree, ToRem),
        Vtree2 = vtree_insert:insert(Vtree, KVs),
        NewUpdateSeq = case
                Vtree2#vtree.root =/= (View#spatial.vtree)#vtree.root of
            true -> UpdateSeq;
            false -> View#spatial.update_seq
        end,
        View#spatial{vtree=Vtree2, update_seq=NewUpdateSeq}
    end,

    State#spatial_state{
        views = lists:zipwith(UpdateView, Views, ViewKVs),
        update_seq = UpdateSeq,
        id_btree = IdBtree2
    }.


% This is a verbatim copy from couch_mrview_updater
update_id_btree(Btree, DocIdKeys, true) ->
    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
    couch_btree:query_modify(Btree, [], ToAdd, []);
update_id_btree(Btree, DocIdKeys, _) ->
    ToFind = [Id || {Id, _} <- DocIdKeys],
    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
    ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []],
    couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).


% Use this step to convert the data from tuples to KV-nodes where only the
% `docid` and the `key` are set (that's enough for deleting them from the tree)
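% For example (illustrative values): the entry {ok, {<<"doc1">>, [{0, KeyA},
% {1, KeyB}]}} appends #kv_node{docid = <<"doc1">>, key = KeyA} to view 0 and
% #kv_node{docid = <<"doc1">>, key = KeyB} to view 1 of the resulting dict.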
collapse_rem_keys([], Acc) ->
    Acc;
collapse_rem_keys([{ok, {DocId, ViewIdKeys}} | Rest], Acc) ->
    NewAcc = lists:foldl(fun({ViewId, Key}, Acc2) ->
        Node = #kv_node{
            docid = DocId,
            key = Key
        },
        dict:append(ViewId, Node, Acc2)
    end, Acc, ViewIdKeys),
    collapse_rem_keys(Rest, NewAcc);
collapse_rem_keys([{not_found, _} | Rest], Acc) ->
    collapse_rem_keys(Rest, Acc).


% This is a verbatim copy from couch_mrview_updater
send_partial(Pid, State) when is_pid(Pid) ->
    gen_server:cast(Pid, {new_state, State});
send_partial(_, _) ->
    ok.


% This is a verbatim copy from couch_mrview_updater
update_task(NumChanges) ->
    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
    Changes2 = Changes + NumChanges,
    Progress = case Total of
        0 ->
            % updater restart after compaction finishes
            0;
        _ ->
            (Changes2 * 100) div Total
    end,
    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).


% The multidimensional case with a geometry
% XXX NOTE vmx 2012-11-29: Currently it is expected that the geometry
%     is the first value of the emit.
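% Illustrative example (values invented): an emit such as
%     emit([Geometry, [0, 10], 5], <<"value">>)
% arrives here (after raw_to_ejson/1) as
%     [[{GeoProplist}, [0, 10], 5], <<"value">>]
% and is turned into
%     {[{W, E}, {S, N}, {0, 10}, {5, 5}], {WkbGeom, <<"value">>}}
% where [{W, E}, {S, N}] is the bounding box of the geometry.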
process_result([[{Geo}|Rest]|[Value]]) ->
    Tuples = process_range(Rest),
    {Bbox, Geom} = process_geometry(Geo),
    {Bbox ++ Tuples, {Geom, Value}};
% The multidimensional case without a geometry
process_result([MultiDim|[Value]]) when is_list(MultiDim) ->
    Tuples = process_range(MultiDim),
    {Tuples, {<<>>, Value}};
% The old case, when only two dimensions were supported
process_result([{Geo}|[Value]]) ->
    {Bbox, Geom} = process_geometry(Geo),
    {Bbox, {Geom, Value}}.


% Transform the ranges of an emitted key (which are JSON based) into a list of
% {Min, Max} tuples that can be used for actual querying
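% For example (illustrative): [[0, 10], 5] becomes [{0, 10}, {5, 5}]; anything
% that is not a two-element numeric range or a single number throws
% {emit_key, Reason}.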
process_range(Range) ->
    lists:map(
        fun([]) ->
            throw({emit_key, <<"A range cannot be an empty array.">>});
        ([_SingleElementList]) ->
            throw({emit_key, <<"A range cannot be a single element array.">>});
        ([Min, Max]) when not (is_number(Min) andalso is_number(Max)) ->
            throw({emit_key, <<"Ranges must be numbers.">>});
        ([Min, Max]) when Min > Max ->
            throw({emit_key, <<"The minimum of a range must be smaller than "
                            "the maximum.">>});
        ([Min, Max]) ->
            {Min, Max};
        (SingleValue) when is_tuple(SingleValue) ->
            throw({emit_key, <<"A geometry is only allowed as the first "
                            "element in the array.">>});
        (SingleValue) when not is_number(SingleValue) ->
            throw({emit_key, <<"The values of the key must be numbers or "
                            "a GeoJSON geometry.">>});
        % A single value means that the minimum and the maximum are the same
        (SingleValue) ->
            {SingleValue, SingleValue}
    end, Range).


% Returns the bounding box of a GeoJSON geometry together with the geometry
% encoded as WKB
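% For example (illustrative): a GeoJSON point given as the EJSON proplist
%     [{<<"type">>, <<"Point">>}, {<<"coordinates">>, [10.0, 20.0]}]
% yields the bounding box [{10.0, 10.0}, {20.0, 20.0}] together with the WKB
% encoding produced by wkb_writer:geojson_to_wkb/1.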
process_geometry(Geo) ->
    Bbox = try
        Type = binary_to_atom(proplists:get_value(<<"type">>, Geo), utf8),
        case Type of
        'GeometryCollection' ->
            Geometries = proplists:get_value(<<"geometries">>, Geo),
            lists:foldl(fun({Geometry}, CurBbox) ->
                Type2 = binary_to_atom(
                    proplists:get_value(<<"type">>, Geometry), utf8),
                Coords = proplists:get_value(<<"coordinates">>, Geometry),
                case proplists:get_value(<<"bbox">>, Geo) of
                undefined ->
                    extract_bbox(Type2, Coords, CurBbox);
                [W, S, E, N] ->
                    [{W, E}, {S, N}]
                end
            end, nil, Geometries);
        _ ->
            Coords = proplists:get_value(<<"coordinates">>, Geo),
            case proplists:get_value(<<"bbox">>, Geo) of
            undefined ->
                extract_bbox(Type, Coords);
            [W, S, E, N] ->
                [{W, E}, {S, N}]
            end
        end
    catch _:badarg ->
        throw({emit_key, <<"The supplied geometry must be valid GeoJSON.">>})
    end,
    {ok, Geom} = wkb_writer:geojson_to_wkb({Geo}),
    {Bbox, Geom}.


extract_bbox(Type, Coords) ->
    extract_bbox(Type, Coords, nil).

extract_bbox(Type, Coords, InitBbox) ->
    case Type of
    'Point' ->
        bbox([Coords], InitBbox);
    'LineString' ->
        bbox(Coords, InitBbox);
    'Polygon' ->
        % holes don't matter for the bounding box
        bbox(hd(Coords), InitBbox);
    'MultiPoint' ->
        bbox(Coords, InitBbox);
    'MultiLineString' ->
        lists:foldl(fun(Linestring, CurBbox) ->
            bbox(Linestring, CurBbox)
        end, InitBbox, Coords);
    'MultiPolygon' ->
        lists:foldl(fun(Polygon, CurBbox) ->
            bbox(hd(Polygon), CurBbox)
        end, InitBbox, Coords);
    InvalidType ->
        throw({emit_key,
            <<"The supplied geometry type `",
              (atom_to_binary(InvalidType, latin1))/binary,
              "` is not a valid GeoJSON geometry type. "
              "Valid geometry types are (case sensitive): "
              "Point, LineString, Polygon, MultiPoint, MultiLineString, "
              "MultiPolygon">>})
    end.

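% Fold a list of coordinate pairs into a bounding box made of per-dimension
% {Min, Max} tuples, starting from an existing bounding box or `nil`.
% For example (illustrative): bbox([[1, 2], [3, 4]], nil) returns
% [{1, 3}, {2, 4}].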
bbox([], Range) ->
    Range;
bbox([[X, Y]|Rest], nil) ->
    bbox(Rest, [{X, X}, {Y, Y}]);
bbox([Coords|Rest], Range) ->
    Range2 = lists:zipwith(
        fun(Coord, {Min, Max}) ->
            {erlang:min(Coord, Min), erlang:max(Coord, Max)}
        end, Coords, Range),
    bbox(Rest, Range2).
423