1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2011 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <string>
21 #include <vector>
22 
23 #include "atomic.h"
24 #include "backfill.h"
25 #include "ep.h"
26 #include "vbucket.h"
27 
28 class RangeCallback : public Callback<SeqnoRange> {
29 public:
RangeCallback()30     RangeCallback() {}
~RangeCallback()31     ~RangeCallback() {}
callback(SeqnoRange&)32     void callback(SeqnoRange&) {}
33 };
34 
35 class ItemResidentCallback : public Callback<CacheLookup> {
36 public:
ItemResidentCallback(hrtime_t token, const std::string &n, TapConnMap &cm, EventuallyPersistentEngine* e)37     ItemResidentCallback(hrtime_t token, const std::string &n,
38                          TapConnMap &cm, EventuallyPersistentEngine* e)
39     : connToken(token), tapConnName(n), connMap(cm), engine(e) {
40         cb_assert(engine);
41     }
42 
43     void callback(CacheLookup &lookup);
44 
45 private:
46     hrtime_t                    connToken;
47     const std::string           tapConnName;
48     TapConnMap                    &connMap;
49     EventuallyPersistentEngine *engine;
50 };
51 
callback(CacheLookup &lookup)52 void ItemResidentCallback::callback(CacheLookup &lookup) {
53     RCPtr<VBucket> vb = engine->getEpStore()->getVBucket(lookup.getVBucketId());
54     if (!vb) {
55         setStatus(ENGINE_SUCCESS);
56         return;
57     }
58 
59     int bucket_num(0);
60     LockHolder lh = vb->ht.getLockedBucket(lookup.getKey(), &bucket_num);
61     StoredValue *v = vb->ht.unlocked_find(lookup.getKey(), bucket_num);
62     if (v && v->isResident() && v->getBySeqno() == lookup.getBySeqno()) {
63         Item* it = v->toItem(false, lookup.getVBucketId());
64         lh.unlock();
65         CompletedBGFetchTapOperation tapop(connToken,
66                                            lookup.getVBucketId(), true);
67         if (!connMap.performOp(tapConnName, tapop, it)) {
68             delete it;
69         }
70         setStatus(ENGINE_KEY_EEXISTS);
71     } else {
72         setStatus(ENGINE_SUCCESS);
73     }
74 }
75 
76 /**
77  * Callback class used to process an item backfilled from disk and push it into
78  * the corresponding TAP queue.
79  */
80 class BackfillDiskCallback : public Callback<GetValue> {
81 public:
BackfillDiskCallback(hrtime_t token, const std::string &n, TapConnMap &cm)82     BackfillDiskCallback(hrtime_t token, const std::string &n, TapConnMap &cm)
83         : connToken(token), tapConnName(n), connMap(cm) {}
84 
85     void callback(GetValue &val);
86 
87 private:
88 
89     hrtime_t                    connToken;
90     const std::string           tapConnName;
91     TapConnMap                 &connMap;
92 };
93 
callback(GetValue &gv)94 void BackfillDiskCallback::callback(GetValue &gv) {
95     cb_assert(gv.getValue());
96     CompletedBGFetchTapOperation tapop(connToken,
97                                        gv.getValue()->getVBucketId(), true);
98     // if the tap connection is closed, then free an Item instance
99     if (!connMap.performOp(tapConnName, tapop, gv.getValue())) {
100         delete gv.getValue();
101     }
102 }
103 
run()104 bool BackfillDiskLoad::run() {
105     if (engine->getEpStore()->isMemoryUsageTooHigh()) {
106         LOG(EXTENSION_LOG_INFO, "VBucket %d backfill task from disk is "
107          "temporarily suspended  because the current memory usage is too high",
108          vbucket);
109         snooze(DEFAULT_BACKFILL_SNOOZE_TIME);
110         return true;
111     }
112 
113     if (connMap.checkConnectivity(name) &&
114                                !engine->getEpStore()->isFlushAllScheduled()) {
115         size_t num_items = store->getNumItems(vbucket);
116         size_t num_deleted = store->getNumPersistedDeletes(vbucket);
117         connMap.incrBackfillRemaining(name, num_items + num_deleted);
118 
119         shared_ptr<Callback<GetValue> >
120             cb(new BackfillDiskCallback(connToken, name, connMap));
121         shared_ptr<Callback<CacheLookup> >
122             cl(new ItemResidentCallback(connToken, name, connMap, engine));
123         shared_ptr<Callback<SeqnoRange> >
124             sr(new RangeCallback());
125         store->dump(vbucket, startSeqno, cb, cl, sr);
126     }
127 
128     LOG(EXTENSION_LOG_INFO,"VBucket %d backfill task from disk is completed",
129         vbucket);
130 
131     // Should decr the disk backfill counter regardless of the connectivity
132     // status
133     CompleteDiskBackfillTapOperation op;
134     connMap.performOp(name, op, static_cast<void*>(NULL));
135 
136     return false;
137 }
138 
getDescription()139 std::string BackfillDiskLoad::getDescription() {
140     std::stringstream rv;
141     rv << "Loading TAP backfill from disk: vb " << vbucket;
142     return rv.str();
143 }
144 
visitBucket(RCPtr<VBucket> &vb)145 bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
146     if (VBucketVisitor::visitBucket(vb)) {
147         item_eviction_policy_t policy =
148             engine->getEpStore()->getItemEvictionPolicy();
149         double num_items = static_cast<double>(vb->getNumItems(policy));
150 
151         if (num_items == 0) {
152             return false;
153         }
154 
155         KVStore *underlying(engine->getEpStore()->
156                             getROUnderlying(vb->getId()));
157         LOG(EXTENSION_LOG_INFO,
158             "Schedule a full backfill from disk for vbucket %d.", vb->getId());
159         ExTask task = new BackfillDiskLoad(name, engine, connMap,
160                                           underlying, vb->getId(), 0, connToken,
161                                           Priority::TapBgFetcherPriority,
162                                           0, false);
163         ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
164     }
165     return false;
166 }
167 
visit(StoredValue*)168 void BackFillVisitor::visit(StoredValue*) {
169     abort();
170 }
171 
pauseVisitor()172 bool BackFillVisitor::pauseVisitor() {
173     bool pause(true);
174 
175     ssize_t theSize(connMap.backfillQueueDepth(name));
176     if (!checkValidity() || theSize < 0) {
177         LOG(EXTENSION_LOG_WARNING,
178             "TapProducer %s went away. Stopping backfill",
179             name.c_str());
180         valid = false;
181         return false;
182     }
183 
184     ssize_t maxBackfillSize = engine->getTapConfig().getBackfillBacklogLimit();
185     pause = theSize > maxBackfillSize;
186 
187     if (pause) {
188         LOG(EXTENSION_LOG_INFO, "Tap queue depth is too big for %s!!! ",
189             "Pausing backfill temporarily...\n", name.c_str());
190     }
191     return pause;
192 }
193 
complete()194 void BackFillVisitor::complete() {
195     CompleteBackfillTapOperation tapop;
196     connMap.performOp(name, tapop, static_cast<void*>(NULL));
197     LOG(EXTENSION_LOG_INFO,
198         "Backfill dispatcher task for TapProducer %s is completed.\n",
199         name.c_str());
200 }
201 
checkValidity()202 bool BackFillVisitor::checkValidity() {
203     if (valid) {
204         valid = connMap.checkConnectivity(name);
205         if (!valid) {
206             LOG(EXTENSION_LOG_WARNING, "Backfilling connectivity for %s went "
207                 "invalid. Stopping backfill.\n", name.c_str());
208         }
209     }
210     return valid;
211 }
212 
run(void)213 bool BackfillTask::run(void) {
214     engine->getEpStore()->visit(bfv, "Backfill task", NONIO_TASK_IDX,
215                                 Priority::BackfillTaskPriority, 1);
216     return false;
217 }
218