xref: /3.0.3-GA/ep-engine/src/vbucket.h (revision f0dc9348)
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2010 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#ifndef SRC_VBUCKET_H_
#define SRC_VBUCKET_H_ 1

#include "config.h"

#include <list>
#include <queue>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "atomic.h"
#include "bgfetcher.h"
#include "checkpoint.h"
#include "common.h"
#include "stored-value.h"

const size_t MIN_CHK_FLUSH_TIMEOUT = 10; // 10 sec.
const size_t MAX_CHK_FLUSH_TIMEOUT = 30; // 30 sec.

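/**
 * An entry in a vbucket's high-priority request list: the connection cookie
 * that is waiting, the checkpoint id or seqno being waited on, the time the
 * wait started, and whether the id is interpreted as a seqno (see
 * VBucket::addHighPriorityVBEntry / notifyCheckpointPersisted below).
 */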
struct HighPriorityVBEntry {
    HighPriorityVBEntry() :
        cookie(NULL), id(0), start(gethrtime()), isBySeqno_(false) { }
    HighPriorityVBEntry(const void *c, uint64_t idNum, bool isBySeqno) :
        cookie(c), id(idNum), start(gethrtime()), isBySeqno_(isBySeqno) { }

    const void *cookie;
    uint64_t id;
    hrtime_t start;
    bool isBySeqno_;
};

/**
 * Function object that returns true if the given vbucket is acceptable.
 */
class VBucketFilter {
public:

    /**
     * Instantiate a VBucketFilter that always returns true.
     */
    explicit VBucketFilter() : acceptable() {}

    /**
     * Instantiate a VBucketFilter that returns true for any of the
     * given vbucket IDs.
     */
    explicit VBucketFilter(const std::vector<uint16_t> &a) :
        acceptable(a.begin(), a.end()) {}

    explicit VBucketFilter(const std::set<uint16_t> &s) : acceptable(s) {}

    void assign(const std::set<uint16_t> &a) {
        acceptable = a;
    }

    bool operator ()(uint16_t v) const {
        return acceptable.empty() || acceptable.find(v) != acceptable.end();
    }

    size_t size() const { return acceptable.size(); }

    bool empty() const { return acceptable.empty(); }

    void reset() {
        acceptable.clear();
    }

    /**
     * Calculate the difference between this and another filter.
     * If "this" contains the elements [1,2,3,4] and other contains [3,4,5,6],
     * the returned filter contains [1,2,5,6].
     * @param other the other filter to compare with
     * @return a new filter with the elements present in only one of the two
     *         filters.
     */
    VBucketFilter filter_diff(const VBucketFilter &other) const;

    /**
     * Calculate the intersection between this and another filter.
     * If "this" contains the elements [1,2,3,4] and other contains [3,4,5,6],
     * the returned filter contains [3,4].
     * @param other the other filter to compare with
     * @return a new filter with the elements present in both filters.
     */
    VBucketFilter filter_intersection(const VBucketFilter &other) const;

    const std::set<uint16_t> &getVBSet() const { return acceptable; }

    bool addVBucket(uint16_t vbucket) {
        std::pair<std::set<uint16_t>::iterator, bool> rv = acceptable.insert(vbucket);
        return rv.second;
    }

    void removeVBucket(uint16_t vbucket) {
        acceptable.erase(vbucket);
    }

    /**
     * Dump the filter in a human-readable form ("{ bucket, bucket, bucket }")
     * to the specified output stream.
     */
    friend std::ostream& operator<< (std::ostream& out,
                                     const VBucketFilter &filter);

private:

    std::set<uint16_t> acceptable;
};
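
// Illustrative use of VBucketFilter (a sketch; the vbucket ids are made up).
// An empty filter accepts every vbucket, and filter_diff() returns the
// symmetric difference of two filters:
//
//     std::set<uint16_t> s1, s2;
//     s1.insert(1); s1.insert(2); s1.insert(3); s1.insert(4);
//     s2.insert(3); s2.insert(4); s2.insert(5); s2.insert(6);
//     VBucketFilter f1(s1), f2(s2), all;
//     f1(3);                       // true  (3 is in the set)
//     f1(7);                       // false
//     all(7);                      // true  (empty filter accepts everything)
//     f1.filter_diff(f2);          // filter containing {1,2,5,6}
//     f1.filter_intersection(f2);  // filter containing {3,4}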

class EventuallyPersistentEngine;
class FailoverTable;
class KVShard;

// The first bool is true if the item exists in the vbucket's database file;
// the second bool is true if the operation is a SET (i.e., insert or update).
typedef std::pair<bool, bool> kstat_entry_t;

struct KVStatsCtx {
    KVStatsCtx() : vbucket(std::numeric_limits<uint16_t>::max()),
                   fileSpaceUsed(0), fileSize(0) {}

    uint16_t vbucket;
    size_t fileSpaceUsed;
    size_t fileSize;
    unordered_map<std::string, kstat_entry_t> keyStats;
};

typedef struct KVStatsCtx kvstats_ctx;
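
// Example (a sketch): key stats collected for vbucket 0, recording that key
// "k" already exists in the database file and the pending operation is a SET:
//
//     kvstats_ctx ctx;
//     ctx.vbucket = 0;
//     ctx.keyStats["k"] = std::make_pair(true, true);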

/**
 * An individual vbucket.
 */
class VBucket : public RCValue {
public:

    VBucket(int i, vbucket_state_t newState, EPStats &st,
            CheckpointConfig &chkConfig, KVShard *kvshard,
            int64_t lastSeqno, FailoverTable *table,
            vbucket_state_t initState = vbucket_state_dead,
            uint64_t chkId = 1, uint64_t purgeSeqno = 0) :
        ht(st),
        checkpointManager(st, i, chkConfig, lastSeqno, chkId),
        failovers(table),
        opsCreate(0),
        opsUpdate(0),
        opsDelete(0),
        opsReject(0),
        dirtyQueueSize(0),
        dirtyQueueMem(0),
        dirtyQueueFill(0),
        dirtyQueueDrain(0),
        dirtyQueueAge(0),
        dirtyQueuePendingWrites(0),
        numExpiredItems(0),
        fileSpaceUsed(0),
        fileSize(0),
        id(i),
        state(newState),
        initialState(initState),
        stats(st),
        purge_seqno(purgeSeqno),
        numHpChks(0),
        shard(kvshard)
    {
        backfill.isBackfillPhase = false;
        pendingOpsStart = 0;
        stats.memOverhead.fetch_add(sizeof(VBucket)
                               + ht.memorySize() + sizeof(CheckpointManager));
        cb_assert(stats.memOverhead.load() < GIGANTOR);
    }

    ~VBucket();

    int64_t getHighSeqno() {
        return checkpointManager.getHighSeqno();
    }

    uint64_t getPurgeSeqno() {
        return purge_seqno;
    }

    void setPurgeSeqno(uint64_t to) {
        purge_seqno = to;
    }

    int getId(void) const { return id; }
    vbucket_state_t getState(void) const { return state; }
    void setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi);

    vbucket_state_t getInitialState(void) { return initialState; }
    void setInitialState(vbucket_state_t initState) {
        initialState = initState;
    }

    bool addPendingOp(const void *cookie) {
        LockHolder lh(pendingOpLock);
        if (state != vbucket_state_pending) {
            // State transitioned while we were waiting.
            return false;
        }
        // Start a timer when enqueuing the first client.
        if (pendingOps.empty()) {
            pendingOpsStart = gethrtime();
        }
        pendingOps.push_back(cookie);
        ++stats.pendingOps;
        ++stats.pendingOpsTotal;
        return true;
    }
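
    // Sketch of intended use (an assumption based on this header, not a
    // contract): when addPendingOp() returns true the caller parks the
    // request (e.g. returns EWOULDBLOCK to the client); the stored cookies
    // are notified later via fireAllOps() once the vbucket leaves the
    // pending state.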

    void doStatsForQueueing(Item& item, size_t itemBytes);
    void doStatsForFlushing(Item& item, size_t itemBytes);
    void resetStats();

    // Get the age sum of the dirty queue, in milliseconds.
    uint64_t getQueueAge() {
        rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
        if (currentAge < dirtyQueueAge) {
            return 0;
        }
        return (currentAge - dirtyQueueAge) * 1000;
    }
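
    // Worked example (assuming dirtyQueueAge accumulates the enqueue
    // timestamps, in seconds, of the items currently in the dirty queue):
    // two items queued at t=100s and t=110s give dirtyQueueAge == 210; at
    // t=120s, currentAge == 120 * 2 == 240, so getQueueAge() returns
    // (240 - 210) * 1000 == 30000 ms, i.e. 20s + 10s of accumulated age.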

    void fireAllOps(EventuallyPersistentEngine &engine);

    size_t size(void) {
        HashTableDepthStatVisitor v;
        ht.visitDepth(v);
        return v.size;
    }

    size_t getBackfillSize() {
        LockHolder lh(backfill.mutex);
        return backfill.items.size();
    }
    bool queueBackfillItem(queued_item& qi, bool genSeqno) {
        LockHolder lh(backfill.mutex);
        if (genSeqno) {
            qi->setBySeqno(checkpointManager.nextBySeqno());
        } else {
            checkpointManager.setBySeqno(qi->getBySeqno());
        }
        backfill.items.push(qi);
        ++stats.diskQueueSize;
        ++stats.totalEnqueued;
        doStatsForQueueing(*qi, qi->size());
        stats.memOverhead.fetch_add(sizeof(queued_item));
        return true;
    }
    void getBackfillItems(std::vector<queued_item> &items) {
        LockHolder lh(backfill.mutex);
        size_t num_items = backfill.items.size();
        while (!backfill.items.empty()) {
            items.push_back(backfill.items.front());
            backfill.items.pop();
        }
        stats.memOverhead.fetch_sub(num_items * sizeof(queued_item));
    }
    bool isBackfillPhase() {
        LockHolder lh(backfill.mutex);
        return backfill.isBackfillPhase;
    }
    void setBackfillPhase(bool backfillPhase) {
        LockHolder lh(backfill.mutex);
        backfill.isBackfillPhase = backfillPhase;
    }

    bool getBGFetchItems(vb_bgfetch_queue_t &fetches);
    void queueBGFetchItem(const std::string &key, VBucketBGFetchItem *fetch,
                          BgFetcher *bgFetcher);
    size_t numPendingBGFetchItems(void) {
        // Do a dirty (unlocked) read of the number of fetch items.
        return pendingBGFetches.size();
    }
    bool hasPendingBGFetchItems(void) {
        LockHolder lh(pendingBGFetchesLock);
        return !pendingBGFetches.empty();
    }

    static const char* toString(vbucket_state_t s) {
        switch(s) {
        case vbucket_state_active: return "active";
        case vbucket_state_replica: return "replica";
        case vbucket_state_pending: return "pending";
        case vbucket_state_dead: return "dead";
        }
        return "unknown";
    }

    static vbucket_state_t fromString(const char* state) {
        if (strcmp(state, "active") == 0) {
            return vbucket_state_active;
        } else if (strcmp(state, "replica") == 0) {
            return vbucket_state_replica;
        } else if (strcmp(state, "pending") == 0) {
            return vbucket_state_pending;
        } else {
            return vbucket_state_dead;
        }
    }
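
    // Example: toString(vbucket_state_active) yields "active", and
    // fromString("replica") yields vbucket_state_replica; any unrecognised
    // string maps to vbucket_state_dead.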

    void addHighPriorityVBEntry(uint64_t id, const void *cookie, bool isBySeqno);
    void notifyCheckpointPersisted(EventuallyPersistentEngine &e, uint64_t id, bool isBySeqno);
    void notifyAllPendingConnsFailed(EventuallyPersistentEngine &e);
    size_t getHighPriorityChkSize();
    static size_t getCheckpointFlushTimeout();

    void addStats(bool details, ADD_STAT add_stat, const void *c,
                  item_eviction_policy_t policy);

    size_t getNumItems(item_eviction_policy_t policy);

    size_t getNumNonResidentItems(item_eviction_policy_t policy);

    size_t getNumTempItems(void) {
        return ht.getNumTempItems();
    }

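    /**
     * Atomically decrement dirtyQueueSize by decrementBy using a
     * compare-exchange retry loop. Returns false (and logs at debug level)
     * if the decrement would underflow the counter.
     */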
    bool decrDirtyQueueSize(size_t decrementBy) {
        size_t oldVal;
        do {
            oldVal = dirtyQueueSize.load();
            if (oldVal < decrementBy) {
                LOG(EXTENSION_LOG_DEBUG,
                    "Cannot decrement dirty queue size of vbucket %d by %llu, "
                    "the current value is %llu\n", id,
                    (unsigned long long)decrementBy,
                    (unsigned long long)oldVal);
                return false;
            }
        } while (!dirtyQueueSize.compare_exchange_strong(oldVal, oldVal - decrementBy));
        return true;
    }

    static const vbucket_state_t ACTIVE;
    static const vbucket_state_t REPLICA;
    static const vbucket_state_t PENDING;
    static const vbucket_state_t DEAD;

    HashTable         ht;
    CheckpointManager checkpointManager;
    struct {
        Mutex mutex;
        std::queue<queued_item> items;
        bool isBackfillPhase;
    } backfill;

    std::queue<queued_item> rejectQueue;
    FailoverTable *failovers;

    AtomicValue<size_t>  opsCreate;
    AtomicValue<size_t>  opsUpdate;
    AtomicValue<size_t>  opsDelete;
    AtomicValue<size_t>  opsReject;

    AtomicValue<size_t>  dirtyQueueSize;
    AtomicValue<size_t>  dirtyQueueMem;
    AtomicValue<size_t>  dirtyQueueFill;
    AtomicValue<size_t>  dirtyQueueDrain;
    AtomicValue<uint64_t> dirtyQueueAge;
    AtomicValue<size_t>  dirtyQueuePendingWrites;

    AtomicValue<size_t>  numExpiredItems;
    volatile size_t  fileSpaceUsed;
    volatile size_t  fileSize;

private:
    template <typename T>
    void addStat(const char *nm, const T &val, ADD_STAT add_stat, const void *c);

    void fireAllOps(EventuallyPersistentEngine &engine, ENGINE_ERROR_CODE code);

    void adjustCheckpointFlushTimeout(size_t wall_time);

    int                      id;
    AtomicValue<vbucket_state_t>  state;
    vbucket_state_t          initialState;
    Mutex                    pendingOpLock;
    std::vector<const void*> pendingOps;
    hrtime_t                 pendingOpsStart;
    EPStats                 &stats;
    uint64_t                 purge_seqno;

    Mutex pendingBGFetchesLock;
    vb_bgfetch_queue_t pendingBGFetches;

    Mutex hpChksMutex;
    std::list<HighPriorityVBEntry> hpChks;
    volatile size_t numHpChks; // size of list hpChks (to avoid MB-9434)
    KVShard *shard;

    static size_t chkFlushTimeout;

    DISALLOW_COPY_AND_ASSIGN(VBucket);
};
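
// Illustrative construction of a VBucket (a sketch; the EPStats,
// CheckpointConfig, KVShard and FailoverTable instances are assumed to be
// provided by the engine, which in practice holds VBucket objects via
// reference counting, as implied by the RCValue base class):
//
//     VBucket *vb = new VBucket(0, vbucket_state_active, stats, chkConfig,
//                               shard, /*lastSeqno*/ 0, failoverTable);
//     vb->getId();                       // 0
//     vb->getState();                    // vbucket_state_active
//     int64_t high = vb->getHighSeqno(); // from the checkpoint manager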

#endif  // SRC_VBUCKET_H_