1/**
2 * @copyright 2013 Couchbase, Inc.
3 *
4 * @author Filipe Manana  <filipe@couchbase.com>
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7 * use this file except in compliance with the License. You may obtain a copy of
8 * the License at
9 *
10 *  http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 * License for the specific language governing permissions and limitations under
16 * the License.
17 **/
18
19/**
20 * Implementation of all exported (public) functions, pure C.
21 **/
22#include "mapreduce.h"
23#include "mapreduce_internal.h"
24#include <iostream>
25#include <map>
26#include <cstring>
27#include <assert.h>
28#include <condition_variable>
29
// Error text handed back to callers when an allocation fails.
static const char *MEM_ALLOC_ERROR_MSG = "memory allocation failure";

// Handle of the background thread created by init_terminator_thread();
// the thread runs terminator_loop().
static cb_thread_t terminator_thread;
// Notified by mapreduce_set_timeout() and deinit_terminator_thread() to wake
// the terminator thread out of its timed wait.
static std::condition_variable cv;
// Mutex used together with cv.
static std::mutex  cvMutex;
// Timeout in seconds: added to a context's taskStartTime to decide when its
// task is terminated, and also used as the terminator thread's sleep interval.
static std::atomic<int> terminator_timeout;
// Set to true to make terminator_loop() leave its loop.
static std::atomic<bool> shutdown_terminator;

// Every context created by start_context() and not yet freed, keyed by the
// context's address converted to an integer.
static std::map<uintptr_t, mapreduce_ctx_t *> ctx_registry;
39
40class RegistryMutex {
41public:
42    RegistryMutex() {
43        cb_mutex_initialize(&mutex);
44    }
45    ~RegistryMutex() {
46        cb_mutex_destroy(&mutex);
47    }
48    void lock() {
49        cb_mutex_enter(&mutex);
50    }
51    void unlock() {
52        cb_mutex_exit(&mutex);
53    }
54private:
55    cb_mutex_t mutex;
56};
57
58static RegistryMutex registryMutex;
59
60
// Shared implementation behind mapreduce_start_map_context() and
// mapreduce_start_reduce_context().
static mapreduce_error_t start_context(const char *functions[],
                                       int num_functions,
                                       void **context,
                                       char **error_msg);

// Appends each of the num_sources source strings, wrapped in parentheses,
// to list.
static void make_function_list(const char *sources[],
                               int num_sources,
                               std::list<std::string> &list);

// Stores a malloc'ed, NUL-terminated copy of msg in *to. Does nothing when
// to is NULL; *to ends up NULL when the allocation fails.
static void copy_error_msg(const std::string &msg, char **to);

// Insert / remove a context in ctx_registry (under registryMutex).
static void register_ctx(mapreduce_ctx_t *ctx);
static void unregister_ctx(mapreduce_ctx_t *ctx);
// Entry point of the thread started by init_terminator_thread().
static void terminator_loop(void *);
75
76
77LIBMAPREDUCE_API
78mapreduce_error_t mapreduce_start_map_context(const char *map_functions[],
79                                              int num_functions,
80                                              void **context,
81                                              char **error_msg)
82{
83    return start_context(map_functions, num_functions, context, error_msg);
84}
85
86
87LIBMAPREDUCE_API
88mapreduce_error_t mapreduce_map(void *context,
89                                const mapreduce_json_t *doc,
90                                const mapreduce_json_t *meta,
91                                mapreduce_map_result_list_t **result)
92{
93    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
94
95    *result = (mapreduce_map_result_list_t *) malloc(sizeof(**result));
96    if (*result == NULL) {
97        return MAPREDUCE_ALLOC_ERROR;
98    }
99
100    int num_funs = ctx->functions->size();
101    size_t sz = sizeof(mapreduce_map_result_t) * num_funs;
102    (*result)->list = (mapreduce_map_result_t *) malloc(sz);
103
104    if ((*result)->list == NULL) {
105        free(*result);
106        *result = NULL;
107        return MAPREDUCE_ALLOC_ERROR;
108    }
109
110    (*result)->length = 0;
111    try {
112        mapDoc(ctx, *doc, *meta, *result);
113    } catch (MapReduceError &e) {
114        mapreduce_free_map_result_list(*result);
115        *result = NULL;
116        return e.getError();
117    } catch (std::bad_alloc &) {
118        mapreduce_free_map_result_list(*result);
119        *result = NULL;
120        return MAPREDUCE_ALLOC_ERROR;
121    }
122
123    assert((*result)->length == num_funs);
124    return MAPREDUCE_SUCCESS;
125}
126
127
128LIBMAPREDUCE_API
129mapreduce_error_t mapreduce_start_reduce_context(const char *reduce_functions[],
130                                                 int num_functions,
131                                                 void **context,
132                                                 char **error_msg)
133{
134    return start_context(reduce_functions, num_functions, context, error_msg);
135}
136
137
138LIBMAPREDUCE_API
139mapreduce_error_t mapreduce_reduce_all(void *context,
140                                       const mapreduce_json_list_t *keys,
141                                       const mapreduce_json_list_t *values,
142                                       mapreduce_json_list_t **result,
143                                       char **error_msg)
144{
145    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
146
147    try {
148        json_results_list_t list = runReduce(ctx, *keys, *values);
149        size_t sz = list.size();
150        json_results_list_t::iterator it = list.begin();
151
152        assert(sz == ctx->functions->size());
153
154        *result = (mapreduce_json_list_t *) malloc(sizeof(**result));
155        if (*result == NULL) {
156            for ( ; it != list.end(); ++it) {
157                free((*it).json);
158            }
159            throw std::bad_alloc();
160        }
161
162        (*result)->length = sz;
163        (*result)->values = (mapreduce_json_t *) malloc(sizeof(mapreduce_json_t) * sz);
164        if ((*result)->values == NULL) {
165            free(*result);
166            for ( ; it != list.end(); ++it) {
167                free((*it).json);
168            }
169            throw std::bad_alloc();
170        }
171        for (int i = 0; it != list.end(); ++it, ++i) {
172            (*result)->values[i] = *it;
173        }
174    } catch (MapReduceError &e) {
175        copy_error_msg(e.getMsg(), error_msg);
176        *result = NULL;
177        return e.getError();
178    } catch (std::bad_alloc &) {
179        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
180        *result = NULL;
181        return MAPREDUCE_ALLOC_ERROR;
182    }
183
184    *error_msg = NULL;
185    return MAPREDUCE_SUCCESS;
186}
187
188
189LIBMAPREDUCE_API
190mapreduce_error_t mapreduce_reduce(void *context,
191                                   int reduceFunNum,
192                                   const mapreduce_json_list_t *keys,
193                                   const mapreduce_json_list_t *values,
194                                   mapreduce_json_t **result,
195                                   char **error_msg)
196{
197    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
198
199    try {
200        mapreduce_json_t red = runReduce(ctx, reduceFunNum, *keys, *values);
201
202        *result = (mapreduce_json_t *) malloc(sizeof(**result));
203        if (*result == NULL) {
204            free(red.json);
205            throw std::bad_alloc();
206        }
207        **result = red;
208    } catch (MapReduceError &e) {
209        copy_error_msg(e.getMsg(), error_msg);
210        *result = NULL;
211        return e.getError();
212    } catch (std::bad_alloc &) {
213        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
214        *result = NULL;
215        return MAPREDUCE_ALLOC_ERROR;
216    }
217
218    *error_msg = NULL;
219    return MAPREDUCE_SUCCESS;
220}
221
222
223LIBMAPREDUCE_API
224mapreduce_error_t mapreduce_rereduce(void *context,
225                                     int reduceFunNum,
226                                     const mapreduce_json_list_t *reductions,
227                                     mapreduce_json_t **result,
228                                     char **error_msg)
229{
230    mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
231
232    try {
233        mapreduce_json_t red = runRereduce(ctx, reduceFunNum, *reductions);
234
235        *result = (mapreduce_json_t *) malloc(sizeof(**result));
236        if (*result == NULL) {
237            free(red.json);
238            throw std::bad_alloc();
239        }
240        **result = red;
241    } catch (MapReduceError &e) {
242        copy_error_msg(e.getMsg(), error_msg);
243        *result = NULL;
244        return e.getError();
245    } catch (std::bad_alloc &) {
246        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
247        *result = NULL;
248        return MAPREDUCE_ALLOC_ERROR;
249    }
250
251    *error_msg = NULL;
252    return MAPREDUCE_SUCCESS;
253}
254
255
256LIBMAPREDUCE_API
257void mapreduce_free_context(void *context)
258{
259    if (context != NULL) {
260        mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
261
262        unregister_ctx(ctx);
263        destroyContext(ctx);
264        delete ctx;
265    }
266}
267
268
269LIBMAPREDUCE_API
270void mapreduce_free_json(mapreduce_json_t *value)
271{
272    if (value != NULL) {
273        free(value->json);
274        free(value);
275    }
276}
277
278
279LIBMAPREDUCE_API
280void mapreduce_free_json_list(mapreduce_json_list_t *list)
281{
282    if (list != NULL) {
283        for (int i = 0; i < list->length; ++i) {
284            free(list->values[i].json);
285        }
286        free(list->values);
287        free(list);
288    }
289}
290
291
292LIBMAPREDUCE_API
293void mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)
294{
295    if (list == NULL) {
296        return;
297    }
298
299    for (int i = 0; i < list->length; ++i) {
300        mapreduce_map_result_t mr = list->list[i];
301
302        switch (mr.error) {
303        case MAPREDUCE_SUCCESS:
304            {
305                mapreduce_kv_list_t kvs = mr.result.kvs;
306
307                for (int j = 0; j < kvs.length; ++j) {
308                    mapreduce_kv_t kv = kvs.kvs[j];
309                    free(kv.key.json);
310                    free(kv.value.json);
311                }
312                free(kvs.kvs);
313            }
314            break;
315        default:
316            free(mr.result.error_msg);
317            break;
318        }
319    }
320
321    free(list->list);
322    free(list);
323}
324
325
326LIBMAPREDUCE_API
327void mapreduce_free_error_msg(char *error_msg)
328{
329    free(error_msg);
330}
331
332
333LIBMAPREDUCE_API
334void mapreduce_set_timeout(unsigned int seconds)
335{
336    std::lock_guard<std::mutex> lk(cvMutex);
337    terminator_timeout = seconds;
338    cv.notify_one();
339}
340
341
342static mapreduce_error_t start_context(const char *functions[],
343                                       int num_functions,
344                                       void **context,
345                                       char **error_msg)
346{
347    mapreduce_ctx_t *ctx = NULL;
348    mapreduce_error_t ret = MAPREDUCE_SUCCESS;
349
350    try {
351        ctx = new mapreduce_ctx_t();
352        std::list<std::string> functions_list;
353
354        make_function_list(functions, num_functions, functions_list);
355        initContext(ctx, functions_list);
356    } catch (MapReduceError &e) {
357        copy_error_msg(e.getMsg(), error_msg);
358        ret = e.getError();
359    } catch (std::bad_alloc &) {
360        copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
361        ret = MAPREDUCE_ALLOC_ERROR;
362    }
363
364    if (ret == MAPREDUCE_SUCCESS) {
365        register_ctx(ctx);
366        *context = (void *) ctx;
367        *error_msg = NULL;
368    } else {
369        delete ctx;
370    }
371    return ret;
372}
373
374
/**
 * Append every source string to list, wrapped in parentheses
 * ("src" becomes "(src)"). Existing elements of list are kept.
 *
 * @param sources     array of NUL-terminated source strings
 * @param num_sources number of entries in sources
 * @param list        list that receives the wrapped sources
 **/
static void make_function_list(const char *sources[],
                               int num_sources,
                               std::list<std::string> &list)
{
    int idx = 0;

    while (idx < num_sources) {
        const char *raw = sources[idx++];
        std::string wrapped(1, '(');

        // Room for the source plus the two parentheses.
        wrapped.reserve(strlen(raw) + 2);
        wrapped.append(raw);
        wrapped.push_back(')');

        list.push_back(wrapped);
    }
}
391
392
/**
 * Store a malloc'ed, NUL-terminated copy of msg in *to.
 *
 * Does nothing when to is NULL. When the allocation fails, *to is set to
 * NULL.
 *
 * @param msg message to copy
 * @param to  receives the copy
 **/
static void copy_error_msg(const std::string &msg, char **to)
{
    if (to == NULL) {
        return;
    }

    const size_t len = msg.length();
    char *buf = static_cast<char *>(malloc(len + 1));

    if (buf != NULL) {
        memcpy(buf, msg.data(), len);
        buf[len] = '\0';
    }
    *to = buf;
}
405
406LIBCOUCHSTORE_API
407void init_terminator_thread()
408{
409    shutdown_terminator = false;
410    // Default 5 seconds for mapreduce tasks
411    terminator_timeout = 5;
412    int ret = cb_create_thread(&terminator_thread, terminator_loop, NULL, 0);
413    if (ret != 0) {
414        std::cerr << "Error creating terminator thread: " << ret << std::endl;
415        exit(1);
416    }
417}
418
419LIBCOUCHSTORE_API
420void deinit_terminator_thread()
421{
422    // There is no conditional wait on this shared variable. Hence no mutex.
423    shutdown_terminator = true;
424    // Wake the thread up to shutdown
425    cv.notify_one();
426    cb_join_thread(terminator_thread);
427}
428
429
430LIBCOUCHSTORE_API
431void mapreduce_init()
432{
433    initV8();
434    init_terminator_thread();
435}
436
437LIBCOUCHSTORE_API
438void mapreduce_deinit()
439{
440    deinit_terminator_thread();
441    deinitV8();
442}
443
444static void register_ctx(mapreduce_ctx_t *ctx)
445{
446    uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
447    registryMutex.lock();
448
449    ctx_registry[key] = ctx;
450    registryMutex.unlock();
451}
452
453
454static void unregister_ctx(mapreduce_ctx_t *ctx)
455{
456    uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
457
458    registryMutex.lock();
459    ctx_registry.erase(key);
460    registryMutex.unlock();
461}
462
463
464static void terminator_loop(void *)
465{
466    std::map<uintptr_t, mapreduce_ctx_t *>::iterator it;
467    time_t now;
468
469    while (!shutdown_terminator) {
470        registryMutex.lock();
471        now = time(NULL);
472        for (it = ctx_registry.begin(); it != ctx_registry.end(); ++it) {
473            mapreduce_ctx_t *ctx = (*it).second;
474
475            if (ctx->taskStartTime >= 0) {
476                if (ctx->taskStartTime + terminator_timeout < now) {
477                    terminateTask(ctx);
478                }
479            }
480        }
481
482        registryMutex.unlock();
483        std::unique_lock<std::mutex> lk(cvMutex);
484        cv.wait_for(lk, std::chrono::seconds(terminator_timeout));
485    }
486}
487