1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_spatial_updater).
14
15-ifdef(makecheck).
16-compile(export_all).
17-endif.
18
19
20-export([update/2]).
21
22% for benchmark script
23-export([geojson_get_bbox/1]).
24
25% for output (couch_http_spatial, couch_http_spatial_list)
26-export([geocouch_to_geojsongeom/1]).
27
28% for polygon search
29-export([extract_bbox/2, geojsongeom_to_geocouch/1]).
30
31-include("couch_db.hrl").
32-include("couch_spatial.hrl").
33
% @doc Incrementally updates all spatial indexes of Group.
%
% Every document changed since the group's current_seq is enumerated and
% handed to process_doc/6, which may write intermediate batches itself (and
% notify Owner about partial updates). The documents still pending after the
% enumeration are mapped and written here, using the database's current
% update sequence as the group's new sequence. The function never returns
% normally: it ends the calling process with exit({new_group, NewGroup}).
%
% NOTE(review): purge handling is not implemented. Unlike couch_view_updater,
% this function never compares couch_db:get_purge_seq/1 with the group's
% purge_seq, so it does not remove index entries of purged documents.
update(Owner, Group) ->
    #spatial_group{
        db = #db{name = DbName} = Db,
        name = GroupName,
        current_seq = SinceSeq,
        indexes = Indexes
    } = Group,
    % One initially empty key/value list per index, and one empty result
    % list per index (used for documents whose map call fails).
    NoKVs = [{Index, []} || Index <- Indexes],
    NoResults = [[] || _ <- Indexes],
    TotalChanges = couch_db:count_changes_since(Db, SinceSeq),
    couch_task_status:add_task([
        {type, indexer},
        {database, DbName},
        {design_document, GroupName},
        {progress, 0},
        {changes_done, 0},
        {total_changes, TotalChanges}
    ]),
    couch_task_status:set_update_frequency(500),
    {ok, MapCtx} = mapreduce:start_map_context(
        [Index#spatial.def || Index <- Indexes]),
    HandleDocInfo = fun(DocInfo, _, {Done, DocAcc}) ->
        couch_task_status:update([
            {progress, (Done * 100) div TotalChanges},
            {changes_done, Done}
        ]),
        NextDocAcc = process_doc(Db, Owner, MapCtx, NoResults, DocInfo, DocAcc),
        {ok, {Done + 1, NextDocAcc}}
    end,
    {ok, _, {_, {PendingDocs, Group2, KVsToAdd, DocIdKeys}}} =
        couch_db:enum_docs_since(Db, SinceSeq, HandleDocInfo,
            {0, {[], Group, NoKVs, []}}, []),
    % Map and merge whatever process_doc/6 has not flushed yet.
    PendingResults = spatial_docs(MapCtx, PendingDocs, NoResults),
    {KVsToAdd2, DocIdKeys2} = view_insert_query_results(
        PendingDocs, PendingResults, KVsToAdd, DocIdKeys),
    NewSeq = couch_db:get_update_seq(Db),
    ?LOG_DEBUG("new seq num: ~p", [NewSeq]),
    {ok, NewGroup} = write_changes(Group2, KVsToAdd2, DocIdKeys2, NewSeq),
    exit({new_group, NewGroup}).
97
98
99
100
% Merges the spatial map results of a batch of documents into the per-index
% key/value lists.
% Docs and Results are parallel lists: Results holds one list of per-index
% results for each document. Each document's entries are added to ViewKVs by
% view_insert_doc_query_results/5. A {DocId, [{IndexIdNum, Key}]} entry is
% prepended to DocIdViewIdKeysAcc for every document; write_changes/4 uses
% these entries to update the id btree.
% Returns {ViewKVs2, DocIdViewIdKeys2}.
view_insert_query_results(Docs, Results, ViewKVs, DocIdViewIdKeysAcc) ->
    lists:foldl(
        fun({Doc, DocResults}, {KVsAcc, IdKeysAcc}) ->
            {KVsAcc2, DocIdKeys} =
                view_insert_doc_query_results(Doc, DocResults, KVsAcc, [], []),
            {KVsAcc2, [{Doc#doc.id, DocIdKeys} | IdKeysAcc]}
        end,
        {ViewKVs, DocIdViewIdKeysAcc},
        lists:zip(Docs, Results)).
108
109
% Adds one document's spatial results to the per-index key/value lists.
% The result lists and the {Index, KVs} pairs are walked in parallel: the Nth
% result list belongs to the Nth index. Results whose keys compare equal
% (==) are collapsed into a single {Key, {dups, Values}} entry. Every key is
% stored as {Key, DocId}.
% Returns {ViewKVs2, IdKeys}:
%   ViewKVs2 - the {Index, KVs} pairs in the original index order.
%   IdKeys   - {IndexIdNum, Key} for every key this document produced.
view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
    {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
view_insert_doc_query_results(#doc{id = DocId} = Doc, [ResultKVs | RestResults],
        [{View, KVs} | RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
    % Key is the bounding box of the geometry; Value is a tuple of the
    % geometry and the actual value. Sorting makes equal keys adjacent, so the
    % fold only needs to compare each entry with the previously kept one. The
    % fold produces the merged list in reverse sorted order.
    MergeEqualKeys = fun
        ({Key, Value}, [{PrevKey, PrevVal} | AccRest]) when Key == PrevKey ->
            Values = case PrevVal of
                {dups, Dups} -> [Value | Dups];
                _ -> [Value, PrevVal]
            end,
            [{PrevKey, {dups, Values}} | AccRest];
        ({Key, Value}, [{_PrevKey, _PrevVal} | _] = Acc) ->
            [{Key, Value} | Acc];
        (KV, []) ->
            [KV]
    end,
    Merged = lists:foldl(MergeEqualKeys, [], lists:sort(ResultKVs)),
    NewKVs = [{{Key, DocId}, Value} || {Key, Value} <- Merged],
    NewIdKeys = [{View#spatial.id_num, Key} || {Key, _Value} <- Merged],
    view_insert_doc_query_results(Doc, RestResults, RestViewKVs,
        [{View, NewKVs ++ KVs} | ViewKVsAcc], NewIdKeys ++ ViewIdKeysAcc).
137
% Runs the spatial map functions in MapCtx over Docs. Returns one entry per
% document, in the same order as Docs.
% Each entry is a list with one element per map function: the list of
% {BboxTuple, {Geom, Value}} pairs that function emitted (see
% process_result/1).
% Some documents are logged and get EmptyResults instead, so a single bad
% document does not abort the whole index update:
%   - documents whose map call returns an error;
%   - documents whose emitted geometries cannot be processed (for example,
%     malformed GeoJSON).
spatial_docs(MapCtx, Docs, EmptyResults) ->
    spatial_docs(MapCtx, Docs, EmptyResults, []).

spatial_docs(_MapCtx, [], _EmptyResults, Acc) ->
    lists:reverse(Acc);
spatial_docs(MapCtx, [Doc | RestDocs], EmptyResults, Acc) ->
    JsonDoc = couch_doc:to_raw_json_binary(Doc),
    % NOTE vmx: perhaps map_doc should be renamed to something more general,
    % as it can be used for most indexers.
    DocResults = case mapreduce:map_doc(MapCtx, JsonDoc) of
    {ok, FunsResults} ->
        % FunsResults has one list per map function. Each list holds the
        % emitted {GeomJson, ValueJson} pairs; the conversion turns them into
        % [{Bbox1, {Geom1, Value1}}, ...].
        % The geometries are user data, so a conversion failure is handled
        % like a map error instead of crashing the updater.
        try
            [process_results(FunRs) || FunRs <- FunsResults]
        catch
        error:ProcessError ->
            ?LOG_ERROR("Error processing spatial result for document `~s`: ~p",
                [Doc#doc.id, ProcessError]),
            EmptyResults
        end;
    {error, Reason} ->
        ?LOG_ERROR("Error computing spatial result for document `~s`: ~p",
            [Doc#doc.id, Reason]),
        EmptyResults
    end,
    spatial_docs(MapCtx, RestDocs, EmptyResults, [DocResults | Acc]).
170
171
% Accumulator step for couch_db:enum_docs_since/5 in update/2, called once for
% every changed document. Adapted from an old revision (796805) of
% couch_view_updater.
% The accumulator is {Docs, Group, IndexKVs, DocIdIndexIdKeys}:
%   Docs             - opened documents not mapped yet (most recently
%                      enumerated first)
%   Group            - the spatial group as last written
%   IndexKVs         - {Index, KVs} pairs of mapped but unwritten entries
%   DocIdIndexIdKeys - {DocId, [{IndexIdNum, Key}]} entries for the id btree
% Design documents are skipped. A deleted document is recorded with an empty
% key list so that write_changes/4 drops its old entries.
% When couch_util:should_flush() returns true:
%   - the batch is mapped and written, with DocInfo's local_seq as the new
%     sequence;
%   - Owner, if it is a pid, is sent a partial_update cast;
%   - an empty accumulator is returned.
process_doc(Db, Owner, MapCtx, EmptyResults, DocInfo,
        {Docs, Group, IndexKVs, DocIdIndexIdKeys}) ->
    #doc_info{id = DocId, deleted = Deleted} = DocInfo,
    case DocId of
    <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
        {Docs, Group, IndexKVs, DocIdIndexIdKeys};
    _ ->
        {Docs2, DocIdIndexIdKeys2} =
        if Deleted ->
            {Docs, [{DocId, []} | DocIdIndexIdKeys]};
        true ->
            {ok, Doc} = couch_db:open_doc_int(Db, DocInfo,
                doc_open_options(Group)),
            {[Doc | Docs], DocIdIndexIdKeys}
        end,
        case couch_util:should_flush() of
        true ->
            Results = spatial_docs(MapCtx, Docs2, EmptyResults),
            {IndexKVs3, DocIdIndexIdKeys3} = view_insert_query_results(
                Docs2, Results, IndexKVs, DocIdIndexIdKeys2),
            {ok, Group2} = write_changes(Group, IndexKVs3, DocIdIndexIdKeys3,
                DocInfo#doc_info.local_seq),
            if is_pid(Owner) ->
                ok = gen_server:cast(Owner, {partial_update, self(), Group2});
            true -> ok end,
            garbage_collect(),
            % Start the next batch from the group that was just written, as
            % update/2 does for the initial accumulator.
            IndexEmptyKVs =
                [{Index, []} || Index <- Group2#spatial_group.indexes],
            {[], Group2, IndexEmptyKVs, []};
        false ->
            {Docs2, Group, IndexKVs, DocIdIndexIdKeys2}
        end
    end.

% Options for couch_db:open_doc_int/3. Conflicts are always requested; the
% local sequence is requested only when the design document's "local_seq"
% option is true.
doc_open_options(#spatial_group{design_options = DesignOptions}) ->
    case proplists:get_value(<<"local_seq">>, DesignOptions, false) of
    true ->
        [conflicts, deleted_conflicts, local_seq];
    _ ->
        [conflicts, deleted_conflicts]
    end.
215
216
% Persists a batch of index changes and returns {ok, UpdatedGroup}.
% Inputs:
%   DocIdIndexIdKeys    - one {DocId, [{IndexIdNum, Key}]} entry per changed
%                         document. An empty key list removes the document
%                         from the id btree; otherwise its entry is
%                         (re)written.
%   IndexKeyValuesToAdd - one {Index, KVs} pair per index, in the same order
%                         as the group's indexes.
% For every index, the keys the id btree previously held for these documents
% are removed from the vtree, and the new key/values are added.
% An index's update_seq is set to NewSeq only when its root position changed;
% the group's current_seq is always set to NewSeq.
write_changes(Group, IndexKeyValuesToAdd, DocIdIndexIdKeys, NewSeq) ->
    #spatial_group{id_btree = IdBtree, fd = Fd, indexes = Indexes} = Group,
    ToAdd = [Entry || {_DocId, IdKeys} = Entry <- DocIdIndexIdKeys,
                      IdKeys /= []],
    ToRemove = [DocId || {DocId, []} <- DocIdIndexIdKeys],
    ToLookup = [DocId || {DocId, _} <- DocIdIndexIdKeys],
    {ok, LookupResults, IdBtree2} =
        couch_btree:query_modify(IdBtree, ToLookup, ToAdd, ToRemove),
    % Group the {Key, DocId} pairs previously stored for these documents by
    % index id number; they are the entries to drop from each vtree.
    OldEntries = [{IndexId, {Key, DocId}} ||
        {ok, {DocId, IdKeys}} <- LookupResults,
        {IndexId, Key} <- IdKeys],
    KeysToRemoveByIndex = lists:foldl(
        fun({IndexId, KeyDocId}, Acc) ->
            dict:append(IndexId, KeyDocId, Acc)
        end,
        dict:new(), OldEntries),
    UpdateIndex = fun(Index, {_, AddKeyValues}) ->
        #spatial{id_num = IdNum, treepos = OldPos, treeheight = OldHeight} =
            Index,
        KeysToRemove = couch_util:dict_find(IdNum, KeysToRemoveByIndex, []),
        {ok, NewPos, NewHeight} = vtree:add_remove(
            Fd, OldPos, OldHeight, AddKeyValues, KeysToRemove),
        Index2 = Index#spatial{treepos = NewPos, treeheight = NewHeight},
        if NewPos =:= OldPos ->
            Index2;
        true ->
            Index2#spatial{update_seq = NewSeq}
        end
    end,
    Indexes2 = lists:zipwith(UpdateIndex, Indexes, IndexKeyValuesToAdd),
    couch_file:flush(Fd),
    lists:foreach(fun(#spatial{id_num = IdNum, treepos = Pos}) ->
        ?LOG_INFO("Position of the spatial index (~p) root node: ~p",
                [IdNum, Pos])
    end, Indexes2),
    {ok, Group#spatial_group{indexes = Indexes2, current_seq = NewSeq,
                             id_btree = IdBtree2}}.
260
261
% NOTE vmx: This is kind of ugly. This function is needed for a benchmark for
%     the replication filter.
% Returns the bounding box of a GeoJSON geometry as a tuple
% {Min1, ..., MinN, Max1, ..., MaxN}. "Geo" is the decoded EJSON object,
% i.e. wrapped in brackets ({Props}) as returned from proplists:get_value().
% process_result/1 only accepts a {GeometryJson, ValueJson} pair of JSON
% binaries. The geometry is therefore re-encoded and paired with a JSON null
% value; the decoded value is ignored.
geojson_get_bbox(Geo) ->
    {Bbox, _GeomAndValue} = process_result({?JSON_ENCODE(Geo), <<"null">>}),
    Bbox.
269
270
% Converts every {GeometryJson, ValueJson} pair emitted by one map function
% with process_result/1. The converted pairs come out in reverse input
% order. That is harmless: view_insert_doc_query_results/5 sorts the results
% before storing them, so no extra reverse is done here.
process_results(Results) ->
    Prepend = fun(Result, Processed) ->
        [process_result(Result) | Processed]
    end,
    lists:foldl(Prepend, [], Results).
277
% Converts one {GeometryJson, ValueJson} pair emitted by a spatial map
% function into {BboxTuple, {Geom, Value}}:
%   BboxTuple - the bounding box as a tuple.
%   Geom      - the geometry in the internal format of
%               geojsongeom_to_geocouch/1.
%   Value     - the decoded value.
% An explicit "bbox" member of the geometry is used as given. Otherwise the
% box is computed from the coordinates; for a GeometryCollection, from the
% coordinates of all member geometries.
process_result({K, V}) ->
    {Geo} = ?JSON_DECODE(K),
    Value = ?JSON_DECODE(V),
    % Geometry types come from user documents. binary_to_existing_atom/2 does
    % not create new atoms here (atoms are never garbage collected) for bogus
    % type names. Every supported type name is already an atom in this module.
    Type = binary_to_existing_atom(proplists:get_value(<<"type">>, Geo), utf8),
    Bbox = case proplists:get_value(<<"bbox">>, Geo) of
    undefined when Type =:= 'GeometryCollection' ->
        Geometries = proplists:get_value(<<"geometries">>, Geo),
        lists:foldl(fun({Geometry}, CurBbox) ->
            MemberType = binary_to_existing_atom(
                proplists:get_value(<<"type">>, Geometry), utf8),
            MemberCoords = proplists:get_value(<<"coordinates">>, Geometry),
            extract_bbox(MemberType, MemberCoords, CurBbox)
        end, nil, Geometries);
    undefined ->
        extract_bbox(Type, proplists:get_value(<<"coordinates">>, Geo));
    GivenBbox ->
        % Checked once up front. For a collection, the given box already
        % covers all members, so they need not be visited (this also works
        % for a collection without members).
        GivenBbox
    end,
    Geom = geojsongeom_to_geocouch(Geo),
    {erlang:list_to_tuple(Bbox), {Geom, Value}}.
308
309
% Returns the bounding box of a geometry of the given Type (an atom such as
% 'Polygon') with GeoJSON-style Coords, as a flat list
% [Min1, ..., MinN, Max1, ..., MaxN].
extract_bbox(Type, Coords) ->
    NoBboxYet = nil,
    extract_bbox(Type, Coords, NoBboxYet).

% Like extract_bbox/2, but grows InitBbox so it also covers the geometry.
% InitBbox is either nil (no box yet) or a flat [Mins..., Maxs...] list.
% An unsupported Type raises a case_clause error.
extract_bbox(Type, Coords, InitBbox) ->
    case Type of
    'Point' ->
        % a single position is treated as a one-vertex list
        bbox([Coords], InitBbox);
    _ when Type =:= 'LineString'; Type =:= 'MultiPoint' ->
        bbox(Coords, InitBbox);
    'Polygon' ->
        % holes don't matter for the bounding box, only the outer ring does
        bbox(hd(Coords), InitBbox);
    'MultiLineString' ->
        lists:foldl(fun bbox/2, InitBbox, Coords);
    'MultiPolygon' ->
        lists:foldl(fun(Polygon, Acc) -> bbox(hd(Polygon), Acc) end,
            InitBbox, Coords)
    end.
333
% Grows a bounding box so that it covers every position in Coords, and
% returns it as the flat list Mins ++ Maxs.
% The incoming box is one of:
%   - nil: start from the first position (Coords must then be non-empty);
%   - a {Mins, Maxs} pair;
%   - a flat [Mins..., Maxs...] list, as returned by this function.
% Positions are combined component-wise with erlang:min/2 and erlang:max/2.
bbox(Coords, Bbox) when is_list(Bbox) ->
    {Mins, Maxs} = lists:split(length(Bbox) div 2, Bbox),
    bbox(Coords, {Mins, Maxs});
bbox([First | Rest], nil) ->
    bbox(Rest, {First, First});
bbox(Coords, {Mins0, Maxs0}) ->
    {Mins, Maxs} = lists:foldl(
        fun(Position, {MinAcc, MaxAcc}) ->
            {lists:zipwith(fun erlang:min/2, Position, MinAcc),
             lists:zipwith(fun erlang:max/2, Position, MaxAcc)}
        end,
        {Mins0, Maxs0}, Coords),
    Mins ++ Maxs.
345
346
% @doc Transforms a GeoJSON geometry (as Erlang terms: the property list of a
% decoded JSON object) to the internal {Type, Coords} structure, where Type
% is an atom such as 'Point'. For a GeometryCollection, Coords is the list of
% its member geometries, each converted recursively.
% The type name comes from user data (map output and, presumably, polygon
% search requests). It is therefore converted with binary_to_existing_atom/2:
% an unknown name raises badarg instead of permanently adding a new atom to
% the atom table. All supported GeoJSON geometry type names are atoms used
% elsewhere in this module.
geojsongeom_to_geocouch(Geom) ->
    Type = proplists:get_value(<<"type">>, Geom),
    Coords = case Type of
    <<"GeometryCollection">> ->
        Geometries = proplists:get_value(<<"geometries">>, Geom),
        [geojsongeom_to_geocouch(G) || {G} <- Geometries];
    _ ->
        proplists:get_value(<<"coordinates">>, Geom)
    end,
    {binary_to_existing_atom(Type, utf8), Coords}.
359
% @doc Transforms the internal {Type, Coords} structure to a GeoJSON geometry
% (as Erlang terms). The result is {[{<<"type">>, Type}, Member]}, where:
%   - Member is {<<"geometries">>, Geoms} for a GeometryCollection, with its
%     members converted recursively;
%   - Member is {<<"coordinates">>, Coords} otherwise.
% Type is kept as the atom used internally.
geocouch_to_geojsongeom({Type, Coords}) ->
    Coords2 = case Type of
    'GeometryCollection' ->
        Geoms = [geocouch_to_geojsongeom(C) || C <- Coords],
        % A binary member name, like <<"type">> and <<"coordinates">>, so it
        % matches the name geojsongeom_to_geocouch/1 reads.
        {<<"geometries">>, Geoms};
    _ ->
        {<<"coordinates">>, Coords}
    end,
    {[{<<"type">>, Type}, Coords2]}.
370