xref: /3.0.3-GA/ep-engine/src/checkpoint.h (revision 48972792)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_CHECKPOINT_H_
19#define SRC_CHECKPOINT_H_ 1
20
21#include "config.h"
22
23#include <list>
24#include <map>
25#include <set>
26#include <string>
27#include <vector>
28
29#include "atomic.h"
30#include "common.h"
31#include "item.h"
32#include "locks.h"
33#include "stats.h"
34
// Item-count tuning for a checkpoint. DEFAULT_CHECKPOINT_ITEMS seeds
// CheckpointConfig::checkpointMaxItems in the default constructor.
// NOTE(review): MIN/MAX are presumably the bounds enforced by
// validateCheckpointMaxItemsParam() -- confirm in checkpoint.cc.
#define MIN_CHECKPOINT_ITEMS 100
#define MAX_CHECKPOINT_ITEMS 500000
#define DEFAULT_CHECKPOINT_ITEMS 5000

// Checkpoint period tuning, in seconds. DEFAULT_CHECKPOINT_PERIOD seeds
// CheckpointConfig::checkpointPeriod in the default constructor.
// NOTE(review): MIN/MAX are presumably the bounds enforced by
// validateCheckpointPeriodParam() -- confirm in checkpoint.cc.
#define MIN_CHECKPOINT_PERIOD 60 // 1 min.
#define MAX_CHECKPOINT_PERIOD 28800 // 8 hours.
#define DEFAULT_CHECKPOINT_PERIOD 1800 // 30 min.

// Number of checkpoints. DEFAULT_MAX_CHECKPOINTS seeds
// CheckpointConfig::maxCheckpoints in the default constructor.
// NOTE(review): the upper bound is presumably enforced by
// validateMaxCheckpointsParam() -- confirm in checkpoint.cc.
#define DEFAULT_MAX_CHECKPOINTS 2
#define MAX_CHECKPOINTS_UPPER_BOUND 5
45
/**
 * Lifecycle state of a checkpoint: it is either open or closed.
 */
enum checkpoint_state {
    CHECKPOINT_OPEN,   //!< The checkpoint is open.
    CHECKPOINT_CLOSED  //!< The checkpoint is not open.
};
53
/**
 * A checkpoint index entry.
 *
 * This is the mapped value type of checkpoint_index, i.e. what a key maps to.
 */
struct index_entry {
    // Iterator into a std::list<queued_item>.
    // NOTE(review): presumably the key's slot in the owning checkpoint's
    // item list -- confirm against the users in checkpoint.cc.
    std::list<queued_item>::iterator position;
    // Mutation id recorded for the key.
    int64_t mutation_id;
};
61
62/**
63 * The checkpoint index maps a key to a checkpoint index_entry.
64 */
65typedef unordered_map<std::string, index_entry> checkpoint_index;
66
67class Checkpoint;
68class CheckpointManager;
69class CheckpointConfig;
70class VBucket;
71
72/**
73 * A checkpoint cursor
74 */
75class CheckpointCursor {
76    friend class CheckpointManager;
77    friend class Checkpoint;
78public:
79
80    CheckpointCursor() { }
81
82    CheckpointCursor(const std::string &n)
83        : name(n),
84          currentCheckpoint(),
85          currentPos(),
86          offset(0),
87          fromBeginningOnChkCollapse(false) { }
88
89    CheckpointCursor(const std::string &n,
90                     std::list<Checkpoint*>::iterator checkpoint,
91                     std::list<queued_item>::iterator pos,
92                     size_t os,
93                     bool beginningOnChkCollapse) :
94        name(n), currentCheckpoint(checkpoint), currentPos(pos),
95        offset(os), fromBeginningOnChkCollapse(beginningOnChkCollapse) { }
96
97    // We need to define the copy construct explicitly due to the fact
98    // that std::atomic implicitly deleted the assignment operator
99    CheckpointCursor(const CheckpointCursor &other) :
100        name(other.name), currentCheckpoint(other.currentCheckpoint),
101        currentPos(other.currentPos), offset(other.offset.load()),
102        fromBeginningOnChkCollapse(other.fromBeginningOnChkCollapse) { }
103
104    CheckpointCursor &operator=(const CheckpointCursor &other) {
105        name.assign(other.name);
106        currentCheckpoint = other.currentCheckpoint;
107        currentPos = other.currentPos;
108        offset.store(other.offset.load());
109        fromBeginningOnChkCollapse = other.fromBeginningOnChkCollapse;
110        return *this;
111    }
112
113private:
114    std::string                      name;
115    std::list<Checkpoint*>::iterator currentCheckpoint;
116    std::list<queued_item>::iterator currentPos;
117    AtomicValue<size_t>              offset;
118    bool                             fromBeginningOnChkCollapse;
119};
120
121/**
122 * The cursor index maps checkpoint cursor names to checkpoint cursors
123 */
124typedef std::map<const std::string, CheckpointCursor> cursor_index;
125
/**
 * Outcome of invoking queueDirty on the current open checkpoint.
 */
enum queue_dirty_t {
    /**
     * The item already exists on the right hand side of the persistence
     * cursor. It is deduplicated, so the size of the checkpoint does not
     * change.
     */
    EXISTING_ITEM,

    /**
     * The item exists on the left hand side of the persistence cursor. It is
     * deduplicated and moved to the right hand side, but it needs to be
     * persisted again.
     */
    PERSIST_AGAIN,

    /**
     * The item does not exist in the checkpoint yet. Adding it increases the
     * size of the checkpoint.
     */
    NEW_ITEM
};
149
150/**
151 * Representation of a checkpoint used in the unified queue for persistence and tap.
152 */
153class Checkpoint {
154public:
155    Checkpoint(EPStats &st, uint64_t id, uint16_t vbid,
156               checkpoint_state state = CHECKPOINT_OPEN) :
157        stats(st), checkpointId(id), vbucketId(vbid), creationTime(ep_real_time()),
158        checkpointState(state), numItems(0), memOverhead(0) {
159        stats.memOverhead.fetch_add(memorySize());
160        cb_assert(stats.memOverhead.load() < GIGANTOR);
161    }
162
163    ~Checkpoint();
164
165    /**
166     * Return the checkpoint Id
167     */
168    uint64_t getId() const {
169        return checkpointId;
170    }
171
172    /**
173     * Set the checkpoint Id
174     * @param id the checkpoint Id to be set.
175     */
176    void setId(uint64_t id) {
177        checkpointId = id;
178    }
179
180    /**
181     * Return the creation timestamp of this checkpoint in sec.
182     */
183    rel_time_t getCreationTime() {
184        return creationTime;
185    }
186
187    /**
188     * Return the number of items belonging to this checkpoint.
189     */
190    size_t getNumItems() const {
191        return numItems;
192    }
193
194    /**
195     * Return the current state of this checkpoint.
196     */
197    checkpoint_state getState() const {
198        return checkpointState;
199    }
200
201    /**
202     * Set the current state of this checkpoint.
203     * @param state the checkpoint's new state
204     */
205    void setState(checkpoint_state state);
206
207    void popBackCheckpointEndItem();
208
209    /**
210     * Return the number of cursors that are currently walking through this checkpoint.
211     */
212    size_t getNumberOfCursors() const {
213        return cursors.size();
214    }
215
216    /**
217     * Register a cursor's name to this checkpoint
218     */
219    void registerCursorName(const std::string &name) {
220        cursors.insert(name);
221    }
222
223    /**
224     * Remove a cursor's name from this checkpoint
225     */
226    void removeCursorName(const std::string &name) {
227        cursors.erase(name);
228    }
229
230    /**
231     * Return true if the cursor with a given name exists in this checkpoint
232     */
233    bool hasCursorName(const std::string &name) const {
234        return cursors.find(name) != cursors.end();
235    }
236
237    /**
238     * Return the list of all cursor names in this checkpoint
239     */
240    const std::set<std::string> &getCursorNameList() const {
241        return cursors;
242    }
243
244    /**
245     * Queue an item to be written to persistent layer.
246     * @param item the item to be persisted
247     * @param checkpointManager the checkpoint manager to which this checkpoint belongs
248     * @param bySeqno the by sequence number assigned to this mutation
249     * @return a result indicating the status of the operation.
250     */
251    queue_dirty_t queueDirty(const queued_item &qi,
252                             CheckpointManager *checkpointManager);
253
254    uint64_t getLowSeqno() {
255        std::list<queued_item>::iterator pos = toWrite.begin();
256        pos++;
257        return (*pos)->getBySeqno();
258    }
259
260    uint64_t getHighSeqno() {
261        if (numItems == 0) {
262            return -1;
263        }
264        std::list<queued_item>::reverse_iterator pos = toWrite.rbegin();
265        if (checkpointState != CHECKPOINT_OPEN) {
266            ++pos;
267        }
268        return (*pos)->getBySeqno();
269    }
270
271    std::list<queued_item>::iterator begin() {
272        return toWrite.begin();
273    }
274
275    std::list<queued_item>::iterator end() {
276        return toWrite.end();
277    }
278
279    std::list<queued_item>::reverse_iterator rbegin() {
280        return toWrite.rbegin();
281    }
282
283    std::list<queued_item>::reverse_iterator rend() {
284        return toWrite.rend();
285    }
286
287    bool keyExists(const std::string &key);
288
289    /**
290     * Return the memory overhead of this checkpoint instance, except for the memory used by
291     * all the items belonging to this checkpoint. The memory overhead of those items is
292     * accounted separately in "ep_kv_size" stat.
293     * @return memory overhead of this checkpoint instance.
294     */
295    size_t memorySize() {
296        return sizeof(Checkpoint) + memOverhead;
297    }
298
299    /**
300     * Merge the previous checkpoint into the this checkpoint by adding the items from
301     * the previous checkpoint, which don't exist in this checkpoint.
302     * @param pPrevCheckpoint pointer to the previous checkpoint.
303     * @return the number of items added from the previous checkpoint.
304     */
305    size_t mergePrevCheckpoint(Checkpoint *pPrevCheckpoint);
306
307    /**
308     * Get the mutation id for a given key in this checkpoint
309     * @param key a key to retrieve its mutation id
310     * @return the mutation id for a given key
311     */
312    uint64_t getMutationIdForKey(const std::string &key);
313
314private:
315    EPStats                       &stats;
316    uint64_t                       checkpointId;
317    uint16_t                       vbucketId;
318    rel_time_t                     creationTime;
319    checkpoint_state               checkpointState;
320    size_t                         numItems;
321    std::set<std::string>          cursors; // List of cursors with their unique names.
322    // List is used for queueing mutations as vector incurs shift operations for deduplication.
323    std::list<queued_item>         toWrite;
324    checkpoint_index               keyIndex;
325    size_t                         memOverhead;
326};
327
328/**
329 * Representation of a checkpoint manager that maintains the list of checkpoints
330 * for each vbucket.
331 */
332class CheckpointManager {
333    friend class Checkpoint;
334    friend class EventuallyPersistentEngine;
335    friend class Consumer;
336    friend class TapConsumer;
337    friend class UprConsumer;
338public:
339
340    CheckpointManager(EPStats &st, uint16_t vbucket, CheckpointConfig &config,
341                      int64_t lastSeqno, uint64_t checkpointId = 1) :
342        stats(st), checkpointConfig(config), vbucketId(vbucket), numItems(0),
343        lastBySeqNo(lastSeqno), persistenceCursor("persistence"),
344        isCollapsedCheckpoint(false),
345        pCursorPreCheckpointId(0),
346        pCursorSeqno(lastSeqno) {
347        addNewCheckpoint(checkpointId);
348        registerPersistenceCursor();
349    }
350
351    ~CheckpointManager();
352
353    uint64_t getOpenCheckpointId_UNLOCKED();
354    uint64_t getOpenCheckpointId();
355
356    uint64_t getLastClosedCheckpointId_UNLOCKED();
357    uint64_t getLastClosedCheckpointId();
358
359    void setOpenCheckpointId_UNLOCKED(uint64_t id);
360
361    void setOpenCheckpointId(uint64_t id) {
362        LockHolder lh(queueLock);
363        setOpenCheckpointId_UNLOCKED(id);
364    }
365
366    /**
367     * Remove closed unreferenced checkpoints and return them through the vector.
368     * @param vbucket the vbucket that this checkpoint manager belongs to.
369     * @param newOpenCheckpointCreated the flag indicating if the new open checkpoint was created
370     * as a result of running this function.
371     * @return the number of items that are purged from checkpoint
372     */
373    size_t removeClosedUnrefCheckpoints(const RCPtr<VBucket> &vbucket,
374                                        bool &newOpenCheckpointCreated);
375
376    /**
377     * Register the cursor for getting items whose bySeqno values are between
378     * startBySeqno and endBySeqno, and close the open checkpoint if endBySeqno
379     * belongs to the open checkpoint.
380     * @param startBySeqno start bySeqno.
381     * @param endBySeqno end bySeqno.
382     * @return the bySeqno with which the cursor can start.
383     */
384    uint64_t registerTAPCursorBySeqno(const std::string &name,
385                                      uint64_t startBySeqno,
386                                      uint64_t endBySeqno);
387
388    /**
389     * Register the new cursor for a given TAP connection
390     * @param name the name of a given TAP connection
391     * @param checkpointId the checkpoint Id to start with.
392     * @param alwaysFromBeginning the flag indicating if a cursor should be set to the beginning of
393     * checkpoint to start with, even if the cursor is currently in that checkpoint.
394     * @return true if the checkpoint to start with exists in the queue.
395     */
396    bool registerTAPCursor(const std::string &name, uint64_t checkpointId = 1,
397                           bool alwaysFromBeginning = false);
398
399    /**
400     * Remove the cursor for a given TAP connection.
401     * @param name the name of a given TAP connection
402     * @return true if the TAP cursor is removed successfully.
403     */
404    bool removeTAPCursor(const std::string &name);
405
406    /**
407     * Get the Id of the checkpoint where the given TAP connection's cursor is currently located.
408     * If the cursor is not found, return 0 as a checkpoint Id.
409     * @param name the name of a given TAP connection
410     * @return the checkpoint Id for a given TAP connection's cursor.
411     */
412    uint64_t getCheckpointIdForTAPCursor(const std::string &name);
413
414    size_t getNumOfTAPCursors();
415
416    std::list<std::string> getTAPCursorNames();
417
418    /**
419     * Queue an item to be written to persistent layer.
420     * @param item the item to be persisted.
421     * @param vbucket the vbucket that a new item is pushed into.
422     * @param bySeqno the sequence number assigned to this mutation
423     * @return true if an item queued increases the size of persistence queue by 1.
424     */
425    bool queueDirty(const RCPtr<VBucket> &vb, queued_item& qi, bool genSeqno);
426
427    /**
428     * Return the next item to be sent to a given TAP connection
429     * @param name the name of a given TAP connection
430     * @param isLastMutationItem flag indicating if the item to be returned is the last mutation one
431     * in the closed checkpoint.
432     * @return the next item to be sent to a given TAP connection.
433     */
434    queued_item nextItem(const std::string &name, bool &isLastMutationItem,
435                         uint64_t &highSeqno);
436
437    /**
438     * Return the list of items, which needs to be persisted, to the flusher.
439     * @param items the array that will contain the list of items to be persisted and
440     * be pushed into the flusher's outgoing queue where the further IO optimization is performed.
441     */
442    void getAllItemsForPersistence(std::vector<queued_item> &items);
443
444    /**
445     * Return the total number of items that belong to this checkpoint manager.
446     */
447    size_t getNumItems() {
448        return numItems;
449    }
450
451    size_t getNumOpenChkItems();
452
453    size_t getNumCheckpoints();
454
455    /**
456     * Return the total number of remaining items that should be visited by the persistence cursor.
457     */
458    size_t getNumItemsForPersistence_UNLOCKED();
459
460    size_t getNumItemsForPersistence() {
461        LockHolder lh(queueLock);
462        return getNumItemsForPersistence_UNLOCKED();
463    }
464
465    size_t getNumItemsForTAPConnection(const std::string &name);
466
467    /**
468     * Return true if a given key was already visited by all the cursors
469     * and is eligible for eviction.
470     */
471    bool eligibleForEviction(const std::string &key);
472
473    /**
474     * Clear all the checkpoints managed by this checkpoint manager.
475     */
476    void clear(vbucket_state_t vbState);
477
478    /**
479     * If a given TAP cursor currently points to the checkpoint_end dummy item,
480     * decrease its current position by 1. This function is mainly used for checkpoint
481     * synchronization between the master and slave nodes.
482     * @param name the name of a given TAP connection
483     */
484    void decrTapCursorFromCheckpointEnd(const std::string &name);
485
486    bool hasNext(const std::string &name);
487
488    bool hasNextForPersistence();
489
490    const CheckpointConfig &getCheckpointConfig() const {
491        return checkpointConfig;
492    }
493
494    void addStats(ADD_STAT add_stat, const void *cookie);
495
496    /**
497     * Create a new open checkpoint by force.
498     * @return the new open checkpoint id
499     */
500    uint64_t createNewCheckpoint();
501
502    void resetTAPCursors(const std::list<std::string> &cursors);
503
504    /**
505     * Get id of the previous checkpoint that is followed by the checkpoint
506     * where the persistence cursor is currently walking.
507     */
508    uint64_t getPersistenceCursorPreChkId();
509
510    /**
511     * Get seqno for the item that the persistence cursor is currently pointing to.
512     */
513    uint64_t getPersistenceCursorSeqno();
514
515    /**
516     * This method performs the following steps for creating a new checkpoint with a given ID i1:
517     * 1) Check if the checkpoint manager contains any checkpoints with IDs >= i1.
518     * 2) If exists, collapse all checkpoints and set the open checkpoint id to a given ID.
519     * 3) Otherwise, simply create a new open checkpoint with a given ID.
520     * This method is mainly for dealing with rollback events from a TAP producer.
521     * @param id the id of a checkpoint to be created.
522     * @param vbucket vbucket of the checkpoint.
523     */
524    void checkAndAddNewCheckpoint(uint64_t id, const RCPtr<VBucket> &vbucket);
525
526    /**
527     * Gets the mutation id for a given checkpoint item.
528     * @param The checkpoint to look for the key in
529     * @param The key to get the mutation id for
530     * @return The mutation id or 0 if not found
531     */
532    uint64_t getMutationIdForKey(uint64_t chk_id, std::string key);
533
534    bool incrCursor(CheckpointCursor &cursor);
535
536    void itemsPersisted();
537
538    void setBySeqno(int64_t seqno) {
539        LockHolder lh(queueLock);
540        lastBySeqNo = seqno;
541    }
542
543    int64_t getHighSeqno() {
544        return lastBySeqNo;
545    }
546
547    int64_t nextBySeqno() {
548        return ++lastBySeqNo;
549    }
550
551private:
552
553    bool registerTAPCursor_UNLOCKED(const std::string &name,
554                                    uint64_t checkpointId = 1,
555                                    bool alwaysFromBeginning = false);
556
557    void registerPersistenceCursor();
558
559    /**
560     * Create a new open checkpoint and add it to the checkpoint list.
561     * The lock should be acquired before calling this function.
562     * @param id the id of a checkpoint to be created.
563     */
564    bool addNewCheckpoint_UNLOCKED(uint64_t id);
565
566    void removeInvalidCursorsOnCheckpoint(Checkpoint *pCheckpoint);
567
568    /**
569     * Create a new open checkpoint and add it to the checkpoint list.
570     * @param id the id of a checkpoint to be created.
571     */
572    bool addNewCheckpoint(uint64_t id);
573
574    bool moveCursorToNextCheckpoint(CheckpointCursor &cursor);
575
576    /**
577     * Check the current open checkpoint to see if we need to create the new open checkpoint.
578     * @param forceCreation is to indicate if a new checkpoint is created due to online update or
579     * high memory usage.
580     * @param timeBound is to indicate if time bound should be considered in creating a new
581     * checkpoint.
582     * @return the previous open checkpoint Id if we create the new open checkpoint. Otherwise
583     * return 0.
584     */
585    uint64_t checkOpenCheckpoint_UNLOCKED(bool forceCreation, bool timeBound);
586
587    uint64_t checkOpenCheckpoint(bool forceCreation, bool timeBound) {
588        LockHolder lh(queueLock);
589        return checkOpenCheckpoint_UNLOCKED(forceCreation, timeBound);
590    }
591
592    bool closeOpenCheckpoint_UNLOCKED(uint64_t id);
593    bool closeOpenCheckpoint(uint64_t id);
594
595    void decrCursorOffset_UNLOCKED(CheckpointCursor &cursor, size_t decr);
596
597    void decrCursorPos_UNLOCKED(CheckpointCursor &cursor);
598
599    bool isLastMutationItemInCheckpoint(CheckpointCursor &cursor);
600
601    bool isCheckpointCreationForHighMemUsage(const RCPtr<VBucket> &vbucket);
602
603    void collapseClosedCheckpoints(std::list<Checkpoint*> &collapsedChks);
604
605    void collapseCheckpoints(uint64_t id);
606
607    void resetCursors(bool resetPersistenceCursor = true);
608
609    void putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
610                                  std::list<Checkpoint*>::iterator chkItr);
611
612    queued_item createCheckpointItem(uint64_t id, uint16_t vbid,
613                                     enum queue_operation checkpoint_op);
614
615    EPStats                 &stats;
616    CheckpointConfig        &checkpointConfig;
617    Mutex                    queueLock;
618    uint16_t                 vbucketId;
619    AtomicValue<size_t>      numItems;
620    int64_t                  lastBySeqNo;
621    std::list<Checkpoint*>   checkpointList;
622    CheckpointCursor         persistenceCursor;
623    bool                     isCollapsedCheckpoint;
624    uint64_t                 lastClosedCheckpointId;
625    uint64_t                 pCursorPreCheckpointId;
626    uint64_t                 pCursorSeqno;
627    cursor_index             tapCursors;
628};
629
630/**
631 * A class containing the config parameters for checkpoint.
632 */
633
634class CheckpointConfig {
635public:
636    CheckpointConfig()
637        : checkpointPeriod(DEFAULT_CHECKPOINT_PERIOD),
638          checkpointMaxItems(DEFAULT_CHECKPOINT_ITEMS),
639          maxCheckpoints(DEFAULT_MAX_CHECKPOINTS),
640          itemNumBasedNewCheckpoint(true),
641          keepClosedCheckpoints(false)
642    { /* empty */ }
643
644    CheckpointConfig(EventuallyPersistentEngine &e);
645
646    rel_time_t getCheckpointPeriod() const {
647        return checkpointPeriod;
648    }
649
650    size_t getCheckpointMaxItems() const {
651        return checkpointMaxItems;
652    }
653
654    size_t getMaxCheckpoints() const {
655        return maxCheckpoints;
656    }
657
658    bool isItemNumBasedNewCheckpoint() const {
659        return itemNumBasedNewCheckpoint;
660    }
661
662    bool canKeepClosedCheckpoints() const {
663        return keepClosedCheckpoints;
664    }
665
666protected:
667    friend class CheckpointConfigChangeListener;
668    friend class EventuallyPersistentEngine;
669
670    bool validateCheckpointMaxItemsParam(size_t checkpoint_max_items);
671    bool validateCheckpointPeriodParam(size_t checkpoint_period);
672    bool validateMaxCheckpointsParam(size_t max_checkpoints);
673
674    void setCheckpointPeriod(size_t value);
675    void setCheckpointMaxItems(size_t value);
676    void setMaxCheckpoints(size_t value);
677
678    void allowItemNumBasedNewCheckpoint(bool value) {
679        itemNumBasedNewCheckpoint = value;
680    }
681
682    void allowKeepClosedCheckpoints(bool value) {
683        keepClosedCheckpoints = value;
684    }
685
686    static void addConfigChangeListener(EventuallyPersistentEngine &engine);
687
688private:
689    // Period of a checkpoint in terms of time in sec
690    rel_time_t checkpointPeriod;
691    // Number of max items allowed in each checkpoint
692    size_t checkpointMaxItems;
693    // Number of max checkpoints allowed
694    size_t     maxCheckpoints;
695    // Flag indicating if a new checkpoint is created once the number of items in the current
696    // checkpoint is greater than the max number allowed.
697    bool itemNumBasedNewCheckpoint;
698    // Flag indicating if closed checkpoints should be kept in memory if the current memory usage
699    // below the high water mark.
700    bool keepClosedCheckpoints;
701};
702
703#endif  // SRC_CHECKPOINT_H_
704