xref: /6.0.3/kv_engine/engines/ep/src/vbucket.cc (revision 893b9cdd)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "atomic.h"
21#include "bgfetcher.h"
22#include "checkpoint.h"
23#include "conflict_resolution.h"
24#include "ep_engine.h"
25#include "ep_time.h"
26#include "ep_types.h"
27#include "failover-table.h"
28#include "flusher.h"
29#include "hash_table.h"
30#include "pre_link_document_context.h"
31#include "statwriter.h"
32#include "stored_value_factories.h"
33#include "vbucket.h"
34#include "vbucketdeletiontask.h"
35
36#include <platform/compress.h>
37#include <xattr/blob.h>
38#include <xattr/utils.h>
39
40#include <functional>
41#include <list>
42#include <set>
43#include <string>
44#include <vector>
45
46/* Macros */
47const auto MIN_CHK_FLUSH_TIMEOUT = std::chrono::seconds(10);
48const auto MAX_CHK_FLUSH_TIMEOUT = std::chrono::seconds(30);
49
50/* Statics definitions */
51cb::AtomicDuration VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
52double VBucket::mutationMemThreshold = 0.9;
53
54VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
55    std::vector<uint16_t> tmp(acceptable.size() + other.size());
56    std::vector<uint16_t>::iterator end;
57    end = std::set_symmetric_difference(acceptable.begin(),
58                                        acceptable.end(),
59                                        other.acceptable.begin(),
60                                        other.acceptable.end(),
61                                        tmp.begin());
62    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
63}
64
65VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
66                                                                        const {
67    std::vector<uint16_t> tmp(acceptable.size() + other.size());
68    std::vector<uint16_t>::iterator end;
69
70    end = std::set_intersection(acceptable.begin(), acceptable.end(),
71                                other.acceptable.begin(),
72                                other.acceptable.end(),
73                                tmp.begin());
74    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
75}
76
/**
 * Determine whether the values starting at `it` form a run of at least three
 * consecutive integers.
 *
 * @param it     first element of the candidate run
 * @param end    end iterator of the containing set
 * @param length [out] number of increments needed to move from `it` to the
 *               last element of the run (i.e. run size - 1); 0 if `it == end`
 * @return true when the run contains three or more consecutive values
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator& end,
                    size_t& length) {
    length = 0;
    if (it == end) {
        // Never dereference the end iterator; an empty range is not a run.
        return false;
    }

    const uint16_t first = *it;
    size_t runSize = 0;
    for (; it != end && (first + runSize) == *it; ++it, ++runSize) {
        // empty: just measure the run
    }

    // runSize >= 1 here because the first element always matches itself.
    length = runSize - 1;
    return length > 1;
}
92
93std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
94{
95    std::set<uint16_t>::const_iterator it;
96
97    if (filter.acceptable.empty()) {
98        out << "{ empty }";
99    } else {
100        bool needcomma = false;
101        out << "{ ";
102        for (it = filter.acceptable.begin();
103             it != filter.acceptable.end();
104             ++it) {
105            if (needcomma) {
106                out << ", ";
107            }
108
109            size_t length;
110            if (isRange(it, filter.acceptable.end(), length)) {
111                std::set<uint16_t>::iterator last = it;
112                for (size_t i = 0; i < length; ++i) {
113                    ++last;
114                }
115                out << "[" << *it << "," << *last << "]";
116                it = last;
117            } else {
118                out << *it;
119            }
120            needcomma = true;
121        }
122        out << " }";
123    }
124
125    return out;
126}
127
128const vbucket_state_t VBucket::ACTIVE =
129                     static_cast<vbucket_state_t>(htonl(vbucket_state_active));
130const vbucket_state_t VBucket::REPLICA =
131                    static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
132const vbucket_state_t VBucket::PENDING =
133                    static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
134const vbucket_state_t VBucket::DEAD =
135                    static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
136
137VBucket::VBucket(id_type i,
138                 vbucket_state_t newState,
139                 EPStats& st,
140                 CheckpointConfig& chkConfig,
141                 int64_t lastSeqno,
142                 uint64_t lastSnapStart,
143                 uint64_t lastSnapEnd,
144                 std::unique_ptr<FailoverTable> table,
145                 std::shared_ptr<Callback<id_type>> flusherCb,
146                 std::unique_ptr<AbstractStoredValueFactory> valFact,
147                 NewSeqnoCallback newSeqnoCb,
148                 Configuration& config,
149                 item_eviction_policy_t evictionPolicy,
150                 vbucket_state_t initState,
151                 uint64_t purgeSeqno,
152                 uint64_t maxCas,
153                 int64_t hlcEpochSeqno,
154                 bool mightContainXattrs,
155                 const std::string& collectionsManifest)
156    : ht(st, std::move(valFact), config.getHtSize(), config.getHtLocks()),
157      checkpointManager(std::make_unique<CheckpointManager>(st,
158                                                            i,
159                                                            chkConfig,
160                                                            lastSeqno,
161                                                            lastSnapStart,
162                                                            lastSnapEnd,
163                                                            flusherCb)),
164      failovers(std::move(table)),
165      opsCreate(0),
166      opsUpdate(0),
167      opsDelete(0),
168      opsReject(0),
169      dirtyQueueSize(0),
170      dirtyQueueMem(0),
171      dirtyQueueFill(0),
172      dirtyQueueDrain(0),
173      dirtyQueueAge(0),
174      dirtyQueuePendingWrites(0),
175      metaDataDisk(0),
176      numExpiredItems(0),
177      eviction(evictionPolicy),
178      stats(st),
179      persistenceSeqno(0),
180      numHpVBReqs(0),
181      id(i),
182      state(newState),
183      initialState(initState),
184      purge_seqno(purgeSeqno),
185      takeover_backed_up(false),
186      persisted_snapshot_start(lastSnapStart),
187      persisted_snapshot_end(lastSnapEnd),
188      rollbackItemCount(0),
189      hlc(maxCas,
190          hlcEpochSeqno,
191          std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
192          std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
193      statPrefix("vb_" + std::to_string(i)),
194      persistenceCheckpointId(0),
195      bucketCreation(false),
196      deferredDeletion(false),
197      deferredDeletionCookie(nullptr),
198      newSeqnoCb(std::move(newSeqnoCb)),
199      manifest(collectionsManifest),
200      mayContainXattrs(mightContainXattrs) {
201    if (config.getConflictResolutionType().compare("lww") == 0) {
202        conflictResolver.reset(new LastWriteWinsResolution());
203    } else {
204        conflictResolver.reset(new RevisionSeqnoResolution());
205    }
206
207    backfill.isBackfillPhase = false;
208    pendingOpsStart = ProcessClock::time_point();
209    stats.coreLocal.get()->memOverhead.fetch_add(
210            sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
211    LOG(EXTENSION_LOG_NOTICE,
212        "VBucket: created vbucket:%" PRIu16
213        " with state:%s "
214        "initialState:%s lastSeqno:%" PRIu64 " lastSnapshot:{%" PRIu64
215        ",%" PRIu64 "} persisted_snapshot:{%" PRIu64 ",%" PRIu64
216        "} max_cas:%" PRIu64 " uuid:%s",
217        id,
218        VBucket::toString(state),
219        VBucket::toString(initialState),
220        lastSeqno,
221        lastSnapStart,
222        lastSnapEnd,
223        persisted_snapshot_start,
224        persisted_snapshot_end,
225        getMaxCas(),
226        failovers ? std::to_string(failovers->getLatestUUID()).c_str() : "<>");
227}
228
229VBucket::~VBucket() {
230    if (!pendingOps.empty()) {
231        LOG(EXTENSION_LOG_WARNING,
232            "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
233            id,
234            pendingOps.size());
235    }
236
237    stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
238    stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
239
240    // Clear out the bloomfilter(s)
241    clearFilter();
242
243    stats.coreLocal.get()->memOverhead.fetch_sub(
244            sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
245
246    LOG(EXTENSION_LOG_NOTICE, "Destroying vbucket %d", id);
247}
248
249int64_t VBucket::getHighSeqno() const {
250    return checkpointManager->getHighSeqno();
251}
252
253size_t VBucket::getChkMgrMemUsage() const {
254    return checkpointManager->getMemoryUsage();
255}
256
257size_t VBucket::getChkMgrMemUsageOfUnrefCheckpoints() const {
258    return checkpointManager->getMemoryUsageOfUnrefCheckpoints();
259}
260
261size_t VBucket::getChkMgrMemUsageOverhead() const {
262    return checkpointManager->getMemoryOverhead();
263}
264
265void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
266                         ENGINE_ERROR_CODE code) {
267    std::unique_lock<std::mutex> lh(pendingOpLock);
268
269    if (pendingOpsStart > ProcessClock::time_point()) {
270        auto now = ProcessClock::now();
271        if (now > pendingOpsStart) {
272            auto d = std::chrono::duration_cast<std::chrono::microseconds>(
273                    now - pendingOpsStart);
274            stats.pendingOpsHisto.add(d);
275            atomic_setIfBigger(stats.pendingOpsMaxDuration,
276                               std::make_unsigned<hrtime_t>::type(d.count()));
277        }
278    } else {
279        return;
280    }
281
282    pendingOpsStart = ProcessClock::time_point();
283    stats.pendingOps.fetch_sub(pendingOps.size());
284    atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
285
286    while (!pendingOps.empty()) {
287        const void *pendingOperation = pendingOps.back();
288        pendingOps.pop_back();
289        // We don't want to hold the pendingOpLock when
290        // calling notifyIOComplete.
291        lh.unlock();
292        engine.notifyIOComplete(pendingOperation, code);
293        lh.lock();
294    }
295
296    LOG(EXTENSION_LOG_INFO,
297        "Fired pendings ops for vbucket %" PRIu16 " in state %s",
298        id,
299        VBucket::toString(state));
300}
301
302void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
303
304    if (state == vbucket_state_active) {
305        fireAllOps(engine, ENGINE_SUCCESS);
306    } else if (state == vbucket_state_pending) {
307        // Nothing
308    } else {
309        fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
310    }
311}
312
313VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
314    // Fetch up to approxLimit items from rejectQueue, backfill items and
315    // checkpointManager (in that order); then check if we obtained everything
316    // which is available.
317    ItemsToFlush result;
318
319    // First add any items from the rejectQueue.
320    while (result.items.size() < approxLimit && !rejectQueue.empty()) {
321        result.items.push_back(rejectQueue.front());
322        rejectQueue.pop();
323    }
324
325    // Append any 'backfill' items (mutations added by a DCP stream).
326    bool backfillEmpty;
327    {
328        LockHolder lh(backfill.mutex);
329        size_t num_items = 0;
330        while (result.items.size() < approxLimit && !backfill.items.empty()) {
331            result.items.push_back(backfill.items.front());
332            backfill.items.pop();
333            num_items++;
334        }
335        backfillEmpty = backfill.items.empty();
336        stats.vbBackfillQueueSize.fetch_sub(num_items);
337        stats.coreLocal.get()->memOverhead.fetch_sub(num_items *
338                                                     sizeof(queued_item));
339    }
340
341    // Append up to approxLimit checkpoint items outstanding for the persistence
342    // cursor, if we haven't yet hit the limit.
343    // Note that it is only valid to queue a complete checkpoint - this is where
344    // the "approx" in the limit comes from.
345    const auto ckptMgrLimit = approxLimit - result.items.size();
346    bool ckptItemsAvailable = true;
347    if (ckptMgrLimit > 0) {
348        auto _begin_ = ProcessClock::now();
349        auto ckptItems = checkpointManager->getItemsForCursor(
350                CheckpointManager::pCursorName, result.items, ckptMgrLimit);
351        result.range = ckptItems.range;
352        ckptItemsAvailable = ckptItems.moreAvailable;
353        stats.persistenceCursorGetItemsHisto.add(
354                std::chrono::duration_cast<std::chrono::microseconds>(
355                        ProcessClock::now() - _begin_));
356    } else {
357        // We haven't got sufficient remaining capacity to read items from
358        // CheckpoitnManager, therefore we must assume that there /could/
359        // more data to follow (leaving ckptItemsAvailable true). We also must
360        // ensure the valid snapshot range is returned
361        result.range = checkpointManager->getSnapshotInfo().range;
362    }
363
364    // Check if there's any more items remaining.
365    result.moreAvailable =
366            !rejectQueue.empty() || !backfillEmpty || ckptItemsAvailable;
367
368    return result;
369}
370
371void VBucket::setState(vbucket_state_t to) {
372    WriterLockHolder wlh(stateLock);
373    setState_UNLOCKED(to, wlh);
374}
375
376void VBucket::setState_UNLOCKED(vbucket_state_t to,
377                                WriterLockHolder &vbStateLock) {
378    vbucket_state_t oldstate = state;
379
380    if (to == vbucket_state_active &&
381        checkpointManager->getOpenCheckpointId() < 2) {
382        checkpointManager->setOpenCheckpointId(2);
383    }
384
385    LOG(EXTENSION_LOG_NOTICE,
386        "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
387        id,
388        VBucket::toString(oldstate),
389        VBucket::toString(to));
390
391    state = to;
392}
393
394vbucket_state VBucket::getVBucketState() const {
395     auto persisted_range = getPersistedSnapshot();
396
397     return vbucket_state{getState(),
398                          getPersistenceCheckpointId(),
399                          0,
400                          getHighSeqno(),
401                          getPurgeSeqno(),
402                          persisted_range.start,
403                          persisted_range.end,
404                          getMaxCas(),
405                          hlc.getEpochSeqno(),
406                          mightContainXattrs(),
407                          failovers->toJSON()};
408}
409
410
411
412void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
413{
414    ++dirtyQueueSize;
415    dirtyQueueMem.fetch_add(sizeof(Item));
416    ++dirtyQueueFill;
417    dirtyQueueAge.fetch_add(qi.getQueuedTime());
418    dirtyQueuePendingWrites.fetch_add(itemBytes);
419}
420
421void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
422    --dirtyQueueSize;
423    decrDirtyQueueMem(sizeof(Item));
424    ++dirtyQueueDrain;
425    decrDirtyQueueAge(qi.getQueuedTime());
426    decrDirtyQueuePendingWrites(itemBytes);
427}
428
429void VBucket::incrMetaDataDisk(const Item& qi) {
430    metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
431}
432
433void VBucket::decrMetaDataDisk(const Item& qi) {
434    // assume couchstore remove approx this much data from disk
435    metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
436}
437
438void VBucket::resetStats() {
439    opsCreate.store(0);
440    opsUpdate.store(0);
441    opsDelete.store(0);
442    opsReject.store(0);
443
444    stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
445    dirtyQueueMem.store(0);
446    dirtyQueueFill.store(0);
447    dirtyQueueAge.store(0);
448    dirtyQueuePendingWrites.store(0);
449    dirtyQueueDrain.store(0);
450
451    hlc.resetStats();
452}
453
454uint64_t VBucket::getQueueAge() {
455    uint64_t currDirtyQueueAge = dirtyQueueAge.load(std::memory_order_relaxed);
456    rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
457    if (currentAge < currDirtyQueueAge) {
458        return 0;
459    }
460    return (currentAge - currDirtyQueueAge) * 1000;
461}
462
463template <typename T>
464void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
465                      const void *c) {
466    std::string stat = statPrefix;
467    if (nm != NULL) {
468        add_prefixed_stat(statPrefix, nm, val, add_stat, c);
469    } else {
470        add_casted_stat(statPrefix.data(), val, add_stat, c);
471    }
472}
473
474void VBucket::handlePreExpiry(const std::unique_lock<std::mutex>& hbl,
475                              StoredValue& v) {
476    value_t value = v.getValue();
477    if (value) {
478        std::unique_ptr<Item> itm(v.toItem(false, id));
479        item_info itm_info;
480        EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
481        itm_info =
482                itm->toItemInfo(failovers->getLatestUUID(), getHLCEpochSeqno());
483
484        SERVER_HANDLE_V1* sapi = engine->getServerApi();
485        /* TODO: In order to minimize allocations, the callback needs to
486         * allocate an item whose value size will be exactly the size of the
487         * value after pre-expiry is performed.
488         */
489        auto result = sapi->document->pre_expiry(itm_info);
490        // The API states only uncompressed xattr values are returned
491        auto datatype = PROTOCOL_BINARY_DATATYPE_XATTR;
492        if (!result.empty()) {
493            Item new_item(v.getKey(),
494                          v.getFlags(),
495                          v.getExptime(),
496                          result.data(),
497                          result.size(),
498                          datatype,
499                          v.getCas(),
500                          v.getBySeqno(),
501                          id,
502                          v.getRevSeqno());
503
504            new_item.setNRUValue(v.getNRUValue());
505            new_item.setFreqCounterValue(v.getFreqCounterValue());
506            new_item.setDeleted();
507            ht.unlocked_updateStoredValue(hbl, v, new_item);
508        }
509    }
510}
511
512bool VBucket::addPendingOp(const void* cookie) {
513    LockHolder lh(pendingOpLock);
514    if (state != vbucket_state_pending) {
515        // State transitioned while we were waiting.
516        return false;
517    }
518    // Start a timer when enqueuing the first client.
519    if (pendingOps.empty()) {
520        pendingOpsStart = ProcessClock::now();
521    }
522    pendingOps.push_back(cookie);
523    ++stats.pendingOps;
524    ++stats.pendingOpsTotal;
525    return true;
526}
527
528uint64_t VBucket::getPersistenceCheckpointId() const {
529    return persistenceCheckpointId.load();
530}
531
532void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
533    persistenceCheckpointId.store(checkpointId);
534}
535
536void VBucket::markDirty(const DocKey& key) {
537    auto hbl = ht.getLockedBucket(key);
538    StoredValue* v = ht.unlocked_find(
539            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
540    if (v) {
541        v->markDirty();
542    } else {
543        LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
544            "is missing from vb:%" PRIu16, id);
545    }
546}
547
548bool VBucket::isResidentRatioUnderThreshold(float threshold) {
549    if (eviction != FULL_EVICTION) {
550        throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
551                "policy (which is " + std::to_string(eviction) +
552                ") must be FULL_EVICTION");
553    }
554    size_t num_items = getNumItems();
555    size_t num_non_resident_items = getNumNonResidentItems();
556    float ratio =
557            num_items
558                    ? ((float)(num_items - num_non_resident_items) / num_items)
559                    : 0.0;
560    if (threshold >= ratio) {
561        return true;
562    } else {
563        return false;
564    }
565}
566
567void VBucket::createFilter(size_t key_count, double probability) {
568    // Create the actual bloom filter upon vbucket creation during
569    // scenarios:
570    //      - Bucket creation
571    //      - Rebalance
572    LockHolder lh(bfMutex);
573    if (bFilter == nullptr && tempFilter == nullptr) {
574        bFilter = std::make_unique<BloomFilter>(key_count, probability,
575                                        BFILTER_ENABLED);
576    } else {
577        LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
578            " already exist!", id);
579    }
580}
581
582void VBucket::initTempFilter(size_t key_count, double probability) {
583    // Create a temp bloom filter with status as COMPACTING,
584    // if the main filter is found to exist, set its state to
585    // COMPACTING as well.
586    LockHolder lh(bfMutex);
587    tempFilter = std::make_unique<BloomFilter>(key_count, probability,
588                                     BFILTER_COMPACTING);
589    if (bFilter) {
590        bFilter->setStatus(BFILTER_COMPACTING);
591    }
592}
593
594void VBucket::addToFilter(const DocKey& key) {
595    LockHolder lh(bfMutex);
596    if (bFilter) {
597        bFilter->addKey(key);
598    }
599
600    // If the temp bloom filter is not found to be NULL,
601    // it means that compaction is running on the particular
602    // vbucket. Therefore add the key to the temp filter as
603    // well, as once compaction completes the temp filter
604    // will replace the main bloom filter.
605    if (tempFilter) {
606        tempFilter->addKey(key);
607    }
608}
609
610bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
611    LockHolder lh(bfMutex);
612    if (bFilter) {
613        return bFilter->maybeKeyExists(key);
614    } else {
615        // If filter doesn't exist, allow the BgFetch to go through.
616        return true;
617    }
618}
619
620bool VBucket::isTempFilterAvailable() {
621    LockHolder lh(bfMutex);
622    if (tempFilter &&
623        (tempFilter->getStatus() == BFILTER_COMPACTING ||
624         tempFilter->getStatus() == BFILTER_ENABLED)) {
625        return true;
626    } else {
627        return false;
628    }
629}
630
631void VBucket::addToTempFilter(const DocKey& key) {
632    // Keys will be added to only the temp filter during
633    // compaction.
634    LockHolder lh(bfMutex);
635    if (tempFilter) {
636        tempFilter->addKey(key);
637    }
638}
639
640void VBucket::swapFilter() {
641    // Delete the main bloom filter and replace it with
642    // the temp filter that was populated during compaction,
643    // only if the temp filter's state is found to be either at
644    // COMPACTING or ENABLED (if in the case the user enables
645    // bloomfilters for some reason while compaction was running).
646    // Otherwise, it indicates that the filter's state was
647    // possibly disabled during compaction, therefore clear out
648    // the temp filter. If it gets enabled at some point, a new
649    // bloom filter will be made available after the next
650    // compaction.
651
652    LockHolder lh(bfMutex);
653    if (tempFilter) {
654        bFilter.reset();
655
656        if (tempFilter->getStatus() == BFILTER_COMPACTING ||
657             tempFilter->getStatus() == BFILTER_ENABLED) {
658            bFilter = std::move(tempFilter);
659            bFilter->setStatus(BFILTER_ENABLED);
660        }
661        tempFilter.reset();
662    }
663}
664
665void VBucket::clearFilter() {
666    LockHolder lh(bfMutex);
667    bFilter.reset();
668    tempFilter.reset();
669}
670
671void VBucket::setFilterStatus(bfilter_status_t to) {
672    LockHolder lh(bfMutex);
673    if (bFilter) {
674        bFilter->setStatus(to);
675    }
676    if (tempFilter) {
677        tempFilter->setStatus(to);
678    }
679}
680
681std::string VBucket::getFilterStatusString() {
682    LockHolder lh(bfMutex);
683    if (bFilter) {
684        return bFilter->getStatusString();
685    } else if (tempFilter) {
686        return tempFilter->getStatusString();
687    } else {
688        return "DOESN'T EXIST";
689    }
690}
691
692size_t VBucket::getFilterSize() {
693    LockHolder lh(bfMutex);
694    if (bFilter) {
695        return bFilter->getFilterSize();
696    } else {
697        return 0;
698    }
699}
700
701size_t VBucket::getNumOfKeysInFilter() {
702    LockHolder lh(bfMutex);
703    if (bFilter) {
704        return bFilter->getNumOfKeysInFilter();
705    } else {
706        return 0;
707    }
708}
709
710VBNotifyCtx VBucket::queueDirty(
711        StoredValue& v,
712        const GenerateBySeqno generateBySeqno,
713        const GenerateCas generateCas,
714        const GenerateDeleteTime generateDeleteTime,
715        const bool isBackfillItem,
716        PreLinkDocumentContext* preLinkDocumentContext) {
717    VBNotifyCtx notifyCtx;
718
719    queued_item qi(v.toItem(false, getId()));
720
721    // MB-27457: Timestamp deletes only when they don't already have a timestamp
722    // assigned. This is here to ensure all deleted items have a timestamp which
723    // our tombstone purger can use to determine which tombstones to purge. A
724    // DCP replicated or deleteWithMeta created delete may already have a time
725    // assigned to it.
726    if (qi->isDeleted() && (generateDeleteTime == GenerateDeleteTime::Yes ||
727                            qi->getExptime() == 0)) {
728        qi->setExpTime(ep_real_time());
729    }
730
731    if (!mightContainXattrs() && mcbp::datatype::is_xattr(v.getDatatype())) {
732        setMightContainXattrs();
733    }
734
735    if (isBackfillItem) {
736        queueBackfillItem(qi, generateBySeqno);
737        notifyCtx.notifyFlusher = true;
738        /* During backfill on a TAP receiver we need to update the snapshot
739         range in the checkpoint. Has to be done here because in case of TAP
740         backfill, above, we use vb.queueBackfillItem() instead of
741         vb.checkpointManager->queueDirty() */
742        if (generateBySeqno == GenerateBySeqno::Yes) {
743            checkpointManager->resetSnapshotRange();
744        }
745    } else {
746        notifyCtx.notifyFlusher =
747                checkpointManager->queueDirty(*this,
748                                              qi,
749                                              generateBySeqno,
750                                              generateCas,
751                                              preLinkDocumentContext);
752        notifyCtx.notifyReplication = true;
753        if (GenerateCas::Yes == generateCas) {
754            v.setCas(qi->getCas());
755        }
756    }
757
758    v.setBySeqno(qi->getBySeqno());
759    notifyCtx.bySeqno = qi->getBySeqno();
760
761    return notifyCtx;
762}
763
764StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
765                                      const DocKey& key,
766                                      WantsDeleted wantsDeleted,
767                                      TrackReference trackReference,
768                                      QueueExpired queueExpired) {
769    if (!hbl.getHTLock()) {
770        throw std::logic_error(
771                "Hash bucket lock not held in "
772                "VBucket::fetchValidValue() for hash bucket: " +
773                std::to_string(hbl.getBucketNum()) + "for key: " +
774                std::string(reinterpret_cast<const char*>(key.data()),
775                            key.size()));
776    }
777    StoredValue* v = ht.unlocked_find(
778            key, hbl.getBucketNum(), wantsDeleted, trackReference);
779    if (v && !v->isDeleted() && !v->isTempItem()) {
780        // In the deleted case, we ignore expiration time.
781        if (v->isExpired(ep_real_time())) {
782            if (getState() != vbucket_state_active) {
783                return wantsDeleted == WantsDeleted::Yes ? v : NULL;
784            }
785
786            // queueDirty only allowed on active VB
787            if (queueExpired == QueueExpired::Yes &&
788                getState() == vbucket_state_active) {
789                incExpirationStat(ExpireBy::Access);
790                handlePreExpiry(hbl.getHTLock(), *v);
791                VBNotifyCtx notifyCtx;
792                std::tie(std::ignore, v, notifyCtx) =
793                        processExpiredItem(hbl, *v);
794                notifyNewSeqno(notifyCtx);
795            }
796            return wantsDeleted == WantsDeleted::Yes ? v : NULL;
797        }
798    }
799    return v;
800}
801
802void VBucket::incExpirationStat(const ExpireBy source) {
803    switch (source) {
804    case ExpireBy::Pager:
805        ++stats.expired_pager;
806        break;
807    case ExpireBy::Compactor:
808        ++stats.expired_compactor;
809        break;
810    case ExpireBy::Access:
811        ++stats.expired_access;
812        break;
813    }
814    ++numExpiredItems;
815}
816
817MutationStatus VBucket::setFromInternal(Item& itm) {
818    if (!hasMemoryForStoredValue(stats, itm, false)) {
819        return MutationStatus::NoMem;
820    }
821    return ht.set(itm);
822}
823
824cb::StoreIfStatus VBucket::callPredicate(cb::StoreIfPredicate predicate,
825                                         StoredValue* v) {
826    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
827    if (v) {
828        auto info = v->getItemInfo(failovers->getLatestUUID());
829        storeIfStatus = predicate(info, getInfo());
830        // No no, you can't ask for it again
831        if (storeIfStatus == cb::StoreIfStatus::GetItemInfo &&
832            info.is_initialized()) {
833            throw std::logic_error(
834                    "VBucket::callPredicate invalid result of GetItemInfo");
835        }
836    } else {
837        storeIfStatus = predicate({/*no info*/}, getInfo());
838    }
839
840    if (storeIfStatus == cb::StoreIfStatus::GetItemInfo &&
841        eviction == VALUE_ONLY) {
842        // We're VE, if we don't have, we don't have it.
843        storeIfStatus = cb::StoreIfStatus::Continue;
844    }
845
846    return storeIfStatus;
847}
848
849ENGINE_ERROR_CODE VBucket::set(Item& itm,
850                               const void* cookie,
851                               EventuallyPersistentEngine& engine,
852                               const int bgFetchDelay,
853                               cb::StoreIfPredicate predicate) {
854    bool cas_op = (itm.getCas() != 0);
855    auto hbl = ht.getLockedBucket(itm.getKey());
856    StoredValue* v = ht.unlocked_find(itm.getKey(),
857                                      hbl.getBucketNum(),
858                                      WantsDeleted::Yes,
859                                      TrackReference::No);
860
861    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
862    if (predicate &&
863        (storeIfStatus = callPredicate(predicate, v)) ==
864                cb::StoreIfStatus::Fail) {
865        return ENGINE_PREDICATE_FAILED;
866    }
867
868    if (v && v->isLocked(ep_current_time()) &&
869        (getState() == vbucket_state_replica ||
870         getState() == vbucket_state_pending)) {
871        v->unlock();
872    }
873
874    bool maybeKeyExists = true;
875    // If we didn't find a valid item then check the bloom filter, but only
876    // if we're full-eviction with a CAS operation or a have a predicate that
877    // requires the item's info
878    if ((v == nullptr || v->isTempInitialItem()) &&
879        (eviction == FULL_EVICTION) &&
880        ((itm.getCas() != 0) ||
881         storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
882        // Check Bloomfilter's prediction
883        if (!maybeKeyExistsInFilter(itm.getKey())) {
884            maybeKeyExists = false;
885        }
886    }
887
888    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
889    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
890                               GenerateCas::Yes,
891                               TrackCasDrift::No,
892                               /*isBackfillItem*/ false,
893                               &preLinkDocumentContext);
894
895    MutationStatus status;
896    boost::optional<VBNotifyCtx> notifyCtx;
897    std::tie(status, notifyCtx) = processSet(hbl,
898                                             v,
899                                             itm,
900                                             itm.getCas(),
901                                             /*allowExisting*/ true,
902                                             /*hashMetaData*/ false,
903                                             queueItmCtx,
904                                             storeIfStatus,
905                                             maybeKeyExists);
906
907    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
908    switch (status) {
909    case MutationStatus::NoMem:
910        ret = ENGINE_ENOMEM;
911        break;
912    case MutationStatus::InvalidCas:
913        ret = ENGINE_KEY_EEXISTS;
914        break;
915    case MutationStatus::IsLocked:
916        ret = ENGINE_LOCKED;
917        break;
918    case MutationStatus::NotFound:
919        if (cas_op) {
920            ret = ENGINE_KEY_ENOENT;
921            break;
922        }
923    // FALLTHROUGH
924    case MutationStatus::WasDirty:
925    // Even if the item was dirty, push it into the vbucket's open
926    // checkpoint.
927    case MutationStatus::WasClean:
928        notifyNewSeqno(*notifyCtx);
929
930        itm.setBySeqno(v->getBySeqno());
931        itm.setCas(v->getCas());
932        break;
933    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
934        // +
935        // full eviction.
936        if (v) {
937            // temp item is already created. Simply schedule a bg fetch job
938            hbl.getHTLock().unlock();
939            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
940            return ENGINE_EWOULDBLOCK;
941        }
942        ret = addTempItemAndBGFetch(
943                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
944        break;
945    }
946    }
947
948    return ret;
949}
950
951ENGINE_ERROR_CODE VBucket::replace(
952        Item& itm,
953        const void* cookie,
954        EventuallyPersistentEngine& engine,
955        const int bgFetchDelay,
956        cb::StoreIfPredicate predicate,
957        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
958    auto hbl = ht.getLockedBucket(itm.getKey());
959    StoredValue* v = ht.unlocked_find(itm.getKey(),
960                                      hbl.getBucketNum(),
961                                      WantsDeleted::Yes,
962                                      TrackReference::No);
963
964    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
965    if (predicate &&
966        (storeIfStatus = callPredicate(predicate, v)) ==
967                cb::StoreIfStatus::Fail) {
968        return ENGINE_PREDICATE_FAILED;
969    }
970
971    if (v) {
972        if (isLogicallyNonExistent(*v, readHandle)) {
973            ht.cleanupIfTemporaryItem(hbl, *v);
974            return ENGINE_KEY_ENOENT;
975        }
976
977        MutationStatus mtype;
978        boost::optional<VBNotifyCtx> notifyCtx;
979        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
980            mtype = MutationStatus::NeedBgFetch;
981        } else {
982            PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
983            VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
984                                       GenerateCas::Yes,
985                                       TrackCasDrift::No,
986                                       /*isBackfillItem*/ false,
987                                       &preLinkDocumentContext);
988            std::tie(mtype, notifyCtx) = processSet(hbl,
989                                                    v,
990                                                    itm,
991                                                    0,
992                                                    /*allowExisting*/ true,
993                                                    /*hasMetaData*/ false,
994                                                    queueItmCtx,
995                                                    storeIfStatus);
996        }
997
998        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
999        switch (mtype) {
1000        case MutationStatus::NoMem:
1001            ret = ENGINE_ENOMEM;
1002            break;
1003        case MutationStatus::IsLocked:
1004            ret = ENGINE_LOCKED;
1005            break;
1006        case MutationStatus::InvalidCas:
1007        case MutationStatus::NotFound:
1008            ret = ENGINE_NOT_STORED;
1009            break;
1010        // FALLTHROUGH
1011        case MutationStatus::WasDirty:
1012        // Even if the item was dirty, push it into the vbucket's open
1013        // checkpoint.
1014        case MutationStatus::WasClean:
1015            notifyNewSeqno(*notifyCtx);
1016
1017            itm.setBySeqno(v->getBySeqno());
1018            itm.setCas(v->getCas());
1019            break;
1020        case MutationStatus::NeedBgFetch: {
1021            // temp item is already created. Simply schedule a bg fetch job
1022            hbl.getHTLock().unlock();
1023            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1024            ret = ENGINE_EWOULDBLOCK;
1025            break;
1026        }
1027        }
1028
1029        return ret;
1030    } else {
1031        if (eviction == VALUE_ONLY) {
1032            return ENGINE_KEY_ENOENT;
1033        }
1034
1035        if (maybeKeyExistsInFilter(itm.getKey())) {
1036            return addTempItemAndBGFetch(
1037                    hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
1038        } else {
1039            // As bloomfilter predicted that item surely doesn't exist
1040            // on disk, return ENOENT for replace().
1041            return ENGINE_KEY_ENOENT;
1042        }
1043    }
1044}
1045
1046ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
1047                                           const GenerateBySeqno genBySeqno) {
1048    auto hbl = ht.getLockedBucket(itm.getKey());
1049    StoredValue* v = ht.unlocked_find(itm.getKey(),
1050                                      hbl.getBucketNum(),
1051                                      WantsDeleted::Yes,
1052                                      TrackReference::No);
1053
1054    // Note that this function is only called on replica or pending vbuckets.
1055    if (v && v->isLocked(ep_current_time())) {
1056        v->unlock();
1057    }
1058
1059    // MB-33919: DCP backfill - we must respect the value set by the producer
1060    // unless it is zero, which queueDirty will fix-up. Not mutations and
1061    // deletions come through this path.
1062    VBQueueItemCtx queueItmCtx(genBySeqno,
1063                               GenerateCas::No,
1064                               GenerateDeleteTime::No,
1065                               TrackCasDrift::No,
1066                               /*isBackfillItem*/ true,
1067                               nullptr /* No pre link should happen */);
1068    MutationStatus status;
1069    boost::optional<VBNotifyCtx> notifyCtx;
1070    std::tie(status, notifyCtx) = processSet(hbl,
1071                                             v,
1072                                             itm,
1073                                             0,
1074                                             /*allowExisting*/ true,
1075                                             /*hasMetaData*/ true,
1076                                             queueItmCtx,
1077                                             {/*no predicate*/});
1078
1079    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1080    switch (status) {
1081    case MutationStatus::NoMem:
1082        ret = ENGINE_ENOMEM;
1083        break;
1084    case MutationStatus::InvalidCas:
1085    case MutationStatus::IsLocked:
1086        ret = ENGINE_KEY_EEXISTS;
1087        break;
1088    case MutationStatus::WasDirty:
1089    // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1090    // set correctly, and also the sequence numbers are ordered correctly.
1091    // (MB-14003)
1092    case MutationStatus::NotFound:
1093    // FALLTHROUGH
1094    case MutationStatus::WasClean: {
1095        setMaxCas(v->getCas());
1096        // we unlock ht lock here because we want to avoid potential lock
1097        // inversions arising from notifyNewSeqno() call
1098        hbl.getHTLock().unlock();
1099        notifyNewSeqno(*notifyCtx);
1100    } break;
1101    case MutationStatus::NeedBgFetch:
1102        throw std::logic_error(
1103                "VBucket::addBackfillItem: "
1104                "SET on a non-active vbucket should not require a "
1105                "bg_metadata_fetch.");
1106    }
1107
1108    return ret;
1109}
1110
1111ENGINE_ERROR_CODE VBucket::setWithMeta(
1112        Item& itm,
1113        uint64_t cas,
1114        uint64_t* seqno,
1115        const void* cookie,
1116        EventuallyPersistentEngine& engine,
1117        int bgFetchDelay,
1118        CheckConflicts checkConflicts,
1119        bool allowExisting,
1120        GenerateBySeqno genBySeqno,
1121        GenerateCas genCas,
1122        bool isReplication,
1123        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1124    auto hbl = ht.getLockedBucket(itm.getKey());
1125    StoredValue* v = ht.unlocked_find(itm.getKey(),
1126                                      hbl.getBucketNum(),
1127                                      WantsDeleted::Yes,
1128                                      TrackReference::No);
1129
1130    bool maybeKeyExists = true;
1131
1132    // Effectively ignore logically deleted keys, they cannot stop the op
1133    if (v && readHandle.isLogicallyDeleted(v->getBySeqno())) {
1134        // v is not really here, operate like it's not and skip conflict checks
1135        checkConflicts = CheckConflicts::No;
1136        // And ensure ADD_W_META works like SET_W_META, just overwrite existing
1137        allowExisting = true;
1138    }
1139
1140    if (checkConflicts == CheckConflicts::Yes) {
1141        if (v) {
1142            if (v->isTempInitialItem()) {
1143                bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1144                return ENGINE_EWOULDBLOCK;
1145            }
1146
1147            if (!(conflictResolver->resolve(*v,
1148                                            itm.getMetaData(),
1149                                            itm.getDataType(),
1150                                            itm.isDeleted()))) {
1151                ++stats.numOpsSetMetaResolutionFailed;
1152                // If the existing item happens to be a temporary item,
1153                // delete the item to save memory in the hash table
1154                if (v->isTempItem()) {
1155                    deleteStoredValue(hbl, *v);
1156                }
1157                return ENGINE_KEY_EEXISTS;
1158            }
1159        } else {
1160            if (maybeKeyExistsInFilter(itm.getKey())) {
1161                return addTempItemAndBGFetch(hbl,
1162                                             itm.getKey(),
1163                                             cookie,
1164                                             engine,
1165                                             bgFetchDelay,
1166                                             true,
1167                                             isReplication);
1168            } else {
1169                maybeKeyExists = false;
1170            }
1171        }
1172    } else {
1173        if (eviction == FULL_EVICTION) {
1174            // Check Bloomfilter's prediction
1175            if (!maybeKeyExistsInFilter(itm.getKey())) {
1176                maybeKeyExists = false;
1177            }
1178        }
1179    }
1180
1181    if (v && v->isLocked(ep_current_time()) &&
1182        (getState() == vbucket_state_replica ||
1183         getState() == vbucket_state_pending)) {
1184        v->unlock();
1185    }
1186
1187    // MB-33919: Do not generate the delete-time - delete's can come through
1188    // this path and the delete time from the input should be used (unless it
1189    // is 0, where it must be regenerated)
1190    VBQueueItemCtx queueItmCtx(genBySeqno,
1191                               genCas,
1192                               GenerateDeleteTime::No,
1193                               TrackCasDrift::Yes,
1194                               /*isBackfillItem*/ false,
1195                               nullptr /* No pre link step needed */);
1196    MutationStatus status;
1197    boost::optional<VBNotifyCtx> notifyCtx;
1198    std::tie(status, notifyCtx) = processSet(hbl,
1199                                             v,
1200                                             itm,
1201                                             cas,
1202                                             allowExisting,
1203                                             true,
1204                                             queueItmCtx,
1205                                             {/*no predicate*/},
1206                                             maybeKeyExists,
1207                                             isReplication);
1208
1209    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1210    switch (status) {
1211    case MutationStatus::NoMem:
1212        ret = ENGINE_ENOMEM;
1213        break;
1214    case MutationStatus::InvalidCas:
1215        ret = ENGINE_KEY_EEXISTS;
1216        break;
1217    case MutationStatus::IsLocked:
1218        ret = ENGINE_LOCKED;
1219        break;
1220    case MutationStatus::WasDirty:
1221    case MutationStatus::WasClean: {
1222        if (seqno) {
1223            *seqno = static_cast<uint64_t>(v->getBySeqno());
1224        }
1225        // we unlock ht lock here because we want to avoid potential lock
1226        // inversions arising from notifyNewSeqno() call
1227        hbl.getHTLock().unlock();
1228        notifyNewSeqno(*notifyCtx);
1229    } break;
1230    case MutationStatus::NotFound:
1231        ret = ENGINE_KEY_ENOENT;
1232        break;
1233    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1234        // + full eviction.
1235        if (v) { // temp item is already created. Simply schedule a
1236            hbl.getHTLock().unlock(); // bg fetch job.
1237            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1238            return ENGINE_EWOULDBLOCK;
1239        }
1240        ret = addTempItemAndBGFetch(hbl,
1241                                    itm.getKey(),
1242                                    cookie,
1243                                    engine,
1244                                    bgFetchDelay,
1245                                    true,
1246                                    isReplication);
1247    }
1248    }
1249
1250    return ret;
1251}
1252
1253ENGINE_ERROR_CODE VBucket::deleteItem(
1254        uint64_t& cas,
1255        const void* cookie,
1256        EventuallyPersistentEngine& engine,
1257        const int bgFetchDelay,
1258        ItemMetaData* itemMeta,
1259        mutation_descr_t& mutInfo,
1260        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1261    auto hbl = ht.getLockedBucket(readHandle.getKey());
1262    StoredValue* v = ht.unlocked_find(readHandle.getKey(),
1263                                      hbl.getBucketNum(),
1264                                      WantsDeleted::Yes,
1265                                      TrackReference::No);
1266
1267    if (!v || v->isDeleted() || v->isTempItem() ||
1268        readHandle.isLogicallyDeleted(v->getBySeqno())) {
1269        if (eviction == VALUE_ONLY) {
1270            return ENGINE_KEY_ENOENT;
1271        } else { // Full eviction.
1272            if (!v) { // Item might be evicted from cache.
1273                if (maybeKeyExistsInFilter(readHandle.getKey())) {
1274                    return addTempItemAndBGFetch(hbl,
1275                                                 readHandle.getKey(),
1276                                                 cookie,
1277                                                 engine,
1278                                                 bgFetchDelay,
1279                                                 true);
1280                } else {
1281                    // As bloomfilter predicted that item surely doesn't
1282                    // exist on disk, return ENOENT for deleteItem().
1283                    return ENGINE_KEY_ENOENT;
1284                }
1285            } else if (v->isTempInitialItem()) {
1286                hbl.getHTLock().unlock();
1287                bgFetch(readHandle.getKey(),
1288                        cookie,
1289                        engine,
1290                        bgFetchDelay,
1291                        true);
1292                return ENGINE_EWOULDBLOCK;
1293            } else { // Non-existent or deleted key.
1294                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1295                    // Delete a temp non-existent item to ensure that
1296                    // if a delete were issued over an item that doesn't
1297                    // exist, then we don't preserve a temp item.
1298                    deleteStoredValue(hbl, *v);
1299                }
1300                return ENGINE_KEY_ENOENT;
1301            }
1302        }
1303    }
1304
1305    if (v->isLocked(ep_current_time()) &&
1306        (getState() == vbucket_state_replica ||
1307         getState() == vbucket_state_pending)) {
1308        v->unlock();
1309    }
1310
1311    if (itemMeta != nullptr) {
1312        itemMeta->cas = v->getCas();
1313    }
1314
1315    MutationStatus delrv;
1316    boost::optional<VBNotifyCtx> notifyCtx;
1317    if (v->isExpired(ep_real_time())) {
1318        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
1319    } else {
1320        ItemMetaData metadata;
1321        metadata.revSeqno = v->getRevSeqno() + 1;
1322        std::tie(delrv, v, notifyCtx) =
1323                processSoftDelete(hbl,
1324                                  *v,
1325                                  cas,
1326                                  metadata,
1327                                  VBQueueItemCtx(GenerateBySeqno::Yes,
1328                                                 GenerateCas::Yes,
1329                                                 GenerateDeleteTime::Yes,
1330                                                 TrackCasDrift::No,
1331                                                 /*isBackfillItem*/ false,
1332                                                 nullptr /* no pre link */),
1333                                  /*use_meta*/ false,
1334                                  /*bySeqno*/ v->getBySeqno());
1335    }
1336
1337    uint64_t seqno = 0;
1338    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1339    switch (delrv) {
1340    case MutationStatus::NoMem:
1341        ret = ENGINE_ENOMEM;
1342        break;
1343    case MutationStatus::InvalidCas:
1344        ret = ENGINE_KEY_EEXISTS;
1345        break;
1346    case MutationStatus::IsLocked:
1347        ret = ENGINE_LOCKED_TMPFAIL;
1348        break;
1349    case MutationStatus::NotFound:
1350        ret = ENGINE_KEY_ENOENT;
1351    /* Fallthrough:
1352     * A NotFound return value at this point indicates that the
1353     * item has expired. But, a deletion still needs to be queued
1354     * for the item in order to persist it.
1355     */
1356    case MutationStatus::WasClean:
1357    case MutationStatus::WasDirty:
1358        if (itemMeta != nullptr) {
1359            itemMeta->revSeqno = v->getRevSeqno();
1360            itemMeta->flags = v->getFlags();
1361            itemMeta->exptime = v->getExptime();
1362        }
1363
1364        notifyNewSeqno(*notifyCtx);
1365        seqno = static_cast<uint64_t>(v->getBySeqno());
1366        cas = v->getCas();
1367
1368        if (delrv != MutationStatus::NotFound) {
1369            mutInfo.seqno = seqno;
1370            mutInfo.vbucket_uuid = failovers->getLatestUUID();
1371            if (itemMeta != nullptr) {
1372                itemMeta->cas = v->getCas();
1373            }
1374        }
1375        break;
1376    case MutationStatus::NeedBgFetch:
1377        // We already figured out if a bg fetch is requred for a full-evicted
1378        // item above.
1379        throw std::logic_error(
1380                "VBucket::deleteItem: "
1381                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
1382    }
1383    return ret;
1384}
1385
1386ENGINE_ERROR_CODE VBucket::deleteWithMeta(
1387        uint64_t& cas,
1388        uint64_t* seqno,
1389        const void* cookie,
1390        EventuallyPersistentEngine& engine,
1391        int bgFetchDelay,
1392        CheckConflicts checkConflicts,
1393        const ItemMetaData& itemMeta,
1394        bool backfill,
1395        GenerateBySeqno genBySeqno,
1396        GenerateCas generateCas,
1397        uint64_t bySeqno,
1398        bool isReplication,
1399        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1400    const auto& key = readHandle.getKey();
1401    auto hbl = ht.getLockedBucket(key);
1402    StoredValue* v = ht.unlocked_find(
1403            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1404
1405    if (v && readHandle.isLogicallyDeleted(v->getBySeqno())) {
1406        return ENGINE_KEY_ENOENT;
1407    }
1408
1409    // Need conflict resolution?
1410    if (checkConflicts == CheckConflicts::Yes) {
1411        if (v) {
1412            if (v->isTempInitialItem()) {
1413                bgFetch(key, cookie, engine, bgFetchDelay, true);
1414                return ENGINE_EWOULDBLOCK;
1415            }
1416
1417            if (!(conflictResolver->resolve(*v,
1418                                            itemMeta,
1419                                            PROTOCOL_BINARY_RAW_BYTES,
1420                                            true))) {
1421                ++stats.numOpsDelMetaResolutionFailed;
1422                return ENGINE_KEY_EEXISTS;
1423            }
1424        } else {
1425            // Item is 1) deleted or not existent in the value eviction case OR
1426            // 2) deleted or evicted in the full eviction.
1427            if (maybeKeyExistsInFilter(key)) {
1428                return addTempItemAndBGFetch(hbl,
1429                                             key,
1430                                             cookie,
1431                                             engine,
1432                                             bgFetchDelay,
1433                                             true,
1434                                             isReplication);
1435            } else {
1436                // Even though bloomfilter predicted that item doesn't exist
1437                // on disk, we must put this delete on disk if the cas is valid.
1438                TempAddStatus rv = addTempStoredValue(hbl, key, isReplication);
1439                if (rv == TempAddStatus::NoMem) {
1440                    return ENGINE_ENOMEM;
1441                }
1442                v = ht.unlocked_find(key,
1443                                     hbl.getBucketNum(),
1444                                     WantsDeleted::Yes,
1445                                     TrackReference::No);
1446                v->setTempDeleted();
1447            }
1448        }
1449    } else {
1450        if (!v) {
1451            // We should always try to persist a delete here.
1452            TempAddStatus rv = addTempStoredValue(hbl, key, isReplication);
1453            if (rv == TempAddStatus::NoMem) {
1454                return ENGINE_ENOMEM;
1455            }
1456            v = ht.unlocked_find(key,
1457                                 hbl.getBucketNum(),
1458                                 WantsDeleted::Yes,
1459                                 TrackReference::No);
1460            v->setTempDeleted();
1461            v->setCas(cas);
1462        } else if (v->isTempInitialItem()) {
1463            v->setTempDeleted();
1464            v->setCas(cas);
1465        }
1466    }
1467
1468    if (v && v->isLocked(ep_current_time()) &&
1469        (getState() == vbucket_state_replica ||
1470         getState() == vbucket_state_pending)) {
1471        v->unlock();
1472    }
1473
1474    MutationStatus delrv;
1475    boost::optional<VBNotifyCtx> notifyCtx;
1476    bool metaBgFetch = true;
1477    if (!v) {
1478        if (eviction == FULL_EVICTION) {
1479            delrv = MutationStatus::NeedBgFetch;
1480        } else {
1481            delrv = MutationStatus::NotFound;
1482        }
1483    } else if (mcbp::datatype::is_xattr(v->getDatatype()) && !v->isResident()) {
1484        // MB-25671: A temp deleted xattr with no value must be fetched before
1485        // the deleteWithMeta can be applied.
1486        delrv = MutationStatus::NeedBgFetch;
1487        metaBgFetch = false;
1488    } else {
1489        // MB-33919: The incoming meta.exptime should be used as the delete-time
1490        // so request GenerateDeleteTime::No, if the incoming value is 0, a new
1491        // delete-time will be generated.
1492        VBQueueItemCtx queueItmCtx(genBySeqno,
1493                                   generateCas,
1494                                   GenerateDeleteTime::No,
1495                                   TrackCasDrift::Yes,
1496                                   backfill,
1497                                   nullptr /* No pre link step needed */);
1498
1499        // system xattrs must remain, however no need to prune xattrs if this is
1500        // a replication call, the active has done this and we must just store
1501        // what we're given
1502        std::unique_ptr<Item> itm;
1503        if (!isReplication && mcbp::datatype::is_xattr(v->getDatatype()) &&
1504            (itm = pruneXattrDocument(*v, itemMeta))) {
1505            // A new item has been generated and must be given a new seqno
1506            queueItmCtx.genBySeqno = GenerateBySeqno::Yes;
1507
1508            std::tie(v, delrv, notifyCtx) =
1509                    updateStoredValue(hbl, *v, *itm, queueItmCtx);
1510        } else {
1511            std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1512                                                              *v,
1513                                                              cas,
1514                                                              itemMeta,
1515                                                              queueItmCtx,
1516                                                              /*use_meta*/ true,
1517                                                              bySeqno);
1518        }
1519    }
1520    cas = v ? v->getCas() : 0;
1521
1522    switch (delrv) {
1523    case MutationStatus::NoMem:
1524        return ENGINE_ENOMEM;
1525    case MutationStatus::InvalidCas:
1526        return ENGINE_KEY_EEXISTS;
1527    case MutationStatus::IsLocked:
1528        return ENGINE_LOCKED_TMPFAIL;
1529    case MutationStatus::NotFound:
1530        return ENGINE_KEY_ENOENT;
1531    case MutationStatus::WasDirty:
1532    case MutationStatus::WasClean: {
1533        if (seqno) {
1534            *seqno = static_cast<uint64_t>(v->getBySeqno());
1535        }
1536        // we unlock ht lock here because we want to avoid potential lock
1537        // inversions arising from notifyNewSeqno() call
1538        hbl.getHTLock().unlock();
1539        notifyNewSeqno(*notifyCtx);
1540        break;
1541    }
1542    case MutationStatus::NeedBgFetch:
1543        hbl.getHTLock().unlock();
1544        bgFetch(key, cookie, engine, bgFetchDelay, metaBgFetch);
1545        return ENGINE_EWOULDBLOCK;
1546    }
1547    return ENGINE_SUCCESS;
1548}
1549
1550void VBucket::deleteExpiredItem(const Item& it,
1551                                time_t startTime,
1552                                ExpireBy source) {
1553
1554    // The item is correctly trimmed (by the caller). Fetch the one in the
1555    // hashtable and replace it if the CAS match (same item; no race).
1556    // If not found in the hashtable we should add it as a deleted item
1557    const DocKey& key = it.getKey();
1558    auto hbl = ht.getLockedBucket(key);
1559    StoredValue* v = ht.unlocked_find(
1560            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1561    if (v) {
1562        if (v->getCas() != it.getCas()) {
1563            return;
1564        }
1565
1566        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1567            bool deleted = deleteStoredValue(hbl, *v);
1568            if (!deleted) {
1569                throw std::logic_error(
1570                        "VBucket::deleteExpiredItem: "
1571                        "Failed to delete seqno:" +
1572                        std::to_string(v->getBySeqno()) + " from bucket " +
1573                        std::to_string(hbl.getBucketNum()));
1574            }
1575        } else if (v->isExpired(startTime) && !v->isDeleted()) {
1576            VBNotifyCtx notifyCtx;
1577            ht.unlocked_updateStoredValue(hbl.getHTLock(), *v, it);
1578            std::tie(std::ignore, std::ignore, notifyCtx) =
1579                    processExpiredItem(hbl, *v);
1580            // we unlock ht lock here because we want to avoid potential lock
1581            // inversions arising from notifyNewSeqno() call
1582            hbl.getHTLock().unlock();
1583            notifyNewSeqno(notifyCtx);
1584        }
1585    } else {
1586        if (eviction == FULL_EVICTION) {
1587            // Create a temp item and delete and push it
1588            // into the checkpoint queue, only if the bloomfilter
1589            // predicts that the item may exist on disk.
1590            if (maybeKeyExistsInFilter(key)) {
1591                TempAddStatus rv = addTempStoredValue(hbl, key);
1592                if (rv == TempAddStatus::NoMem) {
1593                    return;
1594                }
1595                v = ht.unlocked_find(key,
1596                                     hbl.getBucketNum(),
1597                                     WantsDeleted::Yes,
1598                                     TrackReference::No);
1599                v->setTempDeleted();
1600                v->setRevSeqno(it.getRevSeqno());
1601                ht.unlocked_updateStoredValue(hbl.getHTLock(), *v, it);
1602                VBNotifyCtx notifyCtx;
1603                std::tie(std::ignore, std::ignore, notifyCtx) =
1604                        processExpiredItem(hbl, *v);
1605                // we unlock ht lock here because we want to avoid potential
1606                // lock inversions arising from notifyNewSeqno() call
1607                hbl.getHTLock().unlock();
1608                notifyNewSeqno(notifyCtx);
1609            }
1610        }
1611    }
1612    incExpirationStat(source);
1613}
1614
1615ENGINE_ERROR_CODE VBucket::add(
1616        Item& itm,
1617        const void* cookie,
1618        EventuallyPersistentEngine& engine,
1619        int bgFetchDelay,
1620        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1621    auto hbl = ht.getLockedBucket(itm.getKey());
1622    StoredValue* v = ht.unlocked_find(itm.getKey(),
1623                                      hbl.getBucketNum(),
1624                                      WantsDeleted::Yes,
1625                                      TrackReference::No);
1626
1627    bool maybeKeyExists = true;
1628    if ((v == nullptr || v->isTempInitialItem()) &&
1629        (eviction == FULL_EVICTION)) {
1630        // Check bloomfilter's prediction
1631        if (!maybeKeyExistsInFilter(itm.getKey())) {
1632            maybeKeyExists = false;
1633        }
1634    }
1635
1636    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1637    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1638                               GenerateCas::Yes,
1639                               TrackCasDrift::No,
1640                               /*isBackfillItem*/ false,
1641                               &preLinkDocumentContext);
1642    AddStatus status;
1643    boost::optional<VBNotifyCtx> notifyCtx;
1644    std::tie(status, notifyCtx) = processAdd(
1645            hbl, v, itm, maybeKeyExists, false, queueItmCtx, readHandle);
1646
1647    switch (status) {
1648    case AddStatus::NoMem:
1649        return ENGINE_ENOMEM;
1650    case AddStatus::Exists:
1651        return ENGINE_NOT_STORED;
1652    case AddStatus::AddTmpAndBgFetch:
1653        return addTempItemAndBGFetch(
1654                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1655    case AddStatus::BgFetch:
1656        hbl.getHTLock().unlock();
1657        bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1658        return ENGINE_EWOULDBLOCK;
1659    case AddStatus::Success:
1660    case AddStatus::UnDel:
1661        notifyNewSeqno(*notifyCtx);
1662        itm.setBySeqno(v->getBySeqno());
1663        itm.setCas(v->getCas());
1664        break;
1665    }
1666    return ENGINE_SUCCESS;
1667}
1668
/**
 * Hash-table half of getAndUpdateTtl(): read the StoredValue for a key and,
 * if the requested expiry differs from the current one, update the expiry
 * and queue the change.
 *
 * Expects `hbl` to be locked on entry. NOTE: when the expiry is changed,
 * `hbl` is unlocked inside this function (before notifyNewSeqno()), so the
 * caller must not assume it still holds the lock in that case.
 *
 * @param hbl         locked hash bucket for the key.
 * @param v           StoredValue for the key, or nullptr if not in the HT.
 * @param exptime     expiry time to apply.
 * @param readHandle  collections read handle for the key.
 * @return a pair of status and value:
 *   - NotFound:    key logically non-existent (a temp item is cleaned up),
 *                  absent under value-only eviction, or ruled out by the
 *                  bloom filter;
 *   - NeedBgFetch: value not resident, or key absent from the HT while the
 *                  bloom filter says it may be on disk;
 *   - IsLocked:    document locked; GetValue carries ENGINE_KEY_EEXISTS;
 *   - WasClean:    success; GetValue holds a copy of the document, with the
 *                  new CAS when the expiry was changed.
 */
1669std::pair<MutationStatus, GetValue> VBucket::processGetAndUpdateTtl(
1670        HashTable::HashBucketLock& hbl,
1671        StoredValue* v,
1672        time_t exptime,
1673        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1674    if (v) {
1675        if (isLogicallyNonExistent(*v, readHandle)) {
1676            ht.cleanupIfTemporaryItem(hbl, *v);
1677            return {MutationStatus::NotFound, GetValue()};
1678        }
1679
1680        if (!v->isResident()) {
1681            return {MutationStatus::NeedBgFetch, GetValue()};
1682        }
1683
1684        if (v->isLocked(ep_current_time())) {
1685            return {MutationStatus::IsLocked,
1686                    GetValue(nullptr, ENGINE_KEY_EEXISTS, 0)};
1687        }
1688
        // The document is only touched (dirty flag, expiry, rev seqno) when
        // the requested expiry actually differs from the current one.
1689        const bool exptime_mutated = exptime != v->getExptime();
        // Captured before any update: rv below reports this pre-update seqno
        // even when updateStoredValue() queues a new mutation.
        // NOTE(review): confirm callers expect the old seqno here.
1690        auto bySeqNo = v->getBySeqno();
1691        if (exptime_mutated) {
1692            v->markDirty();
1693            v->setExptime(exptime);
1694            v->setRevSeqno(v->getRevSeqno() + 1);
1695        }
1696
        // The IsLocked case already returned above, so this isLocked() call
        // is expected to be false (assuming lock state can only expire, not
        // appear, while the bucket lock is held).
1697        GetValue rv(v->toItem(v->isLocked(ep_current_time()), getId()),
1698                    ENGINE_SUCCESS,
1699                    bySeqNo);
1700
1701        if (exptime_mutated) {
1702            VBQueueItemCtx qItemCtx(GenerateBySeqno::Yes,
1703                                    GenerateCas::Yes,
1704                                    TrackCasDrift::No,
1705                                    false,
1706                                    nullptr);
1707            VBNotifyCtx notifyCtx;
1708            std::tie(v, std::ignore, notifyCtx) =
1709                    updateStoredValue(hbl, *v, *rv.item, qItemCtx, true);
1710            rv.item->setCas(v->getCas());
1711            // we unlock ht lock here because we want to avoid potential lock
1712            // inversions arising from notifyNewSeqno() call
1713            hbl.getHTLock().unlock();
1714            notifyNewSeqno(notifyCtx);
1715        }
1716
1717        return {MutationStatus::WasClean, std::move(rv)};
1718    } else {
1719        if (eviction == VALUE_ONLY) {
1720            return {MutationStatus::NotFound, GetValue()};
1721        } else {
1722            if (maybeKeyExistsInFilter(readHandle.getKey())) {
1723                return {MutationStatus::NeedBgFetch, GetValue()};
1724            } else {
1725                // As bloomfilter predicted that item surely doesn't exist
1726                // on disk, return ENOENT for getAndUpdateTtl().
1727                return {MutationStatus::NotFound, GetValue()};
1728            }
1729        }
1730    }
1731}
1732
1733GetValue VBucket::getAndUpdateTtl(
1734        const void* cookie,
1735        EventuallyPersistentEngine& engine,
1736        int bgFetchDelay,
1737        time_t exptime,
1738        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1739    auto hbl = ht.getLockedBucket(readHandle.getKey());
1740    StoredValue* v = fetchValidValue(hbl,
1741                                     readHandle.getKey(),
1742                                     WantsDeleted::Yes,
1743                                     TrackReference::Yes,
1744                                     QueueExpired::Yes);
1745    GetValue gv;
1746    MutationStatus status;
1747    std::tie(status, gv) = processGetAndUpdateTtl(hbl, v, exptime, readHandle);
1748
1749    if (status == MutationStatus::NeedBgFetch) {
1750        if (v) {
1751            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay);
1752            return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1753        } else {
1754            ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(hbl,
1755                                                         readHandle.getKey(),
1756                                                         cookie,
1757                                                         engine,
1758                                                         bgFetchDelay,
1759                                                         false);
1760            return GetValue(NULL, ec, -1, true);
1761        }
1762    }
1763
1764    return gv;
1765}
1766
/**
 * Core hash-table lookup shared by the various get paths.
 *
 * @param cookie        connection cookie, used when a background fetch is
 *                      scheduled.
 * @param engine        owning engine.
 * @param bgFetchDelay  delay forwarded to background fetches.
 * @param options       get_options_t bitmask; flags consulted here:
 *                      TRACK_REFERENCE  - passed to fetchValidValue() as
 *                                         TrackReference::Yes;
 *                      ALLOW_META_ONLY  - do not fetch a non-resident value;
 *                                         also forwarded to
 *                                         addTempItemAndBGFetch();
 *                      GET_DELETED_VALUE - return deleted documents;
 *                      QUEUE_BG_FETCH   - actually queue disk fetches;
 *                      DELETE_TEMP      - remove temp marker items before
 *                                         reporting ENOENT;
 *                      HIDE_LOCKED_CAS  - hide the CAS of a locked document.
 * @param diskFlushAll  when true (and deleted values were not requested) a
 *                      hash-table miss is reported as ENOENT without
 *                      consulting the bloom filter.
 * @param getKeyOnly    if GetKeyOnly::Yes, the Item is built with
 *                      toItemKeyOnly().
 * @param readHandle    collections read handle for the key.
 * @return GetValue with ENGINE_SUCCESS and the item; a default GetValue()
 *         (ENOENT) when the key is absent or deleted; otherwise the result of
 *         getInternalNonResident(), or a GetValue carrying EWOULDBLOCK (or the
 *         addTempItemAndBGFetch() status) when disk must be consulted.
 */
1767GetValue VBucket::getInternal(
1768        const void* cookie,
1769        EventuallyPersistentEngine& engine,
1770        int bgFetchDelay,
1771        get_options_t options,
1772        bool diskFlushAll,
1773        GetKeyOnly getKeyOnly,
1774        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1775    const TrackReference trackReference = (options & TRACK_REFERENCE)
1776                                                  ? TrackReference::Yes
1777                                                  : TrackReference::No;
1778    const bool metadataOnly = (options & ALLOW_META_ONLY);
1779    const bool getDeletedValue = (options & GET_DELETED_VALUE);
1780    const bool bgFetchRequired = (options & QUEUE_BG_FETCH);
1781    auto hbl = ht.getLockedBucket(readHandle.getKey());
1782    StoredValue* v = fetchValidValue(hbl,
1783                                     readHandle.getKey(),
1784                                     WantsDeleted::Yes,
1785                                     trackReference,
1786                                     QueueExpired::Yes);
1787    if (v) {
1788        // 1 If SV is deleted and user didn't request deleted items
1789        // 2 (or) If collection says this key is gone.
1790        // then return ENOENT.
1791        if ((v->isDeleted() && !getDeletedValue) ||
1792            readHandle.isLogicallyDeleted(v->getBySeqno())) {
1793            return GetValue();
1794        }
1795
1796        // If SV is a temp deleted item (i.e. marker added after a BgFetch to
1797        // note that the item has been deleted), *but* the user requested
1798        // full deleted items, then we need to fetch the complete deleted item
1799        // (including body) from disk.
1800        if (v->isTempDeletedItem() && getDeletedValue && !metadataOnly) {
1801            const auto queueBgFetch =
1802                    (bgFetchRequired) ? QueueBgFetch::Yes : QueueBgFetch::No;
1803            return getInternalNonResident(readHandle.getKey(),
1804                                          cookie,
1805                                          engine,
1806                                          bgFetchDelay,
1807                                          queueBgFetch,
1808                                          *v);
1809        }
1810
1811        // If SV is otherwise a temp non-existent (i.e. a marker added after a
1812        // BgFetch to note that no such item exists) or temp deleted, then we
1813        // should cleanup the SV (if requested) before returning ENOENT (so we
1814        // don't keep temp items in HT).
1815        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1816            if (options & DELETE_TEMP) {
1817                deleteStoredValue(hbl, *v);
1818            }
1819            return GetValue();
1820        }
1821
1822        // If the value is not resident (and it was requested), wait for it...
1823        if (!v->isResident() && !metadataOnly) {
1824            auto queueBgFetch = (bgFetchRequired) ?
1825                    QueueBgFetch::Yes :
1826                    QueueBgFetch::No;
1827            return getInternalNonResident(readHandle.getKey(),
1828                                          cookie,
1829                                          engine,
1830                                          bgFetchDelay,
1831                                          queueBgFetch,
1832                                          *v);
1833        }
1834
1835        // Should we hide (return -1) for the items' CAS?
1836        const bool hideCas =
1837                (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1838        std::unique_ptr<Item> item;
1839        if (getKeyOnly == GetKeyOnly::Yes) {
1840            item = v->toItemKeyOnly(getId());
1841        } else {
1842            item = v->toItem(hideCas, getId());
1843        }
1844        return GetValue(std::move(item),
1845                        ENGINE_SUCCESS,
1846                        v->getBySeqno(),
1847                        !v->isResident(),
1848                        v->getNRUValue());
1849    } else {
        // Note: when deleted values were requested, the bloom-filter check
        // below is reached even under value-only eviction.
1850        if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1851            return GetValue();
1852        }
1853
1854        if (maybeKeyExistsInFilter(readHandle.getKey())) {
            // Without QUEUE_BG_FETCH the result is EWOULDBLOCK but no fetch
            // is scheduled here; presumably the caller handles that case.
1855            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1856            if (bgFetchRequired) { // Full eviction and need a bg fetch.
1857                ec = addTempItemAndBGFetch(hbl,
1858                                           readHandle.getKey(),
1859                                           cookie,
1860                                           engine,
1861                                           bgFetchDelay,
1862                                           metadataOnly);
1863            }
1864            return GetValue(NULL, ec, -1, true);
1865        } else {
1866            // As bloomfilter predicted that item surely doesn't exist
1867            // on disk, return ENOENT, for getInternal().
1868            return GetValue();
1869        }
1870    }
1871}
1872
1873ENGINE_ERROR_CODE VBucket::getMetaData(
1874        const void* cookie,
1875        EventuallyPersistentEngine& engine,
1876        int bgFetchDelay,
1877        const Collections::VB::Manifest::CachingReadHandle& readHandle,
1878        ItemMetaData& metadata,
1879        uint32_t& deleted,
1880        uint8_t& datatype) {
1881    deleted = 0;
1882    auto hbl = ht.getLockedBucket(readHandle.getKey());
1883    StoredValue* v = ht.unlocked_find(readHandle.getKey(),
1884                                      hbl.getBucketNum(),
1885                                      WantsDeleted::Yes,
1886                                      TrackReference::No);
1887
1888    if (v) {
1889        stats.numOpsGetMeta++;
1890        if (v->isTempInitialItem()) {
1891            // Need bg meta fetch.
1892            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay, true);
1893            return ENGINE_EWOULDBLOCK;
1894        } else if (v->isTempNonExistentItem()) {
1895            metadata.cas = v->getCas();
1896            return ENGINE_KEY_ENOENT;
1897        } else if (readHandle.isLogicallyDeleted(v->getBySeqno())) {
1898            return ENGINE_KEY_ENOENT;
1899        } else {
1900            if (v->isTempDeletedItem() || v->isDeleted() ||
1901                v->isExpired(ep_real_time())) {
1902                deleted |= GET_META_ITEM_DELETED_FLAG;
1903            }
1904
1905            if (v->isLocked(ep_current_time())) {
1906                metadata.cas = static_cast<uint64_t>(-1);
1907            } else {
1908                metadata.cas = v->getCas();
1909            }
1910            metadata.flags = v->getFlags();
1911            metadata.exptime = v->getExptime();
1912            metadata.revSeqno = v->getRevSeqno();
1913            datatype = v->getDatatype();
1914
1915            return ENGINE_SUCCESS;
1916        }
1917    } else {
1918        // The key wasn't found. However, this may be because it was previously
1919        // deleted or evicted with the full eviction strategy.
1920        // So, add a temporary item corresponding to the key to the hash table
1921        // and schedule a background fetch for its metadata from the persistent
1922        // store. The item's state will be updated after the fetch completes.
1923        //
1924        // Schedule this bgFetch only if the key is predicted to be may-be
1925        // existent on disk by the bloomfilter.
1926
1927        if (maybeKeyExistsInFilter(readHandle.getKey())) {
1928            return addTempItemAndBGFetch(hbl,
1929                                         readHandle.getKey(),
1930                                         cookie,
1931                                         engine,
1932                                         bgFetchDelay,
1933                                         true);
1934        } else {
1935            stats.numOpsGetMeta++;
1936            return ENGINE_KEY_ENOENT;
1937        }
1938    }
1939}
1940
1941ENGINE_ERROR_CODE VBucket::getKeyStats(
1942        const void* cookie,
1943        EventuallyPersistentEngine& engine,
1944        int bgFetchDelay,
1945        struct key_stats& kstats,
1946        WantsDeleted wantsDeleted,
1947        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1948    auto hbl = ht.getLockedBucket(readHandle.getKey());
1949    StoredValue* v = fetchValidValue(hbl,
1950                                     readHandle.getKey(),
1951                                     WantsDeleted::Yes,
1952                                     TrackReference::Yes,
1953                                     QueueExpired::Yes);
1954
1955    if (v) {
1956        if ((v->isDeleted() ||
1957             readHandle.isLogicallyDeleted(v->getBySeqno())) &&
1958            wantsDeleted == WantsDeleted::No) {
1959            return ENGINE_KEY_ENOENT;
1960        }
1961
1962        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1963            deleteStoredValue(hbl, *v);
1964            return ENGINE_KEY_ENOENT;
1965        }
1966        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1967            hbl.getHTLock().unlock();
1968            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay, true);
1969            return ENGINE_EWOULDBLOCK;
1970        }
1971        kstats.logically_deleted =
1972                v->isDeleted() ||
1973                readHandle.isLogicallyDeleted(v->getBySeqno());
1974        kstats.dirty = v->isDirty();
1975        kstats.exptime = v->getExptime();
1976        kstats.flags = v->getFlags();
1977        kstats.cas = v->getCas();
1978        kstats.vb_state = getState();
1979        kstats.resident = v->isResident();
1980
1981        return ENGINE_SUCCESS;
1982    } else {
1983        if (eviction == VALUE_ONLY) {
1984            return ENGINE_KEY_ENOENT;
1985        } else {
1986            if (maybeKeyExistsInFilter(readHandle.getKey())) {
1987                return addTempItemAndBGFetch(hbl,
1988                                             readHandle.getKey(),
1989                                             cookie,
1990                                             engine,
1991                                             bgFetchDelay,
1992                                             true);
1993            } else {
1994                // If bgFetch were false, or bloomfilter predicted that
1995                // item surely doesn't exist on disk, return ENOENT for
1996                // getKeyStats().
1997                return ENGINE_KEY_ENOENT;
1998            }
1999        }
2000    }
2001}
2002
2003GetValue VBucket::getLocked(
2004        rel_time_t currentTime,
2005        uint32_t lockTimeout,
2006        const void* cookie,
2007        EventuallyPersistentEngine& engine,
2008        int bgFetchDelay,
2009        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2010    auto hbl = ht.getLockedBucket(readHandle.getKey());
2011    StoredValue* v = fetchValidValue(hbl,
2012                                     readHandle.getKey(),
2013                                     WantsDeleted::Yes,
2014                                     TrackReference::Yes,
2015                                     QueueExpired::Yes);
2016
2017    if (v) {
2018        if (isLogicallyNonExistent(*v, readHandle)) {
2019            ht.cleanupIfTemporaryItem(hbl, *v);
2020            return GetValue(NULL, ENGINE_KEY_ENOENT);
2021        }
2022
2023        // if v is locked return error
2024        if (v->isLocked(currentTime)) {
2025            return GetValue(NULL, ENGINE_LOCKED_TMPFAIL);
2026        }
2027
2028        // If the value is not resident, wait for it...
2029        if (!v->isResident()) {
2030            if (cookie) {
2031                bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay);
2032            }
2033            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
2034        }
2035
2036        // acquire lock and increment cas value
2037        v->lock(currentTime + lockTimeout);
2038
2039        auto it = v->toItem(false, getId());
2040        it->setCas(nextHLCCas());
2041        v->setCas(it->getCas());
2042
2043        return GetValue(std::move(it));
2044
2045    } else {
2046        // No value found in the hashtable.
2047        switch (eviction) {
2048        case VALUE_ONLY:
2049            return GetValue(NULL, ENGINE_KEY_ENOENT);
2050
2051        case FULL_EVICTION:
2052            if (maybeKeyExistsInFilter(readHandle.getKey())) {
2053                ENGINE_ERROR_CODE ec =
2054                        addTempItemAndBGFetch(hbl,
2055                                              readHandle.getKey(),
2056                                              cookie,
2057                                              engine,
2058                                              bgFetchDelay,
2059                                              false);
2060                return GetValue(NULL, ec, -1, true);
2061            } else {
2062                // As bloomfilter predicted that item surely doesn't exist
2063                // on disk, return ENOENT for getLocked().
2064                return GetValue(NULL, ENGINE_KEY_ENOENT);
2065            }
2066        }
2067        return GetValue(); // just to prevent compiler warning
2068    }
2069}
2070
2071void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
2072    auto hbl = ht.getLockedBucket(queuedItem.getKey());
2073    StoredValue* v = fetchValidValue(hbl,
2074                                     queuedItem.getKey(),
2075                                     WantsDeleted::Yes,
2076                                     TrackReference::No,
2077                                     QueueExpired::Yes);
2078    // Delete the item in the hash table iff:
2079    //  1. Item is existent in hashtable, and deleted flag is true
2080    //  2. rev seqno of queued item matches rev seqno of hash table item
2081    if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
2082        bool isDeleted = deleteStoredValue(hbl, *v);
2083        if (!isDeleted) {
2084            throw std::logic_error(
2085                    "deletedOnDiskCbk:callback: "
2086                    "Failed to delete key with seqno:" +
2087                    std::to_string(v->getBySeqno()) + "' from bucket " +
2088                    std::to_string(hbl.getBucketNum()));
2089        }
2090
2091        /**
2092         * Deleted items are to be added to the bloomfilter,
2093         * in either eviction policy.
2094         */
2095        addToFilter(queuedItem.getKey());
2096    }
2097
2098    if (deleted) {
2099        ++stats.totalPersisted;
2100        ++opsDelete;
2101
2102        /**
2103         * MB-30137: Decrement the total number of on-disk items. This needs to be
2104         * done to ensure that the item count is accurate in the case of full
2105         * eviction
2106         */
2107        if (v) {
2108            decrNumTotalItems();
2109        }
2110    }
2111    doStatsForFlushing(queuedItem, queuedItem.size());
2112    --stats.diskQueueSize;
2113    decrMetaDataDisk(queuedItem);
2114}
2115
2116bool VBucket::deleteKey(const DocKey& key) {
2117    auto hbl = ht.getLockedBucket(key);
2118    StoredValue* v = ht.unlocked_find(
2119            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
2120    if (!v) {
2121        return false;
2122    }
2123    return deleteStoredValue(hbl, *v);
2124}
2125
/**
 * Bring in-memory vBucket state in line with a completed rollback.
 *
 * In order, this:
 *  - calls failovers->pruneEntries() with rollbackResult.highSeqno;
 *  - clears the checkpoint manager at rollbackResult.highSeqno;
 *  - records the persisted snapshot range from the rollback result;
 *  - adds (prevHighSeqno - rollbackResult.highSeqno) to the rollback item
 *    count;
 *  - resets the open checkpoint id to 1.
 *
 * @param rollbackResult  result of the rollback.
 * @param prevHighSeqno   high seqno before the rollback.
 *
 * NOTE(review): assumes prevHighSeqno >= rollbackResult.highSeqno; otherwise
 * the unsigned subtraction would wrap. Confirm the caller guarantees this.
 */
2126void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
2127                                  uint64_t prevHighSeqno) {
2128    failovers->pruneEntries(rollbackResult.highSeqno);
2129    checkpointManager->clear(*this, rollbackResult.highSeqno);
2130    setPersistedSnapshot(rollbackResult.snapStartSeqno,
2131                         rollbackResult.snapEndSeqno);
2132    incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
2133    checkpointManager->setOpenCheckpointId(1);
2134}
2135
2136void VBucket::dump() const {
2137    std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
2138              << " numItems:" << getNumItems()
2139              << " numNonResident:" << getNumNonResidentItems()
2140              << " ht: " << std::endl << "  " << ht << std::endl
2141              << "]" << std::endl;
2142}
2143
2144void VBucket::setMutationMemoryThreshold(double memThreshold) {
2145    if (memThreshold > 0.0 && memThreshold <= 1.0) {
2146        mutationMemThreshold = memThreshold;
2147    }
2148}
2149
2150bool VBucket::hasMemoryForStoredValue(EPStats& st,
2151                                      const Item& item,
2152                                      bool isReplication) {
2153    double newSize = static_cast<double>(estimateNewMemoryUsage(st, item));
2154    double maxSize = static_cast<double>(st.getMaxDataSize());
2155    if (isReplication) {
2156        return newSize <= (maxSize * st.replicationThrottleThreshold);
2157    } else {
2158        return newSize <= (maxSize * mutationMemThreshold);
2159    }
2160}
2161
/**
 * Emit this vBucket's statistics through `add_stat`.
 *
 * Always emits the vBucket state (addStat() with a NULL name). When `details`
 * is true, it also emits, in this order:
 *  - item counts;
 *  - hash-table memory and size figures;
 *  - per-operation counters;
 *  - dirty-queue statistics;
 *  - seqno, failover UUID, bloom-filter and rollback details;
 *  - finally the HLC stats via hlc.addStats(statPrefix, ...).
 *
 * @param details   include the detailed stat set.
 * @param add_stat  callback used to emit each stat.
 * @param c         cookie forwarded to add_stat.
 */
2162void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
2163    addStat(NULL, toString(state), add_stat, c);
2164    if (details) {
2165        size_t numItems = getNumItems();
2166        size_t tempItems = getNumTempItems();
2167        addStat("num_items", numItems, add_stat, c);
2168        addStat("num_temp_items", tempItems, add_stat, c);
2169        addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
2170        addStat("ht_memory", ht.memorySize(), add_stat, c);
2171        addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
2172        addStat("ht_item_memory_uncompressed",
2173                ht.getUncompressedItemMemory(),
2174                add_stat,
2175                c);
2176        addStat("ht_cache_size", ht.getCacheSize(), add_stat, c);
2177        addStat("ht_size", ht.getSize(), add_stat, c);
2178        addStat("num_ejects", ht.getNumEjects(), add_stat, c);
2179        addStat("ops_create", opsCreate.load(), add_stat, c);
2180        addStat("ops_update", opsUpdate.load(), add_stat, c);
2181        addStat("ops_delete", opsDelete.load(), add_stat, c);
2182        addStat("ops_reject", opsReject.load(), add_stat, c);
2183        addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
2184        addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
2185        addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
2186        addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
2187        addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
2188        addStat("queue_age", getQueueAge(), add_stat, c);
2189        addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
2190
2191        addStat("high_seqno", getHighSeqno(), add_stat, c);
2192        addStat("uuid", failovers->getLatestUUID(), add_stat, c);
2193        addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
2194        addStat("bloom_filter", getFilterStatusString().data(),
2195                add_stat, c);
2196        addStat("bloom_filter_size", getFilterSize(), add_stat, c);
2197        addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
2198        addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
2199        addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
2200        addStat("might_contain_xattrs", mightContainXattrs(), add_stat, c);
2201        addStat("max_deleted_revid", ht.getMaxDeletedRevSeqno(), add_stat, c);
2202        hlc.addStats(statPrefix, add_stat, c);
2203    }
2204}
2205
2206void VBucket::decrDirtyQueueMem(size_t decrementBy)
2207{
2208    size_t oldVal, newVal;
2209    do {
2210        oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
2211        if (oldVal < decrementBy) {
2212            newVal = 0;
2213        } else {
2214            newVal = oldVal - decrementBy;
2215        }
2216    } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
2217}
2218
2219void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
2220{
2221    uint64_t oldVal, newVal;
2222    do {
2223        oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
2224        if (oldVal < decrementBy) {
2225            newVal = 0;
2226        } else {
2227            newVal = oldVal - decrementBy;
2228        }
2229    } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
2230}
2231
2232void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
2233{
2234    size_t oldVal, newVal;
2235    do {
2236        oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
2237        if (oldVal < decrementBy) {
2238            newVal = 0;
2239        } else {
2240            newVal = oldVal - decrementBy;
2241        }
2242    } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
2243}
2244
/**
 * Decide the outcome of a set / replace / CAS-style mutation against the
 * hash-table entry `v` and, if it is allowed, apply it.
 *
 * Checks are applied in this order; the first match wins:
 *   1. `hbl` not locked                     -> throws std::invalid_argument
 *   2. hasMemoryForStoredValue() fails      -> NoMem
 *   3. no entry, deleted item, non-zero CAS, and deleted items not always
 *      resident                             -> NeedBgFetch
 *   4. full eviction, bloom filter says the key may exist, a CAS or
 *      StoreIfStatus::GetItemInfo, and no entry or a temp-initial entry
 *                                           -> NeedBgFetch
 *   5. existing entry is expired (non-meta, non-delete op): any lock is
 *      released, and with a CAS             -> NotFound
 *   6. existing entry: InvalidCas / IsLocked / NotFound per the inline
 *      comments, otherwise updateStoredValue()
 *   7. no entry and a CAS                   -> NotFound
 *   8. no entry and no CAS                  -> addNewStoredValue(), WasClean
 *
 * @param hbl             hash-bucket lock for the key; must be held.
 * @param v               in/out: existing StoredValue (or nullptr). Replaced
 *                        by the StoredValue returned from updateStoredValue()
 *                        or addNewStoredValue() when the mutation is applied.
 * @param itm             item being stored; its rev seqno may be updated.
 * @param cas             CAS supplied by the client (0 = none).
 * @param allowExisting   if false, an existing live (non-temp, non-deleted)
 *                        entry yields InvalidCas.
 * @param hasMetaData     when true: skips the expired-entry handling, the
 *                        rev-seqno bump and the MB-23530 check for existing
 *                        entries, and passes GenerateRevSeqno::No for new ones.
 * @param queueItmCtx     queueing context forwarded to the update/add helper.
 * @param storeIfStatus   store_if outcome; GetItemInfo can force a bg fetch.
 * @param maybeKeyExists  bloom-filter prediction for the key.
 * @param isReplication   selects the replication memory threshold.
 * @return the mutation status, plus the notify context from
 *         updateStoredValue() / addNewStoredValue() when the mutation was
 *         applied.
 *
 * NOTE(review): step 3 returns an engaged VBNotifyCtx() while the other early
 * returns leave the optional empty. Confirm no caller relies on either form.
 * NOTE(review): for existing entries without metadata, itm's rev seqno is
 * bumped before the MB-23530 check, so itm is modified even when that check
 * returns NotFound.
 */
2245std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
2246        const HashTable::HashBucketLock& hbl,
2247        StoredValue*& v,
2248        Item& itm,
2249        uint64_t cas,
2250        bool allowExisting,
2251        bool hasMetaData,
2252        const VBQueueItemCtx& queueItmCtx,
2253        cb::StoreIfStatus storeIfStatus,
2254        bool maybeKeyExists,
2255        bool isReplication) {
2256    if (!hbl.getHTLock()) {
2257        throw std::invalid_argument(
2258                "VBucket::processSet: htLock not held for "
2259                "VBucket " +
2260                std::to_string(getId()));
2261    }
2262
2263    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2264        return {MutationStatus::NoMem, {}};
2265    }
2266
2267    if (v == nullptr && itm.isDeleted() && cas &&
2268        !areDeletedItemsAlwaysResident()) {
2269        // Request to perform a CAS operation on a deleted body which may
2270        // not be resident. Need a bg_fetch to be able to perform this request.
2271        return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
2272    }
2273
2274    // bgFetch only in FE, only if the bloom-filter thinks the key may exist.
2275    // But only for cas operations or if a store_if is requiring the item_info.
2276    if (eviction == FULL_EVICTION && maybeKeyExists &&
2277        (cas || storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
2278        if (!v || v->isTempInitialItem()) {
2279            return {MutationStatus::NeedBgFetch, {}};
2280        }
2281    }
2282
2283    /*
2284     * prior to checking for the lock, we should check if this object
2285     * has expired. If so, then check if CAS value has been provided
2286     * for this set op. In this case the operation should be denied since
2287     * a cas operation for a key that doesn't exist is not a very cool
2288     * thing to do. See MB 3252
2289     */
2290    if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
2291        if (v->isLocked(ep_current_time())) {
2292            v->unlock();
2293        }
2294        if (cas) {
2295            /* item has expired and cas value provided. Deny ! */
2296            return {MutationStatus::NotFound, {}};
2297        }
2298    }
2299
2300    if (v) {
2301        if (!allowExisting && !v->isTempItem() && !v->isDeleted()) {
2302            return {MutationStatus::InvalidCas, {}};
2303        }
2304        if (v->isLocked(ep_current_time())) {
2305            /*
2306             * item is locked, deny if there is cas value mismatch
2307             * or no cas value is provided by the user
2308             */
2309            if (cas != v->getCas()) {
2310                return {MutationStatus::IsLocked, {}};
2311            }
2312            /* allow operation*/
2313            v->unlock();
2314        } else if (cas && cas != v->getCas()) {
2315            if (v->isTempNonExistentItem()) {
2316                // This is a temporary item which marks a key as non-existent;
2317                // therefore specifying a non-matching CAS should be exposed
2318                // as item not existing.
2319                return {MutationStatus::NotFound, {}};
2320            }
2321            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
2322                // Existing item is deleted, and we are not replacing it with
2323                // a (different) deleted value - return not existing.
2324                return {MutationStatus::NotFound, {}};
2325            }
2326            // None of the above special cases; the existing item cannot be
2327            // modified with the specified CAS.
2328            return {MutationStatus::InvalidCas, {}};
2329        }
2330        if (!hasMetaData) {
2331            itm.setRevSeqno(v->getRevSeqno() + 1);
2332            /* MB-23530: We must ensure that a replace operation (i.e.
2333             * set with a CAS) /fails/ if the old document is deleted; it
2334             * logically "doesn't exist". However, if the new value is deleted
2335             * this op is a /delete/ with a CAS and we must permit a
2336             * deleted -> deleted transition for Deleted Bodies.
2337             */
2338            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
2339                !itm.isDeleted()) {
2340                return {MutationStatus::NotFound, {}};
2341            }
2342        }
2343
2344        MutationStatus status;
2345        VBNotifyCtx notifyCtx;
2346        std::tie(v, status, notifyCtx) =
2347                updateStoredValue(hbl, *v, itm, queueItmCtx);
2348        return {status, notifyCtx};
2349    } else if (cas != 0) {
2350        return {MutationStatus::NotFound, {}};
2351    } else {
2352        VBNotifyCtx notifyCtx;
2353        auto genRevSeqno = hasMetaData ? GenerateRevSeqno::No :
2354                           GenerateRevSeqno::Yes;
2355        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx,
2356                                                   genRevSeqno);
2357        itm.setRevSeqno(v->getRevSeqno());
2358        return {MutationStatus::WasClean, notifyCtx};
2359    }
2360}
2361
/**
 * Apply an add (create-only) of itm, given the current hash-table entry v
 * for its key (nullptr if there is none).
 *
 * @param hbl Hash-bucket lock; std::invalid_argument is thrown if it does
 *        not hold the HT lock.
 * @param v [in/out] Existing StoredValue for the key, or nullptr. On the
 *        paths that update or create an entry it is re-pointed at the
 *        resulting StoredValue.
 * @param itm Item being added. Its revSeqno is written on the update and
 *        create paths.
 * @param maybeKeyExists Under FULL_EVICTION, whether the key might exist on
 *        disk; if so a background fetch is requested instead of adding.
 * @param isReplication Forwarded to hasMemoryForStoredValue().
 * @param queueItmCtx How the resulting mutation is queued.
 * @param readHandle Collections read handle; a value it reports as
 *        logically deleted does not count as existing.
 * @return The AddStatus plus a notification context. The early returns for
 *         Exists, NoMem and the first BgFetch case carry an empty optional;
 *         the remaining paths carry a context.
 */
std::pair<AddStatus, boost::optional<VBNotifyCtx>> VBucket::processAdd(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        bool maybeKeyExists,
        bool isReplication,
        const VBQueueItemCtx& queueItmCtx,
        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processAdd: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }
    // A live value (not deleted, not expired, not a temp placeholder, and not
    // logically deleted by its collection) already exists: the add fails.
    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
        !v->isTempItem() && !readHandle.isLogicallyDeleted(v->getBySeqno())) {
        return {AddStatus::Exists, {}};
    }
    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
        return {AddStatus::NoMem, {}};
    }

    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, {}};

    if (v) {
        // Reusing an existing (deleted / expired / temp / logically deleted)
        // hash-table entry.
        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
            maybeKeyExists) {
            // Need to figure out if an item exists on disk
            return {AddStatus::BgFetch, {}};
        }

        // Re-creating a deleted or expired document is reported as UnDel.
        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
                           ? AddStatus::UnDel
                           : AddStatus::Success;

        // Choose the new revSeqno: continue from a temp-deleted item's own
        // revSeqno, otherwise from the highest revSeqno of deleted items seen
        // by the hash table ...
        if (v->isTempDeletedItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        } else {
            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
        }

        // ... but a non-temp existing value always continues from its own
        // revSeqno (overriding the choice above).
        if (!v->isTempItem()) {
            itm.setRevSeqno(v->getRevSeqno() + 1);
        }

        std::tie(v, std::ignore, rv.second) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
    } else {
        // No existing entry for this key.
        if (itm.getBySeqno() != StoredValue::state_temp_init) {
            // Full eviction and the key may be on disk: report
            // AddTmpAndBgFetch instead of adding directly.
            if (eviction == FULL_EVICTION && maybeKeyExists) {
                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
            }
        }

        if (itm.getBySeqno() == StoredValue::state_temp_init) {
            /* A 'temp initial item' is just added to the hash table. It is
             not put on checkpoint manager or sequence list */
            v = ht.unlocked_addNewStoredValue(hbl, itm);
            updateRevSeqNoOfNewStoredValue(*v);
        } else {
            std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx,
                                                       GenerateRevSeqno::Yes);
        }

        // Reflect the revSeqno actually stored back into the caller's item.
        itm.setRevSeqno(v->getRevSeqno());

        // A temp placeholder was created, so a background fetch is needed.
        if (v->isTempItem()) {
            rv.first = AddStatus::BgFetch;
        }
    }

    // NOTE(review): temp items get the maximum NRU value; presumably this
    // makes them preferred eviction candidates - confirm against the
    // NRU/eviction code.
    if (v->isTempItem()) {
        v->setNRUValue(MAX_NRU_VALUE);
    }
    return rv;
}
2438
/**
 * Soft-delete the existing StoredValue v, honouring its lock and CAS.
 *
 * NOTE(review): unlike processAdd()/processExpiredItem() this does not check
 * that hbl holds the HT lock; presumably callers guarantee it - confirm.
 *
 * @param hbl Hash-bucket lock for v's bucket (forwarded to
 *        softDeleteStoredValue()).
 * @param v The value to delete.
 * @param cas Caller-supplied CAS; 0 means "no CAS check" unless v is locked.
 * @param metadata metadata.revSeqno is always applied to v and recorded via
 *        ht.updateMaxDeletedRevSeqno(); cas/flags/exptime only if use_meta.
 * @param queueItmCtx How the deletion is queued.
 * @param use_meta If true, overwrite v's CAS, flags and expiry from metadata
 *        before deleting.
 * @param bySeqno Forwarded to softDeleteStoredValue().
 * @return (status, StoredValue*, optional notify ctx):
 *  - NeedBgFetch / &v / empty if v is a temp-initial item under FULL_EVICTION;
 *  - IsLocked / &v / empty if v is locked and cas differs from its CAS;
 *  - InvalidCas / &v / empty if a non-zero cas differs from v's CAS;
 *  - otherwise WasDirty or WasClean (v's dirty state before the delete),
 *    the StoredValue returned by softDeleteStoredValue(), and its ctx.
 */
std::tuple<MutationStatus, StoredValue*, boost::optional<VBNotifyCtx>>
VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
                           StoredValue& v,
                           uint64_t cas,
                           const ItemMetaData& metadata,
                           const VBQueueItemCtx& queueItmCtx,
                           bool use_meta,
                           uint64_t bySeqno) {
    boost::optional<VBNotifyCtx> empty;
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch, &v, empty);
    }

    // A locked value may only be deleted by the holder of the matching CAS.
    if (v.isLocked(ep_current_time())) {
        if (cas != v.getCas()) {
            return std::make_tuple(MutationStatus::IsLocked, &v, empty);
        }
        v.unlock();
    }

    if (cas != 0 && cas != v.getCas()) {
        return std::make_tuple(MutationStatus::InvalidCas, &v, empty);
    }

    /* allow operation */
    // Unconditional unlock; repeats the unlock above when v was locked.
    v.unlock();

    // Capture dirtiness before the delete modifies v.
    MutationStatus rv =
            v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;

    if (use_meta) {
        v.setCas(metadata.cas);
        v.setFlags(metadata.flags);
        v.setExptime(metadata.exptime);
    }

    v.setRevSeqno(metadata.revSeqno);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  /*onlyMarkDeleted*/ false,
                                  queueItmCtx,
                                  bySeqno);
    ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
    return std::make_tuple(rv, newSv, notifyCtx);
}
2487
/**
 * Handle an expired StoredValue v by soft-deleting it.
 *
 * @param hbl Hash-bucket lock; std::invalid_argument is thrown if it does
 *        not hold the HT lock.
 * @param v The expired value.
 * @return (status, StoredValue*, notify ctx):
 *  - NeedBgFetch with &v when v is a temp-initial item under FULL_EVICTION
 *    (the notify ctx comes from queueing v via queueDirty());
 *  - otherwise NotFound, the StoredValue returned by softDeleteStoredValue()
 *    and its notify ctx.
 */
std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
                            StoredValue& v) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processExpiredItem: htLock not held for VBucket " +
                std::to_string(getId()));
    }

    // NOTE(review): this queues a temp-initial item via queueDirty(), whereas
    // addTempStoredValue()/processAdd() deliberately keep temp-initial items
    // off the checkpoint manager - confirm this is intended.
    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
        return std::make_tuple(MutationStatus::NeedBgFetch,
                               &v,
                               queueDirty(v,

                                          GenerateBySeqno::Yes,
                                          GenerateCas::Yes,
                                          GenerateDeleteTime::Yes,
                                          /*isBackfillItem*/ false));
    }

    /* If the datatype is XATTR, mark the item as deleted
     * but don't delete the value as system xattrs can
     * still be queried by mobile clients even after
     * deletion.
     * TODO: The current implementation is inefficient
     * but functionally correct and for performance reasons
     * only the system xattrs need to be stored.
     */
    // `value` is only used to check that v currently has a value.
    value_t value = v.getValue();
    bool onlyMarkDeleted = value && mcbp::datatype::is_xattr(v.getDatatype());
    v.setRevSeqno(v.getRevSeqno() + 1);
    VBNotifyCtx notifyCtx;
    StoredValue* newSv;
    std::tie(newSv, notifyCtx) =
            softDeleteStoredValue(hbl,
                                  v,
                                  onlyMarkDeleted,
                                  VBQueueItemCtx(GenerateBySeqno::Yes,
                                                 GenerateCas::Yes,
                                                 GenerateDeleteTime::Yes,
                                                 TrackCasDrift::No,
                                                 /*isBackfillItem*/ false,
                                                 nullptr /* no pre link */),
                                  v.getBySeqno());
    // NOTE(review): records newSv's revSeqno + 1, whereas processSoftDelete()
    // records the deleted item's revSeqno itself - confirm the extra +1.
    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
}
2535
2536bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2537                                StoredValue& v) {
2538    if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2539        return false;
2540    }
2541
2542    /* StoredValue deleted here. If any other in-memory data structures are
2543       using the StoredValue intrusively then they must have handled the delete
2544       by this point */
2545    ht.unlocked_del(hbl, v.getKey());
2546    return true;
2547}
2548
2549TempAddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2550                                          const DocKey& key,
2551                                          bool isReplication) {
2552    if (!hbl.getHTLock()) {
2553        throw std::invalid_argument(
2554                "VBucket::addTempStoredValue: htLock not held for "
2555                "VBucket " +
2556                std::to_string(getId()));
2557    }
2558
2559    Item itm(key,
2560             /*flags*/ 0,
2561             /*exp*/ 0,
2562             /*data*/ NULL,
2563             /*size*/ 0,
2564             PROTOCOL_BINARY_RAW_BYTES,
2565             0,
2566             StoredValue::state_temp_init);
2567
2568    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2569        return TempAddStatus::NoMem;
2570    }
2571
2572    /* A 'temp initial item' is just added to the hash table. It is
2573       not put on checkpoint manager or sequence list */
2574    StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
2575
2576    updateRevSeqNoOfNewStoredValue(*v);
2577    itm.setRevSeqno(v->getRevSeqno());
2578    v->setNRUValue(MAX_NRU_VALUE);
2579
2580    return TempAddStatus::BgFetch;
2581}
2582
2583void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2584    if (newSeqnoCb) {
2585        newSeqnoCb->callback(getId(), notifyCtx);
2586    }
2587}
2588
2589/*
2590 * Queue the item to the checkpoint and return the seqno the item was
2591 * allocated.
2592 */
2593int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2594    item->setVBucketId(id);
2595    queued_item qi(item);
2596    if (isBackfillPhase()) {
2597        queueBackfillItem(qi, getGenerateBySeqno(seqno));
2598    } else {
2599        checkpointManager->queueDirty(
2600                *this,
2601                qi,
2602                getGenerateBySeqno(seqno),
2603                GenerateCas::Yes,
2604                nullptr /* No pre link step as this is for system events */);
2605    }
2606    VBNotifyCtx notifyCtx;
2607    // If the seqno is initialized, skip replication notification
2608    notifyCtx.notifyReplication = !seqno.is_initialized();
2609    notifyCtx.notifyFlusher = true;
2610    notifyCtx.bySeqno = qi->getBySeqno();
2611    notifyNewSeqno(notifyCtx);
2612    return qi->getBySeqno();
2613}
2614
2615VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2616                                const VBQueueItemCtx& queueItmCtx) {
2617    if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2618        setMaxCasAndTrackDrift(v.getCas());
2619    }
2620    return queueDirty(v,
2621                      queueItmCtx.genBySeqno,
2622                      queueItmCtx.genCas,
2623                      queueItmCtx.generateDeleteTime,
2624                      queueItmCtx.isBackfillItem,
2625                      queueItmCtx.preLinkDocumentContext);
2626}
2627
2628void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2629    /**
2630     * Possibly, this item is being recreated. Conservatively assign it
2631     * a seqno that is greater than the greatest seqno of all deleted
2632     * items seen so far.
2633     */
2634    uint64_t seqno = ht.getMaxDeletedRevSeqno();
2635    if (!v.isTempItem()) {
2636        ++seqno;
2637    }
2638    v.setRevSeqno(seqno);
2639}
2640
2641void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2642                                     const void* cookie,
2643                                     HighPriorityVBNotify reqType) {
2644    std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2645    hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2646    numHpVBReqs.store(hpVBReqs.size());
2647
2648    LOG(EXTENSION_LOG_NOTICE,
2649        "Added high priority async request %s "
2650        "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2651        "Persisted upto:%" PRIu64 ", cookie:%p",
2652        to_string(reqType).c_str(),
2653        getId(),
2654        seqnoOrChkId,
2655        getPersistenceSeqno(),
2656        cookie);
2657}
2658
2659std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2660        EventuallyPersistentEngine& engine,
2661        uint64_t idNum,
2662        HighPriorityVBNotify notifyType) {
2663    std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2664    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2665
2666    auto entry = hpVBReqs.begin();
2667
2668    while (entry != hpVBReqs.end()) {
2669        if (notifyType != entry->reqType) {
2670            ++entry;
2671            continue;
2672        }
2673
2674        std::string logStr(to_string(notifyType));
2675
2676        auto wall_time = ProcessClock::now() - entry->start;
2677        auto spent =
2678                std::chrono::duration_cast<std::chrono::seconds>(wall_time);
2679        if (entry->id <= idNum) {
2680            toNotify[entry->cookie] = ENGINE_SUCCESS;
2681            stats.chkPersistenceHisto.add(
2682                    std::chrono::duration_cast<std::chrono::microseconds>(
2683                            wall_time));
2684            adjustCheckpointFlushTimeout(spent);
2685            LOG(EXTENSION_LOG_NOTICE,
2686                "Notified the completion of %s "
2687                "for vbucket %" PRIu16 ", Check for: %" PRIu64
2688                ", "
2689                "Persisted upto: %" PRIu64 ", cookie %p",
2690                logStr.c_str(),
2691                getId(),
2692                entry->id,
2693                idNum,
2694                entry->cookie);
2695            entry = hpVBReqs.erase(entry);
2696        } else if (spent > getCheckpointFlushTimeout()) {
2697            adjustCheckpointFlushTimeout(spent);
2698            engine.storeEngineSpecific(entry->cookie, NULL);
2699            toNotify[entry->cookie] = ENGINE_TMPFAIL;
2700            LOG(EXTENSION_LOG_WARNING,
2701                "Notified the timeout on %s "
2702                "for vbucket %" PRIu16 ", Check for: %" PRIu64
2703                ", "
2704                "Persisted upto: %" PRIu64 ", cookie %p",
2705                logStr.c_str(),
2706                getId(),
2707                entry->id,
2708                idNum,
2709                entry->cookie);
2710            entry = hpVBReqs.erase(entry);
2711        } else {
2712            ++entry;
2713        }
2714    }
2715    numHpVBReqs.store(hpVBReqs.size());
2716    return toNotify;
2717}
2718
2719std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2720        EventuallyPersistentEngine& engine) {
2721    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2722
2723    LockHolder lh(hpVBReqsMutex);
2724
2725    for (auto& entry : hpVBReqs) {
2726        toNotify[entry.cookie] = ENGINE_TMPFAIL;
2727        engine.storeEngineSpecific(entry.cookie, NULL);
2728    }
2729    hpVBReqs.clear();
2730
2731    return toNotify;
2732}
2733
2734void VBucket::adjustCheckpointFlushTimeout(std::chrono::seconds wall_time) {
2735    auto middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2736
2737    if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2738        chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2739    } else if (wall_time <= middle) {
2740        chkFlushTimeout = middle;
2741    } else {
2742        chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2743    }
2744}
2745
2746std::chrono::seconds VBucket::getCheckpointFlushTimeout() {
2747    return std::chrono::duration_cast<std::chrono::seconds>(
2748            chkFlushTimeout.load());
2749}
2750
2751std::unique_ptr<Item> VBucket::pruneXattrDocument(
2752        StoredValue& v, const ItemMetaData& itemMeta) {
2753    // Need to take a copy of the value, prune it, and add it back
2754
2755    // Create work-space document
2756    std::vector<char> workspace(
2757            v.getValue()->getData(),
2758            v.getValue()->getData() + v.getValue()->valueSize());
2759
2760    // Now attach to the XATTRs in the document
2761    cb::xattr::Blob xattr({workspace.data(), workspace.size()},
2762                          mcbp::datatype::is_snappy(v.getDatatype()));
2763    xattr.prune_user_keys();
2764
2765    auto prunedXattrs = xattr.finalize();
2766
2767    if (prunedXattrs.size()) {
2768        // Something remains - Create a Blob and copy-in just the XATTRs
2769        auto newValue =
2770                Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2771                          prunedXattrs.size());
2772        auto rv = v.toItem(false, getId());
2773        rv->setCas(itemMeta.cas);
2774        rv->setFlags(itemMeta.flags);
2775        rv->setExpTime(itemMeta.exptime);
2776        rv->setRevSeqno(itemMeta.revSeqno);
2777        rv->replaceValue(newValue);
2778        rv->setDataType(PROTOCOL_BINARY_DATATYPE_XATTR);
2779        return rv;
2780    } else {
2781        return {};
2782    }
2783}
2784
2785void VBucket::removeKey(const DocKey& key, int64_t bySeqno) {
2786    auto hbl = ht.getLockedBucket(key);
2787    StoredValue* v = fetchValidValue(
2788            hbl, key, WantsDeleted::No, TrackReference::No, QueueExpired::Yes);
2789
2790    if (v && v->getBySeqno() == bySeqno) {
2791        ht.unlocked_del(hbl, v->getKey());
2792    }
2793}
2794
2795bool VBucket::isLogicallyNonExistent(
2796        const StoredValue& v,
2797        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2798    return v.isDeleted() || v.isTempDeletedItem() ||
2799           v.isTempNonExistentItem() ||
2800           readHandle.isLogicallyDeleted(v.getBySeqno());
2801}
2802
2803void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
2804    // If the vbucket is marked as deleting then we must schedule task to
2805    // perform the resource destruction (memory/disk).
2806    if (vb->isDeletionDeferred()) {
2807        vb->scheduleDeferredDeletion(engine);
2808        return;
2809    }
2810    delete vb;
2811}
2812
2813void VBucket::setFreqSaturatedCallback(std::function<void()> callbackFunction) {
2814    ht.setFreqSaturatedCallback(callbackFunction);
2815}
2816