1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_ITEM_H_
19 #define SRC_ITEM_H_
20 
21 #include "config.h"
22 
23 #include <memcached/engine.h>
24 #include <stdio.h>
25 #include <string.h>
26 
27 #include <cstring>
28 #include <string>
29 
30 #include "atomic.h"
31 #include "locks.h"
32 #include "mutex.h"
33 #include "objectregistry.h"
34 #include "stats.h"
35 
/**
 * Kind of operation an Item represents. Item keeps this value in its
 * one-byte `op` field, so the enumerators are numbered explicitly.
 */
enum queue_operation {
    queue_op_set              = 0,
    queue_op_del              = 1,
    queue_op_flush            = 2,
    queue_op_empty            = 3,
    queue_op_checkpoint_start = 4,
    queue_op_checkpoint_end   = 5
};
44 
// Smallest value the NRU bits can take.
const uint8_t MIN_NRU_VALUE = 0;
// Value the NRU bits are given initially.
const uint8_t INITIAL_NRU_VALUE = 2;
// Largest value the NRU bits can take.
const uint8_t MAX_NRU_VALUE = 3;
51 
52 /**
53  * A blob is a minimal sized storage for data up to 2^32 bytes long.
54  */
55 class Blob : public RCValue {
56 public:
57 
58     // Constructors.
59 
60     /**
61      * Create a new Blob holding the given data.
62      *
63      * @param start the beginning of the data to copy into this blob
64      * @param len the amount of data to copy in
65      * @param ext_meta pointer to the extended meta section to be added
66      * @param ext_len length of the extended meta section
67      *
68      * @return the new Blob instance
69      */
New(const char *start, const size_t len, uint8_t *ext_meta, uint8_t ext_len)70     static Blob* New(const char *start, const size_t len, uint8_t *ext_meta,
71                      uint8_t ext_len) {
72         size_t total_len = len + sizeof(Blob) + FLEX_DATA_OFFSET + ext_len;
73         Blob *t = new (::operator new(total_len)) Blob(start, len, ext_meta,
74                                                        ext_len);
75         cb_assert(t->vlength() == len);
76         return t;
77     }
78 
79     /**
80      * Create a new Blob of the given size, with ext_meta set to the specified
81      * extended metadata
82      *
83      * @param len the size of the blob
84      * @param ext_meta pointer to the extended meta section to be copied in.
85      * @param ext_len length of the extended meta section
86      *
87      * @return the new Blob instance
88      */
New(const size_t len, uint8_t *ext_meta, uint8_t ext_len)89     static Blob* New(const size_t len, uint8_t *ext_meta, uint8_t ext_len) {
90         size_t total_len = len + sizeof(Blob) + FLEX_DATA_OFFSET + ext_len;
91         Blob *t = new (::operator new(total_len)) Blob(NULL, len, ext_meta,
92                                                        ext_len);
93         cb_assert(t->vlength() == len);
94         return t;
95     }
96 
97     /**
98      * Create a new Blob of the given size.
99      * (Used for appends/prepends)
100      *
101      * @param len the size of the blob
102      * @param ext_len length of the extended meta section
103      *
104      * @return the new Blob instance
105      */
New(const size_t len, uint8_t ext_len)106     static Blob* New(const size_t len, uint8_t ext_len) {
107         size_t total_len = len + sizeof(Blob) + FLEX_DATA_OFFSET + ext_len;
108         Blob *t = new (::operator new(total_len)) Blob(len, ext_len);
109         cb_assert(t->vlength() == len);
110         return t;
111     }
112 
113     /**
114      * Creates an exact copy of the specified Blob.
115      */
Copy(const Blob& other)116     static Blob* Copy(const Blob& other) {
117         Blob *t = new (::operator new(other.getSize())) Blob(other);
118         return t;
119     }
120 
121     // Actual accessorish things.
122 
123     /**
124      * Get the pointer to the contents of the Value part of this Blob.
125      */
getData() const126     const char* getData() const {
127         return data + FLEX_DATA_OFFSET + extMetaLen;
128     }
129 
130     /**
131      * Get the pointer to the contents of Blob.
132      */
getBlob() const133     const char* getBlob() const {
134         return data;
135     }
136 
137     /**
138      * Return datatype stored in Value Blob.
139      */
getDataType() const140     const uint8_t getDataType() const {
141         return extMetaLen > 0 ? *(data + FLEX_DATA_OFFSET) :
142             PROTOCOL_BINARY_RAW_BYTES;
143     }
144 
145     /**
146      * Set datatype for the value Blob.
147      */
setDataType(uint8_t datatype)148     void setDataType(uint8_t datatype) {
149         std::memcpy(data + FLEX_DATA_OFFSET, &datatype, sizeof(uint8_t));
150     }
151 
152     /**
153      * Return the pointer to exteneded metadata, stored in the Blob.
154      */
getExtMeta() const155     const char* getExtMeta() const {
156         cb_assert(data);
157         return extMetaLen > 0 ? data + FLEX_DATA_OFFSET : NULL;
158     }
159 
160     /**
161      * Get the length of this Blob value.
162      */
length() const163     size_t length() const {
164         return size;
165     }
166 
167     /**
168      * Get the length of just the value part in the Blob.
169      */
vlength() const170     size_t vlength() const {
171         return size - extMetaLen - FLEX_DATA_OFFSET;
172     }
173 
174     /**
175      * Get the size of this Blob instance.
176      */
getSize() const177     size_t getSize() const {
178         return size + sizeof(Blob);
179     }
180 
181     /**
182      * Get extended meta data length, after subtracting the
183      * size of FLEX_META_CODE.
184      */
getExtLen() const185     uint8_t getExtLen() const {
186         return extMetaLen;
187     }
188 
189     /**
190      * Returns how old this Blob is (how many epochs have passed since it was
191      * created).
192      */
getAge() const193     uint8_t getAge() const {
194         return age;
195     }
196 
197     /**
198      * Increment the age of the Blob. Saturates at 255.
199      */
incrementAge()200     void incrementAge() {
201         age++;
202         // Saturate the result at 255 if we wrapped.
203         if (age == 0) {
204             age = 255;
205         }
206     }
207 
208     /**
209      * Get a std::string representation of this blob.
210      */
to_s() const211     const std::string to_s() const {
212         return std::string(data + extMetaLen + FLEX_DATA_OFFSET,
213                            vlength());
214     }
215 
216     // This is necessary for making C++ happy when I'm doing a
217     // placement new on fairly "normal" c++ heap allocations, just
218     // with variable-sized objects.
operator delete(void* p)219     void operator delete(void* p) { ::operator delete(p); }
220 
~Blob()221     ~Blob() {
222         ObjectRegistry::onDeleteBlob(this);
223     }
224 
225 private:
226 
227     /* Constructor.
228      * @param start If non-NULL, pointer to array which will be copied into
229      *              the newly-created Blob.
230      * @param len   Size of the data the Blob object will hold, and size of
231      *              the data at {start}.
232      * @param ext_meta Pointer to any extended metadata, which will be copied
233      *                 into the newly created Blob.
234      * @param ext_len Size of the data pointed to by {ext_meta}
235      */
Blob(const char *start, const size_t len, uint8_t* ext_meta, uint8_t ext_len)236     explicit Blob(const char *start, const size_t len, uint8_t* ext_meta,
237                   uint8_t ext_len) :
238         size(static_cast<uint32_t>(len + FLEX_DATA_OFFSET + ext_len)),
239         extMetaLen(static_cast<uint8_t>(ext_len)),
240         age(0)
241     {
242         *(data) = FLEX_META_CODE;
243         std::memcpy(data + FLEX_DATA_OFFSET, ext_meta, ext_len);
244         if (start != NULL) {
245             std::memcpy(data + FLEX_DATA_OFFSET + ext_len, start, len);
246 #ifdef VALGRIND
247         } else {
248             memset(data + FLEX_DATA_OFFSET + ext_len, 0, len);
249 #endif
250         }
251         ObjectRegistry::onCreateBlob(this);
252     }
253 
Blob(const size_t len, uint8_t ext_len)254     explicit Blob(const size_t len, uint8_t ext_len) :
255         size(static_cast<uint32_t>(len + FLEX_DATA_OFFSET + ext_len)),
256         extMetaLen(static_cast<uint8_t>(ext_len)),
257         age(0)
258     {
259 #ifdef VALGRIND
260         memset(data, 0, len);
261 #endif
262         ObjectRegistry::onCreateBlob(this);
263     }
264 
Blob(const Blob& other)265     explicit Blob(const Blob& other)
266       : size(other.size),
267         extMetaLen(other.extMetaLen),
268         // While this is a copy, it is a new allocation therefore reset age.
269         age(0)
270     {
271         std::memcpy(data, other.data, size);
272         ObjectRegistry::onCreateBlob(this);
273     }
274 
275     const uint32_t size;
276     const uint8_t extMetaLen;
277 
278     // The age of this Blob, in terms of some unspecified units of time.
279     uint8_t age;
280     char data[1];
281 
282     DISALLOW_ASSIGN(Blob);
283 };
284 
// SingleThreadedRCPtr handle to a Blob; the type Item uses to hold its value.
typedef SingleThreadedRCPtr<Blob> value_t;
286 
// Revision sequence number used when none (or zero) is supplied.
const uint64_t DEFAULT_REV_SEQ_NUM = 1;

/**
 * The ItemMetaData structure is used to pass meta data information of
 * an Item: its CAS, revision sequence number, flags and expiry time.
 */
class ItemMetaData {
public:
    /**
     * Build meta data with zero cas/flags/exptime and the default
     * revision sequence number.
     */
    ItemMetaData()
        : cas(0),
          revSeqno(DEFAULT_REV_SEQ_NUM),
          flags(0),
          exptime(0)
    {}

    /**
     * Build meta data from explicit values.
     *
     * @param c the cas value
     * @param s the revision sequence number; zero is replaced by
     *          DEFAULT_REV_SEQ_NUM
     * @param f the flags
     * @param e the expiry time
     */
    ItemMetaData(uint64_t c, uint64_t s, uint32_t f, time_t e)
        : cas(c),
          revSeqno(s),
          flags(f),
          exptime(e)
    {
        if (revSeqno == 0) {
            revSeqno = DEFAULT_REV_SEQ_NUM;
        }
    }

    uint64_t cas;
    uint64_t revSeqno;
    uint32_t flags;
    time_t exptime;
};
309 
/**
 * Conflict Resolution Modes. Item stores the chosen mode in a two-bit
 * field, so the enumerators are numbered explicitly.
 */
enum conflict_resolution_mode {
    revision_seqno  = 0,
    last_write_wins = 1
};
317 
318 /**
319  * The Item structure we use to pass information between the memcached
320  * core and the backend. Please note that the kvstore don't store these
321  * objects, so we do have an extra layer of memory copying :(
322  */
323 class Item : public RCValue {
324 public:
325 
326     /* Constructor (existing value_t).
327      * Used when a value already exists, and the Item should refer to that
328      * value.
329      */
Item(const std::string &k, const uint32_t fl, const time_t exp, const value_t &val, uint64_t theCas = 0, int64_t i = -1, uint16_t vbid = 0, uint64_t sno = 1, uint8_t nru_value = INITIAL_NRU_VALUE, uint8_t conflict_res_value = revision_seqno)330     Item(const std::string &k, const uint32_t fl, const time_t exp,
331          const value_t &val, uint64_t theCas = 0,  int64_t i = -1,
332          uint16_t vbid = 0, uint64_t sno = 1, uint8_t nru_value = INITIAL_NRU_VALUE,
333          uint8_t conflict_res_value = revision_seqno) :
334         metaData(theCas, sno, fl, exp),
335         value(val),
336         key(k),
337         bySeqno(i),
338         queuedTime(ep_current_time()),
339         vbucketId(vbid),
340         op(queue_op_set),
341         nru(nru_value),
342         conflictResMode(conflict_res_value)
343     {
344         cb_assert(bySeqno != 0);
345         ObjectRegistry::onCreateItem(this);
346     }
347 
348     /* Constructor (new value).
349      * {k, nk}   specify the item's key, k must be non-null and point to an
350      *           array of bytes of length nk, where nk must be >0.
351      * fl        Item flags.
352      * exp       Item expiry.
353      * {dta, nb} specify the item's value. nb specifies how much memory will be
354      *           allocated for the value. If dta is non-NULL then the value
355      *           is set from the memory pointed to by dta. If dta is NULL,
356      *           then no data is copied in.
357      *  The remaining arguments specify various optional attributes.
358      */
Item(const void *k, uint16_t nk, const uint32_t fl, const time_t exp, const void *dta, const size_t nb, uint8_t* ext_meta = NULL, uint8_t ext_len = 0, uint64_t theCas = 0, int64_t i = -1, uint16_t vbid = 0, uint64_t sno = 1, uint8_t nru_value = INITIAL_NRU_VALUE, uint8_t conflict_res_value = revision_seqno)359     Item(const void *k, uint16_t nk, const uint32_t fl, const time_t exp,
360          const void *dta, const size_t nb, uint8_t* ext_meta = NULL,
361          uint8_t ext_len = 0, uint64_t theCas = 0, int64_t i = -1,
362          uint16_t vbid = 0, uint64_t sno = 1, uint8_t nru_value = INITIAL_NRU_VALUE,
363          uint8_t conflict_res_value = revision_seqno) :
364         metaData(theCas, sno, fl, exp),
365         key(static_cast<const char*>(k), nk),
366         bySeqno(i),
367         queuedTime(ep_current_time()),
368         vbucketId(vbid),
369         op(queue_op_set),
370         nru(nru_value),
371         conflictResMode(conflict_res_value)
372     {
373         cb_assert(bySeqno != 0);
374         setData(static_cast<const char*>(dta), nb, ext_meta, ext_len);
375         ObjectRegistry::onCreateItem(this);
376     }
377 
Item(const std::string &k, const uint16_t vb, enum queue_operation o, const uint64_t revSeq, const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE, uint8_t conflict_res_value = revision_seqno)378    Item(const std::string &k, const uint16_t vb,
379         enum queue_operation o, const uint64_t revSeq,
380         const int64_t bySeq, uint8_t nru_value = INITIAL_NRU_VALUE,
381         uint8_t conflict_res_value = revision_seqno) :
382        metaData(),
383        key(k),
384        bySeqno(bySeq),
385        queuedTime(ep_current_time()),
386        vbucketId(vb),
387        op(static_cast<uint16_t>(o)),
388        nru(nru_value),
389        conflictResMode(conflict_res_value)
390     {
391        cb_assert(bySeqno >= 0);
392        metaData.revSeqno = revSeq;
393        ObjectRegistry::onCreateItem(this);
394     }
395 
396     /* Copy constructor */
Item(const Item& other)397     Item(const Item& other) :
398         metaData(other.metaData),
399         value(other.value),
400         key(other.key),
401         bySeqno(other.bySeqno),
402         queuedTime(other.queuedTime),
403         vbucketId(other.vbucketId),
404         op(other.op),
405         nru(other.nru),
406         conflictResMode(other.conflictResMode)
407     {
408         ObjectRegistry::onCreateItem(this);
409     }
410 
~Item()411     ~Item() {
412         ObjectRegistry::onDeleteItem(this);
413     }
414 
getData() const415     const char *getData() const {
416         return value.get() ? value->getData() : NULL;
417     }
418 
getBlob() const419     const char *getBlob() const {
420         return value.get() ? value->getBlob() : NULL;
421     }
422 
getValue() const423     const value_t &getValue() const {
424         return value;
425     }
426 
getKey() const427     const std::string &getKey() const {
428         return key;
429     }
430 
getBySeqno() const431     int64_t getBySeqno() const {
432         return bySeqno;
433     }
434 
setBySeqno(int64_t to)435     void setBySeqno(int64_t to) {
436         bySeqno = to;
437     }
438 
getNKey() const439     int getNKey() const {
440         return static_cast<int>(key.length());
441     }
442 
getNBytes() const443     uint32_t getNBytes() const {
444         return value.get() ? static_cast<uint32_t>(value->vlength()) : 0;
445     }
446 
getValMemSize() const447     size_t getValMemSize() const {
448         return value.get() ? value->getSize() : 0;
449     }
450 
getExptime() const451     time_t getExptime() const {
452         return metaData.exptime;
453     }
454 
getFlags() const455     uint32_t getFlags() const {
456         return metaData.flags;
457     }
458 
getCas() const459     uint64_t getCas() const {
460         return metaData.cas;
461     }
462 
getDataType() const463     uint8_t getDataType() const {
464         return value.get() ? value->getDataType() :
465             PROTOCOL_BINARY_RAW_BYTES;
466     }
467 
setDataType(uint8_t datatype)468     void setDataType(uint8_t datatype) {
469         value->setDataType(datatype);
470     }
471 
getExtMeta() const472     const char* getExtMeta() const {
473         return value.get() ? value->getExtMeta() : NULL;
474     }
475 
getExtMetaLen() const476     uint8_t getExtMetaLen() const {
477         return value.get() ? value->getExtLen() : 0;
478     }
479 
setCas()480     void setCas() {
481         metaData.cas = nextCas();
482     }
483 
setCas(uint64_t ncas)484     void setCas(uint64_t ncas) {
485         metaData.cas = ncas;
486     }
487 
setValue(const value_t &v)488     void setValue(const value_t &v) {
489         value.reset(v);
490     }
491 
setFlags(uint32_t f)492     void setFlags(uint32_t f) {
493         metaData.flags = f;
494     }
495 
setExpTime(time_t exp_time)496     void setExpTime(time_t exp_time) {
497         metaData.exptime = exp_time;
498     }
499 
500     /**
501      * Append another item to this item
502      *
503      * @param item the item to append to this one
504      * @param maxItemSize maximum item size permitted
505      * @return ENGINE_SUCCESS if success
506      */
507     ENGINE_ERROR_CODE append(const Item &item, size_t maxItemSize);
508 
509     /**
510      * Prepend another item to this item
511      *
512      * @param item the item to prepend to this one
513      * @param maxItemSize maximum item size permitted
514      * @return ENGINE_SUCCESS if success
515      */
516     ENGINE_ERROR_CODE prepend(const Item &item, size_t maxItemSize);
517 
getVBucketId(void) const518     uint16_t getVBucketId(void) const {
519         return vbucketId;
520     }
521 
setVBucketId(uint16_t to)522     void setVBucketId(uint16_t to) {
523         vbucketId = to;
524     }
525 
526     /**
527      * Check if this item is expired or not.
528      *
529      * @param asOf the time to be compared with this item's expiry time
530      * @return true if this item's expiry time < asOf
531      */
isExpired(time_t asOf) const532     bool isExpired(time_t asOf) const {
533         if (metaData.exptime != 0 && metaData.exptime < asOf) {
534             return true;
535         }
536         return false;
537     }
538 
size(void) const539     size_t size(void) const {
540         return sizeof(Item) + key.size() + getValMemSize();
541     }
542 
getRevSeqno() const543     uint64_t getRevSeqno() const {
544         return metaData.revSeqno;
545     }
546 
setRevSeqno(uint64_t to)547     void setRevSeqno(uint64_t to) {
548         if (to == 0) {
549             to = DEFAULT_REV_SEQ_NUM;
550         }
551         metaData.revSeqno = to;
552     }
553 
getNMetaBytes()554     static uint32_t getNMetaBytes() {
555         return metaDataSize;
556     }
557 
getMetaData() const558     const ItemMetaData& getMetaData() const {
559         return metaData;
560     }
561 
isDeleted()562     bool isDeleted() {
563         return op == queue_op_del;
564     }
565 
setDeleted()566     void setDeleted() {
567         op = queue_op_del;
568     }
569 
getQueuedTime(void) const570     uint32_t getQueuedTime(void) const { return queuedTime; }
571 
setQueuedTime(uint32_t queued_time)572     void setQueuedTime(uint32_t queued_time) {
573         queuedTime = queued_time;
574     }
575 
getOperation(void) const576     enum queue_operation getOperation(void) const {
577         return static_cast<enum queue_operation>(op);
578     }
579 
setOperation(enum queue_operation o)580     void setOperation(enum queue_operation o) {
581         op = static_cast<uint8_t>(o);
582     }
583 
setNRUValue(uint8_t nru_value)584     void setNRUValue(uint8_t nru_value) {
585         nru = nru_value;
586     }
587 
getNRUValue() const588     uint8_t getNRUValue() const {
589         return nru;
590     }
591 
nextCas(void)592     static uint64_t nextCas(void) {
593         return gethrtime() + (++casCounter);
594     }
595 
setConflictResMode(enum conflict_resolution_mode conf_res_value)596     void setConflictResMode(enum conflict_resolution_mode conf_res_value) {
597         conflictResMode = static_cast<uint8_t>(conf_res_value);
598     }
599 
getConflictResMode(void) const600     enum conflict_resolution_mode getConflictResMode(void) const {
601         return static_cast<enum conflict_resolution_mode>(conflictResMode);
602     }
603 
604 private:
605     /**
606      * Set the item's data. This is only used by constructors, so we
607      * make it private.
608      */
setData(const char *dta, const size_t nb, uint8_t* ext_meta, uint8_t ext_len)609     void setData(const char *dta, const size_t nb, uint8_t* ext_meta,
610                  uint8_t ext_len) {
611         Blob *data;
612         if (dta == NULL) {
613             data = Blob::New(nb, ext_meta, ext_len);
614         } else {
615             data = Blob::New(dta, nb, ext_meta, ext_len);
616         }
617         cb_assert(data);
618         value.reset(data);
619     }
620 
621     ItemMetaData metaData;
622     value_t value;
623     std::string key;
624     int64_t bySeqno;
625     uint32_t queuedTime;
626     uint16_t vbucketId;
627     uint8_t op;
628     uint8_t nru  : 2;
629     uint8_t conflictResMode : 2;
630 
631     static AtomicValue<uint64_t> casCounter;
632     static const uint32_t metaDataSize;
633     DISALLOW_ASSIGN(Item);
634 };
635 
// SingleThreadedRCPtr handle to an Item; the element type that the
// CompareQueuedItems* functors in this file order.
typedef SingleThreadedRCPtr<Item> queued_item;
637 
638 /**
639  * Order queued_item objects pointed by shared_ptr by their keys.
640  */
641 class CompareQueuedItemsByKey {
642 public:
CompareQueuedItemsByKey()643     CompareQueuedItemsByKey() {}
operator ()(const queued_item &i1, const queued_item &i2)644     bool operator()(const queued_item &i1, const queued_item &i2) {
645         return i1->getKey() < i2->getKey();
646     }
647 };
648 
649 /**
650  * Order QueuedItem objects by their keys and by sequence numbers.
651  */
652 class CompareQueuedItemsBySeqnoAndKey {
653 public:
CompareQueuedItemsBySeqnoAndKey()654     CompareQueuedItemsBySeqnoAndKey() {}
operator ()(const queued_item &i1, const queued_item &i2)655     bool operator()(const queued_item &i1, const queued_item &i2) {
656         return i1->getKey() == i2->getKey()
657             ? i1->getBySeqno() > i2->getBySeqno()
658             : i1->getKey() < i2->getKey();
659     }
660 };
661 
662 #endif  // SRC_ITEM_H_
663