xref: /4.6.4/ep-engine/src/vbucketmap.cc (revision 2b2cf1be)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <vector>
21
22#include "ep.h"
23#include "ep_engine.h"
24#include "vbucketmap.h"
25
26VBucketMap::VBucketMap(Configuration &config,
27                       EventuallyPersistentStore &store) :
28    bucketDeletion(new AtomicValue<bool>[config.getMaxVbuckets()]),
29    bucketCreation(new AtomicValue<bool>[config.getMaxVbuckets()]),
30    persistenceSeqnos(new AtomicValue<uint64_t>[config.getMaxVbuckets()]),
31    size(config.getMaxVbuckets())
32{
33    WorkLoadPolicy &workload = store.getEPEngine().getWorkLoadPolicy();
34    for (size_t shardId = 0; shardId < workload.getNumShards(); shardId++) {
35        KVShard *shard = new KVShard(shardId, store);
36        shards.push_back(shard);
37    }
38
39    for (size_t i = 0; i < size; ++i) {
40        bucketDeletion[i].store(false);
41        bucketCreation[i].store(false);
42        persistenceSeqnos[i].store(0);
43    }
44
45    config.addValueChangedListener("hlc_drift_ahead_threshold_us",
46                                    new VBucketConfigChangeListener(*this));
47    config.addValueChangedListener("hlc_drift_behind_threshold_us",
48                                    new VBucketConfigChangeListener(*this));
49}
50
51VBucketMap::~VBucketMap() {
52    delete[] bucketDeletion;
53    delete[] bucketCreation;
54    delete[] persistenceSeqnos;
55    while (!shards.empty()) {
56        delete shards.back();
57        shards.pop_back();
58    }
59}
60
61RCPtr<VBucket> VBucketMap::getBucket(id_type id) const {
62    static RCPtr<VBucket> emptyVBucket;
63    if (id < size) {
64        return getShardByVbId(id)->getBucket(id);
65    } else {
66        return emptyVBucket;
67    }
68}
69
70ENGINE_ERROR_CODE VBucketMap::addBucket(const RCPtr<VBucket> &b) {
71    if (b->getId() < size) {
72        getShardByVbId(b->getId())->setBucket(b);
73        LOG(EXTENSION_LOG_INFO, "Mapped new vbucket %d in state %s",
74            b->getId(), VBucket::toString(b->getState()));
75        return ENGINE_SUCCESS;
76    }
77    LOG(EXTENSION_LOG_WARNING,
78        "Cannot create vb %" PRIu16", max vbuckets is %" PRIu16, b->getId(),
79        size);
80    return ENGINE_ERANGE;
81}
82
83void VBucketMap::removeBucket(id_type id) {
84    if (id < size) {
85        // Theoretically, this could be off slightly.  In
86        // practice, this happens only on dead vbuckets.
87        getShardByVbId(id)->resetBucket(id);
88    }
89}
90
91std::vector<VBucketMap::id_type> VBucketMap::getBuckets(void) const {
92    std::vector<id_type> rv;
93    for (id_type i = 0; i < size; ++i) {
94        RCPtr<VBucket> b(getShardByVbId(i)->getBucket(i));
95        if (b) {
96            rv.push_back(b->getId());
97        }
98    }
99    return rv;
100}
101
102std::vector<VBucketMap::id_type> VBucketMap::getBucketsSortedByState(void) const {
103    std::vector<id_type> rv;
104    for (int state = vbucket_state_active;
105         state <= vbucket_state_dead; ++state) {
106        for (size_t i = 0; i < size; ++i) {
107            RCPtr<VBucket> b = getShardByVbId(i)->getBucket(i);
108            if (b && b->getState() == state) {
109                rv.push_back(b->getId());
110            }
111        }
112    }
113    return rv;
114}
115
116std::vector<std::pair<VBucketMap::id_type, size_t> >
117VBucketMap::getActiveVBucketsSortedByChkMgrMem(void) const {
118    std::vector<std::pair<id_type, size_t> > rv;
119    for (id_type i = 0; i < size; ++i) {
120        RCPtr<VBucket> b = getShardByVbId(i)->getBucket(i);
121        if (b && b->getState() == vbucket_state_active) {
122            rv.push_back(std::make_pair(b->getId(), b->getChkMgrMemUsage()));
123        }
124    }
125
126    struct SortCtx {
127        static bool compareSecond(std::pair<id_type, size_t> a,
128                                  std::pair<id_type, size_t> b) {
129            return (a.second < b.second);
130        }
131    };
132
133    std::sort(rv.begin(), rv.end(), SortCtx::compareSecond);
134
135    return rv;
136}
137
138
139VBucketMap::id_type VBucketMap::getSize(void) const {
140    return size;
141}
142
143bool VBucketMap::isBucketDeletion(id_type id) const {
144    return bucketDeletion[id].load();
145}
146
147bool VBucketMap::setBucketDeletion(id_type id, bool delBucket) {
148    bool inverse = !delBucket;
149    return bucketDeletion[id].compare_exchange_strong(inverse, delBucket);
150}
151
152bool VBucketMap::isBucketCreation(id_type id) const {
153    return bucketCreation[id].load();
154}
155
156bool VBucketMap::setBucketCreation(id_type id, bool rv) {
157    bool inverse = !rv;
158    return bucketCreation[id].compare_exchange_strong(inverse, rv);
159}
160
161uint64_t VBucketMap::getPersistenceCheckpointId(id_type id) const {
162    if (id < size) {
163        auto vb = getBucket(id);
164        if (vb) {
165            return vb->getPersistenceCheckpointId();
166        }
167    }
168    return {};
169}
170
171void VBucketMap::setPersistenceCheckpointId(id_type id,
172                                            uint64_t checkpointId) {
173    if (id < size) {
174        auto vb = getBucket(id);
175        if (vb) {
176            vb->setPersistenceCheckpointId(checkpointId);
177        }
178    }
179}
180
181uint64_t VBucketMap::getPersistenceSeqno(id_type id) const {
182    return persistenceSeqnos[id].load();
183}
184
185void VBucketMap::setPersistenceSeqno(id_type id, uint64_t seqno) {
186    persistenceSeqnos[id].store(seqno);
187}
188
189void VBucketMap::addBuckets(const std::vector<VBucket*> &newBuckets) {
190    std::vector<VBucket*>::const_iterator it;
191    for (it = newBuckets.begin(); it != newBuckets.end(); ++it) {
192        RCPtr<VBucket> v(*it);
193        addBucket(v);
194    }
195}
196
197KVShard* VBucketMap::getShardByVbId(id_type id) const {
198    return shards[id % shards.size()];
199}
200
201KVShard* VBucketMap::getShard(KVShard::id_type shardId) const {
202    return shards[shardId];
203}
204
205size_t VBucketMap::getNumShards() const {
206    return shards.size();
207}
208
209void VBucketMap::setHLCDriftAheadThreshold(std::chrono::microseconds threshold) {
210    for (id_type id = 0; id < size; id++) {
211        auto vb = getBucket(id);
212        if (vb) {
213            vb->setHLCDriftAheadThreshold(threshold);
214        }
215    }
216}
217
218void VBucketMap::setHLCDriftBehindThreshold(std::chrono::microseconds threshold) {
219    for (id_type id = 0; id < size; id++) {
220        auto vb = getBucket(id);
221        if (vb) {
222            vb->setHLCDriftBehindThreshold(threshold);
223        }
224    }
225}
226
227void VBucketMap::VBucketConfigChangeListener::sizeValueChanged(const std::string &key,
228                                                   size_t value) {
229    if (key == "hlc_drift_ahead_threshold_us") {
230        map.setHLCDriftAheadThreshold(std::chrono::microseconds(value));
231    } else if (key == "hlc_drift_behind_threshold_us") {
232        map.setHLCDriftBehindThreshold(std::chrono::microseconds(value));
233    }
234}