xref: /4.6.4/ep-engine/src/ep.cc (revision 73d84472)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string.h>
21#include <time.h>
22
23#include <fstream>
24#include <functional>
25#include <iostream>
26#include <map>
27#include <sstream>
28#include <string>
29#include <utility>
30#include <vector>
31
32#include "access_scanner.h"
33#include "bgfetcher.h"
34#include "checkpoint_remover.h"
35#include "conflict_resolution.h"
36#include "defragmenter.h"
37#include "ep.h"
38#include "ep_engine.h"
39#include "ext_meta_parser.h"
40#include "failover-table.h"
41#include "flusher.h"
42#include "htresizer.h"
43#include "kvshard.h"
44#include "kvstore.h"
45#include "locks.h"
46#include "mutation_log.h"
47#include "warmup.h"
48#include "connmap.h"
49#include "replicationthrottle.h"
50#include "tasks.h"
51
52class StatsValueChangeListener : public ValueChangedListener {
53public:
54    StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
55        : stats(st), store(str) {
56        // EMPTY
57    }
58
59    virtual void sizeValueChanged(const std::string &key, size_t value) {
60        if (key.compare("max_size") == 0) {
61            stats.setMaxDataSize(value);
62            store.getEPEngine().getDcpConnMap(). \
63                                     updateMaxActiveSnoozingBackfills(value);
64            size_t low_wat = static_cast<size_t>
65                    (static_cast<double>(value) * stats.mem_low_wat_percent);
66            size_t high_wat = static_cast<size_t>
67                    (static_cast<double>(value) * stats.mem_high_wat_percent);
68            stats.mem_low_wat.store(low_wat);
69            stats.mem_high_wat.store(high_wat);
70            store.setCursorDroppingLowerUpperThresholds(value);
71        } else if (key.compare("mem_low_wat") == 0) {
72            stats.mem_low_wat.store(value);
73            stats.mem_low_wat_percent.store(
74                                    (double)(value) / stats.getMaxDataSize());
75        } else if (key.compare("mem_high_wat") == 0) {
76            stats.mem_high_wat.store(value);
77            stats.mem_high_wat_percent.store(
78                                    (double)(value) / stats.getMaxDataSize());
79        } else if (key.compare("replication_throttle_threshold") == 0) {
80            stats.replicationThrottleThreshold.store(
81                                          static_cast<double>(value) / 100.0);
82        } else if (key.compare("warmup_min_memory_threshold") == 0) {
83            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
84        } else if (key.compare("warmup_min_items_threshold") == 0) {
85            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
86        } else {
87            LOG(EXTENSION_LOG_WARNING,
88                "Failed to change value for unknown variable, %s\n",
89                key.c_str());
90        }
91    }
92
93private:
94    EPStats &stats;
95    EventuallyPersistentStore &store;
96};
97
98/**
99 * A configuration value changed listener that responds to ep-engine
100 * parameter changes by invoking engine-specific methods on
101 * configuration change events.
102 */
103class EPStoreValueChangeListener : public ValueChangedListener {
104public:
105    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
106    }
107
108    virtual void sizeValueChanged(const std::string &key, size_t value) {
109        if (key.compare("bg_fetch_delay") == 0) {
110            store.setBGFetchDelay(static_cast<uint32_t>(value));
111        } else if (key.compare("compaction_write_queue_cap") == 0) {
112            store.setCompactionWriteQueueCap(value);
113        } else if (key.compare("exp_pager_stime") == 0) {
114            store.setExpiryPagerSleeptime(value);
115        } else if (key.compare("exp_pager_initial_run_time") == 0) {
116            store.setExpiryPagerTasktime(value);
117        } else if (key.compare("alog_sleep_time") == 0) {
118            store.setAccessScannerSleeptime(value, false);
119        } else if (key.compare("alog_task_time") == 0) {
120            store.resetAccessScannerStartTime();
121        } else if (key.compare("mutation_mem_threshold") == 0) {
122            double mem_threshold = static_cast<double>(value) / 100;
123            StoredValue::setMutationMemoryThreshold(mem_threshold);
124        } else if (key.compare("backfill_mem_threshold") == 0) {
125            double backfill_threshold = static_cast<double>(value) / 100;
126            store.setBackfillMemoryThreshold(backfill_threshold);
127        } else if (key.compare("compaction_exp_mem_threshold") == 0) {
128            store.setCompactionExpMemThreshold(value);
129        } else if (key.compare("replication_throttle_queue_cap") == 0) {
130            store.getEPEngine().getReplicationThrottle().setQueueCap(value);
131        } else if (key.compare("replication_throttle_cap_pcnt") == 0) {
132            store.getEPEngine().getReplicationThrottle().setCapPercent(value);
133        } else {
134            LOG(EXTENSION_LOG_WARNING,
135                "Failed to change value for unknown variable, %s\n",
136                key.c_str());
137        }
138    }
139
140    virtual void booleanValueChanged(const std::string &key, bool value) {
141        if (key.compare("access_scanner_enabled") == 0) {
142            if (value) {
143                store.enableAccessScannerTask();
144            } else {
145                store.disableAccessScannerTask();
146            }
147        } else if (key.compare("bfilter_enabled") == 0) {
148            store.setAllBloomFilters(value);
149        } else if (key.compare("exp_pager_enabled") == 0) {
150            if (value) {
151                store.enableExpiryPager();
152            } else {
153                store.disableExpiryPager();
154            }
155        }
156    }
157
158    virtual void floatValueChanged(const std::string &key, float value) {
159        if (key.compare("bfilter_residency_threshold") == 0) {
160            store.setBfiltersResidencyThreshold(value);
161        } else if (key.compare("dcp_min_compression_ratio") == 0) {
162            store.getEPEngine().updateDcpMinCompressionRatio(value);
163        }
164    }
165
166private:
167    EventuallyPersistentStore &store;
168};
169
170/**
171 * Callback class used by EpStore, for adding relevent keys
172 * to bloomfilter during compaction.
173 */
174class BloomFilterCallback : public Callback<std::string&, bool&> {
175public:
176    BloomFilterCallback(EventuallyPersistentStore& eps, uint16_t vbid,
177                        bool residentRatioAlert)
178        : store(eps), vbucketId(vbid),
179          residentRatioLessThanThreshold(residentRatioAlert) {
180    }
181
182    void callback(std::string& key, bool& isDeleted) {
183        RCPtr<VBucket> vb = store.getVBucket(vbucketId);
184        if (vb) {
185            if (vb->isTempFilterAvailable()) {
186                if (store.getItemEvictionPolicy() == VALUE_ONLY) {
187                    /**
188                     * VALUE-ONLY EVICTION POLICY
189                     * Consider deleted items only.
190                     */
191                    if (isDeleted) {
192                        vb->addToTempFilter(key);
193                    }
194                } else {
195                    /**
196                     * FULL EVICTION POLICY
197                     * If vbucket's resident ratio is found to be less than
198                     * the residency threshold, consider all items, otherwise
199                     * consider deleted and non-resident items only.
200                     */
201                    if (residentRatioLessThanThreshold) {
202                        vb->addToTempFilter(key);
203                    } else {
204                        if (isDeleted || !store.isMetaDataResident(vb, key)) {
205                            vb->addToTempFilter(key);
206                        }
207                    }
208                }
209            }
210        }
211    }
212
213private:
214    EventuallyPersistentStore& store;
215    uint16_t vbucketId;
216    bool residentRatioLessThanThreshold;
217};
218
219class PendingOpsNotification : public GlobalTask {
220public:
221    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
222        GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
223        engine(e), vbucket(vb) { }
224
225    std::string getDescription() {
226        std::stringstream ss;
227        ss << "Notify pending operations for vbucket " << vbucket->getId();
228        return ss.str();
229    }
230
231    bool run(void) {
232        vbucket->fireAllOps(engine);
233        return false;
234    }
235
236private:
237    EventuallyPersistentEngine &engine;
238    RCPtr<VBucket> vbucket;
239};
240
/**
 * Construct the store for the given engine.
 *
 * Allocates the per-shard access logs, the per-task histograms and the
 * per-vbucket mutexes, registers the engine's taskable with the executor
 * pool, seeds stats and store settings from the configuration, registers
 * value-changed listeners for the dynamically tunable parameters, and
 * creates the Warmup object.
 *
 * @param theEngine the engine that owns this store
 */
EventuallyPersistentStore::EventuallyPersistentStore(
    EventuallyPersistentEngine &theEngine) :
    engine(theEngine), stats(engine.getEpStats()),
    vbMap(theEngine.getConfiguration(), *this),
    defragmenterTask(NULL),
    bgFetchQueue(0),
    diskFlushAll(false), bgFetchDelay(0),
    backfillMemoryThreshold(0.95),
    statsSnapshotTaskId(0), lastTransTimePerItem(0)
{
    cachedResidentRatio.activeRatio.store(0);
    cachedResidentRatio.replicaRatio.store(0);

    Configuration &config = engine.getConfiguration();

    // One access log per shard, named "<alog_path>.<shard index>".
    MutationLog *shardlog;
    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
        std::stringstream s;
        s << i;
        shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
                                 "." + s.str(),
                                 engine.getConfiguration().getAlogBlockSize());
        accessLog.push_back(shardlog);
    }

    storageProperties = new StorageProperties(true, true, true, true);

    // One scheduling and one runtime histogram per task id; both arrays
    // are released in the destructor.
    const auto size = GlobalTask::allTaskIds.size();
    stats.schedulingHisto = new Histogram<ProcessClock::duration::rep>[size];
    stats.taskRuntimeHisto = new Histogram<ProcessClock::duration::rep>[size];

    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
        stats.schedulingHisto[i].reset();
        stats.taskRuntimeHisto[i].reset();
    }

    ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable());

    // One mutex per vbucket; released in the destructor.
    size_t num_vbs = config.getMaxVbuckets();
    vb_mutexes = new Mutex[num_vbs];

    *stats.memOverhead = sizeof(EventuallyPersistentStore);

    // "lww" selects last-write-wins; any other value selects
    // revision-seqno based conflict resolution.
    if (config.getConflictResolutionType().compare("lww") == 0) {
        conflictResolver.reset(new LastWriteWinsResolution());
    } else {
        conflictResolver.reset(new RevisionSeqnoResolution());
    }

    // Memory quota and watermarks. The percentages are derived from the
    // absolute watermarks and the quota set just above.
    stats.setMaxDataSize(config.getMaxSize());
    config.addValueChangedListener("max_size",
                                   new StatsValueChangeListener(stats, *this));
    getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(
                                                        config.getMaxSize());

    stats.mem_low_wat.store(config.getMemLowWat());
    config.addValueChangedListener("mem_low_wat",
                                   new StatsValueChangeListener(stats, *this));
    stats.mem_low_wat_percent.store(
                (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize());

    stats.mem_high_wat.store(config.getMemHighWat());
    config.addValueChangedListener("mem_high_wat",
                                   new StatsValueChangeListener(stats, *this));
    stats.mem_high_wat_percent.store(
                (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize());

    setCursorDroppingLowerUpperThresholds(config.getMaxSize());

    // Replication throttle settings; the threshold is divided by 100
    // before being stored.
    stats.replicationThrottleThreshold.store(static_cast<double>
                                    (config.getReplicationThrottleThreshold())
                                     / 100.0);
    config.addValueChangedListener("replication_throttle_threshold",
                                   new StatsValueChangeListener(stats, *this));

    stats.replicationThrottleWriteQueueCap.store(
                                    config.getReplicationThrottleQueueCap());
    config.addValueChangedListener("replication_throttle_queue_cap",
                                   new EPStoreValueChangeListener(*this));
    config.addValueChangedListener("replication_throttle_cap_pcnt",
                                   new EPStoreValueChangeListener(*this));

    setBGFetchDelay(config.getBgFetchDelay());
    config.addValueChangedListener("bg_fetch_delay",
                                   new EPStoreValueChangeListener(*this));

    // Warmup thresholds, each divided by 100 before being stored.
    stats.warmupMemUsedCap.store(static_cast<double>
                               (config.getWarmupMinMemoryThreshold()) / 100.0);
    config.addValueChangedListener("warmup_min_memory_threshold",
                                   new StatsValueChangeListener(stats, *this));
    stats.warmupNumReadCap.store(static_cast<double>
                                (config.getWarmupMinItemsThreshold()) / 100.0);
    config.addValueChangedListener("warmup_min_items_threshold",
                                   new StatsValueChangeListener(stats, *this));

    // Mutation and backfill memory thresholds, each divided by 100.
    double mem_threshold = static_cast<double>
                                      (config.getMutationMemThreshold()) / 100;
    StoredValue::setMutationMemoryThreshold(mem_threshold);
    config.addValueChangedListener("mutation_mem_threshold",
                                   new EPStoreValueChangeListener(*this));

    double backfill_threshold = static_cast<double>
                                      (config.getBackfillMemThreshold()) / 100;
    setBackfillMemoryThreshold(backfill_threshold);
    config.addValueChangedListener("backfill_mem_threshold",
                                   new EPStoreValueChangeListener(*this));

    // Bloom filter and compaction settings.
    config.addValueChangedListener("bfilter_enabled",
                                   new EPStoreValueChangeListener(*this));

    bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
    config.addValueChangedListener("bfilter_residency_threshold",
                                   new EPStoreValueChangeListener(*this));

    compactionExpMemThreshold = config.getCompactionExpMemThreshold();
    config.addValueChangedListener("compaction_exp_mem_threshold",
                                   new EPStoreValueChangeListener(*this));

    compactionWriteQueueCap = config.getCompactionWriteQueueCap();
    config.addValueChangedListener("compaction_write_queue_cap",
                                   new EPStoreValueChangeListener(*this));

    config.addValueChangedListener("dcp_min_compression_ratio",
                                   new EPStoreValueChangeListener(*this));

    // "value_only" selects value-only eviction; any other policy string
    // selects full eviction.
    const std::string &policy = config.getItemEvictionPolicy();
    if (policy.compare("value_only") == 0) {
        eviction_policy = VALUE_ONLY;
    } else {
        eviction_policy = FULL_EVICTION;
    }

    // Deleted in the destructor.
    warmupTask = new Warmup(*this, config);
}
374
375bool EventuallyPersistentStore::initialize() {
376    // We should nuke everything unless we want warmup
377    Configuration &config = engine.getConfiguration();
378    if (!config.isWarmup()) {
379        reset();
380    }
381
382    if (!startFlusher()) {
383        LOG(EXTENSION_LOG_WARNING,
384            "FATAL: Failed to create and start flushers");
385        return false;
386    }
387    if (!startBgFetcher()) {
388        LOG(EXTENSION_LOG_WARNING,
389           "FATAL: Failed to create and start bgfetchers");
390        return false;
391    }
392
393    warmupTask->start();
394
395    itmpTask = new ItemPager(&engine, stats);
396    ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
397
398    LockHolder elh(expiryPager.mutex);
399    expiryPager.enabled = config.isExpPagerEnabled();
400    elh.unlock();
401
402    size_t expiryPagerSleeptime = config.getExpPagerStime();
403    setExpiryPagerSleeptime(expiryPagerSleeptime);
404    config.addValueChangedListener("exp_pager_stime",
405                                   new EPStoreValueChangeListener(*this));
406    config.addValueChangedListener("exp_pager_enabled",
407                                   new EPStoreValueChangeListener(*this));
408    config.addValueChangedListener("exp_pager_initial_run_time",
409                                   new EPStoreValueChangeListener(*this));
410
411    ExTask htrTask = new HashtableResizerTask(this, 10);
412    ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
413
414    size_t checkpointRemoverInterval = config.getChkRemoverStime();
415    chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
416                                                   checkpointRemoverInterval);
417    ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
418
419    ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
420    ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
421
422#if HAVE_JEMALLOC
423    /* Only create the defragmenter task if we have an underlying memory
424     * allocator which can facilitate defragmenting memory.
425     */
426    defragmenterTask = new DefragmenterTask(&engine, stats);
427    ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
428#endif
429
430    return true;
431}
432
433EventuallyPersistentStore::~EventuallyPersistentStore() {
434    stopWarmup();
435    stopBgFetcher();
436    ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(), NONIO_TASK_IDX,
437                                       stats.forceShutdown);
438
439    ExecutorPool::get()->cancel(statsSnapshotTaskId);
440
441    LockHolder lh(accessScanner.mutex);
442    ExecutorPool::get()->cancel(accessScanner.task);
443    lh.unlock();
444
445    stopFlusher();
446
447    ExecutorPool::get()->unregisterTaskable(engine.getTaskable(),
448                                            stats.forceShutdown);
449
450    delete [] vb_mutexes;
451    delete [] stats.schedulingHisto;
452    delete [] stats.taskRuntimeHisto;
453    delete warmupTask;
454    delete storageProperties;
455    defragmenterTask.reset();
456
457    std::vector<MutationLog*>::iterator it;
458    for (it = accessLog.begin(); it != accessLog.end(); it++) {
459        delete *it;
460    }
461}
462
463const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
464    return vbMap.shards[shardId]->getFlusher();
465}
466
467uint16_t EventuallyPersistentStore::getCommitInterval(uint16_t shardId) {
468    Flusher *flusher = vbMap.shards[shardId]->getFlusher();
469    return flusher->getCommitInterval();
470}
471
472uint16_t EventuallyPersistentStore::decrCommitInterval(uint16_t shardId) {
473    Flusher *flusher = vbMap.shards[shardId]->getFlusher();
474    return flusher->decrCommitInterval();
475}
476
477Warmup* EventuallyPersistentStore::getWarmup(void) const {
478    return warmupTask;
479}
480
481bool EventuallyPersistentStore::startFlusher() {
482    for (auto* shard : vbMap.shards) {
483        shard->getFlusher()->start();
484    }
485    return true;
486}
487
488void EventuallyPersistentStore::stopFlusher() {
489    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
490        Flusher *flusher = vbMap.shards[i]->getFlusher();
491        LOG(EXTENSION_LOG_NOTICE, "Attempting to stop the flusher for "
492            "shard:%" PRIu16, i);
493        bool rv = flusher->stop(stats.forceShutdown);
494        if (rv && !stats.forceShutdown) {
495            flusher->wait();
496        }
497    }
498}
499
500bool EventuallyPersistentStore::pauseFlusher() {
501    bool rv = true;
502    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
503        Flusher *flusher = vbMap.shards[i]->getFlusher();
504        if (!flusher->pause()) {
505            LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
506                "[%s], shard = %d", flusher->stateName(), i);
507            rv = false;
508        }
509    }
510    return rv;
511}
512
513bool EventuallyPersistentStore::resumeFlusher() {
514    bool rv = true;
515    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
516        Flusher *flusher = vbMap.shards[i]->getFlusher();
517        if (!flusher->resume()) {
518            LOG(EXTENSION_LOG_WARNING,
519                    "Attempted to resume flusher in state [%s], "
520                    "shard = %d", flusher->stateName(), i);
521            rv = false;
522        }
523    }
524    return rv;
525}
526
527void EventuallyPersistentStore::wakeUpFlusher() {
528    if (stats.diskQueueSize.load() == 0) {
529        for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
530            Flusher *flusher = vbMap.shards[i]->getFlusher();
531            flusher->wake();
532        }
533    }
534}
535
536bool EventuallyPersistentStore::startBgFetcher() {
537    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
538        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
539        if (bgfetcher == NULL) {
540            LOG(EXTENSION_LOG_WARNING,
541                "Failed to start bg fetcher for shard %d", i);
542            return false;
543        }
544        bgfetcher->start();
545    }
546    return true;
547}
548
549void EventuallyPersistentStore::stopBgFetcher() {
550    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
551        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
552        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
553            LOG(EXTENSION_LOG_WARNING,
554                "Shutting down engine while there are still pending data "
555                "read for shard %d from database storage", i);
556        }
557        LOG(EXTENSION_LOG_NOTICE, "Stopping bg fetcher for shard:%" PRIu16, i);
558        bgfetcher->stop();
559    }
560}
561
562void
563EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
564                                             time_t startTime,
565                                             uint64_t revSeqno,
566                                             exp_type_t source) {
567    RCPtr<VBucket> vb = getVBucket(vbid);
568    if (vb) {
569        // Obtain reader access to the VB state change lock so that
570        // the VB can't switch state whilst we're processing
571        ReaderLockHolder rlh(vb->getStateLock());
572        if (vb->getState() == vbucket_state_active) {
573            int bucket_num(0);
574            LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
575            StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
576            if (v) {
577                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
578                    // This is a temporary item whose background fetch for metadata
579                    // has completed.
580                    bool deleted = vb->ht.unlocked_del(key, bucket_num);
581                    if (!deleted) {
582                        throw std::logic_error("EPStore::deleteExpiredItem: "
583                                "Failed to delete key '" + key + "' from bucket "
584                                + std::to_string(bucket_num));
585                    }
586                } else if (v->isExpired(startTime) && !v->isDeleted()) {
587                    vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
588                    queueDirty(vb, v, &lh, NULL);
589                }
590            } else {
591                if (eviction_policy == FULL_EVICTION) {
592                    // Create a temp item and delete and push it
593                    // into the checkpoint queue, only if the bloomfilter
594                    // predicts that the item may exist on disk.
595                    if (vb->maybeKeyExistsInFilter(key)) {
596                        add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
597                                                                    eviction_policy);
598                        if (rv == ADD_NOMEM) {
599                            return;
600                        }
601                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
602                        v->setDeleted();
603                        v->setRevSeqno(revSeqno);
604                        vb->ht.unlocked_softDelete(v, 0, eviction_policy);
605                        queueDirty(vb, v, &lh, NULL);
606                    }
607                }
608            }
609            incExpirationStat(vb, source);
610        }
611    }
612}
613
614void
615EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
616                                                        std::string> > &keys,
617                                              exp_type_t source) {
618    std::list<std::pair<uint16_t, std::string> >::iterator it;
619    time_t startTime = ep_real_time();
620    for (it = keys.begin(); it != keys.end(); it++) {
621        deleteExpiredItem(it->first, it->second, startTime, 0, source);
622    }
623}
624
625StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
626                                                        const const_sized_buffer key,
627                                                        int bucket_num,
628                                                        bool wantDeleted,
629                                                        bool trackReference,
630                                                        bool queueExpired) {
631    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
632                                          trackReference);
633    if (v && !v->isDeleted() && !v->isTempItem()) {
634        // In the deleted case, we ignore expiration time.
635        if (v->isExpired(ep_real_time())) {
636            if (vb->getState() != vbucket_state_active) {
637                return wantDeleted ? v : NULL;
638            }
639
640            // queueDirty only allowed on active VB
641            if (queueExpired && vb->getState() == vbucket_state_active) {
642                incExpirationStat(vb, EXP_BY_ACCESS);
643                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
644                queueDirty(vb, v, NULL, NULL);
645            }
646            return wantDeleted ? v : NULL;
647        }
648    }
649    return v;
650}
651
652bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
653                                                   const std::string &key) {
654
655    if (!vb) {
656        throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
657    }
658
659    int bucket_num(0);
660    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
661    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
662
663    if (v && !v->isTempItem()) {
664        return true;
665    } else {
666        return false;
667    }
668}
669
670protocol_binary_response_status EventuallyPersistentStore::evictKey(
671                                                        const std::string &key,
672                                                        uint16_t vbucket,
673                                                        const char **msg,
674                                                        size_t *msg_size,
675                                                        bool force) {
676    RCPtr<VBucket> vb = getVBucket(vbucket);
677    if (!vb || (vb->getState() != vbucket_state_active && !force)) {
678        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
679    }
680
681    int bucket_num(0);
682    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
683    StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
684
685    protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
686
687    *msg_size = 0;
688    if (v) {
689        if (force)  {
690            v->markClean();
691        }
692        if (v->isResident()) {
693            if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
694                *msg = "Ejected.";
695
696                // Add key to bloom filter incase of full eviction mode
697                if (getItemEvictionPolicy() == FULL_EVICTION) {
698                    vb->addToFilter(key);
699                }
700            } else {
701                *msg = "Can't eject: Dirty object.";
702                rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
703            }
704        } else {
705            *msg = "Already ejected.";
706        }
707    } else {
708        if (eviction_policy == VALUE_ONLY) {
709            *msg = "Not found.";
710            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
711        } else {
712            *msg = "Already ejected.";
713        }
714    }
715
716    return rv;
717}
718
719ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
720                                                        LockHolder &lock,
721                                                        int bucket_num,
722                                                        const const_sized_buffer key,
723                                                        RCPtr<VBucket> &vb,
724                                                        const void *cookie,
725                                                        bool metadataOnly,
726                                                        bool isReplication) {
727
728    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
729                                                eviction_policy,
730                                                isReplication);
731    switch(rv) {
732        case ADD_NOMEM:
733            return ENGINE_ENOMEM;
734
735        case ADD_EXISTS:
736        case ADD_UNDEL:
737        case ADD_SUCCESS:
738        case ADD_TMP_AND_BG_FETCH:
739            // Since the hashtable bucket is locked, we shouldn't get here
740            throw std::logic_error("EventuallyPersistentStore::addTempItemForBgFetch: "
741                    "Invalid result from addTempItem: " + std::to_string(rv));
742
743        case ADD_BG_FETCH:
744            lock.unlock();
745            bgFetch(key, vb->getId(), cookie, metadataOnly);
746    }
747    return ENGINE_EWOULDBLOCK;
748}
749
750ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
751                                                 const void *cookie,
752                                                 bool force,
753                                                 uint8_t nru) {
754
755    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
756    if (!vb) {
757        ++stats.numNotMyVBuckets;
758        return ENGINE_NOT_MY_VBUCKET;
759    }
760
761    // Obtain read-lock on VB state to ensure VB state changes are interlocked
762    // with this set
763    ReaderLockHolder rlh(vb->getStateLock());
764    if (vb->getState() == vbucket_state_dead) {
765        ++stats.numNotMyVBuckets;
766        return ENGINE_NOT_MY_VBUCKET;
767    } else if (vb->getState() == vbucket_state_replica && !force) {
768        ++stats.numNotMyVBuckets;
769        return ENGINE_NOT_MY_VBUCKET;
770    } else if (vb->getState() == vbucket_state_pending && !force) {
771        if (vb->addPendingOp(cookie)) {
772            return ENGINE_EWOULDBLOCK;
773        }
774    } else if (vb->isTakeoverBackedUp()) {
775        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op"
776                ", becuase takeover is lagging", vb->getId());
777        return ENGINE_TMPFAIL;
778    }
779
780    bool cas_op = (itm.getCas() != 0);
781    int bucket_num(0);
782    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
783    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num,
784                                          /*wantsDeleted*/true,
785                                          /*trackReference*/false);
786    if (v && v->isLocked(ep_current_time()) &&
787        (vb->getState() == vbucket_state_replica ||
788         vb->getState() == vbucket_state_pending)) {
789        v->unlock();
790    }
791
792    bool maybeKeyExists = true;
793    // If we didn't find a valid item, check Bloomfilter's prediction if in
794    // full eviction policy and for a CAS operation.
795    if ((v == nullptr || v->isTempInitialItem()) &&
796        (eviction_policy == FULL_EVICTION) &&
797        (itm.getCas() != 0)) {
798        // Check Bloomfilter's prediction
799        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
800            maybeKeyExists = false;
801        }
802    }
803
804    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
805                                                eviction_policy, nru,
806                                                maybeKeyExists);
807
808    Item& it = const_cast<Item&>(itm);
809    uint64_t seqno = 0;
810    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
811    switch (mtype) {
812    case NOMEM:
813        ret = ENGINE_ENOMEM;
814        break;
815    case INVALID_CAS:
816    case IS_LOCKED:
817        ret = ENGINE_KEY_EEXISTS;
818        break;
819    case NOT_FOUND:
820        if (cas_op) {
821            ret = ENGINE_KEY_ENOENT;
822            break;
823        }
824        // FALLTHROUGH
825    case WAS_DIRTY:
826        // Even if the item was dirty, push it into the vbucket's open
827        // checkpoint.
828    case WAS_CLEAN:
829        // We keep lh held as we need to do v->getCas()
830        queueDirty(vb, v, nullptr, &seqno);
831        it.setBySeqno(seqno);
832        it.setCas(v->getCas());
833        break;
834    case NEED_BG_FETCH:
835    {   // CAS operation with non-resident item + full eviction.
836        if (v) {
837            // temp item is already created. Simply schedule a bg fetch job
838            lh.unlock();
839            bgFetch(itm.getKey(), vb->getId(), cookie, true);
840            return ENGINE_EWOULDBLOCK;
841        }
842        ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
843                                    cookie, true);
844        break;
845    }
846    case INVALID_VBUCKET:
847        ret = ENGINE_NOT_MY_VBUCKET;
848        break;
849    }
850
851    return ret;
852}
853
854ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
855                                                 const void *cookie)
856{
857    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
858    if (!vb) {
859        ++stats.numNotMyVBuckets;
860        return ENGINE_NOT_MY_VBUCKET;
861    }
862
863    // Obtain read-lock on VB state to ensure VB state changes are interlocked
864    // with this add
865    ReaderLockHolder rlh(vb->getStateLock());
866    if (vb->getState() == vbucket_state_dead ||
867        vb->getState() == vbucket_state_replica) {
868        ++stats.numNotMyVBuckets;
869        return ENGINE_NOT_MY_VBUCKET;
870    } else if (vb->getState() == vbucket_state_pending) {
871        if (vb->addPendingOp(cookie)) {
872            return ENGINE_EWOULDBLOCK;
873        }
874    } else if (vb->isTakeoverBackedUp()) {
875        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op"
876                ", becuase takeover is lagging", vb->getId());
877        return ENGINE_TMPFAIL;
878    }
879
880    if (itm.getCas() != 0) {
881        // Adding with a cas value doesn't make sense..
882        return ENGINE_NOT_STORED;
883    }
884
885    int bucket_num(0);
886    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
887    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
888                                          false);
889
890    bool maybeKeyExists = true;
891    if ((v == nullptr || v->isTempInitialItem()) &&
892        (eviction_policy == FULL_EVICTION)) {
893        // Check bloomfilter's prediction
894        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
895            maybeKeyExists = false;
896        }
897    }
898
899    add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
900                                           eviction_policy,
901                                           true, true,
902                                           maybeKeyExists);
903
904    Item& it = const_cast<Item&>(itm);
905    uint64_t seqno = 0;
906    switch (atype) {
907    case ADD_NOMEM:
908        return ENGINE_ENOMEM;
909    case ADD_EXISTS:
910        return ENGINE_NOT_STORED;
911    case ADD_TMP_AND_BG_FETCH:
912        return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
913                                     cookie, true);
914    case ADD_BG_FETCH:
915        lh.unlock();
916        bgFetch(it.getKey(), vb->getId(), cookie, true);
917        return ENGINE_EWOULDBLOCK;
918    case ADD_SUCCESS:
919    case ADD_UNDEL:
920        // We need to keep lh as we will do v->getCas()
921        queueDirty(vb, v, nullptr, &seqno);
922        it.setBySeqno(seqno);
923        it.setCas(v->getCas());
924        break;
925    }
926
927    return ENGINE_SUCCESS;
928}
929
930ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
931                                                     const void *cookie) {
932    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
933    if (!vb) {
934        ++stats.numNotMyVBuckets;
935        return ENGINE_NOT_MY_VBUCKET;
936    }
937
938    // Obtain read-lock on VB state to ensure VB state changes are interlocked
939    // with this replace
940    ReaderLockHolder rlh(vb->getStateLock());
941    if (vb->getState() == vbucket_state_dead ||
942        vb->getState() == vbucket_state_replica) {
943        ++stats.numNotMyVBuckets;
944        return ENGINE_NOT_MY_VBUCKET;
945    } else if (vb->getState() == vbucket_state_pending) {
946        if (vb->addPendingOp(cookie)) {
947            return ENGINE_EWOULDBLOCK;
948        }
949    }
950
951    int bucket_num(0);
952    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
953    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
954                                          false);
955    if (v) {
956        if (v->isDeleted() || v->isTempDeletedItem() ||
957            v->isTempNonExistentItem()) {
958            return ENGINE_KEY_ENOENT;
959        }
960
961        mutation_type_t mtype;
962        if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
963            mtype = NEED_BG_FETCH;
964        } else {
965            mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
966                                        0xff);
967        }
968
969        Item& it = const_cast<Item&>(itm);
970        uint64_t seqno = 0;
971        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
972        switch (mtype) {
973            case NOMEM:
974                ret = ENGINE_ENOMEM;
975                break;
976            case IS_LOCKED:
977                ret = ENGINE_KEY_EEXISTS;
978                break;
979            case INVALID_CAS:
980            case NOT_FOUND:
981                ret = ENGINE_NOT_STORED;
982                break;
983                // FALLTHROUGH
984            case WAS_DIRTY:
985                // Even if the item was dirty, push it into the vbucket's open
986                // checkpoint.
987            case WAS_CLEAN:
988                // Keep lh as we need to do v->getCas()
989                queueDirty(vb, v, nullptr, &seqno);
990                it.setBySeqno(seqno);
991                it.setCas(v->getCas());
992                break;
993            case NEED_BG_FETCH:
994            {
995                // temp item is already created. Simply schedule a bg fetch job
996                lh.unlock();
997                bgFetch(it.getKey(), vb->getId(), cookie, true);
998                ret = ENGINE_EWOULDBLOCK;
999                break;
1000            }
1001            case INVALID_VBUCKET:
1002                ret = ENGINE_NOT_MY_VBUCKET;
1003                break;
1004        }
1005
1006        return ret;
1007    } else {
1008        if (eviction_policy == VALUE_ONLY) {
1009            return ENGINE_KEY_ENOENT;
1010        }
1011
1012        if (vb->maybeKeyExistsInFilter(itm.getKey())) {
1013            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1014                                         cookie, false);
1015        } else {
1016            // As bloomfilter predicted that item surely doesn't exist
1017            // on disk, return ENOENT for replace().
1018            return ENGINE_KEY_ENOENT;
1019        }
1020    }
1021}
1022
1023ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
1024                                                        const Item &itm,
1025                                                        uint8_t nru,
1026                                                        bool genBySeqno,
1027                                                        ExtendedMetaData *emd) {
1028
1029    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1030    if (!vb) {
1031        ++stats.numNotMyVBuckets;
1032        return ENGINE_NOT_MY_VBUCKET;
1033    }
1034
1035    // Obtain read-lock on VB state to ensure VB state changes are interlocked
1036    // with this add-tapbackfill
1037    ReaderLockHolder rlh(vb->getStateLock());
1038    if (vb->getState() == vbucket_state_dead ||
1039        vb->getState() == vbucket_state_active) {
1040        ++stats.numNotMyVBuckets;
1041        return ENGINE_NOT_MY_VBUCKET;
1042    }
1043
1044    //check for the incoming item's CAS validity
1045    if (!Item::isValidCas(itm.getCas())) {
1046        return ENGINE_KEY_EEXISTS;
1047    }
1048
1049    int bucket_num(0);
1050    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1051    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1052                                          false);
1053
1054    // Note that this function is only called on replica or pending vbuckets.
1055    if (v && v->isLocked(ep_current_time())) {
1056        v->unlock();
1057    }
1058    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
1059                                                eviction_policy, nru);
1060
1061    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1062    switch (mtype) {
1063    case NOMEM:
1064        ret = ENGINE_ENOMEM;
1065        break;
1066    case INVALID_CAS:
1067    case IS_LOCKED:
1068        ret = ENGINE_KEY_EEXISTS;
1069        break;
1070    case WAS_DIRTY:
1071        // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1072        // set correctly, and also the sequence numbers are ordered correctly.
1073        // (MB-14003)
1074    case NOT_FOUND:
1075        // FALLTHROUGH
1076    case WAS_CLEAN:
1077        vb->setMaxCas(v->getCas());
1078        tapQueueDirty(*vb, v, lh, NULL,
1079                      genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
1080        break;
1081    case INVALID_VBUCKET:
1082        ret = ENGINE_NOT_MY_VBUCKET;
1083        break;
1084    case NEED_BG_FETCH:
1085        throw std::logic_error("EventuallyPersistentStore::addTAPBackfillItem: "
1086                "SET on a non-active vbucket should not require a "
1087                "bg_metadata_fetch.");
1088    }
1089
1090    return ret;
1091}
1092
1093class KVStatsCallback : public Callback<kvstats_ctx> {
1094    public:
1095        KVStatsCallback(EventuallyPersistentStore *store)
1096            : epstore(store) { }
1097
1098       void callback(kvstats_ctx &ctx) {
1099            RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1100            if (vb) {
1101                vb->fileSpaceUsed = ctx.fileSpaceUsed;
1102                vb->fileSize = ctx.fileSize;
1103            }
1104        }
1105
1106    private:
1107        EventuallyPersistentStore *epstore;
1108};
1109
1110ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1111                                                           vbucket_state_t to,
1112                                                           bool transfer,
1113                                                           bool notify_dcp) {
1114    LockHolder lh(vbsetMutex);
1115    return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
1116}
1117
1118ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(
1119                                            uint16_t vbid,
1120                                            vbucket_state_t to,
1121                                            bool transfer,
1122                                            bool notify_dcp,
1123                                            LockHolder& vbset,
1124                                            WriterLockHolder* vbStateLock) {
1125    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1126    if (vb && to == vb->getState()) {
1127        return ENGINE_SUCCESS;
1128    }
1129
1130    if (vb) {
1131        vbucket_state_t oldstate = vb->getState();
1132
1133        if (vbStateLock) {
1134            vb->setState_UNLOCKED(to, *vbStateLock);
1135        } else {
1136            vb->setState(to);
1137        }
1138
1139        if (oldstate != to && notify_dcp) {
1140            bool closeInboundStreams = false;
1141            if (to == vbucket_state_active && !transfer) {
1142                /**
1143                 * Close inbound (passive) streams into the vbucket
1144                 * only in case of a failover.
1145                 */
1146                closeInboundStreams = true;
1147            }
1148            engine.getDcpConnMap().vbucketStateChanged(vbid, to,
1149                                                       closeInboundStreams);
1150        }
1151
1152        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
1153            /**
1154             * Update snapshot range when vbucket goes from being a replica
1155             * to active, to maintain the correct snapshot sequence numbers
1156             * even in a failover scenario.
1157             */
1158            vb->checkpointManager.resetSnapshotRange();
1159        }
1160
1161        if (to == vbucket_state_active && !transfer) {
1162            const snapshot_range_t range = vb->getPersistedSnapshot();
1163            if (range.end == vbMap.getPersistenceSeqno(vbid)) {
1164                vb->failovers->createEntry(range.end);
1165            } else {
1166                vb->failovers->createEntry(range.start);
1167            }
1168        }
1169
1170        if (oldstate == vbucket_state_pending &&
1171            to == vbucket_state_active) {
1172            ExTask notifyTask = new PendingOpsNotification(engine, vb);
1173            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
1174        }
1175        scheduleVBStatePersist(vbid);
1176    } else if (vbid < vbMap.getSize()) {
1177        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
1178        KVShard* shard = vbMap.getShardByVbId(vbid);
1179        std::shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
1180        Configuration& config = engine.getConfiguration();
1181        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
1182                                         engine.getCheckpointConfig(),
1183                                         shard, 0, 0, 0, ft, cb,
1184                                         config));
1185
1186        if (config.isBfilterEnabled()) {
1187            // Initialize bloom filters upon vbucket creation during
1188            // bucket creation and rebalance
1189            newvb->createFilter(config.getBfilterKeyCount(),
1190                                config.getBfilterFpProb());
1191        }
1192
1193        // The first checkpoint for active vbucket should start with id 2.
1194        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
1195        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
1196        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
1197            return ENGINE_ERANGE;
1198        }
1199        vbMap.setPersistenceCheckpointId(vbid, 0);
1200        vbMap.setPersistenceSeqno(vbid, 0);
1201        vbMap.setBucketCreation(vbid, true);
1202        scheduleVBStatePersist(vbid);
1203    } else {
1204        return ENGINE_ERANGE;
1205    }
1206    return ENGINE_SUCCESS;
1207}
1208
1209void EventuallyPersistentStore::scheduleVBStatePersist() {
1210    for (auto vbid : vbMap.getBuckets()) {
1211        scheduleVBStatePersist(vbid);
1212    }
1213}
1214
1215void EventuallyPersistentStore::scheduleVBStatePersist(VBucket::id_type vbid) {
1216    RCPtr<VBucket> vb = getVBucket(vbid);
1217
1218    if (!vb) {
1219        LOG(EXTENSION_LOG_WARNING,
1220            "EPStore::scheduleVBStatePersist: vb:%" PRIu16
1221            " does not not exist. Unable to schedule persistence.", vbid);
1222        return;
1223    }
1224
1225    vb->checkpointManager.queueSetVBState(*vb);
1226}
1227
1228bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1229                                                        const void* cookie) {
1230    LockHolder lh(vbsetMutex);
1231
1232    hrtime_t start_time(gethrtime());
1233    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1234    if (!vb || vb->getState() == vbucket_state_dead ||
1235         vbMap.isBucketDeletion(vbid)) {
1236        lh.unlock();
1237        LockHolder vlh(vb_mutexes[vbid]);
1238        getRWUnderlying(vbid)->delVBucket(vbid);
1239        vbMap.setBucketDeletion(vbid, false);
1240        vbMap.setBucketCreation(vbid, false);
1241        vbMap.setPersistenceSeqno(vbid, 0);
1242        ++stats.vbucketDeletions;
1243    }
1244
1245    hrtime_t spent(gethrtime() - start_time);
1246    hrtime_t wall_time = spent / 1000;
1247    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1248    stats.diskVBDelHisto.add(wall_time);
1249    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1250    stats.vbucketDelTotWalltime.fetch_add(wall_time);
1251    if (cookie) {
1252        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1253    }
1254
1255    return true;
1256}
1257
1258void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1259                                                   const void* cookie,
1260                                                   double delay) {
1261    ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1262    ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1263
1264    if (vbMap.setBucketDeletion(vb->getId(), true)) {
1265        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1266        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1267    }
1268}
1269
1270ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1271                                                           const void* c) {
1272    // Lock to prevent a race condition between a failed update and add
1273    // (and delete).
1274    LockHolder lh(vbsetMutex);
1275
1276    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1277    if (!vb) {
1278        return ENGINE_NOT_MY_VBUCKET;
1279    }
1280
1281    vb->setState(vbucket_state_dead);
1282    engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1283    vbMap.removeBucket(vbid);
1284    lh.unlock();
1285    scheduleVBDeletion(vb, c);
1286    if (c) {
1287        return ENGINE_EWOULDBLOCK;
1288    }
1289    return ENGINE_SUCCESS;
1290}
1291
1292ENGINE_ERROR_CODE EventuallyPersistentStore::checkForDBExistence(DBFileId db_file_id) {
1293    std::string backend = engine.getConfiguration().getBackend();
1294    if (backend.compare("couchdb") == 0) {
1295        RCPtr<VBucket> vb = vbMap.getBucket(db_file_id);
1296        if (!vb) {
1297            return ENGINE_NOT_MY_VBUCKET;
1298        }
1299    } else if (backend.compare("forestdb") == 0) {
1300        if (db_file_id > (vbMap.getNumShards() - 1)) {
1301            //TODO: find a better error code
1302            return ENGINE_EINVAL;
1303        }
1304    } else {
1305        LOG(EXTENSION_LOG_WARNING,
1306            "Unknown backend specified for db file id: %d", db_file_id);
1307        return ENGINE_FAILED;
1308    }
1309
1310    return ENGINE_SUCCESS;
1311}
1312
1313ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1314                                                       compaction_ctx c,
1315                                                       const void *cookie) {
1316    ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
1317    if (errCode != ENGINE_SUCCESS) {
1318        return errCode;
1319    }
1320
1321    /* Obtain the vbucket so we can get the previous purge seqno */
1322    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1323    if (!vb) {
1324        return ENGINE_NOT_MY_VBUCKET;
1325    }
1326
1327    /* Update the compaction ctx with the previous purge seqno */
1328    c.max_purged_seq = vb->getPurgeSeqno();
1329
1330    LockHolder lh(compactionLock);
1331    ExTask task = new CompactTask(&engine, c, cookie);
1332    compactionTasks.push_back(std::make_pair(c.db_file_id, task));
1333    if (compactionTasks.size() > 1) {
1334        if ((stats.diskQueueSize > compactionWriteQueueCap &&
1335            compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1336            engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1337            // Snooze a new compaction task.
1338            // We will wake it up when one of the existing compaction tasks is done.
1339            task->snooze(60);
1340        }
1341    }
1342
1343    ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1344
1345    LOG(EXTENSION_LOG_DEBUG,
1346        "Scheduled compaction task %" PRIu64 " on db %d,"
1347        "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
1348        ", dropdeletes = %d",
1349        uint64_t(task->getId()),c.db_file_id, c.purge_before_ts,
1350        c.purge_before_seq, c.drop_deletes);
1351
1352   return ENGINE_EWOULDBLOCK;
1353}
1354
1355class ExpiredItemsCallback : public Callback<std::string&, uint64_t&> {
1356    public:
1357        ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid,
1358                             time_t start)
1359            : epstore(store), vbucket(vbid), startTime(start) { }
1360
1361        void callback(std::string& key, uint64_t& revSeqno) {
1362            if (epstore->compactionCanExpireItems()) {
1363                epstore->deleteExpiredItem(vbucket, key, startTime, revSeqno,
1364                                           EXP_BY_COMPACTOR);
1365            }
1366        }
1367
1368    private:
1369        EventuallyPersistentStore *epstore;
1370        uint16_t vbucket;
1371        time_t startTime;
1372};
1373
1374bool EventuallyPersistentStore::doCompact(compaction_ctx *ctx,
1375                                          const void *cookie) {
1376    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1377    const uint16_t vbid = ctx->db_file_id;
1378    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1379    if (vb) {
1380        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
1381        if (!lh.islocked()) {
1382            return true; // Schedule a compaction task again.
1383        }
1384
1385        Configuration &config = getEPEngine().getConfiguration();
1386        if (config.isBfilterEnabled()) {
1387            size_t initial_estimation = config.getBfilterKeyCount();
1388            size_t estimated_count;
1389            size_t num_deletes =
1390                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
1391            if (eviction_policy == VALUE_ONLY) {
1392                /**
1393                 * VALUE-ONLY EVICTION POLICY
1394                 * Obtain number of persisted deletes from underlying kvstore.
1395                 * Bloomfilter's estimated_key_count = 1.25 * deletes
1396                 */
1397
1398                estimated_count = round(1.25 * num_deletes);
1399                std::shared_ptr<Callback<std::string&, bool&> >
1400                    filter(new BloomFilterCallback(*this, vbid, false));
1401                ctx->bloomFilterCallback = filter;
1402            } else {
1403                /**
1404                 * FULL EVICTION POLICY
1405                 * First determine if the resident ratio of vbucket is less than
1406                 * the threshold from configuration.
1407                 */
1408
1409                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
1410                                                getBfiltersResidencyThreshold(),
1411                                                eviction_policy);
1412                std::shared_ptr<Callback<std::string&, bool&> >
1413                    filter(new BloomFilterCallback(*this, vbid, residentRatioAlert));
1414                ctx->bloomFilterCallback = filter;
1415
1416                /**
1417                 * Based on resident ratio against threshold, estimate count.
1418                 *
1419                 * 1. If resident ratio is greater than the threshold:
1420                 * Obtain number of persisted deletes from underlying kvstore.
1421                 * Obtain number of non-resident-items for vbucket.
1422                 * Bloomfilter's estimated_key_count =
1423                 *                              1.25 * (deletes + non-resident)
1424                 *
1425                 * 2. Otherwise:
1426                 * Obtain number of items for vbucket.
1427                 * Bloomfilter's estimated_key_count =
1428                 *                              1.25 * (num_items)
1429                 */
1430
1431                if (residentRatioAlert) {
1432                    estimated_count = round(1.25 *
1433                                            vb->getNumItems(eviction_policy));
1434                } else {
1435                    estimated_count = round(1.25 * (num_deletes +
1436                                vb->getNumNonResidentItems(eviction_policy)));
1437                }
1438            }
1439            if (estimated_count < initial_estimation) {
1440                estimated_count = initial_estimation;
1441            }
1442            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
1443        }
1444
1445        if (vb->getState() == vbucket_state_active) {
1446            // Set the current time ONLY for active vbuckets.
1447            ctx->curr_time = ep_real_time();
1448        } else {
1449            ctx->curr_time = 0;
1450        }
1451        std::shared_ptr<Callback<std::string&, uint64_t&> >
1452           expiry(new ExpiredItemsCallback(this, vbid, ctx->curr_time));
1453        ctx->expiryCallback = expiry;
1454
1455        KVStatsCallback kvcb(this);
1456        if (getRWUnderlying(vbid)->compactDB(ctx, kvcb)) {
1457            if (config.isBfilterEnabled()) {
1458                vb->swapFilter();
1459            } else {
1460                vb->clearFilter();
1461            }
1462        } else {
1463            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
1464                    "clearing bloom filter, if any.", vb->getId());
1465            vb->clearFilter();
1466        }
1467        vb->setPurgeSeqno(ctx->max_purged_seq);
1468    } else {
1469        err = ENGINE_NOT_MY_VBUCKET;
1470        engine.storeEngineSpecific(cookie, NULL);
1471        //Decrement session counter here, as memcached thread wouldn't
1472        //visit the engine interface in case of a NOT_MY_VB notification
1473        engine.decrementSessionCtr();
1474    }
1475
1476    updateCompactionTasks(ctx->db_file_id);
1477
1478    if (cookie) {
1479        engine.notifyIOComplete(cookie, err);
1480    }
1481    --stats.pendingCompactions;
1482    return false;
1483}
1484
1485void EventuallyPersistentStore::updateCompactionTasks(DBFileId db_file_id) {
1486    LockHolder lh(compactionLock);
1487    bool erased = false, woke = false;
1488    std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1489    while (it != compactionTasks.end()) {
1490        if ((*it).first == db_file_id) {
1491            it = compactionTasks.erase(it);
1492            erased = true;
1493        } else {
1494            ExTask &task = (*it).second;
1495            if (task->getState() == TASK_SNOOZED) {
1496                ExecutorPool::get()->wake(task->getId());
1497                woke = true;
1498            }
1499            ++it;
1500        }
1501        if (erased && woke) {
1502            break;
1503        }
1504    }
1505}
1506
1507bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1508    LockHolder lh(vbsetMutex);
1509    return resetVBucket_UNLOCKED(vbid, lh);
1510}
1511
1512bool EventuallyPersistentStore::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
1513    bool rv(false);
1514
1515    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1516    if (vb) {
1517        vbucket_state_t vbstate = vb->getState();
1518
1519        vbMap.removeBucket(vbid);
1520
1521        checkpointCursorInfoList cursors =
1522                                        vb->checkpointManager.getAllCursors();
1523        // Delete and recreate the vbucket database file
1524        scheduleVBDeletion(vb, NULL, 0);
1525        setVBucketState_UNLOCKED(vbid, vbstate,
1526                                 false/*transfer*/, true/*notifyDcp*/, vbset);
1527
1528        // Copy the all cursors from the old vbucket into the new vbucket
1529        RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1530        newvb->checkpointManager.resetCursors(cursors);
1531
1532        rv = true;
1533    }
1534    return rv;
1535}
1536
1537extern "C" {
1538
1539    typedef struct {
1540        EventuallyPersistentEngine* engine;
1541        std::map<std::string, std::string> smap;
1542    } snapshot_stats_t;
1543
1544    static void add_stat(const char *key, const uint16_t klen,
1545                         const char *val, const uint32_t vlen,
1546                         const void *cookie) {
1547        if (cookie == nullptr) {
1548            throw std::invalid_argument("add_stat: cookie is NULL");
1549        }
1550        void *ptr = const_cast<void *>(cookie);
1551        snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1552        ObjectRegistry::onSwitchThread(snap->engine);
1553
1554        std::string k(key, klen);
1555        std::string v(val, vlen);
1556        snap->smap.insert(std::pair<std::string, std::string>(k, v));
1557    }
1558}
1559
1560void EventuallyPersistentStore::snapshotStats() {
1561    snapshot_stats_t snap;
1562    snap.engine = &engine;
1563    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1564              engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1565              engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1566
1567    if (rv && stats.isShutdown) {
1568        snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1569                                                              "true" : "false";
1570        std::stringstream ss;
1571        ss << ep_real_time();
1572        snap.smap["ep_shutdown_time"] = ss.str();
1573    }
1574    getOneRWUnderlying()->snapshotStats(snap.smap);
1575}
1576
1577void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1578                                              const hrtime_t start,
1579                                              const hrtime_t stop) {
1580    if (stop >= start && start >= init) {
1581        // skip the measurement if the counter wrapped...
1582        ++stats.bgNumOperations;
1583        hrtime_t w = (start - init) / 1000;
1584        BlockTimer::log(start - init, "bgwait", stats.timingLog);
1585        stats.bgWaitHisto.add(w);
1586        stats.bgWait.fetch_add(w);
1587        atomic_setIfLess(stats.bgMinWait, w);
1588        atomic_setIfBigger(stats.bgMaxWait, w);
1589
1590        hrtime_t l = (stop - start) / 1000;
1591        BlockTimer::log(stop - start, "bgload", stats.timingLog);
1592        stats.bgLoadHisto.add(l);
1593        stats.bgLoad.fetch_add(l);
1594        atomic_setIfLess(stats.bgMinLoad, l);
1595        atomic_setIfBigger(stats.bgMaxLoad, l);
1596    }
1597}
1598
1599void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1600                                                uint16_t vbucket,
1601                                                const void *cookie,
1602                                                hrtime_t init,
1603                                                bool isMeta) {
1604    hrtime_t start(gethrtime());
1605    // Go find the data
1606    RememberingCallback<GetValue> gcb;
1607    if (isMeta) {
1608        gcb.val.setPartial();
1609        ++stats.bg_meta_fetched;
1610    } else {
1611        ++stats.bg_fetched;
1612    }
1613    getROUnderlying(vbucket)->get(key, vbucket, gcb);
1614    gcb.waitForValue();
1615    ENGINE_ERROR_CODE status = gcb.val.getStatus();
1616
1617    // Lock to prevent a race condition between a fetch for restore and delete
1618    LockHolder lh(vbsetMutex);
1619
1620    RCPtr<VBucket> vb = getVBucket(vbucket);
1621    if (vb) {
1622        ReaderLockHolder rlh(vb->getStateLock());
1623        int bucket_num(0);
1624        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1625        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1626        if (isMeta) {
1627            if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
1628                                              gcb.val.getStatus(), vb->ht))
1629                || ENGINE_KEY_ENOENT == status) {
1630                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1631                 key is removed from hash table by the time bgfetch returns
1632                 (in case multiple bgfetch is scheduled for a key), we still
1633                 need to return ENGINE_SUCCESS to the memcached worker thread,
1634                 so that the worker thread can visit the ep-engine and figure
1635                 out the correct flow */
1636                status = ENGINE_SUCCESS;
1637            }
1638        } else {
1639            bool restore = false;
1640            if (v && v->isResident()) {
1641                status = ENGINE_SUCCESS;
1642            } else if (v && v->isDeleted()) {
1643                status = ENGINE_KEY_ENOENT;
1644            } else {
1645                switch (eviction_policy) {
1646                    case VALUE_ONLY:
1647                        if (v && !v->isResident() && !v->isDeleted()) {
1648                            restore = true;
1649                        }
1650                        break;
1651                    case FULL_EVICTION:
1652                        if (v) {
1653                            if (v->isTempInitialItem() ||
1654                                (!v->isResident() && !v->isDeleted())) {
1655                                restore = true;
1656                            }
1657                        }
1658                        break;
1659                    default:
1660                        throw std::logic_error("Unknown eviction policy");
1661                }
1662            }
1663
1664            if (restore) {
1665                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1666                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1667                    if (!v->isResident()) {
1668                        throw std::logic_error("EPStore::completeBGFetch: "
1669                                "storedvalue (which has key " + v->getKey() +
1670                                ") should be resident after calling restoreValue()");
1671                    }
1672                    if (vb->getState() == vbucket_state_active &&
1673                        v->getExptime() != gcb.val.getValue()->getExptime() &&
1674                        v->getCas() == gcb.val.getValue()->getCas()) {
1675                        // MB-9306: It is possible that by the time bgfetcher
1676                        // returns, the item may have been updated and queued
1677                        // Hence test the CAS value to be the same first.
1678                        // exptime mutated, schedule it into new checkpoint
1679                        queueDirty(vb, v, &hlh, NULL, GenerateBySeqno::Yes,
1680                                                    GenerateCas::No);
1681                    }
1682                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1683                    v->setNonExistent();
1684                    if (eviction_policy == FULL_EVICTION) {
1685                        // For the full eviction, we should notify
1686                        // ENGINE_SUCCESS to the memcached worker thread, so
1687                        // that the worker thread can visit the ep-engine and
1688                        // figure out the correct error code.
1689                        status = ENGINE_SUCCESS;
1690                    }
1691                } else {
1692                    // underlying kvstore couldn't fetch requested data
1693                    // log returned error and notify TMPFAIL to client
1694                    LOG(EXTENSION_LOG_WARNING,
1695                        "Failed background fetch for vb=%d "
1696                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
1697                        key.c_str());
1698                    status = ENGINE_TMPFAIL;
1699                }
1700            }
1701        }
1702    } else {
1703        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1704            " a bg fetch for key %s\n", vbucket, key.c_str());
1705        status = ENGINE_NOT_MY_VBUCKET;
1706    }
1707
1708    lh.unlock();
1709
1710    hrtime_t stop = gethrtime();
1711    updateBGStats(init, start, stop);
1712    bgFetchQueue--;
1713
1714    delete gcb.val.getValue();
1715    engine.notifyIOComplete(cookie, status);
1716}
1717
1718void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1719                                 std::vector<bgfetched_item_t> &fetchedItems,
1720                                 hrtime_t startTime)
1721{
1722    RCPtr<VBucket> vb = getVBucket(vbId);
1723    if (!vb) {
1724        std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1725        for (; itemItr != fetchedItems.end(); ++itemItr) {
1726            engine.notifyIOComplete((*itemItr).second->cookie,
1727                                    ENGINE_NOT_MY_VBUCKET);
1728        }
1729        LOG(EXTENSION_LOG_WARNING,
1730            "EP Store completes %d of batched background fetch for "
1731            "for vBucket = %d that is already deleted\n",
1732            (int)fetchedItems.size(), vbId);
1733        return;
1734    }
1735
1736    std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1737    for (; itemItr != fetchedItems.end(); ++itemItr) {
1738        VBucketBGFetchItem *bgitem = (*itemItr).second;
1739        ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1740        Item *fetchedValue = bgitem->value.getValue();
1741        const std::string &key = (*itemItr).first;
1742        {   //locking scope
1743            ReaderLockHolder rlh(vb->getStateLock());
1744            int bucket = 0;
1745            LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1746            StoredValue *v = fetchValidValue(vb, key, bucket, true);
1747            if (bgitem->metaDataOnly) {
1748                if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1749                    || ENGINE_KEY_ENOENT == status) {
1750                    /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1751                     key is removed from hash table by the time bgfetch returns
1752                     (in case multiple bgfetch is scheduled for a key), we still
1753                     need to return ENGINE_SUCCESS to the memcached worker thread,
1754                     so that the worker thread can visit the ep-engine and figure
1755                     out the correct flow */
1756                    status = ENGINE_SUCCESS;
1757                }
1758            } else {
1759                bool restore = false;
1760                if (v && v->isResident()) {
1761                    status = ENGINE_SUCCESS;
1762                } else if (v && v->isDeleted()) {
1763                    status = ENGINE_KEY_ENOENT;
1764                } else {
1765                    switch (eviction_policy) {
1766                        case VALUE_ONLY:
1767                            if (v && !v->isResident() && !v->isDeleted()) {
1768                                restore = true;
1769                            }
1770                            break;
1771                        case FULL_EVICTION:
1772                            if (v) {
1773                                if (v->isTempInitialItem() ||
1774                                    (!v->isResident() && !v->isDeleted())) {
1775                                    restore = true;
1776                                }
1777                            }
1778                            break;
1779                        default:
1780                            throw std::logic_error("Unknown eviction policy");
1781                    }
1782                }
1783
1784                if (restore) {
1785                    if (status == ENGINE_SUCCESS) {
1786                        v->unlocked_restoreValue(fetchedValue, vb->ht);
1787                        if (!v->isResident()) {
1788                            throw std::logic_error("EPStore::completeBGFetchMulti: "
1789                                "storedvalue (which has key " + v->getKey() +
1790                                ") should be resident after calling restoreValue()");
1791                        }
1792                        if (vb->getState() == vbucket_state_active &&
1793                            v->getExptime() != fetchedValue->getExptime() &&
1794                            v->getCas() == fetchedValue->getCas()) {
1795                            // MB-9306: It is possible that by the time
1796                            // bgfetcher returns, the item may have been
1797                            // updated and queued
1798                            // Hence test the CAS value to be the same first.
1799                            // exptime mutated, schedule it into new checkpoint
1800                            queueDirty(vb, v, &blh, NULL, GenerateBySeqno::Yes,
1801                                                        GenerateCas::No);
1802                        }
1803                    } else if (status == ENGINE_KEY_ENOENT) {
1804                        v->setNonExistent();
1805                        if (eviction_policy == FULL_EVICTION) {
1806                            // For the full eviction, we should notify
1807                            // ENGINE_SUCCESS to the memcached worker thread,
1808                            // so that the worker thread can visit the
1809                            // ep-engine and figure out the correct error
1810                            // code.
1811                            status = ENGINE_SUCCESS;
1812                        }
1813                    } else {
1814                        // underlying kvstore couldn't fetch requested data
1815                        // log returned error and notify TMPFAIL to client
1816                        LOG(EXTENSION_LOG_WARNING,
1817                            "Failed background fetch for vb=%d "
1818                            "key=%s", vbId, key.c_str());
1819                        status = ENGINE_TMPFAIL;
1820                    }
1821                }
1822            }
1823        } // locked scope ends
1824
1825        if (bgitem->metaDataOnly) {
1826            ++stats.bg_meta_fetched;
1827        } else {
1828            ++stats.bg_fetched;
1829        }
1830
1831        hrtime_t endTime = gethrtime();
1832        updateBGStats(bgitem->initTime, startTime, endTime);
1833        engine.notifyIOComplete(bgitem->cookie, status);
1834    }
1835
1836    LOG(EXTENSION_LOG_DEBUG,
1837        "EP Store completes %" PRIu64 " of batched background fetch "
1838        "for vBucket = %d endTime = %" PRIu64,
1839        uint64_t(fetchedItems.size()), vbId, gethrtime()/1000000);
1840}
1841
1842void EventuallyPersistentStore::bgFetch(const const_sized_buffer key,
1843                                        uint16_t vbucket,
1844                                        const void *cookie,
1845                                        bool isMeta) {
1846    if (multiBGFetchEnabled()) {
1847        RCPtr<VBucket> vb = getVBucket(vbucket);
1848        if (!vb) {
1849            throw std::invalid_argument("EPStore::bgFetch: vbucket (which is " +
1850                                        std::to_string(vbucket) +
1851                                        ") is not present in vbMap");
1852        }
1853        KVShard *myShard = vbMap.getShardByVbId(vbucket);
1854
1855        // schedule to the current batch of background fetch of the given
1856        // vbucket
1857        VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1858                                                                isMeta);
1859        size_t bgfetch_size = vb->queueBGFetchItem(key, fetchThis,
1860                                                   myShard->getBgFetcher());
1861        myShard->getBgFetcher()->notifyBGEvent();
1862        LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1863            uint64_t(bgfetch_size));
1864    } else {
1865        bgFetchQueue++;
1866        stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1867                                            bgFetchQueue.load());
1868        ExecutorPool* iom = ExecutorPool::get();
1869        ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1870                                              isMeta, bgFetchDelay, false);
1871        iom->schedule(task, READER_TASK_IDX);
1872        LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1873            uint64_t(bgFetchQueue.load()));
1874    }
1875}
1876
1877GetValue EventuallyPersistentStore::getInternal(const const_sized_buffer key,
1878                                                uint16_t vbucket,
1879                                                const void *cookie,
1880                                                vbucket_state_t allowedState,
1881                                                get_options_t options) {
1882
1883    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1884        vbucket_state_replica : vbucket_state_active;
1885    RCPtr<VBucket> vb = getVBucket(vbucket);
1886    if (!vb) {
1887        ++stats.numNotMyVBuckets;
1888        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1889    }
1890
1891    const bool honorStates = (options & HONOR_STATES);
1892
1893    ReaderLockHolder rlh(vb->getStateLock());
1894    if (honorStates) {
1895        vbucket_state_t vbState = vb->getState();
1896        if (vbState == vbucket_state_dead) {
1897            ++stats.numNotMyVBuckets;
1898            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1899        } else if (vbState == disallowedState) {
1900            ++stats.numNotMyVBuckets;
1901            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1902        } else if (vbState == vbucket_state_pending) {
1903            if (vb->addPendingOp(cookie)) {
1904                return GetValue(NULL, ENGINE_EWOULDBLOCK);
1905            }
1906        }
1907    }
1908
1909    const bool trackReference = (options & TRACK_REFERENCE);
1910
1911    int bucket_num(0);
1912    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1913    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1914                                     trackReference);
1915    if (v) {
1916        if (v->isDeleted()) {
1917            GetValue rv;
1918            return rv;
1919        }
1920        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1921            // Delete a temp non-existent item to ensure that
1922            // if the get were issued over an item that doesn't
1923            // exist, then we dont preserve a temp item.
1924            if (options & DELETE_TEMP) {
1925                vb->ht.unlocked_del(key, bucket_num);
1926            }
1927            GetValue rv;
1928            return rv;
1929        }
1930
1931        // If the value is not resident, wait for it...
1932        if (!v->isResident()) {
1933            if (options & QUEUE_BG_FETCH) {
1934                bgFetch(key, vbucket, cookie);
1935            }
1936            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1937                            true, v->getNRUValue());
1938        }
1939
1940        // Should we hide (return -1) for the items' CAS?
1941        const bool hide_cas = (options & HIDE_LOCKED_CAS) &&
1942                              v->isLocked(ep_current_time());
1943        GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
1944                    v->getBySeqno(), false, v->getNRUValue());
1945        return rv;
1946    } else {
1947        if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1948            GetValue rv;
1949            return rv;
1950        }
1951
1952        if (vb->maybeKeyExistsInFilter(key)) {
1953            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1954            if (options & QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1955                ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1956                                           cookie, false);
1957            }
1958            return GetValue(NULL, ec, -1, true);
1959        } else {
1960            // As bloomfilter predicted that item surely doesn't exist
1961            // on disk, return ENONET, for getInternal().
1962            GetValue rv;
1963            return rv;
1964        }
1965    }
1966}
1967
1968GetValue EventuallyPersistentStore::getRandomKey() {
1969    VBucketMap::id_type max = vbMap.getSize();
1970
1971    const long start = random() % max;
1972    long curr = start;
1973    Item *itm = NULL;
1974
1975    while (itm == NULL) {
1976        RCPtr<VBucket> vb = getVBucket(curr++);
1977        while (!vb || vb->getState() != vbucket_state_active) {
1978            if (curr == start) {
1979                return GetValue(NULL, ENGINE_KEY_ENOENT);
1980            }
1981            if (curr == max) {
1982                curr = 0;
1983            }
1984
1985            vb = getVBucket(curr++);
1986        }
1987
1988        if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1989            GetValue rv(itm, ENGINE_SUCCESS);
1990            return rv;
1991        }
1992
1993        if (curr == max) {
1994            curr = 0;
1995        }
1996
1997        if (curr == start) {
1998            return GetValue(NULL, ENGINE_KEY_ENOENT);
1999        }
2000        // Search next vbucket
2001    }
2002
2003    return GetValue(NULL, ENGINE_KEY_ENOENT);
2004}
2005
2006
2007ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2008                                                        const std::string &key,
2009                                                        uint16_t vbucket,
2010                                                        const void *cookie,
2011                                                        ItemMetaData &metadata,
2012                                                        uint32_t &deleted,
2013                                                        bool trackReferenced)
2014{
2015    (void) cookie;
2016    RCPtr<VBucket> vb = getVBucket(vbucket);
2017
2018    if (!vb) {
2019        ++stats.numNotMyVBuckets;
2020        return ENGINE_NOT_MY_VBUCKET;
2021    }
2022
2023    ReaderLockHolder rlh(vb->getStateLock());
2024    if (vb->getState() == vbucket_state_dead ||
2025        vb->getState() == vbucket_state_replica) {
2026        ++stats.numNotMyVBuckets;
2027        return ENGINE_NOT_MY_VBUCKET;
2028    }
2029
2030    int bucket_num(0);
2031    deleted = 0;
2032    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2033    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2034                                          trackReferenced);
2035
2036    if (v) {
2037        stats.numOpsGetMeta++;
2038        if (v->isTempInitialItem()) { // Need bg meta fetch.
2039            bgFetch(key, vbucket, cookie, true);
2040            return ENGINE_EWOULDBLOCK;
2041        } else if (v->isTempNonExistentItem()) {
2042            metadata.cas = v->getCas();
2043            return ENGINE_KEY_ENOENT;
2044        } else {
2045            if (v->isTempDeletedItem() || v->isDeleted() ||
2046                v->isExpired(ep_real_time())) {
2047                deleted |= GET_META_ITEM_DELETED_FLAG;
2048            }
2049
2050            if (v->isLocked(ep_current_time())) {
2051                metadata.cas = static_cast<uint64_t>(-1);
2052            } else {
2053                metadata.cas = v->getCas();
2054            }
2055            metadata.flags = v->getFlags();
2056            metadata.exptime = v->getExptime();
2057            metadata.revSeqno = v->getRevSeqno();
2058            return ENGINE_SUCCESS;
2059        }
2060    } else {
2061        // The key wasn't found. However, this may be because it was previously
2062        // deleted or evicted with the full eviction strategy.
2063        // So, add a temporary item corresponding to the key to the hash table
2064        // and schedule a background fetch for its metadata from the persistent
2065        // store. The item's state will be updated after the fetch completes.
2066        //
2067        // Schedule this bgFetch only if the key is predicted to be may-be
2068        // existent on disk by the bloomfilter.
2069
2070        if (vb->maybeKeyExistsInFilter(key)) {
2071            return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2072        } else {
2073            stats.numOpsGetMeta++;
2074            return ENGINE_KEY_ENOENT;
2075        }
2076    }
2077}
2078
2079ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
2080                                                     const Item &itm,
2081                                                     uint64_t cas,
2082                                                     uint64_t *seqno,
2083                                                     const void *cookie,
2084                                                     bool force,
2085                                                     bool allowExisting,
2086                                                     uint8_t nru,
2087                                                     GenerateBySeqno genBySeqno,
2088                                                     GenerateCas genCas,
2089                                                     ExtendedMetaData *emd,
2090                                                     bool isReplication)
2091{
2092    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
2093    if (!vb) {
2094        ++stats.numNotMyVBuckets;
2095        return ENGINE_NOT_MY_VBUCKET;
2096    }
2097
2098    ReaderLockHolder rlh(vb->getStateLock());
2099    if (vb->getState() == vbucket_state_dead) {
2100        ++stats.numNotMyVBuckets;
2101        return ENGINE_NOT_MY_VBUCKET;
2102    } else if (vb->getState() == vbucket_state_replica && !force) {
2103        ++stats.numNotMyVBuckets;
2104        return ENGINE_NOT_MY_VBUCKET;
2105    } else if (vb->getState() == vbucket_state_pending && !force) {
2106        if (vb->addPendingOp(cookie)) {
2107            return ENGINE_EWOULDBLOCK;
2108        }
2109    } else if (vb->isTakeoverBackedUp()) {
2110        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op"
2111                ", becuase takeover is lagging", vb->getId());
2112        return ENGINE_TMPFAIL;
2113    }
2114
2115    //check for the incoming item's CAS validity
2116    if (!Item::isValidCas(itm.getCas())) {
2117        return ENGINE_KEY_EEXISTS;
2118    }
2119
2120    int bucket_num(0);
2121    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
2122    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
2123                                          false);
2124
2125    bool maybeKeyExists = true;
2126    if (!force) {
2127        if (v)  {
2128            if (v->isTempInitialItem()) {
2129                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
2130                return ENGINE_EWOULDBLOCK;
2131            }
2132
2133            if (!conflictResolver->resolve(*v, itm.getMetaData(), false)) {
2134                ++stats.numOpsSetMetaResolutionFailed;
2135                return ENGINE_KEY_EEXISTS;
2136            }
2137        } else {
2138            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
2139                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2140                                             cookie, true, isReplication);
2141            } else {
2142                maybeKeyExists = false;
2143            }
2144        }
2145    } else {
2146        if (eviction_policy == FULL_EVICTION) {
2147            // Check Bloomfilter's prediction
2148            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
2149                maybeKeyExists = false;
2150            }
2151        }
2152    }
2153
2154    if (v && v->isLocked(ep_current_time()) &&
2155        (vb->getState() == vbucket_state_replica ||
2156         vb->getState() == vbucket_state_pending)) {
2157        v->unlock();
2158    }
2159
2160    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
2161                                                true, eviction_policy, nru,
2162                                                maybeKeyExists, isReplication);
2163
2164    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2165    switch (mtype) {
2166    case NOMEM:
2167        ret = ENGINE_ENOMEM;
2168        break;
2169    case INVALID_CAS:
2170    case IS_LOCKED:
2171        ret = ENGINE_KEY_EEXISTS;
2172        break;
2173    case INVALID_VBUCKET:
2174        ret = ENGINE_NOT_MY_VBUCKET;
2175        break;
2176    case WAS_DIRTY:
2177    case WAS_CLEAN:
2178        vb->setMaxCasAndTrackDrift(v->getCas());
2179        queueDirty(vb, v, &lh, seqno, genBySeqno, genCas);
2180        break;
2181    case NOT_FOUND:
2182        ret = ENGINE_KEY_ENOENT;
2183        break;
2184    case NEED_BG_FETCH:
2185        {            // CAS operation with non-resident item + full eviction.
2186            if (v) { // temp item is already created. Simply schedule a
2187                lh.unlock(); // bg fetch job.
2188                bgFetch(itm.getKey(), vb->getId(), cookie, true);
2189                return ENGINE_EWOULDBLOCK;
2190            }
2191
2192            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2193                                        cookie, true, isReplication);
2194        }
2195    }
2196
2197    return ret;
2198}
2199
2200GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2201                                                    uint16_t vbucket,
2202                                                    const void *cookie,
2203                                                    time_t exptime)
2204{
2205    RCPtr<VBucket> vb = getVBucket(vbucket);
2206    if (!vb) {
2207        ++stats.numNotMyVBuckets;
2208        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2209    }
2210
2211    ReaderLockHolder rlh(vb->getStateLock());
2212    if (vb->getState() == vbucket_state_dead) {
2213        ++stats.numNotMyVBuckets;
2214        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2215    } else if (vb->getState() == vbucket_state_replica) {
2216        ++stats.numNotMyVBuckets;
2217        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2218    } else if (vb->getState() == vbucket_state_pending) {
2219        if (vb->addPendingOp(cookie)) {
2220            return GetValue(NULL, ENGINE_EWOULDBLOCK);
2221        }
2222    }
2223
2224    int bucket_num(0);
2225    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2226    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2227
2228    if (v) {
2229        if (v->isDeleted() || v->isTempDeletedItem() ||
2230            v->isTempNonExistentItem()) {
2231            GetValue rv;
2232            return rv;
2233        }
2234
2235        if (!v->isResident()) {
2236            bgFetch(key, vbucket, cookie);
2237            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2238        }
2239        if (v->isLocked(ep_current_time())) {
2240            GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2241            return rv;
2242        }
2243
2244        const bool exptime_mutated = exptime != v->getExptime();
2245        if (exptime_mutated) {
2246            v->markDirty();
2247            v->setExptime(exptime);
2248            v->setRevSeqno(v->getRevSeqno()+1);
2249        }
2250
2251        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2252                    ENGINE_SUCCESS, v->getBySeqno());
2253
2254        if (exptime_mutated) {
2255            queueDirty(vb, v, &lh, NULL);
2256        }
2257
2258        return rv;
2259    } else {
2260        if (eviction_policy == VALUE_ONLY) {
2261            GetValue rv;
2262            return rv;
2263        } else {
2264            if (vb->maybeKeyExistsInFilter(key)) {
2265                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2266                                                             key, vb, cookie,
2267                                                             false);
2268                return GetValue(NULL, ec, -1, true);
2269            } else {
2270                // As bloomfilter predicted that item surely doesn't exist
2271                // on disk, return ENOENT for getAndUpdateTtl().
2272                GetValue rv;
2273                return rv;
2274            }
2275        }
2276    }
2277}
2278
2279ENGINE_ERROR_CODE
2280EventuallyPersistentStore::statsVKey(const std::string &key,
2281                                     uint16_t vbucket,
2282                                     const void *cookie) {
2283    RCPtr<VBucket> vb = getVBucket(vbucket);
2284    if (!vb) {
2285        return ENGINE_NOT_MY_VBUCKET;
2286    }
2287
2288    int bucket_num(0);
2289    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2290    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2291
2292    if (v) {
2293        if (v->isDeleted() || v->isTempDeletedItem() ||
2294            v->isTempNonExistentItem()) {
2295            return ENGINE_KEY_ENOENT;
2296        }
2297        bgFetchQueue++;
2298        ExecutorPool* iom = ExecutorPool::get();
2299        ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2300                                           v->getBySeqno(), cookie,
2301                                           bgFetchDelay, false);
2302        iom->schedule(task, READER_TASK_IDX);
2303        return ENGINE_EWOULDBLOCK;
2304    } else {
2305        if (eviction_policy == VALUE_ONLY) {
2306            return ENGINE_KEY_ENOENT;
2307        } else {
2308            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2309                                                        eviction_policy);
2310            switch(rv) {
2311            case ADD_NOMEM:
2312                return ENGINE_ENOMEM;
2313            case ADD_EXISTS:
2314            case ADD_UNDEL:
2315            case ADD_SUCCESS:
2316            case ADD_TMP_AND_BG_FETCH:
2317                // Since the hashtable bucket is locked, we shouldn't get here
2318                throw std::logic_error("EventuallyPersistentStore::statsVKey: "
2319                        "Invalid result from unlocked_addTempItem (" +
2320                        std::to_string(rv) + ")");
2321
2322            case ADD_BG_FETCH:
2323                {
2324                    ++bgFetchQueue;
2325                    ExecutorPool* iom = ExecutorPool::get();
2326                    ExTask task = new VKeyStatBGFetchTask(&engine, key,
2327                                                          vbucket, -1, cookie,
2328                                                          bgFetchDelay, false);
2329                    iom->schedule(task, READER_TASK_IDX);
2330                }
2331            }
2332            return ENGINE_EWOULDBLOCK;
2333        }
2334    }
2335}
2336
2337void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
2338                                                  std::string &key,
2339                                                  uint16_t vbid,
2340                                                  uint64_t bySeqNum) {
2341    RememberingCallback<GetValue> gcb;
2342
2343    getROUnderlying(vbid)->get(key, vbid, gcb);
2344    gcb.waitForValue();
2345
2346    if (eviction_policy == FULL_EVICTION) {
2347        RCPtr<VBucket> vb = getVBucket(vbid);
2348        if (vb) {
2349            int bucket_num(0);
2350            LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
2351            StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2352            if (v && v->isTempInitialItem()) {
2353                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2354                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
2355                    if (!v->isResident()) {
2356                        throw std::logic_error("EPStore::completeStatsVKey: "
2357                            "storedvalue (which has key " + v->getKey() +
2358                            ") should be resident after calling restoreValue()");
2359                    }
2360                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
2361                    v->setNonExistent();
2362                } else {
2363                    // underlying kvstore couldn't fetch requested data
2364                    // log returned error and notify TMPFAIL to client
2365                    LOG(EXTENSION_LOG_WARNING,
2366                        "Failed background fetch for vb=%d "
2367                        "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
2368                        key.c_str());
2369                }
2370            }
2371        }
2372    }
2373
2374    if (gcb.val.getStatus() == ENGINE_SUCCESS) {
2375        engine.addLookupResult(cookie, gcb.val.getValue());
2376    } else {
2377        engine.addLookupResult(cookie, NULL);
2378    }
2379
2380    bgFetchQueue--;
2381    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
2382}
2383
2384GetValue EventuallyPersistentStore::getLocked(const std::string &key,
2385                                              uint16_t vbucket,
2386                                              rel_time_t currentTime,
2387                                              uint32_t lockTimeout,
2388                                              const void *cookie) {
2389    RCPtr<VBucket> vb = getVBucket(vbucket);
2390    if (!vb || vb->getState() != vbucket_state_active) {
2391        ++stats.numNotMyVBuckets;
2392        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2393    }
2394
2395    int bucket_num(0);
2396    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2397    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2398
2399    if (v) {
2400        if (v->isDeleted() || v->isTempNonExistentItem() ||
2401            v->isTempDeletedItem()) {
2402            return GetValue(NULL, ENGINE_KEY_ENOENT);
2403        }
2404
2405        // if v is locked return error
2406        if (v->isLocked(currentTime)) {
2407            return GetValue(NULL, ENGINE_TMPFAIL);
2408        }
2409
2410        // If the value is not resident, wait for it...
2411        if (!v->isResident()) {
2412            if (cookie) {
2413                bgFetch(key, vbucket, cookie);
2414            }
2415            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
2416        }
2417
2418        // acquire lock and increment cas value
2419        v->lock(currentTime + lockTimeout);
2420
2421        Item *it = v->toItem(false, vbucket);
2422        it->setCas(vb->nextHLCCas());
2423        v->setCas(it->getCas());
2424
2425        return GetValue(it);
2426
2427    } else {
2428        // No value found in the hashtable.
2429        switch (eviction_policy) {
2430        case VALUE_ONLY:
2431            return GetValue(NULL, ENGINE_KEY_ENOENT);
2432
2433        case FULL_EVICTION:
2434            if (vb->maybeKeyExistsInFilter(key)) {
2435                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2436                                                             key, vb, cookie,
2437                                                             false);
2438                return GetValue(NULL, ec, -1, true);
2439            } else {
2440                // As bloomfilter predicted that item surely doesn't exist
2441                // on disk, return ENOENT for getLocked().
2442                return GetValue(NULL, ENGINE_KEY_ENOENT);
2443            }
2444        default:
2445            throw std::logic_error("Unknown eviction policy");
2446        }
2447    }
2448}
2449
2450ENGINE_ERROR_CODE
2451EventuallyPersistentStore::unlockKey(const std::string &key,
2452                                     uint16_t vbucket,
2453                                     uint64_t cas,
2454                                     rel_time_t currentTime)
2455{
2456
2457    RCPtr<VBucket> vb = getVBucket(vbucket);
2458    if (!vb || vb->getState() != vbucket_state_active) {
2459        ++stats.numNotMyVBuckets;
2460        return ENGINE_NOT_MY_VBUCKET;
2461    }
2462
2463    int bucket_num(0);
2464    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2465    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2466
2467    if (v) {
2468        if (v->isDeleted() || v->isTempNonExistentItem() ||
2469            v->isTempDeletedItem()) {
2470            return ENGINE_KEY_ENOENT;
2471        }
2472        if (v->isLocked(currentTime)) {
2473            if (v->getCas() == cas) {
2474                v->unlock();
2475                return ENGINE_SUCCESS;
2476            }
2477        }
2478        return ENGINE_TMPFAIL;
2479    } else {
2480        if (eviction_policy == VALUE_ONLY) {
2481            return ENGINE_KEY_ENOENT;
2482        } else {
2483            // With the full eviction, an item's lock is automatically
2484            // released when the item is evicted from memory. Therefore,
2485            // we simply return ENGINE_TMPFAIL when we receive unlockKey
2486            // for an item that is not in memocy cache. Note that we don't
2487            // spawn any bg fetch job to figure out if an item actually
2488            // exists in disk or not.
2489            return ENGINE_TMPFAIL;
2490        }
2491    }
2492}
2493
2494
2495ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2496                                            const std::string &key,
2497                                            uint16_t vbucket,
2498                                            const void *cookie,
2499                                            struct key_stats &kstats,
2500                                            bool bgfetch,
2501                                            bool wantsDeleted)
2502{
2503    RCPtr<VBucket> vb = getVBucket(vbucket);
2504    if (!vb) {
2505        return ENGINE_NOT_MY_VBUCKET;
2506    }
2507
2508    int bucket_num(0);
2509    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2510    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2511
2512    if (v) {
2513        if ((v->isDeleted() && !wantsDeleted) ||
2514            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2515            return ENGINE_KEY_ENOENT;
2516        }
2517        if (eviction_policy == FULL_EVICTION &&
2518            v->isTempInitialItem() && bgfetch) {
2519            lh.unlock();
2520            bgFetch(key, vbucket, cookie, true);
2521            return ENGINE_EWOULDBLOCK;
2522        }
2523        kstats.logically_deleted = v->isDeleted();
2524        kstats.dirty = v->isDirty();
2525        kstats.exptime = v->getExptime();
2526        kstats.flags = v->getFlags();
2527        kstats.cas = v->getCas();
2528        kstats.vb_state = vb->getState();
2529        return ENGINE_SUCCESS;
2530    } else {
2531        if (eviction_policy == VALUE_ONLY) {
2532            return ENGINE_KEY_ENOENT;
2533        } else {
2534            if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2535                return addTempItemForBgFetch(lh, bucket_num, key, vb,
2536                                             cookie, true);
2537            } else {
2538                // If bgFetch were false, or bloomfilter predicted that
2539                // item surely doesn't exist on disk, return ENOENT for
2540                // getKeyStats().
2541                return ENGINE_KEY_ENOENT;
2542            }
2543        }
2544    }
2545}
2546
2547std::string EventuallyPersistentStore::validateKey(const std::string &key,
2548                                                   uint16_t vbucket,
2549                                                   Item &diskItem) {
2550    int bucket_num(0);
2551    RCPtr<VBucket> vb = getVBucket(vbucket);
2552    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2553    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2554                                     false, true);
2555
2556    if (v) {
2557        if (v->isDeleted() || v->isTempNonExistentItem() ||
2558            v->isTempDeletedItem()) {
2559            return "item_deleted";
2560        }
2561
2562        if (diskItem.getFlags() != v->getFlags()) {
2563            return "flags_mismatch";
2564        } else if (v->isResident() && memcmp(diskItem.getData(),
2565                                             v->getValue()->getData(),
2566                                             diskItem.getNBytes())) {
2567            return "data_mismatch";
2568        } else {
2569            return "valid";
2570        }
2571    } else {
2572        return "item_deleted";
2573    }
2574
2575}
2576
2577ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2578                                                        uint64_t *cas,
2579                                                        uint16_t vbucket,
2580                                                        const void *cookie,
2581                                                        bool force,
2582                                                        ItemMetaData *itemMeta,
2583                                                        mutation_descr_t
2584                                                        *mutInfo)
2585{
2586    RCPtr<VBucket> vb = getVBucket(vbucket);
2587    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2588        ++stats.numNotMyVBuckets;
2589        return ENGINE_NOT_MY_VBUCKET;
2590    } else if (vb->getState() == vbucket_state_replica && !force) {
2591        ++stats.numNotMyVBuckets;
2592        return ENGINE_NOT_MY_VBUCKET;
2593    } else if (vb->getState() == vbucket_state_pending && !force) {
2594        if (vb->addPendingOp(cookie)) {
2595            return ENGINE_EWOULDBLOCK;
2596        }
2597    } else if (vb->isTakeoverBackedUp()) {
2598        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op"
2599                ", becuase takeover is lagging", vb->getId());
2600        return ENGINE_TMPFAIL;
2601    }
2602
2603    int bucket_num(0);
2604    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2605    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2606    if (!v || v->isDeleted() || v->isTempItem()) {
2607        if (eviction_policy == VALUE_ONLY) {
2608            return ENGINE_KEY_ENOENT;
2609        } else { // Full eviction.
2610            if (!force) {
2611                if (!v) { // Item might be evicted from cache.
2612                    if (vb->maybeKeyExistsInFilter(key)) {
2613                        return addTempItemForBgFetch(lh, bucket_num, key, vb,
2614                                                     cookie, true);
2615                    } else {
2616                        // As bloomfilter predicted that item surely doesn't
2617                        // exist on disk, return ENOENT for deleteItem().
2618                        return ENGINE_KEY_ENOENT;
2619                    }
2620                } else if (v->isTempInitialItem()) {
2621                    lh.unlock();
2622                    bgFetch(key, vbucket, cookie, true);
2623                    return ENGINE_EWOULDBLOCK;
2624                } else { // Non-existent or deleted key.
2625                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2626                        // Delete a temp non-existent item to ensure that
2627                        // if a delete were issued over an item that doesn't
2628                        // exist, then we don't preserve a temp item.
2629                        vb->ht.unlocked_del(key, bucket_num);
2630                    }
2631                    return ENGINE_KEY_ENOENT;
2632                }
2633            } else {
2634                if (!v) { // Item might be evicted from cache.
2635                    // Create a temp item and delete it below as it is a
2636                    // force deletion, only if bloomfilter predicts that
2637                    // item may exist on disk.
2638                    if (vb->maybeKeyExistsInFilter(key)) {
2639                        add_type_t rv = vb->ht.unlocked_addTempItem(
2640                                                               bucket_num,
2641                                                               key,
2642                                                               eviction_policy);
2643                        if (rv == ADD_NOMEM) {
2644                            return ENGINE_ENOMEM;
2645                        }
2646                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
2647                        v->setDeleted();
2648                    } else {
2649                        return ENGINE_KEY_ENOENT;
2650                    }
2651                } else if (v->isTempInitialItem()) {
2652                    v->setDeleted();
2653                } else { // Non-existent or deleted key.
2654                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2655                        // Delete a temp non-existent item to ensure that
2656                        // if a delete were issued over an item that doesn't
2657                        // exist, then we don't preserve a temp item.
2658                        vb->ht.unlocked_del(key, bucket_num);
2659                    }
2660                    return ENGINE_KEY_ENOENT;
2661                }
2662            }
2663        }
2664    }
2665
2666    if (v && v->isLocked(ep_current_time()) &&
2667        (vb->getState() == vbucket_state_replica ||
2668         vb->getState() == vbucket_state_pending)) {
2669        v->unlock();
2670    }
2671    mutation_type_t delrv;
2672    delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2673    if (v && (delrv == NOT_FOUND || delrv == WAS_DIRTY || delrv == WAS_CLEAN)) {
2674        if (itemMeta != nullptr) {
2675            itemMeta->revSeqno = v->getRevSeqno();
2676            itemMeta->cas = v->getCas();
2677            itemMeta->flags = v->getFlags();
2678            itemMeta->exptime = v->getExptime();
2679        }
2680    }
2681
2682    uint64_t seqno = 0;
2683    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2684    switch (delrv) {
2685    case NOMEM:
2686        ret = ENGINE_ENOMEM;
2687        break;
2688    case INVALID_VBUCKET:
2689        ret = ENGINE_NOT_MY_VBUCKET;
2690        break;
2691    case INVALID_CAS:
2692        ret = ENGINE_KEY_EEXISTS;
2693        break;
2694    case IS_LOCKED:
2695        ret = ENGINE_TMPFAIL;
2696        break;
2697    case NOT_FOUND:
2698        ret = ENGINE_KEY_ENOENT;
2699    case WAS_CLEAN:
2700    case WAS_DIRTY:
2701        if (v) {
2702            // Keep lh as we need to do v->getCas
2703            queueDirty(vb, v, nullptr, &seqno);
2704            *cas = v->getCas();
2705        }
2706
2707        if (delrv != NOT_FOUND) {
2708            mutInfo->seqno = seqno;
2709            mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
2710            if (itemMeta != nullptr) {
2711                itemMeta->cas = v->getCas();
2712            }
2713        }
2714        break;
2715    case NEED_BG_FETCH:
2716        // We already figured out if a bg fetch is requred for a full-evicted
2717        // item above.
2718        throw std::logic_error("EventuallyPersistentStore::deleteItem: "
2719                "Unexpected NEEDS_BG_FETCH from unlocked_softDelete");
2720    }
2721    return ret;
2722}
2723
2724ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2725                                                     const std::string &key,
2726                                                     uint64_t *cas,
2727                                                     uint64_t *seqno,
2728                                                     uint16_t vbucket,
2729                                                     const void *cookie,
2730                                                     bool force,
2731                                                     ItemMetaData *itemMeta,
2732                                                     bool tapBackfill,
2733                                                     GenerateBySeqno genBySeqno,
2734                                                     GenerateCas generateCas,
2735                                                     uint64_t bySeqno,
2736                                                     ExtendedMetaData *emd,
2737                                                     bool isReplication)
2738{
2739    RCPtr<VBucket> vb = getVBucket(vbucket);
2740
2741    if (!vb) {
2742        ++stats.numNotMyVBuckets;
2743        return ENGINE_NOT_MY_VBUCKET;
2744    }
2745
2746    ReaderLockHolder rlh(vb->getStateLock());
2747    if (vb->getState() == vbucket_state_dead) {
2748        ++stats.numNotMyVBuckets;
2749        return ENGINE_NOT_MY_VBUCKET;
2750    } else if (vb->getState() == vbucket_state_replica && !force) {
2751        ++stats.numNotMyVBuckets;
2752        return ENGINE_NOT_MY_VBUCKET;
2753    } else if (vb->getState() == vbucket_state_pending && !force) {
2754        if (vb->addPendingOp(cookie)) {
2755            return ENGINE_EWOULDBLOCK;
2756        }
2757    } else if (vb->isTakeoverBackedUp()) {
2758        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a deleteWithMeta "
2759                "op, because takeover is lagging", vb->getId());
2760        return ENGINE_TMPFAIL;
2761    }
2762
2763    //check for the incoming item's CAS validity
2764    if (!Item::isValidCas(itemMeta->cas)) {
2765        return ENGINE_KEY_EEXISTS;
2766    }
2767
2768    int bucket_num(0);
2769    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2770    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2771    if (!force) { // Need conflict resolution.
2772        if (v)  {
2773            if (v->isTempInitialItem()) {
2774                bgFetch(key, vbucket, cookie, true);
2775                return ENGINE_EWOULDBLOCK;
2776            }
2777
2778            if (!conflictResolver->resolve(*v, *itemMeta, true)) {
2779                ++stats.numOpsDelMetaResolutionFailed;
2780                return ENGINE_KEY_EEXISTS;
2781            }
2782        } else {
2783            // Item is 1) deleted or not existent in the value eviction case OR
2784            // 2) deleted or evicted in the full eviction.
2785            if (vb->maybeKeyExistsInFilter(key)) {
2786                return addTempItemForBgFetch(lh, bucket_num, key, vb,
2787                                             cookie, true, isReplication);
2788            } else {
2789                // Even though bloomfilter predicted that item doesn't exist
2790                // on disk, we must put this delete on disk if the cas is valid.
2791                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2792                                                            eviction_policy,
2793                                                            isReplication);
2794                if (rv == ADD_NOMEM) {
2795                    return ENGINE_ENOMEM;
2796                }
2797                v = vb->ht.unlocked_find(key, bucket_num, true, false);
2798                v->setDeleted();
2799            }
2800        }
2801    } else {
2802        if (!v) {
2803            // We should always try to persist a delete here.
2804            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2805                                                        eviction_policy,
2806                                                        isReplication);
2807            if (rv == ADD_NOMEM) {
2808                return ENGINE_ENOMEM;
2809            }
2810            v = vb->ht.unlocked_find(key, bucket_num, true, false);
2811            v->setDeleted();
2812            v->setCas(*cas);
2813        } else if (v->isTempInitialItem()) {
2814            v->setDeleted();
2815            v->setCas(*cas);
2816        }
2817    }
2818
2819    if (v && v->isLocked(ep_current_time()) &&
2820        (vb->getState() == vbucket_state_replica ||
2821         vb->getState() == vbucket_state_pending)) {
2822        v->unlock();
2823    }
2824    mutation_type_t delrv;
2825    delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2826                                       eviction_policy, true);
2827    *cas = v ? v->getCas() : 0;
2828
2829    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2830    switch (delrv) {
2831    case NOMEM:
2832        ret = ENGINE_ENOMEM;
2833        break;
2834    case INVALID_VBUCKET:
2835        ret = ENGINE_NOT_MY_VBUCKET;
2836        break;
2837    case INVALID_CAS:
2838        ret = ENGINE_KEY_EEXISTS;
2839        break;
2840    case IS_LOCKED:
2841        ret = ENGINE_TMPFAIL;
2842        break;
2843    case NOT_FOUND:
2844        ret = ENGINE_KEY_ENOENT;
2845        break;
2846    case WAS_DIRTY:
2847    case WAS_CLEAN:
2848        if (genBySeqno == GenerateBySeqno::No) {
2849            v->setBySeqno(bySeqno);
2850        }
2851
2852        vb->setMaxCasAndTrackDrift(v->getCas());
2853
2854        if (tapBackfill) {
2855            tapQueueDirty(*vb, v, lh, seqno, genBySeqno);
2856        } else {
2857            queueDirty(vb, v, &lh, seqno, genBySeqno, generateCas);
2858        }
2859        break;
2860    case NEED_BG_FETCH:
2861        lh.unlock();
2862        bgFetch(key, vbucket, cookie, true);
2863        ret = ENGINE_EWOULDBLOCK;
2864    }
2865
2866    return ret;
2867}
2868
2869void EventuallyPersistentStore::reset() {
2870    auto buckets = vbMap.getBuckets();
2871    for (auto vbid : buckets) {
2872        RCPtr<VBucket> vb = getVBucket(vbid);
2873        if (vb) {
2874            LockHolder lh(vb_mutexes[vb->getId()]);
2875            vb->ht.clear();
2876            vb->checkpointManager.clear(vb->getState());
2877            vb->resetStats();
2878            vb->setPersistedSnapshot(0, 0);
2879        }
2880    }
2881
2882    ++stats.diskQueueSize;
2883    bool inverse = true;
2884    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2885    // Waking up (notifying) one flusher is good enough for diskFlushAll
2886    vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2887}
2888
2889/**
2890 * Callback invoked after persisting an item from memory to disk.
2891 *
2892 * This class exists to create a closure around a few variables within
2893 * EventuallyPersistentStore::flushOne so that an object can be
2894 * requeued in case of failure to store in the underlying layer.
2895 */
2896class PersistenceCallback : public Callback<mutation_result>,
2897                            public Callback<int> {
2898public:
2899
2900    PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2901                        EventuallyPersistentStore& st, EPStats& s, uint64_t c)
2902        : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2903        if (!vb) {
2904            throw std::invalid_argument("PersistenceCallback(): vb is NULL");
2905        }
2906    }
2907
2908    // This callback is invoked for set only.
2909    void callback(mutation_result &value) {
2910        if (value.first == 1) {
2911            int bucket_num(0);
2912            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2913                                                        &bucket_num);
2914            StoredValue *v = store.fetchValidValue(vbucket,
2915                                                   queuedItem->getKey(),
2916                                                   bucket_num, true, false);
2917            if (v) {
2918                if (v->getCas() == cas) {
2919                    // mark this item clean only if current and stored cas
2920                    // value match
2921                    v->markClean();
2922                }
2923                if (v->isNewCacheItem()) {
2924                    if (value.second) {
2925                        // Insert in value-only or full eviction mode.
2926                        ++vbucket->opsCreate;
2927                        vbucket->incrMetaDataDisk(*queuedItem);
2928                    } else { // Update in full eviction mode.
2929                        vbucket->ht.decrNumTotalItems();
2930                        ++vbucket->opsUpdate;
2931                    }
2932                    v->setNewCacheItem(false);
2933                } else { // Update in value-only or full eviction mode.
2934                    ++vbucket->opsUpdate;
2935                }
2936            }
2937
2938            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2939            stats.decrDiskQueueSize(1);
2940            stats.totalPersisted++;
2941        } else {
2942            // If the return was 0 here, we're in a bad state because
2943            // we do not know the rowid of this object.
2944            if (value.first == 0) {
2945                int bucket_num(0);
2946                LockHolder lh = vbucket->ht.getLockedBucket(
2947                                           queuedItem->getKey(), &bucket_num);
2948                StoredValue *v = store.fetchValidValue(vbucket,
2949                                                       queuedItem->getKey(),
2950                                                       bucket_num, true,
2951                                                       false);
2952                if (v) {
2953                    std::stringstream ss;
2954                    ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2955                       << queuedItem->getVBucketId() << " (rowid="
2956                       << v->getBySeqno() << ") returned 0 updates\n";
2957                    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2958                } else {
2959                    LOG(EXTENSION_LOG_WARNING,
2960                        "Error persisting now missing ``%s'' from vb%d",
2961                        queuedItem->getKey().c_str(),
2962                        queuedItem->getVBucketId());
2963                }
2964
2965                vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2966                stats.decrDiskQueueSize(1);
2967            } else {
2968                std::stringstream ss;
2969                ss <<
2970                "Fatal error in persisting SET ``" <<
2971                queuedItem->getKey() << "'' on vb "
2972                   << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2973                LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2974                redirty();
2975            }
2976        }
2977    }
2978
2979    // This callback is invoked for deletions only.
2980    //
2981    // The boolean indicates whether the underlying storage
2982    // successfully deleted the item.
2983    void callback(int &value) {
2984        // > 1 would be bad.  We were only trying to delete one row.
2985        if (value > 1) {
2986            throw std::logic_error("PersistenceCallback::callback: value "
2987                    "(which is " + std::to_string(value) +
2988                    ") should be <= 1 for deletions");
2989        }
2990        // -1 means fail
2991        // 1 means we deleted one row
2992        // 0 means we did not delete a row, but did not fail (did not exist)
2993        if (value >= 0) {
2994            // We have successfully removed an item from the disk, we
2995            // may now remove it from the hash table.
2996            int bucket_num(0);
2997            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2998                                                        &bucket_num);
2999            StoredValue *v = store.fetchValidValue(vbucket,
3000                                                   queuedItem->getKey(),
3001                                                   bucket_num, true, false);
3002            // Delete the item in the hash table iff:
3003            //  1. Item is existent in hashtable, and deleted flag is true
3004            //  2. rev seqno of queued item matches rev seqno of hash table item
3005            if (v && v->isDeleted() &&
3006                (queuedItem->getRevSeqno() == v->getRevSeqno())) {
3007                bool newCacheItem = v->isNewCacheItem();
3008                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
3009                                                        bucket_num);
3010                if (!deleted) {
3011                    throw std::logic_error("PersistenceCallback:callback: "
3012                            "Failed to delete key '" + queuedItem->getKey() +
3013                            "' from bucket " + std::to_string(bucket_num));
3014                }
3015                if (newCacheItem && value > 0) {
3016                    // Need to decrement the item counter again for an item that
3017                    // exists on DB file, but not in memory (i.e., full eviction),
3018                    // because we created the temp item in memory and incremented
3019                    // the item counter when a deletion is pushed in the queue.
3020                    vbucket->ht.decrNumTotalItems();
3021                }
3022
3023                /**
3024                 * Deleted items are to be added to the bloomfilter,
3025                 * in either eviction policy.
3026                 */
3027                vbucket->addToFilter(queuedItem->getKey());
3028            }
3029
3030            if (value > 0) {
3031                ++stats.totalPersisted;
3032                ++vbucket->opsDelete;
3033            }
3034            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3035            stats.decrDiskQueueSize(1);
3036            vbucket->decrMetaDataDisk(*queuedItem);
3037        } else {
3038            std::stringstream ss;
3039            ss << "Fatal error in persisting DELETE ``" <<
3040            queuedItem->getKey() << "'' on vb "
3041               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
3042            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
3043            redirty();
3044        }
3045    }
3046
3047private:
3048
3049    void redirty() {
3050        if (store.vbMap.isBucketDeletion(vbucket->getId())) {
3051            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
3052            stats.decrDiskQueueSize(1);
3053            return;
3054        }
3055        ++stats.flushFailed;
3056        store.invokeOnLockedStoredValue(queuedItem->getKey(),
3057                                         queuedItem->getVBucketId(),
3058                                         &StoredValue::reDirty);
3059        vbucket->rejectQueue.push(queuedItem);
3060        ++vbucket->opsReject;
3061    }
3062
3063    const queued_item queuedItem;
3064    RCPtr<VBucket> vbucket;
3065    EventuallyPersistentStore& store;
3066    EPStats& stats;
3067    uint64_t cas;
3068    DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
3069};
3070
3071bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3072                                                     time_t when) {
3073    bool inverse = false;
3074    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3075        flushAllTaskCtx.cookie = cookie;
3076        flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3077        ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3078        ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3079        return true;
3080    } else {
3081        return false;
3082    }
3083}
3084
3085void EventuallyPersistentStore::setFlushAllComplete() {
3086    // Notify memcached about flushAll task completion, and
3087    // set diskFlushall flag to false
3088    if (flushAllTaskCtx.cookie) {
3089        engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3090    }
3091    bool inverse = false;
3092    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3093    inverse = true;
3094    diskFlushAll.compare_exchange_strong(inverse, false);
3095}
3096
3097void EventuallyPersistentStore::flushOneDeleteAll() {
3098    for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
3099        RCPtr<VBucket> vb = getVBucket(i);
3100        // Reset the vBucket if it's non-null and not already in the middle of
3101        // being created / destroyed.
3102        if (vb &&
3103            !(vbMap.isBucketCreation(i) || vbMap.isBucketDeletion(i))) {
3104            LockHolder lh(vb_mutexes[vb->getId()]);
3105            getRWUnderlying(vb->getId())->reset(i);
3106        }
3107    }
3108
3109    stats.decrDiskQueueSize(1);
3110    setFlushAllComplete();
3111}
3112
3113int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3114    KVShard *shard = vbMap.getShardByVbId(vbid);
3115    if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3116        if (shard->getId() == EP_PRIMARY_SHARD) {
3117            flushOneDeleteAll();
3118        } else {
3119            // disk flush is pending just return
3120            return 0;
3121        }
3122    }
3123
3124    int items_flushed = 0;
3125    const rel_time_t flush_start = ep_current_time();
3126
3127    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3128    if (vb) {
3129        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3130        if (!lh.islocked()) { // Try another bucket if this one is locked
3131            return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3132        }
3133
3134        std::vector<queued_item> items;
3135        KVStore *rwUnderlying = getRWUnderlying(vbid);
3136
3137        while (!vb->rejectQueue.empty()) {
3138            items.push_back(vb->rejectQueue.front());
3139            vb->rejectQueue.pop();
3140        }
3141
3142        // Append any 'backfill' items (mutations added by a TAP stream).
3143        vb->getBackfillItems(items);
3144
3145        // Append all items outstanding for the persistence cursor.
3146        snapshot_range_t range;
3147        range = vb->checkpointManager.getAllItemsForCursor(
3148                CheckpointManager::pCursorName, items);
3149
3150        if (!items.empty()) {
3151            while (!rwUnderlying->begin()) {
3152                ++stats.beginFailed;
3153                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3154                    "Retry in 1 sec ...");
3155                sleep(1);
3156            }
3157            rwUnderlying->optimizeWrites(items);
3158
3159            Item *prev = NULL;
3160            auto vbstate = vb->getVBucketState();
3161            uint64_t maxSeqno = 0;
3162            range.start = std::max(range.start, vbstate.lastSnapStart);
3163
3164            bool mustCheckpointVBState = false;
3165            std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
3166
3167            for (const auto& item : items) {
3168
3169                if (!item->shouldPersist()) {
3170                    continue;
3171                }
3172
3173                if (item->getOperation() == queue_op::set_vbucket_state) {
3174                    // No actual item explicitly persisted to (this op exists
3175                    // to ensure a commit occurs with the current vbstate);
3176                    // flag that we must trigger a snapshot even if there are
3177                    // no 'real' items in the checkpoint.
3178                    mustCheckpointVBState = true;
3179
3180                    // Update queuing stats how this item has logically been
3181                    // processed.
3182                    stats.decrDiskQueueSize(1);
3183                    vb->doStatsForFlushing(*item, item->size());
3184
3185                } else if (!prev || prev->getKey() != item->getKey()) {
3186                    prev = item.get();
3187                    ++items_flushed;
3188                    PersistenceCallback *cb = flushOneDelOrSet(item, vb);
3189                    if (cb) {
3190                        pcbs.push_back(cb);
3191                    }
3192
3193                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
3194                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
3195                    if (item->isDeleted()) {
3196                        vbstate.maxDeletedSeqno =
3197                                std::max(vbstate.maxDeletedSeqno,
3198                                         item->getRevSeqno());
3199                    }
3200                    ++stats.flusher_todo;
3201
3202                } else {
3203                    // Item is the same key as the previous[1] one - don't need
3204                    // to flush to disk.
3205                    // [1] Previous here really means 'next' - optimizeWrites()
3206                    //     above has actually re-ordered items such that items
3207                    //     with the same key are ordered from high->low seqno.
3208                    //     This means we only write the highest (i.e. newest)
3209                    //     item for a given key, and discard any duplicate,
3210                    //     older items.
3211                    stats.decrDiskQueueSize(1);
3212                    vb->doStatsForFlushing(*item, item->size());
3213                }
3214            }
3215
3216
3217            {
3218                ReaderLockHolder rlh(vb->getStateLock());
3219                if (vb->getState() == vbucket_state_active) {
3220                    if (maxSeqno) {
3221                        range.start = maxSeqno;
3222                        range.end = maxSeqno;
3223                    }
3224                }
3225
3226                // Update VBstate based on the changes we have just made,
3227                // then tell the rwUnderlying the 'new' state
3228                // (which will persisted as part of the commit() below).
3229                vbstate.lastSnapStart = range.start;
3230                vbstate.lastSnapEnd = range.end;
3231
3232                // Do we need to trigger a persist of the state?
3233                // If there are no "real" items to flush, and we encountered
3234                // a set_vbucket_state meta-item.
3235                const bool persist = (items_flushed == 0) && mustCheckpointVBState;
3236
3237                KVStatsCallback kvcb(this);
3238                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
3239                                                  &kvcb, persist) != true) {
3240                    return RETRY_FLUSH_VBUCKET;
3241                }
3242
3243                if (vbMap.setBucketCreation(vbid, false)) {
3244                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3245                }
3246            }
3247
3248            // Commit all mutations to disk if there is a non-zero number
3249            // of items to flush, and the commit interval is zero.
3250            if ((items_flushed > 0) &&
3251                (decrCommitInterval(shard->getId()) == 0)) {
3252
3253                commit(shard->getId());
3254
3255                // Now the commit is complete, vBucket file must exist.
3256                if (vbMap.setBucketCreation(vbid, false)) {
3257                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3258                }
3259            }
3260
3261            hrtime_t end = gethrtime();
3262            uint64_t trans_time = (end - flush_start) / 1000000;
3263
3264            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3265                                       static_cast<double>(trans_time) /
3266                                       static_cast<double>(items_flushed));
3267            stats.cumulativeFlushTime.fetch_add(ep_current_time()
3268                                                - flush_start);
3269            stats.flusher_todo.store(0);
3270            stats.totalPersistVBState++;
3271
3272            if (vb->rejectQueue.empty()) {
3273                vb->setPersistedSnapshot(range.start, range.end);
3274                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3275                if (highSeqno > 0 &&
3276                    highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3277                    vbMap.setPersistenceSeqno(vbid, highSeqno);
3278                }
3279            }
3280        }
3281
3282        rwUnderlying->pendingTasks();
3283
3284        if (vb->checkpointManager.getNumCheckpoints() > 1) {
3285            wakeUpCheckpointRemover();
3286        }
3287
3288        if (vb->rejectQueue.empty()) {
3289            vb->checkpointManager.itemsPersisted();
3290            uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3291            uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3292            vb->notifyOnPersistence(engine, seqno, true);
3293            vb->notifyOnPersistence(engine, chkid, false);
3294            if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3295                vbMap.setPersistenceCheckpointId(vbid, chkid);
3296            }
3297        } else {
3298            return RETRY_FLUSH_VBUCKET;
3299        }
3300    }
3301
3302    return items_flushed;
3303}
3304
3305void EventuallyPersistentStore::commit(uint16_t shardId) {
3306    KVStore *rwUnderlying = getRWUnderlyingByShard(shardId);
3307    std::list<PersistenceCallback *>& pcbs = rwUnderlying->getPersistenceCbList();
3308    BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
3309    hrtime_t commit_start = gethrtime();
3310
3311    KVStatsCallback cb(this);
3312    while (!rwUnderlying->commit(&cb)) {
3313        ++stats.commitFailed;
3314        LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3315            "1 sec...\n");
3316        sleep(1);
3317    }
3318
3319    while (!pcbs.empty()) {
3320         delete pcbs.front();
3321         pcbs.pop_front();
3322    }
3323
3324    ++stats.flusherCommits;
3325    hrtime_t commit_end = gethrtime();
3326    uint64_t commit_time = (commit_end - commit_start) / 1000000;
3327    stats.commit_time.store(commit_time);
3328    stats.cumulativeCommitTime.fetch_add(commit_time);
3329}
3330
3331PersistenceCallback*
3332EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3333                                            RCPtr<VBucket> &vb) {
3334
3335    if (!vb) {
3336        stats.decrDiskQueueSize(1);
3337        return NULL;
3338    }
3339
3340    int64_t bySeqno = qi->getBySeqno();
3341    bool deleted = qi->isDeleted();
3342    rel_time_t queued(qi->getQueuedTime());
3343
3344    int dirtyAge = ep_current_time() - queued;
3345    stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3346    stats.dirtyAge.store(dirtyAge);
3347    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3348                                         stats.dirtyAgeHighWat.load()));
3349
3350    // Wait until the VB is deleted before writing
3351    if (vbMap.isBucketDeletion(qi->getVBucketId())) {
3352        vb->rejectQueue.push(qi);
3353        ++vb->opsReject;
3354        return NULL;
3355    }
3356
3357    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3358    if (!deleted) {
3359        // TODO: Need to separate disk_insert from disk_update because
3360        // bySeqno doesn't give us that information.
3361        BlockTimer timer(bySeqno == -1 ?
3362                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
3363                         bySeqno == -1 ? "disk_insert" : "disk_update",
3364                         stats.timingLog);
3365        PersistenceCallback *cb =
3366            new PersistenceCallback(qi, vb, *this, stats, qi->getCas());
3367        rwUnderlying->set(*qi, *cb);
3368        return cb;
3369    } else {
3370        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
3371                         stats.timingLog);
3372        PersistenceCallback *cb =
3373            new PersistenceCallback(qi, vb, *this, stats, 0);
3374        rwUnderlying->del(*qi, *cb);
3375        return cb;
3376    }
3377}
3378
3379void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
3380                                           StoredValue* v,
3381                                           LockHolder *plh,
3382                                           uint64_t *seqno,
3383                                           const GenerateBySeqno generateBySeqno,
3384                                           const GenerateCas generateCas) {
3385    if (vb) {
3386        queued_item qi(v->toItem(false, vb->getId()));
3387
3388        bool rv = vb->checkpointManager.queueDirty(*vb, qi,
3389                                                   generateBySeqno, generateCas);
3390        v->setBySeqno(qi->getBySeqno());
3391
3392        if (seqno) {
3393            *seqno = v->getBySeqno();
3394        }
3395
3396        if (GenerateCas::Yes == generateCas) {
3397            v->setCas(qi->getCas());
3398        }
3399
3400        if (plh) {
3401            plh->unlock();
3402        }
3403
3404        if (rv) {
3405            KVShard* shard = vbMap.getShardByVbId(vb->getId());
3406            shard->getFlusher()->notifyFlushEvent();
3407        }
3408
3409        // Now notify replication
3410        engine.getTapConnMap().notifyVBConnections(vb->getId());
3411        engine.getDcpConnMap().notifyVBConnections(vb->getId(),
3412                                                   qi->getBySeqno());
3413    }
3414}
3415
3416void EventuallyPersistentStore::tapQueueDirty(VBucket &vb,
3417                                              StoredValue* v,
3418                                              LockHolder& plh,
3419                                              uint64_t *seqno,
3420                                              const GenerateBySeqno generateBySeqno) {
3421    queued_item qi(v->toItem(false, vb.getId()));
3422
3423    bool queued = vb.queueBackfillItem(qi, generateBySeqno);
3424
3425    v->setBySeqno(qi->getBySeqno());
3426
3427    /* During backfill on a TAP receiver we need to update the snapshot
3428       range in the checkpoint. Has to be done here because in case of TAP
3429       backfill, above, we use vb.queueBackfillItem() instead of
3430       vb.checkpointManager.queueDirty() */
3431    if (GenerateBySeqno::Yes == generateBySeqno) {
3432        vb.checkpointManager.resetSnapshotRange();
3433    }
3434
3435    if (seqno) {
3436        *seqno = v->getBySeqno();
3437    }
3438
3439    plh.unlock();
3440
3441    if (queued) {
3442        KVShard* shard = vbMap.getShardByVbId(vb.getId());
3443        shard->getFlusher()->notifyFlushEvent();
3444    }
3445}
3446
3447std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
3448{
3449    return getOneROUnderlying()->listPersistedVbuckets();
3450}
3451
3452void EventuallyPersistentStore::warmupCompleted() {
3453    // Snapshot VBucket state after warmup to ensure Failover table is
3454    // persisted.
3455    scheduleVBStatePersist();
3456
3457    if (engine.getConfiguration().getAlogPath().length() > 0) {
3458
3459        if (engine.getConfiguration().isAccessScannerEnabled()) {
3460            LockHolder lh(accessScanner.mutex);
3461            accessScanner.enabled = true;
3462            lh.unlock();
3463            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task enabled");
3464            size_t smin = engine.getConfiguration().getAlogSleepTime();
3465            setAccessScannerSleeptime(smin, true);
3466        } else {
3467            LockHolder lh(accessScanner.mutex);
3468            accessScanner.enabled = false;
3469            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task disabled");
3470        }
3471
3472        Configuration &config = engine.get