/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Copyright 2010 Couchbase, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "config.h"

#include <functional>
#include <list>
#include <set>
#include <string>

#include "ep_engine.h"
#include "failover-table.h"
#define STATWRITER_NAMESPACE vbucket
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
#include "vbucket.h"

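// Produce the symmetric difference of this filter and 'other': the set of
// vbucket ids present in exactly one of the two filters. For example, a
// filter over {1, 2, 3} diffed with a filter over {2, 3, 4} yields {1, 4}.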
VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
    std::vector<uint16_t> tmp(acceptable.size() + other.size());
    std::vector<uint16_t>::iterator end;
    end = std::set_symmetric_difference(acceptable.begin(),
                                        acceptable.end(),
                                        other.acceptable.begin(),
                                        other.acceptable.end(),
                                        tmp.begin());
    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
}

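// Produce the intersection of this filter and 'other': the set of vbucket
// ids accepted by both filters. For example, a filter over {1, 2, 3}
// intersected with a filter over {2, 3, 4} yields {2, 3}.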
VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
                                                                       const {
    std::vector<uint16_t> tmp(acceptable.size() + other.size());
    std::vector<uint16_t>::iterator end;

    end = std::set_intersection(acceptable.begin(), acceptable.end(),
                                other.acceptable.begin(),
                                other.acceptable.end(),
                                tmp.begin());
    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
}

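// Helper for the filter printer below: given an iterator into a sorted set
// of vbucket ids, determine whether the values starting at 'it' form a run
// of three or more consecutive ids. On return, 'length' is the distance from
// 'it' to the last element of the run (e.g. 3 for the run {0, 1, 2, 3}).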
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    for (uint16_t val = *it;
         it != end && (val + length) == *it;
         ++it, ++length) {
        // empty
    }

    --length;

    return length > 1;
}

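// Print a human-readable form of the filter, collapsing runs of three or
// more consecutive vbucket ids into a "[first,last]" range. A rough sketch
// of the resulting output (filter contents assumed for illustration):
//
//     VBucketFilter f(std::vector<uint16_t>({0, 1, 2, 3, 7}));
//     std::cout << f;   // prints "{ [0,3], 7 }"
//
// An empty filter prints as "{ empty }".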
std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
{
    std::set<uint16_t>::const_iterator it;

    if (filter.acceptable.empty()) {
        out << "{ empty }";
    } else {
        bool needcomma = false;
        out << "{ ";
        for (it = filter.acceptable.begin();
             it != filter.acceptable.end();
             ++it) {
            if (needcomma) {
                out << ", ";
            }

            size_t length;
            if (isRange(it, filter.acceptable.end(), length)) {
                std::set<uint16_t>::iterator last = it;
                for (size_t i = 0; i < length; ++i) {
                    ++last;
                }
                out << "[" << *it << "," << *last << "]";
                it = last;
            } else {
                out << *it;
            }
            needcomma = true;
        }
        out << " }";
    }

    return out;
}

size_t VBucket::chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;

const vbucket_state_t VBucket::ACTIVE =
                     static_cast<vbucket_state_t>(htonl(vbucket_state_active));
const vbucket_state_t VBucket::REPLICA =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
const vbucket_state_t VBucket::PENDING =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
const vbucket_state_t VBucket::DEAD =
                       static_cast<vbucket_state_t>(htonl(vbucket_state_dead));

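// On destruction, account for work that never completed: log any pending
// ops/reads, drop this vbucket's contribution to the disk queue size, free
// any queued background-fetch items, and release the failover table.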
VBucket::~VBucket() {
    if (!pendingOps.empty() || !pendingBGFetches.empty()) {
        LOG(EXTENSION_LOG_WARNING,
            "Have %ld pending ops and %ld pending reads "
            "while destroying vbucket\n",
            pendingOps.size(), pendingBGFetches.size());
    }

    stats.decrDiskQueueSize(dirtyQueueSize.load());

    size_t num_pending_fetches = 0;
    vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
    for (; itr != pendingBGFetches.end(); ++itr) {
        std::list<VBucketBGFetchItem *> &bgitems = itr->second;
        std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
        for (; vit != bgitems.end(); ++vit) {
            delete (*vit);
            ++num_pending_fetches;
        }
    }
    stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
    pendingBGFetches.clear();
    delete failovers;

    stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() +
                                sizeof(CheckpointManager));
    cb_assert(stats.memOverhead.load() < GIGANTOR);

    LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
}

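// Notify every cookie with an operation pending on this vbucket with the
// given error code, recording how long the operations sat pending in the
// pendingOps histogram.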
void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
                         ENGINE_ERROR_CODE code) {
    if (pendingOpsStart > 0) {
        hrtime_t now = gethrtime();
        if (now > pendingOpsStart) {
            hrtime_t d = (now - pendingOpsStart) / 1000;
            stats.pendingOpsHisto.add(d);
            atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
        }
    } else {
        return;
    }

    pendingOpsStart = 0;
    stats.pendingOps.fetch_sub(pendingOps.size());
    atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());

    engine.notifyIOComplete(pendingOps, code);
    pendingOps.clear();

    LOG(EXTENSION_LOG_INFO,
        "Fired pending ops for vbucket %d in state %s\n",
        id, VBucket::toString(state));
}

void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
    LockHolder lh(pendingOpLock);

    if (state == vbucket_state_active) {
        fireAllOps(engine, ENGINE_SUCCESS);
    } else if (state == vbucket_state_pending) {
        // Nothing
    } else {
        fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
    }
}

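// Transition this vbucket to a new state. A vbucket becoming active has its
// open checkpoint id bumped to at least 2, and a vbucket leaving the active
// state pins its snapshot range to the current high seqno.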
void VBucket::setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi) {
    cb_assert(sapi);
    vbucket_state_t oldstate(state);

    if (to == vbucket_state_active &&
        checkpointManager.getOpenCheckpointId() < 2) {
        checkpointManager.setOpenCheckpointId(2);
    }

    if (oldstate == vbucket_state_active) {
        uint64_t highSeqno = (uint64_t)checkpointManager.getHighSeqno();
        setCurrentSnapshot(highSeqno, highSeqno);
    }

    LOG(EXTENSION_LOG_DEBUG, "transitioning vbucket %d from %s to %s",
        id, VBucket::toString(oldstate), VBucket::toString(to));

    state = to;
}

void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
{
    ++dirtyQueueSize;
    dirtyQueueMem.fetch_add(sizeof(Item));
    ++dirtyQueueFill;
    dirtyQueueAge.fetch_add(qi.getQueuedTime());
    dirtyQueuePendingWrites.fetch_add(itemBytes);
}

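// Undo the accounting done in doStatsForQueueing once an item has been
// flushed. The counters are unsigned, so each one is clamped at zero rather
// than being allowed to wrap around on underflow.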
void VBucket::doStatsForFlushing(Item& qi, size_t itemBytes)
{
    decrDirtyQueueSize(1);
    if (dirtyQueueMem > sizeof(Item)) {
        dirtyQueueMem.fetch_sub(sizeof(Item));
    } else {
        dirtyQueueMem.store(0);
    }
    ++dirtyQueueDrain;

    if (dirtyQueueAge > qi.getQueuedTime()) {
        dirtyQueueAge.fetch_sub(qi.getQueuedTime());
    } else {
        dirtyQueueAge.store(0);
    }

    if (dirtyQueuePendingWrites > itemBytes) {
        dirtyQueuePendingWrites.fetch_sub(itemBytes);
    } else {
        dirtyQueuePendingWrites.store(0);
    }
}

void VBucket::incrMetaDataDisk(Item& qi)
{
    metaDataDisk.fetch_add(qi.getNKey() + sizeof(ItemMetaData));
}

void VBucket::decrMetaDataDisk(Item& qi)
{
    // Assume couchstore removes approximately this much data from disk.
    metaDataDisk.fetch_sub((qi.getNKey() + sizeof(ItemMetaData)));
}

void VBucket::resetStats() {
    opsCreate.store(0);
    opsUpdate.store(0);
    opsDelete.store(0);
    opsReject.store(0);

    stats.decrDiskQueueSize(dirtyQueueSize.load());
    dirtyQueueSize.store(0);
    dirtyQueueMem.store(0);
    dirtyQueueFill.store(0);
    dirtyQueueAge.store(0);
    dirtyQueuePendingWrites.store(0);
    dirtyQueueDrain.store(0);
    fileSpaceUsed = 0;
    fileSize = 0;
}

template <typename T>
void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
                      const void *c) {
    std::stringstream name;
    name << "vb_" << id;
    if (nm != NULL) {
        name << ":" << nm;
    }
    std::stringstream value;
    value << val;
    std::string n = name.str();
    add_casted_stat(n.data(), value.str().data(), add_stat, c);
}

void VBucket::queueBGFetchItem(const std::string &key,
                               VBucketBGFetchItem *fetch,
                               BgFetcher *bgFetcher) {
    LockHolder lh(pendingBGFetchesLock);
    pendingBGFetches[key].push_back(fetch);
    bgFetcher->addPendingVB(id);
    lh.unlock();
}

bool VBucket::getBGFetchItems(vb_bgfetch_queue_t &fetches) {
    LockHolder lh(pendingBGFetchesLock);
    fetches.insert(pendingBGFetches.begin(), pendingBGFetches.end());
    pendingBGFetches.clear();
    lh.unlock();
    return fetches.size() > 0;
}

void VBucket::addHighPriorityVBEntry(uint64_t id, const void *cookie,
                                     bool isBySeqno) {
    LockHolder lh(hpChksMutex);
    if (shard) {
        ++shard->highPriorityCount;
    }
    hpChks.push_back(HighPriorityVBEntry(cookie, id, isBySeqno));
    numHpChks = hpChks.size();
}

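// Walk the list of high-priority checkpoint/seqno persistence requests:
// entries whose requested id or seqno has been persisted are notified with
// ENGINE_SUCCESS, entries that have waited longer than the checkpoint flush
// timeout are notified with ENGINE_TMPFAIL, and everything else stays queued.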
void VBucket::notifyCheckpointPersisted(EventuallyPersistentEngine &e,
                                        uint64_t idNum,
                                        bool isBySeqno) {
    LockHolder lh(hpChksMutex);
    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
    std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();

    while (entry != hpChks.end()) {
        if (isBySeqno != entry->isBySeqno_) {
            ++entry;
            continue;
        }

        hrtime_t wall_time(gethrtime() - entry->start);
        size_t spent = wall_time / 1000000000;
        if (entry->id <= idNum) {
            toNotify[entry->cookie] = ENGINE_SUCCESS;
            stats.chkPersistenceHisto.add(wall_time / 1000);
            adjustCheckpointFlushTimeout(wall_time / 1000000000);
            LOG(EXTENSION_LOG_WARNING, "Notified the completion of checkpoint "
                "persistence for vbucket %d, id %llu, cookie %p", id, idNum,
                entry->cookie);
            entry = hpChks.erase(entry);
            if (shard) {
                --shard->highPriorityCount;
            }
        } else if (spent > getCheckpointFlushTimeout()) {
            adjustCheckpointFlushTimeout(spent);
            e.storeEngineSpecific(entry->cookie, NULL);
            toNotify[entry->cookie] = ENGINE_TMPFAIL;
            LOG(EXTENSION_LOG_WARNING, "Notified the timeout on checkpoint "
                "persistence for vbucket %d, id %llu, cookie %p", id, idNum,
                entry->cookie);
            entry = hpChks.erase(entry);
            if (shard) {
                --shard->highPriorityCount;
            }
        } else {
            ++entry;
        }
    }
    numHpChks = hpChks.size();
    lh.unlock();

    std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
    for (; itr != toNotify.end(); ++itr) {
        e.notifyIOComplete(itr->first, itr->second);
    }
}

void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
    LockHolder lh(hpChksMutex);
    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
    std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
    while (entry != hpChks.end()) {
        toNotify[entry->cookie] = ENGINE_TMPFAIL;
        e.storeEngineSpecific(entry->cookie, NULL);
        entry = hpChks.erase(entry);
        if (shard) {
            --shard->highPriorityCount;
        }
    }
    lh.unlock();

    std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
    for (; itr != toNotify.end(); ++itr) {
        e.notifyIOComplete(itr->first, itr->second);
    }

    fireAllOps(e);
}

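// Adjust the shared checkpoint-persistence timeout based on how long the
// last request actually took, snapping it to the minimum, the midpoint, or
// the maximum. For example, if MIN_CHK_FLUSH_TIMEOUT were 10s and
// MAX_CHK_FLUSH_TIMEOUT were 30s (values assumed for illustration), a wall
// time of 8s keeps the timeout at 10s, 15s raises it to 20s, and anything
// above 20s raises it to 30s.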
void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
    size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;

    if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
        chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
    } else if (wall_time <= middle) {
        chkFlushTimeout = middle;
    } else {
        chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
    }
}

size_t VBucket::getHighPriorityChkSize() {
    return numHpChks;
}

size_t VBucket::getCheckpointFlushTimeout() {
    return chkFlushTimeout;
}

size_t VBucket::getNumItems(item_eviction_policy_t policy) {
    if (policy == VALUE_ONLY) {
        return ht.getNumInMemoryItems();
    } else {
        return ht.getNumItems();
    }
}

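// Number of items whose values are not resident in memory. With value-only
// eviction the hash table tracks this directly; with full eviction it is
// derived as the total item count minus the resident count, clamped at zero.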
size_t VBucket::getNumNonResidentItems(item_eviction_policy_t policy) {
    if (policy == VALUE_ONLY) {
        return ht.getNumInMemoryNonResItems();
    } else {
        size_t num_items = ht.getNumItems();
        size_t num_res_items = ht.getNumInMemoryItems() -
                               ht.getNumInMemoryNonResItems();
        return num_items > num_res_items ? (num_items - num_res_items) : 0;
    }
}

void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
                       item_eviction_policy_t policy) {
    addStat(NULL, toString(state), add_stat, c);
    if (details) {
        size_t numItems = getNumItems(policy);
        size_t tempItems = getNumTempItems();
        addStat("num_items", numItems, add_stat, c);
        addStat("num_temp_items", tempItems, add_stat, c);
        addStat("num_non_resident", getNumNonResidentItems(policy),
                add_stat, c);
        addStat("ht_memory", ht.memorySize(), add_stat, c);
        addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
        addStat("ht_cache_size", ht.cacheSize, add_stat, c);
        addStat("num_ejects", ht.getNumEjects(), add_stat, c);
        addStat("ops_create", opsCreate, add_stat, c);
        addStat("ops_update", opsUpdate, add_stat, c);
        addStat("ops_delete", opsDelete, add_stat, c);
        addStat("ops_reject", opsReject, add_stat, c);
        addStat("queue_size", dirtyQueueSize, add_stat, c);
        addStat("queue_memory", dirtyQueueMem, add_stat, c);
        addStat("queue_fill", dirtyQueueFill, add_stat, c);
        addStat("queue_drain", dirtyQueueDrain, add_stat, c);
        addStat("queue_age", getQueueAge(), add_stat, c);
        addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
        addStat("db_data_size", fileSpaceUsed, add_stat, c);
        addStat("db_file_size", fileSize, add_stat, c);
        addStat("high_seqno", getHighSeqno(), add_stat, c);
        addStat("uuid", failovers->getLatestEntry().vb_uuid, add_stat, c);
        addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
    }
}