xref: /6.6.0/kv_engine/engines/ep/src/kv_bucket.h (revision c1be21e1)
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *   you may not use this file except in compliance with the License.
4  *   You may obtain a copy of the License at
5  *
6  *       http://www.apache.org/licenses/LICENSE-2.0
7  *
8  *   Unless required by applicable law or agreed to in writing, software
9  *   distributed under the License is distributed on an "AS IS" BASIS,
10  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  *   See the License for the specific language governing permissions and
12  *   limitations under the License.
13  */
14 
15 #pragma once
16 
17 #include "ep_types.h"
18 #include "kv_bucket_iface.h"
19 #include "mutation_log.h"
20 #include "stored-value.h"
21 #include "storeddockey.h"
22 #include "task_type.h"
23 #include "utility.h"
24 #include "vbucket.h"
25 #include "vbucketmap.h"
26 
27 #include <cstdlib>
28 #include <deque>
29 
30 class DurabilityCompletionTask;
31 class ReplicationThrottle;
32 class VBucketCountVisitor;
33 namespace Collections {
34 class Manager;
35 }
36 
37 /**
38  * VBucket Callback Adaptor is a helper task used to implement visitAsync().
39  *
40  * It is used to assist in visiting multiple vBuckets, without creating a
41  * separate task (and associated task overhead) for each vBucket individually.
42  *
43  * The set of vBuckets to visit is obtained by applying
44  * VBucketVisitor::getVBucketFilter() to the set of vBuckets the Bucket has.
45  */
46 class VBCBAdaptor : public GlobalTask {
47 public:
48     VBCBAdaptor(KVBucket* s,
49                 TaskId id,
50                 std::unique_ptr<PausableVBucketVisitor> v,
51                 const char* l,
52                 bool shutdown);
53 
54     std::string getDescription() override;
55 
56     /// Set the maximum expected duration for this task.
setMaxExpectedDuration(std::chrono::microseconds duration)57     void setMaxExpectedDuration(std::chrono::microseconds duration) {
58         maxDuration = duration;
59     }
60 
maxExpectedDuration()61     std::chrono::microseconds maxExpectedDuration() override {
62         return maxDuration;
63     }
64 
65     /**
66      * Execute the VBCBAdapter task using our visitor.
67      *
68      * Calls the visitVBucket() method of the visitor object for each vBucket.
69      * Before each visitVBucket() call, calls pauseVisitor() to check if
70      * visiting should be paused. If true, will sleep for 0s, yielding execution
71      * back to the executor - to allow any higher priority tasks to run.
72      * When run() is called again, will resume from the vBucket it paused at.
73      */
74     bool run() override;
75 
76 private:
77     KVBucket* store;
78     std::unique_ptr<PausableVBucketVisitor> visitor;
79     const char                 *label;
80     std::chrono::microseconds maxDuration;
81 
82     /**
83      * Current VBucket.
84      * RelaxedAtomic as this is used by getDescription to generate the task
85      * description, which can be called by threads other than the one executing.
86      */
87     cb::RelaxedAtomic<Vbid::id_type> currentvb;
88 
89     DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
90 };
91 
92 const uint16_t EP_PRIMARY_SHARD = 0;
93 class KVShard;
94 
95 typedef std::pair<Vbid, ExTask> CompTaskEntry;
96 
97 /**
98  * KVBucket is the base class for concrete Key/Value bucket implementations
99  * which use the concept of VBuckets to support replication, persistence and
100  * failover.
101  *
102  */
103 class KVBucket : public KVBucketIface {
104 public:
105 
106     KVBucket(EventuallyPersistentEngine &theEngine);
107     virtual ~KVBucket();
108 
109     bool initialize() override;
110 
111     void deinitialize() override;
112 
113     ENGINE_ERROR_CODE set(Item& item,
114                           const void* cookie,
115                           cb::StoreIfPredicate predicate = {}) override;
116 
117     ENGINE_ERROR_CODE add(Item &item, const void *cookie) override;
118 
119     ENGINE_ERROR_CODE replace(Item& item,
120                               const void* cookie,
121                               cb::StoreIfPredicate predicate = {}) override;
122 
get(const DocKey & key,Vbid vbucket,const void * cookie,get_options_t options)123     GetValue get(const DocKey& key,
124                  Vbid vbucket,
125                  const void* cookie,
126                  get_options_t options) override {
127         return getInternal(key, vbucket, cookie, ForGetReplicaOp::No, options);
128     }
129 
130     GetValue getRandomKey() override;
131 
getReplica(const DocKey & key,Vbid vbucket,const void * cookie,get_options_t options)132     GetValue getReplica(const DocKey& key,
133                         Vbid vbucket,
134                         const void* cookie,
135                         get_options_t options) override {
136         return getInternal(key, vbucket, cookie, ForGetReplicaOp::Yes, options);
137     }
138 
139     ENGINE_ERROR_CODE getMetaData(const DocKey& key,
140                                   Vbid vbucket,
141                                   const void* cookie,
142                                   ItemMetaData& metadata,
143                                   uint32_t& deleted,
144                                   uint8_t& datatype) override;
145 
146     ENGINE_ERROR_CODE setWithMeta(
147             Item& item,
148             uint64_t cas,
149             uint64_t* seqno,
150             const void* cookie,
151             PermittedVBStates permittedVBStates,
152             CheckConflicts checkConflicts,
153             bool allowExisting,
154             GenerateBySeqno genBySeqno = GenerateBySeqno::Yes,
155             GenerateCas genCas = GenerateCas::No,
156             ExtendedMetaData* emd = NULL) override;
157 
158     ENGINE_ERROR_CODE prepare(Item& item, const void* cookie);
159 
160     GetValue getAndUpdateTtl(const DocKey& key,
161                              Vbid vbucket,
162                              const void* cookie,
163                              time_t exptime) override;
164 
165     ENGINE_ERROR_CODE deleteItem(
166             const DocKey& key,
167             uint64_t& cas,
168             Vbid vbucket,
169             const void* cookie,
170             boost::optional<cb::durability::Requirements> durability,
171             ItemMetaData* itemMeta,
172             mutation_descr_t& mutInfo) override;
173 
174     ENGINE_ERROR_CODE deleteWithMeta(const DocKey& key,
175                                      uint64_t& cas,
176                                      uint64_t* seqno,
177                                      Vbid vbucket,
178                                      const void* cookie,
179                                      PermittedVBStates permittedVBStates,
180                                      CheckConflicts checkConflicts,
181                                      const ItemMetaData& itemMeta,
182                                      GenerateBySeqno genBySeqno,
183                                      GenerateCas generateCas,
184                                      uint64_t bySeqno,
185                                      ExtendedMetaData* emd,
186                                      DeleteSource deleteSource) override;
187 
188     void reset() override;
189 
190     bool pauseFlusher() override;
191     bool resumeFlusher() override;
192     void wakeUpFlusher() override;
193 
194     void snapshotStats() override;
195 
196     void getAggregatedVBucketStats(const void* cookie,
197                                    const AddStatFn& add_stat) override;
198 
199     void completeBGFetchMulti(Vbid vbId,
200                               std::vector<bgfetched_item_t>& fetchedItems,
201                               std::chrono::steady_clock::time_point start) override;
202 
203     /**
204      * Returns the number of vbuckets in a given state.
205      * @param state  the vbucket state to compare against
206      * @return  the number of vbuckets in the requested state
207      */
208     uint16_t getNumOfVBucketsInState(vbucket_state_t state) const;
209 
210     /**
211      * Returns a vector containing the vbuckets from the vbMap that are in
212      * the given state.
213      * @param state  the state used to filter which vbuckets to return
214      * @return  vector of vbuckets that are in the given state.
215      */
getVBucketsInState(vbucket_state_t state)216     std::vector<Vbid> getVBucketsInState(vbucket_state_t state) const {
217         return vbMap.getBucketsInState(state);
218     }
219 
getVBucket(Vbid vbid)220     VBucketPtr getVBucket(Vbid vbid) override {
221         return vbMap.getBucket(vbid);
222     }
223 
224     /**
225      * Return a pointer to the given VBucket, acquiring the appropriate VB
226      * mutex lock at the same time.
227      * @param vbid VBucket ID to get.
228      * @return A RAII-style handle which owns the correct VBucket mutex,
229      *         alongside a shared pointer to the requested VBucket.
230      */
getLockedVBucket(Vbid vbid)231     LockedVBucketPtr getLockedVBucket(Vbid vbid) {
232         std::unique_lock<std::mutex> lock(vb_mutexes[vbid.get()]);
233         return {vbMap.getBucket(vbid), std::move(lock)};
234     }
235 
236     /**
237      * Attempt to return a pointer to the given VBucket and the VBucket's mutex,
238      * if the mutex isn't already acquired.
239      * @param vbid VBucket ID to get.
240      * @return If the mutex was available then a pointer to the VBucket and
241      *         a lock on the mutex. If the mutex was already locked then returns
242      *         nullptr and a lock object which owns_lock() returns false.
243      *
244      * For correct usage, clients should call owns_lock() to check if they
245      * successfully acquired a locked VBucket.
246      */
getLockedVBucket(Vbid vbid,std::try_to_lock_t)247     LockedVBucketPtr getLockedVBucket(Vbid vbid, std::try_to_lock_t) {
248         std::unique_lock<std::mutex> lock(vb_mutexes[vbid.get()],
249                                           std::try_to_lock);
250         if (!lock) {
251             return {{}, std::move(lock)};
252         }
253         return {vbMap.getBucket(vbid), std::move(lock)};
254     }
255 
getMemFootPrint(void)256     size_t getMemFootPrint(void) {
257         size_t mem = 0;
258         for (size_t i = 0; i < vbMap.shards.size(); i++) {
259             KVShard* shard = vbMap.shards[i].get();
260             mem += shard->getRWUnderlying()->getMemFootPrint();
261             mem += shard->getROUnderlying()->getMemFootPrint();
262         }
263         return mem;
264     }
265 
getLastPersistedCheckpointId(Vbid vb)266     std::pair<uint64_t, bool> getLastPersistedCheckpointId(Vbid vb) override {
267         // No persistence at the KVBucket class level.
268         return {0, false};
269     }
270 
getLastPersistedSeqno(Vbid vb)271     uint64_t getLastPersistedSeqno(Vbid vb) override {
272         auto vbucket = vbMap.getBucket(vb);
273         if (vbucket) {
274             return vbucket->getPersistenceSeqno();
275         } else {
276             return 0;
277         }
278     }
279 
280     /**
281      * Release all cookies blocked for sync write
282      *
283      * This method is called during bucket shutdown to make sure that
284      * all of the cookies waiting for a durable write are released so that
285      * we can continue bucket deletion. As part of bucket deletion one of
286      * the first things we do is to tear down the DCP streams so that
287      * the durable writes will never be notified and would be stuck
288      * waiting for a timeout if we don't explicitly release them.
289      */
290     void releaseRegisteredSyncWrites();
291 
292     /**
293      * Sets the vbucket or creates a vbucket with the desired state
294      *
295      * @param vbid vbucket id
296      * @param state desired state of the vbucket
297      * @param meta optional meta information to apply alongside the state
298      * @param transfer indicates that the vbucket is transferred to the active
299      *                 post a failover and/or rebalance
300      * @param cookie under certain conditions we may use ewouldblock
301      *
302      * @return status of the operation
303      */
304     ENGINE_ERROR_CODE setVBucketState(Vbid vbid,
305                                       vbucket_state_t state,
306                                       const nlohmann::json& meta = {},
307                                       TransferVB transfer = TransferVB::No,
308                                       const void* cookie = nullptr);
309 
310     /**
311      * Sets the vbucket to the desired state
312      *
313      * @param vb shared_ptr to the vbucket to set the state on
314      * @param to desired state for the vbucket
315      * @param meta optional meta information to apply alongside the state
316      * @param transfer indicates that the vbucket is transferred to the active
317      *                 post a failover and/or rebalance
318      * @param notify_dcp indicates whether we must consider closing DCP streams
319      *                    associated with the vbucket
320      * @param vbset lock holder which has acquired the 'vbsetMutex' lock of
321      *              this KVBucket
322      * @param vbStateLock WriterLockHolder of 'stateLock' in the vbucket
323      *                    class.
324      */
325     void setVBucketState_UNLOCKED(VBucketPtr& vb,
326                                   vbucket_state_t to,
327                                   const nlohmann::json& meta,
328                                   TransferVB transfer,
329                                   bool notify_dcp,
330                                   std::unique_lock<std::mutex>& vbset,
331                                   folly::SharedMutex::WriteHolder& vbStateLock);
332 
333     /**
334      * Creates the vbucket in the desired state
335      *
336      * @param vbid vbucket id
337      * @param state desired state of the vbucket
338      * @param meta optional meta information to apply alongside the state
339      * @param vbset lock holder which has acquired the 'vbsetMutex' lock of
340      *              this KVBucket
341      *
342      * @return status of the operation
343      */
344     ENGINE_ERROR_CODE createVBucket_UNLOCKED(
345             Vbid vbid,
346             vbucket_state_t state,
347             const nlohmann::json& meta,
348             std::unique_lock<std::mutex>& vbset);
349     /**
350      * Returns the 'vbsetMutex'
351      */
getVbSetMutexLock()352     std::mutex& getVbSetMutexLock() {
353         return vbsetMutex;
354     }
355 
356     ENGINE_ERROR_CODE deleteVBucket(Vbid vbid, const void* c = NULL) override;
357 
358     ENGINE_ERROR_CODE checkForDBExistence(Vbid db_file_id) override;
359 
360     Vbid getDBFileId(const cb::mcbp::Request& req) override;
361 
362     bool resetVBucket(Vbid vbid) override;
363 
364     void visit(VBucketVisitor &visitor) override;
365 
366     size_t visitAsync(std::unique_ptr<PausableVBucketVisitor> visitor,
367                       const char* lbl,
368                       TaskId id,
369                       std::chrono::microseconds maxExpectedDuration) override;
370 
371     Position pauseResumeVisit(PauseResumeVBVisitor& visitor,
372                               Position& start_pos) override;
373 
374     Position startPosition() const override;
375 
376     Position endPosition() const override;
377 
378     const Flusher* getFlusher(uint16_t shardId) override;
379 
380     Warmup* getWarmup() const override;
381 
382     ENGINE_ERROR_CODE getKeyStats(const DocKey& key,
383                                   Vbid vbucket,
384                                   const void* cookie,
385                                   key_stats& kstats,
386                                   WantsDeleted wantsDeleted) override;
387 
388     std::string validateKey(const DocKey& key, Vbid vbucket, Item& diskItem) override;
389 
390     GetValue getLocked(const DocKey& key,
391                        Vbid vbucket,
392                        rel_time_t currentTime,
393                        uint32_t lockTimeout,
394                        const void* cookie) override ;
395 
396     ENGINE_ERROR_CODE unlockKey(const DocKey& key,
397                                 Vbid vbucket,
398                                 uint64_t cas,
399                                 rel_time_t currentTime,
400                                 const void* cookie) override;
401 
getRWUnderlying(Vbid vbId)402     KVStore* getRWUnderlying(Vbid vbId) override {
403         return vbMap.getShardByVbId(vbId)->getRWUnderlying();
404     }
405 
getRWUnderlyingByShard(size_t shardId)406     KVStore* getRWUnderlyingByShard(size_t shardId) override {
407         return vbMap.shards[shardId]->getRWUnderlying();
408     }
409 
getROUnderlyingByShard(size_t shardId)410     KVStore* getROUnderlyingByShard(size_t shardId) override {
411         return vbMap.shards[shardId]->getROUnderlying();
412     }
413 
getROUnderlying(Vbid vbId)414     KVStore* getROUnderlying(Vbid vbId) override {
415         return vbMap.getShardByVbId(vbId)->getROUnderlying();
416     }
417 
418     KVStoreRWRO takeRWRO(size_t shardId) override;
419 
420     void setRWRO(size_t shardId,
421                  std::unique_ptr<KVStore> rw,
422                  std::unique_ptr<KVStore> ro) override;
423 
424     cb::mcbp::Status evictKey(const DocKey& key,
425                               Vbid vbucket,
426                               const char** msg) override;
427 
428     /**
429      * Run the server-api pre-expiry hook against the Item - the function
430      * may (if the pre-expiry hook dictates) mutate the Item value so that
431      * xattrs and the value are removed. The method doesn't care for the Item
432      * state (i.e. isDeleted) and the callers should be passing expired/deleted
433      * items only.
434      *
435      * @param vb The vbucket it belongs to
436      * @param it A reference to the Item to run the hook against and possibly
437      *        mutate.
438      */
439     void runPreExpiryHook(VBucket& vb, Item& it);
440 
441     void deleteExpiredItem(Item& it, time_t startTime, ExpireBy source) override;
442     void deleteExpiredItems(std::list<Item>&, ExpireBy) override;
443 
444     /**
445      * Get the value for the Item
446      * If the value is already deleted no update occurs
447      * If a value can be retrieved then it is updated via setValue
448      * @param it reference to an Item which maybe updated
449      */
450     void getValue(Item& it);
451 
getStorageProperties()452     const StorageProperties getStorageProperties() const override {
453         KVStore* store  = vbMap.shards[0]->getROUnderlying();
454         return store->getStorageProperties();
455     }
456 
457     void scheduleVBStatePersist() override;
458 
459     void scheduleVBStatePersist(Vbid vbid) override;
460 
getVBuckets()461     const VBucketMap &getVBuckets() override {
462         return vbMap;
463     }
464 
getEPEngine()465     EventuallyPersistentEngine& getEPEngine() override {
466         return engine;
467     }
468 
getExpiryPagerSleeptime()469     size_t getExpiryPagerSleeptime() override {
470         LockHolder lh(expiryPager.mutex);
471         return expiryPager.sleeptime;
472     }
473 
getTransactionTimePerItem()474     size_t getTransactionTimePerItem() override {
475         return lastTransTimePerItem.load();
476     }
477 
478     void setBackfillMemoryThreshold(double threshold) override;
479 
480     void setExpiryPagerSleeptime(size_t val) override;
481     void setExpiryPagerTasktime(ssize_t val) override;
482     void enableExpiryPager() override;
483     void disableExpiryPager() override;
484 
485     /// Wake up the expiry pager (if enabled), scheduling it for immediate run.
486     void wakeUpExpiryPager();
487 
488     /// Wake up the item pager (if enabled), scheduling it for immediate run.
489     /// Currently this is used only during testing.
490     void wakeItemPager();
491     void enableItemPager();
492     void disableItemPager();
493 
494     /// Wake up the ItemFreqDecayer Task, scheduling it for immediate run.
495     void wakeItemFreqDecayerTask();
496 
497     void enableAccessScannerTask() override;
498     void disableAccessScannerTask() override;
499     void setAccessScannerSleeptime(size_t val, bool useStartTime) override;
500     void resetAccessScannerStartTime() override;
501 
resetAccessScannerTasktime()502     void resetAccessScannerTasktime() override {
503         accessScanner.lastTaskRuntime = std::chrono::steady_clock::now();
504     }
505 
506     void enableItemCompressor();
507 
508     void setAllBloomFilters(bool to) override;
509 
getBfiltersResidencyThreshold()510     float getBfiltersResidencyThreshold() override {
511         return bfilterResidencyThreshold;
512     }
513 
setBfiltersResidencyThreshold(float to)514     void setBfiltersResidencyThreshold(float to) override {
515         bfilterResidencyThreshold = to;
516     }
517 
518     bool isMetaDataResident(VBucketPtr &vb, const DocKey& key) override;
519 
520     void logQTime(TaskId taskType,
521                   const std::chrono::steady_clock::duration enqTime) override;
522 
523     void logRunTime(TaskId taskType,
524                     const std::chrono::steady_clock::duration runTime) override;
525 
updateCachedResidentRatio(size_t activePerc,size_t replicaPerc)526     void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) override {
527         cachedResidentRatio.activeRatio.store(activePerc);
528         cachedResidentRatio.replicaRatio.store(replicaPerc);
529     }
530 
531     bool isMemoryUsageTooHigh() override;
532 
533     /**
534      * Check the status of memory used and maybe begin to free memory if
535      * required.
536      *
537      * This checks if the bucket's mem_used has exceeded the high water mark.
538      */
539     void checkAndMaybeFreeMemory();
540 
541     void addKVStoreStats(const AddStatFn& add_stat,
542                          const void* cookie,
543                          const std::string& args) override;
544 
545     void addKVStoreTimingStats(const AddStatFn& add_stat,
546                                const void* cookie) override;
547 
548     bool getKVStoreStat(const char* name, size_t& value,
549                         KVSOption option) override;
550 
551     void resetUnderlyingStats() override;
552     KVStore *getOneROUnderlying() override;
553     KVStore *getOneRWUnderlying() override;
554 
getItemEvictionPolicy()555     EvictionPolicy getItemEvictionPolicy() const override {
556         return eviction_policy;
557     }
558 
559     TaskStatus rollback(Vbid vbid, uint64_t rollbackSeqno) override;
560 
561     void attemptToFreeMemory() override;
562 
563     void wakeUpCheckpointRemover() override;
564 
565     void runDefragmenterTask() override;
566 
567     void runItemFreqDecayerTask() override;
568 
569     bool runAccessScannerTask() override;
570 
571     void runVbStatePersistTask(Vbid vbid) override;
572 
setCompactionWriteQueueCap(size_t to)573     void setCompactionWriteQueueCap(size_t to) override {
574         compactionWriteQueueCap = to;
575     }
576 
setCompactionExpMemThreshold(size_t to)577     void setCompactionExpMemThreshold(size_t to) override {
578         compactionExpMemThreshold = static_cast<double>(to) / 100.0;
579     }
580 
581     bool compactionCanExpireItems() override;
582 
583     void setCursorDroppingLowerUpperThresholds(size_t maxSize) override;
584 
isAccessScannerEnabled()585     bool isAccessScannerEnabled() override {
586         LockHolder lh(accessScanner.mutex);
587         return accessScanner.enabled;
588     }
589 
isExpPagerEnabled()590     bool isExpPagerEnabled() override {
591         LockHolder lh(expiryPager.mutex);
592         return expiryPager.enabled;
593     }
594 
595     bool isWarmingUp() override;
596 
597     bool isWarmupOOMFailure() override;
598 
599     /**
600      * This method stores the given cookie for later notification iff Warmup has
601      * yet to reach and complete the PopulateVBucketMap phase.
602      *
603      * @param cookie the callers cookie which might be stored for later
604      *        notification (see return value)
605      * @return true if the cookie was stored for later notification, false if
606      *         not.
607      */
608     bool maybeWaitForVBucketWarmup(const void* cookie) override;
609 
610     size_t getActiveResidentRatio() const override;
611 
612     size_t getReplicaResidentRatio() const override;
613 
614     ENGINE_ERROR_CODE forceMaxCas(Vbid vbucket, uint64_t cas) override;
615 
616     VBucketPtr makeVBucket(Vbid id,
617                            vbucket_state_t state,
618                            KVShard* shard,
619                            std::unique_ptr<FailoverTable> table,
620                            NewSeqnoCallback newSeqnoCb,
621                            std::unique_ptr<Collections::VB::Manifest> manifest,
622                            vbucket_state_t initState = vbucket_state_dead,
623                            int64_t lastSeqno = 0,
624                            uint64_t lastSnapStart = 0,
625                            uint64_t lastSnapEnd = 0,
626                            uint64_t purgeSeqno = 0,
627                            uint64_t maxCas = 0,
628                            int64_t hlcEpochSeqno = HlcCasSeqnoUninitialised,
629                            bool mightContainXattrs = false,
630                            const nlohmann::json& replicationTopology = {},
631                            uint64_t maxVisibleSeqno = 0) override = 0;
632 
633     /**
634      * Method to handle set_collections commands
635      * @param json a buffer containing a JSON manifest to apply to the bucket
636      */
637     cb::engine_error setCollections(cb::const_char_buffer json);
638 
639     /**
640      * Method to handle get_collections commands
641      * @return a pair with the status and JSON as a std::string
642      */
643     std::pair<cb::mcbp::Status, std::string> getCollections() const;
644 
645     /**
646      * Method to handle get_collection_id command
647      * @param path A path for scope.collection
648      * @return pair with error status and result if success
649      */
650     cb::EngineErrorGetCollectionIDResult getCollectionID(
651             cb::const_char_buffer path) const;
652 
653     /**
654      * Method to handle get_scope_id command
655      * @param path A path for scope
656      * @return pair with error status and result if success
657      */
658     cb::EngineErrorGetScopeIDResult getScopeID(
659             cb::const_char_buffer path) const;
660 
661     const Collections::Manager& getCollectionsManager() const;
662 
663     bool isXattrEnabled() const;
664 
665     void setXattrEnabled(bool value);
666 
667     /**
668      * Returns the replication throttle instance
669      *
670      * @return Ref to replication throttle
671      */
getReplicationThrottle()672     ReplicationThrottle& getReplicationThrottle() {
673         return *replicationThrottle;
674     }
675 
676     /// return the buckets maxTtl value
677     std::chrono::seconds getMaxTtl() const;
678 
679     /// set the buckets maxTtl
680     void setMaxTtl(size_t max);
681 
682     /**
683      * Set the Bucket Minimum Durability Level to the given level.
684      *
685      * @param level
686      * @return success if the operation succeeds, an error code otherwise
687      */
688     ENGINE_ERROR_CODE setMinDurabilityLevel(cb::durability::Level level);
689 
690     cb::durability::Level getMinDurabilityLevel() const;
691 
692 protected:
693     GetValue getInternal(const DocKey& key,
694                          Vbid vbucket,
695                          const void* cookie,
696                          ForGetReplicaOp getReplicaItem,
697                          get_options_t options) override;
698 
699     bool resetVBucket_UNLOCKED(LockedVBucketPtr& vb,
700                                std::unique_lock<std::mutex>& vbset);
701 
702     /* Notify flusher of a new seqno being added in the vbucket */
703     virtual void notifyFlusher(const Vbid vbid);
704 
705     /**
706      * Notify replication of a new seqno being added in the vbucket
707      *
708      * @param vbid vBucket ID
709      * @param bySeqno new high seqno
710      * @param syncWrite is this a SyncWrite operation (we don't notify
711      *        producers that do not care about SyncWrites of prepares).
712      */
713     void notifyReplication(const Vbid vbid,
714                            const int64_t bySeqno,
715                            SyncWriteOperation syncWrite);
716 
717     /// Helper method from initialize() to setup the expiry pager
718     void initializeExpiryPager(Configuration& config);
719 
720     /**
721      * Check whether the ItemFreqDecayer Task is snoozed.  Currently only
722      * used for testing purposes.
723      */
isItemFreqDecayerTaskSnoozed()724     bool isItemFreqDecayerTaskSnoozed() const {
725         return (itemFreqDecayerTask->getState() == TASK_SNOOZED);
726     }
727 
728     /// Factory method to create a VBucket count visitor of the correct type.
729     virtual std::unique_ptr<VBucketCountVisitor> makeVBCountVisitor(
730             vbucket_state_t state);
731 
732     /**
733      * Helper method used by getAggregatedVBucketStats to output aggregated
734      * bucket stats.
735      */
736     virtual void appendAggregatedVBucketStats(VBucketCountVisitor& active,
737                                               VBucketCountVisitor& replica,
738                                               VBucketCountVisitor& pending,
739                                               VBucketCountVisitor& dead,
740                                               const void* cookie,
741                                               const AddStatFn& add_stat);
742 
743     /**
744      * Returns the callback function to be invoked when SyncWrite(s) have been
745      * resolved for the given vBucket and are awaiting Completion (Commit /
746      * Abort). Used by makeVBucket().
747      */
748     SyncWriteResolvedCallback makeSyncWriteResolvedCB();
749 
750     /**
751      * Returns the callback function to be invoked when a SyncWrite has been
752      * completed. Used by makeVBucket().
753      */
754     SyncWriteCompleteCallback makeSyncWriteCompleteCB();
755 
756     /**
757      * Returns the callback function to be invoked at Replica for sending a
758      * SeqnoAck to the Active.
759      */
760     SeqnoAckCallback makeSeqnoAckCB() const;
761 
762     /**
763      * Check if the given level is a valid Bucket Durability Level for this
764      * Bucket.
765      *
766      * @param level
767      * @return
768      */
769     virtual bool isValidBucketDurabilityLevel(
770             cb::durability::Level level) const = 0;
771 
772     friend class Warmup;
773     friend class PersistenceCallback;
774 
775     EventuallyPersistentEngine     &engine;
776     EPStats                        &stats;
777     VBucketMap                      vbMap;
778     ExTask itemPagerTask;
779     ExTask                          chkTask;
780     float                           bfilterResidencyThreshold;
781     ExTask                          defragmenterTask;
782     ExTask itemCompressorTask;
783     // The itemFreqDecayerTask is used to decay the frequency count of items
784     // stored in the hash table.  This is required to ensure that all the
785     // frequency counts do not become saturated.
786     ExTask itemFreqDecayerTask;
787     size_t                          compactionWriteQueueCap;
788     float                           compactionExpMemThreshold;
789 
790     // Responsible for enforcing the Durability Timeout for the SyncWrites
791     // tracked in this KVBucket.
792     ExTask durabilityTimeoutTask;
793 
794     /// Responsible for completing (committing or aborting) SyncWrites which
795     /// have been resolved in this KVBucket.
796     std::shared_ptr<DurabilityCompletionTask> durabilityCompletionTask;
797 
798     /* Vector of mutexes for each vbucket
799      * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
800     std::vector<std::mutex>       vb_mutexes;
801     std::deque<MutationLog>       accessLog;
802 
803     std::mutex vbsetMutex;
804     double backfillMemoryThreshold;
805     struct ExpiryPagerDelta {
ExpiryPagerDeltaExpiryPagerDelta806         ExpiryPagerDelta() : sleeptime(0), task(0), enabled(true) {}
807         std::mutex mutex;
808         size_t sleeptime;
809         size_t task;
810         bool enabled;
811     } expiryPager;
812     struct ALogTask {
ALogTaskALogTask813         ALogTask()
814             : sleeptime(0),
815               task(0),
816               lastTaskRuntime(std::chrono::steady_clock::now()),
817               enabled(true) {
818         }
819         std::mutex mutex;
820         size_t sleeptime;
821         size_t task;
822         std::chrono::steady_clock::time_point lastTaskRuntime;
823         bool enabled;
824     } accessScanner;
825     struct ResidentRatio {
826         std::atomic<size_t> activeRatio;
827         std::atomic<size_t> replicaRatio;
828     } cachedResidentRatio;
829     size_t statsSnapshotTaskId;
830     std::atomic<size_t> lastTransTimePerItem;
831     EvictionPolicy eviction_policy;
832 
833     std::mutex compactionLock;
834     std::list<CompTaskEntry> compactionTasks;
835 
836     std::unique_ptr<Collections::Manager> collectionsManager;
837 
838     /**
839      * Status of XATTR support for this bucket - this is set from the
840      * bucket config and also via set_flush_param. Written/read by differing
841      * threads.
842      */
843     cb::RelaxedAtomic<bool> xattrEnabled;
844 
845     /* Contains info about throttling the replication */
846     std::unique_ptr<ReplicationThrottle> replicationThrottle;
847 
848     std::atomic<size_t> maxTtl;
849 
850     /**
851      * Allows us to override the random function.  This is used for testing
852      * purposes where we want a constant number as opposed to a random one.
853      */
854     std::function<long()> getRandom = std::rand;
855 
856     /**
857      * The Minimum Durability Level enforced by this Bucket:
858      *
859      *  - if Level::None, then just use the Level provided in the SW's dur-reqs,
860      *   if any (Level::None actually disable the bucket durability level)
861      *  - if lower than the SW's level, again just enforce the SW's dur-reqs
862      *  - if higher than the SW's level, then up-level the SW
863      *
864      * Requires synchronization as it stores a dynamic configuration param.
865      */
866     std::atomic<cb::durability::Level> minDurabilityLevel;
867 
868     friend class KVBucketTest;
869 
870     DISALLOW_COPY_AND_ASSIGN(KVBucket);
871 };
872 
873 /**
874  * Callback for notifying clients of the Bucket (KVBucket) about a new item in
875  * the partition (Vbucket).
876  */
877 class NotifyNewSeqnoCB : public Callback<const Vbid, const VBNotifyCtx&> {
878 public:
NotifyNewSeqnoCB(KVBucket & kvb)879     NotifyNewSeqnoCB(KVBucket& kvb) : kvBucket(kvb) {
880     }
881 
callback(const Vbid & vbid,const VBNotifyCtx & notifyCtx)882     void callback(const Vbid& vbid, const VBNotifyCtx& notifyCtx) {
883         kvBucket.notifyNewSeqno(vbid, notifyCtx);
884     }
885 
886 private:
887     KVBucket& kvBucket;
888 };
889