1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "ep_bucket.h"
19 
20 #include "bgfetcher.h"
21 #include "checkpoint.h"
22 #include "ep_engine.h"
23 #include "ep_time.h"
24 #include "ep_vb.h"
25 #include "failover-table.h"
26 #include "flusher.h"
27 #include "persistence_callback.h"
28 #include "replicationthrottle.h"
29 #include "tasks.h"
30 
31 /**
32  * Callback class used by EpStore, for adding relevant keys
33  * to bloomfilter during compaction.
34  */
35 class BloomFilterCallback : public Callback<uint16_t&, const DocKey&, bool&> {
36 public:
BloomFilterCallback(KVBucket& eps)37     BloomFilterCallback(KVBucket& eps) : store(eps) {
38     }
39 
callback(uint16_t& vbucketId, const DocKey& key, bool& isDeleted)40     void callback(uint16_t& vbucketId, const DocKey& key, bool& isDeleted) {
41         VBucketPtr vb = store.getVBucket(vbucketId);
42         if (vb) {
43             /* Check if a temporary filter has been initialized. If not,
44              * initialize it. If initialization fails, throw an exception
45              * to the caller and let the caller deal with it.
46              */
47             bool tempFilterInitialized = vb->isTempFilterAvailable();
48             if (!tempFilterInitialized) {
49                 tempFilterInitialized = initTempFilter(vbucketId);
50             }
51 
52             if (!tempFilterInitialized) {
53                 throw std::runtime_error(
54                         "BloomFilterCallback::callback: Failed "
55                         "to initialize temporary filter for vbucket: " +
56                         std::to_string(vbucketId));
57             }
58 
59             if (store.getItemEvictionPolicy() == VALUE_ONLY) {
60                 /**
61                  * VALUE-ONLY EVICTION POLICY
62                  * Consider deleted items only.
63                  */
64                 if (isDeleted) {
65                     vb->addToTempFilter(key);
66                 }
67             } else {
68                 /**
69                  * FULL EVICTION POLICY
70                  * If vbucket's resident ratio is found to be less than
71                  * the residency threshold, consider all items, otherwise
72                  * consider deleted and non-resident items only.
73                  */
74                 bool residentRatioLessThanThreshold =
75                         vb->isResidentRatioUnderThreshold(
76                                 store.getBfiltersResidencyThreshold());
77                 if (residentRatioLessThanThreshold) {
78                     vb->addToTempFilter(key);
79                 } else {
80                     if (isDeleted || !store.isMetaDataResident(vb, key)) {
81                         vb->addToTempFilter(key);
82                     }
83                 }
84             }
85         }
86     }
87 
88 private:
89     bool initTempFilter(uint16_t vbucketId);
90     KVBucket& store;
91 };
92 
initTempFilter(uint16_t vbucketId)93 bool BloomFilterCallback::initTempFilter(uint16_t vbucketId) {
94     Configuration& config = store.getEPEngine().getConfiguration();
95     VBucketPtr vb = store.getVBucket(vbucketId);
96     if (!vb) {
97         return false;
98     }
99 
100     size_t initial_estimation = config.getBfilterKeyCount();
101     size_t estimated_count;
102     size_t num_deletes = 0;
103     try {
104         num_deletes = store.getROUnderlying(vbucketId)->getNumPersistedDeletes(vbucketId);
105     } catch (std::runtime_error& re) {
106         LOG(EXTENSION_LOG_WARNING,
107             "BloomFilterCallback::initTempFilter: runtime error while getting "
108             "number of persisted deletes for vbucket: %" PRIu16
109             "Details: %s", vbucketId, re.what());
110         return false;
111     }
112 
113     item_eviction_policy_t eviction_policy = store.getItemEvictionPolicy();
114     if (eviction_policy == VALUE_ONLY) {
115         /**
116          * VALUE-ONLY EVICTION POLICY
117          * Obtain number of persisted deletes from underlying kvstore.
118          * Bloomfilter's estimated_key_count = 1.25 * deletes
119          */
120         estimated_count = round(1.25 * num_deletes);
121     } else {
122         /**
123          * FULL EVICTION POLICY
124          * First determine if the resident ratio of vbucket is less than
125          * the threshold from configuration.
126          */
127         bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
128                 store.getBfiltersResidencyThreshold());
129 
130         /**
131          * Based on resident ratio against threshold, estimate count.
132          *
133          * 1. If resident ratio is greater than the threshold:
134          * Obtain number of persisted deletes from underlying kvstore.
135          * Obtain number of non-resident-items for vbucket.
136          * Bloomfilter's estimated_key_count =
137          *                              1.25 * (deletes + non-resident)
138          *
139          * 2. Otherwise:
140          * Obtain number of items for vbucket.
141          * Bloomfilter's estimated_key_count =
142          *                              1.25 * (num_items)
143          */
144 
145         if (residentRatioAlert) {
146             estimated_count = round(1.25 * vb->getNumItems());
147         } else {
148             estimated_count =
149                     round(1.25 * (num_deletes + vb->getNumNonResidentItems()));
150         }
151     }
152 
153     if (estimated_count < initial_estimation) {
154         estimated_count = initial_estimation;
155     }
156 
157     vb->initTempFilter(estimated_count, config.getBfilterFpProb());
158 
159     return true;
160 }
161 
162 class ExpiredItemsCallback : public Callback<Item&, time_t&> {
163 public:
ExpiredItemsCallback(KVBucket& store)164     ExpiredItemsCallback(KVBucket& store) : epstore(store) {
165     }
166 
callback(Item& it, time_t& startTime)167     void callback(Item& it, time_t& startTime) {
168         if (epstore.compactionCanExpireItems()) {
169             epstore.deleteExpiredItem(it, startTime, ExpireBy::Compactor);
170         }
171     }
172 
173 private:
174     KVBucket& epstore;
175 };
176 
177 class EPBucket::ValueChangedListener : public ::ValueChangedListener {
178 public:
ValueChangedListener(EPBucket& bucket)179     ValueChangedListener(EPBucket& bucket) : bucket(bucket) {
180     }
181 
182     virtual void sizeValueChanged(const std::string& key,
183                                   size_t value) override {
184         if (key == "flusher_batch_split_trigger") {
185             bucket.setFlusherBatchSplitTrigger(value);
186         } else {
187             LOG(EXTENSION_LOG_WARNING,
188                 "Failed to change value for unknown variable, %s\n",
189                 key.c_str());
190         }
191     }
192 
193 private:
194     EPBucket& bucket;
195 };
196 
EPBucket(EventuallyPersistentEngine& theEngine)197 EPBucket::EPBucket(EventuallyPersistentEngine& theEngine)
198     : KVBucket(theEngine) {
199     auto& config = engine.getConfiguration();
200     const std::string& policy = config.getItemEvictionPolicy();
201     if (policy.compare("value_only") == 0) {
202         eviction_policy = VALUE_ONLY;
203     } else {
204         eviction_policy = FULL_EVICTION;
205     }
206     replicationThrottle = std::make_unique<ReplicationThrottle>(
207             engine.getConfiguration(), stats);
208 
209     vbMap.enablePersistence(*this);
210 
211     flusherBatchSplitTrigger = config.getFlusherBatchSplitTrigger();
212     config.addValueChangedListener(
213             "flusher_batch_split_trigger",
214             std::make_unique<ValueChangedListener>(*this));
215 }
216 
initialize()217 bool EPBucket::initialize() {
218     KVBucket::initialize();
219 
220     enableItemPager();
221 
222     if (!startBgFetcher()) {
223         LOG(EXTENSION_LOG_FATAL,
224            "EPBucket::initialize: Failed to create and start bgFetchers");
225         return false;
226     }
227     startFlusher();
228 
229     return true;
230 }
231 
/**
 * Shut down the persistent bucket: stop the flushers and background
 * fetchers before running the base-class teardown.
 */
void EPBucket::deinitialize() {
    // Stop persistence-related tasks first, then let KVBucket tear down the
    // remaining shared state.
    stopFlusher();
    stopBgFetcher();

    KVBucket::deinitialize();
}
238 
/**
 * Reset (delete-all) the bucket: perform the in-memory reset, then arrange
 * for the on-disk state to be wiped by the flusher.
 */
void EPBucket::reset() {
    KVBucket::reset();

    // Need to additionally update disk state
    // Atomically flip the delay flag from true to false so the pending disk
    // deleteAll is now allowed to run (no-op if it was already false).
    bool inverse = true;
    deleteAllTaskCtx.delay.compare_exchange_strong(inverse, false);
    // Waking up (notifying) one flusher is good enough for diskDeleteAll
    vbMap.getShard(EP_PRIMARY_SHARD)->getFlusher()->notifyFlushEvent();
}
248 
/**
 * Flush pending (dirty) items of the given vbucket to disk.
 *
 * Obtains up to flusherBatchSplitTrigger items from the vbucket, writes
 * them to the RW KVStore inside a single transaction, snapshots the
 * vbucket state and commits.
 *
 * @param vbid vbucket to flush.
 * @return {moreAvailable, itemsFlushed}: whether further items remain to be
 *         flushed for this vbucket, and the number of items written by this
 *         call. Returns {true, 0} if the vbucket was locked by another
 *         actor (caller should retry later).
 */
std::pair<bool, size_t> EPBucket::flushVBucket(uint16_t vbid) {
    KVShard *shard = vbMap.getShardByVbId(vbid);
    // If a disk deleteAll is pending (and no longer delayed), only the
    // primary shard performs it; other shards back off until it is done.
    if (diskDeleteAll && !deleteAllTaskCtx.delay) {
        if (shard->getId() == EP_PRIMARY_SHARD) {
            flushOneDeleteAll();
        } else {
            // disk flush is pending just return
            return {false, 0};
        }
    }

    int items_flushed = 0;
    bool moreAvailable = false;
    const auto flush_start = ProcessClock::now();

    auto vb = getLockedVBucket(vbid, std::try_to_lock);
    if (!vb.owns_lock()) {
        // Try another bucket if this one is locked to avoid blocking flusher.
        return {true, 0};
    }
    if (vb) {
        // Obtain the set of items to flush, up to the maximum allowed for
        // a single flush.
        auto toFlush = vb->getItemsToPersist(flusherBatchSplitTrigger);
        auto& items = toFlush.items;
        auto& range = toFlush.range;
        moreAvailable = toFlush.moreAvailable;

        KVStore* rwUnderlying = getRWUnderlying(vb->getId());

        if (!items.empty()) {
            // Retry until the KVStore accepts a new transaction.
            while (!rwUnderlying->begin(
                    std::make_unique<EPTransactionContext>(stats, *vb))) {
                ++stats.beginFailed;
                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
                    "Retry in 1 sec ...");
                sleep(1);
            }
            // Re-order items so duplicates of a key are adjacent
            // (high -> low seqno); see the de-duplication branch below.
            rwUnderlying->optimizeWrites(items);

            Item *prev = NULL;
            auto vbstate = vb->getVBucketState();
            uint64_t maxSeqno = 0;
            auto minSeqno = std::numeric_limits<uint64_t>::max();

            range.start = std::max(range.start, vbstate.lastSnapStart);

            bool mustCheckpointVBState = false;
            auto& pcbs = rwUnderlying->getPersistenceCbList();

            SystemEventFlush sef;

            for (const auto& item : items) {

                if (!item->shouldPersist()) {
                    continue;
                }

                // Pass the Item through the SystemEventFlush which may filter
                // the item away (return Skip).
                if (sef.process(item) == ProcessStatus::Skip) {
                    // The item has no further flushing actions i.e. we've
                    // absorbed it in the process function.
                    // Update stats and carry-on
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());
                    continue;
                }

                if (item->getOperation() == queue_op::set_vbucket_state) {
                    // No actual item explicitly persisted to (this op exists
                    // to ensure a commit occurs with the current vbstate);
                    // flag that we must trigger a snapshot even if there are
                    // no 'real' items in the checkpoint.
                    mustCheckpointVBState = true;

                    // Update queuing stats how this item has logically been
                    // processed.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());

                } else if (!prev || prev->getKey() != item->getKey()) {
                    prev = item.get();
                    ++items_flushed;
                    auto cb = flushOneDelOrSet(item, vb.getVB());
                    if (cb) {
                        pcbs.emplace_back(std::move(cb));
                    }

                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());

                    // Track the lowest seqno, so we can set the HLC epoch
                    minSeqno = std::min(minSeqno, (uint64_t)item->getBySeqno());
                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
                    if (item->isDeleted()) {
                        vbstate.maxDeletedSeqno =
                                std::max(uint64_t(vbstate.maxDeletedSeqno),
                                         item->getRevSeqno());
                    }
                    ++stats.flusher_todo;

                } else {
                    // Item is the same key as the previous[1] one - don't need
                    // to flush to disk.
                    // [1] Previous here really means 'next' - optimizeWrites()
                    //     above has actually re-ordered items such that items
                    //     with the same key are ordered from high->low seqno.
                    //     This means we only write the highest (i.e. newest)
                    //     item for a given key, and discard any duplicate,
                    //     older items.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());
                }
            }


            {
                // Take the state lock so the vbucket state cannot change
                // while we snapshot it.
                ReaderLockHolder rlh(vb->getStateLock());
                if (vb->getState() == vbucket_state_active) {
                    if (maxSeqno) {
                        range.start = maxSeqno;
                        range.end = maxSeqno;
                    }
                }

                // Update VBstate based on the changes we have just made,
                // then tell the rwUnderlying the 'new' state
                // (which will persisted as part of the commit() below).
                vbstate.lastSnapStart = range.start;
                vbstate.lastSnapEnd = range.end;

                // Track the lowest seqno written in spock and record it as
                // the HLC epoch, a seqno which we can be sure the value has a
                // HLC CAS.
                vbstate.hlcCasEpochSeqno = vb->getHLCEpochSeqno();
                if (vbstate.hlcCasEpochSeqno == HlcCasSeqnoUninitialised &&
                    minSeqno != std::numeric_limits<uint64_t>::max()) {
                    vbstate.hlcCasEpochSeqno = minSeqno;
                    vb->setHLCEpochSeqno(vbstate.hlcCasEpochSeqno);
                }

                // Track if the VB has xattrs present
                vbstate.mightContainXattrs = vb->mightContainXattrs();

                // Do we need to trigger a persist of the state?
                // If there are no "real" items to flush, and we encountered
                // a set_vbucket_state meta-item.
                auto options = VBStatePersist::VBSTATE_CACHE_UPDATE_ONLY;
                if ((items_flushed == 0) && mustCheckpointVBState) {
                    options = VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT;
                }

                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
                                                  options) != true) {
                    // Snapshot failed: report more work available so the
                    // flusher revisits this vbucket.
                    return {true, 0};
                }

                if (vb->setBucketCreation(false)) {
                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
                }
            }

            /* Perform an explicit commit to disk if the commit
             * interval reaches zero and if there is a non-zero number
             * of items to flush.
             * Or if there is a manifest item
             */
            if (items_flushed > 0 || sef.getCollectionsManifestItem()) {
                commit(*rwUnderlying, sef.getCollectionsManifestItem());

                // Now the commit is complete, vBucket file must exist.
                if (vb->setBucketCreation(false)) {
                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
                }
            }

            // Only advance the persisted snapshot/seqno if nothing was
            // rejected (i.e. everything made it to disk).
            if (vb->rejectQueue.empty()) {
                vb->setPersistedSnapshot(range.start, range.end);
                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
                if (highSeqno > 0 && highSeqno != vb->getPersistenceSeqno()) {
                    vb->setPersistenceSeqno(highSeqno);
                }
            }

            // Record per-item flush timing statistics.
            auto flush_end = ProcessClock::now();
            uint64_t trans_time =
                    std::chrono::duration_cast<std::chrono::milliseconds>(
                            flush_end - flush_start)
                            .count();

            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
                                       static_cast<double>(trans_time) /
                                       static_cast<double>(items_flushed));
            stats.cumulativeFlushTime.fetch_add(trans_time);
            stats.flusher_todo.store(0);
            stats.totalPersistVBState++;
        }

        rwUnderlying->pendingTasks();

        if (vb->checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
            wakeUpCheckpointRemover();
        }

        if (vb->rejectQueue.empty()) {
            // Everything persisted: notify any high-priority waiters for
            // seqno / checkpoint persistence, and record the checkpoint id.
            vb->checkpointManager->itemsPersisted();
            uint64_t seqno = vb->getPersistenceSeqno();
            uint64_t chkid =
                    vb->checkpointManager->getPersistenceCursorPreChkId();
            vb->notifyHighPriorityRequests(
                    engine, seqno, HighPriorityVBNotify::Seqno);
            vb->notifyHighPriorityRequests(
                    engine, chkid, HighPriorityVBNotify::ChkPersistence);
            if (chkid > 0 && chkid != vb->getPersistenceCheckpointId()) {
                vb->setPersistenceCheckpointId(chkid);
            }
        } else {
            // Some items were rejected; there is definitely more to do.
            return {true, items_flushed};
        }
    }

    return {moreAvailable, items_flushed};
}
472 
// Set the maximum number of items the flusher persists per batch; also
// invoked at runtime via the "flusher_batch_split_trigger" config listener.
void EPBucket::setFlusherBatchSplitTrigger(size_t limit) {
    flusherBatchSplitTrigger = limit;
}
476 
commit(KVStore& kvstore, const Item* collectionsManifest)477 void EPBucket::commit(KVStore& kvstore, const Item* collectionsManifest) {
478     auto& pcbs = kvstore.getPersistenceCbList();
479     BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
480     auto commit_start = ProcessClock::now();
481 
482     while (!kvstore.commit(collectionsManifest)) {
483         ++stats.commitFailed;
484         LOG(EXTENSION_LOG_WARNING,
485             "KVBucket::commit: kvstore.commit failed!!! Retry in 1 sec...");
486         sleep(1);
487     }
488 
489     pcbs.clear();
490     pcbs.shrink_to_fit();
491 
492     ++stats.flusherCommits;
493     auto commit_end = ProcessClock::now();
494     auto commit_time = std::chrono::duration_cast<std::chrono::milliseconds>(
495                                commit_end - commit_start)
496                                .count();
497     stats.commit_time.store(commit_time);
498     stats.cumulativeCommitTime.fetch_add(commit_time);
499 }
500 
startFlusher()501 void EPBucket::startFlusher() {
502     for (const auto& shard : vbMap.shards) {
503         shard->getFlusher()->start();
504     }
505 }
506 
stopFlusher()507 void EPBucket::stopFlusher() {
508     for (const auto& shard : vbMap.shards) {
509         auto* flusher = shard->getFlusher();
510         LOG(EXTENSION_LOG_NOTICE,
511             "Attempting to stop the flusher for "
512             "shard:%" PRIu16,
513             shard->getId());
514         bool rv = flusher->stop(stats.forceShutdown);
515         if (rv && !stats.forceShutdown) {
516             flusher->wait();
517         }
518     }
519 }
520 
pauseFlusher()521 bool EPBucket::pauseFlusher() {
522     bool rv = true;
523     for (const auto& shard : vbMap.shards) {
524         auto* flusher = shard->getFlusher();
525         if (!flusher->pause()) {
526             LOG(EXTENSION_LOG_WARNING,
527                 "Attempted to pause flusher in state "
528                 "[%s], shard = %d",
529                 flusher->stateName(),
530                 shard->getId());
531             rv = false;
532         }
533     }
534     return rv;
535 }
536 
resumeFlusher()537 bool EPBucket::resumeFlusher() {
538     bool rv = true;
539     for (const auto& shard : vbMap.shards) {
540         auto* flusher = shard->getFlusher();
541         if (!flusher->resume()) {
542             LOG(EXTENSION_LOG_WARNING,
543                 "Attempted to resume flusher in state [%s], "
544                 "shard = %" PRIu16,
545                 flusher->stateName(),
546                 shard->getId());
547             rv = false;
548         }
549     }
550     return rv;
551 }
552 
wakeUpFlusher()553 void EPBucket::wakeUpFlusher() {
554     if (stats.diskQueueSize.load() == 0) {
555         for (const auto& shard : vbMap.shards) {
556             shard->getFlusher()->wake();
557         }
558     }
559 }
560 
startBgFetcher()561 bool EPBucket::startBgFetcher() {
562     for (const auto& shard : vbMap.shards) {
563         BgFetcher* bgfetcher = shard->getBgFetcher();
564         if (bgfetcher == NULL) {
565             LOG(EXTENSION_LOG_WARNING,
566                 "Failed to start bg fetcher for shard %" PRIu16,
567                 shard->getId());
568             return false;
569         }
570         bgfetcher->start();
571     }
572     return true;
573 }
574 
stopBgFetcher()575 void EPBucket::stopBgFetcher() {
576     for (const auto& shard : vbMap.shards) {
577         BgFetcher* bgfetcher = shard->getBgFetcher();
578         if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
579             LOG(EXTENSION_LOG_WARNING,
580                 "Shutting down engine while there are still pending data "
581                 "read for shard %" PRIu16 " from database storage",
582                 shard->getId());
583         }
584         LOG(EXTENSION_LOG_NOTICE,
585             "Stopping bg fetcher for shard:%" PRIu16,
586             shard->getId());
587         bgfetcher->stop();
588     }
589 }
590 
/**
 * Schedule a compaction task for the database file named in the given
 * compaction context.
 *
 * @param vbid vbucket whose purge seqno seeds the compaction context.
 * @param c compaction parameters (copied; db_file_id selects the file).
 * @param cookie connection cookie notified when the task completes.
 * @return ENGINE_EWOULDBLOCK on successful scheduling (completion is
 *         notified asynchronously); otherwise the failure code from the
 *         DB-existence check or ENGINE_NOT_MY_VBUCKET.
 */
ENGINE_ERROR_CODE EPBucket::scheduleCompaction(uint16_t vbid,
                                               compaction_ctx c,
                                               const void* cookie) {
    ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
    if (errCode != ENGINE_SUCCESS) {
        return errCode;
    }

    /* Obtain the vbucket so we can get the previous purge seqno */
    VBucketPtr vb = vbMap.getBucket(vbid);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    /* Update the compaction ctx with the previous purge seqno */
    c.max_purged_seq[vbid] = vb->getPurgeSeqno();

    // compactionLock guards the compactionTasks list.
    LockHolder lh(compactionLock);
    ExTask task = std::make_shared<CompactTask>(*this, c, cookie);
    compactionTasks.push_back(std::make_pair(c.db_file_id, task));
    if (compactionTasks.size() > 1) {
        // Throttle: if the disk queue is large and many compactions are
        // already queued, or the workload is read-heavy, delay this task.
        if ((stats.diskQueueSize > compactionWriteQueueCap &&
             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
            engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
            // Snooze a new compaction task.
            // We will wake it up when one of the existing compaction tasks is
            // done.
            task->snooze(60);
        }
    }

    ExecutorPool::get()->schedule(task);

    LOG(EXTENSION_LOG_DEBUG,
        "Scheduled compaction task %" PRIu64
        " on db %d,"
        "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
        ", dropdeletes = %d",
        uint64_t(task->getId()),
        c.db_file_id,
        c.purge_before_ts,
        c.purge_before_seq,
        c.drop_deletes);

    return ENGINE_EWOULDBLOCK;
}
637 
flushOneDeleteAll()638 void EPBucket::flushOneDeleteAll() {
639     for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
640         auto vb = getLockedVBucket(i);
641         if (!vb) {
642             continue;
643         }
644         // Reset the vBucket if it's non-null and not already in the middle of
645         // being created / destroyed.
646         if (!(vb->isBucketCreation() || vb->isDeletionDeferred())) {
647             getRWUnderlying(vb->getId())->reset(i);
648         }
649         // Reset disk item count.
650         vb->setNumTotalItems(0);
651     }
652 
653     setDeleteAllComplete();
654 }
655 
flushOneDelOrSet( const queued_item& qi, VBucketPtr& vb)656 std::unique_ptr<PersistenceCallback> EPBucket::flushOneDelOrSet(
657         const queued_item& qi, VBucketPtr& vb) {
658     if (!vb) {
659         --stats.diskQueueSize;
660         return NULL;
661     }
662 
663     int64_t bySeqno = qi->getBySeqno();
664     bool deleted = qi->isDeleted();
665     rel_time_t queued(qi->getQueuedTime());
666 
667     auto dirtyAge = std::chrono::seconds(ep_current_time() - queued);
668     stats.dirtyAgeHisto.add(dirtyAge);
669     stats.dirtyAge.store(static_cast<rel_time_t>(dirtyAge.count()));
670     stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
671                                          stats.dirtyAgeHighWat.load()));
672 
673     KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
674     if (!deleted) {
675         // TODO: Need to separate disk_insert from disk_update because
676         // bySeqno doesn't give us that information.
677         BlockTimer timer(bySeqno == -1 ?
678                          &stats.diskInsertHisto : &stats.diskUpdateHisto,
679                          bySeqno == -1 ? "disk_insert" : "disk_update",
680                          stats.timingLog);
681         auto cb = std::make_unique<PersistenceCallback>(qi, qi->getCas());
682         rwUnderlying->set(*qi, *cb);
683         return cb;
684     } else {
685         BlockTimer timer(&stats.diskDelHisto, "disk_delete",
686                          stats.timingLog);
687         auto cb = std::make_unique<PersistenceCallback>(qi, 0);
688         rwUnderlying->del(*qi, *cb);
689         return cb;
690     }
691 }
692 
/**
 * Run compaction for the database file described by ctx: install the
 * bloom-filter, expiry and collections-eraser callbacks, invoke the
 * KVStore's compactDB, then update each involved vbucket's bloom filter
 * and purge seqno, and finally release any keys gathered by the
 * collections eraser.
 */
void EPBucket::compactInternal(compaction_ctx* ctx) {
    // Collect keys into each vbucket's temporary bloom filter during
    // compaction (see BloomFilterCallback above).
    BloomFilterCBPtr filter(new BloomFilterCallback(*this));
    ctx->bloomFilterCallback = filter;

    // Expire items encountered by the compactor (see ExpiredItemsCallback).
    ExpiredItemsCBPtr expiry(new ExpiredItemsCallback(*this));
    ctx->expiryCallback = expiry;

    // Hook used by compaction to erase keys of deleted collections.
    ctx->collectionsEraser = std::bind(&KVBucket::collectionsEraseKey,
                                       this,
                                       uint16_t(ctx->db_file_id),
                                       std::placeholders::_1,
                                       std::placeholders::_2,
                                       std::placeholders::_3,
                                       std::placeholders::_4);

    KVShard* shard = vbMap.getShardByVbId(ctx->db_file_id);
    KVStore* store = shard->getRWUnderlying();
    bool result = store->compactDB(ctx);

    Configuration& config = getEPEngine().getConfiguration();
    /* Iterate over all the vbucket ids set in max_purged_seq map. If there is
     * an entry
     * in the map for a vbucket id, then it was involved in compaction and thus
     * can
     * be used to update the associated bloom filters and purge sequence numbers
     */
    for (auto& it : ctx->max_purged_seq) {
        const uint16_t vbid = it.first;
        VBucketPtr vb = getVBucket(vbid);
        if (!vb) {
            continue;
        }

        // On success swap in the temporary filter built during compaction;
        // otherwise (or when filters are disabled) clear the filter.
        if (config.isBfilterEnabled() && result) {
            vb->swapFilter();
        } else {
            vb->clearFilter();
        }
        vb->setPurgeSeqno(it.second);
    }

    LOG(EXTENSION_LOG_NOTICE,
        "Compaction of db file id: %d completed (%s). "
        "tombstones_purged:%" PRIu64 ", collection_items_erased:%" PRIu64
        ", pre{size:%" PRIu64 ", items:%" PRIu64 ", deleted_items:%" PRIu64
        ", purge_seqno:%" PRIu64 "}, post{size:%" PRIu64 ", items:%" PRIu64
        ", deleted_items:%" PRIu64 ", purge_seqno:%" PRIu64 "}",
        ctx->db_file_id,
        result ? "ok" : "failed",
        ctx->stats.tombstonesPurged,
        ctx->stats.collectionsItemsPurged,
        ctx->stats.pre.size,
        ctx->stats.pre.items,
        ctx->stats.pre.deletedItems,
        ctx->stats.pre.purgeSeqno,
        ctx->stats.post.size,
        ctx->stats.post.items,
        ctx->stats.post.deletedItems,
        ctx->stats.post.purgeSeqno);

    // The collections eraser may have gathered some garbage keys which can now
    // be released.
    auto vb = getVBucket(uint16_t(ctx->db_file_id));
    if (vb) {
        ctx->eraserContext.processKeys(*vb);
    }
}
760 
/**
 * Execute a scheduled compaction, serializing against the writer when the
 * storage engine does not support concurrent write+compact, then notify
 * the requesting cookie.
 *
 * @param ctx compaction parameters (db_file_id selects the vbucket/file).
 * @param cookie connection cookie to notify on completion (may be null).
 * @return true if the task should re-run later (vbucket lock was held by
 *         someone else), false when the compaction attempt has finished.
 */
bool EPBucket::doCompact(compaction_ctx* ctx, const void* cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    StorageProperties storeProp = getStorageProperties();
    bool concWriteCompact = storeProp.hasConcWriteCompact();
    uint16_t vbid = ctx->db_file_id;

    /**
     * Check if the underlying storage engine allows writes concurrently
     * as the database file is being compacted. If not, a lock needs to
     * be held in order to serialize access to the database file between
     * the writer and compactor threads
     */
    if (concWriteCompact == false) {
        auto vb = getLockedVBucket(vbid, std::try_to_lock);
        if (!vb.owns_lock()) {
            // VB currently locked; try again later.
            return true;
        }

        if (!vb) {
            err = ENGINE_NOT_MY_VBUCKET;
            engine.storeEngineSpecific(cookie, NULL);
            /**
             * Decrement session counter here, as memcached thread wouldn't
             * visit the engine interface in case of a NOT_MY_VB notification
             */
            engine.decrementSessionCtr();
        } else {
            // Compact while holding the vbucket lock.
            compactInternal(ctx);
        }
    } else {
        // Engine supports concurrent write+compact; no lock needed.
        compactInternal(ctx);
    }

    // Remove this task from the pending list and wake any snoozed one.
    updateCompactionTasks(ctx->db_file_id);

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}
803 
updateCompactionTasks(DBFileId db_file_id)804 void EPBucket::updateCompactionTasks(DBFileId db_file_id) {
805     LockHolder lh(compactionLock);
806     bool erased = false, woke = false;
807     std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
808     while (it != compactionTasks.end()) {
809         if ((*it).first == db_file_id) {
810             it = compactionTasks.erase(it);
811             erased = true;
812         } else {
813             ExTask& task = (*it).second;
814             if (task->getState() == TASK_SNOOZED) {
815                 ExecutorPool::get()->wake(task->getId());
816                 woke = true;
817             }
818             ++it;
819         }
820         if (erased && woke) {
821             break;
822         }
823     }
824 }
825 
getLastPersistedCheckpointId(uint16_t vb)826 std::pair<uint64_t, bool> EPBucket::getLastPersistedCheckpointId(uint16_t vb) {
827     auto vbucket = vbMap.getBucket(vb);
828     if (vbucket) {
829         return {vbucket->getPersistenceCheckpointId(), true};
830     } else {
831         return {0, true};
832     }
833 }
834 
getFileStats(const void* cookie, ADD_STAT add_stat)835 ENGINE_ERROR_CODE EPBucket::getFileStats(const void* cookie,
836                                          ADD_STAT add_stat) {
837     const auto numShards = vbMap.getNumShards();
838     DBFileInfo totalInfo;
839 
840     for (uint16_t shardId = 0; shardId < numShards; shardId++) {
841         const auto dbInfo =
842                 getRWUnderlyingByShard(shardId)->getAggrDbFileInfo();
843         totalInfo.spaceUsed += dbInfo.spaceUsed;
844         totalInfo.fileSize += dbInfo.fileSize;
845     }
846 
847     add_casted_stat("ep_db_data_size", totalInfo.spaceUsed, add_stat, cookie);
848     add_casted_stat("ep_db_file_size", totalInfo.fileSize, add_stat, cookie);
849 
850     return ENGINE_SUCCESS;
851 }
852 
getPerVBucketDiskStats(const void* cookie, ADD_STAT add_stat)853 ENGINE_ERROR_CODE EPBucket::getPerVBucketDiskStats(const void* cookie,
854                                                    ADD_STAT add_stat) {
855     class DiskStatVisitor : public VBucketVisitor {
856     public:
857         DiskStatVisitor(const void* c, ADD_STAT a) : cookie(c), add_stat(a) {
858         }
859 
860         void visitBucket(VBucketPtr& vb) override {
861             char buf[32];
862             uint16_t vbid = vb->getId();
863             DBFileInfo dbInfo =
864                     vb->getShard()->getRWUnderlying()->getDbFileInfo(vbid);
865 
866             try {
867                 checked_snprintf(buf, sizeof(buf), "vb_%d:data_size", vbid);
868                 add_casted_stat(buf, dbInfo.spaceUsed, add_stat, cookie);
869                 checked_snprintf(buf, sizeof(buf), "vb_%d:file_size", vbid);
870                 add_casted_stat(buf, dbInfo.fileSize, add_stat, cookie);
871             } catch (std::exception& error) {
872                 LOG(EXTENSION_LOG_WARNING,
873                     "DiskStatVisitor::visitBucket: Failed to build stat: %s",
874                     error.what());
875             }
876         }
877 
878     private:
879         const void* cookie;
880         ADD_STAT add_stat;
881     };
882 
883     DiskStatVisitor dsv(cookie, add_stat);
884     visit(dsv);
885     return ENGINE_SUCCESS;
886 }
887 
makeVBucket(VBucket::id_type id, vbucket_state_t state, KVShard* shard, std::unique_ptr<FailoverTable> table, NewSeqnoCallback newSeqnoCb, vbucket_state_t initState, int64_t lastSeqno, uint64_t lastSnapStart, uint64_t lastSnapEnd, uint64_t purgeSeqno, uint64_t maxCas, int64_t hlcEpochSeqno, bool mightContainXattrs, const std::string& collectionsManifest)888 VBucketPtr EPBucket::makeVBucket(VBucket::id_type id,
889                                  vbucket_state_t state,
890                                  KVShard* shard,
891                                  std::unique_ptr<FailoverTable> table,
892                                  NewSeqnoCallback newSeqnoCb,
893                                  vbucket_state_t initState,
894                                  int64_t lastSeqno,
895                                  uint64_t lastSnapStart,
896                                  uint64_t lastSnapEnd,
897                                  uint64_t purgeSeqno,
898                                  uint64_t maxCas,
899                                  int64_t hlcEpochSeqno,
900                                  bool mightContainXattrs,
901                                  const std::string& collectionsManifest) {
902     auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
903     // Not using make_shared or allocate_shared
904     // 1. make_shared doesn't accept a Deleter
905     // 2. allocate_shared has inconsistencies between platforms in calling
906     //    alloc.destroy (libc++ doesn't call it)
907     return VBucketPtr(new EPVBucket(id,
908                                     state,
909                                     stats,
910                                     engine.getCheckpointConfig(),
911                                     shard,
912                                     lastSeqno,
913                                     lastSnapStart,
914                                     lastSnapEnd,
915                                     std::move(table),
916                                     flusherCb,
917                                     std::move(newSeqnoCb),
918                                     engine.getConfiguration(),
919                                     eviction_policy,
920                                     initState,
921                                     purgeSeqno,
922                                     maxCas,
923                                     hlcEpochSeqno,
924                                     mightContainXattrs,
925                                     collectionsManifest),
926                       VBucket::DeferredDeleter(engine));
927 }
928 
statsVKey(const DocKey& key, uint16_t vbucket, const void* cookie)929 ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
930                                       uint16_t vbucket,
931                                       const void* cookie) {
932     VBucketPtr vb = getVBucket(vbucket);
933     if (!vb) {
934         return ENGINE_NOT_MY_VBUCKET;
935     }
936 
937     return vb->statsVKey(key, cookie, engine, bgFetchDelay);
938 }
939 
completeStatsVKey(const void* cookie, const DocKey& key, uint16_t vbid, uint64_t bySeqNum)940 void EPBucket::completeStatsVKey(const void* cookie,
941                                  const DocKey& key,
942                                  uint16_t vbid,
943                                  uint64_t bySeqNum) {
944     GetValue gcb = getROUnderlying(vbid)->get(key, vbid);
945 
946     if (eviction_policy == FULL_EVICTION) {
947         VBucketPtr vb = getVBucket(vbid);
948         if (vb) {
949             vb->completeStatsVKey(key, gcb);
950         }
951     }
952 
953     if (gcb.getStatus() == ENGINE_SUCCESS) {
954         engine.addLookupResult(cookie, std::move(gcb.item));
955     } else {
956         engine.addLookupResult(cookie, NULL);
957     }
958 
959     --stats.numRemainingBgJobs;
960     engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
961 }
962 
963 /**
964  * Class that handles the disk callback during the rollback.
965  * For each mutation/deletion which was discarded as part of the rollback,
966  * the callback() method is invoked with the key of the discarded update.
967  * It can then lookup the state of that key using dbHandle (which represents the
968  * new, rolled-back file) and correct the in-memory view:
969  *
970  * a) If the key is not present in the Rollback header then delete it from
971  *    the HashTable (if either didn't exist yet, or had previously been
972  *    deleted in the Rollback header).
973  * b) If the key is present in the Rollback header then replace the in-memory
974  *    value with the value from the Rollback header.
975  */
class EPDiskRollbackCB : public RollbackCB {
public:
    EPDiskRollbackCB(EventuallyPersistentEngine& e) : RollbackCB(), engine(e) {
    }

    // Invoked once per update discarded by the rollback; val.item is the
    // discarded mutation/deletion. Looks the key up in the rolled-back file
    // (via dbHandle, set by the base class) and corrects the in-memory view
    // as described in the class comment above.
    void callback(GetValue& val) {
        if (!val.item) {
            throw std::invalid_argument(
                    "EPDiskRollbackCB::callback: val is NULL");
        }
        if (dbHandle == nullptr) {
            throw std::logic_error(
                    "EPDiskRollbackCB::callback: dbHandle is NULL");
        }
        // Take ownership of the discarded item.
        UniqueItemPtr itm(std::move(val.item));
        // NOTE(review): vb is dereferenced below without a null check -
        // presumably rollback only runs against extant vbuckets; confirm.
        VBucketPtr vb = engine.getVBucket(itm->getVBucketId());
        // Fetch the key's state in the new (rolled-back) file.
        GetValue gcb = engine.getKVBucket()
                               ->getROUnderlying(itm->getVBucketId())
                               ->getWithHeader(dbHandle,
                                               itm->getKey(),
                                               itm->getVBucketId(),
                                               GetMetaOnly::No);
        if (gcb.getStatus() == ENGINE_SUCCESS) {
            UniqueItemPtr it(std::move(gcb.item));
            if (it->isDeleted()) {
                // Deleted in the rolled-back file: remove it from memory.
                removeDeletedDoc(*vb, it->getKey());
            } else {
                // Present in the rolled-back file: restore that value.
                MutationStatus mtype = vb->setFromInternal(*it);

                if (mtype == MutationStatus::NoMem) {
                    setStatus(ENGINE_ENOMEM);
                }
            }
        } else if (gcb.getStatus() == ENGINE_KEY_ENOENT) {
            // Not present in the rolled-back file: remove it from memory.
            removeDeletedDoc(*vb, itm->getKey());
        } else {
            // Any other status is unexpected; log and leave state unchanged.
            LOG(EXTENSION_LOG_WARNING,
                "EPDiskRollbackCB::callback:Unexpected Error Status: %d",
                gcb.getStatus());
        }
    }

    /// Remove a deleted-on-disk document from the VBucket's hashtable.
    void removeDeletedDoc(VBucket& vb, const DocKey& key) {
        if (vb.deleteKey(key)) {
            setStatus(ENGINE_SUCCESS);
        } else {
            // Document didn't exist in memory - may have been deleted in since
            // the checkpoint.
            setStatus(ENGINE_KEY_ENOENT);
        }
        // Irrespective of if the in-memory delete succeeded; the document
        // doesn't exist on disk; so decrement the item count.
        vb.decrNumTotalItems();
    }

private:
    EventuallyPersistentEngine& engine;
};
1035 
doRollback(uint16_t vbid, uint64_t rollbackSeqno)1036 RollbackResult EPBucket::doRollback(uint16_t vbid, uint64_t rollbackSeqno) {
1037     auto cb = std::make_shared<EPDiskRollbackCB>(engine);
1038     KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
1039     return rwUnderlying->rollback(vbid, rollbackSeqno, cb);
1040 }
1041 
rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno)1042 void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) {
1043     std::vector<queued_item> items;
1044     vb.checkpointManager->getAllItemsForCursor(CheckpointManager::pCursorName,
1045                                                items);
1046     for (const auto& item : items) {
1047         if (item->getBySeqno() > rollbackSeqno &&
1048             !item->isCheckPointMetaItem()) {
1049             GetValue gcb = getROUnderlying(vb.getId())
1050                                    ->get(item->getKey(), vb.getId(), false);
1051 
1052             if (gcb.getStatus() == ENGINE_SUCCESS) {
1053                 vb.setFromInternal(*gcb.item.get());
1054             } else {
1055                 vb.deleteKey(item->getKey());
1056             }
1057         }
1058     }
1059 }
1060 
notifyNewSeqno(const uint16_t vbid, const VBNotifyCtx& notifyCtx)1061 void EPBucket::notifyNewSeqno(const uint16_t vbid,
1062                               const VBNotifyCtx& notifyCtx) {
1063     if (notifyCtx.notifyFlusher) {
1064         notifyFlusher(vbid);
1065     }
1066     if (notifyCtx.notifyReplication) {
1067         notifyReplication(vbid, notifyCtx.bySeqno);
1068     }
1069 }
1070