1 /**
2 * @copyright 2013 Couchbase, Inc.
3 *
4 * @author Filipe Manana <filipe@couchbase.com>
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
7 * use this file except in compliance with the License. You may obtain a copy of
8 * the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 * License for the specific language governing permissions and limitations under
16 * the License.
17 **/
18
19 /**
20 * Implementation of all exported (public) functions, pure C.
21 **/
22 #include "mapreduce.h"
23 #include "mapreduce_internal.h"
24 #include <iostream>
25 #include <map>
26 #include <cstring>
27 #include <assert.h>
28
29 #if defined(WIN32) || defined(_WIN32)
30 #define doSleep(Secs) Sleep(Secs * 1000)
31 #else
32 #include <unistd.h>
33 #define doSleep(Secs) sleep(Secs)
34 #endif
35
36 static const char *MEM_ALLOC_ERROR_MSG = "memory allocation failure";
37
38 static cb_thread_t terminator_thread;
39 static bool terminator_thread_created = false;
40 static volatile unsigned int terminator_timeout = 5;
41
42 static std::map<uintptr_t, mapreduce_ctx_t *> ctx_registry;
43
44 class RegistryMutex {
45 public:
RegistryMutex()46 RegistryMutex() {
47 cb_mutex_initialize(&mutex);
48 }
~RegistryMutex()49 ~RegistryMutex() {
50 cb_mutex_destroy(&mutex);
51 }
lock()52 void lock() {
53 cb_mutex_enter(&mutex);
54 }
unlock()55 void unlock() {
56 cb_mutex_exit(&mutex);
57 }
58 private:
59 cb_mutex_t mutex;
60 };
61
62 static RegistryMutex registryMutex;
63
64
65 static mapreduce_error_t start_context(const char *functions[],
66 int num_functions,
67 void **context,
68 char **error_msg);
69
70 static void make_function_list(const char *sources[],
71 int num_sources,
72 std::list<std::string> &list);
73
74 static void copy_error_msg(const std::string &msg, char **to);
75
76 static void register_ctx(mapreduce_ctx_t *ctx);
77 static void unregister_ctx(mapreduce_ctx_t *ctx);
78 static void terminator_loop(void *);
79
80
81 LIBMAPREDUCE_API
mapreduce_start_map_context(const char *map_functions[], int num_functions, void **context, char **error_msg)82 mapreduce_error_t mapreduce_start_map_context(const char *map_functions[],
83 int num_functions,
84 void **context,
85 char **error_msg)
86 {
87 return start_context(map_functions, num_functions, context, error_msg);
88 }
89
90
91 LIBMAPREDUCE_API
mapreduce_map(void *context, const mapreduce_json_t *doc, const mapreduce_json_t *meta, mapreduce_map_result_list_t **result)92 mapreduce_error_t mapreduce_map(void *context,
93 const mapreduce_json_t *doc,
94 const mapreduce_json_t *meta,
95 mapreduce_map_result_list_t **result)
96 {
97 mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
98
99 *result = (mapreduce_map_result_list_t *) malloc(sizeof(**result));
100 if (*result == NULL) {
101 return MAPREDUCE_ALLOC_ERROR;
102 }
103
104 int num_funs = ctx->functions->size();
105 size_t sz = sizeof(mapreduce_map_result_t) * num_funs;
106 (*result)->list = (mapreduce_map_result_t *) malloc(sz);
107
108 if ((*result)->list == NULL) {
109 free(*result);
110 *result = NULL;
111 return MAPREDUCE_ALLOC_ERROR;
112 }
113
114 (*result)->length = 0;
115 try {
116 mapDoc(ctx, *doc, *meta, *result);
117 } catch (MapReduceError &e) {
118 mapreduce_free_map_result_list(*result);
119 *result = NULL;
120 return e.getError();
121 } catch (std::bad_alloc &) {
122 mapreduce_free_map_result_list(*result);
123 *result = NULL;
124 return MAPREDUCE_ALLOC_ERROR;
125 }
126
127 assert((*result)->length == num_funs);
128 return MAPREDUCE_SUCCESS;
129 }
130
131
132 LIBMAPREDUCE_API
mapreduce_start_reduce_context(const char *reduce_functions[], int num_functions, void **context, char **error_msg)133 mapreduce_error_t mapreduce_start_reduce_context(const char *reduce_functions[],
134 int num_functions,
135 void **context,
136 char **error_msg)
137 {
138 return start_context(reduce_functions, num_functions, context, error_msg);
139 }
140
141
142 LIBMAPREDUCE_API
mapreduce_reduce_all(void *context, const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, mapreduce_json_list_t **result, char **error_msg)143 mapreduce_error_t mapreduce_reduce_all(void *context,
144 const mapreduce_json_list_t *keys,
145 const mapreduce_json_list_t *values,
146 mapreduce_json_list_t **result,
147 char **error_msg)
148 {
149 mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
150
151 try {
152 json_results_list_t list = runReduce(ctx, *keys, *values);
153 size_t sz = list.size();
154 json_results_list_t::iterator it = list.begin();
155
156 assert(sz == ctx->functions->size());
157
158 *result = (mapreduce_json_list_t *) malloc(sizeof(**result));
159 if (*result == NULL) {
160 for ( ; it != list.end(); ++it) {
161 free((*it).json);
162 }
163 throw std::bad_alloc();
164 }
165
166 (*result)->length = sz;
167 (*result)->values = (mapreduce_json_t *) malloc(sizeof(mapreduce_json_t) * sz);
168 if ((*result)->values == NULL) {
169 free(*result);
170 for ( ; it != list.end(); ++it) {
171 free((*it).json);
172 }
173 throw std::bad_alloc();
174 }
175 for (int i = 0; it != list.end(); ++it, ++i) {
176 (*result)->values[i] = *it;
177 }
178 } catch (MapReduceError &e) {
179 copy_error_msg(e.getMsg(), error_msg);
180 *result = NULL;
181 return e.getError();
182 } catch (std::bad_alloc &) {
183 copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
184 *result = NULL;
185 return MAPREDUCE_ALLOC_ERROR;
186 }
187
188 *error_msg = NULL;
189 return MAPREDUCE_SUCCESS;
190 }
191
192
193 LIBMAPREDUCE_API
mapreduce_reduce(void *context, int reduceFunNum, const mapreduce_json_list_t *keys, const mapreduce_json_list_t *values, mapreduce_json_t **result, char **error_msg)194 mapreduce_error_t mapreduce_reduce(void *context,
195 int reduceFunNum,
196 const mapreduce_json_list_t *keys,
197 const mapreduce_json_list_t *values,
198 mapreduce_json_t **result,
199 char **error_msg)
200 {
201 mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
202
203 try {
204 mapreduce_json_t red = runReduce(ctx, reduceFunNum, *keys, *values);
205
206 *result = (mapreduce_json_t *) malloc(sizeof(**result));
207 if (*result == NULL) {
208 free(red.json);
209 throw std::bad_alloc();
210 }
211 **result = red;
212 } catch (MapReduceError &e) {
213 copy_error_msg(e.getMsg(), error_msg);
214 *result = NULL;
215 return e.getError();
216 } catch (std::bad_alloc &) {
217 copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
218 *result = NULL;
219 return MAPREDUCE_ALLOC_ERROR;
220 }
221
222 *error_msg = NULL;
223 return MAPREDUCE_SUCCESS;
224 }
225
226
227 LIBMAPREDUCE_API
mapreduce_rereduce(void *context, int reduceFunNum, const mapreduce_json_list_t *reductions, mapreduce_json_t **result, char **error_msg)228 mapreduce_error_t mapreduce_rereduce(void *context,
229 int reduceFunNum,
230 const mapreduce_json_list_t *reductions,
231 mapreduce_json_t **result,
232 char **error_msg)
233 {
234 mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
235
236 try {
237 mapreduce_json_t red = runRereduce(ctx, reduceFunNum, *reductions);
238
239 *result = (mapreduce_json_t *) malloc(sizeof(**result));
240 if (*result == NULL) {
241 free(red.json);
242 throw std::bad_alloc();
243 }
244 **result = red;
245 } catch (MapReduceError &e) {
246 copy_error_msg(e.getMsg(), error_msg);
247 *result = NULL;
248 return e.getError();
249 } catch (std::bad_alloc &) {
250 copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
251 *result = NULL;
252 return MAPREDUCE_ALLOC_ERROR;
253 }
254
255 *error_msg = NULL;
256 return MAPREDUCE_SUCCESS;
257 }
258
259
260 LIBMAPREDUCE_API
mapreduce_free_context(void *context)261 void mapreduce_free_context(void *context)
262 {
263 if (context != NULL) {
264 mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
265
266 unregister_ctx(ctx);
267 destroyContext(ctx);
268 delete ctx;
269 }
270 }
271
272
273 LIBMAPREDUCE_API
mapreduce_free_json(mapreduce_json_t *value)274 void mapreduce_free_json(mapreduce_json_t *value)
275 {
276 if (value != NULL) {
277 free(value->json);
278 free(value);
279 }
280 }
281
282
283 LIBMAPREDUCE_API
mapreduce_free_json_list(mapreduce_json_list_t *list)284 void mapreduce_free_json_list(mapreduce_json_list_t *list)
285 {
286 if (list != NULL) {
287 for (int i = 0; i < list->length; ++i) {
288 free(list->values[i].json);
289 }
290 free(list->values);
291 free(list);
292 }
293 }
294
295
296 LIBMAPREDUCE_API
mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)297 void mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)
298 {
299 if (list == NULL) {
300 return;
301 }
302
303 for (int i = 0; i < list->length; ++i) {
304 mapreduce_map_result_t mr = list->list[i];
305
306 switch (mr.error) {
307 case MAPREDUCE_SUCCESS:
308 {
309 mapreduce_kv_list_t kvs = mr.result.kvs;
310
311 for (int j = 0; j < kvs.length; ++j) {
312 mapreduce_kv_t kv = kvs.kvs[j];
313 free(kv.key.json);
314 free(kv.value.json);
315 }
316 free(kvs.kvs);
317 }
318 break;
319 default:
320 free(mr.result.error_msg);
321 break;
322 }
323 }
324
325 free(list->list);
326 free(list);
327 }
328
329
330 LIBMAPREDUCE_API
mapreduce_free_error_msg(char *error_msg)331 void mapreduce_free_error_msg(char *error_msg)
332 {
333 free(error_msg);
334 }
335
336
337 LIBMAPREDUCE_API
mapreduce_set_timeout(unsigned int seconds)338 void mapreduce_set_timeout(unsigned int seconds)
339 {
340 terminator_timeout = seconds;
341 }
342
343
start_context(const char *functions[], int num_functions, void **context, char **error_msg)344 static mapreduce_error_t start_context(const char *functions[],
345 int num_functions,
346 void **context,
347 char **error_msg)
348 {
349 mapreduce_ctx_t *ctx = NULL;
350 mapreduce_error_t ret = MAPREDUCE_SUCCESS;
351
352 try {
353 ctx = new mapreduce_ctx_t();
354 std::list<std::string> functions_list;
355
356 make_function_list(functions, num_functions, functions_list);
357 initContext(ctx, functions_list);
358 } catch (MapReduceError &e) {
359 copy_error_msg(e.getMsg(), error_msg);
360 ret = e.getError();
361 } catch (std::bad_alloc &) {
362 copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
363 ret = MAPREDUCE_ALLOC_ERROR;
364 }
365
366 if (ret == MAPREDUCE_SUCCESS) {
367 register_ctx(ctx);
368 *context = (void *) ctx;
369 *error_msg = NULL;
370 } else {
371 delete ctx;
372 }
373 return ret;
374 }
375
376
make_function_list(const char *sources[], int num_sources, std::list<std::string> &list)377 static void make_function_list(const char *sources[],
378 int num_sources,
379 std::list<std::string> &list)
380 {
381 for (int i = 0; i < num_sources; ++i) {
382 std::string source;
383 size_t len = strlen(sources[i]);
384
385 source.reserve(1 + len + 1);
386 source += '(';
387 source.append(sources[i], len);
388 source += ')';
389
390 list.push_back(source);
391 }
392 }
393
394
copy_error_msg(const std::string &msg, char **to)395 static void copy_error_msg(const std::string &msg, char **to)
396 {
397 if (to != NULL) {
398 size_t len = msg.length();
399
400 *to = (char *) malloc(len + 1);
401 if (*to != NULL) {
402 msg.copy(*to, len);
403 (*to)[len] = '\0';
404 }
405 }
406 }
407
408
register_ctx(mapreduce_ctx_t *ctx)409 static void register_ctx(mapreduce_ctx_t *ctx)
410 {
411 uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
412 registryMutex.lock();
413
414 if (!terminator_thread_created) {
415 int ret = cb_create_thread(&terminator_thread, terminator_loop, NULL, 1);
416 if (ret != 0) {
417 std::cerr << "Error creating terminator thread: " << ret << std::endl;
418 exit(1);
419 }
420 terminator_thread_created = true;
421 }
422
423 ctx_registry[key] = ctx;
424 registryMutex.unlock();
425 }
426
427
unregister_ctx(mapreduce_ctx_t *ctx)428 static void unregister_ctx(mapreduce_ctx_t *ctx)
429 {
430 uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
431
432 registryMutex.lock();
433 ctx_registry.erase(key);
434 registryMutex.unlock();
435 }
436
437
terminator_loop(void *)438 static void terminator_loop(void *)
439 {
440 std::map<uintptr_t, mapreduce_ctx_t *>::iterator it;
441 time_t now;
442
443 while (true) {
444 registryMutex.lock();
445 now = time(NULL);
446 for (it = ctx_registry.begin(); it != ctx_registry.end(); ++it) {
447 mapreduce_ctx_t *ctx = (*it).second;
448
449 if (ctx->taskStartTime >= 0) {
450 if (ctx->taskStartTime + terminator_timeout < now) {
451 terminateTask(ctx);
452 }
453 }
454 }
455
456 registryMutex.unlock();
457 doSleep(terminator_timeout);
458 }
459 }
460