/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2018 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "ep_bucket.h"

#include "bgfetcher.h"
#include "bucket_logger.h"
#include "checkpoint_manager.h"
#include "collections/manager.h"
#include "dcp/dcpconnmap.h"
#include "ep_engine.h"
#include "ep_time.h"
#include "ep_vb.h"
#include "executorpool.h"
#include "failover-table.h"
#include "flusher.h"
#include "item.h"
#include "persistence_callback.h"
#include "replicationthrottle.h"
#include "rollback_result.h"
#include "statwriter.h"
#include "tasks.h"
#include "vb_visitors.h"
#include "vbucket_state.h"
#include "warmup.h"

#include <platform/timeutils.h>
#include <utilities/hdrhistogram.h>
#include <utilities/logtags.h>

#include <gsl.h>

/**
 * Callback class used by EPBucket, for adding relevant keys
 * to the Bloom filter during compaction.
 */
class BloomFilterCallback : public Callback<Vbid&, const DocKey&, bool&> {
public:
    BloomFilterCallback(KVBucket& eps) : store(eps) {
    }

    void callback(Vbid& vbucketId, const DocKey& key, bool& isDeleted) {
        VBucketPtr vb = store.getVBucket(vbucketId);
        if (vb) {
            /* Check if a temporary filter has been initialized. If not,
             * initialize it. If initialization fails, throw an exception
             * to the caller and let the caller deal with it.
             */
            bool tempFilterInitialized = vb->isTempFilterAvailable();
            if (!tempFilterInitialized) {
                tempFilterInitialized = initTempFilter(vbucketId);
            }

            if (!tempFilterInitialized) {
                throw std::runtime_error(
                        "BloomFilterCallback::callback: Failed "
                        "to initialize temporary filter for " +
                        vbucketId.to_string());
            }

            if (store.getItemEvictionPolicy() == EvictionPolicy::Value) {
                /**
                 * VALUE-ONLY EVICTION POLICY
                 * Consider deleted items only.
                 */
                if (isDeleted) {
                    vb->addToTempFilter(key);
                }
            } else {
                /**
                 * FULL EVICTION POLICY
                 * If vbucket's resident ratio is found to be less than
                 * the residency threshold, consider all items, otherwise
                 * consider deleted and non-resident items only.
                 */
                bool residentRatioLessThanThreshold =
                        vb->isResidentRatioUnderThreshold(
                                store.getBfiltersResidencyThreshold());
                if (residentRatioLessThanThreshold) {
                    vb->addToTempFilter(key);
                } else {
                    if (isDeleted || !store.isMetaDataResident(vb, key)) {
                        vb->addToTempFilter(key);
                    }
                }
            }
        }
    }

private:
    bool initTempFilter(Vbid vbucketId);
    KVBucket& store;
};

bool BloomFilterCallback::initTempFilter(Vbid vbucketId) {
    Configuration& config = store.getEPEngine().getConfiguration();
    VBucketPtr vb = store.getVBucket(vbucketId);
    if (!vb) {
        return false;
    }

    size_t initial_estimation = config.getBfilterKeyCount();
    size_t estimated_count;
    size_t num_deletes = 0;
    try {
        num_deletes = store.getROUnderlying(vbucketId)
                              ->getNumPersistedDeletes(vbucketId);
    } catch (std::runtime_error& re) {
        EP_LOG_WARN(
                "BloomFilterCallback::initTempFilter: runtime error while "
                "getting the number of persisted deletes for {}. Details: {}",
                vbucketId,
                re.what());
        return false;
    }

    EvictionPolicy eviction_policy = store.getItemEvictionPolicy();
    if (eviction_policy == EvictionPolicy::Value) {
        /**
         * VALUE-ONLY EVICTION POLICY
         * Obtain number of persisted deletes from underlying kvstore.
         * Bloomfilter's estimated_key_count = 1.25 * deletes
         */
        estimated_count = round(1.25 * num_deletes);
    } else {
        /**
         * FULL EVICTION POLICY
         * First determine if the resident ratio of vbucket is less than
         * the threshold from configuration.
         */
        bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
                store.getBfiltersResidencyThreshold());

        /**
         * Based on resident ratio against threshold, estimate count.
         *
         * 1. If resident ratio is greater than the threshold:
         * Obtain number of persisted deletes from underlying kvstore.
         * Obtain number of non-resident-items for vbucket.
         * Bloomfilter's estimated_key_count =
         *                              1.25 * (deletes + non-resident)
         *
         * 2. Otherwise:
         * Obtain number of items for vbucket.
         * Bloomfilter's estimated_key_count =
         *                              1.25 * (num_items)
         */

        if (residentRatioAlert) {
            estimated_count = round(1.25 * vb->getNumItems());
        } else {
            estimated_count =
                    round(1.25 * (num_deletes + vb->getNumNonResidentItems()));
        }
    }

    if (estimated_count < initial_estimation) {
        estimated_count = initial_estimation;
    }

    vb->initTempFilter(estimated_count, config.getBfilterFpProb());

    return true;
}

class ExpiredItemsCallback : public Callback<Item&, time_t&> {
public:
    ExpiredItemsCallback(KVBucket& store) : epstore(store) {
    }

    void callback(Item& it, time_t& startTime) {
        if (epstore.compactionCanExpireItems()) {
            epstore.deleteExpiredItem(it, startTime, ExpireBy::Compactor);
        }
    }

private:
    KVBucket& epstore;
};

/**
 * Callback for notifying flusher about pending mutations.
 */
class NotifyFlusherCB : public Callback<Vbid> {
public:
    NotifyFlusherCB(KVShard* sh) : shard(sh) {
    }

    void callback(Vbid& vb) override {
        if (shard->getBucket(vb)) {
            shard->getFlusher()->notifyFlushEvent(vb);
        }
    }

private:
    KVShard* shard;
};

class EPBucket::ValueChangedListener : public ::ValueChangedListener {
public:
    ValueChangedListener(EPBucket& bucket) : bucket(bucket) {
    }

    virtual void sizeValueChanged(const std::string& key,
                                  size_t value) override {
        if (key == "flusher_batch_split_trigger") {
            bucket.setFlusherBatchSplitTrigger(value);
        } else if (key == "alog_sleep_time") {
            bucket.setAccessScannerSleeptime(value, false);
        } else if (key == "alog_task_time") {
            bucket.resetAccessScannerStartTime();
        } else {
            EP_LOG_WARN("Failed to change value for unknown variable, {}", key);
        }
    }

    virtual void booleanValueChanged(const std::string& key,
                                     bool value) override {
        if (key == "access_scanner_enabled") {
            if (value) {
                bucket.enableAccessScannerTask();
            } else {
                bucket.disableAccessScannerTask();
            }
        } else if (key == "retain_erroneous_tombstones") {
            bucket.setRetainErroneousTombstones(value);
        } else {
            EP_LOG_WARN("Failed to change value for unknown variable, {}", key);
        }
    }

private:
    EPBucket& bucket;
};

EPBucket::EPBucket(EventuallyPersistentEngine& theEngine)
    : KVBucket(theEngine) {
    auto& config = engine.getConfiguration();
    const std::string& policy = config.getItemEvictionPolicy();
    if (policy == "value_only") {
        eviction_policy = EvictionPolicy::Value;
    } else {
        eviction_policy = EvictionPolicy::Full;
    }
    replicationThrottle = std::make_unique<ReplicationThrottle>(
            engine.getConfiguration(), stats);

    vbMap.enablePersistence(*this);

    flusherBatchSplitTrigger = config.getFlusherBatchSplitTrigger();
    config.addValueChangedListener(
            "flusher_batch_split_trigger",
            std::make_unique<ValueChangedListener>(*this));

    retainErroneousTombstones = config.isRetainErroneousTombstones();
    config.addValueChangedListener(
            "retain_erroneous_tombstones",
            std::make_unique<ValueChangedListener>(*this));

    initializeWarmupTask();
}

EPBucket::~EPBucket() {
}

bool EPBucket::initialize() {
    KVBucket::initialize();

    startWarmupTask();

    enableItemPager();

    if (!startBgFetcher()) {
        EP_LOG_CRITICAL(
                "EPBucket::initialize: Failed to create and start "
                "bgFetchers");
        return false;
    }
    startFlusher();

    return true;
}

void EPBucket::deinitialize() {
    stopFlusher();
    stopBgFetcher();

    stopWarmup();
    KVBucket::deinitialize();
}

/**
 * @returns true if the item `candidate` can be de-duplicated (skipped) because
 * `lastFlushed` already supersedes it.
 */
static bool canDeDuplicate(Item* lastFlushed, Item& candidate) {
    if (!lastFlushed) {
        // Nothing to de-duplicate against.
        return false;
    }
    if (lastFlushed->getKey() != candidate.getKey()) {
        // Keys differ - cannot de-dupe.
        return false;
    }
    if (lastFlushed->isCommitted() != candidate.isCommitted()) {
        // Committed / pending namespace differs - cannot de-dupe.
        return false;
    }

    // Items match - the candidate must have a lower seqno.
    Expects(lastFlushed->getBySeqno() > candidate.getBySeqno());

    // Otherwise - valid to de-dupe.
    return true;
}

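/**
 * Flush as many items as permitted in a single batch for the given vBucket.
 *
 * @return whether more items remain to be flushed, the number of items
 *         flushed, and whether the CheckpointRemover should be woken.
 */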
EPBucket::FlushResult EPBucket::flushVBucket(Vbid vbid) {
    const auto flushStart = std::chrono::steady_clock::now();

    auto vb = getLockedVBucket(vbid, std::try_to_lock);
    if (!vb.owns_lock()) {
        // Try another bucket if this one is locked to avoid blocking flusher.
        return {MoreAvailable::Yes, 0, WakeCkptRemover::No};
    }
    if (!vb) {
        return {MoreAvailable::No, 0, WakeCkptRemover::No};
    }

    // Obtain the set of items to flush, up to the maximum allowed for
    // a single flush.
    auto toFlush = vb->getItemsToPersist(flusherBatchSplitTrigger);

    // Callback must be initialized at persistence
    Expects(toFlush.flushHandle.get());

    const auto moreAvailable =
            toFlush.moreAvailable ? MoreAvailable::Yes : MoreAvailable::No;

    // The Flusher will wake up the CheckpointRemover if necessary.
    const auto wakeupCheckpointRemover =
            vb->checkpointManager
                            ->isEligibleForCheckpointRemovalAfterPersistence()
                    ? WakeCkptRemover::Yes
                    : WakeCkptRemover::No;

    if (toFlush.items.empty()) {
        // getItemsToPersist() may move the persistence cursor to a new
        // checkpoint, so some pending CheckpointPersistence request could be
        // satisfied now.
        //
        // Note: We do not need to notify SeqnoPersistence request here, as
        //   there is definitely nothing new to notify if we do not flush any
        //   mutation.
        handleCheckpointPersistence(*vb);

        return {moreAvailable, 0, wakeupCheckpointRemover};
    }

    // The range becomes initialised only when an item is flushed
    boost::optional<snapshot_range_t> range;
    KVStore* rwUnderlying = getRWUnderlying(vb->getId());
    vbucket_state vbstate, vbstateRollback;

    while (!rwUnderlying->begin(
            std::make_unique<EPTransactionContext>(stats, *vb))) {
        ++stats.beginFailed;
        EP_LOG_WARN(
                "Failed to start a transaction!!! "
                "Retry in 1 sec ...");
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    rwUnderlying->optimizeWrites(toFlush.items);

    Item* prev = nullptr;

    // Read the vbucket_state from disk as many values from the
    // in-memory vbucket_state may be ahead of what we are flushing.
    const auto* persistedVbState = rwUnderlying->getVBucketState(vb->getId());

    // The first flush we do populates the cachedVBStates of the KVStore
    // so we may not (if this is the first flush) have a state returned
    // from the KVStore.
    if (persistedVbState) {
        // Take two copies.
        // First will be mutated as the new state
        // Second remains unchanged and will be used on failure
        vbstateRollback = vbstate = *persistedVbState;
    }
    // We need to set a few values from the in-memory state.
    uint64_t maxSeqno = 0;
    uint64_t maxVbStateOpCas = 0;

    auto minSeqno = std::numeric_limits<uint64_t>::max();

    // Stores the number of items added to the flush-batch in KVStore.
    // Note:
    //  - Does not carry any information on whether the flush-batch is
    //    successfully persisted or not
    //  - Does not account set-vbstate items
    size_t flushBatchSize = 0;

    // Set if we process an explicit set-vbstate item, which requires a flush
    // to disk regardless of whether we have any other item to flush or not
    bool mustPersistVBState = false;

    Collections::VB::Flush collectionFlush(vb->getManifest());

    // HCS is optional because we have to update it on disk only if some
    // Commit/Abort SyncWrite is found in the flush-batch. If we're
    // flushing Disk checkpoints then the toFlush value may be
    // supplied. In this case, this should be the HCS received from the
    // Active node and should be greater than or equal to the HCS for
    // any other item in this flush batch. This is required because we
    // send mutations instead of commits and would not otherwise
    // update the HCS on disk.
    boost::optional<uint64_t> hcs = boost::make_optional(false, uint64_t());

    // HPS is optional because we have to update it on disk only if a
    // prepare is found in the flush-batch.
    // This value is read at warmup to determine what seqno to stop
    // loading prepares at (there will not be any prepares after this
    // point) but cannot be used to initialise a PassiveDM after warmup
    // as this value will advance into snapshots immediately, without
    // the entire snapshot needing to be persisted.
    boost::optional<uint64_t> hps = boost::make_optional(false, uint64_t());

    // We always maintain the maxVisibleSeqno at the current value
    // and only change it to a higher-seqno when a flush of a visible
    // item is seen. This value must be tracked to provide a correct
    // snapshot range for non-sync write aware consumers during backfill
    // (the snapshot should not end on a prepare or an abort, as these
    // items will not be sent). This value is also used at warmup so
    // that vbuckets can resume with the same visible seqno as before
    // the restart.
    Monotonic<uint64_t> maxVisibleSeqno{vbstate.maxVisibleSeqno};

    if (toFlush.maxDeletedRevSeqno) {
        vbstate.maxDeletedSeqno = toFlush.maxDeletedRevSeqno.get();
    }

    VBucket::AggregatedFlushStats aggStats;

    // Iterate through items, checking if we (a) can skip persisting,
    // (b) can de-duplicate as the previous key was the same, or (c)
    // actually need to persist.
    // Note: This assumes items have been sorted by key and then by
    // seqno (see optimizeWrites() above) such that duplicate keys are
    // adjacent but with the highest seqno first.
    // Note(2): The de-duplication here is an optimization to save
    // creating and enqueuing multiple set() operations on the
    // underlying KVStore - however the KVStore itself only stores a
    // single value per key, and so even if we don't de-dupe here the
    // KVStore will eventually - just potentially after unnecessary work.
    for (const auto& item : toFlush.items) {
        if (!item->shouldPersist()) {
            continue;
        }

        const auto op = item->getOperation();
        if ((op == queue_op::commit_sync_write ||
             op == queue_op::abort_sync_write) &&
            toFlush.checkpointType != CheckpointType::Disk) {
            // If we are receiving a disk snapshot then we want to skip
            // the HCS update as we will persist a correct one when we
            // flush the last item. If we were to persist an incorrect
            // HCS then we would have to backtrack the start seqno of
            // our warmup to ensure that we do warmup prepares that may
            // not have been completed if they were completed out of
            // order.
            hcs = std::max(hcs.value_or(0), item->getPrepareSeqno());
        }

        if (item->isVisible() &&
            static_cast<uint64_t>(item->getBySeqno()) > maxVisibleSeqno) {
            maxVisibleSeqno = static_cast<uint64_t>(item->getBySeqno());
        }

        if (op == queue_op::pending_sync_write) {
            Expects(item->getBySeqno() > 0);
            hps = std::max(hps.value_or(0),
                           static_cast<uint64_t>(item->getBySeqno()));
        }

        if (op == queue_op::set_vbucket_state) {
            // Only process vbstate if it's sequenced higher (by cas).
            // We use the cas instead of the seqno here because a
            // set_vbucket_state does not increment the lastBySeqno in
            // the CheckpointManager when it is created. This means that
            // it is possible to have two set_vbucket_state items that
            // follow one another with the same seqno. The cas will be
            // bumped for every item so it can be used to distinguish
            // which item is the latest and should be flushed.
            if (item->getCas() > maxVbStateOpCas) {
                // Should only bump the stat once for the latest state
                // change that we want to flush
                if (maxVbStateOpCas == 0) {
                    // There is at least a commit to be done, so
                    // increase todo
                    ++stats.flusher_todo;
                }

                maxVbStateOpCas = item->getCas();

                // It could be the case that the set_vbucket_state is
                // alone, i.e. no mutations are being flushed; we must
                // trigger an update of the vbstate, which will always
                // happen when we set this.
                mustPersistVBState = true;

                // Process the Item's value into the transition struct
                vbstate.transition.fromItem(*item);
            }

            // Register the item for deferred (flush success only) stats update.
            aggStats.accountItem(*item);
        } else if (!canDeDuplicate(prev, *item)) {
            // This is an item we must persist.
            prev = item.get();
            ++flushBatchSize;

            if (mcbp::datatype::is_xattr(item->getDataType())) {
                vbstate.mightContainXattrs = true;
            }

            flushOneDelOrSet(item, vb.getVB());

            maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());

            // Track the lowest seqno, so we can set the HLC epoch
            minSeqno = std::min(minSeqno, (uint64_t)item->getBySeqno());
            vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
            ++stats.flusher_todo;

            if (!range.is_initialized()) {
                range = snapshot_range_t{
                        vbstate.lastSnapStart,
                        toFlush.ranges.empty()
                                ? vbstate.lastSnapEnd
                                : toFlush.ranges.back().getEnd()};
            }

            // Is the item the end item of one of the ranges we're
            // flushing? Note all the work here only affects replica VBs
            auto itr = std::find_if(toFlush.ranges.begin(),
                                    toFlush.ranges.end(),
                                    [&item](auto& range) {
                                        return uint64_t(item->getBySeqno()) ==
                                               range.getEnd();
                                    });

            // If this is the end item, we can adjust the start of our
            // flushed range, which would be used for failure purposes.
            // Primarily by bringing the start to be a consistent point
            // allows for promotion to active to set the fail-over table
            // to a consistent point.
            if (itr != toFlush.ranges.end()) {
                // Use std::max as the flusher is not visiting in seqno
                // order.
                range->setStart(
                        std::max(range->getStart(), itr->range.getEnd()));
                // HCS may be weakly monotonic when received via a disk
                // snapshot so we special case this for the disk
                // snapshot instead of relaxing the general constraint.
                if (toFlush.checkpointType == CheckpointType::Disk &&
                    itr->highCompletedSeqno !=
                            vbstate.persistedCompletedSeqno) {
                    hcs = itr->highCompletedSeqno;
                }

                // Now that the end of a snapshot has been reached,
                // store the hps tracked by the checkpoint to disk
                if (itr->highPreparedSeqno) {
                    auto newHps =
                            toFlush.checkpointType == CheckpointType::Memory
                                    ? *(itr->highPreparedSeqno)
                                    : itr->getEnd();
                    vbstate.highPreparedSeqno =
                            std::max(vbstate.highPreparedSeqno, newHps);
                }
            }
        } else {
            // Item is the same key as the previous[1] one - don't need
            // to flush to disk.
            // [1] Previous here really means 'next' - optimizeWrites()
            //     above has actually re-ordered items such that items
            //     with the same key are ordered from high->low seqno.
            //     This means we only write the highest (i.e. newest)
            //     item for a given key, and discard any duplicate,
            //     older items.

            // Register the item for deferred (flush success only) stats update.
            aggStats.accountItem(*item);
        }
    }

    // Just return if nothing to flush
    if (!mustPersistVBState && flushBatchSize == 0) {
        // The persistence cursor may have moved to a new checkpoint, which may
        // satisfy pending checkpoint-persistence requests
        handleCheckpointPersistence(*vb);

        return {moreAvailable, 0, wakeupCheckpointRemover};
    }

    if (vbstate.transition.state == vbucket_state_active) {
        if (maxSeqno) {
            range = snapshot_range_t(maxSeqno, maxSeqno);
        }
    }

    // Update VBstate based on the changes we have just made,
    // then tell the rwUnderlying the 'new' state
    // (which will be persisted as part of the commit() below).

    // Only update the snapshot range if items were flushed, i.e.
    // don't appear to be in a snapshot when you have no data for it.
    // We also update the checkpointType here as this should only
    // change with snapshots.
    if (range) {
        vbstate.lastSnapStart = range->getStart();
        vbstate.lastSnapEnd = range->getEnd();
        vbstate.checkpointType = toFlush.checkpointType;
    }

    // Track the lowest seqno written in Spock and record it as
    // the HLC epoch, a seqno at which we can be sure the value has an
    // HLC CAS.
    vbstate.hlcCasEpochSeqno = vb->getHLCEpochSeqno();
    if (vbstate.hlcCasEpochSeqno == HlcCasSeqnoUninitialised &&
        minSeqno != std::numeric_limits<uint64_t>::max()) {
        vbstate.hlcCasEpochSeqno = minSeqno;

        // @todo MB-37692: Defer this call at flush-success only or reset
        //  the value if flush fails.
        vb->setHLCEpochSeqno(vbstate.hlcCasEpochSeqno);
    }

    // Do we need to trigger a persist of the state?
    // If there are no "real" items to flush, and we encountered
    // a set_vbucket_state meta-item.
    const auto persistVBStateOnly = mustPersistVBState && (flushBatchSize == 0);
    const auto options = persistVBStateOnly
                                 ? VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT
                                 : VBStatePersist::VBSTATE_CACHE_UPDATE_ONLY;

    if (hcs) {
        Expects(hcs > vbstate.persistedCompletedSeqno);
        vbstate.persistedCompletedSeqno = *hcs;
    }

    if (hps) {
        Expects(hps > vbstate.persistedPreparedSeqno);
        vbstate.persistedPreparedSeqno = *hps;
    }

    vbstate.maxVisibleSeqno = maxVisibleSeqno;

    // @todo MB-37920: This call potentially does 2 things:
    //   1) update the cached vbstate
    //   2) persist the new vbstate
    // The function returns false if the operation fails. But, (1) may
    // succeed and (2) may fail, which makes the function return false.
    // In that case we do not roll back the cached vbstate, which exposes
    // a wrong on-disk state at that point.
    // Also, when we re-attempt to flush a set-vbstate item we may fail again
    // because of the optimization at vbucket_state::needsToBePersisted().
    if (!rwUnderlying->snapshotVBucket(vb->getId(), vbstate, options)) {
        // Flush failed, we need to reset the pcursor to the original
        // position. At the next run the flusher will re-attempt by retrieving
        // all the items from the disk queue again.
        toFlush.flushHandle->markFlushFailed();

        return {MoreAvailable::Yes, 0, WakeCkptRemover::No};
    }

    // We have already flushed the new vbstate if it was the only thing to
    // flush. All done.
    if (persistVBStateOnly) {
        flushSuccessEpilogue(*vb, flushStart, 0, aggStats, collectionFlush);

        return {moreAvailable, 0, wakeupCheckpointRemover};
    }

    // The flush-batch must be non-empty by logic at this point.
    Expects(flushBatchSize > 0);

    // Release the memory allocated for vectors in toFlush before we call
    // into KVStore::commit. This reduces memory peaks (every queued_item in
    // toFlush.items is a pointer (8 bytes); also, having a big
    // toFlush.ranges is not likely but may happen).
    //
    // Note:
    //  - std::vector::clear() leaves the capacity of vector unchanged,
    //    so memory is not released.
    //  - we cannot rely on clear() + shrink_to_fit() as the latter is a
    //    non-binding request to reduce capacity() to size(), it depends on
    //    the implementation whether the request is fulfilled.
    {
        const auto itemsToRelease = std::move(toFlush.items);
        const auto rangesToRelease = std::move(toFlush.ranges);
    }

    // Persist the flush-batch.
    const auto flushSuccess =
            commit(vb->getId(), *rwUnderlying, collectionFlush);

    if (!flushSuccess) {
        // Flusher failed to commit the batch, rollback vbstate
        if (rwUnderlying->getVBucketState(vbid)) {
            *rwUnderlying->getVBucketState(vbid) = vbstateRollback;
        }

        // Flush failed, we need to reset the pcursor to the original
        // position. At the next run the flusher will re-attempt by retrieving
        // all the items from the disk queue again.
        toFlush.flushHandle->markFlushFailed();

        return {MoreAvailable::Yes, 0, WakeCkptRemover::No};
    }

    // Note: We want to update the snap-range only if we have flushed at least
    // one item. I.e. don't appear to be in a snap when you have no data for it
    Expects(range.is_initialized());
    vb->setPersistedSnapshot(*range);

    uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
    if (highSeqno > 0 && highSeqno != vb->getPersistenceSeqno()) {
        vb->setPersistenceSeqno(highSeqno);
    }

    // Notify the local DM that the Flusher has run. Persistence
    // could unblock some pending Prepares in the DM.
    // If so, this call updates the High Prepared Seqno
    // for this node.
    // In the case of a Replica node, that could trigger a SeqnoAck
    // to the Active.
    //
    // Note: This is a NOP if there's no Prepare queued in DM.
    //     We could notify the DM only if strictly required (i.e.,
    //     only when the Flusher has persisted up to the snap-end
    //     mutation of an in-memory snapshot, see HPS comments in
    //     PassiveDM for details), but that requires further work.
    //     The main problem is that in general a flush-batch does
    //     not coincide with in-memory snapshots (ie, we don't
    //     persist at snapshot boundaries). So, the Flusher could
    //     split a single in-memory snapshot into multiple
    //     flush-batches. That may happen at Replica, e.g.:
    //
    //     1) received snap-marker [1, 2]
    //     2) received 1:PRE
    //     3) flush-batch {1:PRE}
    //     4) received 2:mutation
    //     5) flush-batch {2:mutation}
    //
    //     In theory we need to notify the DM only at step (5) and
    //     only if the snapshot contains at least 1 Prepare
    //     (which is the case in our example), but the problem is
    //     that the Flusher doesn't know about 1:PRE at step (5).
    //
    //     So, given that here we are executing in a slow bg-thread
    //     (write+sync to disk), we can afford to call back
    //     to the DM unconditionally.
    vb->notifyPersistenceToDurabilityMonitor();

    flushSuccessEpilogue(
            *vb, flushStart, flushBatchSize, aggStats, collectionFlush);

    // Handle Seqno Persistence requests
    vb->notifyHighPriorityRequests(
            engine, vb->getPersistenceSeqno(), HighPriorityVBNotify::Seqno);

    return {moreAvailable, flushBatchSize, wakeupCheckpointRemover};
}

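// Update the persisted checkpoint-id and notify any high-priority requests
// waiting on checkpoint persistence.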
void EPBucket::handleCheckpointPersistence(VBucket& vb) const {
    auto& manager = *vb.checkpointManager;
    manager.itemsPersisted(); // update pCursorPreCkptId
    vb.notifyHighPriorityRequests(engine,
                                  manager.getPersistenceCursorPreChkId(),
                                  HighPriorityVBNotify::ChkPersistence);
}

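// Bookkeeping performed only after a successful flush: updates flush timing
// stats, applies the deferred per-item stats, triggers any pending collection
// purge and satisfies checkpoint-persistence requests.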
void EPBucket::flushSuccessEpilogue(
        VBucket& vb,
        const std::chrono::steady_clock::time_point flushStart,
        size_t itemsFlushed,
        const VBucket::AggregatedFlushStats& aggStats,
        const Collections::VB::Flush& collectionFlush) {
    // Clear the flag if set (ie, only at vbucket creation)
    if (vb.setBucketCreation(false)) {
        EP_LOG_DEBUG("EPBucket::flushSuccessEpilogue: {} created", vb.getId());
    }

    // Update flush stats
    const auto flushEnd = std::chrono::steady_clock::now();
    const auto transTime =
            std::chrono::duration_cast<std::chrono::milliseconds>(flushEnd -
                                                                  flushStart)
                    .count();
    const auto transTimePerItem =
            itemsFlushed ? static_cast<double>(transTime) / itemsFlushed : 0;
    lastTransTimePerItem.store(transTimePerItem);
    stats.cumulativeFlushTime.fetch_add(transTime);
    stats.flusher_todo.store(0);
    stats.totalPersistVBState++;

    vb.doAggregatedFlushStats(aggStats);

    // By definition, does not need to be called if no flush performed or
    // if flush failed.
    collectionFlush.checkAndTriggerPurge(vb.getId(), *this);

    // By definition, this function is called after persisting a batch of
    // data, so it can be safely skipped if no flush performed or if flush
    // failed.
    getRWUnderlying(vb.getId())->pendingTasks();

    // The persistence cursor may have moved to a new checkpoint, which may
    // satisfy pending checkpoint-persistence requests
    handleCheckpointPersistence(vb);
}

void EPBucket::setFlusherBatchSplitTrigger(size_t limit) {
    flusherBatchSplitTrigger = limit;
}

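// Commit the current transaction on the given KVStore, updating commit
// statistics and timings.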
bool EPBucket::commit(Vbid vbid,
                      KVStore& kvstore,
                      Collections::VB::Flush& collectionsFlush) {
    BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
    auto commit_start = std::chrono::steady_clock::now();

    const auto res = kvstore.commit(collectionsFlush);
    if (res) {
        ++stats.flusherCommits;
    } else {
        ++stats.commitFailed;
        EP_LOG_WARN("EPBucket::commit: kvstore.commit failed {}", vbid);
    }

    auto commit_end = std::chrono::steady_clock::now();
    auto commit_time = std::chrono::duration_cast<std::chrono::milliseconds>(
                               commit_end - commit_start)
                               .count();
    stats.commit_time.store(commit_time);
    stats.cumulativeCommitTime.fetch_add(commit_time);

    return res;
}

void EPBucket::startFlusher() {
    for (const auto& shard : vbMap.shards) {
        shard->getFlusher()->start();
    }
}

void EPBucket::stopFlusher() {
    for (const auto& shard : vbMap.shards) {
        auto* flusher = shard->getFlusher();
        EP_LOG_INFO(
                "Attempting to stop the flusher for "
                "shard:{}",
                shard->getId());
        bool rv = flusher->stop(stats.forceShutdown);
        if (rv && !stats.forceShutdown) {
            flusher->wait();
        }
    }
}

bool EPBucket::pauseFlusher() {
    bool rv = true;
    for (const auto& shard : vbMap.shards) {
        auto* flusher = shard->getFlusher();
        if (!flusher->pause()) {
            EP_LOG_WARN(
                    "Attempted to pause flusher in state "
                    "[{}], shard = {}",
                    flusher->stateName(),
                    shard->getId());
            rv = false;
        }
    }
    return rv;
}

bool EPBucket::resumeFlusher() {
    bool rv = true;
    for (const auto& shard : vbMap.shards) {
        auto* flusher = shard->getFlusher();
        if (!flusher->resume()) {
            EP_LOG_WARN(
                    "Attempted to resume flusher in state [{}], "
                    "shard = {}",
                    flusher->stateName(),
                    shard->getId());
            rv = false;
        }
    }
    return rv;
}

void EPBucket::wakeUpFlusher() {
    if (stats.diskQueueSize.load() == 0) {
        for (const auto& shard : vbMap.shards) {
            shard->getFlusher()->wake();
        }
    }
}

bool EPBucket::startBgFetcher() {
    for (const auto& shard : vbMap.shards) {
        BgFetcher* bgfetcher = shard->getBgFetcher();
        if (!bgfetcher) {
            EP_LOG_WARN("Failed to start bg fetcher for shard {}",
                        shard->getId());
            return false;
        }
        bgfetcher->start();
    }
    return true;
}

void EPBucket::stopBgFetcher() {
    for (const auto& shard : vbMap.shards) {
        BgFetcher* bgfetcher = shard->getBgFetcher();
        if (bgfetcher->pendingJob()) {
            EP_LOG_WARN(
                    "Shutting down engine while there are still pending data "
                    "reads for shard {} from database storage",
                    shard->getId());
        }
        EP_LOG_INFO("Stopping bg fetcher for shard:{}", shard->getId());
        bgfetcher->stop();
    }
}

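// Schedule compaction of the given vBucket's database file. The new task may
// be snoozed if other compactions are already queued and the disk queue is
// large (or the workload is read-heavy); the caller is notified via the
// cookie when compaction completes, hence ENGINE_EWOULDBLOCK is returned.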
ENGINE_ERROR_CODE EPBucket::scheduleCompaction(Vbid vbid,
                                               const CompactionConfig& c,
                                               const void* cookie) {
    ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
    if (errCode != ENGINE_SUCCESS) {
        return errCode;
    }

    /* Obtain the vbucket so we can get the previous purge seqno */
    VBucketPtr vb = vbMap.getBucket(vbid);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    LockHolder lh(compactionLock);
    ExTask task = std::make_shared<CompactTask>(
            *this, c, vb->getPurgeSeqno(), cookie);
    compactionTasks.push_back(std::make_pair(c.db_file_id, task));
    if (compactionTasks.size() > 1) {
        if ((stats.diskQueueSize > compactionWriteQueueCap &&
             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
            engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
            // Snooze a new compaction task.
            // We will wake it up when one of the existing compaction tasks is
            // done.
            task->snooze(60);
        }
    }

    ExecutorPool::get()->schedule(task);

    EP_LOG_DEBUG(
            "Scheduled compaction task {} on {}, "
            "purge_before_ts = {}, purge_before_seq = {}, dropdeletes = {}",
            uint64_t(task->getId()),
            c.db_file_id,
            c.purge_before_ts,
            c.purge_before_seq,
            c.drop_deletes);

    return ENGINE_EWOULDBLOCK;
}

ENGINE_ERROR_CODE EPBucket::cancelCompaction(Vbid vbid) {
    LockHolder lh(compactionLock);
    for (const auto& task : compactionTasks) {
        task.second->cancel();
    }
    return ENGINE_SUCCESS;
}

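// Queue a single set/delete (or system event) on the vBucket's KVStore as
// part of the current flush batch, and update the dirty-age statistics.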
void EPBucket::flushOneDelOrSet(const queued_item& qi, VBucketPtr& vb) {
    if (!vb) {
        --stats.diskQueueSize;
        return;
    }

    int64_t bySeqno = qi->getBySeqno();
    const bool deleted = qi->isDeleted() && !qi->isPending();

    std::chrono::microseconds dirtyAge =
            std::chrono::duration_cast<std::chrono::microseconds>(
                    std::chrono::steady_clock::now() - qi->getQueuedTime());
    stats.dirtyAgeHisto.add(dirtyAge);
    stats.dirtyAge.store(static_cast<rel_time_t>(dirtyAge.count()));
    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
                                         stats.dirtyAgeHighWat.load()));

    KVStore* rwUnderlying = getRWUnderlying(qi->getVBucketId());
    if (!deleted) {
        // TODO: Need to separate disk_insert from disk_update because
        // bySeqno doesn't give us that information.
        BlockTimer timer(
                bySeqno == -1 ? &stats.diskInsertHisto : &stats.diskUpdateHisto,
                bySeqno == -1 ? "disk_insert" : "disk_update",
                stats.timingLog);
        if (qi->isSystemEvent()) {
            rwUnderlying->setSystemEvent(qi);
        } else {
            rwUnderlying->set(qi);
        }
    } else {
        HdrMicroSecBlockTimer timer(
                &stats.diskDelHisto, "disk_delete", stats.timingLog);
        if (qi->isSystemEvent()) {
            rwUnderlying->delSystemEvent(qi);
        } else {
            rwUnderlying->del(qi);
        }
    }
}

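// Remove the given key (dropped from a collection) from the vBucket's
// in-memory structures; invoked via the droppedKeyCb during compaction.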
void EPBucket::dropKey(Vbid vbid, const DiskDocKey& diskKey, int64_t bySeqno) {
    auto vb = getVBucket(vbid);
    if (!vb) {
        return;
    }

    auto docKey = diskKey.getDocKey();
    auto collectionId = docKey.getCollectionID();
    if (collectionId.isSystem()) {
        throw std::logic_error("EPBucket::dropKey called for a system key");
    }

    { // collections read lock scope
        // @todo this lock could be removed - fetchValidValue requires it
        // in case of expiry, however dropKey doesn't generate expired values
        auto cHandle = vb->lockCollections(docKey);

        // ... drop it from the VB (hashtable)
        // @todo-durability: If prepared need to remove it from the Durability
        // Monitor (in-flight prepared SyncWrite from a collection which no
        // longer exists == abort the SyncWrite).
        Expects(diskKey.isCommitted());
        vb->dropKey(bySeqno, cHandle);
    }
}

void EPBucket::compactInternal(const CompactionConfig& config,
                               uint64_t purgeSeqno) {
    compaction_ctx ctx(config, purgeSeqno);

    BloomFilterCBPtr filter(new BloomFilterCallback(*this));
    ctx.bloomFilterCallback = filter;

    ExpiredItemsCBPtr expiry(new ExpiredItemsCallback(*this));
    ctx.expiryCallback = expiry;

    ctx.droppedKeyCb = std::bind(&EPBucket::dropKey,
                                 this,
                                 config.db_file_id,
                                 std::placeholders::_1,
                                 std::placeholders::_2);

    KVShard* shard = vbMap.getShardByVbId(config.db_file_id);
    KVStore* store = shard->getRWUnderlying();
    bool result = store->compactDB(&ctx);

    /* Iterate over all the vbucket ids set in max_purged_seq map. If there is
     * an entry in the map for a vbucket id, then it was involved in compaction
     * and thus can be used to update the associated bloom filters and purge
     * sequence numbers.
     */
    VBucketPtr vb = getVBucket(config.db_file_id);
    if (vb) {
        if (getEPEngine().getConfiguration().isBfilterEnabled() && result) {
            vb->swapFilter();
        } else {
            vb->clearFilter();
        }
        vb->setPurgeSeqno(ctx.max_purged_seq);
        vb->setNumTotalItems(vb->getNumTotalItems() -
                             ctx.stats.collectionsItemsPurged);
    }

    EP_LOG_INFO(
            "Compaction of {} done ({}). "
            "purged tombstones:{}, prepares:{}, "
            "collection_items_erased:alive:{},deleted:{}, "
            "size/items/tombstones/purge_seqno pre{{{}, {}, {}, {}}}, "
            "post{{{}, {}, {}, {}}}",
            config.db_file_id,
            result ? "ok" : "failed",
            ctx.stats.tombstonesPurged,
            ctx.stats.preparesPurged,
            ctx.stats.collectionsItemsPurged,
            ctx.stats.collectionsDeletedItemsPurged,
            ctx.stats.pre.size,
            ctx.stats.pre.items,
            ctx.stats.pre.deletedItems,
            ctx.stats.pre.purgeSeqno,
            ctx.stats.post.size,
            ctx.stats.post.items,
            ctx.stats.post.deletedItems,
            ctx.stats.post.purgeSeqno);
}

bool EPBucket::doCompact(const CompactionConfig& config,
                         uint64_t purgeSeqno,
                         const void* cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    StorageProperties storeProp = getStorageProperties();
    bool concWriteCompact = storeProp.hasConcWriteCompact();
    Vbid vbid = config.db_file_id;

    /**
     * Check if the underlying storage engine allows writes concurrently
     * as the database file is being compacted. If not, a lock needs to
     * be held in order to serialize access to the database file between
     * the writer and compactor threads
     */
    if (!concWriteCompact) {
        auto vb = getLockedVBucket(vbid, std::try_to_lock);
        if (!vb.owns_lock()) {
            // VB currently locked; try again later.
            return true;
        }

        if (!vb) {
            err = ENGINE_NOT_MY_VBUCKET;
            engine.storeEngineSpecific(cookie, nullptr);
            /**
             * Decrement session counter here, as memcached thread wouldn't
             * visit the engine interface in case of a NOT_MY_VB notification
             */
            engine.decrementSessionCtr();
        } else {
            compactInternal(config, purgeSeqno);
        }
    } else {
        compactInternal(config, purgeSeqno);
    }

    updateCompactionTasks(vbid);

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}

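// Remove the completed compaction task for db_file_id from the pending list
// and wake the first snoozed compaction task, if any.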
void EPBucket::updateCompactionTasks(Vbid db_file_id) {
    LockHolder lh(compactionLock);
    bool erased = false, woke = false;
    auto it = compactionTasks.begin();
    while (it != compactionTasks.end()) {
        if ((*it).first == db_file_id) {
            it = compactionTasks.erase(it);
            erased = true;
        } else {
            ExTask& task = (*it).second;
            if (task->getState() == TASK_SNOOZED) {
                ExecutorPool::get()->wake(task->getId());
                woke = true;
            }
            ++it;
        }
        if (erased && woke) {
            break;
        }
    }
}

std::pair<uint64_t, bool> EPBucket::getLastPersistedCheckpointId(Vbid vb) {
    auto vbucket = vbMap.getBucket(vb);
    if (vbucket) {
        return {vbucket->checkpointManager->getPersistenceCursorPreChkId(),
                true};
    } else {
        return {0, true};
    }
}

ENGINE_ERROR_CODE EPBucket::getFileStats(const void* cookie,
                                         const AddStatFn& add_stat) {
    const auto numShards = vbMap.getNumShards();
    DBFileInfo totalInfo;

    for (uint16_t shardId = 0; shardId < numShards; shardId++) {
        const auto dbInfo =
                getRWUnderlyingByShard(shardId)->getAggrDbFileInfo();
        totalInfo.spaceUsed += dbInfo.spaceUsed;
        totalInfo.fileSize += dbInfo.fileSize;
    }

    add_casted_stat("ep_db_data_size", totalInfo.spaceUsed, add_stat, cookie);
    add_casted_stat("ep_db_file_size", totalInfo.fileSize, add_stat, cookie);

    return ENGINE_SUCCESS;
}

ENGINE_ERROR_CODE EPBucket::getPerVBucketDiskStats(const void* cookie,
                                                   const AddStatFn& add_stat) {
    class DiskStatVisitor : public VBucketVisitor {
    public:
        DiskStatVisitor(const void* c, const AddStatFn& a)
            : cookie(c), add_stat(a) {
        }

        void visitBucket(const VBucketPtr& vb) override {
            char buf[32];
            Vbid vbid = vb->getId();
            try {
                auto dbInfo =
                        vb->getShard()->getRWUnderlying()->getDbFileInfo(vbid);

                checked_snprintf(
                        buf, sizeof(buf), "vb_%d:data_size", vbid.get());
                add_casted_stat(buf, dbInfo.spaceUsed, add_stat, cookie);
                checked_snprintf(
                        buf, sizeof(buf), "vb_%d:file_size", vbid.get());
                add_casted_stat(buf, dbInfo.fileSize, add_stat, cookie);
            } catch (std::exception& error) {
                EP_LOG_WARN(
                        "DiskStatVisitor::visitBucket: Failed to build stat: "
                        "{}",
                        error.what());
            }
        }

    private:
        const void* cookie;
        AddStatFn add_stat;
    };

    DiskStatVisitor dsv(cookie, add_stat);
    visit(dsv);
    return ENGINE_SUCCESS;
}

VBucketPtr EPBucket::makeVBucket(
        Vbid id,
        vbucket_state_t state,
        KVShard* shard,
        std::unique_ptr<FailoverTable> table,
        NewSeqnoCallback newSeqnoCb,
        std::unique_ptr<Collections::VB::Manifest> manifest,
        vbucket_state_t initState,
        int64_t lastSeqno,
        uint64_t lastSnapStart,
        uint64_t lastSnapEnd,
        uint64_t purgeSeqno,
        uint64_t maxCas,
        int64_t hlcEpochSeqno,
        bool mightContainXattrs,
        const nlohmann::json& replicationTopology,
        uint64_t maxVisibleSeqno) {
    auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
    // Not using make_shared or allocate_shared
    // 1. make_shared doesn't accept a Deleter
    // 2. allocate_shared has inconsistencies between platforms in calling
    //    alloc.destroy (libc++ doesn't call it)
    return VBucketPtr(new EPVBucket(id,
                                    state,
                                    stats,
                                    engine.getCheckpointConfig(),
                                    shard,
                                    lastSeqno,
                                    lastSnapStart,
                                    lastSnapEnd,
                                    std::move(table),
                                    flusherCb,
                                    std::move(newSeqnoCb),
                                    makeSyncWriteResolvedCB(),
                                    makeSyncWriteCompleteCB(),
                                    makeSeqnoAckCB(),
                                    engine.getConfiguration(),
                                    eviction_policy,
                                    std::move(manifest),
                                    initState,
                                    purgeSeqno,
                                    maxCas,
                                    hlcEpochSeqno,
                                    mightContainXattrs,
                                    replicationTopology,
                                    maxVisibleSeqno),
                      VBucket::DeferredDeleter(engine));
}

ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
                                      Vbid vbucket,
                                      const void* cookie) {
    VBucketPtr vb = getVBucket(vbucket);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    return vb->statsVKey(key, cookie, engine);
}

void EPBucket::completeStatsVKey(const void* cookie,
                                 const DocKey& key,
                                 Vbid vbid,
                                 uint64_t bySeqNum) {
    GetValue gcb = getROUnderlying(vbid)->get(DiskDocKey{key}, vbid);

    if (eviction_policy == EvictionPolicy::Full) {
        VBucketPtr vb = getVBucket(vbid);
        if (vb) {
            vb->completeStatsVKey(key, gcb);
        }
    }

    if (gcb.getStatus() == ENGINE_SUCCESS) {
        engine.addLookupResult(cookie, std::move(gcb.item));
    } else {
        engine.addLookupResult(cookie, nullptr);
    }

    --stats.numRemainingBgJobs;
    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
}

/**
 * Class that handles the disk callback during the rollback.
 * For each mutation/deletion which was discarded as part of the rollback,
 * the callback() method is invoked with the key of the discarded update.
 * It can then look up the state of that key using dbHandle (which represents
 * the new, rolled-back file) and correct the in-memory view:
 *
 * a) If the key is not present in the rolled-back file then delete it from
 *    the HashTable (it either didn't exist yet, or had been deleted prior to
 *    the rollback point).
 * b) If the key is present in the rolled-back file then replace the in-memory
 *    value with the value from the rolled-back file.
 */
1363class EPDiskRollbackCB : public RollbackCB {
1364public:
1365    EPDiskRollbackCB(EventuallyPersistentEngine& e, uint64_t rollbackSeqno)
1366        : RollbackCB(), engine(e), rollbackSeqno(rollbackSeqno) {
1367    }
1368
1369    void callback(GetValue& val) {
1370        if (!val.item) {
1371            throw std::invalid_argument(
1372                    "EPDiskRollbackCB::callback: val is NULL");
1373        }
1374        if (dbHandle == nullptr) {
1375            throw std::logic_error(
1376                    "EPDiskRollbackCB::callback: dbHandle is NULL");
1377        }
1378
1379        // Skip system keys, they aren't stored in the hashtable
1380        if (val.item->getKey().getCollectionID().isSystem()) {
1381            return;
1382        }
1383
1384        // This is the item in its current state, after the rollback seqno
1385        // (i.e. the state that we are reverting)
1386        UniqueItemPtr postRbSeqnoItem(std::move(val.item));
1387
1388        VBucketPtr vb = engine.getVBucket(postRbSeqnoItem->getVBucketId());
1389
1390        // Nuke anything in the prepare namespace, we'll do a "warmup" later
1391        // which will restore everything to the way it should be and this is
1392        // far easier than dealing with individual states.
1393        if (postRbSeqnoItem->isPending() || postRbSeqnoItem->isAbort()) {
1394            // Log any prepares with majority level as they are vulnerable to
1395            // being "lost" to an active bounce if it comes back up within the
1396            // failover window. Only log from the rollback seqno as the active
1397            // will have any that came before this.
1398            if (postRbSeqnoItem->isPending() &&
1399                postRbSeqnoItem->getDurabilityReqs().getLevel() ==
1400                        cb::durability::Level::Majority &&
1401                postRbSeqnoItem->getBySeqno() >=
1402                        static_cast<int64_t>(rollbackSeqno)) {
1403                EP_LOG_INFO(
1404                        "({}) Rolling back a Majority level prepare with "
1405                        "key:{} and seqno:{}",
1406                        vb->getId(),
1407                        cb::UserData(postRbSeqnoItem->getKey().to_string()),
1408                        postRbSeqnoItem->getBySeqno());
1409            }
1410            removeDeletedDoc(*vb, *postRbSeqnoItem);
1411            return;
1412        }
1413
1414        EP_LOG_DEBUG("EPDiskRollbackCB: Handling post rollback item: {}",
1415                     *postRbSeqnoItem);
1416
1417        // The GetValue for the item as it was before the rollback seqno
1418        GetValue preRbSeqnoGetValue =
1419                engine.getKVBucket()
1420                        ->getROUnderlying(postRbSeqnoItem->getVBucketId())
1421                        ->getWithHeader(dbHandle,
1422                                        DiskDocKey{*postRbSeqnoItem},
1423                                        postRbSeqnoItem->getVBucketId(),
1424                                        GetMetaOnly::No);
1425
1426        // This is the item in the state it was before the rollback seqno
1427        // (i.e. the desired state). null if there was no previous
1428        // Item.
1429        UniqueItemPtr preRbSeqnoItem(std::move(preRbSeqnoGetValue.item));
1430
1431        if (preRbSeqnoGetValue.getStatus() == ENGINE_SUCCESS) {
1432            EP_LOG_DEBUG(
1433                    "EPDiskRollbackCB: Item existed pre-rollback; restoring to "
1434                    "pre-rollback state: {}",
1435                    *preRbSeqnoItem);
1436
1437            if (preRbSeqnoItem->isDeleted()) {
1438                // If the item existed before, but had been deleted, we
1439                // should delete it now
1440                removeDeletedDoc(*vb, *postRbSeqnoItem);
1441            } else {
1442                // The item existed before and was not deleted; we need to
1443                // revert the item's state to its pre-rollback-seqno state
1444                MutationStatus mtype = vb->setFromInternal(*preRbSeqnoItem);
1445                switch (mtype) {
1446                case MutationStatus::NotFound:
1447                    // NotFound is valid - the item may have been deleted
1448                    // in-memory but not yet flushed to disk as of the
1449                    // post-rollback seqno.
1450                    break;
1451                case MutationStatus::WasClean:
1452                    // Item hasn't been modified since it was persisted to disk
1453                    // as of post-rollback seqno.
1454                    break;
1455                case MutationStatus::WasDirty:
1456                    // Item was modified since it was persisted to disk - this
1457                    // is ok because it's just a mutation which has not yet
1458                    // been persisted to disk as of post-rollback seqno.
1459                    break;
1460                case MutationStatus::NoMem:
1461                    setStatus(ENGINE_ENOMEM);
1462                    break;
1463                case MutationStatus::InvalidCas:
1464                case MutationStatus::IsLocked:
1465                case MutationStatus::NeedBgFetch:
1466                case MutationStatus::IsPendingSyncWrite:
1467                    std::stringstream ss;
1468                    ss << "EPDiskRollbackCB: Unexpected status:"
1469                       << to_string(mtype)
1470                       << " after setFromInternal for item:" << *preRbSeqnoItem;
1471                    throw std::logic_error(ss.str());
1472                }
1473
1474                // If we are rolling back a deletion then we should increment
1475                // our disk counts. We need to increment the vBucket disk
1476                // count here too because we're not going to flush this item
1477                // later
1478                if (postRbSeqnoItem->isDeleted() &&
1479                    postRbSeqnoItem->isCommitted()) {
1480                    vb->incrNumTotalItems();
1481                    vb->getManifest()
1482                            .lock(preRbSeqnoItem->getKey())
1483                            .incrementDiskCount();
1484                }
1485            }
1486        } else if (preRbSeqnoGetValue.getStatus() == ENGINE_KEY_ENOENT) {
1487            // If the item did not exist before we should delete it now
1488            removeDeletedDoc(*vb, *postRbSeqnoItem);
1489        } else {
1490            EP_LOG_WARN(
1491                    "EPDiskRollbackCB::callback: Unexpected error status: {}",
1492                    preRbSeqnoGetValue.getStatus());
1493        }
1494    }
1495
1496    /// Remove a deleted-on-disk document from the VBucket's hashtable.
1497    void removeDeletedDoc(VBucket& vb, const Item& item) {
1498        if (vb.removeItemFromMemory(item)) {
1499            setStatus(ENGINE_SUCCESS);
1500        } else {
1501            // Document didn't exist in memory - it may have been deleted
1502            // since the checkpoint.
1503            setStatus(ENGINE_KEY_ENOENT);
1504        }
1505
1506        if (!item.isDeleted() && item.isCommitted()) {
1507            // Irrespective of whether the in-memory delete succeeded, the
1508            // document doesn't exist on disk, so decrement the item count.
1509            vb.decrNumTotalItems();
1510            vb.getManifest().lock(item.getKey()).decrementDiskCount();
1511        }
1512    }
1513
1514private:
1515    EventuallyPersistentEngine& engine;
1516
1517    /// The seqno to which we are rolling back
1518    uint64_t rollbackSeqno;
1519};
1520
1521RollbackResult EPBucket::doRollback(Vbid vbid, uint64_t rollbackSeqno) {
1522    auto cb = std::make_shared<EPDiskRollbackCB>(engine, rollbackSeqno);
1523    KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
1524    auto result = rwUnderlying->rollback(vbid, rollbackSeqno, cb);
1525    return result;
1526}
1527
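/**
 * Sketch of the behaviour below on hypothetical input: with
 * rollbackSeqno = 10 and unpersisted items [11:Prepare(k1), 12:Abort(k2),
 * 13:Mutation(k3)], the prepare of k1 is removed from the HashTable (any
 * incomplete prepare is "warmed up" in a later rollback stage), the abort
 * of k2 needs no action (aborts are not kept in the HashTable), and k3 is
 * restored from its on-disk value if one exists, else removed from memory.
 */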
1528void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) {
1529    std::vector<queued_item> items;
1530
1531    // Iterate until we have no more items for the persistence cursor
1532    CheckpointManager::ItemsForCursor itemsForCursor;
1533    do {
1534        itemsForCursor =
1535                vb.checkpointManager->getNextItemsForPersistence(items);
1536        // RAII callback; we need to trigger it manually here
1537        itemsForCursor.flushHandle.reset();
1538
1539        for (const auto& item : items) {
1540            if (item->getBySeqno() <= rollbackSeqno ||
1541                item->isCheckPointMetaItem() ||
1542                item->getKey().getCollectionID().isSystem()) {
1543                continue;
1544            }
1545
1546            // Currently we remove prepares from the HashTable on completion
1547            // but they may still exist on disk. If we were to reload the
1548            // prepare from disk, because we have a new unpersisted one, we
1549            // would end up in a state inconsistent with pre-rollback. Just
1550            // remove the prepare from the HashTable; we will "warm up" any
1551            // incomplete prepares in a later stage of rollback.
1552            if (item->isPending()) {
1553                EP_LOG_INFO(
1554                        "({}) Rolling back an unpersisted {} prepare with "
1555                        "key:{} and seqno:{}",
1556                        vb.getId(),
1557                        to_string(item->getDurabilityReqs().getLevel()),
1558                        cb::UserData(item->getKey().to_string()),
1559                        item->getBySeqno());
1560                vb.removeItemFromMemory(*item);
1561                continue;
1562            }
1563
1564            if (item->isAbort()) {
1565                EP_LOG_INFO(
1566                        "({}) Rolling back an unpersisted abort with "
1567                        "key:{} and seqno:{}",
1568                        vb.getId(),
1569                        cb::UserData(item->getKey().to_string()),
1570                        item->getBySeqno());
1571                // Aborts are not kept in the hashtable so do not
1572                // need to be removed.
1573                continue;
1574            }
1575
1576            // Committed items only past this point
1577            GetValue gcb = getROUnderlying(vb.getId())
1578                                   ->get(DiskDocKey{*item}, vb.getId());
1579
1580            if (gcb.getStatus() == ENGINE_SUCCESS) {
1581                vb.setFromInternal(*gcb.item.get());
1582            } else {
1583                vb.removeItemFromMemory(*item);
1584            }
1585        }
1586    } while (itemsForCursor.moreAvailable);
1587}
1588
1589// Perform an in-order scan of the seqno index.
1590// a) For each Prepared item found, add to a map of outstanding Prepares.
1591// b) For each Committed (via Mutation or Prepare) item, if there's an
1592//    outstanding Prepare then that prepare has already been Committed,
1593//    hence remove it from the map.
1594//
1595// At the end of the scan, all outstanding Prepared items (which did not
1596// have a Commit persisted to disk) will be registered with the Durability
1597// Monitor.
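//
// Worked example (hypothetical seqnos/keys): scanning
// [1:Prepare(a), 2:Prepare(b), 3:Commit(a)] evolves the map of
// outstanding Prepares as {a} -> {a, b} -> {b}; only the prepare of 'b'
// is then loaded into the HashTable and DurabilityMonitor.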
1598EPBucket::LoadPreparedSyncWritesResult EPBucket::loadPreparedSyncWrites(
1599        folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) {
1600    /// Disk load callback for scan.
1601    struct LoadSyncWrites : public StatusCallback<GetValue> {
1602        LoadSyncWrites(EPVBucket& vb, uint64_t highPreparedSeqno)
1603            : vb(vb), highPreparedSeqno(highPreparedSeqno) {
1604        }
1605
1606        void callback(GetValue& val) override {
1607            // Abort the scan early if we have passed the HPS as we don't need
1608            // to load any more prepares.
1609            if (val.item->getBySeqno() >
1610                static_cast<int64_t>(highPreparedSeqno)) {
1611                // ENOMEM may seem like an odd status code with which to
1612                // abort the scan, but disk backfill to a given seqno
1613                // likewise returns ENGINE_ENOMEM to abort the scan once it
1614                // has received all the seqnos it cares about.
1615                setStatus(ENGINE_ENOMEM);
1616                return;
1617            }
1618
1619            itemsVisited++;
1620            if (val.item->isPending()) {
1621                // Pending item which was not aborted (deleted). Add to
1622                // outstanding Prepare map.
1623                outstandingPrepares.emplace(val.item->getKey(),
1624                                            std::move(val.item));
1625                return;
1626            }
1627
1628            if (val.item->isCommitted()) {
1629                // Committed item. _If_ there's an outstanding prepared
1630                // SyncWrite, remove it (as it has already been committed).
1631                outstandingPrepares.erase(val.item->getKey());
1632                return;
1633            }
1634        }
1635
1636        EPVBucket& vb;
1637
1638        // HPS after which we can abort the scan
1639        uint64_t highPreparedSeqno = std::numeric_limits<uint64_t>::max();
1640
1641        // Number of items our callback "visits". Used to validate how many
1642        // items we look at when loading SyncWrites.
1643        uint64_t itemsVisited = 0;
1644
1645        /// Map of Document key -> outstanding (not yet Committed / Aborted)
1646        /// prepares.
1647        std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
1648                outstandingPrepares;
1649    };
1650
1651    auto& epVb = dynamic_cast<EPVBucket&>(vb);
1652    const auto start = std::chrono::steady_clock::now();
1653
1654    // Get the kvStore. Use the RW store, as the rollback code that calls
1655    // this function will modify vbucket_state, which is only reflected in
1656    // the RW store. In the warmup case, we don't allow writes at this
1657    // point in time anyway.
1658    auto* kvStore = getRWUnderlyingByShard(epVb.getShard()->getId());
1659
1660    // Need the HPS/HCS so the DurabilityMonitor can be fully resumed
1661    auto vbState = kvStore->getVBucketState(epVb.getId());
1662    if (!vbState) {
1663        throw std::logic_error("EPBucket::loadPreparedSyncWrites: processing " +
1664                               epVb.getId().to_string() +
1665                               ", but found no vbucket_state");
1666    }
1667
1668    // Insert all outstanding Prepares into the VBucket (HashTable &
1669    // DurabilityMonitor).
1670    std::vector<queued_item> prepares;
1671    if (vbState->persistedPreparedSeqno == vbState->persistedCompletedSeqno) {
1672        // We don't need to warm up anything for this vBucket as all of our
1673        // prepares have been completed, but we do need to create the DM
1674        // with our vbucket_state.
1675        epVb.loadOutstandingPrepares(vbStateLh, *vbState, std::move(prepares));
1676        // No prepares loaded
1677        return {0, 0};
1678    }
1679
1680    // We optimise this step by starting the scan at the seqno following the
1681    // Persisted Completed Seqno. By definition, all earlier prepares have been
1682    // completed (Committed or Aborted).
1683    uint64_t startSeqno = vbState->persistedCompletedSeqno + 1;
1684
1685    // The seqno up to which we will scan for SyncWrites
1686    uint64_t endSeqno = vbState->persistedPreparedSeqno;
1687
1688    // If we were in the middle of receiving/persisting a Disk snapshot then
1689    // we cannot solely rely on the PCS and PPS due to de-dupe/out of order
1690    // commit. We could have our committed item higher than the HPS if we do a
1691    // normal mutation after a SyncWrite and we have not yet fully persisted the
1692    // disk snapshot to correct the high completed seqno. In this case, we need
1693    // to read the rest of the disk snapshot to ensure that we pick up any
1694    // completions of prepares that we may attempt to warm up.
1695    //
1696    // Example:
1697    //
1698    // Replica receives a Disk snapshot:
1699    // [1:Prepare(a), 2:Prepare(b), 3:Mutation(a)...]
1700    //
1701    // After receiving and flushing the mutation at seqno 3, the replica has:
1702    // HPS - 0 (only moves on snapshot end)
1703    // HCS - 1 (the DM takes care of this)
1704    // PPS - 2
1705    // PCS - 0 (we can only move the PCS correctly at snap-end)
1706    //
1707    // If we warmup in this case then we load SyncWrites from seqno 1 to 2. If
1708    // we do this then we will skip the logical completion at seqno 3 for the
1709    // prepare at seqno 1. This will cause us to have a completed SyncWrite in
1710    // the DM when we transition to memory which will block any further
1711    // SyncWrite completions on this node.
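    //
    // The check below guards against this by extending endSeqno from the
    // PPS (2 in the example) up to the highSeqno (3), so the scan also
    // sees the Mutation(a) at seqno 3 and erases the prepare of 'a' from
    // the outstanding set.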
1712    if (vbState->checkpointType == CheckpointType::Disk &&
1713        static_cast<uint64_t>(vbState->highSeqno) != vbState->lastSnapEnd) {
1714        endSeqno = vbState->highSeqno;
1715    }
1716
1717    auto storageCB = std::make_shared<LoadSyncWrites>(epVb, endSeqno);
1718
1719    // Don't expect to find anything already in the HashTable, so use
1720    // NoLookupCallback.
1721    auto cacheCB = std::make_shared<NoLookupCallback>();
1722
1723    // Use ALL_ITEMS filter for the scan. NO_DELETES is insufficient
1724    // because (committed) SyncDeletes manifest as a prepared_sync_write
1725    // (doc on disk not deleted) followed by a commit_sync_write (which
1726    // *is* marked as deleted as that's the resulting state).
1727    // We need to see that Commit, hence ALL_ITEMS.
1728    const auto docFilter = DocumentFilter::ALL_ITEMS;
1729    const auto valFilter = getValueFilterForCompressionMode();
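    // Hedged illustration (hypothetical key/seqnos): a SyncDelete of key
    // 'k' is persisted as [n:prepare(k), alive] then [n+1:commit(k),
    // deleted]. With NO_DELETES the scan would never see seqno n+1 and
    // would wrongly leave the prepare of 'k' outstanding.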
1730
1731    auto* scanCtx = kvStore->initScanContext(
1732            storageCB, cacheCB, epVb.getId(), startSeqno, docFilter, valFilter);
1733
1734    // Storage problems can lead to a null context; the kvstore logs details.
1735    if (!scanCtx) {
1736        EP_LOG_CRITICAL(
1737                "EPBucket::loadPreparedSyncWrites: scanCtx is null for {}",
1738                epVb.getId());
1739        // No prepares loaded
1740        return {0, 0};
1741    }
1742
1743    auto scanResult = kvStore->scan(scanCtx);
1744
1745    // If we abort our scan early due to reaching the HPS then the scan
1746    // result will indicate failure, but we will have scanned correctly.
1747    if (storageCB->getStatus() != ENGINE_ENOMEM) {
1748        Expects(scanResult == scan_success);
1749    }
1750
1751    kvStore->destroyScanContext(scanCtx);
1752
1753    EP_LOG_DEBUG(
1754            "EPBucket::loadPreparedSyncWrites: Identified {} outstanding "
1755            "prepared SyncWrites for {} in {}",
1756            storageCB->outstandingPrepares.size(),
1757            epVb.getId(),
1758            cb::time2text(std::chrono::steady_clock::now() - start));
1759
1760    // Insert all outstanding Prepares into the VBucket (HashTable &
1761    // DurabilityMonitor).
1762    prepares.reserve(storageCB->outstandingPrepares.size());
1763    for (auto& prepare : storageCB->outstandingPrepares) {
1764        prepares.emplace_back(std::move(prepare.second));
1765    }
1766    // Sequence must be sorted by seqno (ascending) for DurabilityMonitor.
1767    std::sort(
1768            prepares.begin(), prepares.end(), [](const auto& a, const auto& b) {
1769                return a->getBySeqno() < b->getBySeqno();
1770            });
1771
1772    auto numPrepares = prepares.size();
1773    epVb.loadOutstandingPrepares(vbStateLh, *vbState, std::move(prepares));
1774    return {storageCB->itemsVisited, numPrepares};
1775}
1776
1777ValueFilter EPBucket::getValueFilterForCompressionMode() {
1778    auto compressionMode = engine.getCompressionMode();
1779    if (compressionMode != BucketCompressionMode::Off) {
1780        return ValueFilter::VALUES_COMPRESSED;
1781    }
1782
1783    return ValueFilter::VALUES_DECOMPRESSED;
1784}
1785
1786void EPBucket::notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) {
1787    if (notifyCtx.notifyFlusher) {
1788        notifyFlusher(vbid);
1789    }
1790    if (notifyCtx.notifyReplication) {
1791        notifyReplication(vbid, notifyCtx.bySeqno, notifyCtx.syncWrite);
1792    }
1793}
1794
1795Warmup* EPBucket::getWarmup(void) const {
1796    return warmupTask.get();
1797}
1798
1799bool EPBucket::isWarmingUp() {
1800    return warmupTask && !warmupTask->isComplete();
1801}
1802
1803bool EPBucket::isWarmupOOMFailure() {
1804    return warmupTask && warmupTask->hasOOMFailure();
1805}
1806
1807bool EPBucket::maybeWaitForVBucketWarmup(const void* cookie) {
1808    if (warmupTask) {
1809        return warmupTask->maybeWaitForVBucketWarmup(cookie);
1810    }
1811    return false;
1812}
1813
1814void EPBucket::initializeWarmupTask() {
1815    if (engine.getConfiguration().isWarmup()) {
1816        warmupTask = std::make_unique<Warmup>(*this, engine.getConfiguration());
1817    }
1818}
1819
1820void EPBucket::startWarmupTask() {
1821    if (warmupTask) {
1822        warmupTask->start();
1823    } else {
1824        // No warmup; immediately bring the bucket online.
1825        warmupCompleted();
1826    }
1827}
1828
1829bool EPBucket::maybeEnableTraffic() {
1830    // @todo rename: should be isTrafficDisabled or similar
1831    double memoryUsed =
1832            static_cast<double>(stats.getEstimatedTotalMemoryUsed());
1833    double maxSize = static_cast<double>(stats.getMaxDataSize());
1834
1835    if (memoryUsed >= stats.mem_low_wat) {
1836        EP_LOG_INFO(
1837                "Total memory use reached the low water mark, stopping warmup"
1838                ": memoryUsed ({}) >= low water mark ({})",
1839                memoryUsed,
1840                uint64_t(stats.mem_low_wat.load()));
1841        return true;
1842    } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
1843        EP_LOG_INFO(
1844                "Enough data loaded to enable traffic"
1845                ": memoryUsed ({}) > (maxSize({}) * warmupMemUsedCap({}))",
1846                memoryUsed,
1847                maxSize,
1848                stats.warmupMemUsedCap.load());
1849        return true;
1850    } else if (eviction_policy == EvictionPolicy::Value &&
1851               stats.warmedUpValues >=
1852                       (stats.warmedUpKeys * stats.warmupNumReadCap)) {
1853        // Let ep-engine think we're done with the warmup phase
1854        // (we should refactor this into "enableTraffic")
1855        EP_LOG_INFO(
1856                "Enough items loaded to enable traffic (value "
1857                "eviction)"
1858                ": warmedUpValues({}) >= (warmedUpKeys({}) * "
1859                "warmupNumReadCap({}))",
1860                uint64_t(stats.warmedUpValues.load()),
1861                uint64_t(stats.warmedUpKeys.load()),
1862                stats.warmupNumReadCap.load());
1863        return true;
1864    } else if (eviction_policy == EvictionPolicy::Full &&
1865               stats.warmedUpValues >= (warmupTask->getEstimatedItemCount() *
1866                                        stats.warmupNumReadCap)) {
1867        // In FULL EVICTION, the number of warmed-up keys always matches the
1868        // number of warmed-up values; therefore, to honor the min_item
1869        // threshold in this scenario, we can use warmup's estimated item count.
1870        EP_LOG_INFO(
1871                "Enough items loaded to enable traffic (full "
1872                "eviction)"
1873                ": warmedUpValues({}) >= (warmup est items({}) * "
1874                "warmupNumReadCap({}))",
1875                uint64_t(stats.warmedUpValues.load()),
1876                uint64_t(warmupTask->getEstimatedItemCount()),
1877                stats.warmupNumReadCap.load());
1878        return true;
1879    }
1880    return false;
1881}
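// Hedged numeric illustration of the second branch above (hypothetical
// values): with maxSize = 1 GiB and warmupMemUsedCap = 0.5, traffic is
// enabled once memoryUsed exceeds 512 MiB, even before the low water mark
// is reached.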
1882
1883void EPBucket::warmupCompleted() {
1884    // Snapshot VBucket state after warmup to ensure Failover table is
1885    // persisted.
1886    scheduleVBStatePersist();
1887
1888    if (engine.getConfiguration().getAlogPath().length() > 0) {
1889        if (engine.getConfiguration().isAccessScannerEnabled()) {
1890            {
1891                LockHolder lh(accessScanner.mutex);
1892                accessScanner.enabled = true;
1893            }
1894            EP_LOG_INFO("Access Scanner task enabled");
1895            size_t smin = engine.getConfiguration().getAlogSleepTime();
1896            setAccessScannerSleeptime(smin, true);
1897        } else {
1898            LockHolder lh(accessScanner.mutex);
1899            accessScanner.enabled = false;
1900            EP_LOG_INFO("Access Scanner task disabled");
1901        }
1902
1903        Configuration& config = engine.getConfiguration();
1904        config.addValueChangedListener(
1905                "access_scanner_enabled",
1906                std::make_unique<ValueChangedListener>(*this));
1907        config.addValueChangedListener(
1908                "alog_sleep_time",
1909                std::make_unique<ValueChangedListener>(*this));
1910        config.addValueChangedListener(
1911                "alog_task_time",
1912                std::make_unique<ValueChangedListener>(*this));
1913    }
1914
1915    // "0" sleep_time means that the first snapshot task will be executed
1916    // right after warmup. Subsequent snapshot tasks will be scheduled every
1917    // 60 sec by default.
1918    ExecutorPool* iom = ExecutorPool::get();
1919    ExTask task = std::make_shared<StatSnap>(&engine, 0, false);
1920    statsSnapshotTaskId = iom->schedule(task);
1921
1922    collectionsManager->warmupCompleted(*this);
1923}
1924
1925void EPBucket::stopWarmup(void) {
1926    // forcefully stop current warmup task
1927    if (isWarmingUp()) {
1928        EP_LOG_INFO(
1929                "Stopping warmup while engine is loading "
1930                "data from underlying storage, shutdown = {}",
1931                stats.isShutdown ? "yes" : "no");
1932        warmupTask->stop();
1933    }
1934}
1935
1936bool EPBucket::isValidBucketDurabilityLevel(cb::durability::Level level) const {
1937    switch (level) {
1938    case cb::durability::Level::None:
1939    case cb::durability::Level::Majority:
1940    case cb::durability::Level::MajorityAndPersistOnMaster:
1941    case cb::durability::Level::PersistToMajority:
1942        return true;
1943    }
1944    folly::assume_unreachable();
1945}
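// Note: the exhaustive switch above (with no default case) means that
// adding a new cb::durability::Level without updating this function
// yields a compiler diagnostic rather than silently reaching
// folly::assume_unreachable().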