1/**
2 * @copyright 2013 Couchbase, Inc.
3 *
4 * @author Filipe Manana  <filipe@couchbase.com>
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7 * use this file except in compliance with the License. You may obtain a copy of
8 * the License at
9 *
10 *  http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 * License for the specific language governing permissions and limitations under
16 * the License.
17 **/
18
19#include "mapreduce.h"
20#include "mapreduce_internal.h"
21#include <iostream>
22#include <cstring>
23#include <platform/cb_malloc.h>
24#include <stdlib.h>
25// This is libv8_libplatform library which handles garbage collection for v8
26#include <libplatform/libplatform.h>
27
28using namespace v8;
29
30typedef struct {
31    Persistent<Object>    jsonObject;
32    Persistent<Function>  jsonParseFun;
33    Persistent<Function>  stringifyFun;
34    mapreduce_ctx_t       *ctx;
35} isolate_data_t;
36
37
// JavaScript source of the builtin installed as the global `sum` by
// createJsContext(): adds up the elements of the array it is given.
static const char SUM_FUNCTION_STRING[] =
    "(function(values) {"
    "    var sum = 0;"
    "    for (var i = 0; i < values.length; ++i) {"
    "        sum += values[i];"
    "    }"
    "    return sum;"
    "})";
46
// JavaScript source of the builtin installed as the global `dateToArray` by
// createJsContext(). It accepts a Date (or anything `new Date()` accepts) and
// returns [year, month (1-based), day, hours, minutes, seconds] in UTC, or
// null when the date is not finite.
//
// Ideally this would live on Date.prototype (as `toArray`), but extending
// the prototype should happen at process launch and would need larger C
// changes. The free-function form used here may be faster, but it is less
// JavaScripty.
static const char DATE_FUNCTION_STRING[] =
    "(function(date) {"
    "    date = date.getUTCDate ? date : new Date(date);"
    "    return isFinite(date.valueOf()) ?"
    "      [date.getUTCFullYear(),"
    "      (date.getUTCMonth() + 1),"
    "       date.getUTCDate(),"
    "       date.getUTCHours(),"
    "       date.getUTCMinutes(),"
    "       date.getUTCSeconds()] : null;"
    "})";
63
// JavaScript source of the builtin installed as the global `decodeBase64` by
// createJsContext(). It decodes a base64 string into an array of byte values.
// The script throws when the argument is not a string or its length is not a
// multiple of four; `scratch` ends up holding the number of characters from
// the first '=' to the end of the input (the padding length).
static const char BASE64_FUNCTION_STRING[] =
    "(function(b64) {"
    "    var i, j, l, tmp, scratch, arr = [];"
    "    var lookup = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';"
    "    if (typeof b64 !== 'string') {"
    "        throw 'Input is not a string';"
    "    }"
    "    if (b64.length % 4 > 0) {"
    "        throw 'Invalid base64 source.';"
    "    }"
    "    scratch = b64.indexOf('=');"
    "    scratch = scratch > 0 ? b64.length - scratch : 0;"
    "    l = scratch > 0 ? b64.length - 4 : b64.length;"
    "    for (i = 0, j = 0; i < l; i += 4, j += 3) {"
    "        tmp = (lookup.indexOf(b64[i]) << 18) | (lookup.indexOf(b64[i + 1]) << 12);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) << 6) | lookup.indexOf(b64[i + 3]);"
    "        arr.push((tmp & 0xFF0000) >> 16);"
    "        arr.push((tmp & 0xFF00) >> 8);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    if (scratch === 2) {"
    "        tmp = (lookup.indexOf(b64[i]) << 2) | (lookup.indexOf(b64[i + 1]) >> 4);"
    "        arr.push(tmp & 0xFF);"
    "    } else if (scratch === 1) {"
    "        tmp = (lookup.indexOf(b64[i]) << 10) | (lookup.indexOf(b64[i + 1]) << 4);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) >> 2);"
    "        arr.push((tmp >> 8) & 0xFF);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    return arr;"
    "})";
95
96
97
98static Local<Context> createJsContext();
99static void emit(const FunctionCallbackInfo<Value> &args);
100
101static void doInitContext(mapreduce_ctx_t *ctx);
102static Handle<Function> compileFunction(const std::string &function);
103static std::string exceptionString(const TryCatch &tryCatch);
104static void loadFunctions(mapreduce_ctx_t *ctx,
105                          const std::list<std::string> &function_sources);
106static inline isolate_data_t *getIsolateData();
107static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj);
108static inline Handle<Value> jsonParse(const mapreduce_json_t &thing);
109static inline void taskStarted(mapreduce_ctx_t *ctx);
110static inline void taskFinished(mapreduce_ctx_t *ctx);
111static void freeKvListEntries(kv_list_int_t &kvs);
112static void freeJsonListEntries(json_results_list_t &list);
113static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list);
114
115static Platform *v8platform;
116void initV8()
117{
118    V8::InitializeICU();
119    v8platform = platform::CreateDefaultPlatform();
120    V8::InitializePlatform(v8platform);
121    V8::Initialize();
122}
123
124void deinitV8()
125{
126    V8::Dispose();
127    V8::ShutdownPlatform();
128    delete v8platform;
129}
130
131void initContext(mapreduce_ctx_t *ctx,
132                 const std::list<std::string> &function_sources)
133{
134    doInitContext(ctx);
135
136    try {
137        Locker locker(ctx->isolate);
138        Isolate::Scope isolate_scope(ctx->isolate);
139        HandleScope handle_scope(ctx->isolate);
140        Local<Context> context =
141            Local<Context>::New(ctx->isolate, ctx->jsContext);
142        Context::Scope context_scope(context);
143
144        loadFunctions(ctx, function_sources);
145    } catch (...) {
146        destroyContext(ctx);
147        throw;
148    }
149}
150
151
152void destroyContext(mapreduce_ctx_t *ctx)
153{
154    {
155        Locker locker(ctx->isolate);
156        Isolate::Scope isolate_scope(ctx->isolate);
157        HandleScope handle_scope(ctx->isolate);
158        Local<Context> context =
159            Local<Context>::New(ctx->isolate, ctx->jsContext);
160        Context::Scope context_scope(context);
161
162        for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
163            (*ctx->functions)[i]->Reset();
164            delete (*ctx->functions)[i];
165
166        }
167        delete ctx->functions;
168
169        isolate_data_t *isoData = getIsolateData();
170        isoData->jsonObject.Reset();
171        isoData->jsonParseFun.Reset();
172        isoData->stringifyFun.Reset();
173        delete isoData;
174
175        ctx->jsContext.Reset();
176    }
177
178    ctx->isolate->Dispose();
179
180}
181
182static Local<String> createUtf8String(Isolate *isolate, const char *str)
183{
184    return String::NewFromUtf8(isolate, str,
185        NewStringType::kNormal).ToLocalChecked();
186}
187
188static Local<String> createUtf8String(Isolate *isolate, const char *str,
189                                      size_t len)
190{
191    return String::NewFromUtf8(isolate, str,
192        NewStringType::kNormal, len).ToLocalChecked();
193}
194
195static void doInitContext(mapreduce_ctx_t *ctx)
196{
197    Isolate::CreateParams createParams;
198    createParams.array_buffer_allocator =
199      ArrayBuffer::Allocator::NewDefaultAllocator();
200    ctx->isolate = Isolate::New(createParams);
201    Locker locker(ctx->isolate);
202    Isolate::Scope isolate_scope(ctx->isolate);
203
204    HandleScope handle_scope(ctx->isolate);
205    ctx->jsContext.Reset(ctx->isolate, createJsContext());
206    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
207    Context::Scope context_scope(context);
208    Local<String> jsonString = createUtf8String(ctx->isolate, "JSON");
209    Handle<Object> jsonObject =
210        Local<Object>::Cast(context->Global()->Get(jsonString));
211
212    Local<String> parseString = createUtf8String(ctx->isolate, "parse");
213    Handle<Function> parseFun =
214        Local<Function>::Cast(jsonObject->Get(parseString));
215    Local<String> stringifyString = createUtf8String(ctx->isolate, "stringify");
216    Handle<Function> stringifyFun =
217        Local<Function>::Cast(jsonObject->Get(stringifyString));
218
219    isolate_data_t *isoData = new isolate_data_t();
220    isoData->jsonObject.Reset(ctx->isolate, jsonObject);
221    isoData->jsonParseFun.Reset(ctx->isolate, parseFun);
222    isoData->stringifyFun.Reset(ctx->isolate, stringifyFun);
223    isoData->ctx = ctx;
224
225    ctx->isolate->SetData(0, (void *)isoData);
226    ctx->taskStartTime = -1;
227}
228
229
230static Local<Context> createJsContext()
231{
232    Isolate *isolate = Isolate::GetCurrent();
233    EscapableHandleScope handle_scope(isolate);
234
235    Handle<ObjectTemplate> global = ObjectTemplate::New();
236    global->Set(createUtf8String(isolate, "emit"),
237            FunctionTemplate::New(isolate, emit));
238
239    Handle<Context> context = Context::New(isolate, NULL, global);
240    Context::Scope context_scope(context);
241
242    Handle<Function> sumFun = compileFunction(SUM_FUNCTION_STRING);
243    context->Global()->Set(createUtf8String(isolate, "sum"), sumFun);
244
245    Handle<Function> decodeBase64Fun =
246        compileFunction(BASE64_FUNCTION_STRING);
247    context->Global()->Set(createUtf8String(isolate, "decodeBase64"),
248        decodeBase64Fun);
249
250    Handle<Function> dateToArrayFun =
251        compileFunction(DATE_FUNCTION_STRING);
252    context->Global()->Set(createUtf8String(isolate, "dateToArray"),
253                           dateToArrayFun);
254
255    // Use EscapableHandleScope and return using .Escape
256    // This will ensure that return values are not garbage collected
257    // as soon as the function returns.
258    return handle_scope.Escape(context);
259}
260
261
262void mapDoc(mapreduce_ctx_t *ctx,
263            const mapreduce_json_t &doc,
264            const mapreduce_json_t &meta,
265            mapreduce_map_result_list_t *results)
266{
267    Locker locker(ctx->isolate);
268    Isolate::Scope isolate_scope(ctx->isolate);
269    HandleScope handle_scope(ctx->isolate);
270    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
271    Context::Scope context_scope(context);
272    Handle<Value> docObject = jsonParse(doc);
273    Handle<Value> metaObject = jsonParse(meta);
274
275
276    if (!metaObject->IsObject()) {
277        throw MapReduceError(MAPREDUCE_INVALID_ARG,
278                "metadata is not a JSON object");
279    }
280
281    Handle<Value> funArgs[] = { docObject, metaObject };
282
283    taskStarted(ctx);
284    kv_list_int_t kvs;
285    ctx->kvs = &kvs;
286
287    for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
288        mapreduce_map_result_t mapResult;
289        Local<Function> fun =
290            Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
291        TryCatch try_catch(ctx->isolate);
292        Handle<Value> result = fun->Call(context->Global(), 2, funArgs);
293
294        if (!result.IsEmpty()) {
295            mapResult.error = MAPREDUCE_SUCCESS;
296            mapResult.result.kvs.length = kvs.size();
297            size_t sz = sizeof(mapreduce_kv_t) * mapResult.result.kvs.length;
298            mapResult.result.kvs.kvs = (mapreduce_kv_t *) cb_malloc(sz);
299            if (mapResult.result.kvs.kvs == NULL) {
300                freeKvListEntries(kvs);
301                throw std::bad_alloc();
302            }
303            kv_list_int_t::iterator it = kvs.begin();
304            for (int j = 0; it != kvs.end(); ++it, ++j) {
305                mapResult.result.kvs.kvs[j] = *it;
306            }
307        } else {
308            freeKvListEntries(kvs);
309
310            if (!try_catch.CanContinue()) {
311                throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
312            }
313
314            mapResult.error = MAPREDUCE_RUNTIME_ERROR;
315            std::string exceptString = exceptionString(try_catch);
316            size_t len = exceptString.length();
317
318            mapResult.result.error_msg = (char *) cb_malloc(len + 1);
319            if (mapResult.result.error_msg == NULL) {
320                throw std::bad_alloc();
321            }
322            memcpy(mapResult.result.error_msg, exceptString.data(), len);
323            mapResult.result.error_msg[len] = '\0';
324        }
325
326        results->list[i] = mapResult;
327        results->length += 1;
328        kvs.clear();
329    }
330
331    taskFinished(ctx);
332}
333
334
335json_results_list_t runReduce(mapreduce_ctx_t *ctx,
336                              const mapreduce_json_list_t &keys,
337                              const mapreduce_json_list_t &values)
338{
339    Locker locker(ctx->isolate);
340    Isolate::Scope isolateScope(ctx->isolate);
341    HandleScope handle_scope(ctx->isolate);
342    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
343    Context::Scope context_scope(context);
344    Handle<Array> keysArray = jsonListToJsArray(keys);
345    Handle<Array> valuesArray = jsonListToJsArray(values);
346    json_results_list_t results;
347
348    Handle<Value> args[] =
349        { keysArray, valuesArray, Boolean::New(ctx->isolate, false) };
350
351    taskStarted(ctx);
352
353    for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
354        Local<Function> fun =
355            Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
356        TryCatch try_catch(ctx->isolate);
357        Handle<Value> result = fun->Call(context->Global(), 3, args);
358
359        if (result.IsEmpty()) {
360            freeJsonListEntries(results);
361
362            if (!try_catch.CanContinue()) {
363                throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
364            }
365
366            throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
367                    exceptionString(try_catch));
368        }
369
370        try {
371            mapreduce_json_t jsonResult = jsonStringify(result);
372            results.push_back(jsonResult);
373        } catch(...) {
374            freeJsonListEntries(results);
375            throw;
376        }
377    }
378
379    taskFinished(ctx);
380
381    return results;
382}
383
384
385mapreduce_json_t runReduce(mapreduce_ctx_t *ctx,
386                           int reduceFunNum,
387                           const mapreduce_json_list_t &keys,
388                           const mapreduce_json_list_t &values)
389{
390    Locker locker(ctx->isolate);
391    Isolate::Scope isolateScope(ctx->isolate);
392    HandleScope handle_scope(ctx->isolate);
393    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
394    Context::Scope context_scope(context);
395
396    reduceFunNum -= 1;
397    if (reduceFunNum < 0 ||
398        static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
399        throw MapReduceError(MAPREDUCE_INVALID_ARG,
400                "invalid reduce function number");
401    }
402
403    Local<Function> fun =
404        Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
405    Handle<Array> keysArray = jsonListToJsArray(keys);
406    Handle<Array> valuesArray = jsonListToJsArray(values);
407    Handle<Value> args[] =
408        { keysArray, valuesArray, Boolean::New(ctx->isolate, false) };
409
410    taskStarted(ctx);
411
412    TryCatch try_catch(ctx->isolate);
413    Handle<Value> result = fun->Call(context->Global(), 3, args);
414
415    taskFinished(ctx);
416
417    if (result.IsEmpty()) {
418        if (!try_catch.CanContinue()) {
419            throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
420        }
421
422        throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
423                exceptionString(try_catch));
424    }
425
426    return jsonStringify(result);
427}
428
429
430mapreduce_json_t runRereduce(mapreduce_ctx_t *ctx,
431                             int reduceFunNum,
432                             const mapreduce_json_list_t &reductions)
433{
434    Locker locker(ctx->isolate);
435    Isolate::Scope isolateScope(ctx->isolate);
436    HandleScope handle_scope(ctx->isolate);
437    Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
438    Context::Scope context_scope(context);
439
440    reduceFunNum -= 1;
441    if (reduceFunNum < 0 ||
442        static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
443        throw MapReduceError(MAPREDUCE_INVALID_ARG,
444                "invalid reduce function number");
445    }
446
447    Local<Function> fun =
448        Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
449    Handle<Array> valuesArray = jsonListToJsArray(reductions);
450    Handle<Value> args[] =
451        { Null(ctx->isolate), valuesArray, Boolean::New(ctx->isolate, true) };
452
453    taskStarted(ctx);
454
455    TryCatch try_catch(ctx->isolate);
456    Handle<Value> result = fun->Call(context->Global(), 3, args);
457
458    taskFinished(ctx);
459
460    if (result.IsEmpty()) {
461        if (!try_catch.CanContinue()) {
462            throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
463        }
464
465        throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
466                exceptionString(try_catch));
467    }
468
469    return jsonStringify(result);
470}
471
472
473void terminateTask(mapreduce_ctx_t *ctx)
474{
475    V8::TerminateExecution(ctx->isolate);
476    ctx->taskStartTime = -1;
477}
478
479
480static void freeKvListEntries(kv_list_int_t &kvs)
481{
482    kv_list_int_t::iterator it = kvs.begin();
483
484    for ( ; it != kvs.end(); ++it) {
485        mapreduce_kv_t kv = *it;
486        cb_free(kv.key.json);
487        cb_free(kv.value.json);
488    }
489    kvs.clear();
490}
491
492
493static void freeJsonListEntries(json_results_list_t &list)
494{
495    json_results_list_t::iterator it = list.begin();
496
497    for ( ; it != list.end(); ++it) {
498        cb_free((*it).json);
499    }
500    list.clear();
501}
502
503
504static Handle<Function> compileFunction(const std::string &funSource)
505{
506    Isolate *isolate = Isolate::GetCurrent();
507    Local<Context> context(isolate->GetCurrentContext());
508    EscapableHandleScope handle_scope(isolate);
509    TryCatch try_catch(isolate);
510    Local<String> source = createUtf8String(isolate, funSource.data(),
511        funSource.length());
512    Local<Script> script;
513    if (!Script::Compile(context, source).ToLocal(&script)) {
514        throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
515                exceptionString(try_catch));
516    }
517
518    if (script.IsEmpty()) {
519        throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
520                exceptionString(try_catch));
521    }
522
523    Handle<Value> result = script->Run();
524
525    if (result.IsEmpty()) {
526        throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
527                exceptionString(try_catch));
528    }
529
530    if (!result->IsFunction()) {
531        throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
532                std::string("Invalid function: ") + funSource.c_str());
533    }
534
535    // Use EscapableHandleScope and return using .Escape
536    // This will ensure that return values are not garbage collected
537    // as soon as the function returns.
538    return handle_scope.Escape(Handle<Function>::Cast(result));
539}
540
541
542static std::string exceptionString(const TryCatch &tryCatch)
543{
544    HandleScope handle_scope(Isolate::GetCurrent());
545    String::Utf8Value exception(tryCatch.Exception());
546    const char *exceptionString = (*exception);
547
548    if (exceptionString) {
549        Handle<Message> message = tryCatch.Message();
550        return std::string(exceptionString) + " (line " +
551            std::to_string(message->GetLineNumber()) + ":" +
552            std::to_string(message->GetStartColumn()) + ")";
553    }
554
555    return std::string("runtime error");
556}
557
558
559static void loadFunctions(mapreduce_ctx_t *ctx,
560                          const std::list<std::string> &function_sources)
561{
562    HandleScope handle_scope(ctx->isolate);
563
564    ctx->functions = new function_vector_t();
565
566    std::list<std::string>::const_iterator it = function_sources.begin();
567
568    for ( ; it != function_sources.end(); ++it) {
569        Handle<Function> fun = compileFunction(*it);
570        Persistent<Function> *perFn = new Persistent<Function>();
571        perFn->Reset(ctx->isolate, fun);
572        ctx->functions->push_back(perFn);
573    }
574}
575
576
577static void emit(const FunctionCallbackInfo<Value> &args)
578{
579    isolate_data_t *isoData = getIsolateData();
580
581    if (isoData->ctx->kvs == NULL) {
582        return;
583    }
584
585    try {
586        mapreduce_kv_t result;
587
588        result.key   = jsonStringify(args[0]);
589        result.value = jsonStringify(args[1]);
590        isoData->ctx->kvs->push_back(result);
591
592        return;
593    } catch(Local<String> &ex) {
594        isoData->ctx->isolate->ThrowException(ex);
595    }
596}
597
598
599static inline isolate_data_t *getIsolateData()
600{
601    Isolate *isolate = Isolate::GetCurrent();
602    return reinterpret_cast<isolate_data_t*>(isolate->GetData(0));
603}
604
605
606static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj)
607{
608    isolate_data_t *isoData = getIsolateData();
609    Handle<Value> args[] = { obj };
610    TryCatch try_catch(isoData->ctx->isolate);
611    Local<Function> stringifyFun =
612        Local<Function>::New(isoData->ctx->isolate, isoData->stringifyFun);
613    Local<Object> jsonObject =
614        Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
615    Handle<Value> result = stringifyFun->Call(jsonObject, 1, args);
616
617    if (result.IsEmpty()) {
618        throw try_catch.Exception();
619    }
620
621    mapreduce_json_t jsonResult;
622
623    if (!result->IsUndefined()) {
624        Handle<String> str = Handle<String>::Cast(result);
625        jsonResult.length = str->Utf8Length();
626        jsonResult.json = (char *) cb_malloc(jsonResult.length);
627        if (jsonResult.json == NULL) {
628            throw std::bad_alloc();
629        }
630        str->WriteUtf8(jsonResult.json, jsonResult.length,
631                       NULL, String::NO_NULL_TERMINATION);
632    } else {
633        jsonResult.length = sizeof("null") - 1;
634        jsonResult.json = (char *) cb_malloc(jsonResult.length);
635        if (jsonResult.json == NULL) {
636            throw std::bad_alloc();
637        }
638        memcpy(jsonResult.json, "null", jsonResult.length);
639    }
640
641    // Caller responsible for freeing jsonResult.json
642    return jsonResult;
643}
644
645
646static inline Handle<Value> jsonParse(const mapreduce_json_t &thing)
647{
648    isolate_data_t *isoData = getIsolateData();
649    Handle<Value> args[] =
650        { createUtf8String(isoData->ctx->isolate, thing.json,
651            thing.length) };
652    TryCatch try_catch(isoData->ctx->isolate);
653    Local<Function> jsonParseFun =
654        Local<Function>::New(isoData->ctx->isolate, isoData->jsonParseFun);
655    Local<Object> jsonObject =
656        Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
657    Handle<Value> result = jsonParseFun->Call(jsonObject, 1, args);
658
659    if (result.IsEmpty()) {
660        throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
661                exceptionString(try_catch));
662    }
663
664    return result;
665}
666
667
668static inline void taskStarted(mapreduce_ctx_t *ctx)
669{
670    ctx->taskStartTime = time(NULL);
671    ctx->kvs = NULL;
672}
673
674
675static inline void taskFinished(mapreduce_ctx_t *ctx)
676{
677    ctx->exitMutex.lock();
678    ctx->taskStartTime = -1;
679    ctx->exitMutex.unlock();
680}
681
682
683static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list)
684{
685    Isolate *isolate = Isolate::GetCurrent();
686    Handle<Array> array = Array::New(isolate, list.length);
687
688    for (int i = 0 ; i < list.length; ++i) {
689        Handle<Value> v = jsonParse(list.values[i]);
690        array->Set(Number::New(isolate, i), v);
691    }
692
693    return array;
694}
695