/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *     Copyright 2016 Couchbase, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

#include "ep_bucket.h"

#include "bgfetcher.h"
#include "checkpoint.h"
#include "ep_engine.h"
#include "ep_time.h"
#include "ep_vb.h"
#include "failover-table.h"
#include "flusher.h"
#include "persistence_callback.h"
#include "replicationthrottle.h"
#include "tasks.h"

/**
 * Callback class used by EpStore, for adding relevant keys
 * to bloomfilter during compaction.
 */
class BloomFilterCallback : public Callback<uint16_t&, const DocKey&, bool&> {
public:
    BloomFilterCallback(KVBucket& eps) : store(eps) {
    }

    void callback(uint16_t& vbucketId, const DocKey& key, bool& isDeleted) {
        VBucketPtr vb = store.getVBucket(vbucketId);
        if (vb) {
            /* Check if a temporary filter has been initialized. If not,
             * initialize it. If initialization fails, throw an exception
             * to the caller and let the caller deal with it.
             */
            bool tempFilterInitialized = vb->isTempFilterAvailable();
            if (!tempFilterInitialized) {
                tempFilterInitialized = initTempFilter(vbucketId);
            }

            if (!tempFilterInitialized) {
                throw std::runtime_error(
                        "BloomFilterCallback::callback: Failed "
                        "to initialize temporary filter for vbucket: " +
                        std::to_string(vbucketId));
            }

            if (store.getItemEvictionPolicy() == VALUE_ONLY) {
                /**
                 * VALUE-ONLY EVICTION POLICY
                 * Consider deleted items only.
                 */
                if (isDeleted) {
                    vb->addToTempFilter(key);
                }
            } else {
                /**
                 * FULL EVICTION POLICY
                 * If vbucket's resident ratio is found to be less than
                 * the residency threshold, consider all items, otherwise
                 * consider deleted and non-resident items only.
                 */
                bool residentRatioLessThanThreshold =
                        vb->isResidentRatioUnderThreshold(
                                store.getBfiltersResidencyThreshold());
                if (residentRatioLessThanThreshold) {
                    vb->addToTempFilter(key);
                } else {
                    if (isDeleted || !store.isMetaDataResident(vb, key)) {
                        vb->addToTempFilter(key);
                    }
                }
            }
        }
    }

private:
    bool initTempFilter(uint16_t vbucketId);
    KVBucket& store;
};

bool BloomFilterCallback::initTempFilter(uint16_t vbucketId) {
    Configuration& config = store.getEPEngine().getConfiguration();
    VBucketPtr vb = store.getVBucket(vbucketId);
    if (!vb) {
        return false;
    }

    size_t initial_estimation = config.getBfilterKeyCount();
    size_t estimated_count;
    size_t num_deletes = 0;
    try {
        num_deletes = store.getROUnderlying(vbucketId)->getNumPersistedDeletes(vbucketId);
    } catch (std::runtime_error& re) {
        LOG(EXTENSION_LOG_WARNING,
            "BloomFilterCallback::initTempFilter: runtime error while getting "
            "number of persisted deletes for vbucket: %" PRIu16
            ". Details: %s", vbucketId, re.what());
        return false;
    }

    item_eviction_policy_t eviction_policy = store.getItemEvictionPolicy();
    if (eviction_policy == VALUE_ONLY) {
        /**
         * VALUE-ONLY EVICTION POLICY
         * Obtain number of persisted deletes from underlying kvstore.
         * Bloomfilter's estimated_key_count = 1.25 * deletes
         */
        estimated_count = round(1.25 * num_deletes);
    } else {
        /**
         * FULL EVICTION POLICY
         * First determine if the resident ratio of vbucket is less than
         * the threshold from configuration.
         */
        bool residentRatioAlert = vb->isResidentRatioUnderThreshold(
                store.getBfiltersResidencyThreshold());

        /**
         * Based on resident ratio against threshold, estimate count.
         *
         * 1. If resident ratio is greater than the threshold:
         * Obtain number of persisted deletes from underlying kvstore.
         * Obtain number of non-resident-items for vbucket.
         * Bloomfilter's estimated_key_count =
         *                              1.25 * (deletes + non-resident)
         *
         * 2. Otherwise:
         * Obtain number of items for vbucket.
         * Bloomfilter's estimated_key_count =
         *                              1.25 * (num_items)
         */

        if (residentRatioAlert) {
            estimated_count = round(1.25 * vb->getNumItems());
        } else {
            estimated_count =
                    round(1.25 * (num_deletes + vb->getNumNonResidentItems()));
        }
    }

    if (estimated_count < initial_estimation) {
        estimated_count = initial_estimation;
    }

    vb->initTempFilter(estimated_count, config.getBfilterFpProb());

    return true;
}

class ExpiredItemsCallback : public Callback<Item&, time_t&> {
public:
    ExpiredItemsCallback(KVBucket& store) : epstore(store) {
    }

    void callback(Item& it, time_t& startTime) {
        if (epstore.compactionCanExpireItems()) {
            epstore.deleteExpiredItem(it, startTime, ExpireBy::Compactor);
        }
    }

private:
    KVBucket& epstore;
};

class EPBucket::ValueChangedListener : public ::ValueChangedListener {
public:
    ValueChangedListener(EPBucket& bucket) : bucket(bucket) {
    }

    virtual void sizeValueChanged(const std::string& key,
                                  size_t value) override {
        if (key == "flusher_batch_split_trigger") {
            bucket.setFlusherBatchSplitTrigger(value);
        } else {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to change value for unknown variable, %s\n",
                key.c_str());
        }
    }

private:
    EPBucket& bucket;
};

EPBucket::EPBucket(EventuallyPersistentEngine& theEngine)
    : KVBucket(theEngine) {
    auto& config = engine.getConfiguration();
    const std::string& policy = config.getItemEvictionPolicy();
    if (policy.compare("value_only") == 0) {
        eviction_policy = VALUE_ONLY;
    } else {
        eviction_policy = FULL_EVICTION;
    }
    replicationThrottle = std::make_unique<ReplicationThrottle>(
            engine.getConfiguration(), stats);

    vbMap.enablePersistence(*this);

    flusherBatchSplitTrigger = config.getFlusherBatchSplitTrigger();
    config.addValueChangedListener(
            "flusher_batch_split_trigger",
            std::make_unique<ValueChangedListener>(*this));
}

bool EPBucket::initialize() {
    KVBucket::initialize();

    enableItemPager();

    if (!startBgFetcher()) {
        LOG(EXTENSION_LOG_FATAL,
            "EPBucket::initialize: Failed to create and start bgFetchers");
        return false;
    }
    startFlusher();

    return true;
}

void EPBucket::deinitialize() {
    stopFlusher();
    stopBgFetcher();

    KVBucket::deinitialize();
}

void EPBucket::reset() {
    KVBucket::reset();

    // Need to additionally update disk state
    bool inverse = true;
    deleteAllTaskCtx.delay.compare_exchange_strong(inverse, false);
    // Waking up (notifying) one flusher is good enough for diskDeleteAll
    vbMap.getShard(EP_PRIMARY_SHARD)->getFlusher()->notifyFlushEvent();
}

std::pair<bool, size_t> EPBucket::flushVBucket(uint16_t vbid) {
    KVShard *shard = vbMap.getShardByVbId(vbid);
    if (diskDeleteAll && !deleteAllTaskCtx.delay) {
        if (shard->getId() == EP_PRIMARY_SHARD) {
            flushOneDeleteAll();
        } else {
            // Disk flush for deleteAll is pending; just return.
            return {false, 0};
        }
    }

    int items_flushed = 0;
    bool moreAvailable = false;
    const auto flush_start = ProcessClock::now();

    auto vb = getLockedVBucket(vbid, std::try_to_lock);
    if (!vb.owns_lock()) {
        // Try another bucket if this one is locked to avoid blocking flusher.
        return {true, 0};
    }
    if (vb) {
        // Obtain the set of items to flush, up to the maximum allowed for
        // a single flush.
        auto toFlush = vb->getItemsToPersist(flusherBatchSplitTrigger);
        auto& items = toFlush.items;
        auto& range = toFlush.range;
        moreAvailable = toFlush.moreAvailable;

        KVStore* rwUnderlying = getRWUnderlying(vb->getId());

        if (!items.empty()) {
            while (!rwUnderlying->begin(
                    std::make_unique<EPTransactionContext>(stats, *vb))) {
                ++stats.beginFailed;
                LOG(EXTENSION_LOG_WARNING,
                    "Failed to start a transaction. Retry in 1 sec ...");
                sleep(1);
            }
            rwUnderlying->optimizeWrites(items);

            Item *prev = NULL;
            auto vbstate = vb->getVBucketState();
            uint64_t maxSeqno = 0;
            auto minSeqno = std::numeric_limits<uint64_t>::max();

            range.start = std::max(range.start, vbstate.lastSnapStart);

            bool mustCheckpointVBState = false;
            auto& pcbs = rwUnderlying->getPersistenceCbList();

            SystemEventFlush sef;

            for (const auto& item : items) {

                if (!item->shouldPersist()) {
                    continue;
                }

                // Pass the Item through the SystemEventFlush which may filter
                // the item away (return Skip).
                if (sef.process(item) == ProcessStatus::Skip) {
                    // The item has no further flushing actions, i.e. we've
                    // absorbed it in the process function.
                    // Update stats and carry on.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());
                    continue;
                }

                if (item->getOperation() == queue_op::set_vbucket_state) {
                    // No actual item is explicitly persisted here (this op
                    // exists to ensure a commit occurs with the current
                    // vbstate); flag that we must trigger a snapshot even if
                    // there are no 'real' items in the checkpoint.
                    mustCheckpointVBState = true;

                    // Update queuing stats to reflect that this item has
                    // logically been processed.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());

                } else if (!prev || prev->getKey() != item->getKey()) {
                    prev = item.get();
                    ++items_flushed;
                    auto cb = flushOneDelOrSet(item, vb.getVB());
                    if (cb) {
                        pcbs.emplace_back(std::move(cb));
                    }

                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());

                    // Track the lowest seqno, so we can set the HLC epoch
                    minSeqno = std::min(minSeqno, (uint64_t)item->getBySeqno());
                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
                    if (item->isDeleted()) {
                        vbstate.maxDeletedSeqno =
                                std::max(uint64_t(vbstate.maxDeletedSeqno),
                                         item->getRevSeqno());
                    }
                    ++stats.flusher_todo;

                } else {
                    // Item has the same key as the previous[1] one - no need
                    // to flush it to disk.
                    // [1] Previous here really means 'next' - optimizeWrites()
                    //     above has actually re-ordered items such that items
                    //     with the same key are ordered from high->low seqno.
                    //     This means we only write the highest (i.e. newest)
                    //     item for a given key, and discard any duplicate,
                    //     older items.
                    --stats.diskQueueSize;
                    vb->doStatsForFlushing(*item, item->size());
                }
            }

            {
                ReaderLockHolder rlh(vb->getStateLock());
                if (vb->getState() == vbucket_state_active) {
                    if (maxSeqno) {
                        range.start = maxSeqno;
                        range.end = maxSeqno;
                    }
                }

                // Update VBstate based on the changes we have just made,
                // then tell the rwUnderlying the 'new' state
                // (which will be persisted as part of the commit() below).
                vbstate.lastSnapStart = range.start;
                vbstate.lastSnapEnd = range.end;

                // Track the lowest seqno written in spock and record it as
                // the HLC epoch, a seqno from which we can be sure the value
                // has an HLC CAS.
                vbstate.hlcCasEpochSeqno = vb->getHLCEpochSeqno();
                if (vbstate.hlcCasEpochSeqno == HlcCasSeqnoUninitialised &&
                    minSeqno != std::numeric_limits<uint64_t>::max()) {
                    vbstate.hlcCasEpochSeqno = minSeqno;
                    vb->setHLCEpochSeqno(vbstate.hlcCasEpochSeqno);
                }

                // Track if the VB has xattrs present
                vbstate.mightContainXattrs = vb->mightContainXattrs();

                // Do we need to trigger a persist of the state?
                // Yes, if there are no "real" items to flush but we
                // encountered a set_vbucket_state meta-item.
                auto options = VBStatePersist::VBSTATE_CACHE_UPDATE_ONLY;
                if ((items_flushed == 0) && mustCheckpointVBState) {
                    options = VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT;
                }

                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
                                                  options) != true) {
                    return {true, 0};
                }

                if (vb->setBucketCreation(false)) {
                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
                }
            }

            /* Perform an explicit commit to disk if there is a non-zero
             * number of items to flush, or if a collections manifest item
             * needs to be persisted.
             */
            if (items_flushed > 0 || sef.getCollectionsManifestItem()) {
                commit(*rwUnderlying, sef.getCollectionsManifestItem());

                // Now the commit is complete, the vBucket file must exist.
                if (vb->setBucketCreation(false)) {
                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
                }
            }

            if (vb->rejectQueue.empty()) {
                vb->setPersistedSnapshot(range.start, range.end);
                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
                if (highSeqno > 0 && highSeqno != vb->getPersistenceSeqno()) {
                    vb->setPersistenceSeqno(highSeqno);
                }
            }

            auto flush_end = ProcessClock::now();
            uint64_t trans_time =
                    std::chrono::duration_cast<std::chrono::milliseconds>(
                            flush_end - flush_start)
                            .count();

            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
                                       static_cast<double>(trans_time) /
                                       static_cast<double>(items_flushed));
            stats.cumulativeFlushTime.fetch_add(trans_time);
            stats.flusher_todo.store(0);
            stats.totalPersistVBState++;
        }

        rwUnderlying->pendingTasks();

        if (vb->checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) {
            wakeUpCheckpointRemover();
        }

        if (vb->rejectQueue.empty()) {
            vb->checkpointManager->itemsPersisted();
            uint64_t seqno = vb->getPersistenceSeqno();
            uint64_t chkid =
                    vb->checkpointManager->getPersistenceCursorPreChkId();
            vb->notifyHighPriorityRequests(
                    engine, seqno, HighPriorityVBNotify::Seqno);
            vb->notifyHighPriorityRequests(
                    engine, chkid, HighPriorityVBNotify::ChkPersistence);
            if (chkid > 0 && chkid != vb->getPersistenceCheckpointId()) {
                vb->setPersistenceCheckpointId(chkid);
            }
        } else {
            return {true, items_flushed};
        }
    }

    return {moreAvailable, items_flushed};
}

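// Update the maximum number of items the flusher will pull from a vBucket's
// checkpoints in a single flush batch (used via flusherBatchSplitTrigger in
// flushVBucket above). Called at construction time and from the
// "flusher_batch_split_trigger" ValueChangedListener.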
void EPBucket::setFlusherBatchSplitTrigger(size_t limit) {
    flusherBatchSplitTrigger = limit;
}

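// Commit the open transaction on the given KVStore, retrying once per second
// until the underlying store reports success; then release the accumulated
// persistence callbacks and update the commit timing stats.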
void EPBucket::commit(KVStore& kvstore, const Item* collectionsManifest) {
    auto& pcbs = kvstore.getPersistenceCbList();
    BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
    auto commit_start = ProcessClock::now();

    while (!kvstore.commit(collectionsManifest)) {
        ++stats.commitFailed;
        LOG(EXTENSION_LOG_WARNING,
            "EPBucket::commit: kvstore.commit failed. Retry in 1 sec...");
        sleep(1);
    }

    pcbs.clear();
    pcbs.shrink_to_fit();

    ++stats.flusherCommits;
    auto commit_end = ProcessClock::now();
    auto commit_time = std::chrono::duration_cast<std::chrono::milliseconds>(
                               commit_end - commit_start)
                               .count();
    stats.commit_time.store(commit_time);
    stats.cumulativeCommitTime.fetch_add(commit_time);
}

void EPBucket::startFlusher() {
    for (const auto& shard : vbMap.shards) {
        shard->getFlusher()->start();
    }
}

void EPBucket::stopFlusher() {
    for (const auto& shard : vbMap.shards) {
        auto* flusher = shard->getFlusher();
        LOG(EXTENSION_LOG_NOTICE,
            "Attempting to stop the flusher for "
            "shard:%" PRIu16,
            shard->getId());
        bool rv = flusher->stop(stats.forceShutdown);
        if (rv && !stats.forceShutdown) {
            flusher->wait();
        }
    }
}

bool EPBucket::pauseFlusher() {
    bool rv = true;
    for (const auto& shard : vbMap.shards) {
        auto* flusher = shard->getFlusher();
        if (!flusher->pause()) {
            LOG(EXTENSION_LOG_WARNING,
                "Attempted to pause flusher in state "
                "[%s], shard = %d",
                flusher->stateName(),
                shard->getId());
            rv = false;
        }
    }
    return rv;
}

bool EPBucket::resumeFlusher() {
    bool rv = true;
    for (const auto& shard : vbMap.shards) {
        auto* flusher = shard->getFlusher();
        if (!flusher->resume()) {
            LOG(EXTENSION_LOG_WARNING,
                "Attempted to resume flusher in state [%s], "
                "shard = %" PRIu16,
                flusher->stateName(),
                shard->getId());
            rv = false;
        }
    }
    return rv;
}

void EPBucket::wakeUpFlusher() {
    if (stats.diskQueueSize.load() == 0) {
        for (const auto& shard : vbMap.shards) {
            shard->getFlusher()->wake();
        }
    }
}

bool EPBucket::startBgFetcher() {
    for (const auto& shard : vbMap.shards) {
        BgFetcher* bgfetcher = shard->getBgFetcher();
        if (bgfetcher == NULL) {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to start bg fetcher for shard %" PRIu16,
                shard->getId());
            return false;
        }
        bgfetcher->start();
    }
    return true;
}

void EPBucket::stopBgFetcher() {
    for (const auto& shard : vbMap.shards) {
        BgFetcher* bgfetcher = shard->getBgFetcher();
        if (multiBGFetchEnabled() && bgfetcher->pendingJob()) {
            LOG(EXTENSION_LOG_WARNING,
                "Shutting down engine while there are still pending data "
                "reads for shard %" PRIu16 " from database storage",
                shard->getId());
        }
        LOG(EXTENSION_LOG_NOTICE,
            "Stopping bg fetcher for shard:%" PRIu16,
            shard->getId());
        bgfetcher->stop();
    }
}

ENGINE_ERROR_CODE EPBucket::scheduleCompaction(uint16_t vbid,
                                               compaction_ctx c,
                                               const void* cookie) {
    ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id);
    if (errCode != ENGINE_SUCCESS) {
        return errCode;
    }

    /* Obtain the vbucket so we can get the previous purge seqno */
    VBucketPtr vb = vbMap.getBucket(vbid);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    /* Update the compaction ctx with the previous purge seqno */
    c.max_purged_seq[vbid] = vb->getPurgeSeqno();

    LockHolder lh(compactionLock);
    ExTask task = std::make_shared<CompactTask>(*this, c, cookie);
    compactionTasks.push_back(std::make_pair(c.db_file_id, task));
    if (compactionTasks.size() > 1) {
        if ((stats.diskQueueSize > compactionWriteQueueCap &&
             compactionTasks.size() > (vbMap.getNumShards() / 2)) ||
            engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) {
            // Snooze a new compaction task.
            // We will wake it up when one of the existing compaction tasks is
            // done.
            task->snooze(60);
        }
    }

    ExecutorPool::get()->schedule(task);

    LOG(EXTENSION_LOG_DEBUG,
        "Scheduled compaction task %" PRIu64
        " on db %d, "
        "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64
        ", dropdeletes = %d",
        uint64_t(task->getId()),
        c.db_file_id,
        c.purge_before_ts,
        c.purge_before_seq,
        c.drop_deletes);

    return ENGINE_EWOULDBLOCK;
}

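// Perform the disk-side part of a deleteAll: reset the on-disk state of every
// vBucket which is not currently being created or deferred-deleted, zero the
// per-vBucket item counts, and finally mark the bucket-wide deleteAll as
// complete.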
void EPBucket::flushOneDeleteAll() {
    for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
        auto vb = getLockedVBucket(i);
        if (!vb) {
            continue;
        }
        // Reset the vBucket if it's non-null and not already in the middle of
        // being created / destroyed.
        if (!(vb->isBucketCreation() || vb->isDeletionDeferred())) {
            getRWUnderlying(vb->getId())->reset(i);
        }
        // Reset disk item count.
        vb->setNumTotalItems(0);
    }

    setDeleteAllComplete();
}

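// Write a single mutation or deletion to the RW KVStore for the item's
// vBucket, recording dirty-age stats, and return the PersistenceCallback so
// the caller can retain it until the write is committed (or nullptr if the
// vBucket no longer exists).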
std::unique_ptr<PersistenceCallback> EPBucket::flushOneDelOrSet(
        const queued_item& qi, VBucketPtr& vb) {
    if (!vb) {
        --stats.diskQueueSize;
        return NULL;
    }

    int64_t bySeqno = qi->getBySeqno();
    bool deleted = qi->isDeleted();
    rel_time_t queued(qi->getQueuedTime());

    auto dirtyAge = std::chrono::seconds(ep_current_time() - queued);
    stats.dirtyAgeHisto.add(dirtyAge);
    stats.dirtyAge.store(static_cast<rel_time_t>(dirtyAge.count()));
    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
                                         stats.dirtyAgeHighWat.load()));

    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
    if (!deleted) {
        // TODO: Need to separate disk_insert from disk_update because
        // bySeqno doesn't give us that information.
        BlockTimer timer(bySeqno == -1 ?
                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
                         bySeqno == -1 ? "disk_insert" : "disk_update",
                         stats.timingLog);
        auto cb = std::make_unique<PersistenceCallback>(qi, qi->getCas());
        rwUnderlying->set(*qi, *cb);
        return cb;
    } else {
        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
                         stats.timingLog);
        auto cb = std::make_unique<PersistenceCallback>(qi, 0);
        rwUnderlying->del(*qi, *cb);
        return cb;
    }
}

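// Run compaction for the file described by ctx on its shard's RW KVStore,
// wiring up the Bloom filter, expiry and collections-eraser callbacks, then
// swap in (or clear) the new Bloom filter and update the purge seqno for each
// vBucket touched by the compaction.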
void EPBucket::compactInternal(compaction_ctx* ctx) {
    BloomFilterCBPtr filter(new BloomFilterCallback(*this));
    ctx->bloomFilterCallback = filter;

    ExpiredItemsCBPtr expiry(new ExpiredItemsCallback(*this));
    ctx->expiryCallback = expiry;

    ctx->collectionsEraser = std::bind(&KVBucket::collectionsEraseKey,
                                       this,
                                       uint16_t(ctx->db_file_id),
                                       std::placeholders::_1,
                                       std::placeholders::_2,
                                       std::placeholders::_3,
                                       std::placeholders::_4);

    KVShard* shard = vbMap.getShardByVbId(ctx->db_file_id);
    KVStore* store = shard->getRWUnderlying();
    bool result = store->compactDB(ctx);

    Configuration& config = getEPEngine().getConfiguration();
    /* Iterate over all the vbucket ids set in the max_purged_seq map. If
     * there is an entry in the map for a vbucket id, then that vbucket was
     * involved in the compaction and its bloom filter and purge seqno can
     * therefore be updated.
     */
    for (auto& it : ctx->max_purged_seq) {
        const uint16_t vbid = it.first;
        VBucketPtr vb = getVBucket(vbid);
        if (!vb) {
            continue;
        }

        if (config.isBfilterEnabled() && result) {
            vb->swapFilter();
        } else {
            vb->clearFilter();
        }
        vb->setPurgeSeqno(it.second);
    }

    LOG(EXTENSION_LOG_NOTICE,
        "Compaction of db file id: %d completed (%s). "
        "tombstones_purged:%" PRIu64 ", collection_items_erased:%" PRIu64
        ", pre{size:%" PRIu64 ", items:%" PRIu64 ", deleted_items:%" PRIu64
        ", purge_seqno:%" PRIu64 "}, post{size:%" PRIu64 ", items:%" PRIu64
        ", deleted_items:%" PRIu64 ", purge_seqno:%" PRIu64 "}",
        ctx->db_file_id,
        result ? "ok" : "failed",
        ctx->stats.tombstonesPurged,
        ctx->stats.collectionsItemsPurged,
        ctx->stats.pre.size,
        ctx->stats.pre.items,
        ctx->stats.pre.deletedItems,
        ctx->stats.pre.purgeSeqno,
        ctx->stats.post.size,
        ctx->stats.post.items,
        ctx->stats.post.deletedItems,
        ctx->stats.post.purgeSeqno);

    // The collections eraser may have gathered some garbage keys which can now
    // be released.
    auto vb = getVBucket(uint16_t(ctx->db_file_id));
    if (vb) {
        ctx->eraserContext.processKeys(*vb);
    }
}

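// Task entry point for a scheduled compaction. If the underlying store cannot
// compact concurrently with writes, the vBucket lock is taken first
// (returning true so the task runs again later if the lock is currently
// held). Returns false once the compaction attempt has finished and the
// cookie (if any) has been notified.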
bool EPBucket::doCompact(compaction_ctx* ctx, const void* cookie) {
    ENGINE_ERROR_CODE err = ENGINE_SUCCESS;
    StorageProperties storeProp = getStorageProperties();
    bool concWriteCompact = storeProp.hasConcWriteCompact();
    uint16_t vbid = ctx->db_file_id;

    /**
     * Check if the underlying storage engine allows writes concurrently
     * as the database file is being compacted. If not, a lock needs to
     * be held in order to serialize access to the database file between
     * the writer and compactor threads
     */
    if (concWriteCompact == false) {
        auto vb = getLockedVBucket(vbid, std::try_to_lock);
        if (!vb.owns_lock()) {
            // VB currently locked; try again later.
            return true;
        }

        if (!vb) {
            err = ENGINE_NOT_MY_VBUCKET;
            engine.storeEngineSpecific(cookie, NULL);
            /**
             * Decrement session counter here, as memcached thread wouldn't
             * visit the engine interface in case of a NOT_MY_VB notification
             */
            engine.decrementSessionCtr();
        } else {
            compactInternal(ctx);
        }
    } else {
        compactInternal(ctx);
    }

    updateCompactionTasks(ctx->db_file_id);

    if (cookie) {
        engine.notifyIOComplete(cookie, err);
    }
    --stats.pendingCompactions;
    return false;
}

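// Called when a compaction finishes: remove its entry from the pending
// compaction task list and wake up (at most) one snoozed compaction task so
// it can run next.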
void EPBucket::updateCompactionTasks(DBFileId db_file_id) {
    LockHolder lh(compactionLock);
    bool erased = false, woke = false;
    std::list<CompTaskEntry>::iterator it = compactionTasks.begin();
    while (it != compactionTasks.end()) {
        if ((*it).first == db_file_id) {
            it = compactionTasks.erase(it);
            erased = true;
        } else {
            ExTask& task = (*it).second;
            if (task->getState() == TASK_SNOOZED) {
                ExecutorPool::get()->wake(task->getId());
                woke = true;
            }
            ++it;
        }
        if (erased && woke) {
            break;
        }
    }
}

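// Return the id of the last checkpoint persisted for the given vBucket; the
// second element of the pair indicates whether the value is meaningful, which
// for a persistent bucket is always true (0 is reported for unknown vBuckets).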
std::pair<uint64_t, bool> EPBucket::getLastPersistedCheckpointId(uint16_t vb) {
    auto vbucket = vbMap.getBucket(vb);
    if (vbucket) {
        return {vbucket->getPersistenceCheckpointId(), true};
    } else {
        return {0, true};
    }
}

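// Aggregate the on-disk space used and total file size across every shard's
// RW KVStore and emit them as the "ep_db_data_size" / "ep_db_file_size"
// stats.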
ENGINE_ERROR_CODE EPBucket::getFileStats(const void* cookie,
                                         ADD_STAT add_stat) {
    const auto numShards = vbMap.getNumShards();
    DBFileInfo totalInfo;

    for (uint16_t shardId = 0; shardId < numShards; shardId++) {
        const auto dbInfo =
                getRWUnderlyingByShard(shardId)->getAggrDbFileInfo();
        totalInfo.spaceUsed += dbInfo.spaceUsed;
        totalInfo.fileSize += dbInfo.fileSize;
    }

    add_casted_stat("ep_db_data_size", totalInfo.spaceUsed, add_stat, cookie);
    add_casted_stat("ep_db_file_size", totalInfo.fileSize, add_stat, cookie);

    return ENGINE_SUCCESS;
}

ENGINE_ERROR_CODE EPBucket::getPerVBucketDiskStats(const void* cookie,
                                                   ADD_STAT add_stat) {
    class DiskStatVisitor : public VBucketVisitor {
    public:
        DiskStatVisitor(const void* c, ADD_STAT a) : cookie(c), add_stat(a) {
        }

        void visitBucket(VBucketPtr& vb) override {
            char buf[32];
            uint16_t vbid = vb->getId();
            DBFileInfo dbInfo =
                    vb->getShard()->getRWUnderlying()->getDbFileInfo(vbid);

            try {
                checked_snprintf(buf, sizeof(buf), "vb_%d:data_size", vbid);
                add_casted_stat(buf, dbInfo.spaceUsed, add_stat, cookie);
                checked_snprintf(buf, sizeof(buf), "vb_%d:file_size", vbid);
                add_casted_stat(buf, dbInfo.fileSize, add_stat, cookie);
            } catch (std::exception& error) {
                LOG(EXTENSION_LOG_WARNING,
                    "DiskStatVisitor::visitBucket: Failed to build stat: %s",
                    error.what());
            }
        }

    private:
        const void* cookie;
        ADD_STAT add_stat;
    };

    DiskStatVisitor dsv(cookie, add_stat);
    visit(dsv);
    return ENGINE_SUCCESS;
}

VBucketPtr EPBucket::makeVBucket(VBucket::id_type id,
                                 vbucket_state_t state,
                                 KVShard* shard,
                                 std::unique_ptr<FailoverTable> table,
                                 NewSeqnoCallback newSeqnoCb,
                                 vbucket_state_t initState,
                                 int64_t lastSeqno,
                                 uint64_t lastSnapStart,
                                 uint64_t lastSnapEnd,
                                 uint64_t purgeSeqno,
                                 uint64_t maxCas,
                                 int64_t hlcEpochSeqno,
                                 bool mightContainXattrs,
                                 const std::string& collectionsManifest) {
    auto flusherCb = std::make_shared<NotifyFlusherCB>(shard);
    // Not using make_shared or allocate_shared
    // 1. make_shared doesn't accept a Deleter
    // 2. allocate_shared has inconsistencies between platforms in calling
    //    alloc.destroy (libc++ doesn't call it)
    return VBucketPtr(new EPVBucket(id,
                                    state,
                                    stats,
                                    engine.getCheckpointConfig(),
                                    shard,
                                    lastSeqno,
                                    lastSnapStart,
                                    lastSnapEnd,
                                    std::move(table),
                                    flusherCb,
                                    std::move(newSeqnoCb),
                                    engine.getConfiguration(),
                                    eviction_policy,
                                    initState,
                                    purgeSeqno,
                                    maxCas,
                                    hlcEpochSeqno,
                                    mightContainXattrs,
                                    collectionsManifest),
                      VBucket::DeferredDeleter(engine));
}

ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key,
                                      uint16_t vbucket,
                                      const void* cookie) {
    VBucketPtr vb = getVBucket(vbucket);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    return vb->statsVKey(key, cookie, engine, bgFetchDelay);
}

void EPBucket::completeStatsVKey(const void* cookie,
                                 const DocKey& key,
                                 uint16_t vbid,
                                 uint64_t bySeqNum) {
    GetValue gcb = getROUnderlying(vbid)->get(key, vbid);

    if (eviction_policy == FULL_EVICTION) {
        VBucketPtr vb = getVBucket(vbid);
        if (vb) {
            vb->completeStatsVKey(key, gcb);
        }
    }

    if (gcb.getStatus() == ENGINE_SUCCESS) {
        engine.addLookupResult(cookie, std::move(gcb.item));
    } else {
        engine.addLookupResult(cookie, NULL);
    }

    --stats.numRemainingBgJobs;
    engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
}

/**
 * Class that handles the disk callback during the rollback.
 * For each mutation/deletion which was discarded as part of the rollback,
 * the callback() method is invoked with the key of the discarded update.
 * It can then look up the state of that key using dbHandle (which represents
 * the new, rolled-back file) and correct the in-memory view:
 *
 * a) If the key is not present in the Rollback header then delete it from
 *    the HashTable (if either didn't exist yet, or had previously been
 *    deleted in the Rollback header).
 * b) If the key is present in the Rollback header then replace the in-memory
 *    value with the value from the Rollback header.
 */
class EPDiskRollbackCB : public RollbackCB {
public:
    EPDiskRollbackCB(EventuallyPersistentEngine& e) : RollbackCB(), engine(e) {
    }

    void callback(GetValue& val) {
        if (!val.item) {
            throw std::invalid_argument(
                    "EPDiskRollbackCB::callback: val is NULL");
        }
        if (dbHandle == nullptr) {
            throw std::logic_error(
                    "EPDiskRollbackCB::callback: dbHandle is NULL");
        }
        UniqueItemPtr itm(std::move(val.item));
        VBucketPtr vb = engine.getVBucket(itm->getVBucketId());
        GetValue gcb = engine.getKVBucket()
                               ->getROUnderlying(itm->getVBucketId())
                               ->getWithHeader(dbHandle,
                                               itm->getKey(),
                                               itm->getVBucketId(),
                                               GetMetaOnly::No);
        if (gcb.getStatus() == ENGINE_SUCCESS) {
            UniqueItemPtr it(std::move(gcb.item));
            if (it->isDeleted()) {
                removeDeletedDoc(*vb, it->getKey());
            } else {
                MutationStatus mtype = vb->setFromInternal(*it);

                if (mtype == MutationStatus::NoMem) {
                    setStatus(ENGINE_ENOMEM);
                }
            }
        } else if (gcb.getStatus() == ENGINE_KEY_ENOENT) {
            removeDeletedDoc(*vb, itm->getKey());
        } else {
            LOG(EXTENSION_LOG_WARNING,
                "EPDiskRollbackCB::callback: Unexpected Error Status: %d",
                gcb.getStatus());
        }
    }

    /// Remove a deleted-on-disk document from the VBucket's hashtable.
    void removeDeletedDoc(VBucket& vb, const DocKey& key) {
        if (vb.deleteKey(key)) {
            setStatus(ENGINE_SUCCESS);
        } else {
            // Document didn't exist in memory - it may have been deleted
            // since the checkpoint.
            setStatus(ENGINE_KEY_ENOENT);
        }
        // Irrespective of whether the in-memory delete succeeded, the
        // document doesn't exist on disk, so decrement the item count.
        vb.decrNumTotalItems();
    }

private:
    EventuallyPersistentEngine& engine;
};

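// Roll the on-disk state of the given vBucket back to rollbackSeqno, using
// EPDiskRollbackCB to correct the in-memory HashTable for each discarded
// update.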
RollbackResult EPBucket::doRollback(uint16_t vbid, uint64_t rollbackSeqno) {
    auto cb = std::make_shared<EPDiskRollbackCB>(engine);
    KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying();
    return rwUnderlying->rollback(vbid, rollbackSeqno, cb);
}

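// Undo any items sitting in the CheckpointManager which were never persisted:
// for each such item beyond rollbackSeqno, restore the on-disk version of the
// key if one exists, otherwise remove the key from memory entirely.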
void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) {
    std::vector<queued_item> items;
    vb.checkpointManager->getAllItemsForCursor(CheckpointManager::pCursorName,
                                               items);
    for (const auto& item : items) {
        if (item->getBySeqno() > rollbackSeqno &&
            !item->isCheckPointMetaItem()) {
            GetValue gcb = getROUnderlying(vb.getId())
                                   ->get(item->getKey(), vb.getId(), false);

            if (gcb.getStatus() == ENGINE_SUCCESS) {
                vb.setFromInternal(*gcb.item.get());
            } else {
                vb.deleteKey(item->getKey());
            }
        }
    }
}

void EPBucket::notifyNewSeqno(const uint16_t vbid,
                              const VBNotifyCtx& notifyCtx) {
    if (notifyCtx.notifyFlusher) {
        notifyFlusher(vbid);
    }
    if (notifyCtx.notifyReplication) {
        notifyReplication(vbid, notifyCtx.bySeqno);
    }
}