% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_view_mapreduce).

-include("couch_db.hrl").

-export([start_map_context/1, start_reduce_context/1]).
-export([end_map_context/0, end_reduce_context/1]).
-export([map/1, reduce/2, reduce/3, rereduce/2, rereduce/3]).


%% Builds a map context from the `def` field of every view in the group
%% (via mapreduce:start_map_context/1) and stores it in the process
%% dictionary under the key `map_context`. Returns `ok`; crashes with a
%% badmatch if the context cannot be created.
start_map_context(#group{views = Views}) ->
    MapDefs = lists:map(fun(View) -> View#view.def end, Views),
    {ok, MapCtx} = mapreduce:start_map_context(MapDefs),
    erlang:put(map_context, MapCtx),
    ok.


%% Removes the `map_context` entry from the process dictionary.
end_map_context() ->
    erlang:erase(map_context),
    ok.


%% Given a #group{}, starts a reduce context for each of its views.
%% Given a #view{}, collects the sources of its reduce functions that do not
%% start with "_" (those starting with "_" are handled by builtin_reduce/4
%% elsewhere in this module) and, if there are any, creates a context for
%% them and stores it in the process dictionary under
%% `{reduce_context, Ref}`. Views without such functions store nothing.
start_reduce_context(#group{views = Views}) ->
    lists:foreach(fun(View) -> start_reduce_context(View) end, Views);

start_reduce_context(#view{ref = Ref, reduce_funs = RedFuns}) ->
    JsSources = lists:flatmap(
        fun({_Name, <<"_", _/binary>>}) -> [];
           ({_Name, Src}) -> [Src]
        end,
        RedFuns),
    case JsSources of
        [] ->
            ok;
        [_ | _] ->
            {ok, RedCtx} = mapreduce:start_reduce_context(JsSources),
            erlang:put({reduce_context, Ref}, RedCtx),
            ok
    end.


%% Given a #group{}, drops the reduce context of each of its views.
%% Given a #view{}, erases `{reduce_context, Ref}` from the process
%% dictionary (a no-op when no context was stored for that view).
end_reduce_context(#group{views = Views}) ->
    lists:foreach(fun(View) -> end_reduce_context(View) end, Views);

end_reduce_context(#view{ref = Ref}) ->
    erlang:erase({reduce_context, Ref}),
    ok.
%% Runs the map functions of the context stored under `map_context` in the
%% process dictionary against Doc.
%% Returns `{ok, Results}` with one entry per element returned by
%% mapreduce:map_doc/3: either an `{error, Reason}` tuple passed through
%% untouched, or a list of `{Key, Value}` pairs decoded with ?JSON_DECODE.
%% Any non-`{ok, _}` reply from mapreduce:map_doc/3 is thrown.
map(Doc) ->
    MapCtx = erlang:get(map_context),
    {Body, Meta} = couch_doc:to_raw_json_binary_views(Doc),
    case mapreduce:map_doc(MapCtx, Body, Meta) of
        {ok, RawResults} ->
            {ok, lists:map(fun decode_map_result/1, RawResults)};
        Failure ->
            throw(Failure)
    end.

%% Decodes one element of a map_doc reply; error tuples are left unchanged.
decode_map_result({error, _Reason} = Failure) ->
    Failure;
decode_map_result(Emitted) ->
    lists:map(fun decode_emitted_kv/1, Emitted).

%% Decodes a single emitted key/value pair.
decode_emitted_kv({K, V}) ->
    {?JSON_DECODE(K), ?JSON_DECODE(V)}.


%% Applies every reduce function of the view to KVs0, a list of
%% `{{Key, Id}, Value}` rows (the shape encode_kvs/2 matches on).
%% Functions whose source starts with "_" are evaluated by builtin_reduce/4,
%% the others through mapreduce:reduce/2 using the context stored under
%% `{reduce_context, Ref}`. Returns `{ok, Reductions}` in the same order as
%% the view's reduce functions; a non-ok reply from mapreduce:reduce/2 is
%% thrown.
reduce(#view{reduce_funs = []}, _KVs) ->
    {ok, []};
reduce(#view{ref = Ref, reduce_funs = RedFuns}, KVs0) ->
    Sources = [Src || {_Name, Src} <- RedFuns],
    IsBuiltin = fun(<<"_", _/binary>>) -> true;
                   (_Other) -> false
                end,
    {BuiltinSources, JsSources} = lists:partition(IsBuiltin, Sources),
    EncodedKVs = encode_kvs(KVs0, []),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinSources, EncodedKVs, []),
    case JsSources of
        [] ->
            {ok, BuiltinResults};
        _ ->
            RedCtx = erlang:get({reduce_context, Ref}),
            case mapreduce:reduce(RedCtx, EncodedKVs) of
                {ok, RawJsResults} ->
                    JsResults = lists:map(
                        fun(Raw) -> ?JSON_DECODE(Raw) end, RawJsResults),
                    recombine_reduce_results(Sources, JsResults, BuiltinResults, []);
                Failure ->
                    throw(Failure)
            end
    end.


%% Applies only the NthRed-th (1-based) reduce function of the view to KVs0.
%% Returns `{ok, [Reduction]}`. For a non-builtin function the index handed
%% to mapreduce:reduce/3 is NthRed minus the number of builtin ("_"-prefixed)
%% functions that precede it in the view's list, since only non-builtin
%% sources are passed to mapreduce:start_reduce_context/1.
reduce(#view{reduce_funs = []}, _NthRed, _KVs) ->
    {ok, []};
reduce(#view{ref = Ref, reduce_funs = RedFuns}, NthRed, KVs0) ->
    {Preceding, [{_Name, Source} | _]} = lists:split(NthRed - 1, RedFuns),
    EncodedKVs = encode_kvs(KVs0, []),
    case Source of
        <<"_", _/binary>> ->
            builtin_reduce(reduce, [Source], EncodedKVs, []);
        _ ->
            RedCtx = erlang:get({reduce_context, Ref}),
            BuiltinsBefore = length(
                [B || {_, <<"_", _/binary>> = B} <- Preceding]),
            JsIdx = NthRed - BuiltinsBefore,
            case mapreduce:reduce(RedCtx, JsIdx, EncodedKVs) of
                {ok, RawReduction} ->
                    {ok, [?JSON_DECODE(RawReduction)]};
                Failure ->
                    throw(Failure)
            end
    end.
%% Re-reduces previously computed reductions for every reduce function of
%% the view. ReducedValues is a list of reduction lists (one inner list per
%% earlier reduce call); group_reductions_results/1 regroups them so that
%% each group holds the values belonging to one reduce function.
%% Returns `{ok, Results}` with one result per reduce function; a non-ok
%% reply from mapreduce:rereduce/3 is thrown.
rereduce(#view{reduce_funs = []}, _ReducedValues) ->
    {ok, []};
rereduce(#view{ref = Ref, reduce_funs = RedFuns}, ReducedValues) ->
    Groups = group_reductions_results(ReducedValues),
    RedCtx = erlang:get({reduce_context, Ref}),
    FunSpecs = reduce_fun_indexes(RedFuns),
    Results = lists:zipwith(
        fun(FunSpec, Values) -> rereduce_group(RedCtx, FunSpec, Values) end,
        FunSpecs,
        Groups),
    {ok, Results}.

%% Re-reduces the values of a single reduce function. `{native, Src}` goes
%% through builtin_reduce/4; any other spec is used as the function index
%% for mapreduce:rereduce/3.
rereduce_group(_RedCtx, {native, Source}, Values) ->
    Rows = [{[], V} || V <- Values],
    {ok, [Result]} = builtin_reduce(rereduce, [Source], Rows, []),
    Result;
rereduce_group(RedCtx, JsIdx, Values) ->
    Encoded = lists:map(fun(V) -> ?JSON_ENCODE(V) end, Values),
    case mapreduce:rereduce(RedCtx, JsIdx, Encoded) of
        {ok, RawReduction} ->
            ?JSON_DECODE(RawReduction);
        Failure ->
            throw(Failure)
    end.


%% Re-reduces with only the NthRed-th (1-based) reduce function of the view.
%% ReducedValues must regroup into exactly one group (otherwise the
%% `[Values] = ...` match fails). Returns `{ok, [Reduction]}`. For a
%% non-builtin function, the index given to mapreduce:rereduce/3 is NthRed
%% minus the number of builtin ("_"-prefixed) functions preceding it.
rereduce(#view{reduce_funs = []}, _NthRed, _ReducedValues) ->
    {ok, []};
rereduce(#view{ref = Ref, reduce_funs = RedFuns}, NthRed, ReducedValues) ->
    {Preceding, [{_Name, Source} | _]} = lists:split(NthRed - 1, RedFuns),
    [Values] = group_reductions_results(ReducedValues),
    case Source of
        <<"_", _/binary>> ->
            Rows = [{[], V} || V <- Values],
            builtin_reduce(rereduce, [Source], Rows, []);
        _ ->
            RedCtx = erlang:get({reduce_context, Ref}),
            BuiltinsBefore = length(
                [B || {_, <<"_", _/binary>> = B} <- Preceding]),
            JsIdx = NthRed - BuiltinsBefore,
            Encoded = lists:map(fun(V) -> ?JSON_ENCODE(V) end, Values),
            case mapreduce:rereduce(RedCtx, JsIdx, Encoded) of
                {ok, RawReduction} ->
                    {ok, [?JSON_DECODE(RawReduction)]};
                Failure ->
                    throw(Failure)
            end
    end.


%% Maps each `{Name, Source}` reduce function to the spec used when
%% re-reducing: `{native, Source}` for sources starting with "_", otherwise
%% a 1-based index that counts only the non-builtin functions seen so far.
reduce_fun_indexes(RedFuns) ->
    reduce_fun_indexes(RedFuns, 1, []).

reduce_fun_indexes([], _NextJsIdx, Acc) ->
    lists:reverse(Acc);
reduce_fun_indexes([{_Name, <<"_", _/binary>> = Source} | Rest], NextJsIdx, Acc) ->
    reduce_fun_indexes(Rest, NextJsIdx, [{native, Source} | Acc]);
reduce_fun_indexes([{_Name, _JsSrc} | Rest], NextJsIdx, Acc) ->
    reduce_fun_indexes(Rest, NextJsIdx + 1, [NextJsIdx | Acc]).
%% Interleaves builtin and non-builtin reduce results back into the order of
%% the reduce function sources.
%% The first argument is the list of all reduce sources; for each source
%% starting with "_" the next builtin result is taken, otherwise the next
%% JS result. Returns `{ok, Results}`. Crashes with function_clause if the
%% result lists do not line up with the sources.
recombine_reduce_results([], [], [], Acc) ->
    {ok, lists:reverse(Acc)};
recombine_reduce_results([<<"_", _/binary>> | RedSrcs], JsResults, [BRes | BuiltinResults], Acc) ->
    recombine_reduce_results(RedSrcs, JsResults, BuiltinResults, [BRes | Acc]);
recombine_reduce_results([_JsFun | RedSrcs], [JsR | JsResults], BuiltinResults, Acc) ->
    recombine_reduce_results(RedSrcs, JsResults, BuiltinResults, [JsR | Acc]).


%% Regroups a list of reduction lists by position: the result's first group
%% holds the first element of every inner list, the second group the second
%% elements, and so on. Every inner list must be non-empty and all must have
%% the same length, otherwise the fold's fun clause fails to match.
%% Note that the fold prepends, so the order of values inside a group is
%% not the order of the input lists.
group_reductions_results([]) ->
    [];
group_reductions_results(List) ->
    {Heads, Tails} = lists:foldl(
        fun([H | T], {HAcc, TAcc}) ->
            {[H | HAcc], [T | TAcc]}
        end,
        {[], []}, List),
    case Tails of
        [[] | _] -> % no tails left
            [Heads];
        _ ->
            [Heads | group_reductions_results(Tails)]
    end.


%% Evaluates the builtin reduce functions named in the second argument, in
%% order, over KVs and returns `{ok, Results}` (one result per function).
%%
%% Re is `reduce` or `rereduce`. In `reduce` mode the values in KVs are
%% JSON-encoded and are decoded with ?JSON_DECODE before being handed to
%% `_sum` / `_stats`; in `rereduce` mode the values are used as they are.
%% `_count` uses the number of rows when reducing and the sum of the
%% values when re-reducing.
%%
%% Throws `{error, Message}` for an unrecognised builtin name.
%%
%% Every builtin receives the same, untouched KVs. Passing the decoded list
%% on to the remaining builtins would make a later `_sum` / `_stats` in the
%% same view decode values that were already decoded.
builtin_reduce(_Re, [], _KVs, Acc) ->
    {ok, lists:reverse(Acc)};
builtin_reduce(Re, [<<"_sum", _/binary>> | BuiltinReds], KVs, Acc) ->
    Sum = builtin_sum_rows(builtin_input_rows(Re, KVs)),
    builtin_reduce(Re, BuiltinReds, KVs, [Sum | Acc]);
builtin_reduce(reduce, [<<"_count", _/binary>> | BuiltinReds], KVs, Acc) ->
    Count = length(KVs),
    builtin_reduce(reduce, BuiltinReds, KVs, [Count | Acc]);
builtin_reduce(rereduce, [<<"_count", _/binary>> | BuiltinReds], KVs, Acc) ->
    Count = builtin_sum_rows(KVs),
    builtin_reduce(rereduce, BuiltinReds, KVs, [Count | Acc]);
builtin_reduce(Re, [<<"_stats", _/binary>> | BuiltinReds], KVs, Acc) ->
    Stats = builtin_stats(Re, builtin_input_rows(Re, KVs)),
    builtin_reduce(Re, BuiltinReds, KVs, [Stats | Acc]);
builtin_reduce(_Re, [InvalidBuiltin | _BuiltinReds], _KVs, _Acc) ->
    throw({error, <<"Invalid builtin reduce function: ", InvalidBuiltin/binary>>}).

%% Produces the rows a value-inspecting builtin works on: JSON-decoded
%% values when reducing, the rows unchanged when re-reducing.
builtin_input_rows(reduce, KVs) ->
    [{K, ?JSON_DECODE(V)} || {K, V} <- KVs];
builtin_input_rows(rereduce, KVs) ->
    KVs.
%% Sums the values of a list of `{Key, Value}` rows, starting from 0.
%% Values may be numbers or lists of numbers; a number is treated as a
%% one-element list when combined with a list, and lists are added
%% element-wise by sum_terms/2. Anything else throws `{error, Message}`.
builtin_sum_rows(KVs) ->
    lists:foldl(fun sum_row/2, 0, KVs).

%% Adds the value of one row to the running sum.
sum_row({_Key, Value}, Acc) when is_number(Value), is_number(Acc) ->
    Acc + Value;
sum_row({_Key, Value}, Acc) when is_list(Value), is_list(Acc) ->
    sum_terms(Acc, Value);
sum_row({_Key, Value}, Acc) when is_number(Value), is_list(Acc) ->
    sum_terms(Acc, [Value]);
sum_row({_Key, Value}, Acc) when is_list(Value), is_number(Acc) ->
    sum_terms([Acc], Value);
sum_row(_Else, _Acc) ->
    throw({error, <<"Builtin _sum function requires map values to be numbers or lists of numbers">>}).

%% Adds two lists of numbers position by position. When one list is longer,
%% its remaining tail is appended to the result unchanged. Throws
%% `{error, Message}` when a pair of elements is not two numbers.
sum_terms(Xs, Ys) ->
    sum_terms(Xs, Ys, []).

%% Accumulator form of sum_terms/2; Acc holds the sums so far in reverse.
sum_terms(Xs, [], Acc) when is_list(Xs) ->
    lists:reverse(Acc, Xs);
sum_terms([], Ys, Acc) when is_list(Ys) ->
    lists:reverse(Acc, Ys);
sum_terms([X | Xs], [Y | Ys], Acc) when is_number(X), is_number(Y) ->
    sum_terms(Xs, Ys, [X + Y | Acc]);
sum_terms(_, _, _) ->
    throw({error, <<"Builtin _sum function requires map values to be numbers or lists of numbers">>}).

%% Computes the `_stats` builtin.
%% In `reduce` mode the rows are `{Key, Number}` pairs and the result is an
%% object with the fields sum, count, min, max and sumsqr; an empty row list
%% yields the empty object `{[]}` and a non-numeric value throws
%% `{error, Message}`.
%% In `rereduce` mode every row value must itself be such a stats object
%% (with the fields in exactly that order, otherwise the match fails with
%% badmatch) and the objects are merged into one.
builtin_stats(reduce, []) ->
    {[]};
builtin_stats(reduce, [{_, First} | Rest]) when is_number(First) ->
    Initial = {First, 1, First, First, First * First},
    pack_stats(lists:foldl(fun stats_add_value/2, Initial, Rest));
builtin_stats(reduce, KVs) when is_list(KVs) ->
    throw({error, <<"Builtin _stats function requires map values to be numbers">>});

builtin_stats(rereduce, [{_, First} | Rest]) ->
    Initial = unpack_stats(First),
    pack_stats(lists:foldl(fun stats_merge/2, Initial, Rest)).

%% Folds one numeric row value into the running statistics tuple.
stats_add_value({_K, V}, {S, C, Mi, Ma, Sq}) when is_number(V) ->
    {S + V, C + 1, erlang:min(Mi, V), erlang:max(Ma, V), Sq + (V * V)};
stats_add_value(_, _) ->
    throw({error, <<"Builtin _stats function requires map values to be numbers">>}).

%% Folds one previously reduced stats object into the running tuple.
stats_merge({_K, Red}, {S, C, Mi, Ma, Sq}) ->
    {Sum, Cnt, Min, Max, Sqr} = unpack_stats(Red),
    {Sum + S, Cnt + C, erlang:min(Min, Mi), erlang:max(Max, Ma), Sqr + Sq}.

%% Extracts the five statistics from a stats object.
unpack_stats(Red) ->
    {[{<<"sum">>, Sum}, {<<"count">>, Cnt}, {<<"min">>, Min}, {<<"max">>, Max}, {<<"sumsqr">>, Sqr}]} = Red,
    {Sum, Cnt, Min, Max, Sqr}.

%% Builds a stats object from the five statistics.
pack_stats({Sum, Cnt, Min, Max, Sqr}) ->
    {[{<<"sum">>, Sum}, {<<"count">>, Cnt}, {<<"min">>, Min}, {<<"max">>, Max}, {<<"sumsqr">>, Sqr}]}.


%% JSON-encodes a list of `{{Key, Id}, Value}` rows into
%% `{EncodedKeyAndId, EncodedValue}` pairs, where the first element is the
%% encoding of the two-element list `[Key, Id]`. Acc holds already encoded
%% pairs in reverse order and is placed, reversed, in front of the result;
%% callers in this module pass `[]`.
encode_kvs(KVs, Acc) ->
    Encoded = lists:map(
        fun(KV) ->
            {{Key, Id}, Value} = KV,
            {?JSON_ENCODE([Key, Id]), ?JSON_ENCODE(Value)}
        end,
        KVs),
    lists:reverse(Acc, Encoded).