1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /**
3  * @todo "chain" the loggers - I should use the next logger instead of stderr
4  * @todo don't format into a temporary buffer, but directly into the
5  *       destination buffer
6  */
7 #include "config.h"
8 #include <stdarg.h>
9 #include <stdio.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <strings.h>
13 #include <stdlib.h>
14 #include <time.h>
15 
16 #ifdef WIN32
17 #include <io.h>
18 #define F_OK 00
19 #endif
20 
21 #include <platform/platform.h>
22 
23 #ifdef WIN32_H
24 #undef close
25 #endif
26 
27 #include <memcached/extension.h>
28 #include <memcached/engine.h>
29 
30 #include "extensions/protocol_extension.h"
31 
32 /* Pointer to the server API */
33 static SERVER_HANDLE_V1 *sapi;
34 
35 /* The current log level set by the user. We should ignore all log requests
36  * with a finer log level than this. We've registered a listener to update
37  * the log level when the user change it
38  */
39 static EXTENSION_LOG_LEVEL current_log_level = EXTENSION_LOG_WARNING;
40 
41 /* All messages above the current level shall be sent to stderr immediately */
42 static EXTENSION_LOG_LEVEL output_level = EXTENSION_LOG_WARNING;
43 
44 /* To avoid the logfile to grow forever, we'll start logging to another
45  * file when we've added a certain amount of data to the logfile. You may
46  * tune this size by using the "cyclesize" configuration parameter. Use 100MB
47  * as the default (makes it a reasonable size to work with in your favorite
48  * editor ;-))
49  */
50 static size_t cyclesz = 100 * 1024 * 1024;
51 
52 /*
53  * We're using two buffers for logging. We'll be inserting data into one,
54  * while we're working on writing the other one to disk. Given that the disk
55  * is way slower than our CPU, we might end up in a situation that we'll be
56  * blocking the frontend threads if you're logging too much.
57  */
58 static struct logbuffer {
59     /* Pointer to beginning of the datasegment of this buffer */
60     char *data;
61     /* The current offset of the buffer */
62     size_t offset;
63 } buffers[2];
64 
65 /* The index in the buffers where we're currently inserting more data */
66 static int currbuffer;
67 
68 /* If we should try to pretty-print the severity or not */
69 static bool prettyprint = false;
70 
71 /* Are we running in a unit test (don't print warnings to stderr) */
72 static bool unit_test = false;
73 
74 /* The size of the buffers (this may be tuned by the buffersize configuration
75  * parameter */
76 static size_t buffersz = 2048 * 1024;
77 
78 /* The sleeptime between each forced flush of the buffer */
79 static size_t sleeptime = 60;
80 
81 /* To avoid race condition we're protecting our shared resources with a
82  * single mutex. */
83 static cb_mutex_t mutex;
84 
85 /* The thread performing the disk IO will be waiting for the input buffers
86  * to be filled by sleeping on the following condition variable. The
87  * frontend threads will notify the condition variable when the buffer is
88  * > 75% full
89  */
90 static cb_cond_t cond;
91 
92 /* In the "worst case scenarios" we're logging so much that the disk thread
93  * can't keep up with the the frontend threads. In these rare situations
94  * the frontend threads will block and wait for the flusher to free up log
95  * space
96  */
97 static cb_cond_t space_cond;
98 
99 typedef void * HANDLE;
100 
stdio_open(const char *path, const char *mode)101 static HANDLE stdio_open(const char *path, const char *mode) {
102     HANDLE ret = fopen(path, mode);
103     if (ret) {
104         setbuf(ret, NULL);
105     }
106     return ret;
107 }
108 
stdio_close(HANDLE handle)109 static void stdio_close(HANDLE handle) {
110     (void)fclose(handle);
111 }
112 
stdio_flush(HANDLE handle)113 static void stdio_flush(HANDLE handle) {
114     fflush(handle);
115 }
116 
stdio_write(HANDLE handle, const void *ptr, size_t nbytes)117 static ssize_t stdio_write(HANDLE handle, const void *ptr, size_t nbytes) {
118     return (ssize_t)fwrite(ptr, 1, nbytes, handle);
119 }
120 
121 struct io_ops {
122     HANDLE (*open)(const char *path, const char *mode);
123     void (*close)(HANDLE handle);
124     void (*flush)(HANDLE handle);
125     ssize_t (*write)(HANDLE handle, const void *ptr, size_t nbytes);
126 } iops;
127 
128 static const char *extension = "txt";
129 
add_log_entry(const char *msg, size_t size)130 static void add_log_entry(const char *msg, size_t size)
131 {
132     cb_mutex_enter(&mutex);
133     /* wait until there is room in the current buffer */
134     while ((buffers[currbuffer].offset + size) >= buffersz) {
135         if (!unit_test) {
136             fprintf(stderr, "WARNING: waiting for log space to be available\n");
137         }
138         cb_cond_wait(&space_cond, &mutex);
139     }
140 
141     /* We could have performed the memcpy outside the locked region,
142      * but then we would need to handle the situation where we're
143      * flipping the ownership of the buffer (otherwise we could be
144      * writing rubbish to the file) */
145     memcpy(buffers[currbuffer].data + buffers[currbuffer].offset,
146            msg, size);
147     buffers[currbuffer].offset += size;
148     if (buffers[currbuffer].offset > (buffersz * 0.75)) {
149         /* we're getting full.. time get the logger to start doing stuff! */
150         cb_cond_signal(&cond);
151     }
152     cb_mutex_exit(&mutex);
153 }
154 
severity2string(EXTENSION_LOG_LEVEL sev)155 static const char *severity2string(EXTENSION_LOG_LEVEL sev) {
156     switch (sev) {
157     case EXTENSION_LOG_WARNING:
158         return "WARNING";
159     case EXTENSION_LOG_INFO:
160         return "INFO";
161     case EXTENSION_LOG_DEBUG:
162         return "DEBUG";
163     case EXTENSION_LOG_DETAIL:
164         return "DETAIL";
165     default:
166         return "????";
167     }
168 }
169 
logger_log(EXTENSION_LOG_LEVEL severity, const void* client_cookie, const char *fmt, ...)170 static void logger_log(EXTENSION_LOG_LEVEL severity,
171                        const void* client_cookie,
172                        const char *fmt, ...)
173 {
174     (void)client_cookie;
175     if (severity >= current_log_level || severity >= output_level) {
176         /* @fixme: We shouldn't have to go through this temporary
177          *         buffer, but rather insert the data directly into
178          *         the destination buffer
179          */
180         char buffer[2048];
181         size_t avail = sizeof(buffer) - 1;
182         int prefixlen = 0;
183         va_list ap;
184         size_t len;
185         struct timeval now;
186 
187         if (cb_get_timeofday(&now) == 0) {
188             struct tm tval;
189             time_t nsec = (time_t)now.tv_sec;
190             char str[40];
191             int error;
192 
193 #ifdef WIN32
194             localtime_s(&tval, &nsec);
195             error = (asctime_s(str, sizeof(str), &tval) != 0);
196 #else
197             localtime_r(&nsec, &tval);
198             error = (asctime_r(&tval, str) == NULL);
199 #endif
200 
201             if (error) {
202                 prefixlen = snprintf(buffer, avail, "%u.%06u",
203                                      (unsigned int)now.tv_sec,
204                                      (unsigned int)now.tv_usec);
205             } else {
206                 const char *tz;
207 #ifdef HAVE_TM_ZONE
208                 tz = tval.tm_zone;
209 #else
210                 tz = tzname[tval.tm_isdst ? 1 : 0];
211 #endif
212                 /* trim off ' YYYY\n' */
213                 str[strlen(str) - 6] = '\0';
214                 prefixlen = snprintf(buffer, avail, "%s.%06u %s",
215                                      str, (unsigned int)now.tv_usec,
216                                      tz);
217             }
218         } else {
219             fprintf(stderr, "gettimeofday failed: %s\n", strerror(errno));
220             return;
221         }
222 
223         if (prettyprint) {
224             prefixlen += snprintf(buffer+prefixlen, avail-prefixlen,
225                                   " %s: ", severity2string(severity));
226         } else {
227             prefixlen += snprintf(buffer+prefixlen, avail-prefixlen,
228                                   " %u: ", (unsigned int)severity);
229         }
230 
231         avail -= prefixlen;
232         va_start(ap, fmt);
233         len = vsnprintf(buffer + prefixlen, avail, fmt, ap);
234         va_end(ap);
235 
236         if (len < avail) {
237             len += prefixlen;
238             if (buffer[len - 1] != '\n') {
239                 buffer[len++] = '\n';
240                 buffer[len] ='\0';
241             }
242 
243             if (severity >= output_level) {
244                 fputs(buffer, stderr);
245                 fflush(stderr);
246             }
247 
248             if (severity >= current_log_level) {
249                 add_log_entry(buffer, len);
250             }
251         } else {
252             fprintf(stderr, "Log message dropped... too big\n");
253         }
254     }
255 }
256 
open_logfile(const char *fnm)257 static HANDLE open_logfile(const char *fnm) {
258     static unsigned int next_id = 0;
259     char fname[1024];
260     HANDLE ret;
261     do {
262         sprintf(fname, "%s.%d.%s", fnm, next_id++, extension);
263     } while (access(fname, F_OK) == 0);
264     ret = iops.open(fname, "wb");
265     if (!ret) {
266         fprintf(stderr, "Failed to open memcached log file\n");
267     }
268     return ret;
269 }
270 
close_logfile(HANDLE fp)271 static void close_logfile(HANDLE fp) {
272     if (fp) {
273         iops.close(fp);
274     }
275 }
276 
reopen_logfile(HANDLE old, const char *fnm)277 static HANDLE reopen_logfile(HANDLE old, const char *fnm) {
278     close_logfile(old);
279     return open_logfile(fnm);
280 }
281 
flush_pending_io(HANDLE file, struct logbuffer *lb)282 static size_t flush_pending_io(HANDLE file, struct logbuffer *lb) {
283     size_t ret = 0;
284     if (lb->offset > 0) {
285         char *ptr = lb->data;
286         size_t towrite = ret = lb->offset;
287 
288         while (towrite > 0) {
289             int nw = iops.write(file, ptr, towrite);
290             if (nw > 0) {
291                 ptr += nw;
292                 towrite -= nw;
293             }
294         }
295         lb->offset = 0;
296         iops.flush(file);
297     }
298 
299     return ret;
300 }
301 
302 static volatile int run = 1;
303 static cb_thread_t tid;
304 
logger_thead_main(void* arg)305 static void logger_thead_main(void* arg)
306 {
307     size_t currsize = 0;
308     HANDLE fp = open_logfile(arg);
309 
310     struct timeval tp;
311     cb_get_timeofday(&tp);
312     time_t next = (time_t)tp.tv_sec;
313 
314     cb_mutex_enter(&mutex);
315     while (run) {
316         cb_get_timeofday(&tp);
317 
318         while ((time_t)tp.tv_sec >= next  ||
319                buffers[currbuffer].offset > (buffersz * 0.75)) {
320             int this  = currbuffer;
321             next = (time_t)tp.tv_sec + 1;
322             currbuffer = (currbuffer == 0) ? 1 : 0;
323             /* Let people who is blocked for space continue */
324             cb_cond_broadcast(&space_cond);
325 
326             /* Perform file IO without the lock */
327             cb_mutex_exit(&mutex);
328 
329             currsize += flush_pending_io(fp, buffers + this);
330             if (currsize > cyclesz) {
331                 fp = reopen_logfile(fp, arg);
332                 currsize = 0;
333             }
334             cb_mutex_enter(&mutex);
335         }
336 
337         cb_get_timeofday(&tp);
338         next = (time_t)tp.tv_sec + (time_t)sleeptime;
339         if (unit_test) {
340             cb_cond_timedwait(&cond, &mutex, 100);
341         } else {
342             cb_cond_timedwait(&cond, &mutex, 1000 * sleeptime);
343         }
344     }
345 
346     if (fp) {
347         while (buffers[currbuffer].offset) {
348             int this  = currbuffer;
349             currbuffer = (currbuffer == 0) ? 1 : 0;
350             flush_pending_io(fp, buffers + this);
351         }
352         close_logfile(fp);
353     }
354 
355     cb_mutex_exit(&mutex);
356     free(arg);
357     free(buffers[0].data);
358     free(buffers[1].data);
359 }
360 
exit_handler(void)361 static void exit_handler(void) {
362     /* Unfortunately it looks like the C runtime from MSVC "kills" the
363      * threads before the "atexit" handler is run, causing the program
364      * to halt in one of these steps depending on the state of the
365      * variables. Just disable the code for now.
366      */
367 #ifndef WIN32
368     cb_mutex_enter(&mutex);
369     run = 0;
370     cb_cond_signal(&cond);
371     cb_mutex_exit(&mutex);
372 
373     cb_join_thread(tid);
374 #endif
375 }
376 
get_name(void)377 static const char *get_name(void) {
378     return "file logger";
379 }
380 
381 static EXTENSION_LOGGER_DESCRIPTOR descriptor;
382 
on_log_level(const void *cookie, ENGINE_EVENT_TYPE type, const void *event_data, const void *cb_data)383 static void on_log_level(const void *cookie, ENGINE_EVENT_TYPE type,
384                          const void *event_data, const void *cb_data) {
385     if (sapi != NULL) {
386         current_log_level = sapi->log->get_level();
387     }
388 }
389 
logger_shutdown(void)390 static void logger_shutdown(void)  {
391     int running;
392     cb_mutex_enter(&mutex);
393     running = run;
394     run = 0;
395     cb_cond_signal(&cond);
396     cb_mutex_exit(&mutex);
397 
398     if (running) {
399         cb_join_thread(tid);
400     }
401 }
402 
403 MEMCACHED_PUBLIC_API
memcached_extensions_initialize(const char *config, GET_SERVER_API get_server_api)404 EXTENSION_ERROR_CODE memcached_extensions_initialize(const char *config,
405                                                      GET_SERVER_API get_server_api)
406 {
407     char *fname = NULL;
408 
409     cb_mutex_initialize(&mutex);
410     cb_cond_initialize(&cond);
411     cb_cond_initialize(&space_cond);
412 
413     iops.open = stdio_open;
414     iops.close = stdio_close;
415     iops.flush = stdio_flush;
416     iops.write = stdio_write;
417     descriptor.get_name = get_name;
418     descriptor.log = logger_log;
419     descriptor.shutdown = logger_shutdown;
420 
421 #ifdef HAVE_TM_ZONE
422     tzset();
423 #endif
424 
425     sapi = get_server_api();
426     if (sapi == NULL) {
427         return EXTENSION_FATAL;
428     }
429 
430     if (config != NULL) {
431         char *loglevel = NULL;
432         struct config_item items[8];
433         int ii = 0;
434         memset(&items, 0, sizeof(items));
435 
436         items[ii].key = "filename";
437         items[ii].datatype = DT_STRING;
438         items[ii].value.dt_string = &fname;
439         ++ii;
440 
441         items[ii].key = "buffersize";
442         items[ii].datatype = DT_SIZE;
443         items[ii].value.dt_size = &buffersz;
444         ++ii;
445 
446         items[ii].key = "cyclesize";
447         items[ii].datatype = DT_SIZE;
448         items[ii].value.dt_size = &cyclesz;
449         ++ii;
450 
451         items[ii].key = "loglevel";
452         items[ii].datatype = DT_STRING;
453         items[ii].value.dt_string = &loglevel;
454         ++ii;
455 
456         items[ii].key = "prettyprint";
457         items[ii].datatype = DT_BOOL;
458         items[ii].value.dt_bool = &prettyprint;
459         ++ii;
460 
461         items[ii].key = "sleeptime";
462         items[ii].datatype = DT_SIZE;
463         items[ii].value.dt_size = &sleeptime;
464         ++ii;
465 
466         items[ii].key = "unit_test";
467         items[ii].datatype = DT_BOOL;
468         items[ii].value.dt_bool = &unit_test;
469         ++ii;
470 
471         items[ii].key = NULL;
472         ++ii;
473         cb_assert(ii == 8);
474 
475         if (sapi->core->parse_config(config, items, stderr) != ENGINE_SUCCESS) {
476             return EXTENSION_FATAL;
477         }
478 
479         if (loglevel != NULL) {
480             if (strcasecmp("warning", loglevel) == 0) {
481                 output_level = EXTENSION_LOG_WARNING;
482             } else if (strcasecmp("info", loglevel) == 0) {
483                 output_level = EXTENSION_LOG_INFO;
484             } else if (strcasecmp("debug", loglevel) == 0) {
485                 output_level = EXTENSION_LOG_DEBUG;
486             } else if (strcasecmp("detail", loglevel) == 0) {
487                 output_level = EXTENSION_LOG_DETAIL;
488             } else {
489                 fprintf(stderr, "Unknown loglevel: %s. Use warning/info/debug/detail\n",
490                         loglevel);
491                 return EXTENSION_FATAL;
492             }
493         }
494         free(loglevel);
495     }
496 
497     if (fname == NULL) {
498         fname = strdup("memcached");
499     }
500 
501     buffers[0].data = malloc(buffersz);
502     buffers[1].data = malloc(buffersz);
503 
504     if (buffers[0].data == NULL || buffers[1].data == NULL || fname == NULL) {
505         fprintf(stderr, "Failed to allocate memory for the logger\n");
506         free(fname);
507         free(buffers[0].data);
508         free(buffers[1].data);
509         return EXTENSION_FATAL;
510     }
511 
512     if (cb_create_thread(&tid, logger_thead_main, fname, 0) < 0) {
513         fprintf(stderr, "Failed to initialize the logger\n");
514         free(fname);
515         free(buffers[0].data);
516         free(buffers[1].data);
517         return EXTENSION_FATAL;
518     }
519     atexit(exit_handler);
520 
521     current_log_level = sapi->log->get_level();
522     if (!sapi->extension->register_extension(EXTENSION_LOGGER, &descriptor)) {
523         return EXTENSION_FATAL;
524     }
525     sapi->callback->register_callback(NULL, ON_LOG_LEVEL, on_log_level, NULL);
526 
527     return EXTENSION_SUCCESS;
528 }
529