1/**
2 * @copyright 2013 Couchbase, Inc.
3 *
4 * @author Filipe Manana  <filipe@couchbase.com>
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7 * use this file except in compliance with the License. You may obtain a copy of
8 * the License at
9 *
10 *  http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 * License for the specific language governing permissions and limitations under
16 * the License.
17 **/
18
19/**
20 * Implementation of all exported (public) functions, pure C.
21 **/
22#include <condition_variable>
23#include <cstring>
24#include <iostream>
25#include <platform/cb_malloc.h>
26#include <platform/cbassert.h>
27#include <thread>
28#include <unordered_set>
29
30#include "mapreduce.h"
31#include "mapreduce_internal.h"
32
33static const char *MEM_ALLOC_ERROR_MSG = "memory allocation failure";
34static bool terminator_active = false;
35
36static std::thread terminator_thread;
37static std::unique_ptr<std::condition_variable> cv;
38static std::mutex  cvMutex, registryMutex, initMutex;
39static std::atomic<int> terminator_timeout;
40static std::atomic<bool> shutdown_terminator;
41
42static std::unordered_set<mapreduce_ctx_t *> ctx_registry;
43
44static mapreduce_error_t start_context(const char *functions[],
45                                       int num_functions,
46                                       void **context,
47                                       char **error_msg);
48
49static void make_function_list(const char *sources[],
50                               int num_sources,
51                               std::list<std::string> &list);
52
53static void copy_error_msg(const std::string &msg, char **to);
54
55static void register_ctx(mapreduce_ctx_t *ctx);
56static void unregister_ctx(mapreduce_ctx_t *ctx);
57static void terminator_loop();
58
59
60LIBMAPREDUCE_API
61mapreduce_error_t mapreduce_start_map_context(const char *map_functions[],
62                                              int num_functions,
63                                              void **context,
64                                              char **error_msg)
65{
66    return start_context(map_functions, num_functions, context, error_msg);
67}
68
69
70LIBMAPREDUCE_API
71mapreduce_error_t mapreduce_map(void *context,
72                                const mapreduce_json_t *doc,
73                                const mapreduce_json_t *meta,
74                                mapreduce_map_result_list_t **result)
75{
76    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
77
78    *result = (mapreduce_map_result_list_t *) cb_malloc(sizeof(**result));
79    if (*result == NULL) {
80        return MAPREDUCE_ALLOC_ERROR;
81    }
82
83    int num_funs = ctx->functions->size();
84    size_t sz = sizeof(mapreduce_map_result_t) * num_funs;
85    (*result)->list = (mapreduce_map_result_t *) cb_malloc(sz);
86
87    if ((*result)->list == NULL) {
88        cb_free(*result);
89        *result = NULL;
90        return MAPREDUCE_ALLOC_ERROR;
91    }
92
93    (*result)->length = 0;
94    try {
95        mapDoc(ctx, *doc, *meta, *result);
96    } catch (MapReduceError &e) {
97        mapreduce_free_map_result_list(*result);
98        *result = NULL;
99        return e.getError();
100    } catch (std::bad_alloc &) {
101        mapreduce_free_map_result_list(*result);
102        *result = NULL;
103        return MAPREDUCE_ALLOC_ERROR;
104    }
105
106    cb_assert((*result)->length == num_funs);
107    return MAPREDUCE_SUCCESS;
108}
109
110
111LIBMAPREDUCE_API
112mapreduce_error_t mapreduce_start_reduce_context(const char *reduce_functions[],
113                                                 int num_functions,
114                                                 void **context,
115                                                 char **error_msg)
116{
117    return start_context(reduce_functions, num_functions, context, error_msg);
118}
119
120
121LIBMAPREDUCE_API
122mapreduce_error_t mapreduce_reduce_all(void *context,
123                                       const mapreduce_json_list_t *keys,
124                                       const mapreduce_json_list_t *values,
125                                       mapreduce_json_list_t **result,
126                                       char **error_msg)
127{
128    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
129
130    try {
131        json_results_list_t list = runReduce(ctx, *keys, *values);
132        size_t sz = list.size();
133        json_results_list_t::iterator it = list.begin();
134
135        cb_assert(sz == ctx->functions->size());
136
137        *result = (mapreduce_json_list_t *) cb_malloc(sizeof(**result));
138        if (*result == NULL) {
139            for ( ; it != list.end(); ++it) {
140                cb_free((*it).json);
141            }
142            throw std::bad_alloc();
143        }
144
145        (*result)->length = sz;
146        (*result)->values = (mapreduce_json_t *) cb_malloc(sizeof(mapreduce_json_t) * sz);
147        if ((*result)->values == NULL) {
148            cb_free(*result);
149            for ( ; it != list.end(); ++it) {
150                cb_free((*it).json);
151            }
152            throw std::bad_alloc();
153        }
154        for (int i = 0; it != list.end(); ++it, ++i) {
155            (*result)->values[i] = *it;
156        }
157    } catch (MapReduceError &e) {
158        copy_error_msg(e.getMsg(), error_msg);
159        *result = NULL;
160        return e.getError();
161    } catch (std::bad_alloc &) {
162        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
163        *result = NULL;
164        return MAPREDUCE_ALLOC_ERROR;
165    }
166
167    *error_msg = NULL;
168    return MAPREDUCE_SUCCESS;
169}
170
171
172LIBMAPREDUCE_API
173mapreduce_error_t mapreduce_reduce(void *context,
174                                   int reduceFunNum,
175                                   const mapreduce_json_list_t *keys,
176                                   const mapreduce_json_list_t *values,
177                                   mapreduce_json_t **result,
178                                   char **error_msg)
179{
180    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
181
182    try {
183        mapreduce_json_t red = runReduce(ctx, reduceFunNum, *keys, *values);
184
185        *result = (mapreduce_json_t *) cb_malloc(sizeof(**result));
186        if (*result == NULL) {
187            cb_free(red.json);
188            throw std::bad_alloc();
189        }
190        **result = red;
191    } catch (MapReduceError &e) {
192        copy_error_msg(e.getMsg(), error_msg);
193        *result = NULL;
194        return e.getError();
195    } catch (std::bad_alloc &) {
196        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
197        *result = NULL;
198        return MAPREDUCE_ALLOC_ERROR;
199    }
200
201    *error_msg = NULL;
202    return MAPREDUCE_SUCCESS;
203}
204
205
206LIBMAPREDUCE_API
207mapreduce_error_t mapreduce_rereduce(void *context,
208                                     int reduceFunNum,
209                                     const mapreduce_json_list_t *reductions,
210                                     mapreduce_json_t **result,
211                                     char **error_msg)
212{
213    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
214
215    try {
216        mapreduce_json_t red = runRereduce(ctx, reduceFunNum, *reductions);
217
218        *result = (mapreduce_json_t *) cb_malloc(sizeof(**result));
219        if (*result == NULL) {
220            cb_free(red.json);
221            throw std::bad_alloc();
222        }
223        **result = red;
224    } catch (MapReduceError &e) {
225        copy_error_msg(e.getMsg(), error_msg);
226        *result = NULL;
227        return e.getError();
228    } catch (std::bad_alloc &) {
229        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
230        *result = NULL;
231        return MAPREDUCE_ALLOC_ERROR;
232    }
233
234    *error_msg = NULL;
235    return MAPREDUCE_SUCCESS;
236}
237
238
239LIBMAPREDUCE_API
240void mapreduce_free_context(void *context)
241{
242    if (context != NULL) {
243        mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
244
245        unregister_ctx(ctx);
246        destroyContext(ctx);
247        delete ctx;
248    }
249}
250
251
252LIBMAPREDUCE_API
253void mapreduce_free_json(mapreduce_json_t *value)
254{
255    if (value != NULL) {
256        cb_free(value->json);
257        cb_free(value);
258    }
259}
260
261
262LIBMAPREDUCE_API
263void mapreduce_free_json_list(mapreduce_json_list_t *list)
264{
265    if (list != NULL) {
266        for (int i = 0; i < list->length; ++i) {
267            cb_free(list->values[i].json);
268        }
269        cb_free(list->values);
270        cb_free(list);
271    }
272}
273
274
275LIBMAPREDUCE_API
276void mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)
277{
278    if (list == NULL) {
279        return;
280    }
281
282    for (int i = 0; i < list->length; ++i) {
283        mapreduce_map_result_t mr = list->list[i];
284
285        switch (mr.error) {
286        case MAPREDUCE_SUCCESS:
287            {
288                mapreduce_kv_list_t kvs = mr.result.kvs;
289
290                for (int j = 0; j < kvs.length; ++j) {
291                    mapreduce_kv_t kv = kvs.kvs[j];
292                    cb_free(kv.key.json);
293                    cb_free(kv.value.json);
294                }
295                cb_free(kvs.kvs);
296            }
297            break;
298        default:
299            cb_free(mr.result.error_msg);
300            break;
301        }
302    }
303
304    cb_free(list->list);
305    cb_free(list);
306}
307
308
309LIBMAPREDUCE_API
310void mapreduce_free_error_msg(char *error_msg)
311{
312    cb_free(error_msg);
313}
314
315
316LIBMAPREDUCE_API
317void mapreduce_set_timeout(unsigned int seconds)
318{
319    std::lock_guard<std::mutex> lk(cvMutex);
320    terminator_timeout = seconds;
321    cv->notify_one();
322}
323
324
325static mapreduce_error_t start_context(const char *functions[],
326                                       int num_functions,
327                                       void **context,
328                                       char **error_msg)
329{
330    mapreduce_ctx_t *ctx = NULL;
331    mapreduce_error_t ret = MAPREDUCE_SUCCESS;
332
333    try {
334        ctx = new mapreduce_ctx_t();
335        std::list<std::string> functions_list;
336
337        make_function_list(functions, num_functions, functions_list);
338        initContext(ctx, functions_list);
339    } catch (MapReduceError &e) {
340        copy_error_msg(e.getMsg(), error_msg);
341        ret = e.getError();
342    } catch (std::bad_alloc &) {
343        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
344        ret = MAPREDUCE_ALLOC_ERROR;
345    }
346
347    if (ret == MAPREDUCE_SUCCESS) {
348        register_ctx(ctx);
349        *context = (void *) ctx;
350        *error_msg = NULL;
351    } else {
352        delete ctx;
353    }
354    return ret;
355}
356
357
358static void make_function_list(const char *sources[],
359                               int num_sources,
360                               std::list<std::string> &list)
361{
362    for (int i = 0; i < num_sources; ++i) {
363        std::string source;
364        size_t len = strlen(sources[i]);
365
366        source.reserve(1 + len + 1);
367        source += '(';
368        source.append(sources[i], len);
369        source += ')';
370
371        list.push_back(source);
372    }
373}
374
375
376static void copy_error_msg(const std::string &msg, char **to)
377{
378    if (to != NULL) {
379        size_t len = msg.length();
380
381        *to = (char *) cb_malloc(len + 1);
382        if (*to != NULL) {
383            msg.copy(*to, len);
384            (*to)[len] = '\0';
385        }
386    }
387}
388
389LIBCOUCHSTORE_API
390void init_terminator_thread()
391{
392    std::lock_guard<std::mutex> initGuard(initMutex);
393    shutdown_terminator = false;
394    // Default 5 seconds for mapreduce tasks
395    terminator_timeout = 5;
396    try {
397        cv = std::unique_ptr<std::condition_variable>(new std::condition_variable());
398    }
399    catch (std::bad_alloc) {
400        std::cerr << "Error creating conditional variable: " << std::endl;
401        exit(1);
402    }
403    try {
404        terminator_thread = std::thread(terminator_loop);
405    }
406    catch (...) {
407        std::cerr << "Error creating terminator thread: " << std::endl;
408        exit(1);
409    }
410    terminator_active = true;
411}
412
413LIBCOUCHSTORE_API
414void deinit_terminator_thread(bool fatal_exit=false)
415{
416    initMutex.lock();
417    if(terminator_active) {
418        // There is no conditional wait on this shared variable. Hence no mutex.
419        shutdown_terminator = true;
420        // Wake the thread up to shutdown
421        cv->notify_one();
422        terminator_thread.join();
423        //delete cv;
424        terminator_active = false;
425    }
426    if(!fatal_exit) initMutex.unlock();
427}
428
429
430LIBCOUCHSTORE_API
431void mapreduce_init()
432{
433    initV8();
434    init_terminator_thread();
435}
436
437LIBCOUCHSTORE_API
438void mapreduce_deinit()
439{
440    deinit_terminator_thread();
441    deinitV8();
442}
443
444static void register_ctx(mapreduce_ctx_t *ctx)
445{
446    registryMutex.lock();
447    bool inserted = ctx_registry.insert(ctx).second;
448    registryMutex.unlock();
449    cb_assert(inserted == true);
450}
451
452
453static void unregister_ctx(mapreduce_ctx_t *ctx)
454{
455    registryMutex.lock();
456    ctx_registry.erase(ctx);
457    registryMutex.unlock();
458}
459
460
461static void terminator_loop()
462{
463    time_t now;
464
465    while (!shutdown_terminator) {
466        registryMutex.lock();
467        now = time(NULL);
468        for (mapreduce_ctx_t *ctx : ctx_registry) {
469            ctx->exitMutex.lock();
470            if (ctx->taskStartTime >= 0) {
471                if (ctx->taskStartTime + terminator_timeout < now) {
472                    terminateTask(ctx);
473                }
474            }
475            ctx->exitMutex.unlock();
476        }
477
478        registryMutex.unlock();
479        std::unique_lock<std::mutex> lk(cvMutex);
480        cv->wait_for(lk, std::chrono::seconds(terminator_timeout));
481    }
482}
483