xref: /3.0.3-GA/ep-engine/src/stored-value.h (revision 21d63272)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_STORED_VALUE_H_
19#define SRC_STORED_VALUE_H_ 1
20
21#include "config.h"
22
23#include <algorithm>
24#include <climits>
25#include <cstring>
26#include <string>
27
28#include "common.h"
29#include "ep_time.h"
30#include "histo.h"
31#include "item.h"
32#include "item_pager.h"
33#include "locks.h"
34#include "stats.h"
35
36// Forward declaration for StoredValue
37class HashTable;
38class StoredValueFactory;
39
40/**
41 * In-memory storage for an item.
42 */
43class StoredValue {
44public:
45
46    void operator delete(void* p) {
47        ::operator delete(p);
48     }
49
50    uint8_t getNRUValue();
51
52    void setNRUValue(uint8_t nru_val);
53
54    uint8_t incrNRUValue();
55
56    void referenced();
57
58    /**
59     * Mark this item as needing to be persisted.
60     */
61    void markDirty() {
62        _isDirty = 1;
63    }
64
65    /**
66     * Mark this item as dirty as of a certain time.
67     *
68     * This method is primarily used to mark an item as dirty again
69     * after a storage failure.
70     *
71     * @param dataAge the previous dataAge of this record
72     */
73    void reDirty() {
74        _isDirty = 1;
75    }
76
77    // returns time this object was dirtied.
78    /**
79     * Mark this item as clean.
80     *
81     * @param dataAge an output parameter that captures the time this
82     *                item was marked dirty
83     */
84    void markClean() {
85        _isDirty = 0;
86    }
87
88    /**
89     * True if this object is dirty.
90     */
91    bool isDirty() const {
92        return _isDirty;
93    }
94
95    /**
96     * True if this object is not dirty.
97     */
98    bool isClean() const {
99        return !isDirty();
100    }
101
102    bool eligibleForEviction(item_eviction_policy_t policy) {
103        if (policy == VALUE_ONLY) {
104            return isResident() && isClean() && !isDeleted();
105        } else {
106            return isClean() && !isDeleted();
107        }
108    }
109
110    /**
111     * Check if this item is expired or not.
112     *
113     * @param asOf the time to be compared with this item's expiry time
114     * @return true if this item's expiry time < asOf
115     */
116    bool isExpired(time_t asOf) const {
117        if (getExptime() != 0 && getExptime() < asOf) {
118            return true;
119        }
120        return false;
121    }
122
123    /**
124     * Get the pointer to the beginning of the key.
125     */
126    const char* getKeyBytes() const {
127        return keybytes;
128    }
129
130    /**
131     * Get the length of the key.
132     */
133    uint8_t getKeyLen() const {
134        return keylen;
135    }
136
137    /**
138     * True of this item is for the given key.
139     *
140     * @param k the key we're checking
141     * @return true if this item's key is equal to k
142     */
143    bool hasKey(const std::string &k) const {
144        return k.length() == getKeyLen()
145            && (std::memcmp(k.data(), getKeyBytes(), getKeyLen()) == 0);
146    }
147
148    /**
149     * Get this item's key.
150     */
151    const std::string getKey() const {
152        return std::string(getKeyBytes(), getKeyLen());
153    }
154
155    /**
156     * Get this item's value.
157     */
158    const value_t &getValue() const {
159        return value;
160    }
161
162    /**
163     * Get the expiration time of this item.
164     *
165     * @return the expiration time for feature items, 0 for small items
166     */
167    time_t getExptime() const {
168        return exptime;
169    }
170
171    void setExptime(time_t tim) {
172        exptime = tim;
173        markDirty();
174    }
175
176    /**
177     * Get the client-defined flags of this item.
178     *
179     * @return the flags for feature items, 0 for small items
180     */
181    uint32_t getFlags() const {
182        return flags;
183    }
184
185    /**
186     * Set the client-defined flags for this item.
187     */
188    void setFlags(uint32_t fl) {
189        flags = fl;
190    }
191
192    /**
193     * Set a new value for this item.
194     *
195     * @param itm the item with a new value
196     * @param ht the hashtable that contains this StoredValue instance
197     * @param preserveSeqno Preserve the revision sequence number from the item.
198     */
199    void setValue(Item &itm, HashTable &ht, bool preserveSeqno) {
200        size_t currSize = size();
201        reduceCacheSize(ht, currSize);
202        value = itm.getValue();
203        deleted = false;
204        flags = itm.getFlags();
205        bySeqno = itm.getBySeqno();
206
207        cas = itm.getCas();
208        exptime = itm.getExptime();
209        if (preserveSeqno) {
210            revSeqno = itm.getRevSeqno();
211        } else {
212            ++revSeqno;
213            itm.setRevSeqno(revSeqno);
214        }
215
216        markDirty();
217        size_t newSize = size();
218        increaseCacheSize(ht, newSize);
219    }
220
221    /**
222     * Reset the value of this item.
223     */
224    void resetValue() {
225        cb_assert(!isDeleted());
226        markNotResident();
227        // item no longer resident once reset the value
228        deleted = true;
229    }
230
231    /**
232     * Eject an item value from memory.
233     * @param ht the hashtable that contains this StoredValue instance
234     */
235    bool ejectValue(HashTable &ht, item_eviction_policy_t policy);
236
237    /**
238     * Restore the value for this item.
239     * @param itm the item to be restored
240     * @param ht the hashtable that contains this StoredValue instance
241     */
242    bool unlocked_restoreValue(Item *itm, HashTable &ht);
243
244    /**
245     * Restore the metadata of of a temporary item upon completion of a
246     * background fetch assuming the hashtable bucket is locked.
247     *
248     * @param itm the Item whose metadata is being restored
249     * @param status the engine code describing the result of the background
250     *               fetch
251     */
252    bool unlocked_restoreMeta(Item *itm, ENGINE_ERROR_CODE status,
253                              HashTable &ht);
254
255    /**
256     * Get this item's CAS identifier.
257     *
258     * @return the cas ID for feature items, 0 for small items
259     */
260    uint64_t getCas() const {
261        return cas;
262    }
263
264    /**
265     * Set a new CAS ID.
266     *
267     * This is a NOOP for small item types.
268     */
269    void setCas(uint64_t c) {
270        cas = c;
271    }
272
273    /**
274     * Lock this item until the given time.
275     *
276     * This is a NOOP for small item types.
277     */
278    void lock(rel_time_t expiry) {
279        lock_expiry = expiry;
280    }
281
282    /**
283     * Unlock this item.
284     */
285    void unlock() {
286        lock_expiry = 0;
287    }
288
289    /**
290     * True if this item has an ID.
291     *
292     * An item always has an ID after it's been persisted.
293     */
294    bool hasBySeqno() {
295        return bySeqno > 0;
296    }
297
298    /**
299     * Get this item's ID.
300     *
301     * @return the ID for the item; 0 if the item has no ID
302     */
303    int64_t getBySeqno() {
304        return bySeqno;
305    }
306
307    /**
308     * Set the ID for this item.
309     *
310     * This is used by the persistene layer.
311     *
312     * It is an error to set an ID on an item that already has one.
313     */
314    void setBySeqno(int64_t to) {
315        bySeqno = to;
316        cb_assert(hasBySeqno());
317    }
318
319    /**
320     * Set the stored value state to the specified value
321     */
322    void setStoredValueState(const int64_t to) {
323        cb_assert(to == state_deleted_key || to == state_non_existent_key);
324        bySeqno = to;
325    }
326
327    /**
328     * Is this a temporary item created for processing a get-meta request?
329     */
330     bool isTempItem() {
331         return(isTempNonExistentItem() || isTempDeletedItem() || isTempInitialItem());
332
333     }
334
335    /**
336     * Is this an initial temporary item?
337     */
338    bool isTempInitialItem() {
339        return bySeqno == state_temp_init;
340    }
341
342    /**
343     * Is this a temporary item created for a non-existent key?
344     */
345     bool isTempNonExistentItem() {
346         return bySeqno == state_non_existent_key;
347
348     }
349
350    /**
351     * Is this a temporary item created for a deleted key?
352     */
353     bool isTempDeletedItem() {
354         return bySeqno == state_deleted_key;
355
356     }
357
358    size_t valuelen() {
359        if (isDeleted() || !isResident()) {
360            return 0;
361        }
362        return value->length();
363    }
364
365    /**
366     * Get the total size of this item.
367     *
368     * @return the amount of memory used by this item.
369     */
370    size_t size() {
371        return sizeof(StoredValue) + getKeyLen() + valuelen();
372    }
373
374    size_t metaDataSize() {
375        return sizeof(StoredValue) + getKeyLen();
376    }
377
378    /**
379     * Return true if this item is locked as of the given timestamp.
380     *
381     * @param curtime lock expiration marker (usually the current time)
382     * @return true if the item is locked
383     */
384    bool isLocked(rel_time_t curtime) {
385        if (lock_expiry == 0 || (curtime > lock_expiry)) {
386            lock_expiry = 0;
387            return false;
388        }
389        return true;
390    }
391
392    /**
393     * True if this value is resident in memory currently.
394     */
395    bool isResident() const {
396        return value.get() != NULL;
397    }
398
399    void markNotResident() {
400        value.reset();
401    }
402
403    /**
404     * True if this object is logically deleted.
405     */
406    bool isDeleted() const {
407        return deleted;
408    }
409
410    /**
411     * Logically delete this object.
412     */
413    void del(HashTable &ht, bool isMetaDelete=false) {
414        if (isDeleted()) {
415            return;
416        }
417
418        reduceCacheSize(ht, valuelen());
419        resetValue();
420        markDirty();
421        if (!isMetaDelete) {
422            setCas(getCas() + 1);
423        }
424    }
425
426
427    uint64_t getRevSeqno() const {
428        return revSeqno;
429    }
430
431    /**
432     * Set a new revision sequence number.
433     */
434    void setRevSeqno(uint64_t s) {
435        revSeqno = s;
436    }
437
438    /**
439     * Return true if this is a new cache item.
440     */
441    bool isNewCacheItem(void) {
442        return newCacheItem;
443    }
444
445    /**
446     * Set / reset a new cache item flag.
447     */
448    void setNewCacheItem(bool newitem) {
449        newCacheItem = newitem;
450    }
451
452    /**
453     * Generate a new Item out of this object.
454     *
455     * @param lck if true, the new item will return a locked CAS ID.
456     * @param vbucket the vbucket containing this item.
457     */
458    Item *toItem(bool lck, uint16_t vbucket) const;
459
460    /**
461     * Set the memory threshold on the current bucket quota for accepting a new mutation
462     */
463    static void setMutationMemoryThreshold(double memThreshold);
464
465    /*
466     * Values of the bySeqno attribute used by temporarily created StoredValue
467     * objects.
468     * state_deleted_key: represents an item that's deleted from memory but
469     *                    present in the persistent store.
470     * state_non_existent_key: represents a non existent item
471     */
472    static const int64_t state_deleted_key;
473    static const int64_t state_non_existent_key;
474    static const int64_t state_temp_init;
475
476private:
477
478    StoredValue(const Item &itm, StoredValue *n, EPStats &stats, HashTable &ht,
479                bool setDirty = true) :
480        value(itm.getValue()), next(n), bySeqno(itm.getBySeqno()),
481        flags(itm.getFlags()) {
482        cas = itm.getCas();
483        exptime = itm.getExptime();
484        deleted = false;
485        newCacheItem = true;
486        nru = INITIAL_NRU_VALUE;
487        lock_expiry = 0;
488        keylen = itm.getNKey();
489        revSeqno = itm.getRevSeqno();
490
491        if (setDirty) {
492            markDirty();
493        } else {
494            markClean();
495        }
496
497        increaseMetaDataSize(ht, stats, metaDataSize());
498        increaseCacheSize(ht, size());
499    }
500
501    friend class HashTable;
502    friend class StoredValueFactory;
503
504    value_t            value;          // 8 bytes
505    StoredValue        *next;          // 8 bytes
506    uint64_t           cas;            //!< CAS identifier.
507    uint64_t           revSeqno;       //!< Revision id sequence number
508    int64_t            bySeqno;        //!< By sequence id number
509    rel_time_t         lock_expiry;    //!< getl lock expiration
510    uint32_t           exptime;        //!< Expiration time of this item.
511    uint32_t           flags;          // 4 bytes
512    bool               _isDirty  :  1; // 1 bit
513    bool               deleted   :  1;
514    bool               newCacheItem : 1;
515    uint8_t            nru       :  2; //!< True if referenced since last sweep
516    uint8_t            keylen;
517    char               keybytes[1];    //!< The key itself.
518
519    static void increaseMetaDataSize(HashTable &ht, EPStats &st, size_t by);
520    static void reduceMetaDataSize(HashTable &ht, EPStats &st, size_t by);
521    static void increaseCacheSize(HashTable &ht, size_t by);
522    static void reduceCacheSize(HashTable &ht, size_t by);
523    static bool hasAvailableSpace(EPStats&, const Item &item);
524    static double mutation_mem_threshold;
525
526    DISALLOW_COPY_AND_ASSIGN(StoredValue);
527};
528
/**
 * Outcome codes reported by store (set/insert) operations.
 *
 * The enumerators keep their implicit numbering, spelled out explicitly.
 */
typedef enum {
    /** Storage was attempted on a vbucket not managed by this node. */
    INVALID_VBUCKET = 0,
    /** The item was not found for update. */
    NOT_FOUND = 1,
    /** The wrong CAS identifier was sent for a CAS update. */
    INVALID_CAS = 2,
    /** The item was clean before this mutation. */
    WAS_CLEAN = 3,
    /** This item was already dirty before this mutation. */
    WAS_DIRTY = 4,
    /** The item is locked and can't be updated. */
    IS_LOCKED = 5,
    /** Insufficient memory to store this item. */
    NOMEM = 6,
    /** Require a bg fetch to process SET op. */
    NEED_BG_FETCH = 7
} mutation_type_t;
545
/**
 * Outcome codes reported by add operations.
 *
 * The enumerators keep their implicit numbering, spelled out explicitly.
 */
typedef enum {
    /** Add was successful. */
    ADD_SUCCESS = 0,
    /** No memory for operation. */
    ADD_NOMEM = 1,
    /** Did not update -- item exists with this key. */
    ADD_EXISTS = 2,
    /** Undeletes an existing dirty item. */
    ADD_UNDEL = 3,
    /** Create a tmp item and schedule a bg metadata fetch. */
    ADD_TMP_AND_BG_FETCH = 4,
    /** Schedule a bg metadata fetch to process ADD op. */
    ADD_BG_FETCH = 5
} add_type_t;
557
558/**
559 * Base class for visiting a hash table.
560 */
561class HashTableVisitor {
562public:
563    virtual ~HashTableVisitor() {}
564
565    /**
566     * Visit an individual item within a hash table.
567     *
568     * @param v a pointer to a value in the hash table
569     */
570    virtual void visit(StoredValue *v) = 0;
571    /**
572     * True if the visiting should continue.
573     *
574     * This is called periodically to ensure the visitor still wants
575     * to visit items.
576     */
577    virtual bool shouldContinue() { return true; }
578};
579
/**
 * Interface for visitors that receive the depth of each hashtable bucket.
 */
class HashTableDepthVisitor {
public:
    virtual ~HashTableDepthVisitor() {}

    /**
     * Invoked once per hashtable bucket.
     *
     * @param bucket the index of the hashtable bucket
     * @param depth the number of entries in this hashtable bucket
     * @param mem counted memory used by this hash table
     */
    virtual void visit(int bucket, int depth, size_t mem) = 0;
};
596
597/**
598 * Hash table visitor that finds the min and max bucket depths.
599 */
600class HashTableDepthStatVisitor : public HashTableDepthVisitor {
601public:
602
603    HashTableDepthStatVisitor() : depthHisto(GrowingWidthGenerator<unsigned int>(1, 1, 1.3),
604                                             10),
605                                  size(0), memUsed(0), min(-1), max(0) {}
606
607    void visit(int bucket, int depth, size_t mem) {
608        (void)bucket;
609        // -1 is a special case for min.  If there's a value other than
610        // -1, we prefer that.
611        min = std::min(min == -1 ? depth : min, depth);
612        max = std::max(max, depth);
613        depthHisto.add(depth);
614        size += depth;
615        memUsed += mem;
616    }
617
618    Histogram<unsigned int> depthHisto;
619    size_t                  size;
620    size_t                  memUsed;
621    int                     min;
622    int                     max;
623};
624
625/**
626 * Hash table visitor that collects stats of what's inside.
627 */
628class HashTableStatVisitor : public HashTableVisitor {
629public:
630
631    HashTableStatVisitor() : numNonResident(0), numTotal(0),
632                             memSize(0), valSize(0), cacheSize(0) {}
633
634    void visit(StoredValue *v) {
635        ++numTotal;
636        memSize += v->size();
637        valSize += v->valuelen();
638
639        if (v->isResident()) {
640            cacheSize += v->size();
641        } else {
642            ++numNonResident;
643        }
644    }
645
646    size_t numNonResident;
647    size_t numTotal;
648    size_t memSize;
649    size_t valSize;
650    size_t cacheSize;
651};
652
653/**
654 * Track the current number of hashtable visitors.
655 *
656 * This class is a pretty generic counter holder that increments on
657 * entry and decrements on return providing RAII guarantees around an
658 * atomic counter.
659 */
660class VisitorTracker {
661public:
662
663    /**
664     * Mark a visitor as visiting.
665     *
666     * @param c the counter that should be incremented (and later
667     * decremented).
668     */
669    explicit VisitorTracker(AtomicValue<size_t> *c) : counter(c) {
670        counter->fetch_add(1);
671    }
672    ~VisitorTracker() {
673        counter->fetch_sub(1);
674    }
675private:
676    AtomicValue<size_t> *counter;
677};
678
679/**
680 * Creator of StoredValue instances.
681 */
682class StoredValueFactory {
683public:
684
685    /**
686     * Create a new StoredValueFactory of the given type.
687     */
688    StoredValueFactory(EPStats &s) : stats(&s) { }
689
690    /**
691     * Create a new StoredValue with the given item.
692     *
693     * @param itm the item the StoredValue should contain
694     * @param n the the top of the hash bucket into which this will be inserted
695     * @param ht the hashtable that will contain the StoredValue instance created
696     * @param setDirty if true, mark this item as dirty after creating it
697     */
698    StoredValue *operator ()(const Item &itm, StoredValue *n, HashTable &ht,
699                             bool setDirty = true) {
700        return newStoredValue(itm, n, ht, setDirty);
701    }
702
703private:
704
705    StoredValue* newStoredValue(const Item &itm, StoredValue *n, HashTable &ht,
706                                bool setDirty) {
707        size_t base = sizeof(StoredValue);
708
709        const std::string &key = itm.getKey();
710        cb_assert(key.length() < 256);
711        size_t len = key.length() + base;
712
713        StoredValue *t = new (::operator new(len))
714                         StoredValue(itm, n, *stats, ht, setDirty);
715        std::memcpy(t->keybytes, key.data(), key.length());
716        return t;
717    }
718
719    EPStats                *stats;
720};
721
722/**
723 * A container of StoredValue instances.
724 */
725class HashTable {
726public:
727
728    /**
729     * Create a HashTable.
730     *
731     * @param st the global stats reference
732     * @param s the number of hash table buckets
733     * @param l the number of locks in the hash table
734     */
735    HashTable(EPStats &st, size_t s = 0, size_t l = 0) :
736        maxDeletedRevSeqno(0), numTotalItems(0),
737        numNonResidentItems(0), numEjects(0),
738        memSize(0), cacheSize(0), metaDataMemory(0), stats(st),
739        valFact(st), visitors(0), numItems(0), numResizes(0),
740        numTempItems(0)
741    {
742        size = HashTable::getNumBuckets(s);
743        n_locks = HashTable::getNumLocks(l);
744        cb_assert(size > 0);
745        cb_assert(n_locks > 0);
746        cb_assert(visitors == 0);
747        values = static_cast<StoredValue**>(calloc(size, sizeof(StoredValue*)));
748        mutexes = new Mutex[n_locks];
749        activeState = true;
750    }
751
752    ~HashTable() {
753        clear(true);
754        // Wait for any outstanding visitors to finish.
755        while (visitors > 0) {
756#ifdef _MSC_VER
757            Sleep(1);
758#else
759            usleep(100);
760#endif
761        }
762        delete []mutexes;
763        free(values);
764        values = NULL;
765    }
766
767    size_t memorySize() {
768        return sizeof(HashTable)
769            + (size * sizeof(StoredValue*))
770            + (n_locks * sizeof(Mutex));
771    }
772
773    /**
774     * Get the number of hash table buckets this hash table has.
775     */
776    size_t getSize(void) { return size; }
777
778    /**
779     * Get the number of locks in this hash table.
780     */
781    size_t getNumLocks(void) { return n_locks; }
782
783    /**
784     * Get the number of in-memory non-resident and resident items within
785     * this hash table.
786     */
787    size_t getNumInMemoryItems(void) { return numItems; }
788
789    /**
790     * Get the number of in-memory non-resident items within this hash table.
791     */
792    size_t getNumInMemoryNonResItems(void) { return numNonResidentItems; }
793
794    /**
795     * Get the number of non-resident and resident items managed by
796     * this hash table. Note that this will be equal to getNumItems() if
797     * VALUE_ONLY_EVICTION is chosen as a cache management.
798     */
799    size_t getNumItems(void) {
800        return numTotalItems;
801    }
802
803    /**
804     * Get the number of items whose values are ejected from this hash table.
805     */
806    size_t getNumEjects(void) { return numEjects; }
807
808    /**
809     * Get the total item memory size in this hash table.
810     */
811    size_t getItemMemory(void) { return memSize; }
812
813    /**
814     * Clear the hash table.
815     *
816     * @param deactivate true when this hash table is being destroyed completely
817     *
818     * @return a stat visitor reporting how much stuff was removed
819     */
820    HashTableStatVisitor clear(bool deactivate = false);
821
822    /**
823     * Get the number of times this hash table has been resized.
824     */
825    size_t getNumResizes() { return numResizes; }
826
827    /**
828     * Get the number of temp. items within this hash table.
829     */
830    size_t getNumTempItems(void) { return numTempItems; }
831
832    /**
833     * Automatically resize to fit the current data.
834     */
835    void resize();
836
837    /**
838     * Resize to the specified size.
839     */
840    void resize(size_t to);
841
842    /**
843     * Find the item with the given key.
844     *
845     * @param key the key to find
846     * @return a pointer to a StoredValue -- NULL if not found
847     */
848    StoredValue *find(std::string &key, bool trackReference=true) {
849        cb_assert(isActive());
850        int bucket_num(0);
851        LockHolder lh = getLockedBucket(key, &bucket_num);
852        return unlocked_find(key, bucket_num, false, trackReference);
853    }
854
855    /**
856     * Find a resident item
857     *
858     * @param rnd a randomization input
     * @return an item -- NULL if not found
860     */
861    Item *getRandomKey(long rnd);
862
863    /**
864     * Set a new Item into this hashtable. Use this function when your item
865     * doesn't contain meta data.
866     *
867     * @param val the Item to store
868     * @param policy item eviction policy
869     * @param nru the nru bit for the item
870     * @return a result indicating the status of the store
871     */
872    mutation_type_t set(const Item &val,
873                        item_eviction_policy_t policy = VALUE_ONLY,
874                        uint8_t nru=0xff)
875    {
876        return set(val, val.getCas(), true, false, policy, nru);
877    }
878
879    /**
880     * Set an Item into the this hashtable. Use this function to do a set
881     * when your item includes meta data.
882     *
883     * @param val the Item to store
884     * @param cas This is the cas value for the item <b>in</b> the cache
885     * @param allowExisting should we allow existing items or not
886     * @param hasMetaData should we keep the same revision seqno or increment it
887     * @param policy item eviction policy
888     * @param nru the nru bit for the item
889     * @return a result indicating the status of the store
890     */
891    mutation_type_t set(const Item &val, uint64_t cas,
892                        bool allowExisting, bool hasMetaData = true,
893                        item_eviction_policy_t policy = VALUE_ONLY,
894                        uint8_t nru=0xff) {
895        int bucket_num(0);
896        LockHolder lh = getLockedBucket(val.getKey(), &bucket_num);
897        StoredValue *v = unlocked_find(val.getKey(), bucket_num, true, false);
898        return unlocked_set(v, val, cas, allowExisting, hasMetaData, policy, nru);
899    }
900
901    mutation_type_t unlocked_set(StoredValue*& v, const Item &val, uint64_t cas,
902                                 bool allowExisting, bool hasMetaData = true,
903                                 item_eviction_policy_t policy = VALUE_ONLY,
904                                 uint8_t nru=0xff) {
905        cb_assert(isActive());
906        Item &itm = const_cast<Item&>(val);
907        if (!StoredValue::hasAvailableSpace(stats, itm)) {
908            return NOMEM;
909        }
910
911        mutation_type_t rv = NOT_FOUND;
912
913        if (cas && policy == FULL_EVICTION) {
914            if (!v || v->isTempInitialItem()) {
915                return NEED_BG_FETCH;
916            }
917        }
918
919        /*
920         * prior to checking for the lock, we should check if this object
921         * has expired. If so, then check if CAS value has been provided
922         * for this set op. In this case the operation should be denied since
923         * a cas operation for a key that doesn't exist is not a very cool
924         * thing to do. See MB 3252
925         */
926        if (v && v->isExpired(ep_real_time()) && !hasMetaData) {
927            if (v->isLocked(ep_current_time())) {
928                v->unlock();
929            }
930            if (cas) {
931                /* item has expired and cas value provided. Deny ! */
932                return NOT_FOUND;
933            }
934        }
935
936        if (v) {
937            if (!allowExisting && !v->isTempItem()) {
938                return INVALID_CAS;
939            }
940            if (v->isLocked(ep_current_time())) {
941                /*
942                 * item is locked, deny if there is cas value mismatch
943                 * or no cas value is provided by the user
944                 */
945                if (cas != v->getCas()) {
946                    return IS_LOCKED;
947                }
948                /* allow operation*/
949                v->unlock();
950            } else if (cas && cas != v->getCas()) {
951                if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
952                    return NOT_FOUND;
953                }
954                return INVALID_CAS;
955            }
956
957            if (!hasMetaData) {
958                itm.setCas();
959            }
960            rv = v->isClean() ? WAS_CLEAN : WAS_DIRTY;
961            if (!v->isResident() && !v->isDeleted() && !v->isTempItem()) {
962                --numNonResidentItems;
963            }
964
965            if (v->isTempItem()) {
966                --numTempItems;
967                ++numItems;
968                ++numTotalItems;
969            }
970
971            v->setValue(itm, *this, hasMetaData /*Preserve revSeqno*/);
972            if (nru <= MAX_NRU_VALUE) {
973                v->setNRUValue(nru);
974            }
975        } else if (cas != 0) {
976            rv = NOT_FOUND;
977        } else {
978            if (!hasMetaData) {
979                itm.setCas();
980            }
981            int bucket_num = getBucketForHash(hash(itm.getKey()));
982            v = valFact(itm, values[bucket_num], *this);
983            values[bucket_num] = v;
984            ++numItems;
985            ++numTotalItems;
986            if (nru <= MAX_NRU_VALUE && !v->isTempItem()) {
987                v->setNRUValue(nru);
988            }
989
990            if (!hasMetaData) {
991                /**
992                 * Possibly, this item is being recreated. Conservatively assign it
993                 * a seqno that is greater than the greatest seqno of all deleted
994                 * items seen so far.
995                 */
996                uint64_t seqno = getMaxDeletedRevSeqno() + 1;
997                v->setRevSeqno(seqno);
998                itm.setRevSeqno(seqno);
999            }
1000            rv = WAS_CLEAN;
1001        }
1002        return rv;
1003    }
1004
1005    /**
1006     * Insert an item to this hashtable. This is called from the backfill
1007     * so we need a bit more logic here. If we're trying to insert a partial
1008     * item we don't allow the object to be stored there (and if you try to
1009     * insert a full item we're only allowing an item without the value
1010     * in memory...)
1011     *
     * @param itm the Item to insert
1013     * @param policy item eviction policy
1014     * @param eject true if we should eject the value immediately
1015     * @param partial is this a complete item, or just the key and meta-data
1016     * @return a result indicating the status of the store
1017     */
1018    mutation_type_t insert(Item &itm, item_eviction_policy_t policy,
1019                           bool eject, bool partial);
1020
1021    /**
1022     * Add an item to the hash table iff it doesn't already exist.
1023     *
1024     * @param val the item to store
1025     * @param policy item eviction policy
1026     * @param isDirty true if the item should be marked dirty on store
1027     * @param storeVal true if the value should be stored (paged-in)
1028     * @return an indication of what happened
1029     */
1030    add_type_t add(const Item &val, item_eviction_policy_t policy,
1031                   bool isDirty = true, bool storeVal = true) {
1032        cb_assert(isActive());
1033        int bucket_num(0);
1034        LockHolder lh = getLockedBucket(val.getKey(), &bucket_num);
1035        StoredValue *v = unlocked_find(val.getKey(), bucket_num, true, false);
1036        return unlocked_add(bucket_num, v, val, policy, isDirty, storeVal);
1037    }
1038
1039    /**
1040     * Unlocked version of the add() method.
1041     *
1042     * @param bucket_num the locked partition where the key belongs
     * @param v the stored value to do this operation on
1044     * @param val the item to store
1045     * @param policy item eviction policy
1046     * @param isDirty true if the item should be marked dirty on store
1047     * @param storeVal true if the value should be stored (paged-in)
1048     * @return an indication of what happened
1049     */
1050    add_type_t unlocked_add(int &bucket_num,
1051                            StoredValue*& v,
1052                            const Item &val,
1053                            item_eviction_policy_t policy,
1054                            bool isDirty = true,
1055                            bool storeVal = true);
1056
1057    /**
1058     * Add a temporary item to the hash table iff it doesn't already exist.
1059     *
1060     * NOTE: This method should be called after acquiring the correct
1061     *       bucket/partition lock.
1062     *
1063     * @param bucket_num the locked partition where the key belongs
1064     * @param key the key for which a temporary item needs to be added
1065     * @param policy item eviction policy
1066     * @return an indication of what happened
1067     */
1068    add_type_t unlocked_addTempItem(int &bucket_num,
1069                                    const std::string &key,
1070                                    item_eviction_policy_t policy);
1071
1072    /**
1073     * Mark the given record logically deleted.
1074     *
1075     * @param key the key of the item to delete
1076     * @param cas the expected CAS of the item (or 0 to override)
1077     * @param policy item eviction policy
1078     * @return an indicator of what the deletion did
1079     */
1080    mutation_type_t softDelete(const std::string &key, uint64_t cas,
1081                               item_eviction_policy_t policy = VALUE_ONLY) {
1082        cb_assert(isActive());
1083        int bucket_num(0);
1084        LockHolder lh = getLockedBucket(key, &bucket_num);
1085        StoredValue *v = unlocked_find(key, bucket_num, false, false);
1086        return unlocked_softDelete(v, cas, policy);
1087    }
1088
1089    mutation_type_t unlocked_softDelete(StoredValue *v,
1090                                        uint64_t cas,
1091                                        item_eviction_policy_t policy = VALUE_ONLY) {
1092        ItemMetaData metadata;
1093        if (v) {
1094            metadata.revSeqno = v->getRevSeqno() + 1;
1095        }
1096        return unlocked_softDelete(v, cas, metadata, policy);
1097    }
1098
1099    /**
1100     * Unlocked implementation of softDelete.
1101     */
1102    mutation_type_t unlocked_softDelete(StoredValue *v,
1103                                        uint64_t cas,
1104                                        ItemMetaData &metadata,
1105                                        item_eviction_policy_t policy,
1106                                        bool use_meta=false) {
1107        mutation_type_t rv = NOT_FOUND;
1108
1109        if ((!v || v->isTempInitialItem()) && policy == FULL_EVICTION) {
1110            return NEED_BG_FETCH;
1111        }
1112
1113        if (v) {
1114            if (v->isExpired(ep_real_time()) && !use_meta) {
1115                if (!v->isResident() && !v->isDeleted() && !v->isTempItem()) {
1116                    --numNonResidentItems;
1117                }
1118                v->setRevSeqno(metadata.revSeqno);
1119                v->del(*this, use_meta);
1120                updateMaxDeletedRevSeqno(v->getRevSeqno());
1121                return rv;
1122            }
1123
1124            if (v->isLocked(ep_current_time())) {
1125                if (cas != v->getCas()) {
1126                    return IS_LOCKED;
1127                }
1128                v->unlock();
1129            }
1130
1131            if (cas != 0 && cas != v->getCas()) {
1132                return INVALID_CAS;
1133            }
1134
1135            if (!v->isResident() && !v->isDeleted() && !v->isTempItem()) {
1136                --numNonResidentItems;
1137            }
1138
1139            if (v->isTempItem()) {
1140                --numTempItems;
1141                ++numItems;
1142                ++numTotalItems;
1143            }
1144
1145            /* allow operation*/
1146            v->unlock();
1147
1148            rv = v->isClean() ? WAS_CLEAN : WAS_DIRTY;
1149            v->setRevSeqno(metadata.revSeqno);
1150            if (use_meta) {
1151                v->setCas(metadata.cas);
1152                v->setFlags(metadata.flags);
1153                v->setExptime(metadata.exptime);
1154            }
1155            v->del(*this, use_meta);
1156            updateMaxDeletedRevSeqno(v->getRevSeqno());
1157        }
1158        return rv;
1159    }
1160
1161    /**
1162     * Find an item within a specific bucket assuming you already
1163     * locked the bucket.
1164     *
1165     * @param key the key of the item to find
1166     * @param bucket_num the bucket number
1167     * @param wantsDeleted true if soft deleted items should be returned
1168     *
1169     * @return a pointer to a StoredValue -- NULL if not found
1170     */
1171    StoredValue *unlocked_find(const std::string &key, int bucket_num,
1172                               bool wantsDeleted=false, bool trackReference=true) {
1173        StoredValue *v = values[bucket_num];
1174        while (v) {
1175            if (v->hasKey(key)) {
1176                if (trackReference && !v->isDeleted()) {
1177                    v->referenced();
1178                }
1179                if (wantsDeleted || !v->isDeleted()) {
1180                    return v;
1181                } else {
1182                    return NULL;
1183                }
1184            }
1185            v = v->next;
1186        }
1187        return NULL;
1188    }
1189
1190    /**
1191     * Compute a hash for the given string.
1192     *
1193     * @param str the beginning of the string
1194     * @param len the number of bytes in the string
1195     *
1196     * @return the hash value
1197     */
1198    inline int hash(const char *str, const size_t len) {
1199        cb_assert(isActive());
1200        int h=5381;
1201
1202        for(size_t i=0; i < len; i++) {
1203            h = ((h << 5) + h) ^ str[i];
1204        }
1205
1206        return h;
1207    }
1208
1209    /**
1210     * Compute a hash for the given string.
1211     *
1212     * @param s the string
1213     * @return the hash value
1214     */
1215    inline int hash(const std::string &s) {
1216        return hash(s.data(), s.length());
1217    }
1218
1219    /**
1220     * Get a lock holder holding a lock for the given bucket
1221     *
1222     * @param bucket the bucket to lock
1223     * @return a locked LockHolder
1224     */
1225    inline LockHolder getLockedBucket(int bucket) {
1226        LockHolder rv(mutexes[mutexForBucket(bucket)]);
1227        return rv;
1228    }
1229
1230    /**
1231     * Get a lock holder holding a lock for the bucket for the given
1232     * hash.
1233     *
1234     * @param h the input hash
1235     * @param bucket output parameter to receive a bucket
1236     * @return a locked LockHolder
1237     */
1238    inline LockHolder getLockedBucket(int h, int *bucket) {
1239        while (true) {
1240            cb_assert(isActive());
1241            *bucket = getBucketForHash(h);
1242            LockHolder rv(mutexes[mutexForBucket(*bucket)]);
1243            if (*bucket == getBucketForHash(h)) {
1244                return rv;
1245            }
1246        }
1247    }
1248
1249    /**
1250     * Get a lock holder holding a lock for the bucket for the hash of
1251     * the given key.
1252     *
1253     * @param s the start of the key
1254     * @param n the size of the key
1255     * @param bucket output parameter to receive a bucket
1256     * @return a locked LockHolder
1257     */
1258    inline LockHolder getLockedBucket(const char *s, size_t n, int *bucket) {
1259        return getLockedBucket(hash(s, n), bucket);
1260    }
1261
1262    /**
1263     * Get a lock holder holding a lock for the bucket for the hash of
1264     * the given key.
1265     *
1266     * @param s the key
1267     * @param bucket output parameter to receive a bucket
1268     * @return a locked LockHolder
1269     */
1270    inline LockHolder getLockedBucket(const std::string &s, int *bucket) {
1271        return getLockedBucket(hash(s.data(), s.size()), bucket);
1272    }
1273
1274    /**
1275     * Delete a key from the cache without trying to lock the cache first
1276     * (Please note that you <b>MUST</b> acquire the mutex before calling
     * this function!!!)
1278     *
1279     * @param key the key to delete
1280     * @param bucket_num the bucket to look in (must already be locked)
1281     * @return true if an object was deleted, false otherwise
1282     */
1283    bool unlocked_del(const std::string &key, int bucket_num) {
1284        cb_assert(isActive());
1285        StoredValue *v = values[bucket_num];
1286
1287        // Special case empty bucket.
1288        if (!v) {
1289            return false;
1290        }
1291
1292        // Special case the first one
1293        if (v->hasKey(key)) {
1294            if (!v->isDeleted() && v->isLocked(ep_current_time())) {
1295                return false;
1296            }
1297
1298            values[bucket_num] = v->next;
1299            StoredValue::reduceCacheSize(*this, v->size());
1300            StoredValue::reduceMetaDataSize(*this, stats, v->metaDataSize());
1301            if (v->isTempItem()) {
1302                --numTempItems;
1303            } else {
1304                --numItems;
1305                --numTotalItems;
1306            }
1307            delete v;
1308            return true;
1309        }
1310
1311        while (v->next) {
1312            if (v->next->hasKey(key)) {
1313                StoredValue *tmp = v->next;
1314                if (!tmp->isDeleted() && tmp->isLocked(ep_current_time())) {
1315                    return false;
1316                }
1317
1318                v->next = v->next->next;
1319                StoredValue::reduceCacheSize(*this, tmp->size());
1320                StoredValue::reduceMetaDataSize(*this, stats, tmp->metaDataSize());
1321                if (tmp->isTempItem()) {
1322                    --numTempItems;
1323                } else {
1324                    --numItems;
1325                    --numTotalItems;
1326                }
1327                delete tmp;
1328                return true;
1329            } else {
1330                v = v->next;
1331            }
1332        }
1333
1334        return false;
1335    }
1336
1337    /**
1338     * Delete the item with the given key.
1339     *
1340     * @param key the key to delete
1341     * @return true if the item existed before this call
1342     */
1343    bool del(const std::string &key) {
1344        cb_assert(isActive());
1345        int bucket_num(0);
1346        LockHolder lh = getLockedBucket(key, &bucket_num);
1347        return unlocked_del(key, bucket_num);
1348    }
1349
1350    /**
1351     * Visit all items within this hashtable.
1352     */
1353    void visit(HashTableVisitor &visitor);
1354
1355    /**
     * Visit all items within this hashtable with a depth visitor.
1357     */
1358    void visitDepth(HashTableDepthVisitor &visitor);
1359
1360    /**
1361     * Get the number of buckets that should be used for initialization.
1362     *
1363     * @param s if 0, return the default number of buckets, else return s
1364     */
1365    static size_t getNumBuckets(size_t s = 0);
1366
1367    /**
1368     * Get the number of locks that should be used for initialization.
1369     *
1370     * @param s if 0, return the default number of locks, else return s
1371     */
1372    static size_t getNumLocks(size_t s);
1373
1374    /**
1375     * Set the default number of buckets.
1376     */
1377    static void setDefaultNumBuckets(size_t);
1378
1379    /**
1380     * Set the default number of locks.
1381     */
1382    static void setDefaultNumLocks(size_t);
1383
1384    /**
1385     * Get the max deleted revision seqno seen so far.
1386     */
1387    uint64_t getMaxDeletedRevSeqno() const {
1388        return maxDeletedRevSeqno.load();
1389    }
1390
1391    /**
1392     * Set the max deleted seqno (required during warmup).
1393     */
1394    void setMaxDeletedRevSeqno(const uint64_t seqno) {
1395        maxDeletedRevSeqno.store(seqno);
1396    }
1397
1398    /**
1399     * Update maxDeletedRevSeqno to a (possibly) new value.
1400     */
1401    void updateMaxDeletedRevSeqno(const uint64_t seqno) {
1402        atomic_setIfBigger(maxDeletedRevSeqno, seqno);
1403    }
1404
1405    /**
1406     * Eject an item meta data and value from memory.
1407     * @param vptr the reference to the pointer to the StoredValue instance
1408     * @param policy item eviction policy
1409     * @return true if an item is ejected.
1410     */
1411    bool unlocked_ejectItem(StoredValue*& vptr, item_eviction_policy_t policy);
1412
1413    AtomicValue<uint64_t>     maxDeletedRevSeqno;
1414    AtomicValue<size_t>       numTotalItems;
1415    AtomicValue<size_t>       numNonResidentItems;
1416    AtomicValue<size_t>       numEjects;
1417    //! Memory consumed by items in this hashtable.
1418    AtomicValue<size_t>       memSize;
1419    //! Cache size.
1420    AtomicValue<size_t>       cacheSize;
1421    //! Meta-data size.
1422    AtomicValue<size_t>       metaDataMemory;
1423
1424private:
1425    friend class StoredValue;
1426
1427    inline bool isActive() const { return activeState; }
1428    inline void setActiveState(bool newv) { activeState = newv; }
1429
1430    size_t               size;
1431    size_t               n_locks;
1432    StoredValue        **values;
1433    Mutex               *mutexes;
1434    EPStats&             stats;
1435    StoredValueFactory   valFact;
1436    AtomicValue<size_t>       visitors;
1437    AtomicValue<size_t>       numItems;
1438    AtomicValue<size_t>       numResizes;
1439    AtomicValue<size_t>       numTempItems;
1440    bool                 activeState;
1441
1442    static size_t                 defaultNumBuckets;
1443    static size_t                 defaultNumLocks;
1444
1445    int getBucketForHash(int h) {
1446        return abs(h % static_cast<int>(size));
1447    }
1448
    // Returns the index into `mutexes` of the lock that guards bucket_num.
    // Buckets are spread over the n_locks mutexes by taking the bucket
    // number modulo n_locks, so several buckets share each mutex.
    inline int mutexForBucket(int bucket_num) {
        cb_assert(isActive());
        // A negative bucket would give a negative remainder below.
        cb_assert(bucket_num >= 0);
        int lock_num = bucket_num % static_cast<int>(n_locks);
        // Sanity checks: the result must be a valid index, 0 <= lock_num < n_locks.
        cb_assert(lock_num < static_cast<int>(n_locks));
        cb_assert(lock_num >= 0);
        return lock_num;
    }
1457
1458    Item *getRandomKeyFromSlot(int slot);
1459
1460    DISALLOW_COPY_AND_ASSIGN(HashTable);
1461};
1462
1463#endif  // SRC_STORED_VALUE_H_
1464