1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#pragma once
19
20#include "config.h"
21
22#include "bloomfilter.h"
23#include "checkpoint_config.h"
24#include "collections/vbucket_manifest.h"
25#include "dcp/dcp-types.h"
26#include "hash_table.h"
27#include "hlc.h"
28#include "item_pager.h"
29#include "kvstore.h"
30#include "monotonic.h"
31
32#include <memcached/engine.h>
33#include <platform/atomic_duration.h>
34#include <platform/non_negative_counter.h>
35#include <relaxed_atomic.h>
36#include <atomic>
37#include <queue>
38
39class EPStats;
40class CheckpointManager;
41class ConflictResolution;
42class Configuration;
43class ItemMetaData;
44class PreLinkDocumentContext;
45class EventuallyPersistentEngine;
46class DCPBackfill;
47class RollbackResult;
48class VBucketBGFetchItem;
49
/**
 * Identifies what triggered the expiration of an item: the pager, the
 * compactor, or an access to the item.
 */
enum class ExpireBy {
    Pager,
    Compactor,
    Access
};
55
56/* Structure that holds info needed for notification for an item being updated
57   in the vbucket */
58struct VBNotifyCtx {
59    VBNotifyCtx() : bySeqno(0), notifyReplication(false), notifyFlusher(false) {
60    }
61    Monotonic<int64_t> bySeqno;
62    bool notifyReplication;
63    bool notifyFlusher;
64};
65
/**
 * Structure that holds the info needed to queue an item in the checkpoint or
 * the vbucket backfill queue.
 *
 * GenerateDeleteTime - This parameter only has an effect when queueing items
 * for which isDeleted() == true. E.g. an add of an Item with this set to Yes
 * will have no effect, whilst for an explicit delete this parameter will have
 * an effect.
 *
 * Note that when queueing a delete with an expiry time of 0, the delete time
 * is always generated. It is invalid to queue an Item with a 0 delete time;
 * in this case the GenerateDeleteTime setting is ignored.
 *
 */
80struct VBQueueItemCtx {
81    /**
82     * For code which needs to determine if a delete time should be generated
83     * that is for example del and set-with-meta, expiry, deletes this
84     * constructor allows for the GenerateDeleteTime to be set as Yes or No.
85     */
86    VBQueueItemCtx(GenerateBySeqno genBySeqno,
87                   GenerateCas genCas,
88                   GenerateDeleteTime generateDeleteTime,
89                   TrackCasDrift trackCasDrift,
90                   bool isBackfillItem,
91                   PreLinkDocumentContext* preLinkDocumentContext)
92        : genBySeqno(genBySeqno),
93          genCas(genCas),
94          generateDeleteTime(generateDeleteTime),
95          trackCasDrift(trackCasDrift),
96          isBackfillItem(isBackfillItem),
97          preLinkDocumentContext(preLinkDocumentContext) {
98    }
99
100    /**
101     * For code which is queueing Item's that are not deleted and the generate
102     * delete time has no affect, this constructor should be used.
103     */
104    VBQueueItemCtx(GenerateBySeqno genBySeqno,
105                   GenerateCas genCas,
106                   TrackCasDrift trackCasDrift,
107                   bool isBackfillItem,
108                   PreLinkDocumentContext* preLinkDocumentContext)
109        : genBySeqno(genBySeqno),
110          genCas(genCas),
111          generateDeleteTime(GenerateDeleteTime::No),
112          trackCasDrift(trackCasDrift),
113          isBackfillItem(isBackfillItem),
114          preLinkDocumentContext(preLinkDocumentContext) {
115    }
116
117    /* Indicates if we should queue an item or not. If this is false other
118       members should not be used */
119    GenerateBySeqno genBySeqno;
120    GenerateCas genCas;
121    GenerateDeleteTime generateDeleteTime;
122    TrackCasDrift trackCasDrift;
123    bool isBackfillItem;
124    PreLinkDocumentContext* preLinkDocumentContext;
125};
126
127/**
128 * Structure that holds seqno based or checkpoint persistence based high
129 * priority requests to a vbucket
130 */
131struct HighPriorityVBEntry {
132    HighPriorityVBEntry(const void* c,
133                        uint64_t idNum,
134                        HighPriorityVBNotify reqType)
135        : cookie(c), id(idNum), reqType(reqType), start(ProcessClock::now()) {
136    }
137
138    const void* cookie;
139    uint64_t id;
140    HighPriorityVBNotify reqType;
141
142    /* for stats (histogram) */
143    ProcessClock::time_point start;
144};
145
146typedef std::unique_ptr<Callback<const uint16_t, const VBNotifyCtx&>>
147        NewSeqnoCallback;
148
149class EventuallyPersistentEngine;
150class FailoverTable;
151class KVShard;
152class VBucketMemoryDeletionTask;
153
154/**
155 * An individual vbucket.
156 */
157class VBucket : public std::enable_shared_from_this<VBucket> {
158public:
159
160    // Identifier for a vBucket
161    typedef uint16_t id_type;
162
163    enum class GetKeyOnly {
164         Yes,
165         No
166     };
167
    /**
     * Construct a vbucket.
     *
     * NOTE(review): the constructor is defined outside this header; the
     * parameter descriptions below are inferred from their names and from
     * the accessors in this class. Confirm them against the definition.
     *
     * @param i id of this vbucket
     * @param newState state to start the vbucket in
     * @param st engine statistics
     * @param chkConfig checkpoint configuration
     * @param lastSeqno last seqno known for this vbucket
     * @param lastSnapStart start of the last snapshot range
     * @param lastSnapEnd end of the last snapshot range
     * @param table failover table (ownership transferred)
     * @param flusherCb callback used to notify the flusher
     * @param valFact StoredValue factory for the hash table (ownership
     *        transferred)
     * @param newSeqnoCb new-seqno callback (ownership transferred)
     * @param config engine configuration
     * @param evictionPolicy item eviction policy
     * @param initState presumably the value later returned by
     *        getInitialState()
     * @param purgeSeqno initial purge seqno
     * @param maxCas initial maximum CAS
     * @param hlcEpochSeqno initial HLC epoch seqno (see getHLCEpochSeqno)
     * @param mightContainXattrs whether documents may contain xattrs
     * @param collectionsManifest collections manifest, empty by default
     */
    VBucket(id_type i,
            vbucket_state_t newState,
            EPStats& st,
            CheckpointConfig& chkConfig,
            int64_t lastSeqno,
            uint64_t lastSnapStart,
            uint64_t lastSnapEnd,
            std::unique_ptr<FailoverTable> table,
            std::shared_ptr<Callback<id_type>> flusherCb,
            std::unique_ptr<AbstractStoredValueFactory> valFact,
            NewSeqnoCallback newSeqnoCb,
            Configuration& config,
            item_eviction_policy_t evictionPolicy,
            vbucket_state_t initState = vbucket_state_dead,
            uint64_t purgeSeqno = 0,
            uint64_t maxCas = 0,
            int64_t hlcEpochSeqno = HlcCasSeqnoUninitialised,
            bool mightContainXattrs = false,
            const std::string& collectionsManifest = "");

    // Virtual because subclasses implement the pure virtual interface below.
    virtual ~VBucket();

    /// @return the highest seqno of this vbucket (defined elsewhere).
    int64_t getHighSeqno() const;

    // Checkpoint manager memory usage figures; definitions are not visible
    // here, so the descriptions follow the names.

    /// Total memory used by the checkpoint manager.
    size_t getChkMgrMemUsage() const;

    /// Memory used by unreferenced checkpoints.
    size_t getChkMgrMemUsageOfUnrefCheckpoints() const;

    /// Checkpoint manager memory overhead.
    size_t getChkMgrMemUsageOverhead() const;
197
198    uint64_t getPurgeSeqno() const {
199        return purge_seqno;
200    }
201
202    void setPurgeSeqno(uint64_t to) {
203        purge_seqno = to;
204    }
205
206    void setPersistedSnapshot(uint64_t start, uint64_t end) {
207        LockHolder lh(snapshotMutex);
208        persisted_snapshot_start = start;
209        persisted_snapshot_end = end;
210    }
211
212    snapshot_range_t getPersistedSnapshot() const {
213        LockHolder lh(snapshotMutex);
214        return {persisted_snapshot_start, persisted_snapshot_end};
215    }
216
217    uint64_t getMaxCas() const {
218        return hlc.getMaxHLC();
219    }
220
221    void setMaxCas(uint64_t cas) {
222        hlc.setMaxHLC(cas);
223    }
224
225    void setMaxCasAndTrackDrift(uint64_t cas) {
226        hlc.setMaxHLCAndTrackDrift(cas);
227    }
228
229    void forceMaxCas(uint64_t cas) {
230        hlc.forceMaxHLC(cas);
231    }
232
233    HLC::DriftStats getHLCDriftStats() const {
234        return hlc.getDriftStats();
235    }
236
237    HLC::DriftExceptions getHLCDriftExceptionCounters() const {
238        return hlc.getDriftExceptionCounters();
239    }
240
241    void setHLCDriftAheadThreshold(std::chrono::microseconds threshold) {
242        hlc.setDriftAheadThreshold(threshold);
243    }
244
245    void setHLCDriftBehindThreshold(std::chrono::microseconds threshold) {
246        hlc.setDriftBehindThreshold(threshold);
247    }
248
249    /**
250     * @returns a seqno, documents with a seqno >= the returned value have a HLC
251     * generated CAS. Can return HlcCasSeqnoUninitialised if warmup has not
252     * established or nothing is persisted
253     */
254    int64_t getHLCEpochSeqno() const {
255        return hlc.getEpochSeqno();
256    }
257
258    /**
259     * Set the seqno to be used to establish if an item has a HLC generated CAS.
260     * @param seqno the value to store in the vbucket
261     * @throws if an attempt to set to < 0
262     */
263    void setHLCEpochSeqno(int64_t seqno) {
264        if (seqno < 0) {
265            throw std::invalid_argument("VBucket::setHLCEpochSeqno(" +
266                                        std::to_string(seqno) + ") seqno < 0 ");
267        }
268        hlc.setEpochSeqno(seqno);
269    }
270
271    bool isTakeoverBackedUp() {
272        return takeover_backed_up.load();
273    }
274
275    void setTakeoverBackedUpState(bool to) {
276        bool inverse = !to;
277        takeover_backed_up.compare_exchange_strong(inverse, to);
278    }
279
280    // States whether the VBucket is in the process of being created
281    bool isBucketCreation() const {
282        return bucketCreation.load();
283    }
284
285    bool setBucketCreation(bool rv) {
286        bool inverse = !rv;
287        return bucketCreation.compare_exchange_strong(inverse, rv);
288    }
289
290    /**
291     * @return true if the vbucket deletion is to be deferred to a background
292     *         task.
293     */
294    bool isDeletionDeferred() const {
295        return deferredDeletion.load();
296    }
297
298    /**
299     * @param value true if the vbucket's deletion should be deferred to a
300     *        background task. This is for VBucket objects created by
301     *        makeVBucket and owned by a VBucketPtr. If the VBucket was manually
302     *        created this will have no effect on deletion.
303     */
304    void setDeferredDeletion(bool value) {
305        deferredDeletion.store(true);
306    }
307
308    /**
309     * @param A cookie to notify when the deferred deletion completes.
310     */
311    void setDeferredDeletionCookie(const void* cookie) {
312        deferredDeletionCookie = cookie;
313    }
314
315    /**
316     * @return the cookie which could of been set when setupDeferredDeletion was
317     *         called.
318     */
319    const void* getDeferredDeletionCookie() const {
320        return deferredDeletionCookie;
321    }
322
    /**
     * Set up deferred deletion, where deletion of the vbucket is deferred to
     * and completed by an AUXIO/NONIO task. AUXIO is used for EPVBucket
     * because it hits disk for the data file unlink. NONIO is used for
     * EphemeralVBucket because only memory resources need freeing.
     * @param cookie A cookie to notify when the deletion task completes.
     */
    virtual void setupDeferredDeletion(const void* cookie) = 0;

    /// @return the last persisted sequence number for the VBucket.
    virtual uint64_t getPersistenceSeqno() const = 0;
334
    /**
     * Returns the sequence number to expose publicly as the highest
     * persisted seqno. Note this may differ from getPersistenceSeqno,
     * depending on the Bucket type.
     *
     * Historical note: This is the same as PersistenceSeqno for EP buckets,
     * and hence before Spock wasn't a separate function. However, for
     * Ephemeral buckets we need to distinguish between the sequence number we
     * report to external clients for Observe/persistTo and the sequence
     * number we report to internal DCP / ns_server for takeover:
     *  a) Clients need 0 for the Ephemeral "persisted to" seqno (as
     *     there isn't any Persistence and we can't claim something is on-disk
     *     when it is not).
     *  b) ns_server / replication needs a non-zero, "logically-persisted" seqno
     *     from the replica to know that a vBucket is ready for takeover.
     * As such, getPublicPersistenceSeqno() is used for (a), and
     * getPersistenceSeqno() is used for (b).
     */
    virtual uint64_t getPublicPersistenceSeqno() const = 0;
354
355    void setPersistenceSeqno(uint64_t seqno) {
356        persistenceSeqno.store(seqno);
357    }
358
359    id_type getId() const { return id; }
360    vbucket_state_t getState(void) const { return state.load(); }
361
    /**
     * Sets the vbucket state to a desired state.
     * NOTE(review): presumably acquires 'stateLock' itself (contrast with
     * setState_UNLOCKED); the definition is not visible here.
     *
     * @param to desired vbucket state
     */
    void setState(vbucket_state_t to);

    /**
     * Sets the vbucket state to a desired state, with the 'stateLock' already
     * acquired by the caller.
     *
     * @param to desired vbucket state
     * @param vbStateLock write lock holder on 'stateLock'
     */
    void setState_UNLOCKED(vbucket_state_t to, WriterLockHolder& vbStateLock);
377
378    cb::RWLock& getStateLock() {return stateLock;}
379
380    vbucket_state_t getInitialState(void) { return initialState; }
381
382    vbucket_state getVBucketState() const;
383
    /**
     * This method performs operations on the stored value prior
     * to expiring the item.
     *
     * @param hbl lock on the hash bucket containing v. NOTE(review):
     *        presumably it must be held by the caller; confirm.
     * @param v the stored value
     */
    void handlePreExpiry(const std::unique_lock<std::mutex>& hbl,
                         StoredValue& v);

    // The functions below are defined outside this header; the descriptions
    // follow their names.

    /// Register a pending operation for the connection identified by cookie.
    bool addPendingOp(const void *cookie);

    /// Update statistics when an item of itemBytes is queued / flushed.
    void doStatsForQueueing(const Item& item, size_t itemBytes);
    void doStatsForFlushing(const Item& item, size_t itemBytes);
    /// Adjust the on-disk metadata accounting for an item.
    void incrMetaDataDisk(const Item& qi);
    void decrMetaDataDisk(const Item& qi);
399
    /// Increase the total count of items in this VBucket by 1.
    virtual void incrNumTotalItems() = 0;

    /// Decrease the total count of items in this VBucket by 1.
    virtual void decrNumTotalItems() = 0;

    /**
     * Set the total count of items in this VBucket to the specified value.
     */
    virtual void setNumTotalItems(size_t items) = 0;

    /// Reset all statistics associated with this vBucket.
    virtual void resetStats();

    // Get age sum in milliseconds (definition not visible here).
    uint64_t getQueueAge();

    /// Notify all pending operations. NOTE(review): inferred from the name;
    /// the definition is not visible here.
    void fireAllOps(EventuallyPersistentEngine &engine);
418
419    size_t size(void) {
420        HashTableDepthStatVisitor v;
421        ht.visitDepth(v);
422        return v.size;
423    }
424
425    size_t getBackfillSize() {
426        LockHolder lh(backfill.mutex);
427        return backfill.items.size();
428    }
429
    /**
     * Process an item from a DCP backfill.
     * It puts the item onto a queue for persistence and/or generates a seqno
     * and updates stats.
     *
     * @param qi item to be processed
     * @param generateBySeqno indicates whether a new seqno must be generated
     *                        or the seqno in the item must be used
     */
    virtual void queueBackfillItem(queued_item& qi,
                                   const GenerateBySeqno generateBySeqno) = 0;

    /**
     * Transfer any backfill items to the specified vector, up to the optional
     * limit.
     * @param items Destination for items transferred from backfill.
     * @param limit If non-zero, limit the number of items transferred to the
     * specified number.
     * @return true if any more items remain in the backfill set - i.e. if
     *         limit constrained the number of items.
     */
    bool getBackfillItems(std::vector<queued_item>& items, size_t limit = 0);
454
    /// Result of getItemsToPersist().
    struct ItemsToFlush {
        /// The items to be flushed.
        std::vector<queued_item> items;
        /// Seqno range of the items; {0, 0} by default.
        snapshot_range_t range{0, 0};
        /// True if more items are available (the limit was reached).
        bool moreAvailable = false;
    };

    /**
     * Obtain the series of items to be flushed for this vBucket.
     *
     * @param approxLimit Upper bound on how many items to fetch.
     * @return The items to flush, along with their seqno range and whether
     *         more items are available for this vBucket (i.e. the limit was
     *         reached).
     */
    ItemsToFlush getItemsToPersist(size_t approxLimit);
471
472    bool isBackfillPhase() {
473        return backfill.isBackfillPhase.load();
474    }
475
476    void setBackfillPhase(bool backfillPhase) {
477        backfill.isBackfillPhase.store(backfillPhase);
478    }
479
    /**
     * Returns the map of bgfetch items for this vbucket, clearing the
     * pendingBGFetches.
     */
    virtual vb_bgfetch_queue_t getBGFetchItems() = 0;

    /// @return whether background fetches are pending for this vbucket
    ///         (inferred from the name; implemented by subclasses).
    virtual bool hasPendingBGFetchItems() = 0;
487
488    static const char* toString(vbucket_state_t s) {
489        switch(s) {
490        case vbucket_state_active: return "active"; break;
491        case vbucket_state_replica: return "replica"; break;
492        case vbucket_state_pending: return "pending"; break;
493        case vbucket_state_dead: return "dead"; break;
494        }
495        return "unknown";
496    }
497
498    static vbucket_state_t fromString(const char* state) {
499        if (strcmp(state, "active") == 0) {
500            return vbucket_state_active;
501        } else if (strcmp(state, "replica") == 0) {
502            return vbucket_state_replica;
503        } else if (strcmp(state, "pending") == 0) {
504            return vbucket_state_pending;
505        } else {
506            return vbucket_state_dead;
507        }
508    }
509
    /**
     * Checks and decides whether to add a high priority request on the
     * vbucket. This is an async request made by modules like ns-server during
     * rebalance. The request is for a response from the vbucket when it
     * 'sees' beyond a certain sequence number or when a certain checkpoint
     * is persisted.
     * Depending on the vbucket type, the meaning of 'seeing' a sequence number
     * changes. That is, it could mean persisted in the case of EPVBucket and
     * added to the sequenced data structure in the case of EphemeralVBucket.
     *
     * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
     * @param cookie cookie of conn to be notified
     * @param reqType indicating request for seqno or chk persistence
     *
     * @return RequestScheduled if a high priority request is added and
     *                          notification will be done asynchronously
     *         NotSupported if the request is not supported for the reqType
     *         RequestNotScheduled if a high priority request is NOT added (as
     *                             it is not required). This implies there won't
     *                             be a subsequent notification
     */
    virtual HighPriorityVBReqStatus checkAddHighPriorityVBEntry(
            uint64_t seqnoOrChkId,
            const void* cookie,
            HighPriorityVBNotify reqType) = 0;

    /**
     * Notify the high priority requests on the vbucket.
     * This is the response to async requests made by modules like ns-server
     * during rebalance.
     *
     * @param engine Ref to ep-engine
     * @param id seqno or checkpoint id causing the notification(s).
     * @param notifyType indicating notify for seqno or chk persistence
     */
    virtual void notifyHighPriorityRequests(
            EventuallyPersistentEngine& engine,
            uint64_t id,
            HighPriorityVBNotify notifyType) = 0;

    /// Notify pending connections that their requests failed. NOTE(review):
    /// inferred from the name; implemented by subclasses.
    virtual void notifyAllPendingConnsFailed(EventuallyPersistentEngine& e) = 0;

    /**
     * Get high priority notifications for a seqno or checkpoint persisted
     *
     * @param engine Ref to ep-engine
     * @param idNum seqno or checkpoint id for which notifies are to be found
     * @param notifyType indicating notify for seqno or chk persistence
     *
     * @return map of notifications with conn cookie as the key and notify
     *         status as the value
     */
    std::map<const void*, ENGINE_ERROR_CODE> getHighPriorityNotifications(
            EventuallyPersistentEngine& engine,
            uint64_t idNum,
            HighPriorityVBNotify notifyType);
566
567    size_t getHighPriorityChkSize() {
568        return numHpVBReqs.load();
569    }
570
    /**
     * BloomFilter operations for vbucket.
     * NOTE(review): definitions are not visible here. The "temp" filter
     * appears to be built alongside the main one and exchanged via
     * swapFilter(); `probability` is presumably the target false-positive
     * rate. Confirm both.
     */
    void createFilter(size_t key_count, double probability);
    void initTempFilter(size_t key_count, double probability);
    void addToFilter(const DocKey& key);
    virtual bool maybeKeyExistsInFilter(const DocKey& key);
    bool isTempFilterAvailable();
    void addToTempFilter(const DocKey& key);
    void swapFilter();
    void clearFilter();
    void setFilterStatus(bfilter_status_t to);
    std::string getFilterStatusString();
    size_t getFilterSize();
    size_t getNumOfKeysInFilter();
586
587    uint64_t nextHLCCas() {
588        return hlc.nextHLC();
589    }
590
    // Applicable only for FULL EVICTION POLICY.
    // Presumably returns true if the resident ratio is below threshold
    // (inferred from the name; defined elsewhere).
    bool isResidentRatioUnderThreshold(float threshold);

    /**
     * Returns true if deleted items (aka tombstones) are always resident in
     * memory (and hence we do not need to attempt a bgFetch if we try to
     * access a deleted key which isn't found in memory).
     */
    virtual bool areDeletedItemsAlwaysResident() const = 0;

    /// Emit this vbucket's stats via add_stat; `details` selects extra stats
    /// (inferred from names; implemented by subclasses).
    virtual void addStats(bool details, ADD_STAT add_stat, const void* c) = 0;

    /// @return the KVShard this vbucket belongs to (implemented by subclasses).
    virtual KVShard* getShard() = 0;
604
    /**
     * Returns the number of alive (non-deleted) Items in the VBucket.
     *
     * Includes items which are not currently resident in memory (i.e. under
     * Full eviction and have been fully evicted from memory).
     * Does *not* include deleted items.
     */
    virtual size_t getNumItems() const = 0;

    /// @return the number of items not resident in memory (per name).
    virtual size_t getNumNonResidentItems() const = 0;
615
616    size_t getNumTempItems(void) {
617        return ht.getNumTempItems();
618    }
619
620    void incrRollbackItemCount(uint64_t val) {
621        rollbackItemCount.fetch_add(val, std::memory_order_relaxed);
622    }
623
624    uint64_t getRollbackItemCount(void) {
625        return rollbackItemCount.load(std::memory_order_relaxed);
626    }
627
    // Return the persistence checkpoint ID.
    uint64_t getPersistenceCheckpointId() const;

    // Set the persistence checkpoint ID to the given value.
    void setPersistenceCheckpointId(uint64_t checkpointId);

    // Mark the value associated with the given key as dirty.
    void markDirty(const DocKey& key);
636
637    /**
638     * Obtain the read handle for the collections manifest.
639     * The caller will have read-only access to manifest using the methods
640     * exposed by the ReadHandle
641     */
642    Collections::VB::Manifest::ReadHandle lockCollections() const {
643        return manifest.lock();
644    }
645
646    /**
647     * Obtain a caching read handle for the collections manifest.
648     * The returned handle will lookup the collection associated with key
649     * and cache the internal iterator so that future usage of
650     * isLogicallyDeleted doesn't need to re-scan and lookup. This is different
651     * to a plain ReadHandle which provides more functionality (more methods
652     * for the caller), but may result in extra lookups and key-scans.
653     * @param key A key to use for constructing the read handle.
654     * @return a CachingReadHandle which the caller should test is valid with
655     *         CachingReadHandle::valid
656     */
657    Collections::VB::Manifest::CachingReadHandle lockCollections(
658            const DocKey& key) const {
659        return manifest.lock(key);
660    }
661
662    Collections::VB::Manifest::CachingReadHandle lockCollections(
663            const DocKey& key, const std::string& separator) const {
664        return manifest.lock(key, separator);
665    }
666
667    /**
668     * Update the Collections::VB::Manifest and the VBucket.
669     * Adds SystemEvents for the create and delete of collections into the
670     * checkpoint.
671     *
672     * @param m A Collections::Manifest to apply to the VB::Manifest
673     */
674    void updateFromManifest(const Collections::Manifest& m) {
675        manifest.wlock().update(*this, m);
676    }
677
678    /**
679     * Finalise the deletion of a collection (no items remain in the collection)
680     *
681     * @param collection Name of the collection that has completed deleting
682     */
683    void completeDeletion(cb::const_char_buffer collection) {
684        manifest.wlock().completeDeletion(*this, collection);
685    }
686
687    /**
688     * Add a collection to this vbucket with a pre-assigned seqno. I.e.
689     * this VB is a replica.
690     *
691     * @param manifestUid the uid of the manifest which made the change
692     * @param identifier Identifier for the collection to add.
693     * @param bySeqno The seqno assigned to the collection create event.
694     */
695    void replicaAddCollection(Collections::uid_t manifestUid,
696                              Collections::Identifier identifier,
697                              int64_t bySeqno) {
698        manifest.wlock().replicaAdd(*this, manifestUid, identifier, bySeqno);
699    }
700
701    /**
702     * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
703     * this VB is a replica.
704     *
705     * @param manifestUid the uid of the manifest which made the change
706     * @param identifier Identifier for the collection to begin deleting.
707     * @param bySeqno The seqno assigned to the collection delete event.
708     */
709    void replicaBeginDeleteCollection(Collections::uid_t manifestUid,
710                                      Collections::Identifier identifier,
711                                      int64_t bySeqno) {
712        manifest.wlock().replicaBeginDelete(
713                *this, manifestUid, identifier, bySeqno);
714    }
715
716    /**
717     * Delete a collection from this vbucket with a pre-assigned seqno. I.e.
718     * this VB is a replica.
719     *
720     * @param manifestUid the uid of the manifest which made the change
721     * @param separator The new separator.
722     * @param bySeqno The seqno assigned to the change separator event.
723     */
724    void replicaChangeCollectionSeparator(Collections::uid_t manifestUid,
725                                          cb::const_char_buffer separator,
726                                          int64_t bySeqno) {
727        manifest.wlock().replicaChangeSeparator(
728                *this, manifestUid, separator, bySeqno);
729    }
730
731    /**
732     * Get the collection manifest
733     *
734     * @return const reference to the manifest
735     */
736    const Collections::VB::Manifest& getManifest() const {
737        return manifest;
738    }
739
    // Class-scope constants for the vbucket_state_t values (defined outside
    // this header).
    static const vbucket_state_t ACTIVE;
    static const vbucket_state_t REPLICA;
    static const vbucket_state_t PENDING;
    static const vbucket_state_t DEAD;

    /// The vbucket's hash table, holding its StoredValues.
    HashTable         ht;

    /// Manager of this vBucket's checkpoints. unique_ptr for pimpl.
    std::unique_ptr<CheckpointManager> checkpointManager;
749
750    // Struct for managing 'backfill' items - Items which have been added by
751    // an incoming DCP stream and need to be persisted to disk.
752    struct {
753        std::mutex mutex;
754        std::queue<queued_item> items;
755        std::atomic<bool> isBackfillPhase;
756    } backfill;
757
    /**
     * Gets the valid StoredValue for the key and deletes an expired item if
     * desired by the caller. Requires the hash bucket to be locked.
     *
     * @param hbl Reference to the hash bucket lock
     * @param key the key to look up
     * @param wantsDeleted whether a deleted value may be returned (per name)
     * @param trackReference whether the access should be tracked as a
     *        reference (per name)
     * @param queueExpired Delete an expired item
     * @return the StoredValue found. NOTE(review): presumably nullptr if
     *         none; confirm in the definition.
     */
    StoredValue* fetchValidValue(HashTable::HashBucketLock& hbl,
                                 const DocKey& key,
                                 WantsDeleted wantsDeleted,
                                 TrackReference trackReference,
                                 QueueExpired queueExpired);
773
    /**
     * Complete the background fetch for the specified item. Depending on the
     * state of the item, restore it to the hashtable as appropriate,
     * potentially queuing it as dirty.
     *
     * @param key The key of the item
     * @param fetched_item The item which has been fetched.
     * @param startTime The time processing of the batch of items started.
     *
     * @return ENGINE_ERROR_CODE status to be notified to the front end
     */
    virtual ENGINE_ERROR_CODE completeBGFetchForSingleItem(
            const DocKey& key,
            const VBucketBGFetchItem& fetched_item,
            const ProcessClock::time_point startTime) = 0;

    /**
     * Retrieve an item from the disk for vkey stats
     *
     * @param key the key to fetch
     * @param cookie the connection cookie
     * @param engine Reference to ep engine
     * @param bgFetchDelay delay for any background fetch. NOTE(review): units
     *        not visible here.
     *
     * @return ENGINE_ERROR_CODE status of the operation
     */
    virtual ENGINE_ERROR_CODE statsVKey(const DocKey& key,
                                        const void* cookie,
                                        EventuallyPersistentEngine& engine,
                                        int bgFetchDelay) = 0;

    /**
     * Complete the vkey stats for an item background fetched from disk.
     *
     * @param key The key of the item
     * @param gcb Bgfetch callback object containing the item from disk
     */
    virtual void completeStatsVKey(const DocKey& key, const GetValue& gcb) = 0;
814
815    /**
816     * Set (add new or update) an item into in-memory structure like
817     * hash table and do not generate a seqno. This is called internally from
818     * ep-engine when we want to update our in-memory data (like in HT) with
819     * another source of truth like disk.
820     * Currently called during rollback.
821     *
822     * @param itm Item to be added or updated. Upon success, the itm
823     *            revSeqno are updated
824     *
825     * @return Result indicating the status of the operation
826     */
827    MutationStatus setFromInternal(Item& itm);
828
829    /**
830     * Set (add new or update) an item in the vbucket.
831     *
832     * @param itm Item to be added or updated. Upon success, the itm
833     *            bySeqno, cas and revSeqno are updated
834     * @param cookie the connection cookie
835     * @param engine Reference to ep engine
836     * @param bgFetchDelay
837     * @param predicate a function to call which if returns true, the set will
838     *        succeed. The function is called against any existing item.
839     *
840     * @return ENGINE_ERROR_CODE status notified to be to the front end
841     */
842    ENGINE_ERROR_CODE set(Item& itm,
843                          const void* cookie,
844                          EventuallyPersistentEngine& engine,
845                          int bgFetchDelay,
846                          cb::StoreIfPredicate predicate);
847
848    /**
849     * Replace (overwrite existing) an item in the vbucket.
850     *
851     * @param itm Item to be added or updated. Upon success, the itm
852     *            bySeqno, cas and revSeqno are updated
853     * @param cookie the connection cookie
854     * @param engine Reference to ep engine
855     * @param bgFetchDelay
856     * @param predicate a function to call which if returns true, the replace
857     *        will succeed. The function is called against any existing item.
858     * @param readHandle Reader access to the Item's collection data.
859     *
860     * @return ENGINE_ERROR_CODE status notified to be to the front end
861     */
862    ENGINE_ERROR_CODE replace(
863            Item& itm,
864            const void* cookie,
865            EventuallyPersistentEngine& engine,
866            int bgFetchDelay,
867            cb::StoreIfPredicate predicate,
868            const Collections::VB::Manifest::CachingReadHandle& readHandle);
869
870    /**
871     * Add an item directly into its vbucket rather than putting it on a
872     * checkpoint (backfill the item). The can happen during DCP or when a
873     * replica vbucket is receiving backfill items from active vbucket.
874     *
875     * @param itm Item to be added/updated from DCP backfill. Upon
876     *            success, the itm revSeqno is updated
877     * @param genBySeqno whether or not to generate sequence number
878     *
879     * @return the result of the operation
880     */
881    ENGINE_ERROR_CODE addBackfillItem(Item& itm, GenerateBySeqno genBySeqno);
882
883    /**
884     * Set an item in the store from a non-front end operation (DCP, XDCR)
885     *
886     * @param item the item to set. Upon success, the itm revSeqno is updated
887     * @param cas value to match
888     * @param seqno sequence number of mutation
889     * @param cookie the cookie representing the client to store the item
890     * @param engine Reference to ep engine
891     * @param bgFetchDelay
892     * @param checkConflicts should conflict resolution be done?
893     * @param allowExisting set to false if you want set to fail if the
894     *                      item exists already
895     * @param genBySeqno whether or not to generate sequence number
896     * @param genCas
897     * @param isReplication set to true if we are to use replication
898     *                      throttle threshold
899     * @param readHandle Reader access to the Item's collection data.
900     *
901     * @return the result of the store operation
902     */
903    ENGINE_ERROR_CODE setWithMeta(
904            Item& itm,
905            uint64_t cas,
906            uint64_t* seqno,
907            const void* cookie,
908            EventuallyPersistentEngine& engine,
909            int bgFetchDelay,
910            CheckConflicts checkConflicts,
911            bool allowExisting,
912            GenerateBySeqno genBySeqno,
913            GenerateCas genCas,
914            bool isReplication,
915            const Collections::VB::Manifest::CachingReadHandle& readHandle);
916
917    /**
918     * Delete an item in the vbucket
919     *
920     * @param[in,out] cas value to match; new cas after logical delete
921     * @param cookie the cookie representing the client to store the item
922     * @param engine Reference to ep engine
923     * @param bgFetchDelay
924     * @param[out] itemMeta pointer to item meta data that needs to be returned
925     *                      as a result the delete. A NULL pointer indicates
926     *                      that no meta data needs to be returned.
927     * @param[out] mutInfo Info to uniquely identify (and order) the delete
928     *                     seq. A NULL pointer indicates no info needs to be
929     *                     returned.
930     * @param readHandle Reader access to the affected key's collection data.
931     *
932     * @return the result of the operation
933     */
934    ENGINE_ERROR_CODE deleteItem(
935            uint64_t& cas,
936            const void* cookie,
937            EventuallyPersistentEngine& engine,
938            int bgFetchDelay,
939            ItemMetaData* itemMeta,
940            mutation_descr_t& mutInfo,
941            const Collections::VB::Manifest::CachingReadHandle& readHandle);
942
943    /**
944     * Delete an item in the vbucket from a non-front end operation (DCP, XDCR)
945     *
946     * @param[in, out] cas value to match; new cas after logical delete
947     * @param[out] seqno Pointer to get the seqno generated for the item. A
948     *                   NULL value is passed if not needed
949     * @param cookie the cookie representing the client to store the item
950     * @param engine Reference to ep engine
951     * @param bgFetchDelay
952     * @param checkConflicts should conflict resolution be done?
953     * @param itemMeta ref to item meta data
954     * @param backfill indicates if the item must be put onto vb queue or
955     *                 onto checkpoint
956     * @param genBySeqno whether or not to generate sequence number
957     * @param generateCas whether or not to generate cas
958     * @param bySeqno seqno of the key being deleted
959     * @param isReplication set to true if we are to use replication
960     *                      throttle threshold
961     * @param readHandle Reader access to the key's collection data.
962     *
963     * @return the result of the operation
964     */
965    ENGINE_ERROR_CODE deleteWithMeta(
966            uint64_t& cas,
967            uint64_t* seqno,
968            const void* cookie,
969            EventuallyPersistentEngine& engine,
970            int bgFetchDelay,
971            CheckConflicts checkConflicts,
972            const ItemMetaData& itemMeta,
973            bool backfill,
974            GenerateBySeqno genBySeqno,
975            GenerateCas generateCas,
976            uint64_t bySeqno,
977            bool isReplication,
978            const Collections::VB::Manifest::CachingReadHandle& readHandle);
979
980    /**
981     * Delete an expired item
982     *
983     * @param it item to be deleted
984     * @param startTime the time to be compared with this item's expiry time
985     * @param source Expiry source
986     */
987    void deleteExpiredItem(const Item& it,
988                           time_t startTime,
989                           ExpireBy source);
990
991    /**
992     * Evict a key from memory.
993     *
994     * @param key Key to evict
995     * @param[out] msg Updated to point to a string (with static duration)
996     *                 describing the result of the operation.
997     *
998     * @return SUCCESS if key was successfully evicted (or was already
999     *                 evicted), or the reason why the request failed.
1000     *
1001     */
1002    virtual protocol_binary_response_status evictKey(const DocKey& key,
1003                                                     const char** msg) = 0;
1004
1005    /**
1006     * Page out a StoredValue from memory.
1007     *
1008     * The definition of "page out" is up to the underlying VBucket
1009     * implementation - this may mean simply ejecting the value from memory
1010     * (Value Eviction), removing the entire document from memory (Full Eviction),
1011     * or actually deleting the document (Ephemeral Buckets).
1012     *
1013     * @param lh Bucket lock associated with the StoredValue.
1014     * @param v[in, out] Ref to the StoredValue to be ejected. Based on the
1015     *                   VBucket type, policy in the vbucket contents of v and
1016     *                   v itself may be changed
1017     *
1018     * @return true if an item is ejected.
1019     */
1020    virtual bool pageOut(const HashTable::HashBucketLock& lh,
1021                         StoredValue*& v) = 0;
1022
1023    /*
1024     * Check to see if a StoredValue is eligible to be paged out of memory.
1025     *
1026     * @param lh Bucket lock associated with the StoredValue.
1027     * @param v Reference to the StoredValue to be ejected.
1028     *
1029     * @return true if the StoredValue is eligible to be paged out.
1030     *
1031     */
1032    virtual bool eligibleToPageOut(const HashTable::HashBucketLock& lh,
1033                                   const StoredValue& v) const = 0;
1034
1035    /**
1036     * Add an item in the store
1037     *
1038     * @param itm the item to add. On success, this will have its seqno and
1039     *            CAS updated.
1040     * @param cookie the cookie representing the client to store the item
1041     * @param engine Reference to ep engine
1042     * @param bgFetchDelay
1043     * @param readHandle Reader access to the Item's collection data.
1044     *
1045     * @return the result of the operation
1046     */
1047    ENGINE_ERROR_CODE add(
1048            Item& itm,
1049            const void* cookie,
1050            EventuallyPersistentEngine& engine,
1051            int bgFetchDelay,
1052            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1053
1054    /**
1055     * Retrieve a value, but update its TTL first
1056     *
1057     * @param cookie the connection cookie
1058     * @param engine Reference to ep engine
1059     * @param bgFetchDelay
1060     * @param exptime the new expiry time for the object
1061     * @param readHandle Reader access to the key's collection data.
1062     *
1063     * @return a GetValue representing the result of the request
1064     */
1065    GetValue getAndUpdateTtl(
1066            const void* cookie,
1067            EventuallyPersistentEngine& engine,
1068            int bgFetchDelay,
1069            time_t exptime,
1070            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1071    /**
1072     * Queue an Item to the checkpoint and return its seqno
1073     *
1074     * @param item an Item object to queue, can be any kind of item and will be
1075     *        given a CAS and seqno by this function.
1076     * @param seqno An optional sequence number, if not specified checkpoint
1077     *        queueing will assign a seqno to the Item.
1078     */
1079    int64_t queueItem(Item* item, OptionalSeqno seqno);
1080
1081    /**
1082     * Get metadata and value for a given key
1083     *
1084     * @param cookie the cookie representing the client
1085     * @param engine Reference to ep engine
1086     * @param bgFetchDelay
1087     * @param options flags indicating some retrieval related info
1088     * @param diskFlushAll
1089     * @param getKeyOnly if GetKeyOnly::Yes we want only the key
1090     * @param readHandle Reader access to the requested key's collection data.
1091     *
1092     * @return the result of the operation
1093     */
1094    GetValue getInternal(
1095            const void* cookie,
1096            EventuallyPersistentEngine& engine,
1097            int bgFetchDelay,
1098            get_options_t options,
1099            bool diskFlushAll,
1100            GetKeyOnly getKeyOnly,
1101            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1102
1103    /**
1104     * Retrieve the meta data for given key
1105     *
1106     * @param cookie the connection cookie
1107     * @param engine Reference to ep engine
1108     * @param bgFetchDelay Delay in secs before we run the bgFetch task
1109     * @param readHandle Reader access to the key's collection data.
1110     * @param[out] metadata meta information returned to the caller
1111     * @param[out] deleted specifies the caller whether or not the key is
1112     *                     deleted
1113     * @param[out] datatype specifies the datatype of the item
1114     *
1115     * @return the result of the operation
1116     */
1117    ENGINE_ERROR_CODE getMetaData(
1118            const void* cookie,
1119            EventuallyPersistentEngine& engine,
1120            int bgFetchDelay,
1121            const Collections::VB::Manifest::CachingReadHandle& readHandle,
1122            ItemMetaData& metadata,
1123            uint32_t& deleted,
1124            uint8_t& datatype);
1125
1126    /**
1127     * Looks up the key stats for the given {vbucket, key}.
1128     *
1129     * @param cookie The client's cookie
1130     * @param engine Reference to ep engine
1131     * @param bgFetchDelay
1132     * @param[out] kstats On success the keystats for this item.
1133     * @param wantsDeleted If yes then return keystats even if the item is
1134     *                     marked as deleted. If no then will return
1135     *                     ENGINE_KEY_ENOENT for deleted items.
1136     * @param readHandle Reader access to the key's collection data.
1137     *
1138     * @return the result of the operation
1139     */
1140    ENGINE_ERROR_CODE getKeyStats(
1141            const void* cookie,
1142            EventuallyPersistentEngine& engine,
1143            int bgFetchDelay,
1144            struct key_stats& kstats,
1145            WantsDeleted wantsDeleted,
1146            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1147
1148    /**
1149     * Gets a locked item for a given key.
1150     *
1151     * @param currentTime Current time to use for locking the item for a
1152     *                    duration of lockTimeout
1153     * @param lockTimeout Timeout for the lock on the item
1154     * @param cookie The client's cookie
1155     * @param engine Reference to ep engine
1156     * @param bgFetchDelay Delay in secs before we run the bgFetch task
1157     * @param readHandle Reader access to the key's collection data.
1158     *
1159     * @return the result of the operation (contains locked item on success)
1160     */
1161    GetValue getLocked(
1162            rel_time_t currentTime,
1163            uint32_t lockTimeout,
1164            const void* cookie,
1165            EventuallyPersistentEngine& engine,
1166            int bgFetchDelay,
1167            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1168    /**
1169     * Update in memory data structures after an item is deleted on disk
1170     *
1171     * @param queuedItem reference to the deleted item
1172     * @param deleted indicates if item actaully deleted or not (in case item
1173     *                did not exist on disk)
1174     */
1175    void deletedOnDiskCbk(const Item& queuedItem, bool deleted);
1176
1177    /**
1178     * Update in memory data structures after a rollback on disk
1179     *
1180     * @param queuedItem item key
1181     *
1182     * @return indicates if the operation is succcessful
1183     */
1184    bool deleteKey(const DocKey& key);
1185
1186    /**
1187     * Creates a DCP backfill object
1188     *
1189     * @param e ref to EventuallyPersistentEngine
1190     * @param stream Shared ptr to the stream for which this backfill obj is
1191     *               created
1192     * @param startSeqno requested start sequence number of the backfill
1193     * @param endSeqno requested end sequence number of the backfill
1194     *
1195     * @return pointer to the backfill object created. Caller to own this
1196     *         object and hence must handle deletion.
1197     */
1198    virtual std::unique_ptr<DCPBackfill> createDCPBackfill(
1199            EventuallyPersistentEngine& e,
1200            std::shared_ptr<ActiveStream> stream,
1201            uint64_t startSeqno,
1202            uint64_t endSeqno) = 0;
1203
1204    /**
1205     * Update failovers, checkpoint mgr and other vBucket members after
1206     * rollback.
1207     *
1208     * @param rollbackResult contains high seqno of the vBucket after rollback,
1209     *                       snapshot start seqno of the last snapshot in the
1210     *                       vBucket after the rollback,
1211     *                       snapshot end seqno of the last snapshot in the
1212     *                       vBucket after the rollback
1213     * @param prevHighSeqno high seqno before the rollback
1214     */
1215    void postProcessRollback(const RollbackResult& rollbackResult,
1216                             uint64_t prevHighSeqno);
1217
1218    /**
1219     * Debug - print a textual description of the VBucket to stderr.
1220     */
1221    virtual void dump() const;
1222
1223    /**
1224     * Sets the callback function to invoke when a frequency counter becomes
1225     * saturated.  The callback function is to invoke the ItemFreqDecayer
1226     * task.
1227     *
1228     * @param callbackFunction - the function to callback.
1229     */
1230    void setFreqSaturatedCallback(std::function<void()> callbackFunction);
1231
1232    /**
1233     * Returns the number of deletes in the memory
1234     *
1235     * @return number of deletes
1236     */
1237    size_t getNumInMemoryDeletes() const {
1238        /* couchbase vbuckets: this is generally (after deletes are persisted)
1239                               zero as hash table doesn't keep deletes after
1240                               they are persisted.
1241           ephemeral vbuckets: we keep deletes in both hash table and ordered
1242                               data structure. */
1243        return ht.getNumDeletedItems();
1244    }
1245
1246    /**
1247     * Set that this VBucket might store documents with xattrs.
1248     * Persistent vbucket will flush this to disk.
1249     */
1250    void setMightContainXattrs() {
1251        mayContainXattrs.store(true);
1252    }
1253
    /**
     * @return true if this VBucket might store documents with xattrs (i.e.
     *         the flag set by setMightContainXattrs()).
     */
    bool mightContainXattrs() const {
        return mayContainXattrs.load();
    }
1257
1258    cb::vbucket_info getInfo() const {
1259        return {mightContainXattrs()};
1260    }
1261
1262    /**
1263     * If the key@bySeqno is found, remove it from the hash table
1264     *
1265     * @param key The key to look for
1266     * @param bySeqno The seqno of the key to remove
1267     */
1268    void removeKey(const DocKey& key, int64_t bySeqno);
1269
    /**
     * @return the checkpoint flush timeout, shared by all vbuckets (static).
     * NOTE(review): the semantics of this timeout are not visible in this
     * part of the file — confirm against the definition.
     */
    static std::chrono::seconds getCheckpointFlushTimeout();
1271
1272    /**
1273     * Set the memory threshold on the current bucket quota for accepting a
1274     * new mutation. This is same across all the vbuckets
1275     *
1276     * @param memThreshold Threshold between 0 and 1
1277     */
1278    static void setMutationMemoryThreshold(double memThreshold);
1279
1280    /**
1281     * Check if this StoredValue has become logically non-existent.
1282     * By logically non-existent, the item has been deleted
1283     * or doesn't exist
1284     *
1285     * @param v StoredValue to check
1286     * @param readHandle a ReadHandle for safe reading of collection data
1287     * @return true if the item is logically non-existent,
1288     *         false otherwise
1289     */
1290    static bool isLogicallyNonExistent(
1291            const StoredValue& v,
1292            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1293
    // NOTE(review): the members below are not documented in this part of
    // the file; the comments are inferred from their names and types and
    // should be confirmed against their use sites.

    // Queue of queued_items (presumably items rejected by persistence).
    std::queue<queued_item> rejectQueue;
    // Failover table owned by this vbucket.
    std::unique_ptr<FailoverTable> failovers;

    // Per-operation counters (presumably create/update/delete/reject counts).
    std::atomic<size_t>  opsCreate;
    std::atomic<size_t>  opsUpdate;
    std::atomic<size_t>  opsDelete;
    std::atomic<size_t>  opsReject;

    // Dirty (not-yet-persisted) queue statistics. dirtyQueueSize uses
    // cb::NonNegativeCounter rather than a plain atomic.
    cb::NonNegativeCounter<size_t> dirtyQueueSize;
    std::atomic<size_t>  dirtyQueueMem;
    std::atomic<size_t>  dirtyQueueFill;
    std::atomic<size_t>  dirtyQueueDrain;
    std::atomic<uint64_t> dirtyQueueAge;
    std::atomic<size_t>  dirtyQueuePendingWrites;
    // Presumably the size of metadata held on disk — confirm.
    std::atomic<size_t>  metaDataDisk;

    // Presumably the count of items expired in this vbucket — confirm.
    std::atomic<size_t>  numExpiredItems;
1311
1312    /**
1313     * A custom delete function for deleting VBucket objects. Any thread could
1314     * be the last thread to release a VBucketPtr and deleting a VB will
1315     * eventually hit the I/O sub-system when we unlink the file, to be sure no
1316     * front-end thread does this work, we schedule the deletion to a background
1317     * task. This task scheduling is triggered by the shared_ptr/VBucketPtr
1318     * using this object as the deleter.
1319     */
1320    struct DeferredDeleter {
1321        DeferredDeleter(EventuallyPersistentEngine& engine) : engine(engine) {
1322        }
1323
1324        /**
1325         * Called when the VBucketPtr has no more owners and runs delete on
1326         * the object.
1327         */
1328        void operator()(VBucket* vb) const;
1329
1330        EventuallyPersistentEngine& engine;
1331    };
1332
protected:
    /**
     * This function checks for the various states of the value and,
     * depending on them, lets the calling function issue a bgfetch as needed.
     *
     * @param hbl Hash table bucket lock
     * @param v the StoredValue to operate on (may be null — NOTE(review):
     *          presumably when the key is not resident; confirm)
     * @param exptime the new expiry time for the object
     * @param readHandle Reader access to the key's collection data.
     *
     * @return the status of the operation and the resulting GetValue
     */
    std::pair<MutationStatus, GetValue> processGetAndUpdateTtl(
            HashTable::HashBucketLock& hbl,
            StoredValue* v,
            time_t exptime,
            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1343    /**
1344     * This function checks cas, expiry and other partition (vbucket) related
1345     * rules before setting an item into other in-memory structure like HT,
1346     * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
1347     *
1348     * @param hbl Hash table bucket lock that must be held
1349     * @param v Reference to the ptr of StoredValue. This can be changed if a
1350     *          new StoredValue is added or just its contents is changed if the
1351     *          exisiting StoredValue is updated
1352     * @param itm Item to be added/updated. On success, its revSeqno is updated
1353     * @param cas value to match
1354     * @param allowExisting set to false if you want set to fail if the
1355     *                      item exists already
1356     * @param hasMetaData
1357     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1358     *                    backfill queue
1359     * @param storeIfStatus the status of any conditional store predicate
1360     * @param maybeKeyExists true if the key /may/ exist on disk (as an active,
1361     *                       alive document). Only valid if `v` is null.
1362     * @param isReplication true if issued by consumer (for replication)
1363     *
1364     * @return Result indicating the status of the operation and notification
1365     *                info (if operation was successful).
1366     */
1367    std::pair<MutationStatus, boost::optional<VBNotifyCtx>> processSet(
1368            const HashTable::HashBucketLock& hbl,
1369            StoredValue*& v,
1370            Item& itm,
1371            uint64_t cas,
1372            bool allowExisting,
1373            bool hasMetaData,
1374            const VBQueueItemCtx& queueItmCtx,
1375            cb::StoreIfStatus storeIfStatus,
1376            bool maybeKeyExists = true,
1377            bool isReplication = false);
1378
1379    /**
1380     * This function checks cas, expiry and other partition (vbucket) related
1381     * rules before adding an item into other in-memory structure like HT,
1382     * and checkpoint mgr. This function assumes that HT bucket lock is grabbed.
1383     *
1384     * @param hbl Hash table bucket lock that must be held
1385     * @param v[in, out] the stored value to do this operation on
1386     * @param itm Item to be added/updated. On success, its revSeqno is updated
1387     * @param isReplication true if issued by consumer (for replication)
1388     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1389     *                    backfill queue
1390     * @param readHandle Reader access to the Item's collection data.
1391     *
1392     * @return Result indicating the status of the operation and notification
1393     *                info (if the operation was successful).
1394     */
1395    std::pair<AddStatus, boost::optional<VBNotifyCtx>> processAdd(
1396            const HashTable::HashBucketLock& hbl,
1397            StoredValue*& v,
1398            Item& itm,
1399            bool maybeKeyExists,
1400            bool isReplication,
1401            const VBQueueItemCtx& queueItmCtx,
1402            const Collections::VB::Manifest::CachingReadHandle& readHandle);
1403
1404    /**
1405     * This function checks cas, eviction policy and other partition
1406     * (vbucket) related rules before logically (soft) deleting an item in
1407     * in-memory structure like HT, and checkpoint mgr.
1408     * Assumes that HT bucket lock is grabbed.
1409     *
1410     * @param hbl Hash table bucket lock that must be held
1411     * @param v Reference to the StoredValue to be soft deleted
1412     * @param cas the expected CAS of the item (or 0 to override)
1413     * @param metadata ref to item meta data
1414     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1415     *                    backfill queue
1416     * @param use_meta Indicates if v must be updated with the metadata
1417     * @param bySeqno seqno of the key being deleted
1418     *
1419     * @return pointer to the updated StoredValue. It can be same as that of
1420     *         v or different value if a new StoredValue is created for the
1421     *         update.
1422     *         status of the operation.
1423     *         notification info, if status was successful.
1424     */
1425    std::tuple<MutationStatus, StoredValue*, boost::optional<VBNotifyCtx>>
1426    processSoftDelete(const HashTable::HashBucketLock& hbl,
1427                      StoredValue& v,
1428                      uint64_t cas,
1429                      const ItemMetaData& metadata,
1430                      const VBQueueItemCtx& queueItmCtx,
1431                      bool use_meta,
1432                      uint64_t bySeqno);
1433
1434    /**
1435     * Delete a key (associated StoredValue) from ALL in-memory data structures
1436     * like HT.
1437     * Assumes that HT bucket lock is grabbed.
1438     *
1439     * Currently StoredValues form HashTable intrusively. That is, HashTable
1440     * does not store a reference or a copy of the StoredValue. If any other
1441     * in-memory data strucutures are formed intrusively using StoredValues,
1442     * then it must be decided in this function which data structure deletes
1443     * the StoredValue. Currently it is HashTable that deleted the StoredValue
1444     *
1445     * @param hbl Hash table bucket lock that must be held
1446     * @param v Reference to the StoredValue to be deleted
1447     *
1448     * @return true if an object was deleted, false otherwise
1449     */
1450    bool deleteStoredValue(const HashTable::HashBucketLock& hbl,
1451                           StoredValue& v);
1452
1453    /**
1454     * Queue an item for persistence and replication. Maybe track CAS drift
1455     *
1456     * The caller of this function must hold the lock of the hash table
1457     * partition that contains the StoredValue being Queued.
1458     *
1459     * @param v the dirty item. The cas and seqno maybe updated based on the
1460     *          flags passed
1461     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1462     *                    backfill queue, whether to track cas, generate seqno,
1463     *                    generate new cas
1464     *
1465     * @return Notification context containing info needed to notify the
1466     *         clients (like connections, flusher)
1467     */
1468    VBNotifyCtx queueDirty(StoredValue& v, const VBQueueItemCtx& queueItmCtx);
1469
1470    /**
1471     * Queue an item for persistence and replication
1472     *
1473     * The caller of this function must hold the lock of the hash table
1474     * partition that contains the StoredValue being Queued.
1475     *
1476     * @param v the dirty item. The cas and seqno maybe updated based on the
1477     *          flags passed
1478     * @param generateBySeqno request that the seqno is generated by this call
1479     * @param generateCas request that the CAS is generated by this call
1480     * @param generateDeleteTime for queueing deletes, should the queue item
1481     *        have its delete time generated.
1482     * @param isBackfillItem indicates if the item must be put onto vb queue or
1483     *        onto checkpoint
1484     * @param preLinkDocumentContext context object which allows running the
1485     *        document pre link callback after the cas is assinged (but
1486     *        but document not available for anyone)
1487     *
1488     * @return Notification context containing info needed to notify the
1489     *         clients (like connections, flusher)
1490     */
1491    VBNotifyCtx queueDirty(
1492            StoredValue& v,
1493            GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes,
1494            GenerateCas generateCas = GenerateCas::Yes,
1495            GenerateDeleteTime generateDeleteTime = GenerateDeleteTime::Yes,
1496            bool isBackfillItem = false,
1497            PreLinkDocumentContext* preLinkDocumentContext = nullptr);
1498
1499    /**
1500     * Adds a temporary StoredValue in in-memory data structures like HT.
1501     * Assumes that HT bucket lock is grabbed.
1502     *
1503     * @param hbl Hash table bucket lock that must be held
1504     * @param key the key for which a temporary item needs to be added
1505     * @param isReplication true if issued by consumer (for replication)
1506     *
1507     * @return Result indicating the status of the operation
1508     */
1509    TempAddStatus addTempStoredValue(const HashTable::HashBucketLock& hbl,
1510                                     const DocKey& key,
1511                                     bool isReplication = false);
1512
1513    /**
1514     * Internal wrapper function around the callback to be called when a new
1515     * seqno is generated in the vbucket
1516     *
1517     * @param notifyCtx holds info needed for notification
1518     */
1519    void notifyNewSeqno(const VBNotifyCtx& notifyCtx);
1520
1521    /**
1522     * VBucket internal function to store high priority requests on the vbucket.
1523     *
1524     * @param seqnoOrChkId seqno to be seen or checkpoint id to be persisted
1525     * @param cookie cookie of conn to be notified
1526     * @param reqType request type indicating seqno or chk persistence
1527     */
1528    void addHighPriorityVBEntry(uint64_t seqnoOrChkId,
1529                                const void* cookie,
1530                                HighPriorityVBNotify reqType);
1531
1532    /**
1533     * Get all high priority notifications as temporary failures because they
1534     * could not be completed.
1535     *
1536     * @param engine Ref to ep-engine
1537     *
1538     * @return map of notifies with conn cookie as the key and notify status as
1539     *         the value
1540     */
1541    std::map<const void*, ENGINE_ERROR_CODE> tmpFailAndGetAllHpNotifies(
1542            EventuallyPersistentEngine& engine);
1543
1544    /**
1545     * Check if there is memory available to allocate the in-memory
1546     * instance (StoredValue or OrderedStoredValue) for an item.
1547     *
1548     * @param st Reference to epstats
1549     * @param item Item that is being added
1550     * @param isReplication Flag indicating if the item is from replication
1551     *
1552     * @return True if there is memory for the item; else False
1553     */
1554    bool hasMemoryForStoredValue(EPStats& st,
1555                                 const Item& item,
1556                                 bool isReplication);
1557
    /**
     * Emit this vbucket's stats through the add_stat callback.
     * NOTE(review): inferred from the name and signature — confirm.
     *
     * @param details presumably whether to include detailed stats
     * @param add_stat callback used to emit each stat
     * @param c cookie passed through to add_stat (presumably)
     */
    void _addStats(bool details, ADD_STAT add_stat, const void* c);
1559
    /**
     * Emit a single stat named nm with value val through add_stat.
     * NOTE(review): inferred from the name and signature — confirm.
     *
     * @param nm stat name
     * @param val stat value (any type T the definition can format)
     * @param add_stat callback used to emit the stat
     * @param c cookie passed through to add_stat (presumably)
     */
    template <typename T>
    void addStat(const char* nm,
                 const T& val,
                 ADD_STAT add_stat,
                 const void* c);
1565
    /* Eviction policy used by this vbucket */
    const item_eviction_policy_t eviction;

    /* Reference to global (EP engine wide) stats */
    EPStats& stats;

    /* Last seqno that has been persisted to disk */
    std::atomic<uint64_t> persistenceSeqno;

    /* Holds all high priority async requests to the vbucket */
    std::list<HighPriorityVBEntry> hpVBReqs;

    /* Synchronizes access to hpVBReqs */
    std::mutex hpVBReqsMutex;

    /* Size of list hpVBReqs, kept as a separate relaxed atomic (to avoid
       MB-9434); presumably so it can be read without taking
       hpVBReqsMutex — confirm. */
    Couchbase::RelaxedAtomic<size_t> numHpVBReqs;
1583
1584    /**
1585     * VBucket sub-classes must implement a function that will schedule
1586     * an appropriate task that will delete the VBucket and its resources.
1587     *
1588     * @param engine owning engine (required for task construction)
1589     */
1590    virtual void scheduleDeferredDeletion(
1591            EventuallyPersistentEngine& engine) = 0;
1592
1593    /**
1594     * Update the revision seqno of a newly StoredValue item.
1595     * We must ensure that it is greater the maxDeletedRevSeqno
1596     *
1597     * @param v StoredValue added newly. Its revSeqno is updated
1598     */
1599    void updateRevSeqNoOfNewStoredValue(StoredValue& v);
1600
1601private:
    /**
     * Complete every operation waiting in pendingOps with the given status.
     * NOTE(review): inferred from the name and the pendingOps member; confirm
     * in the implementation whether pendingOps is cleared afterwards.
     *
     * @param engine Reference to ep-engine (used to notify the cookies)
     * @param code status with which each pending op is completed
     */
    void fireAllOps(EventuallyPersistentEngine& engine, ENGINE_ERROR_CODE code);

    /**
     * Decrease the dirty-queue memory accounting by decrementBy.
     * NOTE(review): presumably this updates a per-vbucket counter; confirm.
     */
    void decrDirtyQueueMem(size_t decrementBy);

    /**
     * Decrease the dirty-queue age accounting by decrementBy.
     * NOTE(review): presumably this updates a per-vbucket counter; confirm.
     */
    void decrDirtyQueueAge(uint32_t decrementBy);

    /**
     * Decrease the dirty-queue pending-writes accounting by decrementBy.
     * NOTE(review): presumably this updates a per-vbucket counter; confirm.
     */
    void decrDirtyQueuePendingWrites(size_t decrementBy);
1609
1610    /**
1611     * Updates an existing StoredValue in in-memory data structures like HT.
1612     * Assumes that HT bucket lock is grabbed.
1613     *
1614     * @param htLock Hash table lock that must be held
1615     * @param v Reference to the StoredValue to be updated.
1616     * @param itm Item to be updated.
1617     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1618     *                    backfill queue
1619     * @param justTouch   To note that this object is an existing item with
1620     *                    the same value but with few flags changed.
1621     * @return pointer to the updated StoredValue. It can be same as that of
1622     *         v or different value if a new StoredValue is created for the
1623     *         update.
1624     *         status of the operation.
1625     *         notification info.
1626     */
1627    virtual std::tuple<StoredValue*, MutationStatus, VBNotifyCtx>
1628    updateStoredValue(const HashTable::HashBucketLock& hbl,
1629                      StoredValue& v,
1630                      const Item& itm,
1631                      const VBQueueItemCtx& queueItmCtx,
1632                      bool justTouch = false) = 0;
1633
1634    /**
1635     * Adds a new StoredValue in in-memory data structures like HT.
1636     * Assumes that HT bucket lock is grabbed.
1637     *
1638     * @param hbl Hash table bucket lock that must be held
1639     * @param itm Item to be added.
1640     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1641     *                    backfill queue
1642     * @param genRevSeqno whether to generate new revision sequence number
1643     *                    or not
1644     *
1645     * @return Ptr of the StoredValue added and notification info
1646     */
1647    virtual std::pair<StoredValue*, VBNotifyCtx> addNewStoredValue(
1648            const HashTable::HashBucketLock& hbl,
1649            const Item& itm,
1650            const VBQueueItemCtx& queueItmCtx,
1651            GenerateRevSeqno genRevSeqno) = 0;
1652
1653    /**
1654     * Logically (soft) delete item in all in-memory data structures. Also
1655     * updates revSeqno. Depending on the in-memory data structure the item may
1656     * be marked delete and/or reset and/or a new value (marked as deleted)
1657     * added.
1658     * Assumes that HT bucket lock is grabbed.
1659     * Also assumes that v is in the hash table.
1660     *
1661     * @param hbl Hash table bucket lock that must be held
1662     * @param v Reference to the StoredValue to be soft deleted
1663     * @param onlyMarkDeleted indicates if we must reset the StoredValue or
1664     *                        just mark deleted
1665     * @param queueItmCtx holds info needed to queue an item in chkpt or vb
1666     *                    backfill queue
1667     * @param bySeqno seqno of the key being deleted
1668     *
1669     * @return pointer to the updated StoredValue. It can be same as that of
1670     *         v or different value if a new StoredValue is created for the
1671     *         update.
1672     *         notification info.
1673     */
1674    virtual std::tuple<StoredValue*, VBNotifyCtx> softDeleteStoredValue(
1675            const HashTable::HashBucketLock& hbl,
1676            StoredValue& v,
1677            bool onlyMarkDeleted,
1678            const VBQueueItemCtx& queueItmCtx,
1679            uint64_t bySeqno) = 0;
1680
1681    /**
1682     * This function handles expiry relatead stuff before logically (soft)
1683     * deleting an item in in-memory structures like HT, and checkpoint mgr.
1684     * Assumes that HT bucket lock is grabbed.
1685     *
1686     * @param hbl Hash table bucket lock that must be held
1687     * @param v Reference to the StoredValue to be soft deleted
1688     *
1689     * @return status of the operation.
1690     *         pointer to the updated StoredValue. It can be same as that of
1691     *         v or different value if a new StoredValue is created for the
1692     *         update.
1693     *         notification info.
1694     */
1695    std::tuple<MutationStatus, StoredValue*, VBNotifyCtx> processExpiredItem(
1696            const HashTable::HashBucketLock& hbl, StoredValue& v);
1697
1698    /**
1699     * Add a temporary item in hash table and enqueue a background fetch for a
1700     * key.
1701     *
1702     * @param hbl Reference to the hash table bucket lock
1703     * @param key the key to be bg fetched
1704     * @param cookie the cookie of the requestor
1705     * @param engine Reference to ep engine
1706     * @param bgFetchDelay Delay in secs before we run the bgFetch task
1707     * @param metadataOnly whether the fetch is for a non-resident value or
1708     *                     metadata of a (possibly) deleted item
1709     * @param isReplication indicates if the call is for a replica vbucket
1710     *
1711     * @return ENGINE_ERROR_CODE status notified to be to the front end
1712     */
1713    virtual ENGINE_ERROR_CODE addTempItemAndBGFetch(
1714            HashTable::HashBucketLock& hbl,
1715            const DocKey& key,
1716            const void* cookie,
1717            EventuallyPersistentEngine& engine,
1718            int bgFetchDelay,
1719            bool metadataOnly,
1720            bool isReplication = false) = 0;
1721
1722    /**
1723     * Enqueue a background fetch for a key.
1724     *
1725     * @param key the key to be bg fetched
1726     * @param cookie the cookie of the requestor
1727     * @param engine Reference to ep engine
1728     * @param bgFetchDelay Delay in secs before we run the bgFetch task
1729     * @param isMeta whether the fetch is for a non-resident value or metadata
1730     *               of a (possibly) deleted item
1731     */
1732    virtual void bgFetch(const DocKey& key,
1733                         const void* cookie,
1734                         EventuallyPersistentEngine& engine,
1735                         int bgFetchDelay,
1736                         bool isMeta = false) = 0;
1737
1738    /**
1739     * Get metadata and value for a non-resident key
1740     *
1741     * @param key key for which metadata and value should be retrieved
1742     * @param cookie the cookie representing the client
1743     * @param engine Reference to ep engine
1744     * @param bgFetchDelay Delay in secs before we run the bgFetch task
1745     * @param queueBgFetch Indicates whether a background fetch needs to be
1746     *        queued
1747     * @param v reference to the stored value of the non-resident key
1748     *
1749     * @return the result of the operation
1750     */
1751    virtual GetValue getInternalNonResident(const DocKey& key,
1752                                            const void* cookie,
1753                                            EventuallyPersistentEngine& engine,
1754                                            int bgFetchDelay,
1755                                            QueueBgFetch queueBgFetch,
1756                                            const StoredValue& v) = 0;
1757
1758    /**
1759     * Increase the expiration count global stats and in the vbucket stats
1760     */
1761    void incExpirationStat(ExpireBy source);
1762
1763    void adjustCheckpointFlushTimeout(std::chrono::seconds wall_time);
1764
1765    /**
1766     * Given a StoredValue with XATTRs - prune the user keys so only system keys
1767     * remain.
1768     *
1769     * @param v StoredValue with XATTR value
1770     * @param itemMeta New ItemMetaData to use in item creation
1771     * @return unique_ptr<Item> which matches the StoredValue's meta-data and
1772     *         has the XATTR value with only the system-keys. If the pruning
1773     *         removed all keys (because no system-keys exist) an empty
1774     *         unique_ptr is returned.
1775     */
1776    std::unique_ptr<Item> pruneXattrDocument(StoredValue& v,
1777                                             const ItemMetaData& itemMeta);
1778
1779    /**
1780     * Estimate the new total memory usage with the allocation of an in-memory
1781     * instance for item
1782     *
1783     * @param st Reference to epstats
1784     * @param item Item that is being added
1785     *
1786     * @return new total size for this Bucket once Item is allocated
1787     */
1788    virtual size_t estimateNewMemoryUsage(EPStats& st, const Item& item) = 0;
1789
1790    /*
1791     * Call the predicate with item_info from v (none if v is nullptr)
1792     * @param predicate a function to call, must be initialised
1793     * @param v the StoredValue (or nullptr if none in cache)
1794     * @return how the caller should proceed (store_if semantics)
1795     */
1796    cb::StoreIfStatus callPredicate(cb::StoreIfPredicate predicate,
1797                                    StoredValue* v);
1798
    // Identifier of this vbucket.
    id_type                         id;
    // Current state of this vbucket.
    std::atomic<vbucket_state_t>    state;
    // Reader/writer lock - presumably serializes state transitions against
    // readers that need a stable state; confirm at the use sites.
    cb::RWLock                      stateLock;
    // State supplied at construction - presumably; confirm in the constructor.
    vbucket_state_t                 initialState;
    // Presumably guards pendingOps / pendingOpsStart; confirm at use sites.
    std::mutex                           pendingOpLock;
    // Cookies of operations waiting to be completed (see fireAllOps()).
    std::vector<const void*>        pendingOps;
    // Presumably the time at which ops started pending; confirm.
    ProcessClock::time_point        pendingOpsStart;
    // Seqno up to which items have been purged - presumably; the
    // WeaklyMonotonic wrapper means it is not expected to decrease.
    WeaklyMonotonic<uint64_t>       purge_seqno;
    // NOTE(review): flag related to takeover back-pressure; confirm meaning.
    std::atomic<bool>               takeover_backed_up;

    /* snapshotMutex is used to update/read the pair {start, end} atomically,
       but not if reading a single field. */
    mutable std::mutex snapshotMutex;
    uint64_t persisted_snapshot_start;
    uint64_t persisted_snapshot_end;

    // Presumably guards bFilter and tempFilter; confirm at use sites.
    std::mutex bfMutex;
    std::unique_ptr<BloomFilter> bFilter;
    std::unique_ptr<BloomFilter> tempFilter;    // Used during compaction.

    // NOTE(review): presumably the item count adjustment tracked during
    // rollback; confirm.
    std::atomic<uint64_t> rollbackItemCount;

    // Hybrid logical clock for this vbucket (see hlc.h).
    HLC hlc;
    // Prefix for this vbucket's stat names - presumably; confirm.
    std::string statPrefix;
    // The persistence checkpoint ID for this vbucket.
    std::atomic<uint64_t> persistenceCheckpointId;
    // Flag to indicate the vbucket is being created
    std::atomic<bool> bucketCreation;
    // Flag to indicate the vbucket deletion is deferred
    std::atomic<bool> deferredDeletion;
    /// A cookie that can be set when the vbucket's deletion is deferred; the
    /// cookie will be notified when the deferred deletion completes
    const void* deferredDeletionCookie;

    // Ptr to the item conflict resolution module
    std::unique_ptr<ConflictResolution> conflictResolver;

    // A callback to be called when a new seqno is generated in the vbucket as
    // a result of a front end call
    NewSeqnoCallback newSeqnoCb;

    /// The VBucket collection state
    Collections::VB::Manifest manifest;

    /**
     * records if the vbucket has had xattrs documents written to it, note that
     * rollback of data or delete of all the xattr documents does not undo the
     * flag.
     */
    std::atomic<bool> mayContainXattrs;

    // Checkpoint flush timeout, shared by all vbuckets (static).
    static cb::AtomicDuration chkFlushTimeout;

    // Mutation memory threshold, shared by all vbuckets (static).
    // NOTE(review): presumably a fraction of the bucket quota; confirm.
    static double mutationMemThreshold;

    // Grants the unit-test fixture access to private members.
    friend class VBucketTestBase;

    // VBuckets are neither copyable nor assignable.
    DISALLOW_COPY_AND_ASSIGN(VBucket);
1857};
1858
/// Shared-ownership handle to a VBucket.
using VBucketPtr = std::shared_ptr<VBucket>;
1860
1861/**
1862 * Represents a locked VBucket that provides RAII semantics for the lock.
1863 *
1864 * Behaves like the underlying shared_ptr - i.e. `operator->` is overloaded
1865 * to return the underlying VBucket.
1866 */
1867class LockedVBucketPtr {
1868public:
1869    LockedVBucketPtr(VBucketPtr vb, std::unique_lock<std::mutex>&& lock)
1870        : vb(std::move(vb)), lock(std::move(lock)) {
1871    }
1872
1873    VBucket& operator*() const {
1874        return *vb;
1875    }
1876
1877    VBucket* operator->() const {
1878        return vb.get();
1879    }
1880
1881    explicit operator bool() const {
1882        return vb.operator bool();
1883    }
1884
1885    VBucketPtr& getVB() {
1886        return vb;
1887    }
1888
1889    /// Return true if this object owns the mutex.
1890    bool owns_lock() const {
1891        return lock.owns_lock();
1892    }
1893
1894private:
1895    VBucketPtr vb;
1896    std::unique_lock<std::mutex> lock;
1897};
1898