1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string.h>
21#include <time.h>
22
23#include <fstream>
24#include <functional>
25#include <iostream>
26#include <map>
27#include <sstream>
28#include <string>
29#include <utility>
30#include <vector>
31
32#include <phosphor/phosphor.h>
33#include <platform/make_unique.h>
34
35#include "access_scanner.h"
36#include "checkpoint.h"
37#include "checkpoint_remover.h"
38#include "collections/manager.h"
39#include "conflict_resolution.h"
40#include "connmap.h"
41#include "dcp/dcpconnmap.h"
42#include "defragmenter.h"
43#include "ep_engine.h"
44#include "ep_time.h"
45#include "ext_meta_parser.h"
46#include "failover-table.h"
47#include "flusher.h"
48#include "htresizer.h"
49#include "item_compressor.h"
50#include "kv_bucket.h"
51#include "kvshard.h"
52#include "kvstore.h"
53#include "locks.h"
54#include "mutation_log.h"
55#include "replicationthrottle.h"
56#include "statwriter.h"
57#include "tasks.h"
58#include "trace_helpers.h"
59#include "vb_count_visitor.h"
60#include "vbucket.h"
61#include "vbucket_bgfetch_item.h"
62#include "vbucketdeletiontask.h"
63#include "warmup.h"
64
65class StatsValueChangeListener : public ValueChangedListener {
66public:
67    StatsValueChangeListener(EPStats& st, KVBucket& str)
68        : stats(st), store(str) {
69        // EMPTY
70    }
71
72    void sizeValueChanged(const std::string& key, size_t value) override {
73        if (key.compare("max_size") == 0) {
74            stats.setMaxDataSize(value);
75            store.getEPEngine().getDcpConnMap(). \
76                                     updateMaxActiveSnoozingBackfills(value);
77            size_t low_wat = static_cast<size_t>
78                    (static_cast<double>(value) * stats.mem_low_wat_percent);
79            size_t high_wat = static_cast<size_t>
80                    (static_cast<double>(value) * stats.mem_high_wat_percent);
81            stats.mem_low_wat.store(low_wat);
82            stats.mem_high_wat.store(high_wat);
83            store.setCursorDroppingLowerUpperThresholds(value);
84        } else if (key.compare("mem_low_wat") == 0) {
85            stats.mem_low_wat.store(value);
86            stats.mem_low_wat_percent.store(
87                                    (double)(value) / stats.getMaxDataSize());
88        } else if (key.compare("mem_high_wat") == 0) {
89            stats.mem_high_wat.store(value);
90            stats.mem_high_wat_percent.store(
91                                    (double)(value) / stats.getMaxDataSize());
92        } else if (key.compare("replication_throttle_threshold") == 0) {
93            stats.replicationThrottleThreshold.store(
94                                          static_cast<double>(value) / 100.0);
95        } else if (key.compare("warmup_min_memory_threshold") == 0) {
96            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
97        } else if (key.compare("warmup_min_items_threshold") == 0) {
98            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
99        } else {
100            LOG(EXTENSION_LOG_WARNING,
101                "StatsValueChangeListener(size_t) failed to change value for "
102                "unknown variable, %s",
103                key.c_str());
104        }
105    }
106
107    void floatValueChanged(const std::string& key, float value) override {
108        if (key.compare("mem_used_merge_threshold_percent") == 0) {
109            stats.setMemUsedMergeThresholdPercent(value);
110        } else {
111            LOG(EXTENSION_LOG_WARNING,
112                "StatsValueChangeListener(float) failed to change value for "
113                "unknown variable, %s",
114                key.c_str());
115        }
116    }
117
118private:
119    EPStats& stats;
120    KVBucket& store;
121};
122
123/**
124 * A configuration value changed listener that responds to ep-engine
125 * parameter changes by invoking engine-specific methods on
126 * configuration change events.
127 */
128class EPStoreValueChangeListener : public ValueChangedListener {
129public:
130    EPStoreValueChangeListener(KVBucket& st) : store(st) {
131    }
132
133    virtual void sizeValueChanged(const std::string &key, size_t value) {
134        if (key.compare("bg_fetch_delay") == 0) {
135            store.setBGFetchDelay(static_cast<uint32_t>(value));
136        } else if (key.compare("compaction_write_queue_cap") == 0) {
137            store.setCompactionWriteQueueCap(value);
138        } else if (key.compare("exp_pager_stime") == 0) {
139            store.setExpiryPagerSleeptime(value);
140        } else if (key.compare("alog_sleep_time") == 0) {
141            store.setAccessScannerSleeptime(value, false);
142        } else if (key.compare("alog_task_time") == 0) {
143            store.resetAccessScannerStartTime();
144        } else if (key.compare("mutation_mem_threshold") == 0) {
145            double mem_threshold = static_cast<double>(value) / 100;
146            VBucket::setMutationMemoryThreshold(mem_threshold);
147        } else if (key.compare("backfill_mem_threshold") == 0) {
148            double backfill_threshold = static_cast<double>(value) / 100;
149            store.setBackfillMemoryThreshold(backfill_threshold);
150        } else if (key.compare("compaction_exp_mem_threshold") == 0) {
151            store.setCompactionExpMemThreshold(value);
152        } else if (key.compare("replication_throttle_cap_pcnt") == 0) {
153            store.getEPEngine().getReplicationThrottle().setCapPercent(value);
154        } else if (key.compare("max_ttl") == 0) {
155            store.setMaxTtl(value);
156        } else {
157            LOG(EXTENSION_LOG_WARNING,
158                "Failed to change value for unknown variable, %s",
159                key.c_str());
160        }
161    }
162
163    virtual void ssizeValueChanged(const std::string& key, ssize_t value) {
164        if (key.compare("exp_pager_initial_run_time") == 0) {
165            store.setExpiryPagerTasktime(value);
166        } else if (key.compare("replication_throttle_queue_cap") == 0) {
167            store.getEPEngine().getReplicationThrottle().setQueueCap(value);
168        }
169    }
170
171    virtual void booleanValueChanged(const std::string &key, bool value) {
172        if (key.compare("access_scanner_enabled") == 0) {
173            if (value) {
174                store.enableAccessScannerTask();
175            } else {
176                store.disableAccessScannerTask();
177            }
178        } else if (key.compare("bfilter_enabled") == 0) {
179            store.setAllBloomFilters(value);
180        } else if (key.compare("exp_pager_enabled") == 0) {
181            if (value) {
182                store.enableExpiryPager();
183            } else {
184                store.disableExpiryPager();
185            }
186        } else if (key.compare("xattr_enabled") == 0) {
187            store.setXattrEnabled(value);
188        }
189    }
190
191    virtual void floatValueChanged(const std::string &key, float value) {
192        if (key.compare("bfilter_residency_threshold") == 0) {
193            store.setBfiltersResidencyThreshold(value);
194        } else if (key.compare("dcp_min_compression_ratio") == 0) {
195            store.getEPEngine().updateDcpMinCompressionRatio(value);
196        }
197    }
198
199private:
200    KVBucket& store;
201};
202
/**
 * Task which fires all operations queued against a vbucket while it was
 * in the pending state. Scheduled when a vbucket transitions from
 * pending to active (see KVBucket::setVBucketState_UNLOCKED).
 */
class PendingOpsNotification : public GlobalTask {
public:
    // Note: 'description' is initialized after 'vbucket' (member order),
    // as it reads vbucket->getId(); do not reorder these members.
    PendingOpsNotification(EventuallyPersistentEngine& e, VBucketPtr& vb)
        : GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
          engine(e),
          vbucket(vb),
          description("Notify pending operations for vbucket " +
                      std::to_string(vbucket->getId())) {
    }

    std::string getDescription() {
        return description;
    }

    std::chrono::microseconds maxExpectedDuration() {
        // This should be a very fast operation (p50 under 10us), however we
        // have observed long tails: p99.9 of 20ms; so use a threshold of 100ms.
        return std::chrono::milliseconds(100);
    }

    // Returns false: the task runs exactly once and is not rescheduled.
    bool run(void) {
        TRACE_EVENT1("ep-engine/task",
                     "PendingOpsNotification",
                     "vb",
                     vbucket->getId());
        vbucket->fireAllOps(engine);
        return false;
    }

private:
    EventuallyPersistentEngine &engine;
    // Holds a reference to keep the vbucket alive until the task runs.
    VBucketPtr vbucket;
    const std::string description;
};
237
/**
 * Construct the bucket: size per-task stats structures, register the
 * engine's taskable with the executor pool, seed every tunable from the
 * Configuration and attach value-changed listeners so later runtime
 * config changes are applied (via StatsValueChangeListener /
 * EPStoreValueChangeListener above).
 *
 * Ordering in this body is deliberate: each value is read/applied first
 * and its listener registered immediately afterwards.
 */
KVBucket::KVBucket(EventuallyPersistentEngine& theEngine)
    : engine(theEngine),
      stats(engine.getEpStats()),
      vbMap(theEngine.getConfiguration(), *this),
      defragmenterTask(NULL),
      itemCompressorTask(nullptr),
      itemFreqDecayerTask(nullptr),
      vb_mutexes(engine.getConfiguration().getMaxVbuckets()),
      diskDeleteAll(false),
      bgFetchDelay(0),
      backfillMemoryThreshold(0.95),
      statsSnapshotTaskId(0),
      lastTransTimePerItem(0),
      collectionsManager(std::make_unique<Collections::Manager>()),
      xattrEnabled(true),
      maxTtl(engine.getConfiguration().getMaxTtl()) {
    cachedResidentRatio.activeRatio.store(0);
    cachedResidentRatio.replicaRatio.store(0);

    // One access log (for the access scanner) per shard.
    Configuration &config = engine.getConfiguration();
    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
        accessLog.emplace_back(
                config.getAlogPath() + "." + std::to_string(i),
                config.getAlogBlockSize());
    }


    // One scheduling/runtime histogram per known task type.
    const size_t size = GlobalTask::allTaskIds.size();
    stats.schedulingHisto.resize(size);
    stats.taskRuntimeHisto.resize(size);

    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
        stats.schedulingHisto[i].reset();
        stats.taskRuntimeHisto[i].reset();
    }

    // NOTE(review): registers via ObjectRegistry::getCurrentEngine() rather
    // than the 'engine' member - presumably the same engine during
    // construction; confirm.
    ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable());

    // Reset memory overhead when bucket is created.
    for (auto& core : stats.coreLocal) {
        core->memOverhead = 0;
    }
    stats.coreLocal.get()->memOverhead = sizeof(KVBucket);

    // Set memUsedThresholdPercent before setting max_size
    stats.setMemUsedMergeThresholdPercent(
            config.getMemUsedMergeThresholdPercent());
    config.addValueChangedListener(
            "mem_used_merge_threshold_percent",
            std::make_unique<StatsValueChangeListener>(stats, *this));
    stats.setMaxDataSize(config.getMaxSize());
    config.addValueChangedListener(
            "max_size",
            std::make_unique<StatsValueChangeListener>(stats, *this));
    getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(
                                                        config.getMaxSize());

    // Watermarks: store both the absolute value and the fraction of the
    // quota (so max_size changes can re-derive the absolutes).
    stats.mem_low_wat.store(config.getMemLowWat());
    config.addValueChangedListener(
            "mem_low_wat",
            std::make_unique<StatsValueChangeListener>(stats, *this));
    stats.mem_low_wat_percent.store(
                (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize());

    stats.mem_high_wat.store(config.getMemHighWat());
    config.addValueChangedListener(
            "mem_high_wat",
            std::make_unique<StatsValueChangeListener>(stats, *this));
    stats.mem_high_wat_percent.store(
                (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize());

    setCursorDroppingLowerUpperThresholds(config.getMaxSize());

    // Replication throttle: threshold/cap are percentages in the config.
    stats.replicationThrottleThreshold.store(static_cast<double>
                                    (config.getReplicationThrottleThreshold())
                                     / 100.0);
    config.addValueChangedListener(
            "replication_throttle_threshold",
            std::make_unique<StatsValueChangeListener>(stats, *this));

    stats.replicationThrottleWriteQueueCap.store(
                                    config.getReplicationThrottleQueueCap());
    config.addValueChangedListener(
            "replication_throttle_queue_cap",
            std::make_unique<EPStoreValueChangeListener>(*this));
    config.addValueChangedListener(
            "replication_throttle_cap_pcnt",
            std::make_unique<EPStoreValueChangeListener>(*this));

    setBGFetchDelay(config.getBgFetchDelay());
    config.addValueChangedListener(
            "bg_fetch_delay",
            std::make_unique<EPStoreValueChangeListener>(*this));

    // Warmup caps are percentages; stored as [0,1] fractions.
    stats.warmupMemUsedCap.store(static_cast<double>
                               (config.getWarmupMinMemoryThreshold()) / 100.0);
    config.addValueChangedListener(
            "warmup_min_memory_threshold",
            std::make_unique<StatsValueChangeListener>(stats, *this));
    stats.warmupNumReadCap.store(static_cast<double>
                                (config.getWarmupMinItemsThreshold()) / 100.0);
    config.addValueChangedListener(
            "warmup_min_items_threshold",
            std::make_unique<StatsValueChangeListener>(stats, *this));

    double mem_threshold = static_cast<double>
                                      (config.getMutationMemThreshold()) / 100;
    VBucket::setMutationMemoryThreshold(mem_threshold);
    config.addValueChangedListener(
            "mutation_mem_threshold",
            std::make_unique<EPStoreValueChangeListener>(*this));

    double backfill_threshold = static_cast<double>
                                      (config.getBackfillMemThreshold()) / 100;
    setBackfillMemoryThreshold(backfill_threshold);
    config.addValueChangedListener(
            "backfill_mem_threshold",
            std::make_unique<EPStoreValueChangeListener>(*this));

    config.addValueChangedListener(
            "bfilter_enabled",
            std::make_unique<EPStoreValueChangeListener>(*this));

    bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
    config.addValueChangedListener(
            "bfilter_residency_threshold",
            std::make_unique<EPStoreValueChangeListener>(*this));

    compactionExpMemThreshold = config.getCompactionExpMemThreshold();
    config.addValueChangedListener(
            "compaction_exp_mem_threshold",
            std::make_unique<EPStoreValueChangeListener>(*this));

    compactionWriteQueueCap = config.getCompactionWriteQueueCap();
    config.addValueChangedListener(
            "compaction_write_queue_cap",
            std::make_unique<EPStoreValueChangeListener>(*this));

    config.addValueChangedListener(
            "dcp_min_compression_ratio",
            std::make_unique<EPStoreValueChangeListener>(*this));

    config.addValueChangedListener(
            "xattr_enabled",
            std::make_unique<EPStoreValueChangeListener>(*this));

    config.addValueChangedListener(
            "max_ttl", std::make_unique<EPStoreValueChangeListener>(*this));

    xattrEnabled = config.isXattrEnabled();

    // Always create the item pager; but initially disable, leaving scheduling
    // up to the specific KVBucket subclasses.
    itemPagerTask = std::make_shared<ItemPager>(engine, stats);
    disableItemPager();

    initializeWarmupTask();
}
396
/**
 * Second-phase initialization: optionally reset the bucket, start (or
 * skip) warmup and schedule the recurring background tasks.
 *
 * @return true (unconditionally in this implementation).
 */
bool KVBucket::initialize() {
    // We should nuke everything unless we want warmup
    Configuration &config = engine.getConfiguration();
    if (!config.isWarmup()) {
        reset();
    }

    startWarmupTask();

    initializeExpiryPager(config);

    // Periodic hashtable resizer.
    ExTask htrTask = std::make_shared<HashtableResizerTask>(this, 10);
    ExecutorPool::get()->schedule(htrTask);

    // Removes closed/unreferenced checkpoints to free memory.
    size_t checkpointRemoverInterval = config.getChkRemoverStime();
    chkTask = std::make_shared<ClosedUnrefCheckpointRemoverTask>(
            &engine, stats, checkpointRemoverInterval);
    ExecutorPool::get()->schedule(chkTask);

    ExTask workloadMonitorTask =
            std::make_shared<WorkLoadMonitor>(&engine, false);
    ExecutorPool::get()->schedule(workloadMonitorTask);

#if HAVE_JEMALLOC
    /* Only create the defragmenter task if we have an underlying memory
     * allocator which can facilitate defragmenting memory.
     */
    defragmenterTask = std::make_shared<DefragmenterTask>(&engine, stats);
    ExecutorPool::get()->schedule(defragmenterTask);
#endif

    itemCompressorTask = std::make_shared<ItemCompressorTask>(&engine, stats);
    ExecutorPool::get()->schedule(itemCompressorTask);

    /*
     * Creates the ItemFreqDecayer task which is used to ensure that the
     * frequency counters of items stored in the hash table do not all
     * become saturated.  Once the task runs it will snooze for int max
     * seconds and will only be woken up when the frequency counter of an
     * item in the hash table becomes saturated.
     */
    itemFreqDecayerTask = std::make_shared<ItemFreqDecayerTask>(
            &engine, config.getItemFreqDecayerPercent());
    ExecutorPool::get()->schedule(itemFreqDecayerTask);

    return true;
}
444
445void KVBucket::initializeWarmupTask() {
446    if (engine.getConfiguration().isWarmup()) {
447        warmupTask = std::make_unique<Warmup>(*this, engine.getConfiguration());
448    }
449}
450void KVBucket::startWarmupTask() {
451    if (warmupTask) {
452        warmupTask->start();
453    } else {
454        // No warmup, immediately online the bucket.
455        warmupCompleted();
456    }
457}
458
/**
 * Shut down the bucket's background processing prior to destruction:
 * stop warmup, stop the NONIO task group, cancel the stats snapshot and
 * access scanner tasks, then unregister this bucket's taskable from the
 * executor pool (honouring stats.forceShutdown).
 */
void KVBucket::deinitialize() {
    stopWarmup();
    ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(),
                                       NONIO_TASK_IDX, stats.forceShutdown);

    ExecutorPool::get()->cancel(statsSnapshotTaskId);

    {
        // Hold the access scanner mutex so we don't race a concurrent
        // (re)schedule of the task while cancelling it.
        LockHolder lh(accessScanner.mutex);
        ExecutorPool::get()->cancel(accessScanner.task);
    }

    ExecutorPool::get()->unregisterTaskable(engine.getTaskable(),
                                            stats.forceShutdown);
}
474
/**
 * Destructor: explicitly reset the task shared_ptrs so their release is
 * logged in a deterministic order before the remaining members are
 * destroyed implicitly.
 */
KVBucket::~KVBucket() {
    // NOTE(review): nothing is explicitly deleted for vb_mutexes here;
    // they are destroyed implicitly with the object. Log kept as-is.
    LOG(EXTENSION_LOG_NOTICE, "Deleting vb_mutexes");
    LOG(EXTENSION_LOG_NOTICE, "Deleting defragmenterTask");
    defragmenterTask.reset();
    LOG(EXTENSION_LOG_NOTICE, "Deleting itemCompressorTask");
    itemCompressorTask.reset();
    LOG(EXTENSION_LOG_NOTICE, "Deleting itemFreqDecayerTask");
    itemFreqDecayerTask.reset();
    LOG(EXTENSION_LOG_NOTICE, "Deleted KvBucket.");
}
485
486const Flusher* KVBucket::getFlusher(uint16_t shardId) {
487    return vbMap.shards[shardId]->getFlusher();
488}
489
490Warmup* KVBucket::getWarmup(void) const {
491    return warmupTask.get();
492}
493
494bool KVBucket::pauseFlusher() {
495    // Nothing do to - no flusher in this class
496    return false;
497}
498
499bool KVBucket::resumeFlusher() {
500    // Nothing do to - no flusher in this class
501    return false;
502}
503
504void KVBucket::wakeUpFlusher() {
505    // Nothing do to - no flusher in this class
506}
507
508protocol_binary_response_status KVBucket::evictKey(const DocKey& key,
509                                                   VBucket::id_type vbucket,
510                                                   const char** msg) {
511    VBucketPtr vb = getVBucket(vbucket);
512    if (!vb || (vb->getState() != vbucket_state_active)) {
513        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
514    }
515
516    return vb->evictKey(key, msg);
517}
518
519void KVBucket::getValue(Item& it) {
520    auto gv =
521            getROUnderlying(it.getVBucketId())
522                    ->get(it.getKey(), it.getVBucketId(), true /*fetchDelete*/);
523
524    if (gv.getStatus() != ENGINE_SUCCESS) {
525        // Cannot continue to pre_expiry, log this failed get and return
526        LOG(EXTENSION_LOG_WARNING,
527            "KVBucket::getValue failed get for item vb:%" PRIu16
528            ", it.seqno:%" PRIi64 ", status:%d",
529            it.getVBucketId(),
530            it.getBySeqno(),
531            gv.getStatus());
532        return;
533    } else if (!gv.item->isDeleted()) {
534        it.replaceValue(gv.item->getValue().get());
535    }
536}
537
/**
 * Run the server's pre_expiry document hook against the item before it
 * is expired, giving it the chance to rewrite the value (e.g. to retain
 * parts of the payload). If the hook declines, the value is emptied.
 */
void KVBucket::runPreExpiryHook(VBucket& vb, Item& it) {
    it.decompressValue(); // A no-op for already decompressed items
    auto info =
            it.toItemInfo(vb.failovers->getLatestUUID(), vb.getHLCEpochSeqno());
    if (engine.getServerApi()->document->pre_expiry(info)) {
        // The payload is modified and contains data we should use
        it.replaceValue(Blob::New(static_cast<char*>(info.value[0].iov_base),
                                  info.value[0].iov_len));
        it.setDataType(info.datatype);
    } else {
        // Make the document empty and raw
        it.replaceValue(Blob::New(0));
        it.setDataType(PROTOCOL_BINARY_RAW_BYTES);
    }
}
553
/**
 * Expire a single item: restore its value if needed, run the pre-expiry
 * hook, then delete it from the (active) vbucket.
 *
 * @param it the item to expire
 * @param startTime timestamp the expiry pass started (shared by a batch)
 * @param source what triggered the expiry (pager, compactor, access)
 */
void KVBucket::deleteExpiredItem(Item& it,
                                 time_t startTime,
                                 ExpireBy source) {
    VBucketPtr vb = getVBucket(it.getVBucketId());

    if (vb) {
        // MB-25931: Empty XATTR items need their value before we can call
        // pre_expiry. These occur because the value has been evicted.
        if (mcbp::datatype::is_xattr(it.getDataType()) && it.getNBytes() == 0) {
            getValue(it);
        }

        // Process positive seqnos (ignoring special *temp* items) and only
        // those items with a value
        if (it.getBySeqno() >= 0 && it.getNBytes()) {
            runPreExpiryHook(*vb, it);
        }

        // Obtain reader access to the VB state change lock so that
        // the VB can't switch state whilst we're processing
        ReaderLockHolder rlh(vb->getStateLock());
        if (vb->getState() == vbucket_state_active) {
            vb->deleteExpiredItem(it, startTime, source);
        }
    }
}
580
581void KVBucket::deleteExpiredItems(
582        std::list<Item>& itms, ExpireBy source) {
583    std::list<std::pair<uint16_t, std::string> >::iterator it;
584    time_t startTime = ep_real_time();
585    for (auto& it : itms) {
586        deleteExpiredItem(it, startTime, source);
587    }
588}
589
590bool KVBucket::isMetaDataResident(VBucketPtr &vb, const DocKey& key) {
591
592    if (!vb) {
593        throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
594    }
595
596    auto hbl = vb->ht.getLockedBucket(key);
597    StoredValue* v = vb->ht.unlocked_find(
598            key, hbl.getBucketNum(), WantsDeleted::No, TrackReference::No);
599
600    if (v && !v->isTempItem()) {
601        return true;
602    } else {
603        return false;
604    }
605}
606
607void KVBucket::logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
608    auto ms = std::chrono::duration_cast<std::chrono::microseconds>(enqTime);
609    stats.schedulingHisto[static_cast<int>(taskType)].add(ms);
610}
611
612void KVBucket::logRunTime(TaskId taskType,
613                          const ProcessClock::duration runTime) {
614    auto ms = std::chrono::duration_cast<std::chrono::microseconds>(runTime);
615    stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ms);
616}
617
618ENGINE_ERROR_CODE KVBucket::set(Item& itm,
619                                const void* cookie,
620                                cb::StoreIfPredicate predicate) {
621    VBucketPtr vb = getVBucket(itm.getVBucketId());
622    if (!vb) {
623        ++stats.numNotMyVBuckets;
624        return ENGINE_NOT_MY_VBUCKET;
625    }
626
627    // Obtain read-lock on VB state to ensure VB state changes are interlocked
628    // with this set
629    ReaderLockHolder rlh(vb->getStateLock());
630    if (vb->getState() == vbucket_state_dead) {
631        ++stats.numNotMyVBuckets;
632        return ENGINE_NOT_MY_VBUCKET;
633    } else if (vb->getState() == vbucket_state_replica) {
634        ++stats.numNotMyVBuckets;
635        return ENGINE_NOT_MY_VBUCKET;
636    } else if (vb->getState() == vbucket_state_pending) {
637        if (vb->addPendingOp(cookie)) {
638            return ENGINE_EWOULDBLOCK;
639        }
640    } else if (vb->isTakeoverBackedUp()) {
641        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op"
642                ", becuase takeover is lagging", vb->getId());
643        return ENGINE_TMPFAIL;
644    }
645
646    { // collections read-lock scope
647        auto collectionsRHandle = vb->lockCollections();
648        if (!collectionsRHandle.doesKeyContainValidCollection(itm.getKey())) {
649            return ENGINE_UNKNOWN_COLLECTION;
650        } // now hold collections read access for the duration of the set
651
652        return vb->set(itm, cookie, engine, bgFetchDelay, predicate);
653    }
654}
655
/**
 * Front-end add: store the item only if the key does not already exist.
 *
 * @param itm the item to add (vbucket id taken from the item)
 * @param cookie connection cookie, used to park the op on pending vbuckets
 * @return ENGINE_SUCCESS or an error: NOT_MY_VBUCKET (missing/dead/replica
 *         vb), EWOULDBLOCK (pending vb, op parked), TMPFAIL (takeover
 *         backed up), NOT_STORED (non-zero CAS supplied),
 *         UNKNOWN_COLLECTION, or the vbucket's own result.
 */
ENGINE_ERROR_CODE KVBucket::add(Item &itm, const void *cookie)
{
    VBucketPtr vb = getVBucket(itm.getVBucketId());
    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Obtain read-lock on VB state to ensure VB state changes are interlocked
    // with this add
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead ||
        vb->getState() == vbucket_state_replica) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending) {
        // Park the operation until the vbucket becomes active.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    } else if (vb->isTakeoverBackedUp()) {
        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op"
                ", becuase takeover is lagging", vb->getId());
        return ENGINE_TMPFAIL;
    }

    if (itm.getCas() != 0) {
        // Adding with a cas value doesn't make sense..
        return ENGINE_NOT_STORED;
    }

    { // collections read-lock scope
        auto collectionsRHandle = vb->lockCollections(itm.getKey());
        if (!collectionsRHandle.valid()) {
            return ENGINE_UNKNOWN_COLLECTION;
        } // now hold collections read access for the duration of the add

        return vb->add(itm, cookie, engine, bgFetchDelay, collectionsRHandle);
    }
}
695
696ENGINE_ERROR_CODE KVBucket::replace(Item& itm,
697                                    const void* cookie,
698                                    cb::StoreIfPredicate predicate) {
699    VBucketPtr vb = getVBucket(itm.getVBucketId());
700    if (!vb) {
701        ++stats.numNotMyVBuckets;
702        return ENGINE_NOT_MY_VBUCKET;
703    }
704
705    // Obtain read-lock on VB state to ensure VB state changes are interlocked
706    // with this replace
707    ReaderLockHolder rlh(vb->getStateLock());
708    if (vb->getState() == vbucket_state_dead ||
709        vb->getState() == vbucket_state_replica) {
710        ++stats.numNotMyVBuckets;
711        return ENGINE_NOT_MY_VBUCKET;
712    } else if (vb->getState() == vbucket_state_pending) {
713        if (vb->addPendingOp(cookie)) {
714            return ENGINE_EWOULDBLOCK;
715        }
716    }
717
718    { // collections read-lock scope
719        auto collectionsRHandle = vb->lockCollections(itm.getKey());
720        if (!collectionsRHandle.valid()) {
721            return ENGINE_UNKNOWN_COLLECTION;
722        } // now hold collections read access for the duration of the set
723
724        return vb->replace(itm,
725                           cookie,
726                           engine,
727                           bgFetchDelay,
728                           predicate,
729                           collectionsRHandle);
730    }
731}
732
733ENGINE_ERROR_CODE KVBucket::addBackfillItem(Item& itm,
734                                            GenerateBySeqno genBySeqno,
735                                            ExtendedMetaData* emd) {
736    VBucketPtr vb = getVBucket(itm.getVBucketId());
737    if (!vb) {
738        ++stats.numNotMyVBuckets;
739        return ENGINE_NOT_MY_VBUCKET;
740    }
741
742    // Obtain read-lock on VB state to ensure VB state changes are interlocked
743    // with this addBackfillItem
744    ReaderLockHolder rlh(vb->getStateLock());
745    if (vb->getState() == vbucket_state_dead ||
746        vb->getState() == vbucket_state_active) {
747        ++stats.numNotMyVBuckets;
748        return ENGINE_NOT_MY_VBUCKET;
749    }
750
751    //check for the incoming item's CAS validity
752    if (!Item::isValidCas(itm.getCas())) {
753        return ENGINE_KEY_EEXISTS;
754    }
755
756    return vb->addBackfillItem(itm, genBySeqno);
757}
758
/**
 * Request a vbucket state change. Blocks (EWOULDBLOCK) while warmup has
 * not yet loaded vbucket state data; otherwise forwards to the _UNLOCKED
 * variant under vbsetMutex with DCP notification enabled.
 *
 * @param vbid the vbucket to change
 * @param to the target state
 * @param transfer true if this is a takeover/transfer (suppresses
 *        failover-entry/stream-close behaviour downstream)
 * @param cookie connection cookie; may be null
 */
ENGINE_ERROR_CODE KVBucket::setVBucketState(uint16_t vbid,
                                            vbucket_state_t to,
                                            bool transfer,
                                            const void* cookie) {
    // MB-25197: we shouldn't process setVBState if warmup hasn't yet loaded
    // the vbucket state data.
    if (cookie && shouldSetVBStateBlock(cookie)) {
        LOG(EXTENSION_LOG_NOTICE,
            "KVBucket::setVBucketState blocking vb:%" PRIu16
            ", to:%s, transfer:%d, cookie:%p",
            vbid,
            VBucket::toString(to),
            transfer,
            cookie);
        return ENGINE_EWOULDBLOCK;
    }

    // Lock to prevent a race condition between a failed update and add.
    std::unique_lock<std::mutex> lh(vbsetMutex);
    return setVBucketState_UNLOCKED(vbid, to, transfer, true /*notifyDcp*/, lh);
}
780
/**
 * Change the state of a vbucket; caller must already hold vbsetMutex
 * (witnessed by the `vbset` lock argument).
 *
 * Three cases are handled:
 *  1. VB exists and is already in state `to`      -> no-op, SUCCESS.
 *  2. VB exists in a different state              -> transition in-place.
 *  3. VB absent but vbid within the map capacity  -> create a new VB.
 *
 * @param vbid        vbucket to change.
 * @param to          target state.
 * @param transfer    true when this is a takeover/transfer; suppresses
 *                    closing inbound streams and failover-entry creation.
 * @param notify_dcp  if true, tell the DCP connmap about the state change.
 * @param vbset       lock on vbsetMutex held by the caller.
 * @param vbStateLock optional already-held writer lock on the VB's state
 *                    lock; when non-null the state is set via the
 *                    _UNLOCKED variant to avoid re-acquiring it.
 * @return ENGINE_SUCCESS, or ENGINE_ERANGE if vbid is out of range / the
 *         map rejects the new bucket.
 */
ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
        uint16_t vbid,
        vbucket_state_t to,
        bool transfer,
        bool notify_dcp,
        std::unique_lock<std::mutex>& vbset,
        WriterLockHolder* vbStateLock) {
    VBucketPtr vb = vbMap.getBucket(vbid);
    if (vb && to == vb->getState()) {
        // Already in the requested state; nothing to do.
        return ENGINE_SUCCESS;
    }

    if (vb) {
        // Case 2: transition an existing vbucket.
        vbucket_state_t oldstate = vb->getState();
        vbMap.decVBStateCount(oldstate);
        if (vbStateLock) {
            vb->setState_UNLOCKED(to, *vbStateLock);
        } else {
            vb->setState(to);
        }

        if (oldstate != to && notify_dcp) {
            bool closeInboundStreams = false;
            if (to == vbucket_state_active && !transfer) {
                /**
                 * Close inbound (passive) streams into the vbucket
                 * only in case of a failover.
                 */
                closeInboundStreams = true;
            }
            engine.getDcpConnMap().vbucketStateChanged(vbid, to,
                                                       closeInboundStreams);
        }

        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
            /**
             * Update snapshot range when vbucket goes from being a replica
             * to active, to maintain the correct snapshot sequence numbers
             * even in a failover scenario.
             */
            vb->checkpointManager->resetSnapshotRange();
        }

        if (to == vbucket_state_active && !transfer) {
            // Promotion to active outside a transfer is a failover: cut a
            // new failover-table entry. Use the persisted snapshot end only
            // when it has actually been fully persisted; otherwise fall
            // back to the snapshot start (a consistent point).
            const snapshot_range_t range = vb->getPersistedSnapshot();
            auto highSeqno = range.end == vb->getPersistenceSeqno()
                                     ? range.end
                                     : range.start;
            vb->failovers->createEntry(highSeqno);

            auto entry = vb->failovers->getLatestEntry();
            LOG(EXTENSION_LOG_NOTICE,
                "KVBucket::setVBucketState: vb:%" PRIu16
                " created new failover entry with "
                "uuid:%" PRIu64 " and seqno:%" PRIu64,
                vbid,
                entry.vb_uuid,
                entry.by_seqno);
        }

        if (oldstate == vbucket_state_pending &&
            to == vbucket_state_active) {
            // Wake any ops which were queued while the VB was pending.
            ExTask notifyTask =
                    std::make_shared<PendingOpsNotification>(engine, vb);
            ExecutorPool::get()->schedule(notifyTask);
        }
        scheduleVBStatePersist(vbid);
    } else if (vbid < vbMap.getSize()) {
        // Case 3: create a brand new vbucket in the requested state.
        auto ft =
                std::make_unique<FailoverTable>(engine.getMaxFailoverEntries());
        KVShard* shard = vbMap.getShardByVbId(vbid);

        VBucketPtr newvb =
                makeVBucket(vbid,
                            to,
                            shard,
                            std::move(ft),
                            std::make_unique<NotifyNewSeqnoCB>(*this));

        newvb->setFreqSaturatedCallback(
                [this] { this->wakeItemFreqDecayerTask(); });

        Configuration& config = engine.getConfiguration();
        if (config.isBfilterEnabled()) {
            // Initialize bloom filters upon vbucket creation during
            // bucket creation and rebalance
            newvb->createFilter(config.getBfilterKeyCount(),
                                config.getBfilterFpProb());
        }

        // The first checkpoint for active vbucket should start with id 2.
        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
        newvb->checkpointManager->setOpenCheckpointId(start_chk_id);

        // Before adding the VB to the map increment the revision
        getRWUnderlying(vbid)->incrementRevision(vbid);

        // If active, update the VB from the bucket's collection state
        if (to == vbucket_state_active) {
            collectionsManager->update(*newvb);
        }

        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
            return ENGINE_ERANGE;
        }
        // When the VBucket is constructed we initialize
        // persistenceSeqno(0) && persistenceCheckpointId(0)
        newvb->setBucketCreation(true);
        scheduleVBStatePersist(vbid);
    } else {
        // vbid beyond the configured map size.
        return ENGINE_ERANGE;
    }
    return ENGINE_SUCCESS;
}
895
896void KVBucket::scheduleVBStatePersist() {
897    for (auto vbid : vbMap.getBuckets()) {
898        scheduleVBStatePersist(vbid);
899    }
900}
901
902void KVBucket::scheduleVBStatePersist(VBucket::id_type vbid) {
903    VBucketPtr vb = getVBucket(vbid);
904
905    if (!vb) {
906        LOG(EXTENSION_LOG_WARNING,
907            "EPStore::scheduleVBStatePersist: vb:%" PRIu16
908            " does not not exist. Unable to schedule persistence.", vbid);
909        return;
910    }
911
912    vb->checkpointManager->queueSetVBState(*vb);
913}
914
/**
 * Delete a vbucket: mark it dead, notify DCP, then remove it from the map
 * setting up deferred deletion of its resources.
 *
 * @param vbid vbucket to delete.
 * @param c    optional cookie; when non-null the caller is told to block
 *             (EWOULDBLOCK) and — presumably — is notified once the
 *             deferred deletion completes (TODO confirm notification path;
 *             not visible here).
 * @return ENGINE_NOT_MY_VBUCKET if absent, ENGINE_EWOULDBLOCK when a
 *         cookie was supplied, else ENGINE_SUCCESS.
 */
ENGINE_ERROR_CODE KVBucket::deleteVBucket(uint16_t vbid, const void* c) {
    // Lock to prevent a race condition between a failed update and add
    // (and delete).
    VBucketPtr vb = vbMap.getBucket(vbid);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    {
        std::unique_lock<std::mutex> vbSetLh(vbsetMutex);
        // Obtain a locked VBucket to ensure we interlock with other
        // threads that are manipulating the VB (particularly ones which may
        // try and change the disk revision e.g. deleteAll and compaction).
        auto lockedVB = getLockedVBucket(vbid);
        vbMap.decVBStateCount(lockedVB->getState());
        lockedVB->setState(vbucket_state_dead);
        engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);

        // Drop the VB to begin the delete, the last holder of the VB will
        // unknowingly trigger the destructor which schedules a deletion task.
        vbMap.dropVBucketAndSetupDeferredDeletion(vbid, c);
    }

    if (c) {
        return ENGINE_EWOULDBLOCK;
    }
    return ENGINE_SUCCESS;
}
943
944ENGINE_ERROR_CODE KVBucket::checkForDBExistence(DBFileId db_file_id) {
945    std::string backend = engine.getConfiguration().getBackend();
946    if (backend.compare("couchdb") == 0) {
947        VBucketPtr vb = vbMap.getBucket(db_file_id);
948        if (!vb) {
949            return ENGINE_NOT_MY_VBUCKET;
950        }
951    } else {
952        LOG(EXTENSION_LOG_WARNING,
953            "Unknown backend specified for db file id: %d", db_file_id);
954        return ENGINE_FAILED;
955    }
956
957    return ENGINE_SUCCESS;
958}
959
960uint16_t KVBucket::getDBFileId(const protocol_binary_request_compact_db& req) {
961    KVStore *store = vbMap.shards[0]->getROUnderlying();
962    return store->getDBFileId(req);
963}
964
965bool KVBucket::resetVBucket(uint16_t vbid) {
966    std::unique_lock<std::mutex> vbsetLock(vbsetMutex);
967    // Obtain a locked VBucket to ensure we interlock with other
968    // threads that are manipulating the VB (particularly ones which may
969    // try and change the disk revision).
970    auto lockedVB = getLockedVBucket(vbid);
971    return resetVBucket_UNLOCKED(lockedVB, vbsetLock);
972}
973
/**
 * Reset (delete and recreate) a vbucket, preserving its checkpoint
 * cursors across the recreation. Caller must hold vbsetMutex (`vbset`)
 * and the locked VBucket handle.
 *
 * @param vb    locked pointer to the vbucket to reset (may be empty).
 * @param vbset lock on vbsetMutex, passed through to
 *              setVBucketState_UNLOCKED.
 * @return true if the vbucket existed and was reset, false otherwise.
 */
bool KVBucket::resetVBucket_UNLOCKED(LockedVBucketPtr& vb,
                                     std::unique_lock<std::mutex>& vbset) {
    bool rv(false);

    if (vb) {
        vbucket_state_t vbstate = vb->getState();

        // Remove the old vbucket from the map; its resources are torn down
        // via deferred deletion (no cookie to notify).
        vbMap.dropVBucketAndSetupDeferredDeletion(vb->getId(),
                                                  nullptr /*no cookie*/);

        // Capture the cursors now, before the replacement vbucket is
        // created, so they can be re-registered on it below.
        checkpointCursorInfoList cursors =
                vb->checkpointManager->getAllCursors();
        // Delete and recreate the vbucket database file
        setVBucketState_UNLOCKED(vb->getId(),
                                 vbstate,
                                 false /*transfer*/,
                                 true /*notifyDcp*/,
                                 vbset);

        // Copy the all cursors from the old vbucket into the new vbucket
        VBucketPtr newvb = vbMap.getBucket(vb->getId());
        newvb->checkpointManager->resetCursors(cursors);

        rv = true;
    }
    return rv;
}
1001
1002extern "C" {
1003
1004struct snapshot_stats_t : cb::tracing::Traceable {
1005    EventuallyPersistentEngine* engine;
1006    std::map<std::string, std::string> smap;
1007};
1008
1009static void add_stat(const char* key,
1010                     const uint16_t klen,
1011                     const char* val,
1012                     const uint32_t vlen,
1013                     gsl::not_null<const void*> cookie) {
1014    void* ptr = const_cast<void*>(cookie.get());
1015    snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1016    ObjectRegistry::onSwitchThread(snap->engine);
1017    std::string k(key, klen);
1018    std::string v(val, vlen);
1019    snap->smap.insert(std::pair<std::string, std::string>(k, v));
1020    }
1021}
1022
1023void KVBucket::snapshotStats() {
1024    snapshot_stats_t snap;
1025    snap.engine = &engine;
1026    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1027              engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1028
1029    if (rv && stats.isShutdown) {
1030        snap.smap["ep_force_shutdown"] = stats.forceShutdown ? "true" : "false";
1031        std::stringstream ss;
1032        ss << ep_real_time();
1033        snap.smap["ep_shutdown_time"] = ss.str();
1034    }
1035    getOneRWUnderlying()->snapshotStats(snap.smap);
1036}
1037
/**
 * Gather per-state aggregate vbucket statistics in a single pass over all
 * vbuckets, update derived values (resident ratio, write queue cap), then
 * emit the stats via the supplied ADD_STAT callback.
 *
 * @param cookie   opaque value passed through to add_stat.
 * @param add_stat callback used to return each stat to the caller.
 */
void KVBucket::getAggregatedVBucketStats(const void* cookie,
                                         ADD_STAT add_stat) {
    // Create visitors for each of the four vBucket states, and collect
    // stats for each.
    auto active = makeVBCountVisitor(vbucket_state_active);
    auto replica = makeVBCountVisitor(vbucket_state_replica);
    auto pending = makeVBCountVisitor(vbucket_state_pending);
    auto dead = makeVBCountVisitor(vbucket_state_dead);

    // Aggregator runs all four visitors in one traversal of the vbuckets.
    VBucketCountAggregator aggregator;
    aggregator.addVisitor(active.get());
    aggregator.addVisitor(replica.get());
    aggregator.addVisitor(pending.get());
    aggregator.addVisitor(dead.get());
    visit(aggregator);

    updateCachedResidentRatio(active->getMemResidentPer(),
                              replica->getMemResidentPer());
    engine.getReplicationThrottle().adjustWriteQueueCap(active->getNumItems() +
                                                        replica->getNumItems() +
                                                        pending->getNumItems());

    // And finally actually return the stats using the ADD_STAT callback.
    appendAggregatedVBucketStats(
            *active, *replica, *pending, *dead, cookie, add_stat);
}
1064
1065std::unique_ptr<VBucketCountVisitor> KVBucket::makeVBCountVisitor(
1066        vbucket_state_t state) {
1067    return std::make_unique<VBucketCountVisitor>(state);
1068}
1069
/**
 * Emit the aggregated per-state vbucket statistics (previously collected
 * by VBucketCountVisitor instances) to the caller via ADD_STAT.
 *
 * Layout: top-level totals, then one section per vbucket state
 * (active / replica / pending / dead), then cross-state totals, HLC drift
 * stats, and per-datatype document counts.
 *
 * @param active/replica/pending/dead visitors holding the collected stats.
 * @param cookie   opaque value passed through to add_stat.
 * @param add_stat callback used to return each stat.
 */
void KVBucket::appendAggregatedVBucketStats(VBucketCountVisitor& active,
                                            VBucketCountVisitor& replica,
                                            VBucketCountVisitor& pending,
                                            VBucketCountVisitor& dead,
                                            const void* cookie,
                                            ADD_STAT add_stat) {
// Simplify the repetition of calling add_casted_stat with `add_stat` and
// cookie each time. (Note: if we had C++14 we could use a polymorphic
// lambda, but for now will have to stick to C++98 and macros :).
#define DO_STAT(k, v)                            \
    do {                                         \
        add_casted_stat(k, v, add_stat, cookie); \
    } while (0)

    // Top-level stats:
    DO_STAT("ep_flush_all", isDeleteAllScheduled());
    DO_STAT("curr_items", active.getNumItems());
    DO_STAT("curr_temp_items", active.getNumTempItems());
    DO_STAT("curr_items_tot",
            active.getNumItems() + replica.getNumItems() +
                    pending.getNumItems());

    // Active vBuckets:
    DO_STAT("vb_active_backfill_queue_size", active.getBackfillQueueSize());
    DO_STAT("vb_active_num", active.getVBucketNumber());
    DO_STAT("vb_active_curr_items", active.getNumItems());
    DO_STAT("vb_active_hp_vb_req_size", active.getNumHpVBReqs());
    DO_STAT("vb_active_num_non_resident", active.getNonResident());
    DO_STAT("vb_active_perc_mem_resident", active.getMemResidentPer());
    DO_STAT("vb_active_eject", active.getEjects());
    DO_STAT("vb_active_expired", active.getExpired());
    DO_STAT("vb_active_meta_data_memory", active.getMetaDataMemory());
    DO_STAT("vb_active_meta_data_disk", active.getMetaDataDisk());
    DO_STAT("vb_active_checkpoint_memory", active.getCheckpointMemory());
    DO_STAT("vb_active_checkpoint_memory_unreferenced",
            active.getCheckpointMemoryUnreferenced());
    DO_STAT("vb_active_checkpoint_memory_overhead",
            active.getCheckpointMemoryOverhead());
    DO_STAT("vb_active_ht_memory", active.getHashtableMemory());
    DO_STAT("vb_active_itm_memory", active.getItemMemory());
    DO_STAT("vb_active_itm_memory_uncompressed",
            active.getUncompressedItemMemory());
    DO_STAT("vb_active_ops_create", active.getOpsCreate());
    DO_STAT("vb_active_ops_update", active.getOpsUpdate());
    DO_STAT("vb_active_ops_delete", active.getOpsDelete());
    DO_STAT("vb_active_ops_reject", active.getOpsReject());
    DO_STAT("vb_active_queue_size", active.getQueueSize());
    DO_STAT("vb_active_queue_memory", active.getQueueMemory());
    DO_STAT("vb_active_queue_age", active.getAge());
    DO_STAT("vb_active_queue_pending", active.getPendingWrites());
    DO_STAT("vb_active_queue_fill", active.getQueueFill());
    DO_STAT("vb_active_queue_drain", active.getQueueDrain());
    DO_STAT("vb_active_rollback_item_count", active.getRollbackItemCount());

    // Replica vBuckets:
    DO_STAT("vb_replica_backfill_queue_size", replica.getBackfillQueueSize());
    DO_STAT("vb_replica_num", replica.getVBucketNumber());
    DO_STAT("vb_replica_curr_items", replica.getNumItems());
    DO_STAT("vb_replica_hp_vb_req_size", replica.getNumHpVBReqs());
    DO_STAT("vb_replica_num_non_resident", replica.getNonResident());
    DO_STAT("vb_replica_perc_mem_resident", replica.getMemResidentPer());
    DO_STAT("vb_replica_eject", replica.getEjects());
    DO_STAT("vb_replica_expired", replica.getExpired());
    DO_STAT("vb_replica_meta_data_memory", replica.getMetaDataMemory());
    DO_STAT("vb_replica_meta_data_disk", replica.getMetaDataDisk());
    DO_STAT("vb_replica_checkpoint_memory", replica.getCheckpointMemory());
    DO_STAT("vb_replica_checkpoint_memory_unreferenced",
            replica.getCheckpointMemoryUnreferenced());
    DO_STAT("vb_replica_checkpoint_memory_overhead",
            replica.getCheckpointMemoryOverhead());
    DO_STAT("vb_replica_ht_memory", replica.getHashtableMemory());
    DO_STAT("vb_replica_itm_memory", replica.getItemMemory());
    DO_STAT("vb_replica_itm_memory_uncompressed",
            replica.getUncompressedItemMemory());
    DO_STAT("vb_replica_ops_create", replica.getOpsCreate());
    DO_STAT("vb_replica_ops_update", replica.getOpsUpdate());
    DO_STAT("vb_replica_ops_delete", replica.getOpsDelete());
    DO_STAT("vb_replica_ops_reject", replica.getOpsReject());
    DO_STAT("vb_replica_queue_size", replica.getQueueSize());
    DO_STAT("vb_replica_queue_memory", replica.getQueueMemory());
    DO_STAT("vb_replica_queue_age", replica.getAge());
    DO_STAT("vb_replica_queue_pending", replica.getPendingWrites());
    DO_STAT("vb_replica_queue_fill", replica.getQueueFill());
    DO_STAT("vb_replica_queue_drain", replica.getQueueDrain());
    DO_STAT("vb_replica_rollback_item_count", replica.getRollbackItemCount());

    // Pending vBuckets:
    DO_STAT("vb_pending_backfill_queue_size", pending.getBackfillQueueSize());
    DO_STAT("vb_pending_num", pending.getVBucketNumber());
    DO_STAT("vb_pending_curr_items", pending.getNumItems());
    DO_STAT("vb_pending_hp_vb_req_size", pending.getNumHpVBReqs());
    DO_STAT("vb_pending_num_non_resident", pending.getNonResident());
    DO_STAT("vb_pending_perc_mem_resident", pending.getMemResidentPer());
    DO_STAT("vb_pending_eject", pending.getEjects());
    DO_STAT("vb_pending_expired", pending.getExpired());
    DO_STAT("vb_pending_meta_data_memory", pending.getMetaDataMemory());
    DO_STAT("vb_pending_meta_data_disk", pending.getMetaDataDisk());
    DO_STAT("vb_pending_checkpoint_memory", pending.getCheckpointMemory());
    DO_STAT("vb_pending_checkpoint_memory_unreferenced",
            pending.getCheckpointMemoryUnreferenced());
    DO_STAT("vb_pending_checkpoint_memory_overhead",
            pending.getCheckpointMemoryOverhead());
    DO_STAT("vb_pending_ht_memory", pending.getHashtableMemory());
    DO_STAT("vb_pending_itm_memory", pending.getItemMemory());
    DO_STAT("vb_pending_itm_memory_uncompressed",
            pending.getUncompressedItemMemory());
    DO_STAT("vb_pending_ops_create", pending.getOpsCreate());
    DO_STAT("vb_pending_ops_update", pending.getOpsUpdate());
    DO_STAT("vb_pending_ops_delete", pending.getOpsDelete());
    DO_STAT("vb_pending_ops_reject", pending.getOpsReject());
    DO_STAT("vb_pending_queue_size", pending.getQueueSize());
    DO_STAT("vb_pending_queue_memory", pending.getQueueMemory());
    DO_STAT("vb_pending_queue_age", pending.getAge());
    DO_STAT("vb_pending_queue_pending", pending.getPendingWrites());
    DO_STAT("vb_pending_queue_fill", pending.getQueueFill());
    DO_STAT("vb_pending_queue_drain", pending.getQueueDrain());
    DO_STAT("vb_pending_rollback_item_count", pending.getRollbackItemCount());

    // Dead vBuckets:
    DO_STAT("vb_dead_num", dead.getVBucketNumber());

    // Totals:
    DO_STAT("ep_vb_total",
            active.getVBucketNumber() + replica.getVBucketNumber() +
                    pending.getVBucketNumber() + dead.getVBucketNumber());
    DO_STAT("ep_total_new_items",
            active.getOpsCreate() + replica.getOpsCreate() +
                    pending.getOpsCreate());
    DO_STAT("ep_total_del_items",
            active.getOpsDelete() + replica.getOpsDelete() +
                    pending.getOpsDelete());
    DO_STAT("ep_diskqueue_memory",
            active.getQueueMemory() + replica.getQueueMemory() +
                    pending.getQueueMemory());
    DO_STAT("ep_diskqueue_fill",
            active.getQueueFill() + replica.getQueueFill() +
                    pending.getQueueFill());
    DO_STAT("ep_diskqueue_drain",
            active.getQueueDrain() + replica.getQueueDrain() +
                    pending.getQueueDrain());
    DO_STAT("ep_diskqueue_pending",
            active.getPendingWrites() + replica.getPendingWrites() +
                    pending.getPendingWrites());
    DO_STAT("ep_meta_data_memory",
            active.getMetaDataMemory() + replica.getMetaDataMemory() +
                    pending.getMetaDataMemory());
    DO_STAT("ep_meta_data_disk",
            active.getMetaDataDisk() + replica.getMetaDataDisk() +
                    pending.getMetaDataDisk());
    DO_STAT("ep_checkpoint_memory",
            active.getCheckpointMemory() + replica.getCheckpointMemory() +
                    pending.getCheckpointMemory());
    DO_STAT("ep_checkpoint_memory_unreferenced",
            active.getCheckpointMemoryUnreferenced() +
                    replica.getCheckpointMemoryUnreferenced() +
                    pending.getCheckpointMemoryUnreferenced());
    DO_STAT("ep_checkpoint_memory_overhead",
            active.getCheckpointMemoryOverhead() +
                    replica.getCheckpointMemoryOverhead() +
                    pending.getCheckpointMemoryOverhead());
    DO_STAT("ep_total_cache_size",
            active.getCacheSize() + replica.getCacheSize() +
                    pending.getCacheSize());
    DO_STAT("rollback_item_count",
            active.getRollbackItemCount() + replica.getRollbackItemCount() +
                    pending.getRollbackItemCount());
    DO_STAT("ep_num_non_resident",
            active.getNonResident() + pending.getNonResident() +
                    replica.getNonResident());
    DO_STAT("ep_chk_persistence_remains",
            active.getChkPersistRemaining() + pending.getChkPersistRemaining() +
                    replica.getChkPersistRemaining());

    // Add stats for tracking HLC drift
    DO_STAT("ep_active_hlc_drift", active.getTotalAbsHLCDrift().total);
    DO_STAT("ep_active_hlc_drift_count", active.getTotalAbsHLCDrift().updates);
    DO_STAT("ep_replica_hlc_drift", replica.getTotalAbsHLCDrift().total);
    DO_STAT("ep_replica_hlc_drift_count",
            replica.getTotalAbsHLCDrift().updates);

    DO_STAT("ep_active_ahead_exceptions",
            active.getTotalHLCDriftExceptionCounters().ahead);
    DO_STAT("ep_active_behind_exceptions",
            active.getTotalHLCDriftExceptionCounters().behind);
    DO_STAT("ep_replica_ahead_exceptions",
            replica.getTotalHLCDriftExceptionCounters().ahead);
    DO_STAT("ep_replica_behind_exceptions",
            replica.getTotalHLCDriftExceptionCounters().behind);

    // A single total for ahead exceptions accross all active/replicas
    DO_STAT("ep_clock_cas_drift_threshold_exceeded",
            active.getTotalHLCDriftExceptionCounters().ahead +
                    replica.getTotalHLCDriftExceptionCounters().ahead);

    // Per-datatype document counts for active / replica vbuckets.
    for (uint8_t ii = 0; ii < active.getNumDatatypes(); ++ii) {
        std::string name = "ep_active_datatype_";
        name += mcbp::datatype::to_string(ii);
        DO_STAT(name.c_str(), active.getDatatypeCount(ii));
    }

    for (uint8_t ii = 0; ii < replica.getNumDatatypes(); ++ii) {
        std::string name = "ep_replica_datatype_";
        name += mcbp::datatype::to_string(ii);
        DO_STAT(name.c_str(), replica.getDatatypeCount(ii));
    }

#undef DO_STAT
}
1278
/**
 * Complete a single background fetch: read the value from disk, then (under
 * vbsetMutex) hand it to the vbucket and notify the waiting connection.
 *
 * @param key     document key that was fetched.
 * @param vbucket vbucket the key belongs to.
 * @param cookie  connection cookie to notify on completion.
 * @param init    time the fetch was originally requested.
 * @param isMeta  true if only metadata was requested.
 */
void KVBucket::completeBGFetch(const DocKey& key,
                               uint16_t vbucket,
                               const void* cookie,
                               ProcessClock::time_point init,
                               bool isMeta) {
    ProcessClock::time_point startTime(ProcessClock::now());
    // Go find the data. Note: the disk read is performed *before* taking
    // vbsetMutex below, keeping I/O outside the lock.
    GetValue gcb = getROUnderlying(vbucket)->get(key, vbucket, isMeta);

    {
      // Lock to prevent a race condition between a fetch for restore and delete
        LockHolder lh(vbsetMutex);

        VBucketPtr vb = getVBucket(vbucket);
        if (vb) {
            VBucketBGFetchItem item{&gcb, cookie, init, isMeta};
            ENGINE_ERROR_CODE status =
                    vb->completeBGFetchForSingleItem(key, item, startTime);
            engine.notifyIOComplete(item.cookie, status);
        } else {
            // VBucket disappeared while the fetch was in-flight; tell the
            // waiting connection it is no longer ours.
            LOG(EXTENSION_LOG_INFO,
                "vb:%" PRIu16
                " file was deleted in the "
                "middle of a bg fetch for key{%.*s}",
                vbucket,
                int(key.size()),
                key.data());
            engine.notifyIOComplete(cookie, ENGINE_NOT_MY_VBUCKET);
        }
    }

    // One fewer outstanding background job, regardless of outcome.
    --stats.numRemainingBgJobs;
}
1312
1313void KVBucket::completeBGFetchMulti(uint16_t vbId,
1314                                    std::vector<bgfetched_item_t>& fetchedItems,
1315                                    ProcessClock::time_point startTime) {
1316    VBucketPtr vb = getVBucket(vbId);
1317    if (vb) {
1318        for (const auto& item : fetchedItems) {
1319            auto& key = item.first;
1320            auto* fetched_item = item.second;
1321            ENGINE_ERROR_CODE status = vb->completeBGFetchForSingleItem(
1322                    key, *fetched_item, startTime);
1323            engine.notifyIOComplete(fetched_item->cookie, status);
1324        }
1325        LOG(EXTENSION_LOG_DEBUG,
1326            "EP Store completes %" PRIu64
1327            " of batched background fetch "
1328            "for vBucket = %d endTime = %" PRIu64,
1329            uint64_t(fetchedItems.size()),
1330            vbId,
1331            std::chrono::duration_cast<std::chrono::milliseconds>(
1332                    ProcessClock::now().time_since_epoch())
1333                    .count());
1334    } else {
1335        for (const auto& item : fetchedItems) {
1336            engine.notifyIOComplete(item.second->cookie,
1337                                    ENGINE_NOT_MY_VBUCKET);
1338        }
1339        LOG(EXTENSION_LOG_WARNING,
1340            "EP Store completes %d of batched background fetch for "
1341            "for vBucket = %d that is already deleted",
1342            (int)fetchedItems.size(),
1343            vbId);
1344    }
1345}
1346
1347GetValue KVBucket::getInternal(const DocKey& key,
1348                               uint16_t vbucket,
1349                               const void *cookie,
1350                               vbucket_state_t allowedState,
1351                               get_options_t options) {
1352    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1353        vbucket_state_replica : vbucket_state_active;
1354    VBucketPtr vb = getVBucket(vbucket);
1355
1356    if (!vb) {
1357        ++stats.numNotMyVBuckets;
1358        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1359    }
1360
1361    const bool honorStates = (options & HONOR_STATES);
1362
1363    ReaderLockHolder rlh(vb->getStateLock());
1364    if (honorStates) {
1365        vbucket_state_t vbState = vb->getState();
1366        if (vbState == vbucket_state_dead) {
1367            ++stats.numNotMyVBuckets;
1368            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1369        } else if (vbState == disallowedState) {
1370            ++stats.numNotMyVBuckets;
1371            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1372        } else if (vbState == vbucket_state_pending) {
1373            if (vb->addPendingOp(cookie)) {
1374                return GetValue(NULL, ENGINE_EWOULDBLOCK);
1375            }
1376        }
1377    }
1378
1379    { // collections read scope
1380        auto collectionsRHandle = vb->lockCollections(key);
1381        if (!collectionsRHandle.valid()) {
1382            return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
1383        }
1384
1385        return vb->getInternal(cookie,
1386                               engine,
1387                               bgFetchDelay,
1388                               options,
1389                               diskDeleteAll,
1390                               VBucket::GetKeyOnly::No,
1391                               collectionsRHandle);
1392    }
1393}
1394
/**
 * Return a random document from a randomly chosen active vbucket, or
 * ENGINE_KEY_ENOENT if no active vbucket yields an item.
 *
 * Starts at a random vbucket id and walks the map (wrapping at `max`)
 * until an item is found or the walk returns to its starting point.
 */
GetValue KVBucket::getRandomKey() {
    VBucketMap::id_type max = vbMap.getSize();

    const long start = random() % max;
    long curr = start;
    std::unique_ptr<Item> itm;

    while (itm == NULL) {
        // NOTE(review): curr is post-incremented before the bounds check,
        // so getVBucket may be called with curr == max here; presumably
        // the map lookup tolerates an out-of-range id by returning an
        // empty ptr — confirm against VBucketMap::getBucket.
        VBucketPtr vb = getVBucket(curr++);
        while (!vb || vb->getState() != vbucket_state_active) {
            // Completed a full lap without finding an active vbucket.
            if (curr == start) {
                return GetValue(NULL, ENGINE_KEY_ENOENT);
            }
            if (curr == max) {
                curr = 0;
            }

            vb = getVBucket(curr++);
        }

        // Ask this vbucket's hashtable for a random resident item.
        if ((itm = vb->ht.getRandomKey(random()))) {
            GetValue rv(std::move(itm), ENGINE_SUCCESS);
            return rv;
        }

        if (curr == max) {
            curr = 0;
        }

        if (curr == start) {
            return GetValue(NULL, ENGINE_KEY_ENOENT);
        }
        // Search next vbucket
    }

    // Unreachable: the loop only exits via the returns above.
    return GetValue(NULL, ENGINE_KEY_ENOENT);
}
1432
1433ENGINE_ERROR_CODE KVBucket::getMetaData(const DocKey& key,
1434                                        uint16_t vbucket,
1435                                        const void* cookie,
1436                                        ItemMetaData& metadata,
1437                                        uint32_t& deleted,
1438                                        uint8_t& datatype)
1439{
1440    VBucketPtr vb = getVBucket(vbucket);
1441
1442    if (!vb) {
1443        ++stats.numNotMyVBuckets;
1444        return ENGINE_NOT_MY_VBUCKET;
1445    }
1446
1447    ReaderLockHolder rlh(vb->getStateLock());
1448    if (vb->getState() == vbucket_state_dead ||
1449        vb->getState() == vbucket_state_replica) {
1450        ++stats.numNotMyVBuckets;
1451        return ENGINE_NOT_MY_VBUCKET;
1452    }
1453
1454    { // collections read scope
1455        auto collectionsRHandle = vb->lockCollections(key);
1456        if (!collectionsRHandle.valid()) {
1457            return ENGINE_UNKNOWN_COLLECTION;
1458        }
1459
1460        return vb->getMetaData(cookie,
1461                               engine,
1462                               bgFetchDelay,
1463                               collectionsRHandle,
1464                               metadata,
1465                               deleted,
1466                               datatype);
1467    }
1468}
1469
1470ENGINE_ERROR_CODE KVBucket::setWithMeta(Item& itm,
1471                                        uint64_t cas,
1472                                        uint64_t* seqno,
1473                                        const void* cookie,
1474                                        PermittedVBStates permittedVBStates,
1475                                        CheckConflicts checkConflicts,
1476                                        bool allowExisting,
1477                                        GenerateBySeqno genBySeqno,
1478                                        GenerateCas genCas,
1479                                        ExtendedMetaData* emd,
1480                                        bool isReplication) {
1481    VBucketPtr vb = getVBucket(itm.getVBucketId());
1482    if (!vb) {
1483        ++stats.numNotMyVBuckets;
1484        return ENGINE_NOT_MY_VBUCKET;
1485    }
1486
1487    ReaderLockHolder rlh(vb->getStateLock());
1488    if (!permittedVBStates.test(vb->getState())) {
1489        if (vb->getState() == vbucket_state_pending) {
1490            if (vb->addPendingOp(cookie)) {
1491                return ENGINE_EWOULDBLOCK;
1492            }
1493        } else {
1494            ++stats.numNotMyVBuckets;
1495            return ENGINE_NOT_MY_VBUCKET;
1496        }
1497    } else if (vb->isTakeoverBackedUp()) {
1498        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op"
1499                ", becuase takeover is lagging", vb->getId());
1500        return ENGINE_TMPFAIL;
1501    }
1502
1503    //check for the incoming item's CAS validity
1504    if (!Item::isValidCas(itm.getCas())) {
1505        return ENGINE_KEY_EEXISTS;
1506    }
1507
1508    ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
1509    { // collections read scope
1510        auto collectionsRHandle = vb->lockCollections(itm.getKey());
1511        if (!collectionsRHandle.valid()) {
1512            rv = ENGINE_UNKNOWN_COLLECTION;
1513        } else {
1514            rv = vb->setWithMeta(itm,
1515                                 cas,
1516                                 seqno,
1517                                 cookie,
1518                                 engine,
1519                                 bgFetchDelay,
1520                                 checkConflicts,
1521                                 allowExisting,
1522                                 genBySeqno,
1523                                 genCas,
1524                                 isReplication,
1525                                 collectionsRHandle);
1526        }
1527    }
1528
1529    if (rv == ENGINE_SUCCESS) {
1530        checkAndMaybeFreeMemory();
1531    }
1532    return rv;
1533}
1534
1535GetValue KVBucket::getAndUpdateTtl(const DocKey& key, uint16_t vbucket,
1536                                   const void *cookie, time_t exptime)
1537{
1538    VBucketPtr vb = getVBucket(vbucket);
1539    if (!vb) {
1540        ++stats.numNotMyVBuckets;
1541        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1542    }
1543
1544    ReaderLockHolder rlh(vb->getStateLock());
1545    if (vb->getState() == vbucket_state_dead) {
1546        ++stats.numNotMyVBuckets;
1547        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1548    } else if (vb->getState() == vbucket_state_replica) {
1549        ++stats.numNotMyVBuckets;
1550        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1551    } else if (vb->getState() == vbucket_state_pending) {
1552        if (vb->addPendingOp(cookie)) {
1553            return GetValue(NULL, ENGINE_EWOULDBLOCK);
1554        }
1555    }
1556
1557    { // collections read scope
1558        auto collectionsRHandle = vb->lockCollections(key);
1559        if (!collectionsRHandle.valid()) {
1560            return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
1561        }
1562
1563        return vb->getAndUpdateTtl(
1564                cookie, engine, bgFetchDelay, exptime, collectionsRHandle);
1565    }
1566}
1567
1568GetValue KVBucket::getLocked(const DocKey& key, uint16_t vbucket,
1569                             rel_time_t currentTime, uint32_t lockTimeout,
1570                             const void *cookie) {
1571    VBucketPtr vb = getVBucket(vbucket);
1572    if (!vb || vb->getState() != vbucket_state_active) {
1573        ++stats.numNotMyVBuckets;
1574        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1575    }
1576
1577    { // collections read scope
1578        auto collectionsRHandle = vb->lockCollections(key);
1579        if (!collectionsRHandle.valid()) {
1580            return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
1581        }
1582
1583        return vb->getLocked(currentTime,
1584                             lockTimeout,
1585                             cookie,
1586                             engine,
1587                             bgFetchDelay,
1588                             collectionsRHandle);
1589    }
1590}
1591
ENGINE_ERROR_CODE KVBucket::unlockKey(const DocKey& key,
                                      uint16_t vbucket,
                                      uint64_t cas,
                                      rel_time_t currentTime)
{
    // Release a lock previously taken via getLocked(). The supplied CAS must
    // match the locked item's CAS; only active vbuckets may be targeted.

    VBucketPtr vb = getVBucket(vbucket);
    if (!vb || vb->getState() != vbucket_state_active) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Collections read scope: the key must belong to a known collection.
    auto collectionsRHandle = vb->lockCollections(key);
    if (!collectionsRHandle.valid()) {
        return ENGINE_UNKNOWN_COLLECTION;
    }

    // Look the item up under its hash-bucket lock; deleted items are wanted
    // so that logically-deleted state can be distinguished below.
    auto hbl = vb->ht.getLockedBucket(key);
    StoredValue* v = vb->fetchValidValue(hbl,
                                         key,
                                         WantsDeleted::Yes,
                                         TrackReference::Yes,
                                         QueueExpired::Yes);

    if (v) {
        if (VBucket::isLogicallyNonExistent(*v, collectionsRHandle)) {
            // Deleted (or logically deleted) item: clean up any temporary
            // StoredValue and report the key as absent.
            vb->ht.cleanupIfTemporaryItem(hbl, *v);
            return ENGINE_KEY_ENOENT;
        }
        if (v->isLocked(currentTime)) {
            if (v->getCas() == cas) {
                v->unlock();
                return ENGINE_SUCCESS;
            }
            // Locked, but the caller presented the wrong CAS.
            return ENGINE_LOCKED_TMPFAIL;
        }
        // Item exists but is not locked.
        return ENGINE_TMPFAIL;
    } else {
        if (eviction_policy == VALUE_ONLY) {
            // Under value eviction metadata is always resident, so a miss
            // here means the key genuinely does not exist.
            return ENGINE_KEY_ENOENT;
        } else {
            // With the full eviction, an item's lock is automatically
            // released when the item is evicted from memory. Therefore,
            // we simply return ENGINE_TMPFAIL when we receive unlockKey
            // for an item that is not in the memory cache. Note that we
            // don't spawn any bg fetch job to figure out if an item
            // actually exists in disk or not.
            return ENGINE_TMPFAIL;
        }
    }
}
1643
1644ENGINE_ERROR_CODE KVBucket::getKeyStats(const DocKey& key,
1645                                        uint16_t vbucket,
1646                                        const void* cookie,
1647                                        struct key_stats& kstats,
1648                                        WantsDeleted wantsDeleted) {
1649    VBucketPtr vb = getVBucket(vbucket);
1650    if (!vb) {
1651        return ENGINE_NOT_MY_VBUCKET;
1652    }
1653
1654    { // collections read scope
1655        auto collectionsRHandle = vb->lockCollections(key);
1656        if (!collectionsRHandle.valid()) {
1657            return ENGINE_UNKNOWN_COLLECTION;
1658        }
1659
1660        return vb->getKeyStats(cookie,
1661                               engine,
1662                               bgFetchDelay,
1663                               kstats,
1664                               wantsDeleted,
1665                               collectionsRHandle);
1666}
1667}
1668
std::string KVBucket::validateKey(const DocKey& key, uint16_t vbucket,
                                  Item &diskItem) {
    // Compare an item read from disk against the in-memory copy; returns a
    // short status string ("valid", "item_deleted", "flags_mismatch",
    // "data_mismatch" or "collection_unknown").
    // NOTE(review): unlike the other front-end paths, 'vb' is not checked
    // for null here - callers presumably guarantee the vbucket exists;
    // confirm before relying on this with arbitrary vbucket ids.
    VBucketPtr vb = getVBucket(vbucket);

    auto collectionsRHandle = vb->lockCollections(key);
    if (!collectionsRHandle.valid()) {
        return "collection_unknown";
    }

    auto hbl = vb->ht.getLockedBucket(key);
    StoredValue* v = vb->fetchValidValue(
            hbl, key, WantsDeleted::Yes, TrackReference::No, QueueExpired::Yes);

    if (v) {
        if (VBucket::isLogicallyNonExistent(*v, collectionsRHandle)) {
            // Deleted / logically-deleted item: clean up any temp item.
            vb->ht.cleanupIfTemporaryItem(hbl, *v);
            return "item_deleted";
        }

        if (diskItem.getFlags() != v->getFlags()) {
            return "flags_mismatch";
        } else if (v->isResident() && memcmp(diskItem.getData(),
                                             v->getValue()->getData(),
                                             diskItem.getNBytes())) {
            // Only resident values can have their payloads byte-compared.
            return "data_mismatch";
        } else {
            return "valid";
        }
    } else {
        return "item_deleted";
    }

}
1702
1703ENGINE_ERROR_CODE KVBucket::deleteItem(const DocKey& key,
1704                                       uint64_t& cas,
1705                                       uint16_t vbucket,
1706                                       const void* cookie,
1707                                       ItemMetaData* itemMeta,
1708                                       mutation_descr_t& mutInfo) {
1709    VBucketPtr vb = getVBucket(vbucket);
1710    if (!vb || vb->getState() == vbucket_state_dead) {
1711        ++stats.numNotMyVBuckets;
1712        return ENGINE_NOT_MY_VBUCKET;
1713    } else if (vb->getState() == vbucket_state_replica) {
1714        ++stats.numNotMyVBuckets;
1715        return ENGINE_NOT_MY_VBUCKET;
1716    } else if (vb->getState() == vbucket_state_pending) {
1717        if (vb->addPendingOp(cookie)) {
1718            return ENGINE_EWOULDBLOCK;
1719        }
1720    } else if (vb->isTakeoverBackedUp()) {
1721        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op"
1722                ", becuase takeover is lagging", vb->getId());
1723        return ENGINE_TMPFAIL;
1724    }
1725    { // collections read scope
1726        auto collectionsRHandle = vb->lockCollections(key);
1727        if (!collectionsRHandle.valid()) {
1728            return ENGINE_UNKNOWN_COLLECTION;
1729        }
1730
1731        return vb->deleteItem(cas,
1732                              cookie,
1733                              engine,
1734                              bgFetchDelay,
1735                              itemMeta,
1736                              mutInfo,
1737                              collectionsRHandle);
1738    }
1739}
1740
1741ENGINE_ERROR_CODE KVBucket::deleteWithMeta(const DocKey& key,
1742                                           uint64_t& cas,
1743                                           uint64_t* seqno,
1744                                           uint16_t vbucket,
1745                                           const void* cookie,
1746                                           PermittedVBStates permittedVBStates,
1747                                           CheckConflicts checkConflicts,
1748                                           const ItemMetaData& itemMeta,
1749                                           bool backfill,
1750                                           GenerateBySeqno genBySeqno,
1751                                           GenerateCas generateCas,
1752                                           uint64_t bySeqno,
1753                                           ExtendedMetaData* emd,
1754                                           bool isReplication) {
1755    VBucketPtr vb = getVBucket(vbucket);
1756
1757    if (!vb) {
1758        ++stats.numNotMyVBuckets;
1759        return ENGINE_NOT_MY_VBUCKET;
1760    }
1761
1762    ReaderLockHolder rlh(vb->getStateLock());
1763    if (!permittedVBStates.test(vb->getState())) {
1764        if (vb->getState() == vbucket_state_pending) {
1765            if (vb->addPendingOp(cookie)) {
1766                return ENGINE_EWOULDBLOCK;
1767            }
1768        } else {
1769            ++stats.numNotMyVBuckets;
1770            return ENGINE_NOT_MY_VBUCKET;
1771        }
1772    } else if (vb->isTakeoverBackedUp()) {
1773        LOG(EXTENSION_LOG_DEBUG,
1774            "(vb %u) Returned TMPFAIL to a deleteWithMeta op"
1775            ", becuase takeover is lagging",
1776            vb->getId());
1777        return ENGINE_TMPFAIL;
1778    }
1779
1780    //check for the incoming item's CAS validity
1781    if (!Item::isValidCas(itemMeta.cas)) {
1782        return ENGINE_KEY_EEXISTS;
1783    }
1784
1785    { // collections read scope
1786        auto collectionsRHandle = vb->lockCollections(key);
1787        if (!collectionsRHandle.valid()) {
1788            return ENGINE_UNKNOWN_COLLECTION;
1789        }
1790
1791        return vb->deleteWithMeta(cas,
1792                                  seqno,
1793                                  cookie,
1794                                  engine,
1795                                  bgFetchDelay,
1796                                  checkConflicts,
1797                                  itemMeta,
1798                                  backfill,
1799                                  genBySeqno,
1800                                  generateCas,
1801                                  bySeqno,
1802                                  isReplication,
1803                                  collectionsRHandle);
1804    }
1805}
1806
1807void KVBucket::reset() {
1808    auto buckets = vbMap.getBuckets();
1809    for (auto vbid : buckets) {
1810        auto vb = getLockedVBucket(vbid);
1811        if (vb) {
1812            vb->ht.clear();
1813            vb->checkpointManager->clear(vb->getState());
1814            vb->resetStats();
1815            vb->setPersistedSnapshot(0, 0);
1816            LOG(EXTENSION_LOG_NOTICE,
1817                "KVBucket::reset(): Successfully flushed vb:%" PRIu16,
1818                vbid);
1819        }
1820    }
1821    LOG(EXTENSION_LOG_NOTICE, "KVBucket::reset(): Successfully flushed bucket");
1822}
1823
1824void KVBucket::setDeleteAllComplete() {
1825    // Notify memcached about delete all task completion, and
1826    // set diskFlushall flag to false
1827    if (deleteAllTaskCtx.cookie) {
1828        engine.notifyIOComplete(deleteAllTaskCtx.cookie, ENGINE_SUCCESS);
1829    }
1830    bool inverse = false;
1831    deleteAllTaskCtx.delay.compare_exchange_strong(inverse, true);
1832    inverse = true;
1833    diskDeleteAll.compare_exchange_strong(inverse, false);
1834}
1835
// Read the persisted per-vbucket state records from the primary shard's
// read-only KVStore (used during warmup).
std::vector<vbucket_state *> KVBucket::loadVBucketState()
{
    return getOneROUnderlying()->listPersistedVbuckets();
}
1840
1841void KVBucket::warmupCompleted() {
1842    // Snapshot VBucket state after warmup to ensure Failover table is
1843    // persisted.
1844    scheduleVBStatePersist();
1845
1846    if (engine.getConfiguration().getAlogPath().length() > 0) {
1847
1848        if (engine.getConfiguration().isAccessScannerEnabled()) {
1849            {
1850                LockHolder lh(accessScanner.mutex);
1851                accessScanner.enabled = true;
1852            }
1853            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task enabled");
1854            size_t smin = engine.getConfiguration().getAlogSleepTime();
1855            setAccessScannerSleeptime(smin, true);
1856        } else {
1857            LockHolder lh(accessScanner.mutex);
1858            accessScanner.enabled = false;
1859            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task disabled");
1860        }
1861
1862        Configuration &config = engine.getConfiguration();
1863        config.addValueChangedListener(
1864                "access_scanner_enabled",
1865                std::make_unique<EPStoreValueChangeListener>(*this));
1866        config.addValueChangedListener(
1867                "alog_sleep_time",
1868                std::make_unique<EPStoreValueChangeListener>(*this));
1869        config.addValueChangedListener(
1870                "alog_task_time",
1871                std::make_unique<EPStoreValueChangeListener>(*this));
1872    }
1873
1874    // "0" sleep_time means that the first snapshot task will be executed
1875    // right after warmup. Subsequent snapshot tasks will be scheduled every
1876    // 60 sec by default.
1877    ExecutorPool *iom = ExecutorPool::get();
1878    ExTask task = std::make_shared<StatSnap>(&engine, 0, false);
1879    statsSnapshotTaskId = iom->schedule(task);
1880}
1881
bool KVBucket::maybeEnableTraffic()
{
    // Decide whether enough data has been warmed up to enable front-end
    // traffic. Returns true when any of the thresholds below is reached.
    // @todo rename - should be something like isTrafficDisabled or similar.
    double memoryUsed =
            static_cast<double>(stats.getEstimatedTotalMemoryUsed());
    double maxSize = static_cast<double>(stats.getMaxDataSize());

    if (memoryUsed  >= stats.mem_low_wat) {
        // Stop warmup early to avoid breaching the low water mark.
        LOG(EXTENSION_LOG_NOTICE,
            "Total memory use reached to the low water mark, stop warmup"
            ": memoryUsed (%f) >= low water mark (%" PRIu64 ")",
            memoryUsed, uint64_t(stats.mem_low_wat.load()));
        return true;
    } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
        // Loaded a sufficient fraction of the quota.
        LOG(EXTENSION_LOG_NOTICE,
                "Enough MB of data loaded to enable traffic"
                ": memoryUsed (%f) > (maxSize(%f) * warmupMemUsedCap(%f))",
                 memoryUsed, maxSize, stats.warmupMemUsedCap.load());
        return true;
    } else if (eviction_policy == VALUE_ONLY &&
               stats.warmedUpValues >=
                               (stats.warmedUpKeys * stats.warmupNumReadCap)) {
        // Let ep-engine think we're done with the warmup phase
        // (we should refactor this into "enableTraffic")
        LOG(EXTENSION_LOG_NOTICE,
            "Enough number of items loaded to enable traffic (value eviction)"
            ": warmedUpValues(%" PRIu64 ") >= (warmedUpKeys(%" PRIu64 ") * "
            "warmupNumReadCap(%f))",  uint64_t(stats.warmedUpValues.load()),
            uint64_t(stats.warmedUpKeys.load()), stats.warmupNumReadCap.load());
        return true;
    } else if (eviction_policy == FULL_EVICTION &&
               stats.warmedUpValues >=
                            (warmupTask->getEstimatedItemCount() *
                             stats.warmupNumReadCap)) {
        // In case of FULL EVICTION, warmed up keys always matches the number
        // of warmed up values, therefore for honoring the min_item threshold
        // in this scenario, we can consider warmup's estimated item count.
        LOG(EXTENSION_LOG_NOTICE,
            "Enough number of items loaded to enable traffic (full eviction)"
            ": warmedUpValues(%" PRIu64 ") >= (warmup est items(%" PRIu64 ") * "
            "warmupNumReadCap(%f))",  uint64_t(stats.warmedUpValues.load()),
            uint64_t(warmupTask->getEstimatedItemCount()),
            stats.warmupNumReadCap.load());
        return true;
    }
    return false;
}
1929
1930bool KVBucket::isWarmingUp() {
1931    return warmupTask && !warmupTask->isComplete();
1932}
1933
1934bool KVBucket::shouldSetVBStateBlock(const void* cookie) {
1935    if (warmupTask) {
1936        return warmupTask->shouldSetVBStateBlock(cookie);
1937    }
1938    return false;
1939}
1940
1941bool KVBucket::isWarmupOOMFailure() {
1942    return warmupTask && warmupTask->hasOOMFailure();
1943}
1944
1945void KVBucket::stopWarmup(void)
1946{
1947    // forcefully stop current warmup task
1948    if (isWarmingUp()) {
1949        LOG(EXTENSION_LOG_NOTICE,
1950            "Stopping warmup while engine is loading "
1951            "data from underlying storage, shutdown = %s",
1952            stats.isShutdown ? "yes" : "no");
1953        warmupTask->stop();
1954    }
1955}
1956
1957bool KVBucket::isMemoryUsageTooHigh() {
1958    double memoryUsed =
1959            static_cast<double>(stats.getEstimatedTotalMemoryUsed());
1960    double maxSize = static_cast<double>(stats.getMaxDataSize());
1961    return memoryUsed > (maxSize * backfillMemoryThreshold);
1962}
1963
1964// Trigger memory reduction (ItemPager) if we've exceeded high water
1965void KVBucket::checkAndMaybeFreeMemory() {
1966    if (stats.getEstimatedTotalMemoryUsed() > stats.mem_high_wat) {
1967        attemptToFreeMemory();
1968    }
1969}
1970
// Set the backfill memory threshold as a fraction (0.0 - 1.0) of the bucket
// quota; used by isMemoryUsageTooHigh() to throttle backfills.
void KVBucket::setBackfillMemoryThreshold(double threshold) {
    backfillMemoryThreshold = threshold;
}
1974
1975void KVBucket::setExpiryPagerSleeptime(size_t val) {
1976    LockHolder lh(expiryPager.mutex);
1977
1978    ExecutorPool::get()->cancel(expiryPager.task);
1979
1980    expiryPager.sleeptime = val;
1981    if (expiryPager.enabled) {
1982        ExTask expTask = std::make_shared<ExpiredItemPager>(
1983                &engine, stats, expiryPager.sleeptime);
1984        expiryPager.task = ExecutorPool::get()->schedule(expTask);
1985    } else {
1986        LOG(EXTENSION_LOG_DEBUG, "Expiry pager disabled, "
1987                                 "enabling it will make exp_pager_stime (%lu)"
1988                                 "to go into effect!", val);
1989    }
1990}
1991
1992void KVBucket::setExpiryPagerTasktime(ssize_t val) {
1993    LockHolder lh(expiryPager.mutex);
1994    if (expiryPager.enabled) {
1995        ExecutorPool::get()->cancel(expiryPager.task);
1996        ExTask expTask = std::make_shared<ExpiredItemPager>(
1997                &engine, stats, expiryPager.sleeptime, val);
1998        expiryPager.task = ExecutorPool::get()->schedule(expTask);
1999    } else {
2000        LOG(EXTENSION_LOG_DEBUG, "Expiry pager disabled, "
2001                                 "enabling it will make exp_pager_stime (%lu)"
2002                                 "to go into effect!", val);
2003    }
2004}
2005
2006void KVBucket::enableExpiryPager() {
2007    LockHolder lh(expiryPager.mutex);
2008    if (!expiryPager.enabled) {
2009        expiryPager.enabled = true;
2010
2011        ExecutorPool::get()->cancel(expiryPager.task);
2012        ExTask expTask = std::make_shared<ExpiredItemPager>(
2013                &engine, stats, expiryPager.sleeptime);
2014        expiryPager.task = ExecutorPool::get()->schedule(expTask);
2015    } else {
2016        LOG(EXTENSION_LOG_DEBUG, "Expiry Pager already enabled!");
2017    }
2018}
2019
2020void KVBucket::disableExpiryPager() {
2021    LockHolder lh(expiryPager.mutex);
2022    if (expiryPager.enabled) {
2023        ExecutorPool::get()->cancel(expiryPager.task);
2024        expiryPager.enabled = false;
2025    } else {
2026        LOG(EXTENSION_LOG_DEBUG, "Expiry Pager already disabled!");
2027    }
2028}
2029
2030void KVBucket::wakeUpExpiryPager() {
2031    LockHolder lh(expiryPager.mutex);
2032    if (expiryPager.enabled) {
2033        ExecutorPool::get()->wake(expiryPager.task);
2034    }
2035}
2036
2037void KVBucket::wakeItemPager() {
2038    if (itemPagerTask->getState() == TASK_SNOOZED) {
2039        ExecutorPool::get()->wake(itemPagerTask->getId());
2040    }
2041}
2042
2043void KVBucket::enableItemPager() {
2044    ExecutorPool::get()->cancel(itemPagerTask->getId());
2045    ExecutorPool::get()->schedule(itemPagerTask);
2046}
2047
2048void KVBucket::disableItemPager() {
2049    ExecutorPool::get()->cancel(itemPagerTask->getId());
2050}
2051
2052void KVBucket::wakeItemFreqDecayerTask() {
2053    auto& t = dynamic_cast<ItemFreqDecayerTask&>(*itemFreqDecayerTask);
2054    t.wakeup();
2055}
2056
2057void KVBucket::enableAccessScannerTask() {
2058    LockHolder lh(accessScanner.mutex);
2059    if (!accessScanner.enabled) {
2060        accessScanner.enabled = true;
2061
2062        if (accessScanner.sleeptime != 0) {
2063            ExecutorPool::get()->cancel(accessScanner.task);
2064        }
2065
2066        size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime();
2067        accessScanner.sleeptime = alogSleepTime * 60;
2068        if (accessScanner.sleeptime != 0) {
2069            ExTask task =
2070                    std::make_shared<AccessScanner>(*this,
2071                                                    engine.getConfiguration(),
2072                                                    stats,
2073                                                    accessScanner.sleeptime,
2074                                                    true);
2075            accessScanner.task = ExecutorPool::get()->schedule(task);
2076        } else {
2077            LOG(EXTENSION_LOG_NOTICE, "Did not enable access scanner task, "
2078                                      "as alog_sleep_time is set to zero!");
2079        }
2080    } else {
2081        LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!");
2082    }
2083}
2084
2085void KVBucket::disableAccessScannerTask() {
2086    LockHolder lh(accessScanner.mutex);
2087    if (accessScanner.enabled) {
2088        ExecutorPool::get()->cancel(accessScanner.task);
2089        accessScanner.sleeptime = 0;
2090        accessScanner.enabled = false;
2091    } else {
2092        LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!");
2093    }
2094}
2095
2096void KVBucket::setAccessScannerSleeptime(size_t val, bool useStartTime) {
2097    LockHolder lh(accessScanner.mutex);
2098
2099    if (accessScanner.enabled) {
2100        if (accessScanner.sleeptime != 0) {
2101            ExecutorPool::get()->cancel(accessScanner.task);
2102        }
2103
2104        // store sleeptime in seconds
2105        accessScanner.sleeptime = val * 60;
2106        if (accessScanner.sleeptime != 0) {
2107            ExTask task =
2108                    std::make_shared<AccessScanner>(*this,
2109                                                    engine.getConfiguration(),
2110                                                    stats,
2111                                                    accessScanner.sleeptime,
2112                                                    useStartTime);
2113            accessScanner.task = ExecutorPool::get()->schedule(task);
2114        }
2115    }
2116}
2117
2118void KVBucket::resetAccessScannerStartTime() {
2119    LockHolder lh(accessScanner.mutex);
2120
2121    if (accessScanner.enabled) {
2122        if (accessScanner.sleeptime != 0) {
2123            ExecutorPool::get()->cancel(accessScanner.task);
2124            // re-schedule task according to the new task start hour
2125            ExTask task =
2126                    std::make_shared<AccessScanner>(*this,
2127                                                    engine.getConfiguration(),
2128                                                    stats,
2129                                                    accessScanner.sleeptime,
2130                                                    true);
2131            accessScanner.task = ExecutorPool::get()->schedule(task);
2132        }
2133    }
2134}
2135
2136void KVBucket::setAllBloomFilters(bool to) {
2137    for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); vbid++) {
2138        VBucketPtr vb = vbMap.getBucket(vbid);
2139        if (vb) {
2140            if (to) {
2141                vb->setFilterStatus(BFILTER_ENABLED);
2142            } else {
2143                vb->setFilterStatus(BFILTER_DISABLED);
2144            }
2145        }
2146    }
2147}
2148
2149void KVBucket::visit(VBucketVisitor &visitor)
2150{
2151    for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); ++vbid) {
2152        VBucketPtr vb = vbMap.getBucket(vbid);
2153        if (vb) {
2154            visitor.visitBucket(vb);
2155        }
2156    }
2157    visitor.complete();
2158}
2159
2160size_t KVBucket::visit(std::unique_ptr<VBucketVisitor> visitor,
2161                       const char* lbl,
2162                       TaskId id,
2163                       double sleepTime,
2164                       std::chrono::microseconds maxExpectedDuration) {
2165    auto task = std::make_shared<VBCBAdaptor>(
2166            this, id, std::move(visitor), lbl, sleepTime, /*shutdown*/ false);
2167    task->setMaxExpectedDuration(maxExpectedDuration);
2168    return ExecutorPool::get()->schedule(task);
2169}
2170
2171KVBucket::Position KVBucket::pauseResumeVisit(PauseResumeVBVisitor& visitor,
2172                                              Position& start_pos) {
2173    uint16_t vbid = start_pos.vbucket_id;
2174    for (; vbid < vbMap.getSize(); ++vbid) {
2175        VBucketPtr vb = vbMap.getBucket(vbid);
2176        if (vb) {
2177            bool paused = !visitor.visit(*vb);
2178            if (paused) {
2179                break;
2180            }
2181        }
2182    }
2183
2184    return KVBucket::Position(vbid);
2185}
2186
// Position before the first vbucket - start point for pauseResumeVisit().
KVBucket::Position KVBucket::startPosition() const
{
    return KVBucket::Position(0);
}
2191
// Position one past the last vbucket - pauseResumeVisit() returns this when
// every vbucket has been visited.
KVBucket::Position KVBucket::endPosition() const
{
    return KVBucket::Position(vbMap.getSize());
}
2196
// Adapts a VBucketVisitor into a GlobalTask: snapshots the set of vbuckets
// matching the visitor's filter at construction time and visits one vbucket
// per run() invocation.
VBCBAdaptor::VBCBAdaptor(KVBucket* s,
                         TaskId id,
                         std::unique_ptr<VBucketVisitor> v,
                         const char* l,
                         double sleep,
                         bool shutdown)
    : GlobalTask(&s->getEPEngine(), id, 0, shutdown),
      store(s),
      visitor(std::move(v)),
      label(l),
      sleepTime(sleep),
      maxDuration(std::chrono::microseconds::max()),
      currentvb(0) {
    // Pre-compute the list of vbuckets to visit, honouring the visitor's
    // vbucket filter.
    const VBucketFilter& vbFilter = visitor->getVBucketFilter();
    for (auto vbid : store->getVBuckets().getBuckets()) {
        if (vbFilter(vbid)) {
            vbList.push(vbid);
        }
    }
}
2217
2218std::string VBCBAdaptor::getDescription() {
2219    return std::string(label) + " on vb " + std::to_string(currentvb.load());
2220}
2221
2222bool VBCBAdaptor::run(void) {
2223    if (!vbList.empty()) {
2224        currentvb.store(vbList.front());
2225        VBucketPtr vb = store->getVBucket(currentvb);
2226        if (vb) {
2227            if (visitor->pauseVisitor()) {
2228                snooze(sleepTime);
2229                return true;
2230            }
2231            visitor->visitBucket(vb);
2232        }
2233        vbList.pop();
2234    }
2235
2236    bool isdone = vbList.empty();
2237    if (isdone) {
2238        visitor->complete();
2239    }
2240    return !isdone;
2241}
2242
2243void KVBucket::resetUnderlyingStats(void)
2244{
2245    for (size_t i = 0; i < vbMap.shards.size(); i++) {
2246        KVShard *shard = vbMap.shards[i].get();
2247        shard->getRWUnderlying()->resetStats();
2248        shard->getROUnderlying()->resetStats();
2249    }
2250
2251    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
2252        stats.schedulingHisto[i].reset();
2253        stats.taskRuntimeHisto[i].reset();
2254    }
2255}
2256
2257void KVBucket::addKVStoreStats(ADD_STAT add_stat, const void* cookie) {
2258    for (size_t i = 0; i < vbMap.shards.size(); i++) {
2259        /* Add the different KVStore instances into a set and then
2260         * retrieve the stats from each instance separately. This
2261         * is because CouchKVStore has separate read only and read
2262         * write instance whereas RocksDBKVStore has only instance
2263         * for both read write and read-only.
2264         */
2265        std::set<KVStore *> underlyingSet;
2266        underlyingSet.insert(vbMap.shards[i]->getRWUnderlying());
2267        underlyingSet.insert(vbMap.shards[i]->getROUnderlying());
2268
2269        for (auto* store : underlyingSet) {
2270            store->addStats(add_stat, cookie);
2271        }
2272    }
2273}
2274
2275void KVBucket::addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie) {
2276    for (size_t i = 0; i < vbMap.shards.size(); i++) {
2277        std::set<KVStore*> underlyingSet;
2278        underlyingSet.insert(vbMap.shards[i]->getRWUnderlying());
2279        underlyingSet.insert(vbMap.shards[i]->getROUnderlying());
2280
2281        for (auto* store : underlyingSet) {
2282            store->addTimingStats(add_stat, cookie);
2283        }
2284    }
2285}
2286
2287bool KVBucket::getKVStoreStat(const char* name, size_t& value, KVSOption option)
2288{
2289    value = 0;
2290    bool success = true;
2291    for (const auto& shard : vbMap.shards) {
2292        size_t per_shard_value;
2293
2294        if (option == KVSOption::RO || option == KVSOption::BOTH) {
2295            success &= shard->getROUnderlying()->getStat(name, per_shard_value);
2296            value += per_shard_value;
2297        }
2298
2299        if (option == KVSOption::RW || option == KVSOption::BOTH) {
2300            success &= shard->getRWUnderlying()->getStat(name, per_shard_value);
2301            value += per_shard_value;
2302        }
2303    }
2304    return success;
2305}
2306
// Convenience accessor: the read-only KVStore of the primary shard.
KVStore *KVBucket::getOneROUnderlying(void) {
    return vbMap.shards[EP_PRIMARY_SHARD]->getROUnderlying();
}
2310
// Convenience accessor: the read-write KVStore of the primary shard.
KVStore *KVBucket::getOneRWUnderlying(void) {
    return vbMap.shards[EP_PRIMARY_SHARD]->getRWUnderlying();
}
2314
// Roll a replica/pending vbucket back to (at most) rollbackSeqno. If a
// partial rollback is not possible (or rollbackSeqno is 0) the vbucket is
// reset to empty. Returns Reschedule when the vbucket lock is contended.
TaskStatus KVBucket::rollback(uint16_t vbid, uint64_t rollbackSeqno) {
    std::unique_lock<std::mutex> vbset(vbsetMutex);

    auto vb = getLockedVBucket(vbid, std::try_to_lock);

    if (!vb.owns_lock()) {
        // Another operation holds the vbucket lock; try again later.
        return TaskStatus::Reschedule; // Reschedule a vbucket rollback task.
    }

    if (!vb.getVB()) {
        LOG(EXTENSION_LOG_WARNING,
            "vb:%" PRIu16 " Aborting rollback as the vbucket was not found",
            vbid);
        return TaskStatus::Abort;
    }

    ReaderLockHolder rlh(vb->getStateLock());
    if ((vb->getState() == vbucket_state_replica) ||
        (vb->getState() == vbucket_state_pending)) {
        uint64_t prevHighSeqno =
                static_cast<uint64_t>(vb->checkpointManager->getHighSeqno());
        if (rollbackSeqno != 0) {
            RollbackResult result = doRollback(vbid, rollbackSeqno);

            if (result.success /* if false, fall through and reset the
                                  vbucket to avoid data loss */
                &&
                (result.highSeqno > 0) /* if 0, reset the vbucket for a clean
                                          start instead of deleting everything
                                          in it
                                        */) {
                // Partial rollback succeeded: undo any unpersisted items
                // above the rollback point and notify DCP streams.
                rollbackUnpersistedItems(*vb, result.highSeqno);
                vb->postProcessRollback(result, prevHighSeqno);
                engine.getDcpConnMap().closeStreamsDueToRollback(vbid);
                return TaskStatus::Complete;
            }
        }

        // Full rollback: reset the vbucket to an empty state.
        if (resetVBucket_UNLOCKED(vb, vbset)) {
            VBucketPtr newVb = vbMap.getBucket(vbid);
            newVb->incrRollbackItemCount(prevHighSeqno);
            engine.getDcpConnMap().closeStreamsDueToRollback(vbid);
            return TaskStatus::Complete;
        }
        LOG(EXTENSION_LOG_WARNING,
            "vb:%" PRIu16 " Aborting rollback as reset of the vbucket failed",
            vbid);
        return TaskStatus::Abort;
    } else {
        LOG(EXTENSION_LOG_WARNING,
            "vb:%" PRIu16 " Rollback not supported on the vbucket state %s",
            vbid,
            VBucket::toString(vb->getState()));
        return TaskStatus::Abort;
    }
}
2370
2371void KVBucket::attemptToFreeMemory() {
2372    static_cast<ItemPager*>(itemPagerTask.get())->scheduleNow();
2373}
2374
// Run the defragmenter task synchronously (test/diagnostic hook).
void KVBucket::runDefragmenterTask() {
    defragmenterTask->run();
}
2378
// Run the item-frequency decayer task synchronously (test/diagnostic hook).
void KVBucket::runItemFreqDecayerTask() {
    itemFreqDecayerTask->run();
}
2382
2383bool KVBucket::runAccessScannerTask() {
2384    return ExecutorPool::get()->wake(accessScanner.task);
2385}
2386
// Schedule persistence of the given vbucket's state (test/diagnostic hook).
void KVBucket::runVbStatePersistTask(int vbid) {
    scheduleVBStatePersist(vbid);
}
2390
2391bool KVBucket::compactionCanExpireItems() {
2392    // Process expired items only if memory usage is lesser than
2393    // compaction_exp_mem_threshold and disk queue is small
2394    // enough (marked by replication_throttle_queue_cap)
2395
2396    bool isMemoryUsageOk =
2397            (stats.getEstimatedTotalMemoryUsed() <
2398             (stats.getMaxDataSize() * compactionExpMemThreshold));
2399
2400    size_t queueSize = stats.diskQueueSize.load();
2401    bool isQueueSizeOk =
2402            ((stats.replicationThrottleWriteQueueCap == -1) ||
2403             (queueSize <
2404              static_cast<size_t>(stats.replicationThrottleWriteQueueCap)));
2405
2406    return (isMemoryUsageOk && isQueueSizeOk);
2407}
2408
2409void KVBucket::setCursorDroppingLowerUpperThresholds(size_t maxSize) {
2410    Configuration &config = engine.getConfiguration();
2411    stats.cursorDroppingLThreshold.store(static_cast<size_t>(maxSize *
2412                    ((double)(config.getCursorDroppingLowerMark()) / 100)));
2413    stats.cursorDroppingUThreshold.store(static_cast<size_t>(maxSize *
2414                    ((double)(config.getCursorDroppingUpperMark()) / 100)));
2415}
2416
// Cached resident ratio (percentage of items in memory) for active vbuckets.
size_t KVBucket::getActiveResidentRatio() const {
    return cachedResidentRatio.activeRatio.load();
}
2420
// Cached resident ratio (percentage of items in memory) for replica vbuckets.
size_t KVBucket::getReplicaResidentRatio() const {
    return cachedResidentRatio.replicaRatio.load();
}
2424
2425ENGINE_ERROR_CODE KVBucket::forceMaxCas(uint16_t vbucket, uint64_t cas) {
2426    VBucketPtr vb = vbMap.getBucket(vbucket);
2427    if (vb) {
2428        vb->forceMaxCas(cas);
2429        return ENGINE_SUCCESS;
2430    }
2431    return ENGINE_NOT_MY_VBUCKET;
2432}
2433
2434std::ostream& operator<<(std::ostream& os, const KVBucket::Position& pos) {
2435    os << "vbucket:" << pos.vbucket_id;
2436    return os;
2437}
2438
2439void KVBucket::notifyFlusher(const uint16_t vbid) {
2440    KVShard* shard = vbMap.getShardByVbId(vbid);
2441    if (shard) {
2442        shard->getFlusher()->notifyFlushEvent();
2443    } else {
2444        throw std::logic_error(
2445                "KVBucket::notifyFlusher() : shard null for "
2446                "vbucket " +
2447                std::to_string(vbid));
2448    }
2449}
2450
// Notify DCP connections attached to this vbucket that bySeqno is available.
void KVBucket::notifyReplication(const uint16_t vbid, const int64_t bySeqno) {
    engine.getDcpConnMap().notifyVBConnections(vbid, bySeqno);
}
2454
2455void KVBucket::initializeExpiryPager(Configuration& config) {
2456    {
2457        LockHolder elh(expiryPager.mutex);
2458        expiryPager.enabled = config.isExpPagerEnabled();
2459    }
2460
2461    setExpiryPagerSleeptime(config.getExpPagerStime());
2462
2463    config.addValueChangedListener(
2464            "exp_pager_stime",
2465            std::make_unique<EPStoreValueChangeListener>(*this));
2466    config.addValueChangedListener(
2467            "exp_pager_enabled",
2468            std::make_unique<EPStoreValueChangeListener>(*this));
2469    config.addValueChangedListener(
2470            "exp_pager_initial_run_time",
2471            std::make_unique<EPStoreValueChangeListener>(*this));
2472}
2473
2474cb::engine_error KVBucket::setCollections(cb::const_char_buffer json) {
2475    // cJSON can't accept a size so we must create a string
2476    std::string manifest(json.data(), json.size());
2477
2478    // Inhibit VB state changes whilst updating the vbuckets
2479    LockHolder lh(vbsetMutex);
2480
2481    return collectionsManager->update(*this, manifest);
2482}
2483
// Return the current collections manifest (as an error-code/string pair).
cb::EngineErrorStringPair KVBucket::getCollections() const {
    return collectionsManager->getManifest();
}
2487
2488const Collections::Manager& KVBucket::getCollectionsManager() const {
2489    return *collectionsManager.get();
2490}
2491
// Whether extended attributes (xattrs) are enabled for this bucket.
bool KVBucket::isXattrEnabled() const {
    return xattrEnabled;
}
2495
// Enable/disable extended attributes (xattrs) for this bucket.
void KVBucket::setXattrEnabled(bool value) {
    xattrEnabled = value;
}
2499
// Invoked by the collections eraser for each key scanned in a vbucket:
// removes the key if it belongs to a logically-deleted collection and,
// when the last key of a collection is gone, completes that collection's
// deletion. Returns true if the key was erased.
// Note: the collections read handle must be released before
// completeDeletion, which takes write access to the manifest.
bool KVBucket::collectionsEraseKey(
        uint16_t vbid,
        const DocKey key,
        int64_t bySeqno,
        bool deleted,
        Collections::VB::EraserContext& eraserContext) {
    auto vb = getVBucket(vbid);
    boost::optional<cb::const_char_buffer> completedCollection;
    if (!vb) {
        return false;
    }

    // If the eraserContext consumes the key - just return
    // deleted keys can be ignored at this point
    if (!deleted && eraserContext.manageSeparator(key)) {
        return false;
    }

    { // collections read lock scope
        auto collectionsRHandle = vb->lockCollections();
        if (collectionsRHandle.isLogicallyDeleted(
                    key, bySeqno, eraserContext.getSeparator())) {
            vb->removeKey(key, bySeqno);

            // Update item count for non-system collections.
            switch (key.getDocNamespace()) {
            case DocNamespace::DefaultCollection:
            case DocNamespace::Collections:
                vb->decrNumTotalItems();
                break;
            case DocNamespace::System:
                break;
            }
        } else {
            // Key is still live - nothing to erase.
            return false;
        }

        // Now determine if the key represents the end of a collection
        completedCollection = collectionsRHandle.shouldCompleteDeletion(key);
    } // read lock dropped as we may need the write lock in next block

    // If a collection was returned, notify that all keys have been removed
    if (completedCollection.is_initialized()) {
        // completeDeletion obtains write access to the manifest
        vb->completeDeletion(completedCollection.get());
    }
    return true;
}
2548
// Bucket-wide maximum TTL; maxTtl is stored as a raw seconds count.
std::chrono::seconds KVBucket::getMaxTtl() const {
    return std::chrono::seconds{maxTtl.load()};
}
2552
// Set the bucket-wide maximum TTL, in seconds (see getMaxTtl).
void KVBucket::setMaxTtl(size_t max) {
    maxTtl = max;
}
2556
// Number of vbuckets currently in the given state, per the vbucket map.
uint16_t KVBucket::getNumOfVBucketsInState(vbucket_state_t state) const {
    return vbMap.getVBStateCount(state);
}
2560