/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2010 Couchbase, Inc
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "config.h"

#include <functional>
#include <list>
#include <set>
#include <string>

#include "ep_engine.h"
#include "failover-table.h"
#define STATWRITER_NAMESPACE vbucket
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
#include "vbucket.h"

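// Return the symmetric difference of this filter and 'other': the set of
// vbucket ids that appear in exactly one of the two filters.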
VBucketFilter VBucketFilter::filter_diff(const VBucketFilter &other) const {
    std::vector<uint16_t> tmp(acceptable.size() + other.size());
    std::vector<uint16_t>::iterator end;
    end = std::set_symmetric_difference(acceptable.begin(),
                                        acceptable.end(),
                                        other.acceptable.begin(),
                                        other.acceptable.end(),
                                        tmp.begin());
    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
}

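// Return the intersection of this filter and 'other': the set of vbucket ids
// accepted by both filters.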
VBucketFilter VBucketFilter::filter_intersection(const VBucketFilter &other)
                                                                        const {
    std::vector<uint16_t> tmp(acceptable.size() + other.size());
    std::vector<uint16_t>::iterator end;

    end = std::set_intersection(acceptable.begin(), acceptable.end(),
                                other.acceptable.begin(),
                                other.acceptable.end(),
                                tmp.begin());
    return VBucketFilter(std::vector<uint16_t>(tmp.begin(), end));
}

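// Check whether the set contains a run of consecutive values starting at 'it'.
// On return 'length' holds the distance from *it to the last value in the run;
// only runs of at least three consecutive values count as a range.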
static bool isRange(std::set<uint16_t>::const_iterator it,
                    const std::set<uint16_t>::const_iterator &end,
                    size_t &length)
{
    length = 0;
    for (uint16_t val = *it;
         it != end && (val + length) == *it;
         ++it, ++length) {
        // empty
    }

    --length;

    return length > 1;
}

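// Pretty-print a VBucketFilter, collapsing runs of consecutive vbucket ids
// into "[first,last]" ranges, e.g. "{ 0, [2,5], 7 }".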
std::ostream& operator <<(std::ostream &out, const VBucketFilter &filter)
{
    std::set<uint16_t>::const_iterator it;

    if (filter.acceptable.empty()) {
        out << "{ empty }";
    } else {
        bool needcomma = false;
        out << "{ ";
        for (it = filter.acceptable.begin();
             it != filter.acceptable.end();
             ++it) {
            if (needcomma) {
                out << ", ";
            }

            size_t length;
            if (isRange(it, filter.acceptable.end(), length)) {
                std::set<uint16_t>::const_iterator last = it;
                for (size_t i = 0; i < length; ++i) {
                    ++last;
                }
                out << "[" << *it << "," << *last << "]";
                it = last;
            } else {
                out << *it;
            }
            needcomma = true;
        }
        out << " }";
    }

    return out;
}


size_t VBucket::chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;

const vbucket_state_t VBucket::ACTIVE =
                     static_cast<vbucket_state_t>(htonl(vbucket_state_active));
const vbucket_state_t VBucket::REPLICA =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_replica));
const vbucket_state_t VBucket::PENDING =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_pending));
const vbucket_state_t VBucket::DEAD =
                    static_cast<vbucket_state_t>(htonl(vbucket_state_dead));

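// Destructor: free any still-pending background fetch items, and give back
// the memory-overhead and disk-queue contributions this vbucket made to the
// global stats.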
VBucket::~VBucket() {
    if (!pendingOps.empty() || !pendingBGFetches.empty()) {
        LOG(EXTENSION_LOG_WARNING,
            "Have %ld pending ops and %ld pending reads "
            "while destroying vbucket\n",
            pendingOps.size(), pendingBGFetches.size());
    }

    stats.decrDiskQueueSize(dirtyQueueSize.load());

    size_t num_pending_fetches = 0;
    vb_bgfetch_queue_t::iterator itr = pendingBGFetches.begin();
    for (; itr != pendingBGFetches.end(); ++itr) {
        std::list<VBucketBGFetchItem *> &bgitems = itr->second;
        std::list<VBucketBGFetchItem *>::iterator vit = bgitems.begin();
        for (; vit != bgitems.end(); ++vit) {
            delete (*vit);
            ++num_pending_fetches;
        }
    }
    stats.numRemainingBgJobs.fetch_sub(num_pending_fetches);
    pendingBGFetches.clear();
    delete failovers;

    stats.memOverhead.fetch_sub(sizeof(VBucket) + ht.memorySize() +
                                sizeof(CheckpointManager));
    cb_assert(stats.memOverhead.load() < GIGANTOR);

    LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
}

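// Complete every operation blocked on this vbucket with the given status code
// and record how long the ops were left pending. Expects pendingOpLock to be
// held; the public fireAllOps() overload below takes it.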
void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
                         ENGINE_ERROR_CODE code) {
    if (pendingOpsStart > 0) {
        hrtime_t now = gethrtime();
        if (now > pendingOpsStart) {
            hrtime_t d = (now - pendingOpsStart) / 1000;
            stats.pendingOpsHisto.add(d);
            atomic_setIfBigger(stats.pendingOpsMaxDuration, d);
        }
    } else {
        return;
    }

    pendingOpsStart = 0;
    stats.pendingOps.fetch_sub(pendingOps.size());
    atomic_setIfBigger(stats.pendingOpsMax, pendingOps.size());

    engine.notifyIOComplete(pendingOps, code);
    pendingOps.clear();

    LOG(EXTENSION_LOG_INFO,
        "Fired pending ops for vbucket %d in state %s\n",
        id, VBucket::toString(state));
}

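// Resolve pending ops according to the vbucket's current state: succeed them
// if the vbucket is active, fail them with "not my vbucket" otherwise, and
// leave them queued while the vbucket is still pending.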
void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
    LockHolder lh(pendingOpLock);

    if (state == vbucket_state_active) {
        fireAllOps(engine, ENGINE_SUCCESS);
    } else if (state == vbucket_state_pending) {
        // Nothing
    } else {
        fireAllOps(engine, ENGINE_NOT_MY_VBUCKET);
    }
}

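// Transition the vbucket to a new state. When becoming active the open
// checkpoint id is bumped to at least 2, and when leaving the active state
// the current high seqno is recorded as the current snapshot range.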
void VBucket::setState(vbucket_state_t to, SERVER_HANDLE_V1 *sapi) {
    cb_assert(sapi);
    vbucket_state_t oldstate(state);

    if (to == vbucket_state_active &&
        checkpointManager.getOpenCheckpointId() < 2) {
        checkpointManager.setOpenCheckpointId(2);
    }

    if (oldstate == vbucket_state_active) {
        uint64_t highSeqno = (uint64_t)checkpointManager.getHighSeqno();
        setCurrentSnapshot(highSeqno, highSeqno);
    }

    LOG(EXTENSION_LOG_DEBUG, "transitioning vbucket %d from %s to %s",
        id, VBucket::toString(oldstate), VBucket::toString(to));

    state = to;
}

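// Account for an item entering the dirty (disk write) queue.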
void VBucket::doStatsForQueueing(Item& qi, size_t itemBytes)
{
    ++dirtyQueueSize;
    dirtyQueueMem.fetch_add(sizeof(Item));
    ++dirtyQueueFill;
    dirtyQueueAge.fetch_add(qi.getQueuedTime());
    dirtyQueuePendingWrites.fetch_add(itemBytes);
}

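// Account for an item leaving the dirty queue after being flushed. Each
// counter is decremented with an underflow guard so it bottoms out at zero.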
void VBucket::doStatsForFlushing(Item& qi, size_t itemBytes)
{
    decrDirtyQueueSize(1);
    if (dirtyQueueMem > sizeof(Item)) {
        dirtyQueueMem.fetch_sub(sizeof(Item));
    } else {
        dirtyQueueMem.store(0);
    }
    ++dirtyQueueDrain;

    if (dirtyQueueAge > qi.getQueuedTime()) {
        dirtyQueueAge.fetch_sub(qi.getQueuedTime());
    } else {
        dirtyQueueAge.store(0);
    }

    if (dirtyQueuePendingWrites > itemBytes) {
        dirtyQueuePendingWrites.fetch_sub(itemBytes);
    } else {
        dirtyQueuePendingWrites.store(0);
    }
}

void VBucket::incrMetaDataDisk(Item& qi)
{
    metaDataDisk.fetch_add(qi.getNKey() + sizeof(ItemMetaData));
}

void VBucket::decrMetaDataDisk(Item& qi)
{
    // Assume couchstore removes approximately this much data from disk.
    metaDataDisk.fetch_sub((qi.getNKey() + sizeof(ItemMetaData)));
}

void VBucket::resetStats() {
    opsCreate.store(0);
    opsUpdate.store(0);
    opsDelete.store(0);
    opsReject.store(0);

    stats.decrDiskQueueSize(dirtyQueueSize.load());
    dirtyQueueSize.store(0);
    dirtyQueueMem.store(0);
    dirtyQueueFill.store(0);
    dirtyQueueAge.store(0);
    dirtyQueuePendingWrites.store(0);
    dirtyQueueDrain.store(0);
    fileSpaceUsed = 0;
    fileSize = 0;
}

template <typename T>
void VBucket::addStat(const char *nm, const T &val, ADD_STAT add_stat,
                      const void *c) {
    std::stringstream name;
    name << "vb_" << id;
    if (nm != NULL) {
        name << ":" << nm;
    }
    std::stringstream value;
    value << val;
    std::string n = name.str();
    add_casted_stat(n.data(), value.str().data(), add_stat, c);
}

void VBucket::queueBGFetchItem(const std::string &key,
                               VBucketBGFetchItem *fetch,
                               BgFetcher *bgFetcher) {
    LockHolder lh(pendingBGFetchesLock);
    pendingBGFetches[key].push_back(fetch);
    bgFetcher->addPendingVB(id);
    lh.unlock();
}

bool VBucket::getBGFetchItems(vb_bgfetch_queue_t &fetches) {
    LockHolder lh(pendingBGFetchesLock);
    fetches.insert(pendingBGFetches.begin(), pendingBGFetches.end());
    pendingBGFetches.clear();
    lh.unlock();
    return fetches.size() > 0;
}

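// Register a connection (cookie) that is waiting for a checkpoint id or a
// seqno to be persisted on this vbucket.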
void VBucket::addHighPriorityVBEntry(uint64_t id, const void *cookie,
                                     bool isBySeqno) {
    LockHolder lh(hpChksMutex);
    if (shard) {
        ++shard->highPriorityCount;
    }
    hpChks.push_back(HighPriorityVBEntry(cookie, id, isBySeqno));
    numHpChks = hpChks.size();
}

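// Walk the high-priority waiters: notify success for entries whose checkpoint
// id or seqno has now been persisted (idNum), fail entries that have waited
// longer than the flush timeout with ENGINE_TMPFAIL, and leave the rest
// queued. Notification happens after hpChksMutex is released.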
void VBucket::notifyCheckpointPersisted(EventuallyPersistentEngine &e,
                                        uint64_t idNum,
                                        bool isBySeqno) {
    LockHolder lh(hpChksMutex);
    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
    std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();

    while (entry != hpChks.end()) {
        if (isBySeqno != entry->isBySeqno_) {
            ++entry;
            continue;
        }

        hrtime_t wall_time(gethrtime() - entry->start);
        size_t spent = wall_time / 1000000000;
        if (entry->id <= idNum) {
            toNotify[entry->cookie] = ENGINE_SUCCESS;
            stats.chkPersistenceHisto.add(wall_time / 1000);
            adjustCheckpointFlushTimeout(wall_time / 1000000000);
            LOG(EXTENSION_LOG_WARNING, "Notified the completion of checkpoint "
                "persistence for vbucket %d, id %llu, cookie %p", id, idNum,
                entry->cookie);
            entry = hpChks.erase(entry);
            if (shard) {
                --shard->highPriorityCount;
            }
        } else if (spent > getCheckpointFlushTimeout()) {
            adjustCheckpointFlushTimeout(spent);
            e.storeEngineSpecific(entry->cookie, NULL);
            toNotify[entry->cookie] = ENGINE_TMPFAIL;
            LOG(EXTENSION_LOG_WARNING, "Notified the timeout on checkpoint "
                "persistence for vbucket %d, id %llu, cookie %p", id, idNum,
                entry->cookie);
            entry = hpChks.erase(entry);
            if (shard) {
                --shard->highPriorityCount;
            }
        } else {
            ++entry;
        }
    }
    numHpChks = hpChks.size();
    lh.unlock();

    std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
    for (; itr != toNotify.end(); ++itr) {
        e.notifyIOComplete(itr->first, itr->second);
    }
}

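// Fail every high-priority persistence waiter with ENGINE_TMPFAIL and then
// resolve any remaining blocked front-end ops via fireAllOps().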
void VBucket::notifyAllPendingConnsFailed(EventuallyPersistentEngine &e) {
    LockHolder lh(hpChksMutex);
    std::map<const void*, ENGINE_ERROR_CODE> toNotify;
    std::list<HighPriorityVBEntry>::iterator entry = hpChks.begin();
    while (entry != hpChks.end()) {
        toNotify[entry->cookie] = ENGINE_TMPFAIL;
        e.storeEngineSpecific(entry->cookie, NULL);
        entry = hpChks.erase(entry);
        if (shard) {
            --shard->highPriorityCount;
        }
    }
    lh.unlock();

    std::map<const void*, ENGINE_ERROR_CODE>::iterator itr = toNotify.begin();
    for (; itr != toNotify.end(); ++itr) {
        e.notifyIOComplete(itr->first, itr->second);
    }

    fireAllOps(e);
}

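// Adapt the checkpoint persistence timeout to the observed flush time,
// snapping it to the minimum, the midpoint, or the maximum of the allowed
// range.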
void VBucket::adjustCheckpointFlushTimeout(size_t wall_time) {
    size_t middle = (MIN_CHK_FLUSH_TIMEOUT + MAX_CHK_FLUSH_TIMEOUT) / 2;

    if (wall_time <= MIN_CHK_FLUSH_TIMEOUT) {
        chkFlushTimeout = MIN_CHK_FLUSH_TIMEOUT;
    } else if (wall_time <= middle) {
        chkFlushTimeout = middle;
    } else {
        chkFlushTimeout = MAX_CHK_FLUSH_TIMEOUT;
    }
}

size_t VBucket::getHighPriorityChkSize() {
    return numHpChks;
}

size_t VBucket::getCheckpointFlushTimeout() {
    return chkFlushTimeout;
}

size_t VBucket::getNumItems(item_eviction_policy_t policy) {
    if (policy == VALUE_ONLY) {
        return ht.getNumInMemoryItems();
    } else {
        return ht.getNumItems();
    }
}

size_t VBucket::getNumNonResidentItems(item_eviction_policy_t policy) {
    if (policy == VALUE_ONLY) {
        return ht.getNumInMemoryNonResItems();
    } else {
        size_t num_items = ht.getNumItems();
        size_t num_res_items = ht.getNumInMemoryItems() -
                               ht.getNumInMemoryNonResItems();
        return num_items > num_res_items ? (num_items - num_res_items) : 0;
    }
}

void VBucket::addStats(bool details, ADD_STAT add_stat, const void *c,
                       item_eviction_policy_t policy) {
    addStat(NULL, toString(state), add_stat, c);
    if (details) {
        size_t numItems = getNumItems(policy);
        size_t tempItems = getNumTempItems();
        addStat("num_items", numItems, add_stat, c);
        addStat("num_temp_items", tempItems, add_stat, c);
        addStat("num_non_resident", getNumNonResidentItems(policy),
                add_stat, c);
        addStat("ht_memory", ht.memorySize(), add_stat, c);
        addStat("ht_item_memory", ht.getItemMemory(), add_stat, c);
        addStat("ht_cache_size", ht.cacheSize, add_stat, c);
        addStat("num_ejects", ht.getNumEjects(), add_stat, c);
        addStat("ops_create", opsCreate, add_stat, c);
        addStat("ops_update", opsUpdate, add_stat, c);
        addStat("ops_delete", opsDelete, add_stat, c);
        addStat("ops_reject", opsReject, add_stat, c);
        addStat("queue_size", dirtyQueueSize, add_stat, c);
        addStat("queue_memory", dirtyQueueMem, add_stat, c);
        addStat("queue_fill", dirtyQueueFill, add_stat, c);
        addStat("queue_drain", dirtyQueueDrain, add_stat, c);
        addStat("queue_age", getQueueAge(), add_stat, c);
        addStat("pending_writes", dirtyQueuePendingWrites, add_stat, c);
        addStat("db_data_size", fileSpaceUsed, add_stat, c);
        addStat("db_file_size", fileSize, add_stat, c);
        addStat("high_seqno", getHighSeqno(), add_stat, c);
        addStat("uuid", failovers->getLatestEntry().vb_uuid, add_stat, c);
        addStat("purge_seqno", getPurgeSeqno(), add_stat, c);
    }
}