1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /**
3  * @copyright 2012 Couchbase, Inc.
4  *
5  * @author Filipe Manana  <filipe@couchbase.com>
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at
10  *
11  *  http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16  * License for the specific language governing permissions and limitations under
17  * the License.
18  **/
19 
20 #include <iostream>
21 #include <list>
22 #include <string>
23 #include <string.h>
24 #include <sstream>
25 #include <time.h>
26 #include <vector>
27 #include "mapreduce.h"
28 // This is libv8_libplatform library which handles garbage collection for v8
29 #include <libplatform/libplatform.h>
30 // Esprima unused and builtin JavaScript contents in raw string format
31 #include "jsfunctions/jsfunctions_data.h"
32 
// Threshold on the length of a message passed to the JavaScript log()
// function; longer messages are stored with a "Truncated: " prefix and cut.
#define MAX_LOG_STRING_SIZE 1024

// Keys passed to the JavaScript emit() function whose JSON encoding is this
// many bytes or more are rejected (not enforced for spatial views).
#define MAX_EMIT_KEY_SIZE 4096

// V8 API types (Isolate, Local, Handle, ...) are used unqualified below.
using namespace v8;
38 
// JavaScript source of the builtin that createJsContext() installs on the
// global object as `sum`: adds up the elements of `values` with `+=`,
// starting from 0.
static const char *SUM_FUNCTION_STRING =
    "(function(values) {"
    "    var sum = 0;"
    "    for (var i = 0; i < values.length; ++i) {"
    "        sum += values[i];"
    "    }"
    "    return sum;"
    "})";
47 
// JavaScript source of the builtin that createJsContext() installs on the
// global object as `dateToArray`. The argument is used as-is when it has a
// getUTCDate member, otherwise it is passed to `new Date()`. The result is
// [year, month (1-based), day, hours, minutes, seconds] in UTC, or null when
// the date's valueOf() is not finite.
static const char *DATE_FUNCTION_STRING =
    // I wish it was on the prototype, but that will require bigger
    // C changes as adding to the date prototype should be done on
    // process launch. The code you see here may be faster, but it
    // is less JavaScripty.
    // "Date.prototype.toArray = (function() {"
    "(function(date) {"
    "    date = date.getUTCDate ? date : new Date(date);"
    "    return isFinite(date.valueOf()) ?"
    "      [date.getUTCFullYear(),"
    "      (date.getUTCMonth() + 1),"
    "       date.getUTCDate(),"
    "       date.getUTCHours(),"
    "       date.getUTCMinutes(),"
    "       date.getUTCSeconds()] : null;"
    "})";
64 
// JavaScript source of the builtin that createJsContext() installs on the
// global object as `decodeBase64`. It decodes a base64 string into an array
// of byte values (numbers). It throws a string exception when the argument
// is not a string or when its length is not a multiple of 4. `scratch` ends
// up holding the number of characters from the first '=' to the end of the
// input, which selects how the final, padded group is decoded.
static const char *BASE64_FUNCTION_STRING =
    "(function(b64) {"
    "    var i, j, l, tmp, scratch, arr = [];"
    "    var lookup = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';"
    "    if (typeof b64 !== 'string') {"
    "        throw 'Input is not a string';"
    "    }"
    "    if (b64.length % 4 > 0) {"
    "        throw 'Invalid base64 source.';"
    "    }"
    "    scratch = b64.indexOf('=');"
    "    scratch = scratch > 0 ? b64.length - scratch : 0;"
    "    l = scratch > 0 ? b64.length - 4 : b64.length;"
    "    for (i = 0, j = 0; i < l; i += 4, j += 3) {"
    "        tmp = (lookup.indexOf(b64[i]) << 18) | (lookup.indexOf(b64[i + 1]) << 12);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) << 6) | lookup.indexOf(b64[i + 3]);"
    "        arr.push((tmp & 0xFF0000) >> 16);"
    "        arr.push((tmp & 0xFF00) >> 8);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    if (scratch === 2) {"
    "        tmp = (lookup.indexOf(b64[i]) << 2) | (lookup.indexOf(b64[i + 1]) >> 4);"
    "        arr.push(tmp & 0xFF);"
    "    } else if (scratch === 1) {"
    "        tmp = (lookup.indexOf(b64[i]) << 10) | (lookup.indexOf(b64[i + 1]) << 4);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) >> 2);"
    "        arr.push((tmp >> 8) & 0xFF);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    return arr;"
    "})";
96 
97 
98 
99 typedef struct {
100     Persistent<Object>    jsonObject;
101     Persistent<Function>  jsonParseFun;
102     Persistent<Function>  stringifyFun;
103     map_reduce_ctx_t      *ctx;
104 } isolate_data_t;
105 
// When true, loadFunctions() tries to compile the bundled JavaScript helper
// source and calls `is_doc_unused` on every function source to decide whether
// the document body is needed. Set through setOptimizeDocLoadFlag().
static bool optimize_doc_load = true;

// Context construction helpers.
static void doInitContext(map_reduce_ctx_t *ctx,
                          const function_sources_list_t &funs,
                          const view_index_type_t viewType);
static Local<Context> createJsContext(map_reduce_ctx_t *ctx);

// Native callbacks exposed to JavaScript as `emit` and `log`.
static void emit(const v8::FunctionCallbackInfo<Value>& args);
static void log(const v8::FunctionCallbackInfo<Value>& args);

static void loadFunctions(map_reduce_ctx_t *ctx,
                          const function_sources_list_t &funs);

// Cleanup helpers that release binaries and containers built by this file.
static void freeJsonData(const json_results_list_t &data);
static void freeMapResult(const map_result_t &data);
static void freeMapResultList(const map_results_list_t &results);

// JavaScript compilation and JSON conversion helpers.
static Handle<Function> compileFunction(const function_source_t &funSource);
static inline ErlNifBinary jsonStringify(const Handle<Value> &obj);
static inline Handle<Value> jsonParse(const ErlNifBinary &thing);
static inline Handle<Array> jsonListToJsArray(const json_results_list_t &list);
static inline isolate_data_t *getIsolateData();

// Task bookkeeping: record and clear ctx->taskStartTime around JS execution.
static inline void taskStarted(map_reduce_ctx_t *ctx);
static inline void taskFinished(map_reduce_ctx_t *ctx);

static std::string exceptionString(const TryCatch &tryCatch);
static void freeLogResults(map_reduce_ctx_t *ctx);

// Platform created by initV8() and deleted by deinitV8().
static Platform *v8platform;
// Performs the process-wide V8 start-up sequence: ICU initialization, creation
// and registration of the default platform (kept in v8platform so that
// deinitV8() can delete it), and finally V8::Initialize().
void initV8()
{
    V8::InitializeICU();
    v8platform = platform::CreateDefaultPlatform();
    V8::InitializePlatform(v8platform);
    V8::Initialize();
}
137 
// Reverses initV8(): disposes V8, shuts the platform down and then deletes
// the platform object created by initV8().
void deinitV8()
{
    V8::Dispose();
    V8::ShutdownPlatform();
    // The platform is deleted only after V8 has been told to stop using it.
    delete v8platform;
}
144 
setOptimizeDocLoadFlag(const char *flag)145 void setOptimizeDocLoadFlag(const char *flag)
146 {
147     if(!strcmp(flag, "true"))
148         optimize_doc_load = true;
149     else
150         optimize_doc_load = false;
151 }
152 
initContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs, const view_index_type_t viewType)153 void initContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs,
154                  const view_index_type_t viewType)
155 {
156     ctx = new (ctx) map_reduce_ctx_t();
157 
158     try {
159         doInitContext(ctx, funs, viewType);
160         Locker locker(ctx->isolate);
161         Isolate::Scope isolate_scope(ctx->isolate);
162         HandleScope handle_scope(ctx->isolate);
163         Local<Context> context =
164             Local<Context>::New(ctx->isolate, ctx->jsContext);
165         Context::Scope context_scope(context);
166 
167         loadFunctions(ctx, funs);
168     } catch (...) {
169         // Releasing resource will invoke NIF destructor that calls destroyCtx
170         enif_release_resource(ctx);
171         throw;
172     }
173 }
174 
createUtf8String(Isolate *isolate, const char *str)175 static Local<String> createUtf8String(Isolate *isolate, const char *str)
176 {
177     return String::NewFromUtf8(isolate, str,
178         NewStringType::kNormal).ToLocalChecked();
179 }
180 
createUtf8String(Isolate *isolate, const char *str, size_t len)181 static Local<String> createUtf8String(Isolate *isolate, const char *str,
182                                       size_t len)
183 {
184     return String::NewFromUtf8(isolate, str,
185         NewStringType::kNormal, len).ToLocalChecked();
186 }
187 
188 
doInitContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs, const view_index_type_t viewType)189 void doInitContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs,
190                    const view_index_type_t viewType)
191 {
192     ctx->viewType = viewType;
193     Isolate::CreateParams createParams;
194     createParams.array_buffer_allocator =
195       ArrayBuffer::Allocator::NewDefaultAllocator();
196     ctx->isolate = Isolate::New(createParams);
197     ctx->logResults = NULL;
198     Locker locker(ctx->isolate);
199     Isolate::Scope isolate_scope(ctx->isolate);
200     HandleScope handle_scope(ctx->isolate);
201 
202     ctx->jsContext.Reset(ctx->isolate, createJsContext(ctx));
203     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
204     Context::Scope context_scope(context);
205 
206     Local<String> jsonString = createUtf8String(ctx->isolate, "JSON");
207     Handle<Object> jsonObject =
208         Local<Object>::Cast(context->Global()->Get(jsonString));
209 
210     Local<String> parseString = createUtf8String(ctx->isolate, "parse");
211     Handle<Function> parseFun =
212         Local<Function>::Cast(jsonObject->Get(parseString));
213     Local<String> stringifyString = createUtf8String(ctx->isolate, "stringify");
214     Handle<Function> stringifyFun =
215         Local<Function>::Cast(jsonObject->Get(stringifyString));
216 
217     isolate_data_t *isoData =
218         (isolate_data_t *) enif_alloc(sizeof(isolate_data_t));
219     if (isoData == NULL) {
220         throw std::bad_alloc();
221     }
222 
223     isoData = new (isoData) isolate_data_t();
224     isoData->jsonObject.Reset(ctx->isolate, jsonObject);
225     isoData->jsonParseFun.Reset(ctx->isolate, parseFun);
226     isoData->stringifyFun.Reset(ctx->isolate, stringifyFun);
227 
228     isoData->ctx = ctx;
229 
230     ctx->isolate->SetData(0, (void *)isoData);
231     ctx->taskStartTime = {};
232 }
233 
234 
mapDoc(map_reduce_ctx_t *ctx, const ErlNifBinary &doc, const ErlNifBinary &meta)235 map_results_list_t mapDoc(map_reduce_ctx_t *ctx,
236                           const ErlNifBinary &doc,
237                           const ErlNifBinary &meta)
238 {
239     Locker locker(ctx->isolate);
240     Isolate::Scope isolate_scope(ctx->isolate);
241     HandleScope handle_scope(ctx->isolate);
242     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
243     Context::Scope context_scope(context);
244     Handle<Value> docObject = jsonParse(doc);
245     Handle<Value> metaObject = jsonParse(meta);
246 
247     if (!metaObject->IsObject()) {
248         throw MapReduceError("metadata is not a JSON object");
249     }
250 
251     map_results_list_t results;
252     Handle<Value> funArgs[] = { docObject, metaObject };
253 
254     taskStarted(ctx);
255 
256     for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
257         map_result_t mapResult;
258         Local<Function> fun =
259             Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
260         TryCatch try_catch(ctx->isolate);
261 
262         mapResult.type = MAP_KVS;
263         mapResult.result.kvs =
264             (kv_pair_list_t *) enif_alloc(sizeof(kv_pair_list_t));
265 
266         if (mapResult.result.kvs == NULL) {
267             freeMapResultList(results);
268             throw std::bad_alloc();
269         }
270 
271         mapResult.result.kvs = new (mapResult.result.kvs) kv_pair_list_t();
272         ctx->kvs = mapResult.result.kvs;
273         ctx->emitKvSize = 0;
274         Handle<Value> result = fun->Call(context->Global(), 2, funArgs);
275 
276         if (result.IsEmpty()) {
277             freeMapResult(mapResult);
278 
279             if (!try_catch.CanContinue()) {
280                 freeMapResultList(results);
281                 throw MapReduceError("timeout");
282             }
283 
284             mapResult.type = MAP_ERROR;
285             std::string exceptString = exceptionString(try_catch);
286             size_t len = exceptString.length();
287 
288             mapResult.result.error =
289                 (ErlNifBinary *) enif_alloc(sizeof(ErlNifBinary));
290             if (mapResult.result.error == NULL) {
291                 freeMapResultList(results);
292                 throw std::bad_alloc();
293             }
294             if (!enif_alloc_binary_compat(ctx->env, len,
295                     mapResult.result.error)) {
296                 freeMapResultList(results);
297                 throw std::bad_alloc();
298             }
299             // Caller responsible for invoking enif_make_binary()
300             // or enif_release_binary()
301             memcpy(mapResult.result.error->data, exceptString.data(), len);
302         }
303 
304         results.push_back(mapResult);
305     }
306 
307     taskFinished(ctx);
308 
309     return results;
310 }
311 
312 
runReduce(map_reduce_ctx_t *ctx, const json_results_list_t &keys, const json_results_list_t &values)313 json_results_list_t runReduce(map_reduce_ctx_t *ctx,
314                               const json_results_list_t &keys,
315                               const json_results_list_t &values)
316 {
317     Locker locker(ctx->isolate);
318     Isolate::Scope isolate_scope(ctx->isolate);
319     HandleScope handle_scope(ctx->isolate);
320     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
321     Context::Scope context_scope(context);
322     Handle<Array> keysArray = jsonListToJsArray(keys);
323     Handle<Array> valuesArray = jsonListToJsArray(values);
324     json_results_list_t results;
325 
326     Handle<Value> args[] = { keysArray, valuesArray,
327                              Boolean::New(ctx->isolate, false) };
328 
329     taskStarted(ctx);
330 
331     for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
332         Local<Function> fun =
333             Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
334         TryCatch try_catch(ctx->isolate);
335         Handle<Value> result = fun->Call(context->Global(), 3, args);
336 
337         if (result.IsEmpty()) {
338             freeJsonData(results);
339 
340             if (!try_catch.CanContinue()) {
341                 throw MapReduceError("timeout");
342             }
343 
344             throw MapReduceError(exceptionString(try_catch));
345         }
346 
347         try {
348             ErlNifBinary jsonResult = jsonStringify(result);
349             results.push_back(jsonResult);
350         } catch(Handle<String> &ex) {
351             freeJsonData(results);
352             ctx->isolate->ThrowException(ex);
353         }
354     }
355 
356     taskFinished(ctx);
357     freeLogResults(ctx);
358 
359     return results;
360 }
361 
362 
runReduce(map_reduce_ctx_t *ctx, int reduceFunNum, const json_results_list_t &keys, const json_results_list_t &values)363 ErlNifBinary runReduce(map_reduce_ctx_t *ctx,
364                        int reduceFunNum,
365                        const json_results_list_t &keys,
366                        const json_results_list_t &values)
367 {
368     Locker locker(ctx->isolate);
369     Isolate::Scope isolate_scope(ctx->isolate);
370     HandleScope handle_scope(ctx->isolate);
371     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
372     Context::Scope context_scope(context);
373 
374     reduceFunNum -= 1;
375     if (reduceFunNum < 0 ||
376         static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
377         throw MapReduceError("invalid reduce function number");
378     }
379 
380     Local<Function> fun =
381         Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
382     Handle<Array> keysArray = jsonListToJsArray(keys);
383     Handle<Array> valuesArray = jsonListToJsArray(values);
384     Handle<Value> args[] = { keysArray, valuesArray,
385                              Boolean::New(ctx->isolate, false) };
386 
387     taskStarted(ctx);
388 
389     TryCatch try_catch(ctx->isolate);
390     Handle<Value> result = fun->Call(context->Global(), 3, args);
391 
392     taskFinished(ctx);
393     freeLogResults(ctx);
394 
395     if (result.IsEmpty()) {
396         if (!try_catch.CanContinue()) {
397             throw MapReduceError("timeout");
398         }
399 
400         throw MapReduceError(exceptionString(try_catch));
401     }
402 
403     ErlNifBinary jsonResult;
404     try {
405         jsonResult = jsonStringify(result);
406     } catch(Handle<String> &ex) {
407         ctx->isolate->ThrowException(ex);
408     }
409 
410     return jsonResult;
411 }
412 
413 
runRereduce(map_reduce_ctx_t *ctx, int reduceFunNum, const json_results_list_t &reductions)414 ErlNifBinary runRereduce(map_reduce_ctx_t *ctx,
415                        int reduceFunNum,
416                        const json_results_list_t &reductions)
417 {
418     Locker locker(ctx->isolate);
419     Isolate::Scope isolate_scope(ctx->isolate);
420     HandleScope handle_scope(ctx->isolate);
421     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
422     Context::Scope context_scope(context);
423 
424     reduceFunNum -= 1;
425     if (reduceFunNum < 0 ||
426         static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
427         throw MapReduceError("invalid reduce function number");
428     }
429 
430     Local<Function> fun =
431         Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
432     Handle<Array> valuesArray = jsonListToJsArray(reductions);
433     Handle<Value> args[] =
434         { Null(ctx->isolate), valuesArray, Boolean::New(ctx->isolate, true) };
435 
436     taskStarted(ctx);
437 
438     TryCatch try_catch(ctx->isolate);
439     Handle<Value> result = fun->Call(context->Global(), 3, args);
440 
441     taskFinished(ctx);
442     freeLogResults(ctx);
443 
444     if (result.IsEmpty()) {
445         if (!try_catch.CanContinue()) {
446             throw MapReduceError("timeout");
447         }
448 
449         throw MapReduceError(exceptionString(try_catch));
450     }
451 
452     ErlNifBinary jsonResult;
453     try {
454         jsonResult = jsonStringify(result);
455     } catch(Handle<String> &ex) {
456         ctx->isolate->ThrowException(ex);
457     }
458 
459     return jsonResult;
460 }
461 
462 
destroyContext(map_reduce_ctx_t *ctx)463 void destroyContext(map_reduce_ctx_t *ctx)
464 {
465     {
466         Locker locker(ctx->isolate);
467         Isolate::Scope isolate_scope(ctx->isolate);
468         HandleScope handle_scope(ctx->isolate);
469         Local<Context> context =
470             Local<Context>::New(ctx->isolate, ctx->jsContext);
471         Context::Scope context_scope(context);
472 
473         if (ctx->functions) {
474             for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
475                 (*ctx->functions)[i]->Reset();
476                 (*ctx->functions)[i]->~Persistent<v8::Function>();
477                 enif_free((*ctx->functions)[i]);
478             }
479             ctx->functions->~function_vector_t();
480             enif_free(ctx->functions);
481         }
482 
483         isolate_data_t *isoData = getIsolateData();
484         if(isoData) {
485             isoData->jsonObject.Reset();
486             isoData->jsonParseFun.Reset();
487             isoData->stringifyFun.Reset();
488             isoData->~isolate_data_t();
489             enif_free(isoData);
490         }
491 
492         ctx->jsContext.Reset();
493     }
494 
495     ctx->isolate->Dispose();
496     ctx->~map_reduce_ctx_t();
497 }
498 
499 
createJsContext(map_reduce_ctx_t *ctx)500 static Local<Context> createJsContext(map_reduce_ctx_t *ctx)
501 {
502     EscapableHandleScope handle_scope(ctx->isolate);
503     Handle<ObjectTemplate> global = ObjectTemplate::New();
504 
505     global->Set(createUtf8String(ctx->isolate, "emit"),
506             FunctionTemplate::New(ctx->isolate, emit));
507 
508     global->Set(createUtf8String(ctx->isolate, "log"),
509             FunctionTemplate::New(ctx->isolate, log));
510 
511     Handle<Context> context = Context::New(ctx->isolate, NULL, global);
512     Context::Scope context_scope(context);
513 
514     Handle<Function> sumFun = compileFunction(SUM_FUNCTION_STRING);
515     context->Global()->Set(createUtf8String(ctx->isolate, "sum"), sumFun);
516 
517     Handle<Function> decodeBase64Fun =
518         compileFunction(BASE64_FUNCTION_STRING);
519     context->Global()->Set(createUtf8String(ctx->isolate, "decodeBase64"),
520         decodeBase64Fun);
521 
522     Handle<Function> dateToArrayFun =
523         compileFunction(DATE_FUNCTION_STRING);
524     context->Global()->Set(createUtf8String(ctx->isolate, "dateToArray"),
525                            dateToArrayFun);
526 
527 
528     // Use EscapableHandleScope and return using .Escape
529     // This will ensure that return values are not garbage collected
530     // as soon as the function returns.
531     return handle_scope.Escape(context);
532 }
533 
534 #define TRUNCATE_STR "Truncated: "
535 
log(const v8::FunctionCallbackInfo<Value>& args)536 static void log(const v8::FunctionCallbackInfo<Value>& args)
537 {
538     isolate_data_t *isoData = getIsolateData();
539     map_reduce_ctx_t *ctx = isoData->ctx;
540     try {
541         /* Initialize only if log function is used */
542         if (ctx->logResults == NULL) {
543             ctx->logResults = (log_results_list_t *) enif_alloc(sizeof
544                     (log_results_list_t));
545             if (ctx->logResults == NULL) {
546                 throw std::bad_alloc();
547             }
548             ctx->logResults = new (ctx->logResults) log_results_list_t();
549         }
550         /* use only first argument */
551         Handle<Value> logMsg = args[0];
552         Handle<String> str;
553         unsigned int len = 0;
554         if (logMsg->IsString()) {
555             str = Handle<String>::Cast(logMsg);
556             len = str->Length();
557             if (len > MAX_LOG_STRING_SIZE) {
558                 str = Handle<String>(String::Concat(
559                           createUtf8String(ctx->isolate, TRUNCATE_STR), str)),
560                 len = MAX_LOG_STRING_SIZE + sizeof(TRUNCATE_STR) - 1;
561             }
562         } else {
563             str = Handle<String>(createUtf8String(ctx->isolate,
564                         "Error while logging:Log value is not a string"));
565             len = str->Length();
566         }
567         ErlNifBinary resultBin;
568         if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
569             throw std::bad_alloc();
570         }
571         str->WriteUtf8(reinterpret_cast<char *>(resultBin.data),
572                 len, NULL, String::NO_NULL_TERMINATION);
573         ctx->logResults->push_back(resultBin);
574     } catch(Handle<String> &ex) {
575         ctx->isolate->ThrowException(ex);
576     }
577 }
578 
579 
emit(const v8::FunctionCallbackInfo<Value>& args)580 static void emit(const v8::FunctionCallbackInfo<Value>& args)
581 {
582     isolate_data_t *isoData = getIsolateData();
583 
584     if (isoData->ctx->kvs == NULL) {
585         return;
586     }
587 
588     ErlNifBinary keyJson;
589     try {
590         keyJson = jsonStringify(args[0]);
591     } catch(Handle<String> &ex) {
592         isoData->ctx->isolate->ThrowException(ex);
593         return;
594     }
595 
596     // Spatial views may emit a geometry that is bigger, when serialized
597     // to JSON, than the allowed size of a key. In later steps it will then
598     // be reduced to a bouning box. Hence don't check the string size of the
599     // key of spatial views here.
600     if (isoData->ctx->viewType != VIEW_INDEX_TYPE_SPATIAL &&
601             keyJson.size >= MAX_EMIT_KEY_SIZE) {
602         std::stringstream msg;
603         msg << "too long key emitted: " << keyJson.size << " bytes";
604 
605         isoData->ctx->isolate->ThrowException(
606                 createUtf8String(isoData->ctx->isolate, msg.str().c_str())
607                                              );
608 	return;
609     }
610 
611     try {
612         ErlNifBinary valueJson = jsonStringify(args[1]);
613 
614         kv_pair_t result = kv_pair_t(keyJson, valueJson);
615         isoData->ctx->kvs->push_back(result);
616         isoData->ctx->emitKvSize += keyJson.size;
617         isoData->ctx->emitKvSize += valueJson.size;
618 
619     } catch(Handle<String> &ex) {
620         isoData->ctx->isolate->ThrowException(ex);
621     }
622 
623     if ((isoData->ctx->maxEmitKvSize > 0) &&
624         (isoData->ctx->emitKvSize > isoData->ctx->maxEmitKvSize)) {
625         std::stringstream msg;
626         msg << "too much data emitted: " << isoData->ctx->emitKvSize << " bytes";
627 
628         isoData->ctx->isolate->ThrowException(
629                 createUtf8String(isoData->ctx->isolate, msg.str().c_str()));
630     }
631 }
632 
633 
jsonStringify(const Handle<Value> &obj)634 ErlNifBinary jsonStringify(const Handle<Value> &obj)
635 {
636     isolate_data_t *isoData = getIsolateData();
637     Handle<Value> args[] = { obj };
638     TryCatch try_catch(isoData->ctx->isolate);
639     Local<Function> stringifyFun =
640         Local<Function>::New(isoData->ctx->isolate, isoData->stringifyFun);
641     Local<Object> jsonObject =
642         Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
643     Handle<Value> result = stringifyFun->Call(jsonObject, 1, args);
644 
645     if (result.IsEmpty()) {
646         if (try_catch.HasCaught()) {
647             Local<Message> m = try_catch.Message();
648             if (!m.IsEmpty()) {
649                 throw Local<String>(m->Get());
650             }
651         }
652         throw Handle<Value>(createUtf8String(isoData->ctx->isolate,
653                     "JSON.stringify() error"));
654     }
655 
656     unsigned len;
657     ErlNifBinary resultBin;
658 
659     if (result->IsUndefined()) {
660         len = static_cast<unsigned>(sizeof("null") - 1);
661         if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
662             throw std::bad_alloc();
663         }
664         memcpy(resultBin.data, "null", len);
665     } else {
666         Handle<String> str = Handle<String>::Cast(result);
667         len = str->Utf8Length();
668         if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
669             throw std::bad_alloc();
670         }
671         str->WriteUtf8(reinterpret_cast<char *>(resultBin.data),
672                        len, NULL, String::NO_NULL_TERMINATION);
673     }
674 
675     // Caller responsible for invoking enif_make_binary()
676     // or enif_release_binary()
677     return resultBin;
678 }
679 
680 
jsonParse(const ErlNifBinary &thing)681 Handle<Value> jsonParse(const ErlNifBinary &thing)
682 {
683     isolate_data_t *isoData = getIsolateData();
684     Handle<Value> args[] = { createUtf8String(isoData->ctx->isolate,
685                              reinterpret_cast<char *>(thing.data),
686                              (size_t)thing.size) };
687     TryCatch try_catch(isoData->ctx->isolate);
688     Local<Function> jsonParseFun =
689         Local<Function>::New(isoData->ctx->isolate, isoData->jsonParseFun);
690     Local<Object> jsonObject =
691         Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
692     Handle<Value> result = jsonParseFun->Call(jsonObject, 1, args);
693 
694     if (result.IsEmpty()) {
695         throw MapReduceError(exceptionString(try_catch));
696     }
697 
698     return result;
699 }
700 
701 
loadFunctions(map_reduce_ctx_t *ctx, const function_sources_list_t &funStrings)702 void loadFunctions(map_reduce_ctx_t *ctx,
703                    const function_sources_list_t &funStrings)
704 {
705     HandleScope handle_scope(ctx->isolate);
706     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
707     Context::Scope context_scope(context);
708 
709     bool isDocUsed;
710     if(optimize_doc_load) {
711         // If esprima compilation fails restore back to pulling in documents.
712         try {
713             compileFunction((char *)jsFunction_src);
714             isDocUsed = false;
715         } catch(...) {
716             isDocUsed = true;
717         }
718     }
719     else {
720         isDocUsed = true;
721     }
722 
723     ctx->functions = (function_vector_t *) enif_alloc(sizeof(function_vector_t));
724     if (ctx->functions == NULL) {
725         throw std::bad_alloc();
726     }
727 
728     ctx->functions = new (ctx->functions) function_vector_t();
729     function_sources_list_t::const_iterator it = funStrings.begin();
730 
731     for ( ; it != funStrings.end(); ++it) {
732         Handle<Function> fun = compileFunction(*it);
733         // Do this if is_doc_unused function compilation is successful
734         if(optimize_doc_load && !isDocUsed) {
735             Handle<Value> val = context->Global()->Get(
736                 createUtf8String(ctx->isolate, "is_doc_unused"));
737             Handle<Function> unusedFun = Handle<Function>::Cast(val);
738             Handle<Value> arg = createUtf8String(ctx->isolate, it->data());
739             TryCatch try_catch(ctx->isolate);
740             Handle<Value> js_result = unusedFun->Call(context->Global(), 1, &arg);
741             if (try_catch.HasCaught()) {
742                 throw MapReduceError(exceptionString(try_catch));
743             }
744             if (js_result.IsEmpty()) {
745                 // Some error during static analysis of map function
746                 throw MapReduceError("Malformed map function");
747             }
748             bool isDocUnused = js_result->BooleanValue();
749             if (isDocUnused == false) {
750                 isDocUsed = true;
751             }
752         }
753         Persistent<Function> *perFn =
754             (Persistent<Function> *) enif_alloc(sizeof(Persistent<Function>));
755         if (perFn == NULL) {
756             throw std::bad_alloc();
757         }
758         perFn = new (perFn) Persistent<Function>();
759         perFn->Reset(ctx->isolate, fun);
760         ctx->functions->push_back(perFn);
761     }
762     ctx->isDocUsed = isDocUsed;
763 }
764 
765 
compileFunction(const function_source_t &funSource)766 Handle<Function> compileFunction(const function_source_t &funSource)
767 {
768     Isolate *isolate = Isolate::GetCurrent();
769     Local<Context> context(isolate->GetCurrentContext());
770     EscapableHandleScope handle_scope(isolate);
771     TryCatch try_catch(isolate);
772     Handle<String> source =
773         createUtf8String(isolate, funSource.data(), funSource.length());
774     Local<Script> script;
775     if (!Script::Compile(context, source).ToLocal(&script)) {
776         throw MapReduceError(exceptionString(try_catch));
777     }
778 
779     if (script.IsEmpty()) {
780         throw MapReduceError(exceptionString(try_catch));
781     }
782 
783     Handle<Value> result = script->Run();
784 
785     if (result.IsEmpty()) {
786         throw MapReduceError(exceptionString(try_catch));
787     }
788 
789     if (!result->IsFunction()) {
790         throw MapReduceError(std::string("Invalid function: ") +
791                 funSource.c_str());
792     }
793 
794     return handle_scope.Escape(Handle<Function>::Cast(result));
795 }
796 
797 
jsonListToJsArray(const json_results_list_t &list)798 Handle<Array> jsonListToJsArray(const json_results_list_t &list)
799 {
800     Isolate *isolate = Isolate::GetCurrent();
801     Handle<Array> array = Array::New(isolate, list.size());
802     json_results_list_t::const_iterator it = list.begin();
803     int i = 0;
804 
805     for ( ; it != list.end(); ++it, ++i) {
806         Handle<Value> v = jsonParse(*it);
807         array->Set(Number::New(isolate, i), v);
808     }
809 
810     return array;
811 }
812 
813 
getIsolateData()814 isolate_data_t *getIsolateData()
815 {
816     Isolate *isolate = Isolate::GetCurrent();
817     return reinterpret_cast<isolate_data_t*>(isolate->GetData(0));
818 }
819 
820 
// Marks the start of a JavaScript task: records the current time in
// ctx->taskStartTime while holding ctx->exitMutex, then clears ctx->kvs so
// that emit() is a no-op until a key/value list is installed.
void taskStarted(map_reduce_ctx_t *ctx)
{
    ctx->exitMutex.lock();
    ctx->taskStartTime = std::chrono::high_resolution_clock::now();
    ctx->exitMutex.unlock();
    ctx->kvs = NULL;
}
828 
829 
// Marks the end of a JavaScript task: resets ctx->taskStartTime to its
// default value while holding ctx->exitMutex.
void taskFinished(map_reduce_ctx_t *ctx)
{
    ctx->exitMutex.lock();
    ctx->taskStartTime = {};
    ctx->exitMutex.unlock();
}
836 
837 
// Asks V8 to terminate the JavaScript currently executing in the isolate of
// `ctx` and resets ctx->taskStartTime to its default value.
// NOTE(review): unlike taskStarted()/taskFinished() this does not lock
// ctx->exitMutex itself; presumably the caller already holds it - confirm.
void terminateTask(map_reduce_ctx_t *ctx)
{
    V8::TerminateExecution(ctx->isolate);
    ctx->taskStartTime = {};
}
843 
844 
exceptionString(const TryCatch &tryCatch)845 std::string exceptionString(const TryCatch &tryCatch)
846 {
847     HandleScope handle_scope(Isolate::GetCurrent());
848     String::Utf8Value exception(tryCatch.Exception());
849     const char *exceptionString = (*exception);
850 
851     if (exceptionString) {
852         Handle<Message> message = tryCatch.Message();
853         return std::string(exceptionString) + " (line " +
854                 std::to_string(message->GetLineNumber()) + ":" +
855                 std::to_string(message->GetStartColumn()) + ")";
856     }
857 
858     return std::string("runtime error");
859 }
860 
861 
freeMapResultList(const map_results_list_t &results)862 void freeMapResultList(const map_results_list_t &results)
863 {
864     map_results_list_t::const_iterator it = results.begin();
865 
866     for ( ; it != results.end(); ++it) {
867         freeMapResult(*it);
868     }
869 }
870 
871 
freeMapResult(const map_result_t &mapResult)872 void freeMapResult(const map_result_t &mapResult)
873 {
874     switch (mapResult.type) {
875     case MAP_KVS:
876         {
877             kv_pair_list_t::const_iterator it = mapResult.result.kvs->begin();
878             for ( ; it != mapResult.result.kvs->end(); ++it) {
879                 ErlNifBinary key = it->first;
880                 ErlNifBinary value = it->second;
881                 enif_release_binary(&key);
882                 enif_release_binary(&value);
883             }
884             mapResult.result.kvs->~kv_pair_list_t();
885             enif_free(mapResult.result.kvs);
886         }
887         break;
888     case MAP_ERROR:
889         enif_release_binary(mapResult.result.error);
890         enif_free(mapResult.result.error);
891         break;
892     }
893 }
894 
895 
freeJsonData(const json_results_list_t &data)896 void freeJsonData(const json_results_list_t &data)
897 {
898     json_results_list_t::const_iterator it = data.begin();
899     for ( ; it != data.end(); ++it) {
900         ErlNifBinary bin = *it;
901         enif_release_binary(&bin);
902     }
903 }
904 
905 // Free the logresults with data. For avoiding one extra allocation
906 // of logResults, allocation is done in log function, so which can cause
907 // memory leak if it is called from other (reduce, rereduce) context,
908 // so free that memory here.
freeLogResults(map_reduce_ctx_t *ctx)909 void freeLogResults(map_reduce_ctx_t *ctx)
910 {
911     if (ctx->logResults) {
912         log_results_list_t::reverse_iterator k = ctx->logResults->rbegin();
913         for ( ; k != ctx->logResults->rend(); ++k) {
914             enif_release_binary_compat(ctx->env, &(*k));
915         }
916         ctx->logResults->~log_results_list_t();
917         enif_free(ctx->logResults);
918         ctx->logResults = NULL;
919     }
920 }
921