1% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_mapreduce).
16
17-include("couch_db.hrl").
18-include_lib("couch_set_view/include/couch_set_view.hrl").
19
20-export([get_map_context/1, get_reduce_context/1]).
21-export([map/5, reduce/2, reduce/3, rereduce/2, rereduce/3]).
22-export([builtin_reduce/3]).
23-export([validate_ddoc_views/1]).
24-export([is_doc_used/1]).
25
26-define(STATS_ERROR_MSG, <<"Builtin _stats function requires map values to be numbers">>).
27-define(SUM_ERROR_MSG,   <<"Builtin _sum function requires map values to be numbers">>).
28
29
% Returns the map context for a view group, creating it on first use.
%
% Contexts are cached in the `map_context_store' ETS table, keyed by the
% group signature. On a cache miss a new context is started from the `def'
% field of every view in the group and stored before being returned.
% Crashes with a badmatch if the context cannot be started.
get_map_context(#set_view_group{mod = Mod, views = Views, sig = Sig}) ->
    case ets:lookup(map_context_store, Sig) of
    [{Sig, CachedCtx}] ->
        CachedCtx;
    [] ->
        ViewDefs = [View#set_view.def || View <- Views],
        {ok, NewCtx} = mapreduce:start_map_context(Mod, ViewDefs),
        ets:insert(map_context_store, {Sig, NewCtx}),
        NewCtx
    end.
40
% Returns the reduce context for the non-builtin reduce functions of a view.
%
% Reduce sources starting with "_" are builtins and are left out. When no
% other reduce function remains, the atom `ok' is returned instead of a
% context. Otherwise the context is looked up in the `reduce_context_store'
% ETS table by the view's `ref', and started and cached on a miss.
get_reduce_context(SetView) ->
    #set_view{ref = Ref, indexer = Indexer} = SetView,
    #mapreduce_view{reduce_funs = RedFuns} = Indexer,
    CustomFuns = lists:filter(
        fun({_Name, <<"_", _/binary>>}) -> false;
            ({_Name, _Src}) -> true
        end,
        RedFuns),
    case [Src || {_Name, Src} <- CustomFuns] of
    [] ->
        ok;
    FunSrcs ->
        case ets:lookup(reduce_context_store, Ref) of
        [{Ref, CachedCtx}] ->
            CachedCtx;
        [] ->
            {ok, NewCtx} = mapreduce:start_reduce_context(FunSrcs),
            ets:insert(reduce_context_store, {Ref, NewCtx}),
            NewCtx
        end
    end.
68
% Runs the group's map functions over a single document.
%
% The document is converted to a raw JSON body and a JSON metadata object.
% The last byte of the metadata is dropped and ",", XATTRs and "}" are
% appended in its place, so the extended attributes become part of the
% metadata object.
% NOTE(review): this assumes the metadata ends with "}" and that XATTRs is a
% non-empty run of JSON object members - confirm against the callers.
%
% Returns `{ok, Results, LogList}' from mapreduce:map_doc/3; any other result
% is thrown.
map(Doc, XATTRs, PartId, Seq, Group) ->
    Ctx = get_map_context(Group),
    {DocBody, RawMeta} = couch_doc:to_raw_json_binary_views(Doc, PartId, Seq),
    % Everything but the final byte of the metadata.
    MetaPrefix = binary:part(RawMeta, {0, byte_size(RawMeta) - 1}),
    DocMeta = <<MetaPrefix/binary, $,, XATTRs/binary, $}>>,
    MapResult = mapreduce:map_doc(Ctx, DocBody, DocMeta),
    case MapResult of
    {ok, _Results, _LogList} ->
        MapResult;
    _ ->
        throw(MapResult)
    end.
81
% Tells whether the map functions of a group read the document body.
%
% The map context is only consulted when the "optimize_doc_loading" key of
% the "mapreduce" config section is exactly "true". For any other value the
% function answers `{ok, doc_fields_used}'.
%
% The setting is compared as a string rather than converted with
% list_to_atom/1: a missing key (presumably reported as the atom `undefined'
% by couch_config:get/2 - confirm) would make list_to_atom/1 fail with
% badarg, and arbitrary configuration text should not be turned into atoms.
-spec is_doc_used(#set_view_group{}) -> {ok, doc_fields_used} | {ok, doc_fields_unused}.
is_doc_used(Group) ->
    case couch_config:get("mapreduce", "optimize_doc_loading") of
    "true" ->
        Ctx = get_map_context(Group),
        mapreduce:is_doc_used(Ctx);
    _ ->
        % When in doubt always pull doc from ep engine
        {ok, doc_fields_used}
    end.
94
% Applies every reduce function of a view to a list of raw key/value pairs.
%
% Returns `{ok, Reductions}' with one reduction per reduce function, in the
% order the functions are declared. Builtin functions (source starting with
% "_") are evaluated in Erlang; all others are evaluated through the view's
% reduce context. A non-ok result from mapreduce:reduce/2 is thrown.
reduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}}, _KVs) ->
    {ok, []};
reduce(SetView, KVs0) ->
    #set_view{indexer = #mapreduce_view{reduce_funs = RedFuns}} = SetView,
    Sources = [Src || {_Name, Src} <- RedFuns],
    IsBuiltin = fun(<<"_", _/binary>>) -> true; (_) -> false end,
    Builtins = lists:filter(IsBuiltin, Sources),
    KVs = encode_kvs(KVs0, []),
    {ok, BuiltinResults} = builtin_reduce(reduce, Builtins, KVs, []),
    case lists:all(IsBuiltin, Sources) of
    true ->
        % Nothing to hand over to the reduce context.
        {ok, BuiltinResults};
    false ->
        Ctx = get_reduce_context(SetView),
        case mapreduce:reduce(Ctx, KVs) of
        {ok, JsResults} ->
            recombine_reduce_results(Sources, JsResults, BuiltinResults, []);
        Error ->
            throw(Error)
        end
    end.
121
122
% Applies only the NthRed-th (1-based) reduce function of a view to a list
% of raw key/value pairs and returns `{ok, [Reduction]}'.
%
% The reduce context only holds the non-builtin functions, so the index
% passed to it is NthRed minus the number of builtin functions declared
% before the selected one. A non-ok result from mapreduce:reduce/3 is thrown.
reduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
        _NthRed, _KVs) ->
    {ok, []};
reduce(SetView, NthRed, KVs0) ->
    #set_view{indexer = #mapreduce_view{reduce_funs = RedFuns}} = SetView,
    {Preceding, [{_Name, FunSrc} | _]} = lists:split(NthRed - 1, RedFuns),
    KVs = encode_kvs(KVs0, []),
    case FunSrc of
    <<"_", _/binary>> ->
        builtin_reduce(reduce, [FunSrc], KVs, []);
    _ ->
        Ctx = get_reduce_context(SetView),
        BuiltinsBefore = length(
            [builtin || {_, <<"_", _/binary>>} <- Preceding]),
        case mapreduce:reduce(Ctx, NthRed - BuiltinsBefore, KVs) of
        {ok, Reduction} ->
            {ok, [Reduction]};
        Error ->
            throw(Error)
        end
    end.
154
155
% Re-reduces previously computed reductions for every reduce function of a
% view.
%
% Each element of ReducedValues is a list holding one reduction per reduce
% function. The reductions are regrouped per function and each group is
% merged either by the Erlang builtin or, for the other functions, through
% the view's reduce context. Returns `{ok, Reductions}' in declaration order.
% A non-ok result from mapreduce:rereduce/3 is thrown.
rereduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
        _ReducedValues) ->
    {ok, []};
rereduce(SetView, ReducedValues) ->
    #set_view{indexer = #mapreduce_view{reduce_funs = RedFuns}} = SetView,
    PerFunValues = group_reductions_results(ReducedValues),
    RereduceOne =
        fun({native, FunSrc}, Values) ->
            % Builtins work on {Key, Value} pairs; the key is not used.
            Pairs = lists:map(fun(V) -> {[], V} end, Values),
            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], Pairs, []),
            Result;
        (JsIdx, Values) ->
            Ctx = get_reduce_context(SetView),
            case mapreduce:rereduce(Ctx, JsIdx, Values) of
            {ok, Reduction} ->
                Reduction;
            Error ->
                throw(Error)
            end
        end,
    Results = lists:zipwith(
        RereduceOne, reduce_fun_indexes(RedFuns), PerFunValues),
    {ok, Results}.
180
181
% Re-reduces previously computed reductions for the NthRed-th (1-based)
% reduce function only and returns `{ok, [Reduction]}'.
%
% Every element of ReducedValues must hold exactly one reduction, otherwise
% the regrouping below fails with a badmatch. As in reduce/3, the index given
% to the reduce context skips the builtin functions declared before the
% selected one. A non-ok result from mapreduce:rereduce/3 is thrown.
rereduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
        _NthRed, _ReducedValues) ->
    {ok, []};
rereduce(SetView, NthRed, ReducedValues) ->
    #set_view{indexer = #mapreduce_view{reduce_funs = RedFuns}} = SetView,
    {Preceding, [{_Name, FunSrc} | _]} = lists:split(NthRed - 1, RedFuns),
    [Values] = group_reductions_results(ReducedValues),
    case FunSrc of
    <<"_", _/binary>> ->
        Pairs = lists:map(fun(V) -> {[], V} end, Values),
        builtin_reduce(rereduce, [FunSrc], Pairs, []);
    _ ->
        Ctx = get_reduce_context(SetView),
        BuiltinsBefore = length(
            [builtin || {_, <<"_", _/binary>>} <- Preceding]),
        case mapreduce:rereduce(Ctx, NthRed - BuiltinsBefore, Values) of
        {ok, Reduction} ->
            {ok, [Reduction]};
        Error ->
            throw(Error)
        end
    end.
213
214
% Maps each `{Name, Source}' reduce function to the handle used to run it:
% `{native, Source}' for builtins (source starting with "_"), or the 1-based
% position of the function among the non-builtin ones.
reduce_fun_indexes(RedFuns) ->
    number_reduce_funs(RedFuns, 1).

% NextJsIdx is the index the next non-builtin function will receive.
number_reduce_funs([], _NextJsIdx) ->
    [];
number_reduce_funs([{_Name, <<"_", _/binary>> = Src} | Rest], NextJsIdx) ->
    [{native, Src} | number_reduce_funs(Rest, NextJsIdx)];
number_reduce_funs([{_Name, _JsSrc} | Rest], NextJsIdx) ->
    [NextJsIdx | number_reduce_funs(Rest, NextJsIdx + 1)].
224
225
% Interleaves builtin and non-builtin reductions back into the order in
% which the reduce functions were declared.
%
% For every source starting with "_" the next builtin result is taken; for
% every other source the next JS result is taken. Both result lists must be
% fully consumed, otherwise the call crashes. Results already present in Acc
% (in reverse order) come first in the returned `{ok, Results}'.
recombine_reduce_results(RedSrcs, JsResults, BuiltinResults, Acc) ->
    PickNext =
        fun(<<"_", _/binary>>, {JsLeft, [BRes | BuiltinLeft]}) ->
            {BRes, {JsLeft, BuiltinLeft}};
        (_JsFun, {[JsR | JsLeft], BuiltinLeft}) ->
            {JsR, {JsLeft, BuiltinLeft}}
        end,
    {Merged, {[], []}} = lists:mapfoldl(
        PickNext, {JsResults, BuiltinResults}, RedSrcs),
    {ok, lists:reverse(Acc, Merged)}.
232
233
% Regroups a list of rows (one reduction per reduce function in each row)
% into one list per reduce function, i.e. transposes rows into columns.
%
% The values of the first column come out in reverse row order. The tails
% are collected reversed as well, so the order flips back and forth from one
% column to the next. Regrouping stops as soon as the last row has no
% elements left.
group_reductions_results([]) ->
    [];
group_reductions_results(Rows) ->
    case split_heads_tails(Rows, [], []) of
    {Column, [[] | _]} -> % no tails left
        [Column];
    {Column, RemainingRows} ->
        [Column | group_reductions_results(RemainingRows)]
    end.

% Splits every row into its first element and its rest. Both results are
% accumulated in reverse row order.
split_heads_tails([], Heads, Tails) ->
    {Heads, Tails};
split_heads_tails([[H | T] | Rows], Heads, Tails) ->
    split_heads_tails(Rows, [H | Heads], [T | Tails]).
248
249
% Evaluates a list of builtin reduce functions over a list of `{Key, Value}'
% pairs and returns `{ok, Results}' with one JSON encoded result per function.
%
% ReduceType is `reduce' or `rereduce'. The functions are selected by the
% prefix of their source: "_sum", "_count" or "_stats". Any other source
% throws `{error, Message}'.
builtin_reduce(ReduceType, FunSrcs, Values) ->
    builtin_reduce(ReduceType, FunSrcs, Values, []).

% Same as builtin_reduce/3; Acc holds results computed earlier, in reverse
% order, which are placed in front of the new ones.
builtin_reduce(Re, BuiltinReds, KVs, Acc) ->
    Results = [apply_builtin_reduce(Re, Src, KVs) || Src <- BuiltinReds],
    {ok, lists:reverse(Acc, Results)}.

% Evaluates a single builtin reduce function.
apply_builtin_reduce(_Re, <<"_sum", _/binary>>, KVs) ->
    builtin_sum_rows(KVs);
apply_builtin_reduce(reduce, <<"_count", _/binary>>, KVs) ->
    ?JSON_ENCODE(length(KVs));
apply_builtin_reduce(rereduce, <<"_count", _/binary>>, KVs) ->
    % Partial counts are merged by adding them up.
    builtin_sum_rows(KVs);
apply_builtin_reduce(Re, <<"_stats", _/binary>>, KVs) ->
    builtin_stats(Re, KVs);
apply_builtin_reduce(_Re, InvalidBuiltin, _KVs) ->
    throw({error, <<"Invalid builtin reduce function: ", InvalidBuiltin/binary>>}).
269
270
% Converts the textual form of a JSON number (a binary or a string) into an
% Erlang integer or float. Throws `{error, ErrorMsg}' when the text is not a
% number.
%
% Integers are tried first, then floats. list_to_float/1 insists on a
% fractional part, so JSON numbers that carry an exponent but no fraction
% (for example "1e+21" or "2E-3") get a last attempt with ".0" inserted in
% front of the exponent.
parse_number(NumberBin, ErrorMsg) when is_binary(NumberBin) ->
    parse_number(binary_to_list(NumberBin), ErrorMsg);
parse_number(NumberStr, ErrorMsg) ->
    try
        list_to_integer(NumberStr)
    catch error:badarg ->
        parse_number_as_float(NumberStr, ErrorMsg)
    end.

% Second attempt: the text as a float in the form list_to_float/1 accepts.
parse_number_as_float(NumberStr, ErrorMsg) ->
    try
        list_to_float(NumberStr)
    catch error:badarg ->
        parse_number_with_bare_exponent(NumberStr, ErrorMsg)
    end.

% Last attempt: split at the exponent marker, give the mantissa a fractional
% part if it has none and parse the result as a float.
parse_number_with_bare_exponent(NumberStr, ErrorMsg) ->
    try
        {Mantissa, [_Marker | Exponent]} = lists:splitwith(
            fun(Char) -> Char =/= $e andalso Char =/= $E end, NumberStr),
        FullMantissa = case lists:member($., Mantissa) of
        true ->
            Mantissa;
        false ->
            Mantissa ++ ".0"
        end,
        list_to_float(FullMantissa ++ "e" ++ Exponent)
    catch error:_ ->
        % Whatever went wrong (no exponent marker, malformed mantissa or
        % exponent, value out of range, not a string at all), the input is
        % not a number this module can work with.
        throw({error, ErrorMsg})
    end.
285
286
% Adds up the numeric values of a list of `{Key, Value}' pairs and returns
% the JSON encoded total (0 for an empty list). Throws
% `{error, ?SUM_ERROR_MSG}' if a value is not a number.
builtin_sum_rows(KVs) ->
    Numbers = lists:map(
        fun({_Key, Value}) -> parse_number(Value, ?SUM_ERROR_MSG) end,
        KVs),
    ?JSON_ENCODE(lists:sum(Numbers)).
292
293
% Builtin `_stats' reduce function.
%
% In `reduce' mode the values of the `{Key, Value}' pairs are parsed as
% numbers and summarised as a JSON object with the members "sum", "count",
% "min", "max" and "sumsqr", in that order. An empty list gives "{}".
% Values that are not numbers throw `{error, ?STATS_ERROR_MSG}'.
%
% In `rereduce' mode the values are such JSON objects and are merged into a
% single one. The objects must have exactly the five members above in the
% same order, otherwise the call crashes with a badmatch.
builtin_stats(reduce, []) ->
    <<"{}">>;

builtin_stats(reduce, [{_Key, FirstValue} | OtherKVs]) ->
    N0 = parse_number(FirstValue, ?STATS_ERROR_MSG),
    Totals = lists:foldl(
        fun({_K, RawValue}, {SumAcc, CntAcc, MinAcc, MaxAcc, SqrAcc}) ->
            N = parse_number(RawValue, ?STATS_ERROR_MSG),
            {SumAcc + N,
             CntAcc + 1,
             erlang:min(MinAcc, N),
             erlang:max(MaxAcc, N),
             SqrAcc + (N * N)}
        end,
        {N0, 1, N0, N0, N0 * N0},
        OtherKVs),
    builtin_stats_to_json(Totals);

builtin_stats(rereduce, [{_Key, FirstRed} | OtherReds]) ->
    Totals = lists:foldl(
        fun({_K, Red}, {SumAcc, CntAcc, MinAcc, MaxAcc, SqrAcc}) ->
            {RedSum, RedCnt, RedMin, RedMax, RedSqr} =
                builtin_stats_from_json(Red),
            {RedSum + SumAcc,
             RedCnt + CntAcc,
             erlang:min(RedMin, MinAcc),
             erlang:max(RedMax, MaxAcc),
             RedSqr + SqrAcc}
        end,
        builtin_stats_from_json(FirstRed),
        OtherReds),
    builtin_stats_to_json(Totals).

% Decodes a JSON `_stats' object into a {Sum, Count, Min, Max, SumSqr} tuple.
builtin_stats_from_json(Json) ->
    {[{<<"sum">>, Sum},
      {<<"count">>, Cnt},
      {<<"min">>, Min},
      {<<"max">>, Max},
      {<<"sumsqr">>, Sqr}]} = ?JSON_DECODE(Json),
    {Sum, Cnt, Min, Max, Sqr}.

% Encodes a {Sum, Count, Min, Max, SumSqr} tuple as a JSON `_stats' object.
builtin_stats_to_json({Sum, Cnt, Min, Max, Sqr}) ->
    ?JSON_ENCODE({[
        {<<"sum">>, Sum},
        {<<"count">>, Cnt},
        {<<"min">>, Min},
        {<<"max">>, Max},
        {<<"sumsqr">>, Sqr}
    ]}).
334
335
% Turns raw index entries into the `{Key, Value}' pairs the reduce functions
% work on.
%
% Each entry is `{KeyDocId, <<PartId:16, Value/binary>>}': the key is taken
% out of KeyDocId with mapreduce_view:decode_key_docid/1 and the 16 bit
% partition id in front of the value is dropped. Pairs already present in
% Acc (in reverse order) come first in the result.
encode_kvs(RawKVs, Acc) ->
    Pairs = [begin
        {KeyDocId, <<_PartId:16, Value/binary>>} = RawKV,
        {Key, _DocId} = mapreduce_view:decode_key_docid(KeyDocId),
        {Key, Value}
    end || RawKV <- RawKVs],
    lists:reverse(Acc, Pairs).
342
343
% Validates every view definition found in the `views' member of a design
% document body. A missing `views' member counts as an empty object.
%
% Returns `ok' when all views are valid and throws `{error, Message}'
% otherwise, including when `views' is not a JSON object.
validate_ddoc_views(#doc{body = {Body}}) ->
    case couch_util:get_value(<<"views">>, Body, {[]}) of
    {ViewList} when is_list(ViewList) ->
        lists:foreach(
            fun({ViewName, ViewDef}) ->
                validate_view_definition(ViewName, ViewDef)
            end,
            ViewList);
    _ ->
        throw({error, <<"The field `views' is not a json object.">>})
    end.
357
358
% Validates one view of a design document: its name, its mandatory `map'
% function and its optional `reduce' function, in that order.
%
% Returns `ok' for a valid view. Throws `{error, Message}' when the name is
% empty or has leading or trailing whitespace, when the definition is not a
% JSON object, or when one of the functions is rejected.
validate_view_definition(<<>>, _) ->
    throw({error, <<"View name cannot be an empty string">>});
validate_view_definition(ViewName, {ViewProps}) when is_list(ViewProps) ->
    WhitespaceMsg = iolist_to_binary(io_lib:format(
        "View name `~s` cannot have leading or trailing whitespace",
        [ViewName])),
    validate_view_name(ViewName, WhitespaceMsg),
    validate_view_map_function(
        ViewName, couch_util:get_value(<<"map">>, ViewProps)),
    validate_view_reduce_function(
        ViewName, couch_util:get_value(<<"reduce">>, ViewProps));
validate_view_definition(ViewName, _NotAnObject) ->
    throw({error, iolist_to_binary(io_lib:format(
        "Value for view `~s' is not a json object.", [ViewName]))}).
373
374
% Make sure the view name neither starts nor ends with whitespace (space,
% tab, newline or carriage return). Throws `{error, ErrorMsg}' if it does
% and returns `ok' otherwise. An empty name is not rejected here.
validate_view_name(<<First, _/binary>> = Name, ErrorMsg) ->
    SkipLen = byte_size(Name) - 1,
    <<_:SkipLen/binary, Last>> = Name,
    Edges = [First, Last],
    case lists:any(fun(Char) -> lists:member(Char, Edges) end, " \t\n\r") of
    true ->
        throw({error, ErrorMsg});
    false ->
        ok
    end;
validate_view_name(_, _) ->
    ok.
392
393
% Validates the `map' member of a view definition.
%
% The member must be present and must be a JSON string that
% mapreduce:start_map_context/2 accepts. Returns `ok' on success and throws
% `{error, Message}' otherwise.
validate_view_map_function(ViewName, MapDef) ->
    case MapDef of
    undefined ->
        throw({error, iolist_to_binary(io_lib:format(
            "The `map' field for the view `~s' is missing.",
            [ViewName]))});
    _ when is_binary(MapDef) ->
        case mapreduce:start_map_context(mapreduce_view, [MapDef]) of
        {ok, _Ctx} ->
            ok;
        {error, Reason} ->
            throw({error, iolist_to_binary(io_lib:format(
                "Syntax error in the map function of the view `~s': ~s",
                [ViewName, Reason]))})
        end;
    _ ->
        throw({error, iolist_to_binary(io_lib:format(
            "The `map' field of the view `~s' is not a json string.",
            [ViewName]))})
    end.
411
412
% Validates the optional `reduce' member of a view definition.
%
% A missing member is fine. Otherwise it must be a JSON string holding
% either one of the builtins "_count", "_sum" and "_stats", or a source that
% does not start with "_" and that mapreduce:start_reduce_context/1 accepts.
% Returns `ok' on success and throws `{error, Message}' otherwise.
validate_view_reduce_function(_ViewName, undefined) ->
    ok;
validate_view_reduce_function(ViewName, ReduceDef) when is_binary(ReduceDef) ->
    case ReduceDef of
    <<"_", _/binary>> ->
        KnownBuiltins = [<<"_count">>, <<"_sum">>, <<"_stats">>],
        case lists:member(ReduceDef, KnownBuiltins) of
        true ->
            ok;
        false ->
            throw({error, iolist_to_binary(io_lib:format(
                "Invalid built-in reduce function for view `~s': ~s",
                [ViewName, ReduceDef]))})
        end;
    _ ->
        case mapreduce:start_reduce_context([ReduceDef]) of
        {ok, _Ctx} ->
            ok;
        {error, Reason} ->
            throw({error, iolist_to_binary(io_lib:format(
                "Syntax error in the reduce function of the view `~s': ~s",
                [ViewName, Reason]))})
        end
    end;
validate_view_reduce_function(ViewName, _NotAString) ->
    throw({error, iolist_to_binary(io_lib:format(
        "The `reduce' field of the view `~s' is not a json string.",
        [ViewName]))}).
439