% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_set_view_mapreduce).

-include("couch_db.hrl").
-include_lib("couch_set_view/include/couch_set_view.hrl").

-export([start_map_context/1, start_reduce_context/1]).
-export([end_map_context/0, end_reduce_context/1]).
-export([map/1, reduce/2, reduce/3, rereduce/2, rereduce/3]).
-export([builtin_reduce/3]).
-export([validate_ddoc_views/1]).

-define(STATS_ERROR_MSG, <<"Builtin _stats function requires map values to be numbers">>).
-define(SUM_ERROR_MSG, <<"Builtin _sum function requires map values to be numbers">>).


% Builds one map context from the map definitions of every view in the group
% and stores it in the process dictionary under the key `map_context', where
% map/1 reads it back. Crashes (badmatch) if mapreduce rejects the definitions.
start_map_context(#set_view_group{views = Views}) ->
    MapDefs = lists:map(fun(View) -> View#set_view.def end, Views),
    {ok, Ctx} = mapreduce:start_map_context(MapDefs),
    erlang:put(map_context, Ctx),
    ok.


% Removes the map context stored by start_map_context/1. Always returns ok.
end_map_context() ->
    _Previous = erlang:erase(map_context),
    ok.
% Starts the reduce context(s) needed by a view group (every view) or by a
% single view. Only JavaScript reduce sources are compiled; builtin sources
% (those starting with "_") are evaluated by builtin_reduce/4 instead. When a
% view has at least one JavaScript reducer, the compiled context is stored in
% the process dictionary under {reduce_context, Ref}; otherwise nothing is
% stored.
start_reduce_context(#set_view_group{views = Views}) ->
    lists:foreach(fun start_reduce_context/1, Views);

start_reduce_context(SetView) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    JsFuns = lists:filter(
        fun({_Name, <<"_", _/binary>>}) -> false;
           ({_Name, _Src}) -> true
        end,
        RedFuns),
    JsSources = [Src || {_Name, Src} <- JsFuns],
    case JsSources of
        [] ->
            ok;
        [_ | _] ->
            {ok, Ctx} = mapreduce:start_reduce_context(JsSources),
            erlang:put({reduce_context, Ref}, Ctx),
            ok
    end.


% Drops the reduce context(s) stored by start_reduce_context/1, for a whole
% group or for one view. Returns ok.
end_reduce_context(#set_view_group{views = Views}) ->
    lists:foreach(fun end_reduce_context/1, Views);

end_reduce_context(#set_view{ref = Ref}) ->
    _Previous = erlang:erase({reduce_context, Ref}),
    ok.


% Runs the map context stored by start_map_context/1 over one document.
% Returns {ok, Results}; any other result from mapreduce:map_doc/3 is thrown.
map(Doc) ->
    {DocBody, DocMeta} = couch_doc:to_raw_json_binary_views(Doc),
    case mapreduce:map_doc(erlang:get(map_context), DocBody, DocMeta) of
        {ok, _Results} = Ok ->
            Ok;
        Error ->
            throw(Error)
    end.


% Applies every reduce function of the view to KVs0 and returns
% {ok, [Reduction]} with one entry per reduce function, in definition order.
% Builtin functions are evaluated natively. JavaScript functions go through
% the reduce context stored under {reduce_context, Ref}, and their results
% are interleaved back into position. Errors from the JS context are thrown.
reduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}}, _KVs) ->
    {ok, []};
reduce(SetView, KVs0) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    AllSources = [FunSource || {_Name, FunSource} <- RedFuns],
    IsBuiltin = fun(<<"_", _/binary>>) -> true; (_) -> false end,
    {BuiltinSources, JsSources} = lists:partition(IsBuiltin, AllSources),
    KVs = encode_kvs(KVs0, []),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinSources, KVs, []),
    case JsSources of
        [] ->
            {ok, BuiltinResults};
        [_ | _] ->
            Ctx = erlang:get({reduce_context, Ref}),
            case mapreduce:reduce(Ctx, KVs) of
                {ok, JsResults} ->
                    recombine_reduce_results(AllSources, JsResults, BuiltinResults, []);
                Error ->
                    throw(Error)
            end
    end.
% Applies only the NthRed-th (1-based) reduce function of the view to KVs0
% and returns {ok, [Reduction]}. A builtin function is evaluated natively. A
% JavaScript function is addressed in the reduce context by its position
% among the JavaScript functions only, since builtins are never compiled
% into the context.
reduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
       _NthRed, _KVs) ->
    {ok, []};
reduce(SetView, NthRed, KVs0) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    {Preceding, [{_Name, FunSrc} | _]} = lists:split(NthRed - 1, RedFuns),
    KVs = encode_kvs(KVs0, []),
    case FunSrc of
        <<"_", _/binary>> ->
            builtin_reduce(reduce, [FunSrc], KVs, []);
        _ ->
            % 1 + the number of JavaScript functions defined before this one.
            JsIndex = lists:foldl(
                fun({_, <<"_", _/binary>>}, N) -> N;
                   (_, N) -> N + 1
                end,
                1,
                Preceding),
            Ctx = erlang:get({reduce_context, Ref}),
            case mapreduce:reduce(Ctx, JsIndex, KVs) of
                {ok, Value} ->
                    {ok, [Value]};
                Error ->
                    throw(Error)
            end
    end.


% Re-reduces a list of previous reductions. Each element of ReducedValues
% holds one value per reduce function. Returns {ok, [Reduction]} with one
% entry per reduce function. Builtins are rereduced natively; JavaScript
% functions are rereduced through the stored reduce context, and errors from
% it are thrown.
rereduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
         _ReducedValues) ->
    {ok, []};
rereduce(SetView, ReducedValues) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{reduce_funs = RedFuns}
    } = SetView,
    Columns = group_reductions_results(ReducedValues),
    Ctx = erlang:get({reduce_context, Ref}),
    RereduceColumn = fun
        ({native, FunSrc}, Values) ->
            BuiltinKVs = [{[], V} || V <- Values],
            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], BuiltinKVs, []),
            Result;
        (JsIndex, Values) ->
            case mapreduce:rereduce(Ctx, JsIndex, Values) of
                {ok, Reduction} ->
                    Reduction;
                Error ->
                    throw(Error)
            end
    end,
    {ok, lists:zipwith(RereduceColumn, reduce_fun_indexes(RedFuns), Columns)}.
% Re-reduces previous reductions with only the NthRed-th (1-based) reduce
% function of the view. Returns {ok, [Reduction]}. A builtin function is
% rereduced natively. A JavaScript function is addressed in the reduce context
% by its position among the JavaScript functions only.
rereduce(#set_view{indexer = #mapreduce_view{reduce_funs = []}},
         _NthRed, _ReducedValues) ->
    {ok, []};
rereduce(SetView, NthRed, ReducedValues) ->
    #set_view{
        ref = Ref,
        indexer = #mapreduce_view{
            reduce_funs = RedFuns
        }
    } = SetView,
    {Before, [{_Name, FunSrc} | _]} = lists:split(NthRed - 1, RedFuns),
    % Each element of ReducedValues is expected to hold exactly one value here.
    [Values] = group_reductions_results(ReducedValues),
    case FunSrc of
        <<"_", _/binary>> ->
            builtin_reduce(rereduce, [FunSrc], [{[], V} || V <- Values], []);
        _ ->
            Ctx = erlang:get({reduce_context, Ref}),
            % Subtract the builtins preceding this function to get its index
            % inside the JavaScript-only reduce context.
            NthRed2 = lists:foldl(
                fun({_, <<"_", _/binary>>}, Acc) ->
                        Acc - 1;
                   (_, Acc) ->
                        Acc
                end,
                NthRed,
                Before),
            case mapreduce:rereduce(Ctx, NthRed2, Values) of
                {ok, ReduceValue} ->
                    {ok, [ReduceValue]};
                Error ->
                    throw(Error)
            end
    end.


% Maps each {Name, Src} reduce function to {native, Src} when Src is a builtin
% ("_"-prefixed). Otherwise it maps to the function's 1-based position among
% the JavaScript functions, which is how the JS reduce context addresses it.
reduce_fun_indexes(RedFuns) ->
    {L, _} = lists:mapfoldl(
        fun({_Name, <<"_", _/binary>> = Src}, Idx) ->
                {{native, Src}, Idx};
           ({_Name, _JsSrc}, Idx) ->
                {Idx, Idx + 1}
        end,
        1, RedFuns),
    L.


% Interleaves JavaScript and builtin reduction results back into the order of
% the reduce function sources. Returns {ok, Results}.
recombine_reduce_results([], [], [], Acc) ->
    {ok, lists:reverse(Acc)};
recombine_reduce_results([<<"_", _/binary>> | RedSrcs], JsResults, [BRes | BuiltinResults], Acc) ->
    recombine_reduce_results(RedSrcs, JsResults, BuiltinResults, [BRes | Acc]);
recombine_reduce_results([_JsFun | RedSrcs], [JsR | JsResults], BuiltinResults, Acc) ->
    recombine_reduce_results(RedSrcs, JsResults, BuiltinResults, [JsR | Acc]).


% Transposes a list of reductions (each one holding one value per reduce
% function) into a list of columns (one per reduce function). Each column
% keeps the input order of its values.
%
% The previous foldl-based version reversed every column it built and then
% recursed on the reversed tails. Columns therefore alternated between
% reversed and input order, so different reduce functions saw their values
% in different orders.
%
% Rows must all have the same, non-zero length. A shorter row crashes with
% function_clause, as before.
group_reductions_results([]) ->
    [];
group_reductions_results(List) ->
    {Heads, Tails} = lists:unzip(lists:map(fun([H | T]) -> {H, T} end, List)),
    case Tails of
        [[] | _] -> % no tails left
            [Heads];
        _ ->
            [Heads | group_reductions_results(Tails)]
    end.


% Public entry point for builtin reductions: evaluates each builtin source in
% FunSrcs over Values, with ReduceType either reduce or rereduce.
builtin_reduce(ReduceType, FunSrcs, Values) ->
    builtin_reduce(ReduceType, FunSrcs, Values, []).
% Evaluates the builtin reduce functions in the given list, in order, over KVs
% ({Key, ValueJson} pairs) and returns {ok, [ResultJson]} in the same order.
% For _count, `reduce' counts the rows while `rereduce' sums the partial
% counts. Any other "_"-prefixed name throws {error, Msg}.
builtin_reduce(_Re, [], _KVs, Acc) ->
    {ok, lists:reverse(Acc)};
builtin_reduce(Re, [<<"_sum", _/binary>> | BuiltinReds], KVs, Acc) ->
    Sum = builtin_sum_rows(KVs),
    builtin_reduce(Re, BuiltinReds, KVs, [Sum | Acc]);
builtin_reduce(reduce, [<<"_count", _/binary>> | BuiltinReds], KVs, Acc) ->
    Json = ?JSON_ENCODE(length(KVs)),
    builtin_reduce(reduce, BuiltinReds, KVs, [Json | Acc]);
builtin_reduce(rereduce, [<<"_count", _/binary>> | BuiltinReds], KVs, Acc) ->
    Count = builtin_sum_rows(KVs),
    builtin_reduce(rereduce, BuiltinReds, KVs, [Count | Acc]);
builtin_reduce(Re, [<<"_stats", _/binary>> | BuiltinReds], KVs, Acc) ->
    Stats = builtin_stats(Re, KVs),
    builtin_reduce(Re, BuiltinReds, KVs, [Stats | Acc]);
builtin_reduce(_Re, [InvalidBuiltin | _BuiltinReds], _KVs, _Acc) ->
    throw({error, <<"Invalid builtin reduce function: ", InvalidBuiltin/binary>>}).


% Parses a JSON number (binary or string) into an Erlang integer or float.
% Throws {error, ErrorMsg} if the text is not a number.
parse_number(NumberBin, ErrorMsg) when is_binary(NumberBin) ->
    parse_number(?b2l(NumberBin), ErrorMsg);
parse_number(NumberStr, ErrorMsg) ->
    try
        list_to_integer(NumberStr)
    catch
        error:badarg ->
            parse_float(NumberStr, ErrorMsg)
    end.


% Float fallback for parse_number/2. Throws {error, ErrorMsg} on failure.
parse_float(NumberStr, ErrorMsg) ->
    try
        list_to_float(add_missing_fraction(NumberStr))
    catch
        error:badarg ->
            throw({error, ErrorMsg})
    end.


% list_to_float/1 only accepts Erlang float syntax, which requires a fraction
% part ("1.0e21"). JSON also allows an exponent without one ("1e21",
% "1e+21", "1E-7"), and JSON.stringify produces that form for very large or
% very small numbers. This helper inserts ".0" in front of the exponent
% marker in that case and leaves any other input untouched.
add_missing_fraction(NumberStr) when is_list(NumberStr) ->
    case lists:member($., NumberStr) of
        true ->
            NumberStr;
        false ->
            IsNotExpMarker = fun(C) -> C =/= $e andalso C =/= $E end,
            case lists:splitwith(IsNotExpMarker, NumberStr) of
                {[_ | _] = Mantissa, [_ | _] = Exponent} ->
                    Mantissa ++ ".0" ++ Exponent;
                _ ->
                    NumberStr
            end
    end;
add_missing_fraction(Other) ->
    Other.


% Sums the numeric values of KVs and returns the JSON-encoded total.
% Throws {error, ?SUM_ERROR_MSG} on a non-numeric value.
builtin_sum_rows(KVs) ->
    Result = lists:foldl(fun({_Key, Value}, SumAcc) ->
        parse_number(Value, ?SUM_ERROR_MSG) + SumAcc
    end, 0, KVs),
    ?JSON_ENCODE(Result).
% Computes the builtin _stats reduction.
%
% reduce: KVs are {Key, ValueJson} pairs whose values must be JSON numbers
%   (otherwise {error, ?STATS_ERROR_MSG} is thrown). Returns the JSON object
%   {sum, count, min, max, sumsqr}, or <<"{}">> when there are no rows.
% rereduce: KVs are {_, StatsJson} pairs produced by earlier reduce/rereduce
%   calls, and their statistics are merged. An empty partial (<<"{}">>, which
%   the reduce clause emits for zero rows) carries no data and is skipped.
%   Previously such a partial crashed the rereduce with a badmatch. The result
%   is <<"{}">> only when every partial is empty.
builtin_stats(reduce, []) ->
    <<"{}">>;

builtin_stats(reduce, [{_, First0} | Rest]) ->
    First = parse_number(First0, ?STATS_ERROR_MSG),
    Stats = lists:foldl(fun({_K, V0}, {S, C, Mi, Ma, Sq}) ->
        V = parse_number(V0, ?STATS_ERROR_MSG),
        {S + V, C + 1, erlang:min(Mi, V), erlang:max(Ma, V), Sq + (V * V)}
    end, {First, 1, First, First, First * First}, Rest),
    encode_stats(Stats);

builtin_stats(rereduce, KVs) ->
    case [Red || {_, Red} <- KVs, Red =/= <<"{}">>] of
        [] ->
            <<"{}">>;
        [First | Rest] ->
            Stats = lists:foldl(fun(Red, {S, C, Mi, Ma, Sq}) ->
                {RSum, RCnt, RMin, RMax, RSqr} = decode_stats(Red),
                {RSum + S, RCnt + C, erlang:min(RMin, Mi), erlang:max(RMax, Ma), RSqr + Sq}
            end, decode_stats(First), Rest),
            encode_stats(Stats)
    end.


% Decodes a JSON _stats object produced by encode_stats/1 into a
% {Sum, Count, Min, Max, SumSqr} tuple.
decode_stats(StatsJson) ->
    {[{<<"sum">>, Sum},
      {<<"count">>, Cnt},
      {<<"min">>, Min},
      {<<"max">>, Max},
      {<<"sumsqr">>, Sqr}]} = ?JSON_DECODE(StatsJson),
    {Sum, Cnt, Min, Max, Sqr}.


% Encodes a {Sum, Count, Min, Max, SumSqr} tuple as a JSON _stats object,
% with the keys in the fixed order decode_stats/1 expects.
encode_stats({Sum, Cnt, Min, Max, Sqr}) ->
    Result = {[
        {<<"sum">>, Sum},
        {<<"count">>, Cnt},
        {<<"min">>, Min},
        {<<"max">>, Max},
        {<<"sumsqr">>, Sqr}
    ]},
    ?JSON_ENCODE(Result).


% Converts raw index rows into {Key, Value} pairs for the reducers. The
% 16-bit partition id prefixed to each value is stripped, and the doc id is
% split off the key via couch_set_view_util:split_key_docid/1. Order is
% preserved.
encode_kvs([], Acc) ->
    lists:reverse(Acc);
encode_kvs([KV | Rest], Acc) ->
    {KeyDocId, <<_PartId:16, Value/binary>>} = KV,
    {Key, _DocId} = couch_set_view_util:split_key_docid(KeyDocId),
    encode_kvs(Rest, [{Key, Value} | Acc]).


% Validates the `views' field of a design document. It must be a JSON object
% (it defaults to an empty one when absent), and every view definition in it
% must pass validate_view_definition/2. Throws {error, Msg} on the first
% problem; returns ok otherwise.
validate_ddoc_views(#doc{body = {Body}}) ->
    ViewList = case couch_util:get_value(<<"views">>, Body, {[]}) of
        {L} when is_list(L) ->
            L;
        _ ->
            throw({error, <<"The field `views' is not a json object.">>})
    end,
    lists:foreach(
        fun({ViewName, Value}) ->
            validate_view_definition(ViewName, Value)
        end,
        ViewList).
% Validates a single view definition: the name must be non-empty and free of
% leading or trailing whitespace, the value must be a JSON object, and its
% `map' (required) and `reduce' (optional) fields must be valid.
% Throws {error, Msg} on the first problem found.
validate_view_definition(<<"">>, _) ->
    throw({error, <<"View name cannot be an empty string">>});
validate_view_definition(ViewName, {ViewProps}) when is_list(ViewProps) ->
    WhitespaceMsg = iolist_to_binary(
        io_lib:format("View name `~s` cannot have leading or trailing whitespace",
                      [ViewName])),
    validate_view_name(ViewName, WhitespaceMsg),
    validate_view_map_function(ViewName, couch_util:get_value(<<"map">>, ViewProps)),
    validate_view_reduce_function(ViewName, couch_util:get_value(<<"reduce">>, ViewProps));
validate_view_definition(ViewName, _NotAnObject) ->
    Msg = io_lib:format("Value for view `~s' is not a json object.", [ViewName]),
    throw({error, iolist_to_binary(Msg)}).


% Throws {error, ErrorMsg} when the name starts or ends with a space, tab,
% newline or carriage return; returns ok otherwise. The trailing byte is
% checked by re-running the leading-byte clause on a one-byte binary.
validate_view_name(<<C, _Rest/binary>>, ErrorMsg)
        when C =:= $\s; C =:= $\t; C =:= $\n; C =:= $\r ->
    throw({error, ErrorMsg});
validate_view_name(Bin, ErrorMsg) when size(Bin) > 1 ->
    PrefixLen = size(Bin) - 1,
    <<_:PrefixLen/binary, LastByte/bits>> = Bin,
    validate_view_name(LastByte, ErrorMsg);
validate_view_name(_, _) ->
    ok.
% Checks that a view's `map' field is present, is a string, and compiles.
% Compilation is tested by starting a map context through mapreduce.
% Throws {error, Msg} on failure; returns ok otherwise.
validate_view_map_function(ViewName, undefined) ->
    throw_view_error("The `map' field for the view `~s' is missing.", [ViewName]);
validate_view_map_function(ViewName, MapDef) when not is_binary(MapDef) ->
    throw_view_error("The `map' field of the view `~s' is not a json string.",
                     [ViewName]);
validate_view_map_function(ViewName, MapDef) ->
    case mapreduce:start_map_context([MapDef]) of
        {ok, _Ctx} ->
            ok;
        {error, Reason} ->
            throw_view_error("Syntax error in the map function of the view `~s': ~s",
                             [ViewName, Reason])
    end.


% Checks a view's optional `reduce' field. It may be absent, one of the
% builtins _count, _sum or _stats, or a JavaScript source that compiles in a
% reduce context. Any other "_"-prefixed name is rejected.
% Throws {error, Msg} on failure; returns ok otherwise.
validate_view_reduce_function(_ViewName, undefined) ->
    ok;
validate_view_reduce_function(ViewName, ReduceDef) when not is_binary(ReduceDef) ->
    throw_view_error("The `reduce' field of the view `~s' is not a json string.",
                     [ViewName]);
validate_view_reduce_function(_ViewName, Builtin)
        when Builtin =:= <<"_count">>;
             Builtin =:= <<"_sum">>;
             Builtin =:= <<"_stats">> ->
    ok;
validate_view_reduce_function(ViewName, <<"_", _/binary>> = ReduceDef) ->
    throw_view_error("Invalid built-in reduce function for view `~s': ~s",
                     [ViewName, ReduceDef]);
validate_view_reduce_function(ViewName, ReduceDef) ->
    case mapreduce:start_reduce_context([ReduceDef]) of
        {ok, _Ctx} ->
            ok;
        {error, Reason} ->
            throw_view_error("Syntax error in the reduce function of the view `~s': ~s",
                             [ViewName, Reason])
    end.


% Formats an error message with io_lib:format/2 and throws it as
% {error, MsgBinary}. Never returns.
throw_view_error(Format, Args) ->
    throw({error, iolist_to_binary(io_lib:format(Format, Args))}).