1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2015 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #pragma once 19 20 #include "config.h" 21 22 #include "bloomfilter.h" 23 #include "checkpoint_config.h" 24 #include "collections/vbucket_manifest.h" 25 #include "dcp/dcp-types.h" 26 #include "hash_table.h" 27 #include "hlc.h" 28 #include "item_pager.h" 29 #include "kvstore.h" 30 #include "monotonic.h" 31 32 #include <memcached/engine.h> 33 #include <platform/atomic_duration.h> 34 #include <platform/non_negative_counter.h> 35 #include <relaxed_atomic.h> 36 #include <atomic> 37 #include <queue> 38 39 class EPStats; 40 class CheckpointManager; 41 class ConflictResolution; 42 class Configuration; 43 class ItemMetaData; 44 class PreLinkDocumentContext; 45 class EventuallyPersistentEngine; 46 class DCPBackfill; 47 class RollbackResult; 48 class VBucketBGFetchItem; 49 50 /** 51 * The following will be used to identify 52 * the source of an item's expiration. 
53 */ 54 enum class ExpireBy { Pager, Compactor, Access }; 55 56 /* Structure that holds info needed for notification for an item being updated 57 in the vbucket */ 58 struct VBNotifyCtx { VBNotifyCtxVBNotifyCtx59 VBNotifyCtx() : bySeqno(0), notifyReplication(false), notifyFlusher(false) { 60 } 61 Monotonic<int64_t> bySeqno; 62 bool notifyReplication; 63 bool notifyFlusher; 64 }; 65 66 /** 67 * Structure that holds info needed to queue an item in chkpt or vb backfill 68 * queue 69 * 70 * GenerateDeleteTime - Only the queueing of items which are isDeleted() == true 71 * will this parameter have any affect. E.g. an add of an Item with this set to 72 * Yes will have no effect, whilst an explicit delete this parameter will have 73 * have an effect. 74 * 75 * Note that when queueing a delete with and expiry time of 0, the delete time 76 * is always generated. It is invalid to queue an Item with a 0 delete time, 77 * in this case the GenerateDeleteTime setting is ignored. 78 * 79 */ 80 struct VBQueueItemCtx { 81 /** 82 * For code which needs to determine if a delete time should be generated 83 * that is for example del and set-with-meta, expiry, deletes this 84 * constructor allows for the GenerateDeleteTime to be set as Yes or No. 85 */ VBQueueItemCtxVBQueueItemCtx86 VBQueueItemCtx(GenerateBySeqno genBySeqno, 87 GenerateCas genCas, 88 GenerateDeleteTime generateDeleteTime, 89 TrackCasDrift trackCasDrift, 90 bool isBackfillItem, 91 PreLinkDocumentContext* preLinkDocumentContext) 92 : genBySeqno(genBySeqno), 93 genCas(genCas), 94 generateDeleteTime(generateDeleteTime), 95 trackCasDrift(trackCasDrift), 96 isBackfillItem(isBackfillItem), 97 preLinkDocumentContext(preLinkDocumentContext) { 98 } 99 100 /** 101 * For code which is queueing Item's that are not deleted and the generate 102 * delete time has no affect, this constructor should be used. 
103 */ VBQueueItemCtxVBQueueItemCtx104 VBQueueItemCtx(GenerateBySeqno genBySeqno, 105 GenerateCas genCas, 106 TrackCasDrift trackCasDrift, 107 bool isBackfillItem, 108 PreLinkDocumentContext* preLinkDocumentContext) 109 : genBySeqno(genBySeqno), 110 genCas(genCas), 111 generateDeleteTime(GenerateDeleteTime::No), 112 trackCasDrift(trackCasDrift), 113 isBackfillItem(isBackfillItem), 114 preLinkDocumentContext(preLinkDocumentContext) { 115 } 116 117 /* Indicates if we should queue an item or not. If this is false other 118 members should not be used */ 119 GenerateBySeqno genBySeqno; 120 GenerateCas genCas; 121 GenerateDeleteTime generateDeleteTime; 122 TrackCasDrift trackCasDrift; 123 bool isBackfillItem; 124 PreLinkDocumentContext* preLinkDocumentContext; 125 }; 126 127 /** 128 * Structure that holds seqno based or checkpoint persistence based high 129 * priority requests to a vbucket 130 */ 131 struct HighPriorityVBEntry { HighPriorityVBEntryHighPriorityVBEntry132 HighPriorityVBEntry(const void* c, 133 uint64_t idNum, 134 HighPriorityVBNotify reqType) 135 : cookie(c), id(idNum), reqType(reqType), start(ProcessClock::now()) { 136 } 137 138 const void* cookie; 139 uint64_t id; 140 HighPriorityVBNotify reqType; 141 142 /* for stats (histogram) */ 143 ProcessClock::time_point start; 144 }; 145 146 typedef std::unique_ptr<Callback<const uint16_t, const VBNotifyCtx&>> 147 NewSeqnoCallback; 148 149 class EventuallyPersistentEngine; 150 class FailoverTable; 151 class KVShard; 152 class VBucketMemoryDeletionTask; 153 154 /** 155 * An individual vbucket. 
156 */ 157 class VBucket : public std::enable_shared_from_this<VBucket> { 158 public: 159 160 // Identifier for a vBucket 161 typedef uint16_t id_type; 162 163 enum class GetKeyOnly { 164 Yes, 165 No 166 }; 167 168 VBucket(id_type i, 169 vbucket_state_t newState, 170 EPStats& st, 171 CheckpointConfig& chkConfig, 172 int64_t lastSeqno, 173 uint64_t lastSnapStart, 174 uint64_t lastSnapEnd, 175 std::unique_ptr<FailoverTable> table, 176 std::shared_ptr<Callback<id_type>> flusherCb, 177 std::unique_ptr<AbstractStoredValueFactory> valFact, 178 NewSeqnoCallback newSeqnoCb, 179 Configuration& config, 180 item_eviction_policy_t evictionPolicy, 181 vbucket_state_t initState = vbucket_state_dead, 182 uint64_t purgeSeqno = 0, 183 uint64_t maxCas = 0, 184 int64_t hlcEpochSeqno = HlcCasSeqnoUninitialised, 185 bool mightContainXattrs = false, 186 const std::string& collectionsManifest = ""); 187 188 virtual ~VBucket(); 189 190 int64_t getHighSeqno() const; 191 192 size_t getChkMgrMemUsage() const; 193 194 size_t getChkMgrMemUsageOfUnrefCheckpoints() const; 195 196 size_t getChkMgrMemUsageOverhead() const; 197 getPurgeSeqno() const198 uint64_t getPurgeSeqno() const { 199 return purge_seqno; 200 } 201 setPurgeSeqno(uint64_t to)202 void setPurgeSeqno(uint64_t to) { 203 purge_seqno = to; 204 } 205 setPersistedSnapshot(uint64_t start, uint64_t end)206 void setPersistedSnapshot(uint64_t start, uint64_t end) { 207 LockHolder lh(snapshotMutex); 208 persisted_snapshot_start = start; 209 persisted_snapshot_end = end; 210 } 211 getPersistedSnapshot() const212 snapshot_range_t getPersistedSnapshot() const { 213 LockHolder lh(snapshotMutex); 214 return {persisted_snapshot_start, persisted_snapshot_end}; 215 } 216 getMaxCas() const217 uint64_t getMaxCas() const { 218 return hlc.getMaxHLC(); 219 } 220 setMaxCas(uint64_t cas)221 void setMaxCas(uint64_t cas) { 222 hlc.setMaxHLC(cas); 223 } 224 setMaxCasAndTrackDrift(uint64_t cas)225 void setMaxCasAndTrackDrift(uint64_t cas) { 226 
hlc.setMaxHLCAndTrackDrift(cas); 227 } 228 forceMaxCas(uint64_t cas)229 void forceMaxCas(uint64_t cas) { 230 hlc.forceMaxHLC(cas); 231 } 232 getHLCDriftStats() const233 HLC::DriftStats getHLCDriftStats() const { 234 return hlc.getDriftStats(); 235 } 236 getHLCDriftExceptionCounters() const237 HLC::DriftExceptions getHLCDriftExceptionCounters() const { 238 return hlc.getDriftExceptionCounters(); 239 } 240 setHLCDriftAheadThreshold(std::chrono::microseconds threshold)241 void setHLCDriftAheadThreshold(std::chrono::microseconds threshold) { 242 hlc.setDriftAheadThreshold(threshold); 243 } 244 setHLCDriftBehindThreshold(std::chrono::microseconds threshold)245 void setHLCDriftBehindThreshold(std::chrono::microseconds threshold) { 246 hlc.setDriftBehindThreshold(threshold); 247 } 248 249 /** 250 * @returns a seqno, documents with a seqno >= the returned value have a HLC 251 * generated CAS. Can return HlcCasSeqnoUninitialised if warmup has not 252 * established or nothing is persisted 253 */ getHLCEpochSeqno() const254 int64_t getHLCEpochSeqno() const { 255 return hlc.getEpochSeqno(); 256 } 257 258 /** 259 * Set the seqno to be used to establish if an item has a HLC generated CAS. 
260 * @param seqno the value to store in the vbucket 261 * @throws if an attempt to set to < 0 262 */ setHLCEpochSeqno(int64_t seqno)263 void setHLCEpochSeqno(int64_t seqno) { 264 if (seqno < 0) { 265 throw std::invalid_argument("VBucket::setHLCEpochSeqno(" + 266 std::to_string(seqno) + ") seqno < 0 "); 267 } 268 hlc.setEpochSeqno(seqno); 269 } 270 isTakeoverBackedUp()271 bool isTakeoverBackedUp() { 272 return takeover_backed_up.load(); 273 } 274 setTakeoverBackedUpState(bool to)275 void setTakeoverBackedUpState(bool to) { 276 bool inverse = !to; 277 takeover_backed_up.compare_exchange_strong(inverse, to); 278 } 279 280 // States whether the VBucket is in the process of being created isBucketCreation() const281 bool isBucketCreation() const { 282 return bucketCreation.load(); 283 } 284 setBucketCreation(bool rv)285 bool setBucketCreation(bool rv) { 286 bool inverse = !rv; 287 return bucketCreation.compare_exchange_strong(inverse, rv); 288 } 289 290 /** 291 * @return true if the vbucket deletion is to be deferred to a background 292 * task. 293 */ isDeletionDeferred() const294 bool isDeletionDeferred() const { 295 return deferredDeletion.load(); 296 } 297 298 /** 299 * @param value true if the vbucket's deletion should be deferred to a 300 * background task. This is for VBucket objects created by 301 * makeVBucket and owned by a VBucketPtr. If the VBucket was manually 302 * created this will have no effect on deletion. 303 */ setDeferredDeletion(bool value)304 void setDeferredDeletion(bool value) { 305 deferredDeletion.store(true); 306 } 307 308 /** 309 * @param A cookie to notify when the deferred deletion completes. 310 */ setDeferredDeletionCookie(const void* cookie)311 void setDeferredDeletionCookie(const void* cookie) { 312 deferredDeletionCookie = cookie; 313 } 314 315 /** 316 * @return the cookie which could of been set when setupDeferredDeletion was 317 * called. 
318 */ getDeferredDeletionCookie() const319 const void* getDeferredDeletionCookie() const { 320 return deferredDeletionCookie; 321 } 322 323 /** 324 * Setup deferred deletion, this is where deletion of the vbucket is 325 * deferred and completed by an AUXIO/NONIO task. AUXIO for EPVBucket 326 * as it will hit disk for the data file unlink, NONIO is used for 327 * EphemeralVBucket as only memory resources need freeing. 328 * @param cookie A cookie to notify when the deletion task completes. 329 */ 330 virtual void setupDeferredDeletion(const void* cookie) = 0; 331 332 // Returns the last persisted sequence number for the VBucket 333 virtual uint64_t getPersistenceSeqno() const = 0; 334 335 /** 336 * Returns the sequence number to expose publically as the highest 337 * persisted seqno. Note this is may differ from getPersistenceSeqno, 338 * depending on the Bucket type. 339 * 340 * Historical note: This is the same as PersistenceSeqno for EP buckets, 341 * and hence before Spock wasn't a separate function; however for Ephemeral 342 * buckets we need to distinguish between what sequence number we report 343 * to external clients for Observe/persistTo, and what sequence number we 344 * report to internal DCP / ns_server for takeover: 345 * a) Clients need 0 for the Ephemeral "persisted to" seqno (as 346 * there isn't any Persistence and we can't claim something is on-disk 347 * when it is not). 348 * b) ns_server / replication needs a non-zero, "logically-persisted" seqno 349 * from the replica to know that a vBucket is ready for takeover. 350 * As such, getPublicPersistenceSeqno() is used for (a), and 351 * getPersistenceSeqno() is used for (b). 
352 */ 353 virtual uint64_t getPublicPersistenceSeqno() const = 0; 354 setPersistenceSeqno(uint64_t seqno)355 void setPersistenceSeqno(uint64_t seqno) { 356 persistenceSeqno.store(seqno); 357 } 358 getId() const359 id_type getId() const { return id; } getState(void) const360 vbucket_state_t getState(void) const { return state.load(); } 361 362 /** 363 * Sets the vbucket state to a desired state 364 * 365 * @param to desired vbucket state 366 */ 367 void setState(vbucket_state_t to); 368 369 /** 370 * Sets the vbucket state to a desired state with the 'stateLock' already 371 * acquired 372 * 373 * @param to desired vbucket state 374 * @param vbStateLock write lock holder on 'stateLock' 375 */ 376 void setState_UNLOCKED(vbucket_state_t to, WriterLockHolder& vbStateLock); 377 getStateLock()378 cb::RWLock& getStateLock() {return stateLock;} 379 getInitialState(void)380 vbucket_state_t getInitialState(void) { return initialState; } 381 382 vbucket_state getVBucketState() const; 383 384 /** 385 * This method performs operations on the stored value prior 386 * to expiring the item. 387 * 388 * @param v the stored value 389 */ 390 void handlePreExpiry(const std::unique_lock<std::mutex>& hbl, 391 StoredValue& v); 392 393 bool addPendingOp(const void *cookie); 394 395 void doStatsForQueueing(const Item& item, size_t itemBytes); 396 void doStatsForFlushing(const Item& item, size_t itemBytes); 397 void incrMetaDataDisk(const Item& qi); 398 void decrMetaDataDisk(const Item& qi); 399 400 /// Increase the total count of items in this VBucket by 1. 401 virtual void incrNumTotalItems() = 0; 402 403 /// Decrease the total count of items in this VBucket by 1. 404 virtual void decrNumTotalItems() = 0; 405 406 /** 407 * Set the total count of items in this VBucket to the specified value. 408 */ 409 virtual void setNumTotalItems(size_t items) = 0; 410 411 /// Reset all statistics assocated with this vBucket. 
412 virtual void resetStats(); 413 414 // Get age sum in millisecond 415 uint64_t getQueueAge(); 416 417 void fireAllOps(EventuallyPersistentEngine &engine); 418 size(void)419 size_t size(void) { 420 HashTableDepthStatVisitor v; 421 ht.visitDepth(v); 422 return v.size; 423 } 424 getBackfillSize()425 size_t getBackfillSize() { 426 LockHolder lh(backfill.mutex); 427 return backfill.items.size(); 428 } 429 430 /** 431 * Process an item from a DCP backfill. 432 * It puts it onto a queue for persistence and/or generates a seqno and 433 * updates stats 434 * 435 * @param qi item to be processed 436 * @param generateBySeqno indicates if a new seqno must generated or the 437 * seqno in the item must be used 438 * 439 * 440 */ 441 virtual void queueBackfillItem(queued_item& qi, 442 const GenerateBySeqno generateBySeqno) = 0; 443 444 /** 445 * Transfer any backfill items to the specified vector, up to the optional 446 * limit. 447 * @param items Destination for items transferred from backfill. 448 * @param limit If non-zero, limit the number of items transferred to the 449 * specified number. 450 * @return true if any more items remain in the backfill set - i.e. if 451 * limit constrained the number of items. 452 */ 453 bool getBackfillItems(std::vector<queued_item>& items, size_t limit = 0); 454 455 struct ItemsToFlush { 456 std::vector<queued_item> items; 457 snapshot_range_t range{0, 0}; 458 bool moreAvailable = false; 459 }; 460 461 /** 462 * Obtain the series of items to be flushed for this vBucket. 463 * 464 * @param vb VBucket to fetch items for. 465 * @param approxLimit Upper bound on how many items to fetch. 466 * @return The items to flush; along with their seqno range and 467 * if more items are available for this vBucket (i.e. the 468 * limit was reached). 
469 */ 470 ItemsToFlush getItemsToPersist(size_t approxLimit); 471 isBackfillPhase()472 bool isBackfillPhase() { 473 return backfill.isBackfillPhase.load(); 474 } 475 setBackfillPhase(bool backfillPhase)476 void setBackfillPhase(bool backfillPhase) { 477 backfill.isBackfillPhase.store(backfillPhase); 478 } 479 480 /** 481 * Returns the map of bgfetch items for this vbucket, clearing the 482 * pendingBGFetches. 483 */ 484 virtual vb_bgfetch_queue_t getBGFetchItems() = 0; 485 486 virtual bool hasPendingBGFetchItems() = 0; 487 toString(vbucket_state_t s)488 static const char* toString(vbucket_state_t s) { 489 switch(s) { 490 case vbucket_state_active: return "active"; break; 491 case vbucket_state_replica: return "replica"; break; 492 case vbucket_state_pending: return "pending"; break; 493 case vbucket_state_dead: return "dead"; break; 494 } 495 return "unknown"; 496 } 497 fromString(const char* state)498 static vbucket_state_t fromString(const char* state) { 499 if (strcmp(state, "active") == 0) { 500 return vbucket_state_active; 501 } else if (strcmp(state, "replica") == 0) { 502 return vbucket_state_replica; 503 } else if (strcmp(state, "pending") == 0) { 504 return vbucket_state_pending; 505 } else { 506 return vbucket_state_dead; 507 } 508 } 509 510 /** 511 * Checks and decides whether to add high priority request on the vbucket. 512 * This is an async request made by modules like ns-server during 513 * rebalance. The request is for a response from the vbucket when it 514 * 'sees' beyond a certain sequence number or when a certain checkpoint 515 * is persisted. 516 * Depending on the vbucket type, the meaning 'seeing' a sequence number 517 * changes. That is, it could mean persisted in case of EPVBucket and 518 * added to the sequenced data structure in case of EphemeralVBucket. 
519 * 520 * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted 521 * @param cookie cookie of conn to be notified 522 * @param reqType indicating request for seqno or chk persistence 523 * 524 * @return RequestScheduled if a high priority request is added and 525 * notification will be done asynchronously 526 * NotSupported if the request is not supported for the reqType 527 * RequestNotScheduled if a high priority request is NOT added (as 528 * it is not required). This implies there won't 529 * be a subsequent notification 530 */ 531 virtual HighPriorityVBReqStatus checkAddHighPriorityVBEntry( 532 uint64_t seqnoOrChkId, 533 const void* cookie, 534 HighPriorityVBNotify reqType) = 0; 535 536 /** 537 * Notify the high priority requests on the vbucket. 538 * This is the response to async requests made by modules like ns-server 539 * during rebalance. 540 * 541 * @param engine Ref to ep-engine 542 * @param id seqno or checkpoint id causing the notification(s). 543 * @param notifyType indicating notify for seqno or chk persistence 544 */ 545 virtual void notifyHighPriorityRequests( 546 EventuallyPersistentEngine& engine, 547 uint64_t id, 548 HighPriorityVBNotify notifyType) = 0; 549 550 virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0; 551 552 /** 553 * Get high priority notifications for a seqno or checkpoint persisted 554 * 555 * @param engine Ref to ep-engine 556 * @param id seqno or checkpoint id for which notifies are to be found 557 * @param notifyType indicating notify for seqno or chk persistence 558 * 559 * @return map of notifications with conn cookie as the key and notify 560 * status as the value 561 */ 562 std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications( 563 EventuallyPersistentEngine& engine, 564 uint64_t idNum, 565 HighPriorityVBNotify notifyType); 566 getHighPriorityChkSize()567 size_t getHighPriorityChkSize() { 568 return numHpVBReqs.load(); 569 } 570 571 /** 572 * BloomFilter 
operations for vbucket 573 */ 574 void createFilter(size_t key_count, double probability); 575 void initTempFilter(size_t key_count, double probability); 576 void addToFilter(const DocKey& key); 577 virtual bool maybeKeyExistsInFilter(const DocKey& key); 578 bool isTempFilterAvailable(); 579 void addToTempFilter(const DocKey& key); 580 void swapFilter(); 581 void clearFilter(); 582 void setFilterStatus(bfilter_status_t to); 583 std::string getFilterStatusString(); 584 size_t getFilterSize(); 585 size_t getNumOfKeysInFilter(); 586 nextHLCCas()587 uint64_t nextHLCCas() { 588 return hlc.nextHLC(); 589 } 590 591 // Applicable only for FULL EVICTION POLICY 592 bool isResidentRatioUnderThreshold(float threshold); 593 594 /** 595 * Returns true if deleted items (aka tombstones) are always resident in 596 * memory (and hence we do not need to attempt a bgFetch if we try to 597 * access a deleted key which isn't found in memory). 598 */ 599 virtual bool areDeletedItemsAlwaysResident() const = 0; 600 601 virtual void addStats(bool details, ADD_STAT add_stat, const void* c) = 0; 602 603 virtual KVShard* getShard() = 0; 604 605 /** 606 * Returns the number of alive (non-deleted) Items the VBucket. 607 * 608 * Includes items which are not currently resident in memory (i.e. under 609 * Full eviction and have been fully evicted from memory). 610 * Does *not* include deleted items. 
611 */ 612 virtual size_t getNumItems() const = 0; 613 614 virtual size_t getNumNonResidentItems() const = 0; 615 getNumTempItems(void)616 size_t getNumTempItems(void) { 617 return ht.getNumTempItems(); 618 } 619 incrRollbackItemCount(uint64_t val)620 void incrRollbackItemCount(uint64_t val) { 621 rollbackItemCount.fetch_add(val, std::memory_order_relaxed); 622 } 623 getRollbackItemCount(void)624 uint64_t getRollbackItemCount(void) { 625 return rollbackItemCount.load(std::memory_order_relaxed); 626 } 627 628 // Return the persistence checkpoint ID 629 uint64_t getPersistenceCheckpointId() const; 630 631 // Set the persistence checkpoint ID to the given value. 632 void setPersistenceCheckpointId(uint64_t checkpointId); 633 634 // Mark the value associated with the given key as dirty 635 void markDirty(const DocKey& key); 636 637 /** 638 * Obtain the read handle for the collections manifest. 639 * The caller will have read-only access to manifest using the methods 640 * exposed by the ReadHandle 641 */ lockCollections() const642 Collections::VB::Manifest::ReadHandle lockCollections() const { 643 return manifest.lock(); 644 } 645 646 /** 647 * Obtain a caching read handle for the collections manifest. 648 * The returned handle will lookup the collection associated with key 649 * and cache the internal iterator so that future usage of 650 * isLogicallyDeleted doesn't need to re-scan and lookup. This is different 651 * to a plain ReadHandle which provides more functionality (more methods 652 * for the caller), but may result in extra lookups and key-scans. 653 * @param key A key to use for constructing the read handle. 
654 * @return a CachingReadHandle which the caller should test is valid with 655 * CachingReadHandle::valid 656 */ lockCollections( const DocKey& key) const657 Collections::VB::Manifest::CachingReadHandle lockCollections( 658 const DocKey& key) const { 659 return manifest.lock(key); 660 } 661 lockCollections( const DocKey& key, const std::string& separator) const662 Collections::VB::Manifest::CachingReadHandle lockCollections( 663 const DocKey& key, const std::string& separator) const { 664 return manifest.lock(key, separator); 665 } 666 667 /** 668 * Update the Collections::VB::Manifest and the VBucket. 669 * Adds SystemEvents for the create and delete of collections into the 670 * checkpoint. 671 * 672 * @param m A Collections::Manifest to apply to the VB::Manifest 673 */ updateFromManifest(const Collections::Manifest& m)674 void updateFromManifest(const Collections::Manifest& m) { 675 manifest.wlock().update(*this, m); 676 } 677 678 /** 679 * Finalise the deletion of a collection (no items remain in the collection) 680 * 681 * @param collection Name of the collection that has completed deleting 682 */ completeDeletion(cb::const_char_buffer collection)683 void completeDeletion(cb::const_char_buffer collection) { 684 manifest.wlock().completeDeletion(*this, collection); 685 } 686 687 /** 688 * Add a collection to this vbucket with a pre-assigned seqno. I.e. 689 * this VB is a replica. 690 * 691 * @param manifestUid the uid of the manifest which made the change 692 * @param identifier Identifier for the collection to add. 693 * @param bySeqno The seqno assigned to the collection create event. 
694 */ replicaAddCollection(Collections::uid_t manifestUid, Collections::Identifier identifier, int64_t bySeqno)695 void replicaAddCollection(Collections::uid_t manifestUid, 696 Collections::Identifier identifier, 697 int64_t bySeqno) { 698 manifest.wlock().replicaAdd(*this, manifestUid, identifier, bySeqno); 699 } 700 701 /** 702 * Delete a collection from this vbucket with a pre-assigned seqno. I.e. 703 * this VB is a replica. 704 * 705 * @param manifestUid the uid of the manifest which made the change 706 * @param identifier Identifier for the collection to begin deleting. 707 * @param bySeqno The seqno assigned to the collection delete event. 708 */ replicaBeginDeleteCollection(Collections::uid_t manifestUid, Collections::Identifier identifier, int64_t bySeqno)709 void replicaBeginDeleteCollection(Collections::uid_t manifestUid, 710 Collections::Identifier identifier, 711 int64_t bySeqno) { 712 manifest.wlock().replicaBeginDelete( 713 *this, manifestUid, identifier, bySeqno); 714 } 715 716 /** 717 * Delete a collection from this vbucket with a pre-assigned seqno. I.e. 718 * this VB is a replica. 719 * 720 * @param manifestUid the uid of the manifest which made the change 721 * @param separator The new separator. 722 * @param bySeqno The seqno assigned to the change separator event. 
723 */ replicaChangeCollectionSeparator(Collections::uid_t manifestUid, cb::const_char_buffer separator, int64_t bySeqno)724 void replicaChangeCollectionSeparator(Collections::uid_t manifestUid, 725 cb::const_char_buffer separator, 726 int64_t bySeqno) { 727 manifest.wlock().replicaChangeSeparator( 728 *this, manifestUid, separator, bySeqno); 729 } 730 731 /** 732 * Get the collection manifest 733 * 734 * @return const reference to the manifest 735 */ getManifest() const736 const Collections::VB::Manifest& getManifest() const { 737 return manifest; 738 } 739 740 static const vbucket_state_t ACTIVE; 741 static const vbucket_state_t REPLICA; 742 static const vbucket_state_t PENDING; 743 static const vbucket_state_t DEAD; 744 745 HashTable ht; 746 747 /// Manager of this vBucket's checkpoints. unique_ptr for pimpl. 748 std::unique_ptr<CheckpointManager> checkpointManager; 749 750 // Struct for managing 'backfill' items - Items which have been added by 751 // an incoming DCP stream and need to be persisted to disk. 752 struct { 753 std::mutex mutex; 754 std::queue<queued_item> items; 755 std::atomic<bool> isBackfillPhase; 756 } backfill; 757 758 /** 759 * Gets the valid StoredValue for the key and deletes an expired item if 760 * desired by the caller. Requires the hash bucket to be locked 761 * 762 * @param hbl Reference to the hash bucket lock 763 * @param key 764 * @param wantsDeleted 765 * @param trackReference 766 * @param queueExpired Delete an expired item 767 */ 768 StoredValue* fetchValidValue(HashTable::HashBucketLock& hbl, 769 const DocKey& key, 770 WantsDeleted wantsDeleted, 771 TrackReference trackReference, 772 QueueExpired queueExpired); 773 774 /** 775 * Complete the background fetch for the specified item. Depending on the 776 * state of the item, restore it to the hashtable as appropriate, 777 * potentially queuing it as dirty. 778 * 779 * @param key The key of the item 780 * @param fetched_item The item which has been fetched. 
781 * @param startTime The time processing of the batch of items started. 782 * 783 * @return ENGINE_ERROR_CODE status notified to be to the front end 784 */ 785 virtual ENGINE_ERROR_CODE completeBGFetchForSingleItem( 786 const DocKey& key, 787 const VBucketBGFetchItem& fetched_item, 788 const ProcessClock::time_point startTime) = 0; 789 790 /** 791 * Retrieve an item from the disk for vkey stats 792 * 793 * @param key the key to fetch 794 * @param cookie the connection cookie 795 * @param eviction_policy The eviction policy 796 * @param engine Reference to ep engine 797 * @param bgFetchDelay 798 * 799 * @return VBReturnCtx indicates notifyCtx and operation result 800 */ 801 virtual ENGINE_ERROR_CODE statsVKey(const DocKey& key, 802 const void* cookie, 803 EventuallyPersistentEngine& engine, 804 int bgFetchDelay) = 0; 805 806 /** 807 * Complete the vkey stats for an item background fetched from disk. 808 * 809 * @param key The key of the item 810 * @param gcb Bgfetch cbk obj containing the item from disk 811 * 812 */ 813 virtual void completeStatsVKey(const DocKey& key, const GetValue& gcb) = 0; 814 815 /** 816 * Set (add new or update) an item into in-memory structure like 817 * hash table and do not generate a seqno. This is called internally from 818 * ep-engine when we want to update our in-memory data (like in HT) with 819 * another source of truth like disk. 820 * Currently called during rollback. 821 * 822 * @param itm Item to be added or updated. Upon success, the itm 823 * revSeqno are updated 824 * 825 * @return Result indicating the status of the operation 826 */ 827 MutationStatus setFromInternal(Item& itm); 828 829 /** 830 * Set (add new or update) an item in the vbucket. 831 * 832 * @param itm Item to be added or updated. 
Upon success, the itm 833 * bySeqno, cas and revSeqno are updated 834 * @param cookie the connection cookie 835 * @param engine Reference to ep engine 836 * @param bgFetchDelay 837 * @param predicate a function to call which if returns true, the set will 838 * succeed. The function is called against any existing item. 839 * 840 * @return ENGINE_ERROR_CODE status notified to be to the front end 841 */ 842 ENGINE_ERROR_CODE set(Item& itm, 843 const void* cookie, 844 EventuallyPersistentEngine& engine, 845 int bgFetchDelay, 846 cb::StoreIfPredicate predicate); 847 848 /** 849 * Replace (overwrite existing) an item in the vbucket. 850 * 851 * @param itm Item to be added or updated. Upon success, the itm 852 * bySeqno, cas and revSeqno are updated 853 * @param cookie the connection cookie 854 * @param engine Reference to ep engine 855 * @param bgFetchDelay 856 * @param predicate a function to call which if returns true, the replace 857 * will succeed. The function is called against any existing item. 858 * @param readHandle Reader access to the Item's collection data. 859 * 860 * @return ENGINE_ERROR_CODE status notified to be to the front end 861 */ 862 ENGINE_ERROR_CODE replace( 863 Item& itm, 864 const void* cookie, 865 EventuallyPersistentEngine& engine, 866 int bgFetchDelay, 867 cb::StoreIfPredicate predicate, 868 const Collections::VB::Manifest::CachingReadHandle& readHandle); 869 870 /** 871 * Add an item directly into its vbucket rather than putting it on a 872 * checkpoint (backfill the item). The can happen during DCP or when a 873 * replica vbucket is receiving backfill items from active vbucket. 874 * 875 * @param itm Item to be added/updated from DCP backfill. 
Upon 876 * success, the itm revSeqno is updated 877 * @param genBySeqno whether or not to generate sequence number 878 * 879 * @return the result of the operation 880 */ 881 ENGINE_ERROR_CODE addBackfillItem(Item& itm, GenerateBySeqno genBySeqno); 882 883 /** 884 * Set an item in the store from a non-front end operation (DCP, XDCR) 885 * 886 * @param item the item to set. Upon success, the itm revSeqno is updated 887 * @param cas value to match 888 * @param seqno sequence number of mutation 889 * @param cookie the cookie representing the client to store the item 890 * @param engine Reference to ep engine 891 * @param bgFetchDelay 892 * @param checkConflicts should conflict resolution be done? 893 * @param allowExisting set to false if you want set to fail if the 894 * item exists already 895 * @param genBySeqno whether or not to generate sequence number 896 * @param genCas 897 * @param isReplication set to true if we are to use replication 898 * throttle threshold 899 * @param readHandle Reader access to the Item's collection data. 900 * 901 * @return the result of the store operation 902 */ 903 ENGINE_ERROR_CODE setWithMeta( 904 Item& itm, 905 uint64_t cas, 906 uint64_t* seqno, 907 const void* cookie, 908 EventuallyPersistentEngine& engine, 909 int bgFetchDelay, 910 CheckConflicts checkConflicts, 911 bool allowExisting, 912 GenerateBySeqno genBySeqno, 913 GenerateCas genCas, 914 bool isReplication, 915 const Collections::VB::Manifest::CachingReadHandle& readHandle); 916 917 /** 918 * Delete an item in the vbucket 919 * 920 * @param[in,out] cas value to match; new cas after logical delete 921 * @param cookie the cookie representing the client to store the item 922 * @param engine Reference to ep engine 923 * @param bgFetchDelay 924 * @param[out] itemMeta pointer to item meta data that needs to be returned 925 * as a result the delete. A NULL pointer indicates 926 * that no meta data needs to be returned. 
927 * @param[out] mutInfo Info to uniquely identify (and order) the delete 928 * seq. A NULL pointer indicates no info needs to be 929 * returned. 930 * @param readHandle Reader access to the affected key's collection data. 931 * 932 * @return the result of the operation 933 */ 934 ENGINE_ERROR_CODE deleteItem( 935 uint64_t& cas, 936 const void* cookie, 937 EventuallyPersistentEngine& engine, 938 int bgFetchDelay, 939 ItemMetaData* itemMeta, 940 mutation_descr_t& mutInfo, 941 const Collections::VB::Manifest::CachingReadHandle& readHandle); /* NOTE(review): deleteItem() takes mutInfo by reference (mutation_descr_t&), so the "NULL pointer" wording in its @param[out] mutInfo doc cannot apply as written - confirm and reword. */ 942 943 /** 944 * Delete an item in the vbucket from a non-front end operation (DCP, XDCR) 945 * 946 * @param[in, out] cas value to match; new cas after logical delete 947 * @param[out] seqno Pointer to get the seqno generated for the item. A 948 * NULL value is passed if not needed 949 * @param cookie the cookie representing the client to store the item 950 * @param engine Reference to ep engine 951 * @param bgFetchDelay 952 * @param checkConflicts should conflict resolution be done? 953 * @param itemMeta ref to item meta data 954 * @param backfill indicates if the item must be put onto vb queue or 955 * onto checkpoint 956 * @param genBySeqno whether or not to generate sequence number 957 * @param generateCas whether or not to generate cas 958 * @param bySeqno seqno of the key being deleted 959 * @param isReplication set to true if we are to use replication 960 * throttle threshold 961 * @param readHandle Reader access to the key's collection data.
962 * 963 * @return the result of the operation 964 */ 965 ENGINE_ERROR_CODE deleteWithMeta( 966 uint64_t& cas, 967 uint64_t* seqno, 968 const void* cookie, 969 EventuallyPersistentEngine& engine, 970 int bgFetchDelay, 971 CheckConflicts checkConflicts, 972 const ItemMetaData& itemMeta, 973 bool backfill, 974 GenerateBySeqno genBySeqno, 975 GenerateCas generateCas, 976 uint64_t bySeqno, 977 bool isReplication, 978 const Collections::VB::Manifest::CachingReadHandle& readHandle); 979 980 /** 981 * Delete an expired item 982 * 983 * @param it item to be deleted 984 * @param startTime the time to be compared with this item's expiry time 985 * @param source what triggered the expiry (ExpireBy::Pager, Compactor or Access) 986 */ 987 void deleteExpiredItem(const Item& it, 988 time_t startTime, 989 ExpireBy source); 990 991 /** 992 * Evict a key from memory. Pure virtual: each VBucket sub-class provides it. 993 * 994 * @param key Key to evict 995 * @param[out] msg Updated to point to a string (with static duration) 996 * describing the result of the operation. 997 * 998 * @return SUCCESS if key was successfully evicted (or was already 999 * evicted), or the reason why the request failed. 1000 * 1001 */ 1002 virtual protocol_binary_response_status evictKey(const DocKey& key, 1003 const char** msg) = 0; 1004 1005 /** 1006 * Page out a StoredValue from memory. 1007 * 1008 * The definition of "page out" is up to the underlying VBucket 1009 * implementation - this may mean simply ejecting the value from memory 1010 * (Value Eviction), removing the entire document from memory (Full Eviction), 1011 * or actually deleting the document (Ephemeral Buckets). 1012 * 1013 * @param lh Bucket lock associated with the StoredValue. 1014 * @param v[in, out] Ref to the StoredValue to be ejected. Based on the 1015 * VBucket type, policy in the vbucket contents of v and 1016 * v itself may be changed 1017 * 1018 * @return true if an item is ejected.
1019 */ 1020 virtual bool pageOut(const HashTable::HashBucketLock& lh, 1021 StoredValue*& v) = 0; 1022 1023 /** 1024 * Check to see if a StoredValue is eligible to be paged out of memory. 1025 * 1026 * @param lh Bucket lock associated with the StoredValue. 1027 * @param v Reference to the StoredValue to be checked. 1028 * 1029 * @return true if the StoredValue is eligible to be paged out. 1030 * 1031 */ 1032 virtual bool eligibleToPageOut(const HashTable::HashBucketLock& lh, 1033 const StoredValue& v) const = 0; 1034 1035 /** 1036 * Add an item in the store 1037 * 1038 * @param itm the item to add. On success, this will have its seqno and 1039 * CAS updated. 1040 * @param cookie the cookie representing the client to store the item 1041 * @param engine Reference to ep engine 1042 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1043 * @param readHandle Reader access to the Item's collection data. 1044 * 1045 * @return the result of the operation 1046 */ 1047 ENGINE_ERROR_CODE add( 1048 Item& itm, 1049 const void* cookie, 1050 EventuallyPersistentEngine& engine, 1051 int bgFetchDelay, 1052 const Collections::VB::Manifest::CachingReadHandle& readHandle); 1053 1054 /** 1055 * Retrieve a value, but update its TTL first 1056 * 1057 * @param cookie the connection cookie 1058 * @param engine Reference to ep engine 1059 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1060 * @param exptime the new expiry time for the object 1061 * @param readHandle Reader access to the key's collection data. 1062 * 1063 * @return a GetValue representing the result of the request 1064 */ 1065 GetValue getAndUpdateTtl( 1066 const void* cookie, 1067 EventuallyPersistentEngine& engine, 1068 int bgFetchDelay, 1069 time_t exptime, 1070 const Collections::VB::Manifest::CachingReadHandle& readHandle); 1071 /** 1072 * Queue an Item to the checkpoint and return its seqno 1073 * 1074 * @param item an Item object to queue, can be any kind of item and will be 1075 * given a CAS and seqno by this function.
1076 * @param seqno An optional sequence number, if not specified checkpoint 1077 * queueing will assign a seqno to the Item. 1078 */ 1079 int64_t queueItem(Item* item, OptionalSeqno seqno); 1080 1081 /** 1082 * Get metadata and value for a given key 1083 * 1084 * @param cookie the cookie representing the client 1085 * @param engine Reference to ep engine 1086 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1087 * @param options flags indicating some retrieval related info 1088 * @param diskFlushAll TODO(review): undocumented - presumably whether a 1088 * disk flush-all is in progress; confirm against the definition 1089 * @param getKeyOnly if GetKeyOnly::Yes we want only the key 1090 * @param readHandle Reader access to the requested key's collection data. 1091 * 1092 * @return the result of the operation 1093 */ 1094 GetValue getInternal( 1095 const void* cookie, 1096 EventuallyPersistentEngine& engine, 1097 int bgFetchDelay, 1098 get_options_t options, 1099 bool diskFlushAll, 1100 GetKeyOnly getKeyOnly, 1101 const Collections::VB::Manifest::CachingReadHandle& readHandle); 1102 1103 /** 1104 * Retrieve the meta data for the given key 1105 * 1106 * @param cookie the connection cookie 1107 * @param engine Reference to ep engine 1108 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1109 * @param readHandle Reader access to the key's collection data. 1110 * @param[out] metadata meta information returned to the caller 1111 * @param[out] deleted tells the caller whether or not the key is 1112 * deleted 1113 * @param[out] datatype specifies the datatype of the item 1114 * 1115 * @return the result of the operation 1116 */ 1117 ENGINE_ERROR_CODE getMetaData( 1118 const void* cookie, 1119 EventuallyPersistentEngine& engine, 1120 int bgFetchDelay, 1121 const Collections::VB::Manifest::CachingReadHandle& readHandle, 1122 ItemMetaData& metadata, 1123 uint32_t& deleted, 1124 uint8_t& datatype); 1125 1126 /** 1127 * Looks up the key stats for the given {vbucket, key}.
1128 * 1129 * @param cookie The client's cookie 1130 * @param engine Reference to ep engine 1131 * @param bgFetchDelay 1132 * @param[out] kstats On success the keystats for this item. 1133 * @param wantsDeleted If yes then return keystats even if the item is 1134 * marked as deleted. If no then will return 1135 * ENGINE_KEY_ENOENT for deleted items. 1136 * @param readHandle Reader access to the key's collection data. 1137 * 1138 * @return the result of the operation 1139 */ 1140 ENGINE_ERROR_CODE getKeyStats( 1141 const void* cookie, 1142 EventuallyPersistentEngine& engine, 1143 int bgFetchDelay, 1144 struct key_stats& kstats, 1145 WantsDeleted wantsDeleted, 1146 const Collections::VB::Manifest::CachingReadHandle& readHandle); /* NOTE(review): getKeyStats()'s doc leaves @param bgFetchDelay undescribed; the doc for getLocked() describes the same-named parameter as "Delay in secs before we run the bgFetch task" - confirm and copy. */ 1147 1148 /** 1149 * Gets a locked item for a given key. 1150 * 1151 * @param currentTime Current time to use for locking the item for a 1152 * duration of lockTimeout 1153 * @param lockTimeout Timeout for the lock on the item 1154 * @param cookie The client's cookie 1155 * @param engine Reference to ep engine 1156 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1157 * @param readHandle Reader access to the key's collection data.
1158 * 1159 * @return the result of the operation (contains locked item on success) 1160 */ 1161 GetValue getLocked( 1162 rel_time_t currentTime, 1163 uint32_t lockTimeout, 1164 const void* cookie, 1165 EventuallyPersistentEngine& engine, 1166 int bgFetchDelay, 1167 const Collections::VB::Manifest::CachingReadHandle& readHandle); 1168 /** 1169 * Update in memory data structures after an item is deleted on disk 1170 * 1171 * @param queuedItem reference to the deleted item 1172 * @param deleted indicates if the item was actually deleted or not (in case 1173 * the item did not exist on disk) 1174 */ 1175 void deletedOnDiskCbk(const Item& queuedItem, bool deleted); 1176 1177 /** 1178 * Update in memory data structures after a rollback on disk 1179 * 1180 * @param key item key 1181 * 1182 * @return indicates if the operation is successful 1183 */ 1184 bool deleteKey(const DocKey& key); 1185 1186 /** 1187 * Creates a DCP backfill object 1188 * 1189 * @param e ref to EventuallyPersistentEngine 1190 * @param stream Shared ptr to the stream for which this backfill obj is 1191 * created 1192 * @param startSeqno requested start sequence number of the backfill 1193 * @param endSeqno requested end sequence number of the backfill 1194 * 1195 * @return unique_ptr to the backfill object created. The caller owns the 1196 * object through the returned unique_ptr. 1197 */ 1198 virtual std::unique_ptr<DCPBackfill> createDCPBackfill( 1199 EventuallyPersistentEngine& e, 1200 std::shared_ptr<ActiveStream> stream, 1201 uint64_t startSeqno, 1202 uint64_t endSeqno) = 0; 1203 1204 /** 1205 * Update failovers, checkpoint mgr and other vBucket members after 1206 * rollback.
1207 * 1208 * @param rollbackResult contains high seqno of the vBucket after rollback, 1209 * snapshot start seqno of the last snapshot in the 1210 * vBucket after the rollback, 1211 * snapshot end seqno of the last snapshot in the 1212 * vBucket after the rollback 1213 * @param prevHighSeqno high seqno before the rollback 1214 */ 1215 void postProcessRollback(const RollbackResult& rollbackResult, 1216 uint64_t prevHighSeqno); 1217 1218 /** 1219 * Debug - print a textual description of the VBucket to stderr. 1220 */ 1221 virtual void dump() const; 1222 1223 /** 1224 * Sets the callback function to invoke when a frequency counter becomes 1225 * saturated. The callback function is to invoke the ItemFreqDecayer 1226 * task. 1227 * 1228 * @param callbackFunction the function to call back. 1229 */ 1230 void setFreqSaturatedCallback(std::function<void()> callbackFunction); 1231 1232 /** 1233 * Returns the number of deleted items currently held in the hash table 1234 * 1235 * @return number of deletes, as reported by ht.getNumDeletedItems() 1236 */ getNumInMemoryDeletes() const1237 size_t getNumInMemoryDeletes() const { 1238 /* couchbase vbuckets: this is generally (after deletes are persisted) 1239 zero as hash table doesn't keep deletes after 1240 they are persisted. 1241 ephemeral vbuckets: we keep deletes in both hash table and ordered 1242 data structure. */ 1243 return ht.getNumDeletedItems(); 1244 } 1245 1246 /** 1247 * Set that this VBucket might store documents with xattrs. 1248 * Persistent vbucket will flush this to disk.
1249 */ setMightContainXattrs()1250 void setMightContainXattrs() { 1251 mayContainXattrs.store(true); 1252 } 1253 /** @return the current value of the mayContainXattrs flag */ mightContainXattrs() const1254 bool mightContainXattrs() const { 1255 return mayContainXattrs.load(); 1256 } 1257 /** @return a cb::vbucket_info initialised from mightContainXattrs() */ getInfo() const1258 cb::vbucket_info getInfo() const { 1259 return {mightContainXattrs()}; 1260 } 1261 1262 /** 1263 * If the key@bySeqno is found, remove it from the hash table 1264 * 1265 * @param key The key to look for 1266 * @param bySeqno The seqno of the key to remove 1267 */ 1268 void removeKey(const DocKey& key, int64_t bySeqno); 1269 /** @return the checkpoint flush timeout (presumably read from the static chkFlushTimeout member - confirm against the definition) */ 1270 static std::chrono::seconds getCheckpointFlushTimeout(); 1271 1272 /** 1273 * Set the memory threshold on the current bucket quota for accepting a 1274 * new mutation. This is the same across all the vbuckets 1275 * 1276 * @param memThreshold Threshold between 0 and 1 1277 */ 1278 static void setMutationMemoryThreshold(double memThreshold); 1279 1280 /** 1281 * Check if this StoredValue has become logically non-existent. 1282 * By logically non-existent, we mean the item has been deleted 1283 * or doesn't exist 1284 * 1285 * @param v StoredValue to check 1286 * @param readHandle a ReadHandle for safe reading of collection data 1287 * @return true if the item is logically non-existent, 1288 * false otherwise 1289 */ 1290 static bool isLogicallyNonExistent( 1291 const StoredValue& v, 1292 const Collections::VB::Manifest::CachingReadHandle& readHandle); 1293 1294 std::queue<queued_item> rejectQueue; 1295 std::unique_ptr<FailoverTable> failovers; 1296 1297 std::atomic<size_t> opsCreate; 1298 std::atomic<size_t> opsUpdate; 1299 std::atomic<size_t> opsDelete; 1300 std::atomic<size_t> opsReject; 1301 1302 cb::NonNegativeCounter<size_t> dirtyQueueSize; 1303 std::atomic<size_t> dirtyQueueMem; 1304 std::atomic<size_t> dirtyQueueFill; 1305 std::atomic<size_t> dirtyQueueDrain; 1306 std::atomic<uint64_t> dirtyQueueAge; 1307 std::atomic<size_t> dirtyQueuePendingWrites; 1308 std::atomic<size_t> metaDataDisk; 1309 1310 
std::atomic<size_t> numExpiredItems; 1311 1312 /** 1313 * A custom delete function for deleting VBucket objects. Any thread could 1314 * be the last thread to release a VBucketPtr, and deleting a VB will 1315 * eventually hit the I/O sub-system when we unlink the file. To be sure no 1316 * front-end thread does this work, we schedule the deletion to a background 1317 * task. This task scheduling is triggered by the shared_ptr/VBucketPtr 1318 * using this object as the deleter. 1319 */ 1320 struct DeferredDeleter { /** @param engine engine reference stored in the `engine` member (presumably used by operator() to schedule the deletion task - confirm) */ DeferredDeleterVBucket::DeferredDeleter1321 DeferredDeleter(EventuallyPersistentEngine& engine) : engine(engine) { 1322 } 1323 1324 /** 1325 * Called when the VBucketPtr has no more owners and runs delete on 1326 * the object. 1327 * 1327 * @param vb the VBucket whose last owner has gone away 1327 */ 1328 void operator()(VBucket* vb) const; 1329 /* Engine reference captured by the constructor */ 1330 EventuallyPersistentEngine& engine; 1331 }; 1332 1333 protected: 1334 /** 1335 * This function checks for the various states of the value & depending on 1336 * which the calling function can issue a bgfetch as needed. 1336 * 1336 * @param hbl Hash table bucket lock (presumably must be held - confirm) 1336 * @param v pointer to the StoredValue to examine 1336 * @param exptime the new expiry time for the object 1336 * @param readHandle Reader access to the key's collection data. 1336 * 1336 * @return pair of the resulting MutationStatus and GetValue 1337 */ 1338 std::pair<MutationStatus, GetValue> processGetAndUpdateTtl( 1339 HashTable::HashBucketLock& hbl, 1340 StoredValue* v, 1341 time_t exptime, 1342 const Collections::VB::Manifest::CachingReadHandle& readHandle); 1343 /** 1344 * This function checks cas, expiry and other partition (vbucket) related 1345 * rules before setting an item into other in-memory structure like HT, 1346 * and checkpoint mgr. This function assumes that HT bucket lock is grabbed. 1347 * 1348 * @param hbl Hash table bucket lock that must be held 1349 * @param v Reference to the ptr of StoredValue. This can be changed if a 1350 * new StoredValue is added or just its contents is changed if the 1351 * exisiting StoredValue is updated 1352 * @param itm Item to be added/updated.
On success, its revSeqno is updated 1353 * @param cas value to match 1354 * @param allowExisting set to false if you want set to fail if the 1355 * item exists already 1356 * @param hasMetaData 1357 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1358 * backfill queue 1359 * @param storeIfStatus the status of any conditional store predicate 1360 * @param maybeKeyExists true if the key /may/ exist on disk (as an active, 1361 * alive document). Only valid if `v` is null. 1362 * @param isReplication true if issued by consumer (for replication) 1363 * 1364 * @return Result indicating the status of the operation and notification 1365 * info (if operation was successful). 1366 */ 1367 std::pair<MutationStatus, boost::optional<VBNotifyCtx>> processSet( 1368 const HashTable::HashBucketLock& hbl, 1369 StoredValue*& v, 1370 Item& itm, 1371 uint64_t cas, 1372 bool allowExisting, 1373 bool hasMetaData, 1374 const VBQueueItemCtx& queueItmCtx, 1375 cb::StoreIfStatus storeIfStatus, 1376 bool maybeKeyExists = true, 1377 bool isReplication = false); /* NOTE(review): processSet()'s doc comment lists @param hasMetaData with no description - TODO document what it selects. */ 1378 1379 /** 1380 * This function checks cas, expiry and other partition (vbucket) related 1381 * rules before adding an item into other in-memory structure like HT, 1382 * and checkpoint mgr. This function assumes that HT bucket lock is grabbed. 1383 * 1384 * @param hbl Hash table bucket lock that must be held 1385 * @param v[in, out] the stored value to do this operation on 1386 * @param itm Item to be added/updated. On success, its revSeqno is updated 1387 * @param isReplication true if issued by consumer (for replication) 1388 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1389 * backfill queue 1390 * @param readHandle Reader access to the Item's collection data. 1391 * 1392 * @return Result indicating the status of the operation and notification 1393 * info (if the operation was successful).
1394 */ 1395 std::pair<AddStatus, boost::optional<VBNotifyCtx>> processAdd( 1396 const HashTable::HashBucketLock& hbl, 1397 StoredValue*& v, 1398 Item& itm, 1399 bool maybeKeyExists, 1400 bool isReplication, 1401 const VBQueueItemCtx& queueItmCtx, 1402 const Collections::VB::Manifest::CachingReadHandle& readHandle); /* NOTE(review): processAdd()'s doc comment has no @param maybeKeyExists entry - TODO document it (presumably "true if the key may exist on disk"; confirm). */ 1403 1404 /** 1405 * This function checks cas, eviction policy and other partition 1406 * (vbucket) related rules before logically (soft) deleting an item in 1407 * in-memory structure like HT, and checkpoint mgr. 1408 * Assumes that HT bucket lock is grabbed. 1409 * 1410 * @param hbl Hash table bucket lock that must be held 1411 * @param v Reference to the StoredValue to be soft deleted 1412 * @param cas the expected CAS of the item (or 0 to override) 1413 * @param metadata ref to item meta data 1414 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1415 * backfill queue 1416 * @param use_meta Indicates if v must be updated with the metadata 1417 * @param bySeqno seqno of the key being deleted 1418 * 1419 * @return tuple, in this order: 1419 * status of the operation. 1420 * pointer to the updated StoredValue. It can be same as that of 1421 * v or different value if a new StoredValue is created for the 1422 * update. 1423 * notification info, if status was successful. 1424 */ 1425 std::tuple<MutationStatus, StoredValue*, boost::optional<VBNotifyCtx>> 1426 processSoftDelete(const HashTable::HashBucketLock& hbl, 1427 StoredValue& v, 1428 uint64_t cas, 1429 const ItemMetaData& metadata, 1430 const VBQueueItemCtx& queueItmCtx, 1431 bool use_meta, 1432 uint64_t bySeqno); 1433 1434 /** 1435 * Delete a key (associated StoredValue) from ALL in-memory data structures 1436 * like HT. 1437 * Assumes that HT bucket lock is grabbed. 1438 * 1439 * Currently StoredValues form HashTable intrusively. That is, HashTable 1440 * does not store a reference or a copy of the StoredValue.
If any other 1441 * in-memory data strucutures are formed intrusively using StoredValues, 1442 * then it must be decided in this function which data structure deletes 1443 * the StoredValue. Currently it is HashTable that deleted the StoredValue 1444 * 1445 * @param hbl Hash table bucket lock that must be held 1446 * @param v Reference to the StoredValue to be deleted 1447 * 1448 * @return true if an object was deleted, false otherwise 1449 */ 1450 bool deleteStoredValue(const HashTable::HashBucketLock& hbl, 1451 StoredValue& v); 1452 1453 /** 1454 * Queue an item for persistence and replication. May track CAS drift 1455 * 1456 * The caller of this function must hold the lock of the hash table 1457 * partition that contains the StoredValue being queued. 1458 * 1459 * @param v the dirty item. The cas and seqno may be updated based on the 1460 * flags passed 1461 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1462 * backfill queue, whether to track cas, generate seqno, 1463 * generate new cas 1464 * 1465 * @return Notification context containing info needed to notify the 1466 * clients (like connections, flusher) 1467 */ 1468 VBNotifyCtx queueDirty(StoredValue& v, const VBQueueItemCtx& queueItmCtx); 1469 1470 /** 1471 * Queue an item for persistence and replication 1472 * 1473 * The caller of this function must hold the lock of the hash table 1474 * partition that contains the StoredValue being Queued. 1475 * 1476 * @param v the dirty item. The cas and seqno maybe updated based on the 1477 * flags passed 1478 * @param generateBySeqno request that the seqno is generated by this call 1479 * @param generateCas request that the CAS is generated by this call 1480 * @param generateDeleteTime for queueing deletes, should the queue item 1481 * have its delete time generated.
1482 * @param isBackfillItem indicates if the item must be put onto vb queue or 1483 * onto checkpoint 1484 * @param preLinkDocumentContext context object which allows running the 1485 * document pre link callback after the cas is assinged (but 1486 * but document not available for anyone) 1487 * 1488 * @return Notification context containing info needed to notify the 1489 * clients (like connections, flusher) 1490 */ 1491 VBNotifyCtx queueDirty( 1492 StoredValue& v, 1493 GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes, 1494 GenerateCas generateCas = GenerateCas::Yes, 1495 GenerateDeleteTime generateDeleteTime = GenerateDeleteTime::Yes, 1496 bool isBackfillItem = false, 1497 PreLinkDocumentContext* preLinkDocumentContext = nullptr); 1498 1499 /** 1500 * Adds a temporary StoredValue to in-memory data structures like HT. 1501 * Assumes that HT bucket lock is grabbed. 1502 * 1503 * @param hbl Hash table bucket lock that must be held 1504 * @param key the key for which a temporary item needs to be added 1505 * @param isReplication true if issued by consumer (for replication); 1505 * defaults to false 1506 * 1507 * @return Result indicating the status of the operation 1508 */ 1509 TempAddStatus addTempStoredValue(const HashTable::HashBucketLock& hbl, 1510 const DocKey& key, 1511 bool isReplication = false); 1512 1513 /** 1514 * Internal wrapper function around the callback to be called when a new 1515 * seqno is generated in the vbucket 1516 * 1517 * @param notifyCtx holds info needed for notification (bySeqno, 1517 * notifyReplication, notifyFlusher) 1518 */ 1519 void notifyNewSeqno(const VBNotifyCtx& notifyCtx); 1520 1521 /** 1522 * VBucket internal function to store high priority requests on the vbucket.
1523 * 1524 * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted 1525 * @param cookie cookie of conn to be notified 1526 * @param reqType request type indicating seqno or chk persistence 1527 */ 1528 void addHighPriorityVBEntry(uint64_t seqnoOrChkId, 1529 const void* cookie, 1530 HighPriorityVBNotify reqType); 1531 1532 /** 1533 * Get all high priority notifications as temporary failures because they 1534 * could not be completed. 1535 * 1536 * @param engine Ref to ep-engine 1537 * 1538 * @return map of notifies with conn cookie as the key and notify status as 1539 * the value 1540 */ 1541 std::map<const void*, ENGINE_ERROR_CODE> tmpFailAndGetAllHpNotifies( 1542 EventuallyPersistentEngine& engine); 1543 1544 /** 1545 * Check if there is memory available to allocate the in-memory 1546 * instance (StoredValue or OrderedStoredValue) for an item. 1547 * 1548 * @param st Reference to epstats 1549 * @param item Item that is being added 1550 * @param isReplication Flag indicating if the item is from replication 1551 * 1552 * @return True if there is memory for the item; else False 1553 */ 1554 bool hasMemoryForStoredValue(EPStats& st, 1555 const Item& item, 1556 bool isReplication); 1557 /* NOTE(review): undocumented. Presumably emits this vbucket's stats through add_stat for cookie c, with `details` selecting the detailed set - confirm against the definition. */ 1558 void _addStats(bool details, ADD_STAT add_stat, const void* c); 1559 /* NOTE(review): undocumented. Presumably emits the single stat named nm with value val through add_stat for cookie c - confirm against the definition. */ 1560 template <typename T> 1561 void addStat(const char* nm, 1562 const T& val, 1563 ADD_STAT add_stat, 1564 const void* c); 1565 1566 /* This member holds the eviction policy used */ 1567 const item_eviction_policy_t eviction; 1568 1569 /* Reference to global (EP engine wide) stats */ 1570 EPStats& stats; 1571 1572 /* last seqno that is persisted on the disk */ 1573 std::atomic<uint64_t> persistenceSeqno; 1574 1575 /* holds all high priority async requests to the vbucket */ 1576 std::list<HighPriorityVBEntry> hpVBReqs; 1577 1578 /* synchronizes access to hpVBReqs */ 1579 std::mutex hpVBReqsMutex; 1580 1581 /* size of list hpVBReqs (to avoid MB-9434) */ 1582 Couchbase::RelaxedAtomic<size_t> 
numHpVBReqs; 1583 1584 /** 1585 * VBucket sub-classes must implement a function that will schedule 1586 * an appropriate task that will delete the VBucket and its resources. 1587 * 1588 * @param engine owning engine (required for task construction) 1589 */ 1590 virtual void scheduleDeferredDeletion( 1591 EventuallyPersistentEngine& engine) = 0; 1592 1593 /** 1594 * Update the revision seqno of a newly added StoredValue item. 1595 * We must ensure that it is greater than the maxDeletedRevSeqno 1596 * 1597 * @param v Newly added StoredValue. Its revSeqno is updated 1598 */ 1599 void updateRevSeqNoOfNewStoredValue(StoredValue& v); 1600 1601 private: 1602 void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code); 1603 1604 void decrDirtyQueueMem(size_t decrementBy); 1605 1606 void decrDirtyQueueAge(uint32_t decrementBy); 1607 1608 void decrDirtyQueuePendingWrites(size_t decrementBy); 1609 1610 /** 1611 * Updates an existing StoredValue in in-memory data structures like HT. 1612 * Assumes that HT bucket lock is grabbed. 1613 * 1614 * @param hbl Hash table bucket lock that must be held 1615 * @param v Reference to the StoredValue to be updated. 1616 * @param itm Item to be updated. 1617 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1618 * backfill queue 1619 * @param justTouch To note that this object is an existing item with 1620 * the same value but with a few flags changed. 1621 * @return tuple, in this order: 1621 * pointer to the updated StoredValue. It can be same as that of 1622 * v or different value if a new StoredValue is created for the 1623 * update. 1624 * status of the operation. 1625 * notification info. 1626 */ 1627 virtual std::tuple<StoredValue*, MutationStatus, VBNotifyCtx> 1628 updateStoredValue(const HashTable::HashBucketLock& hbl, 1629 StoredValue& v, 1630 const Item& itm, 1631 const VBQueueItemCtx& queueItmCtx, 1632 bool justTouch = false) = 0; 1633 1634 /** 1635 * Adds a new StoredValue in in-memory data structures like HT.
1636 * Assumes that HT bucket lock is grabbed. 1637 * 1638 * @param hbl Hash table bucket lock that must be held 1639 * @param itm Item to be added. 1640 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1641 * backfill queue 1642 * @param genRevSeqno whether to generate new revision sequence number 1643 * or not 1644 * 1645 * @return Ptr of the StoredValue added and notification info 1646 */ 1647 virtual std::pair<StoredValue*, VBNotifyCtx> addNewStoredValue( 1648 const HashTable::HashBucketLock& hbl, 1649 const Item& itm, 1650 const VBQueueItemCtx& queueItmCtx, 1651 GenerateRevSeqno genRevSeqno) = 0; 1652 1653 /** 1654 * Logically (soft) delete an item in all in-memory data structures. Also 1655 * updates revSeqno. Depending on the in-memory data structure the item may 1656 * be marked deleted and/or reset and/or a new value (marked as deleted) 1657 * added. 1658 * Assumes that HT bucket lock is grabbed. 1659 * Also assumes that v is in the hash table. 1660 * 1661 * @param hbl Hash table bucket lock that must be held 1662 * @param v Reference to the StoredValue to be soft deleted 1663 * @param onlyMarkDeleted indicates whether we must reset the StoredValue or 1664 * just mark it deleted 1665 * @param queueItmCtx holds info needed to queue an item in chkpt or vb 1666 * backfill queue 1667 * @param bySeqno seqno of the key being deleted 1668 * 1669 * @return tuple, in this order: 1669 * pointer to the updated StoredValue. It can be same as that of 1670 * v or different value if a new StoredValue is created for the 1671 * update. 1672 * notification info. 1673 */ 1674 virtual std::tuple<StoredValue*, VBNotifyCtx> softDeleteStoredValue( 1675 const HashTable::HashBucketLock& hbl, 1676 StoredValue& v, 1677 bool onlyMarkDeleted, 1678 const VBQueueItemCtx& queueItmCtx, 1679 uint64_t bySeqno) = 0; 1680 1681 /** 1682 * This function handles expiry relatead stuff before logically (soft) 1683 * deleting an item in in-memory structures like HT, and checkpoint mgr.
1684 * Assumes that HT bucket lock is grabbed. 1685 * 1686 * @param hbl Hash table bucket lock that must be held 1687 * @param v Reference to the StoredValue to be soft deleted 1688 * 1689 * @return status of the operation. 1690 * pointer to the updated StoredValue. It can be same as that of 1691 * v or different value if a new StoredValue is created for the 1692 * update. 1693 * notification info. 1694 */ 1695 std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processExpiredItem( 1696 const HashTable::HashBucketLock& hbl, StoredValue& v); 1697 1698 /** 1699 * Add a temporary item in the hash table and enqueue a background fetch 1700 * for a key. 1701 * 1702 * @param hbl Reference to the hash table bucket lock 1703 * @param key the key to be bg fetched 1704 * @param cookie the cookie of the requestor 1705 * @param engine Reference to ep engine 1706 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1707 * @param metadataOnly whether the fetch is for a non-resident value or 1708 * metadata of a (possibly) deleted item 1709 * @param isReplication indicates if the call is for a replica vbucket 1710 * 1711 * @return ENGINE_ERROR_CODE status to be notified to the front end 1712 */ 1713 virtual ENGINE_ERROR_CODE addTempItemAndBGFetch( 1714 HashTable::HashBucketLock& hbl, 1715 const DocKey& key, 1716 const void* cookie, 1717 EventuallyPersistentEngine& engine, 1718 int bgFetchDelay, 1719 bool metadataOnly, 1720 bool isReplication = false) = 0; 1721 1722 /** 1723 * Enqueue a background fetch for a key.
1724 * 1725 * @param key the key to be bg fetched 1726 * @param cookie the cookie of the requestor 1727 * @param engine Reference to ep engine 1728 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1729 * @param isMeta whether the fetch is for a non-resident value or metadata 1730 * of a (possibly) deleted item 1731 */ 1732 virtual void bgFetch(const DocKey& key, 1733 const void* cookie, 1734 EventuallyPersistentEngine& engine, 1735 int bgFetchDelay, 1736 bool isMeta = false) = 0; 1737 1738 /** 1739 * Get metadata and value for a non-resident key 1740 * 1741 * @param key key for which metadata and value should be retrieved 1742 * @param cookie the cookie representing the client 1743 * @param engine Reference to ep engine 1744 * @param bgFetchDelay Delay in secs before we run the bgFetch task 1745 * @param queueBgFetch Indicates whether a background fetch needs to be 1746 * queued 1747 * @param v reference to the stored value of the non-resident key 1748 * 1749 * @return the result of the operation 1750 */ 1751 virtual GetValue getInternalNonResident(const DocKey& key, 1752 const void* cookie, 1753 EventuallyPersistentEngine& engine, 1754 int bgFetchDelay, 1755 QueueBgFetch queueBgFetch, 1756 const StoredValue& v) = 0; 1757 1758 /** 1759 * Increase the expiration count in the global stats and in the vbucket stats 1759 * 1759 * @param source what triggered the expiry (ExpireBy::Pager, Compactor or Access) 1760 */ 1761 void incExpirationStat(ExpireBy source); 1762 /* NOTE(review): undocumented. Presumably adjusts the static checkpoint flush timeout based on the given wall-clock duration - confirm against the definition. */ 1763 void adjustCheckpointFlushTimeout(std::chrono::seconds wall_time); 1764 1765 /** 1766 * Given a StoredValue with XATTRs - prune the user keys so only system keys 1767 * remain. 1768 * 1769 * @param v StoredValue with XATTR value 1770 * @param itemMeta New ItemMetaData to use in item creation 1771 * @return unique_ptr<Item> which matches the StoredValue's meta-data and 1772 * has the XATTR value with only the system-keys. If the pruning 1773 * removed all keys (because no system-keys exist) an empty 1774 * unique_ptr is returned.
*/
std::unique_ptr<Item> pruneXattrDocument(StoredValue& v,
                                         const ItemMetaData& itemMeta);

/**
 * Estimate the new total memory usage with the allocation of an in-memory
 * instance for item
 *
 * Pure virtual: the estimate is supplied by the concrete VBucket type.
 *
 * @param st Reference to epstats
 * @param item Item that is being added
 *
 * @return new total size for this Bucket once Item is allocated
 */
virtual size_t estimateNewMemoryUsage(EPStats& st, const Item& item) = 0;

/**
 * Call the predicate with item_info from v (none if v is nullptr)
 *
 * @param predicate a function to call, must be initialised
 * @param v the StoredValue (or nullptr if none in cache)
 * @return how the caller should proceed (store_if semantics)
 */
cb::StoreIfStatus callPredicate(cb::StoreIfPredicate predicate,
                                StoredValue* v);

// Identifier of this vbucket.
id_type id;
// Current vbucket state; atomic, with a separate read/write lock below.
std::atomic<vbucket_state_t> state;
// NOTE(review): presumably serialises changes to `state` - confirm.
cb::RWLock stateLock;
vbucket_state_t initialState;
// NOTE(review): presumably guards pendingOps / pendingOpsStart - confirm.
std::mutex pendingOpLock;
// Opaque cookies (const void*) of pending operations.
std::vector<const void*> pendingOps;
ProcessClock::time_point pendingOpsStart;
WeaklyMonotonic<uint64_t> purge_seqno;
std::atomic<bool> takeover_backed_up;

/* snapshotMutex is used to update/read the pair {start, end} atomically,
   but not if reading a single field. */
mutable std::mutex snapshotMutex;
uint64_t persisted_snapshot_start;
uint64_t persisted_snapshot_end;

// NOTE(review): presumably guards bFilter and tempFilter - confirm.
std::mutex bfMutex;
std::unique_ptr<BloomFilter> bFilter;
std::unique_ptr<BloomFilter> tempFilter; // Used during compaction.

std::atomic<uint64_t> rollbackItemCount;

HLC hlc;
std::string statPrefix;
// The persistence checkpoint ID for this vbucket.
std::atomic<uint64_t> persistenceCheckpointId;
// Flag to indicate the vbucket is being created
std::atomic<bool> bucketCreation;
// Flag to indicate the vbucket deletion is deferred
std::atomic<bool> deferredDeletion;
/// A cookie that can be set when the vbucket deletion is deferred; the
/// cookie will be notified when the deferred deletion completes
const void* deferredDeletionCookie;

// Ptr to the item conflict resolution module
std::unique_ptr<ConflictResolution> conflictResolver;

// A callback to be called when a new seqno is generated in the vbucket as
// a result of a front end call
NewSeqnoCallback newSeqnoCb;

/// The VBucket collection state
Collections::VB::Manifest manifest;

/**
 * Records if the vbucket has had xattrs documents written to it. Note that
 * rollback of data or delete of all the xattr documents does not undo the
 * flag.
 */
std::atomic<bool> mayContainXattrs;

// Static: a single value shared by every VBucket instance.
static cb::AtomicDuration chkFlushTimeout;

// Static: a single value shared by every VBucket instance.
static double mutationMemThreshold;

// Grants the test base class access to non-public members.
friend class VBucketTestBase;

DISALLOW_COPY_AND_ASSIGN(VBucket);
};

/// Shared-ownership handle to a VBucket.
using VBucketPtr = std::shared_ptr<VBucket>;

/**
 * Represents a locked VBucket that provides RAII semantics for the lock.
 *
 * Behaves like the underlying shared_ptr - i.e. `operator->` is overloaded
 * to return the underlying VBucket.
1866 */ 1867 class LockedVBucketPtr { 1868 public: LockedVBucketPtr(VBucketPtr vb, std::unique_lock<std::mutex>&& lock)1869 LockedVBucketPtr(VBucketPtr vb, std::unique_lock<std::mutex>&& lock) 1870 : vb(std::move(vb)), lock(std::move(lock)) { 1871 } 1872 operator *() const1873 VBucket& operator*() const { 1874 return *vb; 1875 } 1876 operator ->() const1877 VBucket* operator->() const { 1878 return vb.get(); 1879 } 1880 operator bool() const1881 explicit operator bool() const { 1882 return vb.operator bool(); 1883 } 1884 getVB()1885 VBucketPtr& getVB() { 1886 return vb; 1887 } 1888 1889 /// Return true if this object owns the mutex. owns_lock() const1890 bool owns_lock() const { 1891 return lock.owns_lock(); 1892 } 1893 1894 private: 1895 VBucketPtr vb; 1896 std::unique_lock<std::mutex> lock; 1897 }; 1898