1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /**
3  * @copyright 2012 Couchbase, Inc.
4  *
5  * @author Filipe Manana  <filipe@couchbase.com>
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8  * use this file except in compliance with the License. You may obtain a copy of
9  * the License at
10  *
11  *  http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16  * License for the specific language governing permissions and limitations under
17  * the License.
18  **/
19 
20 #include <iostream>
21 #include <cstring>
22 #include <sstream>
23 #include <map>
24 #include <time.h>
25 
// doSleep(Ms): block the calling thread for about Ms milliseconds.
// The argument is parenthesized and evaluated exactly once, so expressions
// such as doSleep(a + b) expand to the intended duration.
#if defined(WIN32) || defined(_WIN32)
#include <windows.h>
#define doSleep(Ms) Sleep((Ms))
#else
#define doSleep(Ms)                                             \
    do {                                                        \
        const long doSleepMs_ = (long) (Ms);                    \
        struct timespec doSleepTs_;                             \
        doSleepTs_.tv_sec = doSleepMs_ / 1000;                  \
        doSleepTs_.tv_nsec = (doSleepMs_ % 1000) * 1000000L;    \
        nanosleep(&doSleepTs_, NULL);                           \
    } while(0)
#endif
38 
39 #include "erl_nif_compat.h"
40 #include "mapreduce.h"
41 
// NOTE: keep this file free of any knowledge of V8 APIs
43 
// Atoms created once in onLoad() and reused by every NIF.
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;

// maxTaskDuration is in seconds. set_timeout/1 sets it from a millisecond
// value, rounding up. It is also read by the terminator thread (declared
// volatile).
static volatile int                                maxTaskDuration = 5;
// Copied into ctx->maxEmitKvSize on every map_doc/3 call; changed via
// set_max_kv_size_per_doc/1. NOTE(review): unit presumably bytes - confirm
// in mapreduce.cc.
static int                                         maxKvSize = 1 * 1024 * 1024;
// Resource type of map/reduce contexts, opened in onLoad().
static ErlNifResourceType                          *MAP_REDUCE_CTX_RES;
// Terminator thread handle, plus the mutex that guards `contexts`.
static ErlNifTid                                   terminatorThreadId;
static ErlNifMutex                                 *terminatorMutex;
// Set to 1 by onUnload() to make terminatorLoop() exit.
static volatile int                                shutdownTerminator = 0;
// Live contexts indexed by their reference key; only accessed while holding
// terminatorMutex.
static std::map< unsigned int, map_reduce_ctx_t* > contexts;
55 
56 
// NIF API functions (exported to Erlang through the nif_functions table)
static ERL_NIF_TERM startMapContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM doMapDoc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM startReduceContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM doReduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM doRereduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM setTimeout(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM setMaxKvSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

// NIF API callbacks (passed to ERL_NIF_INIT)
static int onLoad(ErlNifEnv* env, void** priv, ERL_NIF_TERM info);
static void onUnload(ErlNifEnv *env, void *priv_data);

// Utilities
static ERL_NIF_TERM makeError(ErlNifEnv *env, const std::string &msg);
static bool parseFunctions(ErlNifEnv *env, ERL_NIF_TERM functionsArg, function_sources_list_t &result);

// NIF resource functions (destructor of the map_reduce_context resource type)
static void free_map_reduce_context(ErlNifEnv *env, void *res);

// Registry of live contexts, scanned periodically by the terminator thread
static inline void registerContext(map_reduce_ctx_t *ctx, ErlNifEnv *env, const ERL_NIF_TERM &refTerm);
static inline void unregisterContext(map_reduce_ctx_t *ctx);
static void *terminatorLoop(void *);
80 
81 
82 
startMapContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])83 ERL_NIF_TERM startMapContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
84 {
85     function_sources_list_t mapFunctions;
86 
87     if (!parseFunctions(env, argv[0], mapFunctions)) {
88         return enif_make_badarg(env);
89     }
90 
91     map_reduce_ctx_t *ctx = static_cast<map_reduce_ctx_t *>(
92         enif_alloc_resource(MAP_REDUCE_CTX_RES, sizeof(map_reduce_ctx_t)));
93 
94     try {
95         initContext(ctx, mapFunctions);
96 
97         ERL_NIF_TERM res = enif_make_resource(env, ctx);
98         enif_release_resource(ctx);
99 
100         registerContext(ctx, env, argv[1]);
101 
102         return enif_make_tuple2(env, ATOM_OK, res);
103 
104     } catch(MapReduceError &e) {
105         return makeError(env, e.getMsg());
106     } catch(std::bad_alloc &) {
107         return makeError(env, "memory allocation failure");
108     }
109 }
110 
111 
doMapDoc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])112 ERL_NIF_TERM doMapDoc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
113 {
114     map_reduce_ctx_t *ctx;
115 
116     if (!enif_get_resource(env, argv[0], MAP_REDUCE_CTX_RES, reinterpret_cast<void **>(&ctx))) {
117         return enif_make_badarg(env);
118     }
119     ctx->env = env;
120     ctx->maxEmitKvSize = maxKvSize;
121 
122     ErlNifBinary docBin;
123 
124     if (!enif_inspect_iolist_as_binary(env, argv[1], &docBin)) {
125         return enif_make_badarg(env);
126     }
127 
128     ErlNifBinary metaBin;
129 
130     if (!enif_inspect_iolist_as_binary(env, argv[2], &metaBin)) {
131         return enif_make_badarg(env);
132     }
133 
134     try {
135         // Map results is a list of lists. An inner list is the list of key value
136         // pairs emitted by a map function for the document.
137         map_results_list_t mapResults = mapDoc(ctx, docBin, metaBin);
138         ERL_NIF_TERM outerList = enif_make_list(env, 0);
139         map_results_list_t::reverse_iterator i = mapResults.rbegin();
140 
141         for ( ; i != mapResults.rend(); ++i) {
142             map_result_t mapResult = *i;
143 
144             switch (mapResult.type) {
145             case MAP_KVS:
146                 {
147                     ERL_NIF_TERM kvList = enif_make_list(env, 0);
148                     kv_pair_list_t::reverse_iterator j = mapResult.result.kvs->rbegin();
149 
150                     for ( ; j != mapResult.result.kvs->rend(); ++j) {
151                         ERL_NIF_TERM key = enif_make_binary(env, &j->first);
152                         ERL_NIF_TERM value = enif_make_binary(env, &j->second);
153                         ERL_NIF_TERM kvPair = enif_make_tuple2(env, key, value);
154                         kvList = enif_make_list_cell(env, kvPair, kvList);
155                     }
156                     mapResult.result.kvs->~kv_pair_list_t();
157                     enif_free(mapResult.result.kvs);
158                     outerList = enif_make_list_cell(env, kvList, outerList);
159                 }
160                 break;
161             case MAP_ERROR:
162                 ERL_NIF_TERM reason = enif_make_binary(env, mapResult.result.error);
163                 ERL_NIF_TERM errorTuple = enif_make_tuple2(env, ATOM_ERROR, reason);
164 
165                 enif_free(mapResult.result.error);
166                 outerList = enif_make_list_cell(env, errorTuple, outerList);
167                 break;
168             }
169         }
170 
171         return enif_make_tuple2(env, ATOM_OK, outerList);
172 
173     } catch(MapReduceError &e) {
174         return makeError(env, e.getMsg());
175     } catch(std::bad_alloc &) {
176         return makeError(env, "memory allocation failure");
177     }
178 }
179 
180 
startReduceContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])181 ERL_NIF_TERM startReduceContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
182 {
183     function_sources_list_t reduceFunctions;
184 
185     if (!parseFunctions(env, argv[0], reduceFunctions)) {
186         return enif_make_badarg(env);
187     }
188 
189     map_reduce_ctx_t *ctx = static_cast<map_reduce_ctx_t *>(
190         enif_alloc_resource(MAP_REDUCE_CTX_RES, sizeof(map_reduce_ctx_t)));
191 
192     try {
193         initContext(ctx, reduceFunctions);
194 
195         ERL_NIF_TERM res = enif_make_resource(env, ctx);
196         enif_release_resource(ctx);
197 
198         registerContext(ctx, env, argv[1]);
199 
200         return enif_make_tuple2(env, ATOM_OK, res);
201 
202     } catch(MapReduceError &e) {
203         return makeError(env, e.getMsg());
204     } catch(std::bad_alloc &) {
205         return makeError(env, "memory allocation failure");
206     }
207 }
208 
209 
doReduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])210 ERL_NIF_TERM doReduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
211 {
212     map_reduce_ctx_t *ctx;
213 
214     if (!enif_get_resource(env, argv[0], MAP_REDUCE_CTX_RES, reinterpret_cast<void **>(&ctx))) {
215         return enif_make_badarg(env);
216     }
217     ctx->env = env;
218 
219     int reduceFunNum = -1;
220     json_results_list_t keys;
221     json_results_list_t values;
222     ERL_NIF_TERM tail;
223     ERL_NIF_TERM head;
224 
225     if (!enif_get_int(env, argv[1], &reduceFunNum)) {
226         if (!enif_is_list(env, argv[1])) {
227             return enif_make_badarg(env);
228         }
229         tail = argv[1];
230     } else {
231         if (!enif_is_list(env, argv[2])) {
232             return enif_make_badarg(env);
233         }
234         tail = argv[2];
235     }
236 
237     while (enif_get_list_cell(env, tail, &head, &tail)) {
238         const ERL_NIF_TERM* array;
239         int arity;
240 
241         if (!enif_get_tuple(env, head, &arity, &array)) {
242             return enif_make_badarg(env);
243         }
244         if (arity != 2) {
245             return enif_make_badarg(env);
246         }
247 
248         ErlNifBinary keyBin;
249         ErlNifBinary valueBin;
250 
251         if (!enif_inspect_iolist_as_binary(env, array[0], &keyBin)) {
252             return enif_make_badarg(env);
253         }
254         if (!enif_inspect_iolist_as_binary(env, array[1], &valueBin)) {
255             return enif_make_badarg(env);
256         }
257 
258         keys.push_back(keyBin);
259         values.push_back(valueBin);
260     }
261 
262     try {
263         if (reduceFunNum == -1) {
264             json_results_list_t results = runReduce(ctx, keys, values);
265 
266             ERL_NIF_TERM list = enif_make_list(env, 0);
267             json_results_list_t::reverse_iterator it = results.rbegin();
268 
269             for ( ; it != results.rend(); ++it) {
270                 ErlNifBinary reduction = *it;
271 
272                 list = enif_make_list_cell(env, enif_make_binary(env, &reduction), list);
273             }
274 
275             return enif_make_tuple2(env, ATOM_OK, list);
276         } else {
277             ErlNifBinary reduction = runReduce(ctx, reduceFunNum, keys, values);
278 
279             return enif_make_tuple2(env, ATOM_OK, enif_make_binary(env, &reduction));
280         }
281 
282     } catch(MapReduceError &e) {
283         return makeError(env, e.getMsg());
284     } catch(std::bad_alloc &) {
285         return makeError(env, "memory allocation failure");
286     }
287 }
288 
289 
doRereduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])290 ERL_NIF_TERM doRereduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
291 {
292     map_reduce_ctx_t *ctx;
293 
294     if (!enif_get_resource(env, argv[0], MAP_REDUCE_CTX_RES, reinterpret_cast<void **>(&ctx))) {
295         return enif_make_badarg(env);
296     }
297     ctx->env = env;
298 
299     int reduceFunNum;
300 
301     if (!enif_get_int(env, argv[1], &reduceFunNum)) {
302         return enif_make_badarg(env);
303     }
304 
305     if (!enif_is_list(env, argv[2])) {
306         return enif_make_badarg(env);
307     }
308 
309     json_results_list_t reductions;
310     ERL_NIF_TERM tail = argv[2];
311     ERL_NIF_TERM head;
312 
313     while (enif_get_list_cell(env, tail, &head, &tail)) {
314         ErlNifBinary reductionBin;
315 
316         if (!enif_inspect_iolist_as_binary(env, head, &reductionBin)) {
317             return enif_make_badarg(env);
318         }
319 
320         reductions.push_back(reductionBin);
321     }
322 
323     try {
324         ErlNifBinary result = runRereduce(ctx, reduceFunNum, reductions);
325 
326         return enif_make_tuple2(env, ATOM_OK, enif_make_binary(env, &result));
327     } catch(MapReduceError &e) {
328         return makeError(env, e.getMsg());
329     } catch(std::bad_alloc &) {
330         return makeError(env, "memory allocation failure");
331     }
332 }
333 
334 
setTimeout(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])335 ERL_NIF_TERM setTimeout(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
336 {
337     int timeout;
338 
339     if (!enif_get_int(env, argv[0], &timeout)) {
340         return enif_make_badarg(env);
341     }
342 
343     maxTaskDuration = (timeout + 999) / 1000;
344 
345     return ATOM_OK;
346 }
347 
348 
setMaxKvSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])349 ERL_NIF_TERM setMaxKvSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
350 {
351     int max;
352 
353     if (!enif_get_int(env, argv[0], &max)) {
354         return enif_make_badarg(env);
355     }
356 
357     maxKvSize = max;
358 
359     return ATOM_OK;
360 }
361 
362 
onLoad(ErlNifEnv *env, void **priv, ERL_NIF_TERM info)363 int onLoad(ErlNifEnv *env, void **priv, ERL_NIF_TERM info)
364 {
365     ATOM_OK = enif_make_atom(env, "ok");
366     ATOM_ERROR = enif_make_atom(env, "error");
367 
368     MAP_REDUCE_CTX_RES = enif_open_resource_type(
369         env,
370         NULL,
371         "map_reduce_context",
372         free_map_reduce_context,
373         ERL_NIF_RT_CREATE,
374         NULL);
375 
376     if (MAP_REDUCE_CTX_RES == NULL) {
377         return -1;
378     }
379 
380     terminatorMutex = enif_mutex_create(const_cast<char *>("terminator mutex"));
381     if (terminatorMutex == NULL) {
382         return -2;
383     }
384 
385     if (enif_thread_create(const_cast<char *>("terminator thread"),
386                            &terminatorThreadId,
387                            terminatorLoop,
388                            NULL,
389                            NULL) != 0) {
390         enif_mutex_destroy(terminatorMutex);
391         return -4;
392     }
393 
394     return 0;
395 }
396 
397 
onUnload(ErlNifEnv *env, void *priv_data)398 void onUnload(ErlNifEnv *env, void *priv_data)
399 {
400     void *result = NULL;
401 
402     shutdownTerminator = 1;
403     enif_thread_join(terminatorThreadId, &result);
404     enif_mutex_destroy(terminatorMutex);
405 }
406 
407 
parseFunctions(ErlNifEnv *env, ERL_NIF_TERM functionsArg, function_sources_list_t &result)408 bool parseFunctions(ErlNifEnv *env, ERL_NIF_TERM functionsArg, function_sources_list_t &result)
409 {
410     if (!enif_is_list(env, functionsArg)) {
411         return false;
412     }
413 
414     ERL_NIF_TERM tail = functionsArg;
415     ERL_NIF_TERM head;
416 
417     while (enif_get_list_cell(env, tail, &head, &tail)) {
418         ErlNifBinary funBin;
419 
420         if (!enif_inspect_iolist_as_binary(env, head, &funBin)) {
421             return false;
422         }
423 
424         function_source_t src;
425 
426         src.reserve(funBin.size + 2);
427         src += '(';
428         src.append(reinterpret_cast<char *>(funBin.data), funBin.size);
429         src += ')';
430 
431         result.push_back(src);
432     }
433 
434     return true;
435 }
436 
437 
makeError(ErlNifEnv *env, const std::string &msg)438 ERL_NIF_TERM makeError(ErlNifEnv *env, const std::string &msg)
439 {
440     ErlNifBinary reason;
441 
442     if (!enif_alloc_binary_compat(env, msg.length(), &reason)) {
443         return ATOM_ERROR;
444     } else {
445         memcpy(reason.data, msg.data(), msg.length());
446         return enif_make_tuple2(env, ATOM_ERROR, enif_make_binary(env, &reason));
447     }
448 }
449 
450 
free_map_reduce_context(ErlNifEnv *env, void *res)451 void free_map_reduce_context(ErlNifEnv *env, void *res) {
452     map_reduce_ctx_t *ctx = static_cast<map_reduce_ctx_t *>(res);
453 
454     unregisterContext(ctx);
455     destroyContext(ctx);
456 }
457 
458 
terminatorLoop(void *args)459 void *terminatorLoop(void *args)
460 {
461     std::map< unsigned int, map_reduce_ctx_t* >::iterator it;
462     time_t now;
463 
464     while (!shutdownTerminator) {
465         enif_mutex_lock(terminatorMutex);
466         // due to truncation of second's fraction lets pretend we're one second before
467         now = time(NULL) - 1;
468 
469         for (it = contexts.begin(); it != contexts.end(); ++it) {
470             map_reduce_ctx_t *ctx = (*it).second;
471 
472             if (ctx->taskStartTime >= 0) {
473                 if (ctx->taskStartTime + maxTaskDuration < now) {
474                     terminateTask(ctx);
475                 }
476             }
477         }
478 
479         enif_mutex_unlock(terminatorMutex);
480         doSleep(maxTaskDuration * 1000);
481     }
482 
483     return NULL;
484 }
485 
486 
registerContext(map_reduce_ctx_t *ctx, ErlNifEnv *env, const ERL_NIF_TERM &refTerm)487 void registerContext(map_reduce_ctx_t *ctx, ErlNifEnv *env, const ERL_NIF_TERM &refTerm)
488 {
489     if (!enif_get_uint(env, refTerm, &ctx->key)) {
490         throw MapReduceError("invalid context reference");
491     }
492 
493     enif_mutex_lock(terminatorMutex);
494     contexts[ctx->key] = ctx;
495     enif_mutex_unlock(terminatorMutex);
496 }
497 
498 
unregisterContext(map_reduce_ctx_t *ctx)499 void unregisterContext(map_reduce_ctx_t *ctx)
500 {
501     enif_mutex_lock(terminatorMutex);
502     contexts.erase(ctx->key);
503     enif_mutex_unlock(terminatorMutex);
504 }
505 
506 
// NIFs exported to the Erlang module: {Erlang name, arity, implementation}.
// reduce/2 and reduce/3 are both implemented by doReduce.
static ErlNifFunc nif_functions[] = {
    {"start_map_context", 2, startMapContext},
    {"map_doc", 3, doMapDoc},
    {"start_reduce_context", 2, startReduceContext},
    {"reduce", 2, doReduce},
    {"reduce", 3, doReduce},
    {"rereduce", 3, doRereduce},
    {"set_timeout", 1, setTimeout},
    {"set_max_kv_size_per_doc", 1, setMaxKvSize}
};
517 
// ERL_NIF_INIT defines nif_init(). Declare it here first, by hand, with C
// linkage and an export attribute (__global on Sun Studio, default visibility
// on GCC). The macro alone does not give this C++ translation unit the
// correct linkage attributes for the entry point.
extern "C" {
#if defined (__SUNPRO_C) && (__SUNPRO_C >= 0x550)
    __global ErlNifEntry* nif_init(void);
#elif defined __GNUC__
    __attribute__((visibility("default"))) ErlNifEntry* nif_init(void);
#endif
}

// Module `mapreduce`: load/unload callbacks only (reload and upgrade are NULL).
ERL_NIF_INIT(mapreduce, nif_functions, &onLoad, NULL, NULL, &onUnload)
529