xref: /3.0.3-GA/ep-engine/src/kvshard.h (revision 5174ad24)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2013 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_KVSHARD_H_
19#define SRC_KVSHARD_H_ 1
20
21#include "config.h"
22
23#include <memcached/engine.h>
24
25#include <algorithm>
26#include <map>
27#include <set>
28#include <sstream>
29#include <vector>
30
31#include "bgfetcher.h"
32#include "kvstore.h"
33
34
/**
 * Base class encapsulating individual couchstore (vbucket) files into a
 * logical group representing underlying storage operations.
 *
 * KVShard (Shard) is the highest level abstraction of the underlying
 * storage partitions used within the EventuallyPersistentEngine (ep)
 * and the global I/O Task Manager (iom). It gathers a collection
 * of logical partitions (vbuckets) into a single data access
 * administrative unit for multiple data access dispatchers (threads).
 *
 *   (EP) ---> (VBucketMap) ---> Shards[0...N]
 *
 *   Shards[n]:
 *   ------------------------KVShard----
 *   | shardId: uint16_t(n)            |
 *   | highPrioritySnapshot: bool      |
 *   | lowPrioritySnapshot: bool       |
 *   |                                 |
 *   | vbuckets: VBucket[] (partitions)|----> [(VBucket),(VBucket)..]
 *   |                                 |
 *   | flusher: Flusher                |
 *   | bgFetcher: BgFetcher            |
 *   |                                 |
 *   | rwUnderlying: KVStore (write)   |----> (CouchKVStore)
 *   | roUnderlying: KVStore (read)    |----> (CouchKVStore)
 *   -----------------------------------
 *
 */
63class Flusher;
64
65class KVShard {
66    friend class VBucketMap;
67public:
68    KVShard(uint16_t id, EventuallyPersistentStore &store);
69    ~KVShard();
70
71    KVStore *getRWUnderlying();
72    KVStore *getROUnderlying();
73
74    Flusher *getFlusher();
75    BgFetcher *getBgFetcher();
76
77    RCPtr<VBucket> getBucket(uint16_t id) const;
78    void setBucket(const RCPtr<VBucket> &b);
79    void resetBucket(uint16_t id);
80
81    uint16_t getId() { return shardId; }
82    std::vector<int> getVBucketsSortedByState();
83    std::vector<int> getVBuckets();
84    size_t getMaxNumVbuckets() { return maxVbuckets; }
85
86    /**
87     * Set the flag to coordinate the scheduled high priority vbucket
88     * snapshot and new snapshot requests with the high priority. The
89     * flag is "true" if a snapshot task with the high priority is
90     * currently scheduled, otherwise "false".  If (1) the flag is
91     * currently "false" and (2) a new snapshot request invokes this
92     * method by passing "true" parameter, this will set the flag to
93     * "true" and return "true" to indicate that the new request can
94     * be scheduled now. Otherwise, return "false" to prevent
95     * duplciate snapshot tasks from being scheduled.  When the
96     * snapshot task is running and about to writing to disk, it will
97     * invoke this method to reset the flag by passing "false"
98     * parameter.
99     *
100     * @param highPrioritySnapshot bool flag for coordination between
101     *                             the scheduled snapshot task and new
102     *                             snapshot requests.
103     * @return "true" if a flag's value was changed. Otherwise "false".
104     */
105    bool setHighPriorityVbSnapshotFlag(bool highPrioritySnapshot);
106    bool getHighPriorityVbSnapshotFlag(void) {
107        return highPrioritySnapshot;
108    }
109
110    /**
111     * Set the flag to coordinate the scheduled low priority vbucket
112     * snapshot and new snapshot requests with the low priority. The
113     * flag is "true" if a snapshot task with the low priority is
114     * currently scheduled, otherwise "false".  If (1) the flag is
115     * currently "false" and (2) a new snapshot request invokes this
116     * method by passing "true" parameter, this will set the flag to
117     * "true" and return "true" to indicate that the new request can
118     * be scheduled now. Otherwise, return "false" to prevent
119     * duplciate snapshot tasks from being scheduled.  When the
120     * snapshot task is running and about to writing to disk, it will
121     * invoke this method to reset the flag by passing "false"
122     * parameter.
123     *
124     * @param lowPrioritySnapshot bool flag for coordination between
125     *                             the scheduled low priority snapshot
126     *                             task and new snapshot requests with
127     *                             low priority.
128     *
129     * @return "true" if a flag's value was changed. Otherwise
130     *                "false".
131     */
132    bool setLowPriorityVbSnapshotFlag(bool lowPrioritySnapshot);
133    bool getLowPriorityVbSnapshotFlag(void) {
134        return lowPrioritySnapshot;
135    }
136
137    /**
138     * KVStore operations of Flush, VBDelete and VBSnapshot read and alter
139     * global KVStore variables and hence need to be serialized
140     *
141     * @return the actual mutex
142     */
143    Mutex &getWriteLock(void) {
144        return writeLock;
145    }
146
147    /**
148     * Return the number of non-deleted items from a given vbucket database
149     * @param vbid vbucket from which the number of items is retrived
150     * @return the number of non-deleted items from a given vbucket database
151     */
152    size_t getNumItemsOnDisk(uint16_t vbid) {
153        return rwUnderlying->getNumItems(vbid);
154    }
155
156    bool tryLockShardTask(ExTask &task) {
157        if (!opLock) {
158            opLock = true;
159            return true;
160        } else { // block the task in a pendingQueue
161            pendingQueue.push_back(task);
162        }
163        return false;
164    }
165
166    ExTask unlockShardTask(void) {
167        ExTask retVal;
168        cb_assert(opLock);
169        opLock = false;
170        if (!pendingQueue.empty()) { // return any blocked shard serial task
171            retVal = pendingQueue.front();
172            pendingQueue.pop_front();
173        } else {
174            retVal = NULL;
175        }
176        return retVal;
177    }
178
179    size_t getPendingTaskCount(void) {
180        return pendingQueue.size();
181    }
182
183    bool isShardLock(void) {
184        return opLock;
185    }
186
187private:
188    RCPtr<VBucket> *vbuckets;
189
190    KVStore    *rwUnderlying;
191    KVStore    *roUnderlying;
192    Mutex       writeLock;
193
194    Flusher    *flusher;
195    BgFetcher  *bgFetcher;
196
197    size_t maxVbuckets;
198    uint16_t shardId;
199
200    bool opLock; // Used by ExecutoPool infrastructure to serialize operations
201    std::list<ExTask> pendingQueue;
202
203    AtomicValue<bool> highPrioritySnapshot;
204    AtomicValue<bool> lowPrioritySnapshot;
205
206public:
207    AtomicValue<size_t> highPriorityCount;
208
209    DISALLOW_COPY_AND_ASSIGN(KVShard);
210};
211
212#endif  // SRC_KVSHARD_H_
213