1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_view_mapreduce).
14
15-include("couch_db.hrl").
16
17-export([start_map_context/1, start_reduce_context/1]).
18-export([end_map_context/0, end_reduce_context/1]).
19-export([map/1, reduce/2, reduce/3, rereduce/2, rereduce/3]).
20
21
%% Creates a map context from the `def` field of every view in the group and
%% stores it in the process dictionary under the key `map_context`.
%% Returns ok; crashes with badmatch if mapreduce:start_map_context/1 does not
%% reply {ok, Ctx}.
start_map_context(#group{views = ViewList}) ->
    MapDefs = lists:map(fun(V) -> V#view.def end, ViewList),
    {ok, MapCtx} = mapreduce:start_map_context(MapDefs),
    erlang:put(map_context, MapCtx),
    ok.
26
27
%% Removes the `map_context` entry from the process dictionary (a no-op when
%% no entry is present). Always returns ok.
end_map_context() ->
    _OldCtx = erlang:erase(map_context),
    ok.
31
32
%% Starts reduce contexts and stores them in the process dictionary.
%%
%% Given a #group{}, does so for each of its views. Given a #view{}, collects
%% the sources of all reduce functions whose source does not start with "_"
%% (those starting with "_" are the builtins handled in this module). If any
%% remain, a context is created for them and stored under
%% {reduce_context, Ref}; otherwise nothing is stored. Returns ok.
start_reduce_context(#group{views = ViewList}) ->
    lists:foreach(fun(View) -> start_reduce_context(View) end, ViewList);
start_reduce_context(#view{ref = ViewRef, reduce_funs = ReduceFuns}) ->
    NonBuiltinSrcs = lists:flatmap(
        fun({_Name, <<"_", _/binary>>}) ->
            [];
        ({_Name, Source}) ->
            [Source]
        end,
        ReduceFuns),
    case NonBuiltinSrcs =:= [] of
    true ->
        ok;
    false ->
        {ok, ReduceCtx} = mapreduce:start_reduce_context(NonBuiltinSrcs),
        erlang:put({reduce_context, ViewRef}, ReduceCtx),
        ok
    end.
52
53
%% Drops reduce contexts from the process dictionary.
%%
%% Given a #group{}, does so for each of its views. Given a #view{}, erases
%% the entry stored under {reduce_context, Ref}. Returns ok.
end_reduce_context(#group{views = ViewList}) ->
    lists:foreach(fun(View) -> end_reduce_context(View) end, ViewList);
end_reduce_context(#view{ref = ViewRef}) ->
    CtxKey = {reduce_context, ViewRef},
    erlang:erase(CtxKey),
    ok.
60
61
%% Runs the map context stored under `map_context` in the process dictionary
%% against Doc.
%%
%% Returns {ok, Results} with one entry per element of the list returned by
%% mapreduce:map_doc/3. An entry is either an {error, Reason} tuple passed
%% through untouched, or a list of {Key, Value} pairs with both sides decoded
%% via ?JSON_DECODE. Any reply other than {ok, _} is thrown as-is.
map(Doc) ->
    MapCtx = erlang:get(map_context),
    {Body, Meta} = couch_doc:to_raw_json_binary_views(Doc),
    DecodePair = fun({RawKey, RawValue}) ->
        {?JSON_DECODE(RawKey), ?JSON_DECODE(RawValue)}
    end,
    case mapreduce:map_doc(MapCtx, Body, Meta) of
    {ok, RawResults} ->
        Decoded = [
            case RawResult of
            {error, _Reason} ->
                RawResult;
            Pairs ->
                lists:map(DecodePair, Pairs)
            end || RawResult <- RawResults
        ],
        {ok, Decoded};
    Failure ->
        throw(Failure)
    end.
78
79
%% Reduces KVs0 with every reduce function of the view.
%%
%% Sources starting with "_" are evaluated by builtin_reduce/4; all others are
%% evaluated through the reduce context stored under {reduce_context, Ref}.
%% Returns {ok, Results} with one result per reduce function, in the order of
%% reduce_funs. A non-{ok, _} reply from mapreduce:reduce/2 is thrown.
reduce(#view{reduce_funs = []}, _KVs) ->
    {ok, []};
reduce(#view{ref = ViewRef, reduce_funs = ReduceFuns}, KVs0) ->
    Sources = [Source || {_Name, Source} <- ReduceFuns],
    IsBuiltin = fun(<<"_", _/binary>>) -> true; (_Other) -> false end,
    {BuiltinSrcs, OtherSrcs} = lists:partition(IsBuiltin, Sources),
    EncodedKVs = encode_kvs(KVs0, []),
    % The builtin pass is needed on both paths, so run it once up front.
    BuiltinReply = builtin_reduce(reduce, BuiltinSrcs, EncodedKVs, []),
    case OtherSrcs of
    [] ->
        BuiltinReply;
    _ ->
        {ok, BuiltinResults} = BuiltinReply,
        ReduceCtx = erlang:get({reduce_context, ViewRef}),
        case mapreduce:reduce(ReduceCtx, EncodedKVs) of
        {ok, RawResults} ->
            CtxResults = [?JSON_DECODE(Raw) || Raw <- RawResults],
            % Interleave both result lists back into reduce_funs order.
            recombine_reduce_results(Sources, CtxResults, BuiltinResults, []);
        Failure ->
            throw(Failure)
        end
    end.
102
103
%% Reduces KVs0 with only the NthRed-th (1-based) reduce function of the view.
%%
%% Returns {ok, [Result]} ({ok, []} when the view has no reduce functions).
%% A builtin (source starting with "_") goes through builtin_reduce/4. Any
%% other function goes through the stored reduce context, addressed by its
%% position among the non-builtin functions only. A non-{ok, _} reply from
%% mapreduce:reduce/3 is thrown.
reduce(#view{reduce_funs = []}, _NthRed, _KVs) ->
    {ok, []};
reduce(#view{ref = ViewRef, reduce_funs = ReduceFuns}, NthRed, KVs0) ->
    {Preceding, [{_Name, Source} | _Following]} = lists:split(NthRed - 1, ReduceFuns),
    EncodedKVs = encode_kvs(KVs0, []),
    case Source of
    <<"_", _/binary>> ->
        builtin_reduce(reduce, [Source], EncodedKVs, []);
    _ ->
        ReduceCtx = erlang:get({reduce_context, ViewRef}),
        % Builtins are not part of the reduce context, so each builtin that
        % precedes this function shifts its index down by one.
        BuiltinsBefore = length([B || {_, <<"_", _/binary>> = B} <- Preceding]),
        CtxIndex = NthRed - BuiltinsBefore,
        case mapreduce:reduce(ReduceCtx, CtxIndex, EncodedKVs) of
        {ok, RawResult} ->
            {ok, [?JSON_DECODE(RawResult)]};
        Failure ->
            throw(Failure)
        end
    end.
129
130
%% Re-reduces previously reduced values with every reduce function of the view.
%%
%% ReducedValues is regrouped with group_reductions_results/1 so that each
%% reduce function receives the list of values that belong to it. Builtins are
%% evaluated by builtin_reduce/4 in rereduce mode (values are wrapped as
%% {[], Value} rows); the others are JSON-encoded and sent to
%% mapreduce:rereduce/3 with the index computed by reduce_fun_indexes/1.
%% Returns {ok, Results}; a non-{ok, _} reply from mapreduce:rereduce/3 is
%% thrown.
rereduce(#view{reduce_funs = []}, _ReducedValues) ->
    {ok, []};
rereduce(#view{ref = ViewRef, reduce_funs = ReduceFuns}, ReducedValues) ->
    PerFunValues = group_reductions_results(ReducedValues),
    ReduceCtx = erlang:get({reduce_context, ViewRef}),
    RereduceOne = fun
        ({native, BuiltinSrc}, Vals) ->
            Rows = [{[], Val} || Val <- Vals],
            {ok, [BuiltinResult]} = builtin_reduce(rereduce, [BuiltinSrc], Rows, []),
            BuiltinResult;
        (CtxIndex, Vals) ->
            Encoded = [?JSON_ENCODE(Val) || Val <- Vals],
            case mapreduce:rereduce(ReduceCtx, CtxIndex, Encoded) of
            {ok, RawResult} ->
                ?JSON_DECODE(RawResult);
            Failure ->
                throw(Failure)
            end
    end,
    Rereduced = lists:zipwith(RereduceOne, reduce_fun_indexes(ReduceFuns), PerFunValues),
    {ok, Rereduced}.
149
150
%% Re-reduces previously reduced values with only the NthRed-th (1-based)
%% reduce function of the view.
%%
%% ReducedValues must regroup (via group_reductions_results/1) into exactly
%% one list of values, otherwise this crashes with badmatch. Returns
%% {ok, [Result]} ({ok, []} when the view has no reduce functions). A
%% non-{ok, _} reply from mapreduce:rereduce/3 is thrown.
rereduce(#view{reduce_funs = []}, _NthRed, _ReducedValues) ->
    {ok, []};
rereduce(#view{ref = ViewRef, reduce_funs = ReduceFuns}, NthRed, ReducedValues) ->
    {Preceding, [{_Name, Source} | _Following]} = lists:split(NthRed - 1, ReduceFuns),
    [Vals] = group_reductions_results(ReducedValues),
    case Source of
    <<"_", _/binary>> ->
        Rows = [{[], Val} || Val <- Vals],
        builtin_reduce(rereduce, [Source], Rows, []);
    _ ->
        ReduceCtx = erlang:get({reduce_context, ViewRef}),
        % Builtins are not part of the reduce context, so each builtin that
        % precedes this function shifts its index down by one.
        BuiltinsBefore = length([B || {_, <<"_", _/binary>> = B} <- Preceding]),
        CtxIndex = NthRed - BuiltinsBefore,
        Encoded = [?JSON_ENCODE(Val) || Val <- Vals],
        case mapreduce:rereduce(ReduceCtx, CtxIndex, Encoded) of
        {ok, RawResult} ->
            {ok, [?JSON_DECODE(RawResult)]};
        Failure ->
            throw(Failure)
        end
    end.
176
177
%% Maps each {Name, Source} reduce function to a dispatch token, keeping the
%% input order: {native, Source} when the source starts with "_", otherwise a
%% 1-based counter that only advances for non-builtin functions.
reduce_fun_indexes(RedFuns) ->
    Assign = fun
        ({_Name, <<"_", _/binary>> = BuiltinSrc}, {Tokens, NextIdx}) ->
            {[{native, BuiltinSrc} | Tokens], NextIdx};
        ({_Name, _OtherSrc}, {Tokens, NextIdx}) ->
            {[NextIdx | Tokens], NextIdx + 1}
    end,
    {RevTokens, _FinalIdx} = lists:foldl(Assign, {[], 1}, RedFuns),
    lists:reverse(RevTokens).
187
188
%% Merges two separately computed result lists back into the order of the
%% reduce function sources.
%%
%% Walks the sources left to right: a source starting with "_" consumes the
%% next builtin result, any other source consumes the next entry of the
%% second list. Returns {ok, Merged} once all three lists are exhausted at the
%% same time; any other combination fails with function_clause.
recombine_reduce_results([], [], [], Merged) ->
    {ok, lists:reverse(Merged)};
recombine_reduce_results([<<"_", _/binary>> | Sources], OtherResults, [Builtin | Builtins], Merged) ->
    recombine_reduce_results(Sources, OtherResults, Builtins, [Builtin | Merged]);
recombine_reduce_results([_OtherSrc | Sources], [Other | OtherResults], Builtins, Merged) ->
    recombine_reduce_results(Sources, OtherResults, Builtins, [Other | Merged]).
195
196
%% Transposes a list of equally long lists: the result's N-th element holds
%% the N-th element of every input list.
%%
%% Because each pass prepends while folding, element order inside the result
%% lists alternates: the first group is in reverse input order, the second in
%% input order, and so on. An empty input yields []. Recursion stops as soon
%% as the first remaining tail is empty.
group_reductions_results([]) ->
    [];
group_reductions_results(Rows) ->
    TakeFirst = fun([First | Remaining], {Firsts, Remainders}) ->
        {[First | Firsts], [Remaining | Remainders]}
    end,
    {Group, RestRows} = lists:foldl(TakeFirst, {[], []}, Rows),
    case RestRows of
    [[] | _] ->
        % Every row has been consumed.
        [Group];
    _ ->
        [Group | group_reductions_results(RestRows)]
    end.
211
212
%% Evaluates a list of builtin reduce functions over the same set of rows.
%%
%% Re          - `reduce` or `rereduce`.
%% BuiltinReds - builtin sources, matched by prefix: "_sum", "_count", "_stats".
%% KVs         - {Key, Value} rows. In `reduce` mode the values are
%%               JSON-encoded and are decoded with ?JSON_DECODE by the
%%               functions that need them (_sum, _stats). In `rereduce` mode
%%               the values are used as given.
%% Acc         - results so far, most recent first.
%%
%% Returns {ok, Results} with one result per builtin, in input order. Throws
%% {error, Msg} for an unknown builtin name.
%%
%% Every builtin in the list receives the rows exactly as passed in. The
%% decoded rows computed for _sum/_stats are deliberately NOT forwarded to the
%% recursive call: forwarding them would make a following _sum or _stats run
%% ?JSON_DECODE on values that were already decoded.
builtin_reduce(_Re, [], _KVs, Acc) ->
    {ok, lists:reverse(Acc)};
builtin_reduce(Re, [<<"_sum", _/binary>> | BuiltinReds], KVs, Acc) ->
    Decoded = case Re of
    reduce ->
        [{K, ?JSON_DECODE(V)} || {K, V} <- KVs];
    rereduce ->
        KVs
    end,
    Sum = builtin_sum_rows(Decoded),
    builtin_reduce(Re, BuiltinReds, KVs, [Sum | Acc]);
builtin_reduce(reduce, [<<"_count", _/binary>> | BuiltinReds], KVs, Acc) ->
    % On the first pass a count is simply the number of rows.
    Count = length(KVs),
    builtin_reduce(reduce, BuiltinReds, KVs, [Count | Acc]);
builtin_reduce(rereduce, [<<"_count", _/binary>> | BuiltinReds], KVs, Acc) ->
    % On rereduce the values are partial counts, so they are summed.
    Count = builtin_sum_rows(KVs),
    builtin_reduce(rereduce, BuiltinReds, KVs, [Count | Acc]);
builtin_reduce(Re, [<<"_stats", _/binary>> | BuiltinReds], KVs, Acc) ->
    Decoded = case Re of
    reduce ->
        [{K, ?JSON_DECODE(V)} || {K, V} <- KVs];
    rereduce ->
        KVs
    end,
    Stats = builtin_stats(Re, Decoded),
    builtin_reduce(Re, BuiltinReds, KVs, [Stats | Acc]);
builtin_reduce(_Re, [InvalidBuiltin | _BuiltinReds], _KVs, _Acc) ->
    throw({error, <<"Invalid builtin reduce function: ", InvalidBuiltin/binary>>}).
241
242
%% Sums the values of a list of {Key, Value} rows, starting from 0.
%%
%% Numbers are added together; lists are added element-wise through
%% sum_terms/2; when a number meets a list, the number is treated as a
%% one-element list. Any other row or value throws {error, Msg}.
builtin_sum_rows(KVs) ->
    AddRow = fun
        ({_Key, Val}, Total) when is_number(Val), is_number(Total) ->
            Total + Val;
        ({_Key, Val}, Total) when is_number(Val), is_list(Total) ->
            sum_terms(Total, [Val]);
        ({_Key, Val}, Total) when is_list(Val), is_number(Total) ->
            sum_terms([Total], Val);
        ({_Key, Val}, Total) when is_list(Val), is_list(Total) ->
            sum_terms(Total, Val);
        (_Row, _Total) ->
            throw({error, <<"Builtin _sum function requires map values to be numbers or lists of numbers">>})
    end,
    lists:foldl(AddRow, 0, KVs).
256
%% Adds two lists of numbers position by position. When one list is longer,
%% its remaining elements are appended to the result unchanged. Throws
%% {error, Msg} when a pair of elements is not numeric or an argument is not
%% a list.
sum_terms(Left, []) when is_list(Left) ->
    Left;
sum_terms([], Right) when is_list(Right) ->
    Right;
sum_terms([L | Ls], [R | Rs]) when is_number(L), is_number(R) ->
    Rest = sum_terms(Ls, Rs),
    [L + R | Rest];
sum_terms(_, _) ->
    throw({error, <<"Builtin _sum function requires map values to be numbers or lists of numbers">>}).
267
%% Computes sum/count/min/max/sumsqr statistics over {Key, Value} rows.
%%
%% reduce   - values must be numbers; returns {[]} for no rows, otherwise an
%%            object {[{<<"sum">>, _}, {<<"count">>, _}, {<<"min">>, _},
%%            {<<"max">>, _}, {<<"sumsqr">>, _}]}. A non-numeric value throws
%%            {error, Msg}.
%% rereduce - values must be objects of exactly the shape above (same keys,
%%            same order); they are merged into one object of that shape.
%%            Any other shape crashes with badmatch.
builtin_stats(reduce, []) ->
    {[]};
builtin_stats(reduce, [{_Key, Head} | Tail]) when is_number(Head) ->
    Step = fun
        ({_K, Val}, {SumAcc, CntAcc, MinAcc, MaxAcc, SqrAcc}) when is_number(Val) ->
            {SumAcc + Val, CntAcc + 1, erlang:min(MinAcc, Val), erlang:max(MaxAcc, Val), SqrAcc + (Val * Val)};
        (_Row, _AccIn) ->
            throw({error, <<"Builtin _stats function requires map values to be numbers">>})
    end,
    % Seed the accumulator with the first value so min/max need no sentinel.
    Seed = {Head, 1, Head, Head, Head * Head},
    {Sum, Cnt, Min, Max, Sqr} = lists:foldl(Step, Seed, Tail),
    {[{<<"sum">>, Sum}, {<<"count">>, Cnt}, {<<"min">>, Min}, {<<"max">>, Max}, {<<"sumsqr">>, Sqr}]};
builtin_stats(reduce, KVs) when is_list(KVs) ->
    throw({error, <<"Builtin _stats function requires map values to be numbers">>});

builtin_stats(rereduce, [{_Key, Head} | Tail]) ->
    {[{<<"sum">>, Sum0}, {<<"count">>, Cnt0}, {<<"min">>, Min0}, {<<"max">>, Max0}, {<<"sumsqr">>, Sqr0}]} = Head,
    Merge = fun({_K, Partial}, {SumAcc, CntAcc, MinAcc, MaxAcc, SqrAcc}) ->
        {[{<<"sum">>, PSum}, {<<"count">>, PCnt}, {<<"min">>, PMin}, {<<"max">>, PMax}, {<<"sumsqr">>, PSqr}]} = Partial,
        {PSum + SumAcc, PCnt + CntAcc, erlang:min(PMin, MinAcc), erlang:max(PMax, MaxAcc), PSqr + SqrAcc}
    end,
    {Sum, Cnt, Min, Max, Sqr} = lists:foldl(Merge, {Sum0, Cnt0, Min0, Max0, Sqr0}, Tail),
    {[{<<"sum">>, Sum}, {<<"count">>, Cnt}, {<<"min">>, Min}, {<<"max">>, Max}, {<<"sumsqr">>, Sqr}]}.
289
290
%% Encodes rows of the form {{Key, Id}, Value} into pairs
%% {?JSON_ENCODE([Key, Id]), ?JSON_ENCODE(Value)}.
%%
%% Encoded rows are pushed onto Acc and the final accumulator is reversed, so
%% with Acc = [] the result keeps the input order. A row of any other shape
%% crashes with badmatch.
encode_kvs(KVs, Acc) ->
    EncodeRow = fun(Row, Encoded) ->
        {{Key, Id}, Value} = Row,
        EncodedRow = {?JSON_ENCODE([Key, Id]), ?JSON_ENCODE(Value)},
        [EncodedRow | Encoded]
    end,
    lists:reverse(lists:foldl(EncodeRow, Acc, KVs)).
297