1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <inttypes.h>
10 
11 #include "default_engine.h"
12 
13 /* Forward Declarations */
14 static void item_link_q(struct default_engine *engine, hash_item *it);
15 static void item_unlink_q(struct default_engine *engine, hash_item *it);
16 static hash_item *do_item_alloc(struct default_engine *engine,
17                                 const void *key, const size_t nkey,
18                                 const int flags, const rel_time_t exptime,
19                                 const int nbytes,
20                                 const void *cookie,
21                                 uint8_t datatype);
22 static hash_item *do_item_get(struct default_engine *engine,
23                               const char *key, const size_t nkey);
24 static int do_item_link(struct default_engine *engine, hash_item *it);
25 static void do_item_unlink(struct default_engine *engine, hash_item *it);
26 static void do_item_release(struct default_engine *engine, hash_item *it);
27 static void do_item_update(struct default_engine *engine, hash_item *it);
28 static int do_item_replace(struct default_engine *engine,
29                             hash_item *it, hash_item *new_it);
30 static void item_free(struct default_engine *engine, hash_item *it);
31 
32 /*
33  * We only reposition items in the LRU queue if they haven't been repositioned
34  * in this many seconds. That saves us from churning on frequently-accessed
35  * items.
36  */
37 #define ITEM_UPDATE_INTERVAL 60
38 /*
39  * To avoid scanning through the complete cache in some circumstances we'll
40  * just give up and return an error after inspecting a fixed number of objects.
41  */
42 static const int search_items = 50;
43 
item_stats_reset(struct default_engine *engine)44 void item_stats_reset(struct default_engine *engine) {
45     cb_mutex_enter(&engine->cache_lock);
46     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47     cb_mutex_exit(&engine->cache_lock);
48 }
49 
50 
51 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine *engine, const hash_item *item)52 static size_t ITEM_ntotal(struct default_engine *engine,
53                           const hash_item *item) {
54     size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55     if (engine->config.use_cas) {
56         ret += sizeof(uint64_t);
57     }
58 
59     return ret;
60 }
61 
62 /* Get the next CAS id for a new item. */
get_cas_id(void)63 static uint64_t get_cas_id(void) {
64     static uint64_t cas_id = 0;
65     return ++cas_id;
66 }
67 
68 /* Enable this for reference-count debugging. */
69 #if 0
70 # define DEBUG_REFCNT(it,op) \
71                 fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
72                         it, op, it->refcount, \
73                         (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
74                         (it->it_flags & ITEM_SLABBED) ? 'S' : ' ')
75 #else
76 # define DEBUG_REFCNT(it,op) while(0)
77 #endif
78 
79 
80 /*@null@*/
do_item_alloc(struct default_engine *engine, const void *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes, const void *cookie, uint8_t datatype)81 hash_item *do_item_alloc(struct default_engine *engine,
82                          const void *key,
83                          const size_t nkey,
84                          const int flags,
85                          const rel_time_t exptime,
86                          const int nbytes,
87                          const void *cookie,
88                          uint8_t datatype) {
89     hash_item *it = NULL;
90     int tries = search_items;
91     hash_item *search;
92     rel_time_t oldest_live;
93     rel_time_t current_time;
94     unsigned int id;
95 
96     size_t ntotal = sizeof(hash_item) + nkey + nbytes;
97     if (engine->config.use_cas) {
98         ntotal += sizeof(uint64_t);
99     }
100 
101     if ((id = slabs_clsid(engine, ntotal)) == 0) {
102         return 0;
103     }
104 
105     /* do a quick check if we have any expired items in the tail.. */
106     tries = search_items;
107     oldest_live = engine->config.oldest_live;
108     current_time = engine->server.core->get_current_time();
109 
110     for (search = engine->items.tails[id];
111          tries > 0 && search != NULL;
112          tries--, search=search->prev) {
113         if (search->refcount == 0 &&
114             ((search->time < oldest_live) || /* dead by flush */
115              (search->exptime != 0 && search->exptime < current_time))) {
116             it = search;
117             /* I don't want to actually free the object, just steal
118              * the item to avoid to grab the slab mutex twice ;-)
119              */
120             cb_mutex_enter(&engine->stats.lock);
121             engine->stats.reclaimed++;
122             cb_mutex_exit(&engine->stats.lock);
123             engine->items.itemstats[id].reclaimed++;
124             it->refcount = 1;
125             slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
126             do_item_unlink(engine, it);
127             /* Initialize the item block: */
128             it->slabs_clsid = 0;
129             it->refcount = 0;
130             break;
131         }
132     }
133 
134     if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
135         /*
136         ** Could not find an expired item at the tail, and memory allocation
137         ** failed. Try to evict some items!
138         */
139         tries = search_items;
140 
141         /* If requested to not push old items out of cache when memory runs out,
142          * we're out of luck at this point...
143          */
144 
145         if (engine->config.evict_to_free == 0) {
146             engine->items.itemstats[id].outofmemory++;
147             return NULL;
148         }
149 
150         /*
151          * try to get one off the right LRU
152          * don't necessariuly unlink the tail because it may be locked: refcount>0
153          * search up from tail an item with refcount==0 and unlink it; give up after search_items
154          * tries
155          */
156 
157         if (engine->items.tails[id] == 0) {
158             engine->items.itemstats[id].outofmemory++;
159             return NULL;
160         }
161 
162         for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
163             if (search->refcount == 0) {
164                 if (search->exptime == 0 || search->exptime > current_time) {
165                     engine->items.itemstats[id].evicted++;
166                     engine->items.itemstats[id].evicted_time = current_time - search->time;
167                     if (search->exptime != 0) {
168                         engine->items.itemstats[id].evicted_nonzero++;
169                     }
170                     cb_mutex_enter(&engine->stats.lock);
171                     engine->stats.evictions++;
172                     cb_mutex_exit(&engine->stats.lock);
173                     engine->server.stat->evicting(cookie,
174                                                   item_get_key(search),
175                                                   search->nkey);
176                 } else {
177                     engine->items.itemstats[id].reclaimed++;
178                     cb_mutex_enter(&engine->stats.lock);
179                     engine->stats.reclaimed++;
180                     cb_mutex_exit(&engine->stats.lock);
181                 }
182                 do_item_unlink(engine, search);
183                 break;
184             }
185         }
186         it = slabs_alloc(engine, ntotal, id);
187         if (it == 0) {
188             engine->items.itemstats[id].outofmemory++;
189             /* Last ditch effort. There is a very rare bug which causes
190              * refcount leaks. We've fixed most of them, but it still happens,
191              * and it may happen in the future.
192              * We can reasonably assume no item can stay locked for more than
193              * three hours, so if we find one in the tail which is that old,
194              * free it anyway.
195              */
196             tries = search_items;
197             for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
198                 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
199                     engine->items.itemstats[id].tailrepairs++;
200                     search->refcount = 0;
201                     do_item_unlink(engine, search);
202                     break;
203                 }
204             }
205             it = slabs_alloc(engine, ntotal, id);
206             if (it == 0) {
207                 return NULL;
208             }
209         }
210     }
211 
212     cb_assert(it->slabs_clsid == 0);
213 
214     it->slabs_clsid = id;
215 
216     cb_assert(it != engine->items.heads[it->slabs_clsid]);
217 
218     it->next = it->prev = it->h_next = 0;
219     it->refcount = 1;     /* the caller will have a reference */
220     DEBUG_REFCNT(it, '*');
221     it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
222     it->nkey = (uint16_t)nkey;
223     it->nbytes = nbytes;
224     it->flags = flags;
225     it->datatype = datatype;
226     memcpy((void*)item_get_key(it), key, nkey);
227     it->exptime = exptime;
228     return it;
229 }
230 
item_free(struct default_engine *engine, hash_item *it)231 static void item_free(struct default_engine *engine, hash_item *it) {
232     size_t ntotal = ITEM_ntotal(engine, it);
233     unsigned int clsid;
234     cb_assert((it->iflag & ITEM_LINKED) == 0);
235     cb_assert(it != engine->items.heads[it->slabs_clsid]);
236     cb_assert(it != engine->items.tails[it->slabs_clsid]);
237     cb_assert(it->refcount == 0);
238 
239     /* so slab size changer can tell later if item is already free or not */
240     clsid = it->slabs_clsid;
241     it->slabs_clsid = 0;
242     it->iflag |= ITEM_SLABBED;
243     DEBUG_REFCNT(it, 'F');
244     slabs_free(engine, it, ntotal, clsid);
245 }
246 
item_link_q(struct default_engine *engine, hash_item *it)247 static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
248     hash_item **head, **tail;
249     cb_assert(it->slabs_clsid < POWER_LARGEST);
250     cb_assert((it->iflag & ITEM_SLABBED) == 0);
251 
252     head = &engine->items.heads[it->slabs_clsid];
253     tail = &engine->items.tails[it->slabs_clsid];
254     cb_assert(it != *head);
255     cb_assert((*head && *tail) || (*head == 0 && *tail == 0));
256     it->prev = 0;
257     it->next = *head;
258     if (it->next) it->next->prev = it;
259     *head = it;
260     if (*tail == 0) *tail = it;
261     engine->items.sizes[it->slabs_clsid]++;
262     return;
263 }
264 
item_unlink_q(struct default_engine *engine, hash_item *it)265 static void item_unlink_q(struct default_engine *engine, hash_item *it) {
266     hash_item **head, **tail;
267     cb_assert(it->slabs_clsid < POWER_LARGEST);
268     head = &engine->items.heads[it->slabs_clsid];
269     tail = &engine->items.tails[it->slabs_clsid];
270 
271     if (*head == it) {
272         cb_assert(it->prev == 0);
273         *head = it->next;
274     }
275     if (*tail == it) {
276         cb_assert(it->next == 0);
277         *tail = it->prev;
278     }
279     cb_assert(it->next != it);
280     cb_assert(it->prev != it);
281 
282     if (it->next) it->next->prev = it->prev;
283     if (it->prev) it->prev->next = it->next;
284     engine->items.sizes[it->slabs_clsid]--;
285     return;
286 }
287 
do_item_link(struct default_engine *engine, hash_item *it)288 int do_item_link(struct default_engine *engine, hash_item *it) {
289     MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
290     cb_assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
291     cb_assert(it->nbytes < (1024 * 1024));  /* 1MB max size */
292     it->iflag |= ITEM_LINKED;
293     it->time = engine->server.core->get_current_time();
294     assoc_insert(engine, engine->server.core->hash(item_get_key(it),
295                                                         it->nkey, 0),
296                  it);
297 
298     cb_mutex_enter(&engine->stats.lock);
299     engine->stats.curr_bytes += ITEM_ntotal(engine, it);
300     engine->stats.curr_items += 1;
301     engine->stats.total_items += 1;
302     cb_mutex_exit(&engine->stats.lock);
303 
304     /* Allocate a new CAS ID on link. */
305     item_set_cas(NULL, NULL, it, get_cas_id());
306 
307     item_link_q(engine, it);
308 
309     return 1;
310 }
311 
do_item_unlink(struct default_engine *engine, hash_item *it)312 void do_item_unlink(struct default_engine *engine, hash_item *it) {
313     MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
314     if ((it->iflag & ITEM_LINKED) != 0) {
315         it->iflag &= ~ITEM_LINKED;
316         cb_mutex_enter(&engine->stats.lock);
317         engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
318         engine->stats.curr_items -= 1;
319         cb_mutex_exit(&engine->stats.lock);
320         assoc_delete(engine, engine->server.core->hash(item_get_key(it),
321                                                             it->nkey, 0),
322                      item_get_key(it), it->nkey);
323         item_unlink_q(engine, it);
324         if (it->refcount == 0) {
325             item_free(engine, it);
326         }
327     }
328 }
329 
do_item_release(struct default_engine *engine, hash_item *it)330 void do_item_release(struct default_engine *engine, hash_item *it) {
331     MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
332     if (it->refcount != 0) {
333         it->refcount--;
334         DEBUG_REFCNT(it, '-');
335     }
336     if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
337         item_free(engine, it);
338     }
339 }
340 
do_item_update(struct default_engine *engine, hash_item *it)341 void do_item_update(struct default_engine *engine, hash_item *it) {
342     rel_time_t current_time = engine->server.core->get_current_time();
343     MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
344     if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
345         cb_assert((it->iflag & ITEM_SLABBED) == 0);
346 
347         if ((it->iflag & ITEM_LINKED) != 0) {
348             item_unlink_q(engine, it);
349             it->time = current_time;
350             item_link_q(engine, it);
351         }
352     }
353 }
354 
do_item_replace(struct default_engine *engine, hash_item *it, hash_item *new_it)355 int do_item_replace(struct default_engine *engine,
356                     hash_item *it, hash_item *new_it) {
357     MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
358                            item_get_key(new_it), new_it->nkey, new_it->nbytes);
359     cb_assert((it->iflag & ITEM_SLABBED) == 0);
360 
361     do_item_unlink(engine, it);
362     return do_item_link(engine, new_it);
363 }
364 
365 /*@null@*/
do_item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes)366 static char *do_item_cachedump(const unsigned int slabs_clsid,
367                                const unsigned int limit,
368                                unsigned int *bytes) {
369 #ifdef FUTURE
370     unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
371     char *buffer;
372     unsigned int bufcurr;
373     hash_item *it;
374     unsigned int len;
375     unsigned int shown = 0;
376     char key_temp[KEY_MAX_LENGTH + 1];
377     char temp[512];
378 
379     it = engine->items.heads[slabs_clsid];
380 
381     buffer = malloc((size_t)memlimit);
382     if (buffer == 0) return NULL;
383     bufcurr = 0;
384 
385 
386     while (it != NULL && (limit == 0 || shown < limit)) {
387         cb_assert(it->nkey <= KEY_MAX_LENGTH);
388         /* Copy the key since it may not be null-terminated in the struct */
389         strncpy(key_temp, item_get_key(it), it->nkey);
390         key_temp[it->nkey] = 0x00; /* terminate */
391         len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
392                        key_temp, it->nbytes,
393                        (unsigned long)it->exptime + process_started);
394         if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
395             break;
396         memcpy(buffer + bufcurr, temp, len);
397         bufcurr += len;
398         shown++;
399         it = it->next;
400     }
401 
402 
403     memcpy(buffer + bufcurr, "END\r\n", 6);
404     bufcurr += 5;
405 
406     *bytes = bufcurr;
407     return buffer;
408 #endif
409     (void)slabs_clsid;
410     (void)limit;
411     (void)bytes;
412     return NULL;
413 }
414 
do_item_stats(struct default_engine *engine, ADD_STAT add_stats, const void *c)415 static void do_item_stats(struct default_engine *engine,
416                           ADD_STAT add_stats, const void *c) {
417     int i;
418     rel_time_t current_time = engine->server.core->get_current_time();
419     for (i = 0; i < POWER_LARGEST; i++) {
420         if (engine->items.tails[i] != NULL) {
421             const char *prefix = "items";
422             int search = search_items;
423             while (search > 0 &&
424                    engine->items.tails[i] != NULL &&
425                    ((engine->config.oldest_live != 0 && /* Item flushd */
426                      engine->config.oldest_live <= current_time &&
427                      engine->items.tails[i]->time <= engine->config.oldest_live) ||
428                     (engine->items.tails[i]->exptime != 0 && /* and not expired */
429                      engine->items.tails[i]->exptime < current_time))) {
430                 --search;
431                 if (engine->items.tails[i]->refcount == 0) {
432                     do_item_unlink(engine, engine->items.tails[i]);
433                 } else {
434                     break;
435                 }
436             }
437             if (engine->items.tails[i] == NULL) {
438                 /* We removed all of the items in this slab class */
439                 continue;
440             }
441 
442             add_statistics(c, add_stats, prefix, i, "number", "%u",
443                            engine->items.sizes[i]);
444             add_statistics(c, add_stats, prefix, i, "age", "%u",
445                            engine->items.tails[i]->time);
446             add_statistics(c, add_stats, prefix, i, "evicted",
447                            "%u", engine->items.itemstats[i].evicted);
448             add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
449                            "%u", engine->items.itemstats[i].evicted_nonzero);
450             add_statistics(c, add_stats, prefix, i, "evicted_time",
451                            "%u", engine->items.itemstats[i].evicted_time);
452             add_statistics(c, add_stats, prefix, i, "outofmemory",
453                            "%u", engine->items.itemstats[i].outofmemory);
454             add_statistics(c, add_stats, prefix, i, "tailrepairs",
455                            "%u", engine->items.itemstats[i].tailrepairs);;
456             add_statistics(c, add_stats, prefix, i, "reclaimed",
457                            "%u", engine->items.itemstats[i].reclaimed);;
458         }
459     }
460 }
461 
462 /** dumps out a list of objects of each size, with granularity of 32 bytes */
463 /*@null@*/
do_item_stats_sizes(struct default_engine *engine, ADD_STAT add_stats, const void *c)464 static void do_item_stats_sizes(struct default_engine *engine,
465                                 ADD_STAT add_stats, const void *c) {
466 
467     /* max 1MB object, divided into 32 bytes size buckets */
468     const int num_buckets = 32768;
469     unsigned int *histogram = calloc(num_buckets, sizeof(unsigned int));
470 
471     if (histogram != NULL) {
472         int i;
473 
474         /* build the histogram */
475         for (i = 0; i < POWER_LARGEST; i++) {
476             hash_item *iter = engine->items.heads[i];
477             while (iter) {
478                 size_t ntotal = ITEM_ntotal(engine, iter);
479                 size_t bucket = ntotal / 32;
480                 if ((ntotal % 32) != 0) {
481                     bucket++;
482                 }
483                 if (bucket < num_buckets) {
484                     histogram[bucket]++;
485                 }
486                 iter = iter->next;
487             }
488         }
489 
490         /* write the buffer */
491         for (i = 0; i < num_buckets; i++) {
492             if (histogram[i] != 0) {
493                 char key[8], val[32];
494                 int klen, vlen;
495                 klen = snprintf(key, sizeof(key), "%d", i * 32);
496                 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
497                 cb_assert(klen < sizeof(key));
498                 cb_assert(vlen < sizeof(val));
499                 add_stats(key, klen, val, vlen, c);
500             }
501         }
502         free(histogram);
503     }
504 }
505 
506 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine *engine, const char *key, const size_t nkey)507 hash_item *do_item_get(struct default_engine *engine,
508                        const char *key, const size_t nkey) {
509     rel_time_t current_time = engine->server.core->get_current_time();
510     hash_item *it = assoc_find(engine, engine->server.core->hash(key,
511                                                                       nkey, 0),
512                                key, nkey);
513     int was_found = 0;
514 
515     if (engine->config.verbose > 2) {
516         EXTENSION_LOGGER_DESCRIPTOR *logger;
517         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
518         if (it == NULL) {
519             logger->log(EXTENSION_LOG_DEBUG, NULL,
520                         "> NOT FOUND %s", key);
521         } else {
522             logger->log(EXTENSION_LOG_DEBUG, NULL,
523                         "> FOUND KEY %s",
524                         (const char*)item_get_key(it));
525             was_found++;
526         }
527     }
528 
529     if (it != NULL && engine->config.oldest_live != 0 &&
530         engine->config.oldest_live <= current_time &&
531         it->time <= engine->config.oldest_live) {
532         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
533         it = NULL;
534     }
535 
536     if (it == NULL && was_found) {
537         EXTENSION_LOGGER_DESCRIPTOR *logger;
538         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
539         logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
540         was_found--;
541     }
542 
543     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
544         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
545         it = NULL;
546     }
547 
548     if (it == NULL && was_found) {
549         EXTENSION_LOGGER_DESCRIPTOR *logger;
550         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
551         logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
552         was_found--;
553     }
554 
555     if (it != NULL) {
556         it->refcount++;
557         DEBUG_REFCNT(it, '+');
558         do_item_update(engine, it);
559     }
560 
561     return it;
562 }
563 
564 /*
565  * Stores an item in the cache according to the semantics of one of the set
566  * commands. In threaded mode, this is protected by the cache lock.
567  *
568  * Returns the state of storage.
569  */
do_store_item(struct default_engine *engine, hash_item *it, uint64_t *cas, ENGINE_STORE_OPERATION operation, const void *cookie)570 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
571                                        hash_item *it, uint64_t *cas,
572                                        ENGINE_STORE_OPERATION operation,
573                                        const void *cookie) {
574     const char *key = item_get_key(it);
575     hash_item *old_it = do_item_get(engine, key, it->nkey);
576     ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
577 
578     hash_item *new_it = NULL;
579 
580     if (old_it != NULL && operation == OPERATION_ADD) {
581         /* add only adds a nonexistent item, but promote to head of LRU */
582         do_item_update(engine, old_it);
583     } else if (!old_it && (operation == OPERATION_REPLACE
584         || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
585     {
586         /* replace only replaces an existing value; don't store */
587     } else if (operation == OPERATION_CAS) {
588         /* validate cas operation */
589         if(old_it == NULL) {
590             /* LRU expired */
591             stored = ENGINE_KEY_ENOENT;
592         }
593         else if (item_get_cas(it) == item_get_cas(old_it)) {
594             /* cas validates */
595             /* it and old_it may belong to different classes. */
596             /* I'm updating the stats for the one that's getting pushed out */
597             do_item_replace(engine, old_it, it);
598             stored = ENGINE_SUCCESS;
599         } else {
600             if (engine->config.verbose > 1) {
601                 EXTENSION_LOGGER_DESCRIPTOR *logger;
602                 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
603                 logger->log(EXTENSION_LOG_INFO, NULL,
604                         "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
605                         item_get_cas(old_it),
606                         item_get_cas(it));
607             }
608             stored = ENGINE_KEY_EEXISTS;
609         }
610     } else {
611         /*
612          * Append - combine new and old record into single one. Here it's
613          * atomic and thread-safe.
614          */
615         if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
616             /*
617              * Validate CAS
618              */
619             if (item_get_cas(it) != 0) {
620                 /* CAS much be equal */
621                 if (item_get_cas(it) != item_get_cas(old_it)) {
622                     stored = ENGINE_KEY_EEXISTS;
623                 }
624             }
625 
626             if (stored == ENGINE_NOT_STORED) {
627                 size_t total = it->nbytes + old_it->nbytes;
628                 if (total > engine->config.item_size_max) {
629                     return ENGINE_E2BIG;
630                 }
631 
632                 /* we have it and old_it here - alloc memory to hold both */
633                 new_it = do_item_alloc(engine, key, it->nkey,
634                                        old_it->flags,
635                                        old_it->exptime,
636                                        it->nbytes + old_it->nbytes,
637                                        cookie, it->datatype);
638                 if (new_it == NULL) {
639                     /* SERVER_ERROR out of memory */
640                     if (old_it != NULL) {
641                         do_item_release(engine, old_it);
642                     }
643 
644                     return ENGINE_NOT_STORED;
645                 }
646 
647                 /* copy data from it and old_it to new_it */
648 
649                 if (operation == OPERATION_APPEND) {
650                     memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
651                     memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
652                 } else {
653                     /* OPERATION_PREPEND */
654                     memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
655                     memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
656                 }
657 
658                 it = new_it;
659             }
660         }
661 
662         if (stored == ENGINE_NOT_STORED) {
663             if (old_it != NULL) {
664                 do_item_replace(engine, old_it, it);
665             } else {
666                 do_item_link(engine, it);
667             }
668 
669             *cas = item_get_cas(it);
670             stored = ENGINE_SUCCESS;
671         }
672     }
673 
674     if (old_it != NULL) {
675         do_item_release(engine, old_it);         /* release our reference */
676     }
677 
678     if (new_it != NULL) {
679         do_item_release(engine, new_it);
680     }
681 
682     if (stored == ENGINE_SUCCESS) {
683         *cas = item_get_cas(it);
684     }
685 
686     return stored;
687 }
688 
689 
690 /*
691  * adds a delta value to a numeric item.
692  *
693  * c     connection requesting the operation
694  * it    item to adjust
695  * incr  true to increment value, false to decrement
696  * delta amount to adjust value by
697  * buf   buffer for response string
698  *
699  * returns a response string to send back to the client.
700  */
do_add_delta(struct default_engine *engine, hash_item *it, const bool incr, const int64_t delta, uint64_t *rcas, uint64_t *result, const void *cookie)701 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
702                                       hash_item *it, const bool incr,
703                                       const int64_t delta, uint64_t *rcas,
704                                       uint64_t *result, const void *cookie) {
705     const char *ptr;
706     uint64_t value;
707     char buf[80];
708     int res;
709 
710     if (it->nbytes >= (sizeof(buf) - 1)) {
711         return ENGINE_EINVAL;
712     }
713 
714     ptr = item_get_data(it);
715     memcpy(buf, ptr, it->nbytes);
716     buf[it->nbytes] = '\0';
717 
718     if (!safe_strtoull(buf, &value)) {
719         return ENGINE_EINVAL;
720     }
721 
722     if (incr) {
723         value += delta;
724     } else {
725         if ((uint64_t)delta > value) {
726             value = 0;
727         } else {
728             value -= delta;
729         }
730     }
731 
732     *result = value;
733     if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
734         return ENGINE_EINVAL;
735     }
736 
737     if (it->refcount == 1 && res <= (int)it->nbytes) {
738         /* we can do inline replacement */
739         memcpy(item_get_data(it), buf, res);
740         memset(item_get_data(it) + res, ' ', it->nbytes - res);
741         item_set_cas(NULL, NULL, it, get_cas_id());
742         *rcas = item_get_cas(it);
743     } else {
744         hash_item *new_it = do_item_alloc(engine, item_get_key(it),
745                                           it->nkey, it->flags,
746                                           it->exptime, res,
747                                           cookie, it->datatype);
748         if (new_it == NULL) {
749             do_item_unlink(engine, it);
750             return ENGINE_ENOMEM;
751         }
752         memcpy(item_get_data(new_it), buf, res);
753         do_item_replace(engine, it, new_it);
754         *rcas = item_get_cas(new_it);
755         do_item_release(engine, new_it);       /* release our reference */
756     }
757 
758     return ENGINE_SUCCESS;
759 }
760 
761 /********************************* ITEM ACCESS *******************************/
762 
763 /*
764  * Allocates a new item.
765  */
item_alloc(struct default_engine *engine, const void *key, size_t nkey, int flags, rel_time_t exptime, int nbytes, const void *cookie, uint8_t datatype)766 hash_item *item_alloc(struct default_engine *engine,
767                       const void *key, size_t nkey, int flags,
768                       rel_time_t exptime, int nbytes, const void *cookie,
769                       uint8_t datatype) {
770     hash_item *it;
771     cb_mutex_enter(&engine->cache_lock);
772     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie,
773                        datatype);
774     cb_mutex_exit(&engine->cache_lock);
775     return it;
776 }
777 
778 /*
779  * Returns an item if it hasn't been marked as expired,
780  * lazy-expiring as needed.
781  */
item_get(struct default_engine *engine, const void *key, const size_t nkey)782 hash_item *item_get(struct default_engine *engine,
783                     const void *key, const size_t nkey) {
784     hash_item *it;
785     cb_mutex_enter(&engine->cache_lock);
786     it = do_item_get(engine, key, nkey);
787     cb_mutex_exit(&engine->cache_lock);
788     return it;
789 }
790 
791 /*
792  * Decrements the reference count on an item and adds it to the freelist if
793  * needed.
794  */
item_release(struct default_engine *engine, hash_item *item)795 void item_release(struct default_engine *engine, hash_item *item) {
796     cb_mutex_enter(&engine->cache_lock);
797     do_item_release(engine, item);
798     cb_mutex_exit(&engine->cache_lock);
799 }
800 
801 /*
802  * Unlinks an item from the LRU and hashtable.
803  */
item_unlink(struct default_engine *engine, hash_item *item)804 void item_unlink(struct default_engine *engine, hash_item *item) {
805     cb_mutex_enter(&engine->cache_lock);
806     do_item_unlink(engine, item);
807     cb_mutex_exit(&engine->cache_lock);
808 }
809 
do_arithmetic(struct default_engine *engine, const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, uint64_t *cas, uint8_t datatype, uint64_t *result)810 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
811                                        const void* cookie,
812                                        const void* key,
813                                        const int nkey,
814                                        const bool increment,
815                                        const bool create,
816                                        const uint64_t delta,
817                                        const uint64_t initial,
818                                        const rel_time_t exptime,
819                                        uint64_t *cas,
820                                        uint8_t datatype,
821                                        uint64_t *result)
822 {
823    hash_item *item = do_item_get(engine, key, nkey);
824    ENGINE_ERROR_CODE ret;
825 
826    if (item == NULL) {
827       if (!create) {
828          return ENGINE_KEY_ENOENT;
829       } else {
830          char buffer[128];
831          int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
832                             (uint64_t)initial);
833 
834          item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie,
835                               datatype);
836          if (item == NULL) {
837             return ENGINE_ENOMEM;
838          }
839          memcpy((void*)item_get_data(item), buffer, len);
840          if ((ret = do_store_item(engine, item, cas,
841                                   OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
842              *result = initial;
843              *cas = item_get_cas(item);
844          }
845          do_item_release(engine, item);
846       }
847    } else {
848       ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
849       do_item_release(engine, item);
850    }
851 
852    return ret;
853 }
854 
arithmetic(struct default_engine *engine, const void* cookie, const void* key, const int nkey, const bool increment, const bool create, const uint64_t delta, const uint64_t initial, const rel_time_t exptime, uint64_t *cas, uint8_t datatype, uint64_t *result)855 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
856                              const void* cookie,
857                              const void* key,
858                              const int nkey,
859                              const bool increment,
860                              const bool create,
861                              const uint64_t delta,
862                              const uint64_t initial,
863                              const rel_time_t exptime,
864                              uint64_t *cas,
865                              uint8_t datatype,
866                              uint64_t *result)
867 {
868     ENGINE_ERROR_CODE ret;
869 
870     cb_mutex_enter(&engine->cache_lock);
871     ret = do_arithmetic(engine, cookie, key, nkey, increment,
872                         create, delta, initial, exptime, cas,
873                         datatype, result);
874     cb_mutex_exit(&engine->cache_lock);
875     return ret;
876 }
877 
878 /*
879  * Stores an item in the cache (high level, obeys set/add/replace semantics)
880  */
store_item(struct default_engine *engine, hash_item *item, uint64_t *cas, ENGINE_STORE_OPERATION operation, const void *cookie)881 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
882                              hash_item *item, uint64_t *cas,
883                              ENGINE_STORE_OPERATION operation,
884                              const void *cookie) {
885     ENGINE_ERROR_CODE ret;
886 
887     cb_mutex_enter(&engine->cache_lock);
888     ret = do_store_item(engine, item, cas, operation, cookie);
889     cb_mutex_exit(&engine->cache_lock);
890     return ret;
891 }
892 
do_touch_item(struct default_engine *engine, const void *key, uint16_t nkey, uint32_t exptime)893 static hash_item *do_touch_item(struct default_engine *engine,
894                                      const void *key,
895                                      uint16_t nkey,
896                                      uint32_t exptime)
897 {
898    hash_item *item = do_item_get(engine, key, nkey);
899    if (item != NULL) {
900        item->exptime = exptime;
901    }
902    return item;
903 }
904 
touch_item(struct default_engine *engine, const void *key, uint16_t nkey, uint32_t exptime)905 hash_item *touch_item(struct default_engine *engine,
906                            const void *key,
907                            uint16_t nkey,
908                            uint32_t exptime)
909 {
910     hash_item *ret;
911 
912     cb_mutex_enter(&engine->cache_lock);
913     ret = do_touch_item(engine, key, nkey, exptime);
914     cb_mutex_exit(&engine->cache_lock);
915     return ret;
916 }
917 
918 /*
919  * Flushes expired items after a flush_all call
920  */
item_flush_expired(struct default_engine *engine, time_t when)921 void item_flush_expired(struct default_engine *engine, time_t when) {
922     int i;
923     hash_item *iter, *next;
924 
925     cb_mutex_enter(&engine->cache_lock);
926 
927     if (when == 0) {
928         engine->config.oldest_live = engine->server.core->get_current_time() - 1;
929     } else {
930         engine->config.oldest_live = engine->server.core->realtime(when) - 1;
931     }
932 
933     if (engine->config.oldest_live != 0) {
934         for (i = 0; i < POWER_LARGEST; i++) {
935             /*
936              * The LRU is sorted in decreasing time order, and an item's
937              * timestamp is never newer than its last access time, so we
938              * only need to walk back until we hit an item older than the
939              * oldest_live time.
940              * The oldest_live checking will auto-expire the remaining items.
941              */
942             for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
943                 if (iter->time >= engine->config.oldest_live) {
944                     next = iter->next;
945                     if ((iter->iflag & ITEM_SLABBED) == 0) {
946                         do_item_unlink(engine, iter);
947                     }
948                 } else {
949                     /* We've hit the first old item. Continue to the next queue. */
950                     break;
951                 }
952             }
953         }
954     }
955     cb_mutex_exit(&engine->cache_lock);
956 }
957 
958 /*
959  * Dumps part of the cache
960  */
item_cachedump(struct default_engine *engine, const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes)961 char *item_cachedump(struct default_engine *engine,
962                      const unsigned int slabs_clsid,
963                      const unsigned int limit,
964                      unsigned int *bytes) {
965     char *ret;
966 
967     cb_mutex_enter(&engine->cache_lock);
968     ret = do_item_cachedump(slabs_clsid, limit, bytes);
969     cb_mutex_exit(&engine->cache_lock);
970     return ret;
971 }
972 
item_stats(struct default_engine *engine, ADD_STAT add_stat, const void *cookie)973 void item_stats(struct default_engine *engine,
974                    ADD_STAT add_stat, const void *cookie)
975 {
976     cb_mutex_enter(&engine->cache_lock);
977     do_item_stats(engine, add_stat, cookie);
978     cb_mutex_exit(&engine->cache_lock);
979 }
980 
981 
item_stats_sizes(struct default_engine *engine, ADD_STAT add_stat, const void *cookie)982 void item_stats_sizes(struct default_engine *engine,
983                       ADD_STAT add_stat, const void *cookie)
984 {
985     cb_mutex_enter(&engine->cache_lock);
986     do_item_stats_sizes(engine, add_stat, cookie);
987     cb_mutex_exit(&engine->cache_lock);
988 }
989 
do_item_link_cursor(struct default_engine *engine, hash_item *cursor, int ii)990 static void do_item_link_cursor(struct default_engine *engine,
991                                 hash_item *cursor, int ii)
992 {
993     cursor->slabs_clsid = (uint8_t)ii;
994     cursor->next = NULL;
995     cursor->prev = engine->items.tails[ii];
996     engine->items.tails[ii]->next = cursor;
997     engine->items.tails[ii] = cursor;
998     engine->items.sizes[ii]++;
999 }
1000 
1001 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
1002                                       hash_item *item, void *cookie);
1003 
do_item_walk_cursor(struct default_engine *engine, hash_item *cursor, int steplength, ITERFUNC itemfunc, void* itemdata, ENGINE_ERROR_CODE *error)1004 static bool do_item_walk_cursor(struct default_engine *engine,
1005                                 hash_item *cursor,
1006                                 int steplength,
1007                                 ITERFUNC itemfunc,
1008                                 void* itemdata,
1009                                 ENGINE_ERROR_CODE *error)
1010 {
1011     int ii = 0;
1012     *error = ENGINE_SUCCESS;
1013 
1014     while (cursor->prev != NULL && ii < steplength) {
1015         /* Move cursor */
1016         hash_item *ptr = cursor->prev;
1017         bool done = false;
1018 
1019         ++ii;
1020         item_unlink_q(engine, cursor);
1021 
1022         if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1023             done = true;
1024             cursor->prev = NULL;
1025         } else {
1026             cursor->next = ptr;
1027             cursor->prev = ptr->prev;
1028             cursor->prev->next = cursor;
1029             ptr->prev = cursor;
1030         }
1031 
1032         /* Ignore cursors */
1033         if (ptr->nkey == 0 && ptr->nbytes == 0) {
1034             --ii;
1035         } else {
1036             *error = itemfunc(engine, ptr, itemdata);
1037             if (*error != ENGINE_SUCCESS) {
1038                 return false;
1039             }
1040         }
1041 
1042         if (done) {
1043             return false;
1044         }
1045     }
1046 
1047     return (cursor->prev != NULL);
1048 }
1049 
item_scrub(struct default_engine *engine, hash_item *item, void *cookie)1050 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1051                                     hash_item *item,
1052                                     void *cookie) {
1053     rel_time_t current_time = engine->server.core->get_current_time();
1054     (void)cookie;
1055     engine->scrubber.visited++;
1056     if (item->refcount == 0 &&
1057         (item->exptime != 0 && item->exptime < current_time)) {
1058         do_item_unlink(engine, item);
1059         engine->scrubber.cleaned++;
1060     }
1061     return ENGINE_SUCCESS;
1062 }
1063 
item_scrub_class(struct default_engine *engine, hash_item *cursor)1064 static void item_scrub_class(struct default_engine *engine,
1065                              hash_item *cursor) {
1066 
1067     ENGINE_ERROR_CODE ret;
1068     bool more;
1069     do {
1070         cb_mutex_enter(&engine->cache_lock);
1071         more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
1072         cb_mutex_exit(&engine->cache_lock);
1073         if (ret != ENGINE_SUCCESS) {
1074             break;
1075         }
1076     } while (more);
1077 }
1078 
item_scubber_main(void *arg)1079 static void item_scubber_main(void *arg)
1080 {
1081     struct default_engine *engine = arg;
1082     hash_item cursor;
1083     int ii;
1084 
1085     memset(&cursor, 0, sizeof(cursor));
1086     cursor.refcount = 1;
1087     for (ii = 0; ii < POWER_LARGEST; ++ii) {
1088         bool skip = false;
1089         cb_mutex_enter(&engine->cache_lock);
1090         if (engine->items.heads[ii] == NULL) {
1091             skip = true;
1092         } else {
1093             /* add the item at the tail */
1094             do_item_link_cursor(engine, &cursor, ii);
1095         }
1096         cb_mutex_exit(&engine->cache_lock);
1097 
1098         if (!skip) {
1099             item_scrub_class(engine, &cursor);
1100         }
1101     }
1102 
1103     cb_mutex_enter(&engine->scrubber.lock);
1104     engine->scrubber.stopped = time(NULL);
1105     engine->scrubber.running = false;
1106     cb_mutex_exit(&engine->scrubber.lock);
1107 }
1108 
item_start_scrub(struct default_engine *engine)1109 bool item_start_scrub(struct default_engine *engine)
1110 {
1111     bool ret = false;
1112     cb_mutex_enter(&engine->scrubber.lock);
1113     if (!engine->scrubber.running) {
1114         cb_thread_t t;
1115 
1116         engine->scrubber.started = time(NULL);
1117         engine->scrubber.stopped = 0;
1118         engine->scrubber.visited = 0;
1119         engine->scrubber.cleaned = 0;
1120         engine->scrubber.running = true;
1121 
1122         if (cb_create_thread(&t, item_scubber_main, engine, 1) != 0)
1123         {
1124             engine->scrubber.running = false;
1125         } else {
1126             ret = true;
1127         }
1128     }
1129     cb_mutex_exit(&engine->scrubber.lock);
1130 
1131     return ret;
1132 }
1133 
1134 struct tap_client {
1135     hash_item cursor;
1136     hash_item *it;
1137 };
1138 
item_tap_iterfunc(struct default_engine *engine, hash_item *item, void *cookie)1139 static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1140                                     hash_item *item,
1141                                     void *cookie) {
1142     struct tap_client *client = cookie;
1143     client->it = item;
1144     ++client->it->refcount;
1145     return ENGINE_SUCCESS;
1146 }
1147 
do_item_tap_walker(struct default_engine *engine, const void *cookie, item **itm, void **es, uint16_t *nes, uint8_t *ttl, uint16_t *flags, uint32_t *seqno, uint16_t *vbucket)1148 static tap_event_t do_item_tap_walker(struct default_engine *engine,
1149                                          const void *cookie, item **itm,
1150                                          void **es, uint16_t *nes, uint8_t *ttl,
1151                                          uint16_t *flags, uint32_t *seqno,
1152                                          uint16_t *vbucket)
1153 {
1154     ENGINE_ERROR_CODE r;
1155     struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1156     if (client == NULL) {
1157         return TAP_DISCONNECT;
1158     }
1159 
1160     *es = NULL;
1161     *nes = 0;
1162     *ttl = (uint8_t)-1;
1163     *seqno = 0;
1164     *flags = 0;
1165     *vbucket = 0;
1166     client->it = NULL;
1167 
1168     do {
1169         if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1170             /* find next slab class to look at.. */
1171             bool linked = false;
1172             int ii;
1173             for (ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
1174                 if (engine->items.heads[ii] != NULL) {
1175                     /* add the item at the tail */
1176                     do_item_link_cursor(engine, &client->cursor, ii);
1177                     linked = true;
1178                 }
1179             }
1180             if (!linked) {
1181                 break;
1182             }
1183         }
1184     } while (client->it == NULL);
1185     *itm = client->it;
1186 
1187     return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1188 }
1189 
item_tap_walker(ENGINE_HANDLE* handle, const void *cookie, item **itm, void **es, uint16_t *nes, uint8_t *ttl, uint16_t *flags, uint32_t *seqno, uint16_t *vbucket)1190 tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1191                             const void *cookie, item **itm,
1192                             void **es, uint16_t *nes, uint8_t *ttl,
1193                             uint16_t *flags, uint32_t *seqno,
1194                             uint16_t *vbucket)
1195 {
1196     tap_event_t ret;
1197     struct default_engine *engine = (struct default_engine*)handle;
1198     cb_mutex_enter(&engine->cache_lock);
1199     ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1200     cb_mutex_exit(&engine->cache_lock);
1201 
1202     return ret;
1203 }
1204 
initialize_item_tap_walker(struct default_engine *engine, const void* cookie)1205 bool initialize_item_tap_walker(struct default_engine *engine,
1206                                 const void* cookie)
1207 {
1208     bool linked = false;
1209     int ii;
1210     struct tap_client *client = calloc(1, sizeof(*client));
1211     if (client == NULL) {
1212         return false;
1213     }
1214     client->cursor.refcount = 1;
1215 
1216     /* Link the cursor! */
1217     for (ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1218         cb_mutex_enter(&engine->cache_lock);
1219         if (engine->items.heads[ii] != NULL) {
1220             /* add the item at the tail */
1221             do_item_link_cursor(engine, &client->cursor, ii);
1222             linked = true;
1223         }
1224         cb_mutex_exit(&engine->cache_lock);
1225     }
1226 
1227     engine->server.cookie->store_engine_specific(cookie, client);
1228     return true;
1229 }
1230 
link_dcp_walker(struct default_engine *engine, struct dcp_connection *connection)1231 void link_dcp_walker(struct default_engine *engine,
1232                      struct dcp_connection *connection)
1233 {
1234     bool linked = false;
1235     int ii;
1236     connection->cursor.refcount = 1;
1237 
1238     /* Link the cursor! */
1239     for (ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1240         cb_mutex_enter(&engine->cache_lock);
1241         if (engine->items.heads[ii] != NULL) {
1242             /* add the item at the tail */
1243             do_item_link_cursor(engine, &connection->cursor, ii);
1244             linked = true;
1245         }
1246         cb_mutex_exit(&engine->cache_lock);
1247     }
1248 }
1249 
item_dcp_iterfunc(struct default_engine *engine, hash_item *item, void *cookie)1250 static ENGINE_ERROR_CODE item_dcp_iterfunc(struct default_engine *engine,
1251                                            hash_item *item,
1252                                            void *cookie) {
1253     struct dcp_connection *connection = cookie;
1254     connection->it = item;
1255     ++connection->it->refcount;
1256     return ENGINE_SUCCESS;
1257 }
1258 
do_item_dcp_step(struct default_engine *engine, struct dcp_connection *connection, const void *cookie, struct dcp_message_producers *producers)1259 static ENGINE_ERROR_CODE do_item_dcp_step(struct default_engine *engine,
1260                                           struct dcp_connection *connection,
1261                                           const void *cookie,
1262                                           struct dcp_message_producers *producers)
1263 {
1264     ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
1265 
1266     while (connection->it == NULL) {
1267         if (!do_item_walk_cursor(engine, &connection->cursor, 1,
1268                                  item_dcp_iterfunc, connection, &ret)) {
1269             /* find next slab class to look at.. */
1270             bool linked = false;
1271             int ii;
1272             for (ii = connection->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
1273                 if (engine->items.heads[ii] != NULL) {
1274                     /* add the item at the tail */
1275                     do_item_link_cursor(engine, &connection->cursor, ii);
1276                     linked = true;
1277                 }
1278             }
1279             if (!linked) {
1280                 break;
1281             }
1282         }
1283     }
1284 
1285     if (connection->it != NULL) {
1286         rel_time_t current_time = engine->server.core->get_current_time();
1287         rel_time_t exptime = connection->it->exptime;
1288 
1289         if (exptime != 0 && exptime < current_time) {
1290             ret = producers->expiration(cookie, connection->opaque,
1291                                         item_get_key(connection->it),
1292                                         connection->it->nkey,
1293                                         item_get_cas(connection->it),
1294                                         0, 0, 0, NULL, 0);
1295             if (ret == ENGINE_SUCCESS) {
1296                 do_item_unlink(engine, connection->it);
1297                 do_item_release(engine, connection->it);
1298             }
1299         } else {
1300             ret = producers->mutation(cookie, connection->opaque,
1301                                       connection->it, 0, 0, 0, 0, NULL, 0, 0);
1302         }
1303 
1304         if (ret == ENGINE_SUCCESS) {
1305             connection->it = NULL;
1306         }
1307     } else {
1308         return ENGINE_DISCONNECT;
1309     }
1310 
1311     return ret;
1312 }
1313 
item_dcp_step(struct default_engine *engine, struct dcp_connection *connection, const void *cookie, struct dcp_message_producers *producers)1314 ENGINE_ERROR_CODE item_dcp_step(struct default_engine *engine,
1315                                 struct dcp_connection *connection,
1316                                 const void *cookie,
1317                                 struct dcp_message_producers *producers)
1318 {
1319     ENGINE_ERROR_CODE ret;
1320     cb_mutex_enter(&engine->cache_lock);
1321     ret = do_item_dcp_step(engine, connection, cookie, producers);
1322     cb_mutex_exit(&engine->cache_lock);
1323     return ret;
1324 }
1325