1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #pragma once
19 
20 #include "config.h"
21 
22 #include "bloomfilter.h"
23 #include "checkpoint_config.h"
24 #include "collections/vbucket_manifest.h"
25 #include "dcp/dcp-types.h"
26 #include "hash_table.h"
27 #include "hlc.h"
28 #include "item_pager.h"
29 #include "kvstore.h"
30 #include "monotonic.h"
31 
32 #include <memcached/engine.h>
33 #include <platform/atomic_duration.h>
34 #include <platform/non_negative_counter.h>
35 #include <relaxed_atomic.h>
36 #include <atomic>
37 #include <queue>
38 
39 class EPStats;
40 class CheckpointManager;
41 class ConflictResolution;
42 class Configuration;
43 class ItemMetaData;
44 class PreLinkDocumentContext;
45 class EventuallyPersistentEngine;
46 class DCPBackfill;
47 class RollbackResult;
48 class VBucketBGFetchItem;
49 
50 /**
51  * The following will be used to identify
52  * the source of an item's expiration.
53  */
enum class ExpireBy {
    Pager,
    Compactor,
    Access
};
55 
56 /* Structure that holds info needed for notification for an item being updated
57    in the vbucket */
58 struct VBNotifyCtx {
VBNotifyCtxVBNotifyCtx59     VBNotifyCtx() : bySeqno(0), notifyReplication(false), notifyFlusher(false) {
60     }
61     Monotonic<int64_t> bySeqno;
62     bool notifyReplication;
63     bool notifyFlusher;
64 };
65 
66 /**
67  * Structure that holds info needed to queue an item in chkpt or vb backfill
68  * queue
69  *
 * GenerateDeleteTime - This parameter only has an effect when queueing items
 * for which isDeleted() == true. E.g. an add of an Item with this set to
 * Yes will have no effect, whilst for an explicit delete this parameter will
 * have an effect.
 *
 * Note that when queueing a delete with an expiry time of 0, the delete time
 * is always generated. It is invalid to queue an Item with a 0 delete time;
 * in this case the GenerateDeleteTime setting is ignored.
78  *
79  */
80 struct VBQueueItemCtx {
81     /**
82      * For code which needs to determine if a delete time should be generated
83      * that is for example del and set-with-meta, expiry, deletes this
84      * constructor allows for the GenerateDeleteTime to be set as Yes or No.
85      */
VBQueueItemCtxVBQueueItemCtx86     VBQueueItemCtx(GenerateBySeqno genBySeqno,
87                    GenerateCas genCas,
88                    GenerateDeleteTime generateDeleteTime,
89                    TrackCasDrift trackCasDrift,
90                    bool isBackfillItem,
91                    PreLinkDocumentContext* preLinkDocumentContext)
92         : genBySeqno(genBySeqno),
93           genCas(genCas),
94           generateDeleteTime(generateDeleteTime),
95           trackCasDrift(trackCasDrift),
96           isBackfillItem(isBackfillItem),
97           preLinkDocumentContext(preLinkDocumentContext) {
98     }
99 
100     /**
101      * For code which is queueing Item's that are not deleted and the generate
102      * delete time has no affect, this constructor should be used.
103      */
VBQueueItemCtxVBQueueItemCtx104     VBQueueItemCtx(GenerateBySeqno genBySeqno,
105                    GenerateCas genCas,
106                    TrackCasDrift trackCasDrift,
107                    bool isBackfillItem,
108                    PreLinkDocumentContext* preLinkDocumentContext)
109         : genBySeqno(genBySeqno),
110           genCas(genCas),
111           generateDeleteTime(GenerateDeleteTime::No),
112           trackCasDrift(trackCasDrift),
113           isBackfillItem(isBackfillItem),
114           preLinkDocumentContext(preLinkDocumentContext) {
115     }
116 
117     /* Indicates if we should queue an item or not. If this is false other
118        members should not be used */
119     GenerateBySeqno genBySeqno;
120     GenerateCas genCas;
121     GenerateDeleteTime generateDeleteTime;
122     TrackCasDrift trackCasDrift;
123     bool isBackfillItem;
124     PreLinkDocumentContext* preLinkDocumentContext;
125 };
126 
127 /**
128  * Structure that holds seqno based or checkpoint persistence based high
129  * priority requests to a vbucket
130  */
131 struct HighPriorityVBEntry {
HighPriorityVBEntryHighPriorityVBEntry132     HighPriorityVBEntry(const void* c,
133                         uint64_t idNum,
134                         HighPriorityVBNotify reqType)
135         : cookie(c), id(idNum), reqType(reqType), start(ProcessClock::now()) {
136     }
137 
138     const void* cookie;
139     uint64_t id;
140     HighPriorityVBNotify reqType;
141 
142     /* for stats (histogram) */
143     ProcessClock::time_point start;
144 };
145 
146 typedef std::unique_ptr<Callback<const uint16_t, const VBNotifyCtx&>>
147         NewSeqnoCallback;
148 
149 class EventuallyPersistentEngine;
150 class FailoverTable;
151 class KVShard;
152 class VBucketMemoryDeletionTask;
153 
154 /**
155  * An individual vbucket.
156  */
157 class VBucket : public std::enable_shared_from_this<VBucket> {
158 public:
159 
160     // Identifier for a vBucket
161     typedef uint16_t id_type;
162 
163     enum class GetKeyOnly {
164          Yes,
165          No
166      };
167 
168     VBucket(id_type i,
169             vbucket_state_t newState,
170             EPStats& st,
171             CheckpointConfig& chkConfig,
172             int64_t lastSeqno,
173             uint64_t lastSnapStart,
174             uint64_t lastSnapEnd,
175             std::unique_ptr<FailoverTable> table,
176             std::shared_ptr<Callback<id_type>> flusherCb,
177             std::unique_ptr<AbstractStoredValueFactory> valFact,
178             NewSeqnoCallback newSeqnoCb,
179             Configuration& config,
180             item_eviction_policy_t evictionPolicy,
181             vbucket_state_t initState = vbucket_state_dead,
182             uint64_t purgeSeqno = 0,
183             uint64_t maxCas = 0,
184             int64_t hlcEpochSeqno = HlcCasSeqnoUninitialised,
185             bool mightContainXattrs = false,
186             const std::string& collectionsManifest = "");
187 
188     virtual ~VBucket();
189 
190     int64_t getHighSeqno() const;
191 
192     size_t getChkMgrMemUsage() const;
193 
194     size_t getChkMgrMemUsageOfUnrefCheckpoints() const;
195 
196     size_t getChkMgrMemUsageOverhead() const;
197 
getPurgeSeqno() const198     uint64_t getPurgeSeqno() const {
199         return purge_seqno;
200     }
201 
setPurgeSeqno(uint64_t to)202     void setPurgeSeqno(uint64_t to) {
203         purge_seqno = to;
204     }
205 
setPersistedSnapshot(uint64_t start, uint64_t end)206     void setPersistedSnapshot(uint64_t start, uint64_t end) {
207         LockHolder lh(snapshotMutex);
208         persisted_snapshot_start = start;
209         persisted_snapshot_end = end;
210     }
211 
getPersistedSnapshot() const212     snapshot_range_t getPersistedSnapshot() const {
213         LockHolder lh(snapshotMutex);
214         return {persisted_snapshot_start, persisted_snapshot_end};
215     }
216 
getMaxCas() const217     uint64_t getMaxCas() const {
218         return hlc.getMaxHLC();
219     }
220 
setMaxCas(uint64_t cas)221     void setMaxCas(uint64_t cas) {
222         hlc.setMaxHLC(cas);
223     }
224 
setMaxCasAndTrackDrift(uint64_t cas)225     void setMaxCasAndTrackDrift(uint64_t cas) {
226         hlc.setMaxHLCAndTrackDrift(cas);
227     }
228 
forceMaxCas(uint64_t cas)229     void forceMaxCas(uint64_t cas) {
230         hlc.forceMaxHLC(cas);
231     }
232 
getHLCDriftStats() const233     HLC::DriftStats getHLCDriftStats() const {
234         return hlc.getDriftStats();
235     }
236 
getHLCDriftExceptionCounters() const237     HLC::DriftExceptions getHLCDriftExceptionCounters() const {
238         return hlc.getDriftExceptionCounters();
239     }
240 
setHLCDriftAheadThreshold(std::chrono::microseconds threshold)241     void setHLCDriftAheadThreshold(std::chrono::microseconds threshold) {
242         hlc.setDriftAheadThreshold(threshold);
243     }
244 
setHLCDriftBehindThreshold(std::chrono::microseconds threshold)245     void setHLCDriftBehindThreshold(std::chrono::microseconds threshold) {
246         hlc.setDriftBehindThreshold(threshold);
247     }
248 
249     /**
250      * @returns a seqno, documents with a seqno >= the returned value have a HLC
251      * generated CAS. Can return HlcCasSeqnoUninitialised if warmup has not
252      * established or nothing is persisted
253      */
getHLCEpochSeqno() const254     int64_t getHLCEpochSeqno() const {
255         return hlc.getEpochSeqno();
256     }
257 
258     /**
259      * Set the seqno to be used to establish if an item has a HLC generated CAS.
260      * @param seqno the value to store in the vbucket
261      * @throws if an attempt to set to < 0
262      */
setHLCEpochSeqno(int64_t seqno)263     void setHLCEpochSeqno(int64_t seqno) {
264         if (seqno < 0) {
265             throw std::invalid_argument("VBucket::setHLCEpochSeqno(" +
266                                         std::to_string(seqno) + ") seqno < 0 ");
267         }
268         hlc.setEpochSeqno(seqno);
269     }
270 
isTakeoverBackedUp()271     bool isTakeoverBackedUp() {
272         return takeover_backed_up.load();
273     }
274 
setTakeoverBackedUpState(bool to)275     void setTakeoverBackedUpState(bool to) {
276         bool inverse = !to;
277         takeover_backed_up.compare_exchange_strong(inverse, to);
278     }
279 
280     // States whether the VBucket is in the process of being created
isBucketCreation() const281     bool isBucketCreation() const {
282         return bucketCreation.load();
283     }
284 
setBucketCreation(bool rv)285     bool setBucketCreation(bool rv) {
286         bool inverse = !rv;
287         return bucketCreation.compare_exchange_strong(inverse, rv);
288     }
289 
290     /**
291      * @return true if the vbucket deletion is to be deferred to a background
292      *         task.
293      */
isDeletionDeferred() const294     bool isDeletionDeferred() const {
295         return deferredDeletion.load();
296     }
297 
298     /**
299      * @param value true if the vbucket's deletion should be deferred to a
300      *        background task. This is for VBucket objects created by
301      *        makeVBucket and owned by a VBucketPtr. If the VBucket was manually
302      *        created this will have no effect on deletion.
303      */
setDeferredDeletion(bool value)304     void setDeferredDeletion(bool value) {
305         deferredDeletion.store(true);
306     }
307 
    /**
     * @param cookie A cookie to notify when the deferred deletion completes.
     */
setDeferredDeletionCookie(const void* cookie)311     void setDeferredDeletionCookie(const void* cookie) {
312         deferredDeletionCookie = cookie;
313     }
314 
    /**
     * @return the cookie which could have been set when setupDeferredDeletion
     *         was called.
     */
getDeferredDeletionCookie() const319     const void* getDeferredDeletionCookie() const {
320         return deferredDeletionCookie;
321     }
322 
323     /**
324      * Setup deferred deletion, this is where deletion of the vbucket is
325      * deferred and completed by an AUXIO/NONIO task. AUXIO for EPVBucket
326      * as it will hit disk for the data file unlink, NONIO is used for
327      * EphemeralVBucket as only memory resources need freeing.
328      * @param cookie A cookie to notify when the deletion task completes.
329      */
330     virtual void setupDeferredDeletion(const void* cookie) = 0;
331 
332     // Returns the last persisted sequence number for the VBucket
333     virtual uint64_t getPersistenceSeqno() const = 0;
334 
335     /**
     * Returns the sequence number to expose publicly as the highest
     * persisted seqno. Note this may differ from getPersistenceSeqno,
     * depending on the Bucket type.
339      *
340      * Historical note: This is the same as PersistenceSeqno for EP buckets,
341      * and hence before Spock wasn't a separate function; however for Ephemeral
342      * buckets we need to distinguish between what sequence number we report
343      * to external clients for Observe/persistTo, and what sequence number we
344      * report to internal DCP / ns_server for takeover:
345      *  a) Clients need 0 for the Ephemeral "persisted to" seqno (as
346      *     there isn't any Persistence and we can't claim something is on-disk
347      *     when it is not).
348      *  b) ns_server / replication needs a non-zero, "logically-persisted" seqno
349      *     from the replica to know that a vBucket is ready for takeover.
350      * As such, getPublicPersistenceSeqno() is used for (a), and
351      * getPersistenceSeqno() is used for (b).
352      */
353     virtual uint64_t getPublicPersistenceSeqno() const = 0;
354 
setPersistenceSeqno(uint64_t seqno)355     void setPersistenceSeqno(uint64_t seqno) {
356         persistenceSeqno.store(seqno);
357     }
358 
getId() const359     id_type getId() const { return id; }
getState(void) const360     vbucket_state_t getState(void) const { return state.load(); }
361 
362     /**
363      * Sets the vbucket state to a desired state
364      *
365      * @param to desired vbucket state
366      */
367     void setState(vbucket_state_t to);
368 
369     /**
370      * Sets the vbucket state to a desired state with the 'stateLock' already
371      * acquired
372      *
373      * @param to desired vbucket state
374      * @param vbStateLock write lock holder on 'stateLock'
375      */
376     void setState_UNLOCKED(vbucket_state_t to, WriterLockHolder& vbStateLock);
377 
getStateLock()378     cb::RWLock& getStateLock() {return stateLock;}
379 
getInitialState(void)380     vbucket_state_t getInitialState(void) { return initialState; }
381 
382     vbucket_state getVBucketState() const;
383 
384     /**
385      * This method performs operations on the stored value prior
386      * to expiring the item.
387      *
388      * @param v the stored value
389      */
390     void handlePreExpiry(const std::unique_lock<std::mutex>& hbl,
391                          StoredValue& v);
392 
393     bool addPendingOp(const void *cookie);
394 
395     void doStatsForQueueing(const Item& item, size_t itemBytes);
396     void doStatsForFlushing(const Item& item, size_t itemBytes);
397     void incrMetaDataDisk(const Item& qi);
398     void decrMetaDataDisk(const Item& qi);
399 
400     /// Increase the total count of items in this VBucket by 1.
401     virtual void incrNumTotalItems() = 0;
402 
403     /// Decrease the total count of items in this VBucket by 1.
404     virtual void decrNumTotalItems() = 0;
405 
406     /**
407      * Set the total count of items in this VBucket to the specified value.
408      */
409     virtual void setNumTotalItems(size_t items) = 0;
410 
    /// Reset all statistics associated with this vBucket.
412     virtual void resetStats();
413 
    // Get age sum in milliseconds
415     uint64_t getQueueAge();
416 
417     void fireAllOps(EventuallyPersistentEngine &engine);
418 
size(void)419     size_t size(void) {
420         HashTableDepthStatVisitor v;
421         ht.visitDepth(v);
422         return v.size;
423     }
424 
getBackfillSize()425     size_t getBackfillSize() {
426         LockHolder lh(backfill.mutex);
427         return backfill.items.size();
428     }
429 
430     /**
431      * Process an item from a DCP backfill.
432      * It puts it onto a queue for persistence and/or generates a seqno and
433      * updates stats
434      *
435      * @param qi item to be processed
     * @param generateBySeqno indicates if a new seqno must be generated or
     *                        the seqno in the item must be used
438      *
439      *
440      */
441     virtual void queueBackfillItem(queued_item& qi,
442                                    const GenerateBySeqno generateBySeqno) = 0;
443 
444     /**
445      * Transfer any backfill items to the specified vector, up to the optional
446      * limit.
447      * @param items Destination for items transferred from backfill.
448      * @param limit If non-zero, limit the number of items transferred to the
449      * specified number.
450      * @return true if any more items remain in the backfill set - i.e. if
451      *         limit constrained the number of items.
452      */
453     bool getBackfillItems(std::vector<queued_item>& items, size_t limit = 0);
454 
    // Result bundle returned by getItemsToPersist().
    struct ItemsToFlush {
        // The items to flush.
        std::vector<queued_item> items;
        // Seqno range of the items; defaults to {0, 0}.
        snapshot_range_t range{0, 0};
        // True if more items are available for the vBucket (i.e. the limit
        // was reached); defaults to false.
        bool moreAvailable = false;
    };
460 
    /**
     * Obtain the series of items to be flushed for this vBucket.
     *
     * @param approxLimit Upper bound on how many items to fetch.
     * @return The items to flush; along with their seqno range and
     *         if more items are available for this vBucket (i.e. the
     *         limit was reached).
     */
470     ItemsToFlush getItemsToPersist(size_t approxLimit);
471 
isBackfillPhase()472     bool isBackfillPhase() {
473         return backfill.isBackfillPhase.load();
474     }
475 
setBackfillPhase(bool backfillPhase)476     void setBackfillPhase(bool backfillPhase) {
477         backfill.isBackfillPhase.store(backfillPhase);
478     }
479 
480     /**
481      * Returns the map of bgfetch items for this vbucket, clearing the
482      * pendingBGFetches.
483      */
484     virtual vb_bgfetch_queue_t getBGFetchItems() = 0;
485 
486     virtual bool hasPendingBGFetchItems() = 0;
487 
toString(vbucket_state_t s)488     static const char* toString(vbucket_state_t s) {
489         switch(s) {
490         case vbucket_state_active: return "active"; break;
491         case vbucket_state_replica: return "replica"; break;
492         case vbucket_state_pending: return "pending"; break;
493         case vbucket_state_dead: return "dead"; break;
494         }
495         return "unknown";
496     }
497 
fromString(const char* state)498     static vbucket_state_t fromString(const char* state) {
499         if (strcmp(state, "active") == 0) {
500             return vbucket_state_active;
501         } else if (strcmp(state, "replica") == 0) {
502             return vbucket_state_replica;
503         } else if (strcmp(state, "pending") == 0) {
504             return vbucket_state_pending;
505         } else {
506             return vbucket_state_dead;
507         }
508     }
509 
510     /**
511      * Checks and decides whether to add high priority request on the vbucket.
512      * This is an async request made by modules like ns-server during
513      * rebalance. The request is for a response from the vbucket when it
514      * 'sees' beyond a certain sequence number or when a certain checkpoint
515      * is persisted.
516      * Depending on the vbucket type, the meaning 'seeing' a sequence number
517      * changes. That is, it could mean persisted in case of EPVBucket and
518      * added to the sequenced data structure in case of EphemeralVBucket.
519      *
520      * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
521      * @param cookie cookie of conn to be notified
522      * @param reqType indicating request for seqno or chk persistence
523      *
524      * @return RequestScheduled if a high priority request is added and
525      *                          notification will be done asynchronously
526      *         NotSupported if the request is not supported for the reqType
527      *         RequestNotScheduled if a high priority request is NOT added (as
528      *                             it is not required). This implies there won't
529      *                             be a subsequent notification
530      */
531     virtual HighPriorityVBReqStatus checkAddHighPriorityVBEntry(
532             uint64_t seqnoOrChkId,
533             const void* cookie,
534             HighPriorityVBNotify reqType) = 0;
535 
536     /**
537      * Notify the high priority requests on the vbucket.
538      * This is the response to async requests made by modules like ns-server
539      * during rebalance.
540      *
541      * @param engine Ref to ep-engine
542      * @param id seqno or checkpoint id causing the notification(s).
543      * @param notifyType indicating notify for seqno or chk persistence
544      */
545     virtual void notifyHighPriorityRequests(
546             EventuallyPersistentEngine& engine,
547             uint64_t id,
548             HighPriorityVBNotify notifyType) = 0;
549 
550     virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0;
551 
552     /**
553      * Get high priority notifications for a seqno or checkpoint persisted
554      *
555      * @param engine Ref to ep-engine
     * @param idNum seqno or checkpoint id for which notifies are to be found
557      * @param notifyType indicating notify for seqno or chk persistence
558      *
559      * @return map of notifications with conn cookie as the key and notify
560      *         status as the value
561      */
562     std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications(
563             EventuallyPersistentEngine& engine,
564             uint64_t idNum,
565             HighPriorityVBNotify notifyType);
566 
getHighPriorityChkSize()567     size_t getHighPriorityChkSize() {
568         return numHpVBReqs.load();
569     }
570 
571     /**
572      * BloomFilter operations for vbucket
573      */
574     void createFilter(size_t key_count, double probability);
575     void initTempFilter(size_t key_count, double probability);
576     void addToFilter(const DocKey& key);
577     virtual bool maybeKeyExistsInFilter(const DocKey& key);
578     bool isTempFilterAvailable();
579     void addToTempFilter(const DocKey& key);
580     void swapFilter();
581     void clearFilter();
582     void setFilterStatus(bfilter_status_t to);
583     std::string getFilterStatusString();
584     size_t getFilterSize();
585     size_t getNumOfKeysInFilter();
586 
nextHLCCas()587     uint64_t nextHLCCas() {
588         return hlc.nextHLC();
589     }
590 
591     // Applicable only for FULL EVICTION POLICY
592     bool isResidentRatioUnderThreshold(float threshold);
593 
594     /**
595      * Returns true if deleted items (aka tombstones) are always resident in
596      * memory (and hence we do not need to attempt a bgFetch if we try to
597      * access a deleted key which isn't found in memory).
598      */
599     virtual bool areDeletedItemsAlwaysResident() const = 0;
600 
601     virtual void addStats(bool details, ADD_STAT add_stat, const void* c) = 0;
602 
603     virtual KVShard* getShard() = 0;
604 
605     /**
     * Returns the number of alive (non-deleted) Items in the VBucket.
607      *
608      * Includes items which are not currently resident in memory (i.e. under
609      * Full eviction and have been fully evicted from memory).
610      * Does *not* include deleted items.
611      */
612     virtual size_t getNumItems() const = 0;
613 
614     virtual size_t getNumNonResidentItems() const = 0;
615 
getNumTempItems(void)616     size_t getNumTempItems(void) {
617         return ht.getNumTempItems();
618     }
619 
incrRollbackItemCount(uint64_t val)620     void incrRollbackItemCount(uint64_t val) {
621         rollbackItemCount.fetch_add(val, std::memory_order_relaxed);
622     }
623 
getRollbackItemCount(void)624     uint64_t getRollbackItemCount(void) {
625         return rollbackItemCount.load(std::memory_order_relaxed);
626     }
627 
628     // Return the persistence checkpoint ID
629     uint64_t getPersistenceCheckpointId() const;
630 
631     // Set the persistence checkpoint ID to the given value.
632     void setPersistenceCheckpointId(uint64_t checkpointId);
633 
634     // Mark the value associated with the given key as dirty
635     void markDirty(const DocKey& key);
636 
637     /**
638      * Obtain the read handle for the collections manifest.
639      * The caller will have read-only access to manifest using the methods
640      * exposed by the ReadHandle
641      */
lockCollections() const642     Collections::VB::Manifest::ReadHandle lockCollections() const {
643         return manifest.lock();
644     }
645 
646     /**
647      * Obtain a caching read handle for the collections manifest.
648      * The returned handle will lookup the collection associated with key
649      * and cache the internal iterator so that future usage of
650      * isLogicallyDeleted doesn't need to re-scan and lookup. This is different
651      * to a plain ReadHandle which provides more functionality (more methods
652      * for the caller), but may result in extra lookups and key-scans.
653      * @param key A key to use for constructing the read handle.
654      * @return a CachingReadHandle which the caller should test is valid with
655      *         CachingReadHandle::valid
656      */
lockCollections( const DocKey& key) const657     Collections::VB::Manifest::CachingReadHandle lockCollections(
658             const DocKey& key) const {
659         return manifest.lock(key);
660     }
661 
lockCollections( const DocKey& key, const std::string& separator) const662     Collections::VB::Manifest::CachingReadHandle lockCollections(
663             const DocKey& key, const std::string& separator) const {
664         return manifest.lock(key, separator);
665     }
666 
667     /**
668      * Update the Collections::VB::Manifest and the VBucket.
669      * Adds SystemEvents for the create and delete of collections into the
670      * checkpoint.
671      *
672      * @param m A Collections::Manifest to apply to the VB::Manifest
673      */
updateFromManifest(const Collections::Manifest& m)674     void updateFromManifest(const Collections::Manifest& m) {
675         manifest.wlock().update(*this, m);
676     }
677 
678     /**
679      * Finalise the deletion of a collection (no items remain in the collection)
680      *
681      * @param collection Name of the collection that has completed deleting
682      */
completeDeletion(cb::const_char_buffer collection)683     void completeDeletion(cb::const_char_buffer collection) {
684         manifest.wlock().completeDeletion(*this, collection);
685     }
686 
687     /**
688      * Add a collection to this vbucket with a pre-assigned seqno. I.e.
689      * this VB is a replica.
690      *
691      * @param manifestUid the uid of the manifest which made the change
692      * @param identifier Identifier for the collection to add.
693      * @param bySeqno The seqno assigned to the collection create event.
694      */
replicaAddCollection(Collections::uid_t manifestUid, Collections::Identifier identifier, int64_t bySeqno)695     void replicaAddCollection(Collections::uid_t manifestUid,
696                               Collections::Identifier identifier,
697                               int64_t bySeqno) {
698         manifest.wlock().replicaAdd(*this, manifestUid, identifier, bySeqno);
699     }
700 
701     /**
702      * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
703      * this VB is a replica.
704      *
705      * @param manifestUid the uid of the manifest which made the change
706      * @param identifier Identifier for the collection to begin deleting.
707      * @param bySeqno The seqno assigned to the collection delete event.
708      */
replicaBeginDeleteCollection(Collections::uid_t manifestUid, Collections::Identifier identifier, int64_t bySeqno)709     void replicaBeginDeleteCollection(Collections::uid_t manifestUid,
710                                       Collections::Identifier identifier,
711                                       int64_t bySeqno) {
712         manifest.wlock().replicaBeginDelete(
713                 *this, manifestUid, identifier, bySeqno);
714     }
715 
    /**
     * Change the collection separator of this vbucket with a pre-assigned
     * seqno. I.e. this VB is a replica.
     *
     * @param manifestUid the uid of the manifest which made the change
     * @param separator The new separator.
     * @param bySeqno The seqno assigned to the change separator event.
     */
replicaChangeCollectionSeparator(Collections::uid_t manifestUid, cb::const_char_buffer separator, int64_t bySeqno)724     void replicaChangeCollectionSeparator(Collections::uid_t manifestUid,
725                                           cb::const_char_buffer separator,
726                                           int64_t bySeqno) {
727         manifest.wlock().replicaChangeSeparator(
728                 *this, manifestUid, separator, bySeqno);
729     }
730 
731     /**
732      * Get the collection manifest
733      *
734      * @return const reference to the manifest
735      */
getManifest() const736     const Collections::VB::Manifest& getManifest() const {
737         return manifest;
738     }
739 
740     static const vbucket_state_t ACTIVE;
741     static const vbucket_state_t REPLICA;
742     static const vbucket_state_t PENDING;
743     static const vbucket_state_t DEAD;
744 
745     HashTable         ht;
746 
747     /// Manager of this vBucket's checkpoints. unique_ptr for pimpl.
748     std::unique_ptr<CheckpointManager> checkpointManager;
749 
750     // Struct for managing 'backfill' items - Items which have been added by
751     // an incoming DCP stream and need to be persisted to disk.
    struct {
        // Held by getBackfillSize() while it reads `items`.
        std::mutex mutex;
        // Queue of backfill items.
        std::queue<queued_item> items;
        // Read/written via isBackfillPhase()/setBackfillPhase().
        // NOTE(review): no default member initialiser here — confirm the
        // VBucket constructor sets this before it is first read.
        std::atomic<bool> isBackfillPhase;
    } backfill;
757 
758     /**
759      * Gets the valid StoredValue for the key and deletes an expired item if
760      * desired by the caller. Requires the hash bucket to be locked
761      *
762      * @param hbl Reference to the hash bucket lock
763      * @param key
764      * @param wantsDeleted
765      * @param trackReference
766      * @param queueExpired Delete an expired item
767      */
768     StoredValue* fetchValidValue(HashTable::HashBucketLock& hbl,
769                                  const DocKey& key,
770                                  WantsDeleted wantsDeleted,
771                                  TrackReference trackReference,
772                                  QueueExpired queueExpired);
773 
774     /**
775      * Complete the background fetch for the specified item. Depending on the
776      * state of the item, restore it to the hashtable as appropriate,
777      * potentially queuing it as dirty.
778      *
779      * @param key The key of the item
780      * @param fetched_item The item which has been fetched.
781      * @param startTime The time processing of the batch of items started.
782      *
     * @return ENGINE_ERROR_CODE status to be notified to the front end
784      */
785     virtual ENGINE_ERROR_CODE completeBGFetchForSingleItem(
786             const DocKey& key,
787             const VBucketBGFetchItem& fetched_item,
788             const ProcessClock::time_point startTime) = 0;
789 
    /**
     * Retrieve an item from the disk for vkey stats
     *
     * @param key the key to fetch
     * @param cookie the connection cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay
     *
     * @return ENGINE_ERROR_CODE result of the operation
     */
801     virtual ENGINE_ERROR_CODE statsVKey(const DocKey& key,
802                                         const void* cookie,
803                                         EventuallyPersistentEngine& engine,
804                                         int bgFetchDelay) = 0;
805 
    /**
     * Complete the vkey stats for an item background fetched from disk.
     *
     * Pure virtual: each concrete VBucket type supplies its own
     * implementation.
     *
     * @param key The key of the item
     * @param gcb Background-fetch callback object containing the item from
     *            disk
     */
    virtual void completeStatsVKey(const DocKey& key, const GetValue& gcb) = 0;
814 
    /**
     * Set (add new or update) an item into an in-memory structure like the
     * hash table, without generating a seqno. This is called internally from
     * ep-engine when we want to update our in-memory data (like in HT) with
     * another source of truth like disk.
     * Currently called during rollback.
     *
     * @param itm Item to be added or updated. Upon success, the itm
     *            revSeqno is updated
     *
     * @return Result indicating the status of the operation
     */
    MutationStatus setFromInternal(Item& itm);
828 
    /**
     * Set (add new or update) an item in the vbucket.
     *
     * @param itm Item to be added or updated. Upon success, the itm
     *            bySeqno, cas and revSeqno are updated
     * @param cookie the connection cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param predicate a function which is called against any existing item;
     *        the set will succeed if it returns true.
     *
     * @return ENGINE_ERROR_CODE status to be notified to the front end
     */
    ENGINE_ERROR_CODE set(Item& itm,
                          const void* cookie,
                          EventuallyPersistentEngine& engine,
                          int bgFetchDelay,
                          cb::StoreIfPredicate predicate);
847 
    /**
     * Replace (overwrite existing) an item in the vbucket.
     *
     * @param itm Item carrying the replacement. Upon success, the itm
     *            bySeqno, cas and revSeqno are updated
     * @param cookie the connection cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param predicate a function which is called against any existing item;
     *        the replace will succeed if it returns true.
     * @param readHandle Reader access to the Item's collection data.
     *
     * @return ENGINE_ERROR_CODE status to be notified to the front end
     */
    ENGINE_ERROR_CODE replace(
            Item& itm,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            cb::StoreIfPredicate predicate,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
869 
    /**
     * Add an item directly into its vbucket rather than putting it on a
     * checkpoint (backfill the item). This can happen during DCP or when a
     * replica vbucket is receiving backfill items from an active vbucket.
     *
     * @param itm Item to be added/updated from DCP backfill. Upon
     *            success, the itm revSeqno is updated
     * @param genBySeqno whether or not to generate sequence number
     *
     * @return the result of the operation
     */
    ENGINE_ERROR_CODE addBackfillItem(Item& itm, GenerateBySeqno genBySeqno);
882 
    /**
     * Set an item in the store from a non-front end operation (DCP, XDCR)
     *
     * @param itm the item to set. Upon success, the itm revSeqno is updated
     * @param cas value to match
     * @param seqno sequence number of mutation.
     *              NOTE(review): passed as a non-const pointer, so presumably
     *              an out-parameter which may be NULL, as in deleteWithMeta -
     *              confirm
     * @param cookie the cookie representing the client to store the item
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param checkConflicts should conflict resolution be done?
     * @param allowExisting set to false if you want set to fail if the
     *                      item exists already
     * @param genBySeqno whether or not to generate sequence number
     * @param genCas whether or not to generate cas
     * @param isReplication set to true if we are to use replication
     *                      throttle threshold
     * @param readHandle Reader access to the Item's collection data.
     *
     * @return the result of the store operation
     */
    ENGINE_ERROR_CODE setWithMeta(
            Item& itm,
            uint64_t cas,
            uint64_t* seqno,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            CheckConflicts checkConflicts,
            bool allowExisting,
            GenerateBySeqno genBySeqno,
            GenerateCas genCas,
            bool isReplication,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
916 
    /**
     * Delete an item in the vbucket
     *
     * @param[in,out] cas value to match; new cas after logical delete
     * @param cookie the cookie representing the client to store the item
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param[out] itemMeta pointer to item meta data that needs to be returned
     *                      as a result of the delete. A NULL pointer indicates
     *                      that no meta data needs to be returned.
     * @param[out] mutInfo Info to uniquely identify (and order) the delete
     *                     seq. Taken by reference, so the caller must always
     *                     supply an object.
     * @param readHandle Reader access to the affected key's collection data.
     *
     * @return the result of the operation
     */
    ENGINE_ERROR_CODE deleteItem(
            uint64_t& cas,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            ItemMetaData* itemMeta,
            mutation_descr_t& mutInfo,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
942 
    /**
     * Delete an item in the vbucket from a non-front end operation (DCP, XDCR)
     *
     * @param[in,out] cas value to match; new cas after logical delete
     * @param[out] seqno Pointer to get the seqno generated for the item. A
     *                   NULL value is passed if not needed
     * @param cookie the cookie representing the client to store the item
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param checkConflicts should conflict resolution be done?
     * @param itemMeta ref to item meta data
     * @param backfill indicates if the item must be put onto vb queue or
     *                 onto checkpoint
     * @param genBySeqno whether or not to generate sequence number
     * @param generateCas whether or not to generate cas
     * @param bySeqno seqno of the key being deleted
     * @param isReplication set to true if we are to use replication
     *                      throttle threshold
     * @param readHandle Reader access to the key's collection data.
     *
     * @return the result of the operation
     */
    ENGINE_ERROR_CODE deleteWithMeta(
            uint64_t& cas,
            uint64_t* seqno,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            CheckConflicts checkConflicts,
            const ItemMetaData& itemMeta,
            bool backfill,
            GenerateBySeqno genBySeqno,
            GenerateCas generateCas,
            uint64_t bySeqno,
            bool isReplication,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
979 
    /**
     * Delete an expired item
     *
     * @param it item to be deleted
     * @param startTime the time to be compared with this item's expiry time
     * @param source Expiry source, i.e. which of the ExpireBy sources (Pager,
     *               Compactor or Access) identified the item as expired
     */
    void deleteExpiredItem(const Item& it,
                           time_t startTime,
                           ExpireBy source);
990 
    /**
     * Evict a key from memory.
     *
     * Pure virtual: each concrete VBucket type supplies its own
     * implementation.
     *
     * @param key Key to evict
     * @param[out] msg Updated to point to a string (with static duration)
     *                 describing the result of the operation.
     *
     * @return SUCCESS if key was successfully evicted (or was already
     *                 evicted), or the reason why the request failed.
     */
    virtual protocol_binary_response_status evictKey(const DocKey& key,
                                                     const char** msg) = 0;
1004 
    /**
     * Page out a StoredValue from memory.
     *
     * The definition of "page out" is up to the underlying VBucket
     * implementation - this may mean simply ejecting the value from memory
     * (Value Eviction), removing the entire document from memory (Full
     * Eviction), or actually deleting the document (Ephemeral Buckets).
     *
     * @param lh Bucket lock associated with the StoredValue.
     * @param[in,out] v Ref to the StoredValue to be ejected. Based on the
     *                  VBucket type and policy in the vbucket, the contents
     *                  of v and v itself may be changed
     *
     * @return true if an item is ejected.
     */
    virtual bool pageOut(const HashTable::HashBucketLock& lh,
                         StoredValue*& v) = 0;
1022 
    /**
     * Check to see if a StoredValue is eligible to be paged out of memory.
     *
     * Pure virtual: each concrete VBucket type supplies its own
     * implementation.
     *
     * @param lh Bucket lock associated with the StoredValue.
     * @param v Reference to the StoredValue to be checked.
     *
     * @return true if the StoredValue is eligible to be paged out.
     */
    virtual bool eligibleToPageOut(const HashTable::HashBucketLock& lh,
                                   const StoredValue& v) const = 0;
1034 
    /**
     * Add an item in the store
     *
     * @param itm the item to add. On success, this will have its seqno and
     *            CAS updated.
     * @param cookie the cookie representing the client to store the item
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param readHandle Reader access to the Item's collection data.
     *
     * @return the result of the operation
     */
    ENGINE_ERROR_CODE add(
            Item& itm,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1053 
    /**
     * Retrieve a value, but update its TTL first
     *
     * @param cookie the connection cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param exptime the new expiry time for the object
     * @param readHandle Reader access to the key's collection data.
     *
     * @return a GetValue representing the result of the request
     */
    GetValue getAndUpdateTtl(
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            time_t exptime,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
    /**
     * Queue an Item to the checkpoint and return its seqno
     *
     * @param item an Item object to queue, can be any kind of item and will be
     *        given a CAS and seqno by this function.
     * @param seqno An optional sequence number, if not specified checkpoint
     *        queueing will assign a seqno to the Item.
     *
     * @return the seqno of the queued item
     */
    int64_t queueItem(Item* item, OptionalSeqno seqno);
1080 
    /**
     * Get metadata and value for a given key
     *
     * @param cookie the cookie representing the client
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param options flags indicating some retrieval related info
     * @param diskFlushAll NOTE(review): undocumented flag - its meaning needs
     *                     to be confirmed against the implementation
     * @param getKeyOnly if GetKeyOnly::Yes we want only the key
     * @param readHandle Reader access to the requested key's collection data.
     *
     * @return the result of the operation
     */
    GetValue getInternal(
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            get_options_t options,
            bool diskFlushAll,
            GetKeyOnly getKeyOnly,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1102 
    /**
     * Retrieve the meta data for given key
     *
     * @param cookie the connection cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param readHandle Reader access to the key's collection data.
     * @param[out] metadata meta information returned to the caller
     * @param[out] deleted tells the caller whether or not the key is
     *                     deleted
     * @param[out] datatype the datatype of the item
     *
     * @return the result of the operation
     */
    ENGINE_ERROR_CODE getMetaData(
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            const Collections::VB::Manifest::CachingReadHandle& readHandle,
            ItemMetaData& metadata,
            uint32_t& deleted,
            uint8_t& datatype);
1125 
    /**
     * Looks up the key stats for the given {vbucket, key}.
     *
     * @param cookie The client's cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param[out] kstats On success the keystats for this item.
     * @param wantsDeleted If yes then return keystats even if the item is
     *                     marked as deleted. If no then will return
     *                     ENGINE_KEY_ENOENT for deleted items.
     * @param readHandle Reader access to the key's collection data.
     *
     * @return the result of the operation
     */
    ENGINE_ERROR_CODE getKeyStats(
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            struct key_stats& kstats,
            WantsDeleted wantsDeleted,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1147 
    /**
     * Gets a locked item for a given key.
     *
     * @param currentTime Current time, used as the start of the lock period
     *                    which lasts for lockTimeout
     * @param lockTimeout Timeout for the lock on the item
     * @param cookie The client's cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param readHandle Reader access to the key's collection data.
     *
     * @return the result of the operation (contains locked item on success)
     */
    GetValue getLocked(
            rel_time_t currentTime,
            uint32_t lockTimeout,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
    /**
     * Update in memory data structures after an item is deleted on disk
     *
     * @param queuedItem reference to the deleted item
     * @param deleted indicates if the item was actually deleted or not (in
     *                case the item did not exist on disk)
     */
    void deletedOnDiskCbk(const Item& queuedItem, bool deleted);
1176 
    /**
     * Update in memory data structures after a rollback on disk
     *
     * @param key item key
     *
     * @return indicates if the operation is successful
     */
    bool deleteKey(const DocKey& key);
1185 
    /**
     * Creates a DCP backfill object
     *
     * Pure virtual: each concrete VBucket type supplies its own
     * implementation.
     *
     * @param e ref to EventuallyPersistentEngine
     * @param stream Shared ptr to the stream for which this backfill obj is
     *               created
     * @param startSeqno requested start sequence number of the backfill
     * @param endSeqno requested end sequence number of the backfill
     *
     * @return unique_ptr to the backfill object created. Ownership passes to
     *         the caller; the object is destroyed when the returned
     *         unique_ptr is.
     */
    virtual std::unique_ptr<DCPBackfill> createDCPBackfill(
            EventuallyPersistentEngine& e,
            std::shared_ptr<ActiveStream> stream,
            uint64_t startSeqno,
            uint64_t endSeqno) = 0;
1203 
    /**
     * Update failovers, checkpoint mgr and other vBucket members after
     * rollback.
     *
     * @param rollbackResult contains, all as of after the rollback:
     *                       - the high seqno of the vBucket,
     *                       - the snapshot start seqno of the last snapshot
     *                         in the vBucket,
     *                       - the snapshot end seqno of the last snapshot in
     *                         the vBucket
     * @param prevHighSeqno high seqno before the rollback
     */
    void postProcessRollback(const RollbackResult& rollbackResult,
                             uint64_t prevHighSeqno);
1217 
    /**
     * Debug - print a textual description of the VBucket to stderr.
     *
     * Virtual, so a derived VBucket type may override it.
     */
    virtual void dump() const;
1222 
    /**
     * Sets the callback function to invoke when a frequency counter becomes
     * saturated. The callback function is expected to invoke the
     * ItemFreqDecayer task.
     *
     * @param callbackFunction the function to call back; it takes no
     *                         arguments and returns nothing.
     */
    void setFreqSaturatedCallback(std::function<void()> callbackFunction);
1231 
1232     /**
1233      * Returns the number of deletes in the memory
1234      *
1235      * @return number of deletes
1236      */
getNumInMemoryDeletes() const1237     size_t getNumInMemoryDeletes() const {
1238         /* couchbase vbuckets: this is generally (after deletes are persisted)
1239                                zero as hash table doesn't keep deletes after
1240                                they are persisted.
1241            ephemeral vbuckets: we keep deletes in both hash table and ordered
1242                                data structure. */
1243         return ht.getNumDeletedItems();
1244     }
1245 
    /**
     * Set that this VBucket might store documents with xattrs.
     * Persistent vbucket will flush this to disk.
     *
     * Only ever sets the flag to true; this method cannot clear it.
     */
    void setMightContainXattrs() {
        mayContainXattrs.store(true);
    }
1253 
mightContainXattrs() const1254     bool mightContainXattrs() const {
1255         return mayContainXattrs.load();
1256     }
1257 
getInfo() const1258     cb::vbucket_info getInfo() const {
1259         return {mightContainXattrs()};
1260     }
1261 
    /**
     * If the key@bySeqno is found, remove it from the hash table.
     * Nothing is reported back to the caller either way.
     *
     * @param key The key to look for
     * @param bySeqno The seqno of the key to remove
     */
    void removeKey(const DocKey& key, int64_t bySeqno);
1269 
    /**
     * @return the checkpoint flush timeout, expressed in whole seconds.
     *         NOTE(review): static, so presumably one value shared by all
     *         vbuckets - confirm against the definition.
     */
    static std::chrono::seconds getCheckpointFlushTimeout();
1271 
    /**
     * Set the memory threshold on the current bucket quota for accepting a
     * new mutation. This is the same across all the vbuckets (the setter is
     * static).
     *
     * @param memThreshold Threshold between 0 and 1
     */
    static void setMutationMemoryThreshold(double memThreshold);
1279 
    /**
     * Check if this StoredValue has become logically non-existent.
     * Logically non-existent means that the item has been deleted
     * or doesn't exist.
     *
     * @param v StoredValue to check
     * @param readHandle a ReadHandle for safe reading of collection data
     *
     * @return true if the item is logically non-existent,
     *         false otherwise
     */
    static bool isLogicallyNonExistent(
            const StoredValue& v,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1293 
    // NOTE(review): presumably holds queued items which were rejected and
    // are awaiting a retry - confirm against the code which fills it.
    std::queue<queued_item> rejectQueue;
    // Owning pointer to this vbucket's FailoverTable.
    std::unique_ptr<FailoverTable> failovers;

    // Atomic operation counters. NOTE(review): judging by the names these
    // count create / update / delete / rejected operations - confirm
    // against the code which increments them.
    std::atomic<size_t>  opsCreate;
    std::atomic<size_t>  opsUpdate;
    std::atomic<size_t>  opsDelete;
    std::atomic<size_t>  opsReject;

    // Dirty-queue statistics. dirtyQueueSize uses a counter type which, per
    // its name, is not allowed to go negative; the others are plain atomics.
    // NOTE(review): exact meaning and units of each stat to be confirmed
    // against the code which updates them.
    cb::NonNegativeCounter<size_t> dirtyQueueSize;
    std::atomic<size_t>  dirtyQueueMem;
    std::atomic<size_t>  dirtyQueueFill;
    std::atomic<size_t>  dirtyQueueDrain;
    std::atomic<uint64_t> dirtyQueueAge;
    std::atomic<size_t>  dirtyQueuePendingWrites;
    std::atomic<size_t>  metaDataDisk;

    // NOTE(review): presumably the number of items expired in this vbucket -
    // confirm.
    std::atomic<size_t>  numExpiredItems;
1311 
1312     /**
1313      * A custom delete function for deleting VBucket objects. Any thread could
1314      * be the last thread to release a VBucketPtr and deleting a VB will
1315      * eventually hit the I/O sub-system when we unlink the file, to be sure no
1316      * front-end thread does this work, we schedule the deletion to a background
1317      * task. This task scheduling is triggered by the shared_ptr/VBucketPtr
1318      * using this object as the deleter.
1319      */
1320     struct DeferredDeleter {
DeferredDeleterVBucket::DeferredDeleter1321         DeferredDeleter(EventuallyPersistentEngine& engine) : engine(engine) {
1322         }
1323 
1324         /**
1325          * Called when the VBucketPtr has no more owners and runs delete on
1326          * the object.
1327          */
1328         void operator()(VBucket* vb) const;
1329 
1330         EventuallyPersistentEngine& engine;
1331     };
1332 
1333 protected:
    /**
     * This function checks for the various states of the value; depending on
     * the outcome, the calling function can issue a bgfetch as needed.
     *
     * @param hbl Hash table bucket lock.
     *            NOTE(review): presumably must already be held - confirm
     * @param v Pointer to the StoredValue to operate on.
     *          NOTE(review): whether nullptr is accepted is not visible
     *          here - confirm
     * @param exptime the new expiry time for the object
     * @param readHandle Reader access to the key's collection data.
     *
     * @return pair of the status of the operation and a GetValue
     */
    std::pair<MutationStatus, GetValue> processGetAndUpdateTtl(
            HashTable::HashBucketLock& hbl,
            StoredValue* v,
            time_t exptime,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
    /**
     * This function checks cas, expiry and other partition (vbucket) related
     * rules before setting an item into other in-memory structure like HT,
     * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param v Reference to the ptr of StoredValue. This can be changed if a
     *          new StoredValue is added or just its contents is changed if the
     *          existing StoredValue is updated
     * @param itm Item to be added/updated. On success, its revSeqno is updated
     * @param cas value to match
     * @param allowExisting set to false if you want set to fail if the
     *                      item exists already
     * @param hasMetaData NOTE(review): undocumented flag - presumably true
     *                    when itm already carries its own meta data (a
     *                    "with meta" operation) - confirm
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue
     * @param storeIfStatus the status of any conditional store predicate
     * @param maybeKeyExists true if the key /may/ exist on disk (as an active,
     *                       alive document). Only valid if `v` is null.
     *                       Defaults to true.
     * @param isReplication true if issued by consumer (for replication).
     *                      Defaults to false.
     *
     * @return Result indicating the status of the operation and notification
     *                info (if operation was successful).
     */
    std::pair<MutationStatus, boost::optional<VBNotifyCtx>> processSet(
            const HashTable::HashBucketLock& hbl,
            StoredValue*& v,
            Item& itm,
            uint64_t cas,
            bool allowExisting,
            bool hasMetaData,
            const VBQueueItemCtx& queueItmCtx,
            cb::StoreIfStatus storeIfStatus,
            bool maybeKeyExists = true,
            bool isReplication = false);
1378 
    /**
     * This function checks cas, expiry and other partition (vbucket) related
     * rules before adding an item into other in-memory structure like HT,
     * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param[in,out] v the stored value to do this operation on
     * @param itm Item to be added/updated. On success, its revSeqno is updated
     * @param maybeKeyExists NOTE(review): presumably, as for processSet, true
     *                       if the key /may/ exist on disk - confirm
     * @param isReplication true if issued by consumer (for replication)
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue
     * @param readHandle Reader access to the Item's collection data.
     *
     * @return Result indicating the status of the operation and notification
     *                info (if the operation was successful).
     */
    std::pair<AddStatus, boost::optional<VBNotifyCtx>> processAdd(
            const HashTable::HashBucketLock& hbl,
            StoredValue*& v,
            Item& itm,
            bool maybeKeyExists,
            bool isReplication,
            const VBQueueItemCtx& queueItmCtx,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1403 
    /**
     * This function checks cas, eviction policy and other partition
     * (vbucket) related rules before logically (soft) deleting an item in
     * in-memory structure like HT, and checkpoint mgr.
     * Assumes that HT bucket lock is grabbed.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param v Reference to the StoredValue to be soft deleted
     * @param cas the expected CAS of the item (or 0 to override)
     * @param metadata ref to item meta data
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue
     * @param use_meta Indicates if v must be updated with the metadata
     * @param bySeqno seqno of the key being deleted
     *
     * @return a tuple holding, in order:
     *         - status of the operation.
     *         - pointer to the updated StoredValue. It can be same as that of
     *           v or different value if a new StoredValue is created for the
     *           update.
     *         - notification info, if status was successful.
     */
    std::tuple<MutationStatus, StoredValue*, boost::optional<VBNotifyCtx>>
    processSoftDelete(const HashTable::HashBucketLock& hbl,
                      StoredValue& v,
                      uint64_t cas,
                      const ItemMetaData& metadata,
                      const VBQueueItemCtx& queueItmCtx,
                      bool use_meta,
                      uint64_t bySeqno);
1433 
    /**
     * Delete a key (associated StoredValue) from ALL in-memory data structures
     * like HT.
     * Assumes that HT bucket lock is grabbed.
     *
     * Currently StoredValues form HashTable intrusively. That is, HashTable
     * does not store a reference or a copy of the StoredValue. If any other
     * in-memory data structures are formed intrusively using StoredValues,
     * then it must be decided in this function which data structure deletes
     * the StoredValue. Currently it is HashTable that deletes the StoredValue.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param v Reference to the StoredValue to be deleted
     *
     * @return true if an object was deleted, false otherwise
     */
    bool deleteStoredValue(const HashTable::HashBucketLock& hbl,
                           StoredValue& v);
1452 
    /**
     * Queue an item for persistence and replication. Maybe track CAS drift.
     *
     * The caller of this function must hold the lock of the hash table
     * partition that contains the StoredValue being queued.
     *
     * @param v the dirty item. The cas and seqno may be updated based on the
     *          flags passed
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue, whether to track cas, generate seqno,
     *                    generate new cas
     *
     * @return Notification context containing info needed to notify the
     *         clients (like connections, flusher)
     */
    VBNotifyCtx queueDirty(StoredValue& v, const VBQueueItemCtx& queueItmCtx);
1469 
    /**
     * Queue an item for persistence and replication
     *
     * The caller of this function must hold the lock of the hash table
     * partition that contains the StoredValue being queued.
     *
     * @param v the dirty item. The cas and seqno may be updated based on the
     *          flags passed
     * @param generateBySeqno request that the seqno is generated by this call
     *        (default: GenerateBySeqno::Yes)
     * @param generateCas request that the CAS is generated by this call
     *        (default: GenerateCas::Yes)
     * @param generateDeleteTime for queueing deletes, should the queue item
     *        have its delete time generated (default:
     *        GenerateDeleteTime::Yes).
     * @param isBackfillItem indicates if the item must be put onto vb queue or
     *        onto checkpoint (default: false)
     * @param preLinkDocumentContext context object which allows running the
     *        document pre link callback after the cas is assigned (but
     *        while the document is not yet available to anyone). Defaults to
     *        nullptr.
     *
     * @return Notification context containing info needed to notify the
     *         clients (like connections, flusher)
     */
    VBNotifyCtx queueDirty(
            StoredValue& v,
            GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes,
            GenerateCas generateCas = GenerateCas::Yes,
            GenerateDeleteTime generateDeleteTime = GenerateDeleteTime::Yes,
            bool isBackfillItem = false,
            PreLinkDocumentContext* preLinkDocumentContext = nullptr);
1498 
    /**
     * Adds a temporary StoredValue in in-memory data structures like HT.
     * Assumes that HT bucket lock is grabbed.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param key the key for which a temporary item needs to be added
     * @param isReplication true if issued by consumer (for replication).
     *                      Defaults to false.
     *
     * @return Result indicating the status of the operation
     */
    TempAddStatus addTempStoredValue(const HashTable::HashBucketLock& hbl,
                                     const DocKey& key,
                                     bool isReplication = false);
1512 
    /**
     * Internal wrapper around the callback that is to be called when a new
     * seqno is generated in the vbucket.
     *
     * @param notifyCtx holds the info needed for the notification
     */
    void notifyNewSeqno(const VBNotifyCtx& notifyCtx);
1520 
    /**
     * VBucket internal function to store a high priority request on the
     * vbucket.
     *
     * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted;
     *        which of the two it is depends on reqType
     * @param cookie cookie of the connection to be notified
     * @param reqType request type indicating seqno or checkpoint persistence
     */
    void addHighPriorityVBEntry(uint64_t seqnoOrChkId,
                                const void* cookie,
                                HighPriorityVBNotify reqType);
1531 
    /**
     * Get all high priority notifications as temporary failures because they
     * could not be completed.
     *
     * @param engine Reference to ep-engine
     *
     * @return map of notifications, keyed by the connection cookie, with the
     *         notify status as the value
     */
    std::map<const void*, ENGINE_ERROR_CODE> tmpFailAndGetAllHpNotifies(
            EventuallyPersistentEngine& engine);
1543 
    /**
     * Check if there is memory available to allocate the in-memory
     * instance (StoredValue or OrderedStoredValue) for an item.
     *
     * @param st Reference to epstats
     * @param item Item that is being added
     * @param isReplication Flag indicating if the item is from replication
     *
     * @return true if there is memory for the item; otherwise false
     */
    bool hasMemoryForStoredValue(EPStats& st,
                                 const Item& item,
                                 bool isReplication);
1557 
    /**
     * Internal stats helper.
     *
     * NOTE(review): undocumented in the original and the definition is not
     * visible from this header. Presumably emits this vbucket's stats through
     * add_stat, with `details` selecting a more verbose set and `c` being the
     * cookie handed back to add_stat - confirm against the definition.
     *
     * @param details flag (presumably: include detailed stats - TODO confirm)
     * @param add_stat stat-emitting callback
     * @param c opaque cookie (presumably forwarded to add_stat - TODO confirm)
     */
    void _addStats(bool details, ADD_STAT add_stat, const void* c);
1559 
    /**
     * Templated helper taking a stat name and a value of arbitrary type T.
     *
     * NOTE(review): undocumented in the original and the definition is not
     * visible here. Presumably emits a single stat named `nm` with value
     * `val` through add_stat - confirm against the definition.
     *
     * @param nm name of the stat
     * @param val value of the stat
     * @param add_stat stat-emitting callback
     * @param c opaque cookie (presumably forwarded to add_stat - TODO confirm)
     */
    template <typename T>
    void addStat(const char* nm,
                 const T& val,
                 ADD_STAT add_stat,
                 const void* c);
1565 
    /* The eviction policy used; const, so fixed once constructed */
    const item_eviction_policy_t eviction;

    /* Reference to global (EP engine wide) stats */
    EPStats& stats;

    /* Last seqno that is persisted on the disk */
    std::atomic<uint64_t> persistenceSeqno;

    /* Holds all high priority async requests to the vbucket */
    std::list<HighPriorityVBEntry> hpVBReqs;

    /* Synchronizes access to hpVBReqs */
    std::mutex hpVBReqsMutex;

    /* Size of the list hpVBReqs, tracked separately (to avoid MB-9434) */
    Couchbase::RelaxedAtomic<size_t> numHpVBReqs;
1583 
    /**
     * Schedule the deferred deletion of this VBucket.
     *
     * Pure virtual: VBucket sub-classes must implement a function that will
     * schedule an appropriate task that will delete the VBucket and its
     * resources.
     *
     * @param engine owning engine (required for task construction)
     */
    virtual void scheduleDeferredDeletion(
            EventuallyPersistentEngine& engine) = 0;
1592 
    /**
     * Update the revision seqno of a newly added StoredValue.
     * We must ensure that it is greater than the maxDeletedRevSeqno.
     *
     * @param v the newly added StoredValue. Its revSeqno is updated
     */
    void updateRevSeqNoOfNewStoredValue(StoredValue& v);
1600 
1601 private:
    // NOTE(review): the four helpers below are undocumented in the original
    // and their definitions are not visible from this header; the notes are
    // inferred from names and signatures only - confirm before relying on
    // them.

    // Presumably notifies every pending operation (see pendingOps) with
    // the given status code - TODO confirm.
    void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code);

    // Presumably decrements the dirty-queue memory stat by decrementBy.
    void decrDirtyQueueMem(size_t decrementBy);

    // Presumably decrements the dirty-queue age stat by decrementBy.
    void decrDirtyQueueAge(uint32_t decrementBy);

    // Presumably decrements the dirty-queue pending-writes stat by
    // decrementBy.
    void decrDirtyQueuePendingWrites(size_t decrementBy);
1609 
    /**
     * Updates an existing StoredValue in in-memory data structures like HT.
     * Assumes that HT bucket lock is grabbed.
     *
     * Pure virtual: must be implemented by VBucket sub-classes. Note that,
     * as with any C++ virtual function, the default for justTouch is taken
     * from the static type used at the call site, so overrides should repeat
     * the same default.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param v Reference to the StoredValue to be updated.
     * @param itm Item to be updated.
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue
     * @param justTouch   To note that this object is an existing item with
     *                    the same value but with few flags changed
     *                    (defaults to false).
     * @return tuple of, in order:
     *         - pointer to the updated StoredValue. It can be the same as v,
     *           or a different value if a new StoredValue is created for the
     *           update.
     *         - status of the operation.
     *         - notification info.
     */
    virtual std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
    updateStoredValue(const HashTable::HashBucketLock& hbl,
                      StoredValue& v,
                      const Item& itm,
                      const VBQueueItemCtx& queueItmCtx,
                      bool justTouch = false) = 0;
1633 
    /**
     * Adds a new StoredValue in in-memory data structures like HT.
     * Assumes that HT bucket lock is grabbed.
     *
     * Pure virtual: must be implemented by VBucket sub-classes.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param itm Item to be added.
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue
     * @param genRevSeqno whether to generate new revision sequence number
     *                    or not
     *
     * @return pair of: pointer to the StoredValue added, and the
     *         notification info
     */
    virtual std::pair<StoredValue*, VBNotifyCtx> addNewStoredValue(
            const HashTable::HashBucketLock& hbl,
            const Item& itm,
            const VBQueueItemCtx& queueItmCtx,
            GenerateRevSeqno genRevSeqno) = 0;
1652 
    /**
     * Logically (soft) delete item in all in-memory data structures. Also
     * updates revSeqno. Depending on the in-memory data structure the item may
     * be marked deleted and/or reset and/or a new value (marked as deleted)
     * added.
     * Assumes that HT bucket lock is grabbed.
     * Also assumes that v is in the hash table.
     *
     * Pure virtual: must be implemented by VBucket sub-classes.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param v Reference to the StoredValue to be soft deleted
     * @param onlyMarkDeleted indicates if we must reset the StoredValue or
     *                        just mark deleted
     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
     *                    backfill queue
     * @param bySeqno seqno of the key being deleted
     *
     * @return tuple of, in order:
     *         - pointer to the updated StoredValue. It can be the same as v,
     *           or a different value if a new StoredValue is created for the
     *           update.
     *         - notification info.
     */
    virtual std::tuple<StoredValue*, VBNotifyCtx> softDeleteStoredValue(
            const HashTable::HashBucketLock& hbl,
            StoredValue& v,
            bool onlyMarkDeleted,
            const VBQueueItemCtx& queueItmCtx,
            uint64_t bySeqno) = 0;
1680 
    /**
     * This function handles expiry related work before logically (soft)
     * deleting an item in in-memory structures like HT, and checkpoint mgr.
     * Assumes that HT bucket lock is grabbed.
     *
     * @param hbl Hash table bucket lock that must be held
     * @param v Reference to the StoredValue to be soft deleted
     *
     * @return tuple of, in order:
     *         - status of the operation.
     *         - pointer to the updated StoredValue. It can be the same as v,
     *           or a different value if a new StoredValue is created for the
     *           update.
     *         - notification info.
     */
    std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processExpiredItem(
            const HashTable::HashBucketLock& hbl, StoredValue& v);
1697 
    /**
     * Add a temporary item in hash table and enqueue a background fetch for a
     * key.
     *
     * Pure virtual: must be implemented by VBucket sub-classes. Note that,
     * as with any C++ virtual function, the default for isReplication is
     * taken from the static type used at the call site, so overrides should
     * repeat the same default.
     *
     * @param hbl Reference to the hash table bucket lock
     * @param key the key to be bg fetched
     * @param cookie the cookie of the requestor
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param metadataOnly whether the fetch is for a non-resident value or
     *                     metadata of a (possibly) deleted item
     * @param isReplication indicates if the call is for a replica vbucket
     *                      (defaults to false)
     *
     * @return ENGINE_ERROR_CODE status to be notified to the front end
     */
    virtual ENGINE_ERROR_CODE addTempItemAndBGFetch(
            HashTable::HashBucketLock& hbl,
            const DocKey& key,
            const void* cookie,
            EventuallyPersistentEngine& engine,
            int bgFetchDelay,
            bool metadataOnly,
            bool isReplication = false) = 0;
1721 
    /**
     * Enqueue a background fetch for a key.
     *
     * Pure virtual: must be implemented by VBucket sub-classes. Note that,
     * as with any C++ virtual function, the default for isMeta is taken from
     * the static type used at the call site, so overrides should repeat the
     * same default.
     *
     * @param key the key to be bg fetched
     * @param cookie the cookie of the requestor
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param isMeta whether the fetch is for a non-resident value or metadata
     *               of a (possibly) deleted item (defaults to false)
     */
    virtual void bgFetch(const DocKey& key,
                         const void* cookie,
                         EventuallyPersistentEngine& engine,
                         int bgFetchDelay,
                         bool isMeta = false) = 0;
1737 
    /**
     * Get metadata and value for a non-resident key.
     *
     * Pure virtual: must be implemented by VBucket sub-classes.
     *
     * @param key key for which metadata and value should be retrieved
     * @param cookie the cookie representing the client
     * @param engine Reference to ep engine
     * @param bgFetchDelay Delay in secs before we run the bgFetch task
     * @param queueBgFetch Indicates whether a background fetch needs to be
     *        queued
     * @param v reference to the stored value of the non-resident key
     *
     * @return the result of the operation
     */
    virtual GetValue getInternalNonResident(const DocKey& key,
                                            const void* cookie,
                                            EventuallyPersistentEngine& engine,
                                            int bgFetchDelay,
                                            QueueBgFetch queueBgFetch,
                                            const StoredValue& v) = 0;
1757 
    /**
     * Increase the expiration count in the global stats and in the vbucket
     * stats.
     *
     * @param source what triggered the expiration (one of ExpireBy::Pager,
     *        ExpireBy::Compactor or ExpireBy::Access)
     */
    void incExpirationStat(ExpireBy source);
1762 
    // NOTE(review): undocumented in the original; definition not visible
    // here. Presumably adjusts the static chkFlushTimeout based on the
    // observed wall_time - confirm against the definition.
    void adjustCheckpointFlushTimeout(std::chrono::seconds wall_time);
1764 
    /**
     * Given a StoredValue with XATTRs, prune the user keys so that only the
     * system keys remain.
     *
     * @param v StoredValue with XATTR value
     * @param itemMeta New ItemMetaData to use in item creation
     * @return unique_ptr<Item> which matches the StoredValue's meta-data and
     *         has the XATTR value with only the system-keys. If the pruning
     *         removed all keys (because no system-keys exist) an empty
     *         unique_ptr is returned.
     */
    std::unique_ptr<Item> pruneXattrDocument(StoredValue& v,
                                             const ItemMetaData& itemMeta);
1778 
    /**
     * Estimate the new total memory usage with the allocation of an in-memory
     * instance for item.
     *
     * Pure virtual: must be implemented by VBucket sub-classes.
     *
     * @param st Reference to epstats
     * @param item Item that is being added
     *
     * @return new total size for this Bucket once Item is allocated
     */
    virtual size_t estimateNewMemoryUsage(EPStats& st, const Item& item) = 0;
1789 
    /**
     * Call the predicate with item_info from v (none if v is nullptr).
     *
     * @param predicate a function to call, must be initialised
     * @param v the StoredValue (or nullptr if none in cache)
     * @return how the caller should proceed (store_if semantics)
     */
    cb::StoreIfStatus callPredicate(cb::StoreIfPredicate predicate,
                                    StoredValue* v);
1798 
    // NOTE(review): the members from id to takeover_backed_up are
    // undocumented in the original; the one-line notes on them are inferred
    // from names and types only - confirm before relying on them.
    id_type                         id; // presumably this vbucket's id
    std::atomic<vbucket_state_t>    state; // presumably the current state
    cb::RWLock                      stateLock; // presumably guards state
    vbucket_state_t                 initialState;
    std::mutex                           pendingOpLock; // presumably guards pendingOps
    std::vector<const void*>        pendingOps; // presumably cookies of pending ops
    ProcessClock::time_point        pendingOpsStart;
    WeaklyMonotonic<uint64_t>       purge_seqno;
    std::atomic<bool>               takeover_backed_up;

    /* snapshotMutex is used to update/read the pair {start, end} atomically,
       but not if reading a single field. */
    mutable std::mutex snapshotMutex;
    uint64_t persisted_snapshot_start;
    uint64_t persisted_snapshot_end;

    // NOTE(review): presumably bfMutex guards bFilter and tempFilter - confirm.
    std::mutex bfMutex;
    std::unique_ptr<BloomFilter> bFilter;
    std::unique_ptr<BloomFilter> tempFilter;    // Used during compaction.

    std::atomic<uint64_t> rollbackItemCount;

    HLC hlc;
    std::string statPrefix;
    // The persistence checkpoint ID for this vbucket.
    std::atomic<uint64_t> persistenceCheckpointId;
    // Flag to indicate the vbucket is being created
    std::atomic<bool> bucketCreation;
    // Flag to indicate the vbucket deletion is deferred
    std::atomic<bool> deferredDeletion;
    /// A cookie that can be set when the vbucket's deletion is deferred; the
    /// cookie will be notified when the deferred deletion completes
    const void* deferredDeletionCookie;

    // Ptr to the item conflict resolution module
    std::unique_ptr<ConflictResolution> conflictResolver;

    // A callback to be called when a new seqno is generated in the vbucket as
    // a result of a front end call
    NewSeqnoCallback newSeqnoCb;

    /// The VBucket collection state
    Collections::VB::Manifest manifest;

    /**
     * Records if the vbucket has had xattr documents written to it. Note that
     * rollback of data or deletion of all the xattr documents does not undo
     * the flag.
     */
    std::atomic<bool> mayContainXattrs;

    // Static, so shared by all VBucket instances.
    static cb::AtomicDuration chkFlushTimeout;

    // Static, so shared by all VBucket instances.
    static double mutationMemThreshold;

    // Grants the test fixture access to private members.
    friend class VBucketTestBase;

    DISALLOW_COPY_AND_ASSIGN(VBucket);
1857 };
1858 
/// Shared-ownership handle to a VBucket.
using VBucketPtr = std::shared_ptr<VBucket>;
1860 
1861 /**
1862  * Represents a locked VBucket that provides RAII semantics for the lock.
1863  *
1864  * Behaves like the underlying shared_ptr - i.e. `operator->` is overloaded
1865  * to return the underlying VBucket.
1866  */
1867 class LockedVBucketPtr {
1868 public:
LockedVBucketPtr(VBucketPtr vb, std::unique_lock<std::mutex>&& lock)1869     LockedVBucketPtr(VBucketPtr vb, std::unique_lock<std::mutex>&& lock)
1870         : vb(std::move(vb)), lock(std::move(lock)) {
1871     }
1872 
operator *() const1873     VBucket& operator*() const {
1874         return *vb;
1875     }
1876 
operator ->() const1877     VBucket* operator->() const {
1878         return vb.get();
1879     }
1880 
operator bool() const1881     explicit operator bool() const {
1882         return vb.operator bool();
1883     }
1884 
getVB()1885     VBucketPtr& getVB() {
1886         return vb;
1887     }
1888 
1889     /// Return true if this object owns the mutex.
owns_lock() const1890     bool owns_lock() const {
1891         return lock.owns_lock();
1892     }
1893 
1894 private:
1895     VBucketPtr vb;
1896     std::unique_lock<std::mutex> lock;
1897 };
1898