xref: /3.0.3-GA/ep-engine/src/ep.cc (revision 9156dea8)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string.h>
21#include <time.h>
22
23#include <fstream>
24#include <functional>
25#include <iostream>
26#include <map>
27#include <sstream>
28#include <string>
29#include <utility>
30#include <vector>
31
32#include "access_scanner.h"
33#include "checkpoint_remover.h"
34#include "conflict_resolution.h"
35#include "ep.h"
36#include "ep_engine.h"
37#include "failover-table.h"
38#include "flusher.h"
39#include "htresizer.h"
40#include "kvshard.h"
41#include "kvstore.h"
42#include "locks.h"
43#include "mutation_log.h"
44#include "warmup.h"
45#include "tapconnmap.h"
46#include "tapthrottle.h"
47
48class StatsValueChangeListener : public ValueChangedListener {
49public:
50    StatsValueChangeListener(EPStats &st) : stats(st) {
51        // EMPTY
52    }
53
54    virtual void sizeValueChanged(const std::string &key, size_t value) {
55        if (key.compare("max_size") == 0) {
56            stats.setMaxDataSize(value);
57            size_t low_wat = static_cast<size_t>
58                             (static_cast<double>(value) * 0.6);
59            size_t high_wat = static_cast<size_t>(
60                              static_cast<double>(value) * 0.75);
61            stats.mem_low_wat.store(low_wat);
62            stats.mem_high_wat.store(high_wat);
63        } else if (key.compare("mem_low_wat") == 0) {
64            stats.mem_low_wat.store(value);
65        } else if (key.compare("mem_high_wat") == 0) {
66            stats.mem_high_wat.store(value);
67        } else if (key.compare("tap_throttle_threshold") == 0) {
68            stats.tapThrottleThreshold.store(
69                                          static_cast<double>(value) / 100.0);
70        } else if (key.compare("warmup_min_memory_threshold") == 0) {
71            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72        } else if (key.compare("warmup_min_items_threshold") == 0) {
73            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74        } else {
75            LOG(EXTENSION_LOG_WARNING,
76                "Failed to change value for unknown variable, %s\n",
77                key.c_str());
78        }
79    }
80
81private:
82    EPStats &stats;
83};
84
85/**
86 * A configuration value changed listener that responds to ep-engine
87 * parameter changes by invoking engine-specific methods on
88 * configuration change events.
89 */
90class EPStoreValueChangeListener : public ValueChangedListener {
91public:
92    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93    }
94
95    virtual void sizeValueChanged(const std::string &key, size_t value) {
96        if (key.compare("bg_fetch_delay") == 0) {
97            store.setBGFetchDelay(static_cast<uint32_t>(value));
98        } else if (key.compare("exp_pager_stime") == 0) {
99            store.setExpiryPagerSleeptime(value);
100        } else if (key.compare("alog_sleep_time") == 0) {
101            store.setAccessScannerSleeptime(value);
102        } else if (key.compare("alog_task_time") == 0) {
103            store.resetAccessScannerStartTime();
104        } else if (key.compare("mutation_mem_threshold") == 0) {
105            double mem_threshold = static_cast<double>(value) / 100;
106            StoredValue::setMutationMemoryThreshold(mem_threshold);
107        } else if (key.compare("tap_throttle_queue_cap") == 0) {
108            store.getEPEngine().getTapThrottle().setQueueCap(value);
109        } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
110            store.getEPEngine().getTapThrottle().setCapPercent(value);
111        } else {
112            LOG(EXTENSION_LOG_WARNING,
113                "Failed to change value for unknown variable, %s\n",
114                key.c_str());
115        }
116    }
117
118private:
119    EventuallyPersistentStore &store;
120};
121
122class VBucketMemoryDeletionTask : public GlobalTask {
123public:
124    VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
125                              RCPtr<VBucket> &vb, double delay) :
126                              GlobalTask(&eng,
127                              Priority::VBMemoryDeletionPriority, delay,false),
128                              e(eng), vbucket(vb), vbid(vb->getId()) { }
129
130    std::string getDescription() {
131        std::stringstream ss;
132        ss << "Removing (dead) vbucket " << vbid << " from memory";
133        return ss.str();
134    }
135
136    bool run(void) {
137        vbucket->notifyAllPendingConnsFailed(e);
138        vbucket->ht.clear();
139        vbucket.reset();
140        return false;
141    }
142
143private:
144    EventuallyPersistentEngine &e;
145    RCPtr<VBucket> vbucket;
146    uint16_t vbid;
147};
148
149class PendingOpsNotification : public GlobalTask {
150public:
151    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
152        GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
153        engine(e), vbucket(vb) { }
154
155    std::string getDescription() {
156        std::stringstream ss;
157        ss << "Notify pending operations for vbucket " << vbucket->getId();
158        return ss.str();
159    }
160
161    bool run(void) {
162        vbucket->fireAllOps(engine);
163        return false;
164    }
165
166private:
167    EventuallyPersistentEngine &engine;
168    RCPtr<VBucket> vbucket;
169};
170
171EventuallyPersistentStore::EventuallyPersistentStore(
172    EventuallyPersistentEngine &theEngine) :
173    engine(theEngine), stats(engine.getEpStats()),
174    vbMap(theEngine.getConfiguration(), *this),
175    bgFetchQueue(0),
176    diskFlushAll(false), bgFetchDelay(0), statsSnapshotTaskId(0),
177    lastTransTimePerItem(0),snapshotVBState(false)
178{
179    cachedResidentRatio.activeRatio.store(0);
180    cachedResidentRatio.replicaRatio.store(0);
181
182    Configuration &config = engine.getConfiguration();
183    MutationLog *shardlog;
184    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
185        std::stringstream s;
186        s << i;
187        shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
188                                 "." + s.str(),
189                                 engine.getConfiguration().getAlogBlockSize());
190        accessLog.push_back(shardlog);
191    }
192
193    storageProperties = new StorageProperties(true, true, true, true);
194
195    ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
196
197    auxUnderlying = KVStoreFactory::create(stats, config, true);
198    cb_assert(auxUnderlying);
199
200    stats.memOverhead = sizeof(EventuallyPersistentStore);
201
202    if (config.getConflictResolutionType().compare("seqno") == 0) {
203        conflictResolver = new SeqBasedResolution();
204    }
205
206    stats.setMaxDataSize(config.getMaxSize());
207    config.addValueChangedListener("max_size",
208                                   new StatsValueChangeListener(stats));
209
210    stats.mem_low_wat.store(config.getMemLowWat());
211    config.addValueChangedListener("mem_low_wat",
212                                   new StatsValueChangeListener(stats));
213
214    stats.mem_high_wat.store(config.getMemHighWat());
215    config.addValueChangedListener("mem_high_wat",
216                                   new StatsValueChangeListener(stats));
217
218    stats.tapThrottleThreshold.store(static_cast<double>
219                                    (config.getTapThrottleThreshold())
220                                     / 100.0);
221    config.addValueChangedListener("tap_throttle_threshold",
222                                   new StatsValueChangeListener(stats));
223
224    stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
225    config.addValueChangedListener("tap_throttle_queue_cap",
226                                   new EPStoreValueChangeListener(*this));
227    config.addValueChangedListener("tap_throttle_cap_pcnt",
228                                   new EPStoreValueChangeListener(*this));
229
230    setBGFetchDelay(config.getBgFetchDelay());
231    config.addValueChangedListener("bg_fetch_delay",
232                                   new EPStoreValueChangeListener(*this));
233
234    stats.warmupMemUsedCap.store(static_cast<double>
235                               (config.getWarmupMinMemoryThreshold()) / 100.0);
236    config.addValueChangedListener("warmup_min_memory_threshold",
237                                   new StatsValueChangeListener(stats));
238    stats.warmupNumReadCap.store(static_cast<double>
239                                (config.getWarmupMinItemsThreshold()) / 100.0);
240    config.addValueChangedListener("warmup_min_items_threshold",
241                                   new StatsValueChangeListener(stats));
242
243    double mem_threshold = static_cast<double>
244                                      (config.getMutationMemThreshold()) / 100;
245    StoredValue::setMutationMemoryThreshold(mem_threshold);
246    config.addValueChangedListener("mutation_mem_threshold",
247                                   new EPStoreValueChangeListener(*this));
248
249    const std::string &policy = config.getItemEvictionPolicy();
250    if (policy.compare("value_only") == 0) {
251        eviction_policy = VALUE_ONLY;
252    } else {
253        eviction_policy = FULL_EVICTION;
254    }
255
256    // @todo - Ideally we should run the warmup thread in it's own
257    //         thread so that it won't block the flusher (in the write
258    //         thread), but we can't put it in the RO dispatcher either,
259    //         because that would block the background fetches..
260    warmupTask = new Warmup(this);
261}
262
263class WarmupWaitListener : public WarmupStateListener {
264public:
265    WarmupWaitListener(Warmup &f, bool wfw) :
266        warmup(f), waitForWarmup(wfw) { }
267
268    virtual void stateChanged(const int, const int to) {
269        if (waitForWarmup) {
270            if (to == WarmupState::Done) {
271                LockHolder lh(syncobject);
272                syncobject.notify();
273            }
274        } else if (to != WarmupState::Initialize) {
275            LockHolder lh(syncobject);
276            syncobject.notify();
277        }
278    }
279
280    void wait() {
281        LockHolder lh(syncobject);
282        // Verify that we're not already reached the state...
283        int currstate = warmup.getState().getState();
284
285        if (waitForWarmup) {
286            if (currstate == WarmupState::Done) {
287                return;
288            }
289        } else if (currstate != WarmupState::Initialize) {
290            return ;
291        }
292
293        syncobject.wait();
294    }
295
296private:
297    Warmup &warmup;
298    bool waitForWarmup;
299    SyncObject syncobject;
300};
301
302bool EventuallyPersistentStore::initialize() {
303    // We should nuke everything unless we want warmup
304    Configuration &config = engine.getConfiguration();
305    if (!config.isWarmup()) {
306        reset();
307    }
308
309    if (!startFlusher()) {
310        LOG(EXTENSION_LOG_WARNING,
311            "FATAL: Failed to create and start flushers");
312        return false;
313    }
314    if (!startBgFetcher()) {
315        LOG(EXTENSION_LOG_WARNING,
316           "FATAL: Failed to create and start bgfetchers");
317        return false;
318    }
319
320    WarmupWaitListener warmupListener(*warmupTask, config.isWaitforwarmup());
321    warmupTask->addWarmupStateListener(&warmupListener);
322    warmupTask->start();
323    warmupListener.wait();
324    warmupTask->removeWarmupStateListener(&warmupListener);
325
326    if (config.isVb0() && !vbMap.getBucket(0)) {
327        setVBucketState(0, vbucket_state_active, false);
328    }
329
330    if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
331        LOG(EXTENSION_LOG_WARNING,
332            "Warmup failed to load %d records due to OOM, exiting.\n",
333            static_cast<unsigned int>(stats.warmOOM));
334        return false;
335    }
336
337    ExTask itmpTask = new ItemPager(&engine, stats);
338    ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
339
340    size_t expiryPagerSleeptime = config.getExpPagerStime();
341    setExpiryPagerSleeptime(expiryPagerSleeptime);
342    config.addValueChangedListener("exp_pager_stime",
343                                   new EPStoreValueChangeListener(*this));
344
345    ExTask htrTask = new HashtableResizerTask(this, 10);
346    ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
347
348    size_t checkpointRemoverInterval = config.getChkRemoverStime();
349    ExTask chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
350                                                    checkpointRemoverInterval);
351    ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
352    return true;
353}
354
355EventuallyPersistentStore::~EventuallyPersistentStore() {
356    stopWarmup();
357    stopBgFetcher();
358    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
359
360    ExecutorPool::get()->cancel(statsSnapshotTaskId);
361    ExecutorPool::get()->cancel(accessScanner.task);
362
363    stopFlusher();
364    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
365
366    engine.getUprConnMap().closeAllStreams();
367
368    delete conflictResolver;
369    delete warmupTask;
370    delete auxUnderlying;
371    delete storageProperties;
372
373    std::vector<MutationLog*>::iterator it;
374    for (it = accessLog.begin(); it != accessLog.end(); it++) {
375        delete *it;
376    }
377}
378
379const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
380    return vbMap.getShard(shardId)->getFlusher();
381}
382
383Warmup* EventuallyPersistentStore::getWarmup(void) const {
384    return warmupTask;
385}
386
387bool EventuallyPersistentStore::startFlusher() {
388    for (uint16_t i = 0; i < vbMap.numShards; ++i) {
389        Flusher *flusher = vbMap.shards[i]->getFlusher();
390        flusher->start();
391    }
392    return true;
393}
394
395void EventuallyPersistentStore::stopFlusher() {
396    for (uint16_t i = 0; i < vbMap.numShards; i++) {
397        Flusher *flusher = vbMap.shards[i]->getFlusher();
398        bool rv = flusher->stop(stats.forceShutdown);
399        if (rv && !stats.forceShutdown) {
400            flusher->wait();
401        }
402    }
403}
404
405bool EventuallyPersistentStore::pauseFlusher() {
406    bool rv = true;
407    for (uint16_t i = 0; i < vbMap.numShards; i++) {
408        Flusher *flusher = vbMap.shards[i]->getFlusher();
409        if (!flusher->pause()) {
410            LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
411                "[%s], shard = %d", flusher->stateName(), i);
412            rv = false;
413        }
414    }
415    return rv;
416}
417
418bool EventuallyPersistentStore::resumeFlusher() {
419    bool rv = true;
420    for (uint16_t i = 0; i < vbMap.numShards; i++) {
421        Flusher *flusher = vbMap.shards[i]->getFlusher();
422        if (!flusher->resume()) {
423            LOG(EXTENSION_LOG_WARNING,
424                    "Warning: attempted to resume flusher in state [%s], "
425                    "shard = %d", flusher->stateName(), i);
426            rv = false;
427        }
428    }
429    return rv;
430}
431
432void EventuallyPersistentStore::wakeUpFlusher() {
433    if (stats.diskQueueSize.load() == 0) {
434        for (uint16_t i = 0; i < vbMap.numShards; i++) {
435            Flusher *flusher = vbMap.shards[i]->getFlusher();
436            flusher->wake();
437        }
438    }
439}
440
441bool EventuallyPersistentStore::startBgFetcher() {
442    for (uint16_t i = 0; i < vbMap.numShards; i++) {
443        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
444        if (bgfetcher == NULL) {
445            LOG(EXTENSION_LOG_WARNING,
446                "Falied to start bg fetcher for shard %d", i);
447            return false;
448        }
449        bgfetcher->start();
450    }
451    return true;
452}
453
454void EventuallyPersistentStore::stopBgFetcher() {
455    for (uint16_t i = 0; i < vbMap.numShards; i++) {
456        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
457        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
458            LOG(EXTENSION_LOG_WARNING,
459                "Shutting down engine while there are still pending data "
460                "read for shard %d from database storage", i);
461        }
462        LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
463        bgfetcher->stop();
464    }
465}
466
467RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
468                                                vbucket_state_t wanted_state) {
469    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
470    vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
471    if (found_state == wanted_state) {
472        return vb;
473    } else {
474        RCPtr<VBucket> rv;
475        return rv;
476    }
477}
478
479void
480EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
481                                             time_t startTime,
482                                             uint64_t revSeqno) {
483    RCPtr<VBucket> vb = getVBucket(vbid);
484    if (vb) {
485        int bucket_num(0);
486        incExpirationStat(vb);
487        LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
488        StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
489        if (v) {
490            if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
491                // This is a temporary item whose background fetch for metadata
492                // has completed.
493                bool deleted = vb->ht.unlocked_del(key, bucket_num);
494                cb_assert(deleted);
495            } else if (v->isExpired(startTime) && !v->isDeleted()) {
496                vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
497                queueDirty(vb, v, false);
498                lh.unlock();
499            }
500        } else {
501            if (eviction_policy == FULL_EVICTION) {
502                // Create a temp item and delete and push it
503                // into the checkpoint queue.
504                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
505                                                            eviction_policy);
506                if (rv == ADD_NOMEM) {
507                    return;
508                }
509                v = vb->ht.unlocked_find(key, bucket_num, true, false);
510                v->setStoredValueState(StoredValue::state_deleted_key);
511                v->setRevSeqno(revSeqno);
512                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
513                queueDirty(vb, v, false);
514                lh.unlock();
515            }
516        }
517    }
518}
519
520void
521EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
522                                                        std::string> > &keys) {
523    std::list<std::pair<uint16_t, std::string> >::iterator it;
524    time_t startTime = ep_real_time();
525    for (it = keys.begin(); it != keys.end(); it++) {
526        deleteExpiredItem(it->first, it->second, startTime, 0);
527    }
528}
529
530StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
531                                                        const std::string &key,
532                                                        int bucket_num,
533                                                        bool wantDeleted,
534                                                        bool trackReference,
535                                                        bool queueExpired) {
536    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
537                                          trackReference);
538    if (v && !v->isDeleted() && !v->isTempItem()) {
539        // In the deleted case, we ignore expiration time.
540        if (v->isExpired(ep_real_time())) {
541            if (vb->getState() != vbucket_state_active) {
542                return wantDeleted ? v : NULL;
543            }
544            if (queueExpired) {
545                incExpirationStat(vb, false);
546                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
547                queueDirty(vb, v, false, false);
548            }
549            return wantDeleted ? v : NULL;
550        }
551    }
552    return v;
553}
554
555protocol_binary_response_status EventuallyPersistentStore::evictKey(
556                                                        const std::string &key,
557                                                        uint16_t vbucket,
558                                                        const char **msg,
559                                                        size_t *msg_size,
560                                                        bool force) {
561    RCPtr<VBucket> vb = getVBucket(vbucket);
562    if (!vb || (vb->getState() != vbucket_state_active && !force)) {
563        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
564    }
565
566    int bucket_num(0);
567    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
568    StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
569
570    protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
571
572    *msg_size = 0;
573    if (v) {
574        if (force)  {
575            v->markClean();
576        }
577        if (v->isResident()) {
578            if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
579                *msg = "Ejected.";
580            } else {
581                *msg = "Can't eject: Dirty object.";
582                rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
583            }
584        } else {
585            *msg = "Already ejected.";
586        }
587    } else {
588        if (eviction_policy == VALUE_ONLY) {
589            *msg = "Not found.";
590            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
591        } else {
592            *msg = "Already ejected.";
593        }
594    }
595
596    return rv;
597}
598
599ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
600                                                        LockHolder &lock,
601                                                        int bucket_num,
602                                                        const std::string &key,
603                                                        RCPtr<VBucket> &vb,
604                                                        const void *cookie,
605                                                        bool metadataOnly) {
606
607    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
608                                                eviction_policy);
609    switch(rv) {
610        case ADD_NOMEM:
611            return ENGINE_ENOMEM;
612        case ADD_EXISTS:
613        case ADD_UNDEL:
614        case ADD_SUCCESS:
615        case ADD_TMP_AND_BG_FETCH:
616            // Since the hashtable bucket is locked, we shouldn't get here
617            abort();
618        case ADD_BG_FETCH:
619            lock.unlock();
620            bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
621    }
622    return ENGINE_EWOULDBLOCK;
623}
624
625ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
626                                                 const void *cookie,
627                                                 bool force,
628                                                 uint8_t nru) {
629
630    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
631    if (!vb || vb->getState() == vbucket_state_dead) {
632        ++stats.numNotMyVBuckets;
633        return ENGINE_NOT_MY_VBUCKET;
634    } else if (vb->getState() == vbucket_state_replica && !force) {
635        ++stats.numNotMyVBuckets;
636        return ENGINE_NOT_MY_VBUCKET;
637    } else if (vb->getState() == vbucket_state_pending && !force) {
638        if (vb->addPendingOp(cookie)) {
639            return ENGINE_EWOULDBLOCK;
640        }
641    }
642
643    bool cas_op = (itm.getCas() != 0);
644    int bucket_num(0);
645    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
646    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
647                                          false);
648    if (v && v->isLocked(ep_current_time()) &&
649        (vb->getState() == vbucket_state_replica ||
650         vb->getState() == vbucket_state_pending)) {
651        v->unlock();
652    }
653    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
654                                                true, false,
655                                                eviction_policy, nru);
656
657    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
658    switch (mtype) {
659    case NOMEM:
660        ret = ENGINE_ENOMEM;
661        break;
662    case INVALID_CAS:
663    case IS_LOCKED:
664        ret = ENGINE_KEY_EEXISTS;
665        break;
666    case NOT_FOUND:
667        if (cas_op) {
668            ret = ENGINE_KEY_ENOENT;
669            break;
670        }
671        // FALLTHROUGH
672    case WAS_DIRTY:
673        // Even if the item was dirty, push it into the vbucket's open
674        // checkpoint.
675    case WAS_CLEAN:
676        queueDirty(vb, v);
677        break;
678    case NEED_BG_FETCH:
679        { // CAS operation with non-resident item + full eviction.
680            if (v) {
681                // temp item is already created. Simply schedule a bg fetch job
682                lh.unlock();
683                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
684                return ENGINE_EWOULDBLOCK;
685            }
686            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
687                                        cookie, true);
688            break;
689        }
690    case INVALID_VBUCKET:
691        ret = ENGINE_NOT_MY_VBUCKET;
692        break;
693    }
694
695    return ret;
696}
697
698ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
699                                                 const void *cookie)
700{
701    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
702    if (!vb || vb->getState() == vbucket_state_dead ||
703        vb->getState() == vbucket_state_replica) {
704        ++stats.numNotMyVBuckets;
705        return ENGINE_NOT_MY_VBUCKET;
706    } else if(vb->getState() == vbucket_state_pending) {
707        if (vb->addPendingOp(cookie)) {
708            return ENGINE_EWOULDBLOCK;
709        }
710    }
711
712    if (itm.getCas() != 0) {
713        // Adding with a cas value doesn't make sense..
714        return ENGINE_NOT_STORED;
715    }
716
717    int bucket_num(0);
718    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
719    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
720                                          false);
721    add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
722                                           eviction_policy);
723
724    switch (atype) {
725    case ADD_NOMEM:
726        return ENGINE_ENOMEM;
727    case ADD_EXISTS:
728        return ENGINE_NOT_STORED;
729    case ADD_TMP_AND_BG_FETCH:
730        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
731                                     cookie, true);
732    case ADD_BG_FETCH:
733        lh.unlock();
734        bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
735        return ENGINE_EWOULDBLOCK;
736    case ADD_SUCCESS:
737    case ADD_UNDEL:
738        queueDirty(vb, v);
739        break;
740    }
741    return ENGINE_SUCCESS;
742}
743
744ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
745                                                                uint8_t nru) {
746
747    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
748    if (!vb ||
749        vb->getState() == vbucket_state_dead ||
750        vb->getState() == vbucket_state_active) {
751        ++stats.numNotMyVBuckets;
752        return ENGINE_NOT_MY_VBUCKET;
753    }
754
755    int bucket_num(0);
756    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
757    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
758                                          false);
759
760    // Note that this function is only called on replica or pending vbuckets.
761    if (v && v->isLocked(ep_current_time())) {
762        v->unlock();
763    }
764    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
765                                                eviction_policy, nru);
766
767    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
768    switch (mtype) {
769    case NOMEM:
770        ret = ENGINE_ENOMEM;
771        break;
772    case INVALID_CAS:
773    case IS_LOCKED:
774        ret = ENGINE_KEY_EEXISTS;
775        break;
776    case WAS_DIRTY:
777        // If a given backfill item is already dirty, don't queue it again.
778        break;
779    case NOT_FOUND:
780        // FALLTHROUGH
781    case WAS_CLEAN:
782        queueDirty(vb, v, true, true, true);
783        break;
784    case INVALID_VBUCKET:
785        ret = ENGINE_NOT_MY_VBUCKET;
786        break;
787    case NEED_BG_FETCH:
788        // SET on a non-active vbucket should not require a bg_metadata_fetch.
789        abort();
790    }
791
792    return ret;
793}
794
795class KVStatsCallback : public Callback<kvstats_ctx> {
796    public:
797        KVStatsCallback(EventuallyPersistentStore *store)
798            : epstore(store) { }
799
800        void callback(kvstats_ctx &ctx) {
801            RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
802            if (vb) {
803                vb->fileSpaceUsed = ctx.fileSpaceUsed;
804                vb->fileSize = ctx.fileSize;
805            }
806        }
807
808    private:
809        EventuallyPersistentStore *epstore;
810};
811
812void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
813                                                 uint16_t shardId) {
814
815    class VBucketStateVisitor : public VBucketVisitor {
816    public:
817        VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
818            : vbuckets(vb_map), shardId(sid) { }
819        bool visitBucket(RCPtr<VBucket> &vb) {
820            if (vbuckets.getShard(vb->getId())->getId() == shardId) {
821                vbucket_state vb_state;
822                vb_state.state = vb->getState();
823                vb_state.checkpointId = vbuckets.getPersistenceCheckpointId(
824                                        vb->getId());
825                vb_state.maxDeletedSeqno = 0;
826                vb_state.failovers = vb->failovers->toJSON();
827                states[vb->getId()] = vb_state;
828            }
829            return false;
830        }
831
832        void visit(StoredValue*) {
833            cb_assert(false); // this does not happen
834        }
835
836        std::map<uint16_t, vbucket_state> states;
837
838    private:
839        VBucketMap &vbuckets;
840        uint16_t shardId;
841    };
842
843    KVShard *shard = vbMap.shards[shardId];
844    if (priority == Priority::VBucketPersistLowPriority) {
845        shard->setLowPriorityVbSnapshotFlag(false);
846    } else {
847        shard->setHighPriorityVbSnapshotFlag(false);
848    }
849
850    KVStatsCallback kvcb(this);
851    VBucketStateVisitor v(vbMap, shard->getId());
852    visit(v);
853    hrtime_t start = gethrtime();
854    LockHolder lh(shard->getWriteLock());
855    KVStore *rwUnderlying = shard->getRWUnderlying();
856    if (!rwUnderlying->snapshotVBuckets(v.states, &kvcb)) {
857        LOG(EXTENSION_LOG_WARNING,
858            "VBucket snapshot task failed!!! Rescheduling");
859        scheduleVBSnapshot(priority, shard->getId());
860    } else {
861        stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
862    }
863
864    if (priority == Priority::VBucketPersistHighPriority) {
865        std::map<uint16_t, vbucket_state>::iterator it = v.states.begin();
866        for (; it != v.states.end(); ++it) {
867            vbMap.setBucketCreation(it->first, false);
868        }
869    }
870}
871
872ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
873                                                           vbucket_state_t to,
874                                                           bool transfer,
875                                                           bool notify_upr) {
876    // Lock to prevent a race condition between a failed update and add.
877    LockHolder lh(vbsetMutex);
878    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
879    if (vb && to == vb->getState()) {
880        return ENGINE_SUCCESS;
881    }
882
883    uint16_t shardId = vbMap.getShard(vbid)->getId();
884    if (vb) {
885        vbucket_state_t oldstate = vb->getState();
886        if (oldstate != to && notify_upr) {
887            engine.getUprConnMap().vbucketStateChanged(vbid, to);
888        }
889
890        vb->setState(to, engine.getServerApi());
891        if (to == vbucket_state_active && !transfer) {
892            vb->failovers->createEntry(vb->getHighSeqno());
893        }
894        lh.unlock();
895        if (oldstate == vbucket_state_pending &&
896            to == vbucket_state_active) {
897            ExTask notifyTask = new PendingOpsNotification(engine, vb);
898            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
899        }
900        scheduleVBSnapshot(Priority::VBucketPersistLowPriority, shardId, true);
901    } else {
902        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
903        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
904                                         engine.getCheckpointConfig(),
905                                         vbMap.getShard(vbid), 0, ft));
906        // The first checkpoint for active vbucket should start with id 2.
907        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
908        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
909        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
910            lh.unlock();
911            return ENGINE_ERANGE;
912        }
913        vbMap.setPersistenceCheckpointId(vbid, 0);
914        vbMap.setPersistenceSeqno(vbid, 0);
915        vbMap.setBucketCreation(vbid, true);
916        lh.unlock();
917        scheduleVBSnapshot(Priority::VBucketPersistHighPriority, shardId, true);
918    }
919    return ENGINE_SUCCESS;
920}
921
922void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
923    snapshotVBState = false;
924    KVShard *shard = NULL;
925    if (p == Priority::VBucketPersistHighPriority) {
926        for (size_t i = 0; i < vbMap.numShards; ++i) {
927            shard = vbMap.shards[i];
928            if (shard->setHighPriorityVbSnapshotFlag(true)) {
929                ExTask task = new VBSnapshotTask(&engine, p, i, false);
930                ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
931            }
932        }
933    } else {
934        for (size_t i = 0; i < vbMap.numShards; ++i) {
935            shard = vbMap.shards[i];
936            if (shard->setLowPriorityVbSnapshotFlag(true)) {
937                ExTask task = new VBSnapshotTask(&engine, p, i, false);
938                ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
939            }
940        }
941    }
942}
943
944void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
945                                                   uint16_t shardId,
946                                                   bool force) {
947    snapshotVBState = false;
948    KVShard *shard = vbMap.shards[shardId];
949    if (p == Priority::VBucketPersistHighPriority) {
950        if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
951            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
952            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
953        }
954    } else {
955        if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
956            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
957            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
958        }
959    }
960}
961
962bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
963                                                        const void* cookie,
964                                                        bool recreate) {
965    LockHolder lh(vbsetMutex);
966
967    hrtime_t start_time(gethrtime());
968    bool success = true;
969    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
970    if (!vb || vb->getState() == vbucket_state_dead ||
971         vbMap.isBucketDeletion(vbid)) {
972        lh.unlock();
973        uint16_t sid = vbMap.getShard(vbid)->getId();
974        KVShard *shard = vbMap.shards[sid];
975        LockHolder ls(shard->getWriteLock());
976        KVStore *rwUnderlying = getRWUnderlying(vbid);
977        if (rwUnderlying->delVBucket(vbid, recreate)) {
978            vbMap.setBucketDeletion(vbid, false);
979            vbMap.setPersistenceSeqno(vbid, 0);
980            ++stats.vbucketDeletions;
981        } else {
982            ++stats.vbucketDeletionFail;
983            success = false;
984        }
985    }
986
987    if (success) {
988        hrtime_t spent(gethrtime() - start_time);
989        hrtime_t wall_time = spent / 1000;
990        BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
991        stats.diskVBDelHisto.add(wall_time);
992        atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
993        stats.vbucketDelTotWalltime.fetch_add(wall_time);
994        if (cookie) {
995            engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
996        }
997        return true;
998    }
999
1000    return false;
1001}
1002
1003void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1004                                                   const void* cookie,
1005                                                   double delay,
1006                                                   bool recreate) {
1007    ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1008    ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1009
1010    uint16_t vbid = vb->getId();
1011    if (vbMap.setBucketDeletion(vbid, true)) {
1012        ExTask task = new VBDeleteTask(&engine, vbid, cookie,
1013                                       Priority::VBucketDeletionPriority,
1014                                       vbMap.getShard(vbid)->getId(),
1015                                       recreate, delay);
1016        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1017    }
1018}
1019
1020ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1021                                                           const void* c) {
1022    // Lock to prevent a race condition between a failed update and add
1023    // (and delete).
1024    LockHolder lh(vbsetMutex);
1025
1026    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1027    if (!vb) {
1028        return ENGINE_NOT_MY_VBUCKET;
1029    }
1030
1031    engine.getUprConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1032    vbMap.removeBucket(vbid);
1033    lh.unlock();
1034    scheduleVBDeletion(vb, c);
1035    scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
1036                       vbMap.getShard(vbid)->getId());
1037    if (c) {
1038        return ENGINE_EWOULDBLOCK;
1039    }
1040    return ENGINE_SUCCESS;
1041}
1042
1043ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1044                                                       compaction_ctx c,
1045                                                       const void *cookie) {
1046    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1047    if (!vb) {
1048        return ENGINE_NOT_MY_VBUCKET;
1049    }
1050
1051    ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1052                                         vbid, c, cookie,
1053                                         vbMap.getShard(vbid)->getId());
1054
1055    ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1056
1057    LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1058        "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1059        task->getId(), vbid, c.purge_before_ts,
1060        c.purge_before_seq, c.drop_deletes);
1061
1062   return ENGINE_EWOULDBLOCK;
1063}
1064
1065class ExpiredItemsCallback : public Callback<compaction_ctx> {
1066    public:
1067        ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1068            : epstore(store), vbucket(vbid) { }
1069
1070        void callback(compaction_ctx &ctx) {
1071            std::list<expiredItemCtx>::iterator it;
1072            for (it  = ctx.expiredItems.begin();
1073                 it != ctx.expiredItems.end(); it++) {
1074                epstore->deleteExpiredItem(vbucket, it->keyStr,
1075                                           ctx.curr_time,
1076                                           it->revSeqno);
1077            }
1078        }
1079
1080    private:
1081        EventuallyPersistentStore *epstore;
1082        uint16_t vbucket;
1083};
1084
1085bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1086                                               compaction_ctx *ctx,
1087                                               const void *cookie) {
1088    KVShard *shard = vbMap.getShard(vbid);
1089    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1090    LockHolder lh(shard->getWriteLock());
1091    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1092    if (vb) {
1093        if (vb->getState() == vbucket_state_active) {
1094            // Set the current time ONLY for active vbuckets.
1095            ctx->curr_time = ep_real_time();
1096        } else {
1097            ctx->curr_time = 0;
1098        }
1099        KVStore *rwUnderlying = shard->getRWUnderlying();
1100        ExpiredItemsCallback cb(this, vbid);
1101        KVStatsCallback kvcb(this);
1102        if (!rwUnderlying->compactVBucket(vbid, ctx, cb, kvcb)) {
1103            LOG(EXTENSION_LOG_WARNING,
1104                    "VBucket compaction failed failed!!!");
1105            err = ENGINE_TMPFAIL;
1106            engine.storeEngineSpecific(cookie, NULL);
1107        } else {
1108            vb->setPurgeSeqno(ctx->purge_before_seq);
1109        }
1110    } else {
1111        err = ENGINE_NOT_MY_VBUCKET;
1112        engine.storeEngineSpecific(cookie, NULL);
1113    }
1114
1115    if (cookie) {
1116        engine.notifyIOComplete(cookie, err);
1117    }
1118    --stats.pendingCompactions;
1119    return false;
1120}
1121
1122bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1123    LockHolder lh(vbsetMutex);
1124    bool rv(false);
1125
1126    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1127    if (vb) {
1128        vbucket_state_t vbstate = vb->getState();
1129
1130        vbMap.removeBucket(vbid);
1131        lh.unlock();
1132
1133        std::list<std::string> tap_cursors = vb->checkpointManager.
1134                                             getTAPCursorNames();
1135        // Delete the vbucket database file and recreate the empty file
1136        scheduleVBDeletion(vb, NULL, 0, true);
1137        setVBucketState(vbid, vbstate, false);
1138
1139        // Copy the all cursors from the old vbucket into the new vbucket
1140        RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1141        newvb->checkpointManager.resetTAPCursors(tap_cursors);
1142
1143        rv = true;
1144    }
1145    return rv;
1146}
1147
1148extern "C" {
1149
1150    typedef struct {
1151        EventuallyPersistentEngine* engine;
1152        std::map<std::string, std::string> smap;
1153    } snapshot_stats_t;
1154
1155    static void add_stat(const char *key, const uint16_t klen,
1156                         const char *val, const uint32_t vlen,
1157                         const void *cookie) {
1158        cb_assert(cookie);
1159        void *ptr = const_cast<void *>(cookie);
1160        snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1161        ObjectRegistry::onSwitchThread(snap->engine);
1162
1163        std::string k(key, klen);
1164        std::string v(val, vlen);
1165        snap->smap.insert(std::pair<std::string, std::string>(k, v));
1166    }
1167}
1168
1169void EventuallyPersistentStore::snapshotStats() {
1170    snapshot_stats_t snap;
1171    snap.engine = &engine;
1172    std::map<std::string, std::string>  smap;
1173    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1174              engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1175              engine.getStats(&snap, "upr", 3, add_stat) == ENGINE_SUCCESS;
1176    if (rv && stats.isShutdown) {
1177        snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1178                                                              "true" : "false";
1179        std::stringstream ss;
1180        ss << ep_real_time();
1181        snap.smap["ep_shutdown_time"] = ss.str();
1182    }
1183    getOneRWUnderlying()->snapshotStats(snap.smap);
1184}
1185
1186void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1187                                              const hrtime_t start,
1188                                              const hrtime_t stop) {
1189    if (stop >= start && start >= init) {
1190        // skip the measurement if the counter wrapped...
1191        ++stats.bgNumOperations;
1192        hrtime_t w = (start - init) / 1000;
1193        BlockTimer::log(start - init, "bgwait", stats.timingLog);
1194        stats.bgWaitHisto.add(w);
1195        stats.bgWait.fetch_add(w);
1196        atomic_setIfLess(stats.bgMinWait, w);
1197        atomic_setIfBigger(stats.bgMaxWait, w);
1198
1199        hrtime_t l = (stop - start) / 1000;
1200        BlockTimer::log(stop - start, "bgload", stats.timingLog);
1201        stats.bgLoadHisto.add(l);
1202        stats.bgLoad.fetch_add(l);
1203        atomic_setIfLess(stats.bgMinLoad, l);
1204        atomic_setIfBigger(stats.bgMaxLoad, l);
1205    }
1206}
1207
1208void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1209                                                uint16_t vbucket,
1210                                                uint64_t rowid,
1211                                                const void *cookie,
1212                                                hrtime_t init,
1213                                                bool isMeta) {
1214    hrtime_t start(gethrtime());
1215    // Go find the data
1216    RememberingCallback<GetValue> gcb;
1217    if (isMeta) {
1218        gcb.val.setPartial();
1219        ++stats.bg_meta_fetched;
1220    } else {
1221        ++stats.bg_fetched;
1222    }
1223    getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
1224    gcb.waitForValue();
1225    cb_assert(gcb.fired);
1226    ENGINE_ERROR_CODE status = gcb.val.getStatus();
1227
1228    // Lock to prevent a race condition between a fetch for restore and delete
1229    LockHolder lh(vbsetMutex);
1230
1231    RCPtr<VBucket> vb = getVBucket(vbucket);
1232    if (vb) {
1233        int bucket_num(0);
1234        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1235        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1236        if (isMeta) {
1237            if (v && v->unlocked_restoreMeta(gcb.val.getValue(),
1238                                             gcb.val.getStatus(), vb->ht)) {
1239                status = ENGINE_SUCCESS;
1240            }
1241        } else {
1242            if (v && v->isResident()) {
1243                status = ENGINE_SUCCESS;
1244            }
1245
1246            bool restore = false;
1247            if (eviction_policy == VALUE_ONLY &&
1248                v && !v->isResident() && !v->isDeleted()) {
1249                restore = true;
1250            } else if (eviction_policy == FULL_EVICTION &&
1251                       v && v->isTempInitialItem()) {
1252                restore = true;
1253            }
1254
1255            if (restore) {
1256                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1257                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1258                    cb_assert(v->isResident());
1259                    if (vb->getState() == vbucket_state_active &&
1260                        v->getExptime() != gcb.val.getValue()->getExptime() &&
1261                        v->getCas() == gcb.val.getValue()->getCas()) {
1262                        // MB-9306: It is possible that by the time bgfetcher
1263                        // returns, the item may have been updated and queued
1264                        // Hence test the CAS value to be the same first.
1265                        // exptime mutated, schedule it into new checkpoint
1266                        queueDirty(vb, v);
1267                        hlh.unlock();
1268                    }
1269                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1270                    v->setStoredValueState(
1271                                          StoredValue::state_non_existent_key);
1272                    if (eviction_policy == FULL_EVICTION) {
1273                        // For the full eviction, we should notify
1274                        // ENGINE_SUCCESS to the memcached worker thread, so
1275                        // that the worker thread can visit the ep-engine and
1276                        // figure out the correct error code.
1277                        status = ENGINE_SUCCESS;
1278                    }
1279                } else {
1280                    // underlying kvstore couldn't fetch requested data
1281                    // log returned error and notify TMPFAIL to client
1282                    LOG(EXTENSION_LOG_WARNING,
1283                        "Warning: failed background fetch for vb=%d seq=%d "
1284                        "key=%s", vbucket, v->getBySeqno(), key.c_str());
1285                    status = ENGINE_TMPFAIL;
1286                }
1287            }
1288        }
1289    } else {
1290        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1291            " a bg fetch for key %s\n", vbucket, key.c_str());
1292        status = ENGINE_NOT_MY_VBUCKET;
1293    }
1294
1295    lh.unlock();
1296
1297    hrtime_t stop = gethrtime();
1298    updateBGStats(init, start, stop);
1299    bgFetchQueue--;
1300
1301    delete gcb.val.getValue();
1302    engine.notifyIOComplete(cookie, status);
1303}
1304
1305void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1306                                 std::vector<bgfetched_item_t> &fetchedItems,
1307                                 hrtime_t startTime)
1308{
1309    RCPtr<VBucket> vb = getVBucket(vbId);
1310    if (!vb) {
1311        std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1312        for (; itemItr != fetchedItems.end(); ++itemItr) {
1313            engine.notifyIOComplete((*itemItr).second->cookie,
1314                                    ENGINE_NOT_MY_VBUCKET);
1315        }
1316        LOG(EXTENSION_LOG_WARNING,
1317            "EP Store completes %d of batched background fetch for "
1318            "for vBucket = %d that is already deleted\n",
1319            (int)fetchedItems.size(), vbId);
1320        return;
1321    }
1322
1323    std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1324    for (; itemItr != fetchedItems.end(); ++itemItr) {
1325        VBucketBGFetchItem *bgitem = (*itemItr).second;
1326        ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1327        Item *fetchedValue = bgitem->value.getValue();
1328        const std::string &key = (*itemItr).first;
1329
1330        int bucket = 0;
1331        LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1332        StoredValue *v = fetchValidValue(vb, key, bucket, true);
1333        if (bgitem->metaDataOnly) {
1334            if (v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht)) {
1335                status = ENGINE_SUCCESS;
1336            }
1337        } else {
1338            if (v && v->isResident()) {
1339                status = ENGINE_SUCCESS;
1340            }
1341
1342            bool restore = false;
1343            if (eviction_policy == VALUE_ONLY &&
1344                v && !v->isResident() && !v->isDeleted()) {
1345                restore = true;
1346            } else if (eviction_policy == FULL_EVICTION &&
1347                       v && v->isTempInitialItem()) {
1348                restore = true;
1349            }
1350
1351            if (restore) {
1352                if (status == ENGINE_SUCCESS) {
1353                    v->unlocked_restoreValue(fetchedValue, vb->ht);
1354                    cb_assert(v->isResident());
1355                    if (vb->getState() == vbucket_state_active &&
1356                        v->getExptime() != fetchedValue->getExptime() &&
1357                        v->getCas() == fetchedValue->getCas()) {
1358                        // MB-9306: It is possible that by the time
1359                        // bgfetcher returns, the item may have been
1360                        // updated and queued
1361                        // Hence test the CAS value to be the same first.
1362                        // exptime mutated, schedule it into new checkpoint
1363                        queueDirty(vb, v);
1364                        blh.unlock();
1365                    }
1366                } else if (status == ENGINE_KEY_ENOENT) {
1367                    v->setStoredValueState(StoredValue::state_non_existent_key);
1368                    if (eviction_policy == FULL_EVICTION) {
1369                        // For the full eviction, we should notify
1370                        // ENGINE_SUCCESS to the memcached worker thread,
1371                        // so that the worker thread can visit the
1372                        // ep-engine and figure out the correct error
1373                        // code.
1374                        status = ENGINE_SUCCESS;
1375                    }
1376                } else {
1377                    // underlying kvstore couldn't fetch requested data
1378                    // log returned error and notify TMPFAIL to client
1379                    LOG(EXTENSION_LOG_WARNING,
1380                        "Warning: failed background fetch for vb=%d "
1381                        "key=%s", vbId, key.c_str());
1382                    status = ENGINE_TMPFAIL;
1383                }
1384            }
1385        }
1386        blh.unlock();
1387
1388        if (bgitem->metaDataOnly) {
1389            ++stats.bg_meta_fetched;
1390        } else {
1391            ++stats.bg_fetched;
1392        }
1393
1394        hrtime_t endTime = gethrtime();
1395        updateBGStats(bgitem->initTime, startTime, endTime);
1396        engine.notifyIOComplete(bgitem->cookie, status);
1397    }
1398
1399    LOG(EXTENSION_LOG_DEBUG,
1400        "EP Store completes %d of batched background fetch "
1401        "for vBucket = %d endTime = %lld\n",
1402        fetchedItems.size(), vbId, gethrtime()/1000000);
1403}
1404
1405void EventuallyPersistentStore::bgFetch(const std::string &key,
1406                                        uint16_t vbucket,
1407                                        uint64_t rowid,
1408                                        const void *cookie,
1409                                        bool isMeta) {
1410    std::stringstream ss;
1411
1412    if (multiBGFetchEnabled()) {
1413        RCPtr<VBucket> vb = getVBucket(vbucket);
1414        cb_assert(vb);
1415        KVShard *myShard = vbMap.getShard(vbucket);
1416
1417        // schedule to the current batch of background fetch of the given
1418        // vbucket
1419        VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1420                                                                isMeta);
1421        vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1422        myShard->getBgFetcher()->notifyBGEvent();
1423        ss << "Queued a background fetch, now at "
1424           << vb->numPendingBGFetchItems() << std::endl;
1425        LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1426    } else {
1427        bgFetchQueue++;
1428        stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1429                                            bgFetchQueue.load());
1430        ExecutorPool* iom = ExecutorPool::get();
1431        ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1432                                      isMeta,
1433                                      Priority::BgFetcherGetMetaPriority,
1434                                      bgFetchDelay, false);
1435        iom->schedule(task, READER_TASK_IDX);
1436        ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1437           << std::endl;
1438        LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1439    }
1440}
1441
1442GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1443                                                uint16_t vbucket,
1444                                                const void *cookie,
1445                                                bool queueBG,
1446                                                bool honorStates,
1447                                                vbucket_state_t allowedState,
1448                                                bool trackReference) {
1449
1450    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1451        vbucket_state_replica : vbucket_state_active;
1452    RCPtr<VBucket> vb = getVBucket(vbucket);
1453    if (!vb) {
1454        ++stats.numNotMyVBuckets;
1455        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1456    } else if (honorStates && vb->getState() == vbucket_state_dead) {
1457        ++stats.numNotMyVBuckets;
1458        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1459    } else if (honorStates && vb->getState() == disallowedState) {
1460        ++stats.numNotMyVBuckets;
1461        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1462    } else if (honorStates && vb->getState() == vbucket_state_pending) {
1463        if (vb->addPendingOp(cookie)) {
1464            return GetValue(NULL, ENGINE_EWOULDBLOCK);
1465        }
1466    }
1467
1468    int bucket_num(0);
1469    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1470    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1471                                     trackReference);
1472    if (v) {
1473        if (v->isDeleted() || v->isTempDeletedItem() ||
1474            v->isTempNonExistentItem()) {
1475            GetValue rv;
1476            return rv;
1477        }
1478        // If the value is not resident, wait for it...
1479        if (!v->isResident()) {
1480            if (queueBG) {
1481                bgFetch(key, vbucket, v->getBySeqno(), cookie);
1482            }
1483            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1484                            true, v->getNRUValue());
1485        }
1486
1487        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1488                    ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
1489        return rv;
1490    } else {
1491        if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1492            GetValue rv;
1493            return rv;
1494        }
1495        ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1496        if (queueBG) { // Full eviction and need a bg fetch.
1497            ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1498                                       cookie, false);
1499        }
1500        return GetValue(NULL, ec, -1, true);
1501    }
1502}
1503
1504GetValue EventuallyPersistentStore::getRandomKey() {
1505    long max = vbMap.getSize();
1506
1507    long start = random() % max;
1508    long curr = start;
1509    Item *itm = NULL;
1510
1511    while (itm == NULL) {
1512        RCPtr<VBucket> vb = getVBucket(curr++);
1513        while (!vb || vb->getState() != vbucket_state_active) {
1514            if (curr == start) {
1515                return GetValue(NULL, ENGINE_KEY_ENOENT);
1516            }
1517            if (curr == max) {
1518                curr = 0;
1519            }
1520
1521            vb = getVBucket(curr++);
1522        }
1523
1524        if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1525            GetValue rv(itm, ENGINE_SUCCESS);
1526            return rv;
1527        }
1528
1529        if (curr == max) {
1530            curr = 0;
1531        }
1532
1533        if (curr == start) {
1534            return GetValue(NULL, ENGINE_KEY_ENOENT);
1535        }
1536        // Search next vbucket
1537    }
1538
1539    return GetValue(NULL, ENGINE_KEY_ENOENT);
1540}
1541
1542
1543ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1544                                                        const std::string &key,
1545                                                        uint16_t vbucket,
1546                                                        const void *cookie,
1547                                                        ItemMetaData &metadata,
1548                                                        uint32_t &deleted,
1549                                                        bool trackReferenced)
1550{
1551    (void) cookie;
1552    RCPtr<VBucket> vb = getVBucket(vbucket);
1553    if (!vb || vb->getState() == vbucket_state_dead ||
1554        vb->getState() == vbucket_state_replica) {
1555        ++stats.numNotMyVBuckets;
1556        return ENGINE_NOT_MY_VBUCKET;
1557    }
1558
1559    int bucket_num(0);
1560    deleted = 0;
1561    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1562    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1563                                          trackReferenced);
1564
1565    if (v) {
1566        stats.numOpsGetMeta++;
1567
1568        if (v->isTempInitialItem()) { // Need bg meta fetch.
1569            bgFetch(key, vbucket, -1, cookie, true);
1570            return ENGINE_EWOULDBLOCK;
1571        } else if (v->isTempNonExistentItem()) {
1572            metadata.cas = v->getCas();
1573            return ENGINE_KEY_ENOENT;
1574        } else {
1575            if (v->isTempDeletedItem() || v->isDeleted() ||
1576                v->isExpired(ep_real_time())) {
1577                deleted |= GET_META_ITEM_DELETED_FLAG;
1578            }
1579            metadata.cas = v->getCas();
1580            metadata.flags = v->getFlags();
1581            metadata.exptime = v->getExptime();
1582            metadata.revSeqno = v->getRevSeqno();
1583            return ENGINE_SUCCESS;
1584        }
1585    } else {
1586        // The key wasn't found. However, this may be because it was previously
1587        // deleted or evicted with the full eviction strategy.
1588        // So, add a temporary item corresponding to the key to the hash table
1589        // and schedule a background fetch for its metadata from the persistent
1590        // store. The item's state will be updated after the fetch completes.
1591        return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
1592    }
1593}
1594
1595ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
1596                                                         uint64_t cas,
1597                                                         const void *cookie,
1598                                                         bool force,
1599                                                         bool allowExisting,
1600                                                         uint8_t nru,
1601                                                         bool genBySeqno)
1602{
1603    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1604    if (!vb || vb->getState() == vbucket_state_dead) {
1605        ++stats.numNotMyVBuckets;
1606        return ENGINE_NOT_MY_VBUCKET;
1607    } else if (vb->getState() == vbucket_state_replica && !force) {
1608        ++stats.numNotMyVBuckets;
1609        return ENGINE_NOT_MY_VBUCKET;
1610    } else if (vb->getState() == vbucket_state_pending && !force) {
1611        if (vb->addPendingOp(cookie)) {
1612            return ENGINE_EWOULDBLOCK;
1613        }
1614    }
1615
1616    int bucket_num(0);
1617    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1618    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1619                                          false);
1620
1621    if (!force) {
1622        if (v)  {
1623            if (v->isTempInitialItem()) {
1624                bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
1625                return ENGINE_EWOULDBLOCK;
1626            }
1627            if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
1628                ++stats.numOpsSetMetaResolutionFailed;
1629                return ENGINE_KEY_EEXISTS;
1630            }
1631        } else {
1632            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1633                                         cookie, true);
1634        }
1635    }
1636
1637    if (v && v->isLocked(ep_current_time()) &&
1638        (vb->getState() == vbucket_state_replica ||
1639         vb->getState() == vbucket_state_pending)) {
1640        v->unlock();
1641    }
1642    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
1643                                                true, eviction_policy, nru);
1644
1645    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1646    switch (mtype) {
1647    case NOMEM:
1648        ret = ENGINE_ENOMEM;
1649        break;
1650    case INVALID_CAS:
1651    case IS_LOCKED:
1652        ret = ENGINE_KEY_EEXISTS;
1653        break;
1654    case INVALID_VBUCKET:
1655        ret = ENGINE_NOT_MY_VBUCKET;
1656        break;
1657    case WAS_DIRTY:
1658    case WAS_CLEAN:
1659        queueDirty(vb, v, false, true, genBySeqno);
1660        break;
1661    case NOT_FOUND:
1662        ret = ENGINE_KEY_ENOENT;
1663        break;
1664    case NEED_BG_FETCH:
1665        {            // CAS operation with non-resident item + full eviction.
1666            if (v) { // temp item is already created. Simply schedule a
1667                lh.unlock(); // bg fetch job.
1668                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
1669                return ENGINE_EWOULDBLOCK;
1670            }
1671            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1672                                        cookie, true);
1673        }
1674    }
1675
1676    return ret;
1677}
1678
1679GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1680                                                    uint16_t vbucket,
1681                                                    const void *cookie,
1682                                                    time_t exptime)
1683{
1684    RCPtr<VBucket> vb = getVBucket(vbucket);
1685    if (!vb) {
1686        ++stats.numNotMyVBuckets;
1687        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1688    } else if (vb->getState() == vbucket_state_dead) {
1689        ++stats.numNotMyVBuckets;
1690        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1691    } else if (vb->getState() == vbucket_state_replica) {
1692        ++stats.numNotMyVBuckets;
1693        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1694    } else if (vb->getState() == vbucket_state_pending) {
1695        if (vb->addPendingOp(cookie)) {
1696            return GetValue(NULL, ENGINE_EWOULDBLOCK);
1697        }
1698    }
1699
1700    int bucket_num(0);
1701    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1702    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1703
1704    if (v) {
1705        if (v->isDeleted() || v->isTempDeletedItem() ||
1706            v->isTempNonExistentItem()) {
1707            GetValue rv;
1708            return rv;
1709        }
1710
1711        if (!v->isResident()) {
1712            bgFetch(key, vbucket, v->getBySeqno(), cookie);
1713            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1714        }
1715        if (v->isLocked(ep_current_time())) {
1716            GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
1717            return rv;
1718        }
1719
1720        bool exptime_mutated = exptime != v->getExptime() ? true : false;
1721        if (exptime_mutated) {
1722           v->markDirty();
1723           v->setExptime(exptime);
1724        }
1725
1726        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1727                    ENGINE_SUCCESS, v->getBySeqno());
1728
1729        if (exptime_mutated) {
1730            // persist the itme in the underlying storage for
1731            // mutated exptime
1732            queueDirty(vb, v);
1733            lh.unlock();
1734        }
1735        return rv;
1736    } else {
1737        if (eviction_policy == VALUE_ONLY) {
1738            GetValue rv;
1739            return rv;
1740        } else {
1741            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1742                                                         vb, cookie, false);
1743            return GetValue(NULL, ec, -1, true);
1744        }
1745    }
1746}
1747
1748ENGINE_ERROR_CODE
1749EventuallyPersistentStore::statsVKey(const std::string &key,
1750                                     uint16_t vbucket,
1751                                     const void *cookie) {
1752    RCPtr<VBucket> vb = getVBucket(vbucket);
1753    if (!vb) {
1754        return ENGINE_NOT_MY_VBUCKET;
1755    }
1756
1757    int bucket_num(0);
1758    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1759    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1760
1761    if (v) {
1762        if (v->isDeleted() || v->isTempDeletedItem() ||
1763            v->isTempNonExistentItem()) {
1764            return ENGINE_KEY_ENOENT;
1765        }
1766        bgFetchQueue++;
1767        cb_assert(bgFetchQueue > 0);
1768        ExecutorPool* iom = ExecutorPool::get();
1769        ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
1770                                           v->getBySeqno(), cookie,
1771                                           Priority::VKeyStatBgFetcherPriority,
1772                                           bgFetchDelay, false);
1773        iom->schedule(task, READER_TASK_IDX);
1774        return ENGINE_EWOULDBLOCK;
1775    } else {
1776        if (eviction_policy == VALUE_ONLY) {
1777            return ENGINE_KEY_ENOENT;
1778        } else {
1779            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
1780                                                        eviction_policy);
1781            switch(rv) {
1782            case ADD_NOMEM:
1783                return ENGINE_ENOMEM;
1784            case ADD_EXISTS:
1785            case ADD_UNDEL:
1786            case ADD_SUCCESS:
1787            case ADD_TMP_AND_BG_FETCH:
1788                // Since the hashtable bucket is locked, we shouldn't get here
1789                abort();
1790            case ADD_BG_FETCH:
1791                {
1792                    ++bgFetchQueue;
1793                    cb_assert(bgFetchQueue > 0);
1794                    ExecutorPool* iom = ExecutorPool::get();
1795                    ExTask task = new VKeyStatBGFetchTask(&engine, key,
1796                                                          vbucket, -1, cookie,
1797                                           Priority::VKeyStatBgFetcherPriority,
1798                                                          bgFetchDelay, false);
1799                    iom->schedule(task, READER_TASK_IDX);
1800                }
1801            }
1802            return ENGINE_EWOULDBLOCK;
1803        }
1804    }
1805}
1806
1807void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
1808                                                  std::string &key,
1809                                                  uint16_t vbid,
1810                                                  uint64_t bySeqNum) {
1811    RememberingCallback<GetValue> gcb;
1812
1813    getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
1814    gcb.waitForValue();
1815    cb_assert(gcb.fired);
1816
1817    if (eviction_policy == FULL_EVICTION) {
1818        RCPtr<VBucket> vb = getVBucket(vbid);
1819        if (vb) {
1820            int bucket_num(0);
1821            LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1822            StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1823            if (v && v->isTempInitialItem()) {
1824                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1825                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1826                    cb_assert(v->isResident());
1827                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1828                    v->setStoredValueState(
1829                                          StoredValue::state_non_existent_key);
1830                } else {
1831                    // underlying kvstore couldn't fetch requested data
1832                    // log returned error and notify TMPFAIL to client
1833                    LOG(EXTENSION_LOG_WARNING,
1834                        "Warning: failed background fetch for vb=%d seq=%d "
1835                        "key=%s", vbid, v->getBySeqno(), key.c_str());
1836                }
1837            }
1838        }
1839    }
1840
1841    if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1842        engine.addLookupResult(cookie, gcb.val.getValue());
1843    } else {
1844        engine.addLookupResult(cookie, NULL);
1845    }
1846
1847    bgFetchQueue--;
1848    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1849}
1850
1851bool EventuallyPersistentStore::getLocked(const std::string &key,
1852                                          uint16_t vbucket,
1853                                          Callback<GetValue> &cb,
1854                                          rel_time_t currentTime,
1855                                          uint32_t lockTimeout,
1856                                          const void *cookie) {
1857    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
1858    if (!vb) {
1859        ++stats.numNotMyVBuckets;
1860        GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
1861        cb.callback(rv);
1862        return false;
1863    }
1864
1865    int bucket_num(0);
1866    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1867    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1868
1869    if (v) {
1870        if (v->isDeleted() || v->isTempNonExistentItem() ||
1871            v->isTempDeletedItem()) {
1872            GetValue rv;
1873            cb.callback(rv);
1874            return true;
1875        }
1876
1877        // if v is locked return error
1878        if (v->isLocked(currentTime)) {
1879            GetValue rv;
1880            cb.callback(rv);
1881            return false;
1882        }
1883
1884        // If the value is not resident, wait for it...
1885        if (!v->isResident()) {
1886            if (cookie) {
1887                bgFetch(key, vbucket, v->getBySeqno(), cookie);
1888            }
1889            GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
1890            cb.callback(rv);
1891            return false;
1892        }
1893
1894        // acquire lock and increment cas value
1895        v->lock(currentTime + lockTimeout);
1896
1897        Item *it = v->toItem(false, vbucket);
1898        it->setCas();
1899        v->setCas(it->getCas());
1900
1901        GetValue rv(it);
1902        cb.callback(rv);
1903        return true;
1904    } else {
1905        if (eviction_policy == VALUE_ONLY) {
1906            GetValue rv;
1907            cb.callback(rv);
1908            return true;
1909        } else {
1910            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1911                                                         vb, cookie, false);
1912            GetValue rv(NULL, ec, -1, true);
1913            cb.callback(rv);
1914            return false;
1915        }
1916    }
1917}
1918
1919ENGINE_ERROR_CODE
1920EventuallyPersistentStore::unlockKey(const std::string &key,
1921                                     uint16_t vbucket,
1922                                     uint64_t cas,
1923                                     rel_time_t currentTime)
1924{
1925
1926    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
1927    if (!vb) {
1928        ++stats.numNotMyVBuckets;
1929        return ENGINE_NOT_MY_VBUCKET;
1930    }
1931
1932    int bucket_num(0);
1933    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1934    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1935
1936    if (v) {
1937        if (v->isDeleted() || v->isTempNonExistentItem() ||
1938            v->isTempDeletedItem()) {
1939            return ENGINE_KEY_ENOENT;
1940        }
1941        if (v->isLocked(currentTime)) {
1942            if (v->getCas() == cas) {
1943                v->unlock();
1944                return ENGINE_SUCCESS;
1945            }
1946        }
1947        return ENGINE_TMPFAIL;
1948    } else {
1949        if (eviction_policy == VALUE_ONLY) {
1950            return ENGINE_KEY_ENOENT;
1951        } else {
1952            // With the full eviction, an item's lock is automatically
1953            // released when the item is evicted from memory. Therefore,
1954            // we simply return ENGINE_TMPFAIL when we receive unlockKey
1955            // for an item that is not in memocy cache. Note that we don't
1956            // spawn any bg fetch job to figure out if an item actually
1957            // exists in disk or not.
1958            return ENGINE_TMPFAIL;
1959        }
1960    }
1961}
1962
1963
1964ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
1965                                            const std::string &key,
1966                                            uint16_t vbucket,
1967                                            const void *cookie,
1968                                            struct key_stats &kstats,
1969                                            bool bgfetch,
1970                                            bool wantsDeleted)
1971{
1972    RCPtr<VBucket> vb = getVBucket(vbucket);
1973    if (!vb) {
1974        return ENGINE_NOT_MY_VBUCKET;
1975    }
1976
1977    int bucket_num(0);
1978    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1979    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1980
1981    if (v) {
1982        if ((v->isDeleted() && !wantsDeleted) ||
1983            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1984            return ENGINE_KEY_ENOENT;
1985        }
1986        if (eviction_policy == FULL_EVICTION &&
1987            v->isTempInitialItem() && bgfetch) {
1988            lh.unlock();
1989            bgFetch(key, vbucket, -1, cookie, true);
1990            return ENGINE_EWOULDBLOCK;
1991        }
1992        kstats.logically_deleted = v->isDeleted();
1993        kstats.dirty = v->isDirty();
1994        kstats.exptime = v->getExptime();
1995        kstats.flags = v->getFlags();
1996        kstats.cas = v->getCas();
1997        kstats.vb_state = vb->getState();
1998        return ENGINE_SUCCESS;
1999    } else {
2000        if (eviction_policy == VALUE_ONLY) {
2001            return ENGINE_KEY_ENOENT;
2002        } else {
2003            if (bgfetch) {
2004                return addTempItemForBgFetch(lh, bucket_num, key, vb,
2005                                             cookie, true);
2006            } else {
2007                return ENGINE_KEY_ENOENT;
2008            }
2009        }
2010    }
2011}
2012
2013std::string EventuallyPersistentStore::validateKey(const std::string &key,
2014                                                   uint16_t vbucket,
2015                                                   Item &diskItem) {
2016    int bucket_num(0);
2017    RCPtr<VBucket> vb = getVBucket(vbucket);
2018    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2019    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2020                                     false, true);
2021
2022    if (v) {
2023        if (v->isDeleted() || v->isTempNonExistentItem() ||
2024            v->isTempDeletedItem()) {
2025            return "item_deleted";
2026        }
2027
2028        if (diskItem.getFlags() != v->getFlags()) {
2029            return "flags_mismatch";
2030        } else if (v->isResident() && memcmp(diskItem.getData(),
2031                                             v->getValue()->getData(),
2032                                             diskItem.getNBytes())) {
2033            return "data_mismatch";
2034        } else {
2035            return "valid";
2036        }
2037    } else {
2038        return "item_deleted";
2039    }
2040
2041}
2042
2043ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2044                                                        uint64_t* cas,
2045                                                        uint16_t vbucket,
2046                                                        const void *cookie,
2047                                                        bool force,
2048                                                        ItemMetaData *itemMeta,
2049                                                        bool tapBackfill)
2050{
2051    RCPtr<VBucket> vb = getVBucket(vbucket);
2052    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2053        ++stats.numNotMyVBuckets;
2054        return ENGINE_NOT_MY_VBUCKET;
2055    } else if(vb->getState() == vbucket_state_replica && !force) {
2056        ++stats.numNotMyVBuckets;
2057        return ENGINE_NOT_MY_VBUCKET;
2058    } else if(vb->getState() == vbucket_state_pending && !force) {
2059        if (vb->addPendingOp(cookie)) {
2060            return ENGINE_EWOULDBLOCK;
2061        }
2062    }
2063
2064    int bucket_num(0);
2065    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2066    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2067    if (!v || v->isDeleted() || v->isTempItem()) {
2068        if (eviction_policy == VALUE_ONLY) {
2069            return ENGINE_KEY_ENOENT;
2070        } else { // Full eviction.
2071            if (!force) {
2072                if (!v) { // Item might be evicted from cache.
2073                    return addTempItemForBgFetch(lh, bucket_num, key, vb,
2074                                                 cookie, true);
2075                } else if (v->isTempInitialItem()) {
2076                    lh.unlock();
2077                    bgFetch(key, vbucket, -1, cookie, true);
2078                    return ENGINE_EWOULDBLOCK;
2079                } else { // Non-existent or deleted key.
2080                    return ENGINE_KEY_ENOENT;
2081                }
2082            } else {
2083                if (!v) { // Item might be evicted from cache.
2084                    // Create a temp item and delete it below as it is a
2085                    // force deletion.
2086                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
2087                                                              key,
2088                                                              eviction_policy);
2089                    if (rv == ADD_NOMEM) {
2090                        return ENGINE_ENOMEM;
2091                    }
2092                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
2093                    v->setStoredValueState(StoredValue::state_deleted_key);
2094                } else if (v->isTempInitialItem()) {
2095                    v->setStoredValueState(StoredValue::state_deleted_key);
2096                } else { // Non-existent or deleted key.
2097                    return ENGINE_KEY_ENOENT;
2098                }
2099            }
2100        }
2101    }
2102
2103    if (v && v->isLocked(ep_current_time()) &&
2104        (vb->getState() == vbucket_state_replica ||
2105         vb->getState() == vbucket_state_pending)) {
2106        v->unlock();
2107    }
2108    mutation_type_t delrv;
2109    delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2110
2111    if (itemMeta && v) {
2112        itemMeta->revSeqno = v->getRevSeqno();
2113        itemMeta->cas = v->getCas();
2114        itemMeta->flags = v->getFlags();
2115        itemMeta->exptime = v->getExptime();
2116    }
2117    *cas = v ? v->getCas() : 0;
2118
2119    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2120    switch (delrv) {
2121    case NOMEM:
2122        ret = ENGINE_ENOMEM;
2123        break;
2124    case INVALID_VBUCKET:
2125        ret = ENGINE_NOT_MY_VBUCKET;
2126        break;
2127    case INVALID_CAS:
2128        ret = ENGINE_KEY_EEXISTS;
2129        break;
2130    case IS_LOCKED:
2131        ret = ENGINE_TMPFAIL;
2132        break;
2133    case NOT_FOUND:
2134        ret = ENGINE_KEY_ENOENT;
2135        break;
2136    case WAS_DIRTY:
2137    case WAS_CLEAN:
2138        queueDirty(vb, v, tapBackfill);
2139        break;
2140    case NEED_BG_FETCH:
2141        // We already figured out if a bg fetch is requred for a full-evicted
2142        // item above.
2143        abort();
2144    }
2145    return ret;
2146}
2147
2148ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2149                                                        const std::string &key,
2150                                                        uint64_t* cas,
2151                                                        uint16_t vbucket,
2152                                                        const void *cookie,
2153                                                        bool force,
2154                                                        ItemMetaData *itemMeta,
2155                                                        bool tapBackfill,
2156                                                        bool genBySeqno,
2157                                                        uint64_t bySeqno)
2158{
2159    RCPtr<VBucket> vb = getVBucket(vbucket);
2160    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2161        ++stats.numNotMyVBuckets;
2162        return ENGINE_NOT_MY_VBUCKET;
2163    } else if(vb->getState() == vbucket_state_replica && !force) {
2164        ++stats.numNotMyVBuckets;
2165        return ENGINE_NOT_MY_VBUCKET;
2166    } else if(vb->getState() == vbucket_state_pending && !force) {
2167        if (vb->addPendingOp(cookie)) {
2168            return ENGINE_EWOULDBLOCK;
2169        }
2170    }
2171
2172    int bucket_num(0);
2173    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2174    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2175    if (!force) { // Need conflict resolution.
2176        if (v)  {
2177            if (v->isTempInitialItem()) {
2178                bgFetch(key, vbucket, -1, cookie, true);
2179                return ENGINE_EWOULDBLOCK;
2180            }
2181            if (!conflictResolver->resolve(v, *itemMeta, true)) {
2182                ++stats.numOpsDelMetaResolutionFailed;
2183                return ENGINE_KEY_EEXISTS;
2184            }
2185        } else {
2186            // Item is 1) deleted or not existent in the value eviction case OR
2187            // 2) deleted or evicted in the full eviction.
2188            return addTempItemForBgFetch(lh, bucket_num, key, vb,
2189                                         cookie, true);
2190        }
2191    } else {
2192        if (!v) {
2193            // Create a temp item and delete it below as it is a force deletion
2194            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2195                                                        eviction_policy);
2196            if (rv == ADD_NOMEM) {
2197                return ENGINE_ENOMEM;
2198            }
2199            v = vb->ht.unlocked_find(key, bucket_num, true, false);
2200            v->setStoredValueState(StoredValue::state_deleted_key);
2201        } else if (v->isTempInitialItem()) {
2202            v->setStoredValueState(StoredValue::state_deleted_key);
2203        }
2204    }
2205
2206    if (v && v->isLocked(ep_current_time()) &&
2207        (vb->getState() == vbucket_state_replica ||
2208         vb->getState() == vbucket_state_pending)) {
2209        v->unlock();
2210    }
2211    mutation_type_t delrv;
2212    delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2213                                       eviction_policy, true);
2214    *cas = v ? v->getCas() : 0;
2215
2216    if (!genBySeqno) {
2217        v->setBySeqno(bySeqno);
2218    }
2219
2220    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2221    switch (delrv) {
2222    case NOMEM:
2223        ret = ENGINE_ENOMEM;
2224        break;
2225    case INVALID_VBUCKET:
2226        ret = ENGINE_NOT_MY_VBUCKET;
2227        break;
2228    case INVALID_CAS:
2229        ret = ENGINE_KEY_EEXISTS;
2230        break;
2231    case IS_LOCKED:
2232        ret = ENGINE_TMPFAIL;
2233        break;
2234    case NOT_FOUND:
2235        ret = ENGINE_KEY_ENOENT;
2236        break;
2237    case WAS_DIRTY:
2238    case WAS_CLEAN:
2239            queueDirty(vb, v, tapBackfill, true, genBySeqno);
2240        break;
2241    case NEED_BG_FETCH:
2242        lh.unlock();
2243        bgFetch(key, vbucket, -1, cookie, true);
2244        ret = ENGINE_EWOULDBLOCK;
2245    }
2246
2247    return ret;
2248}
2249
2250void EventuallyPersistentStore::reset() {
2251    std::vector<int> buckets = vbMap.getBuckets();
2252    std::vector<int>::iterator it;
2253    for (it = buckets.begin(); it != buckets.end(); ++it) {
2254        RCPtr<VBucket> vb = getVBucket(*it);
2255        if (vb) {
2256            vb->ht.clear();
2257            vb->checkpointManager.clear(vb->getState());
2258            vb->resetStats();
2259        }
2260    }
2261
2262    bool inverse = false;
2263    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2264        ++stats.diskQueueSize;
2265        // wake up (notify) one flusher is good enough for diskFlushAll
2266        vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2267    }
2268}
2269
2270/**
2271 * Callback invoked after persisting an item from memory to disk.
2272 *
2273 * This class exists to create a closure around a few variables within
2274 * EventuallyPersistentStore::flushOne so that an object can be
2275 * requeued in case of failure to store in the underlying layer.
2276 */
2277class PersistenceCallback : public Callback<mutation_result>,
2278                            public Callback<int> {
2279public:
2280
2281    PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2282                        EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2283        : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2284        cb_assert(vb);
2285        cb_assert(s);
2286    }
2287
2288    // This callback is invoked for set only.
2289    void callback(mutation_result &value) {
2290        if (value.first == 1) {
2291            int bucket_num(0);
2292            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2293                                                        &bucket_num);
2294            StoredValue *v = store->fetchValidValue(vbucket,
2295                                                    queuedItem->getKey(),
2296                                                    bucket_num, true, false);
2297            if (v) {
2298                if (v->getCas() == cas) {
2299                    // mark this item clean only if current and stored cas
2300                    // value match
2301                    v->markClean();
2302                }
2303                if (v->isNewCacheItem()) {
2304                    if (value.second) {
2305                        // Insert in value-only or full eviction mode.
2306                        ++vbucket->opsCreate;
2307                    } else { // Update in full eviction mode.
2308                        --vbucket->ht.numTotalItems;
2309                        ++vbucket->opsUpdate;
2310                    }
2311                    v->setNewCacheItem(false);
2312                } else { // Update in value-only or full eviction mode.
2313                    ++vbucket->opsUpdate;
2314                }
2315            }
2316
2317            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2318            stats->decrDiskQueueSize(1);
2319            stats->totalPersisted++;
2320        } else {
2321            // If the return was 0 here, we're in a bad state because
2322            // we do not know the rowid of this object.
2323            if (value.first == 0) {
2324                int bucket_num(0);
2325                LockHolder lh = vbucket->ht.getLockedBucket(
2326                                           queuedItem->getKey(), &bucket_num);
2327                StoredValue *v = store->fetchValidValue(vbucket,
2328                                                        queuedItem->getKey(),
2329                                                        bucket_num, true,
2330                                                        false);
2331                if (v) {
2332                    std::stringstream ss;
2333                    ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2334                       << queuedItem->getVBucketId() << " (rowid="
2335                       << v->getBySeqno() << ") returned 0 updates\n";
2336                    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2337                } else {
2338                    LOG(EXTENSION_LOG_WARNING,
2339                        "Error persisting now missing ``%s'' from vb%d",
2340                        queuedItem->getKey().c_str(),
2341                        queuedItem->getVBucketId());
2342                }
2343
2344                vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2345                stats->decrDiskQueueSize(1);
2346            } else {
2347                std::stringstream ss;
2348                ss <<
2349                "Fatal error in persisting SET ``" <<
2350                queuedItem->getKey() << "'' on vb "
2351                   << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2352                LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2353                redirty();
2354            }
2355        }
2356    }
2357
2358    // This callback is invoked for deletions only.
2359    //
    // The integer result reports what the underlying storage did:
    // -1 on failure, 0 if no row was deleted, 1 if one row was deleted.
2362    void callback(int &value) {
2363        // > 1 would be bad.  We were only trying to delete one row.
2364        cb_assert(value < 2);
2365        // -1 means fail
2366        // 1 means we deleted one row
2367        // 0 means we did not delete a row, but did not fail (did not exist)
2368        if (value >= 0) {
2369            // We have succesfully removed an item from the disk, we
2370            // may now remove it from the hash table.
2371            int bucket_num(0);
2372            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2373                                                        &bucket_num);
2374            StoredValue *v = store->fetchValidValue(vbucket,
2375                                                    queuedItem->getKey(),
2376                                                    bucket_num, true, false);
2377            if (v && v->isDeleted()) {
2378                bool newCacheItem = v->isNewCacheItem();
2379                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
2380                                                        bucket_num);
2381                cb_assert(deleted);
2382                if (newCacheItem && value > 0) {
2383                    // Need to decrement the item counter again for an item that
2384                    // exists on DB file, but not in memory (i.e., full eviction),
2385                    // because we created the temp item in memory and incremented
2386                    // the item counter when a deletion is pushed in the queue.
2387                    --vbucket->ht.numTotalItems;
2388                }
2389            }
2390
2391            if (value > 0) {
2392                ++stats->totalPersisted;
2393                ++vbucket->opsDelete;
2394            }
2395
2396            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2397            stats->decrDiskQueueSize(1);
2398        } else {
2399            std::stringstream ss;
2400            ss << "Fatal error in persisting DELETE ``" <<
2401            queuedItem->getKey() << "'' on vb "
2402               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2403            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2404            redirty();
2405        }
2406    }
2407
2408private:
2409
2410    void redirty() {
2411        if (store->vbMap.isBucketDeletion(vbucket->getId())) {
2412            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2413            stats->decrDiskQueueSize(1);
2414            return;
2415        }
2416        ++stats->flushFailed;
2417        store->invokeOnLockedStoredValue(queuedItem->getKey(),
2418                                         queuedItem->getVBucketId(),
2419                                         &StoredValue::reDirty);
2420        vbucket->rejectQueue.push(queuedItem);
2421    }
2422
2423    const queued_item queuedItem;
2424    RCPtr<VBucket> &vbucket;
2425    EventuallyPersistentStore *store;
2426    EPStats *stats;
2427    uint64_t cas;
2428    DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
2429};
2430
2431void EventuallyPersistentStore::flushOneDeleteAll() {
2432    for (size_t i = 0; i < vbMap.numShards; ++i) {
2433        KVShard* shard = vbMap.shards[i];
2434        LockHolder lh(shard->getWriteLock());
2435        shard->getRWUnderlying()->reset(i);
2436    }
2437
2438    bool inverse = true;
2439    diskFlushAll.compare_exchange_strong(inverse, false);
2440    stats.decrDiskQueueSize(1);
2441}
2442
2443int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2444    KVShard *shard = vbMap.getShard(vbid);
2445    if (diskFlushAll) {
2446        if (shard->getId() == EP_PRIMARY_SHARD) {
2447            flushOneDeleteAll();
2448        } else {
2449            // disk flush is pending just return
2450            return 0;
2451        }
2452    }
2453
2454    if (vbMap.isBucketCreation(vbid)) {
2455        return RETRY_FLUSH_VBUCKET;
2456    }
2457
2458    int items_flushed = 0;
2459    bool schedule_vb_snapshot = false;
2460    rel_time_t flush_start = ep_current_time();
2461
2462    LockHolder lh(shard->getWriteLock());
2463    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2464    if (vb) {
2465        KVStatsCallback cb(this);
2466        std::vector<queued_item> items;
2467        KVStore *rwUnderlying = getRWUnderlying(vbid);
2468
2469        while (!vb->rejectQueue.empty()) {
2470            items.push_back(vb->rejectQueue.front());
2471            vb->rejectQueue.pop();
2472        }
2473
2474        vb->getBackfillItems(items);
2475        vb->checkpointManager.getAllItemsForPersistence(items);
2476
2477        if (!items.empty()) {
2478            while (!rwUnderlying->begin()) {
2479                ++stats.beginFailed;
2480                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2481                    "Retry in 1 sec ...");
2482                sleep(1);
2483            }
2484            rwUnderlying->optimizeWrites(items);
2485
2486            Item *prev = NULL;
2487            std::list<PersistenceCallback*> pcbs;
2488            std::vector<queued_item>::iterator it = items.begin();
2489            for(; it != items.end(); ++it) {
2490                if ((*it)->getOperation() != queue_op_set &&
2491                    (*it)->getOperation() != queue_op_del) {
2492                    continue;
2493                } else if (!prev || prev->getKey() != (*it)->getKey()) {
2494                    prev = (*it).get();
2495                    ++items_flushed;
2496                    PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2497                    if (cb) {
2498                        pcbs.push_back(cb);
2499                    }
2500                    ++stats.flusher_todo;
2501                } else {
2502                    stats.decrDiskQueueSize(1);
2503                    vb->doStatsForFlushing(*(*it), (*it)->size());
2504                }
2505            }
2506
2507            BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2508                             stats.timingLog);
2509            hrtime_t start = gethrtime();
2510
2511            while (!rwUnderlying->commit(&cb)) {
2512                ++stats.commitFailed;
2513                LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
2514                    "1 sec...\n");
2515                sleep(1);
2516            }
2517
2518            while (!pcbs.empty()) {
2519                delete pcbs.front();
2520                pcbs.pop_front();
2521            }
2522
2523            ++stats.flusherCommits;
2524            hrtime_t end = gethrtime();
2525            uint64_t commit_time = (end - start) / 1000000;
2526            uint64_t trans_time = (end - flush_start) / 1000000;
2527
2528            lastTransTimePerItem = (items_flushed == 0) ? 0 :
2529                static_cast<double>(trans_time) /
2530                static_cast<double>(items_flushed);
2531            stats.commit_time.store(commit_time);
2532            stats.cumulativeCommitTime.fetch_add(commit_time);
2533            stats.cumulativeFlushTime.fetch_add(ep_current_time()
2534                                                - flush_start);
2535            stats.flusher_todo.store(0);
2536
2537        }
2538
2539        if (vb->rejectQueue.empty()) {
2540            vb->checkpointManager.itemsPersisted();
2541        }
2542
2543        uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2544        if (vb->rejectQueue.empty()) {
2545            vb->notifyCheckpointPersisted(engine, chkid, false);
2546        }
2547
2548        if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2549            vbMap.setPersistenceCheckpointId(vbid, chkid);
2550            schedule_vb_snapshot = true;
2551        }
2552
2553        uint64_t seqno = vb->checkpointManager.getPersistenceCursorSeqno();
2554        if (vb->rejectQueue.empty()) {
2555            vb->notifyCheckpointPersisted(engine, seqno, true);
2556            if (seqno > 0 && seqno != vbMap.getPersistenceSeqno(vbid)) {
2557                vbMap.setPersistenceSeqno(vbid, seqno);
2558            }
2559        }
2560    }
2561
2562    if (schedule_vb_snapshot || snapshotVBState) {
2563        scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
2564                           shard->getId());
2565    }
2566
2567    return items_flushed;
2568}
2569
2570PersistenceCallback*
2571EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2572                                            RCPtr<VBucket> &vb) {
2573
2574    if (!vb) {
2575        stats.decrDiskQueueSize(1);
2576        return NULL;
2577    }
2578
2579    int64_t bySeqno = qi->getBySeqno();
2580    bool deleted = qi->isDeleted();
2581    rel_time_t queued(qi->getQueuedTime());
2582
2583    int dirtyAge = ep_current_time() - queued;
2584    stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2585    stats.dirtyAge.store(dirtyAge);
2586    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2587                                         stats.dirtyAgeHighWat.load()));
2588
2589    // Wait until the vbucket database is created by the vbucket state
2590    // snapshot task.
2591    if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2592        vbMap.isBucketDeletion(qi->getVBucketId())) {
2593        vb->rejectQueue.push(qi);
2594        ++vb->opsReject;
2595        return NULL;
2596    }
2597
2598    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2599    if (!deleted) {
2600        // TODO: Need to separate disk_insert from disk_update because
2601        // bySeqno doesn't give us that information.
2602        BlockTimer timer(bySeqno == -1 ?
2603                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
2604                         bySeqno == -1 ? "disk_insert" : "disk_update",
2605                         stats.timingLog);
2606        PersistenceCallback *cb =
2607            new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2608        rwUnderlying->set(*qi, *cb);
2609        return cb;
2610    } else {
2611        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2612                         stats.timingLog);
2613        PersistenceCallback *cb =
2614            new PersistenceCallback(qi, vb, this, &stats, 0);
2615        rwUnderlying->del(*qi, *cb);
2616        return cb;
2617    }
2618}
2619
2620void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2621                                           StoredValue* v,
2622                                           bool tapBackfill,
2623                                           bool notifyReplicator,
2624                                           bool genBySeqno) {
2625    if (vb) {
2626        queued_item qi(v->toItem(false, vb->getId()));
2627        bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2628                                vb->checkpointManager.queueDirty(vb, qi,
2629                                                                 genBySeqno);
2630        v->setBySeqno(qi->getBySeqno());
2631
2632        if (rv) {
2633            KVShard* shard = vbMap.getShard(vb->getId());
2634            shard->getFlusher()->notifyFlushEvent();
2635
2636        }
2637        if (!tapBackfill && notifyReplicator) {
2638            engine.getTapConnMap().notifyVBConnections(vb->getId());
2639            engine.getUprConnMap().notifyVBConnections(vb->getId(),
2640                                                       qi->getBySeqno());
2641        }
2642    }
2643}
2644
2645std::map<uint16_t, vbucket_state> EventuallyPersistentStore::loadVBucketState()
2646{
2647    return getOneROUnderlying()->listPersistedVbuckets();
2648}
2649
2650void EventuallyPersistentStore::warmupCompleted() {
2651    // Run the vbucket state snapshot job once after the warmup
2652    scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
2653
2654    if (engine.getConfiguration().getAlogPath().length() > 0) {
2655        size_t smin = engine.getConfiguration().getAlogSleepTime();
2656        setAccessScannerSleeptime(smin);
2657        Configuration &config = engine.getConfiguration();
2658        config.addValueChangedListener("alog_sleep_time",
2659                                       new EPStoreValueChangeListener(*this));
2660        config.addValueChangedListener("alog_task_time",
2661                                       new EPStoreValueChangeListener(*this));
2662    }
2663
2664    // "0" sleep_time means that the first snapshot task will be executed
2665    // right after warmup. Subsequent snapshot tasks will be scheduled every
2666    // 60 sec by default.
2667    ExecutorPool *iom = ExecutorPool::get();
2668    ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
2669    statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
2670}
2671
2672bool EventuallyPersistentStore::maybeEnableTraffic()
2673{
2674    // @todo rename.. skal vaere isTrafficDisabled elns
2675    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2676    double maxSize = static_cast<double>(stats.getMaxDataSize());
2677
2678    if (memoryUsed  >= stats.mem_low_wat) {
2679        LOG(EXTENSION_LOG_WARNING,
2680            "Total memory use reached to the low water mark, stop warmup");
2681        return true;
2682    } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
2683        LOG(EXTENSION_LOG_WARNING,
2684                "Enough MB of data loaded to enable traffic");
2685        return true;
2686    } else if (eviction_policy == VALUE_ONLY &&
2687               stats.warmedUpValues >=
2688                               (stats.warmedUpKeys * stats.warmupNumReadCap)) {
2689        // Let ep-engine think we're done with the warmup phase
2690        // (we should refactor this into "enableTraffic")
2691        LOG(EXTENSION_LOG_WARNING,
2692            "Enough number of items loaded to enable traffic");
2693        return true;
2694    }
2695    return false;
2696}
2697
2698bool EventuallyPersistentStore::isWarmingUp() {
2699    return !warmupTask->isComplete();
2700}
2701
2702void EventuallyPersistentStore::stopWarmup(void)
2703{
2704    // forcefully stop current warmup task
2705    if (isWarmingUp()) {
2706        LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
2707            "data from underlying storage, shutdown = %s\n",
2708            stats.isShutdown ? "yes" : "no");
2709        warmupTask->stop();
2710    }
2711}
2712
2713void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
2714    LockHolder lh(expiryPager.mutex);
2715
2716    if (expiryPager.sleeptime != 0) {
2717        ExecutorPool::get()->cancel(expiryPager.task);
2718    }
2719
2720    expiryPager.sleeptime = val;
2721    if (val != 0) {
2722        ExTask expTask = new ExpiredItemPager(&engine, stats,
2723                                                expiryPager.sleeptime);
2724        expiryPager.task = ExecutorPool::get()->schedule(expTask,
2725                                                        NONIO_TASK_IDX);
2726    }
2727}
2728
2729void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
2730    LockHolder lh(accessScanner.mutex);
2731
2732    if (accessScanner.sleeptime != 0) {
2733        ExecutorPool::get()->cancel(accessScanner.task);
2734    }
2735
2736    // store sleeptime in seconds
2737    accessScanner.sleeptime = val * 60;
2738    if (accessScanner.sleeptime != 0) {
2739        ExTask task = new AccessScanner(*this, stats,
2740                                        Priority::AccessScannerPriority,
2741                                        accessScanner.sleeptime);
2742        accessScanner.task = ExecutorPool::get()->schedule(task,
2743                                                           AUXIO_TASK_IDX);
2744
2745        struct timeval tv;
2746        gettimeofday(&tv, NULL);
2747        advance_tv(tv, accessScanner.sleeptime);
2748        stats.alogTime.store(tv.tv_sec);
2749    }
2750}
2751
2752void EventuallyPersistentStore::resetAccessScannerStartTime() {
2753    LockHolder lh(accessScanner.mutex);
2754
2755    if (accessScanner.sleeptime != 0) {
2756        ExecutorPool::get()->cancel(accessScanner.task);
2757        // re-schedule task according to the new task start hour
2758        ExTask task = new AccessScanner(*this, stats,
2759                                        Priority::AccessScannerPriority,
2760                                        accessScanner.sleeptime);
2761        accessScanner.task = ExecutorPool::get()->schedule(task,
2762                                                           AUXIO_TASK_IDX);
2763
2764        struct timeval tv;
2765        gettimeofday(&tv, NULL);
2766        advance_tv(tv, accessScanner.sleeptime);
2767        stats.alogTime.store(tv.tv_sec);
2768    }
2769}
2770
2771void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
2772{
2773    size_t maxSize = vbMap.getSize();
2774    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
2775    for (size_t i = 0; i < maxSize; ++i) {
2776        uint16_t vbid = static_cast<uint16_t>(i);
2777        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2778        if (vb) {
2779            bool wantData = visitor.visitBucket(vb);
2780            // We could've lost this along the way.
2781            if (wantData) {
2782                vb->ht.visit(visitor);
2783            }
2784        }
2785    }
2786    visitor.complete();
2787}
2788
2789VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
2790                         shared_ptr<VBucketVisitor> v,
2791                         const char *l, const Priority &p, double sleep) :
2792    GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
2793    visitor(v), label(l), sleepTime(sleep), currentvb(0)
2794{
2795    const VBucketFilter &vbFilter = visitor->getVBucketFilter();
2796    size_t maxSize = store->vbMap.getSize();
2797    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
2798    for (size_t i = 0; i < maxSize; ++i) {
2799        uint16_t vbid = static_cast<uint16_t>(i);
2800        RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
2801        if (vb && vbFilter(vbid)) {
2802            vbList.push(vbid);
2803        }
2804    }
2805}
2806
2807bool VBCBAdaptor::run(void) {
2808    if (!vbList.empty()) {
2809        currentvb = vbList.front();
2810        RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
2811        if (vb) {
2812            if (visitor->pauseVisitor()) {
2813                snooze(sleepTime);
2814                return true;
2815            }
2816            if (visitor->visitBucket(vb)) {
2817                vb->ht.visit(*visitor);
2818            }
2819        }
2820        vbList.pop();
2821    }
2822
2823    bool isdone = vbList.empty();
2824    if (isdone) {
2825        visitor->complete();
2826    }
2827    return !isdone;
2828}
2829
2830VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
2831                                       shared_ptr<VBucketVisitor> v,
2832                                       uint16_t sh, const char *l,
2833                                       double sleep, bool shutdown):
2834    GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
2835               0, shutdown),
2836    store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
2837    shardID(sh)
2838{
2839    const VBucketFilter &vbFilter = visitor->getVBucketFilter();
2840    std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
2841    cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
2842    std::vector<int>::iterator it;
2843    for (it = vbs.begin(); it != vbs.end(); ++it) {
2844        uint16_t vbid = static_cast<uint16_t>(*it);
2845        RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
2846        if (vb && vbFilter(vbid)) {
2847            vbList.push(vbid);
2848        }
2849    }
2850}
2851
2852bool VBucketVisitorTask::run() {
2853    if (!vbList.empty()) {
2854        currentvb = vbList.front();
2855        RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
2856        if (vb) {
2857            if (visitor->pauseVisitor()) {
2858                snooze(sleepTime);
2859                return true;
2860            }
2861            if (visitor->visitBucket(vb)) {
2862                vb->ht.visit(*visitor);
2863            }
2864        }
2865        vbList.pop();
2866    }
2867
2868    bool isDone = vbList.empty();
2869    if (isDone) {
2870        visitor->complete();
2871    }
2872    return !isDone;
2873}
2874
2875void EventuallyPersistentStore::resetUnderlyingStats(void)
2876{
2877    for (size_t i = 0; i < vbMap.numShards; i++) {
2878        KVShard *shard = vbMap.shards[i];
2879        shard->getRWUnderlying()->resetStats();
2880        shard->getROUnderlying()->resetStats();
2881    }
2882    auxUnderlying->resetStats();
2883}
2884
2885void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
2886                                                const void* cookie) {
2887    for (size_t i = 0; i < vbMap.numShards; i++) {
2888        std::stringstream rwPrefix;
2889        std::stringstream roPrefix;
2890        rwPrefix << "rw_" << i;
2891        roPrefix << "ro_" << i;
2892        vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
2893                                                     cookie);
2894        vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
2895                                                     cookie);
2896    }
2897}
2898
2899void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
2900                                                      const void* cookie) {
2901    for (size_t i = 0; i < vbMap.numShards; i++) {
2902        std::stringstream rwPrefix;
2903        std::stringstream roPrefix;
2904        rwPrefix << "rw_" << i;
2905        roPrefix << "ro_" << i;
2906        vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
2907                                                           add_stat,
2908                                                           cookie);
2909        vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
2910                                                           add_stat,
2911                                                           cookie);
2912    }
2913}
2914
2915KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
2916    return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
2917}
2918
2919KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
2920    return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
2921}
2922
2923ENGINE_ERROR_CODE
2924EventuallyPersistentStore::rollback(uint16_t vbid,
2925                                    uint64_t rollbackSeqno,
2926                                    shared_ptr<RollbackCB> cb) {
2927    rollback_error_code err;
2928    err = vbMap.shards[vbid]->getROUnderlying()->
2929                              rollback(vbid, rollbackSeqno, cb);
2930
2931    if (err.first != ENGINE_FAILED) {
2932        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2933        vb->failovers->pruneEntries(err.second);
2934    }
2935    return err.first;
2936}
2937