xref: /4.6.4/ep-engine/src/ep.cc (revision 92330468)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string.h>
21#include <time.h>
22
23#include <fstream>
24#include <functional>
25#include <iostream>
26#include <map>
27#include <sstream>
28#include <string>
29#include <utility>
30#include <vector>
31
32#include "access_scanner.h"
33#include "bgfetcher.h"
34#include "checkpoint_remover.h"
35#include "conflict_resolution.h"
36#include "defragmenter.h"
37#include "ep.h"
38#include "ep_engine.h"
39#include "ext_meta_parser.h"
40#include "failover-table.h"
41#include "flusher.h"
42#include "htresizer.h"
43#include "kvshard.h"
44#include "kvstore.h"
45#include "locks.h"
46#include "mutation_log.h"
47#include "warmup.h"
48#include "connmap.h"
49#include "replicationthrottle.h"
50#include "tasks.h"
51
52class StatsValueChangeListener : public ValueChangedListener {
53public:
54    StatsValueChangeListener(EPStats &st, EventuallyPersistentStore &str)
55        : stats(st), store(str) {
56        // EMPTY
57    }
58
59    virtual void sizeValueChanged(const std::string &key, size_t value) {
60        if (key.compare("max_size") == 0) {
61            stats.setMaxDataSize(value);
62            store.getEPEngine().getDcpConnMap(). \
63                                     updateMaxActiveSnoozingBackfills(value);
64            size_t low_wat = static_cast<size_t>
65                    (static_cast<double>(value) * stats.mem_low_wat_percent);
66            size_t high_wat = static_cast<size_t>
67                    (static_cast<double>(value) * stats.mem_high_wat_percent);
68            stats.mem_low_wat.store(low_wat);
69            stats.mem_high_wat.store(high_wat);
70            store.setCursorDroppingLowerUpperThresholds(value);
71        } else if (key.compare("mem_low_wat") == 0) {
72            stats.mem_low_wat.store(value);
73            stats.mem_low_wat_percent.store(
74                                    (double)(value) / stats.getMaxDataSize());
75        } else if (key.compare("mem_high_wat") == 0) {
76            stats.mem_high_wat.store(value);
77            stats.mem_high_wat_percent.store(
78                                    (double)(value) / stats.getMaxDataSize());
79        } else if (key.compare("replication_throttle_threshold") == 0) {
80            stats.replicationThrottleThreshold.store(
81                                          static_cast<double>(value) / 100.0);
82        } else if (key.compare("warmup_min_memory_threshold") == 0) {
83            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
84        } else if (key.compare("warmup_min_items_threshold") == 0) {
85            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
86        } else {
87            LOG(EXTENSION_LOG_WARNING,
88                "Failed to change value for unknown variable, %s\n",
89                key.c_str());
90        }
91    }
92
93private:
94    EPStats &stats;
95    EventuallyPersistentStore &store;
96};
97
98/**
99 * A configuration value changed listener that responds to ep-engine
100 * parameter changes by invoking engine-specific methods on
101 * configuration change events.
102 */
103class EPStoreValueChangeListener : public ValueChangedListener {
104public:
105    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
106    }
107
108    virtual void sizeValueChanged(const std::string &key, size_t value) {
109        if (key.compare("bg_fetch_delay") == 0) {
110            store.setBGFetchDelay(static_cast<uint32_t>(value));
111        } else if (key.compare("compaction_write_queue_cap") == 0) {
112            store.setCompactionWriteQueueCap(value);
113        } else if (key.compare("exp_pager_stime") == 0) {
114            store.setExpiryPagerSleeptime(value);
115        } else if (key.compare("exp_pager_initial_run_time") == 0) {
116            store.setExpiryPagerTasktime(value);
117        } else if (key.compare("alog_sleep_time") == 0) {
118            store.setAccessScannerSleeptime(value, false);
119        } else if (key.compare("alog_task_time") == 0) {
120            store.resetAccessScannerStartTime();
121        } else if (key.compare("mutation_mem_threshold") == 0) {
122            double mem_threshold = static_cast<double>(value) / 100;
123            StoredValue::setMutationMemoryThreshold(mem_threshold);
124        } else if (key.compare("backfill_mem_threshold") == 0) {
125            double backfill_threshold = static_cast<double>(value) / 100;
126            store.setBackfillMemoryThreshold(backfill_threshold);
127        } else if (key.compare("compaction_exp_mem_threshold") == 0) {
128            store.setCompactionExpMemThreshold(value);
129        } else if (key.compare("replication_throttle_queue_cap") == 0) {
130            store.getEPEngine().getReplicationThrottle().setQueueCap(value);
131        } else if (key.compare("replication_throttle_cap_pcnt") == 0) {
132            store.getEPEngine().getReplicationThrottle().setCapPercent(value);
133        } else {
134            LOG(EXTENSION_LOG_WARNING,
135                "Failed to change value for unknown variable, %s\n",
136                key.c_str());
137        }
138    }
139
140    virtual void booleanValueChanged(const std::string &key, bool value) {
141        if (key.compare("access_scanner_enabled") == 0) {
142            if (value) {
143                store.enableAccessScannerTask();
144            } else {
145                store.disableAccessScannerTask();
146            }
147        } else if (key.compare("bfilter_enabled") == 0) {
148            store.setAllBloomFilters(value);
149        } else if (key.compare("exp_pager_enabled") == 0) {
150            if (value) {
151                store.enableExpiryPager();
152            } else {
153                store.disableExpiryPager();
154            }
155        }
156    }
157
158    virtual void floatValueChanged(const std::string &key, float value) {
159        if (key.compare("bfilter_residency_threshold") == 0) {
160            store.setBfiltersResidencyThreshold(value);
161        } else if (key.compare("dcp_min_compression_ratio") == 0) {
162            store.getEPEngine().updateDcpMinCompressionRatio(value);
163        }
164    }
165
166private:
167    EventuallyPersistentStore &store;
168};
169
170/**
171 * Callback class used by EpStore, for adding relevent keys
172 * to bloomfilter during compaction.
173 */
174class BloomFilterCallback : public Callback<std::string&, bool&> {
175public:
176    BloomFilterCallback(EventuallyPersistentStore& eps, uint16_t vbid,
177                        bool residentRatioAlert)
178        : store(eps), vbucketId(vbid),
179          residentRatioLessThanThreshold(residentRatioAlert) {
180    }
181
182    void callback(std::string& key, bool& isDeleted) {
183        RCPtr<VBucket> vb = store.getVBucket(vbucketId);
184        if (vb) {
185            if (vb->isTempFilterAvailable()) {
186                if (store.getItemEvictionPolicy() == VALUE_ONLY) {
187                    /**
188                     * VALUE-ONLY EVICTION POLICY
189                     * Consider deleted items only.
190                     */
191                    if (isDeleted) {
192                        vb->addToTempFilter(key);
193                    }
194                } else {
195                    /**
196                     * FULL EVICTION POLICY
197                     * If vbucket's resident ratio is found to be less than
198                     * the residency threshold, consider all items, otherwise
199                     * consider deleted and non-resident items only.
200                     */
201                    if (residentRatioLessThanThreshold) {
202                        vb->addToTempFilter(key);
203                    } else {
204                        if (isDeleted || !store.isMetaDataResident(vb, key)) {
205                            vb->addToTempFilter(key);
206                        }
207                    }
208                }
209            }
210        }
211    }
212
213private:
214    EventuallyPersistentStore& store;
215    uint16_t vbucketId;
216    bool residentRatioLessThanThreshold;
217};
218
219class PendingOpsNotification : public GlobalTask {
220public:
221    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
222        GlobalTask(&e, TaskId::PendingOpsNotification, 0, false),
223        engine(e), vbucket(vb) { }
224
225    std::string getDescription() {
226        std::stringstream ss;
227        ss << "Notify pending operations for vbucket " << vbucket->getId();
228        return ss.str();
229    }
230
231    bool run(void) {
232        vbucket->fireAllOps(engine);
233        return false;
234    }
235
236private:
237    EventuallyPersistentEngine &engine;
238    RCPtr<VBucket> vbucket;
239};
240
241EventuallyPersistentStore::EventuallyPersistentStore(
242    EventuallyPersistentEngine &theEngine) :
243    engine(theEngine), stats(engine.getEpStats()),
244    vbMap(theEngine.getConfiguration(), *this),
245    defragmenterTask(NULL),
246    bgFetchQueue(0),
247    diskFlushAll(false), bgFetchDelay(0),
248    backfillMemoryThreshold(0.95),
249    statsSnapshotTaskId(0), lastTransTimePerItem(0)
250{
251    cachedResidentRatio.activeRatio.store(0);
252    cachedResidentRatio.replicaRatio.store(0);
253
254    Configuration &config = engine.getConfiguration();
255    MutationLog *shardlog;
256    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
257        std::stringstream s;
258        s << i;
259        shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
260                                 "." + s.str(),
261                                 engine.getConfiguration().getAlogBlockSize());
262        accessLog.push_back(shardlog);
263    }
264
265    storageProperties = new StorageProperties(true, true, true, true);
266
267    const auto size = GlobalTask::allTaskIds.size();
268    stats.schedulingHisto = new Histogram<ProcessClock::duration::rep>[size];
269    stats.taskRuntimeHisto = new Histogram<ProcessClock::duration::rep>[size];
270
271    for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) {
272        stats.schedulingHisto[i].reset();
273        stats.taskRuntimeHisto[i].reset();
274    }
275
276    ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable());
277
278    size_t num_vbs = config.getMaxVbuckets();
279    vb_mutexes = new Mutex[num_vbs];
280
281    *stats.memOverhead = sizeof(EventuallyPersistentStore);
282
283    if (config.getConflictResolutionType().compare("lww") == 0) {
284        conflictResolver.reset(new LastWriteWinsResolution());
285    } else {
286        conflictResolver.reset(new RevisionSeqnoResolution());
287    }
288
289    stats.setMaxDataSize(config.getMaxSize());
290    config.addValueChangedListener("max_size",
291                                   new StatsValueChangeListener(stats, *this));
292    getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills(
293                                                        config.getMaxSize());
294
295    stats.mem_low_wat.store(config.getMemLowWat());
296    config.addValueChangedListener("mem_low_wat",
297                                   new StatsValueChangeListener(stats, *this));
298    stats.mem_low_wat_percent.store(
299                (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize());
300
301    stats.mem_high_wat.store(config.getMemHighWat());
302    config.addValueChangedListener("mem_high_wat",
303                                   new StatsValueChangeListener(stats, *this));
304    stats.mem_high_wat_percent.store(
305                (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize());
306
307    setCursorDroppingLowerUpperThresholds(config.getMaxSize());
308
309    stats.replicationThrottleThreshold.store(static_cast<double>
310                                    (config.getReplicationThrottleThreshold())
311                                     / 100.0);
312    config.addValueChangedListener("replication_throttle_threshold",
313                                   new StatsValueChangeListener(stats, *this));
314
315    stats.replicationThrottleWriteQueueCap.store(
316                                    config.getReplicationThrottleQueueCap());
317    config.addValueChangedListener("replication_throttle_queue_cap",
318                                   new EPStoreValueChangeListener(*this));
319    config.addValueChangedListener("replication_throttle_cap_pcnt",
320                                   new EPStoreValueChangeListener(*this));
321
322    setBGFetchDelay(config.getBgFetchDelay());
323    config.addValueChangedListener("bg_fetch_delay",
324                                   new EPStoreValueChangeListener(*this));
325
326    stats.warmupMemUsedCap.store(static_cast<double>
327                               (config.getWarmupMinMemoryThreshold()) / 100.0);
328    config.addValueChangedListener("warmup_min_memory_threshold",
329                                   new StatsValueChangeListener(stats, *this));
330    stats.warmupNumReadCap.store(static_cast<double>
331                                (config.getWarmupMinItemsThreshold()) / 100.0);
332    config.addValueChangedListener("warmup_min_items_threshold",
333                                   new StatsValueChangeListener(stats, *this));
334
335    double mem_threshold = static_cast<double>
336                                      (config.getMutationMemThreshold()) / 100;
337    StoredValue::setMutationMemoryThreshold(mem_threshold);
338    config.addValueChangedListener("mutation_mem_threshold",
339                                   new EPStoreValueChangeListener(*this));
340
341    double backfill_threshold = static_cast<double>
342                                      (config.getBackfillMemThreshold()) / 100;
343    setBackfillMemoryThreshold(backfill_threshold);
344    config.addValueChangedListener("backfill_mem_threshold",
345                                   new EPStoreValueChangeListener(*this));
346
347    config.addValueChangedListener("bfilter_enabled",
348                                   new EPStoreValueChangeListener(*this));
349
350    bfilterResidencyThreshold = config.getBfilterResidencyThreshold();
351    config.addValueChangedListener("bfilter_residency_threshold",
352                                   new EPStoreValueChangeListener(*this));
353
354    compactionExpMemThreshold = config.getCompactionExpMemThreshold();
355    config.addValueChangedListener("compaction_exp_mem_threshold",
356                                   new EPStoreValueChangeListener(*this));
357
358    compactionWriteQueueCap = config.getCompactionWriteQueueCap();
359    config.addValueChangedListener("compaction_write_queue_cap",
360                                   new EPStoreValueChangeListener(*this));
361
362    config.addValueChangedListener("dcp_min_compression_ratio",
363                                   new EPStoreValueChangeListener(*this));
364
365    const std::string &policy = config.getItemEvictionPolicy();
366    if (policy.compare("value_only") == 0) {
367        eviction_policy = VALUE_ONLY;
368    } else {
369        eviction_policy = FULL_EVICTION;
370    }
371
372    warmupTask = new Warmup(*this, config);
373}
374
375bool EventuallyPersistentStore::initialize() {
376    // We should nuke everything unless we want warmup
377    Configuration &config = engine.getConfiguration();
378    if (!config.isWarmup()) {
379        reset();
380    }
381
382    if (!startFlusher()) {
383        LOG(EXTENSION_LOG_WARNING,
384            "FATAL: Failed to create and start flushers");
385        return false;
386    }
387    if (!startBgFetcher()) {
388        LOG(EXTENSION_LOG_WARNING,
389           "FATAL: Failed to create and start bgfetchers");
390        return false;
391    }
392
393    warmupTask->start();
394
395    itmpTask = new ItemPager(&engine, stats);
396    ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
397
398    LockHolder elh(expiryPager.mutex);
399    expiryPager.enabled = config.isExpPagerEnabled();
400    elh.unlock();
401
402    size_t expiryPagerSleeptime = config.getExpPagerStime();
403    setExpiryPagerSleeptime(expiryPagerSleeptime);
404    config.addValueChangedListener("exp_pager_stime",
405                                   new EPStoreValueChangeListener(*this));
406    config.addValueChangedListener("exp_pager_enabled",
407                                   new EPStoreValueChangeListener(*this));
408    config.addValueChangedListener("exp_pager_initial_run_time",
409                                   new EPStoreValueChangeListener(*this));
410
411    ExTask htrTask = new HashtableResizerTask(this, 10);
412    ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
413
414    size_t checkpointRemoverInterval = config.getChkRemoverStime();
415    chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
416                                                   checkpointRemoverInterval);
417    ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
418
419    ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
420    ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
421
422#if HAVE_JEMALLOC
423    /* Only create the defragmenter task if we have an underlying memory
424     * allocator which can facilitate defragmenting memory.
425     */
426    defragmenterTask = new DefragmenterTask(&engine, stats);
427    ExecutorPool::get()->schedule(defragmenterTask, NONIO_TASK_IDX);
428#endif
429
430    return true;
431}
432
433EventuallyPersistentStore::~EventuallyPersistentStore() {
434    stopWarmup();
435    stopBgFetcher();
436    ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(), NONIO_TASK_IDX,
437                                       stats.forceShutdown);
438
439    ExecutorPool::get()->cancel(statsSnapshotTaskId);
440
441    LockHolder lh(accessScanner.mutex);
442    ExecutorPool::get()->cancel(accessScanner.task);
443    lh.unlock();
444
445    stopFlusher();
446
447    ExecutorPool::get()->unregisterTaskable(engine.getTaskable(),
448                                            stats.forceShutdown);
449
450    delete [] vb_mutexes;
451    delete [] stats.schedulingHisto;
452    delete [] stats.taskRuntimeHisto;
453    delete warmupTask;
454    delete storageProperties;
455    defragmenterTask.reset();
456
457    std::vector<MutationLog*>::iterator it;
458    for (it = accessLog.begin(); it != accessLog.end(); it++) {
459        delete *it;
460    }
461}
462
463const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
464    return vbMap.shards[shardId]->getFlusher();
465}
466
467uint16_t EventuallyPersistentStore::getCommitInterval(uint16_t shardId) {
468    Flusher *flusher = vbMap.shards[shardId]->getFlusher();
469    return flusher->getCommitInterval();
470}
471
472uint16_t EventuallyPersistentStore::decrCommitInterval(uint16_t shardId) {
473    Flusher *flusher = vbMap.shards[shardId]->getFlusher();
474    return flusher->decrCommitInterval();
475}
476
477Warmup* EventuallyPersistentStore::getWarmup(void) const {
478    return warmupTask;
479}
480
481bool EventuallyPersistentStore::startFlusher() {
482    for (auto* shard : vbMap.shards) {
483        shard->getFlusher()->start();
484    }
485    return true;
486}
487
488void EventuallyPersistentStore::stopFlusher() {
489    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
490        Flusher *flusher = vbMap.shards[i]->getFlusher();
491        LOG(EXTENSION_LOG_NOTICE, "Attempting to stop the flusher for "
492            "shard:%" PRIu16, i);
493        bool rv = flusher->stop(stats.forceShutdown);
494        if (rv && !stats.forceShutdown) {
495            flusher->wait();
496        }
497    }
498}
499
500bool EventuallyPersistentStore::pauseFlusher() {
501    bool rv = true;
502    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
503        Flusher *flusher = vbMap.shards[i]->getFlusher();
504        if (!flusher->pause()) {
505            LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
506                "[%s], shard = %d", flusher->stateName(), i);
507            rv = false;
508        }
509    }
510    return rv;
511}
512
513bool EventuallyPersistentStore::resumeFlusher() {
514    bool rv = true;
515    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
516        Flusher *flusher = vbMap.shards[i]->getFlusher();
517        if (!flusher->resume()) {
518            LOG(EXTENSION_LOG_WARNING,
519                    "Attempted to resume flusher in state [%s], "
520                    "shard = %d", flusher->stateName(), i);
521            rv = false;
522        }
523    }
524    return rv;
525}
526
527void EventuallyPersistentStore::wakeUpFlusher() {
528    if (stats.diskQueueSize.load() == 0) {
529        for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
530            Flusher *flusher = vbMap.shards[i]->getFlusher();
531            flusher->wake();
532        }
533    }
534}
535
536bool EventuallyPersistentStore::startBgFetcher() {
537    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
538        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
539        if (bgfetcher == NULL) {
540            LOG(EXTENSION_LOG_WARNING,
541                "Failed to start bg fetcher for shard %d", i);
542            return false;
543        }
544        bgfetcher->start();
545    }
546    return true;
547}
548
549void EventuallyPersistentStore::stopBgFetcher() {
550    for (uint16_t i = 0; i < vbMap.shards.size(); i++) {
551        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
552        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
553            LOG(EXTENSION_LOG_WARNING,
554                "Shutting down engine while there are still pending data "
555                "read for shard %d from database storage", i);
556        }
557        LOG(EXTENSION_LOG_NOTICE, "Stopping bg fetcher for shard:%" PRIu16, i);
558        bgfetcher->stop();
559    }
560}
561
562void
563EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
564                                             time_t startTime,
565                                             uint64_t revSeqno,
566                                             exp_type_t source) {
567    RCPtr<VBucket> vb = getVBucket(vbid);
568    if (vb) {
569        // Obtain reader access to the VB state change lock so that
570        // the VB can't switch state whilst we're processing
571        ReaderLockHolder rlh(vb->getStateLock());
572        if (vb->getState() == vbucket_state_active) {
573            int bucket_num(0);
574            LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
575            StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
576            if (v) {
577                if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
578                    // This is a temporary item whose background fetch for metadata
579                    // has completed.
580                    bool deleted = vb->ht.unlocked_del(key, bucket_num);
581                    if (!deleted) {
582                        throw std::logic_error("EPStore::deleteExpiredItem: "
583                                "Failed to delete key '" + key + "' from bucket "
584                                + std::to_string(bucket_num));
585                    }
586                } else if (v->isExpired(startTime) && !v->isDeleted()) {
587                    vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
588                    queueDirty(vb, v, &lh, NULL);
589                }
590            } else {
591                if (eviction_policy == FULL_EVICTION) {
592                    // Create a temp item and delete and push it
593                    // into the checkpoint queue, only if the bloomfilter
594                    // predicts that the item may exist on disk.
595                    if (vb->maybeKeyExistsInFilter(key)) {
596                        add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
597                                                                    eviction_policy);
598                        if (rv == ADD_NOMEM) {
599                            return;
600                        }
601                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
602                        v->setDeleted();
603                        v->setRevSeqno(revSeqno);
604                        vb->ht.unlocked_softDelete(v, 0, eviction_policy);
605                        queueDirty(vb, v, &lh, NULL);
606                    }
607                }
608            }
609            incExpirationStat(vb, source);
610        }
611    }
612}
613
614void
615EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
616                                                        std::string> > &keys,
617                                              exp_type_t source) {
618    std::list<std::pair<uint16_t, std::string> >::iterator it;
619    time_t startTime = ep_real_time();
620    for (it = keys.begin(); it != keys.end(); it++) {
621        deleteExpiredItem(it->first, it->second, startTime, 0, source);
622    }
623}
624
625StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
626                                                        const const_sized_buffer key,
627                                                        int bucket_num,
628                                                        bool wantDeleted,
629                                                        bool trackReference,
630                                                        bool queueExpired) {
631    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
632                                          trackReference);
633    if (v && !v->isDeleted() && !v->isTempItem()) {
634        // In the deleted case, we ignore expiration time.
635        if (v->isExpired(ep_real_time())) {
636            if (vb->getState() != vbucket_state_active) {
637                return wantDeleted ? v : NULL;
638            }
639
640            // queueDirty only allowed on active VB
641            if (queueExpired && vb->getState() == vbucket_state_active) {
642                incExpirationStat(vb, EXP_BY_ACCESS);
643                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
644                queueDirty(vb, v, NULL, NULL);
645            }
646            return wantDeleted ? v : NULL;
647        }
648    }
649    return v;
650}
651
652bool EventuallyPersistentStore::isMetaDataResident(RCPtr<VBucket> &vb,
653                                                   const std::string &key) {
654
655    if (!vb) {
656        throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL");
657    }
658
659    int bucket_num(0);
660    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
661    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, false, false);
662
663    if (v && !v->isTempItem()) {
664        return true;
665    } else {
666        return false;
667    }
668}
669
670protocol_binary_response_status EventuallyPersistentStore::evictKey(
671                                                        const std::string &key,
672                                                        uint16_t vbucket,
673                                                        const char **msg,
674                                                        size_t *msg_size,
675                                                        bool force) {
676    RCPtr<VBucket> vb = getVBucket(vbucket);
677    if (!vb || (vb->getState() != vbucket_state_active && !force)) {
678        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
679    }
680
681    int bucket_num(0);
682    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
683    StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
684
685    protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
686
687    *msg_size = 0;
688    if (v) {
689        if (force)  {
690            v->markClean();
691        }
692        if (v->isResident()) {
693            if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
694                *msg = "Ejected.";
695
696                // Add key to bloom filter incase of full eviction mode
697                if (getItemEvictionPolicy() == FULL_EVICTION) {
698                    vb->addToFilter(key);
699                }
700            } else {
701                *msg = "Can't eject: Dirty object.";
702                rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
703            }
704        } else {
705            *msg = "Already ejected.";
706        }
707    } else {
708        if (eviction_policy == VALUE_ONLY) {
709            *msg = "Not found.";
710            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
711        } else {
712            *msg = "Already ejected.";
713        }
714    }
715
716    return rv;
717}
718
719ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
720                                                        LockHolder &lock,
721                                                        int bucket_num,
722                                                        const const_sized_buffer key,
723                                                        RCPtr<VBucket> &vb,
724                                                        const void *cookie,
725                                                        bool metadataOnly,
726                                                        bool isReplication) {
727
728    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
729                                                eviction_policy,
730                                                isReplication);
731    switch(rv) {
732        case ADD_NOMEM:
733            return ENGINE_ENOMEM;
734
735        case ADD_EXISTS:
736        case ADD_UNDEL:
737        case ADD_SUCCESS:
738        case ADD_TMP_AND_BG_FETCH:
739            // Since the hashtable bucket is locked, we shouldn't get here
740            throw std::logic_error("EventuallyPersistentStore::addTempItemForBgFetch: "
741                    "Invalid result from addTempItem: " + std::to_string(rv));
742
743        case ADD_BG_FETCH:
744            lock.unlock();
745            bgFetch(key, vb->getId(), cookie, metadataOnly);
746    }
747    return ENGINE_EWOULDBLOCK;
748}
749
750ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
751                                                 const void *cookie,
752                                                 bool force,
753                                                 uint8_t nru) {
754
755    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
756    if (!vb) {
757        ++stats.numNotMyVBuckets;
758        return ENGINE_NOT_MY_VBUCKET;
759    }
760
761    // Obtain read-lock on VB state to ensure VB state changes are interlocked
762    // with this set
763    ReaderLockHolder rlh(vb->getStateLock());
764    if (vb->getState() == vbucket_state_dead) {
765        ++stats.numNotMyVBuckets;
766        return ENGINE_NOT_MY_VBUCKET;
767    } else if (vb->getState() == vbucket_state_replica && !force) {
768        ++stats.numNotMyVBuckets;
769        return ENGINE_NOT_MY_VBUCKET;
770    } else if (vb->getState() == vbucket_state_pending && !force) {
771        if (vb->addPendingOp(cookie)) {
772            return ENGINE_EWOULDBLOCK;
773        }
774    } else if (vb->isTakeoverBackedUp()) {
775        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op"
776                ", becuase takeover is lagging", vb->getId());
777        return ENGINE_TMPFAIL;
778    }
779
780    bool cas_op = (itm.getCas() != 0);
781    int bucket_num(0);
782    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
783    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num,
784                                          /*wantsDeleted*/true,
785                                          /*trackReference*/false);
786    if (v && v->isLocked(ep_current_time()) &&
787        (vb->getState() == vbucket_state_replica ||
788         vb->getState() == vbucket_state_pending)) {
789        v->unlock();
790    }
791
792    bool maybeKeyExists = true;
793    // If we didn't find a valid item, check Bloomfilter's prediction if in
794    // full eviction policy and for a CAS operation.
795    if ((v == nullptr || v->isTempInitialItem()) &&
796        (eviction_policy == FULL_EVICTION) &&
797        (itm.getCas() != 0)) {
798        // Check Bloomfilter's prediction
799        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
800            maybeKeyExists = false;
801        }
802    }
803
804    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(), true, false,
805                                                eviction_policy, nru,
806                                                maybeKeyExists);
807
808    Item& it = const_cast<Item&>(itm);
809    uint64_t seqno = 0;
810    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
811    switch (mtype) {
812    case NOMEM:
813        ret = ENGINE_ENOMEM;
814        break;
815    case INVALID_CAS:
816    case IS_LOCKED:
817        ret = ENGINE_KEY_EEXISTS;
818        break;
819    case NOT_FOUND:
820        if (cas_op) {
821            ret = ENGINE_KEY_ENOENT;
822            break;
823        }
824        // FALLTHROUGH
825    case WAS_DIRTY:
826        // Even if the item was dirty, push it into the vbucket's open
827        // checkpoint.
828    case WAS_CLEAN:
829        // We keep lh held as we need to do v->getCas()
830        queueDirty(vb, v, nullptr, &seqno);
831        it.setBySeqno(seqno);
832        it.setCas(v->getCas());
833        break;
834    case NEED_BG_FETCH:
835    {   // CAS operation with non-resident item + full eviction.
836        if (v) {
837            // temp item is already created. Simply schedule a bg fetch job
838            lh.unlock();
839            bgFetch(itm.getKey(), vb->getId(), cookie, true);
840            return ENGINE_EWOULDBLOCK;
841        }
842        ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
843                                    cookie, true);
844        break;
845    }
846    case INVALID_VBUCKET:
847        ret = ENGINE_NOT_MY_VBUCKET;
848        break;
849    }
850
851    return ret;
852}
853
854ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
855                                                 const void *cookie)
856{
857    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
858    if (!vb) {
859        ++stats.numNotMyVBuckets;
860        return ENGINE_NOT_MY_VBUCKET;
861    }
862
863    // Obtain read-lock on VB state to ensure VB state changes are interlocked
864    // with this add
865    ReaderLockHolder rlh(vb->getStateLock());
866    if (vb->getState() == vbucket_state_dead ||
867        vb->getState() == vbucket_state_replica) {
868        ++stats.numNotMyVBuckets;
869        return ENGINE_NOT_MY_VBUCKET;
870    } else if (vb->getState() == vbucket_state_pending) {
871        if (vb->addPendingOp(cookie)) {
872            return ENGINE_EWOULDBLOCK;
873        }
874    } else if (vb->isTakeoverBackedUp()) {
875        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op"
876                ", becuase takeover is lagging", vb->getId());
877        return ENGINE_TMPFAIL;
878    }
879
880    if (itm.getCas() != 0) {
881        // Adding with a cas value doesn't make sense..
882        return ENGINE_NOT_STORED;
883    }
884
885    int bucket_num(0);
886    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
887    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
888                                          false);
889
890    bool maybeKeyExists = true;
891    if ((v == nullptr || v->isTempInitialItem()) &&
892        (eviction_policy == FULL_EVICTION)) {
893        // Check bloomfilter's prediction
894        if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
895            maybeKeyExists = false;
896        }
897    }
898
899    add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
900                                           eviction_policy,
901                                           true, true,
902                                           maybeKeyExists);
903
904    Item& it = const_cast<Item&>(itm);
905    uint64_t seqno = 0;
906    switch (atype) {
907    case ADD_NOMEM:
908        return ENGINE_ENOMEM;
909    case ADD_EXISTS:
910        return ENGINE_NOT_STORED;
911    case ADD_TMP_AND_BG_FETCH:
912        return addTempItemForBgFetch(lh, bucket_num, it.getKey(), vb,
913                                     cookie, true);
914    case ADD_BG_FETCH:
915        lh.unlock();
916        bgFetch(it.getKey(), vb->getId(), cookie, true);
917        return ENGINE_EWOULDBLOCK;
918    case ADD_SUCCESS:
919    case ADD_UNDEL:
920        // We need to keep lh as we will do v->getCas()
921        queueDirty(vb, v, nullptr, &seqno);
922        it.setBySeqno(seqno);
923        it.setCas(v->getCas());
924        break;
925    }
926
927    return ENGINE_SUCCESS;
928}
929
930ENGINE_ERROR_CODE EventuallyPersistentStore::replace(const Item &itm,
931                                                     const void *cookie) {
932    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
933    if (!vb) {
934        ++stats.numNotMyVBuckets;
935        return ENGINE_NOT_MY_VBUCKET;
936    }
937
938    // Obtain read-lock on VB state to ensure VB state changes are interlocked
939    // with this replace
940    ReaderLockHolder rlh(vb->getStateLock());
941    if (vb->getState() == vbucket_state_dead ||
942        vb->getState() == vbucket_state_replica) {
943        ++stats.numNotMyVBuckets;
944        return ENGINE_NOT_MY_VBUCKET;
945    } else if (vb->getState() == vbucket_state_pending) {
946        if (vb->addPendingOp(cookie)) {
947            return ENGINE_EWOULDBLOCK;
948        }
949    }
950
951    int bucket_num(0);
952    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
953    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
954                                          false);
955    if (v) {
956        if (v->isDeleted() || v->isTempDeletedItem() ||
957            v->isTempNonExistentItem()) {
958            return ENGINE_KEY_ENOENT;
959        }
960
961        mutation_type_t mtype;
962        if (eviction_policy == FULL_EVICTION && v->isTempInitialItem()) {
963            mtype = NEED_BG_FETCH;
964        } else {
965            mtype = vb->ht.unlocked_set(v, itm, 0, true, false, eviction_policy,
966                                        0xff);
967        }
968
969        Item& it = const_cast<Item&>(itm);
970        uint64_t seqno = 0;
971        ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
972        switch (mtype) {
973            case NOMEM:
974                ret = ENGINE_ENOMEM;
975                break;
976            case IS_LOCKED:
977                ret = ENGINE_KEY_EEXISTS;
978                break;
979            case INVALID_CAS:
980            case NOT_FOUND:
981                ret = ENGINE_NOT_STORED;
982                break;
983                // FALLTHROUGH
984            case WAS_DIRTY:
985                // Even if the item was dirty, push it into the vbucket's open
986                // checkpoint.
987            case WAS_CLEAN:
988                // Keep lh as we need to do v->getCas()
989                queueDirty(vb, v, nullptr, &seqno);
990                it.setBySeqno(seqno);
991                it.setCas(v->getCas());
992                break;
993            case NEED_BG_FETCH:
994            {
995                // temp item is already created. Simply schedule a bg fetch job
996                lh.unlock();
997                bgFetch(it.getKey(), vb->getId(), cookie, true);
998                ret = ENGINE_EWOULDBLOCK;
999                break;
1000            }
1001            case INVALID_VBUCKET:
1002                ret = ENGINE_NOT_MY_VBUCKET;
1003                break;
1004        }
1005
1006        return ret;
1007    } else {
1008        if (eviction_policy == VALUE_ONLY) {
1009            return ENGINE_KEY_ENOENT;
1010        }
1011
1012        if (vb->maybeKeyExistsInFilter(itm.getKey())) {
1013            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1014                                         cookie, false);
1015        } else {
1016            // As bloomfilter predicted that item surely doesn't exist
1017            // on disk, return ENOENT for replace().
1018            return ENGINE_KEY_ENOENT;
1019        }
1020    }
1021}
1022
1023ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(
1024                                                        const Item &itm,
1025                                                        uint8_t nru,
1026                                                        bool genBySeqno,
1027                                                        ExtendedMetaData *emd) {
1028
1029    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1030    if (!vb) {
1031        ++stats.numNotMyVBuckets;
1032        return ENGINE_NOT_MY_VBUCKET;
1033    }
1034
1035    // Obtain read-lock on VB state to ensure VB state changes are interlocked
1036    // with this add-tapbackfill
1037    ReaderLockHolder rlh(vb->getStateLock());
1038    if (vb->getState() == vbucket_state_dead ||
1039        vb->getState() == vbucket_state_active) {
1040        ++stats.numNotMyVBuckets;
1041        return ENGINE_NOT_MY_VBUCKET;
1042    }
1043
1044    //check for the incoming item's CAS validity
1045    if (!Item::isValidCas(itm.getCas())) {
1046        return ENGINE_KEY_EEXISTS;
1047    }
1048
1049    int bucket_num(0);
1050    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1051    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1052                                          false);
1053
1054    // Note that this function is only called on replica or pending vbuckets.
1055    if (v && v->isLocked(ep_current_time())) {
1056        v->unlock();
1057    }
1058    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
1059                                                eviction_policy, nru);
1060
1061    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1062    switch (mtype) {
1063    case NOMEM:
1064        ret = ENGINE_ENOMEM;
1065        break;
1066    case INVALID_CAS:
1067    case IS_LOCKED:
1068        ret = ENGINE_KEY_EEXISTS;
1069        break;
1070    case WAS_DIRTY:
1071        // FALLTHROUGH, to ensure the bySeqno for the hashTable item is
1072        // set correctly, and also the sequence numbers are ordered correctly.
1073        // (MB-14003)
1074    case NOT_FOUND:
1075        // FALLTHROUGH
1076    case WAS_CLEAN:
1077        vb->setMaxCas(v->getCas());
1078        tapQueueDirty(*vb, v, lh, NULL,
1079                      genBySeqno ? GenerateBySeqno::Yes : GenerateBySeqno::No);
1080        break;
1081    case INVALID_VBUCKET:
1082        ret = ENGINE_NOT_MY_VBUCKET;
1083        break;
1084    case NEED_BG_FETCH:
1085        throw std::logic_error("EventuallyPersistentStore::addTAPBackfillItem: "
1086                "SET on a non-active vbucket should not require a "
1087                "bg_metadata_fetch.");
1088    }
1089
1090    return ret;
1091}
1092
1093class KVStatsCallback : public Callback<kvstats_ctx> {
1094    public:
1095        KVStatsCallback(EventuallyPersistentStore *store)
1096            : epstore(store) { }
1097
1098       void callback(kvstats_ctx &ctx) {
1099            RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
1100            if (vb) {
1101                vb->fileSpaceUsed = ctx.fileSpaceUsed;
1102                vb->fileSize = ctx.fileSize;
1103            }
1104        }
1105
1106    private:
1107        EventuallyPersistentStore *epstore;
1108};
1109
1110ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
1111                                                           vbucket_state_t to,
1112                                                           bool transfer,
1113                                                           bool notify_dcp) {
1114    LockHolder lh(vbsetMutex);
1115    return setVBucketState_UNLOCKED(vbid, to, transfer, notify_dcp, lh);
1116}
1117
/**
 * Move vbucket `vbid` to state `to`, creating it in that state if it is not
 * in the vbucket map yet.
 *
 * Intended to be called with vbsetMutex already held: setVBucketState()
 * and resetVBucket_UNLOCKED() pass their held lock in as `vbset`, which this
 * function does not otherwise use.
 *
 * @param vbid       id of the vbucket to update or create
 * @param to         target vbucket state
 * @param transfer   when false, promotion to active is treated as a
 *                   failover: inbound DCP streams are closed and a new
 *                   failover-table entry is created
 * @param notify_dcp whether to report the change to the DCP connection map
 * @param vbset      the caller's held lock on vbsetMutex
 * @return ENGINE_SUCCESS (also when the vbucket is already in state `to`),
 *         or ENGINE_ERANGE if `vbid` is outside the vbucket map.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState_UNLOCKED(uint16_t vbid,
                                                           vbucket_state_t to,
                                                           bool transfer,
                                                           bool notify_dcp,
                                                           LockHolder& vbset) {
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    // Nothing to do if the vbucket already exists in the requested state.
    if (vb && to == vb->getState()) {
        return ENGINE_SUCCESS;
    }

    if (vb) {
        // --- Existing vbucket: transition its state. ---
        vbucket_state_t oldstate = vb->getState();

        vb->setState(to);

        // oldstate != to always holds here (equal states returned above);
        // the check is kept as written.
        if (oldstate != to && notify_dcp) {
            bool closeInboundStreams = false;
            if (to == vbucket_state_active && !transfer) {
                /**
                 * Close inbound (passive) streams into the vbucket
                 * only in case of a failover.
                 */
                closeInboundStreams = true;
            }
            engine.getDcpConnMap().vbucketStateChanged(vbid, to,
                                                       closeInboundStreams);
        }

        if (to == vbucket_state_active && oldstate == vbucket_state_replica) {
            /**
             * Update snapshot range when vbucket goes from being a replica
             * to active, to maintain the correct snapshot sequence numbers
             * even in a failover scenario.
             */
            vb->checkpointManager.resetSnapshotRange();
        }

        if (to == vbucket_state_active && !transfer) {
            // Failover: add a failover entry at the end of the persisted
            // snapshot if that snapshot's end equals the vbucket's
            // persistence seqno, otherwise at the snapshot's start.
            const snapshot_range_t range = vb->getPersistedSnapshot();
            if (range.end == vbMap.getPersistenceSeqno(vbid)) {
                vb->failovers->createEntry(range.end);
            } else {
                vb->failovers->createEntry(range.start);
            }
        }

        if (oldstate == vbucket_state_pending &&
            to == vbucket_state_active) {
            // Leaving pending for active: schedule a non-IO task to notify
            // the operations queued on this vbucket (see addPendingOp()).
            ExTask notifyTask = new PendingOpsNotification(engine, vb);
            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
        }
        // Persist the new state.
        scheduleVBStatePersist(vbid);
    } else if (vbid < vbMap.getSize()) {
        // --- Unknown but in-range vbucket: create it in state `to`. ---
        // NOTE(review): ft and cb are handed to the VBucket constructor;
        // presumably the VBucket owns them from here on — confirm.
        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
        KVShard* shard = vbMap.getShardByVbId(vbid);
        std::shared_ptr<Callback<uint16_t> > cb(new NotifyFlusherCB(shard));
        Configuration& config = engine.getConfiguration();
        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                         engine.getCheckpointConfig(),
                                         shard, 0, 0, 0, ft, cb,
                                         config));

        if (config.isBfilterEnabled()) {
            // Initialize bloom filters upon vbucket creation during
            // bucket creation and rebalance
            newvb->createFilter(config.getBfilterKeyCount(),
                                config.getBfilterFpProb());
        }

        // The first checkpoint for active vbucket should start with id 2.
        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
            return ENGINE_ERANGE;
        }
        // Fresh vbucket: reset persistence bookkeeping, mark it as being
        // created, and persist its initial state.
        vbMap.setPersistenceCheckpointId(vbid, 0);
        vbMap.setPersistenceSeqno(vbid, 0);
        vbMap.setBucketCreation(vbid, true);
        scheduleVBStatePersist(vbid);
    } else {
        // vbid is beyond the size of the vbucket map.
        return ENGINE_ERANGE;
    }
    return ENGINE_SUCCESS;
}
1202
1203void EventuallyPersistentStore::scheduleVBStatePersist() {
1204    for (auto vbid : vbMap.getBuckets()) {
1205        scheduleVBStatePersist(vbid);
1206    }
1207}
1208
1209void EventuallyPersistentStore::scheduleVBStatePersist(VBucket::id_type vbid) {
1210    RCPtr<VBucket> vb = getVBucket(vbid);
1211
1212    if (!vb) {
1213        LOG(EXTENSION_LOG_WARNING,
1214            "EPStore::scheduleVBStatePersist: vb:%" PRIu16
1215            " does not not exist. Unable to schedule persistence.", vbid);
1216        return;
1217    }
1218
1219    vb->checkpointManager.queueSetVBState(*vb);
1220}
1221
1222bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
1223                                                        const void* cookie) {
1224    LockHolder lh(vbsetMutex);
1225
1226    hrtime_t start_time(gethrtime());
1227    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1228    if (!vb || vb->getState() == vbucket_state_dead ||
1229         vbMap.isBucketDeletion(vbid)) {
1230        lh.unlock();
1231        LockHolder vlh(vb_mutexes[vbid]);
1232        getRWUnderlying(vbid)->delVBucket(vbid);
1233        vbMap.setBucketDeletion(vbid, false);
1234        vbMap.setBucketCreation(vbid, false);
1235        vbMap.setPersistenceSeqno(vbid, 0);
1236        ++stats.vbucketDeletions;
1237    }
1238
1239    hrtime_t spent(gethrtime() - start_time);
1240    hrtime_t wall_time = spent / 1000;
1241    BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
1242    stats.diskVBDelHisto.add(wall_time);
1243    atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
1244    stats.vbucketDelTotWalltime.fetch_add(wall_time);
1245    if (cookie) {
1246        engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1247    }
1248
1249    return true;
1250}
1251
1252void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1253                                                   const void* cookie,
1254                                                   double delay) {
1255    ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1256    ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1257
1258    if (vbMap.setBucketDeletion(vb->getId(), true)) {
1259        ExTask task = new VBDeleteTask(&engine, vb->getId(), cookie);
1260        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1261    }
1262}
1263
1264ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1265                                                           const void* c) {
1266    // Lock to prevent a race condition between a failed update and add
1267    // (and delete).
1268    LockHolder lh(vbsetMutex);
1269
1270    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1271    if (!vb) {
1272        return ENGINE_NOT_MY_VBUCKET;
1273    }
1274
1275    vb->setState(vbucket_state_dead);
1276    engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1277    vbMap.removeBucket(vbid);
1278    lh.unlock();
1279    scheduleVBDeletion(vb, c);
1280    if (c) {
1281        return ENGINE_EWOULDBLOCK;
1282    }
1283    return ENGINE_SUCCESS;
1284}
1285
1286ENGINE_ERROR_CODE EventuallyPersistentStore::checkForDBExistence(DBFileId db_file_id) {
1287    std::string backend = engine.getConfiguration().getBackend();
1288    if (backend.compare("couchdb") == 0) {
1289        RCPtr<VBucket> vb = vbMap.getBucket(db_file_id);
1290        if (!vb) {
1291            return ENGINE_NOT_MY_VBUCKET;
1292        }
1293    } else if (backend.compare("forestdb") == 0) {
1294        if (db_file_id > (vbMap.getNumShards() - 1)) {
1295            //TODO: find a better error code
1296            return ENGINE_EINVAL;
1297        }
1298    } else {
1299        LOG(EXTENSION_LOG_WARNING,
1300            "Unknown backend specified for db file id: %d", db_file_id);
1301        return ENGINE_FAILED;
1302    }
1303
1304    return ENGINE_SUCCESS;
1305}
1306
1307ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1308                                                       compaction_ctx c,
1309                                                       const void *cookie) {
1310    ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
1311    if (errCode != ENGINE_SUCCESS) {
1312        return errCode;
1313    }
1314
1315    /* Obtain the vbucket so we can get the previous purge seqno */
1316    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1317    if (!vb) {
1318        return ENGINE_NOT_MY_VBUCKET;
1319    }
1320
1321    /* Update the compaction ctx with the previous purge seqno */
1322    c.max_purged_seq = vb->getPurgeSeqno();
1323
1324    LockHolder lh(compactionLock);
1325    ExTask task = new CompactTask(&engine, c, cookie);
1326    compactionTasks.push_back(std::make_pair(c.db_file_id, task));
1327    if (compactionTasks.size() > 1) {
1328        if ((stats.diskQueueSize > compactionWriteQueueCap &&
1329            compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
1330            engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
1331            // Snooze a new compaction task.
1332            // We will wake it up when one of the existing compaction tasks is done.
1333            task->snooze(60);
1334        }
1335    }
1336
1337    ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1338
1339    LOG(EXTENSION_LOG_DEBUG,
1340        "Scheduled compaction task %" PRIu64 " on db %d,"
1341        "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
1342        ", dropdeletes = %d",
1343        uint64_t(task->getId()),c.db_file_id, c.purge_before_ts,
1344        c.purge_before_seq, c.drop_deletes);
1345
1346   return ENGINE_EWOULDBLOCK;
1347}
1348
1349class ExpiredItemsCallback : public Callback<std::string&, uint64_t&> {
1350    public:
1351        ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid,
1352                             time_t start)
1353            : epstore(store), vbucket(vbid), startTime(start) { }
1354
1355        void callback(std::string& key, uint64_t& revSeqno) {
1356            if (epstore->compactionCanExpireItems()) {
1357                epstore->deleteExpiredItem(vbucket, key, startTime, revSeqno,
1358                                           EXP_BY_COMPACTOR);
1359            }
1360        }
1361
1362    private:
1363        EventuallyPersistentStore *epstore;
1364        uint16_t vbucket;
1365        time_t startTime;
1366};
1367
/**
 * Run one compaction of the db file ctx->db_file_id, which this function
 * uses as a vbucket id.
 *
 * When the vbucket exists and its vb_mutexes entry can be try-locked:
 *  1. If bloom filters are enabled, install a BloomFilterCallback in ctx and
 *     size a temporary filter from persisted-delete / item counts.
 *  2. Set ctx->curr_time (real time for active vbuckets, 0 otherwise) and
 *     install an ExpiredItemsCallback built with that time.
 *  3. Run the KVStore compaction. On success, swap in the temporary filter,
 *     or clear the filter if bloom filters are disabled. On failure, clear it.
 *  4. Store ctx->max_purged_seq as the vbucket's purge seqno.
 *
 * Afterwards, unless retrying:
 *  - the compaction bookkeeping is updated;
 *  - `cookie` (if any) is notified;
 *  - stats.pendingCompactions is decremented.
 *
 * @param ctx    compaction context, filled in here and handed to the KVStore
 * @param cookie connection to notify on completion; may be NULL
 * @return true if the vbucket mutex was busy and the compaction should be
 *         scheduled again (no bookkeeping or notification happens then);
 *         false otherwise.
 */
bool EventuallyPersistentStore::doCompact(compaction_ctx *ctx,
                                          const void *cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    const uint16_t vbid = ctx->db_file_id;
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (vb) {
        // Do not block on the vbucket mutex: if another operation holds
        // it, ask the caller to retry later.
        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
        if (!lh.islocked()) {
            return true; // Schedule a compaction task again.
        }

        Configuration &config = getEPEngine().getConfiguration();
        if (config.isBfilterEnabled()) {
            size_t initial_estimation = config.getBfilterKeyCount();
            size_t estimated_count; // assigned in both branches below
            size_t num_deletes =
                    getROUnderlying(vbid)->getNumPersistedDeletes(vbid);
            if (eviction_policy == VALUE_ONLY) {
                /**
                 * VALUE-ONLY EVICTION POLICY
                 * Obtain number of persisted deletes from underlying kvstore.
                 * Bloomfilter's estimated_key_count = 1.25 * deletes
                 */

                estimated_count = round(1.25 * num_deletes);
                std::shared_ptr<Callback<std::string&, bool&> >
                    filter(new BloomFilterCallback(*this, vbid, false));
                ctx->bloomFilterCallback = filter;
            } else {
                /**
                 * FULL EVICTION POLICY
                 * First determine if the resident ratio of vbucket is less than
                 * the threshold from configuration.
                 */

                bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
                                                getBfiltersResidencyThreshold(),
                                                eviction_policy);
                std::shared_ptr<Callback<std::string&, bool&> >
                    filter(new BloomFilterCallback(*this, vbid, residentRatioAlert));
                ctx->bloomFilterCallback = filter;

                /**
                 * Based on resident ratio against threshold, estimate count.
                 *
                 * 1. If resident ratio is greater than the threshold:
                 * Obtain number of persisted deletes from underlying kvstore.
                 * Obtain number of non-resident-items for vbucket.
                 * Bloomfilter's estimated_key_count =
                 *                              1.25 * (deletes + non-resident)
                 *
                 * 2. Otherwise:
                 * Obtain number of items for vbucket.
                 * Bloomfilter's estimated_key_count =
                 *                              1.25 * (num_items)
                 */

                // residentRatioAlert == true corresponds to case 2 above.
                if (residentRatioAlert) {
                    estimated_count = round(1.25 *
                                            vb->getNumItems(eviction_policy));
                } else {
                    estimated_count = round(1.25 * (num_deletes +
                                vb->getNumNonResidentItems(eviction_policy)));
                }
            }
            // Never size the filter below the configured key count.
            if (estimated_count < initial_estimation) {
                estimated_count = initial_estimation;
            }
            vb->initTempFilter(estimated_count, config.getBfilterFpProb());
        }

        if (vb->getState() == vbucket_state_active) {
            // Set the current time ONLY for active vbuckets.
            ctx->curr_time = ep_real_time();
        } else {
            ctx->curr_time = 0;
        }
        std::shared_ptr<Callback<std::string&, uint64_t&> >
           expiry(new ExpiredItemsCallback(this, vbid, ctx->curr_time));
        ctx->expiryCallback = expiry;

        // kvcb copies the KVStore's reported file sizes into the vbucket.
        KVStatsCallback kvcb(this);
        if (getRWUnderlying(vbid)->compactDB(ctx, kvcb)) {
            if (config.isBfilterEnabled()) {
                // Replace the live filter with the one built during compaction.
                vb->swapFilter();
            } else {
                vb->clearFilter();
            }
        } else {
            LOG(EXTENSION_LOG_WARNING, "Compaction: Not successful for vb %u, "
                    "clearing bloom filter, if any.", vb->getId());
            vb->clearFilter();
        }
        vb->setPurgeSeqno(ctx->max_purged_seq);
    } else {
        err = ENGINE_NOT_MY_VBUCKET;
        // NOTE(review): called even when cookie is NULL — confirm that
        // storeEngineSpecific() tolerates a NULL cookie.
        engine.storeEngineSpecific(cookie, NULL);
        //Decrement session counter here, as memcached thread wouldn't
        //visit the engine interface in case of a NOT_MY_VB notification
        engine.decrementSessionCtr();
    }

    // Drop this compaction from compactionTasks and wake a snoozed one.
    updateCompactionTasks(ctx->db_file_id);

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}
1478
1479void EventuallyPersistentStore::updateCompactionTasks(DBFileId db_file_id) {
1480    LockHolder lh(compactionLock);
1481    bool erased = false, woke = false;
1482    std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
1483    while (it != compactionTasks.end()) {
1484        if ((*it).first == db_file_id) {
1485            it = compactionTasks.erase(it);
1486            erased = true;
1487        } else {
1488            ExTask &task = (*it).second;
1489            if (task->getState() == TASK_SNOOZED) {
1490                ExecutorPool::get()->wake(task->getId());
1491                woke = true;
1492            }
1493            ++it;
1494        }
1495        if (erased && woke) {
1496            break;
1497        }
1498    }
1499}
1500
1501bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1502    LockHolder lh(vbsetMutex);
1503    return resetVBucket_UNLOCKED(vbid, lh);
1504}
1505
1506bool EventuallyPersistentStore::resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset) {
1507    bool rv(false);
1508
1509    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1510    if (vb) {
1511        vbucket_state_t vbstate = vb->getState();
1512
1513        vbMap.removeBucket(vbid);
1514
1515        checkpointCursorInfoList cursors =
1516                                        vb->checkpointManager.getAllCursors();
1517        // Delete and recreate the vbucket database file
1518        scheduleVBDeletion(vb, NULL, 0);
1519        setVBucketState_UNLOCKED(vbid, vbstate,
1520                                 false/*transfer*/, true/*notifyDcp*/, vbset);
1521
1522        // Copy the all cursors from the old vbucket into the new vbucket
1523        RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1524        newvb->checkpointManager.resetCursors(cursors);
1525
1526        rv = true;
1527    }
1528    return rv;
1529}
1530
1531extern "C" {
1532
1533    typedef struct {
1534        EventuallyPersistentEngine* engine;
1535        std::map<std::string, std::string> smap;
1536    } snapshot_stats_t;
1537
1538    static void add_stat(const char *key, const uint16_t klen,
1539                         const char *val, const uint32_t vlen,
1540                         const void *cookie) {
1541        if (cookie == nullptr) {
1542            throw std::invalid_argument("add_stat: cookie is NULL");
1543        }
1544        void *ptr = const_cast<void *>(cookie);
1545        snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1546        ObjectRegistry::onSwitchThread(snap->engine);
1547
1548        std::string k(key, klen);
1549        std::string v(val, vlen);
1550        snap->smap.insert(std::pair<std::string, std::string>(k, v));
1551    }
1552}
1553
1554void EventuallyPersistentStore::snapshotStats() {
1555    snapshot_stats_t snap;
1556    snap.engine = &engine;
1557    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1558              engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1559              engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;
1560
1561    if (rv && stats.isShutdown) {
1562        snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1563                                                              "true" : "false";
1564        std::stringstream ss;
1565        ss << ep_real_time();
1566        snap.smap["ep_shutdown_time"] = ss.str();
1567    }
1568    getOneRWUnderlying()->snapshotStats(snap.smap);
1569}
1570
1571void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1572                                              const hrtime_t start,
1573                                              const hrtime_t stop) {
1574    if (stop >= start && start >= init) {
1575        // skip the measurement if the counter wrapped...
1576        ++stats.bgNumOperations;
1577        hrtime_t w = (start - init) / 1000;
1578        BlockTimer::log(start - init, "bgwait", stats.timingLog);
1579        stats.bgWaitHisto.add(w);
1580        stats.bgWait.fetch_add(w);
1581        atomic_setIfLess(stats.bgMinWait, w);
1582        atomic_setIfBigger(stats.bgMaxWait, w);
1583
1584        hrtime_t l = (stop - start) / 1000;
1585        BlockTimer::log(stop - start, "bgload", stats.timingLog);
1586        stats.bgLoadHisto.add(l);
1587        stats.bgLoad.fetch_add(l);
1588        atomic_setIfLess(stats.bgMinLoad, l);
1589        atomic_setIfBigger(stats.bgMaxLoad, l);
1590    }
1591}
1592
1593void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1594                                                uint16_t vbucket,
1595                                                const void *cookie,
1596                                                hrtime_t init,
1597                                                bool isMeta) {
1598    hrtime_t start(gethrtime());
1599    // Go find the data
1600    RememberingCallback<GetValue> gcb;
1601    if (isMeta) {
1602        gcb.val.setPartial();
1603        ++stats.bg_meta_fetched;
1604    } else {
1605        ++stats.bg_fetched;
1606    }
1607    getROUnderlying(vbucket)->get(key, vbucket, gcb);
1608    gcb.waitForValue();
1609    ENGINE_ERROR_CODE status = gcb.val.getStatus();
1610
1611    // Lock to prevent a race condition between a fetch for restore and delete
1612    LockHolder lh(vbsetMutex);
1613
1614    RCPtr<VBucket> vb = getVBucket(vbucket);
1615    if (vb) {
1616        ReaderLockHolder rlh(vb->getStateLock());
1617        int bucket_num(0);
1618        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1619        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1620        if (isMeta) {
1621            if ((v && v->unlocked_restoreMeta(gcb.val.getValue(),
1622                                              gcb.val.getStatus(), vb->ht))
1623                || ENGINE_KEY_ENOENT == status) {
1624                /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1625                 key is removed from hash table by the time bgfetch returns
1626                 (in case multiple bgfetch is scheduled for a key), we still
1627                 need to return ENGINE_SUCCESS to the memcached worker thread,
1628                 so that the worker thread can visit the ep-engine and figure
1629                 out the correct flow */
1630                status = ENGINE_SUCCESS;
1631            }
1632        } else {
1633            bool restore = false;
1634            if (v && v->isResident()) {
1635                status = ENGINE_SUCCESS;
1636            } else if (v && v->isDeleted()) {
1637                status = ENGINE_KEY_ENOENT;
1638            } else {
1639                switch (eviction_policy) {
1640                    case VALUE_ONLY:
1641                        if (v && !v->isResident() && !v->isDeleted()) {
1642                            restore = true;
1643                        }
1644                        break;
1645                    case FULL_EVICTION:
1646                        if (v) {
1647                            if (v->isTempInitialItem() ||
1648                                (!v->isResident() && !v->isDeleted())) {
1649                                restore = true;
1650                            }
1651                        }
1652                        break;
1653                    default:
1654                        throw std::logic_error("Unknown eviction policy");
1655                }
1656            }
1657
1658            if (restore) {
1659                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1660                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1661                    if (!v->isResident()) {
1662                        throw std::logic_error("EPStore::completeBGFetch: "
1663                                "storedvalue (which has key " + v->getKey() +
1664                                ") should be resident after calling restoreValue()");
1665                    }
1666                    if (vb->getState() == vbucket_state_active &&
1667                        v->getExptime() != gcb.val.getValue()->getExptime() &&
1668                        v->getCas() == gcb.val.getValue()->getCas()) {
1669                        // MB-9306: It is possible that by the time bgfetcher
1670                        // returns, the item may have been updated and queued
1671                        // Hence test the CAS value to be the same first.
1672                        // exptime mutated, schedule it into new checkpoint
1673                        queueDirty(vb, v, &hlh, NULL, GenerateBySeqno::Yes,
1674                                                    GenerateCas::No);
1675                    }
1676                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1677                    v->setNonExistent();
1678                    if (eviction_policy == FULL_EVICTION) {
1679                        // For the full eviction, we should notify
1680                        // ENGINE_SUCCESS to the memcached worker thread, so
1681                        // that the worker thread can visit the ep-engine and
1682                        // figure out the correct error code.
1683                        status = ENGINE_SUCCESS;
1684                    }
1685                } else {
1686                    // underlying kvstore couldn't fetch requested data
1687                    // log returned error and notify TMPFAIL to client
1688                    LOG(EXTENSION_LOG_WARNING,
1689                        "Failed background fetch for vb=%d "
1690                        "seq=%" PRId64 " key=%s", vbucket, v->getBySeqno(),
1691                        key.c_str());
1692                    status = ENGINE_TMPFAIL;
1693                }
1694            }
1695        }
1696    } else {
1697        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1698            " a bg fetch for key %s\n", vbucket, key.c_str());
1699        status = ENGINE_NOT_MY_VBUCKET;
1700    }
1701
1702    lh.unlock();
1703
1704    hrtime_t stop = gethrtime();
1705    updateBGStats(init, start, stop);
1706    bgFetchQueue--;
1707
1708    delete gcb.val.getValue();
1709    engine.notifyIOComplete(cookie, status);
1710}
1711
1712void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1713                                 std::vector<bgfetched_item_t> &fetchedItems,
1714                                 hrtime_t startTime)
1715{
1716    RCPtr<VBucket> vb = getVBucket(vbId);
1717    if (!vb) {
1718        std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1719        for (; itemItr != fetchedItems.end(); ++itemItr) {
1720            engine.notifyIOComplete((*itemItr).second->cookie,
1721                                    ENGINE_NOT_MY_VBUCKET);
1722        }
1723        LOG(EXTENSION_LOG_WARNING,
1724            "EP Store completes %d of batched background fetch for "
1725            "for vBucket = %d that is already deleted\n",
1726            (int)fetchedItems.size(), vbId);
1727        return;
1728    }
1729
1730    std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1731    for (; itemItr != fetchedItems.end(); ++itemItr) {
1732        VBucketBGFetchItem *bgitem = (*itemItr).second;
1733        ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1734        Item *fetchedValue = bgitem->value.getValue();
1735        const std::string &key = (*itemItr).first;
1736        {   //locking scope
1737            ReaderLockHolder rlh(vb->getStateLock());
1738            int bucket = 0;
1739            LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1740            StoredValue *v = fetchValidValue(vb, key, bucket, true);
1741            if (bgitem->metaDataOnly) {
1742                if ((v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht))
1743                    || ENGINE_KEY_ENOENT == status) {
1744                    /* If ENGINE_KEY_ENOENT is the status from storage and the temp
1745                     key is removed from hash table by the time bgfetch returns
1746                     (in case multiple bgfetch is scheduled for a key), we still
1747                     need to return ENGINE_SUCCESS to the memcached worker thread,
1748                     so that the worker thread can visit the ep-engine and figure
1749                     out the correct flow */
1750                    status = ENGINE_SUCCESS;
1751                }
1752            } else {
1753                bool restore = false;
1754                if (v && v->isResident()) {
1755                    status = ENGINE_SUCCESS;
1756                } else if (v && v->isDeleted()) {
1757                    status = ENGINE_KEY_ENOENT;
1758                } else {
1759                    switch (eviction_policy) {
1760                        case VALUE_ONLY:
1761                            if (v && !v->isResident() && !v->isDeleted()) {
1762                                restore = true;
1763                            }
1764                            break;
1765                        case FULL_EVICTION:
1766                            if (v) {
1767                                if (v->isTempInitialItem() ||
1768                                    (!v->isResident() && !v->isDeleted())) {
1769                                    restore = true;
1770                                }
1771                            }
1772                            break;
1773                        default:
1774                            throw std::logic_error("Unknown eviction policy");
1775                    }
1776                }
1777
1778                if (restore) {
1779                    if (status == ENGINE_SUCCESS) {
1780                        v->unlocked_restoreValue(fetchedValue, vb->ht);
1781                        if (!v->isResident()) {
1782                            throw std::logic_error("EPStore::completeBGFetchMulti: "
1783                                "storedvalue (which has key " + v->getKey() +
1784                                ") should be resident after calling restoreValue()");
1785                        }
1786                        if (vb->getState() == vbucket_state_active &&
1787                            v->getExptime() != fetchedValue->getExptime() &&
1788                            v->getCas() == fetchedValue->getCas()) {
1789                            // MB-9306: It is possible that by the time
1790                            // bgfetcher returns, the item may have been
1791                            // updated and queued
1792                            // Hence test the CAS value to be the same first.
1793                            // exptime mutated, schedule it into new checkpoint
1794                            queueDirty(vb, v, &blh, NULL, GenerateBySeqno::Yes,
1795                                                        GenerateCas::No);
1796                        }
1797                    } else if (status == ENGINE_KEY_ENOENT) {
1798                        v->setNonExistent();
1799                        if (eviction_policy == FULL_EVICTION) {
1800                            // For the full eviction, we should notify
1801                            // ENGINE_SUCCESS to the memcached worker thread,
1802                            // so that the worker thread can visit the
1803                            // ep-engine and figure out the correct error
1804                            // code.
1805                            status = ENGINE_SUCCESS;
1806                        }
1807                    } else {
1808                        // underlying kvstore couldn't fetch requested data
1809                        // log returned error and notify TMPFAIL to client
1810                        LOG(EXTENSION_LOG_WARNING,
1811                            "Failed background fetch for vb=%d "
1812                            "key=%s", vbId, key.c_str());
1813                        status = ENGINE_TMPFAIL;
1814                    }
1815                }
1816            }
1817        } // locked scope ends
1818
1819        if (bgitem->metaDataOnly) {
1820            ++stats.bg_meta_fetched;
1821        } else {
1822            ++stats.bg_fetched;
1823        }
1824
1825        hrtime_t endTime = gethrtime();
1826        updateBGStats(bgitem->initTime, startTime, endTime);
1827        engine.notifyIOComplete(bgitem->cookie, status);
1828    }
1829
1830    LOG(EXTENSION_LOG_DEBUG,
1831        "EP Store completes %" PRIu64 " of batched background fetch "
1832        "for vBucket = %d endTime = %" PRIu64,
1833        uint64_t(fetchedItems.size()), vbId, gethrtime()/1000000);
1834}
1835
1836void EventuallyPersistentStore::bgFetch(const const_sized_buffer key,
1837                                        uint16_t vbucket,
1838                                        const void *cookie,
1839                                        bool isMeta) {
1840    if (multiBGFetchEnabled()) {
1841        RCPtr<VBucket> vb = getVBucket(vbucket);
1842        if (!vb) {
1843            throw std::invalid_argument("EPStore::bgFetch: vbucket (which is " +
1844                                        std::to_string(vbucket) +
1845                                        ") is not present in vbMap");
1846        }
1847        KVShard *myShard = vbMap.getShardByVbId(vbucket);
1848
1849        // schedule to the current batch of background fetch of the given
1850        // vbucket
1851        VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1852                                                                isMeta);
1853        size_t bgfetch_size = vb->queueBGFetchItem(key, fetchThis,
1854                                                   myShard->getBgFetcher());
1855        myShard->getBgFetcher()->notifyBGEvent();
1856        LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1857            uint64_t(bgfetch_size));
1858    } else {
1859        bgFetchQueue++;
1860        stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1861                                            bgFetchQueue.load());
1862        ExecutorPool* iom = ExecutorPool::get();
1863        ExTask task = new SingleBGFetcherTask(&engine, key, vbucket, cookie,
1864                                              isMeta, bgFetchDelay, false);
1865        iom->schedule(task, READER_TASK_IDX);
1866        LOG(EXTENSION_LOG_DEBUG, "Queued a background fetch, now at %" PRIu64,
1867            uint64_t(bgFetchQueue.load()));
1868    }
1869}
1870
1871GetValue EventuallyPersistentStore::getInternal(const const_sized_buffer key,
1872                                                uint16_t vbucket,
1873                                                const void *cookie,
1874                                                vbucket_state_t allowedState,
1875                                                get_options_t options) {
1876
1877    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1878        vbucket_state_replica : vbucket_state_active;
1879    RCPtr<VBucket> vb = getVBucket(vbucket);
1880    if (!vb) {
1881        ++stats.numNotMyVBuckets;
1882        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1883    }
1884
1885    const bool honorStates = (options & HONOR_STATES);
1886
1887    ReaderLockHolder rlh(vb->getStateLock());
1888    if (honorStates) {
1889        vbucket_state_t vbState = vb->getState();
1890        if (vbState == vbucket_state_dead) {
1891            ++stats.numNotMyVBuckets;
1892            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1893        } else if (vbState == disallowedState) {
1894            ++stats.numNotMyVBuckets;
1895            return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1896        } else if (vbState == vbucket_state_pending) {
1897            if (vb->addPendingOp(cookie)) {
1898                return GetValue(NULL, ENGINE_EWOULDBLOCK);
1899            }
1900        }
1901    }
1902
1903    const bool trackReference = (options & TRACK_REFERENCE);
1904
1905    int bucket_num(0);
1906    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1907    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1908                                     trackReference);
1909    if (v) {
1910        if (v->isDeleted()) {
1911            GetValue rv;
1912            return rv;
1913        }
1914        if (v->isTempDeletedItem() || v->isTempNonExistentItem()) {
1915            // Delete a temp non-existent item to ensure that
1916            // if the get were issued over an item that doesn't
1917            // exist, then we dont preserve a temp item.
1918            if (options & DELETE_TEMP) {
1919                vb->ht.unlocked_del(key, bucket_num);
1920            }
1921            GetValue rv;
1922            return rv;
1923        }
1924
1925        // If the value is not resident, wait for it...
1926        if (!v->isResident()) {
1927            if (options & QUEUE_BG_FETCH) {
1928                bgFetch(key, vbucket, cookie);
1929            }
1930            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1931                            true, v->getNRUValue());
1932        }
1933
1934        // Should we hide (return -1) for the items' CAS?
1935        const bool hide_cas = (options & HIDE_LOCKED_CAS) &&
1936                              v->isLocked(ep_current_time());
1937        GetValue rv(v->toItem(hide_cas, vbucket), ENGINE_SUCCESS,
1938                    v->getBySeqno(), false, v->getNRUValue());
1939        return rv;
1940    } else {
1941        if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1942            GetValue rv;
1943            return rv;
1944        }
1945
1946        if (vb->maybeKeyExistsInFilter(key)) {
1947            ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1948            if (options & QUEUE_BG_FETCH) { // Full eviction and need a bg fetch.
1949                ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1950                                           cookie, false);
1951            }
1952            return GetValue(NULL, ec, -1, true);
1953        } else {
1954            // As bloomfilter predicted that item surely doesn't exist
1955            // on disk, return ENONET, for getInternal().
1956            GetValue rv;
1957            return rv;
1958        }
1959    }
1960}
1961
1962GetValue EventuallyPersistentStore::getRandomKey() {
1963    VBucketMap::id_type max = vbMap.getSize();
1964
1965    const long start = random() % max;
1966    long curr = start;
1967    Item *itm = NULL;
1968
1969    while (itm == NULL) {
1970        RCPtr<VBucket> vb = getVBucket(curr++);
1971        while (!vb || vb->getState() != vbucket_state_active) {
1972            if (curr == start) {
1973                return GetValue(NULL, ENGINE_KEY_ENOENT);
1974            }
1975            if (curr == max) {
1976                curr = 0;
1977            }
1978
1979            vb = getVBucket(curr++);
1980        }
1981
1982        if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1983            GetValue rv(itm, ENGINE_SUCCESS);
1984            return rv;
1985        }
1986
1987        if (curr == max) {
1988            curr = 0;
1989        }
1990
1991        if (curr == start) {
1992            return GetValue(NULL, ENGINE_KEY_ENOENT);
1993        }
1994        // Search next vbucket
1995    }
1996
1997    return GetValue(NULL, ENGINE_KEY_ENOENT);
1998}
1999
2000
2001ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
2002                                                        const std::string &key,
2003                                                        uint16_t vbucket,
2004                                                        const void *cookie,
2005                                                        ItemMetaData &metadata,
2006                                                        uint32_t &deleted,
2007                                                        bool trackReferenced)
2008{
2009    (void) cookie;
2010    RCPtr<VBucket> vb = getVBucket(vbucket);
2011
2012    if (!vb) {
2013        ++stats.numNotMyVBuckets;
2014        return ENGINE_NOT_MY_VBUCKET;
2015    }
2016
2017    ReaderLockHolder rlh(vb->getStateLock());
2018    if (vb->getState() == vbucket_state_dead ||
2019        vb->getState() == vbucket_state_replica) {
2020        ++stats.numNotMyVBuckets;
2021        return ENGINE_NOT_MY_VBUCKET;
2022    }
2023
2024    int bucket_num(0);
2025    deleted = 0;
2026    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2027    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
2028                                          trackReferenced);
2029
2030    if (v) {
2031        stats.numOpsGetMeta++;
2032        if (v->isTempInitialItem()) { // Need bg meta fetch.
2033            bgFetch(key, vbucket, cookie, true);
2034            return ENGINE_EWOULDBLOCK;
2035        } else if (v->isTempNonExistentItem()) {
2036            metadata.cas = v->getCas();
2037            return ENGINE_KEY_ENOENT;
2038        } else {
2039            if (v->isTempDeletedItem() || v->isDeleted() ||
2040                v->isExpired(ep_real_time())) {
2041                deleted |= GET_META_ITEM_DELETED_FLAG;
2042            }
2043
2044            if (v->isLocked(ep_current_time())) {
2045                metadata.cas = static_cast<uint64_t>(-1);
2046            } else {
2047                metadata.cas = v->getCas();
2048            }
2049            metadata.flags = v->getFlags();
2050            metadata.exptime = v->getExptime();
2051            metadata.revSeqno = v->getRevSeqno();
2052            return ENGINE_SUCCESS;
2053        }
2054    } else {
2055        // The key wasn't found. However, this may be because it was previously
2056        // deleted or evicted with the full eviction strategy.
2057        // So, add a temporary item corresponding to the key to the hash table
2058        // and schedule a background fetch for its metadata from the persistent
2059        // store. The item's state will be updated after the fetch completes.
2060        //
2061        // Schedule this bgFetch only if the key is predicted to be may-be
2062        // existent on disk by the bloomfilter.
2063
2064        if (vb->maybeKeyExistsInFilter(key)) {
2065            return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
2066        } else {
2067            stats.numOpsGetMeta++;
2068            return ENGINE_KEY_ENOENT;
2069        }
2070    }
2071}
2072
2073ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(
2074                                                     const Item &itm,
2075                                                     uint64_t cas,
2076                                                     uint64_t *seqno,
2077                                                     const void *cookie,
2078                                                     bool force,
2079                                                     bool allowExisting,
2080                                                     uint8_t nru,
2081                                                     GenerateBySeqno genBySeqno,
2082                                                     GenerateCas genCas,
2083                                                     ExtendedMetaData *emd,
2084                                                     bool isReplication)
2085{
2086    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
2087    if (!vb) {
2088        ++stats.numNotMyVBuckets;
2089        return ENGINE_NOT_MY_VBUCKET;
2090    }
2091
2092    ReaderLockHolder rlh(vb->getStateLock());
2093    if (vb->getState() == vbucket_state_dead) {
2094        ++stats.numNotMyVBuckets;
2095        return ENGINE_NOT_MY_VBUCKET;
2096    } else if (vb->getState() == vbucket_state_replica && !force) {
2097        ++stats.numNotMyVBuckets;
2098        return ENGINE_NOT_MY_VBUCKET;
2099    } else if (vb->getState() == vbucket_state_pending && !force) {
2100        if (vb->addPendingOp(cookie)) {
2101            return ENGINE_EWOULDBLOCK;
2102        }
2103    } else if (vb->isTakeoverBackedUp()) {
2104        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op"
2105                ", becuase takeover is lagging", vb->getId());
2106        return ENGINE_TMPFAIL;
2107    }
2108
2109    //check for the incoming item's CAS validity
2110    if (!Item::isValidCas(itm.getCas())) {
2111        return ENGINE_KEY_EEXISTS;
2112    }
2113
2114    int bucket_num(0);
2115    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
2116    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
2117                                          false);
2118
2119    bool maybeKeyExists = true;
2120    if (!force) {
2121        if (v)  {
2122            if (v->isTempInitialItem()) {
2123                bgFetch(itm.getKey(), itm.getVBucketId(), cookie, true);
2124                return ENGINE_EWOULDBLOCK;
2125            }
2126
2127            if (!conflictResolver->resolve(*v, itm.getMetaData(), false)) {
2128                ++stats.numOpsSetMetaResolutionFailed;
2129                return ENGINE_KEY_EEXISTS;
2130            }
2131        } else {
2132            if (vb->maybeKeyExistsInFilter(itm.getKey())) {
2133                return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2134                                             cookie, true, isReplication);
2135            } else {
2136                maybeKeyExists = false;
2137            }
2138        }
2139    } else {
2140        if (eviction_policy == FULL_EVICTION) {
2141            // Check Bloomfilter's prediction
2142            if (!vb->maybeKeyExistsInFilter(itm.getKey())) {
2143                maybeKeyExists = false;
2144            }
2145        }
2146    }
2147
2148    if (v && v->isLocked(ep_current_time()) &&
2149        (vb->getState() == vbucket_state_replica ||
2150         vb->getState() == vbucket_state_pending)) {
2151        v->unlock();
2152    }
2153
2154    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
2155                                                true, eviction_policy, nru,
2156                                                maybeKeyExists, isReplication);
2157
2158    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2159    switch (mtype) {
2160    case NOMEM:
2161        ret = ENGINE_ENOMEM;
2162        break;
2163    case INVALID_CAS:
2164    case IS_LOCKED:
2165        ret = ENGINE_KEY_EEXISTS;
2166        break;
2167    case INVALID_VBUCKET:
2168        ret = ENGINE_NOT_MY_VBUCKET;
2169        break;
2170    case WAS_DIRTY:
2171    case WAS_CLEAN:
2172        vb->setMaxCasAndTrackDrift(v->getCas());
2173        queueDirty(vb, v, &lh, seqno, genBySeqno, genCas);
2174        break;
2175    case NOT_FOUND:
2176        ret = ENGINE_KEY_ENOENT;
2177        break;
2178    case NEED_BG_FETCH:
2179        {            // CAS operation with non-resident item + full eviction.
2180            if (v) { // temp item is already created. Simply schedule a
2181                lh.unlock(); // bg fetch job.
2182                bgFetch(itm.getKey(), vb->getId(), cookie, true);
2183                return ENGINE_EWOULDBLOCK;
2184            }
2185
2186            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
2187                                        cookie, true, isReplication);
2188        }
2189    }
2190
2191    return ret;
2192}
2193
2194GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
2195                                                    uint16_t vbucket,
2196                                                    const void *cookie,
2197                                                    time_t exptime)
2198{
2199    RCPtr<VBucket> vb = getVBucket(vbucket);
2200    if (!vb) {
2201        ++stats.numNotMyVBuckets;
2202        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2203    }
2204
2205    ReaderLockHolder rlh(vb->getStateLock());
2206    if (vb->getState() == vbucket_state_dead) {
2207        ++stats.numNotMyVBuckets;
2208        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2209    } else if (vb->getState() == vbucket_state_replica) {
2210        ++stats.numNotMyVBuckets;
2211        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
2212    } else if (vb->getState() == vbucket_state_pending) {
2213        if (vb->addPendingOp(cookie)) {
2214            return GetValue(NULL, ENGINE_EWOULDBLOCK);
2215        }
2216    }
2217
2218    int bucket_num(0);
2219    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2220    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2221
2222    if (v) {
2223        if (v->isDeleted() || v->isTempDeletedItem() ||
2224            v->isTempNonExistentItem()) {
2225            GetValue rv;
2226            return rv;
2227        }
2228
2229        if (!v->isResident()) {
2230            bgFetch(key, vbucket, cookie);
2231            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
2232        }
2233        if (v->isLocked(ep_current_time())) {
2234            GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
2235            return rv;
2236        }
2237
2238        const bool exptime_mutated = exptime != v->getExptime();
2239        if (exptime_mutated) {
2240            v->markDirty();
2241            v->setExptime(exptime);
2242            v->setRevSeqno(v->getRevSeqno()+1);
2243        }
2244
2245        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
2246                    ENGINE_SUCCESS, v->getBySeqno());
2247
2248        if (exptime_mutated) {
2249            queueDirty(vb, v, &lh, NULL);
2250        }
2251
2252        return rv;
2253    } else {
2254        if (eviction_policy == VALUE_ONLY) {
2255            GetValue rv;
2256            return rv;
2257        } else {
2258            if (vb->maybeKeyExistsInFilter(key)) {
2259                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
2260                                                             key, vb, cookie,
2261                                                             false);
2262                return GetValue(NULL, ec, -1, true);
2263            } else {
2264                // As bloomfilter predicted that item surely doesn't exist
2265                // on disk, return ENOENT for getAndUpdateTtl().
2266                GetValue rv;
2267                return rv;
2268            }
2269        }
2270    }
2271}
2272
2273ENGINE_ERROR_CODE
2274EventuallyPersistentStore::statsVKey(const std::string &key,
2275                                     uint16_t vbucket,
2276                                     const void *cookie) {
2277    RCPtr<VBucket> vb = getVBucket(vbucket);
2278    if (!vb) {
2279        return ENGINE_NOT_MY_VBUCKET;
2280    }
2281
2282    int bucket_num(0);
2283    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2284    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2285
2286    if (v) {
2287        if (v->isDeleted() || v->isTempDeletedItem() ||
2288            v->isTempNonExistentItem()) {
2289            return ENGINE_KEY_ENOENT;
2290        }
2291        bgFetchQueue++;
2292        ExecutorPool* iom = ExecutorPool::get();
2293        ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
2294                                           v->getBySeqno(), cookie,
2295                                           bgFetchDelay, false);
2296        iom->schedule(task, READER_TASK_IDX);
2297        return ENGINE_EWOULDBLOCK;
2298    } else {
2299        if (eviction_policy == VALUE_ONLY) {
2300            return ENGINE_KEY_ENOENT;
2301        } else {
2302            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2303                                                        eviction_policy);
2304            switch(rv) {
2305            case ADD_NOMEM:
2306                return ENGINE_ENOMEM;
2307            case ADD_EXISTS:
2308            case ADD_UNDEL:
2309            case ADD_SUCCESS:
2310            case ADD_TMP_AND_BG_FETCH:
2311                // Since the hashtable bucket is locked, we shouldn't get here
2312                throw std::logic_error("EventuallyPersistentStore::statsVKey: "
2313                        "Invalid result from unlocked_addTempItem (" +
2314                        std::to_string(rv) + ")");
2315
2316            case ADD_BG_FETCH:
2317                {
2318                    ++bgFetchQueue;
2319                    ExecutorPool* iom = ExecutorPool::get();
2320                    ExTask task = new VKeyStatBGFetchTask(&engine, key,
2321                                                          vbucket, -1, cookie,
2322                                                          bgFetchDelay, false);
2323                    iom->schedule(task, READER_TASK_IDX);
2324                }
2325            }
2326            return ENGINE_EWOULDBLOCK;
2327        }
2328    }
2329}
2330
/**
 * Completion half of a "vkey" stat request, run by VKeyStatBGFetchTask.
 *
 * Reads `key` from the read-only KVStore for `vbid` and waits for the result.
 * It then records that result as the lookup result for `cookie` (NULL if the
 * read did not return ENGINE_SUCCESS), decrements bgFetchQueue (incremented
 * when the fetch was scheduled) and notifies the connection with
 * ENGINE_SUCCESS.
 *
 * Under full eviction, a temp-initial placeholder still in the hash table is
 * resolved from the disk result:
 * - on success the value is restored;
 * - on ENOENT it is marked non-existent;
 * - on any other error it is left alone and a warning is logged.
 *
 * NOTE(review): `bySeqNum` is not referenced in this function.
 *
 * @throws std::logic_error if the value is not resident after
 *         unlocked_restoreValue().
 */
void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
                                                  std::string &key,
                                                  uint16_t vbid,
                                                  uint64_t bySeqNum) {
    RememberingCallback<GetValue> gcb;

    // Disk read; gcb.val holds the result once waitForValue() returns.
    getROUnderlying(vbid)->get(key, vbid, gcb);
    gcb.waitForValue();

    if (eviction_policy == FULL_EVICTION) {
        RCPtr<VBucket> vb = getVBucket(vbid);
        if (vb) {
            int bucket_num(0);
            LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
            StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
            // Only resolve the placeholder; any other in-memory state is
            // left untouched.
            if (v && v->isTempInitialItem()) {
                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
                    if (!v->isResident()) {
                        throw std::logic_error("EPStore::completeStatsVKey: "
                            "storedvalue (which has key " + v->getKey() +
                            ") should be resident after calling restoreValue()");
                    }
                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
                    v->setNonExistent();
                } else {
                    // underlying kvstore couldn't fetch requested data.
                    // Log the error; the client then receives a NULL lookup
                    // result below (notifyIOComplete still reports
                    // ENGINE_SUCCESS).
                    LOG(EXTENSION_LOG_WARNING,
                        "Failed background fetch for vb=%d "
                        "seq=%" PRId64 " key=%s", vbid, v->getBySeqno(),
                        key.c_str());
                }
            }
        }
    }

    if (gcb.val.getStatus() == ENGINE_SUCCESS) {
        engine.addLookupResult(cookie, gcb.val.getValue());
    } else {
        engine.addLookupResult(cookie, NULL);
    }

    // Matches the increment done when the fetch task was scheduled.
    bgFetchQueue--;
    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
}
2377
/**
 * Fetch an item and take a "get-locked" lock on it.
 *
 * Only active vbuckets accept this operation. On success:
 * - the stored value is locked until currentTime + lockTimeout;
 * - it is given a fresh CAS from the vbucket's HLC, and the same CAS is
 *   written back to the in-memory value;
 * - a copy of the item is returned.
 * unlockKey() only releases the lock when the caller presents that CAS.
 *
 * @param key         key to lock
 * @param vbucket     vbucket the key belongs to
 * @param currentTime current (relative) time, used for lock-expiry checks
 * @param lockTimeout lock duration, added to currentTime
 * @param cookie      connection cookie for a background fetch. If null and
 *                    the value is not resident, no fetch is scheduled, but
 *                    ENGINE_EWOULDBLOCK is still returned.
 * @return GetValue carrying the locked item, or NULL plus one of:
 *         ENGINE_NOT_MY_VBUCKET, ENGINE_KEY_ENOENT,
 *         ENGINE_TMPFAIL (already locked),
 *         ENGINE_EWOULDBLOCK (value must come from disk), or
 *         whatever addTempItemForBgFetch() returns under full eviction.
 * @throws std::logic_error for an unrecognised eviction policy.
 */
GetValue EventuallyPersistentStore::getLocked(const std::string &key,
                                              uint16_t vbucket,
                                              rel_time_t currentTime,
                                              uint32_t lockTimeout,
                                              const void *cookie) {
    RCPtr<VBucket> vb = getVBucket(vbucket);
    if (!vb || vb->getState() != vbucket_state_active) {
        ++stats.numNotMyVBuckets;
        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);

    if (v) {
        // Deleted values and temp placeholders are treated as absent.
        if (v->isDeleted() || v->isTempNonExistentItem() ||
            v->isTempDeletedItem()) {
            return GetValue(NULL, ENGINE_KEY_ENOENT);
        }

        // if v is locked return error
        if (v->isLocked(currentTime)) {
            return GetValue(NULL, ENGINE_TMPFAIL);
        }

        // If the value is not resident, wait for it...
        if (!v->isResident()) {
            if (cookie) {
                bgFetch(key, vbucket, cookie);
            }
            return GetValue(NULL, ENGINE_EWOULDBLOCK, -1, true);
        }

        // acquire lock and increment cas value
        v->lock(currentTime + lockTimeout);

        // Give the returned copy a new HLC CAS and mirror it onto the stored
        // value so the lock holder's CAS matches what is in memory.
        Item *it = v->toItem(false, vbucket);
        it->setCas(vb->nextHLCCas());
        v->setCas(it->getCas());

        return GetValue(it);

    } else {
        // No value found in the hashtable.
        switch (eviction_policy) {
        case VALUE_ONLY:
            return GetValue(NULL, ENGINE_KEY_ENOENT);

        case FULL_EVICTION:
            if (vb->maybeKeyExistsInFilter(key)) {
                ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num,
                                                             key, vb, cookie,
                                                             false);
                return GetValue(NULL, ec, -1, true);
            } else {
                // As bloomfilter predicted that item surely doesn't exist
                // on disk, return ENOENT for getLocked().
                return GetValue(NULL, ENGINE_KEY_ENOENT);
            }
        default:
            throw std::logic_error("Unknown eviction policy");
        }
    }
}
2443
2444ENGINE_ERROR_CODE
2445EventuallyPersistentStore::unlockKey(const std::string &key,
2446                                     uint16_t vbucket,
2447                                     uint64_t cas,
2448                                     rel_time_t currentTime)
2449{
2450
2451    RCPtr<VBucket> vb = getVBucket(vbucket);
2452    if (!vb || vb->getState() != vbucket_state_active) {
2453        ++stats.numNotMyVBuckets;
2454        return ENGINE_NOT_MY_VBUCKET;
2455    }
2456
2457    int bucket_num(0);
2458    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2459    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2460
2461    if (v) {
2462        if (v->isDeleted() || v->isTempNonExistentItem() ||
2463            v->isTempDeletedItem()) {
2464            return ENGINE_KEY_ENOENT;
2465        }
2466        if (v->isLocked(currentTime)) {
2467            if (v->getCas() == cas) {
2468                v->unlock();
2469                return ENGINE_SUCCESS;
2470            }
2471        }
2472        return ENGINE_TMPFAIL;
2473    } else {
2474        if (eviction_policy == VALUE_ONLY) {
2475            return ENGINE_KEY_ENOENT;
2476        } else {
2477            // With the full eviction, an item's lock is automatically
2478            // released when the item is evicted from memory. Therefore,
2479            // we simply return ENGINE_TMPFAIL when we receive unlockKey
2480            // for an item that is not in memocy cache. Note that we don't
2481            // spawn any bg fetch job to figure out if an item actually
2482            // exists in disk or not.
2483            return ENGINE_TMPFAIL;
2484        }
2485    }
2486}
2487
2488
2489ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
2490                                            const std::string &key,
2491                                            uint16_t vbucket,
2492                                            const void *cookie,
2493                                            struct key_stats &kstats,
2494                                            bool bgfetch,
2495                                            bool wantsDeleted)
2496{
2497    RCPtr<VBucket> vb = getVBucket(vbucket);
2498    if (!vb) {
2499        return ENGINE_NOT_MY_VBUCKET;
2500    }
2501
2502    int bucket_num(0);
2503    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2504    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
2505
2506    if (v) {
2507        if ((v->isDeleted() && !wantsDeleted) ||
2508            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2509            return ENGINE_KEY_ENOENT;
2510        }
2511        if (eviction_policy == FULL_EVICTION &&
2512            v->isTempInitialItem() && bgfetch) {
2513            lh.unlock();
2514            bgFetch(key, vbucket, cookie, true);
2515            return ENGINE_EWOULDBLOCK;
2516        }
2517        kstats.logically_deleted = v->isDeleted();
2518        kstats.dirty = v->isDirty();
2519        kstats.exptime = v->getExptime();
2520        kstats.flags = v->getFlags();
2521        kstats.cas = v->getCas();
2522        kstats.vb_state = vb->getState();
2523        return ENGINE_SUCCESS;
2524    } else {
2525        if (eviction_policy == VALUE_ONLY) {
2526            return ENGINE_KEY_ENOENT;
2527        } else {
2528            if (bgfetch && vb->maybeKeyExistsInFilter(key)) {
2529                return addTempItemForBgFetch(lh, bucket_num, key, vb,
2530                                             cookie, true);
2531            } else {
2532                // If bgFetch were false, or bloomfilter predicted that
2533                // item surely doesn't exist on disk, return ENOENT for
2534                // getKeyStats().
2535                return ENGINE_KEY_ENOENT;
2536            }
2537        }
2538    }
2539}
2540
2541std::string EventuallyPersistentStore::validateKey(const std::string &key,
2542                                                   uint16_t vbucket,
2543                                                   Item &diskItem) {
2544    int bucket_num(0);
2545    RCPtr<VBucket> vb = getVBucket(vbucket);
2546    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2547    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2548                                     false, true);
2549
2550    if (v) {
2551        if (v->isDeleted() || v->isTempNonExistentItem() ||
2552            v->isTempDeletedItem()) {
2553            return "item_deleted";
2554        }
2555
2556        if (diskItem.getFlags() != v->getFlags()) {
2557            return "flags_mismatch";
2558        } else if (v->isResident() && memcmp(diskItem.getData(),
2559                                             v->getValue()->getData(),
2560                                             diskItem.getNBytes())) {
2561            return "data_mismatch";
2562        } else {
2563            return "valid";
2564        }
2565    } else {
2566        return "item_deleted";
2567    }
2568
2569}
2570
2571ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2572                                                        uint64_t *cas,
2573                                                        uint16_t vbucket,
2574                                                        const void *cookie,
2575                                                        bool force,
2576                                                        ItemMetaData *itemMeta,
2577                                                        mutation_descr_t
2578                                                        *mutInfo)
2579{
2580    RCPtr<VBucket> vb = getVBucket(vbucket);
2581    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2582        ++stats.numNotMyVBuckets;
2583        return ENGINE_NOT_MY_VBUCKET;
2584    } else if (vb->getState() == vbucket_state_replica && !force) {
2585        ++stats.numNotMyVBuckets;
2586        return ENGINE_NOT_MY_VBUCKET;
2587    } else if (vb->getState() == vbucket_state_pending && !force) {
2588        if (vb->addPendingOp(cookie)) {
2589            return ENGINE_EWOULDBLOCK;
2590        }
2591    } else if (vb->isTakeoverBackedUp()) {
2592        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op"
2593                ", becuase takeover is lagging", vb->getId());
2594        return ENGINE_TMPFAIL;
2595    }
2596
2597    int bucket_num(0);
2598    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2599    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2600    if (!v || v->isDeleted() || v->isTempItem()) {
2601        if (eviction_policy == VALUE_ONLY) {
2602            return ENGINE_KEY_ENOENT;
2603        } else { // Full eviction.
2604            if (!force) {
2605                if (!v) { // Item might be evicted from cache.
2606                    if (vb->maybeKeyExistsInFilter(key)) {
2607                        return addTempItemForBgFetch(lh, bucket_num, key, vb,
2608                                                     cookie, true);
2609                    } else {
2610                        // As bloomfilter predicted that item surely doesn't
2611                        // exist on disk, return ENOENT for deleteItem().
2612                        return ENGINE_KEY_ENOENT;
2613                    }
2614                } else if (v->isTempInitialItem()) {
2615                    lh.unlock();
2616                    bgFetch(key, vbucket, cookie, true);
2617                    return ENGINE_EWOULDBLOCK;
2618                } else { // Non-existent or deleted key.
2619                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2620                        // Delete a temp non-existent item to ensure that
2621                        // if a delete were issued over an item that doesn't
2622                        // exist, then we don't preserve a temp item.
2623                        vb->ht.unlocked_del(key, bucket_num);
2624                    }
2625                    return ENGINE_KEY_ENOENT;
2626                }
2627            } else {
2628                if (!v) { // Item might be evicted from cache.
2629                    // Create a temp item and delete it below as it is a
2630                    // force deletion, only if bloomfilter predicts that
2631                    // item may exist on disk.
2632                    if (vb->maybeKeyExistsInFilter(key)) {
2633                        add_type_t rv = vb->ht.unlocked_addTempItem(
2634                                                               bucket_num,
2635                                                               key,
2636                                                               eviction_policy);
2637                        if (rv == ADD_NOMEM) {
2638                            return ENGINE_ENOMEM;
2639                        }
2640                        v = vb->ht.unlocked_find(key, bucket_num, true, false);
2641                        v->setDeleted();
2642                    } else {
2643                        return ENGINE_KEY_ENOENT;
2644                    }
2645                } else if (v->isTempInitialItem()) {
2646                    v->setDeleted();
2647                } else { // Non-existent or deleted key.
2648                    if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
2649                        // Delete a temp non-existent item to ensure that
2650                        // if a delete were issued over an item that doesn't
2651                        // exist, then we don't preserve a temp item.
2652                        vb->ht.unlocked_del(key, bucket_num);
2653                    }
2654                    return ENGINE_KEY_ENOENT;
2655                }
2656            }
2657        }
2658    }
2659
2660    if (v && v->isLocked(ep_current_time()) &&
2661        (vb->getState() == vbucket_state_replica ||
2662         vb->getState() == vbucket_state_pending)) {
2663        v->unlock();
2664    }
2665    mutation_type_t delrv;
2666    delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2667    if (v && (delrv == NOT_FOUND || delrv == WAS_DIRTY || delrv == WAS_CLEAN)) {
2668        if (itemMeta != nullptr) {
2669            itemMeta->revSeqno = v->getRevSeqno();
2670            itemMeta->cas = v->getCas();
2671            itemMeta->flags = v->getFlags();
2672            itemMeta->exptime = v->getExptime();
2673        }
2674    }
2675
2676    uint64_t seqno = 0;
2677    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2678    switch (delrv) {
2679    case NOMEM:
2680        ret = ENGINE_ENOMEM;
2681        break;
2682    case INVALID_VBUCKET:
2683        ret = ENGINE_NOT_MY_VBUCKET;
2684        break;
2685    case INVALID_CAS:
2686        ret = ENGINE_KEY_EEXISTS;
2687        break;
2688    case IS_LOCKED:
2689        ret = ENGINE_TMPFAIL;
2690        break;
2691    case NOT_FOUND:
2692        ret = ENGINE_KEY_ENOENT;
2693    case WAS_CLEAN:
2694    case WAS_DIRTY:
2695        if (v) {
2696            // Keep lh as we need to do v->getCas
2697            queueDirty(vb, v, nullptr, &seqno);
2698            *cas = v->getCas();
2699        }
2700
2701        if (delrv != NOT_FOUND) {
2702            mutInfo->seqno = seqno;
2703            mutInfo->vbucket_uuid = vb->failovers->getLatestUUID();
2704            if (itemMeta != nullptr) {
2705                itemMeta->cas = v->getCas();
2706            }
2707        }
2708        break;
2709    case NEED_BG_FETCH:
2710        // We already figured out if a bg fetch is requred for a full-evicted
2711        // item above.
2712        throw std::logic_error("EventuallyPersistentStore::deleteItem: "
2713                "Unexpected NEEDS_BG_FETCH from unlocked_softDelete");
2714    }
2715    return ret;
2716}
2717
/**
 * Delete `key` using caller-supplied metadata (`itemMeta`).
 *
 * Unless `force` is set, the incoming metadata is conflict-resolved against
 * the in-memory value; a missing value may first need a disk fetch. The
 * vbucket state lock is held (read mode) for the whole operation.
 *
 * @param key           key to delete
 * @param cas           in: CAS passed to unlocked_softDelete(). When `force`
 *                      creates a tombstone (or resolves a temp-initial
 *                      entry), it is also stamped onto that entry.
 *                      out: CAS of the stored value afterwards (0 if none).
 * @param seqno         output seqno, forwarded to tapQueueDirty()/queueDirty()
 * @param vbucket       vbucket id
 * @param cookie        connection cookie for pending ops / background fetches
 * @param force         skip replica/pending state checks and conflict
 *                      resolution
 * @param itemMeta      incoming metadata; its CAS must pass
 *                      Item::isValidCas()
 * @param tapBackfill   queue via tapQueueDirty() instead of queueDirty()
 * @param genBySeqno    if GenerateBySeqno::No, the value's bySeqno is set to
 *                      `bySeqno`
 * @param generateCas   forwarded to queueDirty()
 * @param bySeqno       seqno to apply when genBySeqno is No
 * @param emd           NOTE(review): not referenced in this function
 * @param isReplication forwarded to addTempItemForBgFetch() /
 *                      unlocked_addTempItem()
 * @return ENGINE_SUCCESS, ENGINE_NOT_MY_VBUCKET, ENGINE_EWOULDBLOCK,
 *         ENGINE_TMPFAIL, ENGINE_KEY_EEXISTS (invalid CAS or conflict
 *         resolution lost), ENGINE_ENOMEM, ENGINE_KEY_ENOENT, or whatever
 *         addTempItemForBgFetch() returns.
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
                                                     const std::string &key,
                                                     uint64_t *cas,
                                                     uint64_t *seqno,
                                                     uint16_t vbucket,
                                                     const void *cookie,
                                                     bool force,
                                                     ItemMetaData *itemMeta,
                                                     bool tapBackfill,
                                                     GenerateBySeqno genBySeqno,
                                                     GenerateCas generateCas,
                                                     uint64_t bySeqno,
                                                     ExtendedMetaData *emd,
                                                     bool isReplication)
{
    RCPtr<VBucket> vb = getVBucket(vbucket);

    if (!vb) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    // Hold the vbucket state (read) lock so the state checked below cannot
    // change underneath this operation.
    ReaderLockHolder rlh(vb->getStateLock());
    if (vb->getState() == vbucket_state_dead) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_replica && !force) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    } else if (vb->getState() == vbucket_state_pending && !force) {
        if (vb->addPendingOp(cookie)) {
            return ENGINE_EWOULDBLOCK;
        }
    } else if (vb->isTakeoverBackedUp()) {
        LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a deleteWithMeta "
                "op, because takeover is lagging", vb->getId());
        return ENGINE_TMPFAIL;
    }

    //check for the incoming item's CAS validity
    if (!Item::isValidCas(itemMeta->cas)) {
        return ENGINE_KEY_EEXISTS;
    }

    int bucket_num(0);
    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
    if (!force) { // Need conflict resolution.
        if (v)  {
            if (v->isTempInitialItem()) {
                // NOTE(review): unlike the NEED_BG_FETCH path below, lh is
                // not released before calling bgFetch() here.
                bgFetch(key, vbucket, cookie, true);
                return ENGINE_EWOULDBLOCK;
            }

            if (!conflictResolver->resolve(*v, *itemMeta, true)) {
                ++stats.numOpsDelMetaResolutionFailed;
                return ENGINE_KEY_EEXISTS;
            }
        } else {
            // Item is 1) deleted or not existent in the value eviction case OR
            // 2) deleted or evicted in the full eviction.
            if (vb->maybeKeyExistsInFilter(key)) {
                return addTempItemForBgFetch(lh, bucket_num, key, vb,
                                             cookie, true, isReplication);
            } else {
                // Even though bloomfilter predicted that item doesn't exist
                // on disk, we must put this delete on disk if the cas is valid.
                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                            eviction_policy,
                                                            isReplication);
                if (rv == ADD_NOMEM) {
                    return ENGINE_ENOMEM;
                }
                v = vb->ht.unlocked_find(key, bucket_num, true, false);
                v->setDeleted();
            }
        }
    } else {
        // Forced: no conflict resolution. Make sure there is an entry to
        // delete, stamped with the caller's CAS.
        if (!v) {
            // We should always try to persist a delete here.
            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
                                                        eviction_policy,
                                                        isReplication);
            if (rv == ADD_NOMEM) {
                return ENGINE_ENOMEM;
            }
            v = vb->ht.unlocked_find(key, bucket_num, true, false);
            v->setDeleted();
            v->setCas(*cas);
        } else if (v->isTempInitialItem()) {
            v->setDeleted();
            v->setCas(*cas);
        }
    }

    // Locks are not honoured on replica/pending vbuckets; drop any stale one.
    if (v && v->isLocked(ep_current_time()) &&
        (vb->getState() == vbucket_state_replica ||
         vb->getState() == vbucket_state_pending)) {
        v->unlock();
    }
    mutation_type_t delrv;
    delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
                                       eviction_policy, true);
    *cas = v ? v->getCas() : 0;

    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
    switch (delrv) {
    case NOMEM:
        ret = ENGINE_ENOMEM;
        break;
    case INVALID_VBUCKET:
        ret = ENGINE_NOT_MY_VBUCKET;
        break;
    case INVALID_CAS:
        ret = ENGINE_KEY_EEXISTS;
        break;
    case IS_LOCKED:
        ret = ENGINE_TMPFAIL;
        break;
    case NOT_FOUND:
        ret = ENGINE_KEY_ENOENT;
        break;
    case WAS_DIRTY:
    case WAS_CLEAN:
        // Keep the caller-provided seqno when asked not to generate one.
        if (genBySeqno == GenerateBySeqno::No) {
            v->setBySeqno(bySeqno);
        }

        vb->setMaxCasAndTrackDrift(v->getCas());

        if (tapBackfill) {
            tapQueueDirty(*vb, v, lh, seqno, genBySeqno);
        } else {
            queueDirty(vb, v, &lh, seqno, genBySeqno, generateCas);
        }
        break;
    case NEED_BG_FETCH:
        lh.unlock();
        bgFetch(key, vbucket, cookie, true);
        ret = ENGINE_EWOULDBLOCK;
    }

    return ret;
}
2862
2863void EventuallyPersistentStore::reset() {
2864    auto buckets = vbMap.getBuckets();
2865    for (auto vbid : buckets) {
2866        RCPtr<VBucket> vb = getVBucket(vbid);
2867        if (vb) {
2868            LockHolder lh(vb_mutexes[vb->getId()]);
2869            vb->ht.clear();
2870            vb->checkpointManager.clear(vb->getState());
2871            vb->resetStats();
2872            vb->setPersistedSnapshot(0, 0);
2873        }
2874    }
2875
2876    ++stats.diskQueueSize;
2877    bool inverse = true;
2878    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, false);
2879    // Waking up (notifying) one flusher is good enough for diskFlushAll
2880    vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2881}
2882
/**
 * Callback invoked after persisting an item from memory to disk.
 *
 * This class exists to create a closure around a few variables within
 * EventuallyPersistentStore::flushOne so that an object can be
 * requeued in case of failure to store in the underlying layer.
 *
 * Two overloads are provided:
 * - callback(mutation_result&) for persisted sets;
 * - callback(int&) for persisted deletions.
 * On success the in-memory StoredValue is updated (marked clean, or removed
 * for a persisted delete) and the flush statistics are adjusted. On a hard
 * failure the item is re-dirtied and pushed to the vbucket's reject queue
 * (see redirty()).
 */
class PersistenceCallback : public Callback<mutation_result>,
                            public Callback<int> {
public:

    /**
     * @param qi the queued item being persisted
     * @param vb the item's vbucket; must be non-null
     * @param st store used to look up / re-dirty the in-memory value
     * @param s  statistics to update
     * @param c  expected CAS; after a successful set the stored value is
     *           only marked clean if its current CAS still equals this
     * @throws std::invalid_argument if vb is null
     */
    PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
                        EventuallyPersistentStore& st, EPStats& s, uint64_t c)
        : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
        if (!vb) {
            throw std::invalid_argument("PersistenceCallback(): vb is NULL");
        }
    }

    // This callback is invoked for set only.
    //
    // value.first:  1 = one row written;
    //               0 = no row updated (logged, then accounted as flushed);
    //               anything else = failure, the item is re-queued via
    //               redirty().
    // value.second: true when the write was an insert rather than an update.
    void callback(mutation_result &value) {
        if (value.first == 1) {
            int bucket_num(0);
            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
                                                        &bucket_num);
            StoredValue *v = store.fetchValidValue(vbucket,
                                                   queuedItem->getKey(),
                                                   bucket_num, true, false);
            if (v) {
                if (v->getCas() == cas) {
                    // mark this item clean only if current and stored cas
                    // value match
                    v->markClean();
                }
                if (v->isNewCacheItem()) {
                    if (value.second) {
                        // Insert in value-only or full eviction mode.
                        ++vbucket->opsCreate;
                        vbucket->incrMetaDataDisk(*queuedItem);
                    } else { // Update in full eviction mode.
                        vbucket->ht.decrNumTotalItems();
                        ++vbucket->opsUpdate;
                    }
                    v->setNewCacheItem(false);
                } else { // Update in value-only or full eviction mode.
                    ++vbucket->opsUpdate;
                }
            }

            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats.decrDiskQueueSize(1);
            stats.totalPersisted++;
        } else {
            // If the return was 0 here, we're in a bad state because
            // we do not know the rowid of this object.
            if (value.first == 0) {
                int bucket_num(0);
                LockHolder lh = vbucket->ht.getLockedBucket(
                                           queuedItem->getKey(), &bucket_num);
                StoredValue *v = store.fetchValidValue(vbucket,
                                                       queuedItem->getKey(),
                                                       bucket_num, true,
                                                       false);
                if (v) {
                    std::stringstream ss;
                    ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
                       << queuedItem->getVBucketId() << " (rowid="
                       << v->getBySeqno() << ") returned 0 updates\n";
                    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
                } else {
                    LOG(EXTENSION_LOG_WARNING,
                        "Error persisting now missing ``%s'' from vb%d",
                        queuedItem->getKey().c_str(),
                        queuedItem->getVBucketId());
                }

                vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
                stats.decrDiskQueueSize(1);
            } else {
                std::stringstream ss;
                ss <<
                "Fatal error in persisting SET ``" <<
                queuedItem->getKey() << "'' on vb "
                   << queuedItem->getVBucketId() << "!!! Requeue it...\n";
                LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
                redirty();
            }
        }
    }

    // This callback is invoked for deletions only.
    //
    // The int value reports how many rows the underlying storage deleted
    // (or a negative value on failure); see the cases below.
    //
    // @throws std::logic_error if value > 1.
    void callback(int &value) {
        // > 1 would be bad.  We were only trying to delete one row.
        if (value > 1) {
            throw std::logic_error("PersistenceCallback::callback: value "
                    "(which is " + std::to_string(value) +
                    ") should be <= 1 for deletions");
        }
        // -1 means fail (any negative value is handled as a failure)
        // 1 means we deleted one row
        // 0 means we did not delete a row, but did not fail (did not exist)
        if (value >= 0) {
            // We have successfully removed an item from the disk, we
            // may now remove it from the hash table.
            int bucket_num(0);
            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
                                                        &bucket_num);
            StoredValue *v = store.fetchValidValue(vbucket,
                                                   queuedItem->getKey(),
                                                   bucket_num, true, false);
            // Delete the item in the hash table iff:
            //  1. Item is existent in hashtable, and deleted flag is true
            //  2. rev seqno of queued item matches rev seqno of hash table item
            if (v && v->isDeleted() &&
                (queuedItem->getRevSeqno() == v->getRevSeqno())) {
                bool newCacheItem = v->isNewCacheItem();
                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
                                                        bucket_num);
                if (!deleted) {
                    throw std::logic_error("PersistenceCallback:callback: "
                            "Failed to delete key '" + queuedItem->getKey() +
                            "' from bucket " + std::to_string(bucket_num));
                }
                if (newCacheItem && value > 0) {
                    // Need to decrement the item counter again for an item that
                    // exists on DB file, but not in memory (i.e., full eviction),
                    // because we created the temp item in memory and incremented
                    // the item counter when a deletion is pushed in the queue.
                    vbucket->ht.decrNumTotalItems();
                }

                /**
                 * Deleted items are to be added to the bloomfilter,
                 * in either eviction policy.
                 */
                vbucket->addToFilter(queuedItem->getKey());
            }

            if (value > 0) {
                ++stats.totalPersisted;
                ++vbucket->opsDelete;
            }
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats.decrDiskQueueSize(1);
            vbucket->decrMetaDataDisk(*queuedItem);
        } else {
            std::stringstream ss;
            ss << "Fatal error in persisting DELETE ``" <<
            queuedItem->getKey() << "'' on vb "
               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
            redirty();
        }
    }

private:

    // Handle a failed write.
    // - If the vbucket is being deleted, only the flush accounting is done
    //   and the item is dropped.
    // - Otherwise: count the failure, mark the in-memory value dirty again,
    //   and push the item onto the vbucket's reject queue.
    void redirty() {
        if (store.vbMap.isBucketDeletion(vbucket->getId())) {
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats.decrDiskQueueSize(1);
            return;
        }
        ++stats.flushFailed;
        store.invokeOnLockedStoredValue(queuedItem->getKey(),
                                         queuedItem->getVBucketId(),
                                         &StoredValue::reDirty);
        vbucket->rejectQueue.push(queuedItem);
        ++vbucket->opsReject;
    }

    const queued_item queuedItem;   // item being persisted
    RCPtr<VBucket> vbucket;         // owning vbucket (non-null)
    EventuallyPersistentStore& store;
    EPStats& stats;
    uint64_t cas;                   // CAS required to mark the value clean
    DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
};
3064
3065bool EventuallyPersistentStore::scheduleFlushAllTask(const void* cookie,
3066                                                     time_t when) {
3067    bool inverse = false;
3068    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
3069        flushAllTaskCtx.cookie = cookie;
3070        flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3071        ExTask task = new FlushAllTask(&engine, static_cast<double>(when));
3072        ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);
3073        return true;
3074    } else {
3075        return false;
3076    }
3077}
3078
3079void EventuallyPersistentStore::setFlushAllComplete() {
3080    // Notify memcached about flushAll task completion, and
3081    // set diskFlushall flag to false
3082    if (flushAllTaskCtx.cookie) {
3083        engine.notifyIOComplete(flushAllTaskCtx.cookie, ENGINE_SUCCESS);
3084    }
3085    bool inverse = false;
3086    flushAllTaskCtx.delayFlushAll.compare_exchange_strong(inverse, true);
3087    inverse = true;
3088    diskFlushAll.compare_exchange_strong(inverse, false);
3089}
3090
3091void EventuallyPersistentStore::flushOneDeleteAll() {
3092    for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
3093        RCPtr<VBucket> vb = getVBucket(i);
3094        // Reset the vBucket if it's non-null and not already in the middle of
3095        // being created / destroyed.
3096        if (vb &&
3097            !(vbMap.isBucketCreation(i) || vbMap.isBucketDeletion(i))) {
3098            LockHolder lh(vb_mutexes[vb->getId()]);
3099            getRWUnderlying(vb->getId())->reset(i);
3100        }
3101    }
3102
3103    stats.decrDiskQueueSize(1);
3104    setFlushAllComplete();
3105}
3106
3107int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
3108    KVShard *shard = vbMap.getShardByVbId(vbid);
3109    if (diskFlushAll && !flushAllTaskCtx.delayFlushAll) {
3110        if (shard->getId() == EP_PRIMARY_SHARD) {
3111            flushOneDeleteAll();
3112        } else {
3113            // disk flush is pending just return
3114            return 0;
3115        }
3116    }
3117
3118    int items_flushed = 0;
3119    const rel_time_t flush_start = ep_current_time();
3120
3121    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
3122    if (vb) {
3123        LockHolder lh(vb_mutexes[vbid], true /*tryLock*/);
3124        if (!lh.islocked()) { // Try another bucket if this one is locked
3125            return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
3126        }
3127
3128        std::vector<queued_item> items;
3129        KVStore *rwUnderlying = getRWUnderlying(vbid);
3130
3131        while (!vb->rejectQueue.empty()) {
3132            items.push_back(vb->rejectQueue.front());
3133            vb->rejectQueue.pop();
3134        }
3135
3136        // Append any 'backfill' items (mutations added by a TAP stream).
3137        vb->getBackfillItems(items);
3138
3139        // Append all items outstanding for the persistence cursor.
3140        snapshot_range_t range;
3141        range = vb->checkpointManager.getAllItemsForCursor(
3142                CheckpointManager::pCursorName, items);
3143
3144        if (!items.empty()) {
3145            while (!rwUnderlying->begin()) {
3146                ++stats.beginFailed;
3147                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
3148                    "Retry in 1 sec ...");
3149                sleep(1);
3150            }
3151            rwUnderlying->optimizeWrites(items);
3152
3153            Item *prev = NULL;
3154            auto vbstate = vb->getVBucketState();
3155            uint64_t maxSeqno = 0;
3156            range.start = std::max(range.start, vbstate.lastSnapStart);
3157
3158            bool mustCheckpointVBState = false;
3159            std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
3160
3161            for (const auto& item : items) {
3162
3163                if (!item->shouldPersist()) {
3164                    continue;
3165                }
3166
3167                if (item->getOperation() == queue_op::set_vbucket_state) {
3168                    // No actual item explicitly persisted to (this op exists
3169                    // to ensure a commit occurs with the current vbstate);
3170                    // flag that we must trigger a snapshot even if there are
3171                    // no 'real' items in the checkpoint.
3172                    mustCheckpointVBState = true;
3173
3174                    // Update queuing stats how this item has logically been
3175                    // processed.
3176                    stats.decrDiskQueueSize(1);
3177                    vb->doStatsForFlushing(*item, item->size());
3178
3179                } else if (!prev || prev->getKey() != item->getKey()) {
3180                    prev = item.get();
3181                    ++items_flushed;
3182                    PersistenceCallback *cb = flushOneDelOrSet(item, vb);
3183                    if (cb) {
3184                        pcbs.push_back(cb);
3185                    }
3186
3187                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
3188                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
3189                    if (item->isDeleted()) {
3190                        vbstate.maxDeletedSeqno =
3191                                std::max(vbstate.maxDeletedSeqno,
3192                                         item->getRevSeqno());
3193                    }
3194                    ++stats.flusher_todo;
3195
3196                } else {
3197                    // Item is the same key as the previous[1] one - don't need
3198                    // to flush to disk.
3199                    // [1] Previous here really means 'next' - optimizeWrites()
3200                    //     above has actually re-ordered items such that items
3201                    //     with the same key are ordered from high->low seqno.
3202                    //     This means we only write the highest (i.e. newest)
3203                    //     item for a given key, and discard any duplicate,
3204                    //     older items.
3205                    stats.decrDiskQueueSize(1);
3206                    vb->doStatsForFlushing(*item, item->size());
3207                }
3208            }
3209
3210
3211            {
3212                ReaderLockHolder rlh(vb->getStateLock());
3213                if (vb->getState() == vbucket_state_active) {
3214                    if (maxSeqno) {
3215                        range.start = maxSeqno;
3216                        range.end = maxSeqno;
3217                    }
3218                }
3219
3220                // Update VBstate based on the changes we have just made,
3221                // then tell the rwUnderlying the 'new' state
3222                // (which will persisted as part of the commit() below).
3223                vbstate.lastSnapStart = range.start;
3224                vbstate.lastSnapEnd = range.end;
3225
3226                // Do we need to trigger a persist of the state?
3227                // If there are no "real" items to flush, and we encountered
3228                // a set_vbucket_state meta-item.
3229                const bool persist = (items_flushed == 0) && mustCheckpointVBState;
3230
3231                KVStatsCallback kvcb(this);
3232                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
3233                                                  &kvcb, persist) != true) {
3234                    return RETRY_FLUSH_VBUCKET;
3235                }
3236
3237                if (vbMap.setBucketCreation(vbid, false)) {
3238                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3239                }
3240            }
3241
3242            // Commit all mutations to disk if there is a non-zero number
3243            // of items to flush, and the commit interval is zero.
3244            if ((items_flushed > 0) &&
3245                (decrCommitInterval(shard->getId()) == 0)) {
3246
3247                commit(shard->getId());
3248
3249                // Now the commit is complete, vBucket file must exist.
3250                if (vbMap.setBucketCreation(vbid, false)) {
3251                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
3252                }
3253            }
3254
3255            hrtime_t end = gethrtime();
3256            uint64_t trans_time = (end - flush_start) / 1000000;
3257
3258            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
3259                                       static_cast<double>(trans_time) /
3260                                       static_cast<double>(items_flushed));
3261            stats.cumulativeFlushTime.fetch_add(ep_current_time()
3262                                                - flush_start);
3263            stats.flusher_todo.store(0);
3264            stats.totalPersistVBState++;
3265
3266            if (vb->rejectQueue.empty()) {
3267                vb->setPersistedSnapshot(range.start, range.end);
3268                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
3269                if (highSeqno > 0 &&
3270                    highSeqno != vbMap.getPersistenceSeqno(vbid)) {
3271                    vbMap.setPersistenceSeqno(vbid, highSeqno);
3272                }
3273            }
3274        }
3275
3276        rwUnderlying->pendingTasks();
3277
3278        if (vb->checkpointManager.getNumCheckpoints() > 1) {
3279            wakeUpCheckpointRemover();
3280        }
3281
3282        if (vb->rejectQueue.empty()) {
3283            vb->checkpointManager.itemsPersisted();
3284            uint64_t seqno = vbMap.getPersistenceSeqno(vbid);
3285            uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
3286            vb->notifyOnPersistence(engine, seqno, true);
3287            vb->notifyOnPersistence(engine, chkid, false);
3288            if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
3289                vbMap.setPersistenceCheckpointId(vbid, chkid);
3290            }
3291        } else {
3292            return RETRY_FLUSH_VBUCKET;
3293        }
3294    }
3295
3296    return items_flushed;
3297}
3298
3299void EventuallyPersistentStore::commit(uint16_t shardId) {
3300    KVStore *rwUnderlying = getRWUnderlyingByShard(shardId);
3301    std::list<PersistenceCallback *>& pcbs = rwUnderlying->getPersistenceCbList();
3302    BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
3303    hrtime_t commit_start = gethrtime();
3304
3305    KVStatsCallback cb(this);
3306    while (!rwUnderlying->commit(&cb)) {
3307        ++stats.commitFailed;
3308        LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
3309            "1 sec...\n");
3310        sleep(1);
3311    }
3312
3313    while (!pcbs.empty()) {
3314         delete pcbs.front();
3315         pcbs.pop_front();
3316    }
3317
3318    ++stats.flusherCommits;
3319    hrtime_t commit_end = gethrtime();
3320    uint64_t commit_time = (commit_end - commit_start) / 1000000;
3321    stats.commit_time.store(commit_time);
3322    stats.cumulativeCommitTime.fetch_add(commit_time);
3323}
3324
3325PersistenceCallback*
3326EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
3327                                            RCPtr<VBucket> &vb) {
3328
3329    if (!vb) {
3330        stats.decrDiskQueueSize(1);
3331        return NULL;
3332    }
3333
3334    int64_t bySeqno = qi->getBySeqno();
3335    bool deleted = qi->isDeleted();
3336    rel_time_t queued(qi->getQueuedTime());
3337
3338    int dirtyAge = ep_current_time() - queued;
3339    stats.dirtyAgeHisto.add(dirtyAge * 1000000);
3340    stats.dirtyAge.store(dirtyAge);
3341    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
3342                                         stats.dirtyAgeHighWat.load()));
3343
3344    // Wait until the VB is deleted before writing
3345    if (vbMap.isBucketDeletion(qi->getVBucketId())) {
3346        vb->rejectQueue.push(qi);
3347        ++vb->opsReject;
3348        return NULL;
3349    }
3350
3351    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
3352    if (!deleted) {
3353        // TODO: Need to separate disk_insert from disk_update because
3354        // bySeqno doesn't give us that information.
3355        BlockTimer timer(bySeqno == -1 ?
3356                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
3357                         bySeqno == -1 ? "disk_insert" : "disk_update",
3358                         stats.timingLog);
3359        PersistenceCallback *cb =
3360            new PersistenceCallback(qi, vb, *this, stats, qi->getCas());
3361        rwUnderlying->set(*qi, *cb);
3362        return cb;
3363    } else {
3364        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
3365                         stats.timingLog);
3366        PersistenceCallback *cb =
3367            new PersistenceCallback(qi, vb, *this, stats, 0);
3368        rwUnderlying->del(*qi, *cb);
3369        return cb;
3370    }
3371}
3372
3373void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
3374                                           StoredValue* v,
3375                                           LockHolder *plh,
3376                                           uint64_t *seqno,
3377                                           const GenerateBySeqno generateBySeqno,
3378                                           const GenerateCas generateCas) {
3379    if (vb) {
3380        queued_item qi(v->toItem(false, vb->getId()));
3381
3382        bool rv = vb->checkpointManager.queueDirty(*vb, qi,
3383                                                   generateBySeqno, generateCas);
3384        v->setBySeqno(qi->getBySeqno());
3385
3386        if (seqno) {
3387            *seqno = v->getBySeqno();
3388        }
3389
3390        if (GenerateCas::Yes == generateCas) {
3391            v->setCas(qi->getCas());
3392        }
3393
3394        if (plh) {
3395            plh->unlock();
3396        }
3397
3398        if (rv) {
3399            KVShard* shard = vbMap.getShardByVbId(vb->getId());
3400            shard->getFlusher()->notifyFlushEvent();
3401        }
3402
3403        // Now notify replication
3404        engine.getTapConnMap().notifyVBConnections(vb->getId());
3405        engine.getDcpConnMap().notifyVBConnections(vb->getId(),
3406                                                   qi->getBySeqno());
3407    }
3408}
3409
3410void EventuallyPersistentStore::tapQueueDirty(VBucket &vb,
3411                                              StoredValue* v,
3412                                              LockHolder& plh,
3413                                              uint64_t *seqno,
3414                                              const GenerateBySeqno generateBySeqno) {
3415    queued_item qi(v->toItem(false, vb.getId()));
3416
3417    bool queued = vb.queueBackfillItem(qi, generateBySeqno);
3418
3419    v->setBySeqno(qi->getBySeqno());
3420
3421    /* During backfill on a TAP receiver we need to update the snapshot
3422       range in the checkpoint. Has to be done here because in case of TAP
3423       backfill, above, we use vb.queueBackfillItem() instead of
3424       vb.checkpointManager.queueDirty() */
3425    if (GenerateBySeqno::Yes == generateBySeqno) {
3426        vb.checkpointManager.resetSnapshotRange();
3427    }
3428
3429    if (seqno) {
3430        *seqno = v->getBySeqno();
3431    }
3432
3433    plh.unlock();
3434
3435    if (queued) {
3436        KVShard* shard = vbMap.getShardByVbId(vb.getId());
3437        shard->getFlusher()->notifyFlushEvent();
3438    }
3439}
3440
3441std::vector<vbucket_state *> EventuallyPersistentStore::loadVBucketState()
3442{
3443    return getOneROUnderlying()->listPersistedVbuckets();
3444}
3445
3446void EventuallyPersistentStore::warmupCompleted() {
3447    // Snapshot VBucket state after warmup to ensure Failover table is
3448    // persisted.
3449    scheduleVBStatePersist();
3450
3451    if (engine.getConfiguration().getAlogPath().length() > 0) {
3452
3453        if (engine.getConfiguration().isAccessScannerEnabled()) {
3454            LockHolder lh(accessScanner.mutex);
3455            accessScanner.enabled = true;
3456            lh.unlock();
3457            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task enabled");
3458            size_t smin = engine.getConfiguration().getAlogSleepTime();
3459            setAccessScannerSleeptime(smin, true);
3460        } else {
3461            LockHolder lh(accessScanner.mutex);
3462            accessScanner.enabled = false;
3463            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task disabled");
3464        }
3465
3466        Configuration &config = engine.getConfiguration();
3467        config.addValueChangedListener("access_scanner_enabled",
3468                                       new EPStoreValueChangeListener(*this));
3469        config.addValueChangedListener("alog_sleep_time",
3470                                       new EPStoreValueChangeListener(*this))&