xref: /3.0.3-GA/ep-engine/src/checkpoint.cc (revision 24641af7)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string>
21#include <utility>
22#include <vector>
23
24#include "checkpoint.h"
25#include "ep_engine.h"
26#define STATWRITER_NAMESPACE checkpoint
27#include "statwriter.h"
28#undef STATWRITER_NAMESPACE
29#include "vbucket.h"
30
31/**
32 * A listener class to update checkpoint related configs at runtime.
33 */
34class CheckpointConfigChangeListener : public ValueChangedListener {
35public:
36    CheckpointConfigChangeListener(CheckpointConfig &c) : config(c) { }
37    virtual ~CheckpointConfigChangeListener() { }
38
39    virtual void sizeValueChanged(const std::string &key, size_t value) {
40        if (key.compare("chk_period") == 0) {
41            config.setCheckpointPeriod(value);
42        } else if (key.compare("chk_max_items") == 0) {
43            config.setCheckpointMaxItems(value);
44        } else if (key.compare("max_checkpoints") == 0) {
45            config.setMaxCheckpoints(value);
46        }
47    }
48
49    virtual void booleanValueChanged(const std::string &key, bool value) {
50        if (key.compare("item_num_based_new_chk") == 0) {
51            config.allowItemNumBasedNewCheckpoint(value);
52        } else if (key.compare("keep_closed_chks") == 0) {
53            config.allowKeepClosedCheckpoints(value);
54        }
55    }
56
57private:
58    CheckpointConfig &config;
59};
60
61Checkpoint::~Checkpoint() {
62    LOG(EXTENSION_LOG_INFO,
63        "Checkpoint %llu for vbucket %d is purged from memory",
64        checkpointId, vbucketId);
65    stats.memOverhead.fetch_sub(memorySize());
66    cb_assert(stats.memOverhead.load() < GIGANTOR);
67}
68
69void Checkpoint::setState(checkpoint_state state) {
70    checkpointState = state;
71}
72
73void Checkpoint::popBackCheckpointEndItem() {
74    if (!toWrite.empty() &&
75        toWrite.back()->getOperation() == queue_op_checkpoint_end) {
76        keyIndex.erase(toWrite.back()->getKey());
77        toWrite.pop_back();
78    }
79}
80
81bool Checkpoint::keyExists(const std::string &key) {
82    return keyIndex.find(key) != keyIndex.end();
83}
84
85queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
86                                     CheckpointManager *checkpointManager) {
87    assert (checkpointState == CHECKPOINT_OPEN);
88    queue_dirty_t rv;
89
90    checkpoint_index::iterator it = keyIndex.find(qi->getKey());
91    // Check if this checkpoint already had an item for the same key.
92    if (it != keyIndex.end()) {
93        rv = EXISTING_ITEM;
94        std::list<queued_item>::iterator currPos = it->second.position;
95        uint64_t currMutationId = it->second.mutation_id;
96        CheckpointCursor &pcursor = checkpointManager->persistenceCursor;
97        queued_item &pqi = *(pcursor.currentPos);
98
99        if (*(pcursor.currentCheckpoint) == this) {
100            // If the existing item is in the left-hand side of the item
101            // pointed by the persistence cursor, decrease the persistence
102            // cursor's offset by 1.
103            const std::string &key = pqi->getKey();
104            checkpoint_index::iterator ita = keyIndex.find(key);
105            if (ita != keyIndex.end()) {
106                uint64_t mutationId = ita->second.mutation_id;
107                if (currMutationId <= mutationId &&
108                    pqi->getOperation() != queue_op_checkpoint_start) {
109                    checkpointManager->decrCursorOffset_UNLOCKED(pcursor, 1);
110                    rv = PERSIST_AGAIN;
111                }
112            }
113            // If the persistence cursor points to the existing item for the
114            // same key,
115            // shift the cursor left by 1.
116            if (pcursor.currentPos == currPos) {
117                checkpointManager->decrCursorPos_UNLOCKED(pcursor);
118            }
119        }
120
121        cursor_index::iterator map_it = checkpointManager->tapCursors.begin();
122        for (; map_it != checkpointManager->tapCursors.end(); ++map_it) {
123
124            if (*(map_it->second.currentCheckpoint) == this) {
125                queued_item &tqi = *(map_it->second.currentPos);
126                const std::string &key = tqi->getKey();
127                checkpoint_index::iterator ita = keyIndex.find(key);
128                if (ita != keyIndex.end()) {
129                    uint64_t mutationId = ita->second.mutation_id;
130                    if (currMutationId <= mutationId &&
131                        tqi->getOperation() != queue_op_checkpoint_start) {
132                        checkpointManager->
133                                  decrCursorOffset_UNLOCKED(map_it->second, 1);
134                    }
135                }
136                // If an TAP cursor points to the existing item for the same
137                // key, shift it left by 1
138                if (map_it->second.currentPos == currPos) {
139                    checkpointManager->decrCursorPos_UNLOCKED(map_it->second);
140                }
141            }
142        }
143
144        toWrite.push_back(qi);
145        // Remove the existing item for the same key from the list.
146        toWrite.erase(currPos);
147    } else {
148        if (qi->getOperation() == queue_op_set ||
149            qi->getOperation() == queue_op_del) {
150            ++numItems;
151        }
152        rv = NEW_ITEM;
153        // Push the new item into the list
154        toWrite.push_back(qi);
155    }
156
157    if (qi->getNKey() > 0) {
158        std::list<queued_item>::iterator last = toWrite.end();
159        // --last is okay as the list is not empty now.
160        index_entry entry = {--last, qi->getBySeqno()};
161        // Set the index of the key to the new item that is pushed back into
162        // the list.
163        keyIndex[qi->getKey()] = entry;
164        if (rv == NEW_ITEM) {
165            size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
166                                  sizeof(queued_item);
167            memOverhead += newEntrySize;
168            stats.memOverhead.fetch_add(newEntrySize);
169            cb_assert(stats.memOverhead.load() < GIGANTOR);
170        }
171    }
172    return rv;
173}
174
175size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
176    size_t numNewItems = 0;
177    size_t newEntryMemOverhead = 0;
178    std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
179
180    LOG(EXTENSION_LOG_INFO,
181        "Collapse the checkpoint %llu into the checkpoint %llu for vbucket %d",
182        pPrevCheckpoint->getId(), checkpointId, vbucketId);
183
184    std::list<queued_item>::iterator itr = toWrite.begin();
185    uint64_t seqno = pPrevCheckpoint->getMutationIdForKey("dummy_key");
186    keyIndex["dummy_key"].mutation_id = seqno;
187    (*itr)->setBySeqno(seqno);
188
189    seqno = pPrevCheckpoint->getMutationIdForKey("checkpoint_start");
190    keyIndex["checkpoint_start"].mutation_id = seqno;
191    ++itr;
192    (*itr)->setBySeqno(seqno);
193
194    for (; rit != pPrevCheckpoint->rend(); ++rit) {
195        const std::string &key = (*rit)->getKey();
196        if ((*rit)->getOperation() != queue_op_del &&
197            (*rit)->getOperation() != queue_op_set) {
198            continue;
199        }
200        checkpoint_index::iterator it = keyIndex.find(key);
201        if (it == keyIndex.end()) {
202            std::list<queued_item>::iterator pos = toWrite.begin();
203            // Skip the first two meta items
204            ++pos; ++pos;
205            toWrite.insert(pos, *rit);
206            index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
207                                                    getMutationIdForKey(key))};
208            keyIndex[key] = entry;
209            newEntryMemOverhead += key.size() + sizeof(index_entry);
210            ++numItems;
211            ++numNewItems;
212        }
213    }
214    memOverhead += newEntryMemOverhead;
215    stats.memOverhead.fetch_add(newEntryMemOverhead);
216    cb_assert(stats.memOverhead.load() < GIGANTOR);
217    return numNewItems;
218}
219
220uint64_t Checkpoint::getMutationIdForKey(const std::string &key) {
221    uint64_t mid = 0;
222    checkpoint_index::iterator it = keyIndex.find(key);
223    if (it != keyIndex.end()) {
224        mid = it->second.mutation_id;
225    }
226    return mid;
227}
228
229CheckpointManager::~CheckpointManager() {
230    LockHolder lh(queueLock);
231    std::list<Checkpoint*>::iterator it = checkpointList.begin();
232    while(it != checkpointList.end()) {
233        delete *it;
234        ++it;
235    }
236}
237
238uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() {
239    if (checkpointList.empty()) {
240        return 0;
241    }
242
243    uint64_t id = checkpointList.back()->getId();
244    return checkpointList.back()->getState() == CHECKPOINT_OPEN ? id : id + 1;
245}
246
247uint64_t CheckpointManager::getOpenCheckpointId() {
248    LockHolder lh(queueLock);
249    return getOpenCheckpointId_UNLOCKED();
250}
251
252uint64_t CheckpointManager::getLastClosedCheckpointId_UNLOCKED() {
253    if (!isCollapsedCheckpoint) {
254        uint64_t id = getOpenCheckpointId_UNLOCKED();
255        lastClosedCheckpointId = id > 0 ? (id - 1) : 0;
256    }
257    return lastClosedCheckpointId;
258}
259
260uint64_t CheckpointManager::getLastClosedCheckpointId() {
261    LockHolder lh(queueLock);
262    return getLastClosedCheckpointId_UNLOCKED();
263}
264
265void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
266    if (!checkpointList.empty()) {
267        // Update the checkpoint_start item with the new Id.
268        std::list<queued_item>::iterator it =
269            ++(checkpointList.back()->begin());
270        (*it)->setRevSeqno(id);
271        if (checkpointList.back()->getId() == 0) {
272            (*it)->setBySeqno(lastBySeqNo + 1);
273        }
274        checkpointList.back()->setId(id);
275        LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %llu "
276            "for vbucket %d, bySeqno is %llu, max is %llu", id, vbucketId,
277            (*it)->getBySeqno(), lastBySeqNo);
278
279    }
280}
281
282bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
283    // This is just for making sure that the current checkpoint should be
284    // closed.
285    if (!checkpointList.empty() &&
286        checkpointList.back()->getState() == CHECKPOINT_OPEN) {
287        closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
288    }
289
290    LOG(EXTENSION_LOG_INFO, "Create a new open checkpoint %llu for vbucket %d",
291        id, vbucketId);
292
293    Checkpoint *checkpoint = new Checkpoint(stats, id, vbucketId,
294                                            CHECKPOINT_OPEN);
295    // Add a dummy item into the new checkpoint, so that any cursor referring
296    // to the actual first
297    // item in this new checkpoint can be safely shifted left by 1 if the
298    // first item is removed
299    // and pushed into the tail.
300    queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
301    checkpoint->queueDirty(qi, this);
302
303    // This item represents the start of the new checkpoint and is also sent to the slave node.
304    qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
305    checkpoint->queueDirty(qi, this);
306    ++numItems;
307    checkpointList.push_back(checkpoint);
308
309    return true;
310}
311
312bool CheckpointManager::addNewCheckpoint(uint64_t id) {
313    LockHolder lh(queueLock);
314    return addNewCheckpoint_UNLOCKED(id);
315}
316
317bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
318    if (checkpointList.empty()) {
319        return false;
320    }
321    if (id != checkpointList.back()->getId() ||
322        checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
323        return true;
324    }
325
326    LOG(EXTENSION_LOG_INFO, "Close the open checkpoint %llu for vbucket %d",
327        id, vbucketId);
328
329    // This item represents the end of the current open checkpoint and is sent to the slave node.
330    queued_item qi = createCheckpointItem(id, vbucketId,
331                                          queue_op_checkpoint_end);
332    checkpointList.back()->queueDirty(qi, this);
333    ++numItems;
334    checkpointList.back()->setState(CHECKPOINT_CLOSED);
335    return true;
336}
337
338bool CheckpointManager::closeOpenCheckpoint(uint64_t id) {
339    LockHolder lh(queueLock);
340    return closeOpenCheckpoint_UNLOCKED(id);
341}
342
343void CheckpointManager::registerPersistenceCursor() {
344    LockHolder lh(queueLock);
345    cb_assert(!checkpointList.empty());
346    persistenceCursor.currentCheckpoint = checkpointList.begin();
347    persistenceCursor.currentPos = checkpointList.front()->begin();
348    checkpointList.front()->registerCursorName(persistenceCursor.name);
349}
350
351bool CheckpointManager::registerTAPCursor(const std::string &name,
352                                          uint64_t checkpointId,
353                                          bool alwaysFromBeginning) {
354    LockHolder lh(queueLock);
355    return registerTAPCursor_UNLOCKED(name,
356                                      checkpointId,
357                                      alwaysFromBeginning);
358}
359
360uint64_t CheckpointManager::registerTAPCursorBySeqno(const std::string &name,
361                                                     uint64_t startBySeqno,
362                                                     uint64_t endBySeqno) {
363    LockHolder lh(queueLock);
364    cb_assert(!checkpointList.empty());
365    cb_assert(checkpointList.back()->getHighSeqno() >= startBySeqno);
366
367    size_t skipped = 0;
368    uint64_t seqnoToStart = std::numeric_limits<uint64_t>::max();
369    bool needToFindStartSeqno =
370        startBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
371    bool needToFindEndSeqno =
372        endBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
373
374    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
375    for (; itr != checkpointList.end(); ++itr) {
376        uint64_t en = (*itr)->getHighSeqno();
377        uint64_t st = (*itr)->getLowSeqno();
378        if (needToFindStartSeqno) {
379            if (startBySeqno < st) {
380                tapCursors[name] = CheckpointCursor(name, itr, (*itr)->begin(),
381                                                    skipped, false);
382                (*itr)->registerCursorName(name);
383                seqnoToStart = (*itr)->getLowSeqno();
384                needToFindStartSeqno = false;
385            } else if (startBySeqno == st) {
386                std::list<queued_item>::iterator iitr = (*itr)->begin();
387
388                //Advance the iterator to point to the first item in the
389                //checkpoint list
390                std::advance(iitr, 2);
391                skipped = 2;
392
393                tapCursors[name] = CheckpointCursor(name, itr, iitr, skipped,
394                                                    false);
395                (*itr)->registerCursorName(name);
396                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno()) + 1;
397                needToFindStartSeqno = false;
398            } else if (startBySeqno <= en) {
399                std::list<queued_item>::iterator iitr = (*itr)->begin();
400                while (++iitr != (*itr)->end() &&
401                       startBySeqno > static_cast<uint64_t>((*iitr)->getBySeqno())) {
402                    skipped++;
403                }
404
405                if (iitr == (*itr)->end() ||
406                    startBySeqno < static_cast<uint64_t>((*iitr)->getBySeqno())) {
407                    --iitr;
408                }
409
410                size_t remaining = (numItems > skipped) ? numItems - skipped : 0;
411                tapCursors[name] = CheckpointCursor(name, itr, iitr, remaining,
412                                                    false);
413                (*itr)->registerCursorName(name);
414                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno());
415                needToFindStartSeqno = false;
416            } else {
417                skipped += (*itr)->getNumItems() + 2;
418            }
419        }
420        if (needToFindEndSeqno) {
421            if ((*itr)->getNumItems() > 0 && endBySeqno <= en) {
422                if ((*itr)->getState() == CHECKPOINT_OPEN) {
423                    // Closed the open checkpoint and create a new one by force.
424                    checkOpenCheckpoint_UNLOCKED(true, true);
425                }
426                needToFindEndSeqno = false;
427            }
428        }
429        if (!needToFindStartSeqno && !needToFindEndSeqno) {
430            break;
431        }
432    }
433
434    if (seqnoToStart == std::numeric_limits<uint64_t>::max()) {
435        /*
436         * We should never get here since this would mean that the sequence number
437         * we are looking for is higher than anything currently assigned and there
438         * is already an assert above for this case.
439         */
440        LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
441            " for stream '%s' because seqno %llu is too high",
442            vbucketId, name.c_str(), startBySeqno);
443    }
444    return seqnoToStart;
445}
446
447bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
448                                                   uint64_t checkpointId,
449                                                   bool alwaysFromBeginning) {
450    cb_assert(!checkpointList.empty());
451
452    bool found = false;
453    std::list<Checkpoint*>::iterator it = checkpointList.begin();
454    for (; it != checkpointList.end(); ++it) {
455        if (checkpointId == (*it)->getId()) {
456            found = true;
457            break;
458        }
459    }
460
461    LOG(EXTENSION_LOG_INFO,
462        "Register the tap cursor with the name \"%s\" for vbucket %d",
463        name.c_str(), vbucketId);
464
465    // If the tap cursor exists, remove its name from the checkpoint that is
466    // currently referenced by the tap cursor.
467    cursor_index::iterator map_it = tapCursors.find(name);
468    if (map_it != tapCursors.end()) {
469        (*(map_it->second.currentCheckpoint))->removeCursorName(name);
470    }
471
472    if (!found) {
473        for (it = checkpointList.begin(); it != checkpointList.end(); ++it) {
474            if (pCursorPreCheckpointId < (*it)->getId() ||
475                pCursorPreCheckpointId == 0) {
476                break;
477            }
478        }
479
480        LOG(EXTENSION_LOG_DEBUG,
481            "Checkpoint %llu for vbucket %d doesn't exist in memory. "
482            "Set the cursor with the name \"%s\" to checkpoint %d.\n",
483            checkpointId, vbucketId, name.c_str(), (*it)->getId());
484
485        cb_assert(it != checkpointList.end());
486
487        size_t offset = 0;
488        std::list<Checkpoint*>::iterator pos = checkpointList.begin();
489        for (; pos != it; ++pos) {
490            offset += (*pos)->getNumItems() + 2;
491        }
492
493        tapCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
494                                            true);
495        (*it)->registerCursorName(name);
496    } else {
497        size_t offset = 0;
498        std::list<queued_item>::iterator curr;
499
500        LOG(EXTENSION_LOG_DEBUG,
501            "Checkpoint %llu for vbucket %d exists in memory. "
502            "Set the cursor with the name \"%s\" to the checkpoint %llu\n",
503            checkpointId, vbucketId, name.c_str(), checkpointId);
504
505        if (!alwaysFromBeginning &&
506            map_it != tapCursors.end() &&
507            (*(map_it->second.currentCheckpoint))->getId() == (*it)->getId()) {
508            // If the cursor is currently in the checkpoint to start with,
509            // simply start from
510            // its current position.
511            curr = map_it->second.currentPos;
512            offset = map_it->second.offset;
513        } else {
514            // Set the cursor's position to the begining of the checkpoint to
515            // start with
516            curr = (*it)->begin();
517            std::list<Checkpoint*>::iterator pos = checkpointList.begin();
518            for (; pos != it; ++pos) {
519                offset += (*pos)->getNumItems() + 2;
520                // 2 is for checkpoint start and end items.
521            }
522        }
523
524        tapCursors[name] = CheckpointCursor(name, it, curr, offset, true);
525        // Register the tap cursor's name to the checkpoint.
526        (*it)->registerCursorName(name);
527    }
528
529    return found;
530}
531
532bool CheckpointManager::removeTAPCursor(const std::string &name) {
533    LockHolder lh(queueLock);
534
535    cursor_index::iterator it = tapCursors.find(name);
536    if (it == tapCursors.end()) {
537        return false;
538    }
539
540    LOG(EXTENSION_LOG_INFO,
541        "Remove the checkpoint cursor with the name \"%s\" from vbucket %d",
542        name.c_str(), vbucketId);
543
544    // We can simply remove the cursor's name from the checkpoint to which it
545    // currently belongs,
546    // by calling
547    // (*(it->second.currentCheckpoint))->removeCursorName(name);
548    // However, we just want to do more sanity checks by looking at each
549    // checkpoint. This won't
550    // cause much overhead because the max number of checkpoints allowed per
551    // vbucket is small.
552    std::list<Checkpoint*>::iterator cit = checkpointList.begin();
553    for (; cit != checkpointList.end(); ++cit) {
554        (*cit)->removeCursorName(name);
555    }
556
557    tapCursors.erase(it);
558    return true;
559}
560
561uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
562                                                     const std::string &name) {
563    LockHolder lh(queueLock);
564    cursor_index::iterator it = tapCursors.find(name);
565    if (it == tapCursors.end()) {
566        return 0;
567    }
568
569    return (*(it->second.currentCheckpoint))->getId();
570}
571
572size_t CheckpointManager::getNumOfTAPCursors() {
573    LockHolder lh(queueLock);
574    return tapCursors.size();
575}
576
577size_t CheckpointManager::getNumCheckpoints() {
578    LockHolder lh(queueLock);
579    return checkpointList.size();
580}
581
582std::list<std::string> CheckpointManager::getTAPCursorNames() {
583    LockHolder lh(queueLock);
584    std::list<std::string> cursor_names;
585    cursor_index::iterator tap_it = tapCursors.begin();
586        for (; tap_it != tapCursors.end(); ++tap_it) {
587        cursor_names.push_back((tap_it->first));
588    }
589    return cursor_names;
590}
591
592bool CheckpointManager::isCheckpointCreationForHighMemUsage(
593                                              const RCPtr<VBucket> &vbucket) {
594    bool forceCreation = false;
595    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
596    // pesistence and tap cursors are all currently in the open checkpoint?
597    bool allCursorsInOpenCheckpoint =
598        (tapCursors.size() + 1) == checkpointList.back()->getNumberOfCursors();
599
600    if (memoryUsed > stats.mem_high_wat &&
601        allCursorsInOpenCheckpoint &&
602        (checkpointList.back()->getNumItems() >= MIN_CHECKPOINT_ITEMS ||
603         checkpointList.back()->getNumItems() == vbucket->ht.getNumInMemoryItems())) {
604        forceCreation = true;
605    }
606    return forceCreation;
607}
608
609size_t CheckpointManager::removeClosedUnrefCheckpoints(
610                                              const RCPtr<VBucket> &vbucket,
611                                              bool &newOpenCheckpointCreated) {
612
613    // This function is executed periodically by the non-IO dispatcher.
614    LockHolder lh(queueLock);
615    cb_assert(vbucket);
616    uint64_t oldCheckpointId = 0;
617    bool canCreateNewCheckpoint = false;
618    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
619        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
620         checkpointList.front()->getNumberOfCursors() == 0)) {
621        canCreateNewCheckpoint = true;
622    }
623    if (vbucket->getState() == vbucket_state_active &&
624        canCreateNewCheckpoint) {
625
626        bool forceCreation = isCheckpointCreationForHighMemUsage(vbucket);
627        // Check if this master active vbucket needs to create a new open
628        // checkpoint.
629        oldCheckpointId = checkOpenCheckpoint_UNLOCKED(forceCreation, true);
630    }
631    newOpenCheckpointCreated = oldCheckpointId > 0;
632
633    if (checkpointConfig.canKeepClosedCheckpoints()) {
634        double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
635        if (memoryUsed < stats.mem_high_wat &&
636            checkpointList.size() <= checkpointConfig.getMaxCheckpoints()) {
637            return 0;
638        }
639    }
640
641    size_t numUnrefItems = 0;
642    size_t numMetaItems = 0;
643    size_t numCheckpointsRemoved = 0;
644    std::list<Checkpoint*> unrefCheckpointList;
645    std::list<Checkpoint*>::iterator it = checkpointList.begin();
646    for (; it != checkpointList.end(); ++it) {
647        removeInvalidCursorsOnCheckpoint(*it);
648        if ((*it)->getNumberOfCursors() > 0 ||
649            (*it)->getId() > pCursorPreCheckpointId) {
650            break;
651        } else {
652            numUnrefItems += (*it)->getNumItems();
653            numMetaItems +=  2; // 2 is for checkpoint start and end items.
654            ++numCheckpointsRemoved;
655            if (checkpointConfig.canKeepClosedCheckpoints() &&
656                (checkpointList.size() - numCheckpointsRemoved) <=
657                 checkpointConfig.getMaxCheckpoints()) {
658                // Collect unreferenced closed checkpoints until the number
659                // of checkpoints is
660                // equal to the number of max checkpoints allowed.
661                ++it;
662                break;
663            }
664        }
665    }
666    numItems.fetch_sub(numUnrefItems + numMetaItems);
667    if (numUnrefItems > 0) {
668        decrCursorOffset_UNLOCKED(persistenceCursor, numUnrefItems);
669        cursor_index::iterator map_it = tapCursors.begin();
670        for (; map_it != tapCursors.end(); ++map_it) {
671            decrCursorOffset_UNLOCKED(map_it->second, numUnrefItems);
672        }
673    }
674    unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
675                               checkpointList.begin(), it);
676
677    // If any cursor on a replica vbucket or downstream active vbucket
678    // receiving checkpoints from
679    // the upstream master is very slow and causes more closed checkpoints in
680    // memory, collapse those closed checkpoints into a single one to reduce
681    // the memory overhead.
682    if (!checkpointConfig.canKeepClosedCheckpoints() &&
683        vbucket->getState() == vbucket_state_replica)
684    {
685        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
686        collapseClosedCheckpoints(unrefCheckpointList);
687        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
688        if (curr_remains > new_remains) {
689            size_t diff = curr_remains - new_remains;
690            stats.decrDiskQueueSize(diff);
691            vbucket->decrDirtyQueueSize(diff);
692        } else if (curr_remains < new_remains) {
693            size_t diff = new_remains - curr_remains;
694            stats.diskQueueSize.fetch_add(diff);
695            vbucket->dirtyQueueSize.fetch_add(diff);
696        }
697    }
698    lh.unlock();
699
700    std::list<Checkpoint*>::iterator chkpoint_it = unrefCheckpointList.begin();
701    for (; chkpoint_it != unrefCheckpointList.end(); ++chkpoint_it) {
702        delete *chkpoint_it;
703    }
704
705    return numUnrefItems;
706}
707
708void CheckpointManager::removeInvalidCursorsOnCheckpoint(
709                                                     Checkpoint *pCheckpoint) {
710    std::list<std::string> invalidCursorNames;
711    const std::set<std::string> &cursors = pCheckpoint->getCursorNameList();
712    std::set<std::string>::const_iterator cit = cursors.begin();
713    for (; cit != cursors.end(); ++cit) {
714        // Check it with persistence cursor
715        if ((*cit).compare(persistenceCursor.name) == 0) {
716            if (pCheckpoint != *(persistenceCursor.currentCheckpoint)) {
717                invalidCursorNames.push_back(*cit);
718            }
719        } else { // Check it with tap cursors
720            cursor_index::iterator mit = tapCursors.find(*cit);
721            if (mit == tapCursors.end() ||
722                pCheckpoint != *(mit->second.currentCheckpoint)) {
723                invalidCursorNames.push_back(*cit);
724            }
725        }
726    }
727
728    std::list<std::string>::iterator it = invalidCursorNames.begin();
729    for (; it != invalidCursorNames.end(); ++it) {
730        pCheckpoint->removeCursorName(*it);
731    }
732}
733
734void CheckpointManager::collapseClosedCheckpoints(
735                                      std::list<Checkpoint*> &collapsedChks) {
736    // If there are one open checkpoint and more than one closed checkpoint,
737    // collapse those
738    // closed checkpoints into one checkpoint to reduce the memory overhead.
739    if (checkpointList.size() > 2) {
740        std::map<std::string, std::pair<uint64_t, bool> > slowCursors;
741        std::set<std::string> fastCursors;
742        std::list<Checkpoint*>::iterator lastClosedChk = checkpointList.end();
743        --lastClosedChk; --lastClosedChk; // Move to the last closed chkpt.
744        std::set<std::string>::iterator nitr =
745                (*lastClosedChk)->getCursorNameList().begin();
746        // Check if there are any cursors in the last closed checkpoint, which
747        // haven't yet visited any regular items belonging to the last closed
748        // checkpoint. If so, then we should skip collapsing checkpoints until
749        // those cursors move to the first regular item. Otherwise, those cursors will
750        // visit old items from collapsed checkpoints again.
751        for (; nitr != (*lastClosedChk)->getCursorNameList().end(); ++nitr) {
752            if (nitr->compare(persistenceCursor.name) == 0) {
753                enum queue_operation qop = (*(persistenceCursor.currentPos))->getOperation();
754                if (qop == queue_op_empty || qop == queue_op_checkpoint_start) {
755                    return;
756                }
757            } else {
758                cursor_index::iterator cc = tapCursors.find(*nitr);
759                if (cc == tapCursors.end()) {
760                    continue;
761                }
762                enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
763                if (qop ==  queue_op_empty || qop == queue_op_checkpoint_start) {
764                    return;
765                }
766            }
767        }
768
769        fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(),
770                           (*lastClosedChk)->getCursorNameList().end());
771        std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
772        ++rit; ++rit; //Move to the second last closed checkpoint.
773        size_t numDuplicatedItems = 0, numMetaItems = 0;
774        for (; rit != checkpointList.rend(); ++rit) {
775            size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit);
776            numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
777            numMetaItems += 2; // checkpoint start and end meta items
778
779            std::set<std::string>::iterator nameItr =
780                (*rit)->getCursorNameList().begin();
781            for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
782                if (nameItr->compare(persistenceCursor.name) == 0) {
783                    const std::string& key =
784                                 (*(persistenceCursor.currentPos))->getKey();
785                    bool cursor_on_chk_start = false;
786                    if ((*(persistenceCursor.currentPos))->getOperation() ==
787                        queue_op_checkpoint_start) {
788                        cursor_on_chk_start = true;
789                    }
790                    slowCursors[*nameItr] =
791                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
792                } else {
793                    cursor_index::iterator cc =
794                        tapCursors.find(*nameItr);
795                    const std::string& key = (*(cc->second.currentPos))->
796                                             getKey();
797                    bool cursor_on_chk_start = false;
798                    if ((*(cc->second.currentPos))->getOperation() ==
799                        queue_op_checkpoint_start) {
800                        cursor_on_chk_start = true;
801                    }
802                    slowCursors[*nameItr] =
803                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
804                }
805            }
806        }
807        putCursorsInCollapsedChk(slowCursors, lastClosedChk);
808
809        numItems.fetch_sub(numDuplicatedItems + numMetaItems);
810        Checkpoint *pOpenCheckpoint = checkpointList.back();
811        const std::set<std::string> &openCheckpointCursors =
812                                    pOpenCheckpoint->getCursorNameList();
813        fastCursors.insert(openCheckpointCursors.begin(),
814                           openCheckpointCursors.end());
815        std::set<std::string>::const_iterator cit = fastCursors.begin();
816        // Update the offset of each fast cursor.
817        for (; cit != fastCursors.end(); ++cit) {
818            if ((*cit).compare(persistenceCursor.name) == 0) {
819                decrCursorOffset_UNLOCKED(persistenceCursor,
820                                          numDuplicatedItems);
821            } else {
822                cursor_index::iterator mit = tapCursors.find(*cit);
823                if (mit != tapCursors.end()) {
824                    decrCursorOffset_UNLOCKED(mit->second,
825                                              numDuplicatedItems);
826                }
827            }
828        }
829        collapsedChks.splice(collapsedChks.end(), checkpointList,
830                             checkpointList.begin(),  lastClosedChk);
831    }
832}
833
834bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
835                                   bool genSeqno) {
836    LockHolder lh(queueLock);
837
838    cb_assert(vb);
839    bool canCreateNewCheckpoint = false;
840    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
841        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
842         checkpointList.front()->getNumberOfCursors() == 0)) {
843        canCreateNewCheckpoint = true;
844    }
845    if (vb->getState() == vbucket_state_active && canCreateNewCheckpoint) {
846        // Only the master active vbucket can create a next open checkpoint.
847        checkOpenCheckpoint_UNLOCKED(false, true);
848    }
849
850    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
851        addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
852    }
853
854    cb_assert(checkpointList.back()->getState() == CHECKPOINT_OPEN);
855
856    if (genSeqno) {
857        qi->setBySeqno(nextBySeqno());
858    } else {
859        lastBySeqNo = qi->getBySeqno();
860    }
861
862    queue_dirty_t result = checkpointList.back()->queueDirty(qi, this);
863    if (result == NEW_ITEM) {
864        ++numItems;
865    }
866
867    if (result != EXISTING_ITEM) {
868        ++stats.totalEnqueued;
869        ++stats.diskQueueSize;
870        vb->doStatsForQueueing(*qi, qi->size());
871    }
872
873    return result != EXISTING_ITEM;
874}
875
876void CheckpointManager::itemsPersisted() {
877    LockHolder lh(queueLock);
878    std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
879    pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
880
881    std::list<queued_item>::iterator curr_pos = persistenceCursor.currentPos;
882    pCursorSeqno = (*curr_pos)->getBySeqno();
883}
884
885void CheckpointManager::getAllItemsForPersistence(
886                                             std::vector<queued_item> &items) {
887    LockHolder lh(queueLock);
888    // Get all the items up to the end of the current open checkpoint.
889    while (incrCursor(persistenceCursor)) {
890        items.push_back(*(persistenceCursor.currentPos));
891    }
892
893    LOG(EXTENSION_LOG_DEBUG,
894        "Grab %ld items through the persistence cursor from vbucket %d",
895        items.size(), vbucketId);
896}
897
898queued_item CheckpointManager::nextItem(const std::string &name,
899                                        bool &isLastMutationItem,
900                                        uint64_t &endSeqno) {
901    LockHolder lh(queueLock);
902    cursor_index::iterator it = tapCursors.find(name);
903    if (it == tapCursors.end()) {
904        LOG(EXTENSION_LOG_WARNING,
905        "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
906        "%d.\n", name.c_str(), vbucketId);
907        queued_item qi(new Item(std::string(""), 0xffff,
908                                queue_op_empty, 0, 0));
909        return qi;
910    }
911    if (checkpointList.back()->getId() == 0) {
912        LOG(EXTENSION_LOG_INFO,
913            "VBucket %d is still in backfill phase that doesn't allow "
914            " the tap cursor to fetch an item from it's current checkpoint",
915            vbucketId);
916        queued_item qi(new Item(std::string(""), 0xffff,
917                                queue_op_empty, 0, 0));
918        return qi;
919    }
920
921    CheckpointCursor &cursor = it->second;
922    if (incrCursor(cursor)) {
923        isLastMutationItem = isLastMutationItemInCheckpoint(cursor);
924        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
925        return *(cursor.currentPos);
926    } else {
927        isLastMutationItem = false;
928        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
929        queued_item qi(new Item(std::string(""), 0xffff,
930                                queue_op_empty, 0, 0));
931        return qi;
932    }
933}
934
935bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
936    if (++(cursor.currentPos) != (*(cursor.currentCheckpoint))->end()) {
937        queued_item &qi = *(cursor.currentPos);
938        if (qi->getOperation() != queue_op_checkpoint_start &&
939            qi->getOperation() != queue_op_checkpoint_end) {
940            ++(cursor.offset);
941        }
942        return true;
943    } else if (!moveCursorToNextCheckpoint(cursor)) {
944        --(cursor.currentPos);
945        return false;
946    }
947    return incrCursor(cursor);
948}
949
950void CheckpointManager::clear(vbucket_state_t vbState) {
951    LockHolder lh(queueLock);
952    std::list<Checkpoint*>::iterator it = checkpointList.begin();
953    // Remove all the checkpoints.
954    while(it != checkpointList.end()) {
955        delete *it;
956        ++it;
957    }
958    checkpointList.clear();
959    numItems = 0;
960
961    uint64_t checkpointId = vbState == vbucket_state_active ? 1 : 0;
962    // Add a new open checkpoint.
963    addNewCheckpoint_UNLOCKED(checkpointId);
964    resetCursors();
965}
966
967void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
968    // Reset the persistence cursor.
969    if (resetPersistenceCursor) {
970        persistenceCursor.currentCheckpoint = checkpointList.begin();
971        persistenceCursor.currentPos = checkpointList.front()->begin();
972        persistenceCursor.offset = 0;
973        checkpointList.front()->registerCursorName(persistenceCursor.name);
974    }
975
976    // Reset all the TAP cursors.
977    cursor_index::iterator cit = tapCursors.begin();
978    for (; cit != tapCursors.end(); ++cit) {
979        cit->second.currentCheckpoint = checkpointList.begin();
980        cit->second.currentPos = checkpointList.front()->begin();
981        cit->second.offset = 0;
982        checkpointList.front()->registerCursorName(cit->second.name);
983    }
984}
985
986void CheckpointManager::resetTAPCursors(const std::list<std::string> &cursors){
987    LockHolder lh(queueLock);
988    std::list<std::string>::const_iterator it = cursors.begin();
989    for (; it != cursors.end(); ++it) {
990        registerTAPCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
991    }
992}
993
994bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
995    if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_OPEN) {
996        return false;
997    } else if ((*(cursor.currentCheckpoint))->getState() ==
998                                                           CHECKPOINT_CLOSED) {
999        std::list<Checkpoint*>::iterator currCheckpoint =
1000                                                      cursor.currentCheckpoint;
1001        if (++currCheckpoint == checkpointList.end()) {
1002            return false;
1003        }
1004    }
1005
1006    // Remove the cursor's name from its current checkpoint.
1007    (*(cursor.currentCheckpoint))->removeCursorName(cursor.name);
1008    // Move the cursor to the next checkpoint.
1009    ++(cursor.currentCheckpoint);
1010    cursor.currentPos = (*(cursor.currentCheckpoint))->begin();
1011    // Register the cursor's name to its new current checkpoint.
1012    (*(cursor.currentCheckpoint))->registerCursorName(cursor.name);
1013    return true;
1014}
1015
1016size_t CheckpointManager::getNumOpenChkItems() {
1017    LockHolder lh(queueLock);
1018    if (checkpointList.empty()) {
1019        return 0;
1020    }
1021    return checkpointList.back()->getNumItems() + 1;
1022}
1023
1024uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
1025                                                         bool timeBound) {
1026    int checkpoint_id = 0;
1027
1028    timeBound = timeBound &&
1029                (ep_real_time() - checkpointList.back()->getCreationTime()) >=
1030                checkpointConfig.getCheckpointPeriod();
1031    // Create the new open checkpoint if any of the following conditions is
1032    // satisfied:
1033    // (1) force creation due to online update or high memory usage
1034    // (2) current checkpoint is reached to the max number of items allowed.
1035    // (3) time elapsed since the creation of the current checkpoint is greater
1036    //     than the threshold
1037    if (forceCreation ||
1038        (checkpointConfig.isItemNumBasedNewCheckpoint() &&
1039         checkpointList.back()->getNumItems() >=
1040         checkpointConfig.getCheckpointMaxItems()) ||
1041        (checkpointList.back()->getNumItems() > 0 && timeBound)) {
1042
1043        checkpoint_id = checkpointList.back()->getId();
1044        closeOpenCheckpoint_UNLOCKED(checkpoint_id);
1045        addNewCheckpoint_UNLOCKED(checkpoint_id + 1);
1046    }
1047    return checkpoint_id;
1048}
1049
1050bool CheckpointManager::eligibleForEviction(const std::string &key) {
1051    LockHolder lh(queueLock);
1052    uint64_t smallest_mid;
1053
1054    // Get the mutation id of the item pointed by the slowest cursor.
1055    // This won't cause much overhead as the number of cursors per vbucket is
1056    // usually bounded to 3 (persistence cursor + 2 replicas).
1057    const std::string &pkey = (*(persistenceCursor.currentPos))->getKey();
1058    smallest_mid = (*(persistenceCursor.currentCheckpoint))->
1059                                                     getMutationIdForKey(pkey);
1060    cursor_index::iterator mit = tapCursors.begin();
1061    for (; mit != tapCursors.end(); ++mit) {
1062        const std::string &tkey = (*(mit->second.currentPos))->getKey();
1063        uint64_t mid = (*(mit->second.currentCheckpoint))->
1064                                                     getMutationIdForKey(tkey);
1065        if (mid < smallest_mid) {
1066            smallest_mid = mid;
1067        }
1068    }
1069
1070    bool can_evict = true;
1071    std::list<Checkpoint*>::reverse_iterator it = checkpointList.rbegin();
1072    for (; it != checkpointList.rend(); ++it) {
1073        uint64_t mid = (*it)->getMutationIdForKey(key);
1074        if (mid == 0) { // key doesn't exist in a checkpoint.
1075            continue;
1076        }
1077        if (smallest_mid < mid) { // The slowest cursor is still
1078            can_evict = false;    //sitting behind a given key.
1079            break;
1080        }
1081    }
1082
1083    return can_evict;
1084}
1085
1086size_t CheckpointManager::getNumItemsForTAPConnection(
1087                                                     const std::string &name) {
1088    LockHolder lh(queueLock);
1089    size_t remains = 0;
1090    cursor_index::iterator it = tapCursors.find(name);
1091    if (it != tapCursors.end()) {
1092        remains = (numItems >= it->second.offset) ?
1093                   numItems - it->second.offset : 0;
1094    }
1095    return remains;
1096}
1097
1098size_t CheckpointManager::getNumItemsForPersistence_UNLOCKED() {
1099    size_t num_items = numItems;
1100    size_t offset = persistenceCursor.offset;
1101
1102    // Get the number of meta items that can be skipped by the persistence
1103    // cursor.
1104    size_t meta_items = 0;
1105    std::list<Checkpoint*>::iterator curr_chk =
1106                                           persistenceCursor.currentCheckpoint;
1107    for (; curr_chk != checkpointList.end(); ++curr_chk) {
1108        if (curr_chk == persistenceCursor.currentCheckpoint) {
1109            std::list<queued_item>::iterator curr_pos =
1110                                                  persistenceCursor.currentPos;
1111            ++curr_pos;
1112            if (curr_pos == (*curr_chk)->end()) {
1113                continue;
1114            }
1115            if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
1116                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1117                    meta_items += 2;
1118                } else {
1119                    ++meta_items;
1120                }
1121            } else {
1122                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1123                    ++meta_items;
1124                }
1125            }
1126        } else {
1127            if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1128                meta_items += 2;
1129            } else {
1130                ++meta_items;
1131            }
1132        }
1133    }
1134
1135    offset += meta_items;
1136    return num_items > offset ? num_items - offset : 0;
1137}
1138
1139void CheckpointManager::decrTapCursorFromCheckpointEnd(
1140                                                    const std::string &name) {
1141    LockHolder lh(queueLock);
1142    cursor_index::iterator it = tapCursors.find(name);
1143    if (it != tapCursors.end() &&
1144        (*(it->second.currentPos))->getOperation() ==
1145        queue_op_checkpoint_end) {
1146        decrCursorPos_UNLOCKED(it->second);
1147    }
1148}
1149
1150uint64_t CheckpointManager::getMutationIdForKey(uint64_t chk_id,
1151                                                std::string key) {
1152    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
1153    for (; itr != checkpointList.end(); ++itr) {
1154        if (chk_id == (*itr)->getId()) {
1155            return (*itr)->getMutationIdForKey(key);
1156        }
1157    }
1158    return 0;
1159}
1160
1161bool CheckpointManager::isLastMutationItemInCheckpoint(
1162                                                   CheckpointCursor &cursor) {
1163    std::list<queued_item>::iterator it = cursor.currentPos;
1164    ++it;
1165    if (it == (*(cursor.currentCheckpoint))->end() ||
1166        (*it)->getOperation() == queue_op_checkpoint_end) {
1167        return true;
1168    }
1169    return false;
1170}
1171
1172void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
1173                                               const RCPtr<VBucket> &vbucket) {
1174    LockHolder lh(queueLock);
1175
1176    // Ignore CHECKPOINT_START message with ID 0 as 0 is reserved for
1177    // representing backfill.
1178    if (id == 0) {
1179        return;
1180    }
1181    // If the replica receives a checkpoint start message right after backfill
1182    // completion, simply set the current open checkpoint id to the one
1183    // received from the active vbucket.
1184    if (checkpointList.back()->getId() == 0) {
1185        setOpenCheckpointId_UNLOCKED(id);
1186        resetCursors(false);
1187        return;
1188    }
1189
1190    std::list<Checkpoint*>::iterator it = checkpointList.begin();
1191    // Check if a checkpoint exists with ID >= id.
1192    while (it != checkpointList.end()) {
1193        if (id <= (*it)->getId()) {
1194            break;
1195        }
1196        ++it;
1197    }
1198
1199    if (it == checkpointList.end()) {
1200        if ((checkpointList.back()->getId() + 1) < id) {
1201            isCollapsedCheckpoint = true;
1202            uint64_t oid = getOpenCheckpointId_UNLOCKED();
1203            lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0;
1204        } else if ((checkpointList.back()->getId() + 1) == id) {
1205            isCollapsedCheckpoint = false;
1206        }
1207        if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
1208            checkpointList.back()->getNumItems() == 0) {
1209            // If the current open checkpoint doesn't have any items, simply
1210            // set its id to
1211            // the one from the master node.
1212            setOpenCheckpointId_UNLOCKED(id);
1213            // Reposition all the cursors in the open checkpoint to the
1214            // begining position so that a checkpoint_start message can be
1215            // sent again with the correct id.
1216            const std::set<std::string> &cursors = checkpointList.back()->
1217                                                   getCursorNameList();
1218            std::set<std::string>::const_iterator cit = cursors.begin();
1219            for (; cit != cursors.end(); ++cit) {
1220                if ((*cit).compare(persistenceCursor.name) == 0) {
1221                    // Persistence cursor
1222                    continue;
1223                } else { // TAP cursors
1224                    cursor_index::iterator mit = tapCursors.find(*cit);
1225                    mit->second.currentPos = checkpointList.back()->begin();
1226                }
1227            }
1228        } else {
1229            closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
1230            addNewCheckpoint_UNLOCKED(id);
1231        }
1232    } else {
1233        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
1234        collapseCheckpoints(id);
1235        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
1236        if (curr_remains > new_remains) {
1237            size_t diff = curr_remains - new_remains;
1238            stats.decrDiskQueueSize(diff);
1239            vbucket->decrDirtyQueueSize(diff);
1240        } else if (curr_remains < new_remains) {
1241            size_t diff = new_remains - curr_remains;
1242            stats.diskQueueSize.fetch_add(diff);
1243            vbucket->dirtyQueueSize.fetch_add(diff);
1244        }
1245    }
1246}
1247
1248void CheckpointManager::collapseCheckpoints(uint64_t id) {
1249    cb_assert(!checkpointList.empty());
1250
1251    std::map<std::string, std::pair<uint64_t, bool> > cursorMap;
1252    cursor_index::iterator itr;
1253    for (itr = tapCursors.begin(); itr != tapCursors.end(); itr++) {
1254        Checkpoint* chk = *(itr->second.currentCheckpoint);
1255        const std::string& key = (*(itr->second.currentPos))->getKey();
1256        bool cursor_on_chk_start = false;
1257        if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
1258            cursor_on_chk_start = true;
1259        }
1260        cursorMap[itr->first.c_str()] =
1261            std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1262    }
1263
1264    Checkpoint* chk = *(persistenceCursor.currentCheckpoint);
1265    std::string key = (*(persistenceCursor.currentPos))->getKey();
1266    bool cursor_on_chk_start = false;
1267    if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_start) {
1268        cursor_on_chk_start = true;
1269    }
1270    cursorMap[persistenceCursor.name.c_str()] =
1271        std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1272
1273    setOpenCheckpointId_UNLOCKED(id);
1274
1275    std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
1276    ++rit; // Move to the last closed checkpoint.
1277    size_t numDuplicatedItems = 0, numMetaItems = 0;
1278    // Collapse all checkpoints.
1279    for (; rit != checkpointList.rend(); ++rit) {
1280        size_t numAddedItems = checkpointList.back()->
1281                               mergePrevCheckpoint(*rit);
1282        numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
1283        numMetaItems += 2; // checkpoint start and end meta items
1284        delete *rit;
1285    }
1286    numItems.fetch_sub(numDuplicatedItems + numMetaItems);
1287
1288    if (checkpointList.size() > 1) {
1289        checkpointList.erase(checkpointList.begin(), --checkpointList.end());
1290    }
1291    cb_assert(checkpointList.size() == 1);
1292
1293    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
1294        checkpointList.back()->popBackCheckpointEndItem();
1295        --numItems;
1296        checkpointList.back()->setState(CHECKPOINT_OPEN);
1297    }
1298    putCursorsInCollapsedChk(cursorMap, checkpointList.begin());
1299}
1300
1301void CheckpointManager::
1302putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
1303                         std::list<Checkpoint*>::iterator chkItr) {
1304    size_t i;
1305    Checkpoint *chk = *chkItr;
1306    std::list<queued_item>::iterator cit = chk->begin();
1307    std::list<queued_item>::iterator last = chk->begin();
1308    for (i = 0; cit != chk->end(); ++i, ++cit) {
1309        uint64_t id = chk->getMutationIdForKey((*cit)->getKey());
1310        std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
1311        while (mit != cursors.end()) {
1312            std::pair<uint64_t, bool> val = mit->second;
1313            if (val.first < id || (val.first == id && val.second &&
1314                                   (*last)->getOperation() == queue_op_checkpoint_start)) {
1315                if (mit->first.compare(persistenceCursor.name) == 0) {
1316                    persistenceCursor.currentCheckpoint = chkItr;
1317                    persistenceCursor.currentPos = last;
1318                    persistenceCursor.offset = (i > 0) ? i - 1 : 0;
1319                    chk->registerCursorName(persistenceCursor.name);
1320                } else {
1321                    cursor_index::iterator cc = tapCursors.find(mit->first);
1322                    if (cc == tapCursors.end() ||
1323                        cc->second.fromBeginningOnChkCollapse) {
1324                        ++mit;
1325                        continue;
1326                    }
1327                    cc->second.currentCheckpoint = chkItr;
1328                    cc->second.currentPos = last;
1329                    cc->second.offset = (i > 0) ? i - 1 : 0;
1330                    chk->registerCursorName(cc->second.name);
1331                }
1332                cursors.erase(mit++);
1333            } else {
1334                ++mit;
1335            }
1336        }
1337
1338        last = cit;
1339        if (cursors.empty()) {
1340            break;
1341        }
1342    }
1343
1344    std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
1345    for (; mit != cursors.end(); ++mit) {
1346        if (mit->first.compare(persistenceCursor.name) == 0) {
1347            persistenceCursor.currentCheckpoint = chkItr;
1348            persistenceCursor.currentPos = last;
1349            persistenceCursor.offset = (i > 0) ? i - 1 : 0;
1350            chk->registerCursorName(persistenceCursor.name);
1351        } else {
1352            cursor_index::iterator cc = tapCursors.find(mit->first);
1353            if (cc == tapCursors.end()) {
1354                continue;
1355            }
1356            cc->second.currentCheckpoint = chkItr;
1357            if (cc->second.fromBeginningOnChkCollapse) {
1358                cc->second.currentPos = chk->begin();
1359                cc->second.offset = 0;
1360            } else {
1361                cc->second.currentPos = last;
1362                cc->second.offset = (i > 0) ? i - 1 : 0;
1363            }
1364            chk->registerCursorName(cc->second.name);
1365        }
1366    }
1367}
1368
1369bool CheckpointManager::hasNext(const std::string &name) {
1370    LockHolder lh(queueLock);
1371    cursor_index::iterator it = tapCursors.find(name);
1372    if (it == tapCursors.end() || getOpenCheckpointId_UNLOCKED() == 0) {
1373        return false;
1374    }
1375
1376    bool hasMore = true;
1377    std::list<queued_item>::iterator curr = it->second.currentPos;
1378    ++curr;
1379    if (curr == (*(it->second.currentCheckpoint))->end() &&
1380        (*(it->second.currentCheckpoint)) == checkpointList.back()) {
1381        hasMore = false;
1382    }
1383    return hasMore;
1384}
1385
1386queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
1387                                          enum queue_operation checkpoint_op) {
1388    cb_assert(checkpoint_op == queue_op_checkpoint_start ||
1389           checkpoint_op == queue_op_checkpoint_end ||
1390           checkpoint_op == queue_op_empty);
1391
1392    uint64_t bySeqno;
1393    std::stringstream key;
1394    if (checkpoint_op == queue_op_checkpoint_start) {
1395        key << "checkpoint_start";
1396        bySeqno = lastBySeqNo + 1;
1397    } else if (checkpoint_op == queue_op_empty) {
1398        key << "dummy_key";
1399        bySeqno = lastBySeqNo;
1400    } else {
1401        key << "checkpoint_end";
1402        bySeqno = lastBySeqNo;
1403    }
1404    queued_item qi(new Item(key.str(), vbid, checkpoint_op, id, bySeqno));
1405    return qi;
1406}
1407
1408bool CheckpointManager::hasNextForPersistence() {
1409    LockHolder lh(queueLock);
1410    bool hasMore = true;
1411    std::list<queued_item>::iterator curr = persistenceCursor.currentPos;
1412    ++curr;
1413    if (curr == (*(persistenceCursor.currentCheckpoint))->end() &&
1414        (*(persistenceCursor.currentCheckpoint)) == checkpointList.back()) {
1415        hasMore = false;
1416    }
1417    return hasMore;
1418}
1419
1420uint64_t CheckpointManager::createNewCheckpoint() {
1421    LockHolder lh(queueLock);
1422    if (checkpointList.back()->getNumItems() > 0) {
1423        uint64_t chk_id = checkpointList.back()->getId();
1424        closeOpenCheckpoint_UNLOCKED(chk_id);
1425        addNewCheckpoint_UNLOCKED(chk_id + 1);
1426    }
1427    return checkpointList.back()->getId();
1428}
1429
1430void CheckpointManager::decrCursorOffset_UNLOCKED(CheckpointCursor &cursor,
1431                                                  size_t decr) {
1432    if (cursor.offset >= decr) {
1433        cursor.offset.fetch_sub(decr);
1434    } else {
1435        cursor.offset = 0;
1436        LOG(EXTENSION_LOG_INFO,
1437            "%s cursor offset is negative. Reset it to 0.",
1438            cursor.name.c_str());
1439    }
1440}
1441
1442void CheckpointManager::decrCursorPos_UNLOCKED(CheckpointCursor &cursor) {
1443    if (cursor.currentPos != (*(cursor.currentCheckpoint))->begin()) {
1444        --(cursor.currentPos);
1445    }
1446}
1447
1448uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
1449    LockHolder lh(queueLock);
1450    return pCursorPreCheckpointId;
1451}
1452
1453uint64_t CheckpointManager::getPersistenceCursorSeqno() {
1454    LockHolder lh(queueLock);
1455    return pCursorSeqno;
1456}
1457
1458void CheckpointConfig::addConfigChangeListener(
1459                                         EventuallyPersistentEngine &engine) {
1460    Configuration &configuration = engine.getConfiguration();
1461    configuration.addValueChangedListener("chk_period",
1462             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1463    configuration.addValueChangedListener("chk_max_items",
1464             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1465    configuration.addValueChangedListener("max_checkpoints",
1466             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1467    configuration.addValueChangedListener("item_num_based_new_chk",
1468             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1469    configuration.addValueChangedListener("keep_closed_chks",
1470             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1471}
1472
1473CheckpointConfig::CheckpointConfig(EventuallyPersistentEngine &e) {
1474    Configuration &config = e.getConfiguration();
1475    checkpointPeriod = config.getChkPeriod();
1476    checkpointMaxItems = config.getChkMaxItems();
1477    maxCheckpoints = config.getMaxCheckpoints();
1478    itemNumBasedNewCheckpoint = config.isItemNumBasedNewChk();
1479    keepClosedCheckpoints = config.isKeepClosedChks();
1480}
1481
1482bool CheckpointConfig::validateCheckpointMaxItemsParam(size_t
1483                                                       checkpoint_max_items) {
1484    if (checkpoint_max_items < MIN_CHECKPOINT_ITEMS ||
1485        checkpoint_max_items > MAX_CHECKPOINT_ITEMS) {
1486        std::stringstream ss;
1487        ss << "New checkpoint_max_items param value " << checkpoint_max_items
1488           << " is not ranged between the min allowed value " <<
1489           MIN_CHECKPOINT_ITEMS
1490           << " and max value " << MAX_CHECKPOINT_ITEMS;
1491        LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
1492        return false;
1493    }
1494    return true;
1495}
1496
1497bool CheckpointConfig::validateCheckpointPeriodParam(
1498                                                   size_t checkpoint_period) {
1499    if (checkpoint_period < MIN_CHECKPOINT_PERIOD ||
1500        checkpoint_period > MAX_CHECKPOINT_PERIOD) {
1501        std::stringstream ss;
1502        ss << "New checkpoint_period param value " << checkpoint_period
1503           << " is not ranged between the min allowed value " <<
1504              MIN_CHECKPOINT_PERIOD
1505           << " and max value " << MAX_CHECKPOINT_PERIOD;
1506        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1507        return false;
1508    }
1509    return true;
1510}
1511
1512bool CheckpointConfig::validateMaxCheckpointsParam(size_t max_checkpoints) {
1513    if (max_checkpoints < DEFAULT_MAX_CHECKPOINTS ||
1514        max_checkpoints > MAX_CHECKPOINTS_UPPER_BOUND) {
1515        std::stringstream ss;
1516        ss << "New max_checkpoints param value " << max_checkpoints
1517           << " is not ranged between the min allowed value " <<
1518              DEFAULT_MAX_CHECKPOINTS
1519           << " and max value " << MAX_CHECKPOINTS_UPPER_BOUND;
1520        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1521        return false;
1522    }
1523    return true;
1524}
1525
1526void CheckpointConfig::setCheckpointPeriod(size_t value) {
1527    if (!validateCheckpointPeriodParam(value)) {
1528        value = DEFAULT_CHECKPOINT_PERIOD;
1529    }
1530    checkpointPeriod = static_cast<rel_time_t>(value);
1531}
1532
1533void CheckpointConfig::setCheckpointMaxItems(size_t value) {
1534    if (!validateCheckpointMaxItemsParam(value)) {
1535        value = DEFAULT_CHECKPOINT_ITEMS;
1536    }
1537    checkpointMaxItems = value;
1538}
1539
1540void CheckpointConfig::setMaxCheckpoints(size_t value) {
1541    if (!validateMaxCheckpointsParam(value)) {
1542        value = DEFAULT_MAX_CHECKPOINTS;
1543    }
1544    maxCheckpoints = value;
1545}
1546
1547void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
1548    LockHolder lh(queueLock);
1549    char buf[256];
1550
1551    snprintf(buf, sizeof(buf), "vb_%d:open_checkpoint_id", vbucketId);
1552    add_casted_stat(buf, getOpenCheckpointId_UNLOCKED(), add_stat, cookie);
1553    snprintf(buf, sizeof(buf), "vb_%d:last_closed_checkpoint_id", vbucketId);
1554    add_casted_stat(buf, getLastClosedCheckpointId_UNLOCKED(),
1555    add_stat, cookie);
1556    snprintf(buf, sizeof(buf), "vb_%d:persistence_seqno", vbucketId);
1557    add_casted_stat(buf, pCursorSeqno, add_stat, cookie);
1558    snprintf(buf, sizeof(buf), "vb_%d:num_tap_cursors", vbucketId);
1559    add_casted_stat(buf, tapCursors.size(), add_stat, cookie);
1560    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoint_items", vbucketId);
1561    add_casted_stat(buf, numItems, add_stat, cookie);
1562    snprintf(buf, sizeof(buf), "vb_%d:num_open_checkpoint_items", vbucketId);
1563    add_casted_stat(buf, checkpointList.empty() ? 0 :
1564                    checkpointList.back()->getNumItems(),
1565                    add_stat, cookie);
1566    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoints", vbucketId);
1567    add_casted_stat(buf, checkpointList.size(), add_stat, cookie);
1568    snprintf(buf, sizeof(buf), "vb_%d:num_items_for_persistence", vbucketId);
1569    add_casted_stat(buf, getNumItemsForPersistence_UNLOCKED(),
1570                    add_stat, cookie);
1571
1572    cursor_index::iterator tap_it = tapCursors.begin();
1573    for (; tap_it != tapCursors.end(); ++tap_it) {
1574        snprintf(buf, sizeof(buf),
1575                 "vb_%d:%s:cursor_checkpoint_id", vbucketId,
1576                 tap_it->first.c_str());
1577        add_casted_stat(buf, (*(tap_it->second.currentCheckpoint))->getId(),
1578                        add_stat, cookie);
1579    }
1580}
1581