1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_VBUCKET_H_
19 #define SRC_VBUCKET_H_ 1
20 
21 #include "config.h"
22 
23 #include <list>
24 #include <queue>
25 #include <set>
26 #include <sstream>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 #include "atomic.h"
32 #include "bgfetcher.h"
33 #include "checkpoint.h"
34 #include "common.h"
35 #include "stored-value.h"
36 
// Lower/upper bounds (in seconds) used to clamp the dynamically adjusted
// checkpoint-flush timeout (see VBucket::adjustCheckpointFlushTimeout /
// VBucket::chkFlushTimeout below).
const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.
39 
40 struct HighPriorityVBEntry {
HighPriorityVBEntryHighPriorityVBEntry41     HighPriorityVBEntry() :
42         cookie(NULL), id(0), start(gethrtime()), isBySeqno_(false) { }
HighPriorityVBEntryHighPriorityVBEntry43     HighPriorityVBEntry(const void *c, uint64_t idNum, bool isBySeqno) :
44         cookie(c), id(idNum), start(gethrtime()), isBySeqno_(isBySeqno) { }
45 
46     const void *cookie;
47     uint64_t id;
48     hrtime_t start;
49     bool isBySeqno_;
50 };
51 
52 /**
53  * Function object that returns true if the given vbucket is acceptable.
54  */
/**
 * Function object that returns true if the given vbucket is acceptable.
 */
class VBucketFilter {
public:

    /**
     * Construct a filter with an empty accept-set; an empty filter
     * accepts every vbucket id.
     */
    explicit VBucketFilter() : acceptable() {}

    /**
     * Construct a filter accepting exactly the vbucket ids contained in
     * the given vector.
     */
    explicit VBucketFilter(const std::vector<uint16_t> &a) :
        acceptable(a.begin(), a.end()) {}

    /**
     * Construct a filter accepting exactly the vbucket ids contained in
     * the given set.
     */
    explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}

    /** Replace the current accept-set with the given one. */
    void assign(const std::set<uint16_t> &a) {
        acceptable = a;
    }

    /**
     * Test whether vbucket id 'v' passes this filter. A filter with no
     * entries accepts everything.
     */
    bool operator ()(uint16_t v) const {
        if (acceptable.empty()) {
            return true;
        }
        return acceptable.count(v) > 0;
    }

    /** Number of vbucket ids explicitly accepted. */
    size_t size() const { return acceptable.size(); }

    /** True when no ids are listed (i.e. the filter accepts everything). */
    bool empty() const { return acceptable.empty(); }

    /** Drop all entries, returning to the accept-everything state. */
    void reset() {
        acceptable.clear();
    }

    /**
     * Calculate the difference between this and another filter.
     * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
     * the returned filter contains: [1,2,5,6]
     * @param other the other filter to compare with
     * @return a new filter with the elements present in only one of the two
     *         filters.
     */
    VBucketFilter filter_diff(const VBucketFilter &other) const;

    /**
     * Calculate the intersection between this and another filter.
     * If "this" contains elements, [1,2,3,4] and other contains [3,4,5,6]
     * the returned filter contains: [3,4]
     * @param other the other filter to compare with
     * @return a new filter with the elements present in both of the two
     *         filters.
     */
    VBucketFilter filter_intersection(const VBucketFilter &other) const;

    /** Read-only access to the underlying accept-set. */
    const std::set<uint16_t> &getVBSet() const { return acceptable; }

    /**
     * Add a vbucket id to the accept-set.
     * @return true if the id was newly inserted, false if already present.
     */
    bool addVBucket(uint16_t vbucket) {
        return acceptable.insert(vbucket).second;
    }

    /** Remove a vbucket id from the accept-set (no-op if absent). */
    void removeVBucket(uint16_t vbucket) {
        acceptable.erase(vbucket);
    }

    /**
     * Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
     * to the specified output stream.
     */
    friend std::ostream& operator<< (std::ostream& out,
                                     const VBucketFilter &filter);

private:

    std::set<uint16_t> acceptable;
};
130 
131 class EventuallyPersistentEngine;
132 class FailoverTable;
133 class KVShard;
134 
135 // First bool is true if an item exists in VB DB file.
136 // second bool is true if the operation is SET (i.e., insert or update).
137 typedef std::pair<bool, bool> kstat_entry_t;
138 
139 struct KVStatsCtx{
KVStatsCtxKVStatsCtx140     KVStatsCtx() : vbucket(std::numeric_limits<uint16_t>::max()),
141                    fileSpaceUsed(0), fileSize(0) {}
142 
143     uint16_t vbucket;
144     size_t fileSpaceUsed;
145     size_t fileSize;
146     unordered_map<std::string, kstat_entry_t> keyStats;
147 };
148 
149 typedef struct KVStatsCtx kvstats_ctx;
150 
151 /**
152  * An individual vbucket.
153  */
154 class VBucket : public RCValue {
155 public:
156 
VBucket(int i, vbucket_state_t newState, EPStats &st, CheckpointConfig &chkConfig, KVShard *kvshard, int64_t lastSeqno, uint64_t lastSnapStart, uint64_t lastSnapEnd, FailoverTable *table, vbucket_state_t initState = vbucket_state_dead, uint64_t chkId = 1, uint64_t purgeSeqno = 0)157     VBucket(int i, vbucket_state_t newState, EPStats &st,
158             CheckpointConfig &chkConfig, KVShard *kvshard,
159             int64_t lastSeqno, uint64_t lastSnapStart,
160             uint64_t lastSnapEnd, FailoverTable *table,
161             vbucket_state_t initState = vbucket_state_dead,
162             uint64_t chkId = 1, uint64_t purgeSeqno = 0) :
163         ht(st),
164         checkpointManager(st, i, chkConfig, lastSeqno, chkId),
165         failovers(table),
166         opsCreate(0),
167         opsUpdate(0),
168         opsDelete(0),
169         opsReject(0),
170         dirtyQueueSize(0),
171         dirtyQueueMem(0),
172         dirtyQueueFill(0),
173         dirtyQueueDrain(0),
174         dirtyQueueAge(0),
175         dirtyQueuePendingWrites(0),
176         metaDataDisk(0),
177         numExpiredItems(0),
178         fileSpaceUsed(0),
179         fileSize(0),
180         id(i),
181         state(newState),
182         initialState(initState),
183         stats(st),
184         purge_seqno(purgeSeqno),
185         cur_snapshot_start(lastSnapStart),
186         cur_snapshot_end(lastSnapEnd),
187         numHpChks(0),
188         shard(kvshard)
189     {
190         backfill.isBackfillPhase = false;
191         pendingOpsStart = 0;
192         stats.memOverhead.fetch_add(sizeof(VBucket)
193                                + ht.memorySize() + sizeof(CheckpointManager));
194         cb_assert(stats.memOverhead.load() < GIGANTOR);
195     }
196 
197     ~VBucket();
198 
getHighSeqno()199     int64_t getHighSeqno() {
200         return checkpointManager.getHighSeqno();
201     }
202 
getPurgeSeqno()203     uint64_t getPurgeSeqno() {
204         return purge_seqno;
205     }
206 
setPurgeSeqno(uint64_t to)207     void setPurgeSeqno(uint64_t to) {
208         purge_seqno = to;
209     }
210 
getSnapshotLock()211     LockHolder getSnapshotLock() {
212         LockHolder lh(snapshotMutex);
213         return lh;
214     }
215 
setCurrentSnapshot(uint64_t start, uint64_t end)216     void setCurrentSnapshot(uint64_t start, uint64_t end) {
217         LockHolder lh(snapshotMutex);
218         setCurrentSnapshot_UNLOCKED(start, end);
219     }
220 
setCurrentSnapshot_UNLOCKED(uint64_t start, uint64_t end)221     void setCurrentSnapshot_UNLOCKED(uint64_t start, uint64_t end) {
222         cb_assert(start <= end);
223 
224         if (state == vbucket_state_replica) {
225             cb_assert(end >= (uint64_t)checkpointManager.getHighSeqno());
226         }
227 
228         cur_snapshot_start = start;
229         cur_snapshot_end = end;
230     }
231 
getCurrentSnapshot(uint64_t& start, uint64_t& end)232     void getCurrentSnapshot(uint64_t& start, uint64_t& end) {
233         LockHolder lh(snapshotMutex);
234         getCurrentSnapshot_UNLOCKED(start, end);
235     }
236 
getCurrentSnapshot_UNLOCKED(uint64_t& start, uint64_t& end)237     void getCurrentSnapshot_UNLOCKED(uint64_t& start, uint64_t& end) {
238         start = cur_snapshot_start;
239         end = cur_snapshot_end;
240     }
241 
getId(void) const242     int getId(void) const { return id; }
getState(void) const243     vbucket_state_t getState(void) const { return state; }
244     void setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi);
245 
getInitialState(void)246     vbucket_state_t getInitialState(void) { return initialState; }
setInitialState(vbucket_state_t initState)247     void setInitialState(vbucket_state_t initState) {
248         initialState = initState;
249     }
250 
addPendingOp(const void *cookie)251     bool addPendingOp(const void *cookie) {
252         LockHolder lh(pendingOpLock);
253         if (state != vbucket_state_pending) {
254             // State transitioned while we were waiting.
255             return false;
256         }
257         // Start a timer when enqueuing the first client.
258         if (pendingOps.empty()) {
259             pendingOpsStart = gethrtime();
260         }
261         pendingOps.push_back(cookie);
262         ++stats.pendingOps;
263         ++stats.pendingOpsTotal;
264         return true;
265     }
266 
267     void doStatsForQueueing(Item& item, size_t itemBytes);
268     void doStatsForFlushing(Item& item, size_t itemBytes);
269     void incrMetaDataDisk(Item& qi);
270     void decrMetaDataDisk(Item& qi);
271 
272     void resetStats();
273 
274     // Get age sum in millisecond
getQueueAge()275     uint64_t getQueueAge() {
276         rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
277         if (currentAge < dirtyQueueAge) {
278             return 0;
279         }
280         return (currentAge - dirtyQueueAge) * 1000;
281     }
282 
283     void fireAllOps(EventuallyPersistentEngine &engine);
284 
size(void)285     size_t size(void) {
286         HashTableDepthStatVisitor v;
287         ht.visitDepth(v);
288         return v.size;
289     }
290 
getBackfillSize()291     size_t getBackfillSize() {
292         LockHolder lh(backfill.mutex);
293         return backfill.items.size();
294     }
queueBackfillItem(queued_item& qi, bool genSeqno)295     bool queueBackfillItem(queued_item& qi, bool genSeqno) {
296         LockHolder lh(backfill.mutex);
297         if (genSeqno) {
298             qi->setBySeqno(checkpointManager.nextBySeqno());
299         } else {
300             checkpointManager.setBySeqno(qi->getBySeqno());
301         }
302         backfill.items.push(qi);
303         ++stats.diskQueueSize;
304         ++stats.totalEnqueued;
305         doStatsForQueueing(*qi, qi->size());
306         stats.memOverhead.fetch_add(sizeof(queued_item));
307         return true;
308     }
getBackfillItems(std::vector<queued_item> &items)309     void getBackfillItems(std::vector<queued_item> &items) {
310         LockHolder lh(backfill.mutex);
311         size_t num_items = backfill.items.size();
312         while (!backfill.items.empty()) {
313             items.push_back(backfill.items.front());
314             backfill.items.pop();
315         }
316         stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
317     }
isBackfillPhase()318     bool isBackfillPhase() {
319         LockHolder lh(backfill.mutex);
320         return backfill.isBackfillPhase;
321     }
setBackfillPhase(bool backfillPhase)322     void setBackfillPhase(bool backfillPhase) {
323         LockHolder lh(backfill.mutex);
324         backfill.isBackfillPhase = backfillPhase;
325     }
326 
327     bool getBGFetchItems(vb_bgfetch_queue_t &fetches);
328     void queueBGFetchItem(const std::string &key, VBucketBGFetchItem *fetch,
329                           BgFetcher *bgFetcher);
numPendingBGFetchItems(void)330     size_t numPendingBGFetchItems(void) {
331         // do a dirty read of number of fetch items
332         return pendingBGFetches.size();
333     }
hasPendingBGFetchItems(void)334     bool hasPendingBGFetchItems(void) {
335         LockHolder lh(pendingBGFetchesLock);
336         return !pendingBGFetches.empty();
337     }
338 
toString(vbucket_state_t s)339     static const char* toString(vbucket_state_t s) {
340         switch(s) {
341         case vbucket_state_active: return "active"; break;
342         case vbucket_state_replica: return "replica"; break;
343         case vbucket_state_pending: return "pending"; break;
344         case vbucket_state_dead: return "dead"; break;
345         }
346         return "unknown";
347     }
348 
fromString(const char* state)349     static vbucket_state_t fromString(const char* state) {
350         if (strcmp(state, "active") == 0) {
351             return vbucket_state_active;
352         } else if (strcmp(state, "replica") == 0) {
353             return vbucket_state_replica;
354         } else if (strcmp(state, "pending") == 0) {
355             return vbucket_state_pending;
356         } else {
357             return vbucket_state_dead;
358         }
359     }
360 
361     void addHighPriorityVBEntry(uint64_t id, const void *cookie, bool isBySeqno);
362     void notifyCheckpointPersisted(EventuallyPersistentEngine &e, uint64_t id, bool isBySeqno);
363     void notifyAllPendingConnsFailed(EventuallyPersistentEngine &e);
364     size_t getHighPriorityChkSize();
365     static size_t getCheckpointFlushTimeout();
366 
367     void addStats(bool details, ADD_STAT add_stat, const void *c,
368                   item_eviction_policy_t policy);
369 
370     size_t getNumItems(item_eviction_policy_t policy);
371 
372     size_t getNumNonResidentItems(item_eviction_policy_t policy);
373 
getNumTempItems(void)374     size_t getNumTempItems(void) {
375         return ht.getNumTempItems();
376     }
377 
decrDirtyQueueSize(size_t decrementBy)378     bool decrDirtyQueueSize(size_t decrementBy) {
379         size_t oldVal;
380         do {
381             oldVal = dirtyQueueSize.load();
382             if (oldVal < decrementBy) {
383                 LOG(EXTENSION_LOG_DEBUG,
384                     "Cannot decrement dirty queue size of vbucket %d by %lld, "
385                     "the current value is %lld\n", id, decrementBy, oldVal);
386                 return false;
387             }
388         } while (!dirtyQueueSize.compare_exchange_strong(oldVal, oldVal - decrementBy));
389         return true;
390     }
391 
392     static const vbucket_state_t ACTIVE;
393     static const vbucket_state_t REPLICA;
394     static const vbucket_state_t PENDING;
395     static const vbucket_state_t DEAD;
396 
397     HashTable         ht;
398     CheckpointManager checkpointManager;
399     struct {
400         Mutex mutex;
401         std::queue<queued_item> items;
402         bool isBackfillPhase;
403     } backfill;
404 
getShard(void)405     KVShard *getShard(void) {
406         return shard;
407     }
408 
409     std::queue<queued_item> rejectQueue;
410     FailoverTable *failovers;
411 
412     AtomicValue<size_t>  opsCreate;
413     AtomicValue<size_t>  opsUpdate;
414     AtomicValue<size_t>  opsDelete;
415     AtomicValue<size_t>  opsReject;
416 
417     AtomicValue<size_t>  dirtyQueueSize;
418     AtomicValue<size_t>  dirtyQueueMem;
419     AtomicValue<size_t>  dirtyQueueFill;
420     AtomicValue<size_t>  dirtyQueueDrain;
421     AtomicValue<uint64_t> dirtyQueueAge;
422     AtomicValue<size_t>  dirtyQueuePendingWrites;
423     AtomicValue<size_t>  metaDataDisk;
424 
425     AtomicValue<size_t>  numExpiredItems;
426     volatile size_t  fileSpaceUsed;
427     volatile size_t  fileSize;
428 
429 private:
430     template <typename T>
431     void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c);
432 
433     void fireAllOps(EventuallyPersistentEngine &engine, ENGINE_ERROR_CODE code);
434 
435     void adjustCheckpointFlushTimeout(size_t wall_time);
436 
437     int                      id;
438     AtomicValue<vbucket_state_t>  state;
439     vbucket_state_t          initialState;
440     Mutex                    pendingOpLock;
441     std::vector<const void*> pendingOps;
442     hrtime_t                 pendingOpsStart;
443     EPStats                 &stats;
444     uint64_t                 purge_seqno;
445 
446     Mutex pendingBGFetchesLock;
447     vb_bgfetch_queue_t pendingBGFetches;
448 
449     Mutex snapshotMutex;
450     uint64_t cur_snapshot_start;
451     uint64_t cur_snapshot_end;
452 
453     Mutex hpChksMutex;
454     std::list<HighPriorityVBEntry> hpChks;
455     volatile size_t numHpChks; // size of list hpChks (to avoid MB-9434)
456     KVShard *shard;
457 
458     static size_t chkFlushTimeout;
459 
460     DISALLOW_COPY_AND_ASSIGN(VBucket);
461 };
462 
463 #endif  // SRC_VBUCKET_H_
464