xref: /3.0.3-GA/ep-engine/src/ep.cc (revision 4ef31d61)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string.h>
21#include <time.h>
22
23#include <fstream>
24#include <functional>
25#include <iostream>
26#include <map>
27#include <sstream>
28#include <string>
29#include <utility>
30#include <vector>
31
32#include "access_scanner.h"
33#include "checkpoint_remover.h"
34#include "conflict_resolution.h"
35#include "ep.h"
36#include "ep_engine.h"
37#include "failover-table.h"
38#include "flusher.h"
39#include "htresizer.h"
40#include "kvshard.h"
41#include "kvstore.h"
42#include "locks.h"
43#include "mutation_log.h"
44#include "warmup.h"
45#include "tapconnmap.h"
46#include "tapthrottle.h"
47
48class StatsValueChangeListener : public ValueChangedListener {
49public:
50    StatsValueChangeListener(EPStats &st) : stats(st) {
51        // EMPTY
52    }
53
54    virtual void sizeValueChanged(const std::string &key, size_t value) {
55        if (key.compare("max_size") == 0) {
56            stats.setMaxDataSize(value);
57            size_t low_wat = static_cast<size_t>
58                             (static_cast<double>(value) * 0.6);
59            size_t high_wat = static_cast<size_t>(
60                              static_cast<double>(value) * 0.75);
61            stats.mem_low_wat.store(low_wat);
62            stats.mem_high_wat.store(high_wat);
63        } else if (key.compare("mem_low_wat") == 0) {
64            stats.mem_low_wat.store(value);
65        } else if (key.compare("mem_high_wat") == 0) {
66            stats.mem_high_wat.store(value);
67        } else if (key.compare("tap_throttle_threshold") == 0) {
68            stats.tapThrottleThreshold.store(
69                                          static_cast<double>(value) / 100.0);
70        } else if (key.compare("warmup_min_memory_threshold") == 0) {
71            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
72        } else if (key.compare("warmup_min_items_threshold") == 0) {
73            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74        } else {
75            LOG(EXTENSION_LOG_WARNING,
76                "Failed to change value for unknown variable, %s\n",
77                key.c_str());
78        }
79    }
80
81private:
82    EPStats &stats;
83};
84
85/**
86 * A configuration value changed listener that responds to ep-engine
87 * parameter changes by invoking engine-specific methods on
88 * configuration change events.
89 */
90class EPStoreValueChangeListener : public ValueChangedListener {
91public:
92    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93    }
94
95    virtual void sizeValueChanged(const std::string &key, size_t value) {
96        if (key.compare("bg_fetch_delay") == 0) {
97            store.setBGFetchDelay(static_cast<uint32_t>(value));
98        } else if (key.compare("exp_pager_stime") == 0) {
99            store.setExpiryPagerSleeptime(value);
100        } else if (key.compare("alog_sleep_time") == 0) {
101            store.setAccessScannerSleeptime(value);
102        } else if (key.compare("alog_task_time") == 0) {
103            store.resetAccessScannerStartTime();
104        } else if (key.compare("mutation_mem_threshold") == 0) {
105            double mem_threshold = static_cast<double>(value) / 100;
106            StoredValue::setMutationMemoryThreshold(mem_threshold);
107        } else if (key.compare("tap_throttle_queue_cap") == 0) {
108            store.getEPEngine().getTapThrottle().setQueueCap(value);
109        } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
110            store.getEPEngine().getTapThrottle().setCapPercent(value);
111        } else {
112            LOG(EXTENSION_LOG_WARNING,
113                "Failed to change value for unknown variable, %s\n",
114                key.c_str());
115        }
116    }
117
118private:
119    EventuallyPersistentStore &store;
120};
121
122class VBucketMemoryDeletionTask : public GlobalTask {
123public:
124    VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
125                              RCPtr<VBucket> &vb, double delay) :
126                              GlobalTask(&eng,
127                              Priority::VBMemoryDeletionPriority, delay,false),
128                              e(eng), vbucket(vb), vbid(vb->getId()) { }
129
130    std::string getDescription() {
131        std::stringstream ss;
132        ss << "Removing (dead) vbucket " << vbid << " from memory";
133        return ss.str();
134    }
135
136    bool run(void) {
137        vbucket->notifyAllPendingConnsFailed(e);
138        vbucket->ht.clear();
139        vbucket.reset();
140        return false;
141    }
142
143private:
144    EventuallyPersistentEngine &e;
145    RCPtr<VBucket> vbucket;
146    uint16_t vbid;
147};
148
149class PendingOpsNotification : public GlobalTask {
150public:
151    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
152        GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
153        engine(e), vbucket(vb) { }
154
155    std::string getDescription() {
156        std::stringstream ss;
157        ss << "Notify pending operations for vbucket " << vbucket->getId();
158        return ss.str();
159    }
160
161    bool run(void) {
162        vbucket->fireAllOps(engine);
163        return false;
164    }
165
166private:
167    EventuallyPersistentEngine &engine;
168    RCPtr<VBucket> vbucket;
169};
170
171EventuallyPersistentStore::EventuallyPersistentStore(
172    EventuallyPersistentEngine &theEngine) :
173    engine(theEngine), stats(engine.getEpStats()),
174    vbMap(theEngine.getConfiguration(), *this),
175    bgFetchQueue(0),
176    diskFlushAll(false), bgFetchDelay(0), statsSnapshotTaskId(0),
177    lastTransTimePerItem(0),snapshotVBState(false)
178{
179    cachedResidentRatio.activeRatio.store(0);
180    cachedResidentRatio.replicaRatio.store(0);
181
182    Configuration &config = engine.getConfiguration();
183    MutationLog *shardlog;
184    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
185        std::stringstream s;
186        s << i;
187        shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
188                                 "." + s.str(),
189                                 engine.getConfiguration().getAlogBlockSize());
190        accessLog.push_back(shardlog);
191    }
192
193    storageProperties = new StorageProperties(true, true, true, true);
194
195    ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
196
197    auxUnderlying = KVStoreFactory::create(stats, config, true);
198    cb_assert(auxUnderlying);
199
200    stats.memOverhead = sizeof(EventuallyPersistentStore);
201
202    if (config.getConflictResolutionType().compare("seqno") == 0) {
203        conflictResolver = new SeqBasedResolution();
204    }
205
206    stats.setMaxDataSize(config.getMaxSize());
207    config.addValueChangedListener("max_size",
208                                   new StatsValueChangeListener(stats));
209
210    stats.mem_low_wat.store(config.getMemLowWat());
211    config.addValueChangedListener("mem_low_wat",
212                                   new StatsValueChangeListener(stats));
213
214    stats.mem_high_wat.store(config.getMemHighWat());
215    config.addValueChangedListener("mem_high_wat",
216                                   new StatsValueChangeListener(stats));
217
218    stats.tapThrottleThreshold.store(static_cast<double>
219                                    (config.getTapThrottleThreshold())
220                                     / 100.0);
221    config.addValueChangedListener("tap_throttle_threshold",
222                                   new StatsValueChangeListener(stats));
223
224    stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
225    config.addValueChangedListener("tap_throttle_queue_cap",
226                                   new EPStoreValueChangeListener(*this));
227    config.addValueChangedListener("tap_throttle_cap_pcnt",
228                                   new EPStoreValueChangeListener(*this));
229
230    setBGFetchDelay(config.getBgFetchDelay());
231    config.addValueChangedListener("bg_fetch_delay",
232                                   new EPStoreValueChangeListener(*this));
233
234    stats.warmupMemUsedCap.store(static_cast<double>
235                               (config.getWarmupMinMemoryThreshold()) / 100.0);
236    config.addValueChangedListener("warmup_min_memory_threshold",
237                                   new StatsValueChangeListener(stats));
238    stats.warmupNumReadCap.store(static_cast<double>
239                                (config.getWarmupMinItemsThreshold()) / 100.0);
240    config.addValueChangedListener("warmup_min_items_threshold",
241                                   new StatsValueChangeListener(stats));
242
243    double mem_threshold = static_cast<double>
244                                      (config.getMutationMemThreshold()) / 100;
245    StoredValue::setMutationMemoryThreshold(mem_threshold);
246    config.addValueChangedListener("mutation_mem_threshold",
247                                   new EPStoreValueChangeListener(*this));
248
249    const std::string &policy = config.getItemEvictionPolicy();
250    if (policy.compare("value_only") == 0) {
251        eviction_policy = VALUE_ONLY;
252    } else {
253        eviction_policy = FULL_EVICTION;
254    }
255
256    // @todo - Ideally we should run the warmup thread in it's own
257    //         thread so that it won't block the flusher (in the write
258    //         thread), but we can't put it in the RO dispatcher either,
259    //         because that would block the background fetches..
260    warmupTask = new Warmup(this);
261}
262
263class WarmupWaitListener : public WarmupStateListener {
264public:
265    WarmupWaitListener(Warmup &f, bool wfw) :
266        warmup(f), waitForWarmup(wfw) { }
267
268    virtual void stateChanged(const int, const int to) {
269        if (waitForWarmup) {
270            if (to == WarmupState::Done) {
271                LockHolder lh(syncobject);
272                syncobject.notify();
273            }
274        } else if (to != WarmupState::Initialize) {
275            LockHolder lh(syncobject);
276            syncobject.notify();
277        }
278    }
279
280    void wait() {
281        LockHolder lh(syncobject);
282        // Verify that we're not already reached the state...
283        int currstate = warmup.getState().getState();
284
285        if (waitForWarmup) {
286            if (currstate == WarmupState::Done) {
287                return;
288            }
289        } else if (currstate != WarmupState::Initialize) {
290            return ;
291        }
292
293        syncobject.wait();
294    }
295
296private:
297    Warmup &warmup;
298    bool waitForWarmup;
299    SyncObject syncobject;
300};
301
302bool EventuallyPersistentStore::initialize() {
303    // We should nuke everything unless we want warmup
304    Configuration &config = engine.getConfiguration();
305    if (!config.isWarmup()) {
306        reset();
307    }
308
309    if (!startFlusher()) {
310        LOG(EXTENSION_LOG_WARNING,
311            "FATAL: Failed to create and start flushers");
312        return false;
313    }
314    if (!startBgFetcher()) {
315        LOG(EXTENSION_LOG_WARNING,
316           "FATAL: Failed to create and start bgfetchers");
317        return false;
318    }
319
320    WarmupWaitListener warmupListener(*warmupTask, config.isWaitforwarmup());
321    warmupTask->addWarmupStateListener(&warmupListener);
322    warmupTask->start();
323    warmupListener.wait();
324    warmupTask->removeWarmupStateListener(&warmupListener);
325
326    if (config.isVb0() && !vbMap.getBucket(0)) {
327        setVBucketState(0, vbucket_state_active, false);
328    }
329
330    if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
331        LOG(EXTENSION_LOG_WARNING,
332            "Warmup failed to load %d records due to OOM, exiting.\n",
333            static_cast<unsigned int>(stats.warmOOM));
334        return false;
335    }
336
337    ExTask itmpTask = new ItemPager(&engine, stats);
338    ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
339
340    size_t expiryPagerSleeptime = config.getExpPagerStime();
341    setExpiryPagerSleeptime(expiryPagerSleeptime);
342    config.addValueChangedListener("exp_pager_stime",
343                                   new EPStoreValueChangeListener(*this));
344
345    ExTask htrTask = new HashtableResizerTask(this, 10);
346    ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
347
348    size_t checkpointRemoverInterval = config.getChkRemoverStime();
349    ExTask chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
350                                                    checkpointRemoverInterval);
351    ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
352    return true;
353}
354
/**
 * Tears down the store in a fixed order:
 *  1. stop warmup and the background fetchers, then stop the NONIO task
 *     group;
 *  2. cancel the stats-snapshot and access-scanner tasks;
 *  3. stop the flushers and unregister the bucket from the ExecutorPool;
 *  4. close all UPR streams;
 *  5. only then delete the owned heap objects (conflict resolver, warmup
 *     task, auxiliary KVStore, storage properties, per-shard access logs).
 * NOTE(review): the ordering looks deliberate (background work is shut down
 * before the objects it may use are freed) — preserve it when editing.
 * NOTE(review): `delete conflictResolver` requires the constructor to leave
 * it NULL or valid; the constructor only assigns it when the conflict
 * resolution type is "seqno".
 */
355EventuallyPersistentStore::~EventuallyPersistentStore() {
356    stopWarmup();
357    stopBgFetcher();
358    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
359
360    ExecutorPool::get()->cancel(statsSnapshotTaskId);
361    ExecutorPool::get()->cancel(accessScanner.task);
362
363    stopFlusher();
364    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
365
366    engine.getUprConnMap().closeAllStreams();
367
368    delete conflictResolver;
369    delete warmupTask;
370    delete auxUnderlying;
371    delete storageProperties;
372
    // Free the per-shard MutationLogs allocated in the constructor.
373    std::vector<MutationLog*>::iterator it;
374    for (it = accessLog.begin(); it != accessLog.end(); it++) {
375        delete *it;
376    }
377}
378
379const Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
380    return vbMap.getShard(shardId)->getFlusher();
381}
382
383Warmup* EventuallyPersistentStore::getWarmup(void) const {
384    return warmupTask;
385}
386
387bool EventuallyPersistentStore::startFlusher() {
388    for (uint16_t i = 0; i < vbMap.numShards; ++i) {
389        Flusher *flusher = vbMap.shards[i]->getFlusher();
390        flusher->start();
391    }
392    return true;
393}
394
395void EventuallyPersistentStore::stopFlusher() {
396    for (uint16_t i = 0; i < vbMap.numShards; i++) {
397        Flusher *flusher = vbMap.shards[i]->getFlusher();
398        bool rv = flusher->stop(stats.forceShutdown);
399        if (rv && !stats.forceShutdown) {
400            flusher->wait();
401        }
402    }
403}
404
405bool EventuallyPersistentStore::pauseFlusher() {
406    bool rv = true;
407    for (uint16_t i = 0; i < vbMap.numShards; i++) {
408        Flusher *flusher = vbMap.shards[i]->getFlusher();
409        if (!flusher->pause()) {
410            LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
411                "[%s], shard = %d", flusher->stateName(), i);
412            rv = false;
413        }
414    }
415    return rv;
416}
417
418bool EventuallyPersistentStore::resumeFlusher() {
419    bool rv = true;
420    for (uint16_t i = 0; i < vbMap.numShards; i++) {
421        Flusher *flusher = vbMap.shards[i]->getFlusher();
422        if (!flusher->resume()) {
423            LOG(EXTENSION_LOG_WARNING,
424                    "Warning: attempted to resume flusher in state [%s], "
425                    "shard = %d", flusher->stateName(), i);
426            rv = false;
427        }
428    }
429    return rv;
430}
431
432void EventuallyPersistentStore::wakeUpFlusher() {
433    if (stats.diskQueueSize.load() == 0) {
434        for (uint16_t i = 0; i < vbMap.numShards; i++) {
435            Flusher *flusher = vbMap.shards[i]->getFlusher();
436            flusher->wake();
437        }
438    }
439}
440
441bool EventuallyPersistentStore::startBgFetcher() {
442    for (uint16_t i = 0; i < vbMap.numShards; i++) {
443        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
444        if (bgfetcher == NULL) {
445            LOG(EXTENSION_LOG_WARNING,
446                "Falied to start bg fetcher for shard %d", i);
447            return false;
448        }
449        bgfetcher->start();
450    }
451    return true;
452}
453
454void EventuallyPersistentStore::stopBgFetcher() {
455    for (uint16_t i = 0; i < vbMap.numShards; i++) {
456        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
457        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
458            LOG(EXTENSION_LOG_WARNING,
459                "Shutting down engine while there are still pending data "
460                "read for shard %d from database storage", i);
461        }
462        LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
463        bgfetcher->stop();
464    }
465}
466
467RCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
468                                                vbucket_state_t wanted_state) {
469    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
470    vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
471    if (found_state == wanted_state) {
472        return vb;
473    } else {
474        RCPtr<VBucket> rv;
475        return rv;
476    }
477}
478
479void
480EventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
481                                             time_t startTime,
482                                             uint64_t revSeqno) {
483    RCPtr<VBucket> vb = getVBucket(vbid);
484    if (vb) {
485        int bucket_num(0);
486        incExpirationStat(vb);
487        LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
488        StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
489        if (v) {
490            if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
491                // This is a temporary item whose background fetch for metadata
492                // has completed.
493                bool deleted = vb->ht.unlocked_del(key, bucket_num);
494                cb_assert(deleted);
495            } else if (v->isExpired(startTime) && !v->isDeleted()) {
496                vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
497                queueDirty(vb, v, false);
498                lh.unlock();
499            }
500        } else {
501            if (eviction_policy == FULL_EVICTION) {
502                // Create a temp item and delete and push it
503                // into the checkpoint queue.
504                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
505                                                            eviction_policy);
506                if (rv == ADD_NOMEM) {
507                    return;
508                }
509                v = vb->ht.unlocked_find(key, bucket_num, true, false);
510                v->setStoredValueState(StoredValue::state_deleted_key);
511                v->setRevSeqno(revSeqno);
512                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
513                queueDirty(vb, v, false);
514                lh.unlock();
515            }
516        }
517    }
518}
519
520void
521EventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
522                                                        std::string> > &keys) {
523    std::list<std::pair<uint16_t, std::string> >::iterator it;
524    time_t startTime = ep_real_time();
525    for (it = keys.begin(); it != keys.end(); it++) {
526        deleteExpiredItem(it->first, it->second, startTime, 0);
527    }
528}
529
530StoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
531                                                        const std::string &key,
532                                                        int bucket_num,
533                                                        bool wantDeleted,
534                                                        bool trackReference,
535                                                        bool queueExpired) {
536    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
537                                          trackReference);
538    if (v && !v->isDeleted() && !v->isTempItem()) {
539        // In the deleted case, we ignore expiration time.
540        if (v->isExpired(ep_real_time())) {
541            if (vb->getState() != vbucket_state_active) {
542                return wantDeleted ? v : NULL;
543            }
544            if (queueExpired) {
545                incExpirationStat(vb, false);
546                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
547                queueDirty(vb, v, false, false);
548            }
549            return wantDeleted ? v : NULL;
550        }
551    }
552    return v;
553}
554
555protocol_binary_response_status EventuallyPersistentStore::evictKey(
556                                                        const std::string &key,
557                                                        uint16_t vbucket,
558                                                        const char **msg,
559                                                        size_t *msg_size,
560                                                        bool force) {
561    RCPtr<VBucket> vb = getVBucket(vbucket);
562    if (!vb || (vb->getState() != vbucket_state_active && !force)) {
563        return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
564    }
565
566    int bucket_num(0);
567    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
568    StoredValue *v = fetchValidValue(vb, key, bucket_num, force, false);
569
570    protocol_binary_response_status rv(PROTOCOL_BINARY_RESPONSE_SUCCESS);
571
572    *msg_size = 0;
573    if (v) {
574        if (force)  {
575            v->markClean();
576        }
577        if (v->isResident()) {
578            if (vb->ht.unlocked_ejectItem(v, eviction_policy)) {
579                *msg = "Ejected.";
580            } else {
581                *msg = "Can't eject: Dirty object.";
582                rv = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
583            }
584        } else {
585            *msg = "Already ejected.";
586        }
587    } else {
588        if (eviction_policy == VALUE_ONLY) {
589            *msg = "Not found.";
590            rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
591        } else {
592            *msg = "Already ejected.";
593        }
594    }
595
596    return rv;
597}
598
599ENGINE_ERROR_CODE EventuallyPersistentStore::addTempItemForBgFetch(
600                                                        LockHolder &lock,
601                                                        int bucket_num,
602                                                        const std::string &key,
603                                                        RCPtr<VBucket> &vb,
604                                                        const void *cookie,
605                                                        bool metadataOnly) {
606
607    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
608                                                eviction_policy);
609    switch(rv) {
610        case ADD_NOMEM:
611            return ENGINE_ENOMEM;
612        case ADD_EXISTS:
613        case ADD_UNDEL:
614        case ADD_SUCCESS:
615        case ADD_TMP_AND_BG_FETCH:
616            // Since the hashtable bucket is locked, we shouldn't get here
617            abort();
618        case ADD_BG_FETCH:
619            lock.unlock();
620            bgFetch(key, vb->getId(), -1, cookie, metadataOnly);
621    }
622    return ENGINE_EWOULDBLOCK;
623}
624
625ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
626                                                 const void *cookie,
627                                                 bool force,
628                                                 uint8_t nru) {
629
630    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
631    if (!vb || vb->getState() == vbucket_state_dead) {
632        ++stats.numNotMyVBuckets;
633        return ENGINE_NOT_MY_VBUCKET;
634    } else if (vb->getState() == vbucket_state_replica && !force) {
635        ++stats.numNotMyVBuckets;
636        return ENGINE_NOT_MY_VBUCKET;
637    } else if (vb->getState() == vbucket_state_pending && !force) {
638        if (vb->addPendingOp(cookie)) {
639            return ENGINE_EWOULDBLOCK;
640        }
641    }
642
643    bool cas_op = (itm.getCas() != 0);
644    int bucket_num(0);
645    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
646    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
647                                          false);
648    if (v && v->isLocked(ep_current_time()) &&
649        (vb->getState() == vbucket_state_replica ||
650         vb->getState() == vbucket_state_pending)) {
651        v->unlock();
652    }
653    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, itm.getCas(),
654                                                true, false,
655                                                eviction_policy, nru);
656
657    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
658    switch (mtype) {
659    case NOMEM:
660        ret = ENGINE_ENOMEM;
661        break;
662    case INVALID_CAS:
663    case IS_LOCKED:
664        ret = ENGINE_KEY_EEXISTS;
665        break;
666    case NOT_FOUND:
667        if (cas_op) {
668            ret = ENGINE_KEY_ENOENT;
669            break;
670        }
671        // FALLTHROUGH
672    case WAS_DIRTY:
673        // Even if the item was dirty, push it into the vbucket's open
674        // checkpoint.
675    case WAS_CLEAN:
676        queueDirty(vb, v);
677        break;
678    case NEED_BG_FETCH:
679        { // CAS operation with non-resident item + full eviction.
680            if (v) {
681                // temp item is already created. Simply schedule a bg fetch job
682                lh.unlock();
683                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
684                return ENGINE_EWOULDBLOCK;
685            }
686            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
687                                        cookie, true);
688            break;
689        }
690    case INVALID_VBUCKET:
691        ret = ENGINE_NOT_MY_VBUCKET;
692        break;
693    }
694
695    return ret;
696}
697
698ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
699                                                 const void *cookie)
700{
701    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
702    if (!vb || vb->getState() == vbucket_state_dead ||
703        vb->getState() == vbucket_state_replica) {
704        ++stats.numNotMyVBuckets;
705        return ENGINE_NOT_MY_VBUCKET;
706    } else if(vb->getState() == vbucket_state_pending) {
707        if (vb->addPendingOp(cookie)) {
708            return ENGINE_EWOULDBLOCK;
709        }
710    }
711
712    if (itm.getCas() != 0) {
713        // Adding with a cas value doesn't make sense..
714        return ENGINE_NOT_STORED;
715    }
716
717    int bucket_num(0);
718    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
719    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
720                                          false);
721    add_type_t atype = vb->ht.unlocked_add(bucket_num, v, itm,
722                                           eviction_policy);
723
724    switch (atype) {
725    case ADD_NOMEM:
726        return ENGINE_ENOMEM;
727    case ADD_EXISTS:
728        return ENGINE_NOT_STORED;
729    case ADD_TMP_AND_BG_FETCH:
730        return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
731                                     cookie, true);
732    case ADD_BG_FETCH:
733        lh.unlock();
734        bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
735        return ENGINE_EWOULDBLOCK;
736    case ADD_SUCCESS:
737    case ADD_UNDEL:
738        queueDirty(vb, v);
739        break;
740    }
741    return ENGINE_SUCCESS;
742}
743
744ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
745                                                                uint8_t nru,
746                                                                bool genBySeqno) {
747
748    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
749    if (!vb ||
750        vb->getState() == vbucket_state_dead ||
751        vb->getState() == vbucket_state_active) {
752        ++stats.numNotMyVBuckets;
753        return ENGINE_NOT_MY_VBUCKET;
754    }
755
756    int bucket_num(0);
757    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
758    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
759                                          false);
760
761    // Note that this function is only called on replica or pending vbuckets.
762    if (v && v->isLocked(ep_current_time())) {
763        v->unlock();
764    }
765    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, 0, true, true,
766                                                eviction_policy, nru);
767
768    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
769    switch (mtype) {
770    case NOMEM:
771        ret = ENGINE_ENOMEM;
772        break;
773    case INVALID_CAS:
774    case IS_LOCKED:
775        ret = ENGINE_KEY_EEXISTS;
776        break;
777    case WAS_DIRTY:
778        // If a given backfill item is already dirty, don't queue it again.
779        break;
780    case NOT_FOUND:
781        // FALLTHROUGH
782    case WAS_CLEAN:
783        queueDirty(vb, v, true, true, genBySeqno);
784        break;
785    case INVALID_VBUCKET:
786        ret = ENGINE_NOT_MY_VBUCKET;
787        break;
788    case NEED_BG_FETCH:
789        // SET on a non-active vbucket should not require a bg_metadata_fetch.
790        abort();
791    }
792
793    return ret;
794}
795
796class KVStatsCallback : public Callback<kvstats_ctx> {
797    public:
798        KVStatsCallback(EventuallyPersistentStore *store)
799            : epstore(store) { }
800
801        void callback(kvstats_ctx &ctx) {
802            RCPtr<VBucket> vb = epstore->getVBucket(ctx.vbucket);
803            if (vb) {
804                vb->fileSpaceUsed = ctx.fileSpaceUsed;
805                vb->fileSize = ctx.fileSize;
806            }
807        }
808
809    private:
810        EventuallyPersistentStore *epstore;
811};
812
/**
 * Persist the vbucket_state of every vbucket that maps to shard @p shardId.
 *
 * First clears the shard's pending-snapshot flag for @p priority, so that a
 * later scheduleVBSnapshot() call can schedule another task. It then
 * collects the state, persistence checkpoint id and failover log of each of
 * the shard's vbuckets and writes them through the shard's read-write
 * KVStore while holding the shard's write lock. If the write fails, the
 * snapshot is rescheduled.
 *
 * @param priority low or high persistence priority this snapshot runs at
 * @param shardId  shard whose vbuckets are snapshotted
 */
void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority,
                                                 uint16_t shardId) {

    // Visitor that builds a vbucket_state for each vbucket owned by one
    // shard, keyed by vbucket id.
    class VBucketStateVisitor : public VBucketVisitor {
    public:
        VBucketStateVisitor(VBucketMap &vb_map, uint16_t sid)
            : vbuckets(vb_map), shardId(sid) { }
        bool visitBucket(RCPtr<VBucket> &vb) {
            // Only record vbuckets that belong to the shard being snapshotted.
            if (vbuckets.getShard(vb->getId())->getId() == shardId) {
                vbucket_state vb_state;
                vb_state.state = vb->getState();
                vb_state.checkpointId = vbuckets.getPersistenceCheckpointId(
                                        vb->getId());
                vb_state.maxDeletedSeqno = 0;
                vb_state.failovers = vb->failovers->toJSON();
                states[vb->getId()] = vb_state;
            }
            // Returning false presumably tells the visitor framework not to
            // walk this vbucket's items (visit() below asserts it is never
            // called) -- TODO confirm against VBucketVisitor.
            return false;
        }

        void visit(StoredValue*) {
            cb_assert(false); // this does not happen
        }

        // Collected states, keyed by vbucket id.
        std::map<uint16_t, vbucket_state> states;

    private:
        VBucketMap &vbuckets;
        uint16_t shardId;
    };

    // Clear the "snapshot already scheduled" flag for this priority so new
    // requests can schedule another snapshot task.
    KVShard *shard = vbMap.shards[shardId];
    if (priority == Priority::VBucketPersistLowPriority) {
        shard->setLowPriorityVbSnapshotFlag(false);
    } else {
        shard->setHighPriorityVbSnapshotFlag(false);
    }

    KVStatsCallback kvcb(this);
    VBucketStateVisitor v(vbMap, shard->getId());
    visit(v);
    hrtime_t start = gethrtime();
    // Serialize with other writers on this shard's KVStore.
    LockHolder lh(shard->getWriteLock());
    KVStore *rwUnderlying = shard->getRWUnderlying();
    if (!rwUnderlying->snapshotVBuckets(v.states, &kvcb)) {
        LOG(EXTENSION_LOG_WARNING,
            "VBucket snapshot task failed!!! Rescheduling");
        scheduleVBSnapshot(priority, shard->getId());
    } else {
        // Histogram sample; presumably hrtime is in ns so this records us.
        stats.snapshotVbucketHisto.add((gethrtime() - start) / 1000);
    }

    // A high-priority snapshot persists newly created vbuckets; clear their
    // "bucket creation" marker.
    // NOTE(review): this also runs when the snapshot above failed and was
    // rescheduled -- confirm that is intended.
    if (priority == Priority::VBucketPersistHighPriority) {
        std::map<uint16_t, vbucket_state>::iterator it = v.states.begin();
        for (; it != v.states.end(); ++it) {
            vbMap.setBucketCreation(it->first, false);
        }
    }
}
872
/**
 * Move vbucket @p vbid to state @p to, creating the vbucket if it does not
 * exist yet.
 *
 * For an existing vbucket:
 * - UPR connections are optionally notified.
 * - The state is changed.
 * - A failover-log entry is added when it becomes active without a transfer.
 * - Operations parked on a pending vbucket are released when it becomes
 *   active.
 * - A low-priority vbucket-state snapshot is scheduled.
 *
 * A new vbucket is added to the map with fresh persistence markers, and a
 * high-priority snapshot is scheduled.
 *
 * @param vbid       vbucket to update or create
 * @param to         requested state
 * @param transfer   when false and @p to is active, a new failover entry is
 *                   created at the current high seqno; when true it is not
 *                   (presumably an ownership transfer -- confirm with caller)
 * @param notify_upr whether to report the state change to UPR connections
 * @return ENGINE_SUCCESS (also when already in state @p to), or
 *         ENGINE_ERANGE if the vbucket map rejects the new vbucket
 */
ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
                                                           vbucket_state_t to,
                                                           bool transfer,
                                                           bool notify_upr) {
    // Lock to prevent a race condition between a failed update and add.
    LockHolder lh(vbsetMutex);
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    // Nothing to do if the vbucket is already in the requested state.
    if (vb && to == vb->getState()) {
        return ENGINE_SUCCESS;
    }

    uint16_t shardId = vbMap.getShard(vbid)->getId();
    if (vb) {
        vbucket_state_t oldstate = vb->getState();
        // NOTE(review): oldstate != to always holds here because of the
        // early return above; only notify_upr actually matters.
        if (oldstate != to && notify_upr) {
            engine.getUprConnMap().vbucketStateChanged(vbid, to);
        }

        vb->setState(to, engine.getServerApi());
        // Becoming active (other than by transfer) starts a new failover
        // history branch at the current high seqno.
        if (to == vbucket_state_active && !transfer) {
            vb->failovers->createEntry(vb->getHighSeqno());
        }
        // Release vbsetMutex before scheduling background work.
        lh.unlock();
        // Ops that were parked while the vbucket was pending can now run.
        if (oldstate == vbucket_state_pending &&
            to == vbucket_state_active) {
            ExTask notifyTask = new PendingOpsNotification(engine, vb);
            ExecutorPool::get()->schedule(notifyTask, NONIO_TASK_IDX);
        }
        scheduleVBSnapshot(Priority::VBucketPersistLowPriority, shardId);
    } else {
        // Create the vbucket from scratch.
        FailoverTable* ft = new FailoverTable(engine.getMaxFailoverEntries());
        RCPtr<VBucket> newvb(new VBucket(vbid, to, stats,
                                         engine.getCheckpointConfig(),
                                         vbMap.getShard(vbid), 0, ft));
        // The first checkpoint for active vbucket should start with id 2.
        uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0;
        newvb->checkpointManager.setOpenCheckpointId(start_chk_id);
        if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
            lh.unlock();
            return ENGINE_ERANGE;
        }
        // Reset persistence markers and flag the vbucket as newly created;
        // the high-priority snapshot clears the creation flag again.
        vbMap.setPersistenceCheckpointId(vbid, 0);
        vbMap.setPersistenceSeqno(vbid, 0);
        vbMap.setBucketCreation(vbid, true);
        lh.unlock();
        scheduleVBSnapshot(Priority::VBucketPersistHighPriority, shardId);
    }
    return ENGINE_SUCCESS;
}
922
923void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
924    snapshotVBState = false;
925    KVShard *shard = NULL;
926    if (p == Priority::VBucketPersistHighPriority) {
927        for (size_t i = 0; i < vbMap.numShards; ++i) {
928            shard = vbMap.shards[i];
929            if (shard->setHighPriorityVbSnapshotFlag(true)) {
930                ExTask task = new VBSnapshotTask(&engine, p, i, false);
931                ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
932            }
933        }
934    } else {
935        for (size_t i = 0; i < vbMap.numShards; ++i) {
936            shard = vbMap.shards[i];
937            if (shard->setLowPriorityVbSnapshotFlag(true)) {
938                ExTask task = new VBSnapshotTask(&engine, p, i, false);
939                ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
940            }
941        }
942    }
943}
944
945void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p,
946                                                   uint16_t shardId,
947                                                   bool force) {
948    snapshotVBState = false;
949    KVShard *shard = vbMap.shards[shardId];
950    if (p == Priority::VBucketPersistHighPriority) {
951        if (force || shard->setHighPriorityVbSnapshotFlag(true)) {
952            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
953            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
954        }
955    } else {
956        if (force || shard->setLowPriorityVbSnapshotFlag(true)) {
957            ExTask task = new VBSnapshotTask(&engine, p, shardId, false);
958            ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
959        }
960    }
961}
962
/**
 * Delete the on-disk data of vbucket @p vbid (presumably invoked by the
 * VBDeleteTask that scheduleVBDeletion() schedules).
 *
 * The KVStore deletion happens only if the vbucket is gone from the map,
 * is dead, or is flagged for deletion. Otherwise nothing is deleted, but the
 * call is still treated as a success.
 *
 * @param vbid     vbucket whose file should be deleted
 * @param cookie   connection to notify with ENGINE_SUCCESS on success
 *                 (may be NULL)
 * @param recreate forwarded to KVStore::delVBucket (presumably "recreate an
 *                 empty file afterwards" -- confirm in KVStore)
 * @return true on success (timing stats recorded, cookie notified);
 *         false if the KVStore failed to delete the vbucket
 */
bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
                                                        const void* cookie,
                                                        bool recreate) {
    LockHolder lh(vbsetMutex);

    hrtime_t start_time(gethrtime());
    bool success = true;
    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
    if (!vb || vb->getState() == vbucket_state_dead ||
         vbMap.isBucketDeletion(vbid)) {
        // Drop vbsetMutex before taking the shard write lock for disk work.
        lh.unlock();
        uint16_t sid = vbMap.getShard(vbid)->getId();
        KVShard *shard = vbMap.shards[sid];
        LockHolder ls(shard->getWriteLock());
        KVStore *rwUnderlying = getRWUnderlying(vbid);
        if (rwUnderlying->delVBucket(vbid, recreate)) {
            // Deletion done: clear the pending-deletion flag and reset the
            // persisted seqno.
            vbMap.setBucketDeletion(vbid, false);
            vbMap.setPersistenceSeqno(vbid, 0);
            ++stats.vbucketDeletions;
        } else {
            ++stats.vbucketDeletionFail;
            success = false;
        }
    }

    // NOTE(review): this also runs (and records timing) when no deletion was
    // needed; in that case vbsetMutex is still held during notifyIOComplete.
    if (success) {
        hrtime_t spent(gethrtime() - start_time);
        hrtime_t wall_time = spent / 1000;
        BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
        stats.diskVBDelHisto.add(wall_time);
        atomic_setIfBigger(stats.vbucketDelMaxWalltime, wall_time);
        stats.vbucketDelTotWalltime.fetch_add(wall_time);
        if (cookie) {
            engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
        }
        return true;
    }

    return false;
}
1003
1004void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
1005                                                   const void* cookie,
1006                                                   double delay,
1007                                                   bool recreate) {
1008    ExTask delTask = new VBucketMemoryDeletionTask(engine, vb, delay);
1009    ExecutorPool::get()->schedule(delTask, NONIO_TASK_IDX);
1010
1011    uint16_t vbid = vb->getId();
1012    if (vbMap.setBucketDeletion(vbid, true)) {
1013        ExTask task = new VBDeleteTask(&engine, vbid, cookie,
1014                                       Priority::VBucketDeletionPriority,
1015                                       vbMap.getShard(vbid)->getId(),
1016                                       recreate, delay);
1017        ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1018    }
1019}
1020
1021ENGINE_ERROR_CODE EventuallyPersistentStore::deleteVBucket(uint16_t vbid,
1022                                                           const void* c) {
1023    // Lock to prevent a race condition between a failed update and add
1024    // (and delete).
1025    LockHolder lh(vbsetMutex);
1026
1027    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1028    if (!vb) {
1029        return ENGINE_NOT_MY_VBUCKET;
1030    }
1031
1032    engine.getUprConnMap().vbucketStateChanged(vbid, vbucket_state_dead);
1033    vbMap.removeBucket(vbid);
1034    lh.unlock();
1035    scheduleVBDeletion(vb, c);
1036    scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
1037                       vbMap.getShard(vbid)->getId(), true);
1038    if (c) {
1039        return ENGINE_EWOULDBLOCK;
1040    }
1041    return ENGINE_SUCCESS;
1042}
1043
1044ENGINE_ERROR_CODE EventuallyPersistentStore::compactDB(uint16_t vbid,
1045                                                       compaction_ctx c,
1046                                                       const void *cookie) {
1047    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1048    if (!vb) {
1049        return ENGINE_NOT_MY_VBUCKET;
1050    }
1051
1052    ExTask task = new CompactVBucketTask(&engine, Priority::CompactorPriority,
1053                                         vbid, c, cookie,
1054                                         vbMap.getShard(vbid)->getId());
1055
1056    ExecutorPool::get()->schedule(task, WRITER_TASK_IDX);
1057
1058    LOG(EXTENSION_LOG_DEBUG, "Scheduled compaction task %d on vbucket %d,"
1059        "purge_before_ts = %lld, purge_before_seq = %lld, dropdeletes = %d",
1060        task->getId(), vbid, c.purge_before_ts,
1061        c.purge_before_seq, c.drop_deletes);
1062
1063   return ENGINE_EWOULDBLOCK;
1064}
1065
1066class ExpiredItemsCallback : public Callback<compaction_ctx> {
1067    public:
1068        ExpiredItemsCallback(EventuallyPersistentStore *store, uint16_t vbid)
1069            : epstore(store), vbucket(vbid) { }
1070
1071        void callback(compaction_ctx &ctx) {
1072            std::list<expiredItemCtx>::iterator it;
1073            for (it  = ctx.expiredItems.begin();
1074                 it != ctx.expiredItems.end(); it++) {
1075                epstore->deleteExpiredItem(vbucket, it->keyStr,
1076                                           ctx.curr_time,
1077                                           it->revSeqno);
1078            }
1079        }
1080
1081    private:
1082        EventuallyPersistentStore *epstore;
1083        uint16_t vbucket;
1084};
1085
1086bool EventuallyPersistentStore::compactVBucket(const uint16_t vbid,
1087                                               compaction_ctx *ctx,
1088                                               const void *cookie) {
1089    KVShard *shard = vbMap.getShard(vbid);
1090    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
1091    LockHolder lh(shard->getWriteLock());
1092    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1093    if (vb) {
1094        if (vb->getState() == vbucket_state_active) {
1095            // Set the current time ONLY for active vbuckets.
1096            ctx->curr_time = ep_real_time();
1097        } else {
1098            ctx->curr_time = 0;
1099        }
1100        KVStore *rwUnderlying = shard->getRWUnderlying();
1101        ExpiredItemsCallback cb(this, vbid);
1102        KVStatsCallback kvcb(this);
1103        if (!rwUnderlying->compactVBucket(vbid, ctx, cb, kvcb)) {
1104            LOG(EXTENSION_LOG_WARNING,
1105                    "VBucket compaction failed failed!!!");
1106            err = ENGINE_TMPFAIL;
1107            engine.storeEngineSpecific(cookie, NULL);
1108        } else {
1109            vb->setPurgeSeqno(ctx->purge_before_seq);
1110        }
1111    } else {
1112        err = ENGINE_NOT_MY_VBUCKET;
1113        engine.storeEngineSpecific(cookie, NULL);
1114    }
1115
1116    if (cookie) {
1117        engine.notifyIOComplete(cookie, err);
1118    }
1119    --stats.pendingCompactions;
1120    return false;
1121}
1122
1123bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
1124    LockHolder lh(vbsetMutex);
1125    bool rv(false);
1126
1127    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
1128    if (vb) {
1129        vbucket_state_t vbstate = vb->getState();
1130
1131        vbMap.removeBucket(vbid);
1132        lh.unlock();
1133
1134        std::list<std::string> tap_cursors = vb->checkpointManager.
1135                                             getTAPCursorNames();
1136        // Delete the vbucket database file and recreate the empty file
1137        scheduleVBDeletion(vb, NULL, 0, true);
1138        setVBucketState(vbid, vbstate, false);
1139
1140        // Copy the all cursors from the old vbucket into the new vbucket
1141        RCPtr<VBucket> newvb = vbMap.getBucket(vbid);
1142        newvb->checkpointManager.resetTAPCursors(tap_cursors);
1143
1144        rv = true;
1145    }
1146    return rv;
1147}
1148
1149extern "C" {
1150
1151    typedef struct {
1152        EventuallyPersistentEngine* engine;
1153        std::map<std::string, std::string> smap;
1154    } snapshot_stats_t;
1155
1156    static void add_stat(const char *key, const uint16_t klen,
1157                         const char *val, const uint32_t vlen,
1158                         const void *cookie) {
1159        cb_assert(cookie);
1160        void *ptr = const_cast<void *>(cookie);
1161        snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
1162        ObjectRegistry::onSwitchThread(snap->engine);
1163
1164        std::string k(key, klen);
1165        std::string v(val, vlen);
1166        snap->smap.insert(std::pair<std::string, std::string>(k, v));
1167    }
1168}
1169
1170void EventuallyPersistentStore::snapshotStats() {
1171    snapshot_stats_t snap;
1172    snap.engine = &engine;
1173    std::map<std::string, std::string>  smap;
1174    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
1175              engine.getStats(&snap, "tap", 3, add_stat) == ENGINE_SUCCESS &&
1176              engine.getStats(&snap, "upr", 3, add_stat) == ENGINE_SUCCESS;
1177    if (rv && stats.isShutdown) {
1178        snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
1179                                                              "true" : "false";
1180        std::stringstream ss;
1181        ss << ep_real_time();
1182        snap.smap["ep_shutdown_time"] = ss.str();
1183    }
1184    getOneRWUnderlying()->snapshotStats(snap.smap);
1185}
1186
1187void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
1188                                              const hrtime_t start,
1189                                              const hrtime_t stop) {
1190    if (stop >= start && start >= init) {
1191        // skip the measurement if the counter wrapped...
1192        ++stats.bgNumOperations;
1193        hrtime_t w = (start - init) / 1000;
1194        BlockTimer::log(start - init, "bgwait", stats.timingLog);
1195        stats.bgWaitHisto.add(w);
1196        stats.bgWait.fetch_add(w);
1197        atomic_setIfLess(stats.bgMinWait, w);
1198        atomic_setIfBigger(stats.bgMaxWait, w);
1199
1200        hrtime_t l = (stop - start) / 1000;
1201        BlockTimer::log(stop - start, "bgload", stats.timingLog);
1202        stats.bgLoadHisto.add(l);
1203        stats.bgLoad.fetch_add(l);
1204        atomic_setIfLess(stats.bgMinLoad, l);
1205        atomic_setIfBigger(stats.bgMaxLoad, l);
1206    }
1207}
1208
1209void EventuallyPersistentStore::completeBGFetch(const std::string &key,
1210                                                uint16_t vbucket,
1211                                                uint64_t rowid,
1212                                                const void *cookie,
1213                                                hrtime_t init,
1214                                                bool isMeta) {
1215    hrtime_t start(gethrtime());
1216    // Go find the data
1217    RememberingCallback<GetValue> gcb;
1218    if (isMeta) {
1219        gcb.val.setPartial();
1220        ++stats.bg_meta_fetched;
1221    } else {
1222        ++stats.bg_fetched;
1223    }
1224    getROUnderlying(vbucket)->get(key, rowid, vbucket, gcb);
1225    gcb.waitForValue();
1226    cb_assert(gcb.fired);
1227    ENGINE_ERROR_CODE status = gcb.val.getStatus();
1228
1229    // Lock to prevent a race condition between a fetch for restore and delete
1230    LockHolder lh(vbsetMutex);
1231
1232    RCPtr<VBucket> vb = getVBucket(vbucket);
1233    if (vb) {
1234        int bucket_num(0);
1235        LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1236        StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1237        if (isMeta) {
1238            if (v && v->unlocked_restoreMeta(gcb.val.getValue(),
1239                                             gcb.val.getStatus(), vb->ht)) {
1240                status = ENGINE_SUCCESS;
1241            }
1242        } else {
1243            if (v && v->isResident()) {
1244                status = ENGINE_SUCCESS;
1245            }
1246
1247            bool restore = false;
1248            if (eviction_policy == VALUE_ONLY &&
1249                v && !v->isResident() && !v->isDeleted()) {
1250                restore = true;
1251            } else if (eviction_policy == FULL_EVICTION &&
1252                       v && v->isTempInitialItem()) {
1253                restore = true;
1254            }
1255
1256            if (restore) {
1257                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1258                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1259                    cb_assert(v->isResident());
1260                    if (vb->getState() == vbucket_state_active &&
1261                        v->getExptime() != gcb.val.getValue()->getExptime() &&
1262                        v->getCas() == gcb.val.getValue()->getCas()) {
1263                        // MB-9306: It is possible that by the time bgfetcher
1264                        // returns, the item may have been updated and queued
1265                        // Hence test the CAS value to be the same first.
1266                        // exptime mutated, schedule it into new checkpoint
1267                        queueDirty(vb, v);
1268                        hlh.unlock();
1269                    }
1270                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1271                    v->setStoredValueState(
1272                                          StoredValue::state_non_existent_key);
1273                    if (eviction_policy == FULL_EVICTION) {
1274                        // For the full eviction, we should notify
1275                        // ENGINE_SUCCESS to the memcached worker thread, so
1276                        // that the worker thread can visit the ep-engine and
1277                        // figure out the correct error code.
1278                        status = ENGINE_SUCCESS;
1279                    }
1280                } else {
1281                    // underlying kvstore couldn't fetch requested data
1282                    // log returned error and notify TMPFAIL to client
1283                    LOG(EXTENSION_LOG_WARNING,
1284                        "Warning: failed background fetch for vb=%d seq=%d "
1285                        "key=%s", vbucket, v->getBySeqno(), key.c_str());
1286                    status = ENGINE_TMPFAIL;
1287                }
1288            }
1289        }
1290    } else {
1291        LOG(EXTENSION_LOG_INFO, "VBucket %d's file was deleted in the middle of"
1292            " a bg fetch for key %s\n", vbucket, key.c_str());
1293        status = ENGINE_NOT_MY_VBUCKET;
1294    }
1295
1296    lh.unlock();
1297
1298    hrtime_t stop = gethrtime();
1299    updateBGStats(init, start, stop);
1300    bgFetchQueue--;
1301
1302    delete gcb.val.getValue();
1303    engine.notifyIOComplete(cookie, status);
1304}
1305
1306void EventuallyPersistentStore::completeBGFetchMulti(uint16_t vbId,
1307                                 std::vector<bgfetched_item_t> &fetchedItems,
1308                                 hrtime_t startTime)
1309{
1310    RCPtr<VBucket> vb = getVBucket(vbId);
1311    if (!vb) {
1312        std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1313        for (; itemItr != fetchedItems.end(); ++itemItr) {
1314            engine.notifyIOComplete((*itemItr).second->cookie,
1315                                    ENGINE_NOT_MY_VBUCKET);
1316        }
1317        LOG(EXTENSION_LOG_WARNING,
1318            "EP Store completes %d of batched background fetch for "
1319            "for vBucket = %d that is already deleted\n",
1320            (int)fetchedItems.size(), vbId);
1321        return;
1322    }
1323
1324    std::vector<bgfetched_item_t>::iterator itemItr = fetchedItems.begin();
1325    for (; itemItr != fetchedItems.end(); ++itemItr) {
1326        VBucketBGFetchItem *bgitem = (*itemItr).second;
1327        ENGINE_ERROR_CODE status = bgitem->value.getStatus();
1328        Item *fetchedValue = bgitem->value.getValue();
1329        const std::string &key = (*itemItr).first;
1330
1331        int bucket = 0;
1332        LockHolder blh = vb->ht.getLockedBucket(key, &bucket);
1333        StoredValue *v = fetchValidValue(vb, key, bucket, true);
1334        if (bgitem->metaDataOnly) {
1335            if (v && v->unlocked_restoreMeta(fetchedValue, status, vb->ht)) {
1336                status = ENGINE_SUCCESS;
1337            }
1338        } else {
1339            if (v && v->isResident()) {
1340                status = ENGINE_SUCCESS;
1341            }
1342
1343            bool restore = false;
1344            if (eviction_policy == VALUE_ONLY &&
1345                v && !v->isResident() && !v->isDeleted()) {
1346                restore = true;
1347            } else if (eviction_policy == FULL_EVICTION &&
1348                       v && v->isTempInitialItem()) {
1349                restore = true;
1350            }
1351
1352            if (restore) {
1353                if (status == ENGINE_SUCCESS) {
1354                    v->unlocked_restoreValue(fetchedValue, vb->ht);
1355                    cb_assert(v->isResident());
1356                    if (vb->getState() == vbucket_state_active &&
1357                        v->getExptime() != fetchedValue->getExptime() &&
1358                        v->getCas() == fetchedValue->getCas()) {
1359                        // MB-9306: It is possible that by the time
1360                        // bgfetcher returns, the item may have been
1361                        // updated and queued
1362                        // Hence test the CAS value to be the same first.
1363                        // exptime mutated, schedule it into new checkpoint
1364                        queueDirty(vb, v);
1365                        blh.unlock();
1366                    }
1367                } else if (status == ENGINE_KEY_ENOENT) {
1368                    v->setStoredValueState(StoredValue::state_non_existent_key);
1369                    if (eviction_policy == FULL_EVICTION) {
1370                        // For the full eviction, we should notify
1371                        // ENGINE_SUCCESS to the memcached worker thread,
1372                        // so that the worker thread can visit the
1373                        // ep-engine and figure out the correct error
1374                        // code.
1375                        status = ENGINE_SUCCESS;
1376                    }
1377                } else {
1378                    // underlying kvstore couldn't fetch requested data
1379                    // log returned error and notify TMPFAIL to client
1380                    LOG(EXTENSION_LOG_WARNING,
1381                        "Warning: failed background fetch for vb=%d "
1382                        "key=%s", vbId, key.c_str());
1383                    status = ENGINE_TMPFAIL;
1384                }
1385            }
1386        }
1387        blh.unlock();
1388
1389        if (bgitem->metaDataOnly) {
1390            ++stats.bg_meta_fetched;
1391        } else {
1392            ++stats.bg_fetched;
1393        }
1394
1395        hrtime_t endTime = gethrtime();
1396        updateBGStats(bgitem->initTime, startTime, endTime);
1397        engine.notifyIOComplete(bgitem->cookie, status);
1398    }
1399
1400    LOG(EXTENSION_LOG_DEBUG,
1401        "EP Store completes %d of batched background fetch "
1402        "for vBucket = %d endTime = %lld\n",
1403        fetchedItems.size(), vbId, gethrtime()/1000000);
1404}
1405
1406void EventuallyPersistentStore::bgFetch(const std::string &key,
1407                                        uint16_t vbucket,
1408                                        uint64_t rowid,
1409                                        const void *cookie,
1410                                        bool isMeta) {
1411    std::stringstream ss;
1412
1413    if (multiBGFetchEnabled()) {
1414        RCPtr<VBucket> vb = getVBucket(vbucket);
1415        cb_assert(vb);
1416        KVShard *myShard = vbMap.getShard(vbucket);
1417
1418        // schedule to the current batch of background fetch of the given
1419        // vbucket
1420        VBucketBGFetchItem * fetchThis = new VBucketBGFetchItem(cookie,
1421                                                                isMeta);
1422        vb->queueBGFetchItem(key, fetchThis, myShard->getBgFetcher());
1423        myShard->getBgFetcher()->notifyBGEvent();
1424        ss << "Queued a background fetch, now at "
1425           << vb->numPendingBGFetchItems() << std::endl;
1426        LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1427    } else {
1428        bgFetchQueue++;
1429        stats.maxRemainingBgJobs = std::max(stats.maxRemainingBgJobs,
1430                                            bgFetchQueue.load());
1431        ExecutorPool* iom = ExecutorPool::get();
1432        ExTask task = new BGFetchTask(&engine, key, vbucket, rowid, cookie,
1433                                      isMeta,
1434                                      Priority::BgFetcherGetMetaPriority,
1435                                      bgFetchDelay, false);
1436        iom->schedule(task, READER_TASK_IDX);
1437        ss << "Queued a background fetch, now at " << bgFetchQueue.load()
1438           << std::endl;
1439        LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
1440    }
1441}
1442
1443GetValue EventuallyPersistentStore::getInternal(const std::string &key,
1444                                                uint16_t vbucket,
1445                                                const void *cookie,
1446                                                bool queueBG,
1447                                                bool honorStates,
1448                                                vbucket_state_t allowedState,
1449                                                bool trackReference) {
1450
1451    vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ?
1452        vbucket_state_replica : vbucket_state_active;
1453    RCPtr<VBucket> vb = getVBucket(vbucket);
1454    if (!vb) {
1455        ++stats.numNotMyVBuckets;
1456        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1457    } else if (honorStates && vb->getState() == vbucket_state_dead) {
1458        ++stats.numNotMyVBuckets;
1459        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1460    } else if (honorStates && vb->getState() == disallowedState) {
1461        ++stats.numNotMyVBuckets;
1462        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1463    } else if (honorStates && vb->getState() == vbucket_state_pending) {
1464        if (vb->addPendingOp(cookie)) {
1465            return GetValue(NULL, ENGINE_EWOULDBLOCK);
1466        }
1467    }
1468
1469    int bucket_num(0);
1470    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1471    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
1472                                     trackReference);
1473    if (v) {
1474        if (v->isDeleted() || v->isTempDeletedItem() ||
1475            v->isTempNonExistentItem()) {
1476            GetValue rv;
1477            return rv;
1478        }
1479        // If the value is not resident, wait for it...
1480        if (!v->isResident()) {
1481            if (queueBG) {
1482                bgFetch(key, vbucket, v->getBySeqno(), cookie);
1483            }
1484            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno(),
1485                            true, v->getNRUValue());
1486        }
1487
1488        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1489                    ENGINE_SUCCESS, v->getBySeqno(), false, v->getNRUValue());
1490        return rv;
1491    } else {
1492        if (eviction_policy == VALUE_ONLY || diskFlushAll) {
1493            GetValue rv;
1494            return rv;
1495        }
1496        ENGINE_ERROR_CODE ec = ENGINE_EWOULDBLOCK;
1497        if (queueBG) { // Full eviction and need a bg fetch.
1498            ec = addTempItemForBgFetch(lh, bucket_num, key, vb,
1499                                       cookie, false);
1500        }
1501        return GetValue(NULL, ec, -1, true);
1502    }
1503}
1504
1505GetValue EventuallyPersistentStore::getRandomKey() {
1506    long max = vbMap.getSize();
1507
1508    long start = random() % max;
1509    long curr = start;
1510    Item *itm = NULL;
1511
1512    while (itm == NULL) {
1513        RCPtr<VBucket> vb = getVBucket(curr++);
1514        while (!vb || vb->getState() != vbucket_state_active) {
1515            if (curr == start) {
1516                return GetValue(NULL, ENGINE_KEY_ENOENT);
1517            }
1518            if (curr == max) {
1519                curr = 0;
1520            }
1521
1522            vb = getVBucket(curr++);
1523        }
1524
1525        if ((itm = vb->ht.getRandomKey(random())) != NULL) {
1526            GetValue rv(itm, ENGINE_SUCCESS);
1527            return rv;
1528        }
1529
1530        if (curr == max) {
1531            curr = 0;
1532        }
1533
1534        if (curr == start) {
1535            return GetValue(NULL, ENGINE_KEY_ENOENT);
1536        }
1537        // Search next vbucket
1538    }
1539
1540    return GetValue(NULL, ENGINE_KEY_ENOENT);
1541}
1542
1543
1544ENGINE_ERROR_CODE EventuallyPersistentStore::getMetaData(
1545                                                        const std::string &key,
1546                                                        uint16_t vbucket,
1547                                                        const void *cookie,
1548                                                        ItemMetaData &metadata,
1549                                                        uint32_t &deleted,
1550                                                        bool trackReferenced)
1551{
1552    (void) cookie;
1553    RCPtr<VBucket> vb = getVBucket(vbucket);
1554    if (!vb || vb->getState() == vbucket_state_dead ||
1555        vb->getState() == vbucket_state_replica) {
1556        ++stats.numNotMyVBuckets;
1557        return ENGINE_NOT_MY_VBUCKET;
1558    }
1559
1560    int bucket_num(0);
1561    deleted = 0;
1562    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1563    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true,
1564                                          trackReferenced);
1565
1566    if (v) {
1567        stats.numOpsGetMeta++;
1568
1569        if (v->isTempInitialItem()) { // Need bg meta fetch.
1570            bgFetch(key, vbucket, -1, cookie, true);
1571            return ENGINE_EWOULDBLOCK;
1572        } else if (v->isTempNonExistentItem()) {
1573            metadata.cas = v->getCas();
1574            return ENGINE_KEY_ENOENT;
1575        } else {
1576            if (v->isTempDeletedItem() || v->isDeleted() ||
1577                v->isExpired(ep_real_time())) {
1578                deleted |= GET_META_ITEM_DELETED_FLAG;
1579            }
1580            metadata.cas = v->getCas();
1581            metadata.flags = v->getFlags();
1582            metadata.exptime = v->getExptime();
1583            metadata.revSeqno = v->getRevSeqno();
1584            return ENGINE_SUCCESS;
1585        }
1586    } else {
1587        // The key wasn't found. However, this may be because it was previously
1588        // deleted or evicted with the full eviction strategy.
1589        // So, add a temporary item corresponding to the key to the hash table
1590        // and schedule a background fetch for its metadata from the persistent
1591        // store. The item's state will be updated after the fetch completes.
1592        return addTempItemForBgFetch(lh, bucket_num, key, vb, cookie, true);
1593    }
1594}
1595
1596ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
1597                                                         uint64_t cas,
1598                                                         const void *cookie,
1599                                                         bool force,
1600                                                         bool allowExisting,
1601                                                         uint8_t nru,
1602                                                         bool genBySeqno)
1603{
1604    RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
1605    if (!vb || vb->getState() == vbucket_state_dead) {
1606        ++stats.numNotMyVBuckets;
1607        return ENGINE_NOT_MY_VBUCKET;
1608    } else if (vb->getState() == vbucket_state_replica && !force) {
1609        ++stats.numNotMyVBuckets;
1610        return ENGINE_NOT_MY_VBUCKET;
1611    } else if (vb->getState() == vbucket_state_pending && !force) {
1612        if (vb->addPendingOp(cookie)) {
1613            return ENGINE_EWOULDBLOCK;
1614        }
1615    }
1616
1617    int bucket_num(0);
1618    LockHolder lh = vb->ht.getLockedBucket(itm.getKey(), &bucket_num);
1619    StoredValue *v = vb->ht.unlocked_find(itm.getKey(), bucket_num, true,
1620                                          false);
1621
1622    if (!force) {
1623        if (v)  {
1624            if (v->isTempInitialItem()) {
1625                bgFetch(itm.getKey(), itm.getVBucketId(), -1, cookie, true);
1626                return ENGINE_EWOULDBLOCK;
1627            }
1628            if (!conflictResolver->resolve(v, itm.getMetaData(), false)) {
1629                ++stats.numOpsSetMetaResolutionFailed;
1630                return ENGINE_KEY_EEXISTS;
1631            }
1632        } else {
1633            return addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1634                                         cookie, true);
1635        }
1636    }
1637
1638    if (v && v->isLocked(ep_current_time()) &&
1639        (vb->getState() == vbucket_state_replica ||
1640         vb->getState() == vbucket_state_pending)) {
1641        v->unlock();
1642    }
1643    mutation_type_t mtype = vb->ht.unlocked_set(v, itm, cas, allowExisting,
1644                                                true, eviction_policy, nru);
1645
1646    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
1647    switch (mtype) {
1648    case NOMEM:
1649        ret = ENGINE_ENOMEM;
1650        break;
1651    case INVALID_CAS:
1652    case IS_LOCKED:
1653        ret = ENGINE_KEY_EEXISTS;
1654        break;
1655    case INVALID_VBUCKET:
1656        ret = ENGINE_NOT_MY_VBUCKET;
1657        break;
1658    case WAS_DIRTY:
1659    case WAS_CLEAN:
1660        queueDirty(vb, v, false, true, genBySeqno);
1661        break;
1662    case NOT_FOUND:
1663        ret = ENGINE_KEY_ENOENT;
1664        break;
1665    case NEED_BG_FETCH:
1666        {            // CAS operation with non-resident item + full eviction.
1667            if (v) { // temp item is already created. Simply schedule a
1668                lh.unlock(); // bg fetch job.
1669                bgFetch(itm.getKey(), vb->getId(), -1, cookie, true);
1670                return ENGINE_EWOULDBLOCK;
1671            }
1672            ret = addTempItemForBgFetch(lh, bucket_num, itm.getKey(), vb,
1673                                        cookie, true);
1674        }
1675    }
1676
1677    return ret;
1678}
1679
1680GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
1681                                                    uint16_t vbucket,
1682                                                    const void *cookie,
1683                                                    time_t exptime)
1684{
1685    RCPtr<VBucket> vb = getVBucket(vbucket);
1686    if (!vb) {
1687        ++stats.numNotMyVBuckets;
1688        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1689    } else if (vb->getState() == vbucket_state_dead) {
1690        ++stats.numNotMyVBuckets;
1691        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1692    } else if (vb->getState() == vbucket_state_replica) {
1693        ++stats.numNotMyVBuckets;
1694        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
1695    } else if (vb->getState() == vbucket_state_pending) {
1696        if (vb->addPendingOp(cookie)) {
1697            return GetValue(NULL, ENGINE_EWOULDBLOCK);
1698        }
1699    }
1700
1701    int bucket_num(0);
1702    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1703    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1704
1705    if (v) {
1706        if (v->isDeleted() || v->isTempDeletedItem() ||
1707            v->isTempNonExistentItem()) {
1708            GetValue rv;
1709            return rv;
1710        }
1711
1712        if (!v->isResident()) {
1713            bgFetch(key, vbucket, v->getBySeqno(), cookie);
1714            return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getBySeqno());
1715        }
1716        if (v->isLocked(ep_current_time())) {
1717            GetValue rv(NULL, ENGINE_KEY_EEXISTS, 0);
1718            return rv;
1719        }
1720
1721        bool exptime_mutated = exptime != v->getExptime() ? true : false;
1722        if (exptime_mutated) {
1723           v->markDirty();
1724           v->setExptime(exptime);
1725        }
1726
1727        GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
1728                    ENGINE_SUCCESS, v->getBySeqno());
1729
1730        if (exptime_mutated) {
1731            // persist the itme in the underlying storage for
1732            // mutated exptime
1733            queueDirty(vb, v);
1734            lh.unlock();
1735        }
1736        return rv;
1737    } else {
1738        if (eviction_policy == VALUE_ONLY) {
1739            GetValue rv;
1740            return rv;
1741        } else {
1742            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1743                                                         vb, cookie, false);
1744            return GetValue(NULL, ec, -1, true);
1745        }
1746    }
1747}
1748
1749ENGINE_ERROR_CODE
1750EventuallyPersistentStore::statsVKey(const std::string &key,
1751                                     uint16_t vbucket,
1752                                     const void *cookie) {
1753    RCPtr<VBucket> vb = getVBucket(vbucket);
1754    if (!vb) {
1755        return ENGINE_NOT_MY_VBUCKET;
1756    }
1757
1758    int bucket_num(0);
1759    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1760    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1761
1762    if (v) {
1763        if (v->isDeleted() || v->isTempDeletedItem() ||
1764            v->isTempNonExistentItem()) {
1765            return ENGINE_KEY_ENOENT;
1766        }
1767        bgFetchQueue++;
1768        cb_assert(bgFetchQueue > 0);
1769        ExecutorPool* iom = ExecutorPool::get();
1770        ExTask task = new VKeyStatBGFetchTask(&engine, key, vbucket,
1771                                           v->getBySeqno(), cookie,
1772                                           Priority::VKeyStatBgFetcherPriority,
1773                                           bgFetchDelay, false);
1774        iom->schedule(task, READER_TASK_IDX);
1775        return ENGINE_EWOULDBLOCK;
1776    } else {
1777        if (eviction_policy == VALUE_ONLY) {
1778            return ENGINE_KEY_ENOENT;
1779        } else {
1780            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
1781                                                        eviction_policy);
1782            switch(rv) {
1783            case ADD_NOMEM:
1784                return ENGINE_ENOMEM;
1785            case ADD_EXISTS:
1786            case ADD_UNDEL:
1787            case ADD_SUCCESS:
1788            case ADD_TMP_AND_BG_FETCH:
1789                // Since the hashtable bucket is locked, we shouldn't get here
1790                abort();
1791            case ADD_BG_FETCH:
1792                {
1793                    ++bgFetchQueue;
1794                    cb_assert(bgFetchQueue > 0);
1795                    ExecutorPool* iom = ExecutorPool::get();
1796                    ExTask task = new VKeyStatBGFetchTask(&engine, key,
1797                                                          vbucket, -1, cookie,
1798                                           Priority::VKeyStatBgFetcherPriority,
1799                                                          bgFetchDelay, false);
1800                    iom->schedule(task, READER_TASK_IDX);
1801                }
1802            }
1803            return ENGINE_EWOULDBLOCK;
1804        }
1805    }
1806}
1807
1808void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
1809                                                  std::string &key,
1810                                                  uint16_t vbid,
1811                                                  uint64_t bySeqNum) {
1812    RememberingCallback<GetValue> gcb;
1813
1814    getROUnderlying(vbid)->get(key, bySeqNum, vbid, gcb);
1815    gcb.waitForValue();
1816    cb_assert(gcb.fired);
1817
1818    if (eviction_policy == FULL_EVICTION) {
1819        RCPtr<VBucket> vb = getVBucket(vbid);
1820        if (vb) {
1821            int bucket_num(0);
1822            LockHolder hlh = vb->ht.getLockedBucket(key, &bucket_num);
1823            StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1824            if (v && v->isTempInitialItem()) {
1825                if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1826                    v->unlocked_restoreValue(gcb.val.getValue(), vb->ht);
1827                    cb_assert(v->isResident());
1828                } else if (gcb.val.getStatus() == ENGINE_KEY_ENOENT) {
1829                    v->setStoredValueState(
1830                                          StoredValue::state_non_existent_key);
1831                } else {
1832                    // underlying kvstore couldn't fetch requested data
1833                    // log returned error and notify TMPFAIL to client
1834                    LOG(EXTENSION_LOG_WARNING,
1835                        "Warning: failed background fetch for vb=%d seq=%d "
1836                        "key=%s", vbid, v->getBySeqno(), key.c_str());
1837                }
1838            }
1839        }
1840    }
1841
1842    if (gcb.val.getStatus() == ENGINE_SUCCESS) {
1843        engine.addLookupResult(cookie, gcb.val.getValue());
1844    } else {
1845        engine.addLookupResult(cookie, NULL);
1846    }
1847
1848    bgFetchQueue--;
1849    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
1850}
1851
1852bool EventuallyPersistentStore::getLocked(const std::string &key,
1853                                          uint16_t vbucket,
1854                                          Callback<GetValue> &cb,
1855                                          rel_time_t currentTime,
1856                                          uint32_t lockTimeout,
1857                                          const void *cookie) {
1858    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
1859    if (!vb) {
1860        ++stats.numNotMyVBuckets;
1861        GetValue rv(NULL, ENGINE_NOT_MY_VBUCKET);
1862        cb.callback(rv);
1863        return false;
1864    }
1865
1866    int bucket_num(0);
1867    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1868    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1869
1870    if (v) {
1871        if (v->isDeleted() || v->isTempNonExistentItem() ||
1872            v->isTempDeletedItem()) {
1873            GetValue rv;
1874            cb.callback(rv);
1875            return true;
1876        }
1877
1878        // if v is locked return error
1879        if (v->isLocked(currentTime)) {
1880            GetValue rv;
1881            cb.callback(rv);
1882            return false;
1883        }
1884
1885        // If the value is not resident, wait for it...
1886        if (!v->isResident()) {
1887            if (cookie) {
1888                bgFetch(key, vbucket, v->getBySeqno(), cookie);
1889            }
1890            GetValue rv(NULL, ENGINE_EWOULDBLOCK, -1, true);
1891            cb.callback(rv);
1892            return false;
1893        }
1894
1895        // acquire lock and increment cas value
1896        v->lock(currentTime + lockTimeout);
1897
1898        Item *it = v->toItem(false, vbucket);
1899        it->setCas();
1900        v->setCas(it->getCas());
1901
1902        GetValue rv(it);
1903        cb.callback(rv);
1904        return true;
1905    } else {
1906        if (eviction_policy == VALUE_ONLY) {
1907            GetValue rv;
1908            cb.callback(rv);
1909            return true;
1910        } else {
1911            ENGINE_ERROR_CODE ec = addTempItemForBgFetch(lh, bucket_num, key,
1912                                                         vb, cookie, false);
1913            GetValue rv(NULL, ec, -1, true);
1914            cb.callback(rv);
1915            return false;
1916        }
1917    }
1918}
1919
1920ENGINE_ERROR_CODE
1921EventuallyPersistentStore::unlockKey(const std::string &key,
1922                                     uint16_t vbucket,
1923                                     uint64_t cas,
1924                                     rel_time_t currentTime)
1925{
1926
1927    RCPtr<VBucket> vb = getVBucket(vbucket, vbucket_state_active);
1928    if (!vb) {
1929        ++stats.numNotMyVBuckets;
1930        return ENGINE_NOT_MY_VBUCKET;
1931    }
1932
1933    int bucket_num(0);
1934    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1935    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1936
1937    if (v) {
1938        if (v->isDeleted() || v->isTempNonExistentItem() ||
1939            v->isTempDeletedItem()) {
1940            return ENGINE_KEY_ENOENT;
1941        }
1942        if (v->isLocked(currentTime)) {
1943            if (v->getCas() == cas) {
1944                v->unlock();
1945                return ENGINE_SUCCESS;
1946            }
1947        }
1948        return ENGINE_TMPFAIL;
1949    } else {
1950        if (eviction_policy == VALUE_ONLY) {
1951            return ENGINE_KEY_ENOENT;
1952        } else {
1953            // With the full eviction, an item's lock is automatically
1954            // released when the item is evicted from memory. Therefore,
1955            // we simply return ENGINE_TMPFAIL when we receive unlockKey
1956            // for an item that is not in memocy cache. Note that we don't
1957            // spawn any bg fetch job to figure out if an item actually
1958            // exists in disk or not.
1959            return ENGINE_TMPFAIL;
1960        }
1961    }
1962}
1963
1964
1965ENGINE_ERROR_CODE EventuallyPersistentStore::getKeyStats(
1966                                            const std::string &key,
1967                                            uint16_t vbucket,
1968                                            const void *cookie,
1969                                            struct key_stats &kstats,
1970                                            bool bgfetch,
1971                                            bool wantsDeleted)
1972{
1973    RCPtr<VBucket> vb = getVBucket(vbucket);
1974    if (!vb) {
1975        return ENGINE_NOT_MY_VBUCKET;
1976    }
1977
1978    int bucket_num(0);
1979    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1980    StoredValue *v = fetchValidValue(vb, key, bucket_num, true);
1981
1982    if (v) {
1983        if ((v->isDeleted() && !wantsDeleted) ||
1984            v->isTempNonExistentItem() || v->isTempDeletedItem()) {
1985            return ENGINE_KEY_ENOENT;
1986        }
1987        if (eviction_policy == FULL_EVICTION &&
1988            v->isTempInitialItem() && bgfetch) {
1989            lh.unlock();
1990            bgFetch(key, vbucket, -1, cookie, true);
1991            return ENGINE_EWOULDBLOCK;
1992        }
1993        kstats.logically_deleted = v->isDeleted();
1994        kstats.dirty = v->isDirty();
1995        kstats.exptime = v->getExptime();
1996        kstats.flags = v->getFlags();
1997        kstats.cas = v->getCas();
1998        kstats.vb_state = vb->getState();
1999        return ENGINE_SUCCESS;
2000    } else {
2001        if (eviction_policy == VALUE_ONLY) {
2002            return ENGINE_KEY_ENOENT;
2003        } else {
2004            if (bgfetch) {
2005                return addTempItemForBgFetch(lh, bucket_num, key, vb,
2006                                             cookie, true);
2007            } else {
2008                return ENGINE_KEY_ENOENT;
2009            }
2010        }
2011    }
2012}
2013
2014std::string EventuallyPersistentStore::validateKey(const std::string &key,
2015                                                   uint16_t vbucket,
2016                                                   Item &diskItem) {
2017    int bucket_num(0);
2018    RCPtr<VBucket> vb = getVBucket(vbucket);
2019    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2020    StoredValue *v = fetchValidValue(vb, key, bucket_num, true,
2021                                     false, true);
2022
2023    if (v) {
2024        if (v->isDeleted() || v->isTempNonExistentItem() ||
2025            v->isTempDeletedItem()) {
2026            return "item_deleted";
2027        }
2028
2029        if (diskItem.getFlags() != v->getFlags()) {
2030            return "flags_mismatch";
2031        } else if (v->isResident() && memcmp(diskItem.getData(),
2032                                             v->getValue()->getData(),
2033                                             diskItem.getNBytes())) {
2034            return "data_mismatch";
2035        } else {
2036            return "valid";
2037        }
2038    } else {
2039        return "item_deleted";
2040    }
2041
2042}
2043
2044ENGINE_ERROR_CODE EventuallyPersistentStore::deleteItem(const std::string &key,
2045                                                        uint64_t* cas,
2046                                                        uint16_t vbucket,
2047                                                        const void *cookie,
2048                                                        bool force,
2049                                                        ItemMetaData *itemMeta,
2050                                                        bool tapBackfill)
2051{
2052    RCPtr<VBucket> vb = getVBucket(vbucket);
2053    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2054        ++stats.numNotMyVBuckets;
2055        return ENGINE_NOT_MY_VBUCKET;
2056    } else if(vb->getState() == vbucket_state_replica && !force) {
2057        ++stats.numNotMyVBuckets;
2058        return ENGINE_NOT_MY_VBUCKET;
2059    } else if(vb->getState() == vbucket_state_pending && !force) {
2060        if (vb->addPendingOp(cookie)) {
2061            return ENGINE_EWOULDBLOCK;
2062        }
2063    }
2064
2065    int bucket_num(0);
2066    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2067    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2068    if (!v || v->isDeleted() || v->isTempItem()) {
2069        if (eviction_policy == VALUE_ONLY) {
2070            return ENGINE_KEY_ENOENT;
2071        } else { // Full eviction.
2072            if (!force) {
2073                if (!v) { // Item might be evicted from cache.
2074                    return addTempItemForBgFetch(lh, bucket_num, key, vb,
2075                                                 cookie, true);
2076                } else if (v->isTempInitialItem()) {
2077                    lh.unlock();
2078                    bgFetch(key, vbucket, -1, cookie, true);
2079                    return ENGINE_EWOULDBLOCK;
2080                } else { // Non-existent or deleted key.
2081                    return ENGINE_KEY_ENOENT;
2082                }
2083            } else {
2084                if (!v) { // Item might be evicted from cache.
2085                    // Create a temp item and delete it below as it is a
2086                    // force deletion.
2087                    add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num,
2088                                                              key,
2089                                                              eviction_policy);
2090                    if (rv == ADD_NOMEM) {
2091                        return ENGINE_ENOMEM;
2092                    }
2093                    v = vb->ht.unlocked_find(key, bucket_num, true, false);
2094                    v->setStoredValueState(StoredValue::state_deleted_key);
2095                } else if (v->isTempInitialItem()) {
2096                    v->setStoredValueState(StoredValue::state_deleted_key);
2097                } else { // Non-existent or deleted key.
2098                    return ENGINE_KEY_ENOENT;
2099                }
2100            }
2101        }
2102    }
2103
2104    if (v && v->isLocked(ep_current_time()) &&
2105        (vb->getState() == vbucket_state_replica ||
2106         vb->getState() == vbucket_state_pending)) {
2107        v->unlock();
2108    }
2109    mutation_type_t delrv;
2110    delrv = vb->ht.unlocked_softDelete(v, *cas, eviction_policy);
2111
2112    if (itemMeta && v) {
2113        itemMeta->revSeqno = v->getRevSeqno();
2114        itemMeta->cas = v->getCas();
2115        itemMeta->flags = v->getFlags();
2116        itemMeta->exptime = v->getExptime();
2117    }
2118    *cas = v ? v->getCas() : 0;
2119
2120    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2121    switch (delrv) {
2122    case NOMEM:
2123        ret = ENGINE_ENOMEM;
2124        break;
2125    case INVALID_VBUCKET:
2126        ret = ENGINE_NOT_MY_VBUCKET;
2127        break;
2128    case INVALID_CAS:
2129        ret = ENGINE_KEY_EEXISTS;
2130        break;
2131    case IS_LOCKED:
2132        ret = ENGINE_TMPFAIL;
2133        break;
2134    case NOT_FOUND:
2135        ret = ENGINE_KEY_ENOENT;
2136        break;
2137    case WAS_DIRTY:
2138    case WAS_CLEAN:
2139        queueDirty(vb, v, tapBackfill);
2140        break;
2141    case NEED_BG_FETCH:
2142        // We already figured out if a bg fetch is requred for a full-evicted
2143        // item above.
2144        abort();
2145    }
2146    return ret;
2147}
2148
2149ENGINE_ERROR_CODE EventuallyPersistentStore::deleteWithMeta(
2150                                                        const std::string &key,
2151                                                        uint64_t* cas,
2152                                                        uint16_t vbucket,
2153                                                        const void *cookie,
2154                                                        bool force,
2155                                                        ItemMetaData *itemMeta,
2156                                                        bool tapBackfill,
2157                                                        bool genBySeqno,
2158                                                        uint64_t bySeqno)
2159{
2160    RCPtr<VBucket> vb = getVBucket(vbucket);
2161    if (!vb || (vb->getState() == vbucket_state_dead && !force)) {
2162        ++stats.numNotMyVBuckets;
2163        return ENGINE_NOT_MY_VBUCKET;
2164    } else if(vb->getState() == vbucket_state_replica && !force) {
2165        ++stats.numNotMyVBuckets;
2166        return ENGINE_NOT_MY_VBUCKET;
2167    } else if(vb->getState() == vbucket_state_pending && !force) {
2168        if (vb->addPendingOp(cookie)) {
2169            return ENGINE_EWOULDBLOCK;
2170        }
2171    }
2172
2173    int bucket_num(0);
2174    LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
2175    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
2176    if (!force) { // Need conflict resolution.
2177        if (v)  {
2178            if (v->isTempInitialItem()) {
2179                bgFetch(key, vbucket, -1, cookie, true);
2180                return ENGINE_EWOULDBLOCK;
2181            }
2182            if (!conflictResolver->resolve(v, *itemMeta, true)) {
2183                ++stats.numOpsDelMetaResolutionFailed;
2184                return ENGINE_KEY_EEXISTS;
2185            }
2186        } else {
2187            // Item is 1) deleted or not existent in the value eviction case OR
2188            // 2) deleted or evicted in the full eviction.
2189            return addTempItemForBgFetch(lh, bucket_num, key, vb,
2190                                         cookie, true);
2191        }
2192    } else {
2193        if (!v) {
2194            // Create a temp item and delete it below as it is a force deletion
2195            add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
2196                                                        eviction_policy);
2197            if (rv == ADD_NOMEM) {
2198                return ENGINE_ENOMEM;
2199            }
2200            v = vb->ht.unlocked_find(key, bucket_num, true, false);
2201            v->setStoredValueState(StoredValue::state_deleted_key);
2202        } else if (v->isTempInitialItem()) {
2203            v->setStoredValueState(StoredValue::state_deleted_key);
2204        }
2205    }
2206
2207    if (v && v->isLocked(ep_current_time()) &&
2208        (vb->getState() == vbucket_state_replica ||
2209         vb->getState() == vbucket_state_pending)) {
2210        v->unlock();
2211    }
2212    mutation_type_t delrv;
2213    delrv = vb->ht.unlocked_softDelete(v, *cas, *itemMeta,
2214                                       eviction_policy, true);
2215    *cas = v ? v->getCas() : 0;
2216
2217    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
2218    switch (delrv) {
2219    case NOMEM:
2220        ret = ENGINE_ENOMEM;
2221        break;
2222    case INVALID_VBUCKET:
2223        ret = ENGINE_NOT_MY_VBUCKET;
2224        break;
2225    case INVALID_CAS:
2226        ret = ENGINE_KEY_EEXISTS;
2227        break;
2228    case IS_LOCKED:
2229        ret = ENGINE_TMPFAIL;
2230        break;
2231    case NOT_FOUND:
2232        ret = ENGINE_KEY_ENOENT;
2233        break;
2234    case WAS_DIRTY:
2235    case WAS_CLEAN:
2236        if (!genBySeqno) {
2237            v->setBySeqno(bySeqno);
2238        }
2239        queueDirty(vb, v, tapBackfill, true, genBySeqno);
2240        break;
2241    case NEED_BG_FETCH:
2242        lh.unlock();
2243        bgFetch(key, vbucket, -1, cookie, true);
2244        ret = ENGINE_EWOULDBLOCK;
2245    }
2246
2247    return ret;
2248}
2249
2250void EventuallyPersistentStore::reset() {
2251    std::vector<int> buckets = vbMap.getBuckets();
2252    std::vector<int>::iterator it;
2253    for (it = buckets.begin(); it != buckets.end(); ++it) {
2254        RCPtr<VBucket> vb = getVBucket(*it);
2255        if (vb) {
2256            vb->ht.clear();
2257            vb->checkpointManager.clear(vb->getState());
2258            vb->resetStats();
2259        }
2260    }
2261
2262    bool inverse = false;
2263    if (diskFlushAll.compare_exchange_strong(inverse, true)) {
2264        ++stats.diskQueueSize;
2265        // wake up (notify) one flusher is good enough for diskFlushAll
2266        vbMap.shards[EP_PRIMARY_SHARD]->getFlusher()->notifyFlushEvent();
2267    }
2268}
2269
2270/**
2271 * Callback invoked after persisting an item from memory to disk.
2272 *
2273 * This class exists to create a closure around a few variables within
2274 * EventuallyPersistentStore::flushOne so that an object can be
2275 * requeued in case of failure to store in the underlying layer.
2276 */
2277class PersistenceCallback : public Callback<mutation_result>,
2278                            public Callback<int> {
2279public:
2280
2281    PersistenceCallback(const queued_item &qi, RCPtr<VBucket> &vb,
2282                        EventuallyPersistentStore *st, EPStats *s, uint64_t c)
2283        : queuedItem(qi), vbucket(vb), store(st), stats(s), cas(c) {
2284        cb_assert(vb);
2285        cb_assert(s);
2286    }
2287
2288    // This callback is invoked for set only.
2289    void callback(mutation_result &value) {
2290        if (value.first == 1) {
2291            int bucket_num(0);
2292            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
2293                                                        &bucket_num);
2294            StoredValue *v = store->fetchValidValue(vbucket,
2295                                                    queuedItem->getKey(),
2296                                                    bucket_num, true, false);
2297            if (v) {
2298                if (v->getCas() == cas) {
2299                    // mark this item clean only if current and stored cas
2300                    // value match
2301                    v->markClean();
2302                }
2303                if (v->isNewCacheItem()) {
2304                    if (value.second) {
2305                        // Insert in value-only or full eviction mode.
2306                        ++vbucket->opsCreate;
2307                    } else { // Update in full eviction mode.
2308                        --vbucket->ht.numTotalItems;
2309                        ++vbucket->opsUpdate;
2310                    }
2311                    v->setNewCacheItem(false);
2312                } else { // Update in value-only or full eviction mode.
2313                    ++vbucket->opsUpdate;
2314                }
2315            }
2316
2317            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2318            stats->decrDiskQueueSize(1);
2319            stats->totalPersisted++;
2320        } else {
2321            // If the return was 0 here, we're in a bad state because
2322            // we do not know the rowid of this object.
2323            if (value.first == 0) {
2324                int bucket_num(0);
2325                LockHolder lh = vbucket->ht.getLockedBucket(
2326                                           queuedItem->getKey(), &bucket_num);
2327                StoredValue *v = store->fetchValidValue(vbucket,
2328                                                        queuedItem->getKey(),
2329                                                        bucket_num, true,
2330                                                        false);
2331                if (v) {
2332                    std::stringstream ss;
2333                    ss << "Persisting ``" << queuedItem->getKey() << "'' on vb"
2334                       << queuedItem->getVBucketId() << " (rowid="
2335                       << v->getBySeqno() << ") returned 0 updates\n";
2336                    LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2337                } else {
2338                    LOG(EXTENSION_LOG_WARNING,
2339                        "Error persisting now missing ``%s'' from vb%d",
2340                        queuedItem->getKey().c_str(),
2341                        queuedItem->getVBucketId());
2342                }
2343
2344                vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2345                stats->decrDiskQueueSize(1);
2346            } else {
2347                std::stringstream ss;
2348                ss <<
2349                "Fatal error in persisting SET ``" <<
2350                queuedItem->getKey() << "'' on vb "
2351                   << queuedItem->getVBucketId() << "!!! Requeue it...\n";
2352                LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
2353                redirty();
2354            }
2355        }
2356    }
2357
    // This callback is invoked for deletions only.
    //
    // The int reports the outcome of the underlying delete: -1 means the
    // delete failed, 0 means no row was deleted (it did not exist), and 1
    // means exactly one row was deleted.
    void callback(int &value) {
        // Handles the outcome of persisting a queued deletion. On success
        // (value >= 0) the deleted entry is dropped from the hash table and
        // the flush bookkeeping is released; on failure (value < 0) the
        // item is logged and handed to redirty().
        // > 1 would be bad.  We were only trying to delete one row.
        cb_assert(value < 2);
        // -1 means fail
        // 1 means we deleted one row
        // 0 means we did not delete a row, but did not fail (did not exist)
        if (value >= 0) {
            // We have successfully removed an item from the disk, we
            // may now remove it from the hash table.
            int bucket_num(0);
            // lh keeps the key's hash bucket locked until the end of this
            // if-branch, so the lookup and unlocked_del() below operate on
            // a consistent bucket.
            LockHolder lh = vbucket->ht.getLockedBucket(queuedItem->getKey(),
                                                        &bucket_num);
            StoredValue *v = store->fetchValidValue(vbucket,
                                                    queuedItem->getKey(),
                                                    bucket_num, true, false);
            // Only an entry that is still marked deleted is removed; any
            // other entry for the key (presumably re-set after the delete
            // was queued) is left in memory.
            if (v && v->isDeleted()) {
                bool newCacheItem = v->isNewCacheItem();
                bool deleted = vbucket->ht.unlocked_del(queuedItem->getKey(),
                                                        bucket_num);
                cb_assert(deleted);
                if (newCacheItem && value > 0) {
                    // Need to decrement the item counter again for an item that
                    // exists on DB file, but not in memory (i.e., full eviction),
                    // because we created the temp item in memory and incremented
                    // the item counter when a deletion is pushed in the queue.
                    --vbucket->ht.numTotalItems;
                }
            }

            // Only a delete that actually removed a row counts as persisted.
            if (value > 0) {
                ++stats->totalPersisted;
                ++vbucket->opsDelete;
            }

            // Release this item's disk-queue accounting in either case.
            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
            stats->decrDiskQueueSize(1);
        } else {
            // The storage layer reported a failure: log it and requeue the
            // item for a later flush via redirty().
            std::stringstream ss;
            ss << "Fatal error in persisting DELETE ``" <<
            queuedItem->getKey() << "'' on vb "
               << queuedItem->getVBucketId() << "!!! Requeue it...\n";
            LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
            redirty();
        }
    }
2407
2408private:
2409
2410    void redirty() {
2411        if (store->vbMap.isBucketDeletion(vbucket->getId())) {
2412            vbucket->doStatsForFlushing(*queuedItem, queuedItem->size());
2413            stats->decrDiskQueueSize(1);
2414            return;
2415        }
2416        ++stats->flushFailed;
2417        store->invokeOnLockedStoredValue(queuedItem->getKey(),
2418                                         queuedItem->getVBucketId(),
2419                                         &StoredValue::reDirty);
2420        vbucket->rejectQueue.push(queuedItem);
2421    }
2422
2423    const queued_item queuedItem;
2424    RCPtr<VBucket> &vbucket;
2425    EventuallyPersistentStore *store;
2426    EPStats *stats;
2427    uint64_t cas;
2428    DISALLOW_COPY_AND_ASSIGN(PersistenceCallback);
2429};
2430
2431void EventuallyPersistentStore::flushOneDeleteAll() {
2432    for (size_t i = 0; i < vbMap.numShards; ++i) {
2433        KVShard* shard = vbMap.shards[i];
2434        LockHolder lh(shard->getWriteLock());
2435        shard->getRWUnderlying()->reset(i);
2436    }
2437
2438    bool inverse = true;
2439    diskFlushAll.compare_exchange_strong(inverse, false);
2440    stats.decrDiskQueueSize(1);
2441}
2442
2443int EventuallyPersistentStore::flushVBucket(uint16_t vbid) {
2444    KVShard *shard = vbMap.getShard(vbid);
2445    if (diskFlushAll) {
2446        if (shard->getId() == EP_PRIMARY_SHARD) {
2447            flushOneDeleteAll();
2448        } else {
2449            // disk flush is pending just return
2450            return 0;
2451        }
2452    }
2453
2454    if (vbMap.isBucketCreation(vbid)) {
2455        return RETRY_FLUSH_VBUCKET;
2456    }
2457
2458    int items_flushed = 0;
2459    bool schedule_vb_snapshot = false;
2460    rel_time_t flush_start = ep_current_time();
2461
2462    LockHolder lh(shard->getWriteLock());
2463    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2464    if (vb) {
2465        KVStatsCallback cb(this);
2466        std::vector<queued_item> items;
2467        KVStore *rwUnderlying = getRWUnderlying(vbid);
2468
2469        while (!vb->rejectQueue.empty()) {
2470            items.push_back(vb->rejectQueue.front());
2471            vb->rejectQueue.pop();
2472        }
2473
2474        vb->getBackfillItems(items);
2475        vb->checkpointManager.getAllItemsForPersistence(items);
2476
2477        if (!items.empty()) {
2478            while (!rwUnderlying->begin()) {
2479                ++stats.beginFailed;
2480                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
2481                    "Retry in 1 sec ...");
2482                sleep(1);
2483            }
2484            rwUnderlying->optimizeWrites(items);
2485
2486            Item *prev = NULL;
2487            std::list<PersistenceCallback*> pcbs;
2488            std::vector<queued_item>::iterator it = items.begin();
2489            for(; it != items.end(); ++it) {
2490                if ((*it)->getOperation() != queue_op_set &&
2491                    (*it)->getOperation() != queue_op_del) {
2492                    continue;
2493                } else if (!prev || prev->getKey() != (*it)->getKey()) {
2494                    prev = (*it).get();
2495                    ++items_flushed;
2496                    PersistenceCallback *cb = flushOneDelOrSet(*it, vb);
2497                    if (cb) {
2498                        pcbs.push_back(cb);
2499                    }
2500                    ++stats.flusher_todo;
2501                } else {
2502                    stats.decrDiskQueueSize(1);
2503                    vb->doStatsForFlushing(*(*it), (*it)->size());
2504                }
2505            }
2506
2507            BlockTimer timer(&stats.diskCommitHisto, "disk_commit",
2508                             stats.timingLog);
2509            hrtime_t start = gethrtime();
2510
2511            while (!rwUnderlying->commit(&cb)) {
2512                ++stats.commitFailed;
2513                LOG(EXTENSION_LOG_WARNING, "Flusher commit failed!!! Retry in "
2514                    "1 sec...\n");
2515                sleep(1);
2516            }
2517
2518            while (!pcbs.empty()) {
2519                delete pcbs.front();
2520                pcbs.pop_front();
2521            }
2522
2523            ++stats.flusherCommits;
2524            hrtime_t end = gethrtime();
2525            uint64_t commit_time = (end - start) / 1000000;
2526            uint64_t trans_time = (end - flush_start) / 1000000;
2527
2528            lastTransTimePerItem = (items_flushed == 0) ? 0 :
2529                static_cast<double>(trans_time) /
2530                static_cast<double>(items_flushed);
2531            stats.commit_time.store(commit_time);
2532            stats.cumulativeCommitTime.fetch_add(commit_time);
2533            stats.cumulativeFlushTime.fetch_add(ep_current_time()
2534                                                - flush_start);
2535            stats.flusher_todo.store(0);
2536
2537        }
2538
2539        if (vb->rejectQueue.empty()) {
2540            vb->checkpointManager.itemsPersisted();
2541        }
2542
2543        uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
2544        if (vb->rejectQueue.empty()) {
2545            vb->notifyCheckpointPersisted(engine, chkid, false);
2546        }
2547
2548        if (chkid > 0 && chkid != vbMap.getPersistenceCheckpointId(vbid)) {
2549            vbMap.setPersistenceCheckpointId(vbid, chkid);
2550            schedule_vb_snapshot = true;
2551        }
2552
2553        uint64_t seqno = vb->checkpointManager.getPersistenceCursorSeqno();
2554        if (vb->rejectQueue.empty()) {
2555            vb->notifyCheckpointPersisted(engine, seqno, true);
2556            if (seqno > 0 && seqno != vbMap.getPersistenceSeqno(vbid)) {
2557                vbMap.setPersistenceSeqno(vbid, seqno);
2558            }
2559        }
2560    }
2561
2562    if (schedule_vb_snapshot || snapshotVBState) {
2563        scheduleVBSnapshot(Priority::VBucketPersistHighPriority,
2564                           shard->getId());
2565    }
2566
2567    return items_flushed;
2568}
2569
2570PersistenceCallback*
2571EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
2572                                            RCPtr<VBucket> &vb) {
2573
2574    if (!vb) {
2575        stats.decrDiskQueueSize(1);
2576        return NULL;
2577    }
2578
2579    int64_t bySeqno = qi->getBySeqno();
2580    bool deleted = qi->isDeleted();
2581    rel_time_t queued(qi->getQueuedTime());
2582
2583    int dirtyAge = ep_current_time() - queued;
2584    stats.dirtyAgeHisto.add(dirtyAge * 1000000);
2585    stats.dirtyAge.store(dirtyAge);
2586    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
2587                                         stats.dirtyAgeHighWat.load()));
2588
2589    // Wait until the vbucket database is created by the vbucket state
2590    // snapshot task.
2591    if (vbMap.isBucketCreation(qi->getVBucketId()) ||
2592        vbMap.isBucketDeletion(qi->getVBucketId())) {
2593        vb->rejectQueue.push(qi);
2594        ++vb->opsReject;
2595        return NULL;
2596    }
2597
2598    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
2599    if (!deleted) {
2600        // TODO: Need to separate disk_insert from disk_update because
2601        // bySeqno doesn't give us that information.
2602        BlockTimer timer(bySeqno == -1 ?
2603                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
2604                         bySeqno == -1 ? "disk_insert" : "disk_update",
2605                         stats.timingLog);
2606        PersistenceCallback *cb =
2607            new PersistenceCallback(qi, vb, this, &stats, qi->getCas());
2608        rwUnderlying->set(*qi, *cb);
2609        return cb;
2610    } else {
2611        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
2612                         stats.timingLog);
2613        PersistenceCallback *cb =
2614            new PersistenceCallback(qi, vb, this, &stats, 0);
2615        rwUnderlying->del(*qi, *cb);
2616        return cb;
2617    }
2618}
2619
2620void EventuallyPersistentStore::queueDirty(RCPtr<VBucket> &vb,
2621                                           StoredValue* v,
2622                                           bool tapBackfill,
2623                                           bool notifyReplicator,
2624                                           bool genBySeqno) {
2625    if (vb) {
2626        queued_item qi(v->toItem(false, vb->getId()));
2627        bool rv = tapBackfill ? vb->queueBackfillItem(qi, genBySeqno) :
2628                                vb->checkpointManager.queueDirty(vb, qi,
2629                                                                 genBySeqno);
2630        v->setBySeqno(qi->getBySeqno());
2631
2632        if (rv) {
2633            KVShard* shard = vbMap.getShard(vb->getId());
2634            shard->getFlusher()->notifyFlushEvent();
2635
2636        }
2637        if (!tapBackfill && notifyReplicator) {
2638            engine.getTapConnMap().notifyVBConnections(vb->getId());
2639            engine.getUprConnMap().notifyVBConnections(vb->getId(),
2640                                                       qi->getBySeqno());
2641        }
2642    }
2643}
2644
2645std::map<uint16_t, vbucket_state> EventuallyPersistentStore::loadVBucketState()
2646{
2647    return getOneROUnderlying()->listPersistedVbuckets();
2648}
2649
2650void EventuallyPersistentStore::warmupCompleted() {
2651    // Run the vbucket state snapshot job once after the warmup
2652    scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
2653
2654    if (engine.getConfiguration().getAlogPath().length() > 0) {
2655        size_t smin = engine.getConfiguration().getAlogSleepTime();
2656        setAccessScannerSleeptime(smin);
2657        Configuration &config = engine.getConfiguration();
2658        config.addValueChangedListener("alog_sleep_time",
2659                                       new EPStoreValueChangeListener(*this));
2660        config.addValueChangedListener("alog_task_time",
2661                                       new EPStoreValueChangeListener(*this));
2662    }
2663
2664    // "0" sleep_time means that the first snapshot task will be executed
2665    // right after warmup. Subsequent snapshot tasks will be scheduled every
2666    // 60 sec by default.
2667    ExecutorPool *iom = ExecutorPool::get();
2668    ExTask task = new StatSnap(&engine, Priority::StatSnapPriority, 0, false);
2669    statsSnapshotTaskId = iom->schedule(task, WRITER_TASK_IDX);
2670}
2671
2672bool EventuallyPersistentStore::maybeEnableTraffic()
2673{
2674    // @todo rename.. skal vaere isTrafficDisabled elns
2675    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
2676    double maxSize = static_cast<double>(stats.getMaxDataSize());
2677
2678    if (memoryUsed  >= stats.mem_low_wat) {
2679        LOG(EXTENSION_LOG_WARNING,
2680            "Total memory use reached to the low water mark, stop warmup");
2681        return true;
2682    } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
2683        LOG(EXTENSION_LOG_WARNING,
2684                "Enough MB of data loaded to enable traffic");
2685        return true;
2686    } else if (eviction_policy == VALUE_ONLY &&
2687               stats.warmedUpValues >=
2688                               (stats.warmedUpKeys * stats.warmupNumReadCap)) {
2689        // Let ep-engine think we're done with the warmup phase
2690        // (we should refactor this into "enableTraffic")
2691        LOG(EXTENSION_LOG_WARNING,
2692            "Enough number of items loaded to enable traffic");
2693        return true;
2694    }
2695    return false;
2696}
2697
2698bool EventuallyPersistentStore::isWarmingUp() {
2699    return !warmupTask->isComplete();
2700}
2701
2702void EventuallyPersistentStore::stopWarmup(void)
2703{
2704    // forcefully stop current warmup task
2705    if (isWarmingUp()) {
2706        LOG(EXTENSION_LOG_WARNING, "Stopping warmup while engine is loading "
2707            "data from underlying storage, shutdown = %s\n",
2708            stats.isShutdown ? "yes" : "no");
2709        warmupTask->stop();
2710    }
2711}
2712
2713void EventuallyPersistentStore::setExpiryPagerSleeptime(size_t val) {
2714    LockHolder lh(expiryPager.mutex);
2715
2716    if (expiryPager.sleeptime != 0) {
2717        ExecutorPool::get()->cancel(expiryPager.task);
2718    }
2719
2720    expiryPager.sleeptime = val;
2721    if (val != 0) {
2722        ExTask expTask = new ExpiredItemPager(&engine, stats,
2723                                                expiryPager.sleeptime);
2724        expiryPager.task = ExecutorPool::get()->schedule(expTask,
2725                                                        NONIO_TASK_IDX);
2726    }
2727}
2728
2729void EventuallyPersistentStore::setAccessScannerSleeptime(size_t val) {
2730    LockHolder lh(accessScanner.mutex);
2731
2732    if (accessScanner.sleeptime != 0) {
2733        ExecutorPool::get()->cancel(accessScanner.task);
2734    }
2735
2736    // store sleeptime in seconds
2737    accessScanner.sleeptime = val * 60;
2738    if (accessScanner.sleeptime != 0) {
2739        ExTask task = new AccessScanner(*this, stats,
2740                                        Priority::AccessScannerPriority,
2741                                        accessScanner.sleeptime);
2742        accessScanner.task = ExecutorPool::get()->schedule(task,
2743                                                           AUXIO_TASK_IDX);
2744
2745        struct timeval tv;
2746        gettimeofday(&tv, NULL);
2747        advance_tv(tv, accessScanner.sleeptime);
2748        stats.alogTime.store(tv.tv_sec);
2749    }
2750}
2751
2752void EventuallyPersistentStore::resetAccessScannerStartTime() {
2753    LockHolder lh(accessScanner.mutex);
2754
2755    if (accessScanner.sleeptime != 0) {
2756        ExecutorPool::get()->cancel(accessScanner.task);
2757        // re-schedule task according to the new task start hour
2758        ExTask task = new AccessScanner(*this, stats,
2759                                        Priority::AccessScannerPriority,
2760                                        accessScanner.sleeptime);
2761        accessScanner.task = ExecutorPool::get()->schedule(task,
2762                                                           AUXIO_TASK_IDX);
2763
2764        struct timeval tv;
2765        gettimeofday(&tv, NULL);
2766        advance_tv(tv, accessScanner.sleeptime);
2767        stats.alogTime.store(tv.tv_sec);
2768    }
2769}
2770
2771void EventuallyPersistentStore::visit(VBucketVisitor &visitor)
2772{
2773    size_t maxSize = vbMap.getSize();
2774    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
2775    for (size_t i = 0; i < maxSize; ++i) {
2776        uint16_t vbid = static_cast<uint16_t>(i);
2777        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2778        if (vb) {
2779            bool wantData = visitor.visitBucket(vb);
2780            // We could've lost this along the way.
2781            if (wantData) {
2782                vb->ht.visit(visitor);
2783            }
2784        }
2785    }
2786    visitor.complete();
2787}
2788
2789VBCBAdaptor::VBCBAdaptor(EventuallyPersistentStore *s,
2790                         shared_ptr<VBucketVisitor> v,
2791                         const char *l, const Priority &p, double sleep) :
2792    GlobalTask(&s->getEPEngine(), p, 0, false), store(s),
2793    visitor(v), label(l), sleepTime(sleep), currentvb(0)
2794{
2795    const VBucketFilter &vbFilter = visitor->getVBucketFilter();
2796    size_t maxSize = store->vbMap.getSize();
2797    cb_assert(maxSize <= std::numeric_limits<uint16_t>::max());
2798    for (size_t i = 0; i < maxSize; ++i) {
2799        uint16_t vbid = static_cast<uint16_t>(i);
2800        RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
2801        if (vb && vbFilter(vbid)) {
2802            vbList.push(vbid);
2803        }
2804    }
2805}
2806
2807bool VBCBAdaptor::run(void) {
2808    if (!vbList.empty()) {
2809        currentvb = vbList.front();
2810        RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
2811        if (vb) {
2812            if (visitor->pauseVisitor()) {
2813                snooze(sleepTime);
2814                return true;
2815            }
2816            if (visitor->visitBucket(vb)) {
2817                vb->ht.visit(*visitor);
2818            }
2819        }
2820        vbList.pop();
2821    }
2822
2823    bool isdone = vbList.empty();
2824    if (isdone) {
2825        visitor->complete();
2826    }
2827    return !isdone;
2828}
2829
2830VBucketVisitorTask::VBucketVisitorTask(EventuallyPersistentStore *s,
2831                                       shared_ptr<VBucketVisitor> v,
2832                                       uint16_t sh, const char *l,
2833                                       double sleep, bool shutdown):
2834    GlobalTask(&(s->getEPEngine()), Priority::AccessScannerPriority,
2835               0, shutdown),
2836    store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0),
2837    shardID(sh)
2838{
2839    const VBucketFilter &vbFilter = visitor->getVBucketFilter();
2840    std::vector<int> vbs = store->vbMap.getShard(shardID)->getVBuckets();
2841    cb_assert(vbs.size() <= std::numeric_limits<uint16_t>::max());
2842    std::vector<int>::iterator it;
2843    for (it = vbs.begin(); it != vbs.end(); ++it) {
2844        uint16_t vbid = static_cast<uint16_t>(*it);
2845        RCPtr<VBucket> vb = store->vbMap.getBucket(vbid);
2846        if (vb && vbFilter(vbid)) {
2847            vbList.push(vbid);
2848        }
2849    }
2850}
2851
2852bool VBucketVisitorTask::run() {
2853    if (!vbList.empty()) {
2854        currentvb = vbList.front();
2855        RCPtr<VBucket> vb = store->vbMap.getBucket(currentvb);
2856        if (vb) {
2857            if (visitor->pauseVisitor()) {
2858                snooze(sleepTime);
2859                return true;
2860            }
2861            if (visitor->visitBucket(vb)) {
2862                vb->ht.visit(*visitor);
2863            }
2864        }
2865        vbList.pop();
2866    }
2867
2868    bool isDone = vbList.empty();
2869    if (isDone) {
2870        visitor->complete();
2871    }
2872    return !isDone;
2873}
2874
2875void EventuallyPersistentStore::resetUnderlyingStats(void)
2876{
2877    for (size_t i = 0; i < vbMap.numShards; i++) {
2878        KVShard *shard = vbMap.shards[i];
2879        shard->getRWUnderlying()->resetStats();
2880        shard->getROUnderlying()->resetStats();
2881    }
2882    auxUnderlying->resetStats();
2883}
2884
2885void EventuallyPersistentStore::addKVStoreStats(ADD_STAT add_stat,
2886                                                const void* cookie) {
2887    for (size_t i = 0; i < vbMap.numShards; i++) {
2888        std::stringstream rwPrefix;
2889        std::stringstream roPrefix;
2890        rwPrefix << "rw_" << i;
2891        roPrefix << "ro_" << i;
2892        vbMap.shards[i]->getRWUnderlying()->addStats(rwPrefix.str(), add_stat,
2893                                                     cookie);
2894        vbMap.shards[i]->getROUnderlying()->addStats(roPrefix.str(), add_stat,
2895                                                     cookie);
2896    }
2897}
2898
2899void EventuallyPersistentStore::addKVStoreTimingStats(ADD_STAT add_stat,
2900                                                      const void* cookie) {
2901    for (size_t i = 0; i < vbMap.numShards; i++) {
2902        std::stringstream rwPrefix;
2903        std::stringstream roPrefix;
2904        rwPrefix << "rw_" << i;
2905        roPrefix << "ro_" << i;
2906        vbMap.shards[i]->getRWUnderlying()->addTimingStats(rwPrefix.str(),
2907                                                           add_stat,
2908                                                           cookie);
2909        vbMap.shards[i]->getROUnderlying()->addTimingStats(roPrefix.str(),
2910                                                           add_stat,
2911                                                           cookie);
2912    }
2913}
2914
2915KVStore *EventuallyPersistentStore::getOneROUnderlying(void) {
2916    return vbMap.getShard(EP_PRIMARY_SHARD)->getROUnderlying();
2917}
2918
2919KVStore *EventuallyPersistentStore::getOneRWUnderlying(void) {
2920    return vbMap.getShard(EP_PRIMARY_SHARD)->getRWUnderlying();
2921}
2922
2923ENGINE_ERROR_CODE
2924EventuallyPersistentStore::rollback(uint16_t vbid,
2925                                    uint64_t rollbackSeqno,
2926                                    shared_ptr<RollbackCB> cb) {
2927    rollback_error_code err;
2928    err = vbMap.shards[vbid]->getROUnderlying()->
2929                              rollback(vbid, rollbackSeqno, cb);
2930
2931    if (err.first != ENGINE_FAILED) {
2932        RCPtr<VBucket> vb = vbMap.getBucket(vbid);
2933        vb->failovers->pruneEntries(err.second);
2934    }
2935    return err.first;
2936}
2937