xref: /6.0.3/couchdb/src/mapreduce/mapreduce.cc (revision 45731d9f)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/**
3 * @copyright 2012 Couchbase, Inc.
4 *
5 * @author Filipe Manana  <filipe@couchbase.com>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at
10 *
11 *  http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations under
17 * the License.
18 **/
19
20#include <iostream>
21#include <list>
22#include <string>
23#include <string.h>
24#include <sstream>
25#include <time.h>
26#include <vector>
27#include "mapreduce.h"
28// This is libv8_libplatform library which handles garbage collection for v8
29#include <libplatform/libplatform.h>
30// Esprima unused and builtin JavaScript contents in raw string format
31#include "jsfunctions/jsfunctions_data.h"
32
// Longest message (in string length units) that log() stores verbatim;
// longer messages are prefixed with a truncation marker and cut.
#define MAX_LOG_STRING_SIZE 1024

// emit() rejects keys whose JSON encoding reaches this many bytes
// (spatial views are exempt from the check).
#define MAX_EMIT_KEY_SIZE 4096
36
37using namespace v8;
38
// JavaScript source of the built-in sum(values) helper. createJsContext()
// compiles it and installs the result on the global object as "sum".
static const char *SUM_FUNCTION_STRING =
    "(function(values) {"
    "    var sum = 0;"
    "    for (var i = 0; i < values.length; ++i) {        sum += values[i];    }"
    "    return sum;"
    "})";
47
// JavaScript source of the built-in dateToArray(date) helper, installed on
// the global object by createJsContext(). It yields
// [year, month (1-based), day, hours, minutes, seconds] in UTC, or null when
// the argument is not a valid date.
//
// I wish it was on the prototype, but that will require bigger
// C changes as adding to the date prototype should be done on
// process launch. The code you see here may be faster, but it
// is less JavaScripty.
// "Date.prototype.toArray = (function() {"
static const char *DATE_FUNCTION_STRING =
    "(function(date) {"
    "    date = date.getUTCDate ? date : new Date(date);"
    "    return isFinite(date.valueOf()) ?"
    "      [date.getUTCFullYear(),"
    "      (date.getUTCMonth() + 1),"
    "       date.getUTCDate(),"
    "       date.getUTCHours(),"
    "       date.getUTCMinutes(),"
    "       date.getUTCSeconds()] : null;"
    "})";
64
// JavaScript source of the built-in decodeBase64(b64) helper, installed on
// the global object by createJsContext(). It returns the decoded bytes as an
// array of numbers and throws when the input is not a string or its length
// is not a multiple of four. `scratch` holds the number of '=' padding
// characters and selects how the final, partial group is decoded.
static const char *BASE64_FUNCTION_STRING =
    "(function(b64) {"
    "    var i, j, l, tmp, scratch, arr = [];"
    "    var lookup = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';"
    "    if (typeof b64 !== 'string') {"
    "        throw 'Input is not a string';"
    "    }"
    "    if (b64.length % 4 > 0) {"
    "        throw 'Invalid base64 source.';"
    "    }"
    "    scratch = b64.indexOf('=');"
    "    scratch = scratch > 0 ? b64.length - scratch : 0;"
    "    l = scratch > 0 ? b64.length - 4 : b64.length;"
    "    for (i = 0, j = 0; i < l; i += 4, j += 3) {"
    "        tmp = (lookup.indexOf(b64[i]) << 18) | (lookup.indexOf(b64[i + 1]) << 12);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) << 6) | lookup.indexOf(b64[i + 3]);"
    "        arr.push((tmp & 0xFF0000) >> 16);"
    "        arr.push((tmp & 0xFF00) >> 8);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    if (scratch === 2) {"
    "        tmp = (lookup.indexOf(b64[i]) << 2) | (lookup.indexOf(b64[i + 1]) >> 4);"
    "        arr.push(tmp & 0xFF);"
    "    } else if (scratch === 1) {"
    "        tmp = (lookup.indexOf(b64[i]) << 10) | (lookup.indexOf(b64[i + 1]) << 4);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) >> 2);"
    "        arr.push((tmp >> 8) & 0xFF);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    return arr;"
    "})";
96
97
98
99typedef struct {
100    Persistent<Object>    jsonObject;
101    Persistent<Function>  jsonParseFun;
102    Persistent<Function>  stringifyFun;
103    map_reduce_ctx_t      *ctx;
104} isolate_data_t;
105
106static bool optimize_doc_load = true;
107static void doInitContext(map_reduce_ctx_t *ctx,
108                          const function_sources_list_t &funs,
109                          const view_index_type_t viewType);
110static Local<Context> createJsContext(map_reduce_ctx_t *ctx);
111static void emit(const v8::FunctionCallbackInfo<Value>& args);
112static void log(const v8::FunctionCallbackInfo<Value>& args);
113
114static void loadFunctions(map_reduce_ctx_t *ctx,
115                          const function_sources_list_t &funs);
116static void freeJsonData(const json_results_list_t &data);
117static void freeMapResult(const map_result_t &data);
118static void freeMapResultList(const map_results_list_t &results);
119static Handle<Function> compileFunction(const function_source_t &funSource);
120static inline ErlNifBinary jsonStringify(const Handle<Value> &obj);
121static inline Handle<Value> jsonParse(const ErlNifBinary &thing);
122static inline Handle<Array> jsonListToJsArray(const json_results_list_t &list);
123static inline isolate_data_t *getIsolateData();
124static inline void taskStarted(map_reduce_ctx_t *ctx);
125static inline void taskFinished(map_reduce_ctx_t *ctx);
126static std::string exceptionString(const TryCatch &tryCatch);
127static void freeLogResults(map_reduce_ctx_t *ctx);
128
129static Platform *v8platform;
130void initV8()
131{
132    V8::InitializeICUDefaultLocation("");
133    v8platform = platform::CreateDefaultPlatform();
134    V8::InitializePlatform(v8platform);
135    V8::Initialize();
136}
137
138void deinitV8()
139{
140    V8::Dispose();
141    V8::ShutdownPlatform();
142    delete v8platform;
143}
144
145void setOptimizeDocLoadFlag(const char *flag)
146{
147    if(!strcmp(flag, "true"))
148        optimize_doc_load = true;
149    else
150        optimize_doc_load = false;
151}
152
153void initContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs,
154                 const view_index_type_t viewType)
155{
156    ctx = new (ctx) map_reduce_ctx_t();
157
158    try {
159        doInitContext(ctx, funs, viewType);
160        Locker locker(ctx->isolate);
161        Isolate::Scope isolate_scope(ctx->isolate);
162        HandleScope handle_scope(ctx->isolate);
163        Local<Context> context =
164            Local<Context>::New(ctx->isolate, ctx->jsContext);
165        Context::Scope context_scope(context);
166
167        loadFunctions(ctx, funs);
168    } catch (...) {
169        // Releasing resource will invoke NIF destructor that calls destroyCtx
170        enif_release_resource(ctx);
171        throw;
172    }
173}
174
175static Local<String> createUtf8String(Isolate *isolate, const char *str)
176{
177    return String::NewFromUtf8(isolate, str,
178        NewStringType::kNormal).ToLocalChecked();
179}
180
181static Local<String> createUtf8String(Isolate *isolate, const char *str,
182                                      size_t len)
183{
184    return String::NewFromUtf8(isolate, str,
185        NewStringType::kNormal, len).ToLocalChecked();
186}
187
188
189void doInitContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs,
190                   const view_index_type_t viewType)
191{
192    ctx->viewType = viewType;
193    Isolate::CreateParams createParams;
194    createParams.array_buffer_allocator =
195      ArrayBuffer::Allocator::NewDefaultAllocator();
196    ctx->isolate = Isolate::New(createParams);
197    ctx->logResults = NULL;
198    Locker locker(ctx->isolate);
199    Isolate::Scope isolate_scope(ctx->isolate);
200    HandleScope handle_scope(ctx->isolate);
201
202    ctx->jsContext.Reset(ctx->isolate, createJsContext(ctx));
203    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
204    Context::Scope context_scope(context);
205
206    Local<String> jsonString = createUtf8String(ctx->isolate, "JSON");
207    Handle<Object> jsonObject =
208        Local<Object>::Cast(context->Global()->Get(jsonString));
209
210    Local<String> parseString = createUtf8String(ctx->isolate, "parse");
211    Handle<Function> parseFun =
212        Local<Function>::Cast(jsonObject->Get(parseString));
213    Local<String> stringifyString = createUtf8String(ctx->isolate, "stringify");
214    Handle<Function> stringifyFun =
215        Local<Function>::Cast(jsonObject->Get(stringifyString));
216
217    isolate_data_t *isoData =
218        (isolate_data_t *) enif_alloc(sizeof(isolate_data_t));
219    if (isoData == NULL) {
220        throw std::bad_alloc();
221    }
222
223    isoData = new (isoData) isolate_data_t();
224    isoData->jsonObject.Reset(ctx->isolate, jsonObject);
225    isoData->jsonParseFun.Reset(ctx->isolate, parseFun);
226    isoData->stringifyFun.Reset(ctx->isolate, stringifyFun);
227
228    isoData->ctx = ctx;
229
230    ctx->isolate->SetData(0, (void *)isoData);
231    ctx->taskStartTime = {};
232}
233
234
235map_results_list_t mapDoc(map_reduce_ctx_t *ctx,
236                          const ErlNifBinary &doc,
237                          const ErlNifBinary &meta)
238{
239    Locker locker(ctx->isolate);
240    Isolate::Scope isolate_scope(ctx->isolate);
241    HandleScope handle_scope(ctx->isolate);
242    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
243    Context::Scope context_scope(context);
244    Handle<Value> docObject = jsonParse(doc);
245    Handle<Value> metaObject = jsonParse(meta);
246
247    if (!metaObject->IsObject()) {
248        throw MapReduceError("metadata is not a JSON object");
249    }
250
251    map_results_list_t results;
252    Handle<Value> funArgs[] = { docObject, metaObject };
253
254    taskStarted(ctx);
255
256    for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
257        map_result_t mapResult;
258        Local<Function> fun =
259            Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
260        TryCatch try_catch(ctx->isolate);
261
262        mapResult.type = MAP_KVS;
263        mapResult.result.kvs =
264            (kv_pair_list_t *) enif_alloc(sizeof(kv_pair_list_t));
265
266        if (mapResult.result.kvs == NULL) {
267            freeMapResultList(results);
268            throw std::bad_alloc();
269        }
270
271        mapResult.result.kvs = new (mapResult.result.kvs) kv_pair_list_t();
272        ctx->kvs = mapResult.result.kvs;
273        ctx->emitKvSize = 0;
274        Handle<Value> result = fun->Call(context->Global(), 2, funArgs);
275
276        if (result.IsEmpty()) {
277            freeMapResult(mapResult);
278
279            if (!try_catch.CanContinue()) {
280                freeMapResultList(results);
281                throw MapReduceError("timeout");
282            }
283
284            mapResult.type = MAP_ERROR;
285            std::string exceptString = exceptionString(try_catch);
286            size_t len = exceptString.length();
287
288            mapResult.result.error =
289                (ErlNifBinary *) enif_alloc(sizeof(ErlNifBinary));
290            if (mapResult.result.error == NULL) {
291                freeMapResultList(results);
292                throw std::bad_alloc();
293            }
294            if (!enif_alloc_binary_compat(ctx->env, len,
295                    mapResult.result.error)) {
296                freeMapResultList(results);
297                throw std::bad_alloc();
298            }
299            // Caller responsible for invoking enif_make_binary()
300            // or enif_release_binary()
301            memcpy(mapResult.result.error->data, exceptString.data(), len);
302        }
303
304        results.push_back(mapResult);
305    }
306
307    taskFinished(ctx);
308
309    return results;
310}
311
312
313json_results_list_t runReduce(map_reduce_ctx_t *ctx,
314                              const json_results_list_t &keys,
315                              const json_results_list_t &values)
316{
317    Locker locker(ctx->isolate);
318    Isolate::Scope isolate_scope(ctx->isolate);
319    HandleScope handle_scope(ctx->isolate);
320    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
321    Context::Scope context_scope(context);
322    Handle<Array> keysArray = jsonListToJsArray(keys);
323    Handle<Array> valuesArray = jsonListToJsArray(values);
324    json_results_list_t results;
325
326    Handle<Value> args[] = { keysArray, valuesArray,
327                             Boolean::New(ctx->isolate, false) };
328
329    taskStarted(ctx);
330
331    for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
332        Local<Function> fun =
333            Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
334        TryCatch try_catch(ctx->isolate);
335        Handle<Value> result = fun->Call(context->Global(), 3, args);
336
337        if (result.IsEmpty()) {
338            freeJsonData(results);
339
340            if (!try_catch.CanContinue()) {
341                throw MapReduceError("timeout");
342            }
343
344            throw MapReduceError(exceptionString(try_catch));
345        }
346
347        try {
348            ErlNifBinary jsonResult = jsonStringify(result);
349            results.push_back(jsonResult);
350        } catch(Handle<String> &ex) {
351            freeJsonData(results);
352            ctx->isolate->ThrowException(ex);
353        }
354    }
355
356    taskFinished(ctx);
357    freeLogResults(ctx);
358
359    return results;
360}
361
362
363ErlNifBinary runReduce(map_reduce_ctx_t *ctx,
364                       int reduceFunNum,
365                       const json_results_list_t &keys,
366                       const json_results_list_t &values)
367{
368    Locker locker(ctx->isolate);
369    Isolate::Scope isolate_scope(ctx->isolate);
370    HandleScope handle_scope(ctx->isolate);
371    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
372    Context::Scope context_scope(context);
373
374    reduceFunNum -= 1;
375    if (reduceFunNum < 0 ||
376        static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
377        throw MapReduceError("invalid reduce function number");
378    }
379
380    Local<Function> fun =
381        Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
382    Handle<Array> keysArray = jsonListToJsArray(keys);
383    Handle<Array> valuesArray = jsonListToJsArray(values);
384    Handle<Value> args[] = { keysArray, valuesArray,
385                             Boolean::New(ctx->isolate, false) };
386
387    taskStarted(ctx);
388
389    TryCatch try_catch(ctx->isolate);
390    Handle<Value> result = fun->Call(context->Global(), 3, args);
391
392    taskFinished(ctx);
393    freeLogResults(ctx);
394
395    if (result.IsEmpty()) {
396        if (!try_catch.CanContinue()) {
397            throw MapReduceError("timeout");
398        }
399
400        throw MapReduceError(exceptionString(try_catch));
401    }
402
403    ErlNifBinary jsonResult;
404    try {
405        jsonResult = jsonStringify(result);
406    } catch(Handle<String> &ex) {
407        ctx->isolate->ThrowException(ex);
408    }
409
410    return jsonResult;
411}
412
413
414ErlNifBinary runRereduce(map_reduce_ctx_t *ctx,
415                       int reduceFunNum,
416                       const json_results_list_t &reductions)
417{
418    Locker locker(ctx->isolate);
419    Isolate::Scope isolate_scope(ctx->isolate);
420    HandleScope handle_scope(ctx->isolate);
421    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
422    Context::Scope context_scope(context);
423
424    reduceFunNum -= 1;
425    if (reduceFunNum < 0 ||
426        static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
427        throw MapReduceError("invalid reduce function number");
428    }
429
430    Local<Function> fun =
431        Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
432    Handle<Array> valuesArray = jsonListToJsArray(reductions);
433    Handle<Value> args[] =
434        { Null(ctx->isolate), valuesArray, Boolean::New(ctx->isolate, true) };
435
436    taskStarted(ctx);
437
438    TryCatch try_catch(ctx->isolate);
439    Handle<Value> result = fun->Call(context->Global(), 3, args);
440
441    taskFinished(ctx);
442    freeLogResults(ctx);
443
444    if (result.IsEmpty()) {
445        if (!try_catch.CanContinue()) {
446            throw MapReduceError("timeout");
447        }
448
449        throw MapReduceError(exceptionString(try_catch));
450    }
451
452    ErlNifBinary jsonResult;
453    try {
454        jsonResult = jsonStringify(result);
455    } catch(Handle<String> &ex) {
456        ctx->isolate->ThrowException(ex);
457    }
458
459    return jsonResult;
460}
461
462
463void destroyContext(map_reduce_ctx_t *ctx)
464{
465    {
466        Locker locker(ctx->isolate);
467        Isolate::Scope isolate_scope(ctx->isolate);
468        HandleScope handle_scope(ctx->isolate);
469        Local<Context> context =
470            Local<Context>::New(ctx->isolate, ctx->jsContext);
471        Context::Scope context_scope(context);
472
473        if (ctx->functions) {
474            for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
475                (*ctx->functions)[i]->Reset();
476                (*ctx->functions)[i]->~Persistent<v8::Function>();
477                enif_free((*ctx->functions)[i]);
478            }
479            ctx->functions->~function_vector_t();
480            enif_free(ctx->functions);
481        }
482
483        isolate_data_t *isoData = getIsolateData();
484        if(isoData) {
485            isoData->jsonObject.Reset();
486            isoData->jsonParseFun.Reset();
487            isoData->stringifyFun.Reset();
488            isoData->~isolate_data_t();
489            enif_free(isoData);
490        }
491
492        ctx->jsContext.Reset();
493    }
494
495    ctx->isolate->Dispose();
496    ctx->~map_reduce_ctx_t();
497}
498
499
500static Local<Context> createJsContext(map_reduce_ctx_t *ctx)
501{
502    EscapableHandleScope handle_scope(ctx->isolate);
503    Handle<ObjectTemplate> global = ObjectTemplate::New();
504
505    global->Set(createUtf8String(ctx->isolate, "emit"),
506            FunctionTemplate::New(ctx->isolate, emit));
507
508    global->Set(createUtf8String(ctx->isolate, "log"),
509            FunctionTemplate::New(ctx->isolate, log));
510
511    Handle<Context> context = Context::New(ctx->isolate, NULL, global);
512    Context::Scope context_scope(context);
513
514    Handle<Function> sumFun = compileFunction(SUM_FUNCTION_STRING);
515    context->Global()->Set(createUtf8String(ctx->isolate, "sum"), sumFun);
516
517    Handle<Function> decodeBase64Fun =
518        compileFunction(BASE64_FUNCTION_STRING);
519    context->Global()->Set(createUtf8String(ctx->isolate, "decodeBase64"),
520        decodeBase64Fun);
521
522    Handle<Function> dateToArrayFun =
523        compileFunction(DATE_FUNCTION_STRING);
524    context->Global()->Set(createUtf8String(ctx->isolate, "dateToArray"),
525                           dateToArrayFun);
526
527
528    // Use EscapableHandleScope and return using .Escape
529    // This will ensure that return values are not garbage collected
530    // as soon as the function returns.
531    return handle_scope.Escape(context);
532}
533
534#define TRUNCATE_STR "Truncated: "
535
536static void log(const v8::FunctionCallbackInfo<Value>& args)
537{
538    isolate_data_t *isoData = getIsolateData();
539    map_reduce_ctx_t *ctx = isoData->ctx;
540    try {
541        /* Initialize only if log function is used */
542        if (ctx->logResults == NULL) {
543            ctx->logResults = (log_results_list_t *) enif_alloc(sizeof
544                    (log_results_list_t));
545            if (ctx->logResults == NULL) {
546                throw std::bad_alloc();
547            }
548            ctx->logResults = new (ctx->logResults) log_results_list_t();
549        }
550        /* use only first argument */
551        Handle<Value> logMsg = args[0];
552        Handle<String> str;
553        unsigned int len = 0;
554        if (logMsg->IsString()) {
555            str = Handle<String>::Cast(logMsg);
556            len = str->Length();
557            if (len > MAX_LOG_STRING_SIZE) {
558                str = Handle<String>(String::Concat(
559                          createUtf8String(ctx->isolate, TRUNCATE_STR), str)),
560                len = MAX_LOG_STRING_SIZE + sizeof(TRUNCATE_STR) - 1;
561            }
562        } else {
563            str = Handle<String>(createUtf8String(ctx->isolate,
564                        "Error while logging:Log value is not a string"));
565            len = str->Length();
566        }
567        ErlNifBinary resultBin;
568        if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
569            throw std::bad_alloc();
570        }
571        str->WriteUtf8(reinterpret_cast<char *>(resultBin.data),
572                len, NULL, String::NO_NULL_TERMINATION);
573        ctx->logResults->push_back(resultBin);
574    } catch(Handle<String> &ex) {
575        ctx->isolate->ThrowException(ex);
576    }
577}
578
579
580static void emit(const v8::FunctionCallbackInfo<Value>& args)
581{
582    isolate_data_t *isoData = getIsolateData();
583
584    if (isoData->ctx->kvs == NULL) {
585        return;
586    }
587
588    ErlNifBinary keyJson;
589    try {
590        keyJson = jsonStringify(args[0]);
591    } catch(Handle<String> &ex) {
592        isoData->ctx->isolate->ThrowException(ex);
593        return;
594    }
595
596    // Spatial views may emit a geometry that is bigger, when serialized
597    // to JSON, than the allowed size of a key. In later steps it will then
598    // be reduced to a bouning box. Hence don't check the string size of the
599    // key of spatial views here.
600    if (isoData->ctx->viewType != VIEW_INDEX_TYPE_SPATIAL &&
601            keyJson.size >= MAX_EMIT_KEY_SIZE) {
602        std::stringstream msg;
603        msg << "too long key emitted: " << keyJson.size << " bytes";
604
605        isoData->ctx->isolate->ThrowException(
606                createUtf8String(isoData->ctx->isolate, msg.str().c_str())
607                                             );
608	return;
609    }
610
611    try {
612        ErlNifBinary valueJson = jsonStringify(args[1]);
613
614        kv_pair_t result = kv_pair_t(keyJson, valueJson);
615        isoData->ctx->kvs->push_back(result);
616        isoData->ctx->emitKvSize += keyJson.size;
617        isoData->ctx->emitKvSize += valueJson.size;
618
619    } catch(Handle<String> &ex) {
620        isoData->ctx->isolate->ThrowException(ex);
621    }
622
623    if ((isoData->ctx->maxEmitKvSize > 0) &&
624        (isoData->ctx->emitKvSize > isoData->ctx->maxEmitKvSize)) {
625        std::stringstream msg;
626        msg << "too much data emitted: " << isoData->ctx->emitKvSize << " bytes";
627
628        isoData->ctx->isolate->ThrowException(
629                createUtf8String(isoData->ctx->isolate, msg.str().c_str()));
630    }
631}
632
633
634ErlNifBinary jsonStringify(const Handle<Value> &obj)
635{
636    isolate_data_t *isoData = getIsolateData();
637    Handle<Value> args[] = { obj };
638    TryCatch try_catch(isoData->ctx->isolate);
639    Local<Function> stringifyFun =
640        Local<Function>::New(isoData->ctx->isolate, isoData->stringifyFun);
641    Local<Object> jsonObject =
642        Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
643    Handle<Value> result = stringifyFun->Call(jsonObject, 1, args);
644
645    if (result.IsEmpty()) {
646        if (try_catch.HasCaught()) {
647            Local<Message> m = try_catch.Message();
648            if (!m.IsEmpty()) {
649                throw Local<String>(m->Get());
650            }
651        }
652        throw Handle<Value>(createUtf8String(isoData->ctx->isolate,
653                    "JSON.stringify() error"));
654    }
655
656    unsigned len;
657    ErlNifBinary resultBin;
658
659    if (result->IsUndefined()) {
660        len = static_cast<unsigned>(sizeof("null") - 1);
661        if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
662            throw std::bad_alloc();
663        }
664        memcpy(resultBin.data, "null", len);
665    } else {
666        Handle<String> str = Handle<String>::Cast(result);
667        len = str->Utf8Length();
668        if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
669            throw std::bad_alloc();
670        }
671        str->WriteUtf8(reinterpret_cast<char *>(resultBin.data),
672                       len, NULL, String::NO_NULL_TERMINATION);
673    }
674
675    // Caller responsible for invoking enif_make_binary()
676    // or enif_release_binary()
677    return resultBin;
678}
679
680
681Handle<Value> jsonParse(const ErlNifBinary &thing)
682{
683    isolate_data_t *isoData = getIsolateData();
684    Handle<Value> args[] = { createUtf8String(isoData->ctx->isolate,
685                             reinterpret_cast<char *>(thing.data),
686                             (size_t)thing.size) };
687    TryCatch try_catch(isoData->ctx->isolate);
688    Local<Function> jsonParseFun =
689        Local<Function>::New(isoData->ctx->isolate, isoData->jsonParseFun);
690    Local<Object> jsonObject =
691        Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
692    Handle<Value> result = jsonParseFun->Call(jsonObject, 1, args);
693
694    if (result.IsEmpty()) {
695        throw MapReduceError(exceptionString(try_catch));
696    }
697
698    return result;
699}
700
701
702void loadFunctions(map_reduce_ctx_t *ctx,
703                   const function_sources_list_t &funStrings)
704{
705    HandleScope handle_scope(ctx->isolate);
706    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
707    Context::Scope context_scope(context);
708
709    bool isDocUsed;
710    if(optimize_doc_load) {
711        // If esprima compilation fails restore back to pulling in documents.
712        try {
713            compileFunction((char *)jsFunction_src);
714            isDocUsed = false;
715        } catch(...) {
716            isDocUsed = true;
717        }
718    }
719    else {
720        isDocUsed = true;
721    }
722
723    ctx->functions = (function_vector_t *) enif_alloc(sizeof(function_vector_t));
724    if (ctx->functions == NULL) {
725        throw std::bad_alloc();
726    }
727
728    ctx->functions = new (ctx->functions) function_vector_t();
729    function_sources_list_t::const_iterator it = funStrings.begin();
730
731    for ( ; it != funStrings.end(); ++it) {
732        Handle<Function> fun = compileFunction(*it);
733        // Do this if is_doc_unused function compilation is successful
734        if(optimize_doc_load && !isDocUsed) {
735            Handle<Value> val = context->Global()->Get(
736                createUtf8String(ctx->isolate, "is_doc_unused"));
737            Handle<Function> unusedFun = Handle<Function>::Cast(val);
738            Handle<Value> arg = createUtf8String(ctx->isolate, it->data());
739            TryCatch try_catch(ctx->isolate);
740            Handle<Value> js_result = unusedFun->Call(context->Global(), 1, &arg);
741            if (try_catch.HasCaught()) {
742                throw MapReduceError(exceptionString(try_catch));
743            }
744            if (js_result.IsEmpty()) {
745                // Some error during static analysis of map function
746                throw MapReduceError("Malformed map function");
747            }
748            bool isDocUnused = js_result->BooleanValue();
749            if (isDocUnused == false) {
750                isDocUsed = true;
751            }
752        }
753        Persistent<Function> *perFn =
754            (Persistent<Function> *) enif_alloc(sizeof(Persistent<Function>));
755        if (perFn == NULL) {
756            throw std::bad_alloc();
757        }
758        perFn = new (perFn) Persistent<Function>();
759        perFn->Reset(ctx->isolate, fun);
760        ctx->functions->push_back(perFn);
761    }
762    ctx->isDocUsed = isDocUsed;
763}
764
765
766Handle<Function> compileFunction(const function_source_t &funSource)
767{
768    Isolate *isolate = Isolate::GetCurrent();
769    Local<Context> context(isolate->GetCurrentContext());
770    EscapableHandleScope handle_scope(isolate);
771    TryCatch try_catch(isolate);
772    Handle<String> source =
773        createUtf8String(isolate, funSource.data(), funSource.length());
774    Local<Script> script;
775    if (!Script::Compile(context, source).ToLocal(&script)) {
776        throw MapReduceError(exceptionString(try_catch));
777    }
778
779    if (script.IsEmpty()) {
780        throw MapReduceError(exceptionString(try_catch));
781    }
782
783    Handle<Value> result = script->Run();
784
785    if (result.IsEmpty()) {
786        throw MapReduceError(exceptionString(try_catch));
787    }
788
789    if (!result->IsFunction()) {
790        throw MapReduceError(std::string("Invalid function: ") +
791                funSource.c_str());
792    }
793
794    return handle_scope.Escape(Handle<Function>::Cast(result));
795}
796
797
798Handle<Array> jsonListToJsArray(const json_results_list_t &list)
799{
800    Isolate *isolate = Isolate::GetCurrent();
801    Handle<Array> array = Array::New(isolate, list.size());
802    json_results_list_t::const_iterator it = list.begin();
803    int i = 0;
804
805    for ( ; it != list.end(); ++it, ++i) {
806        Handle<Value> v = jsonParse(*it);
807        array->Set(Number::New(isolate, i), v);
808    }
809
810    return array;
811}
812
813
814isolate_data_t *getIsolateData()
815{
816    Isolate *isolate = Isolate::GetCurrent();
817    return reinterpret_cast<isolate_data_t*>(isolate->GetData(0));
818}
819
820
821void taskStarted(map_reduce_ctx_t *ctx)
822{
823    ctx->exitMutex.lock();
824    ctx->taskStartTime = std::chrono::high_resolution_clock::now();
825    ctx->exitMutex.unlock();
826    ctx->kvs = NULL;
827}
828
829
830void taskFinished(map_reduce_ctx_t *ctx)
831{
832    ctx->exitMutex.lock();
833    ctx->taskStartTime = {};
834    ctx->exitMutex.unlock();
835}
836
837
838void terminateTask(map_reduce_ctx_t *ctx)
839{
840    V8::TerminateExecution(ctx->isolate);
841    ctx->taskStartTime = {};
842}
843
844
845std::string exceptionString(const TryCatch &tryCatch)
846{
847    HandleScope handle_scope(Isolate::GetCurrent());
848    String::Utf8Value exception(tryCatch.Exception());
849    const char *exceptionString = (*exception);
850
851    if (exceptionString) {
852        Handle<Message> message = tryCatch.Message();
853        return std::string(exceptionString) + " (line " +
854                std::to_string(message->GetLineNumber()) + ":" +
855                std::to_string(message->GetStartColumn()) + ")";
856    }
857
858    return std::string("runtime error");
859}
860
861
862void freeMapResultList(const map_results_list_t &results)
863{
864    map_results_list_t::const_iterator it = results.begin();
865
866    for ( ; it != results.end(); ++it) {
867        freeMapResult(*it);
868    }
869}
870
871
872void freeMapResult(const map_result_t &mapResult)
873{
874    switch (mapResult.type) {
875    case MAP_KVS:
876        {
877            kv_pair_list_t::const_iterator it = mapResult.result.kvs->begin();
878            for ( ; it != mapResult.result.kvs->end(); ++it) {
879                ErlNifBinary key = it->first;
880                ErlNifBinary value = it->second;
881                enif_release_binary(&key);
882                enif_release_binary(&value);
883            }
884            mapResult.result.kvs->~kv_pair_list_t();
885            enif_free(mapResult.result.kvs);
886        }
887        break;
888    case MAP_ERROR:
889        enif_release_binary(mapResult.result.error);
890        enif_free(mapResult.result.error);
891        break;
892    }
893}
894
895
896void freeJsonData(const json_results_list_t &data)
897{
898    json_results_list_t::const_iterator it = data.begin();
899    for ( ; it != data.end(); ++it) {
900        ErlNifBinary bin = *it;
901        enif_release_binary(&bin);
902    }
903}
904
905// Free the logresults with data. For avoiding one extra allocation
906// of logResults, allocation is done in log function, so which can cause
907// memory leak if it is called from other (reduce, rereduce) context,
908// so free that memory here.
909void freeLogResults(map_reduce_ctx_t *ctx)
910{
911    if (ctx->logResults) {
912        log_results_list_t::reverse_iterator k = ctx->logResults->rbegin();
913        for ( ; k != ctx->logResults->rend(); ++k) {
914            enif_release_binary_compat(ctx->env, &(*k));
915        }
916        ctx->logResults->~log_results_list_t();
917        enif_free(ctx->logResults);
918        ctx->logResults = NULL;
919    }
920}
921