1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2015 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include "config.h" 19 20#include <string.h> 21#include <time.h> 22 23#include <fstream> 24#include <functional> 25#include <iostream> 26#include <map> 27#include <sstream> 28#include <string> 29#include <utility> 30#include <vector> 31 32#include <phosphor/phosphor.h> 33#include <platform/make_unique.h> 34 35#include "access_scanner.h" 36#include "checkpoint.h" 37#include "checkpoint_remover.h" 38#include "collections/manager.h" 39#include "conflict_resolution.h" 40#include "connmap.h" 41#include "dcp/dcpconnmap.h" 42#include "defragmenter.h" 43#include "ep_engine.h" 44#include "ep_time.h" 45#include "ext_meta_parser.h" 46#include "failover-table.h" 47#include "flusher.h" 48#include "htresizer.h" 49#include "item_compressor.h" 50#include "kv_bucket.h" 51#include "kvshard.h" 52#include "kvstore.h" 53#include "locks.h" 54#include "mutation_log.h" 55#include "replicationthrottle.h" 56#include "statwriter.h" 57#include "tasks.h" 58#include "trace_helpers.h" 59#include "vb_count_visitor.h" 60#include "vbucket.h" 61#include "vbucket_bgfetch_item.h" 62#include "vbucketdeletiontask.h" 63#include "warmup.h" 64 65class StatsValueChangeListener : public ValueChangedListener { 66public: 67 StatsValueChangeListener(EPStats& st, KVBucket& str) 68 : stats(st), store(str) { 69 // EMPTY 70 } 
71 72 void sizeValueChanged(const std::string& key, size_t value) override { 73 if (key.compare("max_size") == 0) { 74 stats.setMaxDataSize(value); 75 store.getEPEngine().getDcpConnMap(). \ 76 updateMaxActiveSnoozingBackfills(value); 77 size_t low_wat = static_cast<size_t> 78 (static_cast<double>(value) * stats.mem_low_wat_percent); 79 size_t high_wat = static_cast<size_t> 80 (static_cast<double>(value) * stats.mem_high_wat_percent); 81 stats.mem_low_wat.store(low_wat); 82 stats.mem_high_wat.store(high_wat); 83 store.setCursorDroppingLowerUpperThresholds(value); 84 } else if (key.compare("mem_low_wat") == 0) { 85 stats.mem_low_wat.store(value); 86 stats.mem_low_wat_percent.store( 87 (double)(value) / stats.getMaxDataSize()); 88 } else if (key.compare("mem_high_wat") == 0) { 89 stats.mem_high_wat.store(value); 90 stats.mem_high_wat_percent.store( 91 (double)(value) / stats.getMaxDataSize()); 92 } else if (key.compare("replication_throttle_threshold") == 0) { 93 stats.replicationThrottleThreshold.store( 94 static_cast<double>(value) / 100.0); 95 } else if (key.compare("warmup_min_memory_threshold") == 0) { 96 stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0); 97 } else if (key.compare("warmup_min_items_threshold") == 0) { 98 stats.warmupNumReadCap.store(static_cast<double>(value) / 100.0); 99 } else { 100 LOG(EXTENSION_LOG_WARNING, 101 "StatsValueChangeListener(size_t) failed to change value for " 102 "unknown variable, %s", 103 key.c_str()); 104 } 105 } 106 107 void floatValueChanged(const std::string& key, float value) override { 108 if (key.compare("mem_used_merge_threshold_percent") == 0) { 109 stats.setMemUsedMergeThresholdPercent(value); 110 } else { 111 LOG(EXTENSION_LOG_WARNING, 112 "StatsValueChangeListener(float) failed to change value for " 113 "unknown variable, %s", 114 key.c_str()); 115 } 116 } 117 118private: 119 EPStats& stats; 120 KVBucket& store; 121}; 122 123/** 124 * A configuration value changed listener that responds to ep-engine 
125 * parameter changes by invoking engine-specific methods on 126 * configuration change events. 127 */ 128class EPStoreValueChangeListener : public ValueChangedListener { 129public: 130 EPStoreValueChangeListener(KVBucket& st) : store(st) { 131 } 132 133 virtual void sizeValueChanged(const std::string &key, size_t value) { 134 if (key.compare("bg_fetch_delay") == 0) { 135 store.setBGFetchDelay(static_cast<uint32_t>(value)); 136 } else if (key.compare("compaction_write_queue_cap") == 0) { 137 store.setCompactionWriteQueueCap(value); 138 } else if (key.compare("exp_pager_stime") == 0) { 139 store.setExpiryPagerSleeptime(value); 140 } else if (key.compare("alog_sleep_time") == 0) { 141 store.setAccessScannerSleeptime(value, false); 142 } else if (key.compare("alog_task_time") == 0) { 143 store.resetAccessScannerStartTime(); 144 } else if (key.compare("mutation_mem_threshold") == 0) { 145 double mem_threshold = static_cast<double>(value) / 100; 146 VBucket::setMutationMemoryThreshold(mem_threshold); 147 } else if (key.compare("backfill_mem_threshold") == 0) { 148 double backfill_threshold = static_cast<double>(value) / 100; 149 store.setBackfillMemoryThreshold(backfill_threshold); 150 } else if (key.compare("compaction_exp_mem_threshold") == 0) { 151 store.setCompactionExpMemThreshold(value); 152 } else if (key.compare("replication_throttle_cap_pcnt") == 0) { 153 store.getEPEngine().getReplicationThrottle().setCapPercent(value); 154 } else if (key.compare("max_ttl") == 0) { 155 store.setMaxTtl(value); 156 } else { 157 LOG(EXTENSION_LOG_WARNING, 158 "Failed to change value for unknown variable, %s", 159 key.c_str()); 160 } 161 } 162 163 virtual void ssizeValueChanged(const std::string& key, ssize_t value) { 164 if (key.compare("exp_pager_initial_run_time") == 0) { 165 store.setExpiryPagerTasktime(value); 166 } else if (key.compare("replication_throttle_queue_cap") == 0) { 167 store.getEPEngine().getReplicationThrottle().setQueueCap(value); 168 } 169 } 170 171 virtual 
void booleanValueChanged(const std::string &key, bool value) { 172 if (key.compare("access_scanner_enabled") == 0) { 173 if (value) { 174 store.enableAccessScannerTask(); 175 } else { 176 store.disableAccessScannerTask(); 177 } 178 } else if (key.compare("bfilter_enabled") == 0) { 179 store.setAllBloomFilters(value); 180 } else if (key.compare("exp_pager_enabled") == 0) { 181 if (value) { 182 store.enableExpiryPager(); 183 } else { 184 store.disableExpiryPager(); 185 } 186 } else if (key.compare("xattr_enabled") == 0) { 187 store.setXattrEnabled(value); 188 } 189 } 190 191 virtual void floatValueChanged(const std::string &key, float value) { 192 if (key.compare("bfilter_residency_threshold") == 0) { 193 store.setBfiltersResidencyThreshold(value); 194 } else if (key.compare("dcp_min_compression_ratio") == 0) { 195 store.getEPEngine().updateDcpMinCompressionRatio(value); 196 } 197 } 198 199private: 200 KVBucket& store; 201}; 202 203class PendingOpsNotification : public GlobalTask { 204public: 205 PendingOpsNotification(EventuallyPersistentEngine& e, VBucketPtr& vb) 206 : GlobalTask(&e, TaskId::PendingOpsNotification, 0, false), 207 engine(e), 208 vbucket(vb), 209 description("Notify pending operations for vbucket " + 210 std::to_string(vbucket->getId())) { 211 } 212 213 std::string getDescription() { 214 return description; 215 } 216 217 std::chrono::microseconds maxExpectedDuration() { 218 // This should be a very fast operation (p50 under 10us), however we 219 // have observed long tails: p99.9 of 20ms; so use a threshold of 100ms. 
220 return std::chrono::milliseconds(100); 221 } 222 223 bool run(void) { 224 TRACE_EVENT1("ep-engine/task", 225 "PendingOpsNotification", 226 "vb", 227 vbucket->getId()); 228 vbucket->fireAllOps(engine); 229 return false; 230 } 231 232private: 233 EventuallyPersistentEngine &engine; 234 VBucketPtr vbucket; 235 const std::string description; 236}; 237 238KVBucket::KVBucket(EventuallyPersistentEngine& theEngine) 239 : engine(theEngine), 240 stats(engine.getEpStats()), 241 vbMap(theEngine.getConfiguration(), *this), 242 defragmenterTask(NULL), 243 itemCompressorTask(nullptr), 244 itemFreqDecayerTask(nullptr), 245 vb_mutexes(engine.getConfiguration().getMaxVbuckets()), 246 diskDeleteAll(false), 247 bgFetchDelay(0), 248 backfillMemoryThreshold(0.95), 249 statsSnapshotTaskId(0), 250 lastTransTimePerItem(0), 251 collectionsManager(std::make_unique<Collections::Manager>()), 252 xattrEnabled(true), 253 maxTtl(engine.getConfiguration().getMaxTtl()) { 254 cachedResidentRatio.activeRatio.store(0); 255 cachedResidentRatio.replicaRatio.store(0); 256 257 Configuration &config = engine.getConfiguration(); 258 for (uint16_t i = 0; i < config.getMaxNumShards(); i++) { 259 accessLog.emplace_back( 260 config.getAlogPath() + "." + std::to_string(i), 261 config.getAlogBlockSize()); 262 } 263 264 265 const size_t size = GlobalTask::allTaskIds.size(); 266 stats.schedulingHisto.resize(size); 267 stats.taskRuntimeHisto.resize(size); 268 269 for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) { 270 stats.schedulingHisto[i].reset(); 271 stats.taskRuntimeHisto[i].reset(); 272 } 273 274 ExecutorPool::get()->registerTaskable(ObjectRegistry::getCurrentEngine()->getTaskable()); 275 276 // Reset memory overhead when bucket is created. 
277 for (auto& core : stats.coreLocal) { 278 core->memOverhead = 0; 279 } 280 stats.coreLocal.get()->memOverhead = sizeof(KVBucket); 281 282 // Set memUsedThresholdPercent before setting max_size 283 stats.setMemUsedMergeThresholdPercent( 284 config.getMemUsedMergeThresholdPercent()); 285 config.addValueChangedListener( 286 "mem_used_merge_threshold_percent", 287 std::make_unique<StatsValueChangeListener>(stats, *this)); 288 stats.setMaxDataSize(config.getMaxSize()); 289 config.addValueChangedListener( 290 "max_size", 291 std::make_unique<StatsValueChangeListener>(stats, *this)); 292 getEPEngine().getDcpConnMap().updateMaxActiveSnoozingBackfills( 293 config.getMaxSize()); 294 295 stats.mem_low_wat.store(config.getMemLowWat()); 296 config.addValueChangedListener( 297 "mem_low_wat", 298 std::make_unique<StatsValueChangeListener>(stats, *this)); 299 stats.mem_low_wat_percent.store( 300 (double)(stats.mem_low_wat.load()) / stats.getMaxDataSize()); 301 302 stats.mem_high_wat.store(config.getMemHighWat()); 303 config.addValueChangedListener( 304 "mem_high_wat", 305 std::make_unique<StatsValueChangeListener>(stats, *this)); 306 stats.mem_high_wat_percent.store( 307 (double)(stats.mem_high_wat.load()) / stats.getMaxDataSize()); 308 309 setCursorDroppingLowerUpperThresholds(config.getMaxSize()); 310 311 stats.replicationThrottleThreshold.store(static_cast<double> 312 (config.getReplicationThrottleThreshold()) 313 / 100.0); 314 config.addValueChangedListener( 315 "replication_throttle_threshold", 316 std::make_unique<StatsValueChangeListener>(stats, *this)); 317 318 stats.replicationThrottleWriteQueueCap.store( 319 config.getReplicationThrottleQueueCap()); 320 config.addValueChangedListener( 321 "replication_throttle_queue_cap", 322 std::make_unique<EPStoreValueChangeListener>(*this)); 323 config.addValueChangedListener( 324 "replication_throttle_cap_pcnt", 325 std::make_unique<EPStoreValueChangeListener>(*this)); 326 327 setBGFetchDelay(config.getBgFetchDelay()); 328 
config.addValueChangedListener( 329 "bg_fetch_delay", 330 std::make_unique<EPStoreValueChangeListener>(*this)); 331 332 stats.warmupMemUsedCap.store(static_cast<double> 333 (config.getWarmupMinMemoryThreshold()) / 100.0); 334 config.addValueChangedListener( 335 "warmup_min_memory_threshold", 336 std::make_unique<StatsValueChangeListener>(stats, *this)); 337 stats.warmupNumReadCap.store(static_cast<double> 338 (config.getWarmupMinItemsThreshold()) / 100.0); 339 config.addValueChangedListener( 340 "warmup_min_items_threshold", 341 std::make_unique<StatsValueChangeListener>(stats, *this)); 342 343 double mem_threshold = static_cast<double> 344 (config.getMutationMemThreshold()) / 100; 345 VBucket::setMutationMemoryThreshold(mem_threshold); 346 config.addValueChangedListener( 347 "mutation_mem_threshold", 348 std::make_unique<EPStoreValueChangeListener>(*this)); 349 350 double backfill_threshold = static_cast<double> 351 (config.getBackfillMemThreshold()) / 100; 352 setBackfillMemoryThreshold(backfill_threshold); 353 config.addValueChangedListener( 354 "backfill_mem_threshold", 355 std::make_unique<EPStoreValueChangeListener>(*this)); 356 357 config.addValueChangedListener( 358 "bfilter_enabled", 359 std::make_unique<EPStoreValueChangeListener>(*this)); 360 361 bfilterResidencyThreshold = config.getBfilterResidencyThreshold(); 362 config.addValueChangedListener( 363 "bfilter_residency_threshold", 364 std::make_unique<EPStoreValueChangeListener>(*this)); 365 366 compactionExpMemThreshold = config.getCompactionExpMemThreshold(); 367 config.addValueChangedListener( 368 "compaction_exp_mem_threshold", 369 std::make_unique<EPStoreValueChangeListener>(*this)); 370 371 compactionWriteQueueCap = config.getCompactionWriteQueueCap(); 372 config.addValueChangedListener( 373 "compaction_write_queue_cap", 374 std::make_unique<EPStoreValueChangeListener>(*this)); 375 376 config.addValueChangedListener( 377 "dcp_min_compression_ratio", 378 
std::make_unique<EPStoreValueChangeListener>(*this)); 379 380 config.addValueChangedListener( 381 "xattr_enabled", 382 std::make_unique<EPStoreValueChangeListener>(*this)); 383 384 config.addValueChangedListener( 385 "max_ttl", std::make_unique<EPStoreValueChangeListener>(*this)); 386 387 xattrEnabled = config.isXattrEnabled(); 388 389 // Always create the item pager; but initially disable, leaving scheduling 390 // up to the specific KVBucket subclasses. 391 itemPagerTask = std::make_shared<ItemPager>(engine, stats); 392 disableItemPager(); 393 394 initializeWarmupTask(); 395} 396 397bool KVBucket::initialize() { 398 // We should nuke everything unless we want warmup 399 Configuration &config = engine.getConfiguration(); 400 if (!config.isWarmup()) { 401 reset(); 402 } 403 404 startWarmupTask(); 405 406 initializeExpiryPager(config); 407 408 ExTask htrTask = std::make_shared<HashtableResizerTask>(this, 10); 409 ExecutorPool::get()->schedule(htrTask); 410 411 size_t checkpointRemoverInterval = config.getChkRemoverStime(); 412 chkTask = std::make_shared<ClosedUnrefCheckpointRemoverTask>( 413 &engine, stats, checkpointRemoverInterval); 414 ExecutorPool::get()->schedule(chkTask); 415 416 ExTask workloadMonitorTask = 417 std::make_shared<WorkLoadMonitor>(&engine, false); 418 ExecutorPool::get()->schedule(workloadMonitorTask); 419 420#if HAVE_JEMALLOC 421 /* Only create the defragmenter task if we have an underlying memory 422 * allocator which can facilitate defragmenting memory. 423 */ 424 defragmenterTask = std::make_shared<DefragmenterTask>(&engine, stats); 425 ExecutorPool::get()->schedule(defragmenterTask); 426#endif 427 428 itemCompressorTask = std::make_shared<ItemCompressorTask>(&engine, stats); 429 ExecutorPool::get()->schedule(itemCompressorTask); 430 431 /* 432 * Creates the ItemFreqDecayer task which is used to ensure that the 433 * frequency counters of items stored in the hash table do not all 434 * become saturated. 
Once the task runs it will snooze for int max 435 * seconds and will only be woken up when the frequency counter of an 436 * item in the hash table becomes saturated. 437 */ 438 itemFreqDecayerTask = std::make_shared<ItemFreqDecayerTask>( 439 &engine, config.getItemFreqDecayerPercent()); 440 ExecutorPool::get()->schedule(itemFreqDecayerTask); 441 442 return true; 443} 444 445void KVBucket::initializeWarmupTask() { 446 if (engine.getConfiguration().isWarmup()) { 447 warmupTask = std::make_unique<Warmup>(*this, engine.getConfiguration()); 448 } 449} 450void KVBucket::startWarmupTask() { 451 if (warmupTask) { 452 warmupTask->start(); 453 } else { 454 // No warmup, immediately online the bucket. 455 warmupCompleted(); 456 } 457} 458 459void KVBucket::deinitialize() { 460 stopWarmup(); 461 ExecutorPool::get()->stopTaskGroup(engine.getTaskable().getGID(), 462 NONIO_TASK_IDX, stats.forceShutdown); 463 464 ExecutorPool::get()->cancel(statsSnapshotTaskId); 465 466 { 467 LockHolder lh(accessScanner.mutex); 468 ExecutorPool::get()->cancel(accessScanner.task); 469 } 470 471 ExecutorPool::get()->unregisterTaskable(engine.getTaskable(), 472 stats.forceShutdown); 473} 474 475KVBucket::~KVBucket() { 476 LOG(EXTENSION_LOG_NOTICE, "Deleting vb_mutexes"); 477 LOG(EXTENSION_LOG_NOTICE, "Deleting defragmenterTask"); 478 defragmenterTask.reset(); 479 LOG(EXTENSION_LOG_NOTICE, "Deleting itemCompressorTask"); 480 itemCompressorTask.reset(); 481 LOG(EXTENSION_LOG_NOTICE, "Deleting itemFreqDecayerTask"); 482 itemFreqDecayerTask.reset(); 483 LOG(EXTENSION_LOG_NOTICE, "Deleted KvBucket."); 484} 485 486const Flusher* KVBucket::getFlusher(uint16_t shardId) { 487 return vbMap.shards[shardId]->getFlusher(); 488} 489 490Warmup* KVBucket::getWarmup(void) const { 491 return warmupTask.get(); 492} 493 494bool KVBucket::pauseFlusher() { 495 // Nothing do to - no flusher in this class 496 return false; 497} 498 499bool KVBucket::resumeFlusher() { 500 // Nothing do to - no flusher in this class 501 
return false; 502} 503 504void KVBucket::wakeUpFlusher() { 505 // Nothing do to - no flusher in this class 506} 507 508protocol_binary_response_status KVBucket::evictKey(const DocKey& key, 509 VBucket::id_type vbucket, 510 const char** msg) { 511 VBucketPtr vb = getVBucket(vbucket); 512 if (!vb || (vb->getState() != vbucket_state_active)) { 513 return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET; 514 } 515 516 return vb->evictKey(key, msg); 517} 518 519void KVBucket::getValue(Item& it) { 520 auto gv = 521 getROUnderlying(it.getVBucketId()) 522 ->get(it.getKey(), it.getVBucketId(), true /*fetchDelete*/); 523 524 if (gv.getStatus() != ENGINE_SUCCESS) { 525 // Cannot continue to pre_expiry, log this failed get and return 526 LOG(EXTENSION_LOG_WARNING, 527 "KVBucket::getValue failed get for item vb:%" PRIu16 528 ", it.seqno:%" PRIi64 ", status:%d", 529 it.getVBucketId(), 530 it.getBySeqno(), 531 gv.getStatus()); 532 return; 533 } else if (!gv.item->isDeleted()) { 534 it.replaceValue(gv.item->getValue().get()); 535 } 536} 537 538void KVBucket::runPreExpiryHook(VBucket& vb, Item& it) { 539 it.decompressValue(); // A no-op for already decompressed items 540 auto info = 541 it.toItemInfo(vb.failovers->getLatestUUID(), vb.getHLCEpochSeqno()); 542 if (engine.getServerApi()->document->pre_expiry(info)) { 543 // The payload is modified and contains data we should use 544 it.replaceValue(Blob::New(static_cast<char*>(info.value[0].iov_base), 545 info.value[0].iov_len)); 546 it.setDataType(info.datatype); 547 } else { 548 // Make the document empty and raw 549 it.replaceValue(Blob::New(0)); 550 it.setDataType(PROTOCOL_BINARY_RAW_BYTES); 551 } 552} 553 554void KVBucket::deleteExpiredItem(Item& it, 555 time_t startTime, 556 ExpireBy source) { 557 VBucketPtr vb = getVBucket(it.getVBucketId()); 558 559 if (vb) { 560 // MB-25931: Empty XATTR items need their value before we can call 561 // pre_expiry. These occur because the value has been evicted. 
562 if (mcbp::datatype::is_xattr(it.getDataType()) && it.getNBytes() == 0) { 563 getValue(it); 564 } 565 566 // Process positive seqnos (ignoring special *temp* items) and only 567 // those items with a value 568 if (it.getBySeqno() >= 0 && it.getNBytes()) { 569 runPreExpiryHook(*vb, it); 570 } 571 572 // Obtain reader access to the VB state change lock so that 573 // the VB can't switch state whilst we're processing 574 ReaderLockHolder rlh(vb->getStateLock()); 575 if (vb->getState() == vbucket_state_active) { 576 vb->deleteExpiredItem(it, startTime, source); 577 } 578 } 579} 580 581void KVBucket::deleteExpiredItems( 582 std::list<Item>& itms, ExpireBy source) { 583 std::list<std::pair<uint16_t, std::string> >::iterator it; 584 time_t startTime = ep_real_time(); 585 for (auto& it : itms) { 586 deleteExpiredItem(it, startTime, source); 587 } 588} 589 590bool KVBucket::isMetaDataResident(VBucketPtr &vb, const DocKey& key) { 591 592 if (!vb) { 593 throw std::invalid_argument("EPStore::isMetaDataResident: vb is NULL"); 594 } 595 596 auto hbl = vb->ht.getLockedBucket(key); 597 StoredValue* v = vb->ht.unlocked_find( 598 key, hbl.getBucketNum(), WantsDeleted::No, TrackReference::No); 599 600 if (v && !v->isTempItem()) { 601 return true; 602 } else { 603 return false; 604 } 605} 606 607void KVBucket::logQTime(TaskId taskType, const ProcessClock::duration enqTime) { 608 auto ms = std::chrono::duration_cast<std::chrono::microseconds>(enqTime); 609 stats.schedulingHisto[static_cast<int>(taskType)].add(ms); 610} 611 612void KVBucket::logRunTime(TaskId taskType, 613 const ProcessClock::duration runTime) { 614 auto ms = std::chrono::duration_cast<std::chrono::microseconds>(runTime); 615 stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ms); 616} 617 618ENGINE_ERROR_CODE KVBucket::set(Item& itm, 619 const void* cookie, 620 cb::StoreIfPredicate predicate) { 621 VBucketPtr vb = getVBucket(itm.getVBucketId()); 622 if (!vb) { 623 ++stats.numNotMyVBuckets; 624 return 
ENGINE_NOT_MY_VBUCKET; 625 } 626 627 // Obtain read-lock on VB state to ensure VB state changes are interlocked 628 // with this set 629 ReaderLockHolder rlh(vb->getStateLock()); 630 if (vb->getState() == vbucket_state_dead) { 631 ++stats.numNotMyVBuckets; 632 return ENGINE_NOT_MY_VBUCKET; 633 } else if (vb->getState() == vbucket_state_replica) { 634 ++stats.numNotMyVBuckets; 635 return ENGINE_NOT_MY_VBUCKET; 636 } else if (vb->getState() == vbucket_state_pending) { 637 if (vb->addPendingOp(cookie)) { 638 return ENGINE_EWOULDBLOCK; 639 } 640 } else if (vb->isTakeoverBackedUp()) { 641 LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a set op" 642 ", becuase takeover is lagging", vb->getId()); 643 return ENGINE_TMPFAIL; 644 } 645 646 { // collections read-lock scope 647 auto collectionsRHandle = vb->lockCollections(); 648 if (!collectionsRHandle.doesKeyContainValidCollection(itm.getKey())) { 649 return ENGINE_UNKNOWN_COLLECTION; 650 } // now hold collections read access for the duration of the set 651 652 return vb->set(itm, cookie, engine, bgFetchDelay, predicate); 653 } 654} 655 656ENGINE_ERROR_CODE KVBucket::add(Item &itm, const void *cookie) 657{ 658 VBucketPtr vb = getVBucket(itm.getVBucketId()); 659 if (!vb) { 660 ++stats.numNotMyVBuckets; 661 return ENGINE_NOT_MY_VBUCKET; 662 } 663 664 // Obtain read-lock on VB state to ensure VB state changes are interlocked 665 // with this add 666 ReaderLockHolder rlh(vb->getStateLock()); 667 if (vb->getState() == vbucket_state_dead || 668 vb->getState() == vbucket_state_replica) { 669 ++stats.numNotMyVBuckets; 670 return ENGINE_NOT_MY_VBUCKET; 671 } else if (vb->getState() == vbucket_state_pending) { 672 if (vb->addPendingOp(cookie)) { 673 return ENGINE_EWOULDBLOCK; 674 } 675 } else if (vb->isTakeoverBackedUp()) { 676 LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a add op" 677 ", becuase takeover is lagging", vb->getId()); 678 return ENGINE_TMPFAIL; 679 } 680 681 if (itm.getCas() != 0) { 682 // Adding with a 
cas value doesn't make sense.. 683 return ENGINE_NOT_STORED; 684 } 685 686 { // collections read-lock scope 687 auto collectionsRHandle = vb->lockCollections(itm.getKey()); 688 if (!collectionsRHandle.valid()) { 689 return ENGINE_UNKNOWN_COLLECTION; 690 } // now hold collections read access for the duration of the add 691 692 return vb->add(itm, cookie, engine, bgFetchDelay, collectionsRHandle); 693 } 694} 695 696ENGINE_ERROR_CODE KVBucket::replace(Item& itm, 697 const void* cookie, 698 cb::StoreIfPredicate predicate) { 699 VBucketPtr vb = getVBucket(itm.getVBucketId()); 700 if (!vb) { 701 ++stats.numNotMyVBuckets; 702 return ENGINE_NOT_MY_VBUCKET; 703 } 704 705 // Obtain read-lock on VB state to ensure VB state changes are interlocked 706 // with this replace 707 ReaderLockHolder rlh(vb->getStateLock()); 708 if (vb->getState() == vbucket_state_dead || 709 vb->getState() == vbucket_state_replica) { 710 ++stats.numNotMyVBuckets; 711 return ENGINE_NOT_MY_VBUCKET; 712 } else if (vb->getState() == vbucket_state_pending) { 713 if (vb->addPendingOp(cookie)) { 714 return ENGINE_EWOULDBLOCK; 715 } 716 } 717 718 { // collections read-lock scope 719 auto collectionsRHandle = vb->lockCollections(itm.getKey()); 720 if (!collectionsRHandle.valid()) { 721 return ENGINE_UNKNOWN_COLLECTION; 722 } // now hold collections read access for the duration of the set 723 724 return vb->replace(itm, 725 cookie, 726 engine, 727 bgFetchDelay, 728 predicate, 729 collectionsRHandle); 730 } 731} 732 733ENGINE_ERROR_CODE KVBucket::addBackfillItem(Item& itm, 734 GenerateBySeqno genBySeqno, 735 ExtendedMetaData* emd) { 736 VBucketPtr vb = getVBucket(itm.getVBucketId()); 737 if (!vb) { 738 ++stats.numNotMyVBuckets; 739 return ENGINE_NOT_MY_VBUCKET; 740 } 741 742 // Obtain read-lock on VB state to ensure VB state changes are interlocked 743 // with this addBackfillItem 744 ReaderLockHolder rlh(vb->getStateLock()); 745 if (vb->getState() == vbucket_state_dead || 746 vb->getState() == 
vbucket_state_active) { 747 ++stats.numNotMyVBuckets; 748 return ENGINE_NOT_MY_VBUCKET; 749 } 750 751 //check for the incoming item's CAS validity 752 if (!Item::isValidCas(itm.getCas())) { 753 return ENGINE_KEY_EEXISTS; 754 } 755 756 return vb->addBackfillItem(itm, genBySeqno); 757} 758 759ENGINE_ERROR_CODE KVBucket::setVBucketState(uint16_t vbid, 760 vbucket_state_t to, 761 bool transfer, 762 const void* cookie) { 763 // MB-25197: we shouldn't process setVBState if warmup hasn't yet loaded 764 // the vbucket state data. 765 if (cookie && shouldSetVBStateBlock(cookie)) { 766 LOG(EXTENSION_LOG_NOTICE, 767 "KVBucket::setVBucketState blocking vb:%" PRIu16 768 ", to:%s, transfer:%d, cookie:%p", 769 vbid, 770 VBucket::toString(to), 771 transfer, 772 cookie); 773 return ENGINE_EWOULDBLOCK; 774 } 775 776 // Lock to prevent a race condition between a failed update and add. 777 std::unique_lock<std::mutex> lh(vbsetMutex); 778 return setVBucketState_UNLOCKED(vbid, to, transfer, true /*notifyDcp*/, lh); 779} 780 781ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED( 782 uint16_t vbid, 783 vbucket_state_t to, 784 bool transfer, 785 bool notify_dcp, 786 std::unique_lock<std::mutex>& vbset, 787 WriterLockHolder* vbStateLock) { 788 VBucketPtr vb = vbMap.getBucket(vbid); 789 if (vb && to == vb->getState()) { 790 return ENGINE_SUCCESS; 791 } 792 793 if (vb) { 794 vbucket_state_t oldstate = vb->getState(); 795 vbMap.decVBStateCount(oldstate); 796 if (vbStateLock) { 797 vb->setState_UNLOCKED(to, *vbStateLock); 798 } else { 799 vb->setState(to); 800 } 801 802 if (oldstate != to && notify_dcp) { 803 bool closeInboundStreams = false; 804 if (to == vbucket_state_active && !transfer) { 805 /** 806 * Close inbound (passive) streams into the vbucket 807 * only in case of a failover. 
808 */ 809 closeInboundStreams = true; 810 } 811 engine.getDcpConnMap().vbucketStateChanged(vbid, to, 812 closeInboundStreams); 813 } 814 815 if (to == vbucket_state_active && oldstate == vbucket_state_replica) { 816 /** 817 * Update snapshot range when vbucket goes from being a replica 818 * to active, to maintain the correct snapshot sequence numbers 819 * even in a failover scenario. 820 */ 821 vb->checkpointManager->resetSnapshotRange(); 822 } 823 824 if (to == vbucket_state_active && !transfer) { 825 const snapshot_range_t range = vb->getPersistedSnapshot(); 826 auto highSeqno = range.end == vb->getPersistenceSeqno() 827 ? range.end 828 : range.start; 829 vb->failovers->createEntry(highSeqno); 830 831 auto entry = vb->failovers->getLatestEntry(); 832 LOG(EXTENSION_LOG_NOTICE, 833 "KVBucket::setVBucketState: vb:%" PRIu16 834 " created new failover entry with " 835 "uuid:%" PRIu64 " and seqno:%" PRIu64, 836 vbid, 837 entry.vb_uuid, 838 entry.by_seqno); 839 } 840 841 if (oldstate == vbucket_state_pending && 842 to == vbucket_state_active) { 843 ExTask notifyTask = 844 std::make_shared<PendingOpsNotification>(engine, vb); 845 ExecutorPool::get()->schedule(notifyTask); 846 } 847 scheduleVBStatePersist(vbid); 848 } else if (vbid < vbMap.getSize()) { 849 auto ft = 850 std::make_unique<FailoverTable>(engine.getMaxFailoverEntries()); 851 KVShard* shard = vbMap.getShardByVbId(vbid); 852 853 VBucketPtr newvb = 854 makeVBucket(vbid, 855 to, 856 shard, 857 std::move(ft), 858 std::make_unique<NotifyNewSeqnoCB>(*this)); 859 860 newvb->setFreqSaturatedCallback( 861 [this] { this->wakeItemFreqDecayerTask(); }); 862 863 Configuration& config = engine.getConfiguration(); 864 if (config.isBfilterEnabled()) { 865 // Initialize bloom filters upon vbucket creation during 866 // bucket creation and rebalance 867 newvb->createFilter(config.getBfilterKeyCount(), 868 config.getBfilterFpProb()); 869 } 870 871 // The first checkpoint for active vbucket should start with id 2. 
872 uint64_t start_chk_id = (to == vbucket_state_active) ? 2 : 0; 873 newvb->checkpointManager->setOpenCheckpointId(start_chk_id); 874 875 // Before adding the VB to the map increment the revision 876 getRWUnderlying(vbid)->incrementRevision(vbid); 877 878 // If active, update the VB from the bucket's collection state 879 if (to == vbucket_state_active) { 880 collectionsManager->update(*newvb); 881 } 882 883 if (vbMap.addBucket(newvb) == ENGINE_ERANGE) { 884 return ENGINE_ERANGE; 885 } 886 // When the VBucket is constructed we initialize 887 // persistenceSeqno(0) && persistenceCheckpointId(0) 888 newvb->setBucketCreation(true); 889 scheduleVBStatePersist(vbid); 890 } else { 891 return ENGINE_ERANGE; 892 } 893 return ENGINE_SUCCESS; 894} 895 896void KVBucket::scheduleVBStatePersist() { 897 for (auto vbid : vbMap.getBuckets()) { 898 scheduleVBStatePersist(vbid); 899 } 900} 901 902void KVBucket::scheduleVBStatePersist(VBucket::id_type vbid) { 903 VBucketPtr vb = getVBucket(vbid); 904 905 if (!vb) { 906 LOG(EXTENSION_LOG_WARNING, 907 "EPStore::scheduleVBStatePersist: vb:%" PRIu16 908 " does not not exist. Unable to schedule persistence.", vbid); 909 return; 910 } 911 912 vb->checkpointManager->queueSetVBState(*vb); 913} 914 915ENGINE_ERROR_CODE KVBucket::deleteVBucket(uint16_t vbid, const void* c) { 916 // Lock to prevent a race condition between a failed update and add 917 // (and delete). 918 VBucketPtr vb = vbMap.getBucket(vbid); 919 if (!vb) { 920 return ENGINE_NOT_MY_VBUCKET; 921 } 922 923 { 924 std::unique_lock<std::mutex> vbSetLh(vbsetMutex); 925 // Obtain a locked VBucket to ensure we interlock with other 926 // threads that are manipulating the VB (particularly ones which may 927 // try and change the disk revision e.g. deleteAll and compaction). 
928 auto lockedVB = getLockedVBucket(vbid); 929 vbMap.decVBStateCount(lockedVB->getState()); 930 lockedVB->setState(vbucket_state_dead); 931 engine.getDcpConnMap().vbucketStateChanged(vbid, vbucket_state_dead); 932 933 // Drop the VB to begin the delete, the last holder of the VB will 934 // unknowingly trigger the destructor which schedules a deletion task. 935 vbMap.dropVBucketAndSetupDeferredDeletion(vbid, c); 936 } 937 938 if (c) { 939 return ENGINE_EWOULDBLOCK; 940 } 941 return ENGINE_SUCCESS; 942} 943 944ENGINE_ERROR_CODE KVBucket::checkForDBExistence(DBFileId db_file_id) { 945 std::string backend = engine.getConfiguration().getBackend(); 946 if (backend.compare("couchdb") == 0) { 947 VBucketPtr vb = vbMap.getBucket(db_file_id); 948 if (!vb) { 949 return ENGINE_NOT_MY_VBUCKET; 950 } 951 } else { 952 LOG(EXTENSION_LOG_WARNING, 953 "Unknown backend specified for db file id: %d", db_file_id); 954 return ENGINE_FAILED; 955 } 956 957 return ENGINE_SUCCESS; 958} 959 960uint16_t KVBucket::getDBFileId(const protocol_binary_request_compact_db& req) { 961 KVStore *store = vbMap.shards[0]->getROUnderlying(); 962 return store->getDBFileId(req); 963} 964 965bool KVBucket::resetVBucket(uint16_t vbid) { 966 std::unique_lock<std::mutex> vbsetLock(vbsetMutex); 967 // Obtain a locked VBucket to ensure we interlock with other 968 // threads that are manipulating the VB (particularly ones which may 969 // try and change the disk revision). 
auto lockedVB = getLockedVBucket(vbid);
    return resetVBucket_UNLOCKED(lockedVB, vbsetLock);
}

/**
 * Delete and recreate the vbucket's database file while preserving its
 * state and checkpoint cursors. Caller must hold vbsetMutex (passed in as
 * `vbset`) and a locked vbucket reference.
 */
bool KVBucket::resetVBucket_UNLOCKED(LockedVBucketPtr& vb,
                                     std::unique_lock<std::mutex>& vbset) {
    bool rv(false);

    if (vb) {
        vbucket_state_t vbstate = vb->getState();

        // Drop the old instance; deferred deletion removes the file.
        vbMap.dropVBucketAndSetupDeferredDeletion(vb->getId(),
                                                  nullptr /*no cookie*/);

        checkpointCursorInfoList cursors =
                vb->checkpointManager->getAllCursors();
        // Delete and recreate the vbucket database file
        setVBucketState_UNLOCKED(vb->getId(),
                                 vbstate,
                                 false /*transfer*/,
                                 true /*notifyDcp*/,
                                 vbset);

        // Copy all the cursors from the old vbucket into the new vbucket
        VBucketPtr newvb = vbMap.getBucket(vb->getId());
        newvb->checkpointManager->resetCursors(cursors);

        rv = true;
    }
    return rv;
}

extern "C" {

// Accumulator handed to engine.getStats(); collects key/value stat pairs.
struct snapshot_stats_t : cb::tracing::Traceable {
    EventuallyPersistentEngine* engine;
    std::map<std::string, std::string> smap;
};

// ADD_STAT callback: copies each stat key/value into the snapshot map.
static void add_stat(const char* key,
                     const uint16_t klen,
                     const char* val,
                     const uint32_t vlen,
                     gsl::not_null<const void*> cookie) {
    void* ptr = const_cast<void*>(cookie.get());
    snapshot_stats_t* snap = static_cast<snapshot_stats_t*>(ptr);
    ObjectRegistry::onSwitchThread(snap->engine);
    std::string k(key, klen);
    std::string v(val, vlen);
    snap->smap.insert(std::pair<std::string, std::string>(k, v));
    }
}

/**
 * Collect engine + dcp stats and persist them via the RW underlying store.
 * On shutdown also records ep_force_shutdown and ep_shutdown_time.
 */
void KVBucket::snapshotStats() {
    snapshot_stats_t snap;
    snap.engine = &engine;
    bool rv = engine.getStats(&snap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
              engine.getStats(&snap, "dcp", 3, add_stat) == ENGINE_SUCCESS;

    if (rv && stats.isShutdown) {
        snap.smap["ep_force_shutdown"] = stats.forceShutdown ?
                                         "true" : "false";
        std::stringstream ss;
        ss << ep_real_time();
        snap.smap["ep_shutdown_time"] = ss.str();
    }
    getOneRWUnderlying()->snapshotStats(snap.smap);
}

void KVBucket::getAggregatedVBucketStats(const void* cookie,
                                         ADD_STAT add_stat) {
    // Create visitors for each of the four vBucket states, and collect
    // stats for each.
    auto active = makeVBCountVisitor(vbucket_state_active);
    auto replica = makeVBCountVisitor(vbucket_state_replica);
    auto pending = makeVBCountVisitor(vbucket_state_pending);
    auto dead = makeVBCountVisitor(vbucket_state_dead);

    VBucketCountAggregator aggregator;
    aggregator.addVisitor(active.get());
    aggregator.addVisitor(replica.get());
    aggregator.addVisitor(pending.get());
    aggregator.addVisitor(dead.get());
    visit(aggregator);

    updateCachedResidentRatio(active->getMemResidentPer(),
                              replica->getMemResidentPer());
    engine.getReplicationThrottle().adjustWriteQueueCap(active->getNumItems() +
                                                        replica->getNumItems() +
                                                        pending->getNumItems());

    // And finally actually return the stats using the ADD_STAT callback.
    appendAggregatedVBucketStats(
            *active, *replica, *pending, *dead, cookie, add_stat);
}

// Factory (virtual-friendly seam for tests) for a per-state count visitor.
std::unique_ptr<VBucketCountVisitor> KVBucket::makeVBCountVisitor(
        vbucket_state_t state) {
    return std::make_unique<VBucketCountVisitor>(state);
}

void KVBucket::appendAggregatedVBucketStats(VBucketCountVisitor& active,
                                            VBucketCountVisitor& replica,
                                            VBucketCountVisitor& pending,
                                            VBucketCountVisitor& dead,
                                            const void* cookie,
                                            ADD_STAT add_stat) {
// Simplify the repetition of calling add_casted_stat with `add_stat` and
// cookie each time. (Note: if we had C++14 we could use a polymorphic
// lambda, but for now will have to stick to C++98 and macros :).
#define DO_STAT(k, v)                            \
    do {                                         \
        add_casted_stat(k, v, add_stat, cookie); \
    } while (0)

    // Top-level stats:
    DO_STAT("ep_flush_all", isDeleteAllScheduled());
    DO_STAT("curr_items", active.getNumItems());
    DO_STAT("curr_temp_items", active.getNumTempItems());
    DO_STAT("curr_items_tot",
            active.getNumItems() + replica.getNumItems() +
                    pending.getNumItems());

    // Active vBuckets:
    DO_STAT("vb_active_backfill_queue_size", active.getBackfillQueueSize());
    DO_STAT("vb_active_num", active.getVBucketNumber());
    DO_STAT("vb_active_curr_items", active.getNumItems());
    DO_STAT("vb_active_hp_vb_req_size", active.getNumHpVBReqs());
    DO_STAT("vb_active_num_non_resident", active.getNonResident());
    DO_STAT("vb_active_perc_mem_resident", active.getMemResidentPer());
    DO_STAT("vb_active_eject", active.getEjects());
    DO_STAT("vb_active_expired", active.getExpired());
    DO_STAT("vb_active_meta_data_memory", active.getMetaDataMemory());
    DO_STAT("vb_active_meta_data_disk", active.getMetaDataDisk());
    DO_STAT("vb_active_checkpoint_memory", active.getCheckpointMemory());
    DO_STAT("vb_active_checkpoint_memory_unreferenced",
            active.getCheckpointMemoryUnreferenced());
    DO_STAT("vb_active_checkpoint_memory_overhead",
            active.getCheckpointMemoryOverhead());
    DO_STAT("vb_active_ht_memory", active.getHashtableMemory());
    DO_STAT("vb_active_itm_memory", active.getItemMemory());
    DO_STAT("vb_active_itm_memory_uncompressed",
            active.getUncompressedItemMemory());
    DO_STAT("vb_active_ops_create", active.getOpsCreate());
    DO_STAT("vb_active_ops_update", active.getOpsUpdate());
    DO_STAT("vb_active_ops_delete", active.getOpsDelete());
    DO_STAT("vb_active_ops_reject", active.getOpsReject());
    DO_STAT("vb_active_queue_size", active.getQueueSize());
    DO_STAT("vb_active_queue_memory", active.getQueueMemory());
    DO_STAT("vb_active_queue_age", active.getAge());
    DO_STAT("vb_active_queue_pending", active.getPendingWrites());
    DO_STAT("vb_active_queue_fill", active.getQueueFill());
    DO_STAT("vb_active_queue_drain", active.getQueueDrain());
    DO_STAT("vb_active_rollback_item_count", active.getRollbackItemCount());

    // Replica vBuckets:
    DO_STAT("vb_replica_backfill_queue_size", replica.getBackfillQueueSize());
    DO_STAT("vb_replica_num", replica.getVBucketNumber());
    DO_STAT("vb_replica_curr_items", replica.getNumItems());
    DO_STAT("vb_replica_hp_vb_req_size", replica.getNumHpVBReqs());
    DO_STAT("vb_replica_num_non_resident", replica.getNonResident());
    DO_STAT("vb_replica_perc_mem_resident", replica.getMemResidentPer());
    DO_STAT("vb_replica_eject", replica.getEjects());
    DO_STAT("vb_replica_expired", replica.getExpired());
    DO_STAT("vb_replica_meta_data_memory", replica.getMetaDataMemory());
    DO_STAT("vb_replica_meta_data_disk", replica.getMetaDataDisk());
    DO_STAT("vb_replica_checkpoint_memory", replica.getCheckpointMemory());
    DO_STAT("vb_replica_checkpoint_memory_unreferenced",
            replica.getCheckpointMemoryUnreferenced());
    DO_STAT("vb_replica_checkpoint_memory_overhead",
            replica.getCheckpointMemoryOverhead());
    DO_STAT("vb_replica_ht_memory", replica.getHashtableMemory());
    DO_STAT("vb_replica_itm_memory", replica.getItemMemory());
    DO_STAT("vb_replica_itm_memory_uncompressed",
            replica.getUncompressedItemMemory());
    DO_STAT("vb_replica_ops_create", replica.getOpsCreate());
    DO_STAT("vb_replica_ops_update", replica.getOpsUpdate());
    DO_STAT("vb_replica_ops_delete", replica.getOpsDelete());
    DO_STAT("vb_replica_ops_reject", replica.getOpsReject());
    DO_STAT("vb_replica_queue_size", replica.getQueueSize());
    DO_STAT("vb_replica_queue_memory", replica.getQueueMemory());
    DO_STAT("vb_replica_queue_age", replica.getAge());
    DO_STAT("vb_replica_queue_pending", replica.getPendingWrites());
    DO_STAT("vb_replica_queue_fill", replica.getQueueFill());
    DO_STAT("vb_replica_queue_drain", replica.getQueueDrain());
    DO_STAT("vb_replica_rollback_item_count", replica.getRollbackItemCount());

    // Pending vBuckets:
    DO_STAT("vb_pending_backfill_queue_size", pending.getBackfillQueueSize());
    DO_STAT("vb_pending_num", pending.getVBucketNumber());
    DO_STAT("vb_pending_curr_items", pending.getNumItems());
    DO_STAT("vb_pending_hp_vb_req_size", pending.getNumHpVBReqs());
    DO_STAT("vb_pending_num_non_resident", pending.getNonResident());
    DO_STAT("vb_pending_perc_mem_resident", pending.getMemResidentPer());
    DO_STAT("vb_pending_eject", pending.getEjects());
    DO_STAT("vb_pending_expired", pending.getExpired());
    DO_STAT("vb_pending_meta_data_memory", pending.getMetaDataMemory());
    DO_STAT("vb_pending_meta_data_disk", pending.getMetaDataDisk());
    DO_STAT("vb_pending_checkpoint_memory", pending.getCheckpointMemory());
    DO_STAT("vb_pending_checkpoint_memory_unreferenced",
            pending.getCheckpointMemoryUnreferenced());
    DO_STAT("vb_pending_checkpoint_memory_overhead",
            pending.getCheckpointMemoryOverhead());
    DO_STAT("vb_pending_ht_memory", pending.getHashtableMemory());
    DO_STAT("vb_pending_itm_memory", pending.getItemMemory());
    DO_STAT("vb_pending_itm_memory_uncompressed",
            pending.getUncompressedItemMemory());
    DO_STAT("vb_pending_ops_create", pending.getOpsCreate());
    DO_STAT("vb_pending_ops_update", pending.getOpsUpdate());
    DO_STAT("vb_pending_ops_delete", pending.getOpsDelete());
    DO_STAT("vb_pending_ops_reject", pending.getOpsReject());
    DO_STAT("vb_pending_queue_size", pending.getQueueSize());
    DO_STAT("vb_pending_queue_memory", pending.getQueueMemory());
    DO_STAT("vb_pending_queue_age", pending.getAge());
    DO_STAT("vb_pending_queue_pending", pending.getPendingWrites());
    DO_STAT("vb_pending_queue_fill", pending.getQueueFill());
    DO_STAT("vb_pending_queue_drain", pending.getQueueDrain());
    DO_STAT("vb_pending_rollback_item_count", pending.getRollbackItemCount());

    // Dead vBuckets:
    DO_STAT("vb_dead_num", dead.getVBucketNumber());

    // Totals:
    DO_STAT("ep_vb_total",
            active.getVBucketNumber() + replica.getVBucketNumber() +
                    pending.getVBucketNumber() + dead.getVBucketNumber());
    DO_STAT("ep_total_new_items",
            active.getOpsCreate() + replica.getOpsCreate() +
                    pending.getOpsCreate());
    DO_STAT("ep_total_del_items",
            active.getOpsDelete() + replica.getOpsDelete() +
                    pending.getOpsDelete());
    DO_STAT("ep_diskqueue_memory",
            active.getQueueMemory() + replica.getQueueMemory() +
                    pending.getQueueMemory());
    DO_STAT("ep_diskqueue_fill",
            active.getQueueFill() + replica.getQueueFill() +
                    pending.getQueueFill());
    DO_STAT("ep_diskqueue_drain",
            active.getQueueDrain() + replica.getQueueDrain() +
                    pending.getQueueDrain());
    DO_STAT("ep_diskqueue_pending",
            active.getPendingWrites() + replica.getPendingWrites() +
                    pending.getPendingWrites());
    DO_STAT("ep_meta_data_memory",
            active.getMetaDataMemory() + replica.getMetaDataMemory() +
                    pending.getMetaDataMemory());
    DO_STAT("ep_meta_data_disk",
            active.getMetaDataDisk() + replica.getMetaDataDisk() +
                    pending.getMetaDataDisk());
    DO_STAT("ep_checkpoint_memory",
            active.getCheckpointMemory() + replica.getCheckpointMemory() +
                    pending.getCheckpointMemory());
    DO_STAT("ep_checkpoint_memory_unreferenced",
            active.getCheckpointMemoryUnreferenced() +
                    replica.getCheckpointMemoryUnreferenced() +
                    pending.getCheckpointMemoryUnreferenced());
    DO_STAT("ep_checkpoint_memory_overhead",
            active.getCheckpointMemoryOverhead() +
                    replica.getCheckpointMemoryOverhead() +
                    pending.getCheckpointMemoryOverhead());
    DO_STAT("ep_total_cache_size",
            active.getCacheSize() + replica.getCacheSize() +
                    pending.getCacheSize());
    DO_STAT("rollback_item_count",
            active.getRollbackItemCount() + replica.getRollbackItemCount() +
                    pending.getRollbackItemCount());
    DO_STAT("ep_num_non_resident",
            active.getNonResident() + pending.getNonResident() +
                    replica.getNonResident());
    DO_STAT("ep_chk_persistence_remains",
            active.getChkPersistRemaining() + pending.getChkPersistRemaining() +
                    replica.getChkPersistRemaining());

    // Add stats for tracking HLC drift
    DO_STAT("ep_active_hlc_drift", active.getTotalAbsHLCDrift().total);
    DO_STAT("ep_active_hlc_drift_count", active.getTotalAbsHLCDrift().updates);
    DO_STAT("ep_replica_hlc_drift", replica.getTotalAbsHLCDrift().total);
    DO_STAT("ep_replica_hlc_drift_count",
            replica.getTotalAbsHLCDrift().updates);

    DO_STAT("ep_active_ahead_exceptions",
            active.getTotalHLCDriftExceptionCounters().ahead);
    DO_STAT("ep_active_behind_exceptions",
            active.getTotalHLCDriftExceptionCounters().behind);
    DO_STAT("ep_replica_ahead_exceptions",
            replica.getTotalHLCDriftExceptionCounters().ahead);
    DO_STAT("ep_replica_behind_exceptions",
            replica.getTotalHLCDriftExceptionCounters().behind);

    // A single total for ahead exceptions across all active/replicas
    DO_STAT("ep_clock_cas_drift_threshold_exceeded",
            active.getTotalHLCDriftExceptionCounters().ahead +
                    replica.getTotalHLCDriftExceptionCounters().ahead);

    // Per-datatype item counts, one stat per datatype combination.
    for (uint8_t ii = 0; ii < active.getNumDatatypes(); ++ii) {
        std::string name = "ep_active_datatype_";
        name += mcbp::datatype::to_string(ii);
        DO_STAT(name.c_str(), active.getDatatypeCount(ii));
    }

    for (uint8_t ii = 0; ii < replica.getNumDatatypes(); ++ii) {
        std::string name = "ep_replica_datatype_";
        name += mcbp::datatype::to_string(ii);
        DO_STAT(name.c_str(), replica.getDatatypeCount(ii));
    }

#undef DO_STAT
1277} 1278 1279void KVBucket::completeBGFetch(const DocKey& key, 1280 uint16_t vbucket, 1281 const void* cookie, 1282 ProcessClock::time_point init, 1283 bool isMeta) { 1284 ProcessClock::time_point startTime(ProcessClock::now()); 1285 // Go find the data 1286 GetValue gcb = getROUnderlying(vbucket)->get(key, vbucket, isMeta); 1287 1288 { 1289 // Lock to prevent a race condition between a fetch for restore and delete 1290 LockHolder lh(vbsetMutex); 1291 1292 VBucketPtr vb = getVBucket(vbucket); 1293 if (vb) { 1294 VBucketBGFetchItem item{&gcb, cookie, init, isMeta}; 1295 ENGINE_ERROR_CODE status = 1296 vb->completeBGFetchForSingleItem(key, item, startTime); 1297 engine.notifyIOComplete(item.cookie, status); 1298 } else { 1299 LOG(EXTENSION_LOG_INFO, 1300 "vb:%" PRIu16 1301 " file was deleted in the " 1302 "middle of a bg fetch for key{%.*s}", 1303 vbucket, 1304 int(key.size()), 1305 key.data()); 1306 engine.notifyIOComplete(cookie, ENGINE_NOT_MY_VBUCKET); 1307 } 1308 } 1309 1310 --stats.numRemainingBgJobs; 1311} 1312 1313void KVBucket::completeBGFetchMulti(uint16_t vbId, 1314 std::vector<bgfetched_item_t>& fetchedItems, 1315 ProcessClock::time_point startTime) { 1316 VBucketPtr vb = getVBucket(vbId); 1317 if (vb) { 1318 for (const auto& item : fetchedItems) { 1319 auto& key = item.first; 1320 auto* fetched_item = item.second; 1321 ENGINE_ERROR_CODE status = vb->completeBGFetchForSingleItem( 1322 key, *fetched_item, startTime); 1323 engine.notifyIOComplete(fetched_item->cookie, status); 1324 } 1325 LOG(EXTENSION_LOG_DEBUG, 1326 "EP Store completes %" PRIu64 1327 " of batched background fetch " 1328 "for vBucket = %d endTime = %" PRIu64, 1329 uint64_t(fetchedItems.size()), 1330 vbId, 1331 std::chrono::duration_cast<std::chrono::milliseconds>( 1332 ProcessClock::now().time_since_epoch()) 1333 .count()); 1334 } else { 1335 for (const auto& item : fetchedItems) { 1336 engine.notifyIOComplete(item.second->cookie, 1337 ENGINE_NOT_MY_VBUCKET); 1338 } 1339 
LOG(EXTENSION_LOG_WARNING, 1340 "EP Store completes %d of batched background fetch for " 1341 "for vBucket = %d that is already deleted", 1342 (int)fetchedItems.size(), 1343 vbId); 1344 } 1345} 1346 1347GetValue KVBucket::getInternal(const DocKey& key, 1348 uint16_t vbucket, 1349 const void *cookie, 1350 vbucket_state_t allowedState, 1351 get_options_t options) { 1352 vbucket_state_t disallowedState = (allowedState == vbucket_state_active) ? 1353 vbucket_state_replica : vbucket_state_active; 1354 VBucketPtr vb = getVBucket(vbucket); 1355 1356 if (!vb) { 1357 ++stats.numNotMyVBuckets; 1358 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET); 1359 } 1360 1361 const bool honorStates = (options & HONOR_STATES); 1362 1363 ReaderLockHolder rlh(vb->getStateLock()); 1364 if (honorStates) { 1365 vbucket_state_t vbState = vb->getState(); 1366 if (vbState == vbucket_state_dead) { 1367 ++stats.numNotMyVBuckets; 1368 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET); 1369 } else if (vbState == disallowedState) { 1370 ++stats.numNotMyVBuckets; 1371 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET); 1372 } else if (vbState == vbucket_state_pending) { 1373 if (vb->addPendingOp(cookie)) { 1374 return GetValue(NULL, ENGINE_EWOULDBLOCK); 1375 } 1376 } 1377 } 1378 1379 { // collections read scope 1380 auto collectionsRHandle = vb->lockCollections(key); 1381 if (!collectionsRHandle.valid()) { 1382 return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION); 1383 } 1384 1385 return vb->getInternal(cookie, 1386 engine, 1387 bgFetchDelay, 1388 options, 1389 diskDeleteAll, 1390 VBucket::GetKeyOnly::No, 1391 collectionsRHandle); 1392 } 1393} 1394 1395GetValue KVBucket::getRandomKey() { 1396 VBucketMap::id_type max = vbMap.getSize(); 1397 1398 const long start = random() % max; 1399 long curr = start; 1400 std::unique_ptr<Item> itm; 1401 1402 while (itm == NULL) { 1403 VBucketPtr vb = getVBucket(curr++); 1404 while (!vb || vb->getState() != vbucket_state_active) { 1405 if (curr == start) { 1406 return GetValue(NULL, 
ENGINE_KEY_ENOENT); 1407 } 1408 if (curr == max) { 1409 curr = 0; 1410 } 1411 1412 vb = getVBucket(curr++); 1413 } 1414 1415 if ((itm = vb->ht.getRandomKey(random()))) { 1416 GetValue rv(std::move(itm), ENGINE_SUCCESS); 1417 return rv; 1418 } 1419 1420 if (curr == max) { 1421 curr = 0; 1422 } 1423 1424 if (curr == start) { 1425 return GetValue(NULL, ENGINE_KEY_ENOENT); 1426 } 1427 // Search next vbucket 1428 } 1429 1430 return GetValue(NULL, ENGINE_KEY_ENOENT); 1431} 1432 1433ENGINE_ERROR_CODE KVBucket::getMetaData(const DocKey& key, 1434 uint16_t vbucket, 1435 const void* cookie, 1436 ItemMetaData& metadata, 1437 uint32_t& deleted, 1438 uint8_t& datatype) 1439{ 1440 VBucketPtr vb = getVBucket(vbucket); 1441 1442 if (!vb) { 1443 ++stats.numNotMyVBuckets; 1444 return ENGINE_NOT_MY_VBUCKET; 1445 } 1446 1447 ReaderLockHolder rlh(vb->getStateLock()); 1448 if (vb->getState() == vbucket_state_dead || 1449 vb->getState() == vbucket_state_replica) { 1450 ++stats.numNotMyVBuckets; 1451 return ENGINE_NOT_MY_VBUCKET; 1452 } 1453 1454 { // collections read scope 1455 auto collectionsRHandle = vb->lockCollections(key); 1456 if (!collectionsRHandle.valid()) { 1457 return ENGINE_UNKNOWN_COLLECTION; 1458 } 1459 1460 return vb->getMetaData(cookie, 1461 engine, 1462 bgFetchDelay, 1463 collectionsRHandle, 1464 metadata, 1465 deleted, 1466 datatype); 1467 } 1468} 1469 1470ENGINE_ERROR_CODE KVBucket::setWithMeta(Item& itm, 1471 uint64_t cas, 1472 uint64_t* seqno, 1473 const void* cookie, 1474 PermittedVBStates permittedVBStates, 1475 CheckConflicts checkConflicts, 1476 bool allowExisting, 1477 GenerateBySeqno genBySeqno, 1478 GenerateCas genCas, 1479 ExtendedMetaData* emd, 1480 bool isReplication) { 1481 VBucketPtr vb = getVBucket(itm.getVBucketId()); 1482 if (!vb) { 1483 ++stats.numNotMyVBuckets; 1484 return ENGINE_NOT_MY_VBUCKET; 1485 } 1486 1487 ReaderLockHolder rlh(vb->getStateLock()); 1488 if (!permittedVBStates.test(vb->getState())) { 1489 if (vb->getState() == 
vbucket_state_pending) { 1490 if (vb->addPendingOp(cookie)) { 1491 return ENGINE_EWOULDBLOCK; 1492 } 1493 } else { 1494 ++stats.numNotMyVBuckets; 1495 return ENGINE_NOT_MY_VBUCKET; 1496 } 1497 } else if (vb->isTakeoverBackedUp()) { 1498 LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a setWithMeta op" 1499 ", becuase takeover is lagging", vb->getId()); 1500 return ENGINE_TMPFAIL; 1501 } 1502 1503 //check for the incoming item's CAS validity 1504 if (!Item::isValidCas(itm.getCas())) { 1505 return ENGINE_KEY_EEXISTS; 1506 } 1507 1508 ENGINE_ERROR_CODE rv = ENGINE_SUCCESS; 1509 { // collections read scope 1510 auto collectionsRHandle = vb->lockCollections(itm.getKey()); 1511 if (!collectionsRHandle.valid()) { 1512 rv = ENGINE_UNKNOWN_COLLECTION; 1513 } else { 1514 rv = vb->setWithMeta(itm, 1515 cas, 1516 seqno, 1517 cookie, 1518 engine, 1519 bgFetchDelay, 1520 checkConflicts, 1521 allowExisting, 1522 genBySeqno, 1523 genCas, 1524 isReplication, 1525 collectionsRHandle); 1526 } 1527 } 1528 1529 if (rv == ENGINE_SUCCESS) { 1530 checkAndMaybeFreeMemory(); 1531 } 1532 return rv; 1533} 1534 1535GetValue KVBucket::getAndUpdateTtl(const DocKey& key, uint16_t vbucket, 1536 const void *cookie, time_t exptime) 1537{ 1538 VBucketPtr vb = getVBucket(vbucket); 1539 if (!vb) { 1540 ++stats.numNotMyVBuckets; 1541 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET); 1542 } 1543 1544 ReaderLockHolder rlh(vb->getStateLock()); 1545 if (vb->getState() == vbucket_state_dead) { 1546 ++stats.numNotMyVBuckets; 1547 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET); 1548 } else if (vb->getState() == vbucket_state_replica) { 1549 ++stats.numNotMyVBuckets; 1550 return GetValue(NULL, ENGINE_NOT_MY_VBUCKET); 1551 } else if (vb->getState() == vbucket_state_pending) { 1552 if (vb->addPendingOp(cookie)) { 1553 return GetValue(NULL, ENGINE_EWOULDBLOCK); 1554 } 1555 } 1556 1557 { // collections read scope 1558 auto collectionsRHandle = vb->lockCollections(key); 1559 if (!collectionsRHandle.valid()) { 1560 
return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
        }

        return vb->getAndUpdateTtl(
                cookie, engine, bgFetchDelay, exptime, collectionsRHandle);
    }
}

/**
 * GETL: fetch and lock an item for `lockTimeout`. Only permitted against
 * active vbuckets.
 */
GetValue KVBucket::getLocked(const DocKey& key, uint16_t vbucket,
                             rel_time_t currentTime, uint32_t lockTimeout,
                             const void *cookie) {
    VBucketPtr vb = getVBucket(vbucket);
    if (!vb || vb->getState() != vbucket_state_active) {
        ++stats.numNotMyVBuckets;
        return GetValue(NULL, ENGINE_NOT_MY_VBUCKET);
    }

    { // collections read scope
        auto collectionsRHandle = vb->lockCollections(key);
        if (!collectionsRHandle.valid()) {
            return GetValue(NULL, ENGINE_UNKNOWN_COLLECTION);
        }

        return vb->getLocked(currentTime,
                             lockTimeout,
                             cookie,
                             engine,
                             bgFetchDelay,
                             collectionsRHandle);
    }
}

/**
 * UNL: release the lock on a key. Succeeds only when the item is currently
 * locked and the supplied CAS matches the lock CAS.
 */
ENGINE_ERROR_CODE KVBucket::unlockKey(const DocKey& key,
                                      uint16_t vbucket,
                                      uint64_t cas,
                                      rel_time_t currentTime)
{

    VBucketPtr vb = getVBucket(vbucket);
    if (!vb || vb->getState() != vbucket_state_active) {
        ++stats.numNotMyVBuckets;
        return ENGINE_NOT_MY_VBUCKET;
    }

    auto collectionsRHandle = vb->lockCollections(key);
    if (!collectionsRHandle.valid()) {
        return ENGINE_UNKNOWN_COLLECTION;
    }

    auto hbl = vb->ht.getLockedBucket(key);
    StoredValue* v = vb->fetchValidValue(hbl,
                                         key,
                                         WantsDeleted::Yes,
                                         TrackReference::Yes,
                                         QueueExpired::Yes);

    if (v) {
        if (VBucket::isLogicallyNonExistent(*v, collectionsRHandle)) {
            vb->ht.cleanupIfTemporaryItem(hbl, *v);
            return ENGINE_KEY_ENOENT;
        }
        if (v->isLocked(currentTime)) {
            if (v->getCas() == cas) {
                v->unlock();
                return ENGINE_SUCCESS;
            }
            // Locked, but wrong CAS supplied.
            return ENGINE_LOCKED_TMPFAIL;
        }
        // Present but not locked.
        return ENGINE_TMPFAIL;
    } else {
        if (eviction_policy == VALUE_ONLY) {
            return ENGINE_KEY_ENOENT;
        } else {
            // With the full eviction, an item's lock is automatically
            // released when the item is evicted from memory. Therefore,
            // we simply return ENGINE_TMPFAIL when we receive unlockKey
            // for an item that is not in memory cache. Note that we don't
            // spawn any bg fetch job to figure out if an item actually
            // exists in disk or not.
            return ENGINE_TMPFAIL;
        }
    }
}

/**
 * Collect per-key statistics (existence, state, flags etc.) into kstats.
 */
ENGINE_ERROR_CODE KVBucket::getKeyStats(const DocKey& key,
                                        uint16_t vbucket,
                                        const void* cookie,
                                        struct key_stats& kstats,
                                        WantsDeleted wantsDeleted) {
    VBucketPtr vb = getVBucket(vbucket);
    if (!vb) {
        return ENGINE_NOT_MY_VBUCKET;
    }

    { // collections read scope
        auto collectionsRHandle = vb->lockCollections(key);
        if (!collectionsRHandle.valid()) {
            return ENGINE_UNKNOWN_COLLECTION;
        }

        return vb->getKeyStats(cookie,
                               engine,
                               bgFetchDelay,
                               kstats,
                               wantsDeleted,
                               collectionsRHandle);
    }
}

/**
 * Compare an on-disk item against the in-memory copy; returns a short
 * status string ("valid", "item_deleted", "flags_mismatch",
 * "data_mismatch" or "collection_unknown").
 * NOTE(review): unlike the accessors above, `vb` is dereferenced without a
 * null check — presumably callers guarantee the vbucket exists; confirm.
 */
std::string KVBucket::validateKey(const DocKey& key, uint16_t vbucket,
                                  Item &diskItem) {
    VBucketPtr vb = getVBucket(vbucket);

    auto collectionsRHandle = vb->lockCollections(key);
    if (!collectionsRHandle.valid()) {
        return "collection_unknown";
    }

    auto hbl = vb->ht.getLockedBucket(key);
    StoredValue* v = vb->fetchValidValue(
            hbl, key, WantsDeleted::Yes, TrackReference::No, QueueExpired::Yes);

    if (v) {
        if (VBucket::isLogicallyNonExistent(*v, collectionsRHandle)) {
            vb->ht.cleanupIfTemporaryItem(hbl, *v);
            return "item_deleted";
        }

        if (diskItem.getFlags() != v->getFlags()) {
            return "flags_mismatch";
        } else if (v->isResident() && memcmp(diskItem.getData(),
                                             v->getValue()->getData(),
                                             diskItem.getNBytes())) {
            return "data_mismatch";
        } else {
            return "valid";
        }
    } else {
        return "item_deleted";
    }

}

ENGINE_ERROR_CODE
KVBucket::deleteItem(const DocKey& key, 1704 uint64_t& cas, 1705 uint16_t vbucket, 1706 const void* cookie, 1707 ItemMetaData* itemMeta, 1708 mutation_descr_t& mutInfo) { 1709 VBucketPtr vb = getVBucket(vbucket); 1710 if (!vb || vb->getState() == vbucket_state_dead) { 1711 ++stats.numNotMyVBuckets; 1712 return ENGINE_NOT_MY_VBUCKET; 1713 } else if (vb->getState() == vbucket_state_replica) { 1714 ++stats.numNotMyVBuckets; 1715 return ENGINE_NOT_MY_VBUCKET; 1716 } else if (vb->getState() == vbucket_state_pending) { 1717 if (vb->addPendingOp(cookie)) { 1718 return ENGINE_EWOULDBLOCK; 1719 } 1720 } else if (vb->isTakeoverBackedUp()) { 1721 LOG(EXTENSION_LOG_DEBUG, "(vb %u) Returned TMPFAIL to a delete op" 1722 ", becuase takeover is lagging", vb->getId()); 1723 return ENGINE_TMPFAIL; 1724 } 1725 { // collections read scope 1726 auto collectionsRHandle = vb->lockCollections(key); 1727 if (!collectionsRHandle.valid()) { 1728 return ENGINE_UNKNOWN_COLLECTION; 1729 } 1730 1731 return vb->deleteItem(cas, 1732 cookie, 1733 engine, 1734 bgFetchDelay, 1735 itemMeta, 1736 mutInfo, 1737 collectionsRHandle); 1738 } 1739} 1740 1741ENGINE_ERROR_CODE KVBucket::deleteWithMeta(const DocKey& key, 1742 uint64_t& cas, 1743 uint64_t* seqno, 1744 uint16_t vbucket, 1745 const void* cookie, 1746 PermittedVBStates permittedVBStates, 1747 CheckConflicts checkConflicts, 1748 const ItemMetaData& itemMeta, 1749 bool backfill, 1750 GenerateBySeqno genBySeqno, 1751 GenerateCas generateCas, 1752 uint64_t bySeqno, 1753 ExtendedMetaData* emd, 1754 bool isReplication) { 1755 VBucketPtr vb = getVBucket(vbucket); 1756 1757 if (!vb) { 1758 ++stats.numNotMyVBuckets; 1759 return ENGINE_NOT_MY_VBUCKET; 1760 } 1761 1762 ReaderLockHolder rlh(vb->getStateLock()); 1763 if (!permittedVBStates.test(vb->getState())) { 1764 if (vb->getState() == vbucket_state_pending) { 1765 if (vb->addPendingOp(cookie)) { 1766 return ENGINE_EWOULDBLOCK; 1767 } 1768 } else { 1769 ++stats.numNotMyVBuckets; 1770 return 
ENGINE_NOT_MY_VBUCKET; 1771 } 1772 } else if (vb->isTakeoverBackedUp()) { 1773 LOG(EXTENSION_LOG_DEBUG, 1774 "(vb %u) Returned TMPFAIL to a deleteWithMeta op" 1775 ", becuase takeover is lagging", 1776 vb->getId()); 1777 return ENGINE_TMPFAIL; 1778 } 1779 1780 //check for the incoming item's CAS validity 1781 if (!Item::isValidCas(itemMeta.cas)) { 1782 return ENGINE_KEY_EEXISTS; 1783 } 1784 1785 { // collections read scope 1786 auto collectionsRHandle = vb->lockCollections(key); 1787 if (!collectionsRHandle.valid()) { 1788 return ENGINE_UNKNOWN_COLLECTION; 1789 } 1790 1791 return vb->deleteWithMeta(cas, 1792 seqno, 1793 cookie, 1794 engine, 1795 bgFetchDelay, 1796 checkConflicts, 1797 itemMeta, 1798 backfill, 1799 genBySeqno, 1800 generateCas, 1801 bySeqno, 1802 isReplication, 1803 collectionsRHandle); 1804 } 1805} 1806 1807void KVBucket::reset() { 1808 auto buckets = vbMap.getBuckets(); 1809 for (auto vbid : buckets) { 1810 auto vb = getLockedVBucket(vbid); 1811 if (vb) { 1812 vb->ht.clear(); 1813 vb->checkpointManager->clear(vb->getState()); 1814 vb->resetStats(); 1815 vb->setPersistedSnapshot(0, 0); 1816 LOG(EXTENSION_LOG_NOTICE, 1817 "KVBucket::reset(): Successfully flushed vb:%" PRIu16, 1818 vbid); 1819 } 1820 } 1821 LOG(EXTENSION_LOG_NOTICE, "KVBucket::reset(): Successfully flushed bucket"); 1822} 1823 1824void KVBucket::setDeleteAllComplete() { 1825 // Notify memcached about delete all task completion, and 1826 // set diskFlushall flag to false 1827 if (deleteAllTaskCtx.cookie) { 1828 engine.notifyIOComplete(deleteAllTaskCtx.cookie, ENGINE_SUCCESS); 1829 } 1830 bool inverse = false; 1831 deleteAllTaskCtx.delay.compare_exchange_strong(inverse, true); 1832 inverse = true; 1833 diskDeleteAll.compare_exchange_strong(inverse, false); 1834} 1835 1836std::vector<vbucket_state *> KVBucket::loadVBucketState() 1837{ 1838 return getOneROUnderlying()->listPersistedVbuckets(); 1839} 1840 1841void KVBucket::warmupCompleted() { 1842 // Snapshot VBucket state after warmup 
// to ensure the Failover table is persisted.
    scheduleVBStatePersist();

    if (engine.getConfiguration().getAlogPath().length() > 0) {

        if (engine.getConfiguration().isAccessScannerEnabled()) {
            {
                LockHolder lh(accessScanner.mutex);
                accessScanner.enabled = true;
            }
            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task enabled");
            size_t smin = engine.getConfiguration().getAlogSleepTime();
            setAccessScannerSleeptime(smin, true);
        } else {
            LockHolder lh(accessScanner.mutex);
            accessScanner.enabled = false;
            LOG(EXTENSION_LOG_NOTICE, "Access Scanner task disabled");
        }

        // React to runtime changes of the access-scanner configuration.
        Configuration &config = engine.getConfiguration();
        config.addValueChangedListener(
                "access_scanner_enabled",
                std::make_unique<EPStoreValueChangeListener>(*this));
        config.addValueChangedListener(
                "alog_sleep_time",
                std::make_unique<EPStoreValueChangeListener>(*this));
        config.addValueChangedListener(
                "alog_task_time",
                std::make_unique<EPStoreValueChangeListener>(*this));
    }

    // "0" sleep_time means that the first snapshot task will be executed
    // right after warmup. Subsequent snapshot tasks will be scheduled every
    // 60 sec by default.
    ExecutorPool *iom = ExecutorPool::get();
    ExTask task = std::make_shared<StatSnap>(&engine, 0, false);
    statsSnapshotTaskId = iom->schedule(task);
}

bool KVBucket::maybeEnableTraffic()
{
    // @todo rename: should be something like isTrafficDisabled or similar
    // (translated from the original Norwegian note).
    double memoryUsed =
            static_cast<double>(stats.getEstimatedTotalMemoryUsed());
    double maxSize = static_cast<double>(stats.getMaxDataSize());

    if (memoryUsed >= stats.mem_low_wat) {
        LOG(EXTENSION_LOG_NOTICE,
            "Total memory use reached to the low water mark, stop warmup"
            ": memoryUsed (%f) >= low water mark (%" PRIu64 ")",
            memoryUsed, uint64_t(stats.mem_low_wat.load()));
        return true;
    } else if (memoryUsed > (maxSize * stats.warmupMemUsedCap)) {
        LOG(EXTENSION_LOG_NOTICE,
            "Enough MB of data loaded to enable traffic"
            ": memoryUsed (%f) > (maxSize(%f) * warmupMemUsedCap(%f))",
            memoryUsed, maxSize, stats.warmupMemUsedCap.load());
        return true;
    } else if (eviction_policy == VALUE_ONLY &&
               stats.warmedUpValues >=
                       (stats.warmedUpKeys * stats.warmupNumReadCap)) {
        // Let ep-engine think we're done with the warmup phase
        // (we should refactor this into "enableTraffic")
        LOG(EXTENSION_LOG_NOTICE,
            "Enough number of items loaded to enable traffic (value eviction)"
            ": warmedUpValues(%" PRIu64 ") >= (warmedUpKeys(%" PRIu64 ") * "
            "warmupNumReadCap(%f))", uint64_t(stats.warmedUpValues.load()),
            uint64_t(stats.warmedUpKeys.load()), stats.warmupNumReadCap.load());
        return true;
    } else if (eviction_policy == FULL_EVICTION &&
               stats.warmedUpValues >=
                       (warmupTask->getEstimatedItemCount() *
                        stats.warmupNumReadCap)) {
        // In case of FULL EVICTION, warmed up keys always matches the number
        // of warmed up values, therefore for honoring the min_item threshold
        // in this scenario, we can consider warmup's estimated item count.
1919 LOG(EXTENSION_LOG_NOTICE, 1920 "Enough number of items loaded to enable traffic (full eviction)" 1921 ": warmedUpValues(%" PRIu64 ") >= (warmup est items(%" PRIu64 ") * " 1922 "warmupNumReadCap(%f))", uint64_t(stats.warmedUpValues.load()), 1923 uint64_t(warmupTask->getEstimatedItemCount()), 1924 stats.warmupNumReadCap.load()); 1925 return true; 1926 } 1927 return false; 1928} 1929 1930bool KVBucket::isWarmingUp() { 1931 return warmupTask && !warmupTask->isComplete(); 1932} 1933 1934bool KVBucket::shouldSetVBStateBlock(const void* cookie) { 1935 if (warmupTask) { 1936 return warmupTask->shouldSetVBStateBlock(cookie); 1937 } 1938 return false; 1939} 1940 1941bool KVBucket::isWarmupOOMFailure() { 1942 return warmupTask && warmupTask->hasOOMFailure(); 1943} 1944 1945void KVBucket::stopWarmup(void) 1946{ 1947 // forcefully stop current warmup task 1948 if (isWarmingUp()) { 1949 LOG(EXTENSION_LOG_NOTICE, 1950 "Stopping warmup while engine is loading " 1951 "data from underlying storage, shutdown = %s", 1952 stats.isShutdown ? 
"yes" : "no"); 1953 warmupTask->stop(); 1954 } 1955} 1956 1957bool KVBucket::isMemoryUsageTooHigh() { 1958 double memoryUsed = 1959 static_cast<double>(stats.getEstimatedTotalMemoryUsed()); 1960 double maxSize = static_cast<double>(stats.getMaxDataSize()); 1961 return memoryUsed > (maxSize * backfillMemoryThreshold); 1962} 1963 1964// Trigger memory reduction (ItemPager) if we've exceeded high water 1965void KVBucket::checkAndMaybeFreeMemory() { 1966 if (stats.getEstimatedTotalMemoryUsed() > stats.mem_high_wat) { 1967 attemptToFreeMemory(); 1968 } 1969} 1970 1971void KVBucket::setBackfillMemoryThreshold(double threshold) { 1972 backfillMemoryThreshold = threshold; 1973} 1974 1975void KVBucket::setExpiryPagerSleeptime(size_t val) { 1976 LockHolder lh(expiryPager.mutex); 1977 1978 ExecutorPool::get()->cancel(expiryPager.task); 1979 1980 expiryPager.sleeptime = val; 1981 if (expiryPager.enabled) { 1982 ExTask expTask = std::make_shared<ExpiredItemPager>( 1983 &engine, stats, expiryPager.sleeptime); 1984 expiryPager.task = ExecutorPool::get()->schedule(expTask); 1985 } else { 1986 LOG(EXTENSION_LOG_DEBUG, "Expiry pager disabled, " 1987 "enabling it will make exp_pager_stime (%lu)" 1988 "to go into effect!", val); 1989 } 1990} 1991 1992void KVBucket::setExpiryPagerTasktime(ssize_t val) { 1993 LockHolder lh(expiryPager.mutex); 1994 if (expiryPager.enabled) { 1995 ExecutorPool::get()->cancel(expiryPager.task); 1996 ExTask expTask = std::make_shared<ExpiredItemPager>( 1997 &engine, stats, expiryPager.sleeptime, val); 1998 expiryPager.task = ExecutorPool::get()->schedule(expTask); 1999 } else { 2000 LOG(EXTENSION_LOG_DEBUG, "Expiry pager disabled, " 2001 "enabling it will make exp_pager_stime (%lu)" 2002 "to go into effect!", val); 2003 } 2004} 2005 2006void KVBucket::enableExpiryPager() { 2007 LockHolder lh(expiryPager.mutex); 2008 if (!expiryPager.enabled) { 2009 expiryPager.enabled = true; 2010 2011 ExecutorPool::get()->cancel(expiryPager.task); 2012 ExTask expTask = 
std::make_shared<ExpiredItemPager>( 2013 &engine, stats, expiryPager.sleeptime); 2014 expiryPager.task = ExecutorPool::get()->schedule(expTask); 2015 } else { 2016 LOG(EXTENSION_LOG_DEBUG, "Expiry Pager already enabled!"); 2017 } 2018} 2019 2020void KVBucket::disableExpiryPager() { 2021 LockHolder lh(expiryPager.mutex); 2022 if (expiryPager.enabled) { 2023 ExecutorPool::get()->cancel(expiryPager.task); 2024 expiryPager.enabled = false; 2025 } else { 2026 LOG(EXTENSION_LOG_DEBUG, "Expiry Pager already disabled!"); 2027 } 2028} 2029 2030void KVBucket::wakeUpExpiryPager() { 2031 LockHolder lh(expiryPager.mutex); 2032 if (expiryPager.enabled) { 2033 ExecutorPool::get()->wake(expiryPager.task); 2034 } 2035} 2036 2037void KVBucket::wakeItemPager() { 2038 if (itemPagerTask->getState() == TASK_SNOOZED) { 2039 ExecutorPool::get()->wake(itemPagerTask->getId()); 2040 } 2041} 2042 2043void KVBucket::enableItemPager() { 2044 ExecutorPool::get()->cancel(itemPagerTask->getId()); 2045 ExecutorPool::get()->schedule(itemPagerTask); 2046} 2047 2048void KVBucket::disableItemPager() { 2049 ExecutorPool::get()->cancel(itemPagerTask->getId()); 2050} 2051 2052void KVBucket::wakeItemFreqDecayerTask() { 2053 auto& t = dynamic_cast<ItemFreqDecayerTask&>(*itemFreqDecayerTask); 2054 t.wakeup(); 2055} 2056 2057void KVBucket::enableAccessScannerTask() { 2058 LockHolder lh(accessScanner.mutex); 2059 if (!accessScanner.enabled) { 2060 accessScanner.enabled = true; 2061 2062 if (accessScanner.sleeptime != 0) { 2063 ExecutorPool::get()->cancel(accessScanner.task); 2064 } 2065 2066 size_t alogSleepTime = engine.getConfiguration().getAlogSleepTime(); 2067 accessScanner.sleeptime = alogSleepTime * 60; 2068 if (accessScanner.sleeptime != 0) { 2069 ExTask task = 2070 std::make_shared<AccessScanner>(*this, 2071 engine.getConfiguration(), 2072 stats, 2073 accessScanner.sleeptime, 2074 true); 2075 accessScanner.task = ExecutorPool::get()->schedule(task); 2076 } else { 2077 LOG(EXTENSION_LOG_NOTICE, "Did not 
enable access scanner task, " 2078 "as alog_sleep_time is set to zero!"); 2079 } 2080 } else { 2081 LOG(EXTENSION_LOG_DEBUG, "Access scanner already enabled!"); 2082 } 2083} 2084 2085void KVBucket::disableAccessScannerTask() { 2086 LockHolder lh(accessScanner.mutex); 2087 if (accessScanner.enabled) { 2088 ExecutorPool::get()->cancel(accessScanner.task); 2089 accessScanner.sleeptime = 0; 2090 accessScanner.enabled = false; 2091 } else { 2092 LOG(EXTENSION_LOG_DEBUG, "Access scanner already disabled!"); 2093 } 2094} 2095 2096void KVBucket::setAccessScannerSleeptime(size_t val, bool useStartTime) { 2097 LockHolder lh(accessScanner.mutex); 2098 2099 if (accessScanner.enabled) { 2100 if (accessScanner.sleeptime != 0) { 2101 ExecutorPool::get()->cancel(accessScanner.task); 2102 } 2103 2104 // store sleeptime in seconds 2105 accessScanner.sleeptime = val * 60; 2106 if (accessScanner.sleeptime != 0) { 2107 ExTask task = 2108 std::make_shared<AccessScanner>(*this, 2109 engine.getConfiguration(), 2110 stats, 2111 accessScanner.sleeptime, 2112 useStartTime); 2113 accessScanner.task = ExecutorPool::get()->schedule(task); 2114 } 2115 } 2116} 2117 2118void KVBucket::resetAccessScannerStartTime() { 2119 LockHolder lh(accessScanner.mutex); 2120 2121 if (accessScanner.enabled) { 2122 if (accessScanner.sleeptime != 0) { 2123 ExecutorPool::get()->cancel(accessScanner.task); 2124 // re-schedule task according to the new task start hour 2125 ExTask task = 2126 std::make_shared<AccessScanner>(*this, 2127 engine.getConfiguration(), 2128 stats, 2129 accessScanner.sleeptime, 2130 true); 2131 accessScanner.task = ExecutorPool::get()->schedule(task); 2132 } 2133 } 2134} 2135 2136void KVBucket::setAllBloomFilters(bool to) { 2137 for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); vbid++) { 2138 VBucketPtr vb = vbMap.getBucket(vbid); 2139 if (vb) { 2140 if (to) { 2141 vb->setFilterStatus(BFILTER_ENABLED); 2142 } else { 2143 vb->setFilterStatus(BFILTER_DISABLED); 2144 } 2145 } 2146 } 
2147} 2148 2149void KVBucket::visit(VBucketVisitor &visitor) 2150{ 2151 for (VBucketMap::id_type vbid = 0; vbid < vbMap.getSize(); ++vbid) { 2152 VBucketPtr vb = vbMap.getBucket(vbid); 2153 if (vb) { 2154 visitor.visitBucket(vb); 2155 } 2156 } 2157 visitor.complete(); 2158} 2159 2160size_t KVBucket::visit(std::unique_ptr<VBucketVisitor> visitor, 2161 const char* lbl, 2162 TaskId id, 2163 double sleepTime, 2164 std::chrono::microseconds maxExpectedDuration) { 2165 auto task = std::make_shared<VBCBAdaptor>( 2166 this, id, std::move(visitor), lbl, sleepTime, /*shutdown*/ false); 2167 task->setMaxExpectedDuration(maxExpectedDuration); 2168 return ExecutorPool::get()->schedule(task); 2169} 2170 2171KVBucket::Position KVBucket::pauseResumeVisit(PauseResumeVBVisitor& visitor, 2172 Position& start_pos) { 2173 uint16_t vbid = start_pos.vbucket_id; 2174 for (; vbid < vbMap.getSize(); ++vbid) { 2175 VBucketPtr vb = vbMap.getBucket(vbid); 2176 if (vb) { 2177 bool paused = !visitor.visit(*vb); 2178 if (paused) { 2179 break; 2180 } 2181 } 2182 } 2183 2184 return KVBucket::Position(vbid); 2185} 2186 2187KVBucket::Position KVBucket::startPosition() const 2188{ 2189 return KVBucket::Position(0); 2190} 2191 2192KVBucket::Position KVBucket::endPosition() const 2193{ 2194 return KVBucket::Position(vbMap.getSize()); 2195} 2196 2197VBCBAdaptor::VBCBAdaptor(KVBucket* s, 2198 TaskId id, 2199 std::unique_ptr<VBucketVisitor> v, 2200 const char* l, 2201 double sleep, 2202 bool shutdown) 2203 : GlobalTask(&s->getEPEngine(), id, 0, shutdown), 2204 store(s), 2205 visitor(std::move(v)), 2206 label(l), 2207 sleepTime(sleep), 2208 maxDuration(std::chrono::microseconds::max()), 2209 currentvb(0) { 2210 const VBucketFilter& vbFilter = visitor->getVBucketFilter(); 2211 for (auto vbid : store->getVBuckets().getBuckets()) { 2212 if (vbFilter(vbid)) { 2213 vbList.push(vbid); 2214 } 2215 } 2216} 2217 2218std::string VBCBAdaptor::getDescription() { 2219 return std::string(label) + " on vb " + 
std::to_string(currentvb.load()); 2220} 2221 2222bool VBCBAdaptor::run(void) { 2223 if (!vbList.empty()) { 2224 currentvb.store(vbList.front()); 2225 VBucketPtr vb = store->getVBucket(currentvb); 2226 if (vb) { 2227 if (visitor->pauseVisitor()) { 2228 snooze(sleepTime); 2229 return true; 2230 } 2231 visitor->visitBucket(vb); 2232 } 2233 vbList.pop(); 2234 } 2235 2236 bool isdone = vbList.empty(); 2237 if (isdone) { 2238 visitor->complete(); 2239 } 2240 return !isdone; 2241} 2242 2243void KVBucket::resetUnderlyingStats(void) 2244{ 2245 for (size_t i = 0; i < vbMap.shards.size(); i++) { 2246 KVShard *shard = vbMap.shards[i].get(); 2247 shard->getRWUnderlying()->resetStats(); 2248 shard->getROUnderlying()->resetStats(); 2249 } 2250 2251 for (size_t i = 0; i < GlobalTask::allTaskIds.size(); i++) { 2252 stats.schedulingHisto[i].reset(); 2253 stats.taskRuntimeHisto[i].reset(); 2254 } 2255} 2256 2257void KVBucket::addKVStoreStats(ADD_STAT add_stat, const void* cookie) { 2258 for (size_t i = 0; i < vbMap.shards.size(); i++) { 2259 /* Add the different KVStore instances into a set and then 2260 * retrieve the stats from each instance separately. This 2261 * is because CouchKVStore has separate read only and read 2262 * write instance whereas RocksDBKVStore has only instance 2263 * for both read write and read-only. 
2264 */ 2265 std::set<KVStore *> underlyingSet; 2266 underlyingSet.insert(vbMap.shards[i]->getRWUnderlying()); 2267 underlyingSet.insert(vbMap.shards[i]->getROUnderlying()); 2268 2269 for (auto* store : underlyingSet) { 2270 store->addStats(add_stat, cookie); 2271 } 2272 } 2273} 2274 2275void KVBucket::addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie) { 2276 for (size_t i = 0; i < vbMap.shards.size(); i++) { 2277 std::set<KVStore*> underlyingSet; 2278 underlyingSet.insert(vbMap.shards[i]->getRWUnderlying()); 2279 underlyingSet.insert(vbMap.shards[i]->getROUnderlying()); 2280 2281 for (auto* store : underlyingSet) { 2282 store->addTimingStats(add_stat, cookie); 2283 } 2284 } 2285} 2286 2287bool KVBucket::getKVStoreStat(const char* name, size_t& value, KVSOption option) 2288{ 2289 value = 0; 2290 bool success = true; 2291 for (const auto& shard : vbMap.shards) { 2292 size_t per_shard_value; 2293 2294 if (option == KVSOption::RO || option == KVSOption::BOTH) { 2295 success &= shard->getROUnderlying()->getStat(name, per_shard_value); 2296 value += per_shard_value; 2297 } 2298 2299 if (option == KVSOption::RW || option == KVSOption::BOTH) { 2300 success &= shard->getRWUnderlying()->getStat(name, per_shard_value); 2301 value += per_shard_value; 2302 } 2303 } 2304 return success; 2305} 2306 2307KVStore *KVBucket::getOneROUnderlying(void) { 2308 return vbMap.shards[EP_PRIMARY_SHARD]->getROUnderlying(); 2309} 2310 2311KVStore *KVBucket::getOneRWUnderlying(void) { 2312 return vbMap.shards[EP_PRIMARY_SHARD]->getRWUnderlying(); 2313} 2314 2315TaskStatus KVBucket::rollback(uint16_t vbid, uint64_t rollbackSeqno) { 2316 std::unique_lock<std::mutex> vbset(vbsetMutex); 2317 2318 auto vb = getLockedVBucket(vbid, std::try_to_lock); 2319 2320 if (!vb.owns_lock()) { 2321 return TaskStatus::Reschedule; // Reschedule a vbucket rollback task. 
2322 } 2323 2324 if (!vb.getVB()) { 2325 LOG(EXTENSION_LOG_WARNING, 2326 "vb:%" PRIu16 " Aborting rollback as the vbucket was not found", 2327 vbid); 2328 return TaskStatus::Abort; 2329 } 2330 2331 ReaderLockHolder rlh(vb->getStateLock()); 2332 if ((vb->getState() == vbucket_state_replica) || 2333 (vb->getState() == vbucket_state_pending)) { 2334 uint64_t prevHighSeqno = 2335 static_cast<uint64_t>(vb->checkpointManager->getHighSeqno()); 2336 if (rollbackSeqno != 0) { 2337 RollbackResult result = doRollback(vbid, rollbackSeqno); 2338 2339 if (result.success /* not suceess hence reset vbucket to 2340 avoid data loss */ 2341 && 2342 (result.highSeqno > 0) /* if 0, reset vbucket for a clean start 2343 instead of deleting everything in it 2344 */) { 2345 rollbackUnpersistedItems(*vb, result.highSeqno); 2346 vb->postProcessRollback(result, prevHighSeqno); 2347 engine.getDcpConnMap().closeStreamsDueToRollback(vbid); 2348 return TaskStatus::Complete; 2349 } 2350 } 2351 2352 if (resetVBucket_UNLOCKED(vb, vbset)) { 2353 VBucketPtr newVb = vbMap.getBucket(vbid); 2354 newVb->incrRollbackItemCount(prevHighSeqno); 2355 engine.getDcpConnMap().closeStreamsDueToRollback(vbid); 2356 return TaskStatus::Complete; 2357 } 2358 LOG(EXTENSION_LOG_WARNING, 2359 "vb:%" PRIu16 " Aborting rollback as reset of the vbucket failed", 2360 vbid); 2361 return TaskStatus::Abort; 2362 } else { 2363 LOG(EXTENSION_LOG_WARNING, 2364 "vb:%" PRIu16 " Rollback not supported on the vbucket state %s", 2365 vbid, 2366 VBucket::toString(vb->getState())); 2367 return TaskStatus::Abort; 2368 } 2369} 2370 2371void KVBucket::attemptToFreeMemory() { 2372 static_cast<ItemPager*>(itemPagerTask.get())->scheduleNow(); 2373} 2374 2375void KVBucket::runDefragmenterTask() { 2376 defragmenterTask->run(); 2377} 2378 2379void KVBucket::runItemFreqDecayerTask() { 2380 itemFreqDecayerTask->run(); 2381} 2382 2383bool KVBucket::runAccessScannerTask() { 2384 return ExecutorPool::get()->wake(accessScanner.task); 2385} 2386 
2387void KVBucket::runVbStatePersistTask(int vbid) { 2388 scheduleVBStatePersist(vbid); 2389} 2390 2391bool KVBucket::compactionCanExpireItems() { 2392 // Process expired items only if memory usage is lesser than 2393 // compaction_exp_mem_threshold and disk queue is small 2394 // enough (marked by replication_throttle_queue_cap) 2395 2396 bool isMemoryUsageOk = 2397 (stats.getEstimatedTotalMemoryUsed() < 2398 (stats.getMaxDataSize() * compactionExpMemThreshold)); 2399 2400 size_t queueSize = stats.diskQueueSize.load(); 2401 bool isQueueSizeOk = 2402 ((stats.replicationThrottleWriteQueueCap == -1) || 2403 (queueSize < 2404 static_cast<size_t>(stats.replicationThrottleWriteQueueCap))); 2405 2406 return (isMemoryUsageOk && isQueueSizeOk); 2407} 2408 2409void KVBucket::setCursorDroppingLowerUpperThresholds(size_t maxSize) { 2410 Configuration &config = engine.getConfiguration(); 2411 stats.cursorDroppingLThreshold.store(static_cast<size_t>(maxSize * 2412 ((double)(config.getCursorDroppingLowerMark()) / 100))); 2413 stats.cursorDroppingUThreshold.store(static_cast<size_t>(maxSize * 2414 ((double)(config.getCursorDroppingUpperMark()) / 100))); 2415} 2416 2417size_t KVBucket::getActiveResidentRatio() const { 2418 return cachedResidentRatio.activeRatio.load(); 2419} 2420 2421size_t KVBucket::getReplicaResidentRatio() const { 2422 return cachedResidentRatio.replicaRatio.load(); 2423} 2424 2425ENGINE_ERROR_CODE KVBucket::forceMaxCas(uint16_t vbucket, uint64_t cas) { 2426 VBucketPtr vb = vbMap.getBucket(vbucket); 2427 if (vb) { 2428 vb->forceMaxCas(cas); 2429 return ENGINE_SUCCESS; 2430 } 2431 return ENGINE_NOT_MY_VBUCKET; 2432} 2433 2434std::ostream& operator<<(std::ostream& os, const KVBucket::Position& pos) { 2435 os << "vbucket:" << pos.vbucket_id; 2436 return os; 2437} 2438 2439void KVBucket::notifyFlusher(const uint16_t vbid) { 2440 KVShard* shard = vbMap.getShardByVbId(vbid); 2441 if (shard) { 2442 shard->getFlusher()->notifyFlushEvent(); 2443 } else { 2444 throw 
std::logic_error( 2445 "KVBucket::notifyFlusher() : shard null for " 2446 "vbucket " + 2447 std::to_string(vbid)); 2448 } 2449} 2450 2451void KVBucket::notifyReplication(const uint16_t vbid, const int64_t bySeqno) { 2452 engine.getDcpConnMap().notifyVBConnections(vbid, bySeqno); 2453} 2454 2455void KVBucket::initializeExpiryPager(Configuration& config) { 2456 { 2457 LockHolder elh(expiryPager.mutex); 2458 expiryPager.enabled = config.isExpPagerEnabled(); 2459 } 2460 2461 setExpiryPagerSleeptime(config.getExpPagerStime()); 2462 2463 config.addValueChangedListener( 2464 "exp_pager_stime", 2465 std::make_unique<EPStoreValueChangeListener>(*this)); 2466 config.addValueChangedListener( 2467 "exp_pager_enabled", 2468 std::make_unique<EPStoreValueChangeListener>(*this)); 2469 config.addValueChangedListener( 2470 "exp_pager_initial_run_time", 2471 std::make_unique<EPStoreValueChangeListener>(*this)); 2472} 2473 2474cb::engine_error KVBucket::setCollections(cb::const_char_buffer json) { 2475 // cJSON can't accept a size so we must create a string 2476 std::string manifest(json.data(), json.size()); 2477 2478 // Inhibit VB state changes whilst updating the vbuckets 2479 LockHolder lh(vbsetMutex); 2480 2481 return collectionsManager->update(*this, manifest); 2482} 2483 2484cb::EngineErrorStringPair KVBucket::getCollections() const { 2485 return collectionsManager->getManifest(); 2486} 2487 2488const Collections::Manager& KVBucket::getCollectionsManager() const { 2489 return *collectionsManager.get(); 2490} 2491 2492bool KVBucket::isXattrEnabled() const { 2493 return xattrEnabled; 2494} 2495 2496void KVBucket::setXattrEnabled(bool value) { 2497 xattrEnabled = value; 2498} 2499 2500bool KVBucket::collectionsEraseKey( 2501 uint16_t vbid, 2502 const DocKey key, 2503 int64_t bySeqno, 2504 bool deleted, 2505 Collections::VB::EraserContext& eraserContext) { 2506 auto vb = getVBucket(vbid); 2507 boost::optional<cb::const_char_buffer> completedCollection; 2508 if (!vb) { 2509 return 
false; 2510 } 2511 2512 // If the eraserContext consumes the key - just return 2513 // deleted keys can be ignored at this point 2514 if (!deleted && eraserContext.manageSeparator(key)) { 2515 return false; 2516 } 2517 2518 { // collections read lock scope 2519 auto collectionsRHandle = vb->lockCollections(); 2520 if (collectionsRHandle.isLogicallyDeleted( 2521 key, bySeqno, eraserContext.getSeparator())) { 2522 vb->removeKey(key, bySeqno); 2523 2524 // Update item count for non-system collections. 2525 switch (key.getDocNamespace()) { 2526 case DocNamespace::DefaultCollection: 2527 case DocNamespace::Collections: 2528 vb->decrNumTotalItems(); 2529 break; 2530 case DocNamespace::System: 2531 break; 2532 } 2533 } else { 2534 return false; 2535 } 2536 2537 // Now determine if the key represents the end of a collection 2538 completedCollection = collectionsRHandle.shouldCompleteDeletion(key); 2539 } // read lock dropped as we may need the write lock in next block 2540 2541 // If a collection was returned, notify that all keys have been removed 2542 if (completedCollection.is_initialized()) { 2543 // completeDeletion obtains write access to the manifest 2544 vb->completeDeletion(completedCollection.get()); 2545 } 2546 return true; 2547} 2548 2549std::chrono::seconds KVBucket::getMaxTtl() const { 2550 return std::chrono::seconds{maxTtl.load()}; 2551} 2552 2553void KVBucket::setMaxTtl(size_t max) { 2554 maxTtl = max; 2555} 2556 2557uint16_t KVBucket::getNumOfVBucketsInState(vbucket_state_t state) const { 2558 return vbMap.getVBStateCount(state); 2559} 2560