1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/**
3 * @copyright 2012 Couchbase, Inc.
4 *
5 * @author Filipe Manana  <filipe@couchbase.com>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
8 * use this file except in compliance with the License. You may obtain a copy of
9 * the License at
10 *
11 *  http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations under
17 * the License.
18 **/
19
20#ifdef WIN32
21#define NOMINMAX
22#endif
23
#include <platform/platform.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <iostream>
#include <map>
#include <mutex>
#include <sstream>
33
34#include "erl_nif_compat.h"
35#include "mapreduce.h"
36
37// NOTE: keep this file clean (without knowledge) of any V8 APIs
38
39static ERL_NIF_TERM ATOM_OK;
40static ERL_NIF_TERM ATOM_ERROR;
41
42// maxTaskDuration is in seconds
43static std::atomic<int>                            maxTaskDuration;
44static int                                         maxKvSize = 1 * 1024 * 1024;
45static ErlNifResourceType                          *MAP_REDUCE_CTX_RES;
46static ErlNifTid                                   terminatorThreadId;
47static ErlNifMutex                                 *terminatorMutex;
48static std::condition_variable                     cv;
49static std::mutex                                  cvMutex;
50static std::atomic<bool>                           shutdownTerminator;
51static std::map< unsigned int, map_reduce_ctx_t* > contexts;
52
53
// NIF API functions
// Each of these is exposed to Erlang through the nif_functions table defined
// near the end of this file. All of them share the standard NIF signature.
static ERL_NIF_TERM startMapContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM doMapDoc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM startReduceContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
// doReduce is registered for both reduce/2 and reduce/3.
static ERL_NIF_TERM doReduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM doRereduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM setTimeout(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM setMaxKvSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

// NIF API callbacks
// Passed to ERL_NIF_INIT as the load and unload callbacks.
static int onLoad(ErlNifEnv* env, void** priv, ERL_NIF_TERM info);
static void onUnload(ErlNifEnv *env, void *priv_data);

// Utilities
// makeError builds an {error, Reason} reply; parseFunctions converts an Erlang
// list of function sources into a function_sources_list_t.
static ERL_NIF_TERM makeError(ErlNifEnv *env, const std::string &msg);
static bool parseFunctions(ErlNifEnv *env, ERL_NIF_TERM functionsArg, function_sources_list_t &result);

// NIF resource functions
// Destructor callback registered for MAP_REDUCE_CTX_RES in onLoad().
static void free_map_reduce_context(ErlNifEnv *env, void *res);

// Terminator bookkeeping: contexts are added to / removed from the `contexts`
// registry, which terminatorLoop scans from its own thread.
static inline void registerContext(map_reduce_ctx_t *ctx, ErlNifEnv *env, const ERL_NIF_TERM &refTerm);
static inline void unregisterContext(map_reduce_ctx_t *ctx);
static void *terminatorLoop(void *);
77
78
79
80ERL_NIF_TERM startMapContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
81{
82    char viewTypeAtom[256];
83    view_index_type_t viewType;
84    function_sources_list_t mapFunctions;
85
86    if (!enif_get_atom(env, argv[0], viewTypeAtom, sizeof(viewTypeAtom),
87                       ERL_NIF_LATIN1)) {
88        return enif_make_badarg(env);
89    }
90    if (!strcmp(viewTypeAtom, "mapreduce_view")) {
91        viewType = VIEW_INDEX_TYPE_MAPREDUCE;
92    } else if (!strcmp(viewTypeAtom, "spatial_view")) {
93        viewType = VIEW_INDEX_TYPE_SPATIAL;
94    } else {
95        return makeError(env, "unknown view type");
96    }
97
98    if (!parseFunctions(env, argv[1], mapFunctions)) {
99        return enif_make_badarg(env);
100    }
101
102    map_reduce_ctx_t *ctx = static_cast<map_reduce_ctx_t *>(
103        enif_alloc_resource(MAP_REDUCE_CTX_RES, sizeof(map_reduce_ctx_t)));
104
105    try {
106        initContext(ctx, mapFunctions, viewType);
107
108        ERL_NIF_TERM res = enif_make_resource(env, ctx);
109        enif_release_resource(ctx);
110
111        registerContext(ctx, env, argv[2]);
112
113        return enif_make_tuple2(env, ATOM_OK, res);
114
115    } catch(MapReduceError &e) {
116        return makeError(env, e.getMsg());
117    } catch(std::bad_alloc &) {
118        return makeError(env, "memory allocation failure");
119    }
120}
121
122
123ERL_NIF_TERM doMapDoc(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
124{
125    map_reduce_ctx_t *ctx;
126
127    if (!enif_get_resource(env, argv[0], MAP_REDUCE_CTX_RES, reinterpret_cast<void **>(&ctx))) {
128        return enif_make_badarg(env);
129    }
130    ctx->env = env;
131    ctx->maxEmitKvSize = maxKvSize;
132
133    ErlNifBinary docBin;
134
135    if (!enif_inspect_iolist_as_binary(env, argv[1], &docBin)) {
136        return enif_make_badarg(env);
137    }
138
139    ErlNifBinary metaBin;
140
141    if (!enif_inspect_iolist_as_binary(env, argv[2], &metaBin)) {
142        return enif_make_badarg(env);
143    }
144
145    try {
146        // Map results is a list of lists. An inner list is the list of key value
147        // pairs emitted by a map function for the document.
148        map_results_list_t mapResults = mapDoc(ctx, docBin, metaBin);
149        ERL_NIF_TERM outerList = enif_make_list(env, 0);
150        ERL_NIF_TERM logList = enif_make_list(env, 0);
151        map_results_list_t::reverse_iterator i = mapResults.rbegin();
152
153        for ( ; i != mapResults.rend(); ++i) {
154            map_result_t mapResult = *i;
155
156            switch (mapResult.type) {
157            case MAP_KVS:
158                {
159                    ERL_NIF_TERM kvList = enif_make_list(env, 0);
160                    kv_pair_list_t::reverse_iterator j = mapResult.result.kvs->rbegin();
161
162                    for ( ; j != mapResult.result.kvs->rend(); ++j) {
163                        ERL_NIF_TERM key = enif_make_binary(env, &j->first);
164                        ERL_NIF_TERM value = enif_make_binary(env, &j->second);
165                        ERL_NIF_TERM kvPair = enif_make_tuple2(env, key, value);
166                        kvList = enif_make_list_cell(env, kvPair, kvList);
167                    }
168                    mapResult.result.kvs->~kv_pair_list_t();
169                    enif_free(mapResult.result.kvs);
170                    outerList = enif_make_list_cell(env, kvList, outerList);
171                }
172                break;
173            case MAP_ERROR:
174                ERL_NIF_TERM reason = enif_make_binary(env, mapResult.result.error);
175                ERL_NIF_TERM errorTuple = enif_make_tuple2(env, ATOM_ERROR, reason);
176
177                enif_free(mapResult.result.error);
178                outerList = enif_make_list_cell(env, errorTuple, outerList);
179                break;
180            }
181        }
182        if (ctx->logResults) {
183            log_results_list_t::reverse_iterator k = ctx->logResults->rbegin();
184            for ( ; k != ctx->logResults->rend(); ++k) {
185                ERL_NIF_TERM logMsg = enif_make_binary(env, &(*k));
186                logList = enif_make_list_cell(env, logMsg, logList);
187            }
188            ctx->logResults->~log_results_list_t();
189            enif_free(ctx->logResults);
190            ctx->logResults = NULL;
191        }
192        return enif_make_tuple3(env, ATOM_OK, outerList, logList);
193
194    } catch(MapReduceError &e) {
195        return makeError(env, e.getMsg());
196    } catch(std::bad_alloc &) {
197        return makeError(env, "memory allocation failure");
198    }
199}
200
201
202ERL_NIF_TERM startReduceContext(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
203{
204    function_sources_list_t reduceFunctions;
205
206    if (!parseFunctions(env, argv[0], reduceFunctions)) {
207        return enif_make_badarg(env);
208    }
209
210    map_reduce_ctx_t *ctx = static_cast<map_reduce_ctx_t *>(
211        enif_alloc_resource(MAP_REDUCE_CTX_RES, sizeof(map_reduce_ctx_t)));
212
213    try {
214        initContext(ctx, reduceFunctions, VIEW_INDEX_TYPE_MAPREDUCE);
215
216        ERL_NIF_TERM res = enif_make_resource(env, ctx);
217        enif_release_resource(ctx);
218
219        registerContext(ctx, env, argv[1]);
220
221        return enif_make_tuple2(env, ATOM_OK, res);
222
223    } catch(MapReduceError &e) {
224        return makeError(env, e.getMsg());
225    } catch(std::bad_alloc &) {
226        return makeError(env, "memory allocation failure");
227    }
228}
229
230
231ERL_NIF_TERM doReduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
232{
233    map_reduce_ctx_t *ctx;
234
235    if (!enif_get_resource(env, argv[0], MAP_REDUCE_CTX_RES, reinterpret_cast<void **>(&ctx))) {
236        return enif_make_badarg(env);
237    }
238    ctx->env = env;
239
240    int reduceFunNum = -1;
241    json_results_list_t keys;
242    json_results_list_t values;
243    ERL_NIF_TERM tail;
244    ERL_NIF_TERM head;
245
246    if (!enif_get_int(env, argv[1], &reduceFunNum)) {
247        if (!enif_is_list(env, argv[1])) {
248            return enif_make_badarg(env);
249        }
250        tail = argv[1];
251    } else {
252        if (!enif_is_list(env, argv[2])) {
253            return enif_make_badarg(env);
254        }
255        tail = argv[2];
256    }
257
258    while (enif_get_list_cell(env, tail, &head, &tail)) {
259        const ERL_NIF_TERM* array;
260        int arity;
261
262        if (!enif_get_tuple(env, head, &arity, &array)) {
263            return enif_make_badarg(env);
264        }
265        if (arity != 2) {
266            return enif_make_badarg(env);
267        }
268
269        ErlNifBinary keyBin;
270        ErlNifBinary valueBin;
271
272        if (!enif_inspect_iolist_as_binary(env, array[0], &keyBin)) {
273            return enif_make_badarg(env);
274        }
275        if (!enif_inspect_iolist_as_binary(env, array[1], &valueBin)) {
276            return enif_make_badarg(env);
277        }
278
279        keys.push_back(keyBin);
280        values.push_back(valueBin);
281    }
282
283    try {
284        if (reduceFunNum == -1) {
285            json_results_list_t results = runReduce(ctx, keys, values);
286
287            ERL_NIF_TERM list = enif_make_list(env, 0);
288            json_results_list_t::reverse_iterator it = results.rbegin();
289
290            for ( ; it != results.rend(); ++it) {
291                ErlNifBinary reduction = *it;
292
293                list = enif_make_list_cell(env, enif_make_binary(env, &reduction), list);
294            }
295
296            return enif_make_tuple2(env, ATOM_OK, list);
297        } else {
298            ErlNifBinary reduction = runReduce(ctx, reduceFunNum, keys, values);
299
300            return enif_make_tuple2(env, ATOM_OK, enif_make_binary(env, &reduction));
301        }
302
303    } catch(MapReduceError &e) {
304        return makeError(env, e.getMsg());
305    } catch(std::bad_alloc &) {
306        return makeError(env, "memory allocation failure");
307    }
308}
309
310
311ERL_NIF_TERM doRereduce(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
312{
313    map_reduce_ctx_t *ctx;
314
315    if (!enif_get_resource(env, argv[0], MAP_REDUCE_CTX_RES, reinterpret_cast<void **>(&ctx))) {
316        return enif_make_badarg(env);
317    }
318    ctx->env = env;
319
320    int reduceFunNum;
321
322    if (!enif_get_int(env, argv[1], &reduceFunNum)) {
323        return enif_make_badarg(env);
324    }
325
326    if (!enif_is_list(env, argv[2])) {
327        return enif_make_badarg(env);
328    }
329
330    json_results_list_t reductions;
331    ERL_NIF_TERM tail = argv[2];
332    ERL_NIF_TERM head;
333
334    while (enif_get_list_cell(env, tail, &head, &tail)) {
335        ErlNifBinary reductionBin;
336
337        if (!enif_inspect_iolist_as_binary(env, head, &reductionBin)) {
338            return enif_make_badarg(env);
339        }
340
341        reductions.push_back(reductionBin);
342    }
343
344    try {
345        ErlNifBinary result = runRereduce(ctx, reduceFunNum, reductions);
346
347        return enif_make_tuple2(env, ATOM_OK, enif_make_binary(env, &result));
348    } catch(MapReduceError &e) {
349        return makeError(env, e.getMsg());
350    } catch(std::bad_alloc &) {
351        return makeError(env, "memory allocation failure");
352    }
353}
354
355
356ERL_NIF_TERM setTimeout(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
357{
358    int timeout;
359
360    if (!enif_get_int(env, argv[0], &timeout)) {
361        return enif_make_badarg(env);
362    }
363
364    std::lock_guard<std::mutex> lk(cvMutex);
365    maxTaskDuration = (timeout + 999) / 1000;
366
367    cv.notify_one();
368
369    return ATOM_OK;
370}
371
372
373ERL_NIF_TERM setMaxKvSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
374{
375    int max;
376
377    if (!enif_get_int(env, argv[0], &max)) {
378        return enif_make_badarg(env);
379    }
380
381    maxKvSize = max;
382
383    return ATOM_OK;
384}
385
386
387int onLoad(ErlNifEnv *env, void **priv, ERL_NIF_TERM info)
388{
389    ATOM_OK = enif_make_atom(env, "ok");
390    ATOM_ERROR = enif_make_atom(env, "error");
391
392    MAP_REDUCE_CTX_RES = enif_open_resource_type(
393        env,
394        NULL,
395        "map_reduce_context",
396        free_map_reduce_context,
397        ERL_NIF_RT_CREATE,
398        NULL);
399
400    if (MAP_REDUCE_CTX_RES == NULL) {
401        return -1;
402    }
403
404    terminatorMutex = enif_mutex_create(const_cast<char *>("terminator mutex"));
405    if (terminatorMutex == NULL) {
406        return -2;
407    }
408
409    shutdownTerminator = false;
410    maxTaskDuration = 5;
411    if (enif_thread_create(const_cast<char *>("terminator thread"),
412                           &terminatorThreadId,
413                           terminatorLoop,
414                           NULL,
415                           NULL) != 0) {
416        enif_mutex_destroy(terminatorMutex);
417        return -4;
418    }
419
420    initV8();
421    return 0;
422}
423
424
425void onUnload(ErlNifEnv *env, void *priv_data)
426{
427    void *result = NULL;
428
429    shutdownTerminator = true;
430    cv.notify_one();
431    enif_thread_join(terminatorThreadId, &result);
432    enif_mutex_destroy(terminatorMutex);
433    deinitV8();
434}
435
436
437bool parseFunctions(ErlNifEnv *env, ERL_NIF_TERM functionsArg, function_sources_list_t &result)
438{
439    if (!enif_is_list(env, functionsArg)) {
440        return false;
441    }
442
443    ERL_NIF_TERM tail = functionsArg;
444    ERL_NIF_TERM head;
445
446    while (enif_get_list_cell(env, tail, &head, &tail)) {
447        ErlNifBinary funBin;
448
449        if (!enif_inspect_iolist_as_binary(env, head, &funBin)) {
450            return false;
451        }
452
453        function_source_t src;
454
455        src.reserve(funBin.size + 2);
456        src += '(';
457        src.append(reinterpret_cast<char *>(funBin.data), funBin.size);
458        src += ')';
459
460        result.push_back(src);
461    }
462
463    return true;
464}
465
466
467ERL_NIF_TERM makeError(ErlNifEnv *env, const std::string &msg)
468{
469    ErlNifBinary reason;
470
471    if (!enif_alloc_binary_compat(env, msg.length(), &reason)) {
472        return ATOM_ERROR;
473    } else {
474        memcpy(reason.data, msg.data(), msg.length());
475        return enif_make_tuple2(env, ATOM_ERROR, enif_make_binary(env, &reason));
476    }
477}
478
479
480void free_map_reduce_context(ErlNifEnv *env, void *res) {
481    map_reduce_ctx_t *ctx = static_cast<map_reduce_ctx_t *>(res);
482
483    unregisterContext(ctx);
484    destroyContext(ctx);
485}
486
487
488#define SEC_TO_NSEC 1000000000ULL
489#define NSEC_TO_MSEC (1.0/1000000.0)
490
491void *terminatorLoop(void *args)
492{
493    std::map< unsigned int, map_reduce_ctx_t* >::iterator it;
494
495    while (!shutdownTerminator) {
496        // Convert maxTaskDuration to nanoseconds
497        const hrtime_t maxTaskTimeNSec = maxTaskDuration * SEC_TO_NSEC;
498        // gethrtime() returns values in nanoseconds
499        hrtime_t now, minTimeDiff = maxTaskTimeNSec;
500
501        enif_mutex_lock(terminatorMutex);
502        now = gethrtime();
503
504        for (it = contexts.begin(); it != contexts.end(); ++it) {
505            map_reduce_ctx_t *ctx = (*it).second;
506            if (ctx->taskStartTime > 0) {
507                int64_t  timeGap = maxTaskTimeNSec -
508                        (now - ctx->taskStartTime);
509                if ((int64_t)gethrtime_period() > timeGap) {
510                    terminateTask(ctx);
511                }
512                else {
513                    minTimeDiff = std::min((hrtime_t)timeGap, minTimeDiff);
514                }
515            }
516        }
517
518        enif_mutex_unlock(terminatorMutex);
519        // Convert minTimeDiff to miliseconds
520        hrtime_t minTimeMSec = (hrtime_t)(minTimeDiff * NSEC_TO_MSEC);
521        std::unique_lock<std::mutex> lk(cvMutex);
522        cv.wait_for(lk, std::chrono::milliseconds(minTimeMSec));
523    }
524
525    return NULL;
526}
527
528
529void registerContext(map_reduce_ctx_t *ctx, ErlNifEnv *env, const ERL_NIF_TERM &refTerm)
530{
531    if (!enif_get_uint(env, refTerm, &ctx->key)) {
532        throw MapReduceError("invalid context reference");
533    }
534
535    enif_mutex_lock(terminatorMutex);
536    contexts[ctx->key] = ctx;
537    enif_mutex_unlock(terminatorMutex);
538}
539
540
541void unregisterContext(map_reduce_ctx_t *ctx)
542{
543    enif_mutex_lock(terminatorMutex);
544    contexts.erase(ctx->key);
545    enif_mutex_unlock(terminatorMutex);
546}
547
548
// Table of functions exported to Erlang: {name, arity, implementation}.
// "reduce" appears twice because doReduce serves both the 2- and 3-argument
// forms.
static ErlNifFunc nif_functions[] = {
    {"start_map_context", 3, startMapContext},
    {"map_doc", 3, doMapDoc},
    {"start_reduce_context", 2, startReduceContext},
    {"reduce", 2, doReduce},
    {"reduce", 3, doReduce},
    {"rereduce", 3, doRereduce},
    {"set_timeout", 1, setTimeout},
    {"set_max_kv_size_per_doc", 1, setMaxKvSize}
};
559
// nif_init is declared by hand here, ahead of ERL_NIF_INIT, so that it gets
// the correct linkage/visibility attributes, which the macros alone do not
// provide.
extern "C" {
#if defined (__SUNPRO_C) && (__SUNPRO_C >= 0x550)
    __global ErlNifEntry* nif_init(void);
#elif defined __GNUC__
    __attribute__((visibility("default"))) ErlNifEntry* nif_init(void);
#endif
}

// Registers the `mapreduce` module with its function table and the
// onLoad/onUnload callbacks; the two callback slots in between are left NULL.
ERL_NIF_INIT(mapreduce, nif_functions, &onLoad, NULL, NULL, &onUnload)
571