154e72c45STrond Norbye/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2ba5b814bSMatt Ingenthron/*
366eb94d0SMike Wiederhold *     Copyright 2010 Couchbase, Inc
4ba5b814bSMatt Ingenthron *
5ba5b814bSMatt Ingenthron *   Licensed under the Apache License, Version 2.0 (the "License");
6ba5b814bSMatt Ingenthron *   you may not use this file except in compliance with the License.
7ba5b814bSMatt Ingenthron *   You may obtain a copy of the License at
8ba5b814bSMatt Ingenthron *
9ba5b814bSMatt Ingenthron *       http://www.apache.org/licenses/LICENSE-2.0
10ba5b814bSMatt Ingenthron *
11ba5b814bSMatt Ingenthron *   Unless required by applicable law or agreed to in writing, software
12ba5b814bSMatt Ingenthron *   distributed under the License is distributed on an "AS IS" BASIS,
13d067db62STrond Norbye *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14ba5b814bSMatt Ingenthron *   See the License for the specific language governing permissions and
15ba5b814bSMatt Ingenthron *   limitations under the License.
16ba5b814bSMatt Ingenthron */
1754e72c45STrond Norbye
18d067db62STrond Norbye#include "config.h"
19244c0146SMike Wiederhold
2054e72c45STrond Norbye#include <string.h>
21244c0146SMike Wiederhold#include <time.h>
22244c0146SMike Wiederhold
23b96fece4SDustin Sallings#include <fstream>
2466353d14SDustin Sallings#include <functional>
25244c0146SMike Wiederhold#include <iostream>
26244c0146SMike Wiederhold#include <map>
27244c0146SMike Wiederhold#include <sstream>
28244c0146SMike Wiederhold#include <string>
29244c0146SMike Wiederhold#include <utility>
30244c0146SMike Wiederhold#include <vector>
3154e72c45STrond Norbye
32244c0146SMike Wiederhold#include "access_scanner.h"
33244c0146SMike Wiederhold#include "checkpoint_remover.h"
3415aed944SMike Wiederhold#include "conflict_resolution.h"
35244c0146SMike Wiederhold#include "ep.h"
36697f518eSTrond Norbye#include "ep_engine.h"
3715aed944SMike Wiederhold#include "failover-table.h"
38244c0146SMike Wiederhold#include "flusher.h"
39fc9615cdSMike Wiederhold#include "htresizer.h"
40b926395aSMike Wiederhold#include "kvshard.h"
41244c0146SMike Wiederhold#include "kvstore.h"
42244c0146SMike Wiederhold#include "locks.h"
4315aed944SMike Wiederhold#include "mutation_log.h"
44244c0146SMike Wiederhold#include "warmup.h"
453e9ca12cSMike Wiederhold#include "connmap.h"
4615aed944SMike Wiederhold#include "tapthrottle.h"
4754e72c45STrond Norbye
486dca6401STrond Norbyeclass StatsValueChangeListener : public ValueChangedListener {
496dca6401STrond Norbyepublic:
506dca6401STrond Norbye    StatsValueChangeListener(EPStats &st) : stats(st) {
516dca6401STrond Norbye        // EMPTY
526dca6401STrond Norbye    }
536dca6401STrond Norbye
54d182fc72STrond Norbye    virtual void sizeValueChanged(const std::string &key, size_t value) {
55f65ef05bSMike Wiederhold        if (key.compare("max_size") == 0) {
56874b111bSMike Wiederhold            stats.setMaxDataSize(value);
5718770839SSundar Sridharan            size_t low_wat = static_cast<size_t>
5818770839SSundar Sridharan                             (static_cast<double>(value) * 0.6);
5918770839SSundar Sridharan            size_t high_wat = static_cast<size_t>(
6018770839SSundar Sridharan                              static_cast<double>(value) * 0.75);
617c6809d2STrond Norbye            stats.mem_low_wat.store(low_wat);
627c6809d2STrond Norbye            stats.mem_high_wat.store(high_wat);
63874b111bSMike Wiederhold        } else if (key.compare("mem_low_wat") == 0) {
647c6809d2STrond Norbye            stats.mem_low_wat.store(value);
65874b111bSMike Wiederhold        } else if (key.compare("mem_high_wat") == 0) {
667c6809d2STrond Norbye            stats.mem_high_wat.store(value);
67670a9f31SDustin Sallings        } else if (key.compare("tap_throttle_threshold") == 0) {
6818770839SSundar Sridharan            stats.tapThrottleThreshold.store(
6918770839SSundar Sridharan                                          static_cast<double>(value) / 100.0);
7042ff2869STrond Norbye        } else if (key.compare("warmup_min_memory_threshold") == 0) {
717c6809d2STrond Norbye            stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
7242ff2869STrond Norbye        } else if (key.compare("warmup_min_items_threshold") == 0) {
737c6809d2STrond Norbye            stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0);
74b4ee1f57SJin Lim        } else {
75a1213f50SMike Wiederhold            LOG(EXTENSION_LOG_WARNING,
76a1213f50SMike Wiederhold                "Failed to change value for unknown variable, %s\n",
77a1213f50SMike Wiederhold                key.c_str());
78e7dc7938STrond Norbye        }
796dca6401STrond Norbye    }
806dca6401STrond Norbye
816dca6401STrond Norbyeprivate:
826dca6401STrond Norbye    EPStats &stats;
836dca6401STrond Norbye};
846dca6401STrond Norbye
854d05a0cbSDustin Sallings/**
864d05a0cbSDustin Sallings * A configuration value changed listener that responds to ep-engine
874d05a0cbSDustin Sallings * parameter changes by invoking engine-specific methods on
884d05a0cbSDustin Sallings * configuration change events.
894d05a0cbSDustin Sallings */
9044499e88STrond Norbyeclass EPStoreValueChangeListener : public ValueChangedListener {
91c3bfe571STrond Norbyepublic:
9244499e88STrond Norbye    EPStoreValueChangeListener(EventuallyPersistentStore &st) : store(st) {
93c3bfe571STrond Norbye    }
94c3bfe571STrond Norbye
95d182fc72STrond Norbye    virtual void sizeValueChanged(const std::string &key, size_t value) {
9644499e88STrond Norbye        if (key.compare("bg_fetch_delay") == 0) {
9744499e88STrond Norbye            store.setBGFetchDelay(static_cast<uint32_t>(value));
98e59abd35Sabhinavdangeti        } else if (key.compare("compaction_write_queue_cap") == 0) {
99e59abd35Sabhinavdangeti            store.setCompactionWriteQueueCap(value);
10040e59276SChiyoung Seo        } else if (key.compare("exp_pager_stime") == 0) {
10140e59276SChiyoung Seo            store.setExpiryPagerSleeptime(value);
10261b92cd1SLiang Guo        } else if (key.compare("alog_sleep_time") == 0) {
10361b92cd1SLiang Guo            store.setAccessScannerSleeptime(value);
104ccaa609dSLiang Guo        } else if (key.compare("alog_task_time") == 0) {
105ccaa609dSLiang Guo            store.resetAccessScannerStartTime();
106c923a472SChiyoung Seo        } else if (key.compare("mutation_mem_threshold") == 0) {
107c923a472SChiyoung Seo            double mem_threshold = static_cast<double>(value) / 100;
108c923a472SChiyoung Seo            StoredValue::setMutationMemoryThreshold(mem_threshold);
1091964c235Sabhinavdangeti        } else if (key.compare("backfill_mem_threshold") == 0) {
1101964c235Sabhinavdangeti            double backfill_threshold = static_cast<double>(value) / 100;
1111964c235Sabhinavdangeti            store.setBackfillMemoryThreshold(backfill_threshold);
112c1015f71Sabhinavdangeti        } else if (key.compare("compaction_exp_mem_threshold") == 0) {
113c1015f71Sabhinavdangeti            store.setCompactionExpMemThreshold(value);
1149a6397a9SLiang Guo        } else if (key.compare("tap_throttle_queue_cap") == 0) {
1159a6397a9SLiang Guo            store.getEPEngine().getTapThrottle().setQueueCap(value);
1169a6397a9SLiang Guo        } else if (key.compare("tap_throttle_cap_pcnt") == 0) {
1179a6397a9SLiang Guo            store.getEPEngine().getTapThrottle().setCapPercent(value);
118b4ee1f57SJin Lim        } else {
119a1213f50SMike Wiederhold            LOG(EXTENSION_LOG_WARNING,
120a1213f50SMike Wiederhold                "Failed to change value for unknown variable, %s\n",
121a1213f50SMike Wiederhold                key.c_str());
122c3bfe571STrond Norbye        }
123c3bfe571STrond Norbye    }
124c3bfe571STrond Norbye
125118e7bedSabhinavdangeti    virtual void booleanValueChanged(const std::string &key, bool value) {
126118e7bedSabhinavdangeti        if (key.compare("access_scanner_enabled") == 0) {
127118e7bedSabhinavdangeti            if (value) {
128118e7bedSabhinavdangeti                store.enableAccessScannerTask();
129118e7bedSabhinavdangeti            } else {
130118e7bedSabhinavdangeti                store.disableAccessScannerTask();
131118e7bedSabhinavdangeti            }
132118e7bedSabhinavdangeti        }
133118e7bedSabhinavdangeti    }
134118e7bedSabhinavdangeti
135c3bfe571STrond Norbyeprivate:
136c3bfe571STrond Norbye    EventuallyPersistentStore &store;
137c3bfe571STrond Norbye};
138e5fe7d0dSTrond Norbye
139ac3ea552SSundar Sridharanclass VBucketMemoryDeletionTask : public GlobalTask {
1405bd10ddfSMike Wiederholdpublic:
141ac3ea552SSundar Sridharan    VBucketMemoryDeletionTask(EventuallyPersistentEngine &eng,
142ac3ea552SSundar Sridharan                              RCPtr<VBucket> &vb, double delay) :
143ac3ea552SSundar Sridharan                              GlobalTask(&eng,
144ac3ea552SSundar Sridharan                              Priority::VBMemoryDeletionPriority, delay,false),
145ac3ea552SSundar Sridharan                              e(eng), vbucket(vb), vbid(vb->getId()) { }
1465bd10ddfSMike Wiederhold
147ac3ea552SSundar Sridharan    std::string getDescription() {
148ac3ea552SSundar Sridharan        std::stringstream ss;
149ac3ea552SSundar Sridharan        ss << "Removing (dead) vbucket " << vbid << " from memory";
150ac3ea552SSundar Sridharan        return ss.str();
151ac3ea552SSundar Sridharan    }
152ac3ea552SSundar Sridharan
153ac3ea552SSundar Sridharan    bool run(void) {
154ac3ea552SSundar Sridharan        vbucket->notifyAllPendingConnsFailed(e);
1555bd10ddfSMike Wiederhold        vbucket->ht.clear();
1565bd10ddfSMike Wiederhold        vbucket.reset();
1575bd10ddfSMike Wiederhold        return false;
1585bd10ddfSMike Wiederhold    }
1595bd10ddfSMike Wiederhold
1605bd10ddfSMike Wiederholdprivate:
161ac3ea552SSundar Sridharan    EventuallyPersistentEngine &e;
1625bd10ddfSMike Wiederhold    RCPtr<VBucket> vbucket;
163ac3ea552SSundar Sridharan    uint16_t vbid;
1645bd10ddfSMike Wiederhold};
1655bd10ddfSMike Wiederhold
16696f86bb5SChiyoung Seoclass PendingOpsNotification : public GlobalTask {
16796f86bb5SChiyoung Seopublic:
16896f86bb5SChiyoung Seo    PendingOpsNotification(EventuallyPersistentEngine &e, RCPtr<VBucket> &vb) :
16996f86bb5SChiyoung Seo        GlobalTask(&e, Priority::VBMemoryDeletionPriority, 0, false),
17096f86bb5SChiyoung Seo        engine(e), vbucket(vb) { }
17196f86bb5SChiyoung Seo
17296f86bb5SChiyoung Seo    std::string getDescription() {
17396f86bb5SChiyoung Seo        std::stringstream ss;
17496f86bb5SChiyoung Seo        ss << "Notify pending operations for vbucket " << vbucket->getId();
17596f86bb5SChiyoung Seo        return ss.str();
17696f86bb5SChiyoung Seo    }
17796f86bb5SChiyoung Seo
17896f86bb5SChiyoung Seo    bool run(void) {
17996f86bb5SChiyoung Seo        vbucket->fireAllOps(engine);
18096f86bb5SChiyoung Seo        return false;
18196f86bb5SChiyoung Seo    }
18296f86bb5SChiyoung Seo
18396f86bb5SChiyoung Seoprivate:
18496f86bb5SChiyoung Seo    EventuallyPersistentEngine &engine;
18596f86bb5SChiyoung Seo    RCPtr<VBucket> vbucket;
18696f86bb5SChiyoung Seo};
18796f86bb5SChiyoung Seo
18818770839SSundar SridharanEventuallyPersistentStore::EventuallyPersistentStore(
18918770839SSundar Sridharan    EventuallyPersistentEngine &theEngine) :
190f20f8837SJin Lim    engine(theEngine), stats(engine.getEpStats()),
191d41195ffSMike Wiederhold    vbMap(theEngine.getConfiguration(), *this),
1929d8a9725STrond Norbye    bgFetchQueue(0),
1931964c235Sabhinavdangeti    diskFlushAll(false), bgFetchDelay(0), backfillMemoryThreshold(0.95),
1941964c235Sabhinavdangeti    statsSnapshotTaskId(0), lastTransTimePerItem(0)
195811f7829STrond Norbye{
1969d8a9725STrond Norbye    cachedResidentRatio.activeRatio.store(0);
1979d8a9725STrond Norbye    cachedResidentRatio.replicaRatio.store(0);
1989d8a9725STrond Norbye
199f20f8837SJin Lim    Configuration &config = engine.getConfiguration();
200410eeed1Sabhinavdangeti    MutationLog *shardlog;
201f62649c7Sabhinav dangeti    for (uint16_t i = 0; i < config.getMaxNumShards(); i++) {
202410eeed1Sabhinavdangeti        std::stringstream s;
203410eeed1Sabhinavdangeti        s << i;
20418770839SSundar Sridharan        shardlog = new MutationLog(engine.getConfiguration().getAlogPath() +
20518770839SSundar Sridharan                                 "." + s.str(),
20618770839SSundar Sridharan                                 engine.getConfiguration().getAlogBlockSize());
207410eeed1Sabhinavdangeti        accessLog.push_back(shardlog);
208410eeed1Sabhinavdangeti    }
209410eeed1Sabhinavdangeti
210f20f8837SJin Lim    storageProperties = new StorageProperties(true, true, true, true);
21196c8a03fSMike Wiederhold
212fc21d573SSundar Sridharan    stats.schedulingHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
213fc21d573SSundar Sridharan    stats.taskRuntimeHisto = new Histogram<hrtime_t>[MAX_TYPE_ID];
214fc21d573SSundar Sridharan
215fc21d573SSundar Sridharan    for (size_t i = 0; i < MAX_TYPE_ID; i++) {
216fc21d573SSundar Sridharan        stats.schedulingHisto[i].reset();
217fc21d573SSundar Sridharan        stats.taskRuntimeHisto[i].reset();
218fc21d573SSundar Sridharan    }
219fc21d573SSundar Sridharan
220a5db9f75SSundar Sridharan    ExecutorPool::get()->registerBucket(ObjectRegistry::getCurrentEngine());
22196c8a03fSMike Wiederhold
222e9459939SChiyoung Seo    size_t num_vbs = config.getMaxVbuckets();
223e9459939SChiyoung Seo    vb_mutexes = new Mutex[num_vbs];
224e9459939SChiyoung Seo    schedule_vbstate_persist = new AtomicValue<bool>[num_vbs];
225e9459939SChiyoung Seo    for (size_t i = 0; i < num_vbs; ++i) {
226e9459939SChiyoung Seo        schedule_vbstate_persist[i] = false;
227e9459939SChiyoung Seo    }
228dca5ef32Sabhinavdangeti
229c5261daaSDustin Sallings    stats.memOverhead = sizeof(EventuallyPersistentStore);
230c5261daaSDustin Sallings
231d43a1deeSMike Wiederhold    if (config.getConflictResolutionType().compare("seqno") == 0) {
232d43a1deeSMike Wiederhold        conflictResolver = new SeqBasedResolution();
233d43a1deeSMike Wiederhold    }
23444499e88STrond Norbye
235874b111bSMike Wiederhold    stats.setMaxDataSize(config.getMaxSize());
236874b111bSMike Wiederhold    config.addValueChangedListener("max_size",
237874b111bSMike Wiederhold                                   new StatsValueChangeListener(stats));
238874b111bSMike Wiederhold
2397c6809d2STrond Norbye    stats.mem_low_wat.store(config.getMemLowWat());
240874b111bSMike Wiederhold    config.addValueChangedListener("mem_low_wat",
241874b111bSMike Wiederhold                                   new StatsValueChangeListener(stats));
242874b111bSMike Wiederhold
2437c6809d2STrond Norbye    stats.mem_high_wat.store(config.getMemHighWat());
244874b111bSMike Wiederhold    config.addValueChangedListener("mem_high_wat",
245874b111bSMike Wiederhold                                   new StatsValueChangeListener(stats));
2466dca6401STrond Norbye
24718770839SSundar Sridharan    stats.tapThrottleThreshold.store(static_cast<double>
24818770839SSundar Sridharan                                    (config.getTapThrottleThreshold())
2497c6809d2STrond Norbye                                     / 100.0);
250670a9f31SDustin Sallings    config.addValueChangedListener("tap_throttle_threshold",
251670a9f31SDustin Sallings                                   new StatsValueChangeListener(stats));
252670a9f31SDustin Sallings
2537c6809d2STrond Norbye    stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
25446d10cc0SChiyoung Seo    config.addValueChangedListener("tap_throttle_queue_cap",
2559a6397a9SLiang Guo                                   new EPStoreValueChangeListener(*this));
2569a6397a9SLiang Guo    config.addValueChangedListener("tap_throttle_cap_pcnt",
2579a6397a9SLiang Guo                                   new EPStoreValueChangeListener(*this));
25846d10cc0SChiyoung Seo
25944499e88STrond Norbye    setBGFetchDelay(config.getBgFetchDelay());
26044499e88STrond Norbye    config.addValueChangedListener("bg_fetch_delay",
26144499e88STrond Norbye                                   new EPStoreValueChangeListener(*this));
262c3bfe571STrond Norbye
26318770839SSundar Sridharan    stats.warmupMemUsedCap.store(static_cast<double>
26418770839SSundar Sridharan                               (config.getWarmupMinMemoryThreshold()) / 100.0);
26542ff2869STrond Norbye    config.addValueChangedListener("warmup_min_memory_threshold",
266b4ee1f57SJin Lim                                   new StatsValueChangeListener(stats));
26718770839SSundar Sridharan    stats.warmupNumReadCap.store(static_cast<double>
26818770839SSundar Sridharan                                (config.getWarmupMinItemsThreshold()) / 100.0);
26942ff2869STrond Norbye    config.addValueChangedListener("warmup_min_items_threshold",
270b4ee1f57SJin Lim                                   new StatsValueChangeListener(stats));
27142ff2869STrond Norbye
27218770839SSundar Sridharan    double mem_threshold = static_cast<double>
27318770839SSundar Sridharan                                      (config.getMutationMemThreshold()) / 100;
274c923a472SChiyoung Seo    StoredValue::setMutationMemoryThreshold(mem_threshold);
275c923a472SChiyoung Seo    config.addValueChangedListener("mutation_mem_threshold",
276c923a472SChiyoung Seo                                   new EPStoreValueChangeListener(*this));
277c923a472SChiyoung Seo
2781964c235Sabhinavdangeti    double backfill_threshold = static_cast<double>
2791964c235Sabhinavdangeti                                      (config.getBackfillMemThreshold()) / 100;
2801964c235Sabhinavdangeti    setBackfillMemoryThreshold(backfill_threshold);
2811964c235Sabhinavdangeti    config.addValueChangedListener("backfill_mem_threshold",
2821964c235Sabhinavdangeti                                   new EPStoreValueChangeListener(*this));
2831964c235Sabhinavdangeti
284c1015f71Sabhinavdangeti    compactionExpMemThreshold = config.getCompactionExpMemThreshold();
285c1015f71Sabhinavdangeti    config.addValueChangedListener("compaction_exp_mem_threshold",
286c1015f71Sabhinavdangeti                                   new EPStoreValueChangeListener(*this));
287c1015f71Sabhinavdangeti
288e59abd35Sabhinavdangeti    compactionWriteQueueCap = config.getCompactionWriteQueueCap();
289e59abd35Sabhinavdangeti    config.addValueChangedListener("compaction_write_queue_cap",
290e59abd35Sabhinavdangeti                                   new EPStoreValueChangeListener(*this));
291e59abd35Sabhinavdangeti
2929a8679e2SChiyoung Seo    const std::string &policy = config.getItemEvictionPolicy();
2939a8679e2SChiyoung Seo    if (policy.compare("value_only") == 0) {
2949a8679e2SChiyoung Seo        eviction_policy = VALUE_ONLY;
2959a8679e2SChiyoung Seo    } else {
2969a8679e2SChiyoung Seo        eviction_policy = FULL_EVICTION;
2979a8679e2SChiyoung Seo    }
2989a8679e2SChiyoung Seo
29924de0038Sabhinavdangeti    warmupTask = new Warmup(this);
30038b0b7c5STrond Norbye}
30154e72c45STrond Norbye
302f20f8837SJin Limbool EventuallyPersistentStore::initialize() {
30388e45e28STrond Norbye    // We should nuke everything unless we want warmup
30488e45e28STrond Norbye    Configuration &config = engine.getConfiguration();
30588e45e28STrond Norbye    if (!config.isWarmup()) {
30688e45e28STrond Norbye        reset();
30788e45e28STrond Norbye    }
30888e45e28STrond Norbye
309f20f8837SJin Lim    if (!startFlusher()) {
310f20f8837SJin Lim        LOG(EXTENSION_LOG_WARNING,
311f20f8837SJin Lim            "FATAL: Failed to create and start flushers");
312f20f8837SJin Lim        return false;
313f20f8837SJin Lim    }
314f20f8837SJin Lim    if (!startBgFetcher()) {
315f20f8837SJin Lim        LOG(EXTENSION_LOG_WARNING,
316f20f8837SJin Lim           "FATAL: Failed to create and start bgfetchers");
317f20f8837SJin Lim        return false;
318f20f8837SJin Lim    }
319f20f8837SJin Lim
320ec41875bSTrond Norbye    warmupTask->start();
321b89779ecSMike Wiederhold
3225634dcd2STrond Norbye    if (config.isFailpartialwarmup() && stats.warmOOM > 0) {
323a1213f50SMike Wiederhold        LOG(EXTENSION_LOG_WARNING,
324a1213f50SMike Wiederhold            "Warmup failed to load %d records due to OOM, exiting.\n",
325a1213f50SMike Wiederhold            static_cast<unsigned int>(stats.warmOOM));
326f20f8837SJin Lim        return false;
32788e45e28STrond Norbye    }
32888e45e28STrond Norbye
329671a7b5aSabhinavdangeti    itmpTask = new ItemPager(&engine, stats);
330ac3ea552SSundar Sridharan    ExecutorPool::get()->schedule(itmpTask, NONIO_TASK_IDX);
33140de2912SMike Wiederhold
332ac3ea552SSundar Sridharan    size_t expiryPagerSleeptime = config.getExpPagerStime();
33340de2912SMike Wiederhold    setExpiryPagerSleeptime(expiryPagerSleeptime);
33440de2912SMike Wiederhold    config.addValueChangedListener("exp_pager_stime",
335ac3ea552SSundar Sridharan                                   new EPStoreValueChangeListener(*this));
33688e45e28STrond Norbye
337ac3ea552SSundar Sridharan    ExTask htrTask = new HashtableResizerTask(this, 10);
338ac3ea552SSundar Sridharan    ExecutorPool::get()->schedule(htrTask, NONIO_TASK_IDX);
33988e45e28STrond Norbye
34088e45e28STrond Norbye    size_t checkpointRemoverInterval = config.getChkRemoverStime();
3417fb81337Sabhinavdangeti    chkTask = new ClosedUnrefCheckpointRemoverTask(&engine, stats,
3427fb81337Sabhinavdangeti                                                   checkpointRemoverInterval);
343ac3ea552SSundar Sridharan    ExecutorPool::get()->schedule(chkTask, NONIO_TASK_IDX);
34414ca80fbSChiyoung Seo
34514ca80fbSChiyoung Seo    ExTask vbSnapshotTask = new DaemonVBSnapshotTask(&engine);
34614ca80fbSChiyoung Seo    ExecutorPool::get()->schedule(vbSnapshotTask, WRITER_TASK_IDX);
34714ca80fbSChiyoung Seo
3486dbd231fSChiyoung Seo    ExTask workloadMonitorTask = new WorkLoadMonitor(&engine, false);
3496dbd231fSChiyoung Seo    ExecutorPool::get()->schedule(workloadMonitorTask, NONIO_TASK_IDX);
3506dbd231fSChiyoung Seo
351f20f8837SJin Lim    return true;
35288e45e28STrond Norbye}
35388e45e28STrond Norbye
35438b0b7c5STrond NorbyeEventuallyPersistentStore::~EventuallyPersistentStore() {
3556b89027bSJin Lim    stopWarmup();
356ece63f5cSJin Lim    stopBgFetcher();
357ac3ea552SSundar Sridharan    ExecutorPool::get()->stopTaskGroup(&engine, NONIO_TASK_IDX);
35896c8a03fSMike Wiederhold
359a5db9f75SSundar Sridharan    ExecutorPool::get()->cancel(statsSnapshotTaskId);
36046c0c7f6SSundar Sridharan    LockHolder lh(accessScanner.mutex);
361a5db9f75SSundar Sridharan    ExecutorPool::get()->cancel(accessScanner.task);
36246c0c7f6SSundar Sridharan    lh.unlock();
36373dac325Sabhinavdangeti
364ac3ea552SSundar Sridharan    stopFlusher();
365ac3ea552SSundar Sridharan    ExecutorPool::get()->unregisterBucket(ObjectRegistry::getCurrentEngine());
3665a149168STrond Norbye
367d1d9b9eeSabhinavdangeti    delete [] vb_mutexes;
368e9459939SChiyoung Seo    delete [] schedule_vbstate_persist;
369fc21d573SSundar Sridharan    delete [] stats.schedulingHisto;
370fc21d573SSundar Sridharan    delete [] stats.taskRuntimeHisto;
371d43a1deeSMike Wiederhold    delete conflictResolver;
37296c8a03fSMike Wiederhold    delete warmupTask;
373f20f8837SJin Lim    delete storageProperties;
374410eeed1Sabhinavdangeti
375410eeed1Sabhinavdangeti    std::vector<MutationLog*>::iterator it;
376410eeed1Sabhinavdangeti    for (it = accessLog.begin(); it != accessLog.end(); it++) {
377410eeed1Sabhinavdangeti        delete *it;
378410eeed1Sabhinavdangeti    }
37938b0b7c5STrond Norbye}
38054e72c45STrond Norbye
381f20f8837SJin Limconst Flusher* EventuallyPersistentStore::getFlusher(uint16_t shardId) {
382f20f8837SJin Lim    return vbMap.getShard(shardId)->getFlusher();
3832a1c9f88SDustin Sallings}
3842a1c9f88SDustin Sallings
38510faf752STrond NorbyeWarmup* EventuallyPersistentStore::getWarmup(void) const {
386ec41875bSTrond Norbye    return warmupTask;
387ec41875bSTrond Norbye}
388ec41875bSTrond Norbye
389f20f8837SJin Limbool EventuallyPersistentStore::startFlusher() {
390f20f8837SJin Lim    for (uint16_t i = 0; i < vbMap.numShards; ++i) {
391f20f8837SJin Lim        Flusher *flusher = vbMap.shards[i]->getFlusher();
39273dac325Sabhinavdangeti        flusher->start();
393f20f8837SJin Lim    }
394f20f8837SJin Lim    return true;
3951c2c6867SDustin Sallings}
3961c2c6867SDustin Sallings
3971c2c6867SDustin Sallingsvoid EventuallyPersistentStore::stopFlusher() {
398f20f8837SJin Lim    for (uint16_t i = 0; i < vbMap.numShards; i++) {
399f20f8837SJin Lim        Flusher *flusher = vbMap.shards[i]->getFlusher();
400f20f8837SJin Lim        bool rv = flusher->stop(stats.forceShutdown);
401f20f8837SJin Lim        if (rv && !stats.forceShutdown) {
402f20f8837SJin Lim            flusher->wait();
403f20f8837SJin Lim        }
4041ea523bbSDustin Sallings    }
4051c2c6867SDustin Sallings}
4061c2c6867SDustin Sallings
4072a1c9f88SDustin Sallingsbool EventuallyPersistentStore::pauseFlusher() {
408f20f8837SJin Lim    bool rv = true;
409f20f8837SJin Lim    for (uint16_t i = 0; i < vbMap.numShards; i++) {
410f20f8837SJin Lim        Flusher *flusher = vbMap.shards[i]->getFlusher();
411f20f8837SJin Lim        if (!flusher->pause()) {
412f20f8837SJin Lim            LOG(EXTENSION_LOG_WARNING, "Attempted to pause flusher in state "
413f20f8837SJin Lim                "[%s], shard = %d", flusher->stateName(), i);
414f20f8837SJin Lim            rv = false;
415f20f8837SJin Lim        }
416f20f8837SJin Lim    }
417f20f8837SJin Lim    return rv;
4182a1c9f88SDustin Sallings}
4192a1c9f88SDustin Sallings
4202a1c9f88SDustin Sallingsbool EventuallyPersistentStore::resumeFlusher() {
421f20f8837SJin Lim    bool rv = true;
422f20f8837SJin Lim    for (uint16_t i = 0; i < vbMap.numShards; i++) {
423f20f8837SJin Lim        Flusher *flusher = vbMap.shards[i]->getFlusher();
424f20f8837SJin Lim        if (!flusher->resume()) {
425f20f8837SJin Lim            LOG(EXTENSION_LOG_WARNING,
42618770839SSundar Sridharan                    "Warning: attempted to resume flusher in state [%s], "
42718770839SSundar Sridharan                    "shard = %d", flusher->stateName(), i);
428f20f8837SJin Lim            rv = false;
429f20f8837SJin Lim        }
430f20f8837SJin Lim    }
431f20f8837SJin Lim    return rv;
4321c2c6867SDustin Sallings}
4331c2c6867SDustin Sallings
4341cccdfe4SChiyoung Seovoid EventuallyPersistentStore::wakeUpFlusher() {
4357c6809d2STrond Norbye    if (stats.diskQueueSize.load() == 0) {
436f20f8837SJin Lim        for (uint16_t i = 0; i < vbMap.numShards; i++) {
437f20f8837SJin Lim            Flusher *flusher = vbMap.shards[i]->getFlusher();
438f20f8837SJin Lim            flusher->wake();
439f20f8837SJin Lim        }
4401cccdfe4SChiyoung Seo    }
4411cccdfe4SChiyoung Seo}
4421cccdfe4SChiyoung Seo
443f20f8837SJin Limbool EventuallyPersistentStore::startBgFetcher() {
444f20f8837SJin Lim    for (uint16_t i = 0; i < vbMap.numShards; i++) {
445f20f8837SJin Lim        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
446f20f8837SJin Lim        if (bgfetcher == NULL) {
447f20f8837SJin Lim            LOG(EXTENSION_LOG_WARNING,
448f20f8837SJin Lim                "Falied to start bg fetcher for shard %d", i);
449f20f8837SJin Lim            return false;
450f20f8837SJin Lim        }
45173dac325Sabhinavdangeti        bgfetcher->start();
452ece63f5cSJin Lim    }
453f20f8837SJin Lim    return true;
454ece63f5cSJin Lim}
455ece63f5cSJin Lim
456ece63f5cSJin Limvoid EventuallyPersistentStore::stopBgFetcher() {
457f20f8837SJin Lim    for (uint16_t i = 0; i < vbMap.numShards; i++) {
458f20f8837SJin Lim        BgFetcher *bgfetcher = vbMap.shards[i]->getBgFetcher();
459f20f8837SJin Lim        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
460f20f8837SJin Lim            LOG(EXTENSION_LOG_WARNING,
461f20f8837SJin Lim                "Shutting down engine while there are still pending data "
462f20f8837SJin Lim                "read for shard %d from database storage", i);
463ece63f5cSJin Lim        }
464a1213f50SMike Wiederhold        LOG(EXTENSION_LOG_INFO, "Stopping bg fetcher for underlying storage");
465f20f8837SJin Lim        bgfetcher->stop();
466ece63f5cSJin Lim    }
467ece63f5cSJin Lim}
468ece63f5cSJin Lim
469bc17f0bdSDustin SallingsRCPtr<VBucket> EventuallyPersistentStore::getVBucket(uint16_t vbid,
47018770839SSundar Sridharan                                                vbucket_state_t wanted_state) {
4717643451eSMike Wiederhold    RCPtr<VBucket> vb = vbMap.getBucket(vbid);
4727a27df1eSTrond Norbye    vbucket_state_t found_state(vb ? vb->getState() : vbucket_state_dead);
473bc17f0bdSDustin Sallings    if (found_state == wanted_state) {
474bc17f0bdSDustin Sallings        return vb;
475bc17f0bdSDustin Sallings    } else {
476bc17f0bdSDustin Sallings        RCPtr<VBucket> rv;
477bc17f0bdSDustin Sallings        return rv;
478bc17f0bdSDustin Sallings    }
479bc17f0bdSDustin Sallings}
480bc17f0bdSDustin Sallings
481b50f3546SSundar Sridharanvoid
482b50f3546SSundar SridharanEventuallyPersistentStore::deleteExpiredItem(uint16_t vbid, std::string &key,
483b50f3546SSundar Sridharan                                             time_t startTime,
484b50f3546SSundar Sridharan                                             uint64_t revSeqno) {
485b50f3546SSundar Sridharan    RCPtr<VBucket> vb = getVBucket(vbid);
486b50f3546SSundar Sridharan    if (vb) {
487b50f3546SSundar Sridharan        int bucket_num(0);
488b50f3546SSundar Sridharan        incExpirationStat(vb);
489b50f3546SSundar Sridharan        LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
490b50f3546SSundar Sridharan        StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true, false);
491b50f3546SSundar Sridharan        if (v) {
49200dea4cfSChiyoung Seo            if (v->isTempNonExistentItem() || v->isTempDeletedItem()) {
493191776a2SSrinivas Vadlamani                // This is a temporary item whose background fetch for metadata
494191776a2SSrinivas Vadlamani                // has completed.
495b50f3546SSundar Sridharan                bool deleted = vb->ht.unlocked_del(key, bucket_num);
496c649b2d9STrond Norbye                cb_assert(deleted);
497b50f3546SSundar Sridharan            } else if (v->isExpired(startTime) && !v->isDeleted()) {
4989a8679e2SChiyoung Seo                vb->ht.unlocked_softDelete(v, 0, getItemEvictionPolicy());
49944856d60SChiyoung Seo                queueDirty(vb, v, &lh, false);
50093e74a1eSChiyoung Seo            }
50193e74a1eSChiyoung Seo        } else {
50293e74a1eSChiyoung Seo            if (eviction_policy == FULL_EVICTION) {
50318770839SSundar Sridharan                // Create a temp item and delete and push it
50418770839SSundar Sridharan                // into the checkpoint queue.
50593e74a1eSChiyoung Seo                add_type_t rv = vb->ht.unlocked_addTempItem(bucket_num, key,
50693e74a1eSChiyoung Seo                                                            eviction_policy);
50793e74a1eSChiyoung Seo                if (rv == ADD_NOMEM) {
50893e74a1eSChiyoung Seo                    return;
50993e74a1eSChiyoung Seo                }
51093e74a1eSChiyoung Seo                v = vb->ht.unlocked_find(key, bucket_num, true, false);
51193e74a1eSChiyoung Seo                v->setStoredValueState(StoredValue::state_deleted_key);
51293e74a1eSChiyoung Seo                v->setRevSeqno(revSeqno);
51393e74a1eSChiyoung Seo                vb->ht.unlocked_softDelete(v, 0, eviction_policy);
51444856d60SChiyoung Seo                queueDirty(vb, v, &lh, false);
51553b0aa40SDustin Sallings            }
51653b0aa40SDustin Sallings        }
51753b0aa40SDustin Sallings    }
518b50f3546SSundar Sridharan}
51953b0aa40SDustin Sallings
520395a83efSChiyoung Seovoid
521b50f3546SSundar SridharanEventuallyPersistentStore::deleteExpiredItems(std::list<std::pair<uint16_t,
522b50f3546SSundar Sridharan                                                        std::string> > &keys) {
523b50f3546SSundar Sridharan    std::list<std::pair<uint16_t, std::string> >::iterator it;
524b50f3546SSundar Sridharan    time_t startTime = ep_real_time();
525b50f3546SSundar Sridharan    for (it = keys.begin(); it != keys.end(); it++) {
526b50f3546SSundar Sridharan        deleteExpiredItem(it->first, it->second, startTime, 0);
527b50f3546SSundar Sridharan    }
52853b0aa40SDustin Sallings}
52953b0aa40SDustin Sallings
53086a9ab8bSChiyoung SeoStoredValue *EventuallyPersistentStore::fetchValidValue(RCPtr<VBucket> &vb,
531ac548d0dSDustin Sallings                                                        const std::string &key,
53233d09fd3SDustin Sallings                                                        int bucket_num,
53383918c50SLiang Guo                                                        bool wantDeleted,
5342bfde960SMike Wiederhold                                                        bool trackReference,
5352bfde960SMike Wiederhold                                                        bool queueExpired) {
53618770839SSundar Sridharan    StoredValue *v = vb->ht.unlocked_find(key, bucket_num, wantDeleted,
53718770839SSundar Sridharan                                          trackReference);
5389156dea8SChiyoung Seo    if (v && !v->isDeleted() && !v->isTempItem()) {
53918770839SSundar Sridharan        // In the deleted case, we ignore expiration time.
5400c0c71b7STrond Norbye        if (v->isExpired(ep_real_time())) {
5413ff6d651SChiyoung Seo            if (vb->getState() != vbucket_state_active) {
542