xref: /3.0.3-GA/ep-engine/src/backfill.cc (revision 4664d274)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string>
21#include <vector>
22
23#include "atomic.h"
24#include "backfill.h"
25#include "ep.h"
26#include "vbucket.h"
27
28static bool isMemoryUsageTooHigh(EPStats &stats) {
29    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
30    double maxSize = static_cast<double>(stats.getMaxDataSize());
31    return memoryUsed > (maxSize * BACKFILL_MEM_THRESHOLD);
32}
33
34class RangeCallback : public Callback<SeqnoRange> {
35public:
36    RangeCallback() {}
37    ~RangeCallback() {}
38    void callback(SeqnoRange&) {}
39};
40
41class ItemResidentCallback : public Callback<CacheLookup> {
42public:
43    ItemResidentCallback(hrtime_t token, const std::string &n,
44                         TapConnMap &cm, EventuallyPersistentEngine* e)
45    : connToken(token), tapConnName(n), connMap(cm), engine(e) {
46        cb_assert(engine);
47    }
48
49    void callback(CacheLookup &lookup);
50
51private:
52    hrtime_t                    connToken;
53    const std::string           tapConnName;
54    TapConnMap                    &connMap;
55    EventuallyPersistentEngine *engine;
56};
57
58void ItemResidentCallback::callback(CacheLookup &lookup) {
59    RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(lookup.getVBucketId());
60    if (!vb) {
61        setStatus(ENGINE_SUCCESS);
62        return;
63    }
64
65    int bucket_num(0);
66    LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
67    StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num);
68    if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
69        Item* it = v->toItem(false, lookup.getVBucketId());
70        lh.unlock();
71        CompletedBGFetchTapOperation tapop(connToken,
72                                           lookup.getVBucketId(), true);
73        if (!connMap.performOp(tapConnName, tapop, it)) {
74            delete it;
75        }
76        setStatus(ENGINE_KEY_EEXISTS);
77    } else {
78        setStatus(ENGINE_SUCCESS);
79    }
80}
81
82/**
83 * Callback class used to process an item backfilled from disk and push it into
84 * the corresponding TAP queue.
85 */
86class BackfillDiskCallback : public Callback<GetValue> {
87public:
88    BackfillDiskCallback(hrtime_t token, const std::string &n, TapConnMap &cm)
89        : connToken(token), tapConnName(n), connMap(cm) {}
90
91    void callback(GetValue &val);
92
93private:
94
95    hrtime_t                    connToken;
96    const std::string           tapConnName;
97    TapConnMap                 &connMap;
98};
99
100void BackfillDiskCallback::callback(GetValue &gv) {
101    cb_assert(gv.getValue());
102    CompletedBGFetchTapOperation tapop(connToken,
103                                       gv.getValue()->getVBucketId(), true);
104    // if the tap connection is closed, then free an Item instance
105    if (!connMap.performOp(tapConnName, tapop, gv.getValue())) {
106        delete gv.getValue();
107    }
108}
109
110bool BackfillDiskLoad::run() {
111    if (isMemoryUsageTooHigh(engine->getEpStats())) {
112        LOG(EXTENSION_LOG_INFO, "VBucket %d backfill task from disk is "
113         "temporarily suspended  because the current memory usage is too high",
114         vbucket);
115        snooze(DEFAULT_BACKFILL_SNOOZE_TIME);
116        return true;
117    }
118
119    if (connMap.checkConnectivity(name) &&
120                               !engine->getEpStore()->isFlushAllScheduled()) {
121        size_t num_items = store->getNumItems(vbucket);
122        size_t num_deleted = store->getNumPersistedDeletes(vbucket);
123        connMap.incrBackfillRemaining(name, num_items + num_deleted);
124
125        shared_ptr<Callback<GetValue> >
126            cb(new BackfillDiskCallback(connToken, name, connMap));
127        shared_ptr<Callback<CacheLookup> >
128            cl(new ItemResidentCallback(connToken, name, connMap, engine));
129        shared_ptr<Callback<SeqnoRange> >
130            sr(new RangeCallback());
131        store->dump(vbucket, startSeqno, cb, cl, sr);
132    }
133
134    LOG(EXTENSION_LOG_INFO,"VBucket %d backfill task from disk is completed",
135        vbucket);
136
137    // Should decr the disk backfill counter regardless of the connectivity
138    // status
139    CompleteDiskBackfillTapOperation op;
140    connMap.performOp(name, op, static_cast<void*>(NULL));
141
142    return false;
143}
144
145std::string BackfillDiskLoad::getDescription() {
146    std::stringstream rv;
147    rv << "Loading TAP backfill from disk: vb " << vbucket;
148    return rv.str();
149}
150
151bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
152    if (VBucketVisitor::visitBucket(vb)) {
153        item_eviction_policy_t policy =
154            engine->getEpStore()->getItemEvictionPolicy();
155        double num_items = static_cast<double>(vb->getNumItems(policy));
156
157        if (num_items == 0) {
158            return false;
159        }
160
161        KVStore *underlying(engine->getEpStore()->getAuxUnderlying());
162        LOG(EXTENSION_LOG_INFO,
163            "Schedule a full backfill from disk for vbucket %d.", vb->getId());
164        ExTask task = new BackfillDiskLoad(name, engine, connMap,
165                                          underlying, vb->getId(), 0,
166                                          std::numeric_limits<uint64_t>::max(),
167                                          connToken,
168                                          Priority::TapBgFetcherPriority,
169                                          0, false);
170        ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
171    }
172    return false;
173}
174
175void BackFillVisitor::visit(StoredValue*) {
176    abort();
177}
178
179bool BackFillVisitor::pauseVisitor() {
180    bool pause(true);
181
182    ssize_t theSize(connMap.backfillQueueDepth(name));
183    if (!checkValidity() || theSize < 0) {
184        LOG(EXTENSION_LOG_WARNING,
185            "TapProducer %s went away. Stopping backfill",
186            name.c_str());
187        valid = false;
188        return false;
189    }
190
191    ssize_t maxBackfillSize = engine->getTapConfig().getBackfillBacklogLimit();
192    pause = theSize > maxBackfillSize;
193
194    if (pause) {
195        LOG(EXTENSION_LOG_INFO, "Tap queue depth is too big for %s!!! ",
196            "Pausing backfill temporarily...\n", name.c_str());
197    }
198    return pause;
199}
200
201void BackFillVisitor::complete() {
202    CompleteBackfillTapOperation tapop;
203    connMap.performOp(name, tapop, static_cast<void*>(NULL));
204    LOG(EXTENSION_LOG_INFO,
205        "Backfill dispatcher task for TapProducer %s is completed.\n",
206        name.c_str());
207}
208
209bool BackFillVisitor::checkValidity() {
210    if (valid) {
211        valid = connMap.checkConnectivity(name);
212        if (!valid) {
213            LOG(EXTENSION_LOG_WARNING, "Backfilling connectivity for %s went "
214                "invalid. Stopping backfill.\n", name.c_str());
215        }
216    }
217    return valid;
218}
219
220bool BackfillTask::run(void) {
221    engine->getEpStore()->visit(bfv, "Backfill task", NONIO_TASK_IDX,
222                                Priority::BackfillTaskPriority, 1);
223    return false;
224}
225