1 /**
2  * @copyright 2013 Couchbase, Inc.
3  *
4  * @author Filipe Manana  <filipe@couchbase.com>
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7  * use this file except in compliance with the License. You may obtain a copy of
8  * the License at
9  *
10  *  http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15  * License for the specific language governing permissions and limitations under
16  * the License.
17  **/
18 
19 /**
20  * Implementation of all exported (public) functions, pure C.
21  **/
22 #include "mapreduce.h"
23 #include "mapreduce_internal.h"
24 #include <iostream>
25 #include <map>
26 #include <cstring>
27 #include <assert.h>
28 
/**
 * doSleep(Secs): suspend the calling thread for Secs seconds.
 *
 * Windows' Sleep() takes milliseconds while POSIX sleep() takes seconds, so
 * the Windows variant scales its argument. The argument is parenthesized
 * before scaling so that an expression such as `a + b` is multiplied as a
 * whole instead of only its last operand.
 **/
#if defined(WIN32) || defined(_WIN32)
#define doSleep(Secs) Sleep((Secs) * 1000)
#else
#include <unistd.h>
#define doSleep(Secs) sleep(Secs)
#endif
35 
36 static const char *MEM_ALLOC_ERROR_MSG = "memory allocation failure";
37 
38 static cb_thread_t terminator_thread;
39 static bool terminator_thread_created = false;
40 static volatile unsigned int terminator_timeout = 5;
41 
42 static std::map<uintptr_t, mapreduce_ctx_t *> ctx_registry;
43 
44 class RegistryMutex {
45 public:
RegistryMutex()46     RegistryMutex() {
47         cb_mutex_initialize(&mutex);
48     }
~RegistryMutex()49     ~RegistryMutex() {
50         cb_mutex_destroy(&mutex);
51     }
lock()52     void lock() {
53         cb_mutex_enter(&mutex);
54     }
unlock()55     void unlock() {
56         cb_mutex_exit(&mutex);
57     }
58 private:
59     cb_mutex_t mutex;
60 };
61 
62 static RegistryMutex registryMutex;
63 
64 
65 static mapreduce_error_t start_context(const char *functions[],
66                                        int num_functions,
67                                        void **context,
68                                        char **error_msg);
69 
70 static void make_function_list(const char *sources[],
71                                int num_sources,
72                                std::list<std::string> &list);
73 
74 static void copy_error_msg(const std::string &msg, char **to);
75 
76 static void register_ctx(mapreduce_ctx_t *ctx);
77 static void unregister_ctx(mapreduce_ctx_t *ctx);
78 static void terminator_loop(void *);
79 
80 
81 LIBMAPREDUCE_API
mapreduce_start_map_context(const char *map_functions[], int num_functions, void **context, char **error_msg)82 mapreduce_error_t mapreduce_start_map_context(const char *map_functions[],
83                                               int num_functions,
84                                               void **context,
85                                               char **error_msg)
86 {
87     return start_context(map_functions, num_functions, context, error_msg);
88 }
89 
90 
91 LIBMAPREDUCE_API
mapreduce_map(void *context, const mapreduce_json_t *doc, const mapreduce_json_t *meta, mapreduce_map_result_list_t **result)92 mapreduce_error_t mapreduce_map(void *context,
93                                 const mapreduce_json_t *doc,
94                                 const mapreduce_json_t *meta,
95                                 mapreduce_map_result_list_t **result)
96 {
97     mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
98 
99     *result = (mapreduce_map_result_list_t *) malloc(sizeof(**result));
100     if (*result == NULL) {
101         return MAPREDUCE_ALLOC_ERROR;
102     }
103 
104     int num_funs = ctx->functions->size();
105     size_t sz = sizeof(mapreduce_map_result_t) * num_funs;
106     (*result)->list = (mapreduce_map_result_t *) malloc(sz);
107 
108     if ((*result)->list == NULL) {
109         free(*result);
110         *result = NULL;
111         return MAPREDUCE_ALLOC_ERROR;
112     }
113 
114     (*result)->length = 0;
115     try {
116         mapDoc(ctx, *doc, *meta, *result);
117     } catch (MapReduceError &e) {
118         mapreduce_free_map_result_list(*result);
119         *result = NULL;
120         return e.getError();
121     } catch (std::bad_alloc &) {
122         mapreduce_free_map_result_list(*result);
123         *result = NULL;
124         return MAPREDUCE_ALLOC_ERROR;
125     }
126 
127     assert((*result)->length == num_funs);
128     return MAPREDUCE_SUCCESS;
129 }
130 
131 
132 LIBMAPREDUCE_API
mapreduce_start_reduce_context(const char *reduce_functions[], int num_functions, void **context, char **error_msg)133 mapreduce_error_t mapreduce_start_reduce_context(const char *reduce_functions[],
134                                                  int num_functions,
135                                                  void **context,
136                                                  char **error_msg)
137 {
138     return start_context(reduce_functions, num_functions, context, error_msg);
139 }
140 
141 
142 LIBMAPREDUCE_API
mapreduce_reduce_all(void *context, const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, mapreduce_json_list_t **result, char **error_msg)143 mapreduce_error_t mapreduce_reduce_all(void *context,
144                                        const mapreduce_json_list_t *keys,
145                                        const mapreduce_json_list_t *values,
146                                        mapreduce_json_list_t **result,
147                                        char **error_msg)
148 {
149     mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
150 
151     try {
152         json_results_list_t list = runReduce(ctx, *keys, *values);
153         size_t sz = list.size();
154         json_results_list_t::iterator it = list.begin();
155 
156         assert(sz == ctx->functions->size());
157 
158         *result = (mapreduce_json_list_t *) malloc(sizeof(**result));
159         if (*result == NULL) {
160             for ( ; it != list.end(); ++it) {
161                 free((*it).json);
162             }
163             throw std::bad_alloc();
164         }
165 
166         (*result)->length = sz;
167         (*result)->values = (mapreduce_json_t *) malloc(sizeof(mapreduce_json_t) * sz);
168         if ((*result)->values == NULL) {
169             free(*result);
170             for ( ; it != list.end(); ++it) {
171                 free((*it).json);
172             }
173             throw std::bad_alloc();
174         }
175         for (int i = 0; it != list.end(); ++it, ++i) {
176             (*result)->values[i] = *it;
177         }
178     } catch (MapReduceError &e) {
179         copy_error_msg(e.getMsg(), error_msg);
180         *result = NULL;
181         return e.getError();
182     } catch (std::bad_alloc &) {
183         copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
184         *result = NULL;
185         return MAPREDUCE_ALLOC_ERROR;
186     }
187 
188     *error_msg = NULL;
189     return MAPREDUCE_SUCCESS;
190 }
191 
192 
193 LIBMAPREDUCE_API
mapreduce_reduce(void *context, int reduceFunNum, const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, mapreduce_json_t **result, char **error_msg)194 mapreduce_error_t mapreduce_reduce(void *context,
195                                    int reduceFunNum,
196                                    const mapreduce_json_list_t *keys,
197                                    const mapreduce_json_list_t *values,
198                                    mapreduce_json_t **result,
199                                    char **error_msg)
200 {
201     mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
202 
203     try {
204         mapreduce_json_t red = runReduce(ctx, reduceFunNum, *keys, *values);
205 
206         *result = (mapreduce_json_t *) malloc(sizeof(**result));
207         if (*result == NULL) {
208             free(red.json);
209             throw std::bad_alloc();
210         }
211         **result = red;
212     } catch (MapReduceError &e) {
213         copy_error_msg(e.getMsg(), error_msg);
214         *result = NULL;
215         return e.getError();
216     } catch (std::bad_alloc &) {
217         copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
218         *result = NULL;
219         return MAPREDUCE_ALLOC_ERROR;
220     }
221 
222     *error_msg = NULL;
223     return MAPREDUCE_SUCCESS;
224 }
225 
226 
227 LIBMAPREDUCE_API
mapreduce_rereduce(void *context, int reduceFunNum, const mapreduce_json_list_t *reductions, mapreduce_json_t **result, char **error_msg)228 mapreduce_error_t mapreduce_rereduce(void *context,
229                                      int reduceFunNum,
230                                      const mapreduce_json_list_t *reductions,
231                                      mapreduce_json_t **result,
232                                      char **error_msg)
233 {
234     mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
235 
236     try {
237         mapreduce_json_t red = runRereduce(ctx, reduceFunNum, *reductions);
238 
239         *result = (mapreduce_json_t *) malloc(sizeof(**result));
240         if (*result == NULL) {
241             free(red.json);
242             throw std::bad_alloc();
243         }
244         **result = red;
245     } catch (MapReduceError &e) {
246         copy_error_msg(e.getMsg(), error_msg);
247         *result = NULL;
248         return e.getError();
249     } catch (std::bad_alloc &) {
250         copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
251         *result = NULL;
252         return MAPREDUCE_ALLOC_ERROR;
253     }
254 
255     *error_msg = NULL;
256     return MAPREDUCE_SUCCESS;
257 }
258 
259 
260 LIBMAPREDUCE_API
mapreduce_free_context(void *context)261 void mapreduce_free_context(void *context)
262 {
263     if (context != NULL) {
264         mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
265 
266         unregister_ctx(ctx);
267         destroyContext(ctx);
268         delete ctx;
269     }
270 }
271 
272 
273 LIBMAPREDUCE_API
mapreduce_free_json(mapreduce_json_t *value)274 void mapreduce_free_json(mapreduce_json_t *value)
275 {
276     if (value != NULL) {
277         free(value->json);
278         free(value);
279     }
280 }
281 
282 
283 LIBMAPREDUCE_API
mapreduce_free_json_list(mapreduce_json_list_t *list)284 void mapreduce_free_json_list(mapreduce_json_list_t *list)
285 {
286     if (list != NULL) {
287         for (int i = 0; i < list->length; ++i) {
288             free(list->values[i].json);
289         }
290         free(list->values);
291         free(list);
292     }
293 }
294 
295 
296 LIBMAPREDUCE_API
mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)297 void mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)
298 {
299     if (list == NULL) {
300         return;
301     }
302 
303     for (int i = 0; i < list->length; ++i) {
304         mapreduce_map_result_t mr = list->list[i];
305 
306         switch (mr.error) {
307         case MAPREDUCE_SUCCESS:
308             {
309                 mapreduce_kv_list_t kvs = mr.result.kvs;
310 
311                 for (int j = 0; j < kvs.length; ++j) {
312                     mapreduce_kv_t kv = kvs.kvs[j];
313                     free(kv.key.json);
314                     free(kv.value.json);
315                 }
316                 free(kvs.kvs);
317             }
318             break;
319         default:
320             free(mr.result.error_msg);
321             break;
322         }
323     }
324 
325     free(list->list);
326     free(list);
327 }
328 
329 
330 LIBMAPREDUCE_API
mapreduce_free_error_msg(char *error_msg)331 void mapreduce_free_error_msg(char *error_msg)
332 {
333     free(error_msg);
334 }
335 
336 
337 LIBMAPREDUCE_API
mapreduce_set_timeout(unsigned int seconds)338 void mapreduce_set_timeout(unsigned int seconds)
339 {
340     terminator_timeout = seconds;
341 }
342 
343 
start_context(const char *functions[], int num_functions, void **context, char **error_msg)344 static mapreduce_error_t start_context(const char *functions[],
345                                        int num_functions,
346                                        void **context,
347                                        char **error_msg)
348 {
349     mapreduce_ctx_t *ctx = NULL;
350     mapreduce_error_t ret = MAPREDUCE_SUCCESS;
351 
352     try {
353         ctx = new mapreduce_ctx_t();
354         std::list<std::string> functions_list;
355 
356         make_function_list(functions, num_functions, functions_list);
357         initContext(ctx, functions_list);
358     } catch (MapReduceError &e) {
359         copy_error_msg(e.getMsg(), error_msg);
360         ret = e.getError();
361     } catch (std::bad_alloc &) {
362         copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
363         ret = MAPREDUCE_ALLOC_ERROR;
364     }
365 
366     if (ret == MAPREDUCE_SUCCESS) {
367         register_ctx(ctx);
368         *context = (void *) ctx;
369         *error_msg = NULL;
370     } else {
371         delete ctx;
372     }
373     return ret;
374 }
375 
376 
/**
 * Appends every entry of `sources` to `list`, wrapped in a pair of
 * parentheses: "src" becomes "(src)". Existing elements of `list` are kept.
 **/
static void make_function_list(const char *sources[],
                               int num_sources,
                               std::list<std::string> &list)
{
    int idx = 0;

    while (idx < num_sources) {
        std::string wrapped("(");

        wrapped += sources[idx];
        wrapped += ")";
        list.push_back(wrapped);
        ++idx;
    }
}
393 
394 
/**
 * Stores a NUL-terminated, malloc()'ed copy of `msg` in `*to`.
 *
 * Does nothing when `to` is NULL. When the allocation fails `*to` is set to
 * NULL.
 **/
static void copy_error_msg(const std::string &msg, char **to)
{
    if (to == NULL) {
        return;
    }

    const size_t msg_len = msg.length();
    char *buffer = (char *) malloc(msg_len + 1);

    *to = buffer;
    if (buffer == NULL) {
        return;
    }

    msg.copy(buffer, msg_len);
    buffer[msg_len] = '\0';
}
407 
408 
register_ctx(mapreduce_ctx_t *ctx)409 static void register_ctx(mapreduce_ctx_t *ctx)
410 {
411     uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
412     registryMutex.lock();
413 
414     if (!terminator_thread_created) {
415         int ret = cb_create_thread(&terminator_thread, terminator_loop, NULL, 1);
416         if (ret != 0) {
417             std::cerr << "Error creating terminator thread: " << ret << std::endl;
418             exit(1);
419         }
420         terminator_thread_created = true;
421     }
422 
423     ctx_registry[key] = ctx;
424     registryMutex.unlock();
425 }
426 
427 
unregister_ctx(mapreduce_ctx_t *ctx)428 static void unregister_ctx(mapreduce_ctx_t *ctx)
429 {
430     uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
431 
432     registryMutex.lock();
433     ctx_registry.erase(key);
434     registryMutex.unlock();
435 }
436 
437 
terminator_loop(void *)438 static void terminator_loop(void *)
439 {
440     std::map<uintptr_t, mapreduce_ctx_t *>::iterator it;
441     time_t now;
442 
443     while (true) {
444         registryMutex.lock();
445         now = time(NULL);
446         for (it = ctx_registry.begin(); it != ctx_registry.end(); ++it) {
447             mapreduce_ctx_t *ctx = (*it).second;
448 
449             if (ctx->taskStartTime >= 0) {
450                 if (ctx->taskStartTime + terminator_timeout < now) {
451                     terminateTask(ctx);
452                 }
453             }
454         }
455 
456         registryMutex.unlock();
457         doSleep(terminator_timeout);
458     }
459 }
460