1/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2#include "config.h"
3#include <errno.h>
4#include <fcntl.h>
5#include <inttypes.h>
6#include <stdio.h>
7#include <stdlib.h>
8#include <string.h>
9#include <time.h>
10#include <gsl/gsl>
11
12#include <logger/logger.h>
13#include <memcached/server_api.h>
14#include <platform/cb_malloc.h>
15#include <platform/crc32c.h>
16#include "default_engine_internal.h"
17#include "engine_manager.h"
18
19/* Forward Declarations */
20static void item_link_q(struct default_engine *engine, hash_item *it);
21static void item_unlink_q(struct default_engine *engine, hash_item *it);
22static hash_item *do_item_alloc(struct default_engine *engine,
23                                const hash_key *key,
24                                const int flags, const rel_time_t exptime,
25                                const int nbytes,
26                                const void *cookie,
27                                uint8_t datatype);
28static hash_item* do_item_get(struct default_engine* engine,
29                              const hash_key* key,
30                              const DocStateFilter document_state);
31static int do_item_link(struct default_engine *engine,
32                        const void* cookie,
33                        hash_item *it);
34static void do_item_unlink(struct default_engine *engine, hash_item *it);
35static ENGINE_ERROR_CODE do_safe_item_unlink(struct default_engine *engine,
36                                             hash_item *it);
37static void do_item_release(struct default_engine *engine, hash_item *it);
38static void do_item_update(struct default_engine *engine, hash_item *it);
39static int do_item_replace(struct default_engine* engine,
40                           const void* cookie,
41                           hash_item* it,
42                           hash_item* new_it);
43static void item_free(struct default_engine *engine, hash_item *it);
44
45static bool hash_key_create(hash_key* hkey,
46                            const void* key,
47                            const size_t nkey,
48                            struct default_engine* engine,
49                            const void* cookie);
50
51static void hash_key_destroy(hash_key* hkey);
52static void hash_key_copy_to_item(hash_item* dst, const hash_key* src);
53
54/*
55 * We only reposition items in the LRU queue if they haven't been repositioned
56 * in this many seconds. That saves us from churning on frequently-accessed
57 * items.
58 */
59#define ITEM_UPDATE_INTERVAL 60
60/*
61 * To avoid scanning through the complete cache in some circumstances we'll
62 * just give up and return an error after inspecting a fixed number of objects.
63 */
64static const int search_items = 50;
65
66void item_stats_reset(struct default_engine *engine) {
67    cb_mutex_enter(&engine->items.lock);
68    memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
69    cb_mutex_exit(&engine->items.lock);
70}
71
72
73/* warning: don't use these macros with a function, as it evals its arg twice */
74static size_t ITEM_ntotal(struct default_engine *engine,
75                          const hash_item *item) {
76    size_t ret = sizeof(*item) + hash_key_get_alloc_size(item_get_key(item)) + item->nbytes;
77    return ret;
78}
79
80/* Get the next CAS id for a new item. */
81static uint64_t get_cas_id(void) {
82    static uint64_t cas_id = 0;
83    return ++cas_id;
84}
85
86/* Enable this for reference-count debugging. */
87#if 0
88# define DEBUG_REFCNT(it,op) \
89                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
90                        it, op, it->refcount, \
91                        (it->iflag & ITEM_LINKED) ? 'L' : ' ', \
92                        (it->iflag & ITEM_SLABBED) ? 'S' : ' ')
93#else
94# define DEBUG_REFCNT(it,op) while(0)
95#endif
96
97
98/*@null@*/
99hash_item *do_item_alloc(struct default_engine *engine,
100                         const hash_key *key,
101                         const int flags,
102                         const rel_time_t exptime,
103                         const int nbytes,
104                         const void *cookie,
105                         uint8_t datatype) {
106    hash_item *it = NULL;
107    int tries = search_items;
108    hash_item *search;
109    rel_time_t oldest_live;
110    rel_time_t current_time;
111    unsigned int id;
112
113    size_t ntotal = sizeof(hash_item) + hash_key_get_alloc_size(key) + nbytes;
114
115    if ((id = slabs_clsid(engine, ntotal)) == 0) {
116        return 0;
117    }
118
119    /* do a quick check if we have any expired items in the tail.. */
120    oldest_live = engine->config.oldest_live;
121    current_time = engine->server.core->get_current_time();
122
123    for (search = engine->items.tails[id];
124         tries > 0 && search != NULL;
125         tries--, search=search->prev) {
126        if (search->refcount == 0 &&
127            ((search->time < oldest_live) || /* dead by flush */
128             (search->exptime != 0 && search->exptime < current_time)) &&
129            (search->locktime <= current_time)) {
130            it = search;
131            /* I don't want to actually free the object, just steal
132             * the item to avoid to grab the slab mutex twice ;-)
133             */
134            cb_mutex_enter(&engine->stats.lock);
135            engine->stats.reclaimed++;
136            cb_mutex_exit(&engine->stats.lock);
137            engine->items.itemstats[id].reclaimed++;
138            it->refcount = 1;
139            slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
140            do_item_unlink(engine, it);
141            /* Initialize the item block: */
142            it->slabs_clsid = 0;
143            it->refcount = 0;
144            break;
145        }
146    }
147
148    if (it == NULL &&
149        (it = static_cast<hash_item*>(slabs_alloc(engine, ntotal, id))) == NULL) {
150        /*
151        ** Could not find an expired item at the tail, and memory allocation
152        ** failed. Try to evict some items!
153        */
154        tries = search_items;
155
156        /* If requested to not push old items out of cache when memory runs out,
157         * we're out of luck at this point...
158         */
159
160        if (engine->config.evict_to_free == 0) {
161            engine->items.itemstats[id].outofmemory++;
162            return NULL;
163        }
164
165        /*
166         * try to get one off the right LRU
167         * don't necessariuly unlink the tail because it may be locked: refcount>0
168         * search up from tail an item with refcount==0 and unlink it; give up after search_items
169         * tries
170         */
171
172        if (engine->items.tails[id] == 0) {
173            engine->items.itemstats[id].outofmemory++;
174            return NULL;
175        }
176
177        for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
178            if (search->refcount == 0 && search->locktime <= current_time) {
179                if (search->exptime == 0 || search->exptime > current_time) {
180                    engine->items.itemstats[id].evicted++;
181                    engine->items.itemstats[id].evicted_time = current_time - search->time;
182                    if (search->exptime != 0) {
183                        engine->items.itemstats[id].evicted_nonzero++;
184                    }
185                    cb_mutex_enter(&engine->stats.lock);
186                    engine->stats.evictions++;
187                    cb_mutex_exit(&engine->stats.lock);
188                    const hash_key* search_key = item_get_key(search);
189                    engine->server.stat->evicting(cookie,
190                                                  hash_key_get_client_key(search_key),
191                                                  hash_key_get_client_key_len(search_key));
192                } else {
193                    engine->items.itemstats[id].reclaimed++;
194                    cb_mutex_enter(&engine->stats.lock);
195                    engine->stats.reclaimed++;
196                    cb_mutex_exit(&engine->stats.lock);
197                }
198                do_item_unlink(engine, search);
199                break;
200            }
201        }
202        it = static_cast<hash_item*>(slabs_alloc(engine, ntotal, id));
203        if (it == 0) {
204            engine->items.itemstats[id].outofmemory++;
205            /* Last ditch effort. There is a very rare bug which causes
206             * refcount leaks. We've fixed most of them, but it still happens,
207             * and it may happen in the future.
208             * We can reasonably assume no item can stay locked for more than
209             * three hours, so if we find one in the tail which is that old,
210             * free it anyway.
211             */
212            tries = search_items;
213            for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
214                if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
215                    engine->items.itemstats[id].tailrepairs++;
216                    search->refcount = 0;
217                    do_item_unlink(engine, search);
218                    break;
219                }
220            }
221            it = static_cast<hash_item*>(slabs_alloc(engine, ntotal, id));
222            if (it == 0) {
223                return NULL;
224            }
225        }
226    }
227
228    cb_assert(it->slabs_clsid == 0);
229
230    it->slabs_clsid = id;
231
232    cb_assert(it != engine->items.heads[it->slabs_clsid]);
233
234    it->next = it->prev = it->h_next = 0;
235    it->refcount = 1;     /* the caller will have a reference */
236    DEBUG_REFCNT(it, '*');
237    it->iflag = 0;
238    it->nbytes = nbytes;
239    it->flags = flags;
240    it->datatype = datatype;
241    it->exptime = exptime;
242    it->locktime = 0;
243    hash_key_copy_to_item(it, key);
244    return it;
245}
246
247static void item_free(struct default_engine *engine, hash_item *it) {
248    size_t ntotal = ITEM_ntotal(engine, it);
249    unsigned int clsid;
250    cb_assert((it->iflag & ITEM_LINKED) == 0);
251    cb_assert(it != engine->items.heads[it->slabs_clsid]);
252    cb_assert(it != engine->items.tails[it->slabs_clsid]);
253    cb_assert(it->refcount == 0 || engine->scrubber.force_delete);
254
255    /* so slab size changer can tell later if item is already free or not */
256    clsid = it->slabs_clsid;
257    it->slabs_clsid = 0;
258    it->iflag |= ITEM_SLABBED;
259    DEBUG_REFCNT(it, 'F');
260    slabs_free(engine, it, ntotal, clsid);
261}
262
263static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
264    hash_item **head, **tail;
265    cb_assert(it->slabs_clsid < POWER_LARGEST);
266    cb_assert((it->iflag & ITEM_SLABBED) == 0);
267
268    head = &engine->items.heads[it->slabs_clsid];
269    tail = &engine->items.tails[it->slabs_clsid];
270    cb_assert(it != *head);
271    cb_assert((*head && *tail) || (*head == 0 && *tail == 0));
272    it->prev = 0;
273    it->next = *head;
274    if (it->next) it->next->prev = it;
275    *head = it;
276    if (*tail == 0) *tail = it;
277    engine->items.sizes[it->slabs_clsid]++;
278    return;
279}
280
281static void item_unlink_q(struct default_engine *engine, hash_item *it) {
282    hash_item **head, **tail;
283    cb_assert(it->slabs_clsid < POWER_LARGEST);
284    head = &engine->items.heads[it->slabs_clsid];
285    tail = &engine->items.tails[it->slabs_clsid];
286
287    if (*head == it) {
288        cb_assert(it->prev == 0);
289        *head = it->next;
290    }
291    if (*tail == it) {
292        cb_assert(it->next == 0);
293        *tail = it->prev;
294    }
295    cb_assert(it->next != it);
296    cb_assert(it->prev != it);
297
298    if (it->next) it->next->prev = it->prev;
299    if (it->prev) it->prev->next = it->next;
300    engine->items.sizes[it->slabs_clsid]--;
301    return;
302}
303
304int do_item_link(struct default_engine *engine,
305                 const void* cookie,
306                 hash_item *it) {
307    const hash_key* key = item_get_key(it);
308    MEMCACHED_ITEM_LINK(hash_key_get_client_key(key), hash_key_get_client_key_len(key), it->nbytes);
309    cb_assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
310    it->iflag |= ITEM_LINKED;
311    it->time = engine->server.core->get_current_time();
312
313    assoc_insert(crc32c(hash_key_get_key(key), hash_key_get_key_len(key), 0),
314                 it);
315
316    cb_mutex_enter(&engine->stats.lock);
317    engine->stats.curr_bytes += ITEM_ntotal(engine, it);
318    engine->stats.curr_items += 1;
319    engine->stats.total_items += 1;
320    cb_mutex_exit(&engine->stats.lock);
321
322    auto cas = get_cas_id();
323
324    /* Allocate a new CAS ID on link. */
325    item_set_cas(reinterpret_cast<ENGINE_HANDLE*>(&engine), it, cas);
326
327    item_info info = {};
328    info.cas = cas;
329    info.exptime = it->exptime;
330    info.nbytes = it->nbytes;
331    info.flags = it->flags;
332    info.nkey = hash_key_get_client_key_len(key);
333    info.key = hash_key_get_client_key(key);
334    info.value[0].iov_base = item_get_data(it);
335    info.value[0].iov_len = it->nbytes;
336    info.datatype = it->datatype;
337
338    if (engine->server.document->pre_link(cookie, info) != ENGINE_SUCCESS) {
339        return 0;
340    }
341
342    item_link_q(engine, it);
343
344    return 1;
345}
346
347void do_item_unlink(struct default_engine *engine, hash_item *it) {
348    const hash_key* key = item_get_key(it);
349    MEMCACHED_ITEM_UNLINK(hash_key_get_client_key(key),
350                          hash_key_get_client_key_len(key),
351                          it->nbytes);
352    if ((it->iflag & ITEM_LINKED) != 0) {
353        it->iflag &= ~ITEM_LINKED;
354        cb_mutex_enter(&engine->stats.lock);
355        engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
356        engine->stats.curr_items -= 1;
357        cb_mutex_exit(&engine->stats.lock);
358        assoc_delete(crc32c(hash_key_get_key(key), hash_key_get_key_len(key), 0),
359                     key);
360        item_unlink_q(engine, it);
361        if (it->refcount == 0 || engine->scrubber.force_delete) {
362            item_free(engine, it);
363        }
364    }
365}
366
367ENGINE_ERROR_CODE do_safe_item_unlink(struct default_engine* engine,
368                                      hash_item* it) {
369
370    const hash_key* key = item_get_key(it);
371    auto* stored =
372            do_item_get(engine, key, DocStateFilter::AliveOrDeleted);
373    if (stored == nullptr) {
374        return ENGINE_KEY_ENOENT;
375    }
376
377    auto ret = ENGINE_SUCCESS;
378
379    if (it->cas == stored->cas) {
380        MEMCACHED_ITEM_UNLINK(hash_key_get_client_key(key),
381                              hash_key_get_client_key_len(key),
382                              it->nbytes);
383        if ((stored->iflag & ITEM_LINKED) != 0) {
384            stored->iflag &= ~ITEM_LINKED;
385            cb_mutex_enter(&engine->stats.lock);
386            engine->stats.curr_bytes -= ITEM_ntotal(engine, stored);
387            engine->stats.curr_items -= 1;
388            cb_mutex_exit(&engine->stats.lock);
389            assoc_delete(crc32c(hash_key_get_key(key),
390                                hash_key_get_key_len(key), 0),
391                         key);
392            item_unlink_q(engine, stored);
393            if (stored->refcount == 0 || engine->scrubber.force_delete) {
394                item_free(engine, stored);
395            }
396        }
397    } else {
398        ret = ENGINE_KEY_EEXISTS;
399    }
400
401    do_item_release(engine, it);
402    return ret;
403}
404
405void do_item_release(struct default_engine *engine, hash_item *it) {
406    MEMCACHED_ITEM_REMOVE(hash_key_get_client_key(item_get_key(it)),
407                          hash_key_get_client_key_len(item_get_key(it)),
408                          it->nbytes);
409    if (it->refcount != 0) {
410        it->refcount--;
411        DEBUG_REFCNT(it, '-');
412    }
413    if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
414        item_free(engine, it);
415    }
416}
417
418void do_item_update(struct default_engine *engine, hash_item *it) {
419    rel_time_t current_time = engine->server.core->get_current_time();
420    MEMCACHED_ITEM_UPDATE(hash_key_get_client_key(item_get_key(it)),
421                          hash_key_get_client_key_len(item_get_key(it)),
422                          it->nbytes);
423    if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
424        cb_assert((it->iflag & ITEM_SLABBED) == 0);
425
426        if ((it->iflag & ITEM_LINKED) != 0) {
427            item_unlink_q(engine, it);
428            it->time = current_time;
429            item_link_q(engine, it);
430        }
431    }
432}
433
434int do_item_replace(struct default_engine *engine,
435                    const void* cookie,
436                    hash_item *it,
437                    hash_item *new_it) {
438    MEMCACHED_ITEM_REPLACE(hash_key_get_client_key(item_get_key(it)),
439                           hash_key_get_client_key_len(item_get_key(it)),
440                           it->nbytes,
441                           hash_key_get_client_key(item_get_key(new_it)),
442                           hash_key_get_client_key_len(item_get_key(new_it)),
443                           new_it->nbytes);
444    cb_assert((it->iflag & ITEM_SLABBED) == 0);
445
446    do_item_unlink(engine, it);
447    return do_item_link(engine, cookie, new_it);
448}
449
450static void do_item_stats(struct default_engine *engine,
451                          ADD_STAT add_stats, const void *c) {
452    int i;
453    rel_time_t current_time = engine->server.core->get_current_time();
454    for (i = 0; i < POWER_LARGEST; i++) {
455        if (engine->items.tails[i] != NULL) {
456            const char *prefix = "items";
457            int search = search_items;
458            while (search > 0 &&
459                   engine->items.tails[i] != NULL &&
460                   ((engine->config.oldest_live != 0 && /* Item flushd */
461                     engine->config.oldest_live <= current_time &&
462                     engine->items.tails[i]->time <= engine->config.oldest_live) ||
463                    (engine->items.tails[i]->exptime != 0 && /* and not expired */
464                     engine->items.tails[i]->exptime < current_time))) {
465                --search;
466                if (engine->items.tails[i]->refcount == 0) {
467                    do_item_unlink(engine, engine->items.tails[i]);
468                } else {
469                    break;
470                }
471            }
472            if (engine->items.tails[i] == NULL) {
473                /* We removed all of the items in this slab class */
474                continue;
475            }
476
477            add_statistics(c, add_stats, prefix, i, "number", "%u",
478                           engine->items.sizes[i]);
479            add_statistics(c, add_stats, prefix, i, "age", "%u",
480                           engine->items.tails[i]->time);
481            add_statistics(c, add_stats, prefix, i, "evicted",
482                           "%u", engine->items.itemstats[i].evicted);
483            add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
484                           "%u", engine->items.itemstats[i].evicted_nonzero);
485            add_statistics(c, add_stats, prefix, i, "evicted_time",
486                           "%u", engine->items.itemstats[i].evicted_time);
487            add_statistics(c, add_stats, prefix, i, "outofmemory",
488                           "%u", engine->items.itemstats[i].outofmemory);
489            add_statistics(c, add_stats, prefix, i, "tailrepairs",
490                           "%u", engine->items.itemstats[i].tailrepairs);;
491            add_statistics(c, add_stats, prefix, i, "reclaimed",
492                           "%u", engine->items.itemstats[i].reclaimed);;
493        }
494    }
495}
496
497/** dumps out a list of objects of each size, with granularity of 32 bytes */
498/*@null@*/
499static void do_item_stats_sizes(struct default_engine *engine,
500                                ADD_STAT add_stats, const void *c) {
501
502    /* max 1MB object, divided into 32 bytes size buckets */
503    const int num_buckets = 32768;
504    unsigned int* histogram = static_cast<unsigned int*>
505        (cb_calloc(num_buckets, sizeof(unsigned int)));
506
507    if (histogram != NULL) {
508        int i;
509
510        /* build the histogram */
511        for (i = 0; i < POWER_LARGEST; i++) {
512            hash_item *iter = engine->items.heads[i];
513            while (iter) {
514                size_t ntotal = ITEM_ntotal(engine, iter);
515                size_t bucket = ntotal / 32;
516                if ((ntotal % 32) != 0) {
517                    bucket++;
518                }
519                if (bucket < num_buckets) {
520                    histogram[bucket]++;
521                }
522                iter = iter->next;
523            }
524        }
525
526        /* write the buffer */
527        for (i = 0; i < num_buckets; i++) {
528            if (histogram[i] != 0) {
529                char key[8], val[32];
530                int klen, vlen;
531                klen = snprintf(key, sizeof(key), "%d", i * 32);
532                vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
533                if (klen > 0 && klen < int(sizeof(key)) && vlen > 0 &&
534                    vlen < int(sizeof(val))) {
535                    add_stats(key, klen, val, vlen, c);
536                }
537            }
538        }
539        cb_free(histogram);
540    }
541}
542
543/** wrapper around assoc_find which does the lazy expiration logic */
544hash_item* do_item_get(struct default_engine* engine,
545                       const hash_key* key,
546                       const DocStateFilter documentStateFilter) {
547    rel_time_t current_time = engine->server.core->get_current_time();
548    hash_item *it = assoc_find(crc32c(hash_key_get_key(key),
549                                      hash_key_get_key_len(key), 0),
550                               key);
551
552    if (it != NULL && engine->config.oldest_live != 0 &&
553        engine->config.oldest_live <= current_time &&
554        it->time <= engine->config.oldest_live) {
555        do_item_unlink(engine, it);           /* MTSAFE - items.lock held */
556        it = NULL;
557    }
558
559    if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
560        do_item_unlink(engine, it);           /* MTSAFE - items.lock held */
561        it = NULL;
562    }
563
564    if (it != NULL) {
565        if (it->iflag & ITEM_ZOMBIE) {
566            if (documentStateFilter == DocStateFilter::Alive) {
567                // The requested document is deleted, and you asked for alive
568                return nullptr;
569            }
570        } else {
571            if (documentStateFilter == DocStateFilter::Deleted) {
572                // The requested document is Alive, and you asked for Dead
573                return nullptr;
574            }
575        }
576
577        it->refcount++;
578        DEBUG_REFCNT(it, '+');
579        do_item_update(engine, it);
580    }
581
582    return it;
583}
584
585/*
586 * Stores an item in the cache according to the semantics of one of the set
587 * commands. In threaded mode, this is protected by the cache lock.
588 *
589 * Returns the state of storage.
590 */
591static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
592                                       hash_item *it,
593                                       ENGINE_STORE_OPERATION operation,
594                                       const void *cookie,
595                                       hash_item** stored_item) {
596    const hash_key* key = item_get_key(it);
597    hash_item* old_it =
598            do_item_get(engine, key, DocStateFilter::AliveOrDeleted);
599    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
600
601    bool locked = false;
602    if (old_it != nullptr && old_it->locktime != 0) {
603        locked = old_it->locktime > engine->server.core->get_current_time();
604    }
605
606    if (old_it != NULL && operation == OPERATION_ADD &&
607        (old_it->iflag & ITEM_ZOMBIE) == 0) {
608        /* add only adds a nonexistent item, but promote to head of LRU */
609        do_item_update(engine, old_it);
610    } else if ((!old_it || (old_it->iflag & ITEM_ZOMBIE)) && operation == OPERATION_REPLACE) {
611        /* replace only replaces an existing value; don't store */
612    } else if (operation == OPERATION_CAS) {
613        /* validate cas operation */
614        if (old_it == NULL) {
615            /* LRU expired */
616            stored = ENGINE_KEY_ENOENT;
617        } else if (it->cas == old_it->cas) {
618            /* cas validates */
619            /* it and old_it may belong to different classes. */
620            /* I'm updating the stats for the one that's getting pushed out */
621            do_item_replace(engine, cookie, old_it, it);
622            stored = ENGINE_SUCCESS;
623        } else {
624            if (locked) {
625                stored = ENGINE_LOCKED;
626            } else if (old_it->iflag & ITEM_ZOMBIE && !(it->iflag &ITEM_ZOMBIE)) {
627                // If you try to do replace on a deleted document
628                stored = ENGINE_KEY_ENOENT;
629            } else {
630                stored = ENGINE_KEY_EEXISTS;
631            }
632        }
633    } else {
634        if (locked) {
635            stored = ENGINE_LOCKED;
636        } else {
637            stored = ENGINE_SUCCESS;
638            if (old_it != NULL) {
639                do_item_replace(engine, cookie, old_it, it);
640            } else {
641                if (do_item_link(engine, cookie, it) == 0) {
642                    stored = ENGINE_FAILED;
643                }
644            }
645        }
646    }
647
648    if (old_it != NULL) {
649        do_item_release(engine, old_it);         /* release our reference */
650    }
651
652    if (stored == ENGINE_SUCCESS) {
653        *stored_item = it;
654    }
655
656    return stored;
657}
658
659/********************************* ITEM ACCESS *******************************/
660
661/*
662 * Allocates a new item.
663 */
664hash_item *item_alloc(struct default_engine *engine,
665                      const void *key, size_t nkey, int flags,
666                      rel_time_t exptime, int nbytes, const void *cookie,
667                      uint8_t datatype) {
668    hash_item *it;
669    hash_key hkey;
670    if (!hash_key_create(&hkey, key, nkey, engine, cookie)) {
671        return NULL;
672    }
673    cb_mutex_enter(&engine->items.lock);
674    it = do_item_alloc(engine, &hkey, flags, exptime, nbytes, cookie, datatype);
675    cb_mutex_exit(&engine->items.lock);
676    hash_key_destroy(&hkey);
677    return it;
678}
679
680/*
681 * Returns an item if it hasn't been marked as expired,
682 * lazy-expiring as needed.
683 */
684hash_item* item_get(struct default_engine* engine,
685                    const void* cookie,
686                    const void* key,
687                    const size_t nkey,
688                    DocStateFilter document_state) {
689    hash_item *it;
690    hash_key hkey;
691    if (!hash_key_create(&hkey, key, nkey, engine, cookie)) {
692        return NULL;
693    }
694    it = item_get(engine, cookie, hkey, document_state);
695    hash_key_destroy(&hkey);
696    return it;
697}
698
699/*
700 * Returns an item if it hasn't been marked as expired,
701 * lazy-expiring as needed.
702 */
703hash_item* item_get(struct default_engine* engine,
704                    const void* cookie,
705                    const hash_key& key,
706                    const DocStateFilter state) {
707    cb_mutex_enter(&engine->items.lock);
708    auto* it = do_item_get(engine, &key, state);
709    cb_mutex_exit(&engine->items.lock);
710    return it;
711}
712
713/*
714 * Decrements the reference count on an item and adds it to the freelist if
715 * needed.
716 */
717void item_release(struct default_engine *engine, hash_item *item) {
718    cb_mutex_enter(&engine->items.lock);
719    do_item_release(engine, item);
720    cb_mutex_exit(&engine->items.lock);
721}
722
723/*
724 * Unlinks an item from the LRU and hashtable.
725 */
726void item_unlink(struct default_engine *engine, hash_item *item) {
727    cb_mutex_enter(&engine->items.lock);
728    do_item_unlink(engine, item);
729    cb_mutex_exit(&engine->items.lock);
730}
731
732ENGINE_ERROR_CODE safe_item_unlink(struct default_engine *engine,
733                                   hash_item *it) {
734    cb_mutex_enter(&engine->items.lock);
735    auto ret = do_safe_item_unlink(engine, it);
736    cb_mutex_exit(&engine->items.lock);
737    return ret;
738}
739
740/*
741 * Stores an item in the cache (high level, obeys set/add/replace semantics)
742 */
743ENGINE_ERROR_CODE store_item(struct default_engine *engine,
744                             hash_item *item, uint64_t *cas,
745                             ENGINE_STORE_OPERATION operation,
746                             const void *cookie,
747                             const DocumentState document_state) {
748    ENGINE_ERROR_CODE ret;
749    hash_item* stored_item = NULL;
750
751    if (document_state == DocumentState::Deleted) {
752        item->iflag |= ITEM_ZOMBIE;
753    }
754
755    cb_mutex_enter(&engine->items.lock);
756    ret = do_store_item(engine, item, operation, cookie, &stored_item);
757    if (ret == ENGINE_SUCCESS) {
758        *cas = stored_item->cas;
759    }
760    cb_mutex_exit(&engine->items.lock);
761    return ret;
762}
763
764ENGINE_ERROR_CODE do_item_get_locked(struct default_engine* engine,
765                                     const void* cookie,
766                                     hash_item** it,
767                                     const hash_key* hkey,
768                                     rel_time_t locktime) {
769    hash_item* item = do_item_get(engine, hkey, DocStateFilter::Alive);
770    if (item == nullptr) {
771        return ENGINE_KEY_ENOENT;
772    }
773
774    if (item->locktime != 0 &&
775        item->locktime > engine->server.core->get_current_time()) {
776        do_item_release(engine, item);
777        return ENGINE_LOCKED;
778    }
779
780    /*
781     * Unfortunately I have to create an extra copy of the item to return
782     * back to the caller, as I need a way to know if I should mask out
783     * the CAS or not. We don't have enough memory in the hash_item obj to
784     * identify the connection when we need to check if the CAS should be
785     * masked out (or not). I don't want us to lock memory on a per item
786     * base as I don't suspect most items to ever be locked.
787     *
788     * Instead I decided to create an extra clone of the item, and if the
789     * item isn't linked we should revel the real CAS.
790     *
791     * If I failed to create the temporary item I'm returning ENGINE_TMPFAIL.
792     *
793     * There is one minor optimization here.. If I'm the only one accessing
794     * the object (refcount == 1) I can update the in-memory object and only
795     * create a single copy. If multiple users holds a reference to the object
796     * I have to create two clones (one to put in the hashmap, and the
797     * temporary object to return back).
798     */
799    if (item->refcount == 1) {
800        // we're the only one with access, let's just do an in-place
801        // update of the metadata.
802
803        // Unfortunately I can't return the actual object as that'll cause
804        // the item's cas to be masked out ;-)
805        auto* clone = do_item_alloc(engine, hkey, item->flags, item->exptime,
806                                    item->nbytes, cookie, item->datatype);
807        if (clone == nullptr) {
808            do_item_release(engine, item);
809            return ENGINE_TMPFAIL;
810        }
811
812        // let's just do an in-place update of the metadata. and return the
813        // copy
814        clone->locktime = item->locktime = locktime;
815        clone->cas = item->cas = get_cas_id();
816
817        // Copy the payload
818        std::memcpy(item_get_data(clone), item_get_data(item), item->nbytes);
819
820        // Release the one in the linked table
821        do_item_release(engine, item);
822        *it = clone;
823    } else {
824        // Multiple entities holds a reference to the object. We
825        // need to do a copy/replace.
826        auto* clone1 = do_item_alloc(engine, hkey, item->flags, item->exptime,
827                                     item->nbytes, cookie, item->datatype);
828        if (clone1 == nullptr) {
829            do_item_release(engine, item);
830            return ENGINE_TMPFAIL;
831        }
832
833        auto* clone2 = do_item_alloc(engine, hkey, item->flags, item->exptime,
834                                     item->nbytes, cookie, item->datatype);
835        if (clone2 == nullptr) {
836            do_item_release(engine, item);
837            do_item_release(engine, clone1);
838            return ENGINE_TMPFAIL;
839        }
840
841        std::memcpy(item_get_data(clone1), item_get_data(item), item->nbytes);
842        std::memcpy(item_get_data(clone2), item_get_data(item), item->nbytes);
843        clone1->locktime = clone2->locktime = locktime;
844
845        do_item_replace(engine, cookie, item, clone1);
846
847        // do_item_replace generated a new cas id for this object
848        clone2->cas = clone1->cas;
849
850        // Release references
851        do_item_release(engine, item);
852        do_item_release(engine, clone1);
853        *it = clone2;
854    }
855
856    return ENGINE_SUCCESS;
857}
858
859ENGINE_ERROR_CODE item_get_locked(struct default_engine* engine,
860                                  const void* cookie,
861                                  hash_item** it,
862                                  const void* key,
863                                  const size_t nkey,
864                                  rel_time_t locktime) {
865    hash_key hkey;
866
867    if (!hash_key_create(&hkey, key, nkey, engine, cookie)) {
868        return ENGINE_TMPFAIL;
869    }
870
871    cb_mutex_enter(&engine->items.lock);
872    ENGINE_ERROR_CODE ret = do_item_get_locked(engine, cookie, it, &hkey,
873                                               locktime);
874    cb_mutex_exit(&engine->items.lock);
875    hash_key_destroy(&hkey);
876
877    return ret;
878}
879
880static ENGINE_ERROR_CODE do_item_unlock(struct default_engine* engine,
881                                        const void* cookie,
882                                        const hash_key* hkey,
883                                        uint64_t cas) {
884    hash_item* item = do_item_get(engine, hkey, DocStateFilter::Alive);
885    if (item == nullptr) {
886        return ENGINE_KEY_ENOENT;
887    }
888
889    if (item->cas != cas) {
890        // Invalid CAS value
891        auto ret = ENGINE_KEY_EEXISTS;
892
893        if (item->locktime != 0 &&
894            item->locktime > engine->server.core->get_current_time()) {
895            // To be in line with ep-engine we should return ENGINE_LOCKED
896            // when trying to unlock a locked value with the wrong cas
897            ret = ENGINE_LOCKED;
898        }
899
900        do_item_release(engine, item);
901        return ret;
902    }
903
904    if (item->refcount == 1) {
905        // I'm the only one with a reference to the object..
906        // Just do an in-place release of the object
907        item->locktime = 0;
908        do_item_release(engine, item);
909    } else {
910        // Someone else holds a reference to the object.
911        auto* clone = do_item_alloc(engine, hkey, item->flags, item->exptime,
912                                    item->nbytes, cookie, item->datatype);
913        if (clone == nullptr) {
914            do_item_release(engine, item);
915            return ENGINE_TMPFAIL;
916        }
917
918        std::memcpy(item_get_data(clone), item_get_data(item), item->nbytes);
919        clone->locktime = 0;
920
921        do_item_replace(engine, cookie, item, clone);
922        do_item_release(engine, clone);
923        do_item_release(engine, item);
924    }
925
926    return ENGINE_SUCCESS;
927}
928
929ENGINE_ERROR_CODE item_unlock(struct default_engine* engine,
930                              const void* cookie,
931                              const void* key,
932                              const size_t nkey,
933                              uint64_t cas) {
934    hash_key hkey;
935
936    if (!hash_key_create(&hkey, key, nkey, engine, cookie)) {
937        return ENGINE_TMPFAIL;
938    }
939
940    cb_mutex_enter(&engine->items.lock);
941    ENGINE_ERROR_CODE ret = do_item_unlock(engine, cookie, &hkey, cas);
942    cb_mutex_exit(&engine->items.lock);
943    hash_key_destroy(&hkey);
944
945    return ret;
946}
947
948ENGINE_ERROR_CODE do_item_get_and_touch(struct default_engine* engine,
949                                        const void* cookie,
950                                        hash_item** it,
951                                        const hash_key* hkey,
952                                        rel_time_t exptime) {
953    hash_item* item = do_item_get(engine, hkey, DocStateFilter::Alive);
954    if (item == nullptr) {
955        return ENGINE_KEY_ENOENT;
956    }
957
958    if (item->locktime != 0 &&
959        item->locktime > engine->server.core->get_current_time()) {
960        do_item_release(engine, item);
961        return ENGINE_LOCKED;
962    }
963
964    /*
965     * If I'm the only one accessing the object (refcount == 1) I can update
966     * the in-memory object. Otherwise I need to swap it out with a new one
967     */
968    if (item->exptime == exptime) {
969        // micro optimization:
970        // The new expiry time is set to the same value as it used to have
971        // so we don't need to do anything (this doesn't matter much for
972        // the memcached buckets, but for ep-engine it means that they
973        // don't have to update the disk copy with the new expiry time or
974        // send it out over DCP)
975        *it = item;
976    } else if (item->refcount == 1) {
977        // we're the only one with access, let's just do an in-place
978        // update of the metadata.
979        item->exptime = exptime;
980        item->cas = get_cas_id();
981        *it = item;
982    } else {
983        // Multiple entities holds a reference to the object. We
984        // need to do a copy/replace.
985        auto* clone = do_item_alloc(engine, hkey, item->flags, exptime,
986                                    item->nbytes, cookie, item->datatype);
987        if (clone == nullptr) {
988            do_item_release(engine, item);
989            return ENGINE_TMPFAIL;
990        }
991
992        std::memcpy(item_get_data(clone), item_get_data(item), item->nbytes);
993        clone->locktime = 0;
994        do_item_replace(engine, cookie, item, clone);
995
996        // Release references
997        do_item_release(engine, item);
998        *it = clone;
999    }
1000
1001    return ENGINE_SUCCESS;
1002}
1003
1004ENGINE_ERROR_CODE item_get_and_touch(struct default_engine* engine,
1005                                     const void* cookie,
1006                                     hash_item** it,
1007                                     const void* key,
1008                                     const size_t nkey,
1009                                     rel_time_t exptime) {
1010    hash_key hkey;
1011
1012    if (!hash_key_create(&hkey, key, nkey, engine, cookie)) {
1013        return ENGINE_TMPFAIL;
1014    }
1015
1016    cb_mutex_enter(&engine->items.lock);
1017    ENGINE_ERROR_CODE ret = do_item_get_and_touch(engine, cookie, it, &hkey,
1018                                                  exptime);
1019    cb_mutex_exit(&engine->items.lock);
1020    hash_key_destroy(&hkey);
1021
1022    return ret;
1023}
1024
1025/*
1026 * Flushes expired items after a flush_all call
1027 */
1028void item_flush_expired(struct default_engine *engine) {
1029    cb_mutex_enter(&engine->items.lock);
1030
1031    rel_time_t now = engine->server.core->get_current_time();
1032    if (now > engine->config.oldest_live) {
1033        engine->config.oldest_live = now - 1;
1034    }
1035
1036    for (int ii = 0; ii < POWER_LARGEST; ii++) {
1037        hash_item *iter, *next;
1038        /*
1039         * The LRU is sorted in decreasing time order, and an item's
1040         * timestamp is never newer than its last access time, so we
1041         * only need to walk back until we hit an item older than the
1042         * oldest_live time.
1043         * The oldest_live checking will auto-expire the remaining items.
1044         */
1045        for (iter = engine->items.heads[ii]; iter != NULL; iter = next) {
1046            if (iter->time >= engine->config.oldest_live) {
1047                next = iter->next;
1048                if ((iter->iflag & ITEM_SLABBED) == 0) {
1049                    do_item_unlink(engine, iter);
1050                }
1051            } else {
1052                /* We've hit the first old item. Continue to the next queue. */
1053                break;
1054            }
1055        }
1056    }
1057    cb_mutex_exit(&engine->items.lock);
1058}
1059
1060void item_stats(struct default_engine *engine,
1061                   ADD_STAT add_stat, const void *cookie)
1062{
1063    cb_mutex_enter(&engine->items.lock);
1064    do_item_stats(engine, add_stat, cookie);
1065    cb_mutex_exit(&engine->items.lock);
1066}
1067
1068
1069void item_stats_sizes(struct default_engine *engine,
1070                      ADD_STAT add_stat, const void *cookie)
1071{
1072    cb_mutex_enter(&engine->items.lock);
1073    do_item_stats_sizes(engine, add_stat, cookie);
1074    cb_mutex_exit(&engine->items.lock);
1075}
1076
1077static void do_item_link_cursor(struct default_engine *engine,
1078                                hash_item *cursor, int ii)
1079{
1080    cursor->slabs_clsid = (uint8_t)ii;
1081    cursor->next = NULL;
1082    cursor->prev = engine->items.tails[ii];
1083    engine->items.tails[ii]->next = cursor;
1084    engine->items.tails[ii] = cursor;
1085    engine->items.sizes[ii]++;
1086}
1087
1088typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
1089                                      hash_item *item, void *cookie);
1090
1091static bool do_item_walk_cursor(struct default_engine *engine,
1092                                hash_item *cursor,
1093                                int steplength,
1094                                ITERFUNC itemfunc,
1095                                void* itemdata,
1096                                ENGINE_ERROR_CODE *error)
1097{
1098    int ii = 0;
1099    *error = ENGINE_SUCCESS;
1100
1101    while (cursor->prev != NULL && ii < steplength) {
1102        /* Move cursor */
1103        hash_item *ptr = cursor->prev;
1104        bool done = false;
1105
1106        ++ii;
1107        item_unlink_q(engine, cursor);
1108
1109        if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1110            done = true;
1111            cursor->prev = NULL;
1112        } else {
1113            cursor->next = ptr;
1114            cursor->prev = ptr->prev;
1115            cursor->prev->next = cursor;
1116            ptr->prev = cursor;
1117        }
1118
1119        /* Ignore cursors */
1120        if (item_get_key(ptr)->header.len == 0 && ptr->nbytes == 0) {
1121            --ii;
1122        } else {
1123            *error = itemfunc(engine, ptr, itemdata);
1124            if (*error != ENGINE_SUCCESS) {
1125                return false;
1126            }
1127        }
1128
1129        if (done) {
1130            return false;
1131        }
1132    }
1133
1134    return (cursor->prev != NULL);
1135}
1136
1137static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1138                                    hash_item *item,
1139                                    void *cookie) {
1140    rel_time_t current_time = engine->server.core->get_current_time();
1141    (void)cookie;
1142    engine->scrubber.visited++;
1143    /*
1144        scrubber is used for generic bucket deletion and scrub_cmd
1145        all expired or orphaned items are unlinked
1146    */
1147    if (engine->scrubber.force_delete && item->refcount > 0) {
1148        // warn that someone isn't releasing items before deleting their bucket.
1149        LOG_WARNING("Bucket ({}) deletion is removing an item with refcount {}",
1150                    engine->bucket_id,
1151                    item->refcount);
1152    }
1153
1154    if (engine->scrubber.force_delete || (item->refcount == 0 &&
1155       (item->exptime != 0 && item->exptime < current_time))) {
1156        do_item_unlink(engine, item);
1157        engine->scrubber.cleaned++;
1158    }
1159    return ENGINE_SUCCESS;
1160}
1161
1162static void item_scrub_class(struct default_engine *engine,
1163                             hash_item *cursor) {
1164
1165    ENGINE_ERROR_CODE ret;
1166    bool more;
1167    do {
1168        cb_mutex_enter(&engine->items.lock);
1169        more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
1170        cb_mutex_exit(&engine->items.lock);
1171        if (ret != ENGINE_SUCCESS) {
1172            break;
1173        }
1174    } while (more);
1175}
1176
1177void item_scrubber_main(struct default_engine *engine)
1178{
1179    hash_item cursor;
1180    int ii;
1181
1182    memset(&cursor, 0, sizeof(cursor));
1183    cursor.refcount = 1;
1184    for (ii = 0; ii < POWER_LARGEST; ++ii) {
1185        bool skip = false;
1186        cb_mutex_enter(&engine->items.lock);
1187        if (engine->items.heads[ii] == NULL) {
1188            skip = true;
1189        } else {
1190            /* add the item at the tail */
1191            do_item_link_cursor(engine, &cursor, ii);
1192        }
1193        cb_mutex_exit(&engine->items.lock);
1194
1195        if (!skip) {
1196            item_scrub_class(engine, &cursor);
1197        }
1198    }
1199
1200    cb_mutex_enter(&engine->scrubber.lock);
1201    engine->scrubber.stopped = time(NULL);
1202    engine->scrubber.running = false;
1203    cb_mutex_exit(&engine->scrubber.lock);
1204}
1205
1206bool item_start_scrub(struct default_engine *engine)
1207{
1208    bool ret = false;
1209
1210    cb_mutex_enter(&engine->scrubber.lock);
1211
1212    /*
1213        If the scrubber is already scrubbing items, ignore
1214    */
1215    if (!engine->scrubber.running) {
1216
1217        engine->scrubber.started = time(NULL);
1218        engine->scrubber.stopped = 0;
1219        engine->scrubber.visited = 0;
1220        engine->scrubber.cleaned = 0;
1221        engine->scrubber.running = true;
1222        engine_manager_scrub_engine(engine);
1223        ret = true;
1224    }
1225    cb_mutex_exit(&engine->scrubber.lock);
1226
1227    return ret;
1228}
1229
1230static bool hash_key_create(hash_key* hkey,
1231                            const void* key,
1232                            const size_t nkey,
1233                            struct default_engine* engine,
1234                            const void* cookie) {
1235    uint16_t hash_key_len = gsl::narrow<uint16_t>(sizeof(bucket_id_t) + nkey);
1236    if (nkey > sizeof(hkey->key_storage.client_key)) {
1237        hkey->header.full_key =
1238            static_cast<hash_key_data*>(cb_malloc(hash_key_len));
1239        if (hkey->header.full_key == NULL) {
1240            return false;
1241        }
1242    } else {
1243        hkey->header.full_key = (hash_key_data*)&hkey->key_storage;
1244    }
1245    hash_key_set_len(hkey, hash_key_len);
1246    hash_key_set_bucket_index(hkey, engine->bucket_id);
1247    hash_key_set_client_key(hkey, key, nkey);
1248    return true;
1249}
1250
1251static void hash_key_destroy(hash_key* hkey) {
1252    if ((void*)hkey->header.full_key != (void*)&hkey->key_storage) {
1253       cb_free(hkey->header.full_key);
1254    }
1255}
1256
1257/*
1258 * The item object stores a hash_key in a contiguous allocation
1259 * This method ensures correct copying into a contiguous hash_key
1260 */
1261static void hash_key_copy_to_item(hash_item* dst, const hash_key* src) {
1262    hash_key* key = item_get_key(dst);
1263    memcpy(key, src, sizeof(hash_key_header));
1264    key->header.full_key = (hash_key_data*)&key->key_storage;
1265    memcpy(hash_key_get_key(key), hash_key_get_key(src), hash_key_get_key_len(src));
1266}
1267