1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_spatial_updater).
14
15-ifdef(makecheck).
16-compile(export_all).
17-endif.
18
19
20-export([update/3]).
21
22% for benchmark script
23-export([geojson_get_bbox/1]).
24
25% for output (couch_http_spatial, couch_http_spatial_list)
26-export([geocouch_to_geojsongeom/1]).
27
28% for polygon search
29-export([extract_bbox/2, geojsongeom_to_geocouch/1]).
30
31-include("couch_db.hrl").
32-include("couch_spatial.hrl").
33
34
% @doc Bring the spatial indexes of Group up to date with the database and
% terminate the calling process with exit({new_group, NewGroup}).
%
% The third argument is either a database name (binary), which is opened for
% the duration of the update and closed afterwards, or an already opened
% #db{}. Every change since the group's current_seq is mapped through the
% spatial functions and written to the index file; progress is reported via
% couch_task_status. Intermediate flushes (and the notification of Owner)
% happen in process_doc/6.
update(Owner, Group, DbName) when is_binary(DbName) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    try
        update(Owner, Group, Db)
    after
        couch_db:close(Db)
    end;

update(Owner, Group, #db{name=DbName} = Db) ->
    #spatial_group{
        name = GroupName,
        current_seq = Seq,
        indexes = Indexes
    } = Group,
    % NOTE: purges are not handled; the group's purge_seq is never compared
    %     with couch_db:get_purge_seq/1 here.

    % One (initially empty) list of key/value pairs per index
    IndexEmptyKVs = [{Index, []} || Index <- Indexes],
    % compute on all docs modified since we last computed.
    TotalChanges = couch_db:count_changes_since(Db, Seq),
    couch_task_status:add_task([
        {type, indexer},
        {database, DbName},
        {design_document, GroupName},
        {progress, 0},
        {changes_done, 0},
        {total_changes, TotalChanges}
    ]),
    couch_task_status:set_update_frequency(500),
    {ok, MapCtx} = mapreduce:start_map_context([I#spatial.def || I <- Indexes]),
    EmptyResults = [[] || _ <- Indexes],

    {ok, _, {_, {UncomputedDocs, Group2, IndexKVsToAdd, DocIdIndexIdKeys}}}
        = couch_db:enum_docs_since(Db, Seq,
        fun(DocInfo, _, {ChangesProcessed, Acc}) ->
            % Guard the division: TotalChanges may be 0, and the percentage
            % must never be reported above 100.
            Progress = case TotalChanges of
            0 ->
                0;
            _ ->
                erlang:min(100, (ChangesProcessed * 100) div TotalChanges)
            end,
            couch_task_status:update([
                {progress, Progress},
                {changes_done, ChangesProcessed}
            ]),
            Acc2 = process_doc(Db, Owner, MapCtx, EmptyResults, DocInfo, Acc),
            {ok, {ChangesProcessed + 1, Acc2}}
        end, {0, {[], Group, IndexEmptyKVs, []}}, []),
    % Map the documents that were still buffered when the enumeration ended
    Results = spatial_docs(MapCtx, UncomputedDocs, EmptyResults),
    {IndexKVsToAdd2, DocIdIndexIdKeys2} = view_insert_query_results(
            UncomputedDocs, Results, IndexKVsToAdd, DocIdIndexIdKeys),
    NewSeq = couch_db:get_update_seq(Db),
    ?LOG_DEBUG("new seq num: ~p", [NewSeq]),
    {ok, Group3} = write_changes(Group2, IndexKVsToAdd2, DocIdIndexIdKeys2,
                NewSeq),
    exit({new_group, Group3}).
105
106
107
108
% Merge the mapped results of several documents into the accumulators.
%
% Docs and their Results (one entry per document, as returned by
% spatial_docs/3) are walked in lockstep. Each document's key/value pairs are
% added to IndexKVs by view_insert_doc_query_results/5, and a
% {DocId, [{IdNum, Key}]} entry is prepended to DocIdIndexIdKeys.
% Returns {IndexKVs, DocIdIndexIdKeys}.
view_insert_query_results([], [], IndexKVs, DocIdIndexIdKeys) ->
    {IndexKVs, DocIdIndexIdKeys};
view_insert_query_results([Doc | Docs], [DocResults | Results], IndexKVs,
        DocIdIndexIdKeys) ->
    {IndexKVs2, IndexIdKeys} =
        view_insert_doc_query_results(Doc, DocResults, IndexKVs, [], []),
    DocId = Doc#doc.id,
    view_insert_query_results(Docs, Results, IndexKVs2,
        [{DocId, IndexIdKeys} | DocIdIndexIdKeys]).
116
117
% Merge one document's per-index results into the per-index accumulators.
%
% The i-th element of the result list belongs to the i-th {Index, KVs} pair.
% Each result list holds {Bbox, {Geom, Value}} pairs. After sorting, pairs
% with equal keys are collapsed into one {Key, {dups, Values}} pair. Every key
% is then paired with the document id ({{Key, DocId}, Value}) and prepended to
% that index's KVs, and an {IdNum, Key} entry is collected for the id btree.
% Returns {IndexKVs, IndexIdKeys}.
view_insert_doc_query_results(_Doc, [], [], IndexKVsAcc, IndexIdKeysAcc) ->
    {lists:reverse(IndexKVsAcc), lists:reverse(IndexIdKeysAcc)};
view_insert_doc_query_results(#doc{id=DocId}=Doc, [DocKVs | RestDocKVs],
        [{Index, KVs} | RestIndexKVs], IndexKVsAcc, IndexIdKeysAcc) ->
    % Fold step over the sorted pairs; the accumulator is built in reverse,
    % so its head is the previously seen key.
    CombineDups = fun
        (KV, []) ->
            [KV];
        ({Key, _} = KV, [{PrevKey, _} | _] = Acc) when Key /= PrevKey ->
            [KV | Acc];
        ({_, Value}, [{PrevKey, {dups, Dups}} | Rest]) ->
            [{PrevKey, {dups, [Value | Dups]}} | Rest];
        ({_, Value}, [{PrevKey, PrevValue} | Rest]) ->
            [{PrevKey, {dups, [Value, PrevValue]}} | Rest]
    end,
    Combined = lists:foldl(CombineDups, [], lists:sort(DocKVs)),
    DocIndexKVs = [{{Key, DocId}, Value} || {Key, Value} <- Combined],
    DocIdKeys = [{Index#spatial.id_num, Key} || {Key, _} <- Combined],
    view_insert_doc_query_results(Doc, RestDocKVs, RestIndexKVs,
        [{Index, DocIndexKVs ++ KVs} | IndexKVsAcc],
        DocIdKeys ++ IndexIdKeysAcc).
145
% Map each document in Docs through the spatial functions of MapCtx.
%
% Returns one element per document, in the order of Docs. Each element holds
% one list per spatial function: the pairs that function emitted
% ([{GeomJson, ValueJson}, ...]), converted by process_results/1 into
% [{Bbox, {Geom, Value}}, ...]. A function that reports an error contributes
% []; a document that cannot be mapped at all contributes EmptyResults.
% Errors are logged, not raised.
spatial_docs(MapCtx, Docs, EmptyResults) ->
    spatial_docs(MapCtx, Docs, EmptyResults, []).

spatial_docs(_MapCtx, [], _EmptyResults, Acc) ->
    lists:reverse(Acc);
spatial_docs(MapCtx, [Doc | Rest], EmptyResults, Acc) ->
    {Body, Meta} = couch_doc:to_raw_json_binary_views(Doc),
    LogError = fun(Why) ->
        ?LOG_ERROR("Error computing spatial result for document `~s`: ~p",
            [Doc#doc.id, Why])
    end,
    ConvertFunResults = fun
        ({error, Why}) ->
            LogError(Why),
            [];
        ([]) ->
            [];
        (Emitted) ->
            process_results(Emitted)
    end,
    % NOTE vmx: perhaps map_doc should be renamed to something more general,
    %     as it can be used for most indexers
    DocResults = case mapreduce:map_doc(MapCtx, Body, Meta) of
    {ok, PerFunResults} ->
        lists:map(ConvertFunResults, PerFunResults);
    {error, Why} ->
        LogError(Why),
        EmptyResults
    end,
    spatial_docs(MapCtx, Rest, EmptyResults, [DocResults | Acc]).
184
185
% @doc Fold step of update/3, applied once to every changed document.
% Derived from an old revision (796805) of couch_view_updater.
%
% The accumulator is {Docs, Group, IndexKVs, DocIdIndexIdKeys}:
%   - the documents waiting to be mapped,
%   - the current group,
%   - the per-index key/value buffers,
%   - the {DocId, [{IdNum, Key}]} list for the id btree.
%
% Design documents are skipped. A deleted document is recorded with an empty
% key list so that write_changes/4 drops its entries. Any other document is
% opened (with local_seq when the design option "local_seq" is true) and
% buffered.
%
% When couch_util:should_flush/0 returns true, everything buffered is mapped
% and written with this document's local_seq as the new sequence. Owner, if
% it is a pid, is then sent a {partial_update, self(), NewGroup} cast, and
% empty buffers are returned.
process_doc(Db, Owner, MapCtx, EmptyResults, DocInfo,
        {Docs, Group, IndexKVs, DocIdIndexIdKeys} = Acc) ->
    #spatial_group{design_options = DesignOptions} = Group,
    #doc_info{id = DocId, deleted = Deleted} = DocInfo,
    DocOpts = case proplists:get_value(<<"local_seq">>, DesignOptions, false) of
    true ->
        [conflicts, deleted_conflicts, local_seq];
    _ ->
        [conflicts, deleted_conflicts]
    end,
    case DocId of
    <<?DESIGN_DOC_PREFIX, _/binary>> ->
        % design documents are not indexed
        Acc;
    _ ->
        {Docs2, DocIdIndexIdKeys2} = case Deleted of
        true ->
            % an empty key list removes the document's index entries
            {Docs, [{DocId, []} | DocIdIndexIdKeys]};
        _ ->
            {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
            {[Doc | Docs], DocIdIndexIdKeys}
        end,
        case couch_util:should_flush() of
        true ->
            Results = spatial_docs(MapCtx, Docs2, EmptyResults),
            {IndexKVs2, DocIdIndexIdKeys3} = view_insert_query_results(Docs2,
                Results, IndexKVs, DocIdIndexIdKeys2),
            {ok, Group2} = write_changes(Group, IndexKVs2, DocIdIndexIdKeys3,
                DocInfo#doc_info.local_seq),
            case is_pid(Owner) of
            true ->
                ok = gen_server:cast(Owner, {partial_update, self(), Group2});
            false ->
                ok
            end,
            garbage_collect(),
            % Seed the new buffers from Group2: its index records carry the
            % tree positions that were just written, Group's are outdated.
            IndexEmptyKVs = [{Index, []} || Index <- Group2#spatial_group.indexes],
            {[], Group2, IndexEmptyKVs, []};
        false ->
            {Docs2, Group, IndexKVs, DocIdIndexIdKeys2}
        end
    end.
229
230
% @doc Write buffered index changes to disk and return {ok, NewGroup}.
%
% IndexKeyValuesToAdd holds one {Index, KVs} pair per index, in the order of
% Group#spatial_group.indexes. DocIdIndexIdKeys maps each touched document id
% to its new [{IdNum, Key}] list; an empty list removes the document from the
% id btree. The entries previously stored for those documents are looked up
% in the id btree and removed from the matching index trees while the new
% KVs are added (vtree:add_remove/5). The returned group has
% current_seq = NewSeq; an index's update_seq is only set to NewSeq when its
% root position changed.
write_changes(Group, IndexKeyValuesToAdd, DocIdIndexIdKeys, NewSeq) ->
    #spatial_group{id_btree=IdBtree, fd=Fd} = Group,
    Additions = [{Id, IdKeys} || {Id, IdKeys} <- DocIdIndexIdKeys, IdKeys /= []],
    Removals = [Id || {Id, IdKeys} <- DocIdIndexIdKeys, IdKeys == []],
    Lookups = [Id || {Id, _} <- DocIdIndexIdKeys],
    {ok, PreviousEntries, IdBtree2} =
        couch_btree:query_modify(IdBtree, Lookups, Additions, Removals),
    % Group the previously stored {Key, DocId} entries by index id
    CollectOldKeys = fun(Entry, ByIndex) ->
        case Entry of
        {ok, {DocId, OldIdKeys}} ->
            lists:foldl(fun({IdNum, Key}, ByIndex2) ->
                dict:append(IdNum, {Key, DocId}, ByIndex2)
            end, ByIndex, OldIdKeys);
        {not_found, _} ->
            ByIndex
        end
    end,
    OldKeysByIndex = lists:foldl(CollectOldKeys, dict:new(), PreviousEntries),
    UpdateIndex = fun(Index, {_, AddKVs}) ->
        OldKeys = couch_util:dict_find(Index#spatial.id_num, OldKeysByIndex, []),
        OldPos = Index#spatial.treepos,
        {ok, NewPos, NewHeight} = vtree:add_remove(
                Fd, OldPos, Index#spatial.treeheight, AddKVs, OldKeys),
        Index2 = Index#spatial{treepos=NewPos, treeheight=NewHeight},
        case NewPos =:= OldPos of
        true ->
            Index2;
        false ->
            Index2#spatial{update_seq=NewSeq}
        end
    end,
    Indexes2 = lists:zipwith(UpdateIndex, Group#spatial_group.indexes,
        IndexKeyValuesToAdd),
    couch_file:flush(Fd),
    Group2 = Group#spatial_group{
        indexes=Indexes2,
        current_seq=NewSeq,
        id_btree=IdBtree2
    },
    LogRoot = fun(Index) ->
        ?LOG_INFO("Position of the spatial index (~p) root node: ~p",
                [Index#spatial.id_num, Index#spatial.treepos])
    end,
    lists:foreach(LogRoot, Indexes2),
    {ok, Group2}.
274
275
% @doc Return the bounding box of a GeoJSON geometry as a tuple
% {Min1, ..., MinN, Max1, ..., MaxN}.
%
% Geo is an already decoded geometry wrapped in braces ({Props}), as returned
% by proplists:get_value/2 on a decoded JSON object. An explicit "bbox"
% member is returned as is.
%
% NOTE vmx: This is kind of ugly. This function is only needed for a
%     benchmark of the replication filter.
geojson_get_bbox({Geo}) ->
    erlang:list_to_tuple(geojson_bbox(Geo)).


% Convert the pairs emitted by one spatial function with process_result/1.
process_results(Results) ->
    % NOTE vmx (2011-02-01): the ordering of the results doesn't matter
    %     therefore we don't need to reverse the list.
    lists:foldl(fun(Result, Acc) ->
        [process_result(Result)|Acc]
    end, [], Results).

% Decode one emitted {GeometryJson, ValueJson} pair into
% {BboxTuple, {Geom, Value}}, where Geom is the internal representation
% returned by geojsongeom_to_geocouch/1.
process_result({K, V}) ->
    {Geo} = ?JSON_DECODE(K),
    Value = ?JSON_DECODE(V),
    Bbox = geojson_bbox(Geo),
    Geom = geojsongeom_to_geocouch(Geo),
    {erlang:list_to_tuple(Bbox), {Geom, Value}}.

% Bounding box of a decoded geometry (property list) as a flat list
% [Min1, ..., MinN, Max1, ..., MaxN].
%
% A top-level "bbox" member wins; otherwise the box is computed from the
% coordinates. The "type" member is always validated, even when a "bbox" is
% given.
geojson_bbox(Geo) ->
    Type = geojson_type(Geo),
    case proplists:get_value(<<"bbox">>, Geo) of
    undefined ->
        geometry_bbox(Type, Geo, nil);
    Bbox ->
        Bbox
    end.

% Grow InitBbox (nil or a flat bbox list) by the geometry Geo of type Type.
% Members of a GeometryCollection are visited recursively, so nested
% collections are supported as well.
geometry_bbox('GeometryCollection', Geo, InitBbox) ->
    lists:foldl(fun({Member}, CurBbox) ->
        geometry_bbox(geojson_type(Member), Member, CurBbox)
    end, InitBbox, proplists:get_value(<<"geometries">>, Geo));
geometry_bbox(Type, Geo, InitBbox) ->
    extract_bbox(Type, proplists:get_value(<<"coordinates">>, Geo), InitBbox).

% The "type" member of a decoded geometry, as an atom.
%
% The type comes from document data, so binary_to_existing_atom/2 is used.
% A type that is not already an atom raises badarg instead of growing the
% atom table, which is never garbage collected.
geojson_type(Geo) ->
    binary_to_existing_atom(proplists:get_value(<<"type">>, Geo), utf8).
322
323
% Bounding box of the coordinates of a (non-collection) geometry of type Type,
% as a flat list [Min1, ..., MinN, Max1, ..., MaxN].
extract_bbox(Type, Coords) ->
    extract_bbox(Type, Coords, nil).

% Same as extract_bbox/2, but grows InitBbox, which is either nil or a flat
% bbox list as returned by a previous call.
extract_bbox(Type, Coords, InitBbox) ->
    % Fold bbox/2 over a list of point lists, starting from InitBbox
    FoldBbox = fun(PointLists) ->
        lists:foldl(fun bbox/2, InitBbox, PointLists)
    end,
    case Type of
    'Point' ->
        FoldBbox([[Coords]]);
    'LineString' ->
        FoldBbox([Coords]);
    'Polygon' ->
        % only the outer ring matters, holes lie inside it
        FoldBbox([hd(Coords)]);
    'MultiPoint' ->
        FoldBbox([Coords]);
    'MultiLineString' ->
        FoldBbox(Coords);
    'MultiPolygon' ->
        lists:foldl(fun(Polygon, Acc) ->
            bbox(hd(Polygon), Acc)
        end, InitBbox, Coords)
    end.

% Grow a bounding box by a list of points.
% The box is nil (nothing seen yet), a {Min, Max} pair of per-dimension lists
% while folding, or a flat [Min..., Max...] list, which is also the result.
bbox([], {Min, Max}) ->
    Min ++ Max;
bbox([First | Rest], nil) ->
    bbox(Rest, {First, First});
bbox(Points, Flat) when is_list(Flat) ->
    bbox(Points, lists:split(length(Flat) div 2, Flat));
bbox([Point | Rest], {Min, Max}) ->
    NewMin = lists:zipwith(fun erlang:min/2, Point, Min),
    NewMax = lists:zipwith(fun erlang:max/2, Point, Max),
    bbox(Rest, {NewMin, NewMax}).
359
360
% @doc Transform a decoded GeoJSON geometry (property list) into the internal
% {Type, Coordinates} representation, where Type is an atom such as 'Point'.
% For a GeometryCollection the second element is the list of the converted
% member geometries.
%
% The type is converted with binary_to_existing_atom/2 because it comes from
% document data. A type that is not already an atom raises badarg instead of
% adding an atom to the atom table, which is never garbage collected.
geojsongeom_to_geocouch(Geom) ->
    Type = binary_to_existing_atom(proplists:get_value(<<"type">>, Geom), utf8),
    Coords = case Type of
    'GeometryCollection' ->
        Geometries = proplists:get_value(<<"geometries">>, Geom),
        [geojsongeom_to_geocouch(G) || {G} <- Geometries];
    _ ->
        proplists:get_value(<<"coordinates">>, Geom)
    end,
    {Type, Coords}.
373
% @doc Transform the internal {Type, Coordinates} representation back into a
% GeoJSON geometry (as Erlang terms, {Proplist}).
%
% The "type" member holds the type atom. For a GeometryCollection the members
% are converted recursively into the "geometries" member; for every other
% type the coordinates go into "coordinates".
geocouch_to_geojsongeom({Type, Coords}) ->
    Member = case Type of
    'GeometryCollection' ->
        % binary key, consistent with <<"type">> and <<"coordinates">>
        {<<"geometries">>, [geocouch_to_geojsongeom(C) || C <- Coords]};
    _ ->
        {<<"coordinates">>, Coords}
    end,
    {[{<<"type">>, Type}, Member]}.
384