xref: /5.5.2/couchdb/src/mapreduce/mapreduce.cc (revision 1c372c04)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/**
3 * @copyright 2012 Couchbase, Inc.
4 *
5 * @author Filipe Manana  <filipe@couchbase.com>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at
10 *
11 *  http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations under
17 * the License.
18 **/
19
20#include <iostream>
21#include <list>
22#include <vector>
23#include <string>
24#include <string.h>
25#include <sstream>
26#include <time.h>
27#include "mapreduce.h"
28// This is libv8_libplatform library which handles garbage collection for v8
29#include <include/libplatform/libplatform.h>
30// Esprima unused and builtin JavaScript contents in raw string format
31#include "jsfunctions/jsfunctions_data.h"
32
33#define MAX_LOG_STRING_SIZE 1024
34
35#define MAX_EMIT_KEY_SIZE 4096
36
37using namespace v8;
38
39typedef struct {
40    Persistent<Object>    jsonObject;
41    Persistent<Function>  jsonParseFun;
42    Persistent<Function>  stringifyFun;
43    map_reduce_ctx_t      *ctx;
44} isolate_data_t;
45
// When true, loadFunctions() asks the JS helper `is_doc_unused` whether the
// view functions reference the document body. Changed through
// setOptimizeDocLoadFlag().
static bool optimize_doc_load = true;

// Context setup helpers.
static void doInitContext(map_reduce_ctx_t *ctx,
                          const function_sources_list_t &funs,
                          const view_index_type_t viewType);
static Local<Context> createJsContext(map_reduce_ctx_t *ctx);

// Native callbacks exposed to JavaScript as the globals `emit` and `log`.
static void emit(const v8::FunctionCallbackInfo<Value>& args);
static void log(const v8::FunctionCallbackInfo<Value>& args);

static void loadFunctions(map_reduce_ctx_t *ctx,
                          const function_sources_list_t &funs);

// Release helpers for result containers built by this file.
static void freeJsonData(const json_results_list_t &data);
static void freeMapResult(const map_result_t &data);
static void freeMapResultList(const map_results_list_t &results);

// Compilation and JSON conversion helpers.
static Handle<Function> compileFunction(const function_source_t &funSource);
static inline ErlNifBinary jsonStringify(const Handle<Value> &obj);
static inline Handle<Value> jsonParse(const ErlNifBinary &thing);
static inline Handle<Array> jsonListToJsArray(const json_results_list_t &list);
static inline isolate_data_t *getIsolateData();

// Task bookkeeping (ctx->taskStartTime) and error formatting.
static inline void taskStarted(map_reduce_ctx_t *ctx);
static inline void taskFinished(map_reduce_ctx_t *ctx);
static std::string exceptionString(const TryCatch &tryCatch);
static void freeLogResults(map_reduce_ctx_t *ctx);

// Process-wide V8 platform and startup snapshot; created in initV8() and
// released in deinitV8().
static Platform *v8platform;
static StartupData startupData;
/**
 * Process-wide V8 setup: initializes ICU, creates and installs the default
 * platform, initializes V8 and builds the startup snapshot from the embedded
 * JavaScript source (jsFunction_src).
 */
void initV8()
{
    V8::InitializeICU();
    v8platform = platform::CreateDefaultPlatform();
    V8::InitializePlatform(v8platform);
    V8::Initialize();
    // The snapshot blob is handed to every isolate created in doInitContext().
    startupData = V8::CreateSnapshotDataBlob((char *)jsFunction_src);
}
79
/**
 * Tears down what initV8() created: disposes V8, shuts down and deletes the
 * platform, and frees the snapshot blob data.
 */
void deinitV8()
{
    V8::Dispose();
    V8::ShutdownPlatform();
    delete v8platform;
    // startupData.data came from V8::CreateSnapshotDataBlob() in initV8().
    delete[] startupData.data;
}
87
88void setOptimizeDocLoadFlag(const char *flag)
89{
90    if(!strcmp(flag, "true"))
91        optimize_doc_load = true;
92    else
93        optimize_doc_load = false;
94}
95
96void initContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs,
97                 const view_index_type_t viewType)
98{
99    ctx = new (ctx) map_reduce_ctx_t();
100
101    try {
102        doInitContext(ctx, funs, viewType);
103        Locker locker(ctx->isolate);
104        Isolate::Scope isolate_scope(ctx->isolate);
105        HandleScope handle_scope(ctx->isolate);
106        Local<Context> context =
107            Local<Context>::New(ctx->isolate, ctx->jsContext);
108        Context::Scope context_scope(context);
109
110        loadFunctions(ctx, funs);
111    } catch (...) {
112        destroyContext(ctx);
113        throw;
114    }
115}
116
117static Local<String> createUtf8String(Isolate *isolate, const char *str)
118{
119    return String::NewFromUtf8(isolate, str,
120        NewStringType::kNormal).ToLocalChecked();
121}
122
123static Local<String> createUtf8String(Isolate *isolate, const char *str,
124                                      size_t len)
125{
126    return String::NewFromUtf8(isolate, str,
127        NewStringType::kNormal, len).ToLocalChecked();
128}
129
130
/**
 * Creates the isolate and JS context for ctx and caches handles to the
 * JS `JSON` object and its `parse` / `stringify` functions.
 *
 * @param ctx       Context to initialize (already constructed).
 * @param funs      Not used by this function.
 * @param viewType  Stored in ctx->viewType.
 *
 * @throws std::bad_alloc if the isolate data cannot be allocated.
 */
void doInitContext(map_reduce_ctx_t *ctx, const function_sources_list_t &funs,
                   const view_index_type_t viewType)
{
    ctx->viewType = viewType;
    ctx->bufAllocator = new ArrayBufferAllocator();
    // Each isolate starts from the snapshot built in initV8().
    Isolate::CreateParams createParams;
    createParams.snapshot_blob = &startupData;
    createParams.array_buffer_allocator = ctx->bufAllocator;
    ctx->isolate = Isolate::New(createParams);
    // The log list is created lazily by log().
    ctx->logResults = NULL;
    Locker locker(ctx->isolate);
    Isolate::Scope isolate_scope(ctx->isolate);
    HandleScope handle_scope(ctx->isolate);

    ctx->jsContext.Reset(ctx->isolate, createJsContext(ctx));
    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
    Context::Scope context_scope(context);

    // Look up JSON, JSON.parse and JSON.stringify on the global object.
    Local<String> jsonString = createUtf8String(ctx->isolate, "JSON");
    Handle<Object> jsonObject =
        Local<Object>::Cast(context->Global()->Get(jsonString));

    Local<String> parseString = createUtf8String(ctx->isolate, "parse");
    Handle<Function> parseFun =
        Local<Function>::Cast(jsonObject->Get(parseString));
    Local<String> stringifyString = createUtf8String(ctx->isolate, "stringify");
    Handle<Function> stringifyFun =
        Local<Function>::Cast(jsonObject->Get(stringifyString));

    // The isolate data lives in enif_alloc'ed memory and is constructed in
    // place; destroyContext() destroys and frees it.
    isolate_data_t *isoData =
        (isolate_data_t *) enif_alloc(sizeof(isolate_data_t));
    if (isoData == NULL) {
        throw std::bad_alloc();
    }

    isoData = new (isoData) isolate_data_t();
    isoData->jsonObject.Reset(ctx->isolate, jsonObject);
    isoData->jsonParseFun.Reset(ctx->isolate, parseFun);
    isoData->stringifyFun.Reset(ctx->isolate, stringifyFun);

    isoData->ctx = ctx;

    // Slot 0 is what getIsolateData() reads.
    ctx->isolate->SetData(0, (void *)isoData);
    ctx->taskStartTime = 0;
}
176
177
178map_results_list_t mapDoc(map_reduce_ctx_t *ctx,
179                          const ErlNifBinary &doc,
180                          const ErlNifBinary &meta)
181{
182    Locker locker(ctx->isolate);
183    Isolate::Scope isolate_scope(ctx->isolate);
184    HandleScope handle_scope(ctx->isolate);
185    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
186    Context::Scope context_scope(context);
187    Handle<Value> docObject = jsonParse(doc);
188    Handle<Value> metaObject = jsonParse(meta);
189
190    if (!metaObject->IsObject()) {
191        throw MapReduceError("metadata is not a JSON object");
192    }
193
194    map_results_list_t results;
195    Handle<Value> funArgs[] = { docObject, metaObject };
196
197    taskStarted(ctx);
198
199    for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
200        map_result_t mapResult;
201        Local<Function> fun =
202            Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
203        TryCatch try_catch(ctx->isolate);
204
205        mapResult.type = MAP_KVS;
206        mapResult.result.kvs =
207            (kv_pair_list_t *) enif_alloc(sizeof(kv_pair_list_t));
208
209        if (mapResult.result.kvs == NULL) {
210            freeMapResultList(results);
211            throw std::bad_alloc();
212        }
213
214        mapResult.result.kvs = new (mapResult.result.kvs) kv_pair_list_t();
215        ctx->kvs = mapResult.result.kvs;
216        ctx->emitKvSize = 0;
217        Handle<Value> result = fun->Call(context->Global(), 2, funArgs);
218
219        if (result.IsEmpty()) {
220            freeMapResult(mapResult);
221
222            if (!try_catch.CanContinue()) {
223                freeMapResultList(results);
224                throw MapReduceError("timeout");
225            }
226
227            mapResult.type = MAP_ERROR;
228            std::string exceptString = exceptionString(try_catch);
229            size_t len = exceptString.length();
230
231            mapResult.result.error =
232                (ErlNifBinary *) enif_alloc(sizeof(ErlNifBinary));
233            if (mapResult.result.error == NULL) {
234                freeMapResultList(results);
235                throw std::bad_alloc();
236            }
237            if (!enif_alloc_binary_compat(ctx->env, len,
238                    mapResult.result.error)) {
239                freeMapResultList(results);
240                throw std::bad_alloc();
241            }
242            // Caller responsible for invoking enif_make_binary()
243            // or enif_release_binary()
244            memcpy(mapResult.result.error->data, exceptString.data(), len);
245        }
246
247        results.push_back(mapResult);
248    }
249
250    taskFinished(ctx);
251
252    return results;
253}
254
255
256json_results_list_t runReduce(map_reduce_ctx_t *ctx,
257                              const json_results_list_t &keys,
258                              const json_results_list_t &values)
259{
260    Locker locker(ctx->isolate);
261    Isolate::Scope isolate_scope(ctx->isolate);
262    HandleScope handle_scope(ctx->isolate);
263    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
264    Context::Scope context_scope(context);
265    Handle<Array> keysArray = jsonListToJsArray(keys);
266    Handle<Array> valuesArray = jsonListToJsArray(values);
267    json_results_list_t results;
268
269    Handle<Value> args[] = { keysArray, valuesArray,
270                             Boolean::New(ctx->isolate, false) };
271
272    taskStarted(ctx);
273
274    for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
275        Local<Function> fun =
276            Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
277        TryCatch try_catch(ctx->isolate);
278        Handle<Value> result = fun->Call(context->Global(), 3, args);
279
280        if (result.IsEmpty()) {
281            freeJsonData(results);
282
283            if (!try_catch.CanContinue()) {
284                throw MapReduceError("timeout");
285            }
286
287            throw MapReduceError(exceptionString(try_catch));
288        }
289
290        try {
291            ErlNifBinary jsonResult = jsonStringify(result);
292            results.push_back(jsonResult);
293        } catch(...) {
294            freeJsonData(results);
295            throw;
296        }
297    }
298
299    taskFinished(ctx);
300    freeLogResults(ctx);
301
302    return results;
303}
304
305
306ErlNifBinary runReduce(map_reduce_ctx_t *ctx,
307                       int reduceFunNum,
308                       const json_results_list_t &keys,
309                       const json_results_list_t &values)
310{
311    Locker locker(ctx->isolate);
312    Isolate::Scope isolate_scope(ctx->isolate);
313    HandleScope handle_scope(ctx->isolate);
314    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
315    Context::Scope context_scope(context);
316
317    reduceFunNum -= 1;
318    if (reduceFunNum < 0 ||
319        static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
320        throw MapReduceError("invalid reduce function number");
321    }
322
323    Local<Function> fun =
324        Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
325    Handle<Array> keysArray = jsonListToJsArray(keys);
326    Handle<Array> valuesArray = jsonListToJsArray(values);
327    Handle<Value> args[] = { keysArray, valuesArray,
328                             Boolean::New(ctx->isolate, false) };
329
330    taskStarted(ctx);
331
332    TryCatch try_catch(ctx->isolate);
333    Handle<Value> result = fun->Call(context->Global(), 3, args);
334
335    taskFinished(ctx);
336    freeLogResults(ctx);
337
338    if (result.IsEmpty()) {
339        if (!try_catch.CanContinue()) {
340            throw MapReduceError("timeout");
341        }
342
343        throw MapReduceError(exceptionString(try_catch));
344    }
345
346    return jsonStringify(result);
347}
348
349
350ErlNifBinary runRereduce(map_reduce_ctx_t *ctx,
351                       int reduceFunNum,
352                       const json_results_list_t &reductions)
353{
354    Locker locker(ctx->isolate);
355    Isolate::Scope isolate_scope(ctx->isolate);
356    HandleScope handle_scope(ctx->isolate);
357    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
358    Context::Scope context_scope(context);
359
360    reduceFunNum -= 1;
361    if (reduceFunNum < 0 ||
362        static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
363        throw MapReduceError("invalid reduce function number");
364    }
365
366    Local<Function> fun =
367        Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
368    Handle<Array> valuesArray = jsonListToJsArray(reductions);
369    Handle<Value> args[] =
370        { Null(ctx->isolate), valuesArray, Boolean::New(ctx->isolate, true) };
371
372    taskStarted(ctx);
373
374    TryCatch try_catch(ctx->isolate);
375    Handle<Value> result = fun->Call(context->Global(), 3, args);
376
377    taskFinished(ctx);
378    freeLogResults(ctx);
379
380    if (result.IsEmpty()) {
381        if (!try_catch.CanContinue()) {
382            throw MapReduceError("timeout");
383        }
384
385        throw MapReduceError(exceptionString(try_catch));
386    }
387
388    return jsonStringify(result);
389}
390
391
392void destroyContext(map_reduce_ctx_t *ctx)
393{
394    {
395        Locker locker(ctx->isolate);
396        Isolate::Scope isolate_scope(ctx->isolate);
397        HandleScope handle_scope(ctx->isolate);
398        Local<Context> context =
399            Local<Context>::New(ctx->isolate, ctx->jsContext);
400        Context::Scope context_scope(context);
401
402        for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
403            (*ctx->functions)[i]->Reset();
404            (*ctx->functions)[i]->~Persistent<v8::Function>();
405            enif_free((*ctx->functions)[i]);
406        }
407        ctx->functions->~function_vector_t();
408        enif_free(ctx->functions);
409
410        isolate_data_t *isoData = getIsolateData();
411        isoData->jsonObject.Reset();
412        isoData->jsonParseFun.Reset();
413        isoData->stringifyFun.Reset();
414        isoData->~isolate_data_t();
415        enif_free(isoData);
416
417        ctx->jsContext.Reset();
418    }
419
420    ctx->isolate->Dispose();
421    delete ctx->bufAllocator;
422    ctx->~map_reduce_ctx_t();
423}
424
425
426static Local<Context> createJsContext(map_reduce_ctx_t *ctx)
427{
428    EscapableHandleScope handle_scope(ctx->isolate);
429    Handle<ObjectTemplate> global = ObjectTemplate::New();
430
431    global->Set(createUtf8String(ctx->isolate, "emit"),
432            FunctionTemplate::New(ctx->isolate, emit));
433
434    global->Set(createUtf8String(ctx->isolate, "log"),
435            FunctionTemplate::New(ctx->isolate, log));
436
437    Handle<Context> context = Context::New(ctx->isolate, NULL, global);
438    Context::Scope context_scope(context);
439
440    // Use EscapableHandleScope and return using .Escape
441    // This will ensure that return values are not garbage collected
442    // as soon as the function returns.
443    return handle_scope.Escape(context);
444}
445
446#define TRUNCATE_STR "Truncated: "
447
448static void log(const v8::FunctionCallbackInfo<Value>& args)
449{
450    isolate_data_t *isoData = getIsolateData();
451    map_reduce_ctx_t *ctx = isoData->ctx;
452    try {
453        /* Initialize only if log function is used */
454        if (ctx->logResults == NULL) {
455            ctx->logResults = (log_results_list_t *) enif_alloc(sizeof
456                    (log_results_list_t));
457            if (ctx->logResults == NULL) {
458                throw std::bad_alloc();
459            }
460            ctx->logResults = new (ctx->logResults) log_results_list_t();
461        }
462        /* use only first argument */
463        Handle<Value> logMsg = args[0];
464        Handle<String> str;
465        unsigned int len = 0;
466        if (logMsg->IsString()) {
467            str = Handle<String>::Cast(logMsg);
468            len = str->Length();
469            if (len > MAX_LOG_STRING_SIZE) {
470                str = Handle<String>(String::Concat(
471                          createUtf8String(ctx->isolate, TRUNCATE_STR), str)),
472                len = MAX_LOG_STRING_SIZE + sizeof(TRUNCATE_STR) - 1;
473            }
474        } else {
475            str = Handle<String>(createUtf8String(ctx->isolate,
476                        "Error while logging:Log value is not a string"));
477            len = str->Length();
478        }
479        ErlNifBinary resultBin;
480        if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
481            throw std::bad_alloc();
482        }
483        str->WriteUtf8(reinterpret_cast<char *>(resultBin.data),
484                len, NULL, String::NO_NULL_TERMINATION);
485        ctx->logResults->push_back(resultBin);
486    } catch(Handle<String> &ex) {
487        ctx->isolate->ThrowException(ex);
488    }
489}
490
491
492static void emit(const v8::FunctionCallbackInfo<Value>& args)
493{
494    isolate_data_t *isoData = getIsolateData();
495
496    if (isoData->ctx->kvs == NULL) {
497        return;
498    }
499
500    ErlNifBinary keyJson = jsonStringify(args[0]);
501
502    // Spatial views may emit a geometry that is bigger, when serialized
503    // to JSON, than the allowed size of a key. In later steps it will then
504    // be reduced to a bouning box. Hence don't check the string size of the
505    // key of spatial views here.
506    if (isoData->ctx->viewType != VIEW_INDEX_TYPE_SPATIAL &&
507            keyJson.size >= MAX_EMIT_KEY_SIZE) {
508        std::stringstream msg;
509        msg << "too long key emitted: " << keyJson.size << " bytes";
510
511        isoData->ctx->isolate->ThrowException(
512                createUtf8String(isoData->ctx->isolate, msg.str().c_str())
513                                             );
514	return;
515    }
516
517    try {
518        ErlNifBinary valueJson = jsonStringify(args[1]);
519
520        kv_pair_t result = kv_pair_t(keyJson, valueJson);
521        isoData->ctx->kvs->push_back(result);
522        isoData->ctx->emitKvSize += keyJson.size;
523        isoData->ctx->emitKvSize += valueJson.size;
524
525    } catch(Handle<String> &ex) {
526        isoData->ctx->isolate->ThrowException(ex);
527    }
528
529    if ((isoData->ctx->maxEmitKvSize > 0) &&
530        (isoData->ctx->emitKvSize > isoData->ctx->maxEmitKvSize)) {
531        std::stringstream msg;
532        msg << "too much data emitted: " << isoData->ctx->emitKvSize << " bytes";
533
534        isoData->ctx->isolate->ThrowException(
535                createUtf8String(isoData->ctx->isolate, msg.str().c_str()));
536    }
537}
538
539
540ErlNifBinary jsonStringify(const Handle<Value> &obj)
541{
542    isolate_data_t *isoData = getIsolateData();
543    Handle<Value> args[] = { obj };
544    TryCatch try_catch(isoData->ctx->isolate);
545    Local<Function> stringifyFun =
546        Local<Function>::New(isoData->ctx->isolate, isoData->stringifyFun);
547    Local<Object> jsonObject =
548        Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
549    Handle<Value> result = stringifyFun->Call(jsonObject, 1, args);
550
551    if (result.IsEmpty()) {
552        if (try_catch.HasCaught()) {
553            Local<Message> m = try_catch.Message();
554            if (!m.IsEmpty()) {
555                throw Local<String>(m->Get());
556            }
557        }
558        throw Handle<Value>(createUtf8String(isoData->ctx->isolate,
559                    "JSON.stringify() error"));
560    }
561
562    unsigned len;
563    ErlNifBinary resultBin;
564
565    if (result->IsUndefined()) {
566        len = static_cast<unsigned>(sizeof("null") - 1);
567        if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
568            throw std::bad_alloc();
569        }
570        memcpy(resultBin.data, "null", len);
571    } else {
572        Handle<String> str = Handle<String>::Cast(result);
573        len = str->Utf8Length();
574        if (!enif_alloc_binary_compat(isoData->ctx->env, len, &resultBin)) {
575            throw std::bad_alloc();
576        }
577        str->WriteUtf8(reinterpret_cast<char *>(resultBin.data),
578                       len, NULL, String::NO_NULL_TERMINATION);
579    }
580
581    // Caller responsible for invoking enif_make_binary()
582    // or enif_release_binary()
583    return resultBin;
584}
585
586
587Handle<Value> jsonParse(const ErlNifBinary &thing)
588{
589    isolate_data_t *isoData = getIsolateData();
590    Handle<Value> args[] = { createUtf8String(isoData->ctx->isolate,
591                             reinterpret_cast<char *>(thing.data),
592                             (size_t)thing.size) };
593    TryCatch try_catch(isoData->ctx->isolate);
594    Local<Function> jsonParseFun =
595        Local<Function>::New(isoData->ctx->isolate, isoData->jsonParseFun);
596    Local<Object> jsonObject =
597        Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
598    Handle<Value> result = jsonParseFun->Call(jsonObject, 1, args);
599
600    if (result.IsEmpty()) {
601        throw MapReduceError(exceptionString(try_catch));
602    }
603
604    return result;
605}
606
607
608void loadFunctions(map_reduce_ctx_t *ctx,
609                   const function_sources_list_t &funStrings)
610{
611    HandleScope handle_scope(ctx->isolate);
612    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
613    Context::Scope context_scope(context);
614
615    bool isDocUsed;
616    if(optimize_doc_load) {
617        // If esprima compilation fails restore back to pulling in documents.
618        try {
619            compileFunction("is_doc_unused");
620            isDocUsed = false;
621        } catch(...) {
622            isDocUsed = true;
623        }
624    }
625    else {
626        isDocUsed = true;
627    }
628
629    ctx->functions = (function_vector_t *) enif_alloc(sizeof(function_vector_t));
630    if (ctx->functions == NULL) {
631        throw std::bad_alloc();
632    }
633
634    ctx->functions = new (ctx->functions) function_vector_t();
635    function_sources_list_t::const_iterator it = funStrings.begin();
636
637    for ( ; it != funStrings.end(); ++it) {
638        Handle<Function> fun = compileFunction(*it);
639        if(optimize_doc_load) {
640            Handle<Value> val = context->Global()->Get(
641                createUtf8String(ctx->isolate, "is_doc_unused"));
642            Handle<Function> unusedFun = Handle<Function>::Cast(val);
643            Handle<Value> arg = createUtf8String(ctx->isolate, it->data());
644            Handle<Value> js_result = unusedFun->Call(context->Global(), 1, &arg);
645            bool isDocUnused = js_result->BooleanValue();
646            if (isDocUnused == false) {
647                isDocUsed = true;
648            }
649        }
650        Persistent<Function> *perFn =
651            (Persistent<Function> *) enif_alloc(sizeof(Persistent<Function>));
652        if (perFn == NULL) {
653            throw std::bad_alloc();
654        }
655        perFn = new (perFn) Persistent<Function>();
656        perFn->Reset(ctx->isolate, fun);
657        ctx->functions->push_back(perFn);
658    }
659    ctx->isDocUsed = isDocUsed;
660}
661
662
663Handle<Function> compileFunction(const function_source_t &funSource)
664{
665    Isolate *isolate = Isolate::GetCurrent();
666    Local<Context> context(isolate->GetCurrentContext());
667    EscapableHandleScope handle_scope(isolate);
668    TryCatch try_catch(isolate);
669    Handle<String> source =
670        createUtf8String(isolate, funSource.data(), funSource.length());
671    Local<Script> script;
672    if (!Script::Compile(context, source).ToLocal(&script)) {
673        throw MapReduceError(exceptionString(try_catch));
674    }
675
676    if (script.IsEmpty()) {
677        throw MapReduceError(exceptionString(try_catch));
678    }
679
680    Handle<Value> result = script->Run();
681
682    if (result.IsEmpty()) {
683        throw MapReduceError(exceptionString(try_catch));
684    }
685
686    if (!result->IsFunction()) {
687        throw MapReduceError(std::string("Invalid function: ") +
688                funSource.c_str());
689    }
690
691    return handle_scope.Escape(Handle<Function>::Cast(result));
692}
693
694
695Handle<Array> jsonListToJsArray(const json_results_list_t &list)
696{
697    Isolate *isolate = Isolate::GetCurrent();
698    Handle<Array> array = Array::New(isolate, list.size());
699    json_results_list_t::const_iterator it = list.begin();
700    int i = 0;
701
702    for ( ; it != list.end(); ++it, ++i) {
703        Handle<Value> v = jsonParse(*it);
704        array->Set(Number::New(isolate, i), v);
705    }
706
707    return array;
708}
709
710
711isolate_data_t *getIsolateData()
712{
713    Isolate *isolate = Isolate::GetCurrent();
714    return reinterpret_cast<isolate_data_t*>(isolate->GetData(0));
715}
716
717
/**
 * Marks the start of a JS task: records the current gethrtime() value in
 * ctx->taskStartTime and clears ctx->kvs, so that emit() is a no-op until
 * a caller installs a key/value list.
 */
void taskStarted(map_reduce_ctx_t *ctx)
{
    ctx->taskStartTime = gethrtime();
    ctx->kvs = NULL;
}
723
724
/**
 * Marks the end of a JS task by resetting ctx->taskStartTime to 0.
 */
void taskFinished(map_reduce_ctx_t *ctx)
{
    ctx->taskStartTime = 0;
}
729
730
/**
 * Asks V8 to terminate JavaScript execution in the context's isolate and
 * marks the task as finished. The entry points in this file report a
 * terminated function as a "timeout" error.
 */
void terminateTask(map_reduce_ctx_t *ctx)
{
    V8::TerminateExecution(ctx->isolate);
    taskFinished(ctx);
}
736
737
738std::string exceptionString(const TryCatch &tryCatch)
739{
740    HandleScope handle_scope(Isolate::GetCurrent());
741    String::Utf8Value exception(tryCatch.Exception());
742    const char *exceptionString = (*exception);
743
744    if (exceptionString) {
745        Handle<Message> message = tryCatch.Message();
746        return std::string(exceptionString) + " (line " +
747                std::to_string(message->GetLineNumber()) + ":" +
748                std::to_string(message->GetStartColumn()) + ")";
749    }
750
751    return std::string("runtime error");
752}
753
754
755void freeMapResultList(const map_results_list_t &results)
756{
757    map_results_list_t::const_iterator it = results.begin();
758
759    for ( ; it != results.end(); ++it) {
760        freeMapResult(*it);
761    }
762}
763
764
765void freeMapResult(const map_result_t &mapResult)
766{
767    switch (mapResult.type) {
768    case MAP_KVS:
769        {
770            kv_pair_list_t::const_iterator it = mapResult.result.kvs->begin();
771            for ( ; it != mapResult.result.kvs->end(); ++it) {
772                ErlNifBinary key = it->first;
773                ErlNifBinary value = it->second;
774                enif_release_binary(&key);
775                enif_release_binary(&value);
776            }
777            mapResult.result.kvs->~kv_pair_list_t();
778            enif_free(mapResult.result.kvs);
779        }
780        break;
781    case MAP_ERROR:
782        enif_release_binary(mapResult.result.error);
783        enif_free(mapResult.result.error);
784        break;
785    }
786}
787
788
789void freeJsonData(const json_results_list_t &data)
790{
791    json_results_list_t::const_iterator it = data.begin();
792    for ( ; it != data.end(); ++it) {
793        ErlNifBinary bin = *it;
794        enif_release_binary(&bin);
795    }
796}
797
798// Free the logresults with data. For avoiding one extra allocation
799// of logResults, allocation is done in log function, so which can cause
800// memory leak if it is called from other (reduce, rereduce) context,
801// so free that memory here.
802void freeLogResults(map_reduce_ctx_t *ctx)
803{
804    if (ctx->logResults) {
805        log_results_list_t::reverse_iterator k = ctx->logResults->rbegin();
806        for ( ; k != ctx->logResults->rend(); ++k) {
807            enif_release_binary_compat(ctx->env, &(*k));
808        }
809        ctx->logResults->~log_results_list_t();
810        enif_free(ctx->logResults);
811        ctx->logResults = NULL;
812    }
813}
814