1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/**
3 * @todo "chain" the loggers - I should use the next logger instead of stderr
4 * @todo don't format into a temporary buffer, but directly into the
5 *       destination buffer
6 */
7#include "config.h"
8#include <stdarg.h>
9#include <stdio.h>
10#include <errno.h>
11#include <string.h>
12#include <strings.h>
13#include <stdlib.h>
14#include <time.h>
15
16#ifdef WIN32
17#include <io.h>
18#define F_OK 00
19#endif
20
21#include <platform/platform.h>
22
23#ifdef WIN32_H
24#undef close
25#endif
26
27#include <memcached/extension.h>
28#include <memcached/engine.h>
29
30#include "extensions/protocol_extension.h"
31
32/* Pointer to the server API */
33static SERVER_HANDLE_V1 *sapi;
34
/* The current log level set by the user. We should ignore all log requests
 * with a finer log level than this. We've registered a listener to update
 * the log level when the user changes it.
 */
39static EXTENSION_LOG_LEVEL current_log_level = EXTENSION_LOG_WARNING;
40
41/* All messages above the current level shall be sent to stderr immediately */
42static EXTENSION_LOG_LEVEL output_level = EXTENSION_LOG_WARNING;
43
/* To keep the logfile from growing forever, we'll start logging to another
 * file when we've added a certain amount of data to the logfile. You may
 * tune this size by using the "cyclesize" configuration parameter. Use 100MB
 * as the default (makes it a reasonable size to work with in your favorite
 * editor ;-))
 */
50static size_t cyclesz = 100 * 1024 * 1024;
51
52/*
53 * We're using two buffers for logging. We'll be inserting data into one,
54 * while we're working on writing the other one to disk. Given that the disk
55 * is way slower than our CPU, we might end up in a situation that we'll be
56 * blocking the frontend threads if you're logging too much.
57 */
static struct logbuffer {
    /* Pointer to the beginning of the data segment of this buffer
     * (allocated with malloc(buffersz) during initialization) */
    char *data;
    /* Number of bytes currently stored in data. New messages are appended
     * at this offset, and it is reset to 0 when the buffer is flushed */
    size_t offset;
} buffers[2];
64
65/* The index in the buffers where we're currently inserting more data */
66static int currbuffer;
67
68/* If we should try to pretty-print the severity or not */
69static bool prettyprint = false;
70
71/* Are we running in a unit test (don't print warnings to stderr) */
72static bool unit_test = false;
73
74/* The size of the buffers (this may be tuned by the buffersize configuration
75 * parameter */
76static size_t buffersz = 2048 * 1024;
77
78/* The sleeptime between each forced flush of the buffer */
79static size_t sleeptime = 60;
80
81/* To avoid race condition we're protecting our shared resources with a
82 * single mutex. */
83static cb_mutex_t mutex;
84
85/* The thread performing the disk IO will be waiting for the input buffers
86 * to be filled by sleeping on the following condition variable. The
87 * frontend threads will notify the condition variable when the buffer is
88 * > 75% full
89 */
90static cb_cond_t cond;
91
/* In the "worst case scenarios" we're logging so much that the disk thread
 * can't keep up with the frontend threads. In these rare situations
 * the frontend threads will block and wait for the flusher to free up log
 * space.
 */
97static cb_cond_t space_cond;
98
99typedef void * HANDLE;
100
101static HANDLE stdio_open(const char *path, const char *mode) {
102    HANDLE ret = fopen(path, mode);
103    if (ret) {
104        setbuf(ret, NULL);
105    }
106    return ret;
107}
108
109static void stdio_close(HANDLE handle) {
110    (void)fclose(handle);
111}
112
113static void stdio_flush(HANDLE handle) {
114    fflush(handle);
115}
116
117static ssize_t stdio_write(HANDLE handle, const void *ptr, size_t nbytes) {
118    return (ssize_t)fwrite(ptr, 1, nbytes, handle);
119}
120
/*
 * Table of the file operations used by the logger. All file access in
 * this file goes through the "iops" instance, which is populated with
 * the stdio_* functions in memcached_extensions_initialize().
 */
struct io_ops {
    /* Open the named file with the given mode. NULL signals failure */
    HANDLE (*open)(const char *path, const char *mode);
    /* Close a handle returned from open */
    void (*close)(HANDLE handle);
    /* Flush data written to the handle */
    void (*flush)(HANDLE handle);
    /* Write nbytes from ptr to the handle and return the number of bytes
     * written. Callers treat a value <= 0 as "nothing written" */
    ssize_t (*write)(HANDLE handle, const void *ptr, size_t nbytes);
} iops;
127
128static const char *extension = "txt";
129
130static void add_log_entry(const char *msg, size_t size)
131{
132    cb_mutex_enter(&mutex);
133    /* wait until there is room in the current buffer */
134    while ((buffers[currbuffer].offset + size) >= buffersz) {
135        if (!unit_test) {
136            fprintf(stderr, "WARNING: waiting for log space to be available\n");
137        }
138        cb_cond_wait(&space_cond, &mutex);
139    }
140
141    /* We could have performed the memcpy outside the locked region,
142     * but then we would need to handle the situation where we're
143     * flipping the ownership of the buffer (otherwise we could be
144     * writing rubbish to the file) */
145    memcpy(buffers[currbuffer].data + buffers[currbuffer].offset,
146           msg, size);
147    buffers[currbuffer].offset += size;
148    if (buffers[currbuffer].offset > (buffersz * 0.75)) {
149        /* we're getting full.. time get the logger to start doing stuff! */
150        cb_cond_signal(&cond);
151    }
152    cb_mutex_exit(&mutex);
153}
154
155static const char *severity2string(EXTENSION_LOG_LEVEL sev) {
156    switch (sev) {
157    case EXTENSION_LOG_WARNING:
158        return "WARNING";
159    case EXTENSION_LOG_INFO:
160        return "INFO";
161    case EXTENSION_LOG_DEBUG:
162        return "DEBUG";
163    case EXTENSION_LOG_DETAIL:
164        return "DETAIL";
165    default:
166        return "????";
167    }
168}
169
170static void logger_log(EXTENSION_LOG_LEVEL severity,
171                       const void* client_cookie,
172                       const char *fmt, ...)
173{
174    (void)client_cookie;
175    if (severity >= current_log_level || severity >= output_level) {
176        /* @fixme: We shouldn't have to go through this temporary
177         *         buffer, but rather insert the data directly into
178         *         the destination buffer
179         */
180        char buffer[2048];
181        size_t avail = sizeof(buffer) - 1;
182        int prefixlen = 0;
183        va_list ap;
184        size_t len;
185        struct timeval now;
186
187        if (cb_get_timeofday(&now) == 0) {
188            struct tm tval;
189            time_t nsec = (time_t)now.tv_sec;
190            char str[40];
191            int error;
192
193#ifdef WIN32
194            localtime_s(&tval, &nsec);
195            error = (asctime_s(str, sizeof(str), &tval) != 0);
196#else
197            localtime_r(&nsec, &tval);
198            error = (asctime_r(&tval, str) == NULL);
199#endif
200
201            if (error) {
202                prefixlen = snprintf(buffer, avail, "%u.%06u",
203                                     (unsigned int)now.tv_sec,
204                                     (unsigned int)now.tv_usec);
205            } else {
206                const char *tz;
207#ifdef HAVE_TM_ZONE
208                tz = tval.tm_zone;
209#else
210                tz = tzname[tval.tm_isdst ? 1 : 0];
211#endif
212                /* trim off ' YYYY\n' */
213                str[strlen(str) - 6] = '\0';
214                prefixlen = snprintf(buffer, avail, "%s.%06u %s",
215                                     str, (unsigned int)now.tv_usec,
216                                     tz);
217            }
218        } else {
219            fprintf(stderr, "gettimeofday failed: %s\n", strerror(errno));
220            return;
221        }
222
223        if (prettyprint) {
224            prefixlen += snprintf(buffer+prefixlen, avail-prefixlen,
225                                  " %s: ", severity2string(severity));
226        } else {
227            prefixlen += snprintf(buffer+prefixlen, avail-prefixlen,
228                                  " %u: ", (unsigned int)severity);
229        }
230
231        avail -= prefixlen;
232        va_start(ap, fmt);
233        len = vsnprintf(buffer + prefixlen, avail, fmt, ap);
234        va_end(ap);
235
236        if (len < avail) {
237            len += prefixlen;
238            if (buffer[len - 1] != '\n') {
239                buffer[len++] = '\n';
240                buffer[len] ='\0';
241            }
242
243            if (severity >= output_level) {
244                fputs(buffer, stderr);
245                fflush(stderr);
246            }
247
248            if (severity >= current_log_level) {
249                add_log_entry(buffer, len);
250            }
251        } else {
252            fprintf(stderr, "Log message dropped... too big\n");
253        }
254    }
255}
256
257static HANDLE open_logfile(const char *fnm) {
258    static unsigned int next_id = 0;
259    char fname[1024];
260    HANDLE ret;
261    do {
262        sprintf(fname, "%s.%d.%s", fnm, next_id++, extension);
263    } while (access(fname, F_OK) == 0);
264    ret = iops.open(fname, "wb");
265    if (!ret) {
266        fprintf(stderr, "Failed to open memcached log file\n");
267    }
268    return ret;
269}
270
271static void close_logfile(HANDLE fp) {
272    if (fp) {
273        iops.close(fp);
274    }
275}
276
277static HANDLE reopen_logfile(HANDLE old, const char *fnm) {
278    close_logfile(old);
279    return open_logfile(fnm);
280}
281
282static size_t flush_pending_io(HANDLE file, struct logbuffer *lb) {
283    size_t ret = 0;
284    if (lb->offset > 0) {
285        char *ptr = lb->data;
286        size_t towrite = ret = lb->offset;
287
288        while (towrite > 0) {
289            int nw = iops.write(file, ptr, towrite);
290            if (nw > 0) {
291                ptr += nw;
292                towrite -= nw;
293            }
294        }
295        lb->offset = 0;
296        iops.flush(file);
297    }
298
299    return ret;
300}
301
302static volatile int run = 1;
303static cb_thread_t tid;
304
/*
 * Main function for the thread writing the log buffers to disk.
 *
 * The thread opens the first log file and then runs until "run" is
 * cleared. It flushes the buffers whenever the flush deadline has passed
 * or the buffer being filled is more than 75% full, and sleeps on "cond"
 * between the flushes. When asked to stop it writes what is left in the
 * buffers, closes the file and releases the file name and both buffers.
 *
 * @param arg the base name of the log file. It is passed to free() before
 *            the function returns, so it must be heap allocated.
 */
static void logger_thead_main(void* arg)
{
    /* Number of bytes written to the current log file */
    size_t currsize = 0;
    /* NOTE(review): fp is NULL if open_logfile failed, and it is passed
     * to flush_pending_io in the loop below without a check - confirm
     * that this is handled */
    HANDLE fp = open_logfile(arg);

    struct timeval tp;
    cb_get_timeofday(&tp);
    /* The earliest time for the next time-triggered flush. Starting at
     * "now" makes the first iteration flush right away */
    time_t next = (time_t)tp.tv_sec;

    cb_mutex_enter(&mutex);
    while (run) {
        cb_get_timeofday(&tp);

        /* tp isn't refreshed inside this loop and next is moved past it,
         * so after the first round the loop only continues while the
         * (new) current buffer is more than 75% full */
        while ((time_t)tp.tv_sec >= next  ||
               buffers[currbuffer].offset > (buffersz * 0.75)) {
            int this  = currbuffer;
            next = (time_t)tp.tv_sec + 1;
            /* Flip the buffers: new messages go into the other buffer
             * while we're writing this one */
            currbuffer = (currbuffer == 0) ? 1 : 0;
            /* Let the threads blocked waiting for space continue */
            cb_cond_broadcast(&space_cond);

            /* Perform file IO without the lock */
            cb_mutex_exit(&mutex);

            currsize += flush_pending_io(fp, buffers + this);
            if (currsize > cyclesz) {
                /* The file reached the cycle size; move on to a new one */
                fp = reopen_logfile(fp, arg);
                currsize = 0;
            }
            cb_mutex_enter(&mutex);
        }

        cb_get_timeofday(&tp);
        next = (time_t)tp.tv_sec + (time_t)sleeptime;
        /* Sleep until someone signals cond or the timeout expires. The
         * timeout is presumably in milliseconds (sleeptime is multiplied
         * by 1000) - TODO confirm against cb_cond_timedwait */
        if (unit_test) {
            cb_cond_timedwait(&cond, &mutex, 100);
        } else {
            cb_cond_timedwait(&cond, &mutex, 1000 * sleeptime);
        }
    }

    /* We're shutting down. Write what is left in the buffers (still
     * holding the lock); nothing is written if there is no open file */
    if (fp) {
        while (buffers[currbuffer].offset) {
            int this  = currbuffer;
            currbuffer = (currbuffer == 0) ? 1 : 0;
            flush_pending_io(fp, buffers + this);
        }
        close_logfile(fp);
    }

    cb_mutex_exit(&mutex);
    free(arg);
    free(buffers[0].data);
    free(buffers[1].data);
}
360
361static void exit_handler(void) {
362    /* Unfortunately it looks like the C runtime from MSVC "kills" the
363     * threads before the "atexit" handler is run, causing the program
364     * to halt in one of these steps depending on the state of the
365     * variables. Just disable the code for now.
366     */
367#ifndef WIN32
368    cb_mutex_enter(&mutex);
369    run = 0;
370    cb_cond_signal(&cond);
371    cb_mutex_exit(&mutex);
372
373    cb_join_thread(tid);
374#endif
375}
376
/**
 * Get the name of this logger extension.
 *
 * @return a constant string with the name of the logger
 */
static const char *get_name(void) {
    static const char name[] = "file logger";

    return name;
}
380
381static EXTENSION_LOGGER_DESCRIPTOR descriptor;
382
383static void on_log_level(const void *cookie, ENGINE_EVENT_TYPE type,
384                         const void *event_data, const void *cb_data) {
385    if (sapi != NULL) {
386        current_log_level = sapi->log->get_level();
387    }
388}
389
390static void logger_shutdown(void)  {
391    int running;
392    cb_mutex_enter(&mutex);
393    running = run;
394    run = 0;
395    cb_cond_signal(&cond);
396    cb_mutex_exit(&mutex);
397
398    if (running) {
399        cb_join_thread(tid);
400    }
401}
402
403MEMCACHED_PUBLIC_API
404EXTENSION_ERROR_CODE memcached_extensions_initialize(const char *config,
405                                                     GET_SERVER_API get_server_api)
406{
407    char *fname = NULL;
408
409    cb_mutex_initialize(&mutex);
410    cb_cond_initialize(&cond);
411    cb_cond_initialize(&space_cond);
412
413    iops.open = stdio_open;
414    iops.close = stdio_close;
415    iops.flush = stdio_flush;
416    iops.write = stdio_write;
417    descriptor.get_name = get_name;
418    descriptor.log = logger_log;
419    descriptor.shutdown = logger_shutdown;
420
421#ifdef HAVE_TM_ZONE
422    tzset();
423#endif
424
425    sapi = get_server_api();
426    if (sapi == NULL) {
427        return EXTENSION_FATAL;
428    }
429
430    if (config != NULL) {
431        char *loglevel = NULL;
432        struct config_item items[8];
433        int ii = 0;
434        memset(&items, 0, sizeof(items));
435
436        items[ii].key = "filename";
437        items[ii].datatype = DT_STRING;
438        items[ii].value.dt_string = &fname;
439        ++ii;
440
441        items[ii].key = "buffersize";
442        items[ii].datatype = DT_SIZE;
443        items[ii].value.dt_size = &buffersz;
444        ++ii;
445
446        items[ii].key = "cyclesize";
447        items[ii].datatype = DT_SIZE;
448        items[ii].value.dt_size = &cyclesz;
449        ++ii;
450
451        items[ii].key = "loglevel";
452        items[ii].datatype = DT_STRING;
453        items[ii].value.dt_string = &loglevel;
454        ++ii;
455
456        items[ii].key = "prettyprint";
457        items[ii].datatype = DT_BOOL;
458        items[ii].value.dt_bool = &prettyprint;
459        ++ii;
460
461        items[ii].key = "sleeptime";
462        items[ii].datatype = DT_SIZE;
463        items[ii].value.dt_size = &sleeptime;
464        ++ii;
465
466        items[ii].key = "unit_test";
467        items[ii].datatype = DT_BOOL;
468        items[ii].value.dt_bool = &unit_test;
469        ++ii;
470
471        items[ii].key = NULL;
472        ++ii;
473        cb_assert(ii == 8);
474
475        if (sapi->core->parse_config(config, items, stderr) != ENGINE_SUCCESS) {
476            return EXTENSION_FATAL;
477        }
478
479        if (loglevel != NULL) {
480            if (strcasecmp("warning", loglevel) == 0) {
481                output_level = EXTENSION_LOG_WARNING;
482            } else if (strcasecmp("info", loglevel) == 0) {
483                output_level = EXTENSION_LOG_INFO;
484            } else if (strcasecmp("debug", loglevel) == 0) {
485                output_level = EXTENSION_LOG_DEBUG;
486            } else if (strcasecmp("detail", loglevel) == 0) {
487                output_level = EXTENSION_LOG_DETAIL;
488            } else {
489                fprintf(stderr, "Unknown loglevel: %s. Use warning/info/debug/detail\n",
490                        loglevel);
491                return EXTENSION_FATAL;
492            }
493        }
494        free(loglevel);
495    }
496
497    if (fname == NULL) {
498        fname = strdup("memcached");
499    }
500
501    buffers[0].data = malloc(buffersz);
502    buffers[1].data = malloc(buffersz);
503
504    if (buffers[0].data == NULL || buffers[1].data == NULL || fname == NULL) {
505        fprintf(stderr, "Failed to allocate memory for the logger\n");
506        free(fname);
507        free(buffers[0].data);
508        free(buffers[1].data);
509        return EXTENSION_FATAL;
510    }
511
512    if (cb_create_thread(&tid, logger_thead_main, fname, 0) < 0) {
513        fprintf(stderr, "Failed to initialize the logger\n");
514        free(fname);
515        free(buffers[0].data);
516        free(buffers[1].data);
517        return EXTENSION_FATAL;
518    }
519    atexit(exit_handler);
520
521    current_log_level = sapi->log->get_level();
522    if (!sapi->extension->register_extension(EXTENSION_LOGGER, &descriptor)) {
523        return EXTENSION_FATAL;
524    }
525    sapi->callback->register_callback(NULL, ON_LOG_LEVEL, on_log_level, NULL);
526
527    return EXTENSION_SUCCESS;
528}
529