% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
2
3% Licensed under the Apache License, Version 2.0 (the "License"); you may not
4% use this file except in compliance with the License. You may obtain a copy of
5% the License at
6%
7%   http://www.apache.org/licenses/LICENSE-2.0
8%
9% Unless required by applicable law or agreed to in writing, software
10% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12% License for the specific language governing permissions and limitations under
13% the License.
14
15-module(couch_set_view_mapreduce).
16
17-include("couch_db.hrl").
18-include_lib("couch_set_view/include/couch_set_view.hrl").
19
20-export([start_map_context/1, start_reduce_context/1]).
21-export([end_map_context/0, end_reduce_context/1]).
22-export([map/1, reduce/2, reduce/3, rereduce/2, rereduce/3]).
23-export([builtin_reduce/3]).
24-export([validate_ddoc_views/1]).
25
26-define(STATS_ERROR_MSG, <<"Builtin _stats function requires map values to be numbers">>).
27-define(SUM_ERROR_MSG,   <<"Builtin _sum function requires map values to be numbers">>).
28
29
% Creates a map context from the `def' field of every view in the group and
% stores it in the process dictionary under the key `map_context', which is
% the key map/1 reads. Returns `ok'. A badmatch is raised if
% mapreduce:start_map_context/1 returns anything other than {ok, Context}.
start_map_context(#set_view_group{views = Views}) ->
    MapDefs = lists:map(fun(View) -> View#set_view.def end, Views),
    {ok, MapCtx} = mapreduce:start_map_context(MapDefs),
    erlang:put(map_context, MapCtx),
    ok.
34
35
% Removes the `map_context' entry from the process dictionary. Returns `ok'
% whether or not an entry was present.
end_map_context() ->
    _PreviousCtx = erlang:erase(map_context),
    ok.
39
40
% Given a #set_view_group{}, starts a reduce context for each of its views.
% Given a single #set_view{}, collects the sources of its reduce functions
% that are not builtins (a builtin source starts with "_") and, if there is
% at least one, passes them to mapreduce:start_reduce_context/1 and stores
% the resulting context in the process dictionary under
% {reduce_context, Ref}. Views with only builtin reduce functions (or none)
% store nothing. Returns `ok'.
start_reduce_context(#set_view_group{views = Views}) ->
    lists:foreach(fun(View) -> start_reduce_context(View) end, Views);

start_reduce_context(SetView) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    AllSources = [Source || {_Name, Source} <- RedFuns],
    % Keep only the non-builtin sources, in their original order.
    JsSources = lists:filter(
        fun(<<"_", _/binary>>) -> false; (_) -> true end,
        AllSources),
    case JsSources of
    [] ->
        ok;
    [_ | _] ->
        {ok, ReduceCtx} = mapreduce:start_reduce_context(JsSources),
        erlang:put({reduce_context, Ref}, ReduceCtx),
        ok
    end.
66
67
% Given a #set_view_group{}, drops the reduce context of each of its views.
% Given a single #set_view{}, erases the {reduce_context, Ref} entry from the
% process dictionary. Returns `ok' in both cases, even if nothing was stored.
end_reduce_context(#set_view_group{views = Views}) ->
    _ = [end_reduce_context(View) || View <- Views],
    ok;

end_reduce_context(#set_view{ref = Ref}) ->
    CtxKey = {reduce_context, Ref},
    _PreviousCtx = erlang:erase(CtxKey),
    ok.
74
75
% Runs the map functions of the context stored under `map_context' in the
% process dictionary against one document. The document is first converted
% to a {Body, Meta} pair by couch_doc:to_raw_json_binary_views/1.
% Returns {ok, Results} on success; any other value returned by
% mapreduce:map_doc/3 is thrown as is.
map(Doc) ->
    MapCtx = erlang:get(map_context),
    {Body, Meta} = couch_doc:to_raw_json_binary_views(Doc),
    case mapreduce:map_doc(MapCtx, Body, Meta) of
    {ok, _MapResults} = Success ->
        Success;
    Failure ->
        throw(Failure)
    end.
85
86
% Applies every reduce function of the view to a list of raw key/value
% pairs and returns {ok, Reductions}, one reduction per reduce function, in
% the order the functions are declared in the view.
%
% Builtin functions (source starting with "_") are evaluated in Erlang by
% builtin_reduce/4; the remaining ones are evaluated through
% mapreduce:reduce/2 using the context stored under {reduce_context, Ref}.
% A non-{ok, _} result from mapreduce:reduce/2 is thrown.
reduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}}, _KVs) ->
    {ok, []};
reduce(SetView, KVs0) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    Sources = [Source || {_Name, Source} <- RedFuns],
    IsBuiltin = fun(Source) ->
        case Source of
        <<"_", _/binary>> -> true;
        _ -> false
        end
    end,
    {BuiltinSources, JsSources} = lists:partition(IsBuiltin, Sources),
    EncodedKVs = encode_kvs(KVs0, []),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinSources, EncodedKVs, []),
    case JsSources of
    [] ->
        {ok, BuiltinResults};
    [_ | _] ->
        ReduceCtx = erlang:get({reduce_context, Ref}),
        case mapreduce:reduce(ReduceCtx, EncodedKVs) of
        {ok, JsResults} ->
            % Interleave both result lists back into declaration order.
            recombine_reduce_results(Sources, JsResults, BuiltinResults, []);
        Failure ->
            throw(Failure)
        end
    end.
114
115
% Applies only the NthRed-th (1-based) reduce function of the view to a list
% of raw key/value pairs and returns {ok, [Reduction]}.
%
% For a non-builtin function the position passed to mapreduce:reduce/3 is
% NthRed minus the number of builtin functions declared before it, because
% the reduce context is started with the non-builtin sources only (see
% start_reduce_context/1). A non-{ok, _} result is thrown.
reduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
        _NthRed, _KVs) ->
    {ok, []};
reduce(SetView, NthRed, KVs0) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    {Preceding, [{_Name, Source} | _]} = lists:split(NthRed - 1, RedFuns),
    EncodedKVs = encode_kvs(KVs0, []),
    case Source of
    <<"_", _/binary>> ->
        builtin_reduce(reduce, [Source], EncodedKVs, []);
    _ ->
        ReduceCtx = erlang:get({reduce_context, Ref}),
        BuiltinsBefore = length([ok || {_, <<"_", _/binary>>} <- Preceding]),
        JsIndex = NthRed - BuiltinsBefore,
        case mapreduce:reduce(ReduceCtx, JsIndex, EncodedKVs) of
        {ok, Reduction} ->
            {ok, [Reduction]};
        Failure ->
            throw(Failure)
        end
    end.
148
149
% Re-reduces a list of previously computed reductions. Each element of
% ReducedValues is a list holding one value per reduce function of the view.
% The values are regrouped per reduce function by
% group_reductions_results/1, and each group is then re-reduced either by
% builtin_reduce/4 (builtin functions) or by mapreduce:rereduce/3 (all
% others). Returns {ok, Reductions}; a non-{ok, _} result from
% mapreduce:rereduce/3 is thrown.
rereduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
        _ReducedValues) ->
    {ok, []};
rereduce(SetView, ReducedValues) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    Columns = group_reductions_results(ReducedValues),
    ReduceCtx = erlang:get({reduce_context, Ref}),
    RereduceColumn =
        fun({native, BuiltinSource}, Values) ->
            % Builtins take {Key, Value} rows; the key is unused here.
            Rows = [{[], Value} || Value <- Values],
            {ok, [Reduction]} = builtin_reduce(rereduce, [BuiltinSource], Rows, []),
            Reduction;
        (JsIndex, Values) ->
            case mapreduce:rereduce(ReduceCtx, JsIndex, Values) of
            {ok, Reduction} ->
                Reduction;
            Failure ->
                throw(Failure)
            end
        end,
    Reductions = lists:zipwith(RereduceColumn, reduce_fun_indexes(RedFuns), Columns),
    {ok, Reductions}.
175
176
% Re-reduces previously computed reductions with only the NthRed-th
% (1-based) reduce function of the view and returns {ok, [Reduction]}.
% ReducedValues must regroup into exactly one column (one value per
% element), otherwise the `[Values] = ...' match fails.
%
% For a non-builtin function the position passed to mapreduce:rereduce/3 is
% NthRed minus the number of builtin functions declared before it. A
% non-{ok, _} result is thrown.
rereduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
        _NthRed, _ReducedValues) ->
    {ok, []};
rereduce(SetView, NthRed, ReducedValues) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    {Preceding, [{_Name, Source} | _]} = lists:split(NthRed - 1, RedFuns),
    [Values] = group_reductions_results(ReducedValues),
    case Source of
    <<"_", _/binary>> ->
        Rows = [{[], Value} || Value <- Values],
        builtin_reduce(rereduce, [Source], Rows, []);
    _ ->
        ReduceCtx = erlang:get({reduce_context, Ref}),
        BuiltinsBefore = length([ok || {_, <<"_", _/binary>>} <- Preceding]),
        JsIndex = NthRed - BuiltinsBefore,
        case mapreduce:rereduce(ReduceCtx, JsIndex, Values) of
        {ok, Reduction} ->
            {ok, [Reduction]};
        Failure ->
            throw(Failure)
        end
    end.
209
210
% Maps each {Name, Source} reduce function to the handle used to re-reduce
% it: {native, Source} for a builtin (source starting with "_"), or its
% 1-based position among the non-builtin functions otherwise.
reduce_fun_indexes(RedFuns) ->
    {Handles, _NextJsIndex} = lists:foldl(
        fun({_Name, <<"_", _/binary>> = BuiltinSource}, {Acc, JsIndex}) ->
                {[{native, BuiltinSource} | Acc], JsIndex};
            ({_Name, _JsSource}, {Acc, JsIndex}) ->
                {[JsIndex | Acc], JsIndex + 1}
        end,
        {[], 1}, RedFuns),
    lists:reverse(Handles).
220
221
% Merges the results of the builtin and non-builtin reduce functions back
% into a single list ordered like the reduce function sources. For each
% source, the next builtin result is taken when the source starts with "_"
% and a builtin result is available; otherwise the next JS result is taken.
% Returns {ok, Results} once sources and both result lists are exhausted.
recombine_reduce_results([], [], [], Acc) ->
    {ok, lists:reverse(Acc)};
recombine_reduce_results([Source | MoreSources], JsResults, BuiltinResults, Acc) ->
    case {Source, JsResults, BuiltinResults} of
    {<<"_", _/binary>>, _, [BuiltinRes | MoreBuiltin]} ->
        recombine_reduce_results(MoreSources, JsResults, MoreBuiltin, [BuiltinRes | Acc]);
    {_, [JsRes | MoreJs], _} ->
        recombine_reduce_results(MoreSources, MoreJs, BuiltinResults, [JsRes | Acc])
    end.
228
229
% Transposes a list of equally long reduction lists into one list per
% position ("column"): [[A1, B1], [A2, B2]] yields a group with the A values
% and a group with the B values.
%
% Note on ordering: every pass collects heads and tails in reverse, so the
% first group lists its values in reverse input order, the second group in
% input order, the third reversed again, and so on.
group_reductions_results([]) ->
    [];
group_reductions_results(Reductions) ->
    Firsts = lists:reverse(lists:map(fun([First | _]) -> First end, Reductions)),
    Rests = lists:reverse(lists:map(fun([_ | Rest]) -> Rest end, Reductions)),
    case Rests of
    [[] | _] ->
        % Every reduction list is exhausted: this was the last column.
        [Firsts];
    _ ->
        [Firsts | group_reductions_results(Rests)]
    end.
244
245
% Exported entry point for the builtin reduce functions: evaluates every
% builtin in BuiltinSrcs over KVs, starting from an empty accumulator.
% Mode is `reduce' or `rereduce'. Returns {ok, Reductions}.
builtin_reduce(Mode, BuiltinSrcs, KVs) ->
    EmptyAcc = [],
    builtin_reduce(Mode, BuiltinSrcs, KVs, EmptyAcc).
248
% Evaluates each builtin reduce function named in BuiltinSrcs over KVs and
% returns {ok, Results}: the reversed initial accumulator followed by one
% JSON-encoded result per source, in source order. Throws
% {error, Message} for a source that is not a supported builtin.
builtin_reduce(Re, BuiltinSrcs, KVs, Acc) ->
    Reductions = [apply_builtin(Re, Source, KVs) || Source <- BuiltinSrcs],
    {ok, lists:reverse(Acc, Reductions)}.

% Evaluates a single builtin, selected by the prefix of its source.
% `_count' counts rows on reduce and sums earlier counts on rereduce; it is
% rejected as invalid for any other mode.
apply_builtin(_Re, <<"_sum", _/binary>>, KVs) ->
    builtin_sum_rows(KVs);
apply_builtin(reduce, <<"_count", _/binary>>, KVs) ->
    ?JSON_ENCODE(length(KVs));
apply_builtin(rereduce, <<"_count", _/binary>>, KVs) ->
    builtin_sum_rows(KVs);
apply_builtin(Re, <<"_stats", _/binary>>, KVs) ->
    builtin_stats(Re, KVs);
apply_builtin(_Re, InvalidBuiltin, _KVs) ->
    throw({error, <<"Invalid builtin reduce function: ", InvalidBuiltin/binary>>}).
265
266
% Parses the textual (JSON) representation of a number, given as a binary
% or a string, into an Erlang integer or float.
%
% Throws {error, ErrorMsg} when the input is not a number.
%
% Integers are tried first, then floats in Erlang syntax. JSON also allows
% an exponent without a fraction part ("1e5", "1e+21", "2E-3"), which
% list_to_float/1 rejects, so such inputs are retried with ".0" inserted
% before the exponent instead of being reported as non-numeric.
parse_number(NumberBin, ErrorMsg) when is_binary(NumberBin) ->
    parse_number(binary_to_list(NumberBin), ErrorMsg);
parse_number(NumberStr, ErrorMsg) ->
    try
        list_to_integer(NumberStr)
    catch
        error:badarg ->
            parse_float(NumberStr, ErrorMsg)
    end.

% Parses a float in Erlang syntax, falling back to exponent-only notation.
parse_float(NumberStr, ErrorMsg) ->
    try
        list_to_float(NumberStr)
    catch
        error:badarg ->
            parse_exponent_float(NumberStr, ErrorMsg)
    end.

% Parses "<mantissa>e<exponent>" where the mantissa has no fraction part by
% rewriting it to "<mantissa>.0e<exponent>". Any failure (no exponent
% marker, empty mantissa or exponent, malformed digits, input that is not a
% string) means the value is not a number, hence the broad error class.
parse_exponent_float(NumberStr, ErrorMsg) ->
    IsNotExpMarker = fun(Char) -> Char =/= $e andalso Char =/= $E end,
    try
        {[_ | _] = Mantissa, [_ExpMarker | [_ | _] = Exponent]} =
            lists:splitwith(IsNotExpMarker, NumberStr),
        list_to_float(Mantissa ++ ".0e" ++ Exponent)
    catch
        error:_ ->
            throw({error, ErrorMsg})
    end.
281
282
% Sums the values of a list of {Key, Value} rows and returns the total
% JSON-encoded. Each value is parsed with parse_number/2, which throws
% {error, ?SUM_ERROR_MSG} for a value that is not a number. An empty list
% sums to 0.
builtin_sum_rows(KVs) ->
    Numbers = lists:map(
        fun({_Key, Value}) -> parse_number(Value, ?SUM_ERROR_MSG) end,
        KVs),
    ?JSON_ENCODE(lists:sum(Numbers)).
288
289
% Builtin `_stats' reduce function. Returns a JSON-encoded object with the
% fields "sum", "count", "min", "max" and "sumsqr", in that order.
%
% reduce:   computes the statistics over the numeric values of the rows;
%           a non-numeric value throws {error, ?STATS_ERROR_MSG}. An empty
%           row list yields the empty object "{}".
% rereduce: merges statistics objects produced earlier. Each value must
%           decode to an object with exactly the five fields above in the
%           same order, otherwise the decoding match fails.
builtin_stats(reduce, []) ->
    <<"{}">>;

builtin_stats(reduce, [{_, FirstValue} | MoreKVs]) ->
    First = parse_number(FirstValue, ?STATS_ERROR_MSG),
    Rest = lists:map(
        fun({_Key, Value}) -> parse_number(Value, ?STATS_ERROR_MSG) end,
        MoreKVs),
    % Every fold starts from the first value, so a single row is reported
    % unchanged.
    Sum = lists:foldl(fun(N, Acc) -> Acc + N end, First, Rest),
    Min = lists:foldl(fun(N, Acc) -> erlang:min(Acc, N) end, First, Rest),
    Max = lists:foldl(fun(N, Acc) -> erlang:max(Acc, N) end, First, Rest),
    SumSqr = lists:foldl(fun(N, Acc) -> Acc + (N * N) end, First * First, Rest),
    ?JSON_ENCODE({[
        {<<"sum">>, Sum},
        {<<"count">>, 1 + length(Rest)},
        {<<"min">>, Min},
        {<<"max">>, Max},
        {<<"sumsqr">>, SumSqr}
    ]});

builtin_stats(rereduce, [{_, FirstRed} | MoreReds]) ->
    Decode = fun(Json) ->
        {[{<<"sum">>, S},
          {<<"count">>, C},
          {<<"min">>, Mi},
          {<<"max">>, Ma},
          {<<"sumsqr">>, Sq}]} = ?JSON_DECODE(Json),
        {S, C, Mi, Ma, Sq}
    end,
    Merge = fun({_Key, Red}, {AccSum, AccCnt, AccMin, AccMax, AccSqr}) ->
        {RedSum, RedCnt, RedMin, RedMax, RedSqr} = Decode(Red),
        {RedSum + AccSum,
         RedCnt + AccCnt,
         erlang:min(RedMin, AccMin),
         erlang:max(RedMax, AccMax),
         RedSqr + AccSqr}
    end,
    {Sum, Cnt, Min, Max, Sqr} = lists:foldl(Merge, Decode(FirstRed), MoreReds),
    ?JSON_ENCODE({[
        {<<"sum">>, Sum},
        {<<"count">>, Cnt},
        {<<"min">>, Min},
        {<<"max">>, Max},
        {<<"sumsqr">>, Sqr}
    ]}).
330
331
% Converts raw {KeyDocId, RawValue} pairs into {Key, Value} pairs for the
% reduce functions: the key is extracted with
% couch_set_view_util:split_key_docid/1 (the doc id is dropped) and the
% first 16 bits of the raw value are stripped.
% Returns the reversed accumulator followed by the converted pairs, in
% input order.
encode_kvs(KVs, Acc) ->
    Encoded = lists:map(
        fun(KV) ->
            {KeyDocId, <<_PartId:16, Value/binary>>} = KV,
            {Key, _DocId} = couch_set_view_util:split_key_docid(KeyDocId),
            {Key, Value}
        end,
        KVs),
    lists:reverse(Acc, Encoded).
338
339
% Validates the `views' field of a design document: it must be a JSON
% object (a missing field counts as an empty object), and every
% {ViewName, Definition} member must pass validate_view_definition/2.
% Returns `ok', or throws {error, Message} on the first problem found.
validate_ddoc_views(#doc{body = {Body}}) ->
    case couch_util:get_value(<<"views">>, Body, {[]}) of
    {ViewDefs} when is_list(ViewDefs) ->
        lists:foreach(
            fun({ViewName, Definition}) ->
                validate_view_definition(ViewName, Definition)
            end,
            ViewDefs);
    _ ->
        throw({error, <<"The field `views' is not a json object.">>})
    end.
353
354
% Validates a single view definition. The name must be non-empty and free
% of leading/trailing whitespace, the definition must be a JSON object, and
% its `map' and `reduce' fields must pass their respective validators.
% Returns `ok', or throws {error, Message} on the first problem found.
validate_view_definition(<<"">>, _) ->
    throw({error, <<"View name cannot be an empty string">>});
validate_view_definition(ViewName, {ViewProps}) when is_list(ViewProps) ->
    WhitespaceMsg = iolist_to_binary(io_lib:format(
        "View name `~s` cannot have leading or trailing whitespace",
        [ViewName])),
    validate_view_name(ViewName, WhitespaceMsg),
    validate_view_map_function(
        ViewName, couch_util:get_value(<<"map">>, ViewProps)),
    validate_view_reduce_function(
        ViewName, couch_util:get_value(<<"reduce">>, ViewProps));
validate_view_definition(ViewName, _NotAnObject) ->
    Message = io_lib:format("Value for view `~s' is not a json object.",
                            [ViewName]),
    throw({error, iolist_to_binary(Message)}).
369
370
% Checks that a view name neither starts nor ends with a whitespace byte
% (space, tab, newline or carriage return). Returns `ok' when the name is
% acceptable (including the empty name) and throws {error, ErrorMsg}
% otherwise.
validate_view_name(<<FirstByte, _/binary>> = Name, ErrorMsg) ->
    Whitespace = " \t\n\r",
    LastByte = binary:last(Name),
    case lists:member(FirstByte, Whitespace)
            orelse lists:member(LastByte, Whitespace) of
    true ->
        throw({error, ErrorMsg});
    false ->
        ok
    end;
validate_view_name(_, _) ->
    ok.
388
389
% Validates the `map' field of a view: it must be present, be a JSON string
% (an Erlang binary) and be accepted by mapreduce:start_map_context/1.
% Returns `ok', or throws {error, Message} describing the problem.
validate_view_map_function(ViewName, undefined) ->
    Message = io_lib:format("The `map' field for the view `~s' is missing.",
                            [ViewName]),
    throw({error, iolist_to_binary(Message)});
validate_view_map_function(ViewName, MapDef) when is_binary(MapDef) ->
    case mapreduce:start_map_context([MapDef]) of
    {ok, _Ctx} ->
        ok;
    {error, Reason} ->
        Message = io_lib:format(
            "Syntax error in the map function of the view `~s': ~s",
            [ViewName, Reason]),
        throw({error, iolist_to_binary(Message)})
    end;
validate_view_map_function(ViewName, _NotAString) ->
    Message = io_lib:format(
        "The `map' field of the view `~s' is not a json string.",
        [ViewName]),
    throw({error, iolist_to_binary(Message)}).
407
408
% Validates the optional `reduce' field of a view. A missing field is fine.
% Otherwise it must be a JSON string (an Erlang binary) that is either one
% of the builtins `_count', `_sum' or `_stats', or a source that does not
% start with "_" and is accepted by mapreduce:start_reduce_context/1.
% Returns `ok', or throws {error, Message} describing the problem.
validate_view_reduce_function(_ViewName, undefined) ->
    ok;
validate_view_reduce_function(ViewName, <<"_", _/binary>> = Builtin) ->
    case lists:member(Builtin, [<<"_count">>, <<"_sum">>, <<"_stats">>]) of
    true ->
        ok;
    false ->
        Message = io_lib:format(
            "Invalid built-in reduce function for view `~s': ~s",
            [ViewName, Builtin]),
        throw({error, iolist_to_binary(Message)})
    end;
validate_view_reduce_function(ViewName, ReduceDef) when is_binary(ReduceDef) ->
    case mapreduce:start_reduce_context([ReduceDef]) of
    {ok, _Ctx} ->
        ok;
    {error, Reason} ->
        Message = io_lib:format(
            "Syntax error in the reduce function of the view `~s': ~s",
            [ViewName, Reason]),
        throw({error, iolist_to_binary(Message)})
    end;
validate_view_reduce_function(ViewName, _NotAString) ->
    Message = io_lib:format(
        "The `reduce' field of the view `~s' is not a json string.",
        [ViewName]),
    throw({error, iolist_to_binary(Message)}).
435