1 /**
2  * @copyright 2013 Couchbase, Inc.
3  *
4  * @author Filipe Manana  <filipe@couchbase.com>
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7  * use this file except in compliance with the License. You may obtain a copy of
8  * the License at
9  *
10  *  http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15  * License for the specific language governing permissions and limitations under
16  * the License.
17  **/
18 
19 #include "mapreduce.h"
20 #include "mapreduce_internal.h"
21 #include <iostream>
22 #include <cstring>
23 #include <platform/cb_malloc.h>
24 #include <stdlib.h>
25 // This is libv8_libplatform library which handles garbage collection for v8
26 #include <libplatform/libplatform.h>
27 
28 using namespace v8;
29 
30 typedef struct {
31     Persistent<Object>    jsonObject;
32     Persistent<Function>  jsonParseFun;
33     Persistent<Function>  stringifyFun;
34     mapreduce_ctx_t       *ctx;
35 } isolate_data_t;
36 
37 
// JavaScript source of the built-in helper installed as the global "sum":
// starts from 0 and adds every element of the given array with "+=".
static const char *SUM_FUNCTION_STRING =
    "(function(values) {"
    "    var sum = 0;"
    "    for (var i = 0; i < values.length; ++i) {"
    "        sum += values[i];"
    "    }"
    "    return sum;"
    "})";
46 
// JavaScript source of the built-in helper installed as the global
// "dateToArray". It accepts a Date (anything with getUTCDate) or a value
// passed to "new Date(...)", and returns
// [year, month (1-based), day, hours, minutes, seconds] in UTC, or null
// when the resulting date is not finite.
static const char *DATE_FUNCTION_STRING =
    // I wish it was on the prototype, but that will require bigger
    // C changes as adding to the date prototype should be done on
    // process launch. The code you see here may be faster, but it
    // is less JavaScripty.
    // "Date.prototype.toArray = (function() {"
    "(function(date) {"
    "    date = date.getUTCDate ? date : new Date(date);"
    "    return isFinite(date.valueOf()) ?"
    "      [date.getUTCFullYear(),"
    "      (date.getUTCMonth() + 1),"
    "       date.getUTCDate(),"
    "       date.getUTCHours(),"
    "       date.getUTCMinutes(),"
    "       date.getUTCSeconds()] : null;"
    "})";
63 
// JavaScript source of the built-in helper installed as the global
// "decodeBase64". It decodes a base64 string into an array of byte values.
// It throws 'Input is not a string' for non-string input and
// 'Invalid base64 source.' when the length is not a multiple of 4.
// Trailing '=' padding (one or two characters) is handled after the main
// four-characters-to-three-bytes loop.
static const char *BASE64_FUNCTION_STRING =
    "(function(b64) {"
    "    var i, j, l, tmp, scratch, arr = [];"
    "    var lookup = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';"
    "    if (typeof b64 !== 'string') {"
    "        throw 'Input is not a string';"
    "    }"
    "    if (b64.length % 4 > 0) {"
    "        throw 'Invalid base64 source.';"
    "    }"
    "    scratch = b64.indexOf('=');"
    "    scratch = scratch > 0 ? b64.length - scratch : 0;"
    "    l = scratch > 0 ? b64.length - 4 : b64.length;"
    "    for (i = 0, j = 0; i < l; i += 4, j += 3) {"
    "        tmp = (lookup.indexOf(b64[i]) << 18) | (lookup.indexOf(b64[i + 1]) << 12);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) << 6) | lookup.indexOf(b64[i + 3]);"
    "        arr.push((tmp & 0xFF0000) >> 16);"
    "        arr.push((tmp & 0xFF00) >> 8);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    if (scratch === 2) {"
    "        tmp = (lookup.indexOf(b64[i]) << 2) | (lookup.indexOf(b64[i + 1]) >> 4);"
    "        arr.push(tmp & 0xFF);"
    "    } else if (scratch === 1) {"
    "        tmp = (lookup.indexOf(b64[i]) << 10) | (lookup.indexOf(b64[i + 1]) << 4);"
    "        tmp |= (lookup.indexOf(b64[i + 2]) >> 2);"
    "        arr.push((tmp >> 8) & 0xFF);"
    "        arr.push(tmp & 0xFF);"
    "    }"
    "    return arr;"
    "})";
95 
96 
97 
98 static Local<Context> createJsContext();
99 static void emit(const FunctionCallbackInfo<Value> &args);
100 
101 static void doInitContext(mapreduce_ctx_t *ctx);
102 static Handle<Function> compileFunction(const std::string &function);
103 static std::string exceptionString(const TryCatch &tryCatch);
104 static void loadFunctions(mapreduce_ctx_t *ctx,
105                           const std::list<std::string> &function_sources);
106 static inline isolate_data_t *getIsolateData();
107 static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj);
108 static inline Handle<Value> jsonParse(const mapreduce_json_t &thing);
109 static inline void taskStarted(mapreduce_ctx_t *ctx);
110 static inline void taskFinished(mapreduce_ctx_t *ctx);
111 static void freeKvListEntries(kv_list_int_t &kvs);
112 static void freeJsonListEntries(json_results_list_t &list);
113 static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list);
114 
115 static Platform *v8platform;
initV8()116 void initV8()
117 {
118     V8::InitializeICU();
119     v8platform = platform::CreateDefaultPlatform();
120     V8::InitializePlatform(v8platform);
121     V8::Initialize();
122 }
123 
deinitV8()124 void deinitV8()
125 {
126     V8::Dispose();
127     V8::ShutdownPlatform();
128     delete v8platform;
129 }
130 
initContext(mapreduce_ctx_t *ctx, const std::list<std::string> &function_sources)131 void initContext(mapreduce_ctx_t *ctx,
132                  const std::list<std::string> &function_sources)
133 {
134     doInitContext(ctx);
135 
136     try {
137         Locker locker(ctx->isolate);
138         Isolate::Scope isolate_scope(ctx->isolate);
139         HandleScope handle_scope(ctx->isolate);
140         Local<Context> context =
141             Local<Context>::New(ctx->isolate, ctx->jsContext);
142         Context::Scope context_scope(context);
143 
144         loadFunctions(ctx, function_sources);
145     } catch (...) {
146         destroyContext(ctx);
147         throw;
148     }
149 }
150 
151 
destroyContext(mapreduce_ctx_t *ctx)152 void destroyContext(mapreduce_ctx_t *ctx)
153 {
154     {
155         Locker locker(ctx->isolate);
156         Isolate::Scope isolate_scope(ctx->isolate);
157         HandleScope handle_scope(ctx->isolate);
158         Local<Context> context =
159             Local<Context>::New(ctx->isolate, ctx->jsContext);
160         Context::Scope context_scope(context);
161 
162         for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
163             (*ctx->functions)[i]->Reset();
164             delete (*ctx->functions)[i];
165 
166         }
167         delete ctx->functions;
168 
169         isolate_data_t *isoData = getIsolateData();
170         isoData->jsonObject.Reset();
171         isoData->jsonParseFun.Reset();
172         isoData->stringifyFun.Reset();
173         delete isoData;
174 
175         ctx->jsContext.Reset();
176     }
177 
178     ctx->isolate->Dispose();
179     delete ctx->bufAllocator;
180 }
181 
createUtf8String(Isolate *isolate, const char *str)182 static Local<String> createUtf8String(Isolate *isolate, const char *str)
183 {
184     return String::NewFromUtf8(isolate, str,
185         NewStringType::kNormal).ToLocalChecked();
186 }
187 
createUtf8String(Isolate *isolate, const char *str, size_t len)188 static Local<String> createUtf8String(Isolate *isolate, const char *str,
189                                       size_t len)
190 {
191     return String::NewFromUtf8(isolate, str,
192         NewStringType::kNormal, len).ToLocalChecked();
193 }
194 
doInitContext(mapreduce_ctx_t *ctx)195 static void doInitContext(mapreduce_ctx_t *ctx)
196 {
197     ctx->bufAllocator = ArrayBuffer::Allocator::NewDefaultAllocator();
198     Isolate::CreateParams createParams;
199     createParams.array_buffer_allocator = ctx->bufAllocator;
200     ctx->isolate = Isolate::New(createParams);
201     Locker locker(ctx->isolate);
202     Isolate::Scope isolate_scope(ctx->isolate);
203 
204     HandleScope handle_scope(ctx->isolate);
205     ctx->jsContext.Reset(ctx->isolate, createJsContext());
206     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
207     Context::Scope context_scope(context);
208     Local<String> jsonString = createUtf8String(ctx->isolate, "JSON");
209     Handle<Object> jsonObject =
210         Local<Object>::Cast(context->Global()->Get(jsonString));
211 
212     Local<String> parseString = createUtf8String(ctx->isolate, "parse");
213     Handle<Function> parseFun =
214         Local<Function>::Cast(jsonObject->Get(parseString));
215     Local<String> stringifyString = createUtf8String(ctx->isolate, "stringify");
216     Handle<Function> stringifyFun =
217         Local<Function>::Cast(jsonObject->Get(stringifyString));
218 
219     isolate_data_t *isoData = new isolate_data_t();
220     isoData->jsonObject.Reset(ctx->isolate, jsonObject);
221     isoData->jsonParseFun.Reset(ctx->isolate, parseFun);
222     isoData->stringifyFun.Reset(ctx->isolate, stringifyFun);
223     isoData->ctx = ctx;
224 
225     ctx->isolate->SetData(0, (void *)isoData);
226     ctx->taskStartTime = -1;
227 }
228 
229 
createJsContext()230 static Local<Context> createJsContext()
231 {
232     Isolate *isolate = Isolate::GetCurrent();
233     EscapableHandleScope handle_scope(isolate);
234 
235     Handle<ObjectTemplate> global = ObjectTemplate::New();
236     global->Set(createUtf8String(isolate, "emit"),
237             FunctionTemplate::New(isolate, emit));
238 
239     Handle<Context> context = Context::New(isolate, NULL, global);
240     Context::Scope context_scope(context);
241 
242     Handle<Function> sumFun = compileFunction(SUM_FUNCTION_STRING);
243     context->Global()->Set(createUtf8String(isolate, "sum"), sumFun);
244 
245     Handle<Function> decodeBase64Fun =
246         compileFunction(BASE64_FUNCTION_STRING);
247     context->Global()->Set(createUtf8String(isolate, "decodeBase64"),
248         decodeBase64Fun);
249 
250     Handle<Function> dateToArrayFun =
251         compileFunction(DATE_FUNCTION_STRING);
252     context->Global()->Set(createUtf8String(isolate, "dateToArray"),
253                            dateToArrayFun);
254 
255     // Use EscapableHandleScope and return using .Escape
256     // This will ensure that return values are not garbage collected
257     // as soon as the function returns.
258     return handle_scope.Escape(context);
259 }
260 
261 
mapDoc(mapreduce_ctx_t *ctx, const mapreduce_json_t &doc, const mapreduce_json_t &meta, mapreduce_map_result_list_t *results)262 void mapDoc(mapreduce_ctx_t *ctx,
263             const mapreduce_json_t &doc,
264             const mapreduce_json_t &meta,
265             mapreduce_map_result_list_t *results)
266 {
267     Locker locker(ctx->isolate);
268     Isolate::Scope isolate_scope(ctx->isolate);
269     HandleScope handle_scope(ctx->isolate);
270     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
271     Context::Scope context_scope(context);
272     Handle<Value> docObject = jsonParse(doc);
273     Handle<Value> metaObject = jsonParse(meta);
274 
275 
276     if (!metaObject->IsObject()) {
277         throw MapReduceError(MAPREDUCE_INVALID_ARG,
278                 "metadata is not a JSON object");
279     }
280 
281     Handle<Value> funArgs[] = { docObject, metaObject };
282 
283     taskStarted(ctx);
284     kv_list_int_t kvs;
285     ctx->kvs = &kvs;
286 
287     for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
288         mapreduce_map_result_t mapResult;
289         Local<Function> fun =
290             Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
291         TryCatch try_catch(ctx->isolate);
292         Handle<Value> result = fun->Call(context->Global(), 2, funArgs);
293 
294         if (!result.IsEmpty()) {
295             mapResult.error = MAPREDUCE_SUCCESS;
296             mapResult.result.kvs.length = kvs.size();
297             size_t sz = sizeof(mapreduce_kv_t) * mapResult.result.kvs.length;
298             mapResult.result.kvs.kvs = (mapreduce_kv_t *) cb_malloc(sz);
299             if (mapResult.result.kvs.kvs == NULL) {
300                 freeKvListEntries(kvs);
301                 throw std::bad_alloc();
302             }
303             kv_list_int_t::iterator it = kvs.begin();
304             for (int j = 0; it != kvs.end(); ++it, ++j) {
305                 mapResult.result.kvs.kvs[j] = *it;
306             }
307         } else {
308             freeKvListEntries(kvs);
309 
310             if (!try_catch.CanContinue()) {
311                 throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
312             }
313 
314             mapResult.error = MAPREDUCE_RUNTIME_ERROR;
315             std::string exceptString = exceptionString(try_catch);
316             size_t len = exceptString.length();
317 
318             mapResult.result.error_msg = (char *) cb_malloc(len + 1);
319             if (mapResult.result.error_msg == NULL) {
320                 throw std::bad_alloc();
321             }
322             memcpy(mapResult.result.error_msg, exceptString.data(), len);
323             mapResult.result.error_msg[len] = '\0';
324         }
325 
326         results->list[i] = mapResult;
327         results->length += 1;
328         kvs.clear();
329     }
330 
331     taskFinished(ctx);
332 }
333 
334 
runReduce(mapreduce_ctx_t *ctx, const mapreduce_json_list_t &keys, const mapreduce_json_list_t &values)335 json_results_list_t runReduce(mapreduce_ctx_t *ctx,
336                               const mapreduce_json_list_t &keys,
337                               const mapreduce_json_list_t &values)
338 {
339     Locker locker(ctx->isolate);
340     Isolate::Scope isolateScope(ctx->isolate);
341     HandleScope handle_scope(ctx->isolate);
342     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
343     Context::Scope context_scope(context);
344     Handle<Array> keysArray = jsonListToJsArray(keys);
345     Handle<Array> valuesArray = jsonListToJsArray(values);
346     json_results_list_t results;
347 
348     Handle<Value> args[] =
349         { keysArray, valuesArray, Boolean::New(ctx->isolate, false) };
350 
351     taskStarted(ctx);
352 
353     for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
354         Local<Function> fun =
355             Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
356         TryCatch try_catch(ctx->isolate);
357         Handle<Value> result = fun->Call(context->Global(), 3, args);
358 
359         if (result.IsEmpty()) {
360             freeJsonListEntries(results);
361 
362             if (!try_catch.CanContinue()) {
363                 throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
364             }
365 
366             throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
367                     exceptionString(try_catch));
368         }
369 
370         try {
371             mapreduce_json_t jsonResult = jsonStringify(result);
372             results.push_back(jsonResult);
373         } catch(...) {
374             freeJsonListEntries(results);
375             throw;
376         }
377     }
378 
379     taskFinished(ctx);
380 
381     return results;
382 }
383 
384 
runReduce(mapreduce_ctx_t *ctx, int reduceFunNum, const mapreduce_json_list_t &keys, const mapreduce_json_list_t &values)385 mapreduce_json_t runReduce(mapreduce_ctx_t *ctx,
386                            int reduceFunNum,
387                            const mapreduce_json_list_t &keys,
388                            const mapreduce_json_list_t &values)
389 {
390     Locker locker(ctx->isolate);
391     Isolate::Scope isolateScope(ctx->isolate);
392     HandleScope handle_scope(ctx->isolate);
393     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
394     Context::Scope context_scope(context);
395 
396     reduceFunNum -= 1;
397     if (reduceFunNum < 0 ||
398         static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
399         throw MapReduceError(MAPREDUCE_INVALID_ARG,
400                 "invalid reduce function number");
401     }
402 
403     Local<Function> fun =
404         Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
405     Handle<Array> keysArray = jsonListToJsArray(keys);
406     Handle<Array> valuesArray = jsonListToJsArray(values);
407     Handle<Value> args[] =
408         { keysArray, valuesArray, Boolean::New(ctx->isolate, false) };
409 
410     taskStarted(ctx);
411 
412     TryCatch try_catch(ctx->isolate);
413     Handle<Value> result = fun->Call(context->Global(), 3, args);
414 
415     taskFinished(ctx);
416 
417     if (result.IsEmpty()) {
418         if (!try_catch.CanContinue()) {
419             throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
420         }
421 
422         throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
423                 exceptionString(try_catch));
424     }
425 
426     return jsonStringify(result);
427 }
428 
429 
runRereduce(mapreduce_ctx_t *ctx, int reduceFunNum, const mapreduce_json_list_t &reductions)430 mapreduce_json_t runRereduce(mapreduce_ctx_t *ctx,
431                              int reduceFunNum,
432                              const mapreduce_json_list_t &reductions)
433 {
434     Locker locker(ctx->isolate);
435     Isolate::Scope isolateScope(ctx->isolate);
436     HandleScope handle_scope(ctx->isolate);
437     Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
438     Context::Scope context_scope(context);
439 
440     reduceFunNum -= 1;
441     if (reduceFunNum < 0 ||
442         static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
443         throw MapReduceError(MAPREDUCE_INVALID_ARG,
444                 "invalid reduce function number");
445     }
446 
447     Local<Function> fun =
448         Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
449     Handle<Array> valuesArray = jsonListToJsArray(reductions);
450     Handle<Value> args[] =
451         { Null(ctx->isolate), valuesArray, Boolean::New(ctx->isolate, true) };
452 
453     taskStarted(ctx);
454 
455     TryCatch try_catch(ctx->isolate);
456     Handle<Value> result = fun->Call(context->Global(), 3, args);
457 
458     taskFinished(ctx);
459 
460     if (result.IsEmpty()) {
461         if (!try_catch.CanContinue()) {
462             throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
463         }
464 
465         throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
466                 exceptionString(try_catch));
467     }
468 
469     return jsonStringify(result);
470 }
471 
472 
terminateTask(mapreduce_ctx_t *ctx)473 void terminateTask(mapreduce_ctx_t *ctx)
474 {
475     V8::TerminateExecution(ctx->isolate);
476     ctx->taskStartTime = -1;
477 }
478 
479 
freeKvListEntries(kv_list_int_t &kvs)480 static void freeKvListEntries(kv_list_int_t &kvs)
481 {
482     kv_list_int_t::iterator it = kvs.begin();
483 
484     for ( ; it != kvs.end(); ++it) {
485         mapreduce_kv_t kv = *it;
486         cb_free(kv.key.json);
487         cb_free(kv.value.json);
488     }
489     kvs.clear();
490 }
491 
492 
freeJsonListEntries(json_results_list_t &list)493 static void freeJsonListEntries(json_results_list_t &list)
494 {
495     json_results_list_t::iterator it = list.begin();
496 
497     for ( ; it != list.end(); ++it) {
498         cb_free((*it).json);
499     }
500     list.clear();
501 }
502 
503 
compileFunction(const std::string &funSource)504 static Handle<Function> compileFunction(const std::string &funSource)
505 {
506     Isolate *isolate = Isolate::GetCurrent();
507     Local<Context> context(isolate->GetCurrentContext());
508     EscapableHandleScope handle_scope(isolate);
509     TryCatch try_catch(isolate);
510     Local<String> source = createUtf8String(isolate, funSource.data(),
511         funSource.length());
512     Local<Script> script;
513     if (!Script::Compile(context, source).ToLocal(&script)) {
514         throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
515                 exceptionString(try_catch));
516     }
517 
518     if (script.IsEmpty()) {
519         throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
520                 exceptionString(try_catch));
521     }
522 
523     Handle<Value> result = script->Run();
524 
525     if (result.IsEmpty()) {
526         throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
527                 exceptionString(try_catch));
528     }
529 
530     if (!result->IsFunction()) {
531         throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
532                 std::string("Invalid function: ") + funSource.c_str());
533     }
534 
535     // Use EscapableHandleScope and return using .Escape
536     // This will ensure that return values are not garbage collected
537     // as soon as the function returns.
538     return handle_scope.Escape(Handle<Function>::Cast(result));
539 }
540 
541 
exceptionString(const TryCatch &tryCatch)542 static std::string exceptionString(const TryCatch &tryCatch)
543 {
544     HandleScope handle_scope(Isolate::GetCurrent());
545     String::Utf8Value exception(tryCatch.Exception());
546     const char *exceptionString = (*exception);
547 
548     if (exceptionString) {
549         Handle<Message> message = tryCatch.Message();
550         return std::string(exceptionString) + " (line " +
551             std::to_string(message->GetLineNumber()) + ":" +
552             std::to_string(message->GetStartColumn()) + ")";
553     }
554 
555     return std::string("runtime error");
556 }
557 
558 
loadFunctions(mapreduce_ctx_t *ctx, const std::list<std::string> &function_sources)559 static void loadFunctions(mapreduce_ctx_t *ctx,
560                           const std::list<std::string> &function_sources)
561 {
562     HandleScope handle_scope(ctx->isolate);
563 
564     ctx->functions = new function_vector_t();
565 
566     std::list<std::string>::const_iterator it = function_sources.begin();
567 
568     for ( ; it != function_sources.end(); ++it) {
569         Handle<Function> fun = compileFunction(*it);
570         Persistent<Function> *perFn = new Persistent<Function>();
571         perFn->Reset(ctx->isolate, fun);
572         ctx->functions->push_back(perFn);
573     }
574 }
575 
576 
emit(const FunctionCallbackInfo<Value> &args)577 static void emit(const FunctionCallbackInfo<Value> &args)
578 {
579     isolate_data_t *isoData = getIsolateData();
580 
581     if (isoData->ctx->kvs == NULL) {
582         return;
583     }
584 
585     try {
586         mapreduce_kv_t result;
587 
588         result.key   = jsonStringify(args[0]);
589         result.value = jsonStringify(args[1]);
590         isoData->ctx->kvs->push_back(result);
591 
592         return;
593     } catch(Local<String> &ex) {
594         isoData->ctx->isolate->ThrowException(ex);
595     }
596 }
597 
598 
getIsolateData()599 static inline isolate_data_t *getIsolateData()
600 {
601     Isolate *isolate = Isolate::GetCurrent();
602     return reinterpret_cast<isolate_data_t*>(isolate->GetData(0));
603 }
604 
605 
jsonStringify(const Handle<Value> &obj)606 static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj)
607 {
608     isolate_data_t *isoData = getIsolateData();
609     Handle<Value> args[] = { obj };
610     TryCatch try_catch(isoData->ctx->isolate);
611     Local<Function> stringifyFun =
612         Local<Function>::New(isoData->ctx->isolate, isoData->stringifyFun);
613     Local<Object> jsonObject =
614         Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
615     Handle<Value> result = stringifyFun->Call(jsonObject, 1, args);
616 
617     if (result.IsEmpty()) {
618         throw try_catch.Exception();
619     }
620 
621     mapreduce_json_t jsonResult;
622 
623     if (!result->IsUndefined()) {
624         Handle<String> str = Handle<String>::Cast(result);
625         jsonResult.length = str->Utf8Length();
626         jsonResult.json = (char *) cb_malloc(jsonResult.length);
627         if (jsonResult.json == NULL) {
628             throw std::bad_alloc();
629         }
630         str->WriteUtf8(jsonResult.json, jsonResult.length,
631                        NULL, String::NO_NULL_TERMINATION);
632     } else {
633         jsonResult.length = sizeof("null") - 1;
634         jsonResult.json = (char *) cb_malloc(jsonResult.length);
635         if (jsonResult.json == NULL) {
636             throw std::bad_alloc();
637         }
638         memcpy(jsonResult.json, "null", jsonResult.length);
639     }
640 
641     // Caller responsible for freeing jsonResult.json
642     return jsonResult;
643 }
644 
645 
jsonParse(const mapreduce_json_t &thing)646 static inline Handle<Value> jsonParse(const mapreduce_json_t &thing)
647 {
648     isolate_data_t *isoData = getIsolateData();
649     Handle<Value> args[] =
650         { createUtf8String(isoData->ctx->isolate, thing.json,
651             thing.length) };
652     TryCatch try_catch(isoData->ctx->isolate);
653     Local<Function> jsonParseFun =
654         Local<Function>::New(isoData->ctx->isolate, isoData->jsonParseFun);
655     Local<Object> jsonObject =
656         Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
657     Handle<Value> result = jsonParseFun->Call(jsonObject, 1, args);
658 
659     if (result.IsEmpty()) {
660         throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
661                 exceptionString(try_catch));
662     }
663 
664     return result;
665 }
666 
667 
taskStarted(mapreduce_ctx_t *ctx)668 static inline void taskStarted(mapreduce_ctx_t *ctx)
669 {
670     ctx->taskStartTime = time(NULL);
671     ctx->kvs = NULL;
672 }
673 
674 
taskFinished(mapreduce_ctx_t *ctx)675 static inline void taskFinished(mapreduce_ctx_t *ctx)
676 {
677     ctx->exitMutex.lock();
678     ctx->taskStartTime = -1;
679     ctx->exitMutex.unlock();
680 }
681 
682 
jsonListToJsArray(const mapreduce_json_list_t &list)683 static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list)
684 {
685     Isolate *isolate = Isolate::GetCurrent();
686     Handle<Array> array = Array::New(isolate, list.length);
687 
688     for (int i = 0 ; i < list.length; ++i) {
689         Handle<Value> v = jsonParse(list.values[i]);
690         array->Set(Number::New(isolate, i), v);
691     }
692 
693     return array;
694 }
695