1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <fcntl.h>
4#include <errno.h>
5#include <stdlib.h>
6#include <stdio.h>
7#include <string.h>
8#include <time.h>
9#include <inttypes.h>
10
11#include "default_engine.h"
12
/*
 * Forward Declarations
 *
 * File-local helpers defined further down. The public wrappers in this file
 * (item_alloc, item_get, item_release, item_unlink, arithmetic) take
 * engine->cache_lock before calling the matching do_* helper.
 */
/* Insert / remove an item in the LRU list of its slab class. */
static void item_link_q(struct default_engine *engine, hash_item *it);
static void item_unlink_q(struct default_engine *engine, hash_item *it);
/* Allocate a new item; returns NULL when no memory could be obtained. */
static hash_item *do_item_alloc(struct default_engine *engine,
                                const void *key, const size_t nkey,
                                const int flags, const rel_time_t exptime,
                                const int nbytes,
                                const void *cookie,
                                uint8_t datatype);
/* Look up an item by key, unlinking it first if it was flushed or expired. */
static hash_item *do_item_get(struct default_engine *engine,
                              const char *key, const size_t nkey);
/* Add an item to / remove an item from the hash table and the LRU. */
static int do_item_link(struct default_engine *engine, hash_item *it);
static void do_item_unlink(struct default_engine *engine, hash_item *it);
/* Drop one reference; frees the item once unreferenced and unlinked. */
static void do_item_release(struct default_engine *engine, hash_item *it);
/* Move a linked item to the LRU head if it has not been bumped recently. */
static void do_item_update(struct default_engine *engine, hash_item *it);
/* Unlink `it` and link `new_it` in its place. */
static int do_item_replace(struct default_engine *engine,
                            hash_item *it, hash_item *new_it);
/* Hand an unlinked, unreferenced item back to the slab allocator. */
static void item_free(struct default_engine *engine, hash_item *it);
31
/*
 * We only reposition items in the LRU queue if they haven't been repositioned
 * in this many seconds. That saves us from churning on frequently-accessed
 * items. Consulted by do_item_update().
 */
#define ITEM_UPDATE_INTERVAL 60
/*
 * To avoid scanning through the complete cache in some circumstances we'll
 * just give up and return an error after inspecting a fixed number of objects.
 * Used as the per-pass cap on LRU-tail scans in do_item_alloc() and
 * do_item_stats().
 */
static const int search_items = 50;
43
/*
 * Zero all per-slab-class item statistics (engine->items.itemstats) while
 * holding engine->cache_lock.
 */
void item_stats_reset(struct default_engine *engine) {
    cb_mutex_enter(&engine->cache_lock);
    memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
    cb_mutex_exit(&engine->cache_lock);
}
49
50
51/* warning: don't use these macros with a function, as it evals its arg twice */
52static size_t ITEM_ntotal(struct default_engine *engine,
53                          const hash_item *item) {
54    size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55    if (engine->config.use_cas) {
56        ret += sizeof(uint64_t);
57    }
58
59    return ret;
60}
61
/*
 * Hand out the next CAS identifier. Identifiers start at 1 and grow by one
 * per call; the counter is a function-local static with no locking of its
 * own.
 */
static uint64_t get_cas_id(void) {
    static uint64_t last_issued = 0;

    last_issued += 1;
    return last_issued;
}
67
/*
 * Reference-count tracing hook. Change "#if 0" to "#if 1" to print every
 * refcount transition to stderr; otherwise the macro expands to an empty
 * statement.
 *
 * The tracing form evaluates `it` more than once, so pass a plain variable.
 * The format string matches its arguments: the pointer is printed with %p,
 * and there is one %c for the operation plus one each for the LINKED and
 * SLABBED flags, which live in the item's `iflag` field.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
78
79
/*
 * Allocate and initialise a hash_item large enough for nkey key bytes and
 * nbytes value bytes (plus a uint64_t CAS slot when config.use_cas is set).
 *
 * Strategy, in order:
 *   1. Scan up to search_items entries from the LRU tail of the target slab
 *      class for an unreferenced item that is flushed or expired and reuse
 *      its memory directly.
 *   2. Otherwise ask the slab allocator (slabs_alloc).
 *   3. If that fails and config.evict_to_free is non-zero, unlink one
 *      unreferenced item near the tail (counted as an eviction or a reclaim)
 *      and retry the allocation.
 *   4. As a last resort, forcibly unlink one referenced tail item whose
 *      timestamp is older than TAIL_REPAIR_TIME and retry once more.
 *
 * Returns the item with refcount 1, iflag reset and the key copied in; the
 * value bytes are left for the caller to fill. Returns NULL when no slab
 * class fits ntotal or no memory could be obtained.
 *
 * The `cookie` is only passed through to the server's `evicting` callback.
 */
/*@null@*/
hash_item *do_item_alloc(struct default_engine *engine,
                         const void *key,
                         const size_t nkey,
                         const int flags,
                         const rel_time_t exptime,
                         const int nbytes,
                         const void *cookie,
                         uint8_t datatype) {
    hash_item *it = NULL;
    int tries = search_items;
    hash_item *search;
    rel_time_t oldest_live;
    rel_time_t current_time;
    unsigned int id;

    /* Same size computation as ITEM_ntotal(), for an item not yet built. */
    size_t ntotal = sizeof(hash_item) + nkey + nbytes;
    if (engine->config.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    /* A class id of 0 is treated as "no slab class can hold this size". */
    if ((id = slabs_clsid(engine, ntotal)) == 0) {
        return 0;
    }

    /* do a quick check if we have any expired items in the tail.. */
    tries = search_items;
    oldest_live = engine->config.oldest_live;
    current_time = engine->server.core->get_current_time();

    for (search = engine->items.tails[id];
         tries > 0 && search != NULL;
         tries--, search=search->prev) {
        if (search->refcount == 0 &&
            ((search->time < oldest_live) || /* dead by flush */
             (search->exptime != 0 && search->exptime < current_time))) {
            it = search;
            /* I don't want to actually free the object, just steal
             * the item to avoid to grab the slab mutex twice ;-)
             */
            cb_mutex_enter(&engine->stats.lock);
            engine->stats.reclaimed++;
            cb_mutex_exit(&engine->stats.lock);
            engine->items.itemstats[id].reclaimed++;
            /* Hold a temporary reference: do_item_unlink() frees the item
             * when refcount is 0, and we want to keep the memory. */
            it->refcount = 1;
            /* NOTE(review): presumably swaps the old item size for the new
             * one in the slab class's requested-memory accounting — confirm
             * against the slabs implementation. */
            slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
            do_item_unlink(engine, it);
            /* Initialize the item block: */
            it->slabs_clsid = 0;
            it->refcount = 0;
            break;
        }
    }

    if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
        /*
        ** Could not find an expired item at the tail, and memory allocation
        ** failed. Try to evict some items!
        */
        tries = search_items;

        /* If requested to not push old items out of cache when memory runs out,
         * we're out of luck at this point...
         */

        if (engine->config.evict_to_free == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        /*
         * try to get one off the right LRU
         * don't necessarily unlink the tail because it may be locked: refcount>0
         * search up from tail an item with refcount==0 and unlink it; give up after search_items
         * tries
         */

        /* Nothing in this class's LRU, so there is nothing to evict. */
        if (engine->items.tails[id] == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
            if (search->refcount == 0) {
                /* Still-live item: this is a true eviction. */
                if (search->exptime == 0 || search->exptime > current_time) {
                    engine->items.itemstats[id].evicted++;
                    engine->items.itemstats[id].evicted_time = current_time - search->time;
                    if (search->exptime != 0) {
                        engine->items.itemstats[id].evicted_nonzero++;
                    }
                    cb_mutex_enter(&engine->stats.lock);
                    engine->stats.evictions++;
                    cb_mutex_exit(&engine->stats.lock);
                    engine->server.stat->evicting(cookie,
                                                  item_get_key(search),
                                                  search->nkey);
                } else {
                    /* Already expired: count it as reclaimed instead. */
                    engine->items.itemstats[id].reclaimed++;
                    cb_mutex_enter(&engine->stats.lock);
                    engine->stats.reclaimed++;
                    cb_mutex_exit(&engine->stats.lock);
                }
                /* refcount is 0, so this unlink also frees the item. */
                do_item_unlink(engine, search);
                break;
            }
        }
        it = slabs_alloc(engine, ntotal, id);
        if (it == 0) {
            /* Counted even when the tail repair below ends up succeeding. */
            engine->items.itemstats[id].outofmemory++;
            /* Last ditch effort. There is a very rare bug which causes
             * refcount leaks. We've fixed most of them, but it still happens,
             * and it may happen in the future.
             * We can reasonably assume no item can stay locked for more than
             * three hours, so if we find one in the tail which is that old,
             * free it anyway.
             */
            tries = search_items;
            for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
                if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
                    engine->items.itemstats[id].tailrepairs++;
                    /* Drop the stale references so the unlink frees it. */
                    search->refcount = 0;
                    do_item_unlink(engine, search);
                    break;
                }
            }
            it = slabs_alloc(engine, ntotal, id);
            if (it == 0) {
                return NULL;
            }
        }
    }

    cb_assert(it->slabs_clsid == 0);

    it->slabs_clsid = id;

    cb_assert(it != engine->items.heads[it->slabs_clsid]);

    it->next = it->prev = it->h_next = 0;
    it->refcount = 1;     /* the caller will have a reference */
    DEBUG_REFCNT(it, '*');
    it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
    /* NOTE(review): nkey is narrowed to 16 bits here without a range check;
     * presumably callers bound the key length — confirm. */
    it->nkey = (uint16_t)nkey;
    it->nbytes = nbytes;
    it->flags = flags;
    it->datatype = datatype;
    memcpy((void*)item_get_key(it), key, nkey);
    it->exptime = exptime;
    return it;
}
230
231static void item_free(struct default_engine *engine, hash_item *it) {
232    size_t ntotal = ITEM_ntotal(engine, it);
233    unsigned int clsid;
234    cb_assert((it->iflag & ITEM_LINKED) == 0);
235    cb_assert(it != engine->items.heads[it->slabs_clsid]);
236    cb_assert(it != engine->items.tails[it->slabs_clsid]);
237    cb_assert(it->refcount == 0);
238
239    /* so slab size changer can tell later if item is already free or not */
240    clsid = it->slabs_clsid;
241    it->slabs_clsid = 0;
242    it->iflag |= ITEM_SLABBED;
243    DEBUG_REFCNT(it, 'F');
244    slabs_free(engine, it, ntotal, clsid);
245}
246
247static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
248    hash_item **head, **tail;
249    cb_assert(it->slabs_clsid < POWER_LARGEST);
250    cb_assert((it->iflag & ITEM_SLABBED) == 0);
251
252    head = &engine->items.heads[it->slabs_clsid];
253    tail = &engine->items.tails[it->slabs_clsid];
254    cb_assert(it != *head);
255    cb_assert((*head && *tail) || (*head == 0 && *tail == 0));
256    it->prev = 0;
257    it->next = *head;
258    if (it->next) it->next->prev = it;
259    *head = it;
260    if (*tail == 0) *tail = it;
261    engine->items.sizes[it->slabs_clsid]++;
262    return;
263}
264
265static void item_unlink_q(struct default_engine *engine, hash_item *it) {
266    hash_item **head, **tail;
267    cb_assert(it->slabs_clsid < POWER_LARGEST);
268    head = &engine->items.heads[it->slabs_clsid];
269    tail = &engine->items.tails[it->slabs_clsid];
270
271    if (*head == it) {
272        cb_assert(it->prev == 0);
273        *head = it->next;
274    }
275    if (*tail == it) {
276        cb_assert(it->next == 0);
277        *tail = it->prev;
278    }
279    cb_assert(it->next != it);
280    cb_assert(it->prev != it);
281
282    if (it->next) it->next->prev = it->prev;
283    if (it->prev) it->prev->next = it->next;
284    engine->items.sizes[it->slabs_clsid]--;
285    return;
286}
287
288int do_item_link(struct default_engine *engine, hash_item *it) {
289    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
290    cb_assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
291    cb_assert(it->nbytes < (1024 * 1024));  /* 1MB max size */
292    it->iflag |= ITEM_LINKED;
293    it->time = engine->server.core->get_current_time();
294    assoc_insert(engine, engine->server.core->hash(item_get_key(it),
295                                                        it->nkey, 0),
296                 it);
297
298    cb_mutex_enter(&engine->stats.lock);
299    engine->stats.curr_bytes += ITEM_ntotal(engine, it);
300    engine->stats.curr_items += 1;
301    engine->stats.total_items += 1;
302    cb_mutex_exit(&engine->stats.lock);
303
304    /* Allocate a new CAS ID on link. */
305    item_set_cas(NULL, NULL, it, get_cas_id());
306
307    item_link_q(engine, it);
308
309    return 1;
310}
311
312void do_item_unlink(struct default_engine *engine, hash_item *it) {
313    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
314    if ((it->iflag & ITEM_LINKED) != 0) {
315        it->iflag &= ~ITEM_LINKED;
316        cb_mutex_enter(&engine->stats.lock);
317        engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
318        engine->stats.curr_items -= 1;
319        cb_mutex_exit(&engine->stats.lock);
320        assoc_delete(engine, engine->server.core->hash(item_get_key(it),
321                                                            it->nkey, 0),
322                     item_get_key(it), it->nkey);
323        item_unlink_q(engine, it);
324        if (it->refcount == 0) {
325            item_free(engine, it);
326        }
327    }
328}
329
330void do_item_release(struct default_engine *engine, hash_item *it) {
331    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
332    if (it->refcount != 0) {
333        it->refcount--;
334        DEBUG_REFCNT(it, '-');
335    }
336    if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
337        item_free(engine, it);
338    }
339}
340
341void do_item_update(struct default_engine *engine, hash_item *it) {
342    rel_time_t current_time = engine->server.core->get_current_time();
343    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
344    if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
345        cb_assert((it->iflag & ITEM_SLABBED) == 0);
346
347        if ((it->iflag & ITEM_LINKED) != 0) {
348            item_unlink_q(engine, it);
349            it->time = current_time;
350            item_link_q(engine, it);
351        }
352    }
353}
354
/*
 * Swap one item for another in the cache: unlink `it`, then link `new_it`.
 * `it` must not already have been returned to the slab allocator.
 *
 * Returns the result of do_item_link(), which is always 1.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    cb_assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);
    return do_item_link(engine, new_it);
}
364
/*
 * Placeholder for a "cachedump" of one slab class.
 *
 * Unless FUTURE is defined this function ignores its arguments and returns
 * NULL. The code under FUTURE would build a malloc'ed text listing of up to
 * `limit` items (0 = no limit) from the head of the class's LRU, capped at
 * 2MB, and store the length in *bytes.
 *
 * NOTE(review): the FUTURE body refers to `engine` and `process_started`,
 * neither of which is a parameter or local of this function, so it needs
 * rework before it can be enabled.
 */
/*@null@*/
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer;
    unsigned int bufcurr;
    hash_item *it;
    unsigned int len;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == 0) return NULL;
    bufcurr = 0;


    while (it != NULL && (limit == 0 || shown < limit)) {
        cb_assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes,
                       (unsigned long)it->exptime + process_started);
        if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
            break;
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }


    /* Copies the terminating NUL too, but only counts the 5 visible bytes. */
    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#endif
    /* Silence unused-parameter warnings in the stubbed-out build. */
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
}
414
415static void do_item_stats(struct default_engine *engine,
416                          ADD_STAT add_stats, const void *c) {
417    int i;
418    rel_time_t current_time = engine->server.core->get_current_time();
419    for (i = 0; i < POWER_LARGEST; i++) {
420        if (engine->items.tails[i] != NULL) {
421            const char *prefix = "items";
422            int search = search_items;
423            while (search > 0 &&
424                   engine->items.tails[i] != NULL &&
425                   ((engine->config.oldest_live != 0 && /* Item flushd */
426                     engine->config.oldest_live <= current_time &&
427                     engine->items.tails[i]->time <= engine->config.oldest_live) ||
428                    (engine->items.tails[i]->exptime != 0 && /* and not expired */
429                     engine->items.tails[i]->exptime < current_time))) {
430                --search;
431                if (engine->items.tails[i]->refcount == 0) {
432                    do_item_unlink(engine, engine->items.tails[i]);
433                } else {
434                    break;
435                }
436            }
437            if (engine->items.tails[i] == NULL) {
438                /* We removed all of the items in this slab class */
439                continue;
440            }
441
442            add_statistics(c, add_stats, prefix, i, "number", "%u",
443                           engine->items.sizes[i]);
444            add_statistics(c, add_stats, prefix, i, "age", "%u",
445                           engine->items.tails[i]->time);
446            add_statistics(c, add_stats, prefix, i, "evicted",
447                           "%u", engine->items.itemstats[i].evicted);
448            add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
449                           "%u", engine->items.itemstats[i].evicted_nonzero);
450            add_statistics(c, add_stats, prefix, i, "evicted_time",
451                           "%u", engine->items.itemstats[i].evicted_time);
452            add_statistics(c, add_stats, prefix, i, "outofmemory",
453                           "%u", engine->items.itemstats[i].outofmemory);
454            add_statistics(c, add_stats, prefix, i, "tailrepairs",
455                           "%u", engine->items.itemstats[i].tailrepairs);;
456            add_statistics(c, add_stats, prefix, i, "reclaimed",
457                           "%u", engine->items.itemstats[i].reclaimed);;
458        }
459    }
460}
461
462/** dumps out a list of objects of each size, with granularity of 32 bytes */
463/*@null@*/
464static void do_item_stats_sizes(struct default_engine *engine,
465                                ADD_STAT add_stats, const void *c) {
466
467    /* max 1MB object, divided into 32 bytes size buckets */
468    const int num_buckets = 32768;
469    unsigned int *histogram = calloc(num_buckets, sizeof(unsigned int));
470
471    if (histogram != NULL) {
472        int i;
473
474        /* build the histogram */
475        for (i = 0; i < POWER_LARGEST; i++) {
476            hash_item *iter = engine->items.heads[i];
477            while (iter) {
478                size_t ntotal = ITEM_ntotal(engine, iter);
479                size_t bucket = ntotal / 32;
480                if ((ntotal % 32) != 0) {
481                    bucket++;
482                }
483                if (bucket < num_buckets) {
484                    histogram[bucket]++;
485                }
486                iter = iter->next;
487            }
488        }
489
490        /* write the buffer */
491        for (i = 0; i < num_buckets; i++) {
492            if (histogram[i] != 0) {
493                char key[8], val[32];
494                int klen, vlen;
495                klen = snprintf(key, sizeof(key), "%d", i * 32);
496                vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
497                cb_assert(klen < sizeof(key));
498                cb_assert(vlen < sizeof(val));
499                add_stats(key, klen, val, vlen, c);
500            }
501        }
502        free(histogram);
503    }
504}
505
506/** wrapper around assoc_find which does the lazy expiration logic */
507hash_item *do_item_get(struct default_engine *engine,
508                       const char *key, const size_t nkey) {
509    rel_time_t current_time = engine->server.core->get_current_time();
510    hash_item *it = assoc_find(engine, engine->server.core->hash(key,
511                                                                      nkey, 0),
512                               key, nkey);
513    int was_found = 0;
514
515    if (engine->config.verbose > 2) {
516        EXTENSION_LOGGER_DESCRIPTOR *logger;
517        logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
518        if (it == NULL) {
519            logger->log(EXTENSION_LOG_DEBUG, NULL,
520                        "> NOT FOUND %s", key);
521        } else {
522            logger->log(EXTENSION_LOG_DEBUG, NULL,
523                        "> FOUND KEY %s",
524                        (const char*)item_get_key(it));
525            was_found++;
526        }
527    }
528
529    if (it != NULL && engine->config.oldest_live != 0 &&
530        engine->config.oldest_live <= current_time &&
531        it->time <= engine->config.oldest_live) {
532        do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
533        it = NULL;
534    }
535
536    if (it == NULL && was_found) {
537        EXTENSION_LOGGER_DESCRIPTOR *logger;
538        logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
539        logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
540        was_found--;
541    }
542
543    if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
544        do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
545        it = NULL;
546    }
547
548    if (it == NULL && was_found) {
549        EXTENSION_LOGGER_DESCRIPTOR *logger;
550        logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
551        logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
552        was_found--;
553    }
554
555    if (it != NULL) {
556        it->refcount++;
557        DEBUG_REFCNT(it, '+');
558        do_item_update(engine, it);
559    }
560
561    return it;
562}
563
564/*
565 * Stores an item in the cache according to the semantics of one of the set
566 * commands. In threaded mode, this is protected by the cache lock.
567 *
568 * Returns the state of storage.
569 */
570static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
571                                       hash_item *it, uint64_t *cas,
572                                       ENGINE_STORE_OPERATION operation,
573                                       const void *cookie) {
574    const char *key = item_get_key(it);
575    hash_item *old_it = do_item_get(engine, key, it->nkey);
576    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
577
578    hash_item *new_it = NULL;
579
580    if (old_it != NULL && operation == OPERATION_ADD) {
581        /* add only adds a nonexistent item, but promote to head of LRU */
582        do_item_update(engine, old_it);
583    } else if (!old_it && (operation == OPERATION_REPLACE
584        || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
585    {
586        /* replace only replaces an existing value; don't store */
587    } else if (operation == OPERATION_CAS) {
588        /* validate cas operation */
589        if(old_it == NULL) {
590            /* LRU expired */
591            stored = ENGINE_KEY_ENOENT;
592        }
593        else if (item_get_cas(it) == item_get_cas(old_it)) {
594            /* cas validates */
595            /* it and old_it may belong to different classes. */
596            /* I'm updating the stats for the one that's getting pushed out */
597            do_item_replace(engine, old_it, it);
598            stored = ENGINE_SUCCESS;
599        } else {
600            if (engine->config.verbose > 1) {
601                EXTENSION_LOGGER_DESCRIPTOR *logger;
602                logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
603                logger->log(EXTENSION_LOG_INFO, NULL,
604                        "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
605                        item_get_cas(old_it),
606                        item_get_cas(it));
607            }
608            stored = ENGINE_KEY_EEXISTS;
609        }
610    } else {
611        /*
612         * Append - combine new and old record into single one. Here it's
613         * atomic and thread-safe.
614         */
615        if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
616            /*
617             * Validate CAS
618             */
619            if (item_get_cas(it) != 0) {
620                /* CAS much be equal */
621                if (item_get_cas(it) != item_get_cas(old_it)) {
622                    stored = ENGINE_KEY_EEXISTS;
623                }
624            }
625
626            if (stored == ENGINE_NOT_STORED) {
627                size_t total = it->nbytes + old_it->nbytes;
628                if (total > engine->config.item_size_max) {
629                    return ENGINE_E2BIG;
630                }
631
632                /* we have it and old_it here - alloc memory to hold both */
633                new_it = do_item_alloc(engine, key, it->nkey,
634                                       old_it->flags,
635                                       old_it->exptime,
636                                       it->nbytes + old_it->nbytes,
637                                       cookie, it->datatype);
638                if (new_it == NULL) {
639                    /* SERVER_ERROR out of memory */
640                    if (old_it != NULL) {
641                        do_item_release(engine, old_it);
642                    }
643
644                    return ENGINE_NOT_STORED;
645                }
646
647                /* copy data from it and old_it to new_it */
648
649                if (operation == OPERATION_APPEND) {
650                    memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
651                    memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
652                } else {
653                    /* OPERATION_PREPEND */
654                    memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
655                    memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
656                }
657
658                it = new_it;
659            }
660        }
661
662        if (stored == ENGINE_NOT_STORED) {
663            if (old_it != NULL) {
664                do_item_replace(engine, old_it, it);
665            } else {
666                do_item_link(engine, it);
667            }
668
669            *cas = item_get_cas(it);
670            stored = ENGINE_SUCCESS;
671        }
672    }
673
674    if (old_it != NULL) {
675        do_item_release(engine, old_it);         /* release our reference */
676    }
677
678    if (new_it != NULL) {
679        do_item_release(engine, new_it);
680    }
681
682    if (stored == ENGINE_SUCCESS) {
683        *cas = item_get_cas(it);
684    }
685
686    return stored;
687}
688
689
690/*
691 * adds a delta value to a numeric item.
692 *
693 * c     connection requesting the operation
694 * it    item to adjust
695 * incr  true to increment value, false to decrement
696 * delta amount to adjust value by
697 * buf   buffer for response string
698 *
699 * returns a response string to send back to the client.
700 */
701static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
702                                      hash_item *it, const bool incr,
703                                      const int64_t delta, uint64_t *rcas,
704                                      uint64_t *result, const void *cookie) {
705    const char *ptr;
706    uint64_t value;
707    char buf[80];
708    int res;
709
710    if (it->nbytes >= (sizeof(buf) - 1)) {
711        return ENGINE_EINVAL;
712    }
713
714    ptr = item_get_data(it);
715    memcpy(buf, ptr, it->nbytes);
716    buf[it->nbytes] = '\0';
717
718    if (!safe_strtoull(buf, &value)) {
719        return ENGINE_EINVAL;
720    }
721
722    if (incr) {
723        value += delta;
724    } else {
725        if ((uint64_t)delta > value) {
726            value = 0;
727        } else {
728            value -= delta;
729        }
730    }
731
732    *result = value;
733    if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
734        return ENGINE_EINVAL;
735    }
736
737    if (it->refcount == 1 && res <= (int)it->nbytes) {
738        /* we can do inline replacement */
739        memcpy(item_get_data(it), buf, res);
740        memset(item_get_data(it) + res, ' ', it->nbytes - res);
741        item_set_cas(NULL, NULL, it, get_cas_id());
742        *rcas = item_get_cas(it);
743    } else {
744        hash_item *new_it = do_item_alloc(engine, item_get_key(it),
745                                          it->nkey, it->flags,
746                                          it->exptime, res,
747                                          cookie, it->datatype);
748        if (new_it == NULL) {
749            do_item_unlink(engine, it);
750            return ENGINE_ENOMEM;
751        }
752        memcpy(item_get_data(new_it), buf, res);
753        do_item_replace(engine, it, new_it);
754        *rcas = item_get_cas(new_it);
755        do_item_release(engine, new_it);       /* release our reference */
756    }
757
758    return ENGINE_SUCCESS;
759}
760
761/********************************* ITEM ACCESS *******************************/
762
763/*
764 * Allocates a new item.
765 */
766hash_item *item_alloc(struct default_engine *engine,
767                      const void *key, size_t nkey, int flags,
768                      rel_time_t exptime, int nbytes, const void *cookie,
769                      uint8_t datatype) {
770    hash_item *it;
771    cb_mutex_enter(&engine->cache_lock);
772    it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie,
773                       datatype);
774    cb_mutex_exit(&engine->cache_lock);
775    return it;
776}
777
778/*
779 * Returns an item if it hasn't been marked as expired,
780 * lazy-expiring as needed.
781 */
782hash_item *item_get(struct default_engine *engine,
783                    const void *key, const size_t nkey) {
784    hash_item *it;
785    cb_mutex_enter(&engine->cache_lock);
786    it = do_item_get(engine, key, nkey);
787    cb_mutex_exit(&engine->cache_lock);
788    return it;
789}
790
/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed. Runs do_item_release() with engine->cache_lock held.
 */
void item_release(struct default_engine *engine, hash_item *item) {
    cb_mutex_enter(&engine->cache_lock);
    do_item_release(engine, item);
    cb_mutex_exit(&engine->cache_lock);
}
800
/*
 * Unlinks an item from the LRU and hashtable. Runs do_item_unlink() with
 * engine->cache_lock held; an item that is not linked is left untouched.
 */
void item_unlink(struct default_engine *engine, hash_item *item) {
    cb_mutex_enter(&engine->cache_lock);
    do_item_unlink(engine, item);
    cb_mutex_exit(&engine->cache_lock);
}
809
810static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
811                                       const void* cookie,
812                                       const void* key,
813                                       const int nkey,
814                                       const bool increment,
815                                       const bool create,
816                                       const uint64_t delta,
817                                       const uint64_t initial,
818                                       const rel_time_t exptime,
819                                       uint64_t *cas,
820                                       uint8_t datatype,
821                                       uint64_t *result)
822{
823   hash_item *item = do_item_get(engine, key, nkey);
824   ENGINE_ERROR_CODE ret;
825
826   if (item == NULL) {
827      if (!create) {
828         return ENGINE_KEY_ENOENT;
829      } else {
830         char buffer[128];
831         int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
832                            (uint64_t)initial);
833
834         item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie,
835                              datatype);
836         if (item == NULL) {
837            return ENGINE_ENOMEM;
838         }
839         memcpy((void*)item_get_data(item), buffer, len);
840         if ((ret = do_store_item(engine, item, cas,
841                                  OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
842             *result = initial;
843             *cas = item_get_cas(item);
844         }
845         do_item_release(engine, item);
846      }
847   } else {
848      ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
849      do_item_release(engine, item);
850   }
851
852   return ret;
853}
854
855ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
856                             const void* cookie,
857                             const void* key,
858                             const int nkey,
859                             const bool increment,
860                             const bool create,
861                             const uint64_t delta,
862                             const uint64_t initial,
863                             const rel_time_t exptime,
864                             uint64_t *cas,
865                             uint8_t datatype,
866                             uint64_t *result)
867{
868    ENGINE_ERROR_CODE ret;
869
870    cb_mutex_enter(&engine->cache_lock);
871    ret = do_arithmetic(engine, cookie, key, nkey, increment,
872                        create, delta, initial, exptime, cas,
873                        datatype, result);
874    cb_mutex_exit(&engine->cache_lock);
875    return ret;
876}
877
878/*
879 * Stores an item in the cache (high level, obeys set/add/replace semantics)
880 */
881ENGINE_ERROR_CODE store_item(struct default_engine *engine,
882                             hash_item *item, uint64_t *cas,
883                             ENGINE_STORE_OPERATION operation,
884                             const void *cookie) {
885    ENGINE_ERROR_CODE ret;
886
887    cb_mutex_enter(&engine->cache_lock);
888    ret = do_store_item(engine, item, cas, operation, cookie);
889    cb_mutex_exit(&engine->cache_lock);
890    return ret;
891}
892
893static hash_item *do_touch_item(struct default_engine *engine,
894                                     const void *key,
895                                     uint16_t nkey,
896                                     uint32_t exptime)
897{
898   hash_item *item = do_item_get(engine, key, nkey);
899   if (item != NULL) {
900       item->exptime = exptime;
901   }
902   return item;
903}
904
905hash_item *touch_item(struct default_engine *engine,
906                           const void *key,
907                           uint16_t nkey,
908                           uint32_t exptime)
909{
910    hash_item *ret;
911
912    cb_mutex_enter(&engine->cache_lock);
913    ret = do_touch_item(engine, key, nkey, exptime);
914    cb_mutex_exit(&engine->cache_lock);
915    return ret;
916}
917
918/*
919 * Flushes expired items after a flush_all call
920 */
921void item_flush_expired(struct default_engine *engine, time_t when) {
922    int i;
923    hash_item *iter, *next;
924
925    cb_mutex_enter(&engine->cache_lock);
926
927    if (when == 0) {
928        engine->config.oldest_live = engine->server.core->get_current_time() - 1;
929    } else {
930        engine->config.oldest_live = engine->server.core->realtime(when) - 1;
931    }
932
933    if (engine->config.oldest_live != 0) {
934        for (i = 0; i < POWER_LARGEST; i++) {
935            /*
936             * The LRU is sorted in decreasing time order, and an item's
937             * timestamp is never newer than its last access time, so we
938             * only need to walk back until we hit an item older than the
939             * oldest_live time.
940             * The oldest_live checking will auto-expire the remaining items.
941             */
942            for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
943                if (iter->time >= engine->config.oldest_live) {
944                    next = iter->next;
945                    if ((iter->iflag & ITEM_SLABBED) == 0) {
946                        do_item_unlink(engine, iter);
947                    }
948                } else {
949                    /* We've hit the first old item. Continue to the next queue. */
950                    break;
951                }
952            }
953        }
954    }
955    cb_mutex_exit(&engine->cache_lock);
956}
957
958/*
959 * Dumps part of the cache
960 */
961char *item_cachedump(struct default_engine *engine,
962                     const unsigned int slabs_clsid,
963                     const unsigned int limit,
964                     unsigned int *bytes) {
965    char *ret;
966
967    cb_mutex_enter(&engine->cache_lock);
968    ret = do_item_cachedump(slabs_clsid, limit, bytes);
969    cb_mutex_exit(&engine->cache_lock);
970    return ret;
971}
972
973void item_stats(struct default_engine *engine,
974                   ADD_STAT add_stat, const void *cookie)
975{
976    cb_mutex_enter(&engine->cache_lock);
977    do_item_stats(engine, add_stat, cookie);
978    cb_mutex_exit(&engine->cache_lock);
979}
980
981
982void item_stats_sizes(struct default_engine *engine,
983                      ADD_STAT add_stat, const void *cookie)
984{
985    cb_mutex_enter(&engine->cache_lock);
986    do_item_stats_sizes(engine, add_stat, cookie);
987    cb_mutex_exit(&engine->cache_lock);
988}
989
990static void do_item_link_cursor(struct default_engine *engine,
991                                hash_item *cursor, int ii)
992{
993    cursor->slabs_clsid = (uint8_t)ii;
994    cursor->next = NULL;
995    cursor->prev = engine->items.tails[ii];
996    engine->items.tails[ii]->next = cursor;
997    engine->items.tails[ii] = cursor;
998    engine->items.sizes[ii]++;
999}
1000
1001typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
1002                                      hash_item *item, void *cookie);
1003
1004static bool do_item_walk_cursor(struct default_engine *engine,
1005                                hash_item *cursor,
1006                                int steplength,
1007                                ITERFUNC itemfunc,
1008                                void* itemdata,
1009                                ENGINE_ERROR_CODE *error)
1010{
1011    int ii = 0;
1012    *error = ENGINE_SUCCESS;
1013
1014    while (cursor->prev != NULL && ii < steplength) {
1015        /* Move cursor */
1016        hash_item *ptr = cursor->prev;
1017        bool done = false;
1018
1019        ++ii;
1020        item_unlink_q(engine, cursor);
1021
1022        if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1023            done = true;
1024            cursor->prev = NULL;
1025        } else {
1026            cursor->next = ptr;
1027            cursor->prev = ptr->prev;
1028            cursor->prev->next = cursor;
1029            ptr->prev = cursor;
1030        }
1031
1032        /* Ignore cursors */
1033        if (ptr->nkey == 0 && ptr->nbytes == 0) {
1034            --ii;
1035        } else {
1036            *error = itemfunc(engine, ptr, itemdata);
1037            if (*error != ENGINE_SUCCESS) {
1038                return false;
1039            }
1040        }
1041
1042        if (done) {
1043            return false;
1044        }
1045    }
1046
1047    return (cursor->prev != NULL);
1048}
1049
1050static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1051                                    hash_item *item,
1052                                    void *cookie) {
1053    rel_time_t current_time = engine->server.core->get_current_time();
1054    (void)cookie;
1055    engine->scrubber.visited++;
1056    if (item->refcount == 0 &&
1057        (item->exptime != 0 && item->exptime < current_time)) {
1058        do_item_unlink(engine, item);
1059        engine->scrubber.cleaned++;
1060    }
1061    return ENGINE_SUCCESS;
1062}
1063
1064static void item_scrub_class(struct default_engine *engine,
1065                             hash_item *cursor) {
1066
1067    ENGINE_ERROR_CODE ret;
1068    bool more;
1069    do {
1070        cb_mutex_enter(&engine->cache_lock);
1071        more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
1072        cb_mutex_exit(&engine->cache_lock);
1073        if (ret != ENGINE_SUCCESS) {
1074            break;
1075        }
1076    } while (more);
1077}
1078
1079static void item_scubber_main(void *arg)
1080{
1081    struct default_engine *engine = arg;
1082    hash_item cursor;
1083    int ii;
1084
1085    memset(&cursor, 0, sizeof(cursor));
1086    cursor.refcount = 1;
1087    for (ii = 0; ii < POWER_LARGEST; ++ii) {
1088        bool skip = false;
1089        cb_mutex_enter(&engine->cache_lock);
1090        if (engine->items.heads[ii] == NULL) {
1091            skip = true;
1092        } else {
1093            /* add the item at the tail */
1094            do_item_link_cursor(engine, &cursor, ii);
1095        }
1096        cb_mutex_exit(&engine->cache_lock);
1097
1098        if (!skip) {
1099            item_scrub_class(engine, &cursor);
1100        }
1101    }
1102
1103    cb_mutex_enter(&engine->scrubber.lock);
1104    engine->scrubber.stopped = time(NULL);
1105    engine->scrubber.running = false;
1106    cb_mutex_exit(&engine->scrubber.lock);
1107}
1108
1109bool item_start_scrub(struct default_engine *engine)
1110{
1111    bool ret = false;
1112    cb_mutex_enter(&engine->scrubber.lock);
1113    if (!engine->scrubber.running) {
1114        cb_thread_t t;
1115
1116        engine->scrubber.started = time(NULL);
1117        engine->scrubber.stopped = 0;
1118        engine->scrubber.visited = 0;
1119        engine->scrubber.cleaned = 0;
1120        engine->scrubber.running = true;
1121
1122        if (cb_create_thread(&t, item_scubber_main, engine, 1) != 0)
1123        {
1124            engine->scrubber.running = false;
1125        } else {
1126            ret = true;
1127        }
1128    }
1129    cb_mutex_exit(&engine->scrubber.lock);
1130
1131    return ret;
1132}
1133
1134struct tap_client {
1135    hash_item cursor;
1136    hash_item *it;
1137};
1138
1139static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1140                                    hash_item *item,
1141                                    void *cookie) {
1142    struct tap_client *client = cookie;
1143    client->it = item;
1144    ++client->it->refcount;
1145    return ENGINE_SUCCESS;
1146}
1147
1148static tap_event_t do_item_tap_walker(struct default_engine *engine,
1149                                         const void *cookie, item **itm,
1150                                         void **es, uint16_t *nes, uint8_t *ttl,
1151                                         uint16_t *flags, uint32_t *seqno,
1152                                         uint16_t *vbucket)
1153{
1154    ENGINE_ERROR_CODE r;
1155    struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1156    if (client == NULL) {
1157        return TAP_DISCONNECT;
1158    }
1159
1160    *es = NULL;
1161    *nes = 0;
1162    *ttl = (uint8_t)-1;
1163    *seqno = 0;
1164    *flags = 0;
1165    *vbucket = 0;
1166    client->it = NULL;
1167
1168    do {
1169        if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1170            /* find next slab class to look at.. */
1171            bool linked = false;
1172            int ii;
1173            for (ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
1174                if (engine->items.heads[ii] != NULL) {
1175                    /* add the item at the tail */
1176                    do_item_link_cursor(engine, &client->cursor, ii);
1177                    linked = true;
1178                }
1179            }
1180            if (!linked) {
1181                break;
1182            }
1183        }
1184    } while (client->it == NULL);
1185    *itm = client->it;
1186
1187    return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1188}
1189
1190tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1191                            const void *cookie, item **itm,
1192                            void **es, uint16_t *nes, uint8_t *ttl,
1193                            uint16_t *flags, uint32_t *seqno,
1194                            uint16_t *vbucket)
1195{
1196    tap_event_t ret;
1197    struct default_engine *engine = (struct default_engine*)handle;
1198    cb_mutex_enter(&engine->cache_lock);
1199    ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1200    cb_mutex_exit(&engine->cache_lock);
1201
1202    return ret;
1203}
1204
1205bool initialize_item_tap_walker(struct default_engine *engine,
1206                                const void* cookie)
1207{
1208    bool linked = false;
1209    int ii;
1210    struct tap_client *client = calloc(1, sizeof(*client));
1211    if (client == NULL) {
1212        return false;
1213    }
1214    client->cursor.refcount = 1;
1215
1216    /* Link the cursor! */
1217    for (ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1218        cb_mutex_enter(&engine->cache_lock);
1219        if (engine->items.heads[ii] != NULL) {
1220            /* add the item at the tail */
1221            do_item_link_cursor(engine, &client->cursor, ii);
1222            linked = true;
1223        }
1224        cb_mutex_exit(&engine->cache_lock);
1225    }
1226
1227    engine->server.cookie->store_engine_specific(cookie, client);
1228    return true;
1229}
1230
1231void link_dcp_walker(struct default_engine *engine,
1232                     struct dcp_connection *connection)
1233{
1234    bool linked = false;
1235    int ii;
1236    connection->cursor.refcount = 1;
1237
1238    /* Link the cursor! */
1239    for (ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1240        cb_mutex_enter(&engine->cache_lock);
1241        if (engine->items.heads[ii] != NULL) {
1242            /* add the item at the tail */
1243            do_item_link_cursor(engine, &connection->cursor, ii);
1244            linked = true;
1245        }
1246        cb_mutex_exit(&engine->cache_lock);
1247    }
1248}
1249
1250static ENGINE_ERROR_CODE item_dcp_iterfunc(struct default_engine *engine,
1251                                           hash_item *item,
1252                                           void *cookie) {
1253    struct dcp_connection *connection = cookie;
1254    connection->it = item;
1255    ++connection->it->refcount;
1256    return ENGINE_SUCCESS;
1257}
1258
1259static ENGINE_ERROR_CODE do_item_dcp_step(struct default_engine *engine,
1260                                          struct dcp_connection *connection,
1261                                          const void *cookie,
1262                                          struct dcp_message_producers *producers)
1263{
1264    ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
1265
1266    while (connection->it == NULL) {
1267        if (!do_item_walk_cursor(engine, &connection->cursor, 1,
1268                                 item_dcp_iterfunc, connection, &ret)) {
1269            /* find next slab class to look at.. */
1270            bool linked = false;
1271            int ii;
1272            for (ii = connection->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
1273                if (engine->items.heads[ii] != NULL) {
1274                    /* add the item at the tail */
1275                    do_item_link_cursor(engine, &connection->cursor, ii);
1276                    linked = true;
1277                }
1278            }
1279            if (!linked) {
1280                break;
1281            }
1282        }
1283    }
1284
1285    if (connection->it != NULL) {
1286        rel_time_t current_time = engine->server.core->get_current_time();
1287        rel_time_t exptime = connection->it->exptime;
1288
1289        if (exptime != 0 && exptime < current_time) {
1290            ret = producers->expiration(cookie, connection->opaque,
1291                                        item_get_key(connection->it),
1292                                        connection->it->nkey,
1293                                        item_get_cas(connection->it),
1294                                        0, 0, 0, NULL, 0);
1295            if (ret == ENGINE_SUCCESS) {
1296                do_item_unlink(engine, connection->it);
1297                do_item_release(engine, connection->it);
1298            }
1299        } else {
1300            ret = producers->mutation(cookie, connection->opaque,
1301                                      connection->it, 0, 0, 0, 0, NULL, 0, 0);
1302        }
1303
1304        if (ret == ENGINE_SUCCESS) {
1305            connection->it = NULL;
1306        }
1307    } else {
1308        return ENGINE_DISCONNECT;
1309    }
1310
1311    return ret;
1312}
1313
1314ENGINE_ERROR_CODE item_dcp_step(struct default_engine *engine,
1315                                struct dcp_connection *connection,
1316                                const void *cookie,
1317                                struct dcp_message_producers *producers)
1318{
1319    ENGINE_ERROR_CODE ret;
1320    cb_mutex_enter(&engine->cache_lock);
1321    ret = do_item_dcp_step(engine, connection, cookie, producers);
1322    cb_mutex_exit(&engine->cache_lock);
1323    return ret;
1324}
1325