1 /**
2 * @copyright 2013 Couchbase, Inc.
3 *
4 * @author Filipe Manana <filipe@couchbase.com>
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7 * use this file except in compliance with the License. You may obtain a copy of
8 * the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 * License for the specific language governing permissions and limitations under
16 * the License.
17 **/
18
19 #include "mapreduce.h"
20 #include "mapreduce_internal.h"
21 #include <iostream>
22 #include <cstring>
23 #include <platform/cb_malloc.h>
24 #include <stdlib.h>
25 // This is libv8_libplatform library which handles garbage collection for v8
26 #include <libplatform/libplatform.h>
27
28 using namespace v8;
29
30 typedef struct {
31 Persistent<Object> jsonObject;
32 Persistent<Function> jsonParseFun;
33 Persistent<Function> stringifyFun;
34 mapreduce_ctx_t *ctx;
35 } isolate_data_t;
36
37
38 static const char *SUM_FUNCTION_STRING =
39 "(function(values) {"
40 " var sum = 0;"
41 " for (var i = 0; i < values.length; ++i) {"
42 " sum += values[i];"
43 " }"
44 " return sum;"
45 "})";
46
47 static const char *DATE_FUNCTION_STRING =
48 // I wish it was on the prototype, but that will require bigger
49 // C changes as adding to the date prototype should be done on
50 // process launch. The code you see here may be faster, but it
51 // is less JavaScripty.
52 // "Date.prototype.toArray = (function() {"
53 "(function(date) {"
54 " date = date.getUTCDate ? date : new Date(date);"
55 " return isFinite(date.valueOf()) ?"
56 " [date.getUTCFullYear(),"
57 " (date.getUTCMonth() + 1),"
58 " date.getUTCDate(),"
59 " date.getUTCHours(),"
60 " date.getUTCMinutes(),"
61 " date.getUTCSeconds()] : null;"
62 "})";
63
64 static const char *BASE64_FUNCTION_STRING =
65 "(function(b64) {"
66 " var i, j, l, tmp, scratch, arr = [];"
67 " var lookup = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';"
68 " if (typeof b64 !== 'string') {"
69 " throw 'Input is not a string';"
70 " }"
71 " if (b64.length % 4 > 0) {"
72 " throw 'Invalid base64 source.';"
73 " }"
74 " scratch = b64.indexOf('=');"
75 " scratch = scratch > 0 ? b64.length - scratch : 0;"
76 " l = scratch > 0 ? b64.length - 4 : b64.length;"
77 " for (i = 0, j = 0; i < l; i += 4, j += 3) {"
78 " tmp = (lookup.indexOf(b64[i]) << 18) | (lookup.indexOf(b64[i + 1]) << 12);"
79 " tmp |= (lookup.indexOf(b64[i + 2]) << 6) | lookup.indexOf(b64[i + 3]);"
80 " arr.push((tmp & 0xFF0000) >> 16);"
81 " arr.push((tmp & 0xFF00) >> 8);"
82 " arr.push(tmp & 0xFF);"
83 " }"
84 " if (scratch === 2) {"
85 " tmp = (lookup.indexOf(b64[i]) << 2) | (lookup.indexOf(b64[i + 1]) >> 4);"
86 " arr.push(tmp & 0xFF);"
87 " } else if (scratch === 1) {"
88 " tmp = (lookup.indexOf(b64[i]) << 10) | (lookup.indexOf(b64[i + 1]) << 4);"
89 " tmp |= (lookup.indexOf(b64[i + 2]) >> 2);"
90 " arr.push((tmp >> 8) & 0xFF);"
91 " arr.push(tmp & 0xFF);"
92 " }"
93 " return arr;"
94 "})";
95
96
97
98 static Local<Context> createJsContext();
99 static void emit(const FunctionCallbackInfo<Value> &args);
100
101 static void doInitContext(mapreduce_ctx_t *ctx);
102 static Handle<Function> compileFunction(const std::string &function);
103 static std::string exceptionString(const TryCatch &tryCatch);
104 static void loadFunctions(mapreduce_ctx_t *ctx,
105 const std::list<std::string> &function_sources);
106 static inline isolate_data_t *getIsolateData();
107 static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj);
108 static inline Handle<Value> jsonParse(const mapreduce_json_t &thing);
109 static inline void taskStarted(mapreduce_ctx_t *ctx);
110 static inline void taskFinished(mapreduce_ctx_t *ctx);
111 static void freeKvListEntries(kv_list_int_t &kvs);
112 static void freeJsonListEntries(json_results_list_t &list);
113 static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list);
114
115 static Platform *v8platform;
initV8()116 void initV8()
117 {
118 V8::InitializeICU();
119 v8platform = platform::CreateDefaultPlatform();
120 V8::InitializePlatform(v8platform);
121 V8::Initialize();
122 }
123
deinitV8()124 void deinitV8()
125 {
126 V8::Dispose();
127 V8::ShutdownPlatform();
128 delete v8platform;
129 }
130
initContext(mapreduce_ctx_t *ctx, const std::list<std::string> &function_sources)131 void initContext(mapreduce_ctx_t *ctx,
132 const std::list<std::string> &function_sources)
133 {
134 doInitContext(ctx);
135
136 try {
137 Locker locker(ctx->isolate);
138 Isolate::Scope isolate_scope(ctx->isolate);
139 HandleScope handle_scope(ctx->isolate);
140 Local<Context> context =
141 Local<Context>::New(ctx->isolate, ctx->jsContext);
142 Context::Scope context_scope(context);
143
144 loadFunctions(ctx, function_sources);
145 } catch (...) {
146 destroyContext(ctx);
147 throw;
148 }
149 }
150
151
destroyContext(mapreduce_ctx_t *ctx)152 void destroyContext(mapreduce_ctx_t *ctx)
153 {
154 {
155 Locker locker(ctx->isolate);
156 Isolate::Scope isolate_scope(ctx->isolate);
157 HandleScope handle_scope(ctx->isolate);
158 Local<Context> context =
159 Local<Context>::New(ctx->isolate, ctx->jsContext);
160 Context::Scope context_scope(context);
161
162 for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
163 (*ctx->functions)[i]->Reset();
164 delete (*ctx->functions)[i];
165
166 }
167 delete ctx->functions;
168
169 isolate_data_t *isoData = getIsolateData();
170 isoData->jsonObject.Reset();
171 isoData->jsonParseFun.Reset();
172 isoData->stringifyFun.Reset();
173 delete isoData;
174
175 ctx->jsContext.Reset();
176 }
177
178 ctx->isolate->Dispose();
179 delete ctx->bufAllocator;
180 }
181
createUtf8String(Isolate *isolate, const char *str)182 static Local<String> createUtf8String(Isolate *isolate, const char *str)
183 {
184 return String::NewFromUtf8(isolate, str,
185 NewStringType::kNormal).ToLocalChecked();
186 }
187
createUtf8String(Isolate *isolate, const char *str, size_t len)188 static Local<String> createUtf8String(Isolate *isolate, const char *str,
189 size_t len)
190 {
191 return String::NewFromUtf8(isolate, str,
192 NewStringType::kNormal, len).ToLocalChecked();
193 }
194
doInitContext(mapreduce_ctx_t *ctx)195 static void doInitContext(mapreduce_ctx_t *ctx)
196 {
197 ctx->bufAllocator = ArrayBuffer::Allocator::NewDefaultAllocator();
198 Isolate::CreateParams createParams;
199 createParams.array_buffer_allocator = ctx->bufAllocator;
200 ctx->isolate = Isolate::New(createParams);
201 Locker locker(ctx->isolate);
202 Isolate::Scope isolate_scope(ctx->isolate);
203
204 HandleScope handle_scope(ctx->isolate);
205 ctx->jsContext.Reset(ctx->isolate, createJsContext());
206 Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
207 Context::Scope context_scope(context);
208 Local<String> jsonString = createUtf8String(ctx->isolate, "JSON");
209 Handle<Object> jsonObject =
210 Local<Object>::Cast(context->Global()->Get(jsonString));
211
212 Local<String> parseString = createUtf8String(ctx->isolate, "parse");
213 Handle<Function> parseFun =
214 Local<Function>::Cast(jsonObject->Get(parseString));
215 Local<String> stringifyString = createUtf8String(ctx->isolate, "stringify");
216 Handle<Function> stringifyFun =
217 Local<Function>::Cast(jsonObject->Get(stringifyString));
218
219 isolate_data_t *isoData = new isolate_data_t();
220 isoData->jsonObject.Reset(ctx->isolate, jsonObject);
221 isoData->jsonParseFun.Reset(ctx->isolate, parseFun);
222 isoData->stringifyFun.Reset(ctx->isolate, stringifyFun);
223 isoData->ctx = ctx;
224
225 ctx->isolate->SetData(0, (void *)isoData);
226 ctx->taskStartTime = -1;
227 }
228
229
createJsContext()230 static Local<Context> createJsContext()
231 {
232 Isolate *isolate = Isolate::GetCurrent();
233 EscapableHandleScope handle_scope(isolate);
234
235 Handle<ObjectTemplate> global = ObjectTemplate::New();
236 global->Set(createUtf8String(isolate, "emit"),
237 FunctionTemplate::New(isolate, emit));
238
239 Handle<Context> context = Context::New(isolate, NULL, global);
240 Context::Scope context_scope(context);
241
242 Handle<Function> sumFun = compileFunction(SUM_FUNCTION_STRING);
243 context->Global()->Set(createUtf8String(isolate, "sum"), sumFun);
244
245 Handle<Function> decodeBase64Fun =
246 compileFunction(BASE64_FUNCTION_STRING);
247 context->Global()->Set(createUtf8String(isolate, "decodeBase64"),
248 decodeBase64Fun);
249
250 Handle<Function> dateToArrayFun =
251 compileFunction(DATE_FUNCTION_STRING);
252 context->Global()->Set(createUtf8String(isolate, "dateToArray"),
253 dateToArrayFun);
254
255 // Use EscapableHandleScope and return using .Escape
256 // This will ensure that return values are not garbage collected
257 // as soon as the function returns.
258 return handle_scope.Escape(context);
259 }
260
261
mapDoc(mapreduce_ctx_t *ctx, const mapreduce_json_t &doc, const mapreduce_json_t &meta, mapreduce_map_result_list_t *results)262 void mapDoc(mapreduce_ctx_t *ctx,
263 const mapreduce_json_t &doc,
264 const mapreduce_json_t &meta,
265 mapreduce_map_result_list_t *results)
266 {
267 Locker locker(ctx->isolate);
268 Isolate::Scope isolate_scope(ctx->isolate);
269 HandleScope handle_scope(ctx->isolate);
270 Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
271 Context::Scope context_scope(context);
272 Handle<Value> docObject = jsonParse(doc);
273 Handle<Value> metaObject = jsonParse(meta);
274
275
276 if (!metaObject->IsObject()) {
277 throw MapReduceError(MAPREDUCE_INVALID_ARG,
278 "metadata is not a JSON object");
279 }
280
281 Handle<Value> funArgs[] = { docObject, metaObject };
282
283 taskStarted(ctx);
284 kv_list_int_t kvs;
285 ctx->kvs = &kvs;
286
287 for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
288 mapreduce_map_result_t mapResult;
289 Local<Function> fun =
290 Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
291 TryCatch try_catch(ctx->isolate);
292 Handle<Value> result = fun->Call(context->Global(), 2, funArgs);
293
294 if (!result.IsEmpty()) {
295 mapResult.error = MAPREDUCE_SUCCESS;
296 mapResult.result.kvs.length = kvs.size();
297 size_t sz = sizeof(mapreduce_kv_t) * mapResult.result.kvs.length;
298 mapResult.result.kvs.kvs = (mapreduce_kv_t *) cb_malloc(sz);
299 if (mapResult.result.kvs.kvs == NULL) {
300 freeKvListEntries(kvs);
301 throw std::bad_alloc();
302 }
303 kv_list_int_t::iterator it = kvs.begin();
304 for (int j = 0; it != kvs.end(); ++it, ++j) {
305 mapResult.result.kvs.kvs[j] = *it;
306 }
307 } else {
308 freeKvListEntries(kvs);
309
310 if (!try_catch.CanContinue()) {
311 throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
312 }
313
314 mapResult.error = MAPREDUCE_RUNTIME_ERROR;
315 std::string exceptString = exceptionString(try_catch);
316 size_t len = exceptString.length();
317
318 mapResult.result.error_msg = (char *) cb_malloc(len + 1);
319 if (mapResult.result.error_msg == NULL) {
320 throw std::bad_alloc();
321 }
322 memcpy(mapResult.result.error_msg, exceptString.data(), len);
323 mapResult.result.error_msg[len] = '\0';
324 }
325
326 results->list[i] = mapResult;
327 results->length += 1;
328 kvs.clear();
329 }
330
331 taskFinished(ctx);
332 }
333
334
runReduce(mapreduce_ctx_t *ctx, const mapreduce_json_list_t &keys, const mapreduce_json_list_t &values)335 json_results_list_t runReduce(mapreduce_ctx_t *ctx,
336 const mapreduce_json_list_t &keys,
337 const mapreduce_json_list_t &values)
338 {
339 Locker locker(ctx->isolate);
340 Isolate::Scope isolateScope(ctx->isolate);
341 HandleScope handle_scope(ctx->isolate);
342 Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
343 Context::Scope context_scope(context);
344 Handle<Array> keysArray = jsonListToJsArray(keys);
345 Handle<Array> valuesArray = jsonListToJsArray(values);
346 json_results_list_t results;
347
348 Handle<Value> args[] =
349 { keysArray, valuesArray, Boolean::New(ctx->isolate, false) };
350
351 taskStarted(ctx);
352
353 for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
354 Local<Function> fun =
355 Local<Function>::New(ctx->isolate, *(*ctx->functions)[i]);
356 TryCatch try_catch(ctx->isolate);
357 Handle<Value> result = fun->Call(context->Global(), 3, args);
358
359 if (result.IsEmpty()) {
360 freeJsonListEntries(results);
361
362 if (!try_catch.CanContinue()) {
363 throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
364 }
365
366 throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
367 exceptionString(try_catch));
368 }
369
370 try {
371 mapreduce_json_t jsonResult = jsonStringify(result);
372 results.push_back(jsonResult);
373 } catch(...) {
374 freeJsonListEntries(results);
375 throw;
376 }
377 }
378
379 taskFinished(ctx);
380
381 return results;
382 }
383
384
runReduce(mapreduce_ctx_t *ctx, int reduceFunNum, const mapreduce_json_list_t &keys, const mapreduce_json_list_t &values)385 mapreduce_json_t runReduce(mapreduce_ctx_t *ctx,
386 int reduceFunNum,
387 const mapreduce_json_list_t &keys,
388 const mapreduce_json_list_t &values)
389 {
390 Locker locker(ctx->isolate);
391 Isolate::Scope isolateScope(ctx->isolate);
392 HandleScope handle_scope(ctx->isolate);
393 Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
394 Context::Scope context_scope(context);
395
396 reduceFunNum -= 1;
397 if (reduceFunNum < 0 ||
398 static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
399 throw MapReduceError(MAPREDUCE_INVALID_ARG,
400 "invalid reduce function number");
401 }
402
403 Local<Function> fun =
404 Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
405 Handle<Array> keysArray = jsonListToJsArray(keys);
406 Handle<Array> valuesArray = jsonListToJsArray(values);
407 Handle<Value> args[] =
408 { keysArray, valuesArray, Boolean::New(ctx->isolate, false) };
409
410 taskStarted(ctx);
411
412 TryCatch try_catch(ctx->isolate);
413 Handle<Value> result = fun->Call(context->Global(), 3, args);
414
415 taskFinished(ctx);
416
417 if (result.IsEmpty()) {
418 if (!try_catch.CanContinue()) {
419 throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
420 }
421
422 throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
423 exceptionString(try_catch));
424 }
425
426 return jsonStringify(result);
427 }
428
429
runRereduce(mapreduce_ctx_t *ctx, int reduceFunNum, const mapreduce_json_list_t &reductions)430 mapreduce_json_t runRereduce(mapreduce_ctx_t *ctx,
431 int reduceFunNum,
432 const mapreduce_json_list_t &reductions)
433 {
434 Locker locker(ctx->isolate);
435 Isolate::Scope isolateScope(ctx->isolate);
436 HandleScope handle_scope(ctx->isolate);
437 Local<Context> context = Local<Context>::New(ctx->isolate, ctx->jsContext);
438 Context::Scope context_scope(context);
439
440 reduceFunNum -= 1;
441 if (reduceFunNum < 0 ||
442 static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
443 throw MapReduceError(MAPREDUCE_INVALID_ARG,
444 "invalid reduce function number");
445 }
446
447 Local<Function> fun =
448 Local<Function>::New(ctx->isolate, *(*ctx->functions)[reduceFunNum]);
449 Handle<Array> valuesArray = jsonListToJsArray(reductions);
450 Handle<Value> args[] =
451 { Null(ctx->isolate), valuesArray, Boolean::New(ctx->isolate, true) };
452
453 taskStarted(ctx);
454
455 TryCatch try_catch(ctx->isolate);
456 Handle<Value> result = fun->Call(context->Global(), 3, args);
457
458 taskFinished(ctx);
459
460 if (result.IsEmpty()) {
461 if (!try_catch.CanContinue()) {
462 throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
463 }
464
465 throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
466 exceptionString(try_catch));
467 }
468
469 return jsonStringify(result);
470 }
471
472
terminateTask(mapreduce_ctx_t *ctx)473 void terminateTask(mapreduce_ctx_t *ctx)
474 {
475 V8::TerminateExecution(ctx->isolate);
476 ctx->taskStartTime = -1;
477 }
478
479
freeKvListEntries(kv_list_int_t &kvs)480 static void freeKvListEntries(kv_list_int_t &kvs)
481 {
482 kv_list_int_t::iterator it = kvs.begin();
483
484 for ( ; it != kvs.end(); ++it) {
485 mapreduce_kv_t kv = *it;
486 cb_free(kv.key.json);
487 cb_free(kv.value.json);
488 }
489 kvs.clear();
490 }
491
492
freeJsonListEntries(json_results_list_t &list)493 static void freeJsonListEntries(json_results_list_t &list)
494 {
495 json_results_list_t::iterator it = list.begin();
496
497 for ( ; it != list.end(); ++it) {
498 cb_free((*it).json);
499 }
500 list.clear();
501 }
502
503
compileFunction(const std::string &funSource)504 static Handle<Function> compileFunction(const std::string &funSource)
505 {
506 Isolate *isolate = Isolate::GetCurrent();
507 Local<Context> context(isolate->GetCurrentContext());
508 EscapableHandleScope handle_scope(isolate);
509 TryCatch try_catch(isolate);
510 Local<String> source = createUtf8String(isolate, funSource.data(),
511 funSource.length());
512 Local<Script> script;
513 if (!Script::Compile(context, source).ToLocal(&script)) {
514 throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
515 exceptionString(try_catch));
516 }
517
518 if (script.IsEmpty()) {
519 throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
520 exceptionString(try_catch));
521 }
522
523 Handle<Value> result = script->Run();
524
525 if (result.IsEmpty()) {
526 throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
527 exceptionString(try_catch));
528 }
529
530 if (!result->IsFunction()) {
531 throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
532 std::string("Invalid function: ") + funSource.c_str());
533 }
534
535 // Use EscapableHandleScope and return using .Escape
536 // This will ensure that return values are not garbage collected
537 // as soon as the function returns.
538 return handle_scope.Escape(Handle<Function>::Cast(result));
539 }
540
541
exceptionString(const TryCatch &tryCatch)542 static std::string exceptionString(const TryCatch &tryCatch)
543 {
544 HandleScope handle_scope(Isolate::GetCurrent());
545 String::Utf8Value exception(tryCatch.Exception());
546 const char *exceptionString = (*exception);
547
548 if (exceptionString) {
549 Handle<Message> message = tryCatch.Message();
550 return std::string(exceptionString) + " (line " +
551 std::to_string(message->GetLineNumber()) + ":" +
552 std::to_string(message->GetStartColumn()) + ")";
553 }
554
555 return std::string("runtime error");
556 }
557
558
loadFunctions(mapreduce_ctx_t *ctx, const std::list<std::string> &function_sources)559 static void loadFunctions(mapreduce_ctx_t *ctx,
560 const std::list<std::string> &function_sources)
561 {
562 HandleScope handle_scope(ctx->isolate);
563
564 ctx->functions = new function_vector_t();
565
566 std::list<std::string>::const_iterator it = function_sources.begin();
567
568 for ( ; it != function_sources.end(); ++it) {
569 Handle<Function> fun = compileFunction(*it);
570 Persistent<Function> *perFn = new Persistent<Function>();
571 perFn->Reset(ctx->isolate, fun);
572 ctx->functions->push_back(perFn);
573 }
574 }
575
576
emit(const FunctionCallbackInfo<Value> &args)577 static void emit(const FunctionCallbackInfo<Value> &args)
578 {
579 isolate_data_t *isoData = getIsolateData();
580
581 if (isoData->ctx->kvs == NULL) {
582 return;
583 }
584
585 try {
586 mapreduce_kv_t result;
587
588 result.key = jsonStringify(args[0]);
589 result.value = jsonStringify(args[1]);
590 isoData->ctx->kvs->push_back(result);
591
592 return;
593 } catch(Local<String> &ex) {
594 isoData->ctx->isolate->ThrowException(ex);
595 }
596 }
597
598
getIsolateData()599 static inline isolate_data_t *getIsolateData()
600 {
601 Isolate *isolate = Isolate::GetCurrent();
602 return reinterpret_cast<isolate_data_t*>(isolate->GetData(0));
603 }
604
605
jsonStringify(const Handle<Value> &obj)606 static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj)
607 {
608 isolate_data_t *isoData = getIsolateData();
609 Handle<Value> args[] = { obj };
610 TryCatch try_catch(isoData->ctx->isolate);
611 Local<Function> stringifyFun =
612 Local<Function>::New(isoData->ctx->isolate, isoData->stringifyFun);
613 Local<Object> jsonObject =
614 Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
615 Handle<Value> result = stringifyFun->Call(jsonObject, 1, args);
616
617 if (result.IsEmpty()) {
618 throw try_catch.Exception();
619 }
620
621 mapreduce_json_t jsonResult;
622
623 if (!result->IsUndefined()) {
624 Handle<String> str = Handle<String>::Cast(result);
625 jsonResult.length = str->Utf8Length();
626 jsonResult.json = (char *) cb_malloc(jsonResult.length);
627 if (jsonResult.json == NULL) {
628 throw std::bad_alloc();
629 }
630 str->WriteUtf8(jsonResult.json, jsonResult.length,
631 NULL, String::NO_NULL_TERMINATION);
632 } else {
633 jsonResult.length = sizeof("null") - 1;
634 jsonResult.json = (char *) cb_malloc(jsonResult.length);
635 if (jsonResult.json == NULL) {
636 throw std::bad_alloc();
637 }
638 memcpy(jsonResult.json, "null", jsonResult.length);
639 }
640
641 // Caller responsible for freeing jsonResult.json
642 return jsonResult;
643 }
644
645
jsonParse(const mapreduce_json_t &thing)646 static inline Handle<Value> jsonParse(const mapreduce_json_t &thing)
647 {
648 isolate_data_t *isoData = getIsolateData();
649 Handle<Value> args[] =
650 { createUtf8String(isoData->ctx->isolate, thing.json,
651 thing.length) };
652 TryCatch try_catch(isoData->ctx->isolate);
653 Local<Function> jsonParseFun =
654 Local<Function>::New(isoData->ctx->isolate, isoData->jsonParseFun);
655 Local<Object> jsonObject =
656 Local<Object>::New(isoData->ctx->isolate, isoData->jsonObject);
657 Handle<Value> result = jsonParseFun->Call(jsonObject, 1, args);
658
659 if (result.IsEmpty()) {
660 throw MapReduceError(MAPREDUCE_RUNTIME_ERROR,
661 exceptionString(try_catch));
662 }
663
664 return result;
665 }
666
667
taskStarted(mapreduce_ctx_t *ctx)668 static inline void taskStarted(mapreduce_ctx_t *ctx)
669 {
670 ctx->taskStartTime = time(NULL);
671 ctx->kvs = NULL;
672 }
673
674
taskFinished(mapreduce_ctx_t *ctx)675 static inline void taskFinished(mapreduce_ctx_t *ctx)
676 {
677 ctx->exitMutex.lock();
678 ctx->taskStartTime = -1;
679 ctx->exitMutex.unlock();
680 }
681
682
jsonListToJsArray(const mapreduce_json_list_t &list)683 static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list)
684 {
685 Isolate *isolate = Isolate::GetCurrent();
686 Handle<Array> array = Array::New(isolate, list.length);
687
688 for (int i = 0 ; i < list.length; ++i) {
689 Handle<Value> v = jsonParse(list.values[i]);
690 array->Set(Number::New(isolate, i), v);
691 }
692
693 return array;
694 }
695