xref: /4.6.4/ep-engine/src/ep.h (revision 73d84472)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2010 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_EP_H_
19#define SRC_EP_H_ 1
20
21#include "config.h"
22
23#include "ep_types.h"
24#include "executorpool.h"
25#include "stored-value.h"
26#include "task_type.h"
27#include "vbucket.h"
28#include "vbucketmap.h"
29#include "utility.h"
30
31class ExtendedMetaData;
32
33/**
34 * vbucket-aware hashtable visitor.
35 *
36 * The caller (e.g. EventuallyPersistentStore::visit) will call visitBucket()
37 * for each valid VBucket, finally calling complete() after all vBuckets have
38 * been visited.
39 *
40 * Callers *may* call the pauseVisitor() method periodically (typically between
41 * vBuckets) which should return true if visiting VBuckets should be paused
42 * temporarily (typically to break up long-running visitation tasks to allow
43 * other Tasks to run).
44 */
45class VBucketVisitor {
46public:
47
48    VBucketVisitor() { }
49
50    virtual ~VBucketVisitor() {}
51
52    VBucketVisitor(const VBucketFilter &filter)
53        : vBucketFilter(filter) { }
54
55    /**
56     * Begin visiting a bucket.
57     *
58     * @param vb the vbucket we are beginning to visit
59     */
60    virtual void visitBucket(RCPtr<VBucket> &vb) = 0;
61
62    const VBucketFilter &getVBucketFilter() {
63        return vBucketFilter;
64    }
65
66    /**
67     * Called after all vbuckets have been visited.
68     */
69    virtual void complete() { }
70
71    /**
72     * Return true if visiting vbuckets should be paused temporarily.
73     */
74    virtual bool pauseVisitor() {
75        return false;
76    }
77
78protected:
79    VBucketFilter vBucketFilter;
80};
81
82// Forward declaration
83class BGFetchCallback;
84class ConflictResolution;
85class DefragmenterTask;
86class EventuallyPersistentStore;
87class Flusher;
88class MutationLog;
89class PauseResumeEPStoreVisitor;
90class PersistenceCallback;
91class Warmup;
92
93/**
94 * VBucket visitor callback adaptor.
95 */
96class VBCBAdaptor : public GlobalTask {
97public:
98
99    VBCBAdaptor(EventuallyPersistentStore *s, TaskId id,
100                std::shared_ptr<VBucketVisitor> v, const char *l,
101                double sleep=0);
102
103    std::string getDescription() {
104        std::stringstream rv;
105        rv << label << " on vb " << currentvb.load();
106        return rv.str();
107    }
108
109    bool run(void);
110
111private:
112    std::queue<uint16_t>        vbList;
113    EventuallyPersistentStore  *store;
114    std::shared_ptr<VBucketVisitor>  visitor;
115    const char                 *label;
116    double                      sleepTime;
117    AtomicValue<uint16_t>       currentvb;
118
119    DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
120};
121
122
123/**
124 * Vbucket visitor task for a generic scheduler.
125 */
126class VBucketVisitorTask : public GlobalTask {
127public:
128
129    VBucketVisitorTask(EventuallyPersistentStore *s,
130                       std::shared_ptr<VBucketVisitor> v, uint16_t sh,
131                       const char *l, double sleep=0, bool shutdown=true);
132
133    std::string getDescription() {
134        std::stringstream rv;
135        rv << label << " on vb " << currentvb;
136        return rv.str();
137    }
138
139    bool run();
140
141private:
142    std::queue<uint16_t>         vbList;
143    EventuallyPersistentStore   *store;
144    std::shared_ptr<VBucketVisitor>   visitor;
145    const char                  *label;
146    double                       sleepTime;
147    uint16_t                     currentvb;
148    uint16_t                     shardID;
149};
150
151const uint16_t EP_PRIMARY_SHARD = 0;
152class KVShard;
153
154typedef std::pair<uint16_t, ExTask> CompTaskEntry;
155
/**
 * Identifies the source of an item's expiration.
 *
 * The enumerator values are written out explicitly; they are the same
 * values the compiler would assign implicitly (0, 1, 2).
 */
enum exp_type_t {
    EXP_BY_PAGER     = 0,
    EXP_BY_COMPACTOR = 1,
    EXP_BY_ACCESS    = 2
};
165
/**
 * Options that can be specified when retrieving an item for get calls.
 *
 * Each option occupies its own bit, so options can be OR-ed together and
 * cast back to get_options_t.
 */
enum get_options_t {
    // no option
    NONE             = 0,
    // whether statistics need to be tracked or not
    TRACK_STATISTICS = 1 << 0,
    // whether a background fetch needs to be queued
    QUEUE_BG_FETCH   = 1 << 1,
    // whether a retrieval should depend on the state of the vbucket
    HONOR_STATES     = 1 << 2,
    // whether NRU bit needs to be set for the item
    TRACK_REFERENCE  = 1 << 3,
    // whether temporary items need to be deleted
    DELETE_TEMP      = 1 << 4,
    // whether locked items should have their CAS hidden (return -1).
    HIDE_LOCKED_CAS  = 1 << 5
};
181
182/**
183 * Manager of all interaction with the persistence.
184 */
185class EventuallyPersistentStore {
186public:
187
188    /**
189     * Represents a position within the epStore, used when visiting items.
190     *
191     * Currently opaque (and constant), clients can pass them around but
192     * cannot reposition the iterator.
193     */
194    class Position {
195    public:
196        bool operator==(const Position& other) const {
197            return (vbucket_id == other.vbucket_id);
198        }
199
200    private:
201        Position(uint16_t vbucket_id_) : vbucket_id(vbucket_id_) {}
202
203        uint16_t vbucket_id;
204
205        friend class EventuallyPersistentStore;
206        friend std::ostream& operator<<(std::ostream& os, const Position& pos);
207    };
208
209    EventuallyPersistentStore(EventuallyPersistentEngine &theEngine);
210    ~EventuallyPersistentStore();
211
212    bool initialize();
213
214    /**
215     * Set an item in the store.
216     * @param item the item to set
217     * @param cookie the cookie representing the client to store the item
218     * @param force override access to the vbucket even if the state of the
219     *              vbucket would deny mutations.
220     * @param nru the nru bit value for the item
221     * @return the result of the store operation
222     */
223    ENGINE_ERROR_CODE set(const Item &item,
224                          const void *cookie,
225                          bool force = false,
226                          uint8_t nru = 0xff);
227
228    /**
229     * Add an item in the store.
230     * @param item the item to add
231     * @param cookie the cookie representing the client to store the item
232     * @return the result of the operation
233     */
234    ENGINE_ERROR_CODE add(const Item &item, const void *cookie);
235
236    /**
237     * Replace an item in the store.
238     * @param item the item to replace
239     * @param cookie the cookie representing the client to store the item
240     * @return the result of the operation
241     */
242    ENGINE_ERROR_CODE replace(const Item &item, const void *cookie);
243
244    /**
245     * Add a TAP backfill item into its corresponding vbucket
246     * @param item the item to be added
247     * @param nru the nru bit for the item
248     * @param genBySeqno whether or not to generate sequence number
249     * @return the result of the operation
250     */
251    ENGINE_ERROR_CODE addTAPBackfillItem(const Item &item, uint8_t nru = 0xff,
252                                         bool genBySeqno = true,
253                                         ExtendedMetaData *emd = NULL);
254
255    /**
256     * Retrieve a value.
257     *
258     * @param key     the key to fetch
259     * @param vbucket the vbucket from which to retrieve the key
260     * @param cookie  the connection cookie
261     * @param options options specified for retrieval
262     *
263     * @return a GetValue representing the result of the request
264     */
265    GetValue get(const const_sized_buffer key, uint16_t vbucket,
266                 const void *cookie, get_options_t options) {
267        return getInternal(key, vbucket, cookie, vbucket_state_active,
268                           options);
269    }
270
271    GetValue getRandomKey(void);
272
273    /**
274     * Retrieve a value from a vbucket in replica state.
275     *
276     * @param key     the key to fetch
277     * @param vbucket the vbucket from which to retrieve the key
278     * @param cookie  the connection cookie
279     * @param options options specified for retrieval
280     *
281     * @return a GetValue representing the result of the request
282     */
283    GetValue getReplica(const const_sized_buffer key, uint16_t vbucket,
284                        const void *cookie,
285                        get_options_t options = static_cast<get_options_t>(
286                                                        QUEUE_BG_FETCH |
287                                                        HONOR_STATES |
288                                                        TRACK_REFERENCE |
289                                                        DELETE_TEMP |
290                                                        HIDE_LOCKED_CAS)) {
291        return getInternal(key, vbucket, cookie, vbucket_state_replica,
292                           options);
293    }
294
295
296    /**
297     * Retrieve the meta data for an item
298     *
299     * @param key the key to get the meta data for
300     * @param vbucket the vbucket from which to retrieve the key
301     * @param cookie the connection cookie
302     * @param metadata where to store the meta information
303     * @param deleted specifies whether or not the key is deleted
304     * @param trackReference true if we want to set the nru bit for the item
305     */
306    ENGINE_ERROR_CODE getMetaData(const std::string &key,
307                                  uint16_t vbucket,
308                                  const void *cookie,
309                                  ItemMetaData &metadata,
310                                  uint32_t &deleted,
311                                  bool trackReference = false);
312
313    /**
314     * Set an item in the store.
315     * @param item the item to set
316     * @param cas value to match
317     * @param seqno sequence number of mutation
318     * @param cookie the cookie representing the client to store the item
319     * @param force override vbucket states
320     * @param allowExisting set to false if you want set to fail if the
321     *                      item exists already
322     * @param nru the nru bit for the item
323     * @param genBySeqno whether or not to generate sequence number
324     * @param emd ExtendedMetaData class object that contains any ext meta
325     * @param isReplication set to true if we are to use replication
326     *                      throttle threshold
327     *
328     * @return the result of the store operation
329     */
330    ENGINE_ERROR_CODE setWithMeta(const Item &item,
331                                  uint64_t cas,
332                                  uint64_t *seqno,
333                                  const void *cookie,
334                                  bool force,
335                                  bool allowExisting,
336                                  uint8_t nru = 0xff,
337                                  GenerateBySeqno genBySeqno = GenerateBySeqno::Yes,
338                                  GenerateCas genCas = GenerateCas::No,
339                                  ExtendedMetaData *emd = NULL,
340                                  bool isReplication = false);
341
342    /**
343     * Retrieve a value, but update its TTL first
344     *
345     * @param key the key to fetch
346     * @param vbucket the vbucket from which to retrieve the key
347     * @param cookie the connection cookie
348     * @param exptime the new expiry time for the object
349     *
350     * @return a GetValue representing the result of the request
351     */
352    GetValue getAndUpdateTtl(const std::string &key, uint16_t vbucket,
353                             const void *cookie, time_t exptime);
354
355    /**
356     * Retrieve an item from the disk for vkey stats
357     *
358     * @param key the key to fetch
359     * @param vbucket the vbucket from which to retrieve the key
360     * @param cookie the connection cookie
361     * @param cb callback to return an item fetched from the disk
362     *
363     * @return a status resulting from executing the method
364     */
365    ENGINE_ERROR_CODE statsVKey(const std::string &key,
366                                uint16_t vbucket,
367                                const void *cookie);
368
369    void completeStatsVKey(const void* cookie, std::string &key, uint16_t vbid,
370                           uint64_t bySeqNum);
371
372    protocol_binary_response_status evictKey(const std::string &key,
373                                             uint16_t vbucket,
374                                             const char **msg,
375                                             size_t *msg_size,
376                                             bool force=false);
377
378    /**
379     * delete an item in the store.
380     * @param key the key of the item
381     * @param cas the CAS ID for a CASed delete (0 to override)
382     * @param vbucket the vbucket for the key
383     * @param cookie the cookie representing the client
384     * @param force override access to the vbucket even if the state of the
385     *              vbucket would deny mutations.
386     * @param itemMeta the pointer to the metadata memory.
387     *
388     * (deleteWithMeta)
389     * @param genBySeqno whether or not to generate sequence number
390     * @param emd ExtendedMetaData class object that contains any ext meta
391     * @param isReplication set to true if we are to use replication
392     *                      throttle threshold
393     *
394     * @return the result of the delete operation
395     */
396    ENGINE_ERROR_CODE deleteItem(const std::string &key,
397                                 uint64_t* cas,
398                                 uint16_t vbucket,
399                                 const void *cookie,
400                                 bool force,
401                                 ItemMetaData *itemMeta,
402                                 mutation_descr_t *mutInfo);
403
404    ENGINE_ERROR_CODE deleteWithMeta(const std::string &key,
405                                     uint64_t* cas,
406                                     uint64_t* seqno,
407                                     uint16_t vbucket,
408                                     const void *cookie,
409                                     bool force,
410                                     ItemMetaData *itemMeta,
411                                     bool tapBackfill,
412                                     GenerateBySeqno genBySeqno,
413                                     GenerateCas generateCas,
414                                     uint64_t bySeqno,
415                                     ExtendedMetaData *emd,
416                                     bool isReplication);
417
418    void reset();
419
420    /**
421     * Set the background fetch delay.
422     *
423     * This exists for debugging and testing purposes.  It
424     * artificially injects delays into background fetches that are
425     * performed when the user requests an item whose value is not
426     * currently resident.
427     *
428     * @param to how long to delay before performing a bg fetch
429     */
430    void setBGFetchDelay(uint32_t to) {
431        bgFetchDelay = to;
432    }
433
434    double getBGFetchDelay(void) { return (double)bgFetchDelay; }
435
436    void stopFlusher(void);
437
438    bool startFlusher(void);
439
440    bool pauseFlusher(void);
441    bool resumeFlusher(void);
442    void wakeUpFlusher(void);
443
444    bool startBgFetcher(void);
445    void stopBgFetcher(void);
446
447    /**
448     * Takes a snapshot of the current stats and persists them to disk.
449     */
450    void snapshotStats(void);
451
452    /**
453     * Enqueue a background fetch for a key.
454     *
455     * @param key the key to be bg fetched
456     * @param vbucket the vbucket in which the key lives
457     * @param cookie the cookie of the requestor
458     * @param isMeta whether the fetch is for a non-resident value or metadata of
459     *             a (possibly) deleted item
460     */
461    void bgFetch(const const_sized_buffer key,
462                 uint16_t vbucket,
463                 const void *cookie,
464                 bool isMeta = false);
465
466    /**
467     * Complete a background fetch of a non resident value or metadata.
468     *
469     * @param key the key that was fetched
470     * @param vbucket the vbucket in which the key lived
471     * @param cookie the cookie of the requestor
472     * @param init the timestamp of when the request came in
473     * @param isMeta whether the fetch is for a non-resident value or metadata of
474     *             a (possibly) deleted item
475     */
476    void completeBGFetch(const std::string &key,
477                         uint16_t vbucket,
478                         const void *cookie,
479                         hrtime_t init,
480                         bool isMeta);
481    /**
482     * Complete a batch of background fetch of a non resident value or metadata.
483     *
484     * @param vbId the vbucket in which the requested key lived
485     * @param fetchedItems vector of completed background fetches containing key,
486     *                     value, client cookies
487     * @param start the time when the background fetch was started
488     *
489     */
490    void completeBGFetchMulti(uint16_t vbId,
491                              std::vector<bgfetched_item_t> &fetchedItems,
492                              hrtime_t start);
493
494    /**
495     * Helper function to update stats after completion of a background fetch
496     * for either the value or metadata of a key.
497     *
498     * @param init the time of epstore's initialization
499     * @param start the time when the background fetch was started
500     * @param stop the time when the background fetch completed
501     */
502    void updateBGStats(const hrtime_t init,
503                       const hrtime_t start,
504                       const hrtime_t stop);
505
506    RCPtr<VBucket> getVBucket(uint16_t vbid) {
507        return vbMap.getBucket(vbid);
508    }
509
510    uint64_t getLastPersistedCheckpointId(uint16_t vb) {
511        return vbMap.getPersistenceCheckpointId(vb);
512    }
513
514    uint64_t getLastPersistedSeqno(uint16_t vb) {
515        return vbMap.getPersistenceSeqno(vb);
516    }
517
518    /**
519     * Sets the vbucket or creates a vbucket with the desired state
520     *
521     * @param vbid vbucket id
522     * @param state desired state of the vbucket
523     * @param transfer indicates that the vbucket is transferred to the active
524     *                 post a failover and/or rebalance
525     * @param notify_dcp indicates whether we must consider closing DCP streams
526     *                    associated with the vbucket
527     *
528     * @return status of the operation
529     */
530    ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state,
531                                      bool transfer, bool notify_dcp = true);
532
533    /**
534     * Sets the vbucket or creates a vbucket with the desired state
535     *
536     * @param vbid vbucket id
537     * @param state desired state of the vbucket
538     * @param transfer indicates that the vbucket is transferred to the active
539     *                 post a failover and/or rebalance
540     * @param notify_dcp indicates whether we must consider closing DCP streams
541     *                    associated with the vbucket
542     * @param vbset LockHolder acquiring the 'vbsetMutex' lock in the
543     *              EventuallyPersistentStore class
544     * @param vbStateLock ptr to WriterLockHolder of 'stateLock' in the vbucket
545     *                    class. if passed as null, the function acquires the
546     *                    vbucket 'stateLock'
547     *
548     * @return status of the operation
549     */
550    ENGINE_ERROR_CODE setVBucketState_UNLOCKED(
551                                    uint16_t vbid,
552                                    vbucket_state_t state,
553                                    bool transfer,
554                                    bool notify_dcp,
555                                    LockHolder& vbset,
556                                    WriterLockHolder* vbStateLock = nullptr);
557
558    /**
559     * Returns the 'vbsetMutex'
560     */
561    Mutex& getVbSetMutexLock() {
562        return vbsetMutex;
563    }
564
565    /**
566     * Physically deletes a VBucket from disk. This function should only
567     * be called on a VBucket that has already been logically deleted.
568     *
569     * @param vbid vbucket id
570     * @param cookie The connection that requested the deletion
571     */
572    bool completeVBucketDeletion(uint16_t vbid, const void* cookie);
573
574    /**
575     * Deletes a vbucket
576     *
577     * @param vbid The vbucket to delete.
578     * @param c The cookie for this connection.
579     *          Used in synchronous bucket deletes
580     *          to notify the connection of operation completion.
581     */
582    ENGINE_ERROR_CODE deleteVBucket(uint16_t vbid, const void* c = NULL);
583
584    /**
585     * Check for the existence of a vbucket in the case of couchstore
586     * or shard in the case of forestdb. Note that this function will be
587     * deprecated once forestdb is the only backend supported
588     *
589     * @param db_file_id vbucketid for couchstore or shard id in the
590     *                   case of forestdb
591     */
592    ENGINE_ERROR_CODE checkForDBExistence(uint16_t db_file_id);
593
594    /**
595     * Triggers compaction of a vbucket
596     *
597     * @param vbid The vbucket being compacted
598     * @param c The context for compaction of a DB file
599     * @param ck cookie used to notify connection of operation completion
600     */
601    ENGINE_ERROR_CODE compactDB(uint16_t vbid, compaction_ctx c, const void *ck);
602
603    /**
604     * Callback to do the compaction of a vbucket
605     *
606     * @param ctx Context for couchstore compaction hooks
607     * @param ck cookie used to notify connection of operation completion
608     */
609    bool doCompact(compaction_ctx *ctx, const void *ck);
610
611    /**
612     * Remove completed compaction tasks or wake snoozed tasks
613     *
614     * @param db_file_id vbucket id for couchstore or shard id in the
615     *                   case of forestdb
616     */
617    void updateCompactionTasks(uint16_t db_file_id);
618
619    /**
620     * Reset a given vbucket from memory and disk. This differs from vbucket deletion in that
621     * it does not delete the vbucket instance from memory hash table.
622     */
623    bool resetVBucket(uint16_t vbid);
624
625    /**
626     * Run a vBucket visitor, visiting all items. Synchronous.
627     */
628    void visit(VBucketVisitor &visitor);
629
630    /**
631     * Run a vbucket visitor with separate jobs per vbucket.
632     *
633     * Note that this is asynchronous.
634     */
635    size_t visit(std::shared_ptr<VBucketVisitor> visitor, const char *lbl,
636               task_type_t taskGroup, TaskId id,
637               double sleepTime=0) {
638        return ExecutorPool::get()->schedule(new VBCBAdaptor(this, id, visitor,
639                                             lbl, sleepTime), taskGroup);
640    }
641
642    /**
643     * Visit the items in this epStore, starting the iteration from the
644     * given startPosition and allowing the visit to be paused at any point.
645     *
646     * During visitation, the visitor object can request that the visit
647     * is stopped after the current item. The position passed to the
648     * visitor can then be used to restart visiting at the *APPROXIMATE*
649     * same position as it paused.
650     * This is approximate as various locks are released when the
651     * function returns, so any changes to the underlying epStore may cause
652     * the visiting to restart at the slightly different place.
653     *
654     * As a consequence, *DO NOT USE THIS METHOD* if you need to guarantee
655     * that all items are visited!
656     *
657     * @param visitor The visitor object.
658     * @return The final epStore position visited; equal to
659     *         EventuallyPersistentStore::endPosition() if all items were visited
660     *         otherwise the position to resume from.
661     */
662    Position pauseResumeVisit(PauseResumeEPStoreVisitor& visitor,
663                              Position& start_pos);
664
665
666    /**
667     * Return a position at the start of the epStore.
668     */
669    Position startPosition() const;
670
671    /**
672     * Return a position at the end of the epStore. Has similar semantics
673     * as STL end() (i.e. one past the last element).
674     */
675    Position endPosition() const;
676
677    const Flusher* getFlusher(uint16_t shardId);
678
679    Warmup* getWarmup(void) const;
680
681    ENGINE_ERROR_CODE getKeyStats(const std::string &key, uint16_t vbucket,
682                                  const void* cookie, key_stats &kstats,
683                                  bool bgfetch, bool wantsDeleted=false);
684
685    std::string validateKey(const std::string &key,  uint16_t vbucket,
686                            Item &diskItem);
687
688    GetValue getLocked(const std::string &key, uint16_t vbucket,
689                       rel_time_t currentTime, uint32_t lockTimeout,
690                       const void *cookie);
691
692    ENGINE_ERROR_CODE unlockKey(const std::string &key,
693                                uint16_t vbucket,
694                                uint64_t cas,
695                                rel_time_t currentTime);
696
697
698    KVStore* getRWUnderlying(uint16_t vbId) {
699        return vbMap.getShardByVbId(vbId)->getRWUnderlying();
700    }
701
702    KVStore* getRWUnderlyingByShard(size_t shardId) {
703        return vbMap.shards[shardId]->getRWUnderlying();
704    }
705
706    KVStore* getROUnderlyingByShard(size_t shardId) {
707        return vbMap.shards[shardId]->getROUnderlying();
708    }
709
710    KVStore* getROUnderlying(uint16_t vbId) {
711        return vbMap.getShardByVbId(vbId)->getROUnderlying();
712    }
713
714    void deleteExpiredItem(uint16_t, std::string &, time_t, uint64_t,
715                           exp_type_t);
716    void deleteExpiredItems(std::list<std::pair<uint16_t, std::string> > &,
717                            exp_type_t);
718
719
720    /**
721     * Get the memoized storage properties from the DB.kv
722     */
723    const StorageProperties getStorageProperties() const {
724        return *storageProperties;
725    }
726
727    /**
728     * Schedule a vbstate persistence operation for all vbuckets.
729     */
730    void scheduleVBStatePersist();
731
732    /**
733     * Schedule a vbstate persistence operation for a given vbucket.
734     */
735    void scheduleVBStatePersist(VBucket::id_type vbid);
736
737    /**
738     * Persist a vbucket's state.
739     */
740    bool persistVBState(uint16_t vbid);
741
742    const VBucketMap &getVBuckets() {
743        return vbMap;
744    }
745
746    EventuallyPersistentEngine& getEPEngine() {
747        return engine;
748    }
749
750    size_t getExpiryPagerSleeptime(void) {
751        LockHolder lh(expiryPager.mutex);
752        return expiryPager.sleeptime;
753    }
754
755    size_t getTransactionTimePerItem() {
756        return lastTransTimePerItem.load();
757    }
758
759    bool isFlushAllScheduled() {
760        return diskFlushAll.load();
761    }
762
763    bool scheduleFlushAllTask(const void* cookie, time_t when);
764
765    void setFlushAllComplete();
766
767    void setBackfillMemoryThreshold(double threshold);
768
769    void setExpiryPagerSleeptime(size_t val);
770    void setExpiryPagerTasktime(ssize_t val);
771    void enableExpiryPager();
772    void disableExpiryPager();
773
774    void enableAccessScannerTask();
775    void disableAccessScannerTask();
776    void setAccessScannerSleeptime(size_t val, bool useStartTime);
777    void resetAccessScannerStartTime();
778
779    void resetAccessScannerTasktime() {
780        accessScanner.lastTaskRuntime = gethrtime();
781    }
782
783    void setAllBloomFilters(bool to);
784
785    float getBfiltersResidencyThreshold() {
786        return bfilterResidencyThreshold;
787    }
788
789    void setBfiltersResidencyThreshold(float to) {
790        bfilterResidencyThreshold = to;
791    }
792
793    bool isMetaDataResident(RCPtr<VBucket> &vb, const std::string &key);
794
795    void incExpirationStat(RCPtr<VBucket> &vb, exp_type_t source) {
796        switch (source) {
797        case EXP_BY_PAGER:
798            ++stats.expired_pager;
799            break;
800        case EXP_BY_COMPACTOR:
801            ++stats.expired_compactor;
802            break;
803        case EXP_BY_ACCESS:
804            ++stats.expired_access;
805            break;
806        }
807        ++vb->numExpiredItems;
808    }
809
810    void logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
811        const auto ns_count = std::chrono::duration_cast
812                <std::chrono::microseconds>(enqTime).count();
813        stats.schedulingHisto[static_cast<int>(taskType)].add(ns_count);
814    }
815
816    void logRunTime(TaskId taskType, const ProcessClock::duration runTime) {
817        const auto ns_count = std::chrono::duration_cast
818                <std::chrono::microseconds>(runTime).count();
819        stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ns_count);
820    }
821
822    bool multiBGFetchEnabled() {
823        return storageProperties->hasEfficientGet();
824    }
825
826    void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) {
827        cachedResidentRatio.activeRatio.store(activePerc);
828        cachedResidentRatio.replicaRatio.store(replicaPerc);
829    }
830
831    bool isWarmingUp();
832
833    bool maybeEnableTraffic(void);
834
835    /**
836     * Checks the memory consumption.
837     * To be used by backfill tasks (tap & dcp).
838     */
839    bool isMemoryUsageTooHigh();
840
841    /**
842     * Flushes all items waiting for persistence in a given vbucket
843     * @param vbid The id of the vbucket to flush
844     * @return The number of items flushed
845     */
846    int flushVBucket(uint16_t vbid);
847
848    void commit(uint16_t shardId);
849
850    void addKVStoreStats(ADD_STAT add_stat, const void* cookie);
851
852    void addKVStoreTimingStats(ADD_STAT add_stat, const void* cookie);
853
854    /* Given a named KVStore statistic, return the value of that statistic,
855     * accumulated across any shards.
856     *
857     * @param name The name of the statistic
858     * @param[out] value The value of the statistic.
859     * @return True if the statistic was successfully returned via {value},
860     *              else false.
861     */
862    bool getKVStoreStat(const char* name, size_t& value);
863
864    void resetUnderlyingStats(void);
865    KVStore *getOneROUnderlying(void);
866    KVStore *getOneRWUnderlying(void);
867
868    item_eviction_policy_t getItemEvictionPolicy(void) const {
869        return eviction_policy;
870    }
871
872    /*
873     * Request a rollback of the vbucket to the specified seqno.
874     * If the rollbackSeqno is not a checkpoint boundary, then the rollback
875     * will be to the nearest checkpoint.
876     * There are also cases where the rollback will be forced to 0.
877     * various failures or if the rollback is > 50% of the data.
878     *
879     * A check of the vbucket's high-seqno indicates if a rollback request
880     * was not honoured exactly.
881     *
882     * @param vbid The vbucket to rollback
883     * @rollbackSeqno The seqno to rollback to.
884     *
885     * @return TaskStatus::Complete upon successful rollback
886     *         TaskStatus::Abort if vbucket is not replica or
887     *                           if vbucket is not valid
888     *                           if vbucket reset and rollback fails
889     *         TaskStatus::Reschedule if you cannot get a lock on the vbucket
890     */
891    TaskStatus rollback(uint16_t vbid, uint64_t rollbackSeqno);
892
893    void wakeUpItemPager() {
894        if (itmpTask->getState() == TASK_SNOOZED) {
895            ExecutorPool::get()->wake(itmpTask->getId());
896        }
897    }
898
899    void wakeUpCheckpointRemover() {
900        if (chkTask->getState() == TASK_SNOOZED) {
901            ExecutorPool::get()->wake(chkTask->getId());
902        }
903    }
904
905    void runDefragmenterTask();
906
907    bool runAccessScannerTask();
908
909    void runVbStatePersistTask(int vbid);
910
911    void setCompactionWriteQueueCap(size_t to) {
912        compactionWriteQueueCap = to;
913    }
914
915    void setCompactionExpMemThreshold(size_t to) {
916        compactionExpMemThreshold = static_cast<double>(to) / 100.0;
917    }
918
919    bool compactionCanExpireItems() {
920        // Process expired items only if memory usage is lesser than
921        // compaction_exp_mem_threshold and disk queue is small
922        // enough (marked by replication_throttle_queue_cap)
923
924        bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
925                          (stats.getMaxDataSize() * compactionExpMemThreshold));
926
927        size_t queueSize = stats.diskQueueSize.load();
928        bool isQueueSizeOk = ((stats.replicationThrottleWriteQueueCap == -1) ||
929             (queueSize < static_cast<size_t>(stats.replicationThrottleWriteQueueCap)));
930
931        return (isMemoryUsageOk && isQueueSizeOk);
932    }
933
934    void setCursorDroppingLowerUpperThresholds(size_t maxSize);
935
936    bool isAccessScannerEnabled() {
937        LockHolder lh(accessScanner.mutex);
938        return accessScanner.enabled;
939    }
940
941    bool isExpPagerEnabled() {
942        LockHolder lh(expiryPager.mutex);
943        return expiryPager.enabled;
944    }
945
946    //Check if there were any out-of-memory errors during warmup
947    bool isWarmupOOMFailure(void);
948
949    /*
950     * Change the max_cas of the specified vbucket to cas without any
951     * care for the data or ongoing operations...
952     */
953    ENGINE_ERROR_CODE forceMaxCas(uint16_t vbucket, uint64_t cas);
954
955protected:
956    // During the warmup phase we might want to enable external traffic
957    // at a given point in time.. The LoadStorageKvPairCallback will be
958    // triggered whenever we want to check if we could enable traffic..
959    friend class LoadStorageKVPairCallback;
960
961    // Methods called during warmup
962    std::vector<vbucket_state *> loadVBucketState();
963
964    void warmupCompleted();
965    void stopWarmup(void);
966
967    void scheduleVBDeletion(RCPtr<VBucket> &vb,
968                            const void* cookie,
969                            double delay = 0);
970
971    /* Queue an item for persistence and replication
972     *
973     * The caller of this function must hold the lock of the hash table
974     * partition that contains the StoredValue being Queued.
975     *
976     * @param vb the vbucket that contains the dirty item
977     * @param v the dirty item
978     * @param plh the pointer to the hash table partition lock for the dirty item
979     *        Note that the lock is released inside this function
980     * @param seqno sequence number of the mutation
981     * @param generateBySeqno request that the seqno is generated by this call
982     * @param generateCas request that the CAS is generated by this call
983     */
984    void queueDirty(RCPtr<VBucket> &vb,
985                    StoredValue* v,
986                    LockHolder *plh,
987                    uint64_t *seqno,
988                    const GenerateBySeqno generateBySeqno = GenerateBySeqno::Yes,
989                    const GenerateCas generateCas = GenerateCas::Yes);
990
991    /* Queue an item for persistence following a TAP command
992     *
993     * The caller of this function must hold the lock of the hash table
994     * partition that contains the StoredValue being Queued.
995     *
996     * @param vb the vbucket that contains the dirty item
997     * @param v the dirty item
998     * @param plh the pointer to the hash table partition lock for the dirty item
999     *        Note that the lock is released inside this function
1000     * @param seqno sequence number of the mutation
1001     * @param generateBySeqno request that the seqno is generated by this call
1002     */
1003    void tapQueueDirty(VBucket& vb,
1004                       StoredValue* v,
1005                       LockHolder& plh,
1006                       uint64_t *seqno,
1007                       const GenerateBySeqno generateBySeqno);
1008
1009    /**
1010     * Retrieve a StoredValue and invoke a method on it.
1011     *
1012     * Note that because of complications with void/non-void methods
1013     * and potentially missing StoredValues along with the way I
1014     * actually intend to use this, I don't return any values from
1015     * this.
1016     *
1017     * @param key the item's key to retrieve
1018     * @param vbid the vbucket containing the item
1019     * @param f the method to invoke on the item
1020     *
1021     * @return true if the object was found and method was invoked
1022     */
1023    bool invokeOnLockedStoredValue(const std::string &key, uint16_t vbid,
1024                                   void (StoredValue::* f)()) {
1025        RCPtr<VBucket> vb = getVBucket(vbid);
1026        if (!vb) {
1027            return false;
1028        }
1029
1030        int bucket_num(0);
1031        LockHolder lh = vb->ht.getLockedBucket(key, &bucket_num);
1032        StoredValue *v = vb->ht.unlocked_find(key, bucket_num, true);
1033
1034        if (v) {
1035            std::mem_fun(f)(v);
1036        }
1037        return v != NULL;
1038    }
1039
1040    void flushOneDeleteAll(void);
1041    PersistenceCallback* flushOneDelOrSet(const queued_item &qi,
1042                                          RCPtr<VBucket> &vb);
1043
1044    StoredValue *fetchValidValue(RCPtr<VBucket> &vb, const const_sized_buffer key,
1045                                 int bucket_num, bool wantsDeleted=false,
1046                                 bool trackReference=true, bool queueExpired=true);
1047
1048    GetValue getInternal(const const_sized_buffer key, uint16_t vbucket,
1049                         const void *cookie,
1050                         vbucket_state_t allowedState,
1051                         get_options_t options = TRACK_REFERENCE);
1052
1053    ENGINE_ERROR_CODE addTempItemForBgFetch(LockHolder &lock, int bucket_num,
1054                                            const const_sized_buffer key, RCPtr<VBucket> &vb,
1055                                            const void *cookie, bool metadataOnly,
1056                                            bool isReplication = false);
1057
1058    uint16_t getCommitInterval(uint16_t shardId);
1059
1060    uint16_t decrCommitInterval(uint16_t shardId);
1061
1062    /*
1063     * Helper method for the rollback function.
1064     * Drain the VB's checkpoints looking for items which have a seqno
1065     * above the rollbackSeqno and must be rolled back themselves.
1066     */
1067    void rollbackCheckpoint(RCPtr<VBucket> &vb, int64_t rollbackSeqno);
1068
1069    bool resetVBucket_UNLOCKED(uint16_t vbid, LockHolder& vbset);
1070
1071    friend class Warmup;
1072    friend class Flusher;
1073    friend class BGFetchCallback;
1074    friend class VKeyStatBGFetchCallback;
1075    friend class PersistenceCallback;
1076    friend class Deleter;
1077    friend class VBCBAdaptor;
1078    friend class VBucketVisitorTask;
1079    friend class ItemPager;
1080    friend class PagingVisitor;
1081
1082    EventuallyPersistentEngine     &engine;
1083    EPStats                        &stats;
1084    StorageProperties              *storageProperties;
1085    Warmup                         *warmupTask;
1086    std::unique_ptr<ConflictResolution> conflictResolver;
1087    VBucketMap                      vbMap;
1088    ExTask                          itmpTask;
1089    ExTask                          chkTask;
1090    float                           bfilterResidencyThreshold;
1091    ExTask                          defragmenterTask;
1092
1093    size_t                          compactionWriteQueueCap;
1094    float                           compactionExpMemThreshold;
1095
1096    /* Array of mutexes for each vbucket
1097     * Used by flush operations: flushVB, deleteVB, compactVB, snapshotVB */
1098    Mutex                          *vb_mutexes;
1099    std::vector<MutationLog*>       accessLog;
1100
1101    AtomicValue<size_t> bgFetchQueue;
1102
1103    AtomicValue<bool> diskFlushAll;
1104    struct FlushAllTaskCtx {
1105        FlushAllTaskCtx(): delayFlushAll(true), cookie(NULL) {}
1106        AtomicValue<bool> delayFlushAll;
1107        const void* cookie;
1108    } flushAllTaskCtx;
1109
1110    Mutex vbsetMutex;
1111    uint32_t bgFetchDelay;
1112    double backfillMemoryThreshold;
1113    struct ExpiryPagerDelta {
1114        ExpiryPagerDelta() : sleeptime(0), task(0), enabled(true) {}
1115        Mutex mutex;
1116        size_t sleeptime;
1117        size_t task;
1118        bool enabled;
1119    } expiryPager;
1120    struct ALogTask {
1121        ALogTask() : sleeptime(0), task(0), lastTaskRuntime(gethrtime()),
1122                     enabled(true) {}
1123        Mutex mutex;
1124        size_t sleeptime;
1125        size_t task;
1126        hrtime_t lastTaskRuntime;
1127        bool enabled;
1128    } accessScanner;
1129    struct ResidentRatio {
1130        AtomicValue<size_t> activeRatio;
1131        AtomicValue<size_t> replicaRatio;
1132    } cachedResidentRatio;
1133    size_t statsSnapshotTaskId;
1134    AtomicValue<size_t> lastTransTimePerItem;
1135    item_eviction_policy_t eviction_policy;
1136
1137    Mutex compactionLock;
1138    std::list<CompTaskEntry> compactionTasks;
1139
1140    DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);
1141};
1142
1143/**
1144 * Base class for visiting an epStore with pause/resume support.
1145 */
1146class PauseResumeEPStoreVisitor {
1147public:
1148    virtual ~PauseResumeEPStoreVisitor() {}
1149
1150    /**
1151     * Visit a hashtable within an epStore.
1152     *
1153     * @param vbucket_id ID of the vbucket being visited.
1154     * @param ht a reference to the hashtable.
1155     * @return True if visiting should continue, otherwise false.
1156     */
1157    virtual bool visit(uint16_t vbucket_id, HashTable& ht) = 0;
1158};
1159
1160#endif  // SRC_EP_H_
1161