1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2016 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include "ep_bucket.h" 19 20#include "bgfetcher.h" 21#include "checkpoint.h" 22#include "ep_engine.h" 23#include "ep_time.h" 24#include "ep_vb.h" 25#include "failover-table.h" 26#include "flusher.h" 27#include "persistence_callback.h" 28#include "replicationthrottle.h" 29#include "tasks.h" 30 31/** 32 * Callback class used by EpStore, for adding relevant keys 33 * to bloomfilter during compaction. 34 */ 35class BloomFilterCallback : public Callback<uint16_t&, const DocKey&, bool&> { 36public: 37 BloomFilterCallback(KVBucket& eps) : store(eps) { 38 } 39 40 void callback(uint16_t& vbucketId, const DocKey& key, bool& isDeleted) { 41 VBucketPtr vb = store.getVBucket(vbucketId); 42 if (vb) { 43 /* Check if a temporary filter has been initialized. If not, 44 * initialize it. If initialization fails, throw an exception 45 * to the caller and let the caller deal with it. 46 */ 47 bool tempFilterInitialized = vb->isTempFilterAvailable(); 48 if (!tempFilterInitialized) { 49 tempFilterInitialized = initTempFilter(vbucketId); 50 } 51 52 if (!tempFilterInitialized) { 53 throw std::runtime_error( 54 "BloomFilterCallback::callback: Failed " 55 "to initialize temporary filter for vbucket: " + 56 std::to_string(vbucketId)); 57 } 58 59 if (store.getItemEvictionPolicy() == VALUE_ONLY) { 60 /** 61 * VALUE-ONLY EVICTION POLICY 62 * Consider deleted items only. 63 */ 64 if (isDeleted) { 65 vb->addToTempFilter(key); 66 } 67 } else { 68 /** 69 * FULL EVICTION POLICY 70 * If vbucket's resident ratio is found to be less than 71 * the residency threshold, consider all items, otherwise 72 * consider deleted and non-resident items only. 73 */ 74 bool residentRatioLessThanThreshold = 75 vb->isResidentRatioUnderThreshold( 76 store.getBfiltersResidencyThreshold()); 77 if (residentRatioLessThanThreshold) { 78 vb->addToTempFilter(key); 79 } else { 80 if (isDeleted || !store.isMetaDataResident(vb, key)) { 81 vb->addToTempFilter(key); 82 } 83 } 84 } 85 } 86 } 87 88private: 89 bool initTempFilter(uint16_t vbucketId); 90 KVBucket& store; 91}; 92 93bool BloomFilterCallback::initTempFilter(uint16_t vbucketId) { 94 Configuration& config = store.getEPEngine().getConfiguration(); 95 VBucketPtr vb = store.getVBucket(vbucketId); 96 if (!vb) { 97 return false; 98 } 99 100 size_t initial_estimation = config.getBfilterKeyCount(); 101 size_t estimated_count; 102 size_t num_deletes = 0; 103 try { 104 num_deletes = store.getROUnderlying(vbucketId)->getNumPersistedDeletes(vbucketId); 105 } catch (std::runtime_error& re) { 106 LOG(EXTENSION_LOG_WARNING, 107 "BloomFilterCallback::initTempFilter: runtime error while getting " 108 "number of persisted deletes for vbucket: %" PRIu16 109 "Details: %s", vbucketId, re.what()); 110 return false; 111 } 112 113 item_eviction_policy_t eviction_policy = store.getItemEvictionPolicy(); 114 if (eviction_policy == VALUE_ONLY) { 115 /** 116 * VALUE-ONLY EVICTION POLICY 117 * Obtain number of persisted deletes from underlying kvstore. 118 * Bloomfilter's estimated_key_count = 1.25 * deletes 119 */ 120 estimated_count = round(1.25 * num_deletes); 121 } else { 122 /** 123 * FULL EVICTION POLICY 124 * First determine if the resident ratio of vbucket is less than 125 * the threshold from configuration. 126 */ 127 bool residentRatioAlert = vb->isResidentRatioUnderThreshold( 128 store.getBfiltersResidencyThreshold()); 129 130 /** 131 * Based on resident ratio against threshold, estimate count. 132 * 133 * 1. If resident ratio is greater than the threshold: 134 * Obtain number of persisted deletes from underlying kvstore. 135 * Obtain number of non-resident-items for vbucket. 136 * Bloomfilter's estimated_key_count = 137 * 1.25 * (deletes + non-resident) 138 * 139 * 2. Otherwise: 140 * Obtain number of items for vbucket. 141 * Bloomfilter's estimated_key_count = 142 * 1.25 * (num_items) 143 */ 144 145 if (residentRatioAlert) { 146 estimated_count = round(1.25 * vb->getNumItems()); 147 } else { 148 estimated_count = 149 round(1.25 * (num_deletes + vb->getNumNonResidentItems())); 150 } 151 } 152 153 if (estimated_count < initial_estimation) { 154 estimated_count = initial_estimation; 155 } 156 157 vb->initTempFilter(estimated_count, config.getBfilterFpProb()); 158 159 return true; 160} 161 162class ExpiredItemsCallback : public Callback<Item&, time_t&> { 163public: 164 ExpiredItemsCallback(KVBucket& store) : epstore(store) { 165 } 166 167 void callback(Item& it, time_t& startTime) { 168 if (epstore.compactionCanExpireItems()) { 169 epstore.deleteExpiredItem(it, startTime, ExpireBy::Compactor); 170 } 171 } 172 173private: 174 KVBucket& epstore; 175}; 176 177class EPBucket::ValueChangedListener : public ::ValueChangedListener { 178public: 179 ValueChangedListener(EPBucket& bucket) : bucket(bucket) { 180 } 181 182 virtual void sizeValueChanged(const std::string& key, 183 size_t value) override { 184 if (key == "flusher_batch_split_trigger") { 185 bucket.setFlusherBatchSplitTrigger(value); 186 } else { 187 LOG(EXTENSION_LOG_WARNING, 188 "Failed to change value for unknown variable, %s\n", 189 key.c_str()); 190 } 191 } 192 193private: 194 EPBucket& bucket; 195}; 196 197EPBucket::EPBucket(EventuallyPersistentEngine& theEngine) 198 : KVBucket(theEngine) { 199 auto& config = engine.getConfiguration(); 200 const std::string& policy = config.getItemEvictionPolicy(); 201 if (policy.compare("value_only") == 0) { 202 eviction_policy = VALUE_ONLY; 203 } else { 204 eviction_policy = FULL_EVICTION; 205 } 206 replicationThrottle = std::make_unique<ReplicationThrottle>( 207 engine.getConfiguration(), stats); 208 209 vbMap.enablePersistence(*this); 210 211 flusherBatchSplitTrigger = config.getFlusherBatchSplitTrigger(); 212 config.addValueChangedListener( 213 "flusher_batch_split_trigger", 214 std::make_unique<ValueChangedListener>(*this)); 215} 216 217bool EPBucket::initialize() { 218 KVBucket::initialize(); 219 220 enableItemPager(); 221 222 if (!startBgFetcher()) { 223 LOG(EXTENSION_LOG_FATAL, 224 "EPBucket::initialize: Failed to create and start bgFetchers"); 225 return false; 226 } 227 startFlusher(); 228 229 return true; 230} 231 232void EPBucket::deinitialize() { 233 stopFlusher(); 234 stopBgFetcher(); 235 236 KVBucket::deinitialize(); 237} 238 239void EPBucket::reset() { 240 KVBucket::reset(); 241 242 // Need to additionally update disk state 243 bool inverse = true; 244 deleteAllTaskCtx.delay.compare_exchange_strong(inverse, false); 245 // Waking up (notifying) one flusher is good enough for diskDeleteAll 246 vbMap.getShard(EP_PRIMARY_SHARD)->getFlusher()->notifyFlushEvent(); 247} 248 249std::pair<bool, size_t> EPBucket::flushVBucket(uint16_t vbid) { 250 KVShard *shard = vbMap.getShardByVbId(vbid); 251 if (diskDeleteAll && !deleteAllTaskCtx.delay) { 252 if (shard->getId() == EP_PRIMARY_SHARD) { 253 flushOneDeleteAll(); 254 } else { 255 // disk flush is pending just return 256 return {false, 0}; 257 } 258 } 259 260 int items_flushed = 0; 261 bool moreAvailable = false; 262 const auto flush_start = ProcessClock::now(); 263 264 auto vb = getLockedVBucket(vbid, std::try_to_lock); 265 if (!vb.owns_lock()) { 266 // Try another bucket if this one is locked to avoid blocking flusher. 267 return {true, 0}; 268 } 269 if (vb) { 270 // Obtain the set of items to flush, up to the maximum allowed for 271 // a single flush. 272 auto toFlush = vb->getItemsToPersist(flusherBatchSplitTrigger); 273 auto& items = toFlush.items; 274 auto& range = toFlush.range; 275 moreAvailable = toFlush.moreAvailable; 276 277 KVStore* rwUnderlying = getRWUnderlying(vb->getId()); 278 279 if (!items.empty()) { 280 while (!rwUnderlying->begin( 281 std::make_unique<EPTransactionContext>(stats, *vb))) { 282 ++stats.beginFailed; 283 LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! " 284 "Retry in 1 sec ..."); 285 sleep(1); 286 } 287 rwUnderlying->optimizeWrites(items); 288 289 Item *prev = NULL; 290 auto vbstate = vb->getVBucketState(); 291 uint64_t maxSeqno = 0; 292 auto minSeqno = std::numeric_limits<uint64_t>::max(); 293 294 range.start = std::max(range.start, vbstate.lastSnapStart); 295 296 bool mustCheckpointVBState = false; 297 auto& pcbs = rwUnderlying->getPersistenceCbList(); 298 299 SystemEventFlush sef; 300 301 for (const auto& item : items) { 302 303 if (!item->shouldPersist()) { 304 continue; 305 } 306 307 // Pass the Item through the SystemEventFlush which may filter 308 // the item away (return Skip). 309 if (sef.process(item) == ProcessStatus::Skip) { 310 // The item has no further flushing actions i.e. we've 311 // absorbed it in the process function. 312 // Update stats and carry-on 313 --stats.diskQueueSize; 314 vb->doStatsForFlushing(*item, item->size()); 315 continue; 316 } 317 318 if (item->getOperation() == queue_op::set_vbucket_state) { 319 // No actual item explicitly persisted to (this op exists 320 // to ensure a commit occurs with the current vbstate); 321 // flag that we must trigger a snapshot even if there are 322 // no 'real' items in the checkpoint. 323 mustCheckpointVBState = true; 324 325 // Update queuing stats how this item has logically been 326 // processed. 327 --stats.diskQueueSize; 328 vb->doStatsForFlushing(*item, item->size()); 329 330 } else if (!prev || prev->getKey() != item->getKey()) { 331 prev = item.get(); 332 ++items_flushed; 333 auto cb = flushOneDelOrSet(item, vb.getVB()); 334 if (cb) { 335 pcbs.emplace_back(std::move(cb)); 336 } 337 338 maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno()); 339 340 // Track the lowest seqno, so we can set the HLC epoch 341 minSeqno = std::min(minSeqno, (uint64_t)item->getBySeqno()); 342 vbstate.maxCas = std::max(vbstate.maxCas, item->getCas()); 343 if (item->isDeleted()) { 344 vbstate.maxDeletedSeqno = 345 std::max(uint64_t(vbstate.maxDeletedSeqno), 346 item->getRevSeqno()); 347 } 348 ++stats.flusher_todo; 349 350 } else { 351 // Item is the same key as the previous[1] one - don't need 352 // to flush to disk. 353 // [1] Previous here really means 'next' - optimizeWrites() 354 // above has actually re-ordered items such that items 355 // with the same key are ordered from high->low seqno. 356 // This means we only write the highest (i.e. newest) 357 // item for a given key, and discard any duplicate, 358 // older items. 359 --stats.diskQueueSize; 360 vb->doStatsForFlushing(*item, item->size()); 361 } 362 } 363 364 365 { 366 ReaderLockHolder rlh(vb->getStateLock()); 367 if (vb->getState() == vbucket_state_active) { 368 if (maxSeqno) { 369 range.start = maxSeqno; 370 range.end = maxSeqno; 371 } 372 } 373 374 // Update VBstate based on the changes we have just made, 375 // then tell the rwUnderlying the 'new' state 376 // (which will persisted as part of the commit() below). 377 vbstate.lastSnapStart = range.start; 378 vbstate.lastSnapEnd = range.end; 379 380 // Track the lowest seqno written in spock and record it as 381 // the HLC epoch, a seqno which we can be sure the value has a 382 // HLC CAS. 383 vbstate.hlcCasEpochSeqno = vb->getHLCEpochSeqno(); 384 if (vbstate.hlcCasEpochSeqno == HlcCasSeqnoUninitialised && 385 minSeqno != std::numeric_limits<uint64_t>::max()) { 386 vbstate.hlcCasEpochSeqno = minSeqno; 387 vb->setHLCEpochSeqno(vbstate.hlcCasEpochSeqno); 388 } 389 390 // Track if the VB has xattrs present 391 vbstate.mightContainXattrs = vb->mightContainXattrs(); 392 393 // Do we need to trigger a persist of the state? 394 // If there are no "real" items to flush, and we encountered 395 // a set_vbucket_state meta-item. 396 auto options = VBStatePersist::VBSTATE_CACHE_UPDATE_ONLY; 397 if ((items_flushed == 0) && mustCheckpointVBState) { 398 options = VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT; 399 } 400 401 if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate, 402 options) != true) { 403 return {true, 0}; 404 } 405 406 if (vb->setBucketCreation(false)) { 407 LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid); 408 } 409 } 410 411 /* Perform an explicit commit to disk if the commit 412 * interval reaches zero and if there is a non-zero number 413 * of items to flush. 414 * Or if there is a manifest item 415 */ 416 if (items_flushed > 0 || sef.getCollectionsManifestItem()) { 417 commit(*rwUnderlying, sef.getCollectionsManifestItem()); 418 419 // Now the commit is complete, vBucket file must exist. 420 if (vb->setBucketCreation(false)) { 421 LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid); 422 } 423 } 424 425 if (vb->rejectQueue.empty()) { 426 vb->setPersistedSnapshot(range.start, range.end); 427 uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid); 428 if (highSeqno > 0 && highSeqno != vb->getPersistenceSeqno()) { 429 vb->setPersistenceSeqno(highSeqno); 430 } 431 } 432 433 auto flush_end = ProcessClock::now(); 434 uint64_t trans_time = 435 std::chrono::duration_cast<std::chrono::milliseconds>( 436 flush_end - flush_start) 437 .count(); 438 439 lastTransTimePerItem.store((items_flushed == 0) ? 0 : 440 static_cast<double>(trans_time) / 441 static_cast<double>(items_flushed)); 442 stats.cumulativeFlushTime.fetch_add(trans_time); 443 stats.flusher_todo.store(0); 444 stats.totalPersistVBState++; 445 } 446 447 rwUnderlying->pendingTasks(); 448 449 if (vb->checkpointManager->hasClosedCheckpointWhichCanBeRemoved()) { 450 wakeUpCheckpointRemover(); 451 } 452 453 if (vb->rejectQueue.empty()) { 454 vb->checkpointManager->itemsPersisted(); 455 uint64_t seqno = vb->getPersistenceSeqno(); 456 uint64_t chkid = 457 vb->checkpointManager->getPersistenceCursorPreChkId(); 458 vb->notifyHighPriorityRequests( 459 engine, seqno, HighPriorityVBNotify::Seqno); 460 vb->notifyHighPriorityRequests( 461 engine, chkid, HighPriorityVBNotify::ChkPersistence); 462 if (chkid > 0 && chkid != vb->getPersistenceCheckpointId()) { 463 vb->setPersistenceCheckpointId(chkid); 464 } 465 } else { 466 return {true, items_flushed}; 467 } 468 } 469 470 return {moreAvailable, items_flushed}; 471} 472 473void EPBucket::setFlusherBatchSplitTrigger(size_t limit) { 474 flusherBatchSplitTrigger = limit; 475} 476 477void EPBucket::commit(KVStore& kvstore, const Item* collectionsManifest) { 478 auto& pcbs = kvstore.getPersistenceCbList(); 479 BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog); 480 auto commit_start = ProcessClock::now(); 481 482 while (!kvstore.commit(collectionsManifest)) { 483 ++stats.commitFailed; 484 LOG(EXTENSION_LOG_WARNING, 485 "KVBucket::commit: kvstore.commit failed!!! Retry in 1 sec..."); 486 sleep(1); 487 } 488 489 pcbs.clear(); 490 pcbs.shrink_to_fit(); 491 492 ++stats.flusherCommits; 493 auto commit_end = ProcessClock::now(); 494 auto commit_time = std::chrono::duration_cast<std::chrono::milliseconds>( 495 commit_end - commit_start) 496 .count(); 497 stats.commit_time.store(commit_time); 498 stats.cumulativeCommitTime.fetch_add(commit_time); 499} 500 501void EPBucket::startFlusher() { 502 for (const auto& shard : vbMap.shards) { 503 shard->getFlusher()->start(); 504 } 505} 506 507void EPBucket::stopFlusher() { 508 for (const auto& shard : vbMap.shards) { 509 auto* flusher = shard->getFlusher(); 510 LOG(EXTENSION_LOG_NOTICE, 511 "Attempting to stop the flusher for " 512 "shard:%" PRIu16, 513 shard->getId()); 514 bool rv = flusher->stop(stats.forceShutdown); 515 if (rv && !stats.forceShutdown) { 516 flusher->wait(); 517 } 518 } 519} 520 521bool EPBucket::pauseFlusher() { 522 bool rv = true; 523 for (const auto& shard : vbMap.shards) { 524 auto* flusher = shard->getFlusher(); 525 if (!flusher->pause()) { 526 LOG(EXTENSION_LOG_WARNING, 527 "Attempted to pause flusher in state " 528 "[%s], shard = %d", 529 flusher->stateName(), 530 shard->getId()); 531 rv = false; 532 } 533 } 534 return rv; 535} 536 537bool EPBucket::resumeFlusher() { 538 bool rv = true; 539 for (const auto& shard : vbMap.shards) { 540 auto* flusher = shard->getFlusher(); 541 if (!flusher->resume()) { 542 LOG(EXTENSION_LOG_WARNING, 543 "Attempted to resume flusher in state [%s], " 544 "shard = %" PRIu16, 545 flusher->stateName(), 546 shard->getId()); 547 rv = false; 548 } 549 } 550 return rv; 551} 552 553void EPBucket::wakeUpFlusher() { 554 if (stats.diskQueueSize.load() == 0) { 555 for (const auto& shard : vbMap.shards) { 556 shard->getFlusher()->wake(); 557 } 558 } 559} 560 561bool EPBucket::startBgFetcher() { 562 for (const auto& shard : vbMap.shards) { 563 BgFetcher* bgfetcher = shard->getBgFetcher(); 564 if (bgfetcher == NULL) { 565 LOG(EXTENSION_LOG_WARNING, 566 "Failed to start bg fetcher for shard %" PRIu16, 567 shard->getId()); 568 return false; 569 } 570 bgfetcher->start(); 571 } 572 return true; 573} 574 575void EPBucket::stopBgFetcher() { 576 for (const auto& shard : vbMap.shards) { 577 BgFetcher* bgfetcher = shard->getBgFetcher(); 578 if (multiBGFetchEnabled() && bgfetcher->pendingJob()) { 579 LOG(EXTENSION_LOG_WARNING, 580 "Shutting down engine while there are still pending data " 581 "read for shard %" PRIu16 " from database storage", 582 shard->getId()); 583 } 584 LOG(EXTENSION_LOG_NOTICE, 585 "Stopping bg fetcher for shard:%" PRIu16, 586 shard->getId()); 587 bgfetcher->stop(); 588 } 589} 590 591ENGINE_ERROR_CODE EPBucket::scheduleCompaction(uint16_t vbid, 592 compaction_ctx c, 593 const void* cookie) { 594 ENGINE_ERROR_CODE errCode = checkForDBExistence(c.db_file_id); 595 if (errCode != ENGINE_SUCCESS) { 596 return errCode; 597 } 598 599 /* Obtain the vbucket so we can get the previous purge seqno */ 600 VBucketPtr vb = vbMap.getBucket(vbid); 601 if (!vb) { 602 return ENGINE_NOT_MY_VBUCKET; 603 } 604 605 /* Update the compaction ctx with the previous purge seqno */ 606 c.max_purged_seq[vbid] = vb->getPurgeSeqno(); 607 608 LockHolder lh(compactionLock); 609 ExTask task = std::make_shared<CompactTask>(*this, c, cookie); 610 compactionTasks.push_back(std::make_pair(c.db_file_id, task)); 611 if (compactionTasks.size() > 1) { 612 if ((stats.diskQueueSize > compactionWriteQueueCap && 613 compactionTasks.size() > (vbMap.getNumShards() / 2)) || 614 engine.getWorkLoadPolicy().getWorkLoadPattern() == READ_HEAVY) { 615 // Snooze a new compaction task. 616 // We will wake it up when one of the existing compaction tasks is 617 // done. 618 task->snooze(60); 619 } 620 } 621 622 ExecutorPool::get()->schedule(task); 623 624 LOG(EXTENSION_LOG_DEBUG, 625 "Scheduled compaction task %" PRIu64 626 " on db %d," 627 "purge_before_ts = %" PRIu64 ", purge_before_seq = %" PRIu64 628 ", dropdeletes = %d", 629 uint64_t(task->getId()), 630 c.db_file_id, 631 c.purge_before_ts, 632 c.purge_before_seq, 633 c.drop_deletes); 634 635 return ENGINE_EWOULDBLOCK; 636} 637 638void EPBucket::flushOneDeleteAll() { 639 for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) { 640 auto vb = getLockedVBucket(i); 641 if (!vb) { 642 continue; 643 } 644 // Reset the vBucket if it's non-null and not already in the middle of 645 // being created / destroyed. 646 if (!(vb->isBucketCreation() || vb->isDeletionDeferred())) { 647 getRWUnderlying(vb->getId())->reset(i); 648 } 649 // Reset disk item count. 650 vb->setNumTotalItems(0); 651 } 652 653 setDeleteAllComplete(); 654} 655 656std::unique_ptr<PersistenceCallback> EPBucket::flushOneDelOrSet( 657 const queued_item& qi, VBucketPtr& vb) { 658 if (!vb) { 659 --stats.diskQueueSize; 660 return NULL; 661 } 662 663 int64_t bySeqno = qi->getBySeqno(); 664 bool deleted = qi->isDeleted(); 665 rel_time_t queued(qi->getQueuedTime()); 666 667 auto dirtyAge = std::chrono::seconds(ep_current_time() - queued); 668 stats.dirtyAgeHisto.add(dirtyAge); 669 stats.dirtyAge.store(static_cast<rel_time_t>(dirtyAge.count())); 670 stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(), 671 stats.dirtyAgeHighWat.load())); 672 673 KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId()); 674 if (!deleted) { 675 // TODO: Need to separate disk_insert from disk_update because 676 // bySeqno doesn't give us that information. 677 BlockTimer timer(bySeqno == -1 ? 678 &stats.diskInsertHisto : &stats.diskUpdateHisto, 679 bySeqno == -1 ? "disk_insert" : "disk_update", 680 stats.timingLog); 681 auto cb = std::make_unique<PersistenceCallback>(qi, qi->getCas()); 682 rwUnderlying->set(*qi, *cb); 683 return cb; 684 } else { 685 BlockTimer timer(&stats.diskDelHisto, "disk_delete", 686 stats.timingLog); 687 auto cb = std::make_unique<PersistenceCallback>(qi, 0); 688 rwUnderlying->del(*qi, *cb); 689 return cb; 690 } 691} 692 693void EPBucket::compactInternal(compaction_ctx* ctx) { 694 BloomFilterCBPtr filter(new BloomFilterCallback(*this)); 695 ctx->bloomFilterCallback = filter; 696 697 ExpiredItemsCBPtr expiry(new ExpiredItemsCallback(*this)); 698 ctx->expiryCallback = expiry; 699 700 ctx->collectionsEraser = std::bind(&KVBucket::collectionsEraseKey, 701 this, 702 uint16_t(ctx->db_file_id), 703 std::placeholders::_1, 704 std::placeholders::_2, 705 std::placeholders::_3, 706 std::placeholders::_4); 707 708 KVShard* shard = vbMap.getShardByVbId(ctx->db_file_id); 709 KVStore* store = shard->getRWUnderlying(); 710 bool result = store->compactDB(ctx); 711 712 Configuration& config = getEPEngine().getConfiguration(); 713 /* Iterate over all the vbucket ids set in max_purged_seq map. If there is 714 * an entry 715 * in the map for a vbucket id, then it was involved in compaction and thus 716 * can 717 * be used to update the associated bloom filters and purge sequence numbers 718 */ 719 for (auto& it : ctx->max_purged_seq) { 720 const uint16_t vbid = it.first; 721 VBucketPtr vb = getVBucket(vbid); 722 if (!vb) { 723 continue; 724 } 725 726 if (config.isBfilterEnabled() && result) { 727 vb->swapFilter(); 728 } else { 729 vb->clearFilter(); 730 } 731 vb->setPurgeSeqno(it.second); 732 } 733 734 LOG(EXTENSION_LOG_NOTICE, 735 "Compaction of db file id: %d completed (%s). " 736 "tombstones_purged:%" PRIu64 ", collection_items_erased:%" PRIu64 737 ", pre{size:%" PRIu64 ", items:%" PRIu64 ", deleted_items:%" PRIu64 738 ", purge_seqno:%" PRIu64 "}, post{size:%" PRIu64 ", items:%" PRIu64 739 ", deleted_items:%" PRIu64 ", purge_seqno:%" PRIu64 "}", 740 ctx->db_file_id, 741 result ? "ok" : "failed", 742 ctx->stats.tombstonesPurged, 743 ctx->stats.collectionsItemsPurged, 744 ctx->stats.pre.size, 745 ctx->stats.pre.items, 746 ctx->stats.pre.deletedItems, 747 ctx->stats.pre.purgeSeqno, 748 ctx->stats.post.size, 749 ctx->stats.post.items, 750 ctx->stats.post.deletedItems, 751 ctx->stats.post.purgeSeqno); 752 753 // The collections eraser may have gathered some garbage keys which can now 754 // be released. 755 auto vb = getVBucket(uint16_t(ctx->db_file_id)); 756 if (vb) { 757 ctx->eraserContext.processKeys(*vb); 758 } 759} 760 761bool EPBucket::doCompact(compaction_ctx* ctx, const void* cookie) { 762 ENGINE_ERROR_CODE err = ENGINE_SUCCESS; 763 StorageProperties storeProp = getStorageProperties(); 764 bool concWriteCompact = storeProp.hasConcWriteCompact(); 765 uint16_t vbid = ctx->db_file_id; 766 767 /** 768 * Check if the underlying storage engine allows writes concurrently 769 * as the database file is being compacted. If not, a lock needs to 770 * be held in order to serialize access to the database file between 771 * the writer and compactor threads 772 */ 773 if (concWriteCompact == false) { 774 auto vb = getLockedVBucket(vbid, std::try_to_lock); 775 if (!vb.owns_lock()) { 776 // VB currently locked; try again later. 777 return true; 778 } 779 780 if (!vb) { 781 err = ENGINE_NOT_MY_VBUCKET; 782 engine.storeEngineSpecific(cookie, NULL); 783 /** 784 * Decrement session counter here, as memcached thread wouldn't 785 * visit the engine interface in case of a NOT_MY_VB notification 786 */ 787 engine.decrementSessionCtr(); 788 } else { 789 compactInternal(ctx); 790 } 791 } else { 792 compactInternal(ctx); 793 } 794 795 updateCompactionTasks(ctx->db_file_id); 796 797 if (cookie) { 798 engine.notifyIOComplete(cookie, err); 799 } 800 --stats.pendingCompactions; 801 return false; 802} 803 804void EPBucket::updateCompactionTasks(DBFileId db_file_id) { 805 LockHolder lh(compactionLock); 806 bool erased = false, woke = false; 807 std::list<CompTaskEntry>::iterator it = compactionTasks.begin(); 808 while (it != compactionTasks.end()) { 809 if ((*it).first == db_file_id) { 810 it = compactionTasks.erase(it); 811 erased = true; 812 } else { 813 ExTask& task = (*it).second; 814 if (task->getState() == TASK_SNOOZED) { 815 ExecutorPool::get()->wake(task->getId()); 816 woke = true; 817 } 818 ++it; 819 } 820 if (erased && woke) { 821 break; 822 } 823 } 824} 825 826std::pair<uint64_t, bool> EPBucket::getLastPersistedCheckpointId(uint16_t vb) { 827 auto vbucket = vbMap.getBucket(vb); 828 if (vbucket) { 829 return {vbucket->getPersistenceCheckpointId(), true}; 830 } else { 831 return {0, true}; 832 } 833} 834 835ENGINE_ERROR_CODE EPBucket::getFileStats(const void* cookie, 836 ADD_STAT add_stat) { 837 const auto numShards = vbMap.getNumShards(); 838 DBFileInfo totalInfo; 839 840 for (uint16_t shardId = 0; shardId < numShards; shardId++) { 841 const auto dbInfo = 842 getRWUnderlyingByShard(shardId)->getAggrDbFileInfo(); 843 totalInfo.spaceUsed += dbInfo.spaceUsed; 844 totalInfo.fileSize += dbInfo.fileSize; 845 } 846 847 add_casted_stat("ep_db_data_size", totalInfo.spaceUsed, add_stat, cookie); 848 add_casted_stat("ep_db_file_size", totalInfo.fileSize, add_stat, cookie); 849 850 return ENGINE_SUCCESS; 851} 852 853ENGINE_ERROR_CODE EPBucket::getPerVBucketDiskStats(const void* cookie, 854 ADD_STAT add_stat) { 855 class DiskStatVisitor : public VBucketVisitor { 856 public: 857 DiskStatVisitor(const void* c, ADD_STAT a) : cookie(c), add_stat(a) { 858 } 859 860 void visitBucket(VBucketPtr& vb) override { 861 char buf[32]; 862 uint16_t vbid = vb->getId(); 863 DBFileInfo dbInfo = 864 vb->getShard()->getRWUnderlying()->getDbFileInfo(vbid); 865 866 try { 867 checked_snprintf(buf, sizeof(buf), "vb_%d:data_size", vbid); 868 add_casted_stat(buf, dbInfo.spaceUsed, add_stat, cookie); 869 checked_snprintf(buf, sizeof(buf), "vb_%d:file_size", vbid); 870 add_casted_stat(buf, dbInfo.fileSize, add_stat, cookie); 871 } catch (std::exception& error) { 872 LOG(EXTENSION_LOG_WARNING, 873 "DiskStatVisitor::visitBucket: Failed to build stat: %s", 874 error.what()); 875 } 876 } 877 878 private: 879 const void* cookie; 880 ADD_STAT add_stat; 881 }; 882 883 DiskStatVisitor dsv(cookie, add_stat); 884 visit(dsv); 885 return ENGINE_SUCCESS; 886} 887 888VBucketPtr EPBucket::makeVBucket(VBucket::id_type id, 889 vbucket_state_t state, 890 KVShard* shard, 891 std::unique_ptr<FailoverTable> table, 892 NewSeqnoCallback newSeqnoCb, 893 vbucket_state_t initState, 894 int64_t lastSeqno, 895 uint64_t lastSnapStart, 896 uint64_t lastSnapEnd, 897 uint64_t purgeSeqno, 898 uint64_t maxCas, 899 int64_t hlcEpochSeqno, 900 bool mightContainXattrs, 901 const std::string& collectionsManifest) { 902 auto flusherCb = std::make_shared<NotifyFlusherCB>(shard); 903 // Not using make_shared or allocate_shared 904 // 1. make_shared doesn't accept a Deleter 905 // 2. allocate_shared has inconsistencies between platforms in calling 906 // alloc.destroy (libc++ doesn't call it) 907 return VBucketPtr(new EPVBucket(id, 908 state, 909 stats, 910 engine.getCheckpointConfig(), 911 shard, 912 lastSeqno, 913 lastSnapStart, 914 lastSnapEnd, 915 std::move(table), 916 flusherCb, 917 std::move(newSeqnoCb), 918 engine.getConfiguration(), 919 eviction_policy, 920 initState, 921 purgeSeqno, 922 maxCas, 923 hlcEpochSeqno, 924 mightContainXattrs, 925 collectionsManifest), 926 VBucket::DeferredDeleter(engine)); 927} 928 929ENGINE_ERROR_CODE EPBucket::statsVKey(const DocKey& key, 930 uint16_t vbucket, 931 const void* cookie) { 932 VBucketPtr vb = getVBucket(vbucket); 933 if (!vb) { 934 return ENGINE_NOT_MY_VBUCKET; 935 } 936 937 return vb->statsVKey(key, cookie, engine, bgFetchDelay); 938} 939 940void EPBucket::completeStatsVKey(const void* cookie, 941 const DocKey& key, 942 uint16_t vbid, 943 uint64_t bySeqNum) { 944 GetValue gcb = getROUnderlying(vbid)->get(key, vbid); 945 946 if (eviction_policy == FULL_EVICTION) { 947 VBucketPtr vb = getVBucket(vbid); 948 if (vb) { 949 vb->completeStatsVKey(key, gcb); 950 } 951 } 952 953 if (gcb.getStatus() == ENGINE_SUCCESS) { 954 engine.addLookupResult(cookie, std::move(gcb.item)); 955 } else { 956 engine.addLookupResult(cookie, NULL); 957 } 958 959 --stats.numRemainingBgJobs; 960 engine.notifyIOComplete(cookie, ENGINE_SUCCESS); 961} 962 963/** 964 * Class that handles the disk callback during the rollback. 965 * For each mutation/deletion which was discarded as part of the rollback, 966 * the callback() method is invoked with the key of the discarded update. 967 * It can then lookup the state of that key using dbHandle (which represents the 968 * new, rolled-back file) and correct the in-memory view: 969 * 970 * a) If the key is not present in the Rollback header then delete it from 971 * the HashTable (if either didn't exist yet, or had previously been 972 * deleted in the Rollback header). 973 * b) If the key is present in the Rollback header then replace the in-memory 974 * value with the value from the Rollback header. 975 */ 976class EPDiskRollbackCB : public RollbackCB { 977public: 978 EPDiskRollbackCB(EventuallyPersistentEngine& e) : RollbackCB(), engine(e) { 979 } 980 981 void callback(GetValue& val) { 982 if (!val.item) { 983 throw std::invalid_argument( 984 "EPDiskRollbackCB::callback: val is NULL"); 985 } 986 if (dbHandle == nullptr) { 987 throw std::logic_error( 988 "EPDiskRollbackCB::callback: dbHandle is NULL"); 989 } 990 UniqueItemPtr itm(std::move(val.item)); 991 VBucketPtr vb = engine.getVBucket(itm->getVBucketId()); 992 GetValue gcb = engine.getKVBucket() 993 ->getROUnderlying(itm->getVBucketId()) 994 ->getWithHeader(dbHandle, 995 itm->getKey(), 996 itm->getVBucketId(), 997 GetMetaOnly::No); 998 if (gcb.getStatus() == ENGINE_SUCCESS) { 999 UniqueItemPtr it(std::move(gcb.item)); 1000 if (it->isDeleted()) { 1001 removeDeletedDoc(*vb, it->getKey()); 1002 } else { 1003 MutationStatus mtype = vb->setFromInternal(*it); 1004 1005 if (mtype == MutationStatus::NoMem) { 1006 setStatus(ENGINE_ENOMEM); 1007 } 1008 } 1009 } else if (gcb.getStatus() == ENGINE_KEY_ENOENT) { 1010 removeDeletedDoc(*vb, itm->getKey()); 1011 } else { 1012 LOG(EXTENSION_LOG_WARNING, 1013 "EPDiskRollbackCB::callback:Unexpected Error Status: %d", 1014 gcb.getStatus()); 1015 } 1016 } 1017 1018 /// Remove a deleted-on-disk document from the VBucket's hashtable. 1019 void removeDeletedDoc(VBucket& vb, const DocKey& key) { 1020 if (vb.deleteKey(key)) { 1021 setStatus(ENGINE_SUCCESS); 1022 } else { 1023 // Document didn't exist in memory - may have been deleted in since 1024 // the checkpoint. 1025 setStatus(ENGINE_KEY_ENOENT); 1026 } 1027 // Irrespective of if the in-memory delete succeeded; the document 1028 // doesn't exist on disk; so decrement the item count. 1029 vb.decrNumTotalItems(); 1030 } 1031 1032private: 1033 EventuallyPersistentEngine& engine; 1034}; 1035 1036RollbackResult EPBucket::doRollback(uint16_t vbid, uint64_t rollbackSeqno) { 1037 auto cb = std::make_shared<EPDiskRollbackCB>(engine); 1038 KVStore* rwUnderlying = vbMap.getShardByVbId(vbid)->getRWUnderlying(); 1039 return rwUnderlying->rollback(vbid, rollbackSeqno, cb); 1040} 1041 1042void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) { 1043 std::vector<queued_item> items; 1044 vb.checkpointManager->getAllItemsForCursor(CheckpointManager::pCursorName, 1045 items); 1046 for (const auto& item : items) { 1047 if (item->getBySeqno() > rollbackSeqno && 1048 !item->isCheckPointMetaItem()) { 1049 GetValue gcb = getROUnderlying(vb.getId()) 1050 ->get(item->getKey(), vb.getId(), false); 1051 1052 if (gcb.getStatus() == ENGINE_SUCCESS) { 1053 vb.setFromInternal(*gcb.item.get()); 1054 } else { 1055 vb.deleteKey(item->getKey()); 1056 } 1057 } 1058 } 1059} 1060 1061void EPBucket::notifyNewSeqno(const uint16_t vbid, 1062 const VBNotifyCtx& notifyCtx) { 1063 if (notifyCtx.notifyFlusher) { 1064 notifyFlusher(vbid); 1065 } 1066 if (notifyCtx.notifyReplication) { 1067 notifyReplication(vbid, notifyCtx.bySeqno); 1068 } 1069} 1070