xref: /4.6.4/ep-engine/src/ep.cc (revision 17d8153f)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string.h>
21#include <time.h>
22
23#include <fstream>
24#include <functional>
25#include <iostream>
26#include <map>
27#include <sstream>
28#include <string>
29#include <utility>
30#include <vector>
31
32#include "access_scanner.h"
33#include "bgfetcher.h"
34#include "checkpoint_remover.h"
35#include "conflict_resolution.h"
36#include "defragmenter.h"
37#include "ep.h"
38#include "ep_engine.h"
39#include "ext_meta_parser.h"
40#include "failover-table.h"
41#include "flusher.h"
42#include "htresizer.h"
43#include "kvshard.h"
44#include "kvstore.h"
45#include "locks.h"
46#include "mutation_log.h"
47#include "warmup.h"
48#include "connmap.h"
49#include "replicationthrottle.h"
50
51class StatsValueChangeListener : public ValueChangedListener {
52public:
53    StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
54        : stats(st), store(str) {
55        // EMPTY
56    }
57
58    virtual void sizeValueChanged(const std::string &key, size_t value) {
59        if (key.compare("max_size") == 0) {
60            stats.setMaxDataSize(value);
61            store.getEPEngine().getDcpConnMap(). \
62                                     updateMaxActiveSnoozingBackfills(value);
63            size_t low_wat = static_cast<size_t>
64                    (static_cast<double>(value) * stats.mem_low_wat_percent);
65            size_t high_wat = static_cast<size_t>
66                    (static_cast<double>(value) * stats.mem_high_wat_percent);
67            stats.mem_low_wat.store(low_wat);
68            stats.mem_high_wat.store(high_wat);
69            store.setCursorDroppingLowerUpperThresholds(value);
70        } else if (key.compare("mem_low_wat") == 0) {
71            stats.mem_low_wat.store(value);
72            stats.mem_low_wat_percent.store(
73                                    (double)(value) / stats.getMaxDataSize());
74        } else if (key.compare("mem_high_wat") == 0) {
75            stats.mem_high_wat.store(value);
76            stats.mem_high_wat_percent.store(
77                                    (double)(value) / stats.getMaxDataSize());
78        } else if (key.compare("replication_throttle_threshold") == 0) {
79            stats.replicationThrottleThreshold.store(
80                                          static_cast<double>(value) / 100.0);
81        } else if (key.compare("warmup_min_memory_threshold") == 0) {
82            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
83        } else if (key.compare("warmup_min_items_threshold") == 0) {
84            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
85        } else {
86            LOG(EXTENSION_LOG_WARNING,
87                "Failed to change value for unknown variable, %s\n",
88                key.c_str());
89        }
90    }
91
92private:
93    EPStats &stats;
94    EventuallyPersistentStore &store;
95};
96
97/**
98 * A configuration value changed listener that responds to ep-engine
99 * parameter changes by invoking engine-specific methods on
100 * configuration change events.
101 */
102class EPStoreValueChangeListener : public ValueChangedListener {
103public:
104    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
105    }
106
107    virtual void sizeValueChanged(const std::string &key, size_t value) {
108        if (key.compare("bg_fetch_delay") == 0) {
109            store.setBGFetchDelay(static_cast<uint32_t>(value));
110        } else if (key.compare("compaction_write_queue_cap") == 0) {
111            store.setCompactionWriteQueueCap(value);
112        } else if (key.compare("exp_pager_stime") == 0) {
113            store.setExpiryPagerSleeptime(value);
114        } else if (key.compare("exp_pager_initial_run_time") == 0) {
115            store.setExpiryPagerTasktime(value);
116        } else if (key.compare("alog_sleep_time") == 0) {
117            store.setAccessScannerSleeptime(value, false);
118        } else if (key.compare("alog_task_time") == 0) {
119            store.resetAccessScannerStartTime();
120        } else if (key.compare("mutation_mem_threshold") == 0) {
121            double mem_threshold = static_cast<double>(value) / 100;
122            StoredValue::setMutationMemoryThreshold(mem_threshold);
123        } else if (key.compare("backfill_mem_threshold") == 0) {
124            double backfill_threshold = static_cast<double>(value) / 100;
125            store.setBackfillMemoryThreshold(backfill_threshold);
126        } else if (key.compare("compaction_exp_mem_threshold") == 0) {
127            store.setCompactionExpMemThreshold(value);
128        } else if (key.compare("replication_throttle_queue_cap") == 0) {
129            store.getEPEngine().getReplicationThrottle().setQueueCap(value);
130        } else if (key.compare("replication_throttle_cap_pcnt") == 0) {
131            store.getEPEngine().getReplicationThrottle().setCapPercent(value);
132        } else {
133            LOG(EXTENSION_LOG_WARNING,
134                "Failed to change value for unknown variable, %s\n",
135                key.c_str());
136        }
137    }
138
139    virtual void booleanValueChanged(const std::string &key, bool value) {
140        if (key.compare("access_scanner_enabled") == 0) {
141            if (value) {
142                store.enableAccessScannerTask();
143            } else {
144                store.disableAccessScannerTask();
145            }
146        } else if (key.compare("bfilter_enabled") == 0) {
147            store.setAllBloomFilters(value);
148        } else if (key.compare("exp_pager_enabled") == 0) {
149            if (value) {
150                store.enableExpiryPager();
151            } else {
152                store.disableExpiryPager();
153            }
154        }
155    }
156
157    virtual void floatValueChanged(const std::string &key, float value) {
158        if (key.compare("bfilter_residency_threshold") == 0) {
159            store.setBfiltersResidencyThreshold(value);
160        } else if (key.compare("dcp_min_compression_ratio") == 0) {
161            store.getEPEngine().updateDcpMinCompressionRatio(value);
162        }
163    }
164
165private:
166    EventuallyPersistentStore &store;
167};
168
169/**
170 * Callback class used by EpStore, for adding relevent keys
171 * to bloomfilter during compaction.
172 */
173class BloomFilterCallback : public Callback<std::string&, bool&> {
174public:
175    BloomFilterCallback(EventuallyPersistentStore& eps, uint16_t vbid,
176                        bool residentRatioAlert)
177        : store(eps), vbucketId(vbid),
178          residentRatioLessThanThreshold(residentRatioAlert) {
179    }
180
181    void callback(std::string& key, bool& isDeleted) {
182        RCPtr<VBucket> vb = store.getVBucket(vbucketId);
183        if (vb) {
184            if (vb->isTempFilterAvailable()) {
185                if (store.getItemEvictionPolicy() == VALUE_ONLY) {
186                    /**
187                     * VALUE-ONLY EVICTION POLICY
188                     * Consider deleted items only.
189                     */
190                    if (isDeleted) {
191                        vb->addToTempFilter(key);
192                    }
193                } else {
194                    /**
195                     * FULL EVICTION POLICY
196                     * If vbucket's resident ratio is found to be less than
197                     * the residency threshold, consider all items, otherwise
198                     * consider deleted and non-resident items only.
199                     */
200                    if (residentRatioLessThanThreshold) {
201                        vb->addToTempFilter(key);
202                    } else {
203                        if (isDeleted || !store.isMetaDataResident(vb, key)) {
204                            vb->addToTempFilter(key);
205                        }
206                    }
207                }
208            }
209        }
210    }
211
212private:
213    EventuallyPersistentStore& store;
214    uint16_t vbucketId;
215    bool residentRatioLessThanThreshold;
216};
217
218class VBucketMemoryDeletionTask : public GlobalTask {
219public:
220    VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
221                              RCPtr<VBucket> &vb, double delay) :
222                              GlobalTask(&eng,
223                              TaskId::VBucketMemoryDeletionTask, delay, true),
224                              e(eng), vbucket(vb), vbid(vb->getId()) { }
225
226    std::string getDescription() {
227        std::stringstream ss;
228        ss << "Removing (dead) vbucket " << vbid << " from memory";
229        return ss.str();
230    }
231
232    bool run(void) {
233        vbucket->notifyAllPendingConnsFailed(e);
234        vbucket->ht.clear();
235        vbucket.reset();
236        return false;
237    }
238
239private:
240    EventuallyPersistentEngine &e;
241    RCPtr<VBucket> vbucket;
242    uint16_t vbid;
243};
244
245class PendingOpsNotification : public GlobalTask {
246public:
247    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
248        GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
249        engine(e), vbucket(vb) { }
250
251    std::string getDescription() {
252        std::stringstream ss;
253        ss << "Notify pending operations for vbucket " << vbucket->getId();
254        return ss.str();
255    }
256
257    bool run(void) {
258        vbucket->fireAllOps(engine);
259        return false;
260    }
261
262private:
263    EventuallyPersistentEngine &engine;
264    RCPtr<VBucket> vbucket;
265};
266
267EventuallyPersistentStore::EventuallyPersistentStore(
268    EventuallyPersistentEngine &theEngine) :
269    engine(theEngine), stats(engine.getEpStats()),
270    vbMap(theEngine.getConfiguration(), *this),
271    defragmenterTask(NULL),
272    bgFetchQueue(0),
273    diskFlushAll(false), bgFetchDelay(0),
274    backfillMemoryThreshold(0.95),
275    statsSnapshotTaskId(0), lastTransTimePerItem(0)
276{
277    cachedResidentRatio.activeRatio.store(0);
278    cachedResidentRatio.replicaRatio.store(0);
279
280    Configuration &config = engine.getConfiguration();
281    MutationLog *shardlog;
282    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
283        std::stringstream s;
284        s << i;
285        shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
286                                 "." + s.str(),
287                                 engine.getConfiguration().getAlogBlockSize());
288        accessLog.push_back(shardlog);
289    }
290
291    storageProperties = new StorageProperties(true, true, true, true);
292
293    const auto size = GlobalTask::allTaskIds.size();
294    stats.schedulingHisto = new Histogram<ProcessClock::duration::rep>[size];
295    stats.taskRuntimeHisto = new Histogram<ProcessClock::duration::rep>[size];
296
297    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
298        stats.schedulingHisto[i].reset();
299        stats.taskRuntimeHisto[i].reset();
300    }
301
302    ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable());
303
304    size_t num_vbs = config.getMaxVbuckets();
305    vb_mutexes = new Mutex[num_vbs];
306
307    *stats.memOverhead = sizeof(EventuallyPersistentStore);
308
309    if (config.getConflictResolutionType().compare("lww") == 0) {
310        conflictResolver.reset(new LastWriteWinsResolution());
311    } else {
312        conflictResolver.reset(new RevisionSeqnoResolution());
313    }
314
315    stats.setMaxDataSize(config.getMaxSize());
316    config.addValueChangedListener("max_size",
317                                   new StatsValueChangeListener(stats, *this));
318    getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(
319                                                        config.getMaxSize());
320
321    stats.mem_low_wat.store(config.getMemLowWat());
322    config.addValueChangedListener("mem_low_wat",
323                                   new StatsValueChangeListener(stats, *this));
324    stats.mem_low_wat_percent.store(
325                (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize());
326
327    stats.mem_high_wat.store(config.getMemHighWat());
328    config.addValueChangedListener("mem_high_wat",
329                                   new StatsValueChangeListener(stats, *this));
330    stats.mem_high_wat_percent.store(
331                (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize());
332
333    setCursorDroppingLowerUpperThresholds(config.getMaxSize());
334
335    stats.replicationThrottleThreshold.store(static_cast<double>
336                                    (config.getReplicationThrottleThreshold())
337                                     / 100.0);
338    config.addValueChangedListener("replication_throttle_threshold",
339                                   new StatsValueChangeListener(stats, *this));
340
341    stats.replicationThrottleWriteQueueCap.store(
342                                    config.getReplicationThrottleQueueCap());
343    config.addValueChangedListener("replication_throttle_queue_cap",
344                                   new EPStoreValueChangeListener(*this));
345    config.addValueChangedListener("replication_throttle_cap_pcnt",
346                                   new EPStoreValueChangeListener(*this));
347
348    setBGFetchDelay(config.getBgFetchDelay());
349    config.addValueChangedListener("bg_fetch_delay",
350                                   new EPStoreValueChangeListener(*this));
351
352    stats.warmupMemUsedCap.store(static_cast<double>
353                               (config.getWarmupMinMemoryThreshold()) / 100.0);
354    config.addValueChangedListener("warmup_min_memory_threshold",
355                                   new StatsValueChangeListener(stats, *this));
356    stats.warmupNumReadCap.store(static_cast<double>
357                                (config.getWarmupMinItemsThreshold()) / 100.0);
358    config.addValueChangedListener("warmup_min_items_threshold",
359                                   new StatsValueChangeListener(stats, *this));
360
361    double mem_threshold = static_cast<double>
362                                      (config.getMutationMemThreshold()) / 100;
363    StoredValue::setMutationMemoryThreshold(mem_threshold);
364    config.addValueChangedListener("mutation_mem_threshold",
365                                   new EPStoreValueChangeListener(*this));
366
367    double backfill_threshold = static_cast<double>
368                                      (config.getBackfillMemThreshold()) / 100;
369    setBackfillMemoryThreshold(backfill_threshold);
370    config.addValueChangedListener("backfill_mem_threshold",
371                                   new EPStoreValueChangeListener(*this));
372
373    config.addValueChangedListener("bfilter_enabled",
374                                   new EPStoreValueChangeListener(*this));
375
376    bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
377    config.addValueChangedListener("bfilter_residency_threshold",
378                                   new EPStoreValueChangeListener(*this));
379
380    compactionExpMemThreshold = config.getCompactionExpMemThreshold();
381    config.addValueChangedListener("compaction_exp_mem_threshold",
382                                   new EPStoreValueChangeListener(*this));
383
384    compactionWriteQueueCap = config.getCompactionWriteQueueCap();
385    config.addValueChangedListener("compaction_write_queue_cap",
386                                   new EPStoreValueChangeListener(*this));
387
388    config.addValueChangedListener("dcp_min_compression_ratio",
389                                   new EPStoreValueChangeListener(*this));
390
391    const std::string &policy = config.getItemEvictionPolicy();
392    if (policy.compare("value_only") == 0) {
393        eviction_policy = VALUE_ONLY;
394    } else {
395        eviction_policy = FULL_EVICTION;
396    }
397
398    warmupTask = new Warmup(*this, config);
399}
400
401bool EventuallyPersistentStore::initialize() {
402    // We should nuke everything unless we want warmup
403    Configuration &config = engine.getConfiguration();
404    if (!config.isWarmup()) {
405        reset();
406    }
407
408    if (!startFlusher()) {
409        LOG(EXTENSION_LOG_WARNING,
410            "FATAL: Failed to create and start flushers");
411        return false;
412    }
413    if (!startBgFetcher()) {
414        LOG(EXTENSION_LOG_WARNING,
415           "FATAL: Failed to create and start bgfetchers");
416        return false;
417    }
418
419    warmupTask->start();
420
421    itmpTask = new ItemPager(&engine, stats);
422    ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
423
424    LockHolder elh(expiryPager.mutex);
425    expiryPager.enabled = config.isExpPagerEnabled();
426    elh.unlock();
427
428    size_t expiryPagerSleeptime = config.getExpPagerStime();
429    setExpiryPagerSleeptime(expiryPagerSleeptime);
430    config.addValueChangedListener("exp_pager_stime",
431                                   new EPStoreValueChangeListener(*this));
432    config.addValueChangedListener("exp_pager_enabled",
433                                   new EPStoreValueChangeListener(*this));
434    config.addValueChangedListener("exp_pager_initial_run_time",
435                                   new EPStoreValueChangeListener(*this));
436
437    ExTask htrTask = new HashtableResizerTask(this, 10);
438    ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
439
440    size_t checkpointRemoverInterval = config.getChkRemoverStime();
441    chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
442                                                   checkpointRemoverInterval);
443    ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
444
445    ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
446    ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
447
448#if HAVE_JEMALLOC
449    /* Only create the defragmenter task if we have an underlying memory
450     * allocator which can facilitate defragmenting memory.
451     */
452    defragmenterTask = new DefragmenterTask(&engine, stats);
453    ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
454#endif
455
456    return true;
457}
458
459EventuallyPersistentStore::~EventuallyPersistentStore() {
460    stopWarmup();
461    stopBgFetcher();
462    ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(), NONIO_TASK_IDX,
463                                       stats.forceShutdown);
464
465    ExecutorPool::get()->cancel(statsSnapshotTaskId);
466
467    LockHolder lh(accessScanner.mutex);
468    ExecutorPool::get()->cancel(accessScanner.task);
469    lh.unlock();
470
471    stopFlusher();
472
473    ExecutorPool::get()->unregisterTaskable(engine.getTaskable(),
474                                            stats.forceShutdown);
475
476    delete [] vb_mutexes;
477    delete [] stats.schedulingHisto;
478    delete [] stats.taskRuntimeHisto;
479    delete warmupTask;
480    delete storageProperties;
481    defragmenterTask.reset();
482
483    std::vector<MutationLog*>::iterator it;
484    for (it = accessLog.begin(); it != accessLog.end(); it++) {
485        delete *it;
486    }
487}
488
489const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
490    return vbMap.shards[shardId]->getFlusher();
491}
492
493uint16_t EventuallyPersistentStore::getCommitInterval(uint16_t shardId) {
494    Flusher *flusher = vbMap.shards[shardId]->getFlusher();
495    return flusher->getCommitInterval();
496}
497
498uint16_t EventuallyPersistentStore::decrCommitInterval(uint16_t shardId) {
499    Flusher *flusher = vbMap.shards[shardId]->getFlusher();
500    return flusher->decrCommitInterval();
501}
502
503Warmup* EventuallyPersistentStore::getWarmup(void) const {
504    return warmupTask;
505}
506
507bool EventuallyPersistentStore::startFlusher() {
508    for (auto* shard : vbMap.shards) {
509        shard->getFlusher()->start();
510    }
511    return true;
512}
513
514void EventuallyPersistentStore::stopFlusher() {
515    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
516        Flusher *flusher = vbMap.shards[i]->getFlusher();
517        LOG(EXTENSION_LOG_NOTICE, "Attempting to stop the flusher for "
518            "shard:%" PRIu16, i);
519        bool rv = flusher->stop(stats.forceShutdown);
520        if (rv && !stats.forceShutdown) {
521            flusher->wait();
522        }
523    }
524}
525
526bool EventuallyPersistentStore::pauseFlusher() {
527    bool rv = true;
528    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
529        Flusher *flusher = vbMap.shards[i]->getFlusher();
530        if (!flusher->pause()) {
531            LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
532                "[%s], shard = %d", flusher->stateName(), i);
533            rv = false;
534        }
535    }
536    return rv;
537}
538
539bool EventuallyPersistentStore::resumeFlusher() {
540    bool rv = true;
541    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
542        Flusher *flusher = vbMap.shards[i]->getFlusher();
543        if (!flusher->resume()) {
544            LOG(EXTENSION_LOG_WARNING,
545                    "Attempted to resume flusher in state [%s], "
546                    "shard = %d", flusher->stateName(), i);
547            rv = false;
548        }
549    }
550    return rv;
551}
552
553void EventuallyPersistentStore::wakeUpFlusher() {
554    if (stats.diskQueueSize.load() == 0) {
555        for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
556            Flusher *flusher = vbMap.shards[i]->getFlusher();
557            flusher->wake();
558        }
559    }
560}
561
562bool EventuallyPersistentStore::startBgFetcher() {
563    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
564        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
565        if (bgfetcher == NULL) {
566            LOG(EXTENSION_LOG_WARNING,
567                "Failed to start bg fetcher for shard %d", i);
568            return false;
569        }
570        bgfetcher->start();
571    }
572    return true;
573}
574
575void EventuallyPersistentStore::stopBgFetcher() {
576    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
577        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
578        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
579            LOG(EXTENSION_LOG_WARNING,
580                "Shutting down engine while there are still pending data "
581                "read for shard %d from database storage", i);
582        }
583        LOG(EXTENSION_LOG_NOTICE, "Stopping bg fetcher for shard:%" PRIu16, i);
584        bgfetcher->stop();
585    }
586}
587
588void
589EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
590                                             time_t startTime,
591                                             uint64_t revSeqno,
592                                             exp_type_t source) {
593    RCPtr<VBucket> vb = getVBucket(vbid);
594    if (vb) {
595        // Obtain reader access to the VB state change lock so that
596        // the VB can't switch state whilst we're processing
597        ReaderLockHolder rlh(vb->getStateLock());
598        if (vb->getState() == vbucket_state_active) {
599            int bucket_num(0);
600            LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
601            StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
602            if (v) {
603                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
604                    // This is a temporary item whose background fetch for metadata
605                    // has completed.
606                    bool deleted = vb->ht.unlocked_del(key, bucket_num);
607                    if (!deleted) {
608                        throw std::logic_error("EPStore::deleteExpiredItem: "
609                                "Failed to delete key '" + key + "' from bucket "
610                                + std::to_string(bucket_num));
611                    }
612                } else if (v->isExpired(startTime) && !v->isDeleted()) {
613                    vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
614                    queueDirty(vb, v, &lh, NULL);
615                }
616            } else {
617                if (eviction_policy == FULL_EVICTION) {
618                    // Create a temp item and delete and push it
619                    // into the checkpoint queue, only if the bloomfilter
620                    // predicts that the item may exist on disk.
621                    if (vb->maybeKeyExistsInFilter(key)) {
622                        add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
623                                                                    eviction_policy);
624                        if (rv == ADD_NOMEM) {
625                            return;
626                        }
627                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
628                        v->setDeleted();
629                        v->setRevSeqno(revSeqno);
630                        vb->ht.unlocked_softDelete(v, 0, eviction_policy);
631                        queueDirty(vb, v, &lh, NULL);
632                    }
633                }
634            }
635            incExpirationStat(vb, source);
636        }
637    }
638}
639
640void
641EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
642                                                        std::string> > &keys,
643                                              exp_type_t source) {
644    std::list<std::pair<uint16_t, std::string> >::iterator it;
645    time_t startTime = ep_real_time();
646    for (it = keys.begin(); it != keys.end(); it++) {
647        deleteExpiredItem(it->first, it->second, startTime, 0, source);
648    }
649}
650
651StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
652                                                        const const_sized_buffer key,
653                                                        int bucket_num,
654                                                        bool wantDeleted,
655                                                        bool trackReference,
656                                                        bool queueExpired) {
657    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
658                                          trackReference);
659    if (v && !v->isDeleted() && !v->isTempItem()) {
660        // In the deleted case, we ignore expiration time.
661        if (v->isExpired(ep_real_time())) {
662            if (vb->getState() != vbucket_state_active) {
663                return wantDeleted ? v : NULL;
664            }
665
666            // queueDirty only allowed on active VB
667            if (queueExpired && vb->getState() == vbucket_state_active) {
668                incExpirationStat(vb, EXP_BY_ACCESS);
669                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
670                queueDirty(vb, v, NULL, NULL);
671            }
672            return wantDeleted ? v : NULL;
673        }
674    }
675    return v;
676}
677
678bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
679                                                   const std::string &key) {
680
681    if (!vb) {
682        throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
683    }
684
685    int bucket_num(0);
686    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
687    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
688
689    if (v && !v->isTempItem()) {
690        return true;
691    } else {
692        return false;
693    }
694}
695
696protocol_binary_response_status EventuallyPersistentStore::evictKey(
697                                                        const std::string &key,
698                                                        uint16_t vbucket,
699                                                        const char **msg,
700                                                        size_t *msg_size,
701                                                        bool force) {
702    RCPtr<VBucket> vb = getVBucket(vbucket);
703    if (!vb || (vb->getState() != vbucket_state_active && !force)) {
704        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
705    }
706
707    int bucket_num(0);
708    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
709    StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
710
711    protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
712
713    *msg_size = 0;
714    if (v) {
715        if (force)  {
716            v->markClean();
717        }
718        if (v->isResident()) {
719            if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
720                *msg = "Ejected.";
721
722                // Add key to bloom filter incase of full eviction mode
723                if (getItemEvictionPolicy() == FULL_EVICTION) {
724                    vb->addToFilter(key);
725                }
726            } else {
727                *msg = "Can't eject: Dirty object.";
728                rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
729            }
730        } else {
731            *msg = "Already ejected.";
732        }
733    } else {
734        if (eviction_policy == VALUE_ONLY) {
735            *msg = "Not found.";
736            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
737        } else {
738            *msg = "Already ejected.";
739        }
740    }
741
742    return rv;
743}
744
745ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
746                                                        LockHolder &lock,
747                                                        int bucket_num,
748                                                        const const_sized_buffer key,
749                                                        RCPtr<VBucket> &vb,
750                                                        const void *cookie,
751                                                        bool metadataOnly,
752                                                        bool isReplication) {
753
754    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
755                                                eviction_policy,
756                                                isReplication);
757    switch(rv) {
758        case ADD_NOMEM:
759            return ENGINE_ENOMEM;
760
761        case ADD_EXISTS:
762        case ADD_UNDEL:
763        case ADD_SUCCESS:
764        case ADD_TMP_AND_BG_FETCH:
765            // Since the hashtable bucket is locked, we shouldn't get here
766            throw std::logic_error("EventuallyPersistentStore::addTempItemForBgFetch: "
767                    "Invalid result from addTempItem: " + std::to_string(rv));
768
769        case ADD_BG_FETCH:
770            lock.unlock();
771            bgFetch(key, vb->getId(), cookie, metadataOnly);
772    }
773    return ENGINE_EWOULDBLOCK;
774}
775
776ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
777                                                 const void *cookie,
778                                                 bool force,
779                                                 uint8_t nru) {
780
781    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
782    if (!vb) {
783        ++stats.numNotMyVBuckets;
784        return ENGINE_NOT_MY_VBUCKET;
785    }
786
787    // Obtain read-lock on VB state to ensure VB state changes are interlocked
788    // with this set
789    ReaderLockHolder rlh(vb->getStateLock());
790    if (vb->getState() == vbucket_state_dead) {
791        ++stats.numNotMyVBuckets;
792        return ENGINE_NOT_MY_VBUCKET;
793    } else if (vb->getState() == vbucket_state_replica && !force) {
794        ++stats.numNotMyVBuckets;
795        return ENGINE_NOT_MY_VBUCKET;
796    } else if (vb->getState() == vbucket_state_pending && !force) {
797        if (vb->addPendingOp(cookie)) {
798            return ENGINE_EWOULDBLOCK;
799        }
800    } else if (vb->isTakeoverBackedUp()) {
801        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op"
802                ", becuase takeover is lagging", vb->getId());
803        return ENGINE_TMPFAIL;
804    }
805
806    bool cas_op = (itm.getCas() != 0);
807    int bucket_num(0);
808    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
809    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num,
810                                          /*wantsDeleted*/true,
811                                          /*trackReference*/false);
812    if (v && v->isLocked(ep_current_time()) &&
813        (vb->getState() == vbucket_state_replica ||
814         vb->getState() == vbucket_state_pending)) {
815        v->unlock();
816    }
817
818    bool maybeKeyExists = true;
819    // If we didn't find a valid item, check Bloomfilter's prediction if in
820    // full eviction policy and for a CAS operation.
821    if ((v == nullptr || v->isTempInitialItem()) &&
822        (eviction_policy == FULL_EVICTION) &&
823        (itm.getCas() != 0)) {
824        // Check Bloomfilter's prediction
825        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
826            maybeKeyExists = false;
827        }
828    }
829
830    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
831                                                eviction_policy, nru,
832                                                maybeKeyExists);
833
834    Item& it = const_cast<Item&>(itm);
835    uint64_t seqno = 0;
836    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
837    switch (mtype) {
838    case NOMEM:
839        ret = ENGINE_ENOMEM;
840        break;
841    case INVALID_CAS:
842    case IS_LOCKED:
843        ret = ENGINE_KEY_EEXISTS;
844        break;
845    case NOT_FOUND:
846        if (cas_op) {
847            ret = ENGINE_KEY_ENOENT;
848            break;
849        }
850        // FALLTHROUGH
851    case WAS_DIRTY:
852        // Even if the item was dirty, push it into the vbucket's open
853        // checkpoint.
854    case WAS_CLEAN:
855        // We keep lh held as we need to do v->getCas()
856        queueDirty(vb, v, nullptr, &seqno);
857        it.setBySeqno(seqno);
858        it.setCas(v->getCas());
859        break;
860    case NEED_BG_FETCH:
861    {   // CAS operation with non-resident item + full eviction.
862        if (v) {
863            // temp item is already created. Simply schedule a bg fetch job
864            lh.unlock();
865            bgFetch(itm.getKey(), vb->getId(), cookie, true);
866            return ENGINE_EWOULDBLOCK;
867        }
868        ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
869                                    cookie, true);
870        break;
871    }
872    case INVALID_VBUCKET:
873        ret = ENGINE_NOT_MY_VBUCKET;
874        break;
875    }
876
877    return ret;
878}
879
880ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
881                                                 const void *cookie)
882{
883    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
884    if (!vb) {
885        ++stats.numNotMyVBuckets;
886        return ENGINE_NOT_MY_VBUCKET;
887    }
888
889    // Obtain read-lock on VB state to ensure VB state changes are interlocked
890    // with this add
891    ReaderLockHolder rlh(vb->getStateLock());
892    if (vb->getState() == vbucket_state_dead ||
893        vb->getState() == vbucket_state_replica) {
894        ++stats.numNotMyVBuckets;
895        return ENGINE_NOT_MY_VBUCKET;
896    } else if (vb->getState() == vbucket_state_pending) {
897        if (vb->addPendingOp(cookie)) {
898            return ENGINE_EWOULDBLOCK;
899        }
900    } else if (vb->isTakeoverBackedUp()) {
901        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op"
902                ", becuase takeover is lagging", vb->getId());
903        return ENGINE_TMPFAIL;
904    }
905
906    if (itm.getCas() != 0) {
907        // Adding with a cas value doesn't make sense..
908        return ENGINE_NOT_STORED;
909    }
910
911    int bucket_num(0);
912    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
913    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
914                                          false);
915
916    bool maybeKeyExists = true;
917    if ((v == nullptr || v->isTempInitialItem()) &&
918        (eviction_policy == FULL_EVICTION)) {
919        // Check bloomfilter's prediction
920        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
921            maybeKeyExists = false;
922        }
923    }
924
925    add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
926                                           eviction_policy,
927                                           true, true,
928                                           maybeKeyExists);
929
930    Item& it = const_cast<Item&>(itm);
931    uint64_t seqno = 0;
932    switch (atype) {
933    case ADD_NOMEM:
934        return ENGINE_ENOMEM;
935    case ADD_EXISTS:
936        return ENGINE_NOT_STORED;
937    case ADD_TMP_AND_BG_FETCH:
938        return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
939                                     cookie, true);
940    case ADD_BG_FETCH:
941        lh.unlock();
942        bgFetch(it.getKey(), vb->getId(), cookie, true);
943        return ENGINE_EWOULDBLOCK;
944    case ADD_SUCCESS:
945    case ADD_UNDEL:
946        // We need to keep lh as we will do v->getCas()
947        queueDirty(vb, v, nullptr, &seqno);
948        it.setBySeqno(seqno);
949        it.setCas(v->getCas());
950        break;
951    }
952
953    return ENGINE_SUCCESS;
954}
955
956ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
957                                                     const void *cookie) {
958    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
959    if (!vb) {
960        ++stats.numNotMyVBuckets;
961        return ENGINE_NOT_MY_VBUCKET;
962    }
963
964    // Obtain read-lock on VB state to ensure VB state changes are interlocked
965    // with this replace
966    ReaderLockHolder rlh(vb->getStateLock());
967    if (vb->getState() == vbucket_state_dead ||
968        vb->getState() == vbucket_state_replica) {
969        ++stats.numNotMyVBuckets;
970        return ENGINE_NOT_MY_VBUCKET;
971    } else if (vb->getState() == vbucket_state_pending) {
972        if (vb->addPendingOp(cookie)) {
973            return ENGINE_EWOULDBLOCK;
974        }
975    }
976
977    int bucket_num(0);
978    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
979    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
980                                          false);
981    if (v) {
982        if (v->isDeleted() || v->isTempDeletedItem() ||
983            v->isTempNonExistentItem()) {
984            return ENGINE_KEY_ENOENT;
985        }
986
987        mutation_type_t mtype;
988        if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
989            mtype = NEED_BG_FETCH;
990        } else {
991            mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
992                                        0xff);
993        }
994
995        Item& it = const_cast<Item&>(itm);
996        uint64_t seqno = 0;
997        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
998        switch (mtype) {
999            case NOMEM:
1000                ret = ENGINE_ENOMEM;
1001                break;
1002            case IS_LOCKED:
1003                ret = ENGINE_KEY_EEXISTS;
1004                break;
1005            case INVALID_CAS:
1006            case NOT_FOUND:
1007                ret = ENGINE_NOT_STORED;
1008                break;
1009                // FALLTHROUGH
1010            case WAS_DIRTY:
1011                // Even if the item was dirty, push it into the vbucket's open
1012                // checkpoint.
1013            case WAS_CLEAN:
1014                // Keep lh as we need to do v->getCas()
1015                queueDirty(vb, v, nullptr, &seqno);
1016                it.setBySeqno(seqno);
1017                it.setCas(v->getCas());
1018                break;
1019            case NEED_BG_FETCH:
1020            {
1021                // temp item is already created. Simply schedule a bg fetch job
1022                lh.unlock();
1023                bgFetch(it.getKey(), vb->getId(), cookie, true);
1024                ret = ENGINE_EWOULDBLOCK;
1025                break;
1026            }
1027            case INVALID_VBUCKET:
1028                ret = ENGINE_NOT_MY_VBUCKET;
1029                break;
1030        }
1031
1032        return ret;
1033    } else {
1034        if (eviction_policy == VALUE_ONLY) {
1035            return ENGINE_KEY_ENOENT;
1036        }
1037
1038        if (vb->maybeKeyExistsInFilter(itm.getKey())) {
1039            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1040                                         cookie, false);
1041        } else {
1042            // As bloomfilter predicted that item surely doesn't exist
1043            // on disk, return ENOENT for replace().
1044            return ENGINE_KEY_ENOENT;
1045        }
1046    }
1047}
1048
1049ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
1050                                                        const Item &itm,
1051                                                        uint8_t nru,
1052                                                        bool genBySeqno,
1053                                                        ExtendedMetaData *emd) {
1054
1055    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1056    if (!vb) {
1057        ++stats.numNotMyVBuckets;
1058        return ENGINE_NOT_MY_VBUCKET;
1059    }
1060
1061    // Obtain read-lock on VB state to ensure VB state changes are interlocked
1062    // with this add-tapbackfill
1063    ReaderLockHolder rlh(vb->getStateLock());
1064    if (vb->getState() == vbucket_state_dead ||
1065        vb->getState() == vbucket_state_active) {
1066        ++stats.numNotMyVBuckets;
1067        return ENGINE_NOT_MY_VBUCKET;
1068    }
1069
1070    //check for the incoming item's CAS validity
1071    if (!Item::isValidCas(itm.getCas())) {
1072        return ENGINE_KEY_EEXISTS;
1073    }
1074
1075    int bucket_num(0);
1076    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1077    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1078                                          false);
1079
1080    // Note that this function is only called on replica or pending vbuckets.
1081    if (v && v->isLocked(ep_current_time())) {
1082        v->unlock();
1083    }
1084    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
1085                                                eviction_policy, nru);
1086
1087    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1088    switch (mtype) {
1089    case NOMEM:
1090        ret = ENGINE_ENOMEM;
1091        break;
1092    case INVALID_CAS:
1093    case IS_LOCKED:
1094        ret = ENGINE_KEY_EEXISTS;
1095        break;
1096    case WAS_DIRTY:
1097        // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1098        // set correctly, and also the sequence numbers are ordered correctly.
1099        // (MB-14003)
1100    case NOT_FOUND:
1101        // FALLTHROUGH
1102    case WAS_CLEAN:
1103        vb->setMaxCas(v->getCas());
1104        tapQueueDirty(*vb, v, lh, NULL,
1105                      genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
1106        break;
1107    case INVALID_VBUCKET:
1108        ret = ENGINE_NOT_MY_VBUCKET;
1109        break;
1110    case NEED_BG_FETCH:
1111        throw std::logic_error("EventuallyPersistentStore::addTAPBackfillItem: "
1112                "SET on a non-active vbucket should not require a "
1113                "bg_metadata_fetch.");
1114    }
1115
1116    return ret;
1117}
1118
1119class KVStatsCallback : public Callback<kvstats_ctx> {
1120    public:
1121        KVStatsCallback(EventuallyPersistentStore *store)
1122            : epstore(store) { }
1123
1124       void callback(kvstats_ctx &ctx) {
1125            RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1126            if (vb) {
1127                vb->fileSpaceUsed = ctx.fileSpaceUsed;
1128                vb->fileSize = ctx.fileSize;
1129            }
1130        }
1131
1132    private:
1133        EventuallyPersistentStore *epstore;
1134};
1135
1136ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1137                                                           vbucket_state_t to,
1138                                                           bool transfer,
1139                                                           bool notify_dcp) {
1140    LockHolder lh(vbsetMutex);
1141    return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
1142}
1143
1144ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(uint16_t vbid,
1145                                                           vbucket_state_t to,
1146                                                           bool transfer,
1147                                                           bool notify_dcp,
1148                                                           LockHolder& vbset) {
1149    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1150    if (vb && to == vb->getState()) {
1151        return ENGINE_SUCCESS;
1152    }
1153
1154    if (vb) {
1155        vbucket_state_t oldstate = vb->getState();
1156
1157        vb->setState(to);
1158
1159        if (oldstate != to && notify_dcp) {
1160            bool closeInboundStreams = false;
1161            if (to == vbucket_state_active && !transfer) {
1162                /**
1163                 * Close inbound (passive) streams into the vbucket
1164                 * only in case of a failover.
1165                 */
1166                closeInboundStreams = true;
1167            }
1168            engine.getDcpConnMap().vbucketStateChanged(vbid, to,
1169                                                       closeInboundStreams);
1170        }
1171
1172        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
1173            /**
1174             * Update snapshot range when vbucket goes from being a replica
1175             * to active, to maintain the correct snapshot sequence numbers
1176             * even in a failover scenario.
1177             */
1178            vb->checkpointManager.resetSnapshotRange();
1179        }
1180
1181        if (to == vbucket_state_active && !transfer) {
1182            const snapshot_range_t range = vb->getPersistedSnapshot();
1183            if (range.end == vbMap.getPersistenceSeqno(vbid)) {
1184                vb->failovers->createEntry(range.end);
1185            } else {
1186                vb->failovers->createEntry(range.start);
1187            }
1188        }
1189
1190        if (oldstate == vbucket_state_pending &&
1191            to == vbucket_state_active) {
1192            ExTask notifyTask = new PendingOpsNotification(engine, vb);
1193            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1194        }
1195        scheduleVBStatePersist(vbid);
1196    } else if (vbid < vbMap.getSize()) {
1197        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1198        KVShard* shard = vbMap.getShardByVbId(vbid);
1199        std::shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
1200        Configuration& config = engine.getConfiguration();
1201        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1202                                         engine.getCheckpointConfig(),
1203                                         shard, 0, 0, 0, ft, cb,
1204                                         config));
1205
1206        if (config.isBfilterEnabled()) {
1207            // Initialize bloom filters upon vbucket creation during
1208            // bucket creation and rebalance
1209            newvb->createFilter(config.getBfilterKeyCount(),
1210                                config.getBfilterFpProb());
1211        }
1212
1213        // The first checkpoint for active vbucket should start with id 2.
1214        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1215        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1216        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1217            return ENGINE_ERANGE;
1218        }
1219        vbMap.setPersistenceCheckpointId(vbid, 0);
1220        vbMap.setPersistenceSeqno(vbid, 0);
1221        vbMap.setBucketCreation(vbid, true);
1222        scheduleVBStatePersist(vbid);
1223    } else {
1224        return ENGINE_ERANGE;
1225    }
1226    return ENGINE_SUCCESS;
1227}
1228
1229void EventuallyPersistentStore::scheduleVBStatePersist() {
1230    for (auto vbid : vbMap.getBuckets()) {
1231        scheduleVBStatePersist(vbid);
1232    }
1233}
1234
1235void EventuallyPersistentStore::scheduleVBStatePersist(VBucket::id_type vbid) {
1236    RCPtr<VBucket> vb = getVBucket(vbid);
1237
1238    if (!vb) {
1239        LOG(EXTENSION_LOG_WARNING,
1240            "EPStore::scheduleVBStatePersist: vb:%" PRIu16
1241            " does not not exist. Unable to schedule persistence.", vbid);
1242        return;
1243    }
1244
1245    vb->checkpointManager.queueSetVBState(*vb);
1246}
1247
1248bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1249                                                        const void* cookie) {
1250    LockHolder lh(vbsetMutex);
1251
1252    hrtime_t start_time(gethrtime());
1253    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1254    if (!vb || vb->getState() == vbucket_state_dead ||
1255         vbMap.isBucketDeletion(vbid)) {
1256        lh.unlock();
1257        LockHolder vlh(vb_mutexes[vbid]);
1258        getRWUnderlying(vbid)->delVBucket(vbid);
1259        vbMap.setBucketDeletion(vbid, false);
1260        vbMap.setBucketCreation(vbid, false);
1261        vbMap.setPersistenceSeqno(vbid, 0);
1262        ++stats.vbucketDeletions;
1263    }
1264
1265    hrtime_t spent(gethrtime() - start_time);
1266    hrtime_t wall_time = spent / 1000;
1267    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1268    stats.diskVBDelHisto.add(wall_time);
1269    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1270    stats.vbucketDelTotWalltime.fetch_add(wall_time);
1271    if (cookie) {
1272        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1273    }
1274
1275    return true;
1276}
1277
1278void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1279                                                   const void* cookie,
1280                                                   double delay) {
1281    ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1282    ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1283
1284    if (vbMap.setBucketDeletion(vb->getId(), true)) {
1285        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1286        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1287    }
1288}
1289
1290ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1291                                                           const void* c) {
1292    // Lock to prevent a race condition between a failed update and add
1293    // (and delete).
1294    LockHolder lh(vbsetMutex);
1295
1296    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1297    if (!vb) {
1298        return ENGINE_NOT_MY_VBUCKET;
1299    }
1300
1301    vb->setState(vbucket_state_dead);
1302    engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1303    vbMap.removeBucket(vbid);
1304    lh.unlock();
1305    scheduleVBDeletion(vb, c);
1306    if (c) {
1307        return ENGINE_EWOULDBLOCK;
1308    }
1309    return ENGINE_SUCCESS;
1310}
1311
1312ENGINE_ERROR_CODE EventuallyPersistentStore::checkForDBExistence(DBFileId db_file_id) {
1313    std::string backend = engine.getConfiguration().getBackend();
1314    if (backend.compare("couchdb") == 0) {
1315        RCPtr<VBucket> vb = vbMap.getBucket(db_file_id);
1316        if (!vb) {
1317            return ENGINE_NOT_MY_VBUCKET;
1318        }
1319    } else if (backend.compare("forestdb") == 0) {
1320        if (db_file_id > (vbMap.getNumShards() - 1)) {
1321            //TODO: find a better error code
1322            return ENGINE_EINVAL;
1323        }
1324    } else {
1325        LOG(EXTENSION_LOG_WARNING,
1326            "Unknown backend specified for db file id: %d", db_file_id);
1327        return ENGINE_FAILED;
1328    }
1329
1330    return ENGINE_SUCCESS;
1331}
1332
1333ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1334                                                       compaction_ctx c,
1335                                                       const void *cookie) {
1336    ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
1337    if (errCode != ENGINE_SUCCESS) {
1338        return errCode;
1339    }
1340
1341    /* Obtain the vbucket so we can get the previous purge seqno */
1342    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1343    if (!vb) {
1344        return ENGINE_NOT_MY_VBUCKET;
1345    }
1346
1347    /* Update the compaction ctx with the previous purge seqno */
1348    c.max_purged_seq = vb->getPurgeSeqno();
1349
1350    LockHolder lh(compactionLock);
1351    ExTask task = new CompactTask(&engine, c, cookie);
1352    compactionTasks.push_back(std::make_pair(c.db_file_id, task));
1353    if (compactionTasks.size() > 1) {
1354        if ((stats.diskQueueSize > compactionWriteQueueCap &&
1355            compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1356            engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1357            // Snooze a new compaction task.
1358            // We will wake it up when one of the existing compaction tasks is done.
1359            task->snooze(60);
1360        }
1361    }
1362
1363    ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1364
1365    LOG(EXTENSION_LOG_DEBUG,
1366        "Scheduled compaction task %" PRIu64 " on db %d,"
1367        "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
1368        ", dropdeletes = %d",
1369        uint64_t(task->getId()),c.db_file_id, c.purge_before_ts,
1370        c.purge_before_seq, c.drop_deletes);
1371
1372   return ENGINE_EWOULDBLOCK;
1373}
1374
1375class ExpiredItemsCallback : public Callback<std::string&, uint64_t&> {
1376    public:
1377        ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid,
1378                             time_t start)
1379            : epstore(store), vbucket(vbid), startTime(start) { }
1380
1381        void callback(std::string& key, uint64_t& revSeqno) {
1382            if (epstore->compactionCanExpireItems()) {
1383                epstore->deleteExpiredItem(vbucket, key, startTime, revSeqno,
1384                                           EXP_BY_COMPACTOR);
1385            }
1386        }
1387
1388    private:
1389        EventuallyPersistentStore *epstore;
1390        uint16_t vbucket;
1391        time_t startTime;
1392};
1393
1394bool EventuallyPersistentStore::doCompact(compaction_ctx *ctx,
1395                                          const void *cookie) {
1396    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1397    const uint16_t vbid = ctx->db_file_id;
1398    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1399    if (vb) {
1400        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1401        if (!lh.islocked()) {
1402            return true; // Schedule a compaction task again.
1403        }
1404
1405        Configuration &config = getEPEngine().getConfiguration();
1406        if (config.isBfilterEnabled()) {
1407            size_t initial_estimation = config.getBfilterKeyCount();
1408            size_t estimated_count;
1409            size_t num_deletes =
1410                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
1411            if (eviction_policy == VALUE_ONLY) {
1412                /**
1413                 * VALUE-ONLY EVICTION POLICY
1414                 * Obtain number of persisted deletes from underlying kvstore.
1415                 * Bloomfilter's estimated_key_count = 1.25 * deletes
1416                 */
1417
1418                estimated_count = round(1.25 * num_deletes);
1419                std::shared_ptr<Callback<std::string&, bool&> >
1420                    filter(new BloomFilterCallback(*this, vbid, false));
1421                ctx->bloomFilterCallback = filter;
1422            } else {
1423                /**
1424                 * FULL EVICTION POLICY
1425                 * First determine if the resident ratio of vbucket is less than
1426                 * the threshold from configuration.
1427                 */
1428
1429                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
1430                                                getBfiltersResidencyThreshold(),
1431                                                eviction_policy);
1432                std::shared_ptr<Callback<std::string&, bool&> >
1433                    filter(new BloomFilterCallback(*this, vbid, residentRatioAlert));
1434                ctx->bloomFilterCallback = filter;
1435
1436                /**
1437                 * Based on resident ratio against threshold, estimate count.
1438                 *
1439                 * 1. If resident ratio is greater than the threshold:
1440                 * Obtain number of persisted deletes from underlying kvstore.
1441                 * Obtain number of non-resident-items for vbucket.
1442                 * Bloomfilter's estimated_key_count =
1443                 *                              1.25 * (deletes + non-resident)
1444                 *
1445                 * 2. Otherwise:
1446                 * Obtain number of items for vbucket.
1447                 * Bloomfilter's estimated_key_count =
1448                 *                              1.25 * (num_items)
1449                 */
1450
1451                if (residentRatioAlert) {
1452                    estimated_count = round(1.25 *
1453                                            vb->getNumItems(eviction_policy));
1454                } else {
1455                    estimated_count = round(1.25 * (num_deletes +
1456                                vb->getNumNonResidentItems(eviction_policy)));
1457                }
1458            }
1459            if (estimated_count < initial_estimation) {
1460                estimated_count = initial_estimation;
1461            }
1462            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
1463        }
1464
1465        if (vb->getState() == vbucket_state_active) {
1466            // Set the current time ONLY for active vbuckets.
1467            ctx->curr_time = ep_real_time();
1468        } else {
1469            ctx->curr_time = 0;
1470        }
1471        std::shared_ptr<Callback<std::string&, uint64_t&> >
1472           expiry(new ExpiredItemsCallback(this, vbid, ctx->curr_time));
1473        ctx->expiryCallback = expiry;
1474
1475        KVStatsCallback kvcb(this);
1476        if (getRWUnderlying(vbid)->compactDB(ctx, kvcb)) {
1477            if (config.isBfilterEnabled()) {
1478                vb->swapFilter();
1479            } else {
1480                vb->clearFilter();
1481            }
1482        } else {
1483            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
1484                    "clearing bloom filter, if any.", vb->getId());
1485            vb->clearFilter();
1486        }
1487        vb->setPurgeSeqno(ctx->max_purged_seq);
1488    } else {
1489        err = ENGINE_NOT_MY_VBUCKET;
1490        engine.storeEngineSpecific(cookie, NULL);
1491        //Decrement session counter here, as memcached thread wouldn't
1492        //visit the engine interface in case of a NOT_MY_VB notification
1493        engine.decrementSessionCtr();
1494    }
1495
1496    updateCompactionTasks(ctx->db_file_id);
1497
1498    if (cookie) {
1499        engine.notifyIOComplete(cookie, err);
1500    }
1501    --stats.pendingCompactions;
1502    return false;
1503}
1504
1505void EventuallyPersistentStore::updateCompactionTasks(DBFileId db_file_id) {
1506    LockHolder lh(compactionLock);
1507    bool erased = false, woke = false;
1508    std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1509    while (it != compactionTasks.end()) {
1510        if ((*it).first == db_file_id) {
1511            it = compactionTasks.erase(it);
1512            erased = true;
1513        } else {
1514            ExTask &task = (*it).second;
1515            if (task->getState() == TASK_SNOOZED) {
1516                ExecutorPool::get()->wake(task->getId());
1517                woke = true;
1518            }
1519            ++it;
1520        }
1521        if (erased && woke) {
1522            break;
1523        }
1524    }
1525}
1526
1527bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1528    LockHolder lh(vbsetMutex);
1529    return resetVBucket_UNLOCKED(vbid, lh);
1530}
1531
1532bool EventuallyPersistentStore::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
1533    bool rv(false);
1534
1535    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1536    if (vb) {
1537        vbucket_state_t vbstate = vb->getState();
1538
1539        vbMap.removeBucket(vbid);
1540
1541        checkpointCursorInfoList cursors =
1542                                        vb->checkpointManager.getAllCursors();
1543        // Delete and recreate the vbucket database file
1544        scheduleVBDeletion(vb, NULL, 0);
1545        setVBucketState_UNLOCKED(vbid, vbstate,
1546                                 false/*transfer*/, true/*notifyDcp*/, vbset);
1547
1548        // Copy the all cursors from the old vbucket into the new vbucket
1549        RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1550        newvb->checkpointManager.resetCursors(cursors);
1551
1552        rv = true;
1553    }
1554    return rv;
1555}
1556
1557extern "C" {
1558
1559    typedef struct {
1560        EventuallyPersistentEngine* engine;
1561        std::map<std::string, std::string> smap;
1562    } snapshot_stats_t;
1563
1564    static void add_stat(const char *key, const uint16_t klen,
1565                         const char *val, const uint32_t vlen,
1566                         const void *cookie) {
1567        if (cookie == nullptr) {
1568            throw std::invalid_argument("add_stat: cookie is NULL");
1569        }
1570        void *ptr = const_cast<void *>(cookie);
1571        snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1572        ObjectRegistry::onSwitchThread(snap->engine);
1573
1574        std::string k(key, klen);
1575        std::string v(val, vlen);
1576        snap->smap.insert(std::pair<std::string, std::string>(k, v));
1577    }
1578}
1579
1580void EventuallyPersistentStore::snapshotStats() {
1581    snapshot_stats_t snap;
1582    snap.engine = &engine;
1583    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1584              engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1585              engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1586
1587    if (rv && stats.isShutdown) {
1588        snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1589                                                              "true" : "false";
1590        std::stringstream ss;
1591        ss << ep_real_time();
1592        snap.smap["ep_shutdown_time"] = ss.str();
1593    }
1594    getOneRWUnderlying()->snapshotStats(snap.smap);
1595}
1596
1597void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1598                                              const hrtime_t start,
1599                                              const hrtime_t stop) {
1600    if (stop >= start && start >= init) {
1601        // skip the measurement if the counter wrapped...
1602        ++stats.bgNumOperations;
1603        hrtime_t w = (start - init) / 1000;
1604        BlockTimer::log(start - init, "bgwait", stats.timingLog);
1605        stats.bgWaitHisto.add(w);
1606        stats.bgWait.fetch_add(w);
1607        atomic_setIfLess(stats.bgMinWait, w);
1608        atomic_setIfBigger(stats.bgMaxWait, w);
1609
1610        hrtime_t l = (stop - start) / 1000;
1611        BlockTimer::log(stop - start, "bgload", stats.timingLog);
1612        stats.bgLoadHisto.add(l);
1613        stats.bgLoad.fetch_add(l);
1614        atomic_setIfLess(stats.bgMinLoad, l);
1615        atomic_setIfBigger(stats.bgMaxLoad, l);
1616    }
1617}
1618
/**
 * Complete a single (non-batched) background fetch.
 *
 * Synchronously reads `key` from the read-only KVStore of `vbucket`. If the
 * in-memory entry still needs it, the fetched value (or, for `isMeta`, only
 * its metadata) is restored into the hash table. Background-fetch stats are
 * then updated, the fetched Item is deleted, and `cookie` is notified with
 * the resulting status.
 *
 * @param key     document key to fetch
 * @param vbucket vbucket the key belongs to
 * @param cookie  connection cookie passed to notifyIOComplete()
 * @param init    time the fetch was requested; used as the start of the
 *                "wait" interval in updateBGStats()
 * @param isMeta  true to fetch metadata only (the GetValue is marked partial)
 */
void EventuallyPersistentStore::completeBGFetch(const std::string &key,
                                                uint16_t vbucket,
                                                const void *cookie,
                                                hrtime_t init,
                                                bool isMeta) {
    hrtime_t start(gethrtime());
    // Go find the data
    RememberingCallback<GetValue> gcb;
    if (isMeta) {
        gcb.val.setPartial();
        ++stats.bg_meta_fetched;
    } else {
        ++stats.bg_fetched;
    }
    getROUnderlying(vbucket)->get(key, vbucket, gcb);
    gcb.waitForValue();
    // Status as reported by the storage layer; may be rewritten below to the
    // status the memcached worker should see.
    ENGINE_ERROR_CODE status = gcb.val.getStatus();

    // Lock to prevent a race condition between a fetch for restore and delete
    LockHolder lh(vbsetMutex);

    RCPtr<VBucket> vb = getVBucket(vbucket);
    if (vb) {
        ReaderLockHolder rlh(vb->getStateLock());
        int bucket_num(0);
        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
        if (isMeta) {
            if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
                                              gcb.val.getStatus(), vb->ht))
                || ENGINE_KEY_ENOENT == status) {
                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
                 key is removed from hash table by the time bgfetch returns
                 (in case multiple bgfetch is scheduled for a key), we still
                 need to return ENGINE_SUCCESS to the memcached worker thread,
                 so that the worker thread can visit the ep-engine and figure
                 out the correct flow */
                status = ENGINE_SUCCESS;
            }
        } else {
            // Decide whether the in-memory entry still needs the fetched
            // value. A resident or deleted entry does not.
            bool restore = false;
            if (v && v->isResident()) {
                status = ENGINE_SUCCESS;
            } else if (v && v->isDeleted()) {
                status = ENGINE_KEY_ENOENT;
            } else {
                switch (eviction_policy) {
                    case VALUE_ONLY:
                        if (v && !v->isResident() && !v->isDeleted()) {
                            restore = true;
                        }
                        break;
                    case FULL_EVICTION:
                        // Under full eviction a temp initial item (created
                        // for a key absent from memory) is also a restore
                        // target.
                        if (v) {
                            if (v->isTempInitialItem() ||
                                (!v->isResident() && !v->isDeleted())) {
                                restore = true;
                            }
                        }
                        break;
                    default:
                        throw std::logic_error("Unknown eviction policy");
                }
            }

            if (restore) {
                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
                    if (!v->isResident()) {
                        throw std::logic_error("EPStore::completeBGFetch: "
                                "storedvalue (which has key " + v->getKey() +
                                ") should be resident after calling restoreValue()");
                    }
                    if (vb->getState() == vbucket_state_active &&
                        v->getExptime() != gcb.val.getValue()->getExptime() &&
                        v->getCas() == gcb.val.getValue()->getCas()) {
                        // MB-9306: It is possible that by the time bgfetcher
                        // returns, the item may have been updated and queued
                        // Hence test the CAS value to be the same first.
                        // exptime mutated, schedule it into new checkpoint
                        queueDirty(vb, v, &hlh, NULL, GenerateBySeqno::Yes,
                                                    GenerateCas::No);
                    }
                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                    // Key is not on disk either: mark the entry as
                    // non-existent.
                    v->setNonExistent();
                    if (eviction_policy == FULL_EVICTION) {
                        // For the full eviction, we should notify
                        // ENGINE_SUCCESS to the memcached worker thread, so
                        // that the worker thread can visit the ep-engine and
                        // figure out the correct error code.
                        status = ENGINE_SUCCESS;
                    }
                } else {
                    // underlying kvstore couldn't fetch requested data
                    // log returned error and notify TMPFAIL to client
                    LOG(EXTENSION_LOG_WARNING,
                        "Failed background fetch for vb=%d "
                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
                        key.c_str());
                    status = ENGINE_TMPFAIL;
                }
            }
        }
    } else {
        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
            " a bg fetch for key %s\n", vbucket, key.c_str());
        status = ENGINE_NOT_MY_VBUCKET;
    }

    lh.unlock();

    hrtime_t stop = gethrtime();
    updateBGStats(init, start, stop);
    bgFetchQueue--;

    // The fetched Item is freed here; NOTE(review): this assumes the
    // restore calls above copy what they need from it - confirm.
    delete gcb.val.getValue();
    engine.notifyIOComplete(cookie, status);
}
1737
1738void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1739                                 std::vector<bgfetched_item_t> &fetchedItems,
1740                                 hrtime_t startTime)
1741{
1742    RCPtr<VBucket> vb = getVBucket(vbId);
1743    if (!vb) {
1744        std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1745        for (; itemItr != fetchedItems.end(); ++itemItr) {
1746            engine.notifyIOComplete((*itemItr).second->cookie,
1747                                    ENGINE_NOT_MY_VBUCKET);
1748        }
1749        LOG(EXTENSION_LOG_WARNING,
1750            "EP Store completes %d of batched background fetch for "
1751            "for vBucket = %d that is already deleted\n",
1752            (int)fetchedItems.size(), vbId);
1753        return;
1754    }
1755
1756    std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1757    for (; itemItr != fetchedItems.end(); ++itemItr) {
1758        VBucketBGFetchItem *bgitem = (*itemItr).second;
1759        ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1760        Item *fetchedValue = bgitem->value.getValue();
1761        const std::string &key = (*itemItr).first;
1762        {   //locking scope
1763            ReaderLockHolder rlh(vb->getStateLock());
1764            int bucket = 0;
1765            LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1766            StoredValue *v = fetchValidValue(vb, key, bucket, true);
1767            if (bgitem->metaDataOnly) {
1768                if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1769                    || ENGINE_KEY_ENOENT == status) {
1770                    /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1771                     key is removed from hash table by the time bgfetch returns
1772                     (in case multiple bgfetch is scheduled for a key), we still
1773                     need to return ENGINE_SUCCESS to the memcached worker thread,
1774                     so that the worker thread can visit the ep-engine and figure
1775                     out the correct flow */
1776                    status = ENGINE_SUCCESS;
1777                }
1778            } else {
1779                bool restore = false;
1780                if (v && v->isResident()) {
1781                    status = ENGINE_SUCCESS;
1782                } else if (v && v->isDeleted()) {
1783                    status = ENGINE_KEY_ENOENT;
1784                } else {
1785                    switch (eviction_policy) {
1786                        case VALUE_ONLY:
1787                            if (v && !v->isResident() && !v->isDeleted()) {
1788                                restore = true;
1789                            }
1790                            break;
1791                        case FULL_EVICTION:
1792                            if (v) {
1793                                if (v->isTempInitialItem() ||
1794                                    (!v->isResident() && !v->isDeleted())) {
1795                                    restore = true;
1796                                }
1797                            }
1798                            break;
1799                        default:
1800                            throw std::logic_error("Unknown eviction policy");
1801                    }
1802                }
1803
1804                if (restore) {
1805                    if (status == ENGINE_SUCCESS) {
1806                        v->unlocked_restoreValue(fetchedValue, vb->ht);
1807                        if (!v->isResident()) {
1808                            throw std::logic_error("EPStore::completeBGFetchMulti: "
1809                                "storedvalue (which has key " + v->getKey() +
1810                                ") should be resident after calling restoreValue()");
1811                        }
1812                        if (vb->getState() == vbucket_state_active &&
1813                            v->getExptime() != fetchedValue->getExptime() &&
1814                            v->getCas() == fetchedValue->getCas()) {
1815                            // MB-9306: It is possible that by the time
1816                            // bgfetcher returns, the item may have been
1817                            // updated and queued
1818                            // Hence test the CAS value to be the same first.
1819                            // exptime mutated, schedule it into new checkpoint
1820                            queueDirty(vb, v, &blh, NULL, GenerateBySeqno::Yes,
1821                                                        GenerateCas::No);
1822                        }
1823                    } else if (status == ENGINE_KEY_ENOENT) {
1824                        v->setNonExistent();
1825                        if (eviction_policy == FULL_EVICTION) {
1826                            // For the full eviction, we should notify
1827                            // ENGINE_SUCCESS to the memcached worker thread,
1828                            // so that the worker thread can visit the
1829                            // ep-engine and figure out the correct error
1830                            // code.
1831                            status = ENGINE_SUCCESS;
1832                        }
1833                    } else {
1834                        // underlying kvstore couldn't fetch requested data
1835                        // log returned error and notify TMPFAIL to client
1836                        LOG(EXTENSION_LOG_WARNING,
1837                            "Failed background fetch for vb=%d "
1838                            "key=%s", vbId, key.c_str());
1839                        status = ENGINE_TMPFAIL;
1840                    }
1841                }
1842            }
1843        } // locked scope ends
1844
1845        if (bgitem->metaDataOnly) {
1846            ++stats.bg_meta_fetched;
1847        } else {
1848            ++stats.bg_fetched;
1849        }
1850
1851        hrtime_t endTime = gethrtime();
1852        updateBGStats(bgitem->initTime, startTime, endTime);
1853        engine.notifyIOComplete(bgitem->cookie, status);
1854    }
1855
1856    LOG(EXTENSION_LOG_DEBUG,
1857        "EP Store completes %" PRIu64 " of batched background fetch "
1858        "for vBucket = %d endTime = %" PRIu64,
1859        uint64_t(fetchedItems.size()), vbId, gethrtime()/1000000);
1860}
1861
1862void EventuallyPersistentStore::bgFetch(const const_sized_buffer key,
1863                                        uint16_t vbucket,
1864                                        const void *cookie,
1865                                        bool isMeta) {
1866    if (multiBGFetchEnabled()) {
1867        RCPtr<VBucket> vb = getVBucket(vbucket);
1868        if (!vb) {
1869            throw std::invalid_argument("EPStore::bgFetch: vbucket (which is " +
1870                                        std::to_string(vbucket) +
1871                                        ") is not present in vbMap");
1872        }
1873        KVShard *myShard = vbMap.getShardByVbId(vbucket);
1874
1875        // schedule to the current batch of background fetch of the given
1876        // vbucket
1877        VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1878                                                                isMeta);
1879        size_t bgfetch_size = vb->queueBGFetchItem(key, fetchThis,
1880                                                   myShard->getBgFetcher());
1881        myShard->getBgFetcher()->notifyBGEvent();
1882        LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1883            uint64_t(bgfetch_size));
1884    } else {
1885        bgFetchQueue++;
1886        stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1887                                            bgFetchQueue.load());
1888        ExecutorPool* iom = ExecutorPool::get();
1889        ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1890                                              isMeta, bgFetchDelay, false);
1891        iom->schedule(task, READER_TASK_IDX);
1892        LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1893            uint64_t(bgFetchQueue.load()));
1894    }
1895}
1896
1897GetValue EventuallyPersistentStore::getInternal(const const_sized_buffer key,
1898                                                uint16_t vbucket,
1899                                                const void *cookie,
1900                                                vbucket_state_t allowedState,
1901                                                get_options_t options) {
1902
1903    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1904        vbucket_state_replica : vbucket_state_active;
1905    RCPtr<VBucket> vb = getVBucket(vbucket);
1906    if (!vb) {
1907        ++stats.numNotMyVBuckets;
1908        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1909    }
1910
1911    const bool honorStates = (options & HONOR_STATES);
1912
1913    ReaderLockHolder rlh(vb->getStateLock());
1914    if (honorStates) {
1915        vbucket_state_t vbState = vb->getState();
1916        if (vbState == vbucket_state_dead) {
1917            ++stats.numNotMyVBuckets;
1918            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1919        } else if (vbState == disallowedState) {
1920            ++stats.numNotMyVBuckets;
1921            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1922        } else if (vbState == vbucket_state_pending) {
1923            if (vb->addPendingOp(cookie)) {
1924                return GetValue(NULL, ENGINE_EWOULDBLOCK);
1925            }
1926        }
1927    }
1928
1929    const bool trackReference = (options & TRACK_REFERENCE);
1930
1931    int bucket_num(0);
1932    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1933    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1934                                     trackReference);
1935    if (v) {
1936        if (v->isDeleted()) {
1937            GetValue rv;
1938            return rv;
1939        }
1940        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1941            // Delete a temp non-existent item to ensure that
1942            // if the get were issued over an item that doesn't
1943            // exist, then we dont preserve a temp item.
1944            if (options & DELETE_TEMP) {
1945                vb->ht.unlocked_del(key, bucket_num);
1946            }
1947            GetValue rv;
1948            return rv;
1949        }
1950
1951        // If the value is not resident, wait for it...
1952        if (!v->isResident()) {
1953            if (options & QUEUE_BG_FETCH) {
1954                bgFetch(key, vbucket, cookie);
1955            }
1956            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1957                            true, v->getNRUValue());
1958        }
1959
1960        // Should we hide (return -1) for the items' CAS?
1961        const bool hide_cas = (options & HIDE_LOCKED_CAS) &&
1962                              v->isLocked(ep_current_time());
1963        GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
1964                    v->getBySeqno(), false, v->getNRUValue());
1965        return rv;
1966    } else {
1967        if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1968            GetValue rv;
1969            return rv;
1970        }
1971
1972        if (vb->maybeKeyExistsInFilter(key)) {
1973            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1974            if (options & QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1975                ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1976                                           cookie, false);
1977            }
1978            return GetValue(NULL, ec, -1, true);
1979        } else {
1980            // As bloomfilter predicted that item surely doesn't exist
1981            // on disk, return ENONET, for getInternal().
1982            GetValue rv;
1983            return rv;
1984        }
1985    }
1986}
1987
1988GetValue EventuallyPersistentStore::getRandomKey() {
1989    VBucketMap::id_type max = vbMap.getSize();
1990
1991    const long start = random() % max;
1992    long curr = start;
1993    Item *itm = NULL;
1994
1995    while (itm == NULL) {
1996        RCPtr<VBucket> vb = getVBucket(curr++);
1997        while (!vb || vb->getState() != vbucket_state_active) {
1998            if (curr == start) {
1999                return GetValue(NULL, ENGINE_KEY_ENOENT);
2000            }
2001            if (curr == max) {
2002                curr = 0;
2003            }
2004
2005            vb = getVBucket(curr++);
2006        }
2007
2008        if ((itm = vb->ht.getRandomKey(random())) != NULL) {
2009            GetValue rv(itm, ENGINE_SUCCESS);
2010            return rv;
2011        }
2012
2013        if (curr == max) {
2014            curr = 0;
2015        }
2016
2017        if (curr == start) {
2018            return GetValue(NULL, ENGINE_KEY_ENOENT);
2019        }
2020        // Search next vbucket
2021    }
2022
2023    return GetValue(NULL, ENGINE_KEY_ENOENT);
2024}
2025
2026
2027ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2028                                                        const std::string &key,
2029                                                        uint16_t vbucket,
2030                                                        const void *cookie,
2031                                                        ItemMetaData &metadata,
2032                                                        uint32_t &deleted,
2033                                                        bool trackReferenced)
2034{
2035    (void) cookie;
2036    RCPtr<VBucket> vb = getVBucket(vbucket);
2037
2038    if (!vb) {
2039        ++stats.numNotMyVBuckets;
2040        return ENGINE_NOT_MY_VBUCKET;
2041    }
2042
2043    ReaderLockHolder rlh(vb->getStateLock());
2044    if (vb->getState() == vbucket_state_dead ||
2045        vb->getState() == vbucket_state_replica) {
2046        ++stats.numNotMyVBuckets;
2047        return ENGINE_NOT_MY_VBUCKET;
2048    }
2049
2050    int bucket_num(0);
2051    deleted = 0;
2052    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2053    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2054                                          trackReferenced);
2055
2056    if (v) {
2057        stats.numOpsGetMeta++;
2058        if (v->isTempInitialItem()) { // Need bg meta fetch.
2059            bgFetch(key, vbucket, cookie, true);
2060            return ENGINE_EWOULDBLOCK;
2061        } else if (v->isTempNonExistentItem()) {
2062            metadata.cas = v->getCas();
2063            return ENGINE_KEY_ENOENT;
2064        } else {
2065            if (v->isTempDeletedItem() || v->isDeleted() ||
2066                v->isExpired(ep_real_time())) {
2067                deleted |= GET_META_ITEM_DELETED_FLAG;
2068            }
2069
2070            if (v->isLocked(ep_current_time())) {
2071                metadata.cas = static_cast<uint64_t>(-1);
2072            } else {
2073                metadata.cas = v->getCas();
2074            }
2075            metadata.flags = v->getFlags();
2076            metadata.exptime = v->getExptime();
2077            metadata.revSeqno = v->getRevSeqno();
2078            return ENGINE_SUCCESS;
2079        }
2080    } else {
2081        // The key wasn't found. However, this may be because it was previously
2082        // deleted or evicted with the full eviction strategy.
2083        // So, add a temporary item corresponding to the key to the hash table
2084        // and schedule a background fetch for its metadata from the persistent
2085        // store. The item's state will be updated after the fetch completes.
2086        //
2087        // Schedule this bgFetch only if the key is predicted to be may-be
2088        // existent on disk by the bloomfilter.
2089
2090        if (vb->maybeKeyExistsInFilter(key)) {
2091            return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2092        } else {
2093            stats.numOpsGetMeta++;
2094            return ENGINE_KEY_ENOENT;
2095        }
2096    }
2097}
2098
2099ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
2100                                                     const Item &itm,
2101                                                     uint64_t cas,
2102                                                     uint64_t *seqno,
2103                                                     const void *cookie,
2104                                                     bool force,
2105                                                     bool allowExisting,
2106                                                     uint8_t nru,
2107                                                     GenerateBySeqno genBySeqno,
2108                                                     GenerateCas genCas,
2109                                                     ExtendedMetaData *emd,
2110                                                     bool isReplication)
2111{
2112    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
2113    if (!vb) {
2114        ++stats.numNotMyVBuckets;
2115        return ENGINE_NOT_MY_VBUCKET;
2116    }
2117
2118    ReaderLockHolder rlh(vb->getStateLock());
2119    if (vb->getState() == vbucket_state_dead) {
2120        ++stats.numNotMyVBuckets;
2121        return ENGINE_NOT_MY_VBUCKET;
2122    } else if (vb->getState() == vbucket_state_replica && !force) {
2123        ++stats.numNotMyVBuckets;
2124        return ENGINE_NOT_MY_VBUCKET;
2125    } else if (vb->getState() == vbucket_state_pending && !force) {
2126        if (vb->addPendingOp(cookie)) {
2127            return ENGINE_EWOULDBLOCK;
2128        }
2129    } else if (vb->isTakeoverBackedUp()) {
2130        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op"
2131                ", becuase takeover is lagging", vb->getId());
2132        return ENGINE_TMPFAIL;
2133    }
2134
2135    //check for the incoming item's CAS validity
2136    if (!Item::isValidCas(itm.getCas())) {
2137        return ENGINE_KEY_EEXISTS;
2138    }
2139
2140    int bucket_num(0);
2141    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
2142    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
2143                                          false);
2144
2145    bool maybeKeyExists = true;
2146    if (!force) {
2147        if (v)  {
2148            if (v->isTempInitialItem()) {
2149                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
2150                return ENGINE_EWOULDBLOCK;
2151            }
2152
2153            if (!conflictResolver->resolve(*v, itm.getMetaData(), false)) {
2154                ++stats.numOpsSetMetaResolutionFailed;
2155                return ENGINE_KEY_EEXISTS;
2156            }
2157        } else {
2158            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
2159                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2160                                             cookie, true, isReplication);
2161            } else {
2162                maybeKeyExists = false;
2163            }
2164        }
2165    } else {
2166        if (eviction_policy == FULL_EVICTION) {
2167            // Check Bloomfilter's prediction
2168            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
2169                maybeKeyExists = false;
2170            }
2171        }
2172    }
2173
2174    if (v && v->isLocked(ep_current_time()) &&
2175        (vb->getState() == vbucket_state_replica ||
2176         vb->getState() == vbucket_state_pending)) {
2177        v->unlock();
2178    }
2179
2180    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
2181                                                true, eviction_policy, nru,
2182                                                maybeKeyExists, isReplication);
2183
2184    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2185    switch (mtype) {
2186    case NOMEM:
2187        ret = ENGINE_ENOMEM;
2188        break;
2189    case INVALID_CAS:
2190    case IS_LOCKED:
2191        ret = ENGINE_KEY_EEXISTS;
2192        break;
2193    case INVALID_VBUCKET:
2194        ret = ENGINE_NOT_MY_VBUCKET;
2195        break;
2196    case WAS_DIRTY:
2197    case WAS_CLEAN:
2198        vb->setMaxCasAndTrackDrift(v->getCas());
2199        queueDirty(vb, v, &lh, seqno, genBySeqno, genCas);
2200        break;
2201    case NOT_FOUND:
2202        ret = ENGINE_KEY_ENOENT;
2203        break;
2204    case NEED_BG_FETCH:
2205        {            // CAS operation with non-resident item + full eviction.
2206            if (v) { // temp item is already created. Simply schedule a
2207                lh.unlock(); // bg fetch job.
2208                bgFetch(itm.getKey(), vb->getId(), cookie, true);
2209                return ENGINE_EWOULDBLOCK;
2210            }
2211
2212            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2213                                        cookie, true, isReplication);
2214        }
2215    }
2216
2217    return ret;
2218}
2219
2220GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2221                                                    uint16_t vbucket,
2222                                                    const void *cookie,
2223                                                    time_t exptime)
2224{
2225    RCPtr<VBucket> vb = getVBucket(vbucket);
2226    if (!vb) {
2227        ++stats.numNotMyVBuckets;
2228        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2229    }
2230
2231    ReaderLockHolder rlh(vb->getStateLock());
2232    if (vb->getState() == vbucket_state_dead) {
2233        ++stats.numNotMyVBuckets;
2234        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2235    } else if (vb->getState() == vbucket_state_replica) {
2236        ++stats.numNotMyVBuckets;
2237        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2238    } else if (vb->getState() == vbucket_state_pending) {
2239        if (vb->addPendingOp(cookie)) {
2240            return GetValue(NULL, ENGINE_EWOULDBLOCK);
2241        }
2242    }
2243
2244    int bucket_num(0);
2245    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2246    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2247
2248    if (v) {
2249        if (v->isDeleted() || v->isTempDeletedItem() ||
2250            v->isTempNonExistentItem()) {
2251            GetValue rv;
2252            return rv;
2253        }
2254
2255        if (!v->isResident()) {
2256            bgFetch(key, vbucket, cookie);
2257            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2258        }
2259        if (v->isLocked(ep_current_time())) {
2260            GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2261            return rv;
2262        }
2263
2264        const bool exptime_mutated = exptime != v->getExptime();
2265        if (exptime_mutated) {
2266            v->markDirty();
2267            v->setExptime(exptime);
2268            v->setRevSeqno(v->getRevSeqno()+1);
2269        }
2270
2271        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2272                    ENGINE_SUCCESS, v->getBySeqno());
2273
2274        if (exptime_mutated) {
2275            queueDirty(vb, v, &lh, NULL);
2276        }
2277
2278        return rv;
2279    } else {
2280        if (eviction_policy == VALUE_ONLY) {
2281            GetValue rv;
2282            return rv;
2283        } else {
2284            if (vb->maybeKeyExistsInFilter(key)) {
2285                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2286                                                             key, vb, cookie,
2287                                                             false);
2288                return GetValue(NULL, ec, -1, true);
2289            } else {
2290                // As bloomfilter predicted that item surely doesn't exist
2291                // on disk, return ENOENT for getAndUpdateTtl().
2292                GetValue rv;
2293                return rv;
2294            }
2295        }
2296    }
2297}
2298
2299ENGINE_ERROR_CODE
2300EventuallyPersistentStore::statsVKey(const std::string &key,
2301                                     uint16_t vbucket,
2302                                     const void *cookie) {
2303    RCPtr<VBucket> vb = getVBucket(vbucket);
2304    if (!vb) {
2305        return ENGINE_NOT_MY_VBUCKET;
2306    }
2307
2308    int bucket_num(0);
2309    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2310    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2311
2312    if (v) {
2313        if (v->isDeleted() || v->isTempDeletedItem() ||
2314            v->isTempNonExistentItem()) {
2315            return ENGINE_KEY_ENOENT;
2316        }
2317        bgFetchQueue++;
2318        ExecutorPool* iom = ExecutorPool::get();
2319        ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2320                                           v->getBySeqno(), cookie,
2321                                           bgFetchDelay, false);
2322        iom->schedule(task, READER_TASK_IDX);
2323        return ENGINE_EWOULDBLOCK;
2324    } else {
2325        if (eviction_policy == VALUE_ONLY) {
2326            return ENGINE_KEY_ENOENT;
2327        } else {
2328            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2329                                                        eviction_policy);
2330            switch(rv) {
2331            case ADD_NOMEM:
2332                return ENGINE_ENOMEM;
2333            case ADD_EXISTS:
2334            case ADD_UNDEL:
2335            case ADD_SUCCESS:
2336            case ADD_TMP_AND_BG_FETCH:
2337                // Since the hashtable bucket is locked, we shouldn't get here
2338                throw std::logic_error("EventuallyPersistentStore::statsVKey: "
2339                        "Invalid result from unlocked_addTempItem (" +
2340                        std::to_string(rv) + ")");
2341
2342            case ADD_BG_FETCH:
2343                {
2344                    ++bgFetchQueue;
2345                    ExecutorPool* iom = ExecutorPool::get();
2346                    ExTask task = new VKeyStatBGFetchTask(&engine, key,
2347                                                          vbucket, -1, cookie,
2348                                                          bgFetchDelay, false);
2349                    iom->schedule(task, READER_TASK_IDX);
2350                }
2351            }
2352            return ENGINE_EWOULDBLOCK;
2353        }
2354    }
2355}
2356
2357void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2358                                                  std::string &key,
2359                                                  uint16_t vbid,
2360                                                  uint64_t bySeqNum) {
2361    RememberingCallback<GetValue> gcb;
2362
2363    getROUnderlying(vbid)->get(key, vbid, gcb);
2364    gcb.waitForValue();
2365
2366    if (eviction_policy == FULL_EVICTION) {
2367        RCPtr<VBucket> vb = getVBucket(vbid);
2368        if (vb) {
2369            int bucket_num(0);
2370            LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2371            StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2372            if (v && v->isTempInitialItem()) {
2373                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2374                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2375                    if (!v->isResident()) {
2376                        throw std::logic_error("EPStore::completeStatsVKey: "
2377                            "storedvalue (which has key " + v->getKey() +
2378                            ") should be resident after calling restoreValue()");
2379                    }
2380                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2381                    v->setNonExistent();
2382                } else {
2383                    // underlying kvstore couldn't fetch requested data
2384                    // log returned error and notify TMPFAIL to client
2385                    LOG(EXTENSION_LOG_WARNING,
2386                        "Failed background fetch for vb=%d "
2387                        "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
2388                        key.c_str());
2389                }
2390            }
2391        }
2392    }
2393
2394    if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2395        engine.addLookupResult(cookie, gcb.val.getValue());
2396    } else {
2397        engine.addLookupResult(cookie, NULL);
2398    }
2399
2400    bgFetchQueue--;
2401    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2402}
2403
2404GetValue EventuallyPersistentStore::getLocked(const std::string &key,
2405                                              uint16_t vbucket,
2406                                              rel_time_t currentTime,
2407                                              uint32_t lockTimeout,
2408                                              const void *cookie) {
2409    RCPtr<VBucket> vb = getVBucket(vbucket);
2410    if (!vb || vb->getState() != vbucket_state_active) {
2411        ++stats.numNotMyVBuckets;
2412        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2413    }
2414
2415    int bucket_num(0);
2416    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2417    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2418
2419    if (v) {
2420        if (v->isDeleted() || v->isTempNonExistentItem() ||
2421            v->isTempDeletedItem()) {
2422            return GetValue(NULL, ENGINE_KEY_ENOENT);
2423        }
2424
2425        // if v is locked return error
2426        if (v->isLocked(currentTime)) {
2427            return GetValue(NULL, ENGINE_TMPFAIL);
2428        }
2429
2430        // If the value is not resident, wait for it...
2431        if (!v->isResident()) {
2432            if (cookie) {
2433                bgFetch(key, vbucket, cookie);
2434            }
2435            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
2436        }
2437
2438        // acquire lock and increment cas value
2439        v->lock(currentTime + lockTimeout);
2440
2441        Item *it = v->toItem(false, vbucket);
2442        it->setCas(vb->nextHLCCas());
2443        v->setCas(it->getCas());
2444
2445        return GetValue(it);
2446
2447    } else {
2448        // No value found in the hashtable.
2449        switch (eviction_policy) {
2450        case VALUE_ONLY:
2451            return GetValue(NULL, ENGINE_KEY_ENOENT);
2452
2453        case FULL_EVICTION:
2454            if (vb->maybeKeyExistsInFilter(key)) {
2455                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2456                                                             key, vb, cookie,
2457                                                             false);
2458                return GetValue(NULL, ec, -1, true);
2459            } else {
2460                // As bloomfilter predicted that item surely doesn't exist
2461                // on disk, return ENOENT for getLocked().
2462                return GetValue(NULL, ENGINE_KEY_ENOENT);
2463            }
2464        default:
2465            throw std::logic_error("Unknown eviction policy");
2466        }
2467    }
2468}
2469
2470ENGINE_ERROR_CODE
2471EventuallyPersistentStore::unlockKey(const std::string &key,
2472                                     uint16_t vbucket,
2473                                     uint64_t cas,
2474                                     rel_time_t currentTime)
2475{
2476
2477    RCPtr<VBucket> vb = getVBucket(vbucket);
2478    if (!vb || vb->getState() != vbucket_state_active) {
2479        ++stats.numNotMyVBuckets;
2480        return ENGINE_NOT_MY_VBUCKET;
2481    }
2482
2483    int bucket_num(0);
2484    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2485    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2486
2487    if (v) {
2488        if (v->isDeleted() || v->isTempNonExistentItem() ||
2489            v->isTempDeletedItem()) {
2490            return ENGINE_KEY_ENOENT;
2491        }
2492        if (v->isLocked(currentTime)) {
2493            if (v->getCas() == cas) {
2494                v->unlock();
2495                return ENGINE_SUCCESS;
2496            }
2497        }
2498        return ENGINE_TMPFAIL;
2499    } else {
2500        if (eviction_policy == VALUE_ONLY) {
2501            return ENGINE_KEY_ENOENT;
2502        } else {
2503            // With the full eviction, an item's lock is automatically
2504            // released when the item is evicted from memory. Therefore,
2505            // we simply return ENGINE_TMPFAIL when we receive unlockKey
2506            // for an item that is not in memocy cache. Note that we don't
2507            // spawn any bg fetch job to figure out if an item actually
2508            // exists in disk or not.
2509            return ENGINE_TMPFAIL;
2510        }
2511    }
2512}
2513
2514
2515ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2516                                            const std::string &key,
2517                                            uint16_t vbucket,
2518                                            const void *cookie,
2519                                            struct key_stats &kstats,
2520                                            bool bgfetch,
2521                                            bool wantsDeleted)
2522{
2523    RCPtr<VBucket> vb = getVBucket(vbucket);
2524    if (!vb) {
2525        return ENGINE_NOT_MY_VBUCKET;
2526    }
2527
2528    int bucket_num(0);
2529    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2530    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2531
2532    if (v) {
2533        if ((v->isDeleted() && !wantsDeleted) ||
2534            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2535            return ENGINE_KEY_ENOENT;
2536        }
2537        if (eviction_policy == FULL_EVICTION &&
2538            v->isTempInitialItem() && bgfetch) {
2539            lh.unlock();
2540            bgFetch(key, vbucket, cookie, true);
2541            return ENGINE_EWOULDBLOCK;
2542        }
2543        kstats.logically_deleted = v->isDeleted();
2544        kstats.dirty = v->isDirty();
2545        kstats.exptime = v->getExptime();
2546        kstats.flags = v->getFlags();
2547        kstats.cas = v->getCas();
2548        kstats.vb_state = vb->getState();
2549        return ENGINE_SUCCESS;
2550    } else {
2551        if (eviction_policy == VALUE_ONLY) {
2552            return ENGINE_KEY_ENOENT;
2553        } else {
2554            if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2555                return addTempItemForBgFetch(lh, bucket_num, key, vb,
2556                                             cookie, true);
2557            } else {
2558                // If bgFetch were false, or bloomfilter predicted that
2559                // item surely doesn't exist on disk, return ENOENT for
2560                // getKeyStats().
2561                return ENGINE_KEY_ENOENT;
2562            }
2563        }
2564    }
2565}
2566
2567std::string EventuallyPersistentStore::validateKey(const std::string &key,
2568                                                   uint16_t vbucket,
2569                                                   Item &diskItem) {
2570    int bucket_num(0);
2571    RCPtr<VBucket> vb = getVBucket(vbucket);
2572    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2573    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2574                                     false, true);
2575
2576    if (v) {
2577        if (v->isDeleted() || v->isTempNonExistentItem() ||
2578            v->isTempDeletedItem()) {
2579            return "item_deleted";
2580        }
2581
2582        if (diskItem.getFlags() != v->getFlags()) {
2583            return "flags_mismatch";
2584        } else if (v->isResident() && memcmp(diskItem.getData(),
2585                                             v->getValue()->getData(),
2586                                             diskItem.getNBytes())) {
2587            return "data_mismatch";
2588        } else {
2589            return "valid";
2590        }
2591    } else {
2592        return "item_deleted";
2593    }
2594
2595}
2596
/**
 * Soft-delete the document identified by key in the given vbucket and queue
 * the deletion for persistence.
 *
 * @param key      key of the document to delete
 * @param cas      in: CAS passed to unlocked_softDelete(); out: when a delete
 *                 is queued, set to the stored value's CAS
 * @param vbucket  id of the vbucket owning the key
 * @param cookie   connection cookie; used to park the op on a pending
 *                 vbucket and for background fetches
 * @param force    when true, skips the dead/replica/pending vbucket checks
 *                 and, under full eviction, creates a deleted temp item for a
 *                 key that may exist only on disk instead of bg-fetching it
 * @param itemMeta optional; when non-null, receives revSeqno/cas/flags/
 *                 exptime of the deleted item
 * @param mutInfo  receives seqno and vbucket uuid when the delete is queued.
 *                 It is dereferenced without a null check; presumably callers
 *                 always pass a valid pointer (verify).
 * @return ENGINE_SUCCESS on success. Otherwise one of:
 *         - ENGINE_NOT_MY_VBUCKET;
 *         - ENGINE_EWOULDBLOCK (pending vbucket or bg fetch needed);
 *         - ENGINE_TMPFAIL (takeover backed up, or item locked);
 *         - ENGINE_KEY_ENOENT;
 *         - ENGINE_KEY_EEXISTS (CAS mismatch);
 *         - ENGINE_ENOMEM;
 *         - the result of addTempItemForBgFetch().
 * @throws std::logic_error if unlocked_softDelete() returns NEED_BG_FETCH.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
                                                        uint64_t *cas,
                                                        uint16_t vbucket,
                                                        const void *cookie,
                                                        bool force,
                                                        ItemMetaData *itemMeta,
                                                        mutation_descr_t
                                                        *mutInfo)
{
    RCPtr<VBucket> vb = getVBucket(vbucket);
    // NOTE(review): unlike deleteWithMeta(), these state checks are made
    // without holding the vbucket state lock.
    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        // If addPendingOp() returns false, carry on and process the delete now.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    } else if (vb->isTakeoverBackedUp()) {
        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op"
                ", becuase takeover is lagging", vb->getId());
        return ENGINE_TMPFAIL;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
    // No live, non-temp value in memory: decide whether the key can still be
    // deleted (possibly via a bg fetch or a forced temp item) or is ENOENT.
    if (!v || v->isDeleted() || v->isTempItem()) {
        if (eviction_policy == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else { // Full eviction.
            if (!force) {
                if (!v) { // Item might be evicted from cache.
                    if (vb->maybeKeyExistsInFilter(key)) {
                        return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                                     cookie, true);
                    } else {
                        // As bloomfilter predicted that item surely doesn't
                        // exist on disk, return ENOENT for deleteItem().
                        return ENGINE_KEY_ENOENT;
                    }
                } else if (v->isTempInitialItem()) {
                    // Metadata not fetched yet: drop the bucket lock before
                    // scheduling the bg fetch and ask the caller to retry.
                    lh.unlock();
                    bgFetch(key, vbucket, cookie, true);
                    return ENGINE_EWOULDBLOCK;
                } else { // Non-existent or deleted key.
                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                        // Delete a temp non-existent item to ensure that
                        // if a delete were issued over an item that doesn't
                        // exist, then we don't preserve a temp item.
                        vb->ht.unlocked_del(key, bucket_num);
                    }
                    return ENGINE_KEY_ENOENT;
                }
            } else {
                if (!v) { // Item might be evicted from cache.
                    // Create a temp item and delete it below as it is a
                    // force deletion, only if bloomfilter predicts that
                    // item may exist on disk.
                    if (vb->maybeKeyExistsInFilter(key)) {
                        add_type_t rv = vb->ht.unlocked_addTempItem(
                                                               bucket_num,
                                                               key,
                                                               eviction_policy);
                        if (rv == ADD_NOMEM) {
                            return ENGINE_ENOMEM;
                        }
                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
                        v->setDeleted();
                    } else {
                        return ENGINE_KEY_ENOENT;
                    }
                } else if (v->isTempInitialItem()) {
                    // Forced: mark the placeholder deleted rather than fetch.
                    v->setDeleted();
                } else { // Non-existent or deleted key.
                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
                        // Delete a temp non-existent item to ensure that
                        // if a delete were issued over an item that doesn't
                        // exist, then we don't preserve a temp item.
                        vb->ht.unlocked_del(key, bucket_num);
                    }
                    return ENGINE_KEY_ENOENT;
                }
            }
        }
    }

    // Locks are not honoured on replica/pending vbuckets: clear any lock so
    // the soft delete below is not rejected with IS_LOCKED.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t delrv;
    delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
    // Report the metadata of whatever was (soft-)deleted to the caller.
    if (v && (delrv == NOT_FOUND || delrv == WAS_DIRTY || delrv == WAS_CLEAN)) {
        if (itemMeta != nullptr) {
            itemMeta->revSeqno = v->getRevSeqno();
            itemMeta->cas = v->getCas();
            itemMeta->flags = v->getFlags();
            itemMeta->exptime = v->getExptime();
        }
    }

    uint64_t seqno = 0;
    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case INVALID_CAS:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case IS_LOCKED:
        ret = ENGINE_TMPFAIL;
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        // No break: NOT_FOUND deliberately falls through so that a non-null v
        // still has its delete queued; the `delrv != NOT_FOUND` check below
        // skips the mutInfo update, and ret stays ENGINE_KEY_ENOENT.
    case WAS_CLEAN:
    case WAS_DIRTY:
        if (v) {
            // Keep lh held: v->getCas() is read after queueDirty().
            queueDirty(vb, v, nullptr, &seqno);
            *cas = v->getCas();
        }

        if (delrv != NOT_FOUND) {
            mutInfo->seqno = seqno;
            mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
            if (itemMeta != nullptr) {
                itemMeta->cas = v->getCas();
            }
        }
        break;
    case NEED_BG_FETCH:
        // Whether a bg fetch is required for a full-evicted item was already
        // decided above.
        throw std::logic_error("EventuallyPersistentStore::deleteItem: "
                "Unexpected NEEDS_BG_FETCH from unlocked_softDelete");
    }
    return ret;
}
2743
/**
 * Delete key using caller-supplied metadata (e.g. from replication / XDCR),
 * applying conflict resolution unless force is set.
 *
 * @param key           key to delete
 * @param cas           in: CAS given to forced temp items and passed to
 *                      unlocked_softDelete(); out: the stored value's CAS,
 *                      or 0 if there is no stored value
 * @param seqno         out: passed to queueDirty()/tapQueueDirty() to
 *                      receive the queued seqno
 * @param vbucket       id of the vbucket owning the key
 * @param cookie        connection cookie for pending ops and bg fetches
 * @param force         skip conflict resolution and the replica/pending state
 *                      checks (dead vbuckets are still rejected)
 * @param itemMeta      incoming metadata. Its cas is validated, and it is used
 *                      for conflict resolution and the soft delete. It is
 *                      dereferenced without a null check.
 * @param tapBackfill   queue via tapQueueDirty() instead of queueDirty()
 * @param genBySeqno    when GenerateBySeqno::No, bySeqno is applied to the
 *                      stored value
 * @param generateCas   forwarded to queueDirty()
 * @param bySeqno       seqno to apply when genBySeqno is No
 * @param emd           not referenced in this function body
 * @param isReplication forwarded when temp items are created
 * @return ENGINE_SUCCESS on success. Otherwise one of:
 *         - ENGINE_NOT_MY_VBUCKET;
 *         - ENGINE_EWOULDBLOCK;
 *         - ENGINE_TMPFAIL;
 *         - ENGINE_KEY_EEXISTS (invalid CAS or lost conflict resolution);
 *         - ENGINE_KEY_ENOENT;
 *         - ENGINE_ENOMEM;
 *         - the result of addTempItemForBgFetch().
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
                                                     const std::string &key,
                                                     uint64_t *cas,
                                                     uint64_t *seqno,
                                                     uint16_t vbucket,
                                                     const void *cookie,
                                                     bool force,
                                                     ItemMetaData *itemMeta,
                                                     bool tapBackfill,
                                                     GenerateBySeqno genBySeqno,
                                                     GenerateCas generateCas,
                                                     uint64_t bySeqno,
                                                     ExtendedMetaData *emd,
                                                     bool isReplication)
{
    RCPtr<VBucket> vb = getVBucket(vbucket);

    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Hold the vbucket state lock (shared) for the rest of the call so the
    // state checked here cannot change underneath us.
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        // If addPendingOp() returns false, carry on and process the op now.
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    } else if (vb->isTakeoverBackedUp()) {
        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a deleteWithMeta "
                "op, because takeover is lagging", vb->getId());
        return ENGINE_TMPFAIL;
    }

    // Reject the request up front if the incoming item's CAS is invalid.
    if (!Item::isValidCas(itemMeta->cas)) {
        return ENGINE_KEY_EEXISTS;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
    if (!force) { // Need conflict resolution.
        if (v)  {
            if (v->isTempInitialItem()) {
                // NOTE(review): unlike the NEED_BG_FETCH case below, lh is
                // still held while bgFetch() is called here; confirm intended.
                bgFetch(key, vbucket, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            if (!conflictResolver->resolve(*v, *itemMeta, true)) {
                ++stats.numOpsDelMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Item is 1) deleted or not existent in the value eviction case OR
            // 2) deleted or evicted in the full eviction.
            if (vb->maybeKeyExistsInFilter(key)) {
                return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                             cookie, true, isReplication);
            } else {
                // Even though bloomfilter predicted that item doesn't exist
                // on disk, we must put this delete on disk if the cas is valid.
                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                            eviction_policy,
                                                            isReplication);
                if (rv == ADD_NOMEM) {
                    return ENGINE_ENOMEM;
                }
                v = vb->ht.unlocked_find(key, bucket_num, true, false);
                v->setDeleted();
            }
        }
    } else {
        if (!v) {
            // We should always try to persist a delete here.
            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                        eviction_policy,
                                                        isReplication);
            if (rv == ADD_NOMEM) {
                return ENGINE_ENOMEM;
            }
            v = vb->ht.unlocked_find(key, bucket_num, true, false);
            v->setDeleted();
            v->setCas(*cas);
        } else if (v->isTempInitialItem()) {
            // Forced: mark the placeholder deleted with the caller's CAS.
            v->setDeleted();
            v->setCas(*cas);
        }
    }

    // Locks are not honoured on replica/pending vbuckets: clear any lock so
    // the soft delete below is not rejected with IS_LOCKED.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t delrv;
    delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
                                       eviction_policy, true);
    *cas = v ? v->getCas() : 0;

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case INVALID_CAS:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case IS_LOCKED:
        ret = ENGINE_TMPFAIL;
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        // Honour a caller-supplied seqno when we are not generating one.
        if (genBySeqno == GenerateBySeqno::No) {
            v->setBySeqno(bySeqno);
        }

        vb->setMaxCasAndTrackDrift(v->getCas());

        if (tapBackfill) {
            tapQueueDirty(*vb, v, lh, seqno, genBySeqno);
        } else {
            queueDirty(vb, v, &lh, seqno, genBySeqno, generateCas);
        }
        break;
    case NEED_BG_FETCH:
        // Release the bucket lock, then fetch the metadata and ask the
        // caller to retry.
        lh.unlock();
        bgFetch(key, vbucket, cookie, true);
        ret = ENGINE_EWOULDBLOCK;
    }

    return ret;
}
2888
2889void EventuallyPersistentStore::reset() {
2890    auto buckets = vbMap.getBuckets();
2891    for (auto vbid : buckets) {
2892        RCPtr<VBucket> vb = getVBucket(vbid);
2893        if (vb) {
2894            LockHolder lh(vb_mutexes[vb->getId()]);
2895            vb->ht.clear();
2896            vb->checkpointManager.clear(vb->getState());
2897            vb->resetStats();
2898            vb->setPersistedSnapshot(0, 0);
2899        }
2900    }
2901
2902    ++stats.diskQueueSize;
2903    bool inverse = true;
2904    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2905    // Waking up (notifying) one flusher is good enough for diskFlushAll
2906    vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2907}
2908
2909/**
2910 * Callback invoked after persisting an item from memory to disk.
2911 *
2912 * This class exists to create a closure around a few variables within
2913 * EventuallyPersistentStore::flushOne so that an object can be
2914 * requeued in case of failure to store in the underlying layer.
2915 */
2916class PersistenceCallback : public Callback<mutation_result>,
2917                            public Callback<int> {
2918public:
2919
2920    PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2921                        EventuallyPersistentStore& st, EPStats& s, uint64_t c)
2922        : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2923        if (!vb) {
2924            throw std::invalid_argument("PersistenceCallback(): vb is NULL");
2925        }
2926    }
2927
2928    // This callback is invoked for set only.
2929    void callback(mutation_result &value) {
2930        if (value.first == 1) {
2931            int bucket_num(0);
2932            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2933                                                        &bucket_num);
2934            StoredValue *v = store.fetchValidValue(vbucket,
2935                                                   queuedItem->getKey(),
2936                                                   bucket_num, true, false);
2937            if (v) {
2938                if (v->getCas() == cas) {
2939                    // mark this item clean only if current and stored cas
2940                    // value match
2941                    v->markClean();
2942                }
2943                if (v->isNewCacheItem()) {
2944                    if (value.second) {
2945                        // Insert in value-only or full eviction mode.
2946                        ++vbucket->opsCreate;
2947                        vbucket->incrMetaDataDisk(*queuedItem);
2948                    } else { // Update in full eviction mode.
2949                        vbucket->ht.decrNumTotalItems();
2950                        ++vbucket->opsUpdate;
2951                    }
2952                    v->setNewCacheItem(false);
2953                } else { // Update in value-only or full eviction mode.
2954                    ++vbucket->opsUpdate;
2955                }
2956            }
2957
2958            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2959            stats.decrDiskQueueSize(1);
2960            stats.totalPersisted++;
2961        } else {
2962            // If the return was 0 here, we're in a bad state because
2963            // we do not know the rowid of this object.
2964            if (value.first == 0) {
2965                int bucket_num(0);
2966                LockHolder lh = vbucket->ht.getLockedBucket(
2967                                           queuedItem->getKey(), &bucket_num);
2968                StoredValue *v = store.fetchValidValue(vbucket,
2969                                                       queuedItem->getKey(),
2970                                                       bucket_num, true,
2971                                                       false);
2972                if (v) {
2973                    std::stringstream ss;
2974                    ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2975                       << queuedItem->getVBucketId() << " (rowid="
2976                       << v->getBySeqno() << ") returned 0 updates\n";
2977                    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2978                } else {
2979                    LOG(EXTENSION_LOG_WARNING,
2980                        "Error persisting now missing ``%s'' from vb%d",
2981                        queuedItem->getKey().c_str(),
2982                        queuedItem->getVBucketId());
2983                }
2984
2985                vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2986                stats.decrDiskQueueSize(1);
2987            } else {
2988                std::stringstream ss;
2989                ss <<
2990                "Fatal error in persisting SET ``" <<
2991                queuedItem->getKey() << "'' on vb "
2992                   << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2993                LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2994                redirty();
2995            }
2996        }
2997    }
2998
2999    // This callback is invoked for deletions only.
3000    //
3001    // The boolean indicates whether the underlying storage
3002    // successfully deleted the item.
3003    void callback(int &value) {
3004        // > 1 would be bad.  We were only trying to delete one row.
3005        if (value > 1) {
3006            throw std::logic_error("PersistenceCallback::callback: value "
3007                    "(which is " + std::to_string(value) +
3008                    ") should be <= 1 for deletions");
3009        }
3010        // -1 means fail
3011        // 1 means we deleted one row
3012        // 0 means we did not delete a row, but did not fail (did not exist)
3013        if (value >= 0) {
3014            // We have successfully removed an item from the disk, we
3015            // may now remove it from the hash table.
3016            int bucket_num(0);
3017            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
3018                                                        &bucket_num);
3019            StoredValue *v = store.fetchValidValue(vbucket,
3020                                                   queuedItem->getKey(),
3021                                                   bucket_num, true, false);
3022            // Delete the item in the hash table iff:
3023            //  1. Item is existent in hashtable, and deleted flag is true
3024            //  2. rev seqno of queued item matches rev seqno of hash table item
3025            if (v && v->isDeleted() &&
3026                (queuedItem->getRevSeqno() == v->getRevSeqno())) {
3027                bool newCacheItem = v->isNewCacheItem();
3028                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
3029                                                        bucket_num);
3030                if (!deleted) {
3031                    throw std::logic_error("PersistenceCallback:callback: "
3032                            "Failed to delete key '" + queuedItem->getKey() +
3033                            "' from bucket " + std::to_string(bucket_num));
3034                }
3035                if (newCacheItem && value > 0) {
3036                    // Need to decrement the item counter again for an item that
3037                    // exists on DB file, but not in memory (i.e., full eviction),
3038                    // because we created the temp item in memory and incremented
3039                    // the item counter when a deletion is pushed in the queue.
3040                    vbucket->ht.decrNumTotalItems();
3041                }
3042
3043                /**
3044                 * Deleted items are to be added to the bloomfilter,
3045                 * in either eviction policy.
3046                 */
3047                vbucket->addToFilter(queuedItem->getKey());
3048            }
3049
3050            if (value > 0) {
3051                ++stats.totalPersisted;
3052                ++vbucket->opsDelete;
3053            }
3054            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3055            stats.decrDiskQueueSize(1);
3056            vbucket->decrMetaDataDisk(*queuedItem);
3057        } else {
3058            std::stringstream ss;
3059            ss << "Fatal error in persisting DELETE ``" <<
3060            queuedItem->getKey() << "'' on vb "
3061               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
3062            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3063            redirty();
3064        }
3065    }
3066
3067private:
3068
3069    void redirty() {
3070        if (store.vbMap.isBucketDeletion(vbucket->getId())) {
3071            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3072            stats.decrDiskQueueSize(1);
3073            return;
3074        }
3075        ++stats.flushFailed;
3076        store.invokeOnLockedStoredValue(queuedItem->getKey(),
3077                                         queuedItem->getVBucketId(),
3078                                         &StoredValue::reDirty);
3079        vbucket->rejectQueue.push(queuedItem);
3080        ++vbucket->opsReject;
3081    }
3082
3083    const queued_item queuedItem;
3084    RCPtr<VBucket> vbucket;
3085    EventuallyPersistentStore& store;
3086    EPStats& stats;
3087    uint64_t cas;
3088    DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
3089};
3090
3091bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3092                                                     time_t when) {
3093    bool inverse = false;
3094    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3095        flushAllTaskCtx.cookie = cookie;
3096        flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3097        ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3098        ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3099        return true;
3100    } else {
3101        return false;
3102    }
3103}
3104
3105void EventuallyPersistentStore::setFlushAllComplete() {
3106    // Notify memcached about flushAll task completion, and
3107    // set diskFlushall flag to false
3108    if (flushAllTaskCtx.cookie) {
3109        engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3110    }
3111    bool inverse = false;
3112    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3113    inverse = true;
3114    diskFlushAll.compare_exchange_strong(inverse, false);
3115}
3116
3117void EventuallyPersistentStore::flushOneDeleteAll() {
3118    for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
3119        RCPtr<VBucket> vb = getVBucket(i);
3120        // Reset the vBucket if it's non-null and not already in the middle of
3121        // being created / destroyed.
3122        if (vb &&
3123            !(vbMap.isBucketCreation(i) || vbMap.isBucketDeletion(i))) {
3124            LockHolder lh(vb_mutexes[vb->getId()]);
3125            getRWUnderlying(vb->getId())->reset(i);
3126        }
3127    }
3128
3129    stats.decrDiskQueueSize(1);
3130    setFlushAllComplete();
3131}
3132
3133int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3134    KVShard *shard = vbMap.getShardByVbId(vbid);
3135    if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3136        if (shard->getId() == EP_PRIMARY_SHARD) {
3137            flushOneDeleteAll();
3138        } else {
3139            // disk flush is pending just return
3140            return 0;
3141        }
3142    }
3143
3144    int items_flushed = 0;
3145    const rel_time_t flush_start = ep_current_time();
3146
3147    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3148    if (vb) {
3149        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3150        if (!lh.islocked()) { // Try another bucket if this one is locked
3151            return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3152        }
3153
3154        std::vector<queued_item> items;
3155        KVStore *rwUnderlying = getRWUnderlying(vbid);
3156
3157        while (!vb->rejectQueue.empty()) {
3158            items.push_back(vb->rejectQueue.front());
3159            vb->rejectQueue.pop();
3160        }
3161
3162        // Append any 'backfill' items (mutations added by a TAP stream).
3163        vb->getBackfillItems(items);
3164
3165        // Append all items outstanding for the persistence cursor.
3166        snapshot_range_t range;
3167        range = vb->checkpointManager.getAllItemsForCursor(
3168                CheckpointManager::pCursorName, items);
3169
3170        if (!items.empty()) {
3171            while (!rwUnderlying->begin()) {
3172                ++stats.beginFailed;
3173                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3174                    "Retry in 1 sec ...");
3175                sleep(1);
3176            }
3177            rwUnderlying->optimizeWrites(items);
3178
3179            Item *prev = NULL;
3180            auto vbstate = vb->getVBucketState();
3181            uint64_t maxSeqno = 0;
3182            range.start = std::max(range.start, vbstate.lastSnapStart);
3183
3184            bool mustCheckpointVBState = false;
3185            std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
3186
3187            for (const auto& item : items) {
3188
3189                if (!item->shouldPersist()) {
3190                    continue;
3191                }
3192
3193                if (item->getOperation() == queue_op::set_vbucket_state) {
3194                    // No actual item explicitly persisted to (this op exists
3195                    // to ensure a commit occurs with the current vbstate);
3196                    // flag that we must trigger a snapshot even if there are
3197                    // no 'real' items in the checkpoint.
3198                    mustCheckpointVBState = true;
3199
3200                    // Update queuing stats how this item has logically been
3201                    // processed.
3202                    stats.decrDiskQueueSize(1);
3203                    vb->doStatsForFlushing(*item, item->size());
3204
3205                } else if (!prev || prev->getKey() != item->getKey()) {
3206                    prev = item.get();
3207                    ++items_flushed;
3208                    PersistenceCallback *cb = flushOneDelOrSet(item, vb);
3209                    if (cb) {
3210                        pcbs.push_back(cb);
3211                    }
3212
3213                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
3214                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
3215                    if (item->isDeleted()) {
3216                        vbstate.maxDeletedSeqno =
3217                                std::max(vbstate.maxDeletedSeqno,
3218                                         item->getRevSeqno());
3219                    }
3220                    ++stats.flusher_todo;
3221
3222                } else {
3223                    // Item is the same key as the previous[1] one - don't need
3224                    // to flush to disk.
3225                    // [1] Previous here really means 'next' - optimizeWrites()
3226                    //     above has actually re-ordered items such that items
3227                    //     with the same key are ordered from high->low seqno.
3228                    //     This means we only write the highest (i.e. newest)
3229                    //     item for a given key, and discard any duplicate,
3230                    //     older items.
3231                    stats.decrDiskQueueSize(1);
3232                    vb->doStatsForFlushing(*item, item->size());
3233                }
3234            }
3235
3236
3237            {
3238                ReaderLockHolder rlh(vb->getStateLock());
3239                if (vb->getState() == vbucket_state_active) {
3240                    if (maxSeqno) {
3241                        range.start = maxSeqno;
3242                        range.end = maxSeqno;
3243                    }
3244                }
3245
3246                // Update VBstate based on the changes we have just made,
3247                // then tell the rwUnderlying the 'new' state
3248                // (which will persisted as part of the commit() below).
3249                vbstate.lastSnapStart = range.start;
3250                vbstate.lastSnapEnd = range.end;
3251
3252                // Do we need to trigger a persist of the state?
3253                // If there are no "real" items to flush, and we encountered
3254                // a set_vbucket_state meta-item.
3255                const bool persist = (items_flushed == 0) && mustCheckpointVBState;
3256
3257                KVStatsCallback kvcb(this);
3258                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
3259                                                  &kvcb, persist) != true) {
3260                    return RETRY_FLUSH_VBUCKET;
3261                }
3262
3263                if (vbMap.setBucketCreation(vbid, false)) {
3264                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3265                }
3266            }
3267
3268            // Commit all mutations to disk if there is a non-zero number
3269            // of items to flush, and the commit interval is zero.
3270            if ((items_flushed > 0) &&
3271                (decrCommitInterval(shard->getId()) == 0)) {
3272
3273                commit(shard->getId());
3274
3275                // Now the commit is complete, vBucket file must exist.
3276                if (vbMap.setBucketCreation(vbid, false)) {
3277                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3278                }
3279            }
3280
3281            hrtime_t end = gethrtime();
3282            uint64_t trans_time = (end - flush_start) / 1000000;
3283
3284            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3285                                       static_cast<double>(trans_time) /
3286                                       static_cast<double>(items_flushed));
3287            stats.cumulativeFlushTime.fetch_add(ep_current_time()
3288                                                - flush_start);
3289            stats.flusher_todo.store(0);
3290            stats.totalPersistVBState++;
3291
3292            if (vb->rejectQueue.empty()) {
3293                vb->setPersistedSnapshot(range.start, range.end);
3294                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3295                if (highSeqno > 0 &&
3296                    highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3297                    vbMap.setPersistenceSeqno(vbid, highSeqno);
3298                }
3299            }
3300        }
3301
3302        rwUnderlying->pendingTasks();
3303
3304        if (vb->checkpointManager.getNumCheckpoints() > 1) {
3305            wakeUpCheckpointRemover();
3306        }
3307
3308        if (vb->rejectQueue.empty()) {
3309            vb->checkpointManager.itemsPersisted();
3310            uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3311            uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3312            vb->notifyOnPersistence(engine, seqno, true);
3313            vb->notifyOnPersistence(engine, chkid, false);
3314            if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3315                vbMap.setPersistenceCheckpointId(vbid, chkid);
3316            }
3317        } else {
3318            return RETRY_FLUSH_VBUCKET;
3319        }
3320    }
3321
3322    return items_flushed;
3323}
3324
3325void EventuallyPersistentStore::commit(uint16_t shardId) {
3326    KVStore *rwUnderlying = getRWUnderlyingByShard(shardId);
3327    std::list<PersistenceCallback *>& pcbs = rwUnderlying->getPersistenceCbList();
3328    BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
3329    hrtime_t commit_start = gethrtime();
3330
3331    KVStatsCallback cb(this);
3332    while (!rwUnderlying->commit(&cb)) {
3333        ++stats.commitFailed;
3334        LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3335            "1 sec...\n");
3336        sleep(1);
3337    }
3338
3339    while (!pcbs.empty()) {
3340         delete pcbs.front();
3341         pcbs.pop_front();
3342    }
3343
3344    ++stats.flusherCommits;
3345    hrtime_t commit_end = gethrtime();
3346    uint64_t commit_time = (commit_end - commit_start) / 1000000;
3347    stats.commit_time.store(commit_time);
3348    stats.cumulativeCommitTime.fetch_add(commit_time);
3349}
3350
3351PersistenceCallback*
3352EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3353                                            RCPtr<VBucket> &vb) {
3354
3355    if (!vb) {
3356        stats.decrDiskQueueSize(1);
3357        return NULL;
3358    }
3359
3360    int64_t bySeqno = qi->getBySeqno();
3361    bool deleted = qi->isDeleted();
3362    rel_time_t queued(qi->getQueuedTime());
3363
3364    int dirtyAge = ep_current_time() - queued;
3365    stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3366    stats.dirtyAge.store(dirtyAge);
3367    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3368                                         stats.dirtyAgeHighWat.load()));
3369
3370    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3371    if (!deleted) {
3372        // TODO: Need to separate disk_insert from disk_update because
3373        // bySeqno doesn't give us that information.
3374        BlockTimer timer(bySeqno == -1 ?
3375                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
3376                         bySeqno == -1 ? "disk_insert" : "disk_update",
3377                         stats.timingLog);
3378        PersistenceCallback *cb =
3379            new PersistenceCallback(qi, vb, *this, stats, qi->getCas());
3380        rwUnderlying->set(*qi, *cb);
3381        return cb;
3382    } else {
3383        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
3384                         stats.timingLog);
3385        PersistenceCallback *cb =
3386            new PersistenceCallback(qi, vb, *this, stats, 0);
3387        rwUnderlying->del(*qi, *cb);
3388        return cb;
3389    }
3390}
3391
3392void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
3393                                           StoredValue* v,
3394                                           LockHolder *plh,
3395                                           uint64_t *seqno,
3396                                           const GenerateBySeqno generateBySeqno,
3397                                           const GenerateCas generateCas) {
3398    if (vb) {
3399        queued_item qi(v->toItem(false, vb->getId()));
3400
3401        bool rv = vb->checkpointManager.queueDirty(*vb, qi,
3402                                                   generateBySeqno, generateCas);
3403        v->setBySeqno(qi->getBySeqno());
3404
3405        if (seqno) {
3406            *seqno = v->getBySeqno();
3407        }
3408
3409        if (GenerateCas::Yes == generateCas) {
3410            v->setCas(qi->getCas());
3411        }
3412
3413        if (plh) {
3414            plh->unlock();
3415        }
3416
3417        if (rv) {
3418            KVShard* shard = vbMap.getShardByVbId(vb->getId());
3419            shard->getFlusher()->notifyFlushEvent();
3420        }
3421
3422        // Now notify replication
3423        engine.getTapConnMap().notifyVBConnections(vb->getId());
3424        engine.getDcpConnMap().notifyVBConnections(vb->getId(),
3425                                                   qi->getBySeqno());
3426    }
3427}
3428
3429void EventuallyPersistentStore::tapQueueDirty(VBucket &vb,
3430                                              StoredValue* v,
3431                                              LockHolder& plh,
3432                                              uint64_t *seqno,
3433                                              const GenerateBySeqno generateBySeqno) {
3434    queued_item qi(v->toItem(false, vb.getId()));
3435
3436    bool queued = vb.queueBackfillItem(qi, generateBySeqno);
3437
3438    v->setBySeqno(qi->getBySeqno());
3439
3440    /* During backfill on a TAP receiver we need to update the snapshot
3441       range in the checkpoint. Has to be done here because in case of TAP
3442       backfill, above, we use vb.queueBackfillItem() instead of
3443       vb.checkpointManager.queueDirty() */
3444    if (GenerateBySeqno::Yes == generateBySeqno) {
3445        vb.checkpointManager.resetSnapshotRange();
3446    }
3447
3448    if (seqno) {
3449        *seqno = v->getBySeqno();
3450    }
3451
3452    plh.unlock();
3453
3454    if (queued) {
3455        KVShard* shard = vbMap.getShardByVbId(vb.getId());
3456        shard->getFlusher()->notifyFlushEvent();
3457    }
3458}
3459
3460std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
3461{
3462    return getOneROUnderlying()->listPersistedVbuckets();
3463}
3464
3465void EventuallyPersistentStore::warmupCompleted() {
3466    // Snapshot VBucket state after warmup to ensure Failover table is
3467    // persisted.
3468    scheduleVBStatePersist();
3469
3470    if (engine.getConfiguration().getAlogPath().length() > 0) {
3471
3472        if (engine.getConfiguration().isAccessScannerEnabled()) {
3473            LockHolder lh(accessScanner.mute