1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2016 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include "stored-value.h"
21
22 #include "ep_time.h"
23 #include "item.h"
24 #include "objectregistry.h"
25 #include "stats.h"
26
27 #include <platform/cb_malloc.h>
28 #include <platform/compress.h>
29
// Definitions of the reserved (negative) state constants declared on
// StoredValue. None of them is referenced by name in this file.
// NOTE(review): presumably these are sentinel values stored in bySeqno to
// flag special item states -- confirm against stored-value.h.
const int64_t StoredValue::state_pending_seqno = -2;
const int64_t StoredValue::state_deleted_key = -3;
const int64_t StoredValue::state_non_existent_key = -4;
const int64_t StoredValue::state_temp_init = -5;
const int64_t StoredValue::state_collection_open = -6;
35
StoredValue(const Item& itm, UniquePtr n, EPStats& stats, bool isOrdered)36 StoredValue::StoredValue(const Item& itm,
37 UniquePtr n,
38 EPStats& stats,
39 bool isOrdered)
40 : value(itm.getValue()),
41 chain_next_or_replacement(std::move(n)),
42 cas(itm.getCas()),
43 bySeqno(itm.getBySeqno()),
44 lock_expiry_or_delete_time(0),
45 exptime(itm.getExptime()),
46 flags(itm.getFlags()),
47 revSeqno(itm.getRevSeqno()),
48 datatype(itm.getDataType()) {
49 // Initialise bit fields
50 setDeletedPriv(itm.isDeleted());
51 setNewCacheItem(true);
52 setOrdered(isOrdered);
53 setNru(itm.getNRUValue());
54 setResident(!isTempItem());
55 setStale(false);
56 // dirty initialised below
57
58 // Placement-new the key which lives in memory directly after this
59 // object.
60 new (key()) SerialisedDocKey(itm.getKey());
61
62 if (isTempInitialItem()) {
63 markClean();
64 } else {
65 markDirty();
66 }
67
68 if (isTempItem()) {
69 resetValue();
70 }
71
72 ObjectRegistry::onCreateStoredValue(this);
73 }
74
~StoredValue()75 StoredValue::~StoredValue() {
76 ObjectRegistry::onDeleteStoredValue(this);
77 }
78
/**
 * Construct a StoredValue as a copy of another one.
 *
 * All metadata, the value and the bit fields are taken from `other`, with
 * one exception: the 'stale' bit is always cleared on the new object.
 *
 * @param other StoredValue to copy from.
 * @param n Ownership of this pointer is moved into
 *          chain_next_or_replacement.
 * @param stats Not used by this constructor.
 */
StoredValue::StoredValue(const StoredValue& other, UniquePtr n, EPStats& stats)
    : value(other.value), // Implicitly also copies the frequency counter
      chain_next_or_replacement(std::move(n)),
      cas(other.cas),
      bySeqno(other.bySeqno),
      lock_expiry_or_delete_time(other.lock_expiry_or_delete_time),
      exptime(other.exptime),
      flags(other.flags),
      revSeqno(other.revSeqno),
      datatype(other.datatype) {
    // Mirror the source object's bit fields.
    setDirty(other.isDirty());
    setDeletedPriv(other.isDeleted());
    setNewCacheItem(other.isNewCacheItem());
    setOrdered(other.isOrdered());
    setNru(other.getNru());
    setResident(other.isResident());
    // The copy is never created stale, whatever the source's state.
    setStale(false);

    // The key is stored in the memory immediately following this object, so
    // it is constructed there with placement-new from a copy of other's key.
    StoredDocKey copiedKey(other.getKey());
    new (key()) SerialisedDocKey(copiedKey);

    ObjectRegistry::onCreateStoredValue(this);
}
103
setValue(const Item& itm)104 void StoredValue::setValue(const Item& itm) {
105 if (isOrdered()) {
106 return static_cast<OrderedStoredValue*>(this)->setValueImpl(itm);
107 } else {
108 return this->setValueImpl(itm);
109 }
110 }
111
ejectValue()112 void StoredValue::ejectValue() {
113 markNotResident();
114 }
115
referenced()116 void StoredValue::referenced() {
117 uint8_t nru = getNru();
118 if (nru > MIN_NRU_VALUE) {
119 setNru(--nru);
120 }
121 }
122
setNRUValue(uint8_t nru_val)123 void StoredValue::setNRUValue(uint8_t nru_val) {
124 if (nru_val <= MAX_NRU_VALUE) {
125 setNru(nru_val);
126 }
127 }
128
incrNRUValue()129 uint8_t StoredValue::incrNRUValue() {
130 uint8_t ret = MAX_NRU_VALUE;
131 uint8_t nru = getNru();
132 if (nru < MAX_NRU_VALUE) {
133 ret = ++nru;
134 setNru(nru);
135 }
136 return ret;
137 }
138
getNRUValue() const139 uint8_t StoredValue::getNRUValue() const {
140 return getNru();
141 }
142
setFreqCounterValue(uint16_t newValue)143 void StoredValue::setFreqCounterValue(uint16_t newValue) {
144 auto taggedPtr = value.get();
145 taggedPtr.setTag(newValue);
146 value.reset(taggedPtr);
147 }
148
getFreqCounterValue() const149 uint16_t StoredValue::getFreqCounterValue() const {
150 return value.get().getTag();
151 }
152
restoreValue(const Item& itm)153 void StoredValue::restoreValue(const Item& itm) {
154 if (isTempInitialItem() || isTempDeletedItem()) {
155 cas = itm.getCas();
156 flags = itm.getFlags();
157 exptime = itm.getExptime();
158 revSeqno = itm.getRevSeqno();
159 bySeqno = itm.getBySeqno();
160 setNru(INITIAL_NRU_VALUE);
161 }
162 datatype = itm.getDataType();
163 setDeletedPriv(itm.isDeleted());
164 value = itm.getValue(); // Implicitly also copies the frequency counter
165 setResident(true);
166 }
167
restoreMeta(const Item& itm)168 void StoredValue::restoreMeta(const Item& itm) {
169 cas = itm.getCas();
170 flags = itm.getFlags();
171 datatype = itm.getDataType();
172 exptime = itm.getExptime();
173 revSeqno = itm.getRevSeqno();
174 if (itm.isDeleted()) {
175 setTempDeleted();
176 } else { /* Regular item with the full eviction */
177 bySeqno = itm.getBySeqno();
178 /* set it back to false as we created a temp item by setting it to true
179 when bg fetch is scheduled (full eviction mode). */
180 setNewCacheItem(false);
181 }
182 if (getNru() == MAX_NRU_VALUE) {
183 setNru(INITIAL_NRU_VALUE);
184 }
185 setFreqCounterValue(itm.getFreqCounterValue());
186 }
187
uncompressedValuelen() const188 size_t StoredValue::uncompressedValuelen() const {
189 if (!value) {
190 return 0;
191 }
192 if (mcbp::datatype::is_snappy(datatype)) {
193 return cb::compression::get_uncompressed_length(
194 cb::compression::Algorithm::Snappy,
195 {value->getData(), value->valueSize()});
196 }
197 return valuelen();
198 }
199
del()200 bool StoredValue::del() {
201 if (isOrdered()) {
202 return static_cast<OrderedStoredValue*>(this)->deleteImpl();
203 } else {
204 return this->deleteImpl();
205 }
206 }
207
getRequiredStorage(const Item& item)208 size_t StoredValue::getRequiredStorage(const Item& item) {
209 return sizeof(StoredValue) +
210 SerialisedDocKey::getObjectSize(item.getKey().size());
211 }
212
toItem(bool lck, uint16_t vbucket) const213 std::unique_ptr<Item> StoredValue::toItem(bool lck, uint16_t vbucket) const {
214 auto itm =
215 std::make_unique<Item>(getKey(),
216 getFlags(),
217 getExptime(),
218 value,
219 datatype,
220 lck ? static_cast<uint64_t>(-1) : getCas(),
221 bySeqno,
222 vbucket,
223 getRevSeqno());
224
225 itm->setNRUValue(getNru());
226 itm->setFreqCounterValue(getFreqCounterValue());
227
228 if (isDeleted()) {
229 itm->setDeleted();
230 }
231
232 return itm;
233 }
234
toItemKeyOnly(uint16_t vbucket) const235 std::unique_ptr<Item> StoredValue::toItemKeyOnly(uint16_t vbucket) const {
236 auto itm =
237 std::make_unique<Item>(getKey(),
238 getFlags(),
239 getExptime(),
240 value_t{},
241 datatype,
242 getCas(),
243 getBySeqno(),
244 vbucket,
245 getRevSeqno());
246
247 itm->setNRUValue(getNru());
248 itm->setFreqCounterValue(getFreqCounterValue());
249
250 if (isDeleted()) {
251 itm->setDeleted();
252 }
253
254 return itm;
255 }
256
reallocate()257 void StoredValue::reallocate() {
258 // Allocate a new Blob for this stored value; copy the existing Blob to
259 // the new one and free the old.
260 value_t new_val(Blob::Copy(*value));
261 replaceValue(new_val.get());
262 }
263
operator ()(StoredValue* val)264 void StoredValue::Deleter::operator()(StoredValue* val) {
265 if (val->isOrdered()) {
266 delete static_cast<OrderedStoredValue*>(val);
267 } else {
268 delete val;
269 }
270 }
271
toOrderedStoredValue()272 OrderedStoredValue* StoredValue::toOrderedStoredValue() {
273 if (isOrdered()) {
274 return static_cast<OrderedStoredValue*>(this);
275 }
276 throw std::bad_cast();
277 }
278
toOrderedStoredValue() const279 const OrderedStoredValue* StoredValue::toOrderedStoredValue() const {
280 if (isOrdered()) {
281 return static_cast<const OrderedStoredValue*>(this);
282 }
283 throw std::bad_cast();
284 }
285
operator ==(const StoredValue& other) const286 bool StoredValue::operator==(const StoredValue& other) const {
287 return (cas == other.cas && revSeqno == other.revSeqno &&
288 bySeqno == other.bySeqno &&
289 lock_expiry_or_delete_time == other.lock_expiry_or_delete_time &&
290 exptime == other.exptime && flags == other.flags &&
291 isDirty() == other.isDirty() && isDeleted() == other.isDeleted() &&
292 isNewCacheItem() == other.isNewCacheItem() &&
293 isOrdered() == other.isOrdered() && getNru() == other.getNru() &&
294 isResident() == other.isResident() && getKey() == other.getKey());
295 }
296
deleteImpl()297 bool StoredValue::deleteImpl() {
298 if (isDeleted() && !getValue()) {
299 // SV is already marked as deleted and has no value - no further
300 // deletion possible.
301 return false;
302 }
303
304 resetValue();
305 setDatatype(PROTOCOL_BINARY_RAW_BYTES);
306 setPendingSeqno();
307
308 setDeletedPriv(true);
309 markDirty();
310
311 return true;
312 }
313
setValueImpl(const Item& itm)314 void StoredValue::setValueImpl(const Item& itm) {
315 if (isDeleted() && !itm.isDeleted()) {
316 // Transitioning from deleted -> alive - this should be considered
317 // a new cache item as it is increasing the number of (alive) items
318 // in the vBucket.
319 setNewCacheItem(true);
320 }
321
322 setDeletedPriv(itm.isDeleted());
323 flags = itm.getFlags();
324 datatype = itm.getDataType();
325 bySeqno = itm.getBySeqno();
326
327 cas = itm.getCas();
328 lock_expiry_or_delete_time = 0;
329 exptime = itm.getExptime();
330 revSeqno = itm.getRevSeqno();
331
332 if (isTempInitialItem()) {
333 markClean();
334 } else {
335 markDirty();
336 }
337
338 if (isTempItem()) {
339 setResident(false);
340 } else {
341 setResident(true);
342 value = itm.getValue();
343 }
344 }
345
compressValue()346 bool StoredValue::compressValue() {
347 if (!mcbp::datatype::is_snappy(datatype)) {
348 // Attempt compression only if datatype indicates
349 // that the value is not compressed already
350 cb::compression::Buffer deflated;
351 if (cb::compression::deflate(cb::compression::Algorithm::Snappy,
352 {value->getData(), value->valueSize()},
353 deflated)) {
354 if (deflated.size() > value->valueSize()) {
355 // No point of keeping it compressed if the deflated length
356 // is greater than the original length
357 return true;
358 }
359 Blob* data = nullptr;
360 data = Blob::New(deflated.data(), deflated.size());
361 datatype |= PROTOCOL_BINARY_DATATYPE_SNAPPY;
362 replaceValue(TaggedPtr<Blob>(data));
363 } else {
364 return false;
365 }
366 }
367
368 return true;
369 }
370
storeCompressedBuffer(cb::const_char_buffer deflated)371 void StoredValue::storeCompressedBuffer(cb::const_char_buffer deflated) {
372 Blob* data = Blob::New(deflated.data(), deflated.size());
373 datatype |= PROTOCOL_BINARY_DATATYPE_SNAPPY;
374 replaceValue(TaggedPtr<Blob>(data));
375 }
376
377 /**
378 * Get an item_info from the StoredValue
379 */
getItemInfo(uint64_t vbuuid) const380 boost::optional<item_info> StoredValue::getItemInfo(uint64_t vbuuid) const {
381 if (isTempItem()) {
382 return boost::none;
383 }
384
385 item_info info;
386 info.cas = cas;
387 info.vbucket_uuid = vbuuid;
388 info.seqno = bySeqno;
389 info.exptime = exptime;
390 info.nbytes = 0;
391 info.flags = flags;
392 info.datatype = datatype;
393 info.document_state =
394 isDeleted() ? DocumentState::Deleted : DocumentState::Alive;
395 info.nkey = getKey().size();
396 info.key = getKey().data();
397 if (getValue()) {
398 info.value[0].iov_base = const_cast<char*>(getValue()->getData());
399 info.value[0].iov_len = getValue()->valueSize();
400 }
401 return info;
402 }
403
operator <<(std::ostream& os, const StoredValue& sv)404 std::ostream& operator<<(std::ostream& os, const StoredValue& sv) {
405
406 // type, address
407 os << (sv.isOrdered() ? "OSV @" : " SV @") << &sv << " ";
408
409 // datatype: XCJ
410 os << (mcbp::datatype::is_xattr(sv.getDatatype()) ? 'X' : '.');
411 os << (mcbp::datatype::is_snappy(sv.getDatatype()) ? 'C' : '.');
412 os << (mcbp::datatype::is_json(sv.getDatatype()) ? 'J' : '.');
413 os << ' ';
414
415 // dirty (Written), deleted, new, locked
416 os << (sv.isDirty() ? 'W' : '.');
417 os << (sv.isDeleted() ? 'D' : '.');
418 os << (sv.isNewCacheItem() ? 'N' : '.');
419 os << (sv.isResident() ? 'R' : '.');
420 os << (sv.isLocked(ep_current_time()) ? 'L' : '.');
421 if (sv.isOrdered()) {
422 const auto* osv = sv.toOrderedStoredValue();
423 os << (osv->isStalePriv() ? 'S' : '.');
424 }
425 os << ' ';
426
427 // Temporary states
428 os << "temp:"
429 << (sv.isTempInitialItem() ? 'I' : ' ')
430 << (sv.isTempDeletedItem() ? 'D' : ' ')
431 << (sv.isTempNonExistentItem() ? 'N' : ' ')
432 << ' ';
433
434 // seqno, revid, expiry / purge time
435 os << "seq:" << sv.getBySeqno() << " rev:" << sv.getRevSeqno();
436 os << " cas:" << sv.getCas();
437 os << " key:\"" << sv.getKey() << "\"";
438 if (sv.isOrdered() && sv.isDeleted()) {
439 os << " del_time:" << sv.lock_expiry_or_delete_time;
440 } else {
441 os << " exp:" << sv.getExptime();
442 }
443
444 os << " vallen:" << sv.valuelen();
445 if (sv.getValue().get()) {
446 os << " val:\"";
447 const char* data = sv.getValue()->getData();
448 // print up to first 40 bytes of value.
449 const size_t limit = std::min(size_t(40), sv.getValue()->valueSize());
450 for (size_t ii = 0; ii < limit; ii++) {
451 os << data[ii];
452 }
453 if (limit < sv.getValue()->valueSize()) {
454 os << " <cut>";
455 }
456 os << "\"";
457 }
458 return os;
459 }
460
operator ==(const OrderedStoredValue& other) const461 bool OrderedStoredValue::operator==(const OrderedStoredValue& other) const {
462 return StoredValue::operator==(other);
463 }
464
getRequiredStorage(const Item& item)465 size_t OrderedStoredValue::getRequiredStorage(const Item& item) {
466 return sizeof(OrderedStoredValue) +
467 SerialisedDocKey::getObjectSize(item.getKey());
468 }
469
470 /**
471 * Return the time the item was deleted. Only valid for deleted items.
472 */
getDeletedTime() const473 rel_time_t OrderedStoredValue::getDeletedTime() const {
474 if (isDeleted()) {
475 return lock_expiry_or_delete_time;
476 } else {
477 throw std::logic_error(
478 "OrderedStoredValue::getDeletedItem: Called on Alive item");
479 }
480 }
481
deleteImpl()482 bool OrderedStoredValue::deleteImpl() {
483 if (StoredValue::deleteImpl()) {
484 // Need to record the time when an item is deleted for subsequent
485 //purging (ephemeral_metadata_purge_age).
486 setDeletedTime(ep_current_time());
487 return true;
488 }
489 return false;
490 }
491
setValueImpl(const Item& itm)492 void OrderedStoredValue::setValueImpl(const Item& itm) {
493 StoredValue::setValueImpl(itm);
494
495 // Update the deleted time (note - even if it was already deleted we should
496 // refresh this).
497 if (isDeleted()) {
498 setDeletedTime(ep_current_time());
499 }
500 }
501
setDeletedTime(rel_time_t time)502 void OrderedStoredValue::setDeletedTime(rel_time_t time) {
503 if (!isDeleted()) {
504 throw std::logic_error(
505 "OrderedStoredValue::setDeletedTime: Called on Alive item");
506 }
507 lock_expiry_or_delete_time = time;
508 }
509