xref: /5.5.2/kv_engine/engines/ep/src/vbucket.cc (revision 865fbe39)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "atomic.h"
21#include "bgfetcher.h"
22#include "checkpoint.h"
23#include "conflict_resolution.h"
24#include "ep_engine.h"
25#include "ep_time.h"
26#include "ep_types.h"
27#include "failover-table.h"
28#include "flusher.h"
29#include "hash_table.h"
30#include "pre_link_document_context.h"
31#include "statwriter.h"
32#include "stored_value_factories.h"
33#include "vbucket.h"
34#include "vbucketdeletiontask.h"
35
36#include <platform/compress.h>
37#include <xattr/blob.h>
38#include <xattr/utils.h>
39
40#include <functional>
41#include <list>
42#include <set>
43#include <string>
44#include <vector>
45
46/* Macros */
47const auto MIN_CHK_FLUSH_TIMEOUT = std::chrono::seconds(10);
48const auto MAX_CHK_FLUSH_TIMEOUT = std::chrono::seconds(30);
49
50/* Statics definitions */
51cb::AtomicDuration VBucket::chkFlushTimeout(MIN_CHK_FLUSH_TIMEOUT);
52double VBucket::mutationMemThreshold = 0.9;
53
54VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
55    std::vector<uint16_t> tmp(acceptable.size() + other.size());
56    std::vector<uint16_t>::iterator end;
57    end = std::set_symmetric_difference(acceptable.begin(),
58                                        acceptable.end(),
59                                        other.acceptable.begin(),
60                                        other.acceptable.end(),
61                                        tmp.begin());
62    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
63}
64
65VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
66                                                                        const {
67    std::vector<uint16_t> tmp(acceptable.size() + other.size());
68    std::vector<uint16_t>::iterator end;
69
70    end = std::set_intersection(acceptable.begin(), acceptable.end(),
71                                other.acceptable.begin(),
72                                other.acceptable.end(),
73                                tmp.begin());
74    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
75}
76
/**
 * Determine whether the elements starting at 'it' form a run of
 * consecutive values that is long enough to be printed as a range.
 *
 * @param it     first element to inspect (taken by value, so the caller's
 *               iterator is not advanced)
 * @param end    end of the set being inspected
 * @param length [out] number of elements that follow '*it' with
 *               consecutive values (0 when '*it' stands alone, or when
 *               'it == end')
 * @return true if length > 1, i.e. the run holds at least three values
 */
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;

    // Nothing to inspect: never dereference 'end', and never let the
    // count wrap around below zero.
    if (it == end) {
        return false;
    }

    const uint16_t first = *it;
    ++it;

    // Count successors whose value is exactly one more than the previous
    // one. The arithmetic is done in size_t so 65535 cannot wrap.
    while (it != end && (static_cast<size_t>(first) + length + 1) == *it) {
        ++it;
        ++length;
    }

    return length > 1;
}
92
93std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
94{
95    std::set<uint16_t>::const_iterator it;
96
97    if (filter.acceptable.empty()) {
98        out << "{ empty }";
99    } else {
100        bool needcomma = false;
101        out << "{ ";
102        for (it = filter.acceptable.begin();
103             it != filter.acceptable.end();
104             ++it) {
105            if (needcomma) {
106                out << ", ";
107            }
108
109            size_t length;
110            if (isRange(it, filter.acceptable.end(), length)) {
111                std::set<uint16_t>::iterator last = it;
112                for (size_t i = 0; i < length; ++i) {
113                    ++last;
114                }
115                out << "[" << *it << "," << *last << "]";
116                it = last;
117            } else {
118                out << *it;
119            }
120            needcomma = true;
121        }
122        out << " }";
123    }
124
125    return out;
126}
127
128const vbucket_state_t VBucket::ACTIVE =
129                     static_cast<vbucket_state_t>(htonl(vbucket_state_active));
130const vbucket_state_t VBucket::REPLICA =
131                    static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
132const vbucket_state_t VBucket::PENDING =
133                    static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
134const vbucket_state_t VBucket::DEAD =
135                    static_cast<vbucket_state_t>(htonl(vbucket_state_dead));
136
137VBucket::VBucket(id_type i,
138                 vbucket_state_t newState,
139                 EPStats& st,
140                 CheckpointConfig& chkConfig,
141                 int64_t lastSeqno,
142                 uint64_t lastSnapStart,
143                 uint64_t lastSnapEnd,
144                 std::unique_ptr<FailoverTable> table,
145                 std::shared_ptr<Callback<id_type>> flusherCb,
146                 std::unique_ptr<AbstractStoredValueFactory> valFact,
147                 NewSeqnoCallback newSeqnoCb,
148                 Configuration& config,
149                 item_eviction_policy_t evictionPolicy,
150                 vbucket_state_t initState,
151                 uint64_t purgeSeqno,
152                 uint64_t maxCas,
153                 int64_t hlcEpochSeqno,
154                 bool mightContainXattrs,
155                 const std::string& collectionsManifest)
156    : ht(st, std::move(valFact), config.getHtSize(), config.getHtLocks()),
157      checkpointManager(std::make_unique<CheckpointManager>(st,
158                                                            i,
159                                                            chkConfig,
160                                                            lastSeqno,
161                                                            lastSnapStart,
162                                                            lastSnapEnd,
163                                                            flusherCb)),
164      failovers(std::move(table)),
165      opsCreate(0),
166      opsUpdate(0),
167      opsDelete(0),
168      opsReject(0),
169      dirtyQueueSize(0),
170      dirtyQueueMem(0),
171      dirtyQueueFill(0),
172      dirtyQueueDrain(0),
173      dirtyQueueAge(0),
174      dirtyQueuePendingWrites(0),
175      metaDataDisk(0),
176      numExpiredItems(0),
177      eviction(evictionPolicy),
178      stats(st),
179      persistenceSeqno(0),
180      numHpVBReqs(0),
181      id(i),
182      state(newState),
183      initialState(initState),
184      purge_seqno(purgeSeqno),
185      takeover_backed_up(false),
186      persisted_snapshot_start(lastSnapStart),
187      persisted_snapshot_end(lastSnapEnd),
188      rollbackItemCount(0),
189      hlc(maxCas,
190          hlcEpochSeqno,
191          std::chrono::microseconds(config.getHlcDriftAheadThresholdUs()),
192          std::chrono::microseconds(config.getHlcDriftBehindThresholdUs())),
193      statPrefix("vb_" + std::to_string(i)),
194      persistenceCheckpointId(0),
195      bucketCreation(false),
196      deferredDeletion(false),
197      deferredDeletionCookie(nullptr),
198      newSeqnoCb(std::move(newSeqnoCb)),
199      manifest(collectionsManifest),
200      mayContainXattrs(mightContainXattrs) {
201    if (config.getConflictResolutionType().compare("lww") == 0) {
202        conflictResolver.reset(new LastWriteWinsResolution());
203    } else {
204        conflictResolver.reset(new RevisionSeqnoResolution());
205    }
206
207    backfill.isBackfillPhase = false;
208    pendingOpsStart = ProcessClock::time_point();
209    stats.coreLocal.get()->memOverhead.fetch_add(
210            sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
211    LOG(EXTENSION_LOG_NOTICE,
212        "VBucket: created vbucket:%" PRIu16
213        " with state:%s "
214        "initialState:%s lastSeqno:%" PRIu64 " lastSnapshot:{%" PRIu64
215        ",%" PRIu64 "} persisted_snapshot:{%" PRIu64 ",%" PRIu64
216        "} max_cas:%" PRIu64 " uuid:%s",
217        id,
218        VBucket::toString(state),
219        VBucket::toString(initialState),
220        lastSeqno,
221        lastSnapStart,
222        lastSnapEnd,
223        persisted_snapshot_start,
224        persisted_snapshot_end,
225        getMaxCas(),
226        failovers ? std::to_string(failovers->getLatestUUID()).c_str() : "<>");
227}
228
229VBucket::~VBucket() {
230    if (!pendingOps.empty()) {
231        LOG(EXTENSION_LOG_WARNING,
232            "~Vbucket(): vbucket:%" PRIu16 " has %ld pending ops",
233            id,
234            pendingOps.size());
235    }
236
237    stats.diskQueueSize.fetch_sub(dirtyQueueSize.load());
238    stats.vbBackfillQueueSize.fetch_sub(getBackfillSize());
239
240    // Clear out the bloomfilter(s)
241    clearFilter();
242
243    stats.coreLocal.get()->memOverhead.fetch_sub(
244            sizeof(VBucket) + ht.memorySize() + sizeof(CheckpointManager));
245
246    LOG(EXTENSION_LOG_NOTICE, "Destroying vbucket %d", id);
247}
248
249int64_t VBucket::getHighSeqno() const {
250    return checkpointManager->getHighSeqno();
251}
252
253size_t VBucket::getChkMgrMemUsage() const {
254    return checkpointManager->getMemoryUsage();
255}
256
257size_t VBucket::getChkMgrMemUsageOfUnrefCheckpoints() const {
258    return checkpointManager->getMemoryUsageOfUnrefCheckpoints();
259}
260
261size_t VBucket::getChkMgrMemUsageOverhead() const {
262    return checkpointManager->getMemoryOverhead();
263}
264
265void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
266                         ENGINE_ERROR_CODE code) {
267    std::unique_lock<std::mutex> lh(pendingOpLock);
268
269    if (pendingOpsStart > ProcessClock::time_point()) {
270        auto now = ProcessClock::now();
271        if (now > pendingOpsStart) {
272            auto d = std::chrono::duration_cast<std::chrono::microseconds>(
273                    now - pendingOpsStart);
274            stats.pendingOpsHisto.add(d);
275            atomic_setIfBigger(stats.pendingOpsMaxDuration,
276                               std::make_unsigned<hrtime_t>::type(d.count()));
277        }
278    } else {
279        return;
280    }
281
282    pendingOpsStart = ProcessClock::time_point();
283    stats.pendingOps.fetch_sub(pendingOps.size());
284    atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());
285
286    while (!pendingOps.empty()) {
287        const void *pendingOperation = pendingOps.back();
288        pendingOps.pop_back();
289        // We don't want to hold the pendingOpLock when
290        // calling notifyIOComplete.
291        lh.unlock();
292        engine.notifyIOComplete(pendingOperation, code);
293        lh.lock();
294    }
295
296    LOG(EXTENSION_LOG_INFO,
297        "Fired pendings ops for vbucket %" PRIu16 " in state %s",
298        id,
299        VBucket::toString(state));
300}
301
302void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
303
304    if (state == vbucket_state_active) {
305        fireAllOps(engine, ENGINE_SUCCESS);
306    } else if (state == vbucket_state_pending) {
307        // Nothing
308    } else {
309        fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
310    }
311}
312
313VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
314    // Fetch up to approxLimit items from rejectQueue, backfill items and
315    // checkpointManager (in that order); then check if we obtained everything
316    // which is available.
317    ItemsToFlush result;
318
319    // First add any items from the rejectQueue.
320    while (result.items.size() < approxLimit && !rejectQueue.empty()) {
321        result.items.push_back(rejectQueue.front());
322        rejectQueue.pop();
323    }
324
325    // Append any 'backfill' items (mutations added by a DCP stream).
326    bool backfillEmpty;
327    {
328        LockHolder lh(backfill.mutex);
329        size_t num_items = 0;
330        while (result.items.size() < approxLimit && !backfill.items.empty()) {
331            result.items.push_back(backfill.items.front());
332            backfill.items.pop();
333            num_items++;
334        }
335        backfillEmpty = backfill.items.empty();
336        stats.vbBackfillQueueSize.fetch_sub(num_items);
337        stats.coreLocal.get()->memOverhead.fetch_sub(num_items *
338                                                     sizeof(queued_item));
339    }
340
341    // Append up to approxLimit checkpoint items outstanding for the persistence
342    // cursor, if we haven't yet hit the limit.
343    // Note that it is only valid to queue a complete checkpoint - this is where
344    // the "approx" in the limit comes from.
345    const auto ckptMgrLimit = approxLimit - result.items.size();
346    CheckpointManager::ItemsForCursor ckptItems;
347    if (ckptMgrLimit > 0) {
348        auto _begin_ = ProcessClock::now();
349        ckptItems = checkpointManager->getItemsForCursor(
350                CheckpointManager::pCursorName, result.items, ckptMgrLimit);
351        result.range = ckptItems.range;
352        stats.persistenceCursorGetItemsHisto.add(
353                std::chrono::duration_cast<std::chrono::microseconds>(
354                        ProcessClock::now() - _begin_));
355    } else {
356        // We haven't got sufficient remaining capacity to read items from
357        // CheckpoitnManager, therefore we must assume that there /could/
358        // more data to follow.
359        ckptItems.moreAvailable = true;
360    }
361
362    // Check if there's any more items remaining.
363    result.moreAvailable =
364            !rejectQueue.empty() || !backfillEmpty || ckptItems.moreAvailable;
365
366    return result;
367}
368
369void VBucket::setState(vbucket_state_t to) {
370    WriterLockHolder wlh(stateLock);
371    setState_UNLOCKED(to, wlh);
372}
373
374void VBucket::setState_UNLOCKED(vbucket_state_t to,
375                                WriterLockHolder &vbStateLock) {
376    vbucket_state_t oldstate = state;
377
378    if (to == vbucket_state_active &&
379        checkpointManager->getOpenCheckpointId() < 2) {
380        checkpointManager->setOpenCheckpointId(2);
381    }
382
383    LOG(EXTENSION_LOG_NOTICE,
384        "VBucket::setState: transitioning vbucket:%" PRIu16 " from:%s to:%s",
385        id,
386        VBucket::toString(oldstate),
387        VBucket::toString(to));
388
389    state = to;
390}
391
392vbucket_state VBucket::getVBucketState() const {
393     auto persisted_range = getPersistedSnapshot();
394
395     return vbucket_state{getState(),
396                          getPersistenceCheckpointId(),
397                          0,
398                          getHighSeqno(),
399                          getPurgeSeqno(),
400                          persisted_range.start,
401                          persisted_range.end,
402                          getMaxCas(),
403                          hlc.getEpochSeqno(),
404                          mightContainXattrs(),
405                          failovers->toJSON()};
406}
407
408
409
410void VBucket::doStatsForQueueing(const Item& qi, size_t itemBytes)
411{
412    ++dirtyQueueSize;
413    dirtyQueueMem.fetch_add(sizeof(Item));
414    ++dirtyQueueFill;
415    dirtyQueueAge.fetch_add(qi.getQueuedTime());
416    dirtyQueuePendingWrites.fetch_add(itemBytes);
417}
418
419void VBucket::doStatsForFlushing(const Item& qi, size_t itemBytes) {
420    --dirtyQueueSize;
421    decrDirtyQueueMem(sizeof(Item));
422    ++dirtyQueueDrain;
423    decrDirtyQueueAge(qi.getQueuedTime());
424    decrDirtyQueuePendingWrites(itemBytes);
425}
426
427void VBucket::incrMetaDataDisk(const Item& qi) {
428    metaDataDisk.fetch_add(qi.getKey().size() + sizeof(ItemMetaData));
429}
430
431void VBucket::decrMetaDataDisk(const Item& qi) {
432    // assume couchstore remove approx this much data from disk
433    metaDataDisk.fetch_sub((qi.getKey().size() + sizeof(ItemMetaData)));
434}
435
436void VBucket::resetStats() {
437    opsCreate.store(0);
438    opsUpdate.store(0);
439    opsDelete.store(0);
440    opsReject.store(0);
441
442    stats.diskQueueSize.fetch_sub(dirtyQueueSize.exchange(0));
443    dirtyQueueMem.store(0);
444    dirtyQueueFill.store(0);
445    dirtyQueueAge.store(0);
446    dirtyQueuePendingWrites.store(0);
447    dirtyQueueDrain.store(0);
448
449    hlc.resetStats();
450}
451
452uint64_t VBucket::getQueueAge() {
453    uint64_t currDirtyQueueAge = dirtyQueueAge.load(std::memory_order_relaxed);
454    rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
455    if (currentAge < currDirtyQueueAge) {
456        return 0;
457    }
458    return (currentAge - currDirtyQueueAge) * 1000;
459}
460
461template <typename T>
462void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
463                      const void *c) {
464    std::string stat = statPrefix;
465    if (nm != NULL) {
466        add_prefixed_stat(statPrefix, nm, val, add_stat, c);
467    } else {
468        add_casted_stat(statPrefix.data(), val, add_stat, c);
469    }
470}
471
472void VBucket::handlePreExpiry(const std::unique_lock<std::mutex>& hbl,
473                              StoredValue& v) {
474    value_t value = v.getValue();
475    if (value) {
476        std::unique_ptr<Item> itm(v.toItem(false, id));
477        item_info itm_info;
478        EventuallyPersistentEngine* engine = ObjectRegistry::getCurrentEngine();
479        itm_info =
480                itm->toItemInfo(failovers->getLatestUUID(), getHLCEpochSeqno());
481        value_t new_val(Blob::Copy(*value));
482        itm->replaceValue(new_val.get());
483        itm->setDataType(v.getDatatype());
484
485        SERVER_HANDLE_V1* sapi = engine->getServerApi();
486        /* TODO: In order to minimize allocations, the callback needs to
487         * allocate an item whose value size will be exactly the size of the
488         * value after pre-expiry is performed.
489         */
490        if (sapi->document->pre_expiry(itm_info)) {
491            Item new_item(v.getKey(),
492                          v.getFlags(),
493                          v.getExptime(),
494                          itm_info.value[0].iov_base,
495                          itm_info.value[0].iov_len,
496                          itm_info.datatype,
497                          v.getCas(),
498                          v.getBySeqno(),
499                          id,
500                          v.getRevSeqno());
501
502            new_item.setNRUValue(v.getNRUValue());
503            new_item.setFreqCounterValue(v.getFreqCounterValue());
504            new_item.setDeleted();
505            ht.unlocked_updateStoredValue(hbl, v, new_item);
506        }
507    }
508}
509
510bool VBucket::addPendingOp(const void* cookie) {
511    LockHolder lh(pendingOpLock);
512    if (state != vbucket_state_pending) {
513        // State transitioned while we were waiting.
514        return false;
515    }
516    // Start a timer when enqueuing the first client.
517    if (pendingOps.empty()) {
518        pendingOpsStart = ProcessClock::now();
519    }
520    pendingOps.push_back(cookie);
521    ++stats.pendingOps;
522    ++stats.pendingOpsTotal;
523    return true;
524}
525
526uint64_t VBucket::getPersistenceCheckpointId() const {
527    return persistenceCheckpointId.load();
528}
529
530void VBucket::setPersistenceCheckpointId(uint64_t checkpointId) {
531    persistenceCheckpointId.store(checkpointId);
532}
533
534void VBucket::markDirty(const DocKey& key) {
535    auto hbl = ht.getLockedBucket(key);
536    StoredValue* v = ht.unlocked_find(
537            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::Yes);
538    if (v) {
539        v->markDirty();
540    } else {
541        LOG(EXTENSION_LOG_WARNING, "markDirty: Error marking dirty, a key "
542            "is missing from vb:%" PRIu16, id);
543    }
544}
545
546bool VBucket::isResidentRatioUnderThreshold(float threshold) {
547    if (eviction != FULL_EVICTION) {
548        throw std::invalid_argument("VBucket::isResidentRatioUnderThreshold: "
549                "policy (which is " + std::to_string(eviction) +
550                ") must be FULL_EVICTION");
551    }
552    size_t num_items = getNumItems();
553    size_t num_non_resident_items = getNumNonResidentItems();
554    float ratio =
555            num_items
556                    ? ((float)(num_items - num_non_resident_items) / num_items)
557                    : 0.0;
558    if (threshold >= ratio) {
559        return true;
560    } else {
561        return false;
562    }
563}
564
565void VBucket::createFilter(size_t key_count, double probability) {
566    // Create the actual bloom filter upon vbucket creation during
567    // scenarios:
568    //      - Bucket creation
569    //      - Rebalance
570    LockHolder lh(bfMutex);
571    if (bFilter == nullptr && tempFilter == nullptr) {
572        bFilter = std::make_unique<BloomFilter>(key_count, probability,
573                                        BFILTER_ENABLED);
574    } else {
575        LOG(EXTENSION_LOG_WARNING, "(vb %" PRIu16 ") Bloom filter / Temp filter"
576            " already exist!", id);
577    }
578}
579
580void VBucket::initTempFilter(size_t key_count, double probability) {
581    // Create a temp bloom filter with status as COMPACTING,
582    // if the main filter is found to exist, set its state to
583    // COMPACTING as well.
584    LockHolder lh(bfMutex);
585    tempFilter = std::make_unique<BloomFilter>(key_count, probability,
586                                     BFILTER_COMPACTING);
587    if (bFilter) {
588        bFilter->setStatus(BFILTER_COMPACTING);
589    }
590}
591
592void VBucket::addToFilter(const DocKey& key) {
593    LockHolder lh(bfMutex);
594    if (bFilter) {
595        bFilter->addKey(key);
596    }
597
598    // If the temp bloom filter is not found to be NULL,
599    // it means that compaction is running on the particular
600    // vbucket. Therefore add the key to the temp filter as
601    // well, as once compaction completes the temp filter
602    // will replace the main bloom filter.
603    if (tempFilter) {
604        tempFilter->addKey(key);
605    }
606}
607
608bool VBucket::maybeKeyExistsInFilter(const DocKey& key) {
609    LockHolder lh(bfMutex);
610    if (bFilter) {
611        return bFilter->maybeKeyExists(key);
612    } else {
613        // If filter doesn't exist, allow the BgFetch to go through.
614        return true;
615    }
616}
617
618bool VBucket::isTempFilterAvailable() {
619    LockHolder lh(bfMutex);
620    if (tempFilter &&
621        (tempFilter->getStatus() == BFILTER_COMPACTING ||
622         tempFilter->getStatus() == BFILTER_ENABLED)) {
623        return true;
624    } else {
625        return false;
626    }
627}
628
629void VBucket::addToTempFilter(const DocKey& key) {
630    // Keys will be added to only the temp filter during
631    // compaction.
632    LockHolder lh(bfMutex);
633    if (tempFilter) {
634        tempFilter->addKey(key);
635    }
636}
637
638void VBucket::swapFilter() {
639    // Delete the main bloom filter and replace it with
640    // the temp filter that was populated during compaction,
641    // only if the temp filter's state is found to be either at
642    // COMPACTING or ENABLED (if in the case the user enables
643    // bloomfilters for some reason while compaction was running).
644    // Otherwise, it indicates that the filter's state was
645    // possibly disabled during compaction, therefore clear out
646    // the temp filter. If it gets enabled at some point, a new
647    // bloom filter will be made available after the next
648    // compaction.
649
650    LockHolder lh(bfMutex);
651    if (tempFilter) {
652        bFilter.reset();
653
654        if (tempFilter->getStatus() == BFILTER_COMPACTING ||
655             tempFilter->getStatus() == BFILTER_ENABLED) {
656            bFilter = std::move(tempFilter);
657            bFilter->setStatus(BFILTER_ENABLED);
658        }
659        tempFilter.reset();
660    }
661}
662
663void VBucket::clearFilter() {
664    LockHolder lh(bfMutex);
665    bFilter.reset();
666    tempFilter.reset();
667}
668
669void VBucket::setFilterStatus(bfilter_status_t to) {
670    LockHolder lh(bfMutex);
671    if (bFilter) {
672        bFilter->setStatus(to);
673    }
674    if (tempFilter) {
675        tempFilter->setStatus(to);
676    }
677}
678
679std::string VBucket::getFilterStatusString() {
680    LockHolder lh(bfMutex);
681    if (bFilter) {
682        return bFilter->getStatusString();
683    } else if (tempFilter) {
684        return tempFilter->getStatusString();
685    } else {
686        return "DOESN'T EXIST";
687    }
688}
689
690size_t VBucket::getFilterSize() {
691    LockHolder lh(bfMutex);
692    if (bFilter) {
693        return bFilter->getFilterSize();
694    } else {
695        return 0;
696    }
697}
698
699size_t VBucket::getNumOfKeysInFilter() {
700    LockHolder lh(bfMutex);
701    if (bFilter) {
702        return bFilter->getNumOfKeysInFilter();
703    } else {
704        return 0;
705    }
706}
707
708VBNotifyCtx VBucket::queueDirty(
709        StoredValue& v,
710        const GenerateBySeqno generateBySeqno,
711        const GenerateCas generateCas,
712        const bool isBackfillItem,
713        PreLinkDocumentContext* preLinkDocumentContext) {
714    VBNotifyCtx notifyCtx;
715
716    queued_item qi(v.toItem(false, getId()));
717
718    // MB-27457: Timestamp deletes only when they don't already have a timestamp
719    // assigned. This is here to ensure all deleted items have a timestamp which
720    // our tombstone purger can use to determine which tombstones to purge. A
721    // DCP replicated or deleteWithMeta created delete may already have a time
722    // assigned to it.
723    if (qi->isDeleted() && qi->getDeleteTime() == 0) {
724        qi->setExpTime(ep_real_time());
725    }
726
727    if (!mightContainXattrs() && mcbp::datatype::is_xattr(v.getDatatype())) {
728        setMightContainXattrs();
729    }
730
731    if (isBackfillItem) {
732        queueBackfillItem(qi, generateBySeqno);
733        notifyCtx.notifyFlusher = true;
734        /* During backfill on a TAP receiver we need to update the snapshot
735         range in the checkpoint. Has to be done here because in case of TAP
736         backfill, above, we use vb.queueBackfillItem() instead of
737         vb.checkpointManager->queueDirty() */
738        if (generateBySeqno == GenerateBySeqno::Yes) {
739            checkpointManager->resetSnapshotRange();
740        }
741    } else {
742        notifyCtx.notifyFlusher =
743                checkpointManager->queueDirty(*this,
744                                              qi,
745                                              generateBySeqno,
746                                              generateCas,
747                                              preLinkDocumentContext);
748        notifyCtx.notifyReplication = true;
749        if (GenerateCas::Yes == generateCas) {
750            v.setCas(qi->getCas());
751        }
752    }
753
754    v.setBySeqno(qi->getBySeqno());
755    notifyCtx.bySeqno = qi->getBySeqno();
756
757    return notifyCtx;
758}
759
760StoredValue* VBucket::fetchValidValue(HashTable::HashBucketLock& hbl,
761                                      const DocKey& key,
762                                      WantsDeleted wantsDeleted,
763                                      TrackReference trackReference,
764                                      QueueExpired queueExpired) {
765    if (!hbl.getHTLock()) {
766        throw std::logic_error(
767                "Hash bucket lock not held in "
768                "VBucket::fetchValidValue() for hash bucket: " +
769                std::to_string(hbl.getBucketNum()) + "for key: " +
770                std::string(reinterpret_cast<const char*>(key.data()),
771                            key.size()));
772    }
773    StoredValue* v = ht.unlocked_find(
774            key, hbl.getBucketNum(), wantsDeleted, trackReference);
775    if (v && !v->isDeleted() && !v->isTempItem()) {
776        // In the deleted case, we ignore expiration time.
777        if (v->isExpired(ep_real_time())) {
778            if (getState() != vbucket_state_active) {
779                return wantsDeleted == WantsDeleted::Yes ? v : NULL;
780            }
781
782            // queueDirty only allowed on active VB
783            if (queueExpired == QueueExpired::Yes &&
784                getState() == vbucket_state_active) {
785                incExpirationStat(ExpireBy::Access);
786                handlePreExpiry(hbl.getHTLock(), *v);
787                VBNotifyCtx notifyCtx;
788                std::tie(std::ignore, v, notifyCtx) =
789                        processExpiredItem(hbl, *v);
790                notifyNewSeqno(notifyCtx);
791            }
792            return wantsDeleted == WantsDeleted::Yes ? v : NULL;
793        }
794    }
795    return v;
796}
797
798void VBucket::incExpirationStat(const ExpireBy source) {
799    switch (source) {
800    case ExpireBy::Pager:
801        ++stats.expired_pager;
802        break;
803    case ExpireBy::Compactor:
804        ++stats.expired_compactor;
805        break;
806    case ExpireBy::Access:
807        ++stats.expired_access;
808        break;
809    }
810    ++numExpiredItems;
811}
812
813MutationStatus VBucket::setFromInternal(Item& itm) {
814    if (!hasMemoryForStoredValue(stats, itm, false)) {
815        return MutationStatus::NoMem;
816    }
817    return ht.set(itm);
818}
819
820cb::StoreIfStatus VBucket::callPredicate(cb::StoreIfPredicate predicate,
821                                         StoredValue* v) {
822    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
823    if (v) {
824        auto info = v->getItemInfo(failovers->getLatestUUID());
825        storeIfStatus = predicate(info, getInfo());
826        // No no, you can't ask for it again
827        if (storeIfStatus == cb::StoreIfStatus::GetItemInfo &&
828            info.is_initialized()) {
829            throw std::logic_error(
830                    "VBucket::callPredicate invalid result of GetItemInfo");
831        }
832    } else {
833        storeIfStatus = predicate({/*no info*/}, getInfo());
834    }
835
836    if (storeIfStatus == cb::StoreIfStatus::GetItemInfo &&
837        eviction == VALUE_ONLY) {
838        // We're VE, if we don't have, we don't have it.
839        storeIfStatus = cb::StoreIfStatus::Continue;
840    }
841
842    return storeIfStatus;
843}
844
845ENGINE_ERROR_CODE VBucket::set(Item& itm,
846                               const void* cookie,
847                               EventuallyPersistentEngine& engine,
848                               const int bgFetchDelay,
849                               cb::StoreIfPredicate predicate) {
850    bool cas_op = (itm.getCas() != 0);
851    auto hbl = ht.getLockedBucket(itm.getKey());
852    StoredValue* v = ht.unlocked_find(itm.getKey(),
853                                      hbl.getBucketNum(),
854                                      WantsDeleted::Yes,
855                                      TrackReference::No);
856
857    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
858    if (predicate &&
859        (storeIfStatus = callPredicate(predicate, v)) ==
860                cb::StoreIfStatus::Fail) {
861        return ENGINE_PREDICATE_FAILED;
862    }
863
864    if (v && v->isLocked(ep_current_time()) &&
865        (getState() == vbucket_state_replica ||
866         getState() == vbucket_state_pending)) {
867        v->unlock();
868    }
869
870    bool maybeKeyExists = true;
871    // If we didn't find a valid item then check the bloom filter, but only
872    // if we're full-eviction with a CAS operation or a have a predicate that
873    // requires the item's info
874    if ((v == nullptr || v->isTempInitialItem()) &&
875        (eviction == FULL_EVICTION) &&
876        ((itm.getCas() != 0) ||
877         storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
878        // Check Bloomfilter's prediction
879        if (!maybeKeyExistsInFilter(itm.getKey())) {
880            maybeKeyExists = false;
881        }
882    }
883
884    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
885    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
886                               GenerateCas::Yes,
887                               TrackCasDrift::No,
888                               /*isBackfillItem*/ false,
889                               &preLinkDocumentContext);
890
891    MutationStatus status;
892    boost::optional<VBNotifyCtx> notifyCtx;
893    std::tie(status, notifyCtx) = processSet(hbl,
894                                             v,
895                                             itm,
896                                             itm.getCas(),
897                                             /*allowExisting*/ true,
898                                             /*hashMetaData*/ false,
899                                             queueItmCtx,
900                                             storeIfStatus,
901                                             maybeKeyExists);
902
903    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
904    switch (status) {
905    case MutationStatus::NoMem:
906        ret = ENGINE_ENOMEM;
907        break;
908    case MutationStatus::InvalidCas:
909        ret = ENGINE_KEY_EEXISTS;
910        break;
911    case MutationStatus::IsLocked:
912        ret = ENGINE_LOCKED;
913        break;
914    case MutationStatus::NotFound:
915        if (cas_op) {
916            ret = ENGINE_KEY_ENOENT;
917            break;
918        }
919    // FALLTHROUGH
920    case MutationStatus::WasDirty:
921    // Even if the item was dirty, push it into the vbucket's open
922    // checkpoint.
923    case MutationStatus::WasClean:
924        notifyNewSeqno(*notifyCtx);
925
926        itm.setBySeqno(v->getBySeqno());
927        itm.setCas(v->getCas());
928        break;
929    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
930        // +
931        // full eviction.
932        if (v) {
933            // temp item is already created. Simply schedule a bg fetch job
934            hbl.getHTLock().unlock();
935            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
936            return ENGINE_EWOULDBLOCK;
937        }
938        ret = addTempItemAndBGFetch(
939                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
940        break;
941    }
942    }
943
944    return ret;
945}
946
947ENGINE_ERROR_CODE VBucket::replace(
948        Item& itm,
949        const void* cookie,
950        EventuallyPersistentEngine& engine,
951        const int bgFetchDelay,
952        cb::StoreIfPredicate predicate,
953        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
954    auto hbl = ht.getLockedBucket(itm.getKey());
955    StoredValue* v = ht.unlocked_find(itm.getKey(),
956                                      hbl.getBucketNum(),
957                                      WantsDeleted::Yes,
958                                      TrackReference::No);
959
960    cb::StoreIfStatus storeIfStatus = cb::StoreIfStatus::Continue;
961    if (predicate &&
962        (storeIfStatus = callPredicate(predicate, v)) ==
963                cb::StoreIfStatus::Fail) {
964        return ENGINE_PREDICATE_FAILED;
965    }
966
967    if (v) {
968        if (isLogicallyNonExistent(*v, readHandle)) {
969            ht.cleanupIfTemporaryItem(hbl, *v);
970            return ENGINE_KEY_ENOENT;
971        }
972
973        MutationStatus mtype;
974        boost::optional<VBNotifyCtx> notifyCtx;
975        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
976            mtype = MutationStatus::NeedBgFetch;
977        } else {
978            PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
979            VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
980                                       GenerateCas::Yes,
981                                       TrackCasDrift::No,
982                                       /*isBackfillItem*/ false,
983                                       &preLinkDocumentContext);
984            std::tie(mtype, notifyCtx) = processSet(hbl,
985                                                    v,
986                                                    itm,
987                                                    0,
988                                                    /*allowExisting*/ true,
989                                                    /*hasMetaData*/ false,
990                                                    queueItmCtx,
991                                                    storeIfStatus);
992        }
993
994        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
995        switch (mtype) {
996        case MutationStatus::NoMem:
997            ret = ENGINE_ENOMEM;
998            break;
999        case MutationStatus::IsLocked:
1000            ret = ENGINE_LOCKED;
1001            break;
1002        case MutationStatus::InvalidCas:
1003        case MutationStatus::NotFound:
1004            ret = ENGINE_NOT_STORED;
1005            break;
1006        // FALLTHROUGH
1007        case MutationStatus::WasDirty:
1008        // Even if the item was dirty, push it into the vbucket's open
1009        // checkpoint.
1010        case MutationStatus::WasClean:
1011            notifyNewSeqno(*notifyCtx);
1012
1013            itm.setBySeqno(v->getBySeqno());
1014            itm.setCas(v->getCas());
1015            break;
1016        case MutationStatus::NeedBgFetch: {
1017            // temp item is already created. Simply schedule a bg fetch job
1018            hbl.getHTLock().unlock();
1019            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1020            ret = ENGINE_EWOULDBLOCK;
1021            break;
1022        }
1023        }
1024
1025        return ret;
1026    } else {
1027        if (eviction == VALUE_ONLY) {
1028            return ENGINE_KEY_ENOENT;
1029        }
1030
1031        if (maybeKeyExistsInFilter(itm.getKey())) {
1032            return addTempItemAndBGFetch(
1033                    hbl, itm.getKey(), cookie, engine, bgFetchDelay, false);
1034        } else {
1035            // As bloomfilter predicted that item surely doesn't exist
1036            // on disk, return ENOENT for replace().
1037            return ENGINE_KEY_ENOENT;
1038        }
1039    }
1040}
1041
1042ENGINE_ERROR_CODE VBucket::addBackfillItem(Item& itm,
1043                                           const GenerateBySeqno genBySeqno) {
1044    auto hbl = ht.getLockedBucket(itm.getKey());
1045    StoredValue* v = ht.unlocked_find(itm.getKey(),
1046                                      hbl.getBucketNum(),
1047                                      WantsDeleted::Yes,
1048                                      TrackReference::No);
1049
1050    // Note that this function is only called on replica or pending vbuckets.
1051    if (v && v->isLocked(ep_current_time())) {
1052        v->unlock();
1053    }
1054
1055    VBQueueItemCtx queueItmCtx(genBySeqno,
1056                               GenerateCas::No,
1057                               TrackCasDrift::No,
1058                               /*isBackfillItem*/ true,
1059                               nullptr /* No pre link should happen */);
1060    MutationStatus status;
1061    boost::optional<VBNotifyCtx> notifyCtx;
1062    std::tie(status, notifyCtx) = processSet(hbl,
1063                                             v,
1064                                             itm,
1065                                             0,
1066                                             /*allowExisting*/ true,
1067                                             /*hasMetaData*/ true,
1068                                             queueItmCtx,
1069                                             {/*no predicate*/});
1070
1071    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1072    switch (status) {
1073    case MutationStatus::NoMem:
1074        ret = ENGINE_ENOMEM;
1075        break;
1076    case MutationStatus::InvalidCas:
1077    case MutationStatus::IsLocked:
1078        ret = ENGINE_KEY_EEXISTS;
1079        break;
1080    case MutationStatus::WasDirty:
1081    // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1082    // set correctly, and also the sequence numbers are ordered correctly.
1083    // (MB-14003)
1084    case MutationStatus::NotFound:
1085    // FALLTHROUGH
1086    case MutationStatus::WasClean: {
1087        setMaxCas(v->getCas());
1088        // we unlock ht lock here because we want to avoid potential lock
1089        // inversions arising from notifyNewSeqno() call
1090        hbl.getHTLock().unlock();
1091        notifyNewSeqno(*notifyCtx);
1092    } break;
1093    case MutationStatus::NeedBgFetch:
1094        throw std::logic_error(
1095                "VBucket::addBackfillItem: "
1096                "SET on a non-active vbucket should not require a "
1097                "bg_metadata_fetch.");
1098    }
1099
1100    return ret;
1101}
1102
1103ENGINE_ERROR_CODE VBucket::setWithMeta(
1104        Item& itm,
1105        uint64_t cas,
1106        uint64_t* seqno,
1107        const void* cookie,
1108        EventuallyPersistentEngine& engine,
1109        int bgFetchDelay,
1110        CheckConflicts checkConflicts,
1111        bool allowExisting,
1112        GenerateBySeqno genBySeqno,
1113        GenerateCas genCas,
1114        bool isReplication,
1115        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1116    auto hbl = ht.getLockedBucket(itm.getKey());
1117    StoredValue* v = ht.unlocked_find(itm.getKey(),
1118                                      hbl.getBucketNum(),
1119                                      WantsDeleted::Yes,
1120                                      TrackReference::No);
1121
1122    bool maybeKeyExists = true;
1123
1124    // Effectively ignore logically deleted keys, they cannot stop the op
1125    if (v && readHandle.isLogicallyDeleted(v->getBySeqno())) {
1126        // v is not really here, operate like it's not and skip conflict checks
1127        checkConflicts = CheckConflicts::No;
1128        // And ensure ADD_W_META works like SET_W_META, just overwrite existing
1129        allowExisting = true;
1130    }
1131
1132    if (checkConflicts == CheckConflicts::Yes) {
1133        if (v) {
1134            if (v->isTempInitialItem()) {
1135                bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1136                return ENGINE_EWOULDBLOCK;
1137            }
1138
1139            if (!(conflictResolver->resolve(*v,
1140                                            itm.getMetaData(),
1141                                            itm.getDataType(),
1142                                            itm.isDeleted()))) {
1143                ++stats.numOpsSetMetaResolutionFailed;
1144                // If the existing item happens to be a temporary item,
1145                // delete the item to save memory in the hash table
1146                if (v->isTempItem()) {
1147                    deleteStoredValue(hbl, *v);
1148                }
1149                return ENGINE_KEY_EEXISTS;
1150            }
1151        } else {
1152            if (maybeKeyExistsInFilter(itm.getKey())) {
1153                return addTempItemAndBGFetch(hbl,
1154                                             itm.getKey(),
1155                                             cookie,
1156                                             engine,
1157                                             bgFetchDelay,
1158                                             true,
1159                                             isReplication);
1160            } else {
1161                maybeKeyExists = false;
1162            }
1163        }
1164    } else {
1165        if (eviction == FULL_EVICTION) {
1166            // Check Bloomfilter's prediction
1167            if (!maybeKeyExistsInFilter(itm.getKey())) {
1168                maybeKeyExists = false;
1169            }
1170        }
1171    }
1172
1173    if (v && v->isLocked(ep_current_time()) &&
1174        (getState() == vbucket_state_replica ||
1175         getState() == vbucket_state_pending)) {
1176        v->unlock();
1177    }
1178
1179    VBQueueItemCtx queueItmCtx(genBySeqno,
1180                               genCas,
1181                               TrackCasDrift::Yes,
1182                               /*isBackfillItem*/ false,
1183                               nullptr /* No pre link step needed */);
1184    MutationStatus status;
1185    boost::optional<VBNotifyCtx> notifyCtx;
1186    std::tie(status, notifyCtx) = processSet(hbl,
1187                                             v,
1188                                             itm,
1189                                             cas,
1190                                             allowExisting,
1191                                             true,
1192                                             queueItmCtx,
1193                                             {/*no predicate*/},
1194                                             maybeKeyExists,
1195                                             isReplication);
1196
1197    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1198    switch (status) {
1199    case MutationStatus::NoMem:
1200        ret = ENGINE_ENOMEM;
1201        break;
1202    case MutationStatus::InvalidCas:
1203        ret = ENGINE_KEY_EEXISTS;
1204        break;
1205    case MutationStatus::IsLocked:
1206        ret = ENGINE_LOCKED;
1207        break;
1208    case MutationStatus::WasDirty:
1209    case MutationStatus::WasClean: {
1210        if (seqno) {
1211            *seqno = static_cast<uint64_t>(v->getBySeqno());
1212        }
1213        // we unlock ht lock here because we want to avoid potential lock
1214        // inversions arising from notifyNewSeqno() call
1215        hbl.getHTLock().unlock();
1216        notifyNewSeqno(*notifyCtx);
1217    } break;
1218    case MutationStatus::NotFound:
1219        ret = ENGINE_KEY_ENOENT;
1220        break;
1221    case MutationStatus::NeedBgFetch: { // CAS operation with non-resident item
1222        // + full eviction.
1223        if (v) { // temp item is already created. Simply schedule a
1224            hbl.getHTLock().unlock(); // bg fetch job.
1225            bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1226            return ENGINE_EWOULDBLOCK;
1227        }
1228        ret = addTempItemAndBGFetch(hbl,
1229                                    itm.getKey(),
1230                                    cookie,
1231                                    engine,
1232                                    bgFetchDelay,
1233                                    true,
1234                                    isReplication);
1235    }
1236    }
1237
1238    return ret;
1239}
1240
1241ENGINE_ERROR_CODE VBucket::deleteItem(
1242        uint64_t& cas,
1243        const void* cookie,
1244        EventuallyPersistentEngine& engine,
1245        const int bgFetchDelay,
1246        ItemMetaData* itemMeta,
1247        mutation_descr_t& mutInfo,
1248        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1249    auto hbl = ht.getLockedBucket(readHandle.getKey());
1250    StoredValue* v = ht.unlocked_find(readHandle.getKey(),
1251                                      hbl.getBucketNum(),
1252                                      WantsDeleted::Yes,
1253                                      TrackReference::No);
1254
1255    if (!v || v->isDeleted() || v->isTempItem() ||
1256        readHandle.isLogicallyDeleted(v->getBySeqno())) {
1257        if (eviction == VALUE_ONLY) {
1258            return ENGINE_KEY_ENOENT;
1259        } else { // Full eviction.
1260            if (!v) { // Item might be evicted from cache.
1261                if (maybeKeyExistsInFilter(readHandle.getKey())) {
1262                    return addTempItemAndBGFetch(hbl,
1263                                                 readHandle.getKey(),
1264                                                 cookie,
1265                                                 engine,
1266                                                 bgFetchDelay,
1267                                                 true);
1268                } else {
1269                    // As bloomfilter predicted that item surely doesn't
1270                    // exist on disk, return ENOENT for deleteItem().
1271                    return ENGINE_KEY_ENOENT;
1272                }
1273            } else if (v->isTempInitialItem()) {
1274                hbl.getHTLock().unlock();
1275                bgFetch(readHandle.getKey(),
1276                        cookie,
1277                        engine,
1278                        bgFetchDelay,
1279                        true);
1280                return ENGINE_EWOULDBLOCK;
1281            } else { // Non-existent or deleted key.
1282                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1283                    // Delete a temp non-existent item to ensure that
1284                    // if a delete were issued over an item that doesn't
1285                    // exist, then we don't preserve a temp item.
1286                    deleteStoredValue(hbl, *v);
1287                }
1288                return ENGINE_KEY_ENOENT;
1289            }
1290        }
1291    }
1292
1293    if (v->isLocked(ep_current_time()) &&
1294        (getState() == vbucket_state_replica ||
1295         getState() == vbucket_state_pending)) {
1296        v->unlock();
1297    }
1298
1299    if (itemMeta != nullptr) {
1300        itemMeta->cas = v->getCas();
1301    }
1302
1303    MutationStatus delrv;
1304    boost::optional<VBNotifyCtx> notifyCtx;
1305    if (v->isExpired(ep_real_time())) {
1306        std::tie(delrv, v, notifyCtx) = processExpiredItem(hbl, *v);
1307    } else {
1308        ItemMetaData metadata;
1309        metadata.revSeqno = v->getRevSeqno() + 1;
1310        std::tie(delrv, v, notifyCtx) =
1311                processSoftDelete(hbl,
1312                                  *v,
1313                                  cas,
1314                                  metadata,
1315                                  VBQueueItemCtx(GenerateBySeqno::Yes,
1316                                                 GenerateCas::Yes,
1317                                                 TrackCasDrift::No,
1318                                                 /*isBackfillItem*/ false,
1319                                                 nullptr /* no pre link */),
1320                                  /*use_meta*/ false,
1321                                  /*bySeqno*/ v->getBySeqno());
1322    }
1323
1324    uint64_t seqno = 0;
1325    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1326    switch (delrv) {
1327    case MutationStatus::NoMem:
1328        ret = ENGINE_ENOMEM;
1329        break;
1330    case MutationStatus::InvalidCas:
1331        ret = ENGINE_KEY_EEXISTS;
1332        break;
1333    case MutationStatus::IsLocked:
1334        ret = ENGINE_LOCKED_TMPFAIL;
1335        break;
1336    case MutationStatus::NotFound:
1337        ret = ENGINE_KEY_ENOENT;
1338    /* Fallthrough:
1339     * A NotFound return value at this point indicates that the
1340     * item has expired. But, a deletion still needs to be queued
1341     * for the item in order to persist it.
1342     */
1343    case MutationStatus::WasClean:
1344    case MutationStatus::WasDirty:
1345        if (itemMeta != nullptr) {
1346            itemMeta->revSeqno = v->getRevSeqno();
1347            itemMeta->flags = v->getFlags();
1348            itemMeta->exptime = v->getExptime();
1349        }
1350
1351        notifyNewSeqno(*notifyCtx);
1352        seqno = static_cast<uint64_t>(v->getBySeqno());
1353        cas = v->getCas();
1354
1355        if (delrv != MutationStatus::NotFound) {
1356            mutInfo.seqno = seqno;
1357            mutInfo.vbucket_uuid = failovers->getLatestUUID();
1358            if (itemMeta != nullptr) {
1359                itemMeta->cas = v->getCas();
1360            }
1361        }
1362        break;
1363    case MutationStatus::NeedBgFetch:
1364        // We already figured out if a bg fetch is requred for a full-evicted
1365        // item above.
1366        throw std::logic_error(
1367                "VBucket::deleteItem: "
1368                "Unexpected NEEDS_BG_FETCH from processSoftDelete");
1369    }
1370    return ret;
1371}
1372
1373ENGINE_ERROR_CODE VBucket::deleteWithMeta(
1374        uint64_t& cas,
1375        uint64_t* seqno,
1376        const void* cookie,
1377        EventuallyPersistentEngine& engine,
1378        int bgFetchDelay,
1379        CheckConflicts checkConflicts,
1380        const ItemMetaData& itemMeta,
1381        bool backfill,
1382        GenerateBySeqno genBySeqno,
1383        GenerateCas generateCas,
1384        uint64_t bySeqno,
1385        bool isReplication,
1386        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1387    const auto& key = readHandle.getKey();
1388    auto hbl = ht.getLockedBucket(key);
1389    StoredValue* v = ht.unlocked_find(
1390            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1391
1392    if (v && readHandle.isLogicallyDeleted(v->getBySeqno())) {
1393        return ENGINE_KEY_ENOENT;
1394    }
1395
1396    // Need conflict resolution?
1397    if (checkConflicts == CheckConflicts::Yes) {
1398        if (v) {
1399            if (v->isTempInitialItem()) {
1400                bgFetch(key, cookie, engine, bgFetchDelay, true);
1401                return ENGINE_EWOULDBLOCK;
1402            }
1403
1404            if (!(conflictResolver->resolve(*v,
1405                                            itemMeta,
1406                                            PROTOCOL_BINARY_RAW_BYTES,
1407                                            true))) {
1408                ++stats.numOpsDelMetaResolutionFailed;
1409                return ENGINE_KEY_EEXISTS;
1410            }
1411        } else {
1412            // Item is 1) deleted or not existent in the value eviction case OR
1413            // 2) deleted or evicted in the full eviction.
1414            if (maybeKeyExistsInFilter(key)) {
1415                return addTempItemAndBGFetch(hbl,
1416                                             key,
1417                                             cookie,
1418                                             engine,
1419                                             bgFetchDelay,
1420                                             true,
1421                                             isReplication);
1422            } else {
1423                // Even though bloomfilter predicted that item doesn't exist
1424                // on disk, we must put this delete on disk if the cas is valid.
1425                TempAddStatus rv = addTempStoredValue(hbl, key, isReplication);
1426                if (rv == TempAddStatus::NoMem) {
1427                    return ENGINE_ENOMEM;
1428                }
1429                v = ht.unlocked_find(key,
1430                                     hbl.getBucketNum(),
1431                                     WantsDeleted::Yes,
1432                                     TrackReference::No);
1433                v->setTempDeleted();
1434            }
1435        }
1436    } else {
1437        if (!v) {
1438            // We should always try to persist a delete here.
1439            TempAddStatus rv = addTempStoredValue(hbl, key, isReplication);
1440            if (rv == TempAddStatus::NoMem) {
1441                return ENGINE_ENOMEM;
1442            }
1443            v = ht.unlocked_find(key,
1444                                 hbl.getBucketNum(),
1445                                 WantsDeleted::Yes,
1446                                 TrackReference::No);
1447            v->setTempDeleted();
1448            v->setCas(cas);
1449        } else if (v->isTempInitialItem()) {
1450            v->setTempDeleted();
1451            v->setCas(cas);
1452        }
1453    }
1454
1455    if (v && v->isLocked(ep_current_time()) &&
1456        (getState() == vbucket_state_replica ||
1457         getState() == vbucket_state_pending)) {
1458        v->unlock();
1459    }
1460
1461    MutationStatus delrv;
1462    boost::optional<VBNotifyCtx> notifyCtx;
1463    bool metaBgFetch = true;
1464    if (!v) {
1465        if (eviction == FULL_EVICTION) {
1466            delrv = MutationStatus::NeedBgFetch;
1467        } else {
1468            delrv = MutationStatus::NotFound;
1469        }
1470    } else if (v->isTempDeletedItem() &&
1471               mcbp::datatype::is_xattr(v->getDatatype()) && !v->isResident()) {
1472        // MB-25671: A temp deleted xattr with no value must be fetched before
1473        // the deleteWithMeta can be applied.
1474        delrv = MutationStatus::NeedBgFetch;
1475        metaBgFetch = false;
1476    } else {
1477        VBQueueItemCtx queueItmCtx(genBySeqno,
1478                                   generateCas,
1479                                   TrackCasDrift::Yes,
1480                                   backfill,
1481                                   nullptr /* No pre link step needed */);
1482
1483        // system xattrs must remain, however no need to prune xattrs if this is
1484        // a replication call, the active has done this and we must just store
1485        // what we're given
1486        std::unique_ptr<Item> itm;
1487        if (!isReplication && mcbp::datatype::is_xattr(v->getDatatype()) &&
1488            (itm = pruneXattrDocument(*v, itemMeta))) {
1489            // A new item has been generated and must be given a new seqno
1490            queueItmCtx.genBySeqno = GenerateBySeqno::Yes;
1491
1492            std::tie(v, delrv, notifyCtx) =
1493                    updateStoredValue(hbl, *v, *itm, queueItmCtx);
1494        } else {
1495            std::tie(delrv, v, notifyCtx) = processSoftDelete(hbl,
1496                                                              *v,
1497                                                              cas,
1498                                                              itemMeta,
1499                                                              queueItmCtx,
1500                                                              /*use_meta*/ true,
1501                                                              bySeqno);
1502        }
1503    }
1504    cas = v ? v->getCas() : 0;
1505
1506    switch (delrv) {
1507    case MutationStatus::NoMem:
1508        return ENGINE_ENOMEM;
1509    case MutationStatus::InvalidCas:
1510        return ENGINE_KEY_EEXISTS;
1511    case MutationStatus::IsLocked:
1512        return ENGINE_LOCKED_TMPFAIL;
1513    case MutationStatus::NotFound:
1514        return ENGINE_KEY_ENOENT;
1515    case MutationStatus::WasDirty:
1516    case MutationStatus::WasClean: {
1517        if (seqno) {
1518            *seqno = static_cast<uint64_t>(v->getBySeqno());
1519        }
1520        // we unlock ht lock here because we want to avoid potential lock
1521        // inversions arising from notifyNewSeqno() call
1522        hbl.getHTLock().unlock();
1523        notifyNewSeqno(*notifyCtx);
1524        break;
1525    }
1526    case MutationStatus::NeedBgFetch:
1527        hbl.getHTLock().unlock();
1528        bgFetch(key, cookie, engine, bgFetchDelay, metaBgFetch);
1529        return ENGINE_EWOULDBLOCK;
1530    }
1531    return ENGINE_SUCCESS;
1532}
1533
1534void VBucket::deleteExpiredItem(const Item& it,
1535                                time_t startTime,
1536                                ExpireBy source) {
1537
1538    // The item is correctly trimmed (by the caller). Fetch the one in the
1539    // hashtable and replace it if the CAS match (same item; no race).
1540    // If not found in the hashtable we should add it as a deleted item
1541    const DocKey& key = it.getKey();
1542    auto hbl = ht.getLockedBucket(key);
1543    StoredValue* v = ht.unlocked_find(
1544            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
1545    if (v) {
1546        if (v->getCas() != it.getCas()) {
1547            return;
1548        }
1549
1550        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1551            bool deleted = deleteStoredValue(hbl, *v);
1552            if (!deleted) {
1553                throw std::logic_error(
1554                        "VBucket::deleteExpiredItem: "
1555                        "Failed to delete seqno:" +
1556                        std::to_string(v->getBySeqno()) + " from bucket " +
1557                        std::to_string(hbl.getBucketNum()));
1558            }
1559        } else if (v->isExpired(startTime) && !v->isDeleted()) {
1560            VBNotifyCtx notifyCtx;
1561            ht.unlocked_updateStoredValue(hbl.getHTLock(), *v, it);
1562            std::tie(std::ignore, std::ignore, notifyCtx) =
1563                    processExpiredItem(hbl, *v);
1564            // we unlock ht lock here because we want to avoid potential lock
1565            // inversions arising from notifyNewSeqno() call
1566            hbl.getHTLock().unlock();
1567            notifyNewSeqno(notifyCtx);
1568        }
1569    } else {
1570        if (eviction == FULL_EVICTION) {
1571            // Create a temp item and delete and push it
1572            // into the checkpoint queue, only if the bloomfilter
1573            // predicts that the item may exist on disk.
1574            if (maybeKeyExistsInFilter(key)) {
1575                TempAddStatus rv = addTempStoredValue(hbl, key);
1576                if (rv == TempAddStatus::NoMem) {
1577                    return;
1578                }
1579                v = ht.unlocked_find(key,
1580                                     hbl.getBucketNum(),
1581                                     WantsDeleted::Yes,
1582                                     TrackReference::No);
1583                v->setTempDeleted();
1584                v->setRevSeqno(it.getRevSeqno());
1585                ht.unlocked_updateStoredValue(hbl.getHTLock(), *v, it);
1586                VBNotifyCtx notifyCtx;
1587                std::tie(std::ignore, std::ignore, notifyCtx) =
1588                        processExpiredItem(hbl, *v);
1589                // we unlock ht lock here because we want to avoid potential
1590                // lock inversions arising from notifyNewSeqno() call
1591                hbl.getHTLock().unlock();
1592                notifyNewSeqno(notifyCtx);
1593            }
1594        }
1595    }
1596    incExpirationStat(source);
1597}
1598
1599ENGINE_ERROR_CODE VBucket::add(
1600        Item& itm,
1601        const void* cookie,
1602        EventuallyPersistentEngine& engine,
1603        int bgFetchDelay,
1604        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1605    auto hbl = ht.getLockedBucket(itm.getKey());
1606    StoredValue* v = ht.unlocked_find(itm.getKey(),
1607                                      hbl.getBucketNum(),
1608                                      WantsDeleted::Yes,
1609                                      TrackReference::No);
1610
1611    bool maybeKeyExists = true;
1612    if ((v == nullptr || v->isTempInitialItem()) &&
1613        (eviction == FULL_EVICTION)) {
1614        // Check bloomfilter's prediction
1615        if (!maybeKeyExistsInFilter(itm.getKey())) {
1616            maybeKeyExists = false;
1617        }
1618    }
1619
1620    PreLinkDocumentContext preLinkDocumentContext(engine, cookie, &itm);
1621    VBQueueItemCtx queueItmCtx(GenerateBySeqno::Yes,
1622                               GenerateCas::Yes,
1623                               TrackCasDrift::No,
1624                               /*isBackfillItem*/ false,
1625                               &preLinkDocumentContext);
1626    AddStatus status;
1627    boost::optional<VBNotifyCtx> notifyCtx;
1628    std::tie(status, notifyCtx) = processAdd(
1629            hbl, v, itm, maybeKeyExists, false, queueItmCtx, readHandle);
1630
1631    switch (status) {
1632    case AddStatus::NoMem:
1633        return ENGINE_ENOMEM;
1634    case AddStatus::Exists:
1635        return ENGINE_NOT_STORED;
1636    case AddStatus::AddTmpAndBgFetch:
1637        return addTempItemAndBGFetch(
1638                hbl, itm.getKey(), cookie, engine, bgFetchDelay, true);
1639    case AddStatus::BgFetch:
1640        hbl.getHTLock().unlock();
1641        bgFetch(itm.getKey(), cookie, engine, bgFetchDelay, true);
1642        return ENGINE_EWOULDBLOCK;
1643    case AddStatus::Success:
1644    case AddStatus::UnDel:
1645        notifyNewSeqno(*notifyCtx);
1646        itm.setBySeqno(v->getBySeqno());
1647        itm.setCas(v->getCas());
1648        break;
1649    }
1650    return ENGINE_SUCCESS;
1651}
1652
1653std::pair<MutationStatus, GetValue> VBucket::processGetAndUpdateTtl(
1654        HashTable::HashBucketLock& hbl,
1655        StoredValue* v,
1656        time_t exptime,
1657        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1658    if (v) {
1659        if (isLogicallyNonExistent(*v, readHandle)) {
1660            ht.cleanupIfTemporaryItem(hbl, *v);
1661            return {MutationStatus::NotFound, GetValue()};
1662        }
1663
1664        if (!v->isResident()) {
1665            return {MutationStatus::NeedBgFetch, GetValue()};
1666        }
1667
1668        if (v->isLocked(ep_current_time())) {
1669            return {MutationStatus::IsLocked,
1670                    GetValue(nullptr, ENGINE_KEY_EEXISTS, 0)};
1671        }
1672
1673        const bool exptime_mutated = exptime != v->getExptime();
1674        auto bySeqNo = v->getBySeqno();
1675        if (exptime_mutated) {
1676            v->markDirty();
1677            v->setExptime(exptime);
1678            v->setRevSeqno(v->getRevSeqno() + 1);
1679        }
1680
1681        GetValue rv(v->toItem(v->isLocked(ep_current_time()), getId()),
1682                    ENGINE_SUCCESS,
1683                    bySeqNo);
1684
1685        if (exptime_mutated) {
1686            VBQueueItemCtx qItemCtx(GenerateBySeqno::Yes,
1687                                    GenerateCas::Yes,
1688                                    TrackCasDrift::No,
1689                                    false,
1690                                    nullptr);
1691            VBNotifyCtx notifyCtx;
1692            std::tie(v, std::ignore, notifyCtx) =
1693                    updateStoredValue(hbl, *v, *rv.item, qItemCtx, true);
1694            rv.item->setCas(v->getCas());
1695            // we unlock ht lock here because we want to avoid potential lock
1696            // inversions arising from notifyNewSeqno() call
1697            hbl.getHTLock().unlock();
1698            notifyNewSeqno(notifyCtx);
1699        }
1700
1701        return {MutationStatus::WasClean, std::move(rv)};
1702    } else {
1703        if (eviction == VALUE_ONLY) {
1704            return {MutationStatus::NotFound, GetValue()};
1705        } else {
1706            if (maybeKeyExistsInFilter(readHandle.getKey())) {
1707                return {MutationStatus::NeedBgFetch, GetValue()};
1708            } else {
1709                // As bloomfilter predicted that item surely doesn't exist
1710                // on disk, return ENOENT for getAndUpdateTtl().
1711                return {MutationStatus::NotFound, GetValue()};
1712            }
1713        }
1714    }
1715}
1716
1717GetValue VBucket::getAndUpdateTtl(
1718        const void* cookie,
1719        EventuallyPersistentEngine& engine,
1720        int bgFetchDelay,
1721        time_t exptime,
1722        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1723    auto hbl = ht.getLockedBucket(readHandle.getKey());
1724    StoredValue* v = fetchValidValue(hbl,
1725                                     readHandle.getKey(),
1726                                     WantsDeleted::Yes,
1727                                     TrackReference::Yes,
1728                                     QueueExpired::Yes);
1729    GetValue gv;
1730    MutationStatus status;
1731    std::tie(status, gv) = processGetAndUpdateTtl(hbl, v, exptime, readHandle);
1732
1733    if (status == MutationStatus::NeedBgFetch) {
1734        if (v) {
1735            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay);
1736            return GetValue(nullptr, ENGINE_EWOULDBLOCK, v->getBySeqno());
1737        } else {
1738            ENGINE_ERROR_CODE ec = addTempItemAndBGFetch(hbl,
1739                                                         readHandle.getKey(),
1740                                                         cookie,
1741                                                         engine,
1742                                                         bgFetchDelay,
1743                                                         false);
1744            return GetValue(NULL, ec, -1, true);
1745        }
1746    }
1747
1748    return gv;
1749}
1750
1751GetValue VBucket::getInternal(
1752        const void* cookie,
1753        EventuallyPersistentEngine& engine,
1754        int bgFetchDelay,
1755        get_options_t options,
1756        bool diskFlushAll,
1757        GetKeyOnly getKeyOnly,
1758        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1759    const TrackReference trackReference = (options & TRACK_REFERENCE)
1760                                                  ? TrackReference::Yes
1761                                                  : TrackReference::No;
1762    const bool metadataOnly = (options & ALLOW_META_ONLY);
1763    const bool getDeletedValue = (options & GET_DELETED_VALUE);
1764    const bool bgFetchRequired = (options & QUEUE_BG_FETCH);
1765    auto hbl = ht.getLockedBucket(readHandle.getKey());
1766    StoredValue* v = fetchValidValue(hbl,
1767                                     readHandle.getKey(),
1768                                     WantsDeleted::Yes,
1769                                     trackReference,
1770                                     QueueExpired::Yes);
1771    if (v) {
1772        // 1 If SV is deleted and user didn't request deleted items
1773        // 2 (or) If collection says this key is gone.
1774        // then return ENOENT.
1775        if ((v->isDeleted() && !getDeletedValue) ||
1776            readHandle.isLogicallyDeleted(v->getBySeqno())) {
1777            return GetValue();
1778        }
1779
1780        // If SV is a temp deleted item (i.e. marker added after a BgFetch to
1781        // note that the item has been deleted), *but* the user requested
1782        // full deleted items, then we need to fetch the complete deleted item
1783        // (including body) from disk.
1784        if (v->isTempDeletedItem() && getDeletedValue && !metadataOnly) {
1785            const auto queueBgFetch =
1786                    (bgFetchRequired) ? QueueBgFetch::Yes : QueueBgFetch::No;
1787            return getInternalNonResident(readHandle.getKey(),
1788                                          cookie,
1789                                          engine,
1790                                          bgFetchDelay,
1791                                          queueBgFetch,
1792                                          *v);
1793        }
1794
1795        // If SV is otherwise a temp non-existent (i.e. a marker added after a
1796        // BgFetch to note that no such item exists) or temp deleted, then we
1797        // should cleanup the SV (if requested) before returning ENOENT (so we
1798        // don't keep temp items in HT).
1799        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1800            if (options & DELETE_TEMP) {
1801                deleteStoredValue(hbl, *v);
1802            }
1803            return GetValue();
1804        }
1805
1806        // If the value is not resident (and it was requested), wait for it...
1807        if (!v->isResident() && !metadataOnly) {
1808            auto queueBgFetch = (bgFetchRequired) ?
1809                    QueueBgFetch::Yes :
1810                    QueueBgFetch::No;
1811            return getInternalNonResident(readHandle.getKey(),
1812                                          cookie,
1813                                          engine,
1814                                          bgFetchDelay,
1815                                          queueBgFetch,
1816                                          *v);
1817        }
1818
1819        // Should we hide (return -1) for the items' CAS?
1820        const bool hideCas =
1821                (options & HIDE_LOCKED_CAS) && v->isLocked(ep_current_time());
1822        std::unique_ptr<Item> item;
1823        if (getKeyOnly == GetKeyOnly::Yes) {
1824            item = v->toItemKeyOnly(getId());
1825        } else {
1826            item = v->toItem(hideCas, getId());
1827        }
1828        return GetValue(std::move(item),
1829                        ENGINE_SUCCESS,
1830                        v->getBySeqno(),
1831                        !v->isResident(),
1832                        v->getNRUValue());
1833    } else {
1834        if (!getDeletedValue && (eviction == VALUE_ONLY || diskFlushAll)) {
1835            return GetValue();
1836        }
1837
1838        if (maybeKeyExistsInFilter(readHandle.getKey())) {
1839            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1840            if (bgFetchRequired) { // Full eviction and need a bg fetch.
1841                ec = addTempItemAndBGFetch(hbl,
1842                                           readHandle.getKey(),
1843                                           cookie,
1844                                           engine,
1845                                           bgFetchDelay,
1846                                           metadataOnly);
1847            }
1848            return GetValue(NULL, ec, -1, true);
1849        } else {
1850            // As bloomfilter predicted that item surely doesn't exist
1851            // on disk, return ENOENT, for getInternal().
1852            return GetValue();
1853        }
1854    }
1855}
1856
1857ENGINE_ERROR_CODE VBucket::getMetaData(
1858        const void* cookie,
1859        EventuallyPersistentEngine& engine,
1860        int bgFetchDelay,
1861        const Collections::VB::Manifest::CachingReadHandle& readHandle,
1862        ItemMetaData& metadata,
1863        uint32_t& deleted,
1864        uint8_t& datatype) {
1865    deleted = 0;
1866    auto hbl = ht.getLockedBucket(readHandle.getKey());
1867    StoredValue* v = ht.unlocked_find(readHandle.getKey(),
1868                                      hbl.getBucketNum(),
1869                                      WantsDeleted::Yes,
1870                                      TrackReference::No);
1871
1872    if (v) {
1873        stats.numOpsGetMeta++;
1874        if (v->isTempInitialItem()) {
1875            // Need bg meta fetch.
1876            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay, true);
1877            return ENGINE_EWOULDBLOCK;
1878        } else if (v->isTempNonExistentItem()) {
1879            metadata.cas = v->getCas();
1880            return ENGINE_KEY_ENOENT;
1881        } else if (readHandle.isLogicallyDeleted(v->getBySeqno())) {
1882            return ENGINE_KEY_ENOENT;
1883        } else {
1884            if (v->isTempDeletedItem() || v->isDeleted() ||
1885                v->isExpired(ep_real_time())) {
1886                deleted |= GET_META_ITEM_DELETED_FLAG;
1887            }
1888
1889            if (v->isLocked(ep_current_time())) {
1890                metadata.cas = static_cast<uint64_t>(-1);
1891            } else {
1892                metadata.cas = v->getCas();
1893            }
1894            metadata.flags = v->getFlags();
1895            metadata.exptime = v->getExptime();
1896            metadata.revSeqno = v->getRevSeqno();
1897            datatype = v->getDatatype();
1898
1899            return ENGINE_SUCCESS;
1900        }
1901    } else {
1902        // The key wasn't found. However, this may be because it was previously
1903        // deleted or evicted with the full eviction strategy.
1904        // So, add a temporary item corresponding to the key to the hash table
1905        // and schedule a background fetch for its metadata from the persistent
1906        // store. The item's state will be updated after the fetch completes.
1907        //
1908        // Schedule this bgFetch only if the key is predicted to be may-be
1909        // existent on disk by the bloomfilter.
1910
1911        if (maybeKeyExistsInFilter(readHandle.getKey())) {
1912            return addTempItemAndBGFetch(hbl,
1913                                         readHandle.getKey(),
1914                                         cookie,
1915                                         engine,
1916                                         bgFetchDelay,
1917                                         true);
1918        } else {
1919            stats.numOpsGetMeta++;
1920            return ENGINE_KEY_ENOENT;
1921        }
1922    }
1923}
1924
1925ENGINE_ERROR_CODE VBucket::getKeyStats(
1926        const void* cookie,
1927        EventuallyPersistentEngine& engine,
1928        int bgFetchDelay,
1929        struct key_stats& kstats,
1930        WantsDeleted wantsDeleted,
1931        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1932    auto hbl = ht.getLockedBucket(readHandle.getKey());
1933    StoredValue* v = fetchValidValue(hbl,
1934                                     readHandle.getKey(),
1935                                     WantsDeleted::Yes,
1936                                     TrackReference::Yes,
1937                                     QueueExpired::Yes);
1938
1939    if (v) {
1940        if ((v->isDeleted() ||
1941             readHandle.isLogicallyDeleted(v->getBySeqno())) &&
1942            wantsDeleted == WantsDeleted::No) {
1943            return ENGINE_KEY_ENOENT;
1944        }
1945
1946        if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1947            deleteStoredValue(hbl, *v);
1948            return ENGINE_KEY_ENOENT;
1949        }
1950        if (eviction == FULL_EVICTION && v->isTempInitialItem()) {
1951            hbl.getHTLock().unlock();
1952            bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay, true);
1953            return ENGINE_EWOULDBLOCK;
1954        }
1955        kstats.logically_deleted =
1956                v->isDeleted() ||
1957                readHandle.isLogicallyDeleted(v->getBySeqno());
1958        kstats.dirty = v->isDirty();
1959        kstats.exptime = v->getExptime();
1960        kstats.flags = v->getFlags();
1961        kstats.cas = v->getCas();
1962        kstats.vb_state = getState();
1963        kstats.resident = v->isResident();
1964
1965        return ENGINE_SUCCESS;
1966    } else {
1967        if (eviction == VALUE_ONLY) {
1968            return ENGINE_KEY_ENOENT;
1969        } else {
1970            if (maybeKeyExistsInFilter(readHandle.getKey())) {
1971                return addTempItemAndBGFetch(hbl,
1972                                             readHandle.getKey(),
1973                                             cookie,
1974                                             engine,
1975                                             bgFetchDelay,
1976                                             true);
1977            } else {
1978                // If bgFetch were false, or bloomfilter predicted that
1979                // item surely doesn't exist on disk, return ENOENT for
1980                // getKeyStats().
1981                return ENGINE_KEY_ENOENT;
1982            }
1983        }
1984    }
1985}
1986
1987GetValue VBucket::getLocked(
1988        rel_time_t currentTime,
1989        uint32_t lockTimeout,
1990        const void* cookie,
1991        EventuallyPersistentEngine& engine,
1992        int bgFetchDelay,
1993        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
1994    auto hbl = ht.getLockedBucket(readHandle.getKey());
1995    StoredValue* v = fetchValidValue(hbl,
1996                                     readHandle.getKey(),
1997                                     WantsDeleted::Yes,
1998                                     TrackReference::Yes,
1999                                     QueueExpired::Yes);
2000
2001    if (v) {
2002        if (isLogicallyNonExistent(*v, readHandle)) {
2003            ht.cleanupIfTemporaryItem(hbl, *v);
2004            return GetValue(NULL, ENGINE_KEY_ENOENT);
2005        }
2006
2007        // if v is locked return error
2008        if (v->isLocked(currentTime)) {
2009            return GetValue(NULL, ENGINE_LOCKED_TMPFAIL);
2010        }
2011
2012        // If the value is not resident, wait for it...
2013        if (!v->isResident()) {
2014            if (cookie) {
2015                bgFetch(readHandle.getKey(), cookie, engine, bgFetchDelay);
2016            }
2017            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
2018        }
2019
2020        // acquire lock and increment cas value
2021        v->lock(currentTime + lockTimeout);
2022
2023        auto it = v->toItem(false, getId());
2024        it->setCas(nextHLCCas());
2025        v->setCas(it->getCas());
2026
2027        return GetValue(std::move(it));
2028
2029    } else {
2030        // No value found in the hashtable.
2031        switch (eviction) {
2032        case VALUE_ONLY:
2033            return GetValue(NULL, ENGINE_KEY_ENOENT);
2034
2035        case FULL_EVICTION:
2036            if (maybeKeyExistsInFilter(readHandle.getKey())) {
2037                ENGINE_ERROR_CODE ec =
2038                        addTempItemAndBGFetch(hbl,
2039                                              readHandle.getKey(),
2040                                              cookie,
2041                                              engine,
2042                                              bgFetchDelay,
2043                                              false);
2044                return GetValue(NULL, ec, -1, true);
2045            } else {
2046                // As bloomfilter predicted that item surely doesn't exist
2047                // on disk, return ENOENT for getLocked().
2048                return GetValue(NULL, ENGINE_KEY_ENOENT);
2049            }
2050        }
2051        return GetValue(); // just to prevent compiler warning
2052    }
2053}
2054
2055void VBucket::deletedOnDiskCbk(const Item& queuedItem, bool deleted) {
2056    auto hbl = ht.getLockedBucket(queuedItem.getKey());
2057    StoredValue* v = fetchValidValue(hbl,
2058                                     queuedItem.getKey(),
2059                                     WantsDeleted::Yes,
2060                                     TrackReference::No,
2061                                     QueueExpired::Yes);
2062    // Delete the item in the hash table iff:
2063    //  1. Item is existent in hashtable, and deleted flag is true
2064    //  2. rev seqno of queued item matches rev seqno of hash table item
2065    if (v && v->isDeleted() && (queuedItem.getRevSeqno() == v->getRevSeqno())) {
2066        bool isDeleted = deleteStoredValue(hbl, *v);
2067        if (!isDeleted) {
2068            throw std::logic_error(
2069                    "deletedOnDiskCbk:callback: "
2070                    "Failed to delete key with seqno:" +
2071                    std::to_string(v->getBySeqno()) + "' from bucket " +
2072                    std::to_string(hbl.getBucketNum()));
2073        }
2074
2075        /**
2076         * Deleted items are to be added to the bloomfilter,
2077         * in either eviction policy.
2078         */
2079        addToFilter(queuedItem.getKey());
2080    }
2081
2082    if (deleted) {
2083        ++stats.totalPersisted;
2084        ++opsDelete;
2085
2086        /**
2087         * MB-30137: Decrement the total number of on-disk items. This needs to be
2088         * done to ensure that the item count is accurate in the case of full
2089         * eviction
2090         */
2091        if (v) {
2092            decrNumTotalItems();
2093        }
2094    }
2095    doStatsForFlushing(queuedItem, queuedItem.size());
2096    --stats.diskQueueSize;
2097    decrMetaDataDisk(queuedItem);
2098}
2099
2100bool VBucket::deleteKey(const DocKey& key) {
2101    auto hbl = ht.getLockedBucket(key);
2102    StoredValue* v = ht.unlocked_find(
2103            key, hbl.getBucketNum(), WantsDeleted::Yes, TrackReference::No);
2104    if (!v) {
2105        return false;
2106    }
2107    return deleteStoredValue(hbl, *v);
2108}
2109
2110void VBucket::postProcessRollback(const RollbackResult& rollbackResult,
2111                                  uint64_t prevHighSeqno) {
2112    failovers->pruneEntries(rollbackResult.highSeqno);
2113    checkpointManager->clear(*this, rollbackResult.highSeqno);
2114    setPersistedSnapshot(rollbackResult.snapStartSeqno,
2115                         rollbackResult.snapEndSeqno);
2116    incrRollbackItemCount(prevHighSeqno - rollbackResult.highSeqno);
2117    checkpointManager->setOpenCheckpointId(1);
2118}
2119
2120void VBucket::dump() const {
2121    std::cerr << "VBucket[" << this << "] with state: " << toString(getState())
2122              << " numItems:" << getNumItems()
2123              << " numNonResident:" << getNumNonResidentItems()
2124              << " ht: " << std::endl << "  " << ht << std::endl
2125              << "]" << std::endl;
2126}
2127
2128void VBucket::setMutationMemoryThreshold(double memThreshold) {
2129    if (memThreshold > 0.0 && memThreshold <= 1.0) {
2130        mutationMemThreshold = memThreshold;
2131    }
2132}
2133
2134bool VBucket::hasMemoryForStoredValue(EPStats& st,
2135                                      const Item& item,
2136                                      bool isReplication) {
2137    double newSize = static_cast<double>(estimateNewMemoryUsage(st, item));
2138    double maxSize = static_cast<double>(st.getMaxDataSize());
2139    if (isReplication) {
2140        return newSize <= (maxSize * st.replicationThrottleThreshold);
2141    } else {
2142        return newSize <= (maxSize * mutationMemThreshold);
2143    }
2144}
2145
2146void VBucket::_addStats(bool details, ADD_STAT add_stat, const void* c) {
2147    addStat(NULL, toString(state), add_stat, c);
2148    if (details) {
2149        size_t numItems = getNumItems();
2150        size_t tempItems = getNumTempItems();
2151        addStat("num_items", numItems, add_stat, c);
2152        addStat("num_temp_items", tempItems, add_stat, c);
2153        addStat("num_non_resident", getNumNonResidentItems(), add_stat, c);
2154        addStat("ht_memory", ht.memorySize(), add_stat, c);
2155        addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
2156        addStat("ht_item_memory_uncompressed",
2157                ht.getUncompressedItemMemory(),
2158                add_stat,
2159                c);
2160        addStat("ht_cache_size", ht.getCacheSize(), add_stat, c);
2161        addStat("ht_size", ht.getSize(), add_stat, c);
2162        addStat("num_ejects", ht.getNumEjects(), add_stat, c);
2163        addStat("ops_create", opsCreate.load(), add_stat, c);
2164        addStat("ops_update", opsUpdate.load(), add_stat, c);
2165        addStat("ops_delete", opsDelete.load(), add_stat, c);
2166        addStat("ops_reject", opsReject.load(), add_stat, c);
2167        addStat("queue_size", dirtyQueueSize.load(), add_stat, c);
2168        addStat("backfill_queue_size", getBackfillSize(), add_stat, c);
2169        addStat("queue_memory", dirtyQueueMem.load(), add_stat, c);
2170        addStat("queue_fill", dirtyQueueFill.load(), add_stat, c);
2171        addStat("queue_drain", dirtyQueueDrain.load(), add_stat, c);
2172        addStat("queue_age", getQueueAge(), add_stat, c);
2173        addStat("pending_writes", dirtyQueuePendingWrites.load(), add_stat, c);
2174
2175        addStat("high_seqno", getHighSeqno(), add_stat, c);
2176        addStat("uuid", failovers->getLatestUUID(), add_stat, c);
2177        addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
2178        addStat("bloom_filter", getFilterStatusString().data(),
2179                add_stat, c);
2180        addStat("bloom_filter_size", getFilterSize(), add_stat, c);
2181        addStat("bloom_filter_key_count", getNumOfKeysInFilter(), add_stat, c);
2182        addStat("rollback_item_count", getRollbackItemCount(), add_stat, c);
2183        addStat("hp_vb_req_size", getHighPriorityChkSize(), add_stat, c);
2184        addStat("might_contain_xattrs", mightContainXattrs(), add_stat, c);
2185        addStat("max_deleted_revid", ht.getMaxDeletedRevSeqno(), add_stat, c);
2186        hlc.addStats(statPrefix, add_stat, c);
2187    }
2188}
2189
2190void VBucket::decrDirtyQueueMem(size_t decrementBy)
2191{
2192    size_t oldVal, newVal;
2193    do {
2194        oldVal = dirtyQueueMem.load(std::memory_order_relaxed);
2195        if (oldVal < decrementBy) {
2196            newVal = 0;
2197        } else {
2198            newVal = oldVal - decrementBy;
2199        }
2200    } while (!dirtyQueueMem.compare_exchange_strong(oldVal, newVal));
2201}
2202
2203void VBucket::decrDirtyQueueAge(uint32_t decrementBy)
2204{
2205    uint64_t oldVal, newVal;
2206    do {
2207        oldVal = dirtyQueueAge.load(std::memory_order_relaxed);
2208        if (oldVal < decrementBy) {
2209            newVal = 0;
2210        } else {
2211            newVal = oldVal - decrementBy;
2212        }
2213    } while (!dirtyQueueAge.compare_exchange_strong(oldVal, newVal));
2214}
2215
2216void VBucket::decrDirtyQueuePendingWrites(size_t decrementBy)
2217{
2218    size_t oldVal, newVal;
2219    do {
2220        oldVal = dirtyQueuePendingWrites.load(std::memory_order_relaxed);
2221        if (oldVal < decrementBy) {
2222            newVal = 0;
2223        } else {
2224            newVal = oldVal - decrementBy;
2225        }
2226    } while (!dirtyQueuePendingWrites.compare_exchange_strong(oldVal, newVal));
2227}
2228
2229std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
2230        const HashTable::HashBucketLock& hbl,
2231        StoredValue*& v,
2232        Item& itm,
2233        uint64_t cas,
2234        bool allowExisting,
2235        bool hasMetaData,
2236        const VBQueueItemCtx& queueItmCtx,
2237        cb::StoreIfStatus storeIfStatus,
2238        bool maybeKeyExists,
2239        bool isReplication) {
2240    if (!hbl.getHTLock()) {
2241        throw std::invalid_argument(
2242                "VBucket::processSet: htLock not held for "
2243                "VBucket " +
2244                std::to_string(getId()));
2245    }
2246
2247    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2248        return {MutationStatus::NoMem, {}};
2249    }
2250
2251    if (v == nullptr && itm.isDeleted() && cas &&
2252        !areDeletedItemsAlwaysResident()) {
2253        // Request to perform a CAS operation on a deleted body which may
2254        // not be resident. Need a bg_fetch to be able to perform this request.
2255        return {MutationStatus::NeedBgFetch, VBNotifyCtx()};
2256    }
2257
2258    // bgFetch only in FE, only if the bloom-filter thinks the key may exist.
2259    // But only for cas operations or if a store_if is requiring the item_info.
2260    if (eviction == FULL_EVICTION && maybeKeyExists &&
2261        (cas || storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
2262        if (!v || v->isTempInitialItem()) {
2263            return {MutationStatus::NeedBgFetch, {}};
2264        }
2265    }
2266
2267    /*
2268     * prior to checking for the lock, we should check if this object
2269     * has expired. If so, then check if CAS value has been provided
2270     * for this set op. In this case the operation should be denied since
2271     * a cas operation for a key that doesn't exist is not a very cool
2272     * thing to do. See MB 3252
2273     */
2274    if (v && v->isExpired(ep_real_time()) && !hasMetaData && !itm.isDeleted()) {
2275        if (v->isLocked(ep_current_time())) {
2276            v->unlock();
2277        }
2278        if (cas) {
2279            /* item has expired and cas value provided. Deny ! */
2280            return {MutationStatus::NotFound, {}};
2281        }
2282    }
2283
2284    if (v) {
2285        if (!allowExisting && !v->isTempItem() && !v->isDeleted()) {
2286            return {MutationStatus::InvalidCas, {}};
2287        }
2288        if (v->isLocked(ep_current_time())) {
2289            /*
2290             * item is locked, deny if there is cas value mismatch
2291             * or no cas value is provided by the user
2292             */
2293            if (cas != v->getCas()) {
2294                return {MutationStatus::IsLocked, {}};
2295            }
2296            /* allow operation*/
2297            v->unlock();
2298        } else if (cas && cas != v->getCas()) {
2299            if (v->isTempNonExistentItem()) {
2300                // This is a temporary item which marks a key as non-existent;
2301                // therefore specifying a non-matching CAS should be exposed
2302                // as item not existing.
2303                return {MutationStatus::NotFound, {}};
2304            }
2305            if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
2306                // Existing item is deleted, and we are not replacing it with
2307                // a (different) deleted value - return not existing.
2308                return {MutationStatus::NotFound, {}};
2309            }
2310            // None of the above special cases; the existing item cannot be
2311            // modified with the specified CAS.
2312            return {MutationStatus::InvalidCas, {}};
2313        }
2314        if (!hasMetaData) {
2315            itm.setRevSeqno(v->getRevSeqno() + 1);
2316            /* MB-23530: We must ensure that a replace operation (i.e.
2317             * set with a CAS) /fails/ if the old document is deleted; it
2318             * logically "doesn't exist". However, if the new value is deleted
2319             * this op is a /delete/ with a CAS and we must permit a
2320             * deleted -> deleted transition for Deleted Bodies.
2321             */
2322            if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
2323                !itm.isDeleted()) {
2324                return {MutationStatus::NotFound, {}};
2325            }
2326        }
2327
2328        MutationStatus status;
2329        VBNotifyCtx notifyCtx;
2330        std::tie(v, status, notifyCtx) =
2331                updateStoredValue(hbl, *v, itm, queueItmCtx);
2332        return {status, notifyCtx};
2333    } else if (cas != 0) {
2334        return {MutationStatus::NotFound, {}};
2335    } else {
2336        VBNotifyCtx notifyCtx;
2337        auto genRevSeqno = hasMetaData ? GenerateRevSeqno::No :
2338                           GenerateRevSeqno::Yes;
2339        std::tie(v, notifyCtx) = addNewStoredValue(hbl, itm, queueItmCtx,
2340                                                   genRevSeqno);
2341        itm.setRevSeqno(v->getRevSeqno());
2342        return {MutationStatus::WasClean, notifyCtx};
2343    }
2344}
2345
2346std::pair<AddStatus, boost::optional<VBNotifyCtx>> VBucket::processAdd(
2347        const HashTable::HashBucketLock& hbl,
2348        StoredValue*& v,
2349        Item& itm,
2350        bool maybeKeyExists,
2351        bool isReplication,
2352        const VBQueueItemCtx& queueItmCtx,
2353        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2354    if (!hbl.getHTLock()) {
2355        throw std::invalid_argument(
2356                "VBucket::processAdd: htLock not held for "
2357                "VBucket " +
2358                std::to_string(getId()));
2359    }
2360    if (v && !v->isDeleted() && !v->isExpired(ep_real_time()) &&
2361        !v->isTempItem() && !readHandle.isLogicallyDeleted(v->getBySeqno())) {
2362        return {AddStatus::Exists, {}};
2363    }
2364    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2365        return {AddStatus::NoMem, {}};
2366    }
2367
2368    std::pair<AddStatus, VBNotifyCtx> rv = {AddStatus::Success, {}};
2369
2370    if (v) {
2371        if (v->isTempInitialItem() && eviction == FULL_EVICTION &&
2372            maybeKeyExists) {
2373            // Need to figure out if an item exists on disk
2374            return {AddStatus::BgFetch, {}};
2375        }
2376
2377        rv.first = (v->isDeleted() || v->isExpired(ep_real_time()))
2378                           ? AddStatus::UnDel
2379                           : AddStatus::Success;
2380
2381        if (v->isTempDeletedItem()) {
2382            itm.setRevSeqno(v->getRevSeqno() + 1);
2383        } else {
2384            itm.setRevSeqno(ht.getMaxDeletedRevSeqno() + 1);
2385        }
2386
2387        if (!v->isTempItem()) {
2388            itm.setRevSeqno(v->getRevSeqno() + 1);
2389        }
2390
2391        std::tie(v, std::ignore, rv.second) =
2392                updateStoredValue(hbl, *v, itm, queueItmCtx);
2393    } else {
2394        if (itm.getBySeqno() != StoredValue::state_temp_init) {
2395            if (eviction == FULL_EVICTION && maybeKeyExists) {
2396                return {AddStatus::AddTmpAndBgFetch, VBNotifyCtx()};
2397            }
2398        }
2399
2400        if (itm.getBySeqno() == StoredValue::state_temp_init) {
2401            /* A 'temp initial item' is just added to the hash table. It is
2402             not put on checkpoint manager or sequence list */
2403            v = ht.unlocked_addNewStoredValue(hbl, itm);
2404            updateRevSeqNoOfNewStoredValue(*v);
2405        } else {
2406            std::tie(v, rv.second) = addNewStoredValue(hbl, itm, queueItmCtx,
2407                                                       GenerateRevSeqno::Yes);
2408        }
2409
2410        itm.setRevSeqno(v->getRevSeqno());
2411
2412        if (v->isTempItem()) {
2413            rv.first = AddStatus::BgFetch;
2414        }
2415    }
2416
2417    if (v->isTempItem()) {
2418        v->setNRUValue(MAX_NRU_VALUE);
2419    }
2420    return rv;
2421}
2422
2423std::tuple<MutationStatus, StoredValue*, boost::optional<VBNotifyCtx>>
2424VBucket::processSoftDelete(const HashTable::HashBucketLock& hbl,
2425                           StoredValue& v,
2426                           uint64_t cas,
2427                           const ItemMetaData& metadata,
2428                           const VBQueueItemCtx& queueItmCtx,
2429                           bool use_meta,
2430                           uint64_t bySeqno) {
2431    boost::optional<VBNotifyCtx> empty;
2432    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2433        return std::make_tuple(MutationStatus::NeedBgFetch, &v, empty);
2434    }
2435
2436    if (v.isLocked(ep_current_time())) {
2437        if (cas != v.getCas()) {
2438            return std::make_tuple(MutationStatus::IsLocked, &v, empty);
2439        }
2440        v.unlock();
2441    }
2442
2443    if (cas != 0 && cas != v.getCas()) {
2444        return std::make_tuple(MutationStatus::InvalidCas, &v, empty);
2445    }
2446
2447    /* allow operation */
2448    v.unlock();
2449
2450    MutationStatus rv =
2451            v.isDirty() ? MutationStatus::WasDirty : MutationStatus::WasClean;
2452
2453    if (use_meta) {
2454        v.setCas(metadata.cas);
2455        v.setFlags(metadata.flags);
2456        v.setExptime(metadata.exptime);
2457    }
2458
2459    v.setRevSeqno(metadata.revSeqno);
2460    VBNotifyCtx notifyCtx;
2461    StoredValue* newSv;
2462    std::tie(newSv, notifyCtx) =
2463            softDeleteStoredValue(hbl,
2464                                  v,
2465                                  /*onlyMarkDeleted*/ false,
2466                                  queueItmCtx,
2467                                  bySeqno);
2468    ht.updateMaxDeletedRevSeqno(metadata.revSeqno);
2469    return std::make_tuple(rv, newSv, notifyCtx);
2470}
2471
2472std::tuple<MutationStatus, StoredValue*, VBNotifyCtx>
2473VBucket::processExpiredItem(const HashTable::HashBucketLock& hbl,
2474                            StoredValue& v) {
2475    if (!hbl.getHTLock()) {
2476        throw std::invalid_argument(
2477                "VBucket::processExpiredItem: htLock not held for VBucket " +
2478                std::to_string(getId()));
2479    }
2480
2481    if (v.isTempInitialItem() && eviction == FULL_EVICTION) {
2482        return std::make_tuple(MutationStatus::NeedBgFetch,
2483                               &v,
2484                               queueDirty(v,
2485                                          GenerateBySeqno::Yes,
2486                                          GenerateCas::Yes,
2487                                          /*isBackfillItem*/ false));
2488    }
2489
2490    /* If the datatype is XATTR, mark the item as deleted
2491     * but don't delete the value as system xattrs can
2492     * still be queried by mobile clients even after
2493     * deletion.
2494     * TODO: The current implementation is inefficient
2495     * but functionally correct and for performance reasons
2496     * only the system xattrs need to be stored.
2497     */
2498    value_t value = v.getValue();
2499    bool onlyMarkDeleted = value && mcbp::datatype::is_xattr(v.getDatatype());
2500    v.setRevSeqno(v.getRevSeqno() + 1);
2501    VBNotifyCtx notifyCtx;
2502    StoredValue* newSv;
2503    std::tie(newSv, notifyCtx) =
2504            softDeleteStoredValue(hbl,
2505                                  v,
2506                                  onlyMarkDeleted,
2507                                  VBQueueItemCtx(GenerateBySeqno::Yes,
2508                                                 GenerateCas::Yes,
2509                                                 TrackCasDrift::No,
2510                                                 /*isBackfillItem*/ false,
2511                                                 nullptr /* no pre link */),
2512                                  v.getBySeqno());
2513    ht.updateMaxDeletedRevSeqno(newSv->getRevSeqno() + 1);
2514    return std::make_tuple(MutationStatus::NotFound, newSv, notifyCtx);
2515}
2516
2517bool VBucket::deleteStoredValue(const HashTable::HashBucketLock& hbl,
2518                                StoredValue& v) {
2519    if (!v.isDeleted() && v.isLocked(ep_current_time())) {
2520        return false;
2521    }
2522
2523    /* StoredValue deleted here. If any other in-memory data structures are
2524       using the StoredValue intrusively then they must have handled the delete
2525       by this point */
2526    ht.unlocked_del(hbl, v.getKey());
2527    return true;
2528}
2529
2530TempAddStatus VBucket::addTempStoredValue(const HashTable::HashBucketLock& hbl,
2531                                          const DocKey& key,
2532                                          bool isReplication) {
2533    if (!hbl.getHTLock()) {
2534        throw std::invalid_argument(
2535                "VBucket::addTempStoredValue: htLock not held for "
2536                "VBucket " +
2537                std::to_string(getId()));
2538    }
2539
2540    Item itm(key,
2541             /*flags*/ 0,
2542             /*exp*/ 0,
2543             /*data*/ NULL,
2544             /*size*/ 0,
2545             PROTOCOL_BINARY_RAW_BYTES,
2546             0,
2547             StoredValue::state_temp_init);
2548
2549    if (!hasMemoryForStoredValue(stats, itm, isReplication)) {
2550        return TempAddStatus::NoMem;
2551    }
2552
2553    /* A 'temp initial item' is just added to the hash table. It is
2554       not put on checkpoint manager or sequence list */
2555    StoredValue* v = ht.unlocked_addNewStoredValue(hbl, itm);
2556
2557    updateRevSeqNoOfNewStoredValue(*v);
2558    itm.setRevSeqno(v->getRevSeqno());
2559    v->setNRUValue(MAX_NRU_VALUE);
2560
2561    return TempAddStatus::BgFetch;
2562}
2563
2564void VBucket::notifyNewSeqno(const VBNotifyCtx& notifyCtx) {
2565    if (newSeqnoCb) {
2566        newSeqnoCb->callback(getId(), notifyCtx);
2567    }
2568}
2569
2570/*
2571 * Queue the item to the checkpoint and return the seqno the item was
2572 * allocated.
2573 */
2574int64_t VBucket::queueItem(Item* item, OptionalSeqno seqno) {
2575    item->setVBucketId(id);
2576    queued_item qi(item);
2577    if (isBackfillPhase()) {
2578        queueBackfillItem(qi, getGenerateBySeqno(seqno));
2579    } else {
2580        checkpointManager->queueDirty(
2581                *this,
2582                qi,
2583                getGenerateBySeqno(seqno),
2584                GenerateCas::Yes,
2585                nullptr /* No pre link step as this is for system events */);
2586    }
2587    VBNotifyCtx notifyCtx;
2588    // If the seqno is initialized, skip replication notification
2589    notifyCtx.notifyReplication = !seqno.is_initialized();
2590    notifyCtx.notifyFlusher = true;
2591    notifyCtx.bySeqno = qi->getBySeqno();
2592    notifyNewSeqno(notifyCtx);
2593    return qi->getBySeqno();
2594}
2595
2596VBNotifyCtx VBucket::queueDirty(StoredValue& v,
2597                                const VBQueueItemCtx& queueItmCtx) {
2598    if (queueItmCtx.trackCasDrift == TrackCasDrift::Yes) {
2599        setMaxCasAndTrackDrift(v.getCas());
2600    }
2601    return queueDirty(v,
2602                      queueItmCtx.genBySeqno,
2603                      queueItmCtx.genCas,
2604                      queueItmCtx.isBackfillItem,
2605                      queueItmCtx.preLinkDocumentContext);
2606}
2607
2608void VBucket::updateRevSeqNoOfNewStoredValue(StoredValue& v) {
2609    /**
2610     * Possibly, this item is being recreated. Conservatively assign it
2611     * a seqno that is greater than the greatest seqno of all deleted
2612     * items seen so far.
2613     */
2614    uint64_t seqno = ht.getMaxDeletedRevSeqno();
2615    if (!v.isTempItem()) {
2616        ++seqno;
2617    }
2618    v.setRevSeqno(seqno);
2619}
2620
2621void VBucket::addHighPriorityVBEntry(uint64_t seqnoOrChkId,
2622                                     const void* cookie,
2623                                     HighPriorityVBNotify reqType) {
2624    std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2625    hpVBReqs.push_back(HighPriorityVBEntry(cookie, seqnoOrChkId, reqType));
2626    numHpVBReqs.store(hpVBReqs.size());
2627
2628    LOG(EXTENSION_LOG_NOTICE,
2629        "Added high priority async request %s "
2630        "for vb:%" PRIu16 ", Check for:%" PRIu64 ", "
2631        "Persisted upto:%" PRIu64 ", cookie:%p",
2632        to_string(reqType).c_str(),
2633        getId(),
2634        seqnoOrChkId,
2635        getPersistenceSeqno(),
2636        cookie);
2637}
2638
2639std::map<const void*, ENGINE_ERROR_CODE> VBucket::getHighPriorityNotifications(
2640        EventuallyPersistentEngine& engine,
2641        uint64_t idNum,
2642        HighPriorityVBNotify notifyType) {
2643    std::unique_lock<std::mutex> lh(hpVBReqsMutex);
2644    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2645
2646    auto entry = hpVBReqs.begin();
2647
2648    while (entry != hpVBReqs.end()) {
2649        if (notifyType != entry->reqType) {
2650            ++entry;
2651            continue;
2652        }
2653
2654        std::string logStr(to_string(notifyType));
2655
2656        auto wall_time = ProcessClock::now() - entry->start;
2657        auto spent =
2658                std::chrono::duration_cast<std::chrono::seconds>(wall_time);
2659        if (entry->id <= idNum) {
2660            toNotify[entry->cookie] = ENGINE_SUCCESS;
2661            stats.chkPersistenceHisto.add(
2662                    std::chrono::duration_cast<std::chrono::microseconds>(
2663                            wall_time));
2664            adjustCheckpointFlushTimeout(spent);
2665            LOG(EXTENSION_LOG_NOTICE,
2666                "Notified the completion of %s "
2667                "for vbucket %" PRIu16 ", Check for: %" PRIu64
2668                ", "
2669                "Persisted upto: %" PRIu64 ", cookie %p",
2670                logStr.c_str(),
2671                getId(),
2672                entry->id,
2673                idNum,
2674                entry->cookie);
2675            entry = hpVBReqs.erase(entry);
2676        } else if (spent > getCheckpointFlushTimeout()) {
2677            adjustCheckpointFlushTimeout(spent);
2678            engine.storeEngineSpecific(entry->cookie, NULL);
2679            toNotify[entry->cookie] = ENGINE_TMPFAIL;
2680            LOG(EXTENSION_LOG_WARNING,
2681                "Notified the timeout on %s "
2682                "for vbucket %" PRIu16 ", Check for: %" PRIu64
2683                ", "
2684                "Persisted upto: %" PRIu64 ", cookie %p",
2685                logStr.c_str(),
2686                getId(),
2687                entry->id,
2688                idNum,
2689                entry->cookie);
2690            entry = hpVBReqs.erase(entry);
2691        } else {
2692            ++entry;
2693        }
2694    }
2695    numHpVBReqs.store(hpVBReqs.size());
2696    return toNotify;
2697}
2698
2699std::map<const void*, ENGINE_ERROR_CODE> VBucket::tmpFailAndGetAllHpNotifies(
2700        EventuallyPersistentEngine& engine) {
2701    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
2702
2703    LockHolder lh(hpVBReqsMutex);
2704
2705    for (auto& entry : hpVBReqs) {
2706        toNotify[entry.cookie] = ENGINE_TMPFAIL;
2707        engine.storeEngineSpecific(entry.cookie, NULL);
2708    }
2709    hpVBReqs.clear();
2710
2711    return toNotify;
2712}
2713
2714void VBucket::adjustCheckpointFlushTimeout(std::chrono::seconds wall_time) {
2715    auto middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;
2716
2717    if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
2718        chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
2719    } else if (wall_time <= middle) {
2720        chkFlushTimeout = middle;
2721    } else {
2722        chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
2723    }
2724}
2725
2726std::chrono::seconds VBucket::getCheckpointFlushTimeout() {
2727    return std::chrono::duration_cast<std::chrono::seconds>(
2728            chkFlushTimeout.load());
2729}
2730
2731std::unique_ptr<Item> VBucket::pruneXattrDocument(
2732        StoredValue& v, const ItemMetaData& itemMeta) {
2733    // Need to take a copy of the value, prune it, and add it back
2734
2735    // Create work-space document
2736    std::vector<char> workspace(
2737            v.getValue()->getData(),
2738            v.getValue()->getData() + v.getValue()->valueSize());
2739
2740    // Now attach to the XATTRs in the document
2741    cb::xattr::Blob xattr({workspace.data(), workspace.size()},
2742                          mcbp::datatype::is_snappy(v.getDatatype()));
2743    xattr.prune_user_keys();
2744
2745    auto prunedXattrs = xattr.finalize();
2746
2747    if (prunedXattrs.size()) {
2748        // Something remains - Create a Blob and copy-in just the XATTRs
2749        auto newValue =
2750                Blob::New(reinterpret_cast<const char*>(prunedXattrs.data()),
2751                          prunedXattrs.size());
2752        auto rv = v.toItem(false, getId());
2753        rv->setCas(itemMeta.cas);
2754        rv->setFlags(itemMeta.flags);
2755        rv->setExpTime(itemMeta.exptime);
2756        rv->setRevSeqno(itemMeta.revSeqno);
2757        rv->replaceValue(newValue);
2758        rv->setDataType(PROTOCOL_BINARY_DATATYPE_XATTR);
2759        return rv;
2760    } else {
2761        return {};
2762    }
2763}
2764
2765void VBucket::removeKey(const DocKey& key, int64_t bySeqno) {
2766    auto hbl = ht.getLockedBucket(key);
2767    StoredValue* v = fetchValidValue(
2768            hbl, key, WantsDeleted::No, TrackReference::No, QueueExpired::Yes);
2769
2770    if (v && v->getBySeqno() == bySeqno) {
2771        ht.unlocked_del(hbl, v->getKey());
2772    }
2773}
2774
2775bool VBucket::isLogicallyNonExistent(
2776        const StoredValue& v,
2777        const Collections::VB::Manifest::CachingReadHandle& readHandle) {
2778    return v.isDeleted() || v.isTempDeletedItem() ||
2779           v.isTempNonExistentItem() ||
2780           readHandle.isLogicallyDeleted(v.getBySeqno());
2781}
2782
2783void VBucket::DeferredDeleter::operator()(VBucket* vb) const {
2784    // If the vbucket is marked as deleting then we must schedule task to
2785    // perform the resource destruction (memory/disk).
2786    if (vb->isDeletionDeferred()) {
2787        vb->scheduleDeferredDeletion(engine);
2788        return;
2789    }
2790    delete vb;
2791}
2792
2793void VBucket::setFreqSaturatedCallback(std::function<void()> callbackFunction) {
2794    ht.setFreqSaturatedCallback(callbackFunction);
2795}
2796