1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_CHECKPOINT_H_
19#define SRC_CHECKPOINT_H_ 1
20
21#include "config.h"
22
23#include <list>
24#include <map>
25#include <set>
26#include <string>
27#include <vector>
28
29#include "atomic.h"
30#include "common.h"
31#include "item.h"
32#include "locks.h"
33#include "stats.h"
34
/*
 * Number of items per checkpoint: lower bound, upper bound and the default
 * used by CheckpointConfig's default constructor.
 */
#define MIN_CHECKPOINT_ITEMS        10
#define MAX_CHECKPOINT_ITEMS        50000
#define DEFAULT_CHECKPOINT_ITEMS    500

/*
 * Checkpoint period, in seconds: lower bound (1 s), upper bound (3600 s)
 * and the default (5 s) used by CheckpointConfig's default constructor.
 */
#define MIN_CHECKPOINT_PERIOD       1
#define MAX_CHECKPOINT_PERIOD       3600
#define DEFAULT_CHECKPOINT_PERIOD   5

/*
 * Number of checkpoints: the default used by CheckpointConfig's default
 * constructor, and (per its name) the largest value accepted for it.
 */
#define DEFAULT_MAX_CHECKPOINTS     2
#define MAX_CHECKPOINTS_UPPER_BOUND 5
45
/**
 * Lifecycle state of a Checkpoint. A Checkpoint is constructed in
 * CHECKPOINT_OPEN unless another state is passed explicitly.
 */
enum checkpoint_state {
    CHECKPOINT_OPEN,   //!< The checkpoint is open.
    CHECKPOINT_CLOSED  //!< The checkpoint is not open.
};
53
/**
 * A checkpoint index entry: where a key's queued item sits inside a
 * checkpoint's item list, together with the mutation id recorded for it.
 *
 * - position: iterator into a std::list<queued_item> (presumably the owning
 *   Checkpoint's toWrite list -- TODO confirm against checkpoint.cc).
 * - mutation_id: signed 64-bit id. NOTE(review): Checkpoint::getMutationIdForKey
 *   returns uint64_t; confirm the signed/unsigned conversion is intended.
 */
struct index_entry {
    std::list<queued_item>::iterator position;
    int64_t mutation_id;
};
61
/**
 * The checkpoint index maps a key to a checkpoint index_entry.
 * It is the type of Checkpoint::keyIndex. 'unordered_map' is unqualified
 * here and is presumably brought into scope by config.h -- confirm.
 */
typedef unordered_map<std::string, index_entry> checkpoint_index;
66
// Forward declarations. Checkpoint, CheckpointManager and CheckpointConfig are
// referenced before their definitions below; VBucket is only used through
// RCPtr<VBucket> references in this header.
class Checkpoint;
class CheckpointManager;
class CheckpointConfig;
class VBucket;
71
72/**
73 * A checkpoint cursor
74 */
75class CheckpointCursor {
76    friend class CheckpointManager;
77    friend class Checkpoint;
78public:
79
80    CheckpointCursor() { }
81
82    CheckpointCursor(const std::string &n)
83        : name(n),
84          currentCheckpoint(),
85          currentPos(),
86          offset(0),
87          fromBeginningOnChkCollapse(false) { }
88
89    CheckpointCursor(const std::string &n,
90                     std::list<Checkpoint*>::iterator checkpoint,
91                     std::list<queued_item>::iterator pos,
92                     size_t os,
93                     bool beginningOnChkCollapse) :
94        name(n), currentCheckpoint(checkpoint), currentPos(pos),
95        offset(os), fromBeginningOnChkCollapse(beginningOnChkCollapse) { }
96
97    // We need to define the copy construct explicitly due to the fact
98    // that std::atomic implicitly deleted the assignment operator
99    CheckpointCursor(const CheckpointCursor &other) :
100        name(other.name), currentCheckpoint(other.currentCheckpoint),
101        currentPos(other.currentPos), offset(other.offset.load()),
102        fromBeginningOnChkCollapse(other.fromBeginningOnChkCollapse) { }
103
104    CheckpointCursor &operator=(const CheckpointCursor &other) {
105        name.assign(other.name);
106        currentCheckpoint = other.currentCheckpoint;
107        currentPos = other.currentPos;
108        offset.store(other.offset.load());
109        fromBeginningOnChkCollapse = other.fromBeginningOnChkCollapse;
110        return *this;
111    }
112
113private:
114    std::string                      name;
115    std::list<Checkpoint*>::iterator currentCheckpoint;
116    std::list<queued_item>::iterator currentPos;
117    AtomicValue<size_t>              offset;
118    bool                             fromBeginningOnChkCollapse;
119};
120
/**
 * The cursor index maps checkpoint cursor names to checkpoint cursors.
 * It is the type of CheckpointManager::tapCursors. The 'const' on the key
 * type is redundant, since std::map already stores its keys as const.
 */
typedef std::map<const std::string, CheckpointCursor> cursor_index;
125
/**
 * Outcome of Checkpoint::queueDirty for the current open checkpoint.
 */
enum queue_dirty_t {
    /**
     * The key already exists to the right of the persistence cursor (not
     * yet visited by it). The item is de-duplicated and the size of the
     * checkpoint does not change.
     */
    EXISTING_ITEM,

    /**
     * The key exists to the left of the persistence cursor (already visited
     * by it). The item is de-duplicated and moved to the right-hand side,
     * so it has to be persisted again.
     */
    PERSIST_AGAIN,

    /**
     * The key is not yet in the checkpoint; adding it increases the size of
     * the checkpoint.
     */
    NEW_ITEM
};
149
150/**
151 * Representation of a checkpoint used in the unified queue for persistence and tap.
152 */
153class Checkpoint {
154public:
155    Checkpoint(EPStats &st, uint64_t id, uint16_t vbid,
156               checkpoint_state state = CHECKPOINT_OPEN) :
157        stats(st), checkpointId(id), vbucketId(vbid), creationTime(ep_real_time()),
158        checkpointState(state), numItems(0), memOverhead(0) {
159        stats.memOverhead.fetch_add(memorySize());
160        cb_assert(stats.memOverhead.load() < GIGANTOR);
161    }
162
163    ~Checkpoint();
164
165    /**
166     * Return the checkpoint Id
167     */
168    uint64_t getId() const {
169        return checkpointId;
170    }
171
172    /**
173     * Set the checkpoint Id
174     * @param id the checkpoint Id to be set.
175     */
176    void setId(uint64_t id) {
177        checkpointId = id;
178    }
179
180    /**
181     * Return the creation timestamp of this checkpoint in sec.
182     */
183    rel_time_t getCreationTime() {
184        return creationTime;
185    }
186
187    /**
188     * Return the number of items belonging to this checkpoint.
189     */
190    size_t getNumItems() const {
191        return numItems;
192    }
193
194    /**
195     * Return the current state of this checkpoint.
196     */
197    checkpoint_state getState() const {
198        return checkpointState;
199    }
200
201    /**
202     * Set the current state of this checkpoint.
203     * @param state the checkpoint's new state
204     */
205    void setState(checkpoint_state state);
206
207    void popBackCheckpointEndItem();
208
209    /**
210     * Return the number of cursors that are currently walking through this checkpoint.
211     */
212    size_t getNumberOfCursors() const {
213        return cursors.size();
214    }
215
216    /**
217     * Register a cursor's name to this checkpoint
218     */
219    void registerCursorName(const std::string &name) {
220        cursors.insert(name);
221    }
222
223    /**
224     * Remove a cursor's name from this checkpoint
225     */
226    void removeCursorName(const std::string &name) {
227        cursors.erase(name);
228    }
229
230    /**
231     * Return true if the cursor with a given name exists in this checkpoint
232     */
233    bool hasCursorName(const std::string &name) const {
234        return cursors.find(name) != cursors.end();
235    }
236
237    /**
238     * Return the list of all cursor names in this checkpoint
239     */
240    const std::set<std::string> &getCursorNameList() const {
241        return cursors;
242    }
243
244    /**
245     * Queue an item to be written to persistent layer.
246     * @param item the item to be persisted
247     * @param checkpointManager the checkpoint manager to which this checkpoint belongs
248     * @param bySeqno the by sequence number assigned to this mutation
249     * @return a result indicating the status of the operation.
250     */
251    queue_dirty_t queueDirty(const queued_item &qi,
252                             CheckpointManager *checkpointManager);
253
254    uint64_t getLowSeqno() {
255        std::list<queued_item>::iterator pos = toWrite.begin();
256        pos++;
257        return (*pos)->getBySeqno();
258    }
259
260    uint64_t getHighSeqno() {
261        std::list<queued_item>::reverse_iterator pos = toWrite.rbegin();
262        return (*pos)->getBySeqno();
263    }
264
265    std::list<queued_item>::iterator begin() {
266        return toWrite.begin();
267    }
268
269    std::list<queued_item>::iterator end() {
270        return toWrite.end();
271    }
272
273    std::list<queued_item>::reverse_iterator rbegin() {
274        return toWrite.rbegin();
275    }
276
277    std::list<queued_item>::reverse_iterator rend() {
278        return toWrite.rend();
279    }
280
281    bool keyExists(const std::string &key);
282
283    /**
284     * Return the memory overhead of this checkpoint instance, except for the memory used by
285     * all the items belonging to this checkpoint. The memory overhead of those items is
286     * accounted separately in "ep_kv_size" stat.
287     * @return memory overhead of this checkpoint instance.
288     */
289    size_t memorySize() {
290        return sizeof(Checkpoint) + memOverhead;
291    }
292
293    /**
294     * Merge the previous checkpoint into the this checkpoint by adding the items from
295     * the previous checkpoint, which don't exist in this checkpoint.
296     * @param pPrevCheckpoint pointer to the previous checkpoint.
297     * @return the number of items added from the previous checkpoint.
298     */
299    size_t mergePrevCheckpoint(Checkpoint *pPrevCheckpoint);
300
301    /**
302     * Get the mutation id for a given key in this checkpoint
303     * @param key a key to retrieve its mutation id
304     * @return the mutation id for a given key
305     */
306    uint64_t getMutationIdForKey(const std::string &key);
307
308private:
309    EPStats                       &stats;
310    uint64_t                       checkpointId;
311    uint16_t                       vbucketId;
312    rel_time_t                     creationTime;
313    checkpoint_state               checkpointState;
314    size_t                         numItems;
315    std::set<std::string>          cursors; // List of cursors with their unique names.
316    // List is used for queueing mutations as vector incurs shift operations for deduplication.
317    std::list<queued_item>         toWrite;
318    checkpoint_index               keyIndex;
319    size_t                         memOverhead;
320};
321
/**
 * Result of CheckpointManager::registerTAPCursorBySeqno: (1) the bySeqno
 * with which the cursor can start and (2) a flag indicating whether the
 * cursor starts with the first item of a checkpoint.
 */
typedef std::pair<uint64_t, bool> CursorRegResult;
323
324/**
325 * Representation of a checkpoint manager that maintains the list of checkpoints
326 * for each vbucket.
327 */
328class CheckpointManager {
329    friend class Checkpoint;
330    friend class EventuallyPersistentEngine;
331    friend class Consumer;
332    friend class TapConsumer;
333public:
334
335    CheckpointManager(EPStats &st, uint16_t vbucket, CheckpointConfig &config,
336                      int64_t lastSeqno, uint64_t checkpointId = 1) :
337        stats(st), checkpointConfig(config), vbucketId(vbucket), numItems(0),
338        lastBySeqno(lastSeqno), lastClosedChkBySeqno(lastSeqno),
339        persistenceCursor("persistence"), isCollapsedCheckpoint(false),
340        pCursorPreCheckpointId(0) {
341        addNewCheckpoint(checkpointId);
342        registerPersistenceCursor();
343    }
344
345    ~CheckpointManager();
346
347    uint64_t getOpenCheckpointId_UNLOCKED();
348    uint64_t getOpenCheckpointId();
349
350    uint64_t getLastClosedCheckpointId_UNLOCKED();
351    uint64_t getLastClosedCheckpointId();
352
353    void setOpenCheckpointId_UNLOCKED(uint64_t id);
354
355    void setOpenCheckpointId(uint64_t id) {
356        LockHolder lh(queueLock);
357        setOpenCheckpointId_UNLOCKED(id);
358    }
359
360    /**
361     * Remove closed unreferenced checkpoints and return them through the vector.
362     * @param vbucket the vbucket that this checkpoint manager belongs to.
363     * @param newOpenCheckpointCreated the flag indicating if the new open checkpoint was created
364     * as a result of running this function.
365     * @return the number of items that are purged from checkpoint
366     */
367    size_t removeClosedUnrefCheckpoints(const RCPtr<VBucket> &vbucket,
368                                        bool &newOpenCheckpointCreated);
369
370    /**
371     * Register the cursor for getting items whose bySeqno values are between
372     * startBySeqno and endBySeqno, and close the open checkpoint if endBySeqno
373     * belongs to the open checkpoint.
374     * @param startBySeqno start bySeqno.
375     * @return Cursor registration result which consists of (1) the bySeqno with
376     * which the cursor can start and (2) flag indicating if the cursor starts
377     * with the first item on a checkpoint.
378     */
379    CursorRegResult registerTAPCursorBySeqno(const std::string &name,
380                                             uint64_t startBySeqno);
381
382    /**
383     * Register the new cursor for a given TAP connection
384     * @param name the name of a given TAP connection
385     * @param checkpointId the checkpoint Id to start with.
386     * @param alwaysFromBeginning the flag indicating if a cursor should be set to the beginning of
387     * checkpoint to start with, even if the cursor is currently in that checkpoint.
388     * @return true if the checkpoint to start with exists in the queue.
389     */
390    bool registerTAPCursor(const std::string &name, uint64_t checkpointId = 1,
391                           bool alwaysFromBeginning = false);
392
393    /**
394     * Remove the cursor for a given TAP connection.
395     * @param name the name of a given TAP connection
396     * @return true if the TAP cursor is removed successfully.
397     */
398    bool removeTAPCursor(const std::string &name);
399
400    /**
401     * Get the Id of the checkpoint where the given TAP connection's cursor is currently located.
402     * If the cursor is not found, return 0 as a checkpoint Id.
403     * @param name the name of a given TAP connection
404     * @return the checkpoint Id for a given TAP connection's cursor.
405     */
406    uint64_t getCheckpointIdForTAPCursor(const std::string &name);
407
408    size_t getNumOfTAPCursors();
409
410    std::list<std::string> getTAPCursorNames();
411
412    /**
413     * Queue an item to be written to persistent layer.
414     * @param item the item to be persisted.
415     * @param vbucket the vbucket that a new item is pushed into.
416     * @param bySeqno the sequence number assigned to this mutation
417     * @return true if an item queued increases the size of persistence queue by 1.
418     */
419    bool queueDirty(const RCPtr<VBucket> &vb, queued_item& qi, bool genSeqno);
420
421    /**
422     * Return the next item to be sent to a given TAP connection
423     * @param name the name of a given TAP connection
424     * @param isLastMutationItem flag indicating if the item to be returned is the last mutation one
425     * in the closed checkpoint.
426     * @return the next item to be sent to a given TAP connection.
427     */
428    queued_item nextItem(const std::string &name, bool &isLastMutationItem,
429                         uint64_t &highSeqno);
430
431    /**
432     * Return the list of items, which needs to be persisted, to the flusher.
433     * @param items the array that will contain the list of items to be persisted and
434     * be pushed into the flusher's outgoing queue where the further IO optimization is performed.
435     */
436    void getAllItemsForPersistence(std::vector<queued_item> &items);
437
438    void getAllItemsForCursor(const std::string& name,
439                              std::list<queued_item> &items);
440
441    /**
442     * Return the total number of items that belong to this checkpoint manager.
443     */
444    size_t getNumItems() {
445        return numItems;
446    }
447
448    size_t getNumOpenChkItems();
449
450    size_t getNumCheckpoints();
451
452    /**
453     * Return the total number of remaining items that should be visited by the persistence cursor.
454     */
455    size_t getNumItemsForPersistence_UNLOCKED();
456
457    size_t getNumItemsForPersistence() {
458        LockHolder lh(queueLock);
459        return getNumItemsForPersistence_UNLOCKED();
460    }
461
462    size_t getNumItemsForTAPConnection(const std::string &name);
463
464    /**
465     * Return true if a given key was already visited by all the cursors
466     * and is eligible for eviction.
467     */
468    bool eligibleForEviction(const std::string &key);
469
470    /**
471     * Clear all the checkpoints managed by this checkpoint manager.
472     */
473    void clear(vbucket_state_t vbState);
474
475    /**
476     * If a given TAP cursor currently points to the checkpoint_end dummy item,
477     * decrease its current position by 1. This function is mainly used for checkpoint
478     * synchronization between the master and slave nodes.
479     * @param name the name of a given TAP connection
480     */
481    void decrTapCursorFromCheckpointEnd(const std::string &name);
482
483    bool hasNext(const std::string &name);
484
485    bool hasNextForPersistence();
486
487    const CheckpointConfig &getCheckpointConfig() const {
488        return checkpointConfig;
489    }
490
491    void addStats(ADD_STAT add_stat, const void *cookie);
492
493    /**
494     * Create a new open checkpoint by force.
495     * @return the new open checkpoint id
496     */
497    uint64_t createNewCheckpoint();
498
499    void resetTAPCursors(const std::list<std::string> &cursors);
500
501    /**
502     * Get id of the previous checkpoint that is followed by the checkpoint
503     * where the persistence cursor is currently walking.
504     */
505    uint64_t getPersistenceCursorPreChkId();
506
507    /**
508     * Update the checkpoint manager persistence cursor checkpoint offset
509     */
510    void itemsPersisted();
511
512    /**
513     * This method performs the following steps for creating a new checkpoint with a given ID i1:
514     * 1) Check if the checkpoint manager contains any checkpoints with IDs >= i1.
515     * 2) If exists, collapse all checkpoints and set the open checkpoint id to a given ID.
516     * 3) Otherwise, simply create a new open checkpoint with a given ID.
517     * This method is mainly for dealing with rollback events from a TAP producer.
518     * @param id the id of a checkpoint to be created.
519     * @param vbucket vbucket of the checkpoint.
520     */
521    void checkAndAddNewCheckpoint(uint64_t id, const RCPtr<VBucket> &vbucket);
522
523    /**
524     * Gets the mutation id for a given checkpoint item.
525     * @param The checkpoint to look for the key in
526     * @param The key to get the mutation id for
527     * @return The mutation id or 0 if not found
528     */
529    uint64_t getMutationIdForKey(uint64_t chk_id, std::string key);
530
531    bool incrCursor(CheckpointCursor &cursor);
532
533    void setBySeqno(int64_t seqno) {
534        LockHolder lh(queueLock);
535        lastBySeqno = seqno;
536    }
537
538    int64_t getHighSeqno() {
539        LockHolder lh(queueLock);
540        return lastBySeqno;
541    }
542
543    int64_t getLastClosedChkBySeqno() {
544        LockHolder lh(queueLock);
545        return lastClosedChkBySeqno;
546    }
547
548    int64_t nextBySeqno() {
549        LockHolder lh(queueLock);
550        return ++lastBySeqno;
551    }
552
553private:
554
555    bool removeTAPCursor_UNLOCKED(const std::string &name);
556
557    bool registerTAPCursor_UNLOCKED(const std::string &name,
558                                    uint64_t checkpointId = 1,
559                                    bool alwaysFromBeginning = false);
560
561    void registerPersistenceCursor();
562
563    /**
564     * Create a new open checkpoint and add it to the checkpoint list.
565     * The lock should be acquired before calling this function.
566     * @param id the id of a checkpoint to be created.
567     */
568    bool addNewCheckpoint_UNLOCKED(uint64_t id);
569
570    void removeInvalidCursorsOnCheckpoint(Checkpoint *pCheckpoint);
571
572    /**
573     * Create a new open checkpoint and add it to the checkpoint list.
574     * @param id the id of a checkpoint to be created.
575     */
576    bool addNewCheckpoint(uint64_t id);
577
578    bool moveCursorToNextCheckpoint(CheckpointCursor &cursor);
579
580    /**
581     * Check the current open checkpoint to see if we need to create the new open checkpoint.
582     * @param forceCreation is to indicate if a new checkpoint is created due to online update or
583     * high memory usage.
584     * @param timeBound is to indicate if time bound should be considered in creating a new
585     * checkpoint.
586     * @return the previous open checkpoint Id if we create the new open checkpoint. Otherwise
587     * return 0.
588     */
589    uint64_t checkOpenCheckpoint_UNLOCKED(bool forceCreation, bool timeBound);
590
591    uint64_t checkOpenCheckpoint(bool forceCreation, bool timeBound) {
592        LockHolder lh(queueLock);
593        return checkOpenCheckpoint_UNLOCKED(forceCreation, timeBound);
594    }
595
596    bool closeOpenCheckpoint_UNLOCKED(uint64_t id);
597    bool closeOpenCheckpoint(uint64_t id);
598
599    void decrCursorOffset_UNLOCKED(CheckpointCursor &cursor, size_t decr);
600
601    void decrCursorPos_UNLOCKED(CheckpointCursor &cursor);
602
603    bool isLastMutationItemInCheckpoint(CheckpointCursor &cursor);
604
605    bool isCheckpointCreationForHighMemUsage(const RCPtr<VBucket> &vbucket);
606
607    void collapseClosedCheckpoints(std::list<Checkpoint*> &collapsedChks);
608
609    void collapseCheckpoints(uint64_t id);
610
611    void resetCursors(bool resetPersistenceCursor = true);
612
613    void putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
614                                  std::list<Checkpoint*>::iterator chkItr);
615
616    queued_item createCheckpointItem(uint64_t id, uint16_t vbid,
617                                     enum queue_operation checkpoint_op);
618
619    size_t getNumOfMetaItemsFromCursor(CheckpointCursor &cursor);
620
621    EPStats                 &stats;
622    CheckpointConfig        &checkpointConfig;
623    Mutex                    queueLock;
624    uint16_t                 vbucketId;
625    AtomicValue<size_t>      numItems;
626    int64_t                  lastBySeqno;
627    int64_t                  lastClosedChkBySeqno;
628    std::list<Checkpoint*>   checkpointList;
629    CheckpointCursor         persistenceCursor;
630    bool                     isCollapsedCheckpoint;
631    uint64_t                 lastClosedCheckpointId;
632    uint64_t                 pCursorPreCheckpointId;
633    cursor_index             tapCursors;
634};
635
636/**
637 * A class containing the config parameters for checkpoint.
638 */
639
640class CheckpointConfig {
641public:
642    CheckpointConfig()
643        : checkpointPeriod(DEFAULT_CHECKPOINT_PERIOD),
644          checkpointMaxItems(DEFAULT_CHECKPOINT_ITEMS),
645          maxCheckpoints(DEFAULT_MAX_CHECKPOINTS),
646          itemNumBasedNewCheckpoint(true),
647          keepClosedCheckpoints(false),
648          enableChkMerge(false)
649    { /* empty */ }
650
651    CheckpointConfig(EventuallyPersistentEngine &e);
652
653    rel_time_t getCheckpointPeriod() const {
654        return checkpointPeriod;
655    }
656
657    size_t getCheckpointMaxItems() const {
658        return checkpointMaxItems;
659    }
660
661    size_t getMaxCheckpoints() const {
662        return maxCheckpoints;
663    }
664
665    bool isItemNumBasedNewCheckpoint() const {
666        return itemNumBasedNewCheckpoint;
667    }
668
669    bool canKeepClosedCheckpoints() const {
670        return keepClosedCheckpoints;
671    }
672
673    bool isCheckpointMergeSupported() const {
674        return enableChkMerge;
675    }
676
677protected:
678    friend class CheckpointConfigChangeListener;
679    friend class EventuallyPersistentEngine;
680
681    bool validateCheckpointMaxItemsParam(size_t checkpoint_max_items);
682    bool validateCheckpointPeriodParam(size_t checkpoint_period);
683    bool validateMaxCheckpointsParam(size_t max_checkpoints);
684
685    void setCheckpointPeriod(size_t value);
686    void setCheckpointMaxItems(size_t value);
687    void setMaxCheckpoints(size_t value);
688
689    void allowItemNumBasedNewCheckpoint(bool value) {
690        itemNumBasedNewCheckpoint = value;
691    }
692
693    void allowKeepClosedCheckpoints(bool value) {
694        keepClosedCheckpoints = value;
695    }
696
697    void allowCheckpointMerge(bool value) {
698        enableChkMerge = value;
699    }
700
701    static void addConfigChangeListener(EventuallyPersistentEngine &engine);
702
703private:
704    // Period of a checkpoint in terms of time in sec
705    rel_time_t checkpointPeriod;
706    // Number of max items allowed in each checkpoint
707    size_t checkpointMaxItems;
708    // Number of max checkpoints allowed
709    size_t     maxCheckpoints;
710    // Flag indicating if a new checkpoint is created once the number of items in the current
711    // checkpoint is greater than the max number allowed.
712    bool itemNumBasedNewCheckpoint;
713    // Flag indicating if closed checkpoints should be kept in memory if the current memory usage
714    // below the high water mark.
715    bool keepClosedCheckpoints;
716    // Flag indicating if merging closed checkpoints is enabled or not.
717    bool enableChkMerge;
718};
719
720#endif  // SRC_CHECKPOINT_H_
721