xref: /6.0.3/kv_engine/engines/ep/src/vbucket.cc (revision a30fd9d5)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "atomic.h"
21#include "bgfetcher.h"
22#include "checkpoint.h"
23#include "conflict_resolution.h"
24#include "ep_engine.h"
25#include "ep_time.h"
26#include "ep_types.h"
27#include "failover-table.h"
28#include "flusher.h"
29#include "hash_table.h"
30#include "pre_link_document_context.h"
31#include "statwriter.h"
32#include "stored_value_factories.h"
33#include "vbucket.h"
34#include "vbucketdeletiontask.h"
35
36#include <platform/compress.h>
37#include <xattr/blob.h>
38#include <xattr/utils.h>
39
40#include <functional>
41#include <list>
42#include <set>
43#include <string>
44#include <vector>
45
46/* Macros */
47const auto MIN_CHK_FLUSH_TIMEOUT = std::chrono::seconds(10);
48const auto MAX_CHK_FLUSH_TIMEOUT = std::chrono::seconds(30);
49
50/* Statics definitions */
51cb::AtomicDuration VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
52double VBucket::mutationMemThreshold = 0.9;
53
54VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
55    std::vector<uint16_t> tmp(acceptable.size() + other.size());
56    std::vector<uint16_t>::iterator end;
57    end = std::set_symmetric_difference(acceptable.begin(),
58                                        acceptable.end(),
59                                        other.acceptable.begin(),
60                                        other.acceptable.end(),
61                                        tmp.begin());
62    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
63}
64
65VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
66                                                                        const {
67    std::vector<uint16_t> tmp(acceptable.size() + other.size());
68    std::vector<uint16_t>::iterator end;
69
70    end = std::set_intersection(acceptable.begin(), acceptable.end(),
71                                other.acceptable.begin(),
72                                other.acceptable.end(),
73                                tmp.begin());
74    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
75}
76
/**
 * Determine whether the elements starting at `it` form a run of consecutive
 * values that is long enough to be printed as a range.
 *
 * @param it     first element to examine
 * @param end    end of the set being examined
 * @param length [out] offset from `it` to the last element of the
 *               consecutive run (0 for a single element, and 0 for an
 *               empty input)
 * @return true if the run spans at least three elements (length > 1)
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;

    // Nothing to inspect: dereferencing `it` would be undefined behaviour
    // and the decrement below would wrap around.
    if (it == end) {
        return false;
    }

    const uint16_t first = *it;
    while (it != end && (first + length) == *it) {
        ++it;
        ++length;
    }

    // The loop counted elements; convert the count to an offset from the
    // first element to the last one in the run.
    --length;

    return length > 1;
}
92
93std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
94{
95    std::set<uint16_t>::const_iterator it;
96
97    if (filter.acceptable.empty()) {
98        out << "{ empty }";
99    } else {
100        bool needcomma = false;
101        out << "{ ";
102        for (it = filter.acceptable.begin();
103             it != filter.acceptable.end();
104             ++it) {
105            if (needcomma) {
106                out << ", ";
107            }
108
109            size_t length;
110            if (isRange(it, filter.acceptable.end(), length)) {
111                std::set<uint16_t>::iterator last = it;
112                for (size_t i = 0; i < length; ++i) {
113                    ++last;
114                }
115                out << "[" << *it << "," << *last << "]";
116                it = last;
117            } else {
118                out << *it;
119            }
120            needcomma = true;
121        }
122        out << " }";
123    }
124
125    return out;
126}
127
128const vbucket_state_t VBucket::ACTIVE =
129                     static_cast<vbucket_state_t>(htonl(vbucket_state_active));
130const vbucket_state_t VBucket::REPLICA =
131                    static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
132const vbucket_state_t VBucket::PENDING =
133                    static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
134const vbucket_state_t VBucket::DEAD =
135                    static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
136
137VBucket::VBucket(id_type i,
138                 vbucket_state_t newState,
139                 EPStats& st,
140                 CheckpointConfig& chkConfig,
141                 int64_t lastSeqno,
142                 uint64_t lastSnapStart,
143                 uint64_t lastSnapEnd,
144                 std::unique_ptr<FailoverTable> table,
145                 std::shared_ptr<Callback<id_type>> flusherCb,
146                 std::unique_ptr<AbstractStoredValueFactory> valFact,
147                 NewSeqnoCallback newSeqnoCb,
148                 Configuration& config,
149                 item_eviction_policy_t evictionPolicy,
150                 vbucket_state_t initState,
151                 uint64_t purgeSeqno,
152                 uint64_t maxCas,
153                 int64_t hlcEpochSeqno,
154                 bool mightContainXattrs,
155                 const std::string& collectionsManifest)
156    : ht(st, std::move(valFact), config.getHtSize(), config.getHtLocks()),
157      checkpointManager(std::make_unique<CheckpointManager>(st,
158                                                            i,
159                                                            chkConfig,
160                                                            lastSeqno,
161                                                            lastSnapStart,
162                                                            lastSnapEnd,
163                                                            flusherCb)),
164      failovers(std::move(table)),
165      opsCreate(0),
166      opsUpdate(0),
167      opsDelete(0),
168      opsReject(0),
169      dirtyQueueSize(0),
170      dirtyQueueMem(0),
171      dirtyQueueFill(0),
172      dirtyQueueDrain(0),
173      dirtyQueueAge(0),
174      dirtyQueuePendingWrites(0),
175      metaDataDisk(0),
176      numExpiredItems(0),
177      eviction(evictionPolicy),
178      stats(st),
179      persistenceSeqno(0),
180      numHpVBReqs(0),
181      id(i),
182      state(newState),
183      initialState(initState),
184      purge_seqno(purgeSeqno),
185      takeover_backed_up(false),
186      persisted_snapshot_start(lastSnapStart),
187      persisted_snapshot_end(lastSnapEnd),
188      rollbackItemCount(0),
189      hlc(maxCas,
190          hlcEpochSeqno,
191          std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
192          std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
193      statPrefix("vb_" + std::to_string(i)),
194      persistenceCheckpointId(0),
195      bucketCreation(false),
196      deferredDeletion(false),
197      deferredDeletionCookie(nullptr),
198      newSeqnoCb(std::move(newSeqnoCb)),
199      manifest(collectionsManifest),
200      mayContainXattrs(mightContainXattrs) {
201    if (config.getConflictResolutionType().compare("lww") == 0) {
202        conflictResolver.reset(new LastWriteWinsResolution());
203    } else {
204        conflictResolver.reset(new RevisionSeqnoResolution());
205    }
206
207    backfill.isBackfillPhase = false;
208    pendingOpsStart = ProcessClock::time_point();
209    stats.coreLocal.get()->memOverhead.fetch_add(
210            sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
211    LOG(EXTENSION_LOG_NOTICE,
212        "VBucket: created vbucket:%" PRIu16
213        " with state:%s "
214        "initialState:%s lastSeqno:%" PRIu64 " lastSnapshot:{%" PRIu64
215        ",%" PRIu64 "} persisted_snapshot:{%" PRIu64 ",%" PRIu64
216        "} max_cas:%" PRIu64 " uuid:%s",
217        id,
218        VBucket::toString(state),
219        VBucket::toString(initialState),
220        lastSeqno,
221        lastSnapStart,
222        lastSnapEnd,
223        persisted_snapshot_start,
224        persisted_snapshot_end,
225        getMaxCas(),
226        failovers ? std::to_string(failovers->getLatestUUID()).c_str() : "<>");
227}
228
229VBucket::~VBucket() {
230    if (!pendingOps.empty()) {
231        LOG(EXTENSION_LOG_WARNING,
232            "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
233            id,
234            pendingOps.size());
235    }
236
237    stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
238    stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
239
240    // Clear out the bloomfilter(s)
241    clearFilter();
242
243    stats.coreLocal.get()->memOverhead.fetch_sub(
244            sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
245
246    LOG(EXTENSION_LOG_NOTICE, "Destroying vbucket %d", id);
247}
248
249int64_t VBucket::getHighSeqno() const {
250    return checkpointManager->getHighSeqno();
251}
252
253size_t VBucket::getChkMgrMemUsage() const {
254    return checkpointManager->getMemoryUsage();
255}
256
257size_t VBucket::getChkMgrMemUsageOfUnrefCheckpoints() const {
258    return checkpointManager->getMemoryUsageOfUnrefCheckpoints();
259}
260
261size_t VBucket::getChkMgrMemUsageOverhead() const {
262    return checkpointManager->getMemoryOverhead();
263}
264
265void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
266                         ENGINE_ERROR_CODE code) {
267    std::unique_lock<std::mutex> lh(pendingOpLock);
268
269    if (pendingOpsStart > ProcessClock::time_point()) {
270        auto now = ProcessClock::now();
271        if (now > pendingOpsStart) {
272            auto d = std::chrono::duration_cast<std::chrono::microseconds>(
273                    now - pendingOpsStart);
274            stats.pendingOpsHisto.add(d);
275            atomic_setIfBigger(stats.pendingOpsMaxDuration,
276                               std::make_unsigned<hrtime_t>::type(d.count()));
277        }
278    } else {
279        return;
280    }
281
282    pendingOpsStart = ProcessClock::time_point();
283    stats.pendingOps.fetch_sub(pendingOps.size());
284    atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
285
286    while (!pendingOps.empty()) {
287        const void *pendingOperation = pendingOps.back();
288        pendingOps.pop_back();
289        // We don't want to hold the pendingOpLock when
290        // calling notifyIOComplete.
291        lh.unlock();
292        engine.notifyIOComplete(pendingOperation, code);
293        lh.lock();
294    }
295
296    LOG(EXTENSION_LOG_INFO,
297        "Fired pendings ops for vbucket %" PRIu16 " in state %s",
298        id,
299        VBucket::toString(state));
300}
301
302void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
303
304    if (state == vbucket_state_active) {
305        fireAllOps(engine, ENGINE_SUCCESS);
306    } else if (state == vbucket_state_pending) {
307        // Nothing
308    } else {
309        fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
310    }
311}
312
313VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
314    // Fetch up to approxLimit items from rejectQueue, backfill items and
315    // checkpointManager (in that order); then check if we obtained everything
316    // which is available.
317    ItemsToFlush result;
318
319    // First add any items from the rejectQueue.
320    while (result.items.size() < approxLimit && !rejectQueue.empty()) {
321        result.items.push_back(rejectQueue.front());
322        rejectQueue.pop();
323    }
324
325    // Append any 'backfill' items (mutations added by a DCP stream).
326    bool backfillEmpty;
327    {
328        LockHolder lh(backfill.mutex);
329        size_t num_items = 0;
330        while (result.items.size() < approxLimit && !backfill.items.empty()) {
331            result.items.push_back(backfill.items.front());
332            backfill.items.pop();
333            num_items++;
334        }
335        backfillEmpty = backfill.items.empty();
336        stats.vbBackfillQueueSize.fetch_sub(num_items);
337        stats.coreLocal.get()->memOverhead.fetch_sub(num_items *
338                                                     sizeof(queued_item));
339    }
340
341    // Append up to approxLimit checkpoint items outstanding for the persistence
342    // cursor, if we haven't yet hit the limit.
343    // Note that it is only valid to queue a complete checkpoint - this is where
344    // the "approx" in the limit comes from.
345    const auto ckptMgrLimit = approxLimit - result.items.size();
346    bool ckptItemsAvailable = true;
347    if (ckptMgrLimit > 0) {
348        auto _begin_ = ProcessClock::now();
349        auto ckptItems = checkpointManager->getItemsForCursor(
350                CheckpointManager::pCursorName, result.items, ckptMgrLimit);
351        result.range = ckptItems.range;
352        ckptItemsAvailable = ckptItems.moreAvailable;
353        stats.persistenceCursorGetItemsHisto.add(
354                std::chrono::duration_cast<std::chrono::microseconds>(
355                        ProcessClock::now() - _begin_));
356    } else {
357        // We haven't got sufficient remaining capacity to read items from
358        // CheckpoitnManager, therefore we must assume that there /could/
359        // more data to follow (leaving ckptItemsAvailable true). We also must
360        // ensure the valid snapshot range is returned
361        result.range = checkpointManager->getSnapshotInfo().range;
362    }
363
364    // Check if there's any more items remaining.
365    result.moreAvailable =
366            !rejectQueue.empty() || !backfillEmpty || ckptItemsAvailable;
367
368    return result;
369}
370
371void VBucket::setState(vbucket_state_t to) {
372    WriterLockHolder wlh(stateLock);
373    setState_UNLOCKED(to, wlh);
374}
375
376void VBucket::setState_UNLOCKED(vbucket_state_t to,
377                                WriterLockHolder &vbStateLock) {
378    vbucket_state_t oldstate = state;
379
380    if (to == vbucket_state_active &&
381        checkpointManager->getOpenCheckpointId() < 2) {
382        checkpointManager->setOpenCheckpointId(2);
383    }
384
385    LOG(EXTENSION_LOG_NOTICE,
386        "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
387        id,
388        VBucket::toString(oldstate),
389        VBucket::toString(to));
390
391    state = to;
392}
393
394vbucket_state VBucket::getVBucketState() const {
395     auto persisted_range = getPersistedSnapshot();
396
397     return vbucket_state{getState(),
398                          getPersistenceCheckpointId(),
399                          0,
400                          getHighSeqno(),
401                          getPurgeSeqno(),
402                          persisted_range.start,
403                          persisted_range.end,
404                          getMaxCas(),
405                          hlc.getEpochSeqno(),
406                          mightContainXattrs(),
407                          failovers->toJSON()};
408}
409
410
411
412void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
413{
414    ++dirtyQueueSize;
415    dirtyQueueMem.fetch_add(sizeof(Item));
416    ++dirtyQueueFill;
417    dirtyQueueAge.fetch_add(qi.getQueuedTime());
418    dirtyQueuePendingWrites.fetch_add(itemBytes);
419}
420
421void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
422    --dirtyQueueSize;
423    decrDirtyQueueMem(sizeof(Item));
424    ++dirtyQueueDrain;
425    decrDirtyQueueAge(qi.getQueuedTime());
426    decrDirtyQueuePendingWrites(itemBytes);
427}
428
429void VBucket::incrMetaDataDisk(const Item& qi) {
430    metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
431}
432
433void VBucket::decrMetaDataDisk(const Item& qi) {
434    // assume couchstore remove approx this much data from disk
435    metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
436}
437
438void VBucket::resetStats() {
439    opsCreate.store(0);
440    opsUpdate.store(0);
441    opsDelete.store(0);
442    opsReject.store(0);
443
444    stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
445    dirtyQueueMem.store(0);
446    dirtyQueueFill.store(0);
447    dirtyQueueAge.store(0);
448    dirtyQueuePendingWrites.store(0);
449    dirtyQueueDrain.store(0);
450
451    hlc.resetStats();
452}
453
454uint64_t VBucket::getQueueAge() {
455    uint64_t currDirtyQueueAge = dirtyQueueAge.load(std::memory_order_relaxed);
456    rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
457    if (currentAge < currDirtyQueueAge) {
458        return 0;
459    }
460    return (currentAge - currDirtyQueueAge) * 1000;
461}
462
463template <typename T>
464void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
465                      const void *c) {
466    std::string stat = statPrefix;
467    if (nm != NULL) {
468        add_prefixed_stat(statPrefix, nm, val, add_stat, c);
469    } else {
470        add_casted_stat(statPrefix.data(), val, add_stat, c);
471    }
472}
473
474void VBucket::handlePreExpiry(const std::unique_lock<std::mutex>& hbl,
475                              StoredValue& v) {
476    value_t value = v.getValue();
477    if (value) {
478        std::unique_ptr<Item> itm(v.toItem(false, id));
479        item_info itm_info;
480        EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
481        itm_info =
482                itm->toItemInfo(failovers->getLatestUUID(), getHLCEpochSeqno());
483
484        SERVER_HANDLE_V1* sapi = engine->getServerApi();
485        /* TODO: In order to minimize allocations, the callback needs to
486         * allocate an item whose value size will be exactly the size of the
487         * value after pre-expiry is performed.
488         */
489        auto result = sapi->document->pre_expiry(itm_info);
490        // The API states only uncompressed xattr values are returned
491        auto datatype = PROTOCOL_BINARY_DATATYPE_XATTR;
492        if (!result.empty()) {
493            Item new_item(v.getKey(),
494                          v.getFlags(),
495                          v.getExptime(),
496                          result.data(),
497                          result.size(),
498                          datatype,
499                          v.getCas(),
500                          v.getBySeqno(),
501                          id,
502                          v.getRevSeqno());
503
504            new_item.setNRUValue(v.getNRUValue());
505            new_item.setFreqCounterValue(v.getFreqCounterValue());
506            new_item.setDeleted();
507            ht.unlocked_updateStoredValue(hbl, v, new_item);
508        }
509    }
510}
511
512bool VBucket::addPendingOp(const void* cookie) {
513    LockHolder lh(pendingOpLock);
514    if (state != vbucket_state_pending) {
515        // State transitioned while we were waiting.
516        return false;
517    }
518    // Start a timer when enqueuing the first client.
519    if (pendingOps.empty()) {
520        pendingOpsStart = ProcessClock::now();
521    }
522    pendingOps.push_back(cookie);
523    ++stats.pendingOps;
524    ++stats.pendingOpsTotal;
525    return true;
526}
527
528uint64_t VBucket::getPersistenceCheckpointId() const {
529    return persistenceCheckpointId.load();
530}
531
532void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
533    persistenceCheckpointId.store(checkpointId);
534}
535
536void VBucket::markDirty(const DocKey& key) {
537    auto hbl = ht.getLockedBucket(key);
538    StoredValue* v = ht.unlocked_find(
539            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
540    if (v) {
541        v->markDirty();
542    } else {
543        LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
544            "is missing from vb:%" PRIu16, id);
545    }
546}
547
548bool VBucket::isResidentRatioUnderThreshold(float threshold) {
549    if (eviction != FULL_EVICTION) {
550        throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
551                "policy (which is " + std::to_string(eviction) +
552                ") must be FULL_EVICTION");
553    }
554    size_t num_items = getNumItems();
555    size_t num_non_resident_items = getNumNonResidentItems();
556    float ratio =
557            num_items
558                    ? ((float)(num_items - num_non_resident_items) / num_items)
559                    : 0.0;
560    if (threshold >= ratio) {
561        return true;
562    } else {
563        return false;
564    }
565}
566
567void VBucket::createFilter(size_t key_count, double probability) {
568    // Create the actual bloom filter upon vbucket creation during
569    // scenarios:
570    //      - Bucket creation
571    //      - Rebalance
572    LockHolder lh(bfMutex);
573    if (bFilter == nullptr && tempFilter == nullptr) {
574        bFilter = std::make_unique<BloomFilter>(key_count, probability,
575                                        BFILTER_ENABLED);
576    } else {
577        LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
578            " already exist!", id);
579    }
580}
581
582void VBucket::initTempFilter(size_t key_count, double probability) {
583    // Create a temp bloom filter with status as COMPACTING,
584    // if the main filter is found to exist, set its state to
585    // COMPACTING as well.
586    LockHolder lh(bfMutex);
587    tempFilter = std::make_unique<BloomFilter>(key_count, probability,
588                                     BFILTER_COMPACTING);
589    if (bFilter) {
590        bFilter->setStatus(BFILTER_COMPACTING);
591    }
592}
593
594void VBucket::addToFilter(const DocKey& key) {
595    LockHolder lh(bfMutex);
596    if (bFilter) {
597        bFilter->addKey(key);
598    }
599
600    // If the temp bloom filter is not found to be NULL,
601    // it means that compaction is running on the particular
602    // vbucket. Therefore add the key to the temp filter as
603    // well, as once compaction completes the temp filter
604    // will replace the main bloom filter.
605    if (tempFilter) {
606        tempFilter->addKey(key);
607    }
608}
609
610bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
611    LockHolder lh(bfMutex);
612    if (bFilter) {
613        return bFilter->maybeKeyExists(key);
614    } else {
615        // If filter doesn't exist, allow the BgFetch to go through.
616        return true;
617    }
618}
619
620bool VBucket::isTempFilterAvailable() {
621    LockHolder lh(bfMutex);
622    if (tempFilter &&
623        (tempFilter->getStatus() == BFILTER_COMPACTING ||
624         tempFilter->getStatus() == BFILTER_ENABLED)) {
625        return true;
626    } else {
627        return false;
628    }
629}
630
631void VBucket::addToTempFilter(const DocKey& key) {
632    // Keys will be added to only the temp filter during
633    // compaction.
634    LockHolder lh(bfMutex);
635    if (tempFilter) {
636        tempFilter->addKey(key);
637    }
638}
639
640void VBucket::swapFilter() {
641    // Delete the main bloom filter and replace it with
642    // the temp filter that was populated during compaction,
643    // only if the temp filter's state is found to be either at
644    // COMPACTING or ENABLED (if in the case the user enables
645    // bloomfilters for some reason while compaction was running).
646    // Otherwise, it indicates that the filter's state was
647    // possibly disabled during compaction, therefore clear out
648    // the temp filter. If it gets enabled at some point, a new
649    // bloom filter will be made available after the next
650    // compaction.
651
652    LockHolder lh(bfMutex);
653    if (tempFilter) {
654        bFilter.reset();
655
656        if (tempFilter->getStatus() == BFILTER_COMPACTING ||
657             tempFilter->getStatus() == BFILTER_ENABLED) {
658            bFilter = std::move(tempFilter);
659            bFilter->setStatus(BFILTER_ENABLED);
660        }
661        tempFilter.reset();
662    }
663}
664
665void VBucket::clearFilter() {
666    LockHolder lh(bfMutex);
667    bFilter.reset();
668    tempFilter.reset();
669}
670
671void VBucket::setFilterStatus(bfilter_status_t to) {
672    LockHolder lh(bfMutex);
673    if (bFilter) {
674        bFilter->setStatus(to);
675    }
676    if (tempFilter) {
677        tempFilter->setStatus(to);
678    }
679}
680
681std::string VBucket::getFilterStatusString() {
682    LockHolder lh(bfMutex);
683    if (bFilter) {
684        return bFilter->getStatusString();
685    } else if (tempFilter) {
686        return tempFilter->getStatusString();
687    } else {
688        return "DOESN'T EXIST";
689    }
690}
691
692size_t VBucket::getFilterSize() {
693    LockHolder lh(bfMutex);
694    if (bFilter) {
695        return bFilter->getFilterSize();
696    } else {
697        return 0;
698    }
699}
700
701size_t VBucket::getNumOfKeysInFilter() {
702    LockHolder lh(bfMutex);
703    if (bFilter) {
704        return bFilter->getNumOfKeysInFilter();
705    } else {
706        return 0;
707    }
708}
709
710VBNotifyCtx VBucket::queueDirty(
711        StoredValue& v,
712        const GenerateBySeqno generateBySeqno,
713        const GenerateCas generateCas,
714        const GenerateDeleteTime generateDeleteTime,
715        const bool isBackfillItem,
716        PreLinkDocumentContext* preLinkDocumentContext) {
717    VBNotifyCtx notifyCtx;
718
719    queued_item qi(v.toItem(false, getId()));
720
721    // MB-27457: Timestamp deletes only when they don't already have a timestamp
722    // assigned. This is here to ensure all deleted items have a timestamp which
723    // our tombstone purger can use to determine which tombstones to purge. A
724    // DCP replicated or deleteWithMeta created delete may already have a time
725    // assigned to it.
726    if (qi->isDeleted() && (generateDeleteTime == GenerateDeleteTime::Yes ||
727                            qi->getExptime() == 0)) {
728        qi->setExpTime(ep_real_time());
729    }
730
731    if (!mightContainXattrs() && mcbp::datatype::is_xattr(v.getDatatype())) {
732        setMightContainXattrs();
733    }
734
735    if (isBackfillItem) {
736        queueBackfillItem(qi, generateBySeqno);
737        notifyCtx.notifyFlusher = true;
738        /* During backfill on a TAP receiver we need to update the snapshot
739         range in the checkpoint. Has to be done here because in case of TAP
740         backfill, above, we use vb.queueBackfillItem() instead of
741         vb.checkpointManager->queueDirty() */
742        if (generateBySeqno == GenerateBySeqno::Yes) {
743            checkpointManager->resetSnapshotRange();
744        }
745    } else {
746        notifyCtx.notifyFlusher =
747                checkpointManager->queueDirty(*this,
748                                              qi,
749                                              generateBySeqno,
750                                              generateCas,
751                                              preLinkDocumentContext);
752        notifyCtx.notifyReplication = true;
753        if (GenerateCas::Yes == generateCas) {
754            v.setCas(qi->getCas());
755        }
756    }
757
758    v.setBySeqno(qi->getBySeqno());
759    notifyCtx.bySeqno = qi->getBySeqno();
760
761    return notifyCtx;
762}
763
764StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
765                                      const DocKey& key,
766                                      WantsDeleted wantsDeleted,
767                                      TrackReference trackReference,
768                                      QueueExpired queueExpired) {
769    if (!hbl.getHTLock()) {
770        throw std::logic_error(
771                "Hash bucket lock not held in "
772                "VBucket::fetchValidValue() for hash bucket: " +
773                std::to_string(hbl.getBucketNum()) + "for key: " +
774                std::string(reinterpret_cast<const char*>(key.data()),
775                            key.size()));
776    }
777    StoredValue* v = ht.unlocked_find(
778            key, hbl.getBucketNum(), wantsDeleted, trackReference);
779    if (v && !v->isDeleted() && !v->isTempItem()) {
780        // In the deleted case, we ignore expiration time.
781        if (v->isExpired(ep_real_time())) {
782            if (getState() != vbucket_state_active) {
783                return wantsDeleted == WantsDeleted::Yes ? v : NULL;
784            }
785
786            // queueDirty only allowed on active VB
787            if (queueExpired == QueueExpired::Yes &&
788                getState() == vbucket_state_active) {
789                incExpirationStat(ExpireBy::Access);
790                handlePreExpiry(hbl.getHTLock(), *v);
791                VBNotifyCtx notifyCtx;
792                std::tie(std::ignore, v, notifyCtx) =
793                        processExpiredItem(hbl, *v);
794                notifyNewSeqno(notifyCtx);
795            }
796            return wantsDeleted == WantsDeleted::Yes ? v : NULL;
797        }
798    }
799    return v;
800}
801
802void VBucket::incExpirationStat(const ExpireBy source) {
803    switch (source) {
804    case ExpireBy::Pager:
805        ++stats.expired_pager;
806        break;
807    case ExpireBy::Compactor:
808        ++stats.expired_compactor;
809        break;
810    case ExpireBy::Access:
811        ++stats.expired_access;
812        break;
813    }
814    ++numExpiredItems;
815}
816
817MutationStatus VBucket::setFromInternal(Item& itm) {
818    if (!hasMemoryForStoredValue(stats, itm, false)) {
819        return MutationStatus::NoMem;
820    }
821    return ht.set(itm);
822}
823
824cb::StoreIfStatus VBucket::callPredicate(cb::StoreIfPredicate predicate,
825                                         StoredValue* v) {
826    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
827    if (v) {
828        auto info = v->getItemInfo(failovers->getLatestUUID());
829        storeIfStatus = predicate(info, getInfo());
830        // No no, you can't ask for it again
831        if (storeIfStatus == cb::StoreIfStatus::GetItemInfo &&
832            info.is_initialized()) {
833            throw std::logic_error(
834                    "VBucket::callPredicate invalid result of GetItemInfo");
835        }
836    } else {
837        storeIfStatus = predicate({/*no info*/}, getInfo());
838    }
839
840    if (storeIfStatus == cb::StoreIfStatus::GetItemInfo &&
841        eviction == VALUE_ONLY) {
842        // We're VE, if we don't have, we don't have it.
843        storeIfStatus = cb::StoreIfStatus::Continue;
844    }
845
846    return storeIfStatus;
847}
848
849ENGINE_ERROR_CODE VBucket::set(Item& itm,
850                               const void* cookie,
851                               EventuallyPersistentEngine& engine,
852                               const int bgFetchDelay,
853                               cb::StoreIfPredicate predicate) {
854    bool cas_op = (itm.getCas() != 0);
855    auto hbl = ht.getLockedBucket(itm.getKey());
856    StoredValue* v = ht.unlocked_find(itm.getKey(),
857                                      hbl.getBucketNum(),
858                                      WantsDeleted::Yes,
859                                      TrackReference::No);
860
861    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
862    if (predicate &&
863        (storeIfStatus = callPredicate(predicate, v)) ==
864                cb::StoreIfStatus::Fail) {
865        return ENGINE_PREDICATE_FAILED;
866    }
867
868    if (v && v->isLocked(ep_current_time()) &&
869        (getState() == vbucket_state_replica ||
870         getState() == vbucket_state_pending)) {
871        v->unlock();
872    }
873
874    bool maybeKeyExists = true;
875    // If we didn't find a valid item then check the bloom filter, but only
876    // if we're full-eviction with a CAS operation or a have a predicate that
877    // requires the item's info
878    if ((v == nullptr || v->isTempInitialItem()) &&
879        (eviction == FULL_EVICTION) &&
880        ((itm.getCas() != 0) ||
881         storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
882        // Check Bloomfilter's prediction
883        if (!maybeKeyExistsInFilter(itm.getKey())) {
884            maybeKeyExists = false;
885        }
886    }
887
888    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
889    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
890                               GenerateCas::Yes,
891                               TrackCasDrift::No,
892                               /*isBackfillItem*/ false,
893                               &preLinkDocumentContext);
894
895    MutationStatus status;
896    boost::optional<VBNotifyCtx> notifyCtx;
897    std::tie(status, notifyCtx) = processSet(hbl,
898                                             v,
899                                             itm,
900                                             itm.getCas(),
901                                             /*allowExisting*/ true,
902                                             /*hashMetaData*/ false,
903                                             queueItmCtx,
904                                             storeIfStatus,
905                                             maybeKeyExists);
906
907    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
908    switch (status) {
909    case MutationStatus::NoMem:
910        ret = ENGINE_ENOMEM;
911        break;
912    case MutationStatus::InvalidCas:
913        ret = ENGINE_KEY_EEXISTS;
914        break;
915    case MutationStatus::IsLocked:
916        ret = ENGINE_LOCKED;
917        break;
918    case MutationStatus::NotFound:
919        if (cas_op) {
920            ret = ENGINE_KEY_ENOENT;
921            break;
922        }
923    // FALLTHROUGH
924    case MutationStatus::WasDirty:
925    // Even if the item was dirty, push it into the vbucket's open
926    // checkpoint.
927    case MutationStatus::WasClean:
928        notifyNewSeqno(*notifyCtx);
929
930        itm.setBySeqno(v->getBySeqno());
931        itm.setCas(v->getCas());
932        break;
933    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
934        // +
935        // full eviction.
936        if (v) {
937            // temp item is already created. Simply schedule a bg fetch job
938            hbl.getHTLock().unlock();
939            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
940            return ENGINE_EWOULDBLOCK;
941        }
942        ret = addTempItemAndBGFetch(
943                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
944        break;
945    }
946    }
947
948    return ret;
949}
950
951ENGINE_ERROR_CODE VBucket::replace(
952        Item& itm,
953        const void* cookie,
954        EventuallyPersistentEngine& engine,
955        const int bgFetchDelay,
956        cb::StoreIfPredicate predicate,
957        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
958    auto hbl = ht.getLockedBucket(itm.getKey());
959    StoredValue* v = ht.unlocked_find(itm.getKey(),
960                                      hbl.getBucketNum(),
961                                      WantsDeleted::Yes,
962                                      TrackReference::No);
963
964    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
965    if (predicate &&
966        (storeIfStatus = callPredicate(predicate, v)) ==
967                cb::StoreIfStatus::Fail) {
968        return ENGINE_PREDICATE_FAILED;
969    }
970
971    if (v) {
972        if (isLogicallyNonExistent(*v, readHandle)) {
973            ht.cleanupIfTemporaryItem(hbl, *v);
974            return ENGINE_KEY_ENOENT;
975        }
976
977        MutationStatus mtype;
978        boost::optional<VBNotifyCtx> notifyCtx;
979        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
980            mtype = MutationStatus::NeedBgFetch;
981        } else {
982            PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
983            VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
984                                       GenerateCas::Yes,
985                                       TrackCasDrift::No,
986                                       /*isBackfillItem*/ false,
987                                       &preLinkDocumentContext);
988            std::tie(mtype, notifyCtx) = processSet(hbl,
989                                                    v,
990                                                    itm,
991                                                    0,
992                                                    /*allowExisting*/ true,
993                                                    /*hasMetaData*/ false,
994                                                    queueItmCtx,
995                                                    storeIfStatus);
996        }
997
998        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
999        switch (mtype) {
1000        case MutationStatus::NoMem:
1001            ret = ENGINE_ENOMEM;
1002            break;
1003        case MutationStatus::IsLocked:
1004            ret = ENGINE_LOCKED;
1005            break;
1006        case MutationStatus::InvalidCas:
1007        case MutationStatus::NotFound:
1008            ret = ENGINE_NOT_STORED;
1009            break;
1010        // FALLTHROUGH
1011        case MutationStatus::WasDirty:
1012        // Even if the item was dirty, push it into the vbucket's open
1013        // checkpoint.
1014        case MutationStatus::WasClean:
1015            notifyNewSeqno(*notifyCtx);
1016
1017            itm.setBySeqno(v->getBySeqno());
1018            itm.setCas(v->getCas());
1019            break;
1020        case MutationStatus::NeedBgFetch: {
1021            // temp item is already created. Simply schedule a bg fetch job
1022            hbl.getHTLock().unlock();
1023            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1024            ret = ENGINE_EWOULDBLOCK;
1025            break;
1026        }
1027        }
1028
1029        return ret;
1030    } else {
1031        if (eviction == VALUE_ONLY) {
1032            return ENGINE_KEY_ENOENT;
1033        }
1034
1035        if (maybeKeyExistsInFilter(itm.getKey())) {
1036            return addTempItemAndBGFetch(
1037                    hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
1038        } else {
1039            // As bloomfilter predicted that item surely doesn't exist
1040            // on disk, return ENOENT for replace().
1041            return ENGINE_KEY_ENOENT;
1042        }
1043    }
1044}
1045
1046ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
1047                                           const GenerateBySeqno genBySeqno) {
1048    auto hbl = ht.getLockedBucket(itm.getKey());
1049    StoredValue* v = ht.unlocked_find(itm.getKey(),
1050                                      hbl.getBucketNum(),
1051                                      WantsDeleted::Yes,
1052                                      TrackReference::No);
1053
1054    // Note that this function is only called on replica or pending vbuckets.
1055    if (v && v->isLocked(ep_current_time())) {
1056        v->unlock();
1057    }
1058
1059    // MB-33919: DCP backfill - we must respect the value set by the producer
1060    // unless it is zero, which queueDirty will fix-up. Not mutations and
1061    // deletions come through this path.
1062    VBQueueItemCtx queueItmCtx(genBySeqno,
1063                               GenerateCas::No,
1064                               GenerateDeleteTime::No,
1065                               TrackCasDrift::No,
1066                               /*isBackfillItem*/ true,
1067                               nullptr /* No pre link should happen */);
1068    MutationStatus status;
1069    boost::optional<VBNotifyCtx> notifyCtx;
1070    std::tie(status, notifyCtx) = processSet(hbl,
1071                                             v,
1072                                             itm,
1073                                             0,
1074                                             /*allowExisting*/ true,
1075                                             /*hasMetaData*/ true,
1076                                             queueItmCtx,
1077                                             {/*no predicate*/});
1078
1079    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1080    switch (status) {
1081    case MutationStatus::NoMem:
1082        ret = ENGINE_ENOMEM;
1083        break;
1084    case MutationStatus::InvalidCas:
1085    case MutationStatus::IsLocked:
1086        ret = ENGINE_KEY_EEXISTS;
1087        break;
1088    case MutationStatus::WasDirty:
1089    // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1090    // set correctly, and also the sequence numbers are ordered correctly.
1091    // (MB-14003)
1092    case MutationStatus::NotFound:
1093    // FALLTHROUGH
1094    case MutationStatus::WasClean: {
1095        setMaxCas(v->getCas());
1096        // we unlock ht lock here because we want to avoid potential lock
1097        // inversions arising from notifyNewSeqno() call
1098        hbl.getHTLock().unlock();
1099        notifyNewSeqno(*notifyCtx);
1100    } break;
1101    case MutationStatus::NeedBgFetch:
1102        throw std::logic_error(
1103                "VBucket::addBackfillItem: "
1104                "SET on a non-active vbucket should not require a "
1105                "bg_metadata_fetch.");
1106    }
1107
1108    return ret;
1109}
1110
1111ENGINE_ERROR_CODE VBucket::setWithMeta(
1112        Item& itm,
1113        uint64_t cas,
1114        uint64_t* seqno,
1115        const void* cookie,
1116        EventuallyPersistentEngine& engine,
1117        int bgFetchDelay,
1118        CheckConflicts checkConflicts,
1119        bool allowExisting,
1120        GenerateBySeqno genBySeqno,
1121        GenerateCas genCas,
1122        bool isReplication,
1123        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1124    auto hbl = ht.getLockedBucket(itm.getKey());
1125    StoredValue* v = ht.unlocked_find(itm.getKey(),
1126                                      hbl.getBucketNum(),
1127                                      WantsDeleted::Yes,
1128                                      TrackReference::No);
1129
1130    bool maybeKeyExists = true;
1131
1132    // Effectively ignore logically deleted keys, they cannot stop the op
1133    if (v && readHandle.isLogicallyDeleted(v->getBySeqno())) {
1134        // v is not really here, operate like it's not and skip conflict checks
1135        checkConflicts = CheckConflicts::No;
1136        // And ensure ADD_W_META works like SET_W_META, just overwrite existing
1137        allowExisting = true;
1138    }
1139
1140    if (checkConflicts == CheckConflicts::Yes) {
1141        if (v) {
1142            if (v->isTempInitialItem()) {
1143                bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1144                return ENGINE_EWOULDBLOCK;
1145            }
1146
1147            if (!(conflictResolver->resolve(*v,
1148                                            itm.getMetaData(),
1149                                            itm.getDataType(),
1150                                            itm.isDeleted()))) {
1151                ++stats.numOpsSetMetaResolutionFailed;
1152                // If the existing item happens to be a temporary item,
1153                // delete the item to save memory in the hash table
1154                if (v->isTempItem()) {
1155                    deleteStoredValue(hbl, *v);
1156                }
1157                return ENGINE_KEY_EEXISTS;
1158            }
1159        } else {
1160            if (maybeKeyExistsInFilter(itm.getKey())) {
1161                return addTempItemAndBGFetch(hbl,
1162                                             itm.getKey(),
1163                                             cookie,
1164                                             engine,
1165                                             bgFetchDelay,
1166                                             true,
1167                                             isReplication);
1168            } else {
1169                maybeKeyExists = false;
1170            }
1171        }
1172    } else {
1173        if (eviction == FULL_EVICTION) {
1174            // Check Bloomfilter's prediction
1175            if (!maybeKeyExistsInFilter(itm.getKey())) {
1176                maybeKeyExists = false;
1177            }
1178        }
1179    }
1180
1181    if (v && v->isLocked(ep_current_time()) &&
1182        (getState() == vbucket_state_replica ||
1183         getState() == vbucket_state_pending)) {
1184        v->unlock();
1185    }
1186
1187    // MB-33919: Do not generate the delete-time - delete's can come through
1188    // this path and the delete time from the input should be used (unless it
1189    // is 0, where it must be regenerated)
1190    VBQueueItemCtx queueItmCtx(genBySeqno,
1191                               genCas,
1192                               GenerateDeleteTime::No,
1193                               TrackCasDrift::Yes,
1194                               /*isBackfillItem*/ false,
1195                               nullptr /* No pre link step needed */);
1196    MutationStatus status;
1197    boost::optional<VBNotifyCtx> notifyCtx;
1198    std::tie(status, notifyCtx) = processSet(hbl,
1199                                             v,
1200                                             itm,
1201                                             cas,
1202                                             allowExisting,
1203                                             true,
1204                                             queueItmCtx,
1205                                             {/*no predicate*/},
1206                                             maybeKeyExists,
1207                                             isReplication);
1208
1209    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1210    switch (status) {
1211    case MutationStatus::NoMem:
1212        ret = ENGINE_ENOMEM;
1213        break;
1214    case MutationStatus::InvalidCas:
1215        ret = ENGINE_KEY_EEXISTS;
1216        break;
1217    case MutationStatus::IsLocked:
1218        ret = ENGINE_LOCKED;
1219        break;
1220    case MutationStatus::WasDirty:
1221    case MutationStatus::WasClean: {
1222        if (seqno) {
1223            *seqno = static_cast<uint64_t>(v->getBySeqno());
1224        }
1225        // we unlock ht lock here because we want to avoid potential lock
1226        // inversions arising from notifyNewSeqno() call
1227        hbl.getHTLock().unlock();
1228        notifyNewSeqno(*notifyCtx);
1229    } break;
1230    case MutationStatus::NotFound:
1231        ret = ENGINE_KEY_ENOENT;
1232        break;
1233    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1234        // + full eviction.
1235        if (v) { // temp item is already created. Simply schedule a
1236            hbl.getHTLock().unlock(); // bg fetch job.
1237            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1238            return ENGINE_EWOULDBLOCK;
1239        }
1240        ret = addTempItemAndBGFetch(hbl,
1241                                    itm.getKey(),
1242                                    cookie,
1243                                    engine,
1244                                    bgFetchDelay,
1245                                    true,
1246                                    isReplication);
1247    }
1248    }
1249
1250    return ret;
1251}
1252
1253ENGINE_ERROR_CODE VBucket::deleteItem(
1254        uint64_t& cas,
1255        const void* cookie,
1256        EventuallyPersistentEngine& engine,
1257        const int bgFetchDelay,
1258        ItemMetaData* itemMeta,
1259        mutation_descr_t& mutInfo,
1260        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1261    auto hbl = ht.getLockedBucket(readHandle.getKey());
1262    StoredValue* v = ht.unlocked_find(readHandle.getKey(),
1263                                      hbl.getBucketNum(),
1264                                      WantsDeleted::Yes,
1265                                      TrackReference::No);
1266
1267    if (!v || v->isDeleted() || v->isTempItem() ||
1268        readHandle.isLogicallyDeleted(v->getBySeqno())) {
1269        if (eviction == VALUE_ONLY) {
1270            return ENGINE_KEY_ENOENT;
1271        } else { // Full eviction.
1272            if (!v) { // Item might be evicted from cache.
1273                if (maybeKeyExistsInFilter(readHandle.getKey())) {
1274                    return addTempItemAndBGFetch(hbl,
1275                                                 readHandle.getKey(),
1276                                                 cookie,
1277                                                 engine,
1278                                                 bgFetchDelay,
1279                                                 true);
1280                } else {
1281                    // As bloomfilter predicted that item surely doesn't
1282                    // exist on disk, return ENOENT for deleteItem().
1283                    return ENGINE_KEY_ENOENT;
1284                }
1285            } else if (v->isTempInitialItem()) {
1286                hbl.getHTLock().unlock();
1287                bgFetch(readHandle.getKey(),
1288                        cookie,
1289                        engine,
1290                        bgFetchDelay,
1291                        true);
1292                return ENGINE_EWOULDBLOCK;
1293            } else { // Non-existent or deleted key.
1294                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1295                    // Delete a temp non-existent item to ensure that
1296                    // if a delete were issued over an item that doesn't
1297                    // exist, then we don't preserve a temp item.
1298                    deleteStoredValue(hbl, *v);
1299                }
1300                return ENGINE_KEY_ENOENT;
1301            }
1302        }
1303    }
1304
1305    if (v->isLocked(ep_current_time()) &&
1306        (getState() == vbucket_state_replica ||
1307         getState() == vbucket_state_pending)) {
1308        v->unlock();
1309    }
1310
1311    if (itemMeta != nullptr) {
1312        itemMeta->cas = v->getCas();
1313    }
1314
1315    MutationStatus delrv;
1316    boost::optional<VBNotifyCtx> notifyCtx;
1317    if (v->isExpired(ep_real_time())) {
1318        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
1319    } else {
1320        ItemMetaData metadata;
1321        metadata.revSeqno = v->getRevSeqno() + 1;
1322        std::tie(delrv, v, notifyCtx) =
1323                processSoftDelete(hbl,
1324                                  *v,
1325                                  cas,
1326                                  metadata,
1327                                  VBQueueItemCtx(GenerateBySeqno::Yes,
1328                                                 GenerateCas::Yes,
1329                                                 GenerateDeleteTime::Yes,
1330                                                 TrackCasDrift::No,
1331                                                 /*isBackfillItem*/ false,
1332                                                 nullptr /* no pre link */),
1333                                  /*use_meta*/ false,
1334                                  /*bySeqno*/ v->getBySeqno());
1335    }
1336
1337    uint64_t seqno = 0;
1338    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1339    switch (delrv) {
1340    case MutationStatus::NoMem:
1341        ret = ENGINE_ENOMEM;
1342        break;
1343    case MutationStatus::InvalidCas:
1344        ret = ENGINE_KEY_EEXISTS;
1345        break;
1346    case MutationStatus::IsLocked:
1347        ret = ENGINE_LOCKED_TMPFAIL;
1348        break;
1349    case MutationStatus::NotFound:
1350        ret = ENGINE_KEY_ENOENT;
1351    /* Fallthrough:
1352     * A NotFound return value at this point indicates that the
1353     * item has expired. But, a deletion still needs to be queued
1354     * for the item in order to persist it.
1355     */
1356    case MutationStatus::WasClean:
1357    case MutationStatus::WasDirty:
1358        if (itemMeta != nullptr) {
1359            itemMeta->revSeqno = v->getRevSeqno();
1360            itemMeta->flags = v->getFlags();
1361            itemMeta->exptime = v->getExptime();
1362        }
1363
1364        notifyNewSeqno(*notifyCtx);
1365        seqno = static_cast<uint64_t>(v->getBySeqno());
1366        cas = v->getCas();
1367
1368        if (delrv != MutationStatus::NotFound) {
1369            mutInfo.seqno = seqno;
1370            mutInfo.vbucket_uuid = failovers->getLatestUUID();
1371            if (itemMeta != nullptr) {
1372                itemMeta->cas = v->getCas();
1373            }
1374        }
1375        break;
1376    case MutationStatus::NeedBgFetch:
1377        // We already figured out if a bg fetch is requred for a full-evicted
1378        // item above.
1379        throw std::logic_error(
1380                "VBucket::deleteItem: "
1381                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
1382    }
1383    return ret;
1384}
1385
1386ENGINE_ERROR_CODE VBucket::deleteWithMeta(
1387        uint64_t& cas,
1388        uint64_t* seqno,
1389        const void* cookie,
1390        EventuallyPersistentEngine& engine,
1391        int bgFetchDelay,
1392        CheckConflicts checkConflicts,
1393        const ItemMetaData& itemMeta,
1394        bool backfill,
1395        GenerateBySeqno genBySeqno,
1396        GenerateCas generateCas,
1397        uint64_t bySeqno,
1398        bool isReplication,
1399        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1400    const auto& key = readHandle.getKey();
1401    auto hbl = ht.getLockedBucket(key);
1402    StoredValue* v = ht.unlocked_find(
1403            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1404
1405    if (v && readHandle.isLogicallyDeleted(v->getBySeqno())) {
1406        return ENGINE_KEY_ENOENT;
1407    }
1408
1409    // Need conflict resolution?
1410    if (checkConflicts == CheckConflicts::Yes) {
1411        if (v) {
1412            if (v->isTempInitialItem()) {
1413                bgFetch(key, cookie, engine, bgFetchDelay, true);
1414                return ENGINE_EWOULDBLOCK;
1415            }
1416
1417            if (!(conflictResolver->resolve(*v,
1418                                            itemMeta,
1419                                            PROTOCOL_BINARY_RAW_BYTES,
1420                                            true))) {
1421                ++stats.numOpsDelMetaResolutionFailed;
1422                return ENGINE_KEY_EEXISTS;
1423            }
1424        } else {
1425            // Item is 1) deleted or not existent in the value eviction case OR
1426            // 2) deleted or evicted in the full eviction.
1427            if (maybeKeyExistsInFilter(key)) {
1428                return addTempItemAndBGFetch(hbl,
1429                                             key,
1430                                             cookie,
1431                                             engine,
1432                                             bgFetchDelay,
1433                                             true,
1434                                             isReplication);
1435            } else {
1436                // Even though bloomfilter predicted that item doesn't exist
1437                // on disk, we must put this delete on disk if the cas is valid.
1438                TempAddStatus rv = addTempStoredValue(hbl, key, isReplication);
1439                if (rv == TempAddStatus::NoMem) {
1440                    return ENGINE_ENOMEM;
1441                }
1442                v = ht.unlocked_find(key,
1443                                     hbl.getBucketNum(),
1444                                     WantsDeleted::Yes,
1445                                     TrackReference::No);
1446                v->setTempDeleted();
1447            }
1448        }
1449    } else {
1450        if (!v) {
1451            // We should always try to persist a delete here.
1452            TempAddStatus rv = addTempStoredValue(hbl, key, isReplication);
1453            if (rv == TempAddStatus::NoMem) {
1454                return ENGINE_ENOMEM;
1455            }
1456            v = ht.unlocked_find(key,
1457                                 hbl.getBucketNum(),
1458                                 WantsDeleted::Yes,
1459                                 TrackReference::No);
1460            v->setTempDeleted();
1461            v->setCas(cas);
1462        } else if (v->isTempInitialItem()) {
1463            v->setTempDeleted();
1464            v->setCas(cas);
1465        }
1466    }
1467
1468    if (v && v->isLocked(ep_current_time()) &&
1469        (getState() == vbucket_state_replica ||
1470         getState() == vbucket_state_pending)) {
1471        v->unlock();
1472    }
1473
1474    MutationStatus delrv;
1475    boost::optional<VBNotifyCtx> notifyCtx;
1476    bool metaBgFetch = true;
1477    if (!v) {
1478        if (eviction == FULL_EVICTION) {
1479            delrv = MutationStatus::NeedBgFetch;
1480        } else {
1481            delrv = MutationStatus::NotFound;
1482        }
1483    } else if (mcbp::datatype::is_xattr(v->getDatatype()) && !v->isResident()) {
1484        // MB-25671: A temp deleted xattr with no value must be fetched before
1485        // the deleteWithMeta can be applied.
1486        delrv = MutationStatus::NeedBgFetch;
1487        metaBgFetch = false;
1488    } else {
1489        // MB-33919: The incoming meta.exptime should be used as the delete-time
1490        // so request GenerateDeleteTime::No, if the incoming value is 0, a new
1491        // delete-time will be generated.
1492        VBQueueItemCtx queueItmCtx(genBySeqno,
1493                                   generateCas,
1494                                   GenerateDeleteTime::No,
1495                                   TrackCasDrift::Yes,
1496                                   backfill,
1497                                   nullptr /* No pre link step needed */);
1498
1499        // system xattrs must remain, however no need to prune xattrs if this is
1500        // a replication call, the active has done this and we must just store
1501        // what we're given
1502        std::unique_ptr<Item> itm;
1503        if (!isReplication && mcbp::datatype::is_xattr(v->getDatatype()) &&
1504            (itm = pruneXattrDocument(*v, itemMeta))) {
1505            // A new item has been generated and must be given a new seqno
1506            queueItmCtx.genBySeqno = GenerateBySeqno::Yes;
1507
1508            // MB-36101: The result should always be a deleted item
1509            itm->setDeleted();
1510            std::tie(v, delrv, notifyCtx) =
1511                    updateStoredValue(hbl, *v, *itm, queueItmCtx);
1512        } else {
1513            std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1514                                                              *v,
1515                                                              cas,
1516                                                              itemMeta,
1517                                                              queueItmCtx,
1518                                                              /*use_meta*/ true,
1519                                                              bySeqno);
1520        }
1521    }
1522    cas = v ? v->getCas() : 0;
1523
1524    switch (delrv) {
1525    case MutationStatus::NoMem:
1526        return ENGINE_ENOMEM;
1527    case MutationStatus::InvalidCas:
1528        return ENGINE_KEY_EEXISTS;
1529    case MutationStatus::IsLocked:
1530        return ENGINE_LOCKED_TMPFAIL;
1531    case MutationStatus::NotFound:
1532        return ENGINE_KEY_ENOENT;
1533    case MutationStatus::WasDirty:
1534    case MutationStatus::WasClean: {
1535        if (seqno) {
1536            *seqno = static_cast<uint64_t>(v->getBySeqno());
1537        }
1538        // we unlock ht lock here because we want to avoid potential lock
1539        // inversions arising from notifyNewSeqno() call
1540        hbl.getHTLock().unlock();
1541        notifyNewSeqno(*notifyCtx);
1542        break;
1543    }
1544    case MutationStatus::NeedBgFetch:
1545        hbl.getHTLock().unlock();
1546        bgFetch(key, cookie, engine, bgFetchDelay, metaBgFetch);
1547        return ENGINE_EWOULDBLOCK;
1548    }
1549    return ENGINE_SUCCESS;
1550}
1551
1552void VBucket::deleteExpiredItem(const Item& it,
1553                                time_t startTime,
1554                                ExpireBy source) {
1555
1556    // The item is correctly trimmed (by the caller). Fetch the one in the
1557    // hashtable and replace it if the CAS match (same item; no race).
1558    // If not found in the hashtable we should add it as a deleted item
1559    const DocKey& key = it.getKey();
1560    auto hbl = ht.getLockedBucket(key);
1561    StoredValue* v = ht.unlocked_find(
1562            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1563    if (v) {
1564        if (v->getCas() != it.getCas()) {
1565            return;
1566        }
1567
1568        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1569            bool deleted = deleteStoredValue(hbl, *v);
1570            if (!deleted) {
1571                throw std::logic_error(
1572                        "VBucket::deleteExpiredItem: "
1573                        "Failed to delete seqno:" +
1574                        std::to_string(v->getBySeqno()) + " from bucket " +
1575                        std::to_string(hbl.getBucketNum()));
1576            }
1577        } else if (v->isExpired(startTime) && !v->isDeleted()) {
1578            VBNotifyCtx notifyCtx;
1579            ht.unlocked_updateStoredValue(hbl.getHTLock(), *v, it);
1580            std::tie(std::ignore, std::ignore, notifyCtx) =
1581                    processExpiredItem(hbl, *v);
1582            // we unlock ht lock here because we want to avoid potential lock
1583            // inversions arising from notifyNewSeqno() call
1584            hbl.getHTLock().unlock();
1585            notifyNewSeqno(notifyCtx);
1586        }
1587    } else {
1588        if (eviction == FULL_EVICTION) {
1589            // Create a temp item and delete and push it
1590            // into the checkpoint queue, only if the bloomfilter
1591            // predicts that the item may exist on disk.
1592            if (maybeKeyExistsInFilter(key)) {
1593                TempAddStatus rv = addTempStoredValue(hbl, key);
1594                if (rv == TempAddStatus::NoMem) {
1595                    return;
1596                }
1597                v = ht.unlocked_find(key,
1598                                     hbl.getBucketNum(),
1599                                     WantsDeleted::Yes,
1600                                     TrackReference::No);
1601                v->setTempDeleted();
1602                v->setRevSeqno(it.getRevSeqno());
1603                ht.unlocked_updateStoredValue(hbl.getHTLock(), *v, it);
1604                VBNotifyCtx notifyCtx;
1605                std::tie(std::ignore, std::ignore, notifyCtx) =
1606                        processExpiredItem(hbl, *v);
1607                // we unlock ht lock here because we want to avoid potential
1608                // lock inversions arising from notifyNewSeqno() call
1609                hbl.getHTLock().unlock();
1610                notifyNewSeqno(notifyCtx);
1611            }
1612        }
1613    }
1614    incExpirationStat(source);
1615}
1616
1617ENGINE_ERROR_CODE VBucket::add(
1618        Item& itm,
1619        const void* cookie,
1620        EventuallyPersistentEngine& engine,
1621        int bgFetchDelay,
1622        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1623    auto hbl = ht.getLockedBucket(itm.getKey());
1624    StoredValue* v = ht.unlocked_find(itm.getKey(),
1625                                      hbl.getBucketNum(),
1626                                      WantsDeleted::Yes,
1627                                      TrackReference::No);
1628
1629    bool maybeKeyExists = true;
1630    if ((v == nullptr || v->isTempInitialItem()) &&
1631        (eviction == FULL_EVICTION)) {
1632        // Check bloomfilter's prediction
1633        if (!maybeKeyExistsInFilter(itm.getKey())) {
1634            maybeKeyExists = false;
1635        }
1636    }
1637
1638    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1639    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1640                               GenerateCas::Yes,
1641                               TrackCasDrift::No,
1642                               /*isBackfillItem*/ false,
1643                               &preLinkDocumentContext);
1644    AddStatus status;
1645    boost::optional<VBNotifyCtx> notifyCtx;
1646    std::tie(status, notifyCtx) = processAdd(
1647            hbl, v, itm, maybeKeyExists, false, queueItmCtx, readHandle);
1648
1649    switch (status) {
1650    case AddStatus::NoMem:
1651        return ENGINE_ENOMEM;
1652    case AddStatus::Exists:
1653        return ENGINE_NOT_STORED;
1654    case AddStatus::AddTmpAndBgFetch:
1655        return addTempItemAndBGFetch(
1656                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1657    case AddStatus::BgFetch:
1658        hbl.getHTLock().unlock();
1659        bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1660        return ENGINE_EWOULDBLOCK;
1661    case AddStatus::Success:
1662    case AddStatus::UnDel:
1663        notifyNewSeqno(*notifyCtx);
1664        itm.setBySeqno(v->getBySeqno());
1665        itm.setCas(v->getCas());
1666        break;
1667    }
1668    return ENGINE_SUCCESS;
1669}
1670
1671std::pair<MutationStatus, GetValue> VBucket::processGetAndUpdateTtl(
1672        HashTable::HashBucketLock& hbl,
1673        StoredValue* v,
1674        time_t exptime,
1675        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1676    if (v) {
1677        if (isLogicallyNonExistent(*v, readHandle)) {
1678            ht.cleanupIfTemporaryItem(hbl, *v);
1679            return {MutationStatus::NotFound, GetValue()};
1680        }
1681
1682        if (!v->isResident()) {
1683            return {MutationStatus::NeedBgFetch, GetValue()};
1684        }
1685
1686        if (v->isLocked(ep_current_time())) {
1687            return {MutationStatus::IsLocked,
1688                    GetValue(nullptr, ENGINE_KEY_EEXISTS, 0)};
1689        }
1690
1691        const bool exptime_mutated = exptime != v->getExptime();
1692        auto bySeqNo = v->getBySeqno();
1693        if (exptime_mutated) {
1694            v->markDirty();
1695            v->setExptime(exptime);
1696            v->setRevSeqno(v->getRevSeqno() + 1);
1697        }
1698
1699        GetValue rv(v->toItem(v->isLocked(ep_current_time()), getId()),
1700                    ENGINE_SUCCESS,
1701                    bySeqNo);
1702
1703        if (exptime_mutated) {
1704            VBQueueItemCtx qItemCtx(GenerateBySeqno::Yes,
1705                                    GenerateCas::Yes,
1706                                    TrackCasDrift::No,
1707                                    false,
1708                                    nullptr);
1709            VBNotifyCtx notifyCtx;
1710            std::tie(v, std::ignore, notifyCtx) =
1711                    updateStoredValue(hbl, *v, *rv.item, qItemCtx, true);
1712            rv.item->setCas(v->getCas());
1713            // we unlock ht lock here because we want to avoid potential lock
1714            // inversions arising from notifyNewSeqno() call
1715            hbl.getHTLock().unlock();
1716            notifyNewSeqno(notifyCtx);
1717        }
1718
1719        return {MutationStatus::WasClean, std::move(rv)};
1720    } else {
1721        if (eviction == VALUE_ONLY) {
1722            return {MutationStatus::NotFound, GetValue()};
1723        } else {
1724            if (maybeKeyExistsInFilter(readHandle.getKey())) {
1725                return {MutationStatus::NeedBgFetch, GetValue()};
1726            } else {
1727                // As bloomfilter predicted that item surely doesn't exist
1728                // on disk, return ENOENT for getAndUpdateTtl().
1729                return {MutationStatus::NotFound, GetValue()};
1730            }
1731        }
1732    }
1733}
1734
1735GetValue VBucket::getAndUpdateTtl(
1736        const void* cookie,
1737        EventuallyPersistentEngine& engine,
1738        int bgFetchDelay,
1739        time_t exptime,
1740        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1741    auto hbl = ht.getLockedBucket(readHandle.getKey());
1742    StoredValue* v = fetchValidValue(hbl,
1743                                     readHandle.getKey(),
1744                                     WantsDeleted::Yes,
1745                                     TrackReference::Yes,
1746                                     QueueExpired::Yes);
1747    GetValue gv;
1748    MutationStatus status;
1749    std::tie(status, gv) = processGetAndUpdateTtl(hbl, v, exptime, readHandle);
1750
1751    if (status == MutationStatus::NeedBgFetch) {
1752        if (v) {
1753            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay);
1754            return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1755        } else {
1756            ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(hbl,
1757                                                         readHandle.getKey(),
1758                                                         cookie,
1759                                                         engine,
1760                                                         bgFetchDelay,
1761                                                         false);
1762            return GetValue(NULL, ec, -1, true);
1763        }
1764    }
1765
1766    return gv;
1767}
1768
/**
 * Look up the key supplied by readHandle and build the GetValue for it.
 *
 * The hash bucket is locked and the StoredValue (deleted ones included) is
 * obtained through fetchValidValue(). The following bits of options are
 * read here: TRACK_REFERENCE, ALLOW_META_ONLY, GET_DELETED_VALUE,
 * QUEUE_BG_FETCH, HIDE_LOCKED_CAS and DELETE_TEMP.
 *
 * @param cookie connection cookie forwarded to the background fetch helpers
 * @param engine engine instance forwarded to the same helpers
 * @param bgFetchDelay delay forwarded to the background fetch helpers
 * @param options bit mask of get options (see above)
 * @param diskFlushAll when true (and deleted values were not requested) a
 *        key missing from the hash table is reported as not found without
 *        consulting the bloom filter
 * @param getKeyOnly GetKeyOnly::Yes builds the result with toItemKeyOnly()
 *        instead of toItem()
 * @param readHandle collections read handle supplying the key and the
 *        logical-deletion check
 * @return a GetValue holding the item on success; a default-constructed
 *         GetValue where the comments below speak of ENOENT; otherwise the
 *         result of getInternalNonResident() or an item-less GetValue
 *         carrying the background-fetch status
 */
GetValue VBucket::getInternal(
        const void* cookie,
        EventuallyPersistentEngine& engine,
        int bgFetchDelay,
        get_options_t options,
        bool diskFlushAll,
        GetKeyOnly getKeyOnly,
        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
    // Decode the option bits once up front
    const TrackReference trackReference = (options & TRACK_REFERENCE)
                                                  ? TrackReference::Yes
                                                  : TrackReference::No;
    const bool metadataOnly = (options & ALLOW_META_ONLY);
    const bool getDeletedValue = (options & GET_DELETED_VALUE);
    const bool bgFetchRequired = (options & QUEUE_BG_FETCH);
    auto hbl = ht.getLockedBucket(readHandle.getKey());
    StoredValue* v = fetchValidValue(hbl,
                                     readHandle.getKey(),
                                     WantsDeleted::Yes,
                                     trackReference,
                                     QueueExpired::Yes);
    if (v) {
        // 1 If SV is deleted and user didn't request deleted items
        // 2 (or) If collection says this key is gone.
        // then return ENOENT.
        if ((v->isDeleted() && !getDeletedValue) ||
            readHandle.isLogicallyDeleted(v->getBySeqno())) {
            return GetValue();
        }

        // If SV is a temp deleted item (i.e. marker added after a BgFetch to
        // note that the item has been deleted), *but* the user requested
        // full deleted items, then we need to fetch the complete deleted item
        // (including body) from disk.
        if (v->isTempDeletedItem() && getDeletedValue && !metadataOnly) {
            const auto queueBgFetch =
                    (bgFetchRequired) ? QueueBgFetch::Yes : QueueBgFetch::No;
            return getInternalNonResident(readHandle.getKey(),
                                          cookie,
                                          engine,
                                          bgFetchDelay,
                                          queueBgFetch,
                                          *v);
        }

        // If SV is otherwise a temp non-existent (i.e. a marker added after a
        // BgFetch to note that no such item exists) or temp deleted, then we
        // should cleanup the SV (if requested) before returning ENOENT (so we
        // don't keep temp items in HT).
        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
            if (options & DELETE_TEMP) {
                deleteStoredValue(hbl, *v);
            }
            return GetValue();
        }

        // If the value is not resident (and it was requested), wait for it...
        if (!v->isResident() && !metadataOnly) {
            auto queueBgFetch = (bgFetchRequired) ?
                    QueueBgFetch::Yes :
                    QueueBgFetch::No;
            return getInternalNonResident(readHandle.getKey(),
                                          cookie,
                                          engine,
                                          bgFetchDelay,
                                          queueBgFetch,
                                          *v);
        }

        // Should we hide (return -1) for the items' CAS?
        const bool hideCas =
                (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
        std::unique_ptr<Item> item;
        if (getKeyOnly == GetKeyOnly::Yes) {
            item = v->toItemKeyOnly(getId());
        } else {
            item = v->toItem(hideCas, getId());
        }
        // The fourth argument is true when only metadata is in memory,
        // which can happen here only if metadataOnly was requested.
        return GetValue(std::move(item),
                        ENGINE_SUCCESS,
                        v->getBySeqno(),
                        !v->isResident(),
                        v->getNRUValue());
    } else {
        // Nothing in the hash table. Unless deleted values were requested,
        // value-only eviction (or a pending disk flush-all) means there is
        // nothing to look for on disk either.
        if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
            return GetValue();
        }

        if (maybeKeyExistsInFilter(readHandle.getKey())) {
            // Report EWOULDBLOCK even when no fetch is queued here; the
            // status is only replaced when a fetch was actually requested.
            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
            if (bgFetchRequired) { // Full eviction and need a bg fetch.
                ec = addTempItemAndBGFetch(hbl,
                                           readHandle.getKey(),
                                           cookie,
                                           engine,
                                           bgFetchDelay,
                                           metadataOnly);
            }
            return GetValue(NULL, ec, -1, true);
        } else {
            // As bloomfilter predicted that item surely doesn't exist
            // on disk, return ENOENT, for getInternal().
            return GetValue();
        }
    }
}
1874
1875ENGINE_ERROR_CODE VBucket::getMetaData(
1876        const void* cookie,
1877        EventuallyPersistentEngine& engine,
1878        int bgFetchDelay,
1879        const Collections::VB::Manifest::CachingReadHandle& readHandle,
1880        ItemMetaData& metadata,
1881        uint32_t& deleted,
1882        uint8_t& datatype) {
1883    deleted = 0;
1884    auto hbl = ht.getLockedBucket(readHandle.getKey());
1885    StoredValue* v = ht.unlocked_find(readHandle.getKey(),
1886                                      hbl.getBucketNum(),
1887                                      WantsDeleted::Yes,
1888                                      TrackReference::No);
1889
1890    if (v) {
1891        stats.numOpsGetMeta++;
1892        if (v->isTempInitialItem()) {
1893            // Need bg meta fetch.
1894            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay, true);
1895            return ENGINE_EWOULDBLOCK;
1896        } else if (v->isTempNonExistentItem()) {
1897            metadata.cas = v->getCas();
1898            return ENGINE_KEY_ENOENT;
1899        } else if (readHandle.isLogicallyDeleted(v->getBySeqno())) {
1900            return ENGINE_KEY_ENOENT;
1901        } else {
1902            if (v->isTempDeletedItem() || v->isDeleted() ||
1903                v->isExpired(ep_real_time())) {
1904                deleted |= GET_META_ITEM_DELETED_FLAG;
1905            }
1906
1907            if (v->isLocked(ep_current_time())) {
1908                metadata.cas = static_cast<uint64_t>(-1);
1909            } else {
1910                metadata.cas = v->getCas();
1911            }
1912            metadata.flags = v->getFlags();
1913            metadata.exptime = v->getExptime();
1914            metadata.revSeqno = v->getRevSeqno();
1915            datatype = v->getDatatype();
1916
1917            return ENGINE_SUCCESS;
1918        }
1919    } else {
1920        // The key wasn't found. However, this may be because it was previously
1921        // deleted or evicted with the full eviction strategy.
1922        // So, add a temporary item corresponding to the key to the hash table
1923        // and schedule a background fetch for its metadata from the persistent
1924        // store. The item's state will be updated after the fetch completes.
1925        //
1926        // Schedule this bgFetch only if the key is predicted to be may-be
1927        // existent on disk by the bloomfilter.
1928
1929        if (maybeKeyExistsInFilter(readHandle.getKey())) {
1930            return addTempItemAndBGFetch(hbl,
1931                                         readHandle.getKey(),
1932                                         cookie,
1933                                         engine,
1934                                         bgFetchDelay,
1935                                         true);
1936        } else {
1937            stats.numOpsGetMeta++;
1938            return ENGINE_KEY_ENOENT;
1939        }
1940    }
1941}
1942
1943ENGINE_ERROR_CODE VBucket::getKeyStats(
1944        const void* cookie,
1945        EventuallyPersistentEngine& engine,
1946        int bgFetchDelay,
1947        struct key_stats& kstats,
1948        WantsDeleted wantsDeleted,
1949        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1950    auto hbl = ht.getLockedBucket(readHandle.getKey());
1951    StoredValue* v = fetchValidValue(hbl,
1952                                     readHandle.getKey(),
1953                                     WantsDeleted::Yes,
1954                                     TrackReference::Yes,
1955                                     QueueExpired::Yes);
1956
1957    if (v) {
1958        if ((v->isDeleted() ||
1959             readHandle.isLogicallyDeleted(v->getBySeqno())) &&
1960            wantsDeleted == WantsDeleted::No) {
1961            return ENGINE_KEY_ENOENT;
1962        }
1963
1964        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1965            deleteStoredValue(hbl, *v);
1966            return ENGINE_KEY_ENOENT;
1967        }
1968        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1969            hbl.getHTLock().unlock();
1970            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay, true);
1971            return ENGINE_EWOULDBLOCK;
1972        }
1973        kstats.logically_deleted =
1974                v->isDeleted() ||
1975                readHandle.isLogicallyDeleted(v->getBySeqno());
1976        kstats.dirty = v->isDirty();
1977        kstats.exptime = v->getExptime();
1978        kstats.flags = v->getFlags();
1979        kstats.cas = v->getCas();
1980        kstats.vb_state = getState();
1981        kstats.resident = v->isResident();
1982
1983        return ENGINE_SUCCESS;
1984    } else {
1985        if (eviction == VALUE_ONLY) {
1986            return ENGINE_KEY_ENOENT;
1987        } else {
1988            if (maybeKeyExistsInFilter(readHandle.getKey())) {
1989                return addTempItemAndBGFetch(hbl,
1990                                             readHandle.getKey(),
1991                                             cookie,
1992                                             engine,
1993                                             bgFetchDelay,
1994                                             true);
1995            } else {
1996                // If bgFetch were false, or bloomfilter predicted that
1997                // item surely doesn't exist on disk, return ENOENT for
1998                // getKeyStats().
1999                return ENGINE_KEY_ENOENT;
2000            }
2001        }
2002    }
2003}
2004
2005GetValue VBucket::getLocked(
2006        rel_time_t currentTime,
2007        uint32_t lockTimeout,
2008        const void* cookie,
2009        EventuallyPersistentEngine& engine,
2010        int bgFetchDelay,
2011        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2012    auto hbl = ht.getLockedBucket(readHandle.getKey());
2013    StoredValue* v = fetchValidValue(hbl,
2014                                     readHandle.getKey(),
2015                                     WantsDeleted::Yes,
2016                                     TrackReference::Yes,
2017                                     QueueExpired::Yes);
2018
2019    if (v) {
2020        if (isLogicallyNonExistent(*v, readHandle)) {
2021            ht.cleanupIfTemporaryItem(hbl, *v);
2022            return GetValue(NULL, ENGINE_KEY_ENOENT);
2023        }
2024
2025        // if v is locked return error
2026        if (v->isLocked(currentTime)) {
2027            return GetValue(NULL, ENGINE_LOCKED_TMPFAIL);
2028        }
2029
2030        // If the value is not resident, wait for it...
2031        if (!v->isResident()) {
2032            if (cookie) {
2033                bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay);
2034            }
2035            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
2036        }
2037
2038        // acquire lock and increment cas value
2039        v->lock(currentTime + lockTimeout);
2040
2041        auto it = v->toItem(false, getId());
2042        it->setCas(nextHLCCas());
2043        v->setCas(it->getCas());
2044
2045        return GetValue(std::move(it));
2046
2047    } else {
2048        // No value found in the hashtable.
2049        switch (eviction) {
2050        case VALUE_ONLY:
2051            return GetValue(NULL, ENGINE_KEY_ENOENT);
2052
2053        case FULL_EVICTION:
2054            if (maybeKeyExistsInFilter(readHandle.getKey())) {
2055                ENGINE_ERROR_CODE ec =
2056                        addTempItemAndBGFetch(hbl,
2057                                              readHandle.getKey(),
2058                                              cookie,
2059                                              engine,
2060                                              bgFetchDelay,
2061                                              false);
2062                return GetValue(NULL, ec, -1, true);
2063            } else {
2064                // As bloomfilter predicted that item surely doesn't exist
2065                // on disk, return ENOENT for getLocked().
2066                return GetValue(NULL, ENGINE_KEY_ENOENT);
2067            }
2068        }
2069        return GetValue(); // just to prevent compiler warning
2070    }
2071}
2072
2073void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
2074    auto hbl = ht.getLockedBucket(queuedItem.getKey());
2075    StoredValue* v = fetchValidValue(hbl,
2076                                     queuedItem.getKey(),
2077                                     WantsDeleted::Yes,
2078                                     TrackReference::No,
2079                                     QueueExpired::Yes);
2080    // Delete the item in the hash table iff:
2081    //  1. Item is existent in hashtable, and deleted flag is true
2082    //  2. rev seqno of queued item matches rev seqno of hash table item
2083    if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
2084        bool isDeleted = deleteStoredValue(hbl, *v);
2085        if (!isDeleted) {
2086            throw std::logic_error(
2087                    "deletedOnDiskCbk:callback: "
2088                    "Failed to delete key with seqno:" +
2089                    std::to_string(v->getBySeqno()) + "' from bucket " +
2090                    std::to_string(hbl.getBucketNum()));
2091        }
2092
2093        /**
2094         * Deleted items are to be added to the bloomfilter,
2095         * in either eviction policy.
2096         */
2097        addToFilter(queuedItem.getKey());
2098    }
2099
2100    if (deleted) {
2101        ++stats.totalPersisted;
2102        ++opsDelete;
2103
2104        /**
2105         * MB-30137: Decrement the total number of on-disk items. This needs to be
2106         * done to ensure that the item count is accurate in the case of full
2107         * eviction
2108         */
2109        if (v) {
2110            decrNumTotalItems();
2111        }
2112    }
2113    doStatsForFlushing(queuedItem, queuedItem.size());
2114    --stats.diskQueueSize;
2115    decrMetaDataDisk(queuedItem);
2116}
2117
2118bool VBucket::deleteKey(const DocKey& key) {
2119    auto hbl = ht.getLockedBucket(key);
2120    StoredValue* v = ht.unlocked_find(
2121            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
2122    if (!v) {
2123        return false;
2124    }
2125    return deleteStoredValue(hbl, *v);
2126}
2127
2128void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
2129                                  uint64_t prevHighSeqno) {
2130    failovers->pruneEntries(rollbackResult.highSeqno);
2131    checkpointManager->clear(*this, rollbackResult.highSeqno);
2132    setPersistedSnapshot(rollbackResult.snapStartSeqno,
2133                         rollbackResult.snapEndSeqno);
2134    incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
2135    checkpointManager->setOpenCheckpointId(1);
2136}
2137
2138void VBucket::dump() const {
2139    std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
2140              << " numItems:" << getNumItems()
2141              << " numNonResident:" << getNumNonResidentItems()
2142              << " ht: " << std::endl << "  " << ht << std::endl
2143              << "]" << std::endl;
2144}
2145
2146void VBucket::setMutationMemoryThreshold(double memThreshold) {
2147    if (memThreshold > 0.0 && memThreshold <= 1.0) {
2148        mutationMemThreshold = memThreshold;
2149    }
2150}
2151
2152bool VBucket::hasMemoryForStoredValue(EPStats& st,
2153                                      const Item& item,
2154                                      bool isReplication) {
2155    double newSize = static_cast<double>(estimateNewMemoryUsage(st, item));
2156    double maxSize = static_cast<double>(st.getMaxDataSize());
2157    if (isReplication) {
2158        return newSize <= (maxSize * st.replicationThrottleThreshold);
2159    } else {
2160        return newSize <= (maxSize * mutationMemThreshold);
2161    }
2162}
2163
2164void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
2165    addStat(NULL, toString(state), add_stat, c);
2166    if (details) {
2167        size_t numItems = getNumItems();
2168        size_t tempItems = getNumTempItems();
2169        addStat("num_items", numItems, add_stat, c);
2170        addStat("num_temp_items", tempItems, add_stat, c);
2171        addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
2172        addStat("ht_memory", ht.memorySize(), add_stat, c);
2173        addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
2174        addStat("ht_item_memory_uncompressed",
2175                ht.getUncompressedItemMemory(),
2176                add_stat,
2177                c);
2178        addStat("ht_cache_size", ht.getCacheSize(), add_stat, c);
2179        addStat("ht_size", ht.getSize(), add_stat, c);
2180        addStat("num_ejects", ht.getNumEjects(), add_stat, c);
2181        addStat("ops_create", opsCreate.load(), add_stat, c);
2182        addStat("ops_update", opsUpdate.load(), add_stat, c);
2183        addStat("ops_delete", opsDelete.load(), add_stat, c);
2184        addStat("ops_reject", opsReject.load(), add_stat, c);
2185        addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
2186        addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
2187        addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
2188        addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
2189        addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
2190        addStat("queue_age", getQueueAge(), add_stat, c);
2191        addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
2192
2193        addStat("high_seqno", getHighSeqno(), add_stat, c);
2194        addStat("uuid", failovers->getLatestUUID(), add_stat, c);
2195        addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
2196        addStat("bloom_filter", getFilterStatusString().data(),
2197                add_stat, c);
2198        addStat("bloom_filter_size", getFilterSize(), add_stat, c);
2199        addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
2200        addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
2201        addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
2202        addStat("might_contain_xattrs", mightContainXattrs(), add_stat, c);
2203        addStat("max_deleted_revid", ht.getMaxDeletedRevSeqno(), add_stat, c);
2204        hlc.addStats(statPrefix, add_stat, c);
2205    }
2206}
2207
2208void VBucket::decrDirtyQueueMem(size_t decrementBy)
2209{
2210    size_t oldVal, newVal;
2211    do {
2212        oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
2213        if (oldVal < decrementBy) {
2214            newVal = 0;
2215        } else {
2216            newVal = oldVal - decrementBy;
2217        }
2218    } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
2219}
2220
2221void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
2222{
2223    uint64_t oldVal, newVal;
2224    do {
2225        oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
2226        if (oldVal < decrementBy) {
2227            newVal = 0;
2228        } else {
2229            newVal = oldVal - decrementBy;
2230        }
2231    } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
2232}
2233
2234void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
2235{
2236    size_t oldVal, newVal;
2237    do {
2238        oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
2239        if (oldVal < decrementBy) {
2240            newVal = 0;
2241        } else {
2242            newVal = oldVal - decrementBy;
2243        }
2244    } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
2245}
2246
/**
 * Apply a set (or replace / CAS / set-with-meta) to the hash table while
 * the caller holds the hash bucket lock.
 *
 * @param hbl hash bucket lock for the key; must be held
 * @param v in/out: the StoredValue currently in the hash table (may be
 *        nullptr); updated to the StoredValue that now holds the item when
 *        one is updated or added
 * @param itm the item to store; its rev seqno may be rewritten here
 * @param cas CAS supplied with the operation, 0 meaning "no CAS"
 * @param allowExisting when false, a live (non-temp, non-deleted) existing
 *        item makes the call fail with InvalidCas
 * @param hasMetaData true if itm carries its own metadata; the rev seqno
 *        is then not generated here
 * @param queueItmCtx queueing options forwarded to updateStoredValue() /
 *        addNewStoredValue()
 * @param storeIfStatus GetItemInfo forces a background fetch under full
 *        eviction when the existing item is not available in memory
 * @param maybeKeyExists bloom-filter prediction supplied by the caller
 * @param isReplication forwarded to hasMemoryForStoredValue()
 * @return the resulting MutationStatus plus, when the item was updated or
 *         added, the notification context for the new seqno
 * @throws std::invalid_argument if the hash bucket lock is not held
 */
std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
        const HashTable::HashBucketLock& hbl,
        StoredValue*& v,
        Item& itm,
        uint64_t cas,
        bool allowExisting,
        bool hasMetaData,
        const VBQueueItemCtx& queueItmCtx,
        cb::StoreIfStatus storeIfStatus,
        bool maybeKeyExists,
        bool isReplication) {
    if (!hbl.getHTLock()) {
        throw std::invalid_argument(
                "VBucket::processSet: htLock not held for "
                "VBucket " +
                std::to_string(getId()));
    }

    // Refuse early if storing the item would exceed the memory threshold
    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
        return {MutationStatus::NoMem, {}};
    }

    if (v == nullptr && itm.isDeleted() && cas &&
        !areDeletedItemsAlwaysResident()) {
        // Request to perform a CAS operation on a deleted body which may
        // not be resident. Need a bg_fetch to be able to perform this request.
        return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
    }

    // bgFetch only in FE, only if the bloom-filter thinks the key may exist.
    // But only for cas operations or if a store_if is requiring the item_info.
    if (eviction == FULL_EVICTION && maybeKeyExists &&
        (cas || storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
        if (!v || v->isTempInitialItem()) {
            return {MutationStatus::NeedBgFetch, {}};
        }
    }

    /*
     * prior to checking for the lock, we should check if this object
     * has expired. If so, then check if CAS value has been provided
     * for this set op. In this case the operation should be denied since
     * a cas operation for a key that doesn't exist is not a very cool
     * thing to do. See MB 3252
     */
    if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
        // An expired item no longer holds its lock
        if (v->isLocked(ep_current_time())) {
            v->unlock();
        }
        if (cas) {
            /* item has expired and cas value provided. Deny ! */
            return {MutationStatus::NotFound, {}};
        }
    }

    if (v) {
        // Add-style operation (allowExisting == false) against a live item
        if (!allowExisting && !v->isTempItem() && !v->isDeleted()) {
            return {MutationStatus::InvalidCas, {}};
        }
        if (v->isLocked(ep_current_time())) {
            /*
             * item is locked, deny if there is cas value mismatch
             * or no cas value is provided by the user
             */
            if (cas != v->getCas()) {
                return {MutationStatus::IsLocked, {}};
            }
            /* allow operation*/
            v->unlock();
        } else if (cas && cas != v->getCas()) {
            if (v->isTempNonExistentItem()) {
                // This is a temporary item which marks a key as non-existent;
                // therefore specifying a non-matching CAS should be exposed
                // as item not existing.
                return {MutationStatus::NotFound, {}};
            }
            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
                // Existing item is deleted, and we are not replacing it with
                // a (different) deleted value - return not existing.
                return {MutationStatus::NotFound, {}};
            }
            // None of the above special cases; the existing item cannot be
            // modified with the specified CAS.
            return {MutationStatus::InvalidCas, {}};
        }
        if (!hasMetaData) {
            // No caller-supplied metadata: continue the existing revision
            // sequence.
            itm.setRevSeqno(v->getRevSeqno() + 1);
            /* MB-23530: We must ensure that a replace operation (i.e.
             * set with a CAS) /fails/ if the old document is deleted; it
             * logically "doesn't exist". However, if the new value is deleted
             * this op is a /delete/ with a CAS and we must permit a
             * deleted -> deleted transition for Deleted Bodies.
             */
            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
                !itm.isDeleted()) {
                return {MutationStatus::NotFound, {}};
            }
        }

        // All checks passed: overwrite the existing StoredValue
        MutationStatus status;
        VBNotifyCtx notifyCtx;
        std::tie(v, status, notifyCtx) =
                updateStoredValue(hbl, *v, itm, queueItmCtx);
        return {status, notifyCtx};
    } else if (cas != 0) {
        // A CAS was supplied but there is nothing to compare it against
        return {MutationStatus::NotFound, {}};
    } else {
        // Brand new key: insert it, generating a rev seqno unless the caller
        // supplied metadata.
        VBNotifyCtx notifyCtx;
        auto genRevSeqno = hasMetaData ? GenerateRevSeqno::No :
                           GenerateRevSeqno::Yes;
        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx,
                                                   genRevSeqno);
        itm.setRevSeqno(v->getRevSeqno());
        return {MutationStatus::WasClean, notifyCtx};
    }
}
2363
2364std::pair<AddStatus, boost::optional<VBNotifyCtx>> VBucket::processAdd(
2365        const HashTable::HashBucketLock& hbl,
2366        StoredValue*& v,
2367        Item& itm,
2368        bool maybeKeyExists,
2369        bool isReplication,
2370        const VBQueueItemCtx& queueItmCtx,
2371        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2372    if (!hbl.getHTLock()) {
2373        throw std::invalid_argument(
2374                "VBucket::processAdd: htLock not held for "
2375                "VBucket " +
2376                std::to_string(getId()));
2377    }
2378    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
2379        !v->isTempItem() && !readHandle.isLogicallyDeleted(v->getBySeqno())) {
2380        return {AddStatus::Exists, {}};
2381    }
2382    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2383        return {AddStatus::NoMem, {}};
2384    }
2385
2386    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, {}};
2387
2388    if (v) {
2389        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
2390            maybeKeyExists) {
2391            // Need to figure out if an item exists on disk
2392            return {AddStatus::BgFetch, {}};
2393        }
2394
2395        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
2396                           ? AddStatus::UnDel
2397                           : AddStatus::Success;
2398
2399        if (v->isTempDeletedItem()) {
2400            itm.setRevSeqno(v->getRevSeqno() + 1);
2401        } else {
2402            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
2403        }
2404
2405        if (!v->isTempItem()) {
2406            itm.setRevSeqno(v->getRevSeqno() + 1);
2407        }
2408
2409        std::tie(v, std::ignore, rv.second) =
2410                updateStoredValue(hbl, *v, itm, queueItmCtx);
2411    } else {
2412        if (itm.getBySeqno() != StoredValue::state_temp_init) {
2413            if (eviction == FULL_EVICTION && maybeKeyExists) {
2414                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
2415            }
2416        }
2417
2418        if (itm.getBySeqno() == StoredValue::state_temp_init) {
2419            /* A 'temp initial item' is just added to the hash table. It is
2420             not put on checkpoint manager or sequence list */
2421            v = ht.unlocked_addNewStoredValue(hbl, itm);
2422            updateRevSeqNoOfNewStoredValue(*v);
2423        } else {
2424            std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx,
2425                                                       GenerateRevSeqno::Yes);
2426        }
2427
2428        itm.setRevSeqno(v->getRevSeqno());
2429
2430        if (v->isTempItem()) {
2431            rv.first = AddStatus::BgFetch;
2432        }
2433    }
2434
2435    if (v->isTempItem()) {
2436        v->setNRUValue(MAX_NRU_VALUE);
2437    }
2438    return rv;
2439}
2440
2441std::tuple<MutationStatus, StoredValue*, boost::optional<VBNotifyCtx>>
2442VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2443                           StoredValue& v,
2444                           uint64_t cas,
2445                           const ItemMetaData& metadata,
2446                           const VBQueueItemCtx& queueItmCtx,
2447                           bool use_meta,
2448                           uint64_t bySeqno) {
2449    boost::optional<VBNotifyCtx> empty;
2450    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2451        return std::make_tuple(MutationStatus::NeedBgFetch, &v, empty);
2452    }
2453
2454    if (v.isLocked(ep_current_time())) {
2455        if (cas != v.getCas()) {
2456            return std::make_tuple(MutationStatus::IsLocked, &v, empty);
2457        }
2458        v.unlock();
2459    }
2460
2461    if (cas != 0 && cas != v.getCas()) {
2462        return std::make_tuple(MutationStatus::InvalidCas, &v, empty);
2463    }
2464
2465    /* allow operation */
2466    v.unlock();
2467
2468    MutationStatus rv =
2469            v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2470
2471    if (use_meta) {
2472        v.setCas(metadata.cas);
2473        v.setFlags(metadata.flags);
2474        v.setExptime(metadata.exptime);
2475    }
2476
2477    v.setRevSeqno(metadata.revSeqno);
2478    VBNotifyCtx notifyCtx;
2479    StoredValue* newSv;
2480    std::tie(newSv, notifyCtx) =
2481            softDeleteStoredValue(hbl,
2482                                  v,
2483                                  /*onlyMarkDeleted*/ false,
2484                                  queueItmCtx,
2485                                  bySeqno);
2486    ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2487    return std::make_tuple(rv, newSv, notifyCtx);
2488}
2489
2490std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2491VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
2492                            StoredValue& v) {
2493    if (!hbl.getHTLock()) {
2494        throw std::invalid_argument(
2495                "VBucket::processExpiredItem: htLock not held for VBucket " +
2496                std::to_string(getId()));
2497    }
2498
2499    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2500        return std::make_tuple(MutationStatus::NeedBgFetch,
2501                               &v,
2502                               queueDirty(v,
2503
2504                                          GenerateBySeqno::Yes,
2505                                          GenerateCas::Yes,
2506                                          GenerateDeleteTime::Yes,
2507                                          /*isBackfillItem*/ false));
2508    }
2509
2510    /* If the datatype is XATTR, mark the item as deleted
2511     * but don't delete the value as system xattrs can
2512     * still be queried by mobile clients even after
2513     * deletion.
2514     * TODO: The current implementation is inefficient
2515     * but functionally correct and for performance reasons
2516     * only the system xattrs need to be stored.
2517     */
2518    value_t value = v.getValue();
2519    bool onlyMarkDeleted = value && mcbp::datatype::is_xattr(v.getDatatype());
2520    v.setRevSeqno(v.getRevSeqno() + 1);
2521    VBNotifyCtx notifyCtx;
2522    StoredValue* newSv;
2523    std::tie(newSv, notifyCtx) =
2524            softDeleteStoredValue(hbl,
2525                                  v,
2526                                  onlyMarkDeleted,
2527                                  VBQueueItemCtx(GenerateBySeqno::Yes,
2528                                                 GenerateCas::Yes,
2529                                                 GenerateDeleteTime::Yes,
2530                                                 TrackCasDrift::No,
2531                                                 /*isBackfillItem*/ false,
2532                                                 nullptr /* no pre link */),
2533                                  v.getBySeqno());
2534    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
2535    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
2536}
2537
2538bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2539                                StoredValue& v) {
2540    if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2541        return false;
2542    }
2543
2544    /* StoredValue deleted here. If any other in-memory data structures are
2545       using the StoredValue intrusively then they must have handled the delete
2546       by this point */
2547    ht.unlocked_del(hbl, v.getKey());
2548    return true;
2549}
2550
2551TempAddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2552                                          const DocKey& key,
2553                                          bool isReplication) {
2554    if (!hbl.getHTLock()) {
2555        throw std::invalid_argument(
2556                "VBucket::addTempStoredValue: htLock not held for "
2557                "VBucket " +
2558                std::to_string(getId()));
2559    }
2560
2561    Item itm(key,
2562             /*flags*/ 0,
2563             /*exp*/ 0,
2564             /*data*/ NULL,
2565             /*size*/ 0,
2566             PROTOCOL_BINARY_RAW_BYTES,
2567             0,
2568             StoredValue::state_temp_init);
2569
2570    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2571        return TempAddStatus::NoMem;
2572    }
2573
2574    /* A 'temp initial item' is just added to the hash table. It is
2575       not put on checkpoint manager or sequence list */
2576    StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
2577
2578    updateRevSeqNoOfNewStoredValue(*v);
2579    itm.setRevSeqno(v->getRevSeqno());
2580    v->setNRUValue(MAX_NRU_VALUE);
2581
2582    return TempAddStatus::BgFetch;
2583}
2584
2585void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2586    if (newSeqnoCb) {
2587        newSeqnoCb->callback(getId(), notifyCtx);
2588    }
2589}
2590
2591/*
2592 * Queue the item to the checkpoint and return the seqno the item was
2593 * allocated.
2594 */
2595int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2596    item->setVBucketId(id);
2597    queued_item qi(item);
2598    if (isBackfillPhase()) {
2599        queueBackfillItem(qi, getGenerateBySeqno(seqno));
2600    } else {
2601        checkpointManager->queueDirty(
2602                *this,
2603                qi,
2604                getGenerateBySeqno(seqno),
2605                GenerateCas::Yes,
2606                nullptr /* No pre link step as this is for system events */);
2607    }
2608    VBNotifyCtx notifyCtx;
2609    // If the seqno is initialized, skip replication notification
2610    notifyCtx.notifyReplication = !seqno.is_initialized();
2611    notifyCtx.notifyFlusher = true;
2612    notifyCtx.bySeqno = qi->getBySeqno();
2613    notifyNewSeqno(notifyCtx);
2614    return qi->getBySeqno();
2615}
2616
2617VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2618                                const VBQueueItemCtx& queueItmCtx) {
2619    if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2620        setMaxCasAndTrackDrift(v.getCas());
2621    }
2622    return queueDirty(v,
2623                      queueItmCtx.genBySeqno,
2624                      queueItmCtx.genCas,
2625                      queueItmCtx.generateDeleteTime,
2626                      queueItmCtx.isBackfillItem,
2627                      queueItmCtx.preLinkDocumentContext);
2628}
2629
2630void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2631    /**
2632     * Possibly, this item is being recreated. Conservatively assign it
2633     * a seqno that is greater than the greatest seqno of all deleted
2634     * items seen so far.
2635     */
2636    uint64_t seqno = ht.getMaxDeletedRevSeqno();
2637    if (!v.isTempItem()) {
2638        ++seqno;
2639    }
2640    v.setRevSeqno(seqno);
2641}
2642
2643void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2644                                     const void* cookie,
2645                                     HighPriorityVBNotify reqType) {
2646    std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2647    hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2648    numHpVBReqs.store(hpVBReqs.size());
2649
2650    LOG(EXTENSION_LOG_NOTICE,
2651        "Added high priority async request %s "
2652        "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2653        "Persisted upto:%" PRIu64 ", cookie:%p",
2654        to_string(reqType).c_str(),
2655        getId(),
2656        seqnoOrChkId,
2657        getPersistenceSeqno(),
2658        cookie);
2659}
2660
2661std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2662        EventuallyPersistentEngine& engine,
2663        uint64_t idNum,
2664        HighPriorityVBNotify notifyType) {
2665    std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2666    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2667
2668    auto entry = hpVBReqs.begin();
2669
2670    while (entry != hpVBReqs.end()) {
2671        if (notifyType != entry->reqType) {
2672            ++entry;
2673            continue;
2674        }
2675
2676        std::string logStr(to_string(notifyType));
2677
2678        auto wall_time = ProcessClock::now() - entry->start;
2679        auto spent =
2680                std::chrono::duration_cast<std::chrono::seconds>(wall_time);
2681        if (entry->id <= idNum) {
2682            toNotify[entry->cookie] = ENGINE_SUCCESS;
2683            stats.chkPersistenceHisto.add(
2684                    std::chrono::duration_cast<std::chrono::microseconds>(
2685                            wall_time));
2686            adjustCheckpointFlushTimeout(spent);
2687            LOG(EXTENSION_LOG_NOTICE,
2688                "Notified the completion of %s "
2689                "for vbucket %" PRIu16 ", Check for: %" PRIu64
2690                ", "
2691                "Persisted upto: %" PRIu64 ", cookie %p",
2692                logStr.c_str(),
2693                getId(),
2694                entry->id,
2695                idNum,
2696                entry->cookie);
2697            entry = hpVBReqs.erase(entry);
2698        } else if (spent > getCheckpointFlushTimeout()) {
2699            adjustCheckpointFlushTimeout(spent);
2700            engine.storeEngineSpecific(entry->cookie, NULL);
2701            toNotify[entry->cookie] = ENGINE_TMPFAIL;
2702            LOG(EXTENSION_LOG_WARNING,
2703                "Notified the timeout on %s "
2704                "for vbucket %" PRIu16 ", Check for: %" PRIu64
2705                ", "
2706                "Persisted upto: %" PRIu64 ", cookie %p",
2707                logStr.c_str(),
2708                getId(),
2709                entry->id,
2710                idNum,
2711                entry->cookie);
2712            entry = hpVBReqs.erase(entry);
2713        } else {
2714            ++entry;
2715        }
2716    }
2717    numHpVBReqs.store(hpVBReqs.size());
2718    return toNotify;
2719}
2720
2721std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2722        EventuallyPersistentEngine& engine) {
2723    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2724
2725    LockHolder lh(hpVBReqsMutex);
2726
2727    for (auto& entry : hpVBReqs) {
2728        toNotify[entry.cookie] = ENGINE_TMPFAIL;
2729        engine.storeEngineSpecific(entry.cookie, NULL);
2730    }
2731    hpVBReqs.clear();
2732
2733    return toNotify;
2734}
2735
2736void VBucket::adjustCheckpointFlushTimeout(std::chrono::seconds wall_time) {
2737    auto middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2738
2739    if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2740        chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2741    } else if (wall_time <= middle) {
2742        chkFlushTimeout = middle;
2743    } else {
2744        chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2745    }
2746}
2747
2748std::chrono::seconds VBucket::getCheckpointFlushTimeout() {
2749    return std::chrono::duration_cast<std::chrono::seconds>(
2750            chkFlushTimeout.load());
2751}
2752
2753std::unique_ptr<Item> VBucket::pruneXattrDocument(
2754        StoredValue& v, const ItemMetaData& itemMeta) {
2755    // Need to take a copy of the value, prune it, and add it back
2756
2757    // Create work-space document
2758    std::vector<char> workspace(
2759            v.getValue()->getData(),
2760            v.getValue()->getData() + v.getValue()->valueSize());
2761
2762    // Now attach to the XATTRs in the document
2763    cb::xattr::Blob xattr({workspace.data(), workspace.size()},
2764                          mcbp::datatype::is_snappy(v.getDatatype()));
2765    xattr.prune_user_keys();
2766
2767    auto prunedXattrs = xattr.finalize();
2768
2769    if (prunedXattrs.size()) {
2770        // Something remains - Create a Blob and copy-in just the XATTRs
2771        auto newValue =
2772                Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2773                          prunedXattrs.size());
2774        auto rv = v.toItem(false, getId());
2775        rv->setCas(itemMeta.cas);
2776        rv->setFlags(itemMeta.flags);
2777        rv->setExpTime(itemMeta.exptime);
2778        rv->setRevSeqno(itemMeta.revSeqno);
2779        rv->replaceValue(newValue);
2780        rv->setDataType(PROTOCOL_BINARY_DATATYPE_XATTR);
2781        return rv;
2782    } else {
2783        return {};
2784    }
2785}
2786
2787void VBucket::removeKey(const DocKey& key, int64_t bySeqno) {
2788    auto hbl = ht.getLockedBucket(key);
2789    StoredValue* v = fetchValidValue(
2790            hbl, key, WantsDeleted::No, TrackReference::No, QueueExpired::Yes);
2791
2792    if (v && v->getBySeqno() == bySeqno) {
2793        ht.unlocked_del(hbl, v->getKey());
2794    }
2795}
2796
2797bool VBucket::isLogicallyNonExistent(
2798        const StoredValue& v,
2799        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2800    return v.isDeleted() || v.isTempDeletedItem() ||
2801           v.isTempNonExistentItem() ||
2802           readHandle.isLogicallyDeleted(v.getBySeqno());
2803}
2804
2805void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
2806    // If the vbucket is marked as deleting then we must schedule task to
2807    // perform the resource destruction (memory/disk).
2808    if (vb->isDeletionDeferred()) {
2809        vb->scheduleDeferredDeletion(engine);
2810        return;
2811    }
2812    delete vb;
2813}
2814
2815void VBucket::setFreqSaturatedCallback(std::function<void()> callbackFunction) {
2816    ht.setFreqSaturatedCallback(callbackFunction);
2817}
2818