xref: /3.0.3-GA/ep-engine/src/checkpoint.cc (revision 21d63272)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string>
21#include <utility>
22#include <vector>
23
24#include "checkpoint.h"
25#include "ep_engine.h"
26#define STATWRITER_NAMESPACE checkpoint
27#include "statwriter.h"
28#undef STATWRITER_NAMESPACE
29#include "vbucket.h"
30
31/**
32 * A listener class to update checkpoint related configs at runtime.
33 */
34class CheckpointConfigChangeListener : public ValueChangedListener {
35public:
36    CheckpointConfigChangeListener(CheckpointConfig &c) : config(c) { }
37    virtual ~CheckpointConfigChangeListener() { }
38
39    virtual void sizeValueChanged(const std::string &key, size_t value) {
40        if (key.compare("chk_period") == 0) {
41            config.setCheckpointPeriod(value);
42        } else if (key.compare("chk_max_items") == 0) {
43            config.setCheckpointMaxItems(value);
44        } else if (key.compare("max_checkpoints") == 0) {
45            config.setMaxCheckpoints(value);
46        }
47    }
48
49    virtual void booleanValueChanged(const std::string &key, bool value) {
50        if (key.compare("item_num_based_new_chk") == 0) {
51            config.allowItemNumBasedNewCheckpoint(value);
52        } else if (key.compare("keep_closed_chks") == 0) {
53            config.allowKeepClosedCheckpoints(value);
54        }
55    }
56
57private:
58    CheckpointConfig &config;
59};
60
61Checkpoint::~Checkpoint() {
62    LOG(EXTENSION_LOG_INFO,
63        "Checkpoint %llu for vbucket %d is purged from memory",
64        checkpointId, vbucketId);
65    stats.memOverhead.fetch_sub(memorySize());
66    cb_assert(stats.memOverhead.load() < GIGANTOR);
67}
68
69void Checkpoint::setState(checkpoint_state state) {
70    checkpointState = state;
71}
72
73void Checkpoint::popBackCheckpointEndItem() {
74    if (!toWrite.empty() &&
75        toWrite.back()->getOperation() == queue_op_checkpoint_end) {
76        keyIndex.erase(toWrite.back()->getKey());
77        toWrite.pop_back();
78    }
79}
80
81bool Checkpoint::keyExists(const std::string &key) {
82    return keyIndex.find(key) != keyIndex.end();
83}
84
85queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
86                                     CheckpointManager *checkpointManager) {
87    assert (checkpointState == CHECKPOINT_OPEN);
88    queue_dirty_t rv;
89
90    checkpoint_index::iterator it = keyIndex.find(qi->getKey());
91    // Check if this checkpoint already had an item for the same key.
92    if (it != keyIndex.end()) {
93        rv = EXISTING_ITEM;
94        std::list<queued_item>::iterator currPos = it->second.position;
95        uint64_t currMutationId = it->second.mutation_id;
96        CheckpointCursor &pcursor = checkpointManager->persistenceCursor;
97        queued_item &pqi = *(pcursor.currentPos);
98
99        if (*(pcursor.currentCheckpoint) == this) {
100            // If the existing item is in the left-hand side of the item
101            // pointed by the persistence cursor, decrease the persistence
102            // cursor's offset by 1.
103            const std::string &key = pqi->getKey();
104            checkpoint_index::iterator ita = keyIndex.find(key);
105            if (ita != keyIndex.end()) {
106                uint64_t mutationId = ita->second.mutation_id;
107                if (currMutationId <= mutationId &&
108                    pqi->getOperation() != queue_op_checkpoint_start) {
109                    checkpointManager->decrCursorOffset_UNLOCKED(pcursor, 1);
110                    rv = PERSIST_AGAIN;
111                }
112            }
113            // If the persistence cursor points to the existing item for the
114            // same key,
115            // shift the cursor left by 1.
116            if (pcursor.currentPos == currPos) {
117                checkpointManager->decrCursorPos_UNLOCKED(pcursor);
118            }
119        }
120
121        cursor_index::iterator map_it = checkpointManager->tapCursors.begin();
122        for (; map_it != checkpointManager->tapCursors.end(); ++map_it) {
123
124            if (*(map_it->second.currentCheckpoint) == this) {
125                queued_item &tqi = *(map_it->second.currentPos);
126                const std::string &key = tqi->getKey();
127                checkpoint_index::iterator ita = keyIndex.find(key);
128                if (ita != keyIndex.end()) {
129                    uint64_t mutationId = ita->second.mutation_id;
130                    if (currMutationId <= mutationId &&
131                        tqi->getOperation() != queue_op_checkpoint_start) {
132                        checkpointManager->
133                                  decrCursorOffset_UNLOCKED(map_it->second, 1);
134                    }
135                }
136                // If an TAP cursor points to the existing item for the same
137                // key, shift it left by 1
138                if (map_it->second.currentPos == currPos) {
139                    checkpointManager->decrCursorPos_UNLOCKED(map_it->second);
140                }
141            }
142        }
143
144        toWrite.push_back(qi);
145        // Remove the existing item for the same key from the list.
146        toWrite.erase(currPos);
147    } else {
148        if (qi->getOperation() == queue_op_set ||
149            qi->getOperation() == queue_op_del) {
150            ++numItems;
151        }
152        rv = NEW_ITEM;
153        // Push the new item into the list
154        toWrite.push_back(qi);
155    }
156
157    if (qi->getNKey() > 0) {
158        std::list<queued_item>::iterator last = toWrite.end();
159        // --last is okay as the list is not empty now.
160        index_entry entry = {--last, qi->getBySeqno()};
161        // Set the index of the key to the new item that is pushed back into
162        // the list.
163        keyIndex[qi->getKey()] = entry;
164        if (rv == NEW_ITEM) {
165            size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
166                                  sizeof(queued_item);
167            memOverhead += newEntrySize;
168            stats.memOverhead.fetch_add(newEntrySize);
169            cb_assert(stats.memOverhead.load() < GIGANTOR);
170        }
171    }
172    return rv;
173}
174
175size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
176    size_t numNewItems = 0;
177    size_t newEntryMemOverhead = 0;
178    std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
179
180    LOG(EXTENSION_LOG_INFO,
181        "Collapse the checkpoint %llu into the checkpoint %llu for vbucket %d",
182        pPrevCheckpoint->getId(), checkpointId, vbucketId);
183
184    keyIndex["dummy_key"].mutation_id =
185        pPrevCheckpoint->getMutationIdForKey("dummy_key");
186    keyIndex["checkpoint_start"].mutation_id =
187        pPrevCheckpoint->getMutationIdForKey("checkpoint_start");
188    for (; rit != pPrevCheckpoint->rend(); ++rit) {
189        const std::string &key = (*rit)->getKey();
190        if ((*rit)->getOperation() != queue_op_del &&
191            (*rit)->getOperation() != queue_op_set) {
192            continue;
193        }
194        checkpoint_index::iterator it = keyIndex.find(key);
195        if (it == keyIndex.end()) {
196            std::list<queued_item>::iterator pos = toWrite.begin();
197            // Skip the first two meta items
198            ++pos; ++pos;
199            toWrite.insert(pos, *rit);
200            index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
201                                                    getMutationIdForKey(key))};
202            keyIndex[key] = entry;
203            newEntryMemOverhead += key.size() + sizeof(index_entry);
204            ++numItems;
205            ++numNewItems;
206        }
207    }
208    memOverhead += newEntryMemOverhead;
209    stats.memOverhead.fetch_add(newEntryMemOverhead);
210    cb_assert(stats.memOverhead.load() < GIGANTOR);
211    return numNewItems;
212}
213
214uint64_t Checkpoint::getMutationIdForKey(const std::string &key) {
215    uint64_t mid = 0;
216    checkpoint_index::iterator it = keyIndex.find(key);
217    if (it != keyIndex.end()) {
218        mid = it->second.mutation_id;
219    }
220    return mid;
221}
222
223CheckpointManager::~CheckpointManager() {
224    LockHolder lh(queueLock);
225    std::list<Checkpoint*>::iterator it = checkpointList.begin();
226    while(it != checkpointList.end()) {
227        delete *it;
228        ++it;
229    }
230}
231
232uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() {
233    if (checkpointList.empty()) {
234        return 0;
235    }
236
237    uint64_t id = checkpointList.back()->getId();
238    return checkpointList.back()->getState() == CHECKPOINT_OPEN ? id : id + 1;
239}
240
241uint64_t CheckpointManager::getOpenCheckpointId() {
242    LockHolder lh(queueLock);
243    return getOpenCheckpointId_UNLOCKED();
244}
245
246uint64_t CheckpointManager::getLastClosedCheckpointId_UNLOCKED() {
247    if (!isCollapsedCheckpoint) {
248        uint64_t id = getOpenCheckpointId_UNLOCKED();
249        lastClosedCheckpointId = id > 0 ? (id - 1) : 0;
250    }
251    return lastClosedCheckpointId;
252}
253
254uint64_t CheckpointManager::getLastClosedCheckpointId() {
255    LockHolder lh(queueLock);
256    return getLastClosedCheckpointId_UNLOCKED();
257}
258
259void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
260    if (!checkpointList.empty()) {
261        // Update the checkpoint_start item with the new Id.
262        std::list<queued_item>::iterator it =
263            ++(checkpointList.back()->begin());
264        (*it)->setRevSeqno(id);
265        if (checkpointList.back()->getId() == 0) {
266            (*it)->setBySeqno(lastBySeqNo + 1);
267        }
268        checkpointList.back()->setId(id);
269        LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %llu "
270            "for vbucket %d, bySeqno is %llu, max is %llu", id, vbucketId,
271            (*it)->getBySeqno(), lastBySeqNo);
272
273    }
274}
275
276bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
277    // This is just for making sure that the current checkpoint should be
278    // closed.
279    if (!checkpointList.empty() &&
280        checkpointList.back()->getState() == CHECKPOINT_OPEN) {
281        closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
282    }
283
284    LOG(EXTENSION_LOG_INFO, "Create a new open checkpoint %llu for vbucket %d",
285        id, vbucketId);
286
287    Checkpoint *checkpoint = new Checkpoint(stats, id, vbucketId,
288                                            CHECKPOINT_OPEN);
289    // Add a dummy item into the new checkpoint, so that any cursor referring
290    // to the actual first
291    // item in this new checkpoint can be safely shifted left by 1 if the
292    // first item is removed
293    // and pushed into the tail.
294    queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
295    checkpoint->queueDirty(qi, this);
296
297    // This item represents the start of the new checkpoint and is also sent to the slave node.
298    qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
299    checkpoint->queueDirty(qi, this);
300    ++numItems;
301    checkpointList.push_back(checkpoint);
302
303    return true;
304}
305
306bool CheckpointManager::addNewCheckpoint(uint64_t id) {
307    LockHolder lh(queueLock);
308    return addNewCheckpoint_UNLOCKED(id);
309}
310
311bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
312    if (checkpointList.empty()) {
313        return false;
314    }
315    if (id != checkpointList.back()->getId() ||
316        checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
317        return true;
318    }
319
320    LOG(EXTENSION_LOG_INFO, "Close the open checkpoint %llu for vbucket %d",
321        id, vbucketId);
322
323    // This item represents the end of the current open checkpoint and is sent to the slave node.
324    queued_item qi = createCheckpointItem(id, vbucketId,
325                                          queue_op_checkpoint_end);
326    checkpointList.back()->queueDirty(qi, this);
327    ++numItems;
328    checkpointList.back()->setState(CHECKPOINT_CLOSED);
329    return true;
330}
331
332bool CheckpointManager::closeOpenCheckpoint(uint64_t id) {
333    LockHolder lh(queueLock);
334    return closeOpenCheckpoint_UNLOCKED(id);
335}
336
337void CheckpointManager::registerPersistenceCursor() {
338    LockHolder lh(queueLock);
339    cb_assert(!checkpointList.empty());
340    persistenceCursor.currentCheckpoint = checkpointList.begin();
341    persistenceCursor.currentPos = checkpointList.front()->begin();
342    checkpointList.front()->registerCursorName(persistenceCursor.name);
343}
344
345bool CheckpointManager::registerTAPCursor(const std::string &name,
346                                          uint64_t checkpointId,
347                                          bool alwaysFromBeginning) {
348    LockHolder lh(queueLock);
349    return registerTAPCursor_UNLOCKED(name,
350                                      checkpointId,
351                                      alwaysFromBeginning);
352}
353
354uint64_t CheckpointManager::registerTAPCursorBySeqno(const std::string &name,
355                                                     uint64_t startBySeqno,
356                                                     uint64_t endBySeqno) {
357    LockHolder lh(queueLock);
358    cb_assert(!checkpointList.empty());
359    cb_assert(checkpointList.back()->getHighSeqno() >= startBySeqno);
360
361    size_t skipped = 0;
362    uint64_t seqnoToStart = std::numeric_limits<uint64_t>::max();
363    bool needToFindStartSeqno =
364        startBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
365    bool needToFindEndSeqno =
366        endBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
367
368    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
369    for (; itr != checkpointList.end(); ++itr) {
370        uint64_t en = (*itr)->getHighSeqno();
371        uint64_t st = (*itr)->getLowSeqno();
372        if (needToFindStartSeqno) {
373            if (startBySeqno < st) {
374                tapCursors[name] = CheckpointCursor(name, itr, (*itr)->begin(),
375                                                    skipped, false);
376                (*itr)->registerCursorName(name);
377                seqnoToStart = (*itr)->getLowSeqno();
378                needToFindStartSeqno = false;
379            } else if (startBySeqno == st) {
380                std::list<queued_item>::iterator iitr = (*itr)->begin();
381
382                //Advance the iterator to point to the first item in the
383                //checkpoint list
384                std::advance(iitr, 2);
385                skipped = 2;
386
387                tapCursors[name] = CheckpointCursor(name, itr, iitr, skipped,
388                                                    false);
389                (*itr)->registerCursorName(name);
390                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno()) + 1;
391                needToFindStartSeqno = false;
392            } else if (startBySeqno <= en) {
393                std::list<queued_item>::iterator iitr = (*itr)->begin();
394                while (++iitr != (*itr)->end() &&
395                       startBySeqno > static_cast<uint64_t>((*iitr)->getBySeqno())) {
396                    skipped++;
397                }
398
399                if (iitr == (*itr)->end() ||
400                    startBySeqno < static_cast<uint64_t>((*iitr)->getBySeqno())) {
401                    --iitr;
402                }
403
404                size_t remaining = (numItems > skipped) ? numItems - skipped : 0;
405                tapCursors[name] = CheckpointCursor(name, itr, iitr, remaining,
406                                                    false);
407                (*itr)->registerCursorName(name);
408                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno());
409                needToFindStartSeqno = false;
410            } else {
411                skipped += (*itr)->getNumItems() + 2;
412            }
413        }
414        if (needToFindEndSeqno) {
415            if ((*itr)->getNumItems() > 0 && endBySeqno <= en) {
416                if ((*itr)->getState() == CHECKPOINT_OPEN) {
417                    // Closed the open checkpoint and create a new one by force.
418                    checkOpenCheckpoint_UNLOCKED(true, true);
419                }
420                needToFindEndSeqno = false;
421            }
422        }
423        if (!needToFindStartSeqno && !needToFindEndSeqno) {
424            break;
425        }
426    }
427
428    if (seqnoToStart == std::numeric_limits<uint64_t>::max()) {
429        /*
430         * We should never get here since this would mean that the sequence number
431         * we are looking for is higher than anything currently assigned and there
432         * is already an assert above for this case.
433         */
434        LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
435            " for stream '%s' because seqno %llu is too high",
436            vbucketId, name.c_str(), startBySeqno);
437    }
438    return seqnoToStart;
439}
440
441bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
442                                                   uint64_t checkpointId,
443                                                   bool alwaysFromBeginning) {
444    cb_assert(!checkpointList.empty());
445
446    bool found = false;
447    std::list<Checkpoint*>::iterator it = checkpointList.begin();
448    for (; it != checkpointList.end(); ++it) {
449        if (checkpointId == (*it)->getId()) {
450            found = true;
451            break;
452        }
453    }
454
455    LOG(EXTENSION_LOG_INFO,
456        "Register the tap cursor with the name \"%s\" for vbucket %d",
457        name.c_str(), vbucketId);
458
459    // If the tap cursor exists, remove its name from the checkpoint that is
460    // currently referenced by the tap cursor.
461    cursor_index::iterator map_it = tapCursors.find(name);
462    if (map_it != tapCursors.end()) {
463        (*(map_it->second.currentCheckpoint))->removeCursorName(name);
464    }
465
466    if (!found) {
467        for (it = checkpointList.begin(); it != checkpointList.end(); ++it) {
468            if (pCursorPreCheckpointId < (*it)->getId() ||
469                pCursorPreCheckpointId == 0) {
470                break;
471            }
472        }
473
474        LOG(EXTENSION_LOG_DEBUG,
475            "Checkpoint %llu for vbucket %d doesn't exist in memory. "
476            "Set the cursor with the name \"%s\" to checkpoint %d.\n",
477            checkpointId, vbucketId, name.c_str(), (*it)->getId());
478
479        cb_assert(it != checkpointList.end());
480
481        size_t offset = 0;
482        std::list<Checkpoint*>::iterator pos = checkpointList.begin();
483        for (; pos != it; ++pos) {
484            offset += (*pos)->getNumItems() + 2;
485        }
486
487        tapCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
488                                            true);
489        (*it)->registerCursorName(name);
490    } else {
491        size_t offset = 0;
492        std::list<queued_item>::iterator curr;
493
494        LOG(EXTENSION_LOG_DEBUG,
495            "Checkpoint %llu for vbucket %d exists in memory. "
496            "Set the cursor with the name \"%s\" to the checkpoint %llu\n",
497            checkpointId, vbucketId, name.c_str(), checkpointId);
498
499        if (!alwaysFromBeginning &&
500            map_it != tapCursors.end() &&
501            (*(map_it->second.currentCheckpoint))->getId() == (*it)->getId()) {
502            // If the cursor is currently in the checkpoint to start with,
503            // simply start from
504            // its current position.
505            curr = map_it->second.currentPos;
506            offset = map_it->second.offset;
507        } else {
508            // Set the cursor's position to the begining of the checkpoint to
509            // start with
510            curr = (*it)->begin();
511            std::list<Checkpoint*>::iterator pos = checkpointList.begin();
512            for (; pos != it; ++pos) {
513                offset += (*pos)->getNumItems() + 2;
514                // 2 is for checkpoint start and end items.
515            }
516        }
517
518        tapCursors[name] = CheckpointCursor(name, it, curr, offset, true);
519        // Register the tap cursor's name to the checkpoint.
520        (*it)->registerCursorName(name);
521    }
522
523    return found;
524}
525
526bool CheckpointManager::removeTAPCursor(const std::string &name) {
527    LockHolder lh(queueLock);
528
529    cursor_index::iterator it = tapCursors.find(name);
530    if (it == tapCursors.end()) {
531        return false;
532    }
533
534    LOG(EXTENSION_LOG_INFO,
535        "Remove the checkpoint cursor with the name \"%s\" from vbucket %d",
536        name.c_str(), vbucketId);
537
538    // We can simply remove the cursor's name from the checkpoint to which it
539    // currently belongs,
540    // by calling
541    // (*(it->second.currentCheckpoint))->removeCursorName(name);
542    // However, we just want to do more sanity checks by looking at each
543    // checkpoint. This won't
544    // cause much overhead because the max number of checkpoints allowed per
545    // vbucket is small.
546    std::list<Checkpoint*>::iterator cit = checkpointList.begin();
547    for (; cit != checkpointList.end(); ++cit) {
548        (*cit)->removeCursorName(name);
549    }
550
551    tapCursors.erase(it);
552    return true;
553}
554
555uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
556                                                     const std::string &name) {
557    LockHolder lh(queueLock);
558    cursor_index::iterator it = tapCursors.find(name);
559    if (it == tapCursors.end()) {
560        return 0;
561    }
562
563    return (*(it->second.currentCheckpoint))->getId();
564}
565
566size_t CheckpointManager::getNumOfTAPCursors() {
567    LockHolder lh(queueLock);
568    return tapCursors.size();
569}
570
571size_t CheckpointManager::getNumCheckpoints() {
572    LockHolder lh(queueLock);
573    return checkpointList.size();
574}
575
576std::list<std::string> CheckpointManager::getTAPCursorNames() {
577    LockHolder lh(queueLock);
578    std::list<std::string> cursor_names;
579    cursor_index::iterator tap_it = tapCursors.begin();
580        for (; tap_it != tapCursors.end(); ++tap_it) {
581        cursor_names.push_back((tap_it->first));
582    }
583    return cursor_names;
584}
585
586bool CheckpointManager::isCheckpointCreationForHighMemUsage(
587                                              const RCPtr<VBucket> &vbucket) {
588    bool forceCreation = false;
589    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
590    // pesistence and tap cursors are all currently in the open checkpoint?
591    bool allCursorsInOpenCheckpoint =
592        (tapCursors.size() + 1) == checkpointList.back()->getNumberOfCursors();
593
594    if (memoryUsed > stats.mem_high_wat &&
595        allCursorsInOpenCheckpoint &&
596        (checkpointList.back()->getNumItems() >= MIN_CHECKPOINT_ITEMS ||
597         checkpointList.back()->getNumItems() == vbucket->ht.getNumInMemoryItems())) {
598        forceCreation = true;
599    }
600    return forceCreation;
601}
602
603size_t CheckpointManager::removeClosedUnrefCheckpoints(
604                                              const RCPtr<VBucket> &vbucket,
605                                              bool &newOpenCheckpointCreated) {
606
607    // This function is executed periodically by the non-IO dispatcher.
608    LockHolder lh(queueLock);
609    cb_assert(vbucket);
610    uint64_t oldCheckpointId = 0;
611    bool canCreateNewCheckpoint = false;
612    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
613        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
614         checkpointList.front()->getNumberOfCursors() == 0)) {
615        canCreateNewCheckpoint = true;
616    }
617    if (vbucket->getState() == vbucket_state_active &&
618        canCreateNewCheckpoint) {
619
620        bool forceCreation = isCheckpointCreationForHighMemUsage(vbucket);
621        // Check if this master active vbucket needs to create a new open
622        // checkpoint.
623        oldCheckpointId = checkOpenCheckpoint_UNLOCKED(forceCreation, true);
624    }
625    newOpenCheckpointCreated = oldCheckpointId > 0;
626
627    if (checkpointConfig.canKeepClosedCheckpoints()) {
628        double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
629        if (memoryUsed < stats.mem_high_wat &&
630            checkpointList.size() <= checkpointConfig.getMaxCheckpoints()) {
631            return 0;
632        }
633    }
634
635    size_t numUnrefItems = 0;
636    size_t numMetaItems = 0;
637    size_t numCheckpointsRemoved = 0;
638    std::list<Checkpoint*> unrefCheckpointList;
639    std::list<Checkpoint*>::iterator it = checkpointList.begin();
640    for (; it != checkpointList.end(); ++it) {
641        removeInvalidCursorsOnCheckpoint(*it);
642        if ((*it)->getNumberOfCursors() > 0 ||
643            (*it)->getId() > pCursorPreCheckpointId) {
644            break;
645        } else {
646            numUnrefItems += (*it)->getNumItems();
647            numMetaItems +=  2; // 2 is for checkpoint start and end items.
648            ++numCheckpointsRemoved;
649            if (checkpointConfig.canKeepClosedCheckpoints() &&
650                (checkpointList.size() - numCheckpointsRemoved) <=
651                 checkpointConfig.getMaxCheckpoints()) {
652                // Collect unreferenced closed checkpoints until the number
653                // of checkpoints is
654                // equal to the number of max checkpoints allowed.
655                ++it;
656                break;
657            }
658        }
659    }
660    numItems.fetch_sub(numUnrefItems + numMetaItems);
661    if (numUnrefItems > 0) {
662        decrCursorOffset_UNLOCKED(persistenceCursor, numUnrefItems);
663        cursor_index::iterator map_it = tapCursors.begin();
664        for (; map_it != tapCursors.end(); ++map_it) {
665            decrCursorOffset_UNLOCKED(map_it->second, numUnrefItems);
666        }
667    }
668    unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
669                               checkpointList.begin(), it);
670
671    // If any cursor on a replica vbucket or downstream active vbucket
672    // receiving checkpoints from
673    // the upstream master is very slow and causes more closed checkpoints in
674    // memory, collapse those closed checkpoints into a single one to reduce
675    // the memory overhead.
676    if (!checkpointConfig.canKeepClosedCheckpoints() &&
677        (vbucket->getState() == vbucket_state_replica ||
678         (vbucket->getState() == vbucket_state_active)))
679    {
680        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
681        collapseClosedCheckpoints(unrefCheckpointList);
682        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
683        if (curr_remains > new_remains) {
684            size_t diff = curr_remains - new_remains;
685            stats.decrDiskQueueSize(diff);
686            vbucket->dirtyQueueSize.fetch_sub(diff);
687        } else if (curr_remains < new_remains) {
688            size_t diff = new_remains - curr_remains;
689            stats.diskQueueSize.fetch_add(diff);
690            vbucket->dirtyQueueSize.fetch_add(diff);
691        }
692    }
693    lh.unlock();
694
695    std::list<Checkpoint*>::iterator chkpoint_it = unrefCheckpointList.begin();
696    for (; chkpoint_it != unrefCheckpointList.end(); ++chkpoint_it) {
697        delete *chkpoint_it;
698    }
699
700    return numUnrefItems;
701}
702
703void CheckpointManager::removeInvalidCursorsOnCheckpoint(
704                                                     Checkpoint *pCheckpoint) {
705    std::list<std::string> invalidCursorNames;
706    const std::set<std::string> &cursors = pCheckpoint->getCursorNameList();
707    std::set<std::string>::const_iterator cit = cursors.begin();
708    for (; cit != cursors.end(); ++cit) {
709        // Check it with persistence cursor
710        if ((*cit).compare(persistenceCursor.name) == 0) {
711            if (pCheckpoint != *(persistenceCursor.currentCheckpoint)) {
712                invalidCursorNames.push_back(*cit);
713            }
714        } else { // Check it with tap cursors
715            cursor_index::iterator mit = tapCursors.find(*cit);
716            if (mit == tapCursors.end() ||
717                pCheckpoint != *(mit->second.currentCheckpoint)) {
718                invalidCursorNames.push_back(*cit);
719            }
720        }
721    }
722
723    std::list<std::string>::iterator it = invalidCursorNames.begin();
724    for (; it != invalidCursorNames.end(); ++it) {
725        pCheckpoint->removeCursorName(*it);
726    }
727}
728
729void CheckpointManager::collapseClosedCheckpoints(
730                                      std::list<Checkpoint*> &collapsedChks) {
731    // If there are one open checkpoint and more than one closed checkpoint,
732    // collapse those
733    // closed checkpoints into one checkpoint to reduce the memory overhead.
734    if (checkpointList.size() > 2) {
735        std::map<std::string, std::pair<uint64_t, bool> > slowCursors;
736        std::set<std::string> fastCursors;
737        std::list<Checkpoint*>::iterator lastClosedChk = checkpointList.end();
738        --lastClosedChk; --lastClosedChk; // Move to the lastest closed chkpt.
739        fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(),
740                           (*lastClosedChk)->getCursorNameList().end());
741        std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
742        ++rit; ++rit;// Move to the second lastest closed checkpoint.
743        size_t numDuplicatedItems = 0, numMetaItems = 0;
744        for (; rit != checkpointList.rend(); ++rit) {
745            size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit);
746            numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
747            numMetaItems += 2; // checkpoint start and end meta items
748
749            std::set<std::string>::iterator nameItr =
750                (*rit)->getCursorNameList().begin();
751            for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
752                if (nameItr->compare(persistenceCursor.name) == 0) {
753                    const std::string& key =
754                                 (*(persistenceCursor.currentPos))->getKey();
755                    bool cursor_on_chk_start = false;
756                    if ((*(persistenceCursor.currentPos))->getOperation() ==
757                        queue_op_checkpoint_start) {
758                        cursor_on_chk_start = true;
759                    }
760                    slowCursors[*nameItr] =
761                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
762                } else {
763                    cursor_index::iterator cc =
764                        tapCursors.find(*nameItr);
765                    const std::string& key = (*(cc->second.currentPos))->
766                                             getKey();
767                    bool cursor_on_chk_start = false;
768                    if ((*(cc->second.currentPos))->getOperation() ==
769                        queue_op_checkpoint_start) {
770                        cursor_on_chk_start = true;
771                    }
772                    slowCursors[*nameItr] =
773                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
774                }
775            }
776        }
777        putCursorsInCollapsedChk(slowCursors, lastClosedChk);
778
779        numItems.fetch_sub(numDuplicatedItems + numMetaItems);
780        Checkpoint *pOpenCheckpoint = checkpointList.back();
781        const std::set<std::string> &openCheckpointCursors =
782                                    pOpenCheckpoint->getCursorNameList();
783        fastCursors.insert(openCheckpointCursors.begin(),
784                           openCheckpointCursors.end());
785        std::set<std::string>::const_iterator cit = fastCursors.begin();
786        // Update the offset of each fast cursor.
787        for (; cit != fastCursors.end(); ++cit) {
788            if ((*cit).compare(persistenceCursor.name) == 0) {
789                decrCursorOffset_UNLOCKED(persistenceCursor,
790                                          numDuplicatedItems);
791            } else {
792                cursor_index::iterator mit = tapCursors.find(*cit);
793                if (mit != tapCursors.end()) {
794                    decrCursorOffset_UNLOCKED(mit->second,
795                                              numDuplicatedItems);
796                }
797            }
798        }
799        collapsedChks.splice(collapsedChks.end(), checkpointList,
800                             checkpointList.begin(),  lastClosedChk);
801    }
802}
803
804bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
805                                   bool genSeqno) {
806    LockHolder lh(queueLock);
807
808    cb_assert(vb);
809    bool canCreateNewCheckpoint = false;
810    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
811        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
812         checkpointList.front()->getNumberOfCursors() == 0)) {
813        canCreateNewCheckpoint = true;
814    }
815    if (vb->getState() == vbucket_state_active && canCreateNewCheckpoint) {
816        // Only the master active vbucket can create a next open checkpoint.
817        checkOpenCheckpoint_UNLOCKED(false, true);
818    }
819
820    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
821        addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
822    }
823
824    cb_assert(checkpointList.back()->getState() == CHECKPOINT_OPEN);
825
826    if (genSeqno) {
827        qi->setBySeqno(nextBySeqno());
828    } else {
829        lastBySeqNo = qi->getBySeqno();
830    }
831
832    queue_dirty_t result = checkpointList.back()->queueDirty(qi, this);
833    if (result == NEW_ITEM) {
834        ++numItems;
835    }
836
837    if (result != EXISTING_ITEM) {
838        ++stats.totalEnqueued;
839        ++stats.diskQueueSize;
840        vb->doStatsForQueueing(*qi, qi->size());
841    }
842
843    return result != EXISTING_ITEM;
844}
845
846void CheckpointManager::itemsPersisted() {
847    LockHolder lh(queueLock);
848    std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
849    pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
850
851    std::list<queued_item>::iterator curr_pos = persistenceCursor.currentPos;
852    pCursorSeqno = (*curr_pos)->getBySeqno();
853}
854
855void CheckpointManager::getAllItemsForPersistence(
856                                             std::vector<queued_item> &items) {
857    LockHolder lh(queueLock);
858    // Get all the items up to the end of the current open checkpoint.
859    while (incrCursor(persistenceCursor)) {
860        items.push_back(*(persistenceCursor.currentPos));
861    }
862
863    LOG(EXTENSION_LOG_DEBUG,
864        "Grab %ld items through the persistence cursor from vbucket %d",
865        items.size(), vbucketId);
866}
867
868queued_item CheckpointManager::nextItem(const std::string &name,
869                                        bool &isLastMutationItem,
870                                        uint64_t &endSeqno) {
871    LockHolder lh(queueLock);
872    cursor_index::iterator it = tapCursors.find(name);
873    if (it == tapCursors.end()) {
874        LOG(EXTENSION_LOG_WARNING,
875        "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
876        "%d.\n", name.c_str(), vbucketId);
877        queued_item qi(new Item(std::string(""), 0xffff,
878                                queue_op_empty, 0, 0));
879        return qi;
880    }
881    if (checkpointList.back()->getId() == 0) {
882        LOG(EXTENSION_LOG_INFO,
883            "VBucket %d is still in backfill phase that doesn't allow "
884            " the tap cursor to fetch an item from it's current checkpoint",
885            vbucketId);
886        queued_item qi(new Item(std::string(""), 0xffff,
887                                queue_op_empty, 0, 0));
888        return qi;
889    }
890
891    CheckpointCursor &cursor = it->second;
892    if (incrCursor(cursor)) {
893        isLastMutationItem = isLastMutationItemInCheckpoint(cursor);
894        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
895        return *(cursor.currentPos);
896    } else {
897        isLastMutationItem = false;
898        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
899        queued_item qi(new Item(std::string(""), 0xffff,
900                                queue_op_empty, 0, 0));
901        return qi;
902    }
903}
904
905bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
906    if (++(cursor.currentPos) != (*(cursor.currentCheckpoint))->end()) {
907        queued_item &qi = *(cursor.currentPos);
908        if (qi->getOperation() != queue_op_checkpoint_start &&
909            qi->getOperation() != queue_op_checkpoint_end) {
910            ++(cursor.offset);
911        }
912        return true;
913    } else if (!moveCursorToNextCheckpoint(cursor)) {
914        --(cursor.currentPos);
915        return false;
916    }
917    return incrCursor(cursor);
918}
919
920void CheckpointManager::clear(vbucket_state_t vbState) {
921    LockHolder lh(queueLock);
922    std::list<Checkpoint*>::iterator it = checkpointList.begin();
923    // Remove all the checkpoints.
924    while(it != checkpointList.end()) {
925        delete *it;
926        ++it;
927    }
928    checkpointList.clear();
929    numItems = 0;
930
931    uint64_t checkpointId = vbState == vbucket_state_active ? 1 : 0;
932    // Add a new open checkpoint.
933    addNewCheckpoint_UNLOCKED(checkpointId);
934    resetCursors();
935}
936
937void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
938    // Reset the persistence cursor.
939    if (resetPersistenceCursor) {
940        persistenceCursor.currentCheckpoint = checkpointList.begin();
941        persistenceCursor.currentPos = checkpointList.front()->begin();
942        persistenceCursor.offset = 0;
943        checkpointList.front()->registerCursorName(persistenceCursor.name);
944    }
945
946    // Reset all the TAP cursors.
947    cursor_index::iterator cit = tapCursors.begin();
948    for (; cit != tapCursors.end(); ++cit) {
949        cit->second.currentCheckpoint = checkpointList.begin();
950        cit->second.currentPos = checkpointList.front()->begin();
951        cit->second.offset = 0;
952        checkpointList.front()->registerCursorName(cit->second.name);
953    }
954}
955
956void CheckpointManager::resetTAPCursors(const std::list<std::string> &cursors){
957    LockHolder lh(queueLock);
958    std::list<std::string>::const_iterator it = cursors.begin();
959    for (; it != cursors.end(); ++it) {
960        registerTAPCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
961    }
962}
963
964bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
965    if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_OPEN) {
966        return false;
967    } else if ((*(cursor.currentCheckpoint))->getState() ==
968                                                           CHECKPOINT_CLOSED) {
969        std::list<Checkpoint*>::iterator currCheckpoint =
970                                                      cursor.currentCheckpoint;
971        if (++currCheckpoint == checkpointList.end()) {
972            return false;
973        }
974    }
975
976    // Remove the cursor's name from its current checkpoint.
977    (*(cursor.currentCheckpoint))->removeCursorName(cursor.name);
978    // Move the cursor to the next checkpoint.
979    ++(cursor.currentCheckpoint);
980    cursor.currentPos = (*(cursor.currentCheckpoint))->begin();
981    // Register the cursor's name to its new current checkpoint.
982    (*(cursor.currentCheckpoint))->registerCursorName(cursor.name);
983    return true;
984}
985
986size_t CheckpointManager::getNumOpenChkItems() {
987    LockHolder lh(queueLock);
988    if (checkpointList.empty()) {
989        return 0;
990    }
991    return checkpointList.back()->getNumItems() + 1;
992}
993
994uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
995                                                         bool timeBound) {
996    int checkpoint_id = 0;
997
998    timeBound = timeBound &&
999                (ep_real_time() - checkpointList.back()->getCreationTime()) >=
1000                checkpointConfig.getCheckpointPeriod();
1001    // Create the new open checkpoint if any of the following conditions is
1002    // satisfied:
1003    // (1) force creation due to online update or high memory usage
1004    // (2) current checkpoint is reached to the max number of items allowed.
1005    // (3) time elapsed since the creation of the current checkpoint is greater
1006    //     than the threshold
1007    if (forceCreation ||
1008        (checkpointConfig.isItemNumBasedNewCheckpoint() &&
1009         checkpointList.back()->getNumItems() >=
1010         checkpointConfig.getCheckpointMaxItems()) ||
1011        (checkpointList.back()->getNumItems() > 0 && timeBound)) {
1012
1013        checkpoint_id = checkpointList.back()->getId();
1014        closeOpenCheckpoint_UNLOCKED(checkpoint_id);
1015        addNewCheckpoint_UNLOCKED(checkpoint_id + 1);
1016    }
1017    return checkpoint_id;
1018}
1019
1020bool CheckpointManager::eligibleForEviction(const std::string &key) {
1021    LockHolder lh(queueLock);
1022    uint64_t smallest_mid;
1023
1024    // Get the mutation id of the item pointed by the slowest cursor.
1025    // This won't cause much overhead as the number of cursors per vbucket is
1026    // usually bounded to 3 (persistence cursor + 2 replicas).
1027    const std::string &pkey = (*(persistenceCursor.currentPos))->getKey();
1028    smallest_mid = (*(persistenceCursor.currentCheckpoint))->
1029                                                     getMutationIdForKey(pkey);
1030    cursor_index::iterator mit = tapCursors.begin();
1031    for (; mit != tapCursors.end(); ++mit) {
1032        const std::string &tkey = (*(mit->second.currentPos))->getKey();
1033        uint64_t mid = (*(mit->second.currentCheckpoint))->
1034                                                     getMutationIdForKey(tkey);
1035        if (mid < smallest_mid) {
1036            smallest_mid = mid;
1037        }
1038    }
1039
1040    bool can_evict = true;
1041    std::list<Checkpoint*>::reverse_iterator it = checkpointList.rbegin();
1042    for (; it != checkpointList.rend(); ++it) {
1043        uint64_t mid = (*it)->getMutationIdForKey(key);
1044        if (mid == 0) { // key doesn't exist in a checkpoint.
1045            continue;
1046        }
1047        if (smallest_mid < mid) { // The slowest cursor is still
1048            can_evict = false;    //sitting behind a given key.
1049            break;
1050        }
1051    }
1052
1053    return can_evict;
1054}
1055
1056size_t CheckpointManager::getNumItemsForTAPConnection(
1057                                                     const std::string &name) {
1058    LockHolder lh(queueLock);
1059    size_t remains = 0;
1060    cursor_index::iterator it = tapCursors.find(name);
1061    if (it != tapCursors.end()) {
1062        remains = (numItems >= it->second.offset) ?
1063                   numItems - it->second.offset : 0;
1064    }
1065    return remains;
1066}
1067
1068size_t CheckpointManager::getNumItemsForPersistence_UNLOCKED() {
1069    size_t num_items = numItems;
1070    size_t offset = persistenceCursor.offset;
1071
1072    // Get the number of meta items that can be skipped by the persistence
1073    // cursor.
1074    size_t meta_items = 0;
1075    std::list<Checkpoint*>::iterator curr_chk =
1076                                           persistenceCursor.currentCheckpoint;
1077    for (; curr_chk != checkpointList.end(); ++curr_chk) {
1078        if (curr_chk == persistenceCursor.currentCheckpoint) {
1079            std::list<queued_item>::iterator curr_pos =
1080                                                  persistenceCursor.currentPos;
1081            ++curr_pos;
1082            if (curr_pos == (*curr_chk)->end()) {
1083                continue;
1084            }
1085            if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
1086                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1087                    meta_items += 2;
1088                } else {
1089                    ++meta_items;
1090                }
1091            } else {
1092                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1093                    ++meta_items;
1094                }
1095            }
1096        } else {
1097            if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1098                meta_items += 2;
1099            } else {
1100                ++meta_items;
1101            }
1102        }
1103    }
1104
1105    offset += meta_items;
1106    return num_items > offset ? num_items - offset : 0;
1107}
1108
1109void CheckpointManager::decrTapCursorFromCheckpointEnd(
1110                                                    const std::string &name) {
1111    LockHolder lh(queueLock);
1112    cursor_index::iterator it = tapCursors.find(name);
1113    if (it != tapCursors.end() &&
1114        (*(it->second.currentPos))->getOperation() ==
1115        queue_op_checkpoint_end) {
1116        decrCursorPos_UNLOCKED(it->second);
1117    }
1118}
1119
1120uint64_t CheckpointManager::getMutationIdForKey(uint64_t chk_id,
1121                                                std::string key) {
1122    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
1123    for (; itr != checkpointList.end(); ++itr) {
1124        if (chk_id == (*itr)->getId()) {
1125            return (*itr)->getMutationIdForKey(key);
1126        }
1127    }
1128    return 0;
1129}
1130
1131bool CheckpointManager::isLastMutationItemInCheckpoint(
1132                                                   CheckpointCursor &cursor) {
1133    std::list<queued_item>::iterator it = cursor.currentPos;
1134    ++it;
1135    if (it == (*(cursor.currentCheckpoint))->end() ||
1136        (*it)->getOperation() == queue_op_checkpoint_end) {
1137        return true;
1138    }
1139    return false;
1140}
1141
1142void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
1143                                               const RCPtr<VBucket> &vbucket) {
1144    LockHolder lh(queueLock);
1145
1146    // Ignore CHECKPOINT_START message with ID 0 as 0 is reserved for
1147    // representing backfill.
1148    if (id == 0) {
1149        return;
1150    }
1151    // If the replica receives a checkpoint start message right after backfill
1152    // completion, simply set the current open checkpoint id to the one
1153    // received from the active vbucket.
1154    if (checkpointList.back()->getId() == 0) {
1155        setOpenCheckpointId_UNLOCKED(id);
1156        resetCursors(false);
1157        return;
1158    }
1159
1160    std::list<Checkpoint*>::iterator it = checkpointList.begin();
1161    // Check if a checkpoint exists with ID >= id.
1162    while (it != checkpointList.end()) {
1163        if (id <= (*it)->getId()) {
1164            break;
1165        }
1166        ++it;
1167    }
1168
1169    if (it == checkpointList.end()) {
1170        if ((checkpointList.back()->getId() + 1) < id) {
1171            isCollapsedCheckpoint = true;
1172            uint64_t oid = getOpenCheckpointId_UNLOCKED();
1173            lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0;
1174        } else if ((checkpointList.back()->getId() + 1) == id) {
1175            isCollapsedCheckpoint = false;
1176        }
1177        if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
1178            checkpointList.back()->getNumItems() == 0) {
1179            // If the current open checkpoint doesn't have any items, simply
1180            // set its id to
1181            // the one from the master node.
1182            setOpenCheckpointId_UNLOCKED(id);
1183            // Reposition all the cursors in the open checkpoint to the
1184            // begining position so that a checkpoint_start message can be
1185            // sent again with the correct id.
1186            const std::set<std::string> &cursors = checkpointList.back()->
1187                                                   getCursorNameList();
1188            std::set<std::string>::const_iterator cit = cursors.begin();
1189            for (; cit != cursors.end(); ++cit) {
1190                if ((*cit).compare(persistenceCursor.name) == 0) {
1191                    // Persistence cursor
1192                    continue;
1193                } else { // TAP cursors
1194                    cursor_index::iterator mit = tapCursors.find(*cit);
1195                    mit->second.currentPos = checkpointList.back()->begin();
1196                }
1197            }
1198        } else {
1199            closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
1200            addNewCheckpoint_UNLOCKED(id);
1201        }
1202    } else {
1203        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
1204        collapseCheckpoints(id);
1205        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
1206        if (curr_remains > new_remains) {
1207            size_t diff = curr_remains - new_remains;
1208            stats.decrDiskQueueSize(diff);
1209            vbucket->dirtyQueueSize.fetch_sub(diff);
1210        } else if (curr_remains < new_remains) {
1211            size_t diff = new_remains - curr_remains;
1212            stats.diskQueueSize.fetch_add(diff);
1213            vbucket->dirtyQueueSize.fetch_add(diff);
1214        }
1215    }
1216}
1217
1218void CheckpointManager::collapseCheckpoints(uint64_t id) {
1219    cb_assert(!checkpointList.empty());
1220
1221    std::map<std::string, std::pair<uint64_t, bool> > cursorMap;
1222    cursor_index::iterator itr;
1223    for (itr = tapCursors.begin(); itr != tapCursors.end(); itr++) {
1224        Checkpoint* chk = *(itr->second.currentCheckpoint);
1225        const std::string& key = (*(itr->second.currentPos))->getKey();
1226        bool cursor_on_chk_start = false;
1227        if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
1228            cursor_on_chk_start = true;
1229        }
1230        cursorMap[itr->first.c_str()] =
1231            std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1232    }
1233
1234    Checkpoint* chk = *(persistenceCursor.currentCheckpoint);
1235    std::string key = (*(persistenceCursor.currentPos))->getKey();
1236    bool cursor_on_chk_start = false;
1237    if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_start) {
1238        cursor_on_chk_start = true;
1239    }
1240    cursorMap[persistenceCursor.name.c_str()] =
1241        std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1242
1243    std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
1244    ++rit; // Move to the last closed checkpoint.
1245    size_t numDuplicatedItems = 0, numMetaItems = 0;
1246    // Collapse all checkpoints.
1247    for (; rit != checkpointList.rend(); ++rit) {
1248        size_t numAddedItems = checkpointList.back()->
1249                               mergePrevCheckpoint(*rit);
1250        numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
1251        numMetaItems += 2; // checkpoint start and end meta items
1252        delete *rit;
1253    }
1254    numItems.fetch_sub(numDuplicatedItems + numMetaItems);
1255
1256    if (checkpointList.size() > 1) {
1257        checkpointList.erase(checkpointList.begin(), --checkpointList.end());
1258    }
1259    cb_assert(checkpointList.size() == 1);
1260
1261    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
1262        checkpointList.back()->popBackCheckpointEndItem();
1263        --numItems;
1264        checkpointList.back()->setState(CHECKPOINT_OPEN);
1265    }
1266    setOpenCheckpointId_UNLOCKED(id);
1267    putCursorsInCollapsedChk(cursorMap, checkpointList.begin());
1268}
1269
/**
 * Reposition cursors inside a checkpoint produced by a collapse.
 *
 * @param cursors map from cursor name to a pair of (mutation id of the item
 *                the cursor pointed at before the collapse, whether that item
 *                was a checkpoint_start). Entries are erased from this map as
 *                the corresponding cursors are placed during the scan.
 * @param chkItr  iterator to the collapsed checkpoint in checkpointList
 */
void CheckpointManager::
putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
                         std::list<Checkpoint*>::iterator chkItr) {
    size_t i;
    Checkpoint *chk = *chkItr;
    std::list<queued_item>::iterator cit = chk->begin();
    // 'last' trails 'cit' by one item (both start at begin()).
    std::list<queued_item>::iterator last = chk->begin();
    for (i = 0; cit != chk->end(); ++i, ++cit) {
        uint64_t id = chk->getMutationIdForKey((*cit)->getKey());
        std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
        while (mit != cursors.end()) {
            std::pair<uint64_t, bool> val = mit->second;
            // Place the cursor on 'last' once the scan reaches an item whose
            // mutation id exceeds the cursor's recorded id, or equals it while
            // the cursor was on a checkpoint_start and 'last' is one too.
            if (val.first < id || (val.first == id && val.second &&
                                   (*last)->getOperation() == queue_op_checkpoint_start)) {
                if (mit->first.compare(persistenceCursor.name) == 0) {
                    persistenceCursor.currentCheckpoint = chkItr;
                    persistenceCursor.currentPos = last;
                    persistenceCursor.offset = (i > 0) ? i - 1 : 0;
                    chk->registerCursorName(persistenceCursor.name);
                } else {
                    cursor_index::iterator cc = tapCursors.find(mit->first);
                    // Unknown TAP cursors and those flagged
                    // fromBeginningOnChkCollapse stay in the map; they are
                    // dealt with after the scan.
                    if (cc == tapCursors.end() ||
                        cc->second.fromBeginningOnChkCollapse) {
                        ++mit;
                        continue;
                    }
                    cc->second.currentCheckpoint = chkItr;
                    cc->second.currentPos = last;
                    cc->second.offset = (i > 0) ? i - 1 : 0;
                    chk->registerCursorName(cc->second.name);
                }
                // Post-increment so the iterator is advanced before the
                // erased element is removed.
                cursors.erase(mit++);
            } else {
                ++mit;
            }
        }
        last = cit;
    }

    // Cursors still in the map were not placed during the scan: put them on
    // the final value of 'last', or at the beginning for TAP cursors flagged
    // fromBeginningOnChkCollapse.
    std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
    for (; mit != cursors.end(); ++mit) {
        if (mit->first.compare(persistenceCursor.name) == 0) {
            persistenceCursor.currentCheckpoint = chkItr;
            persistenceCursor.currentPos = last;
            persistenceCursor.offset = (i > 0) ? i - 1 : 0;
            chk->registerCursorName(persistenceCursor.name);
        } else {
            cursor_index::iterator cc = tapCursors.find(mit->first);
            if (cc == tapCursors.end()) {
                // No TAP cursor with this name; nothing to reposition.
                continue;
            }
            cc->second.currentCheckpoint = chkItr;
            if (cc->second.fromBeginningOnChkCollapse) {
                cc->second.currentPos = chk->begin();
                cc->second.offset = 0;
            } else {
                cc->second.currentPos = last;
                cc->second.offset = (i > 0) ? i - 1 : 0;
            }
            chk->registerCursorName(cc->second.name);
        }
    }
}
1333
1334bool CheckpointManager::hasNext(const std::string &name) {
1335    LockHolder lh(queueLock);
1336    cursor_index::iterator it = tapCursors.find(name);
1337    if (it == tapCursors.end() || getOpenCheckpointId_UNLOCKED() == 0) {
1338        return false;
1339    }
1340
1341    bool hasMore = true;
1342    std::list<queued_item>::iterator curr = it->second.currentPos;
1343    ++curr;
1344    if (curr == (*(it->second.currentCheckpoint))->end() &&
1345        (*(it->second.currentCheckpoint)) == checkpointList.back()) {
1346        hasMore = false;
1347    }
1348    return hasMore;
1349}
1350
1351queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
1352                                          enum queue_operation checkpoint_op) {
1353    cb_assert(checkpoint_op == queue_op_checkpoint_start ||
1354           checkpoint_op == queue_op_checkpoint_end ||
1355           checkpoint_op == queue_op_empty);
1356
1357    uint64_t bySeqno;
1358    std::stringstream key;
1359    if (checkpoint_op == queue_op_checkpoint_start) {
1360        key << "checkpoint_start";
1361        bySeqno = lastBySeqNo + 1;
1362    } else if (checkpoint_op == queue_op_empty) {
1363        key << "dummy_key";
1364        bySeqno = lastBySeqNo;
1365    } else {
1366        key << "checkpoint_end";
1367        bySeqno = lastBySeqNo + 1;
1368    }
1369    queued_item qi(new Item(key.str(), vbid, checkpoint_op, id, bySeqno));
1370    return qi;
1371}
1372
1373bool CheckpointManager::hasNextForPersistence() {
1374    LockHolder lh(queueLock);
1375    bool hasMore = true;
1376    std::list<queued_item>::iterator curr = persistenceCursor.currentPos;
1377    ++curr;
1378    if (curr == (*(persistenceCursor.currentCheckpoint))->end() &&
1379        (*(persistenceCursor.currentCheckpoint)) == checkpointList.back()) {
1380        hasMore = false;
1381    }
1382    return hasMore;
1383}
1384
1385uint64_t CheckpointManager::createNewCheckpoint() {
1386    LockHolder lh(queueLock);
1387    if (checkpointList.back()->getNumItems() > 0) {
1388        uint64_t chk_id = checkpointList.back()->getId();
1389        closeOpenCheckpoint_UNLOCKED(chk_id);
1390        addNewCheckpoint_UNLOCKED(chk_id + 1);
1391    }
1392    return checkpointList.back()->getId();
1393}
1394
1395void CheckpointManager::decrCursorOffset_UNLOCKED(CheckpointCursor &cursor,
1396                                                  size_t decr) {
1397    if (cursor.offset >= decr) {
1398        cursor.offset.fetch_sub(decr);
1399    } else {
1400        cursor.offset = 0;
1401        LOG(EXTENSION_LOG_INFO,
1402            "%s cursor offset is negative. Reset it to 0.",
1403            cursor.name.c_str());
1404    }
1405}
1406
1407void CheckpointManager::decrCursorPos_UNLOCKED(CheckpointCursor &cursor) {
1408    if (cursor.currentPos != (*(cursor.currentCheckpoint))->begin()) {
1409        --(cursor.currentPos);
1410    }
1411}
1412
1413uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
1414    LockHolder lh(queueLock);
1415    return pCursorPreCheckpointId;
1416}
1417
1418uint64_t CheckpointManager::getPersistenceCursorSeqno() {
1419    LockHolder lh(queueLock);
1420    return pCursorSeqno;
1421}
1422
1423void CheckpointConfig::addConfigChangeListener(
1424                                         EventuallyPersistentEngine &engine) {
1425    Configuration &configuration = engine.getConfiguration();
1426    configuration.addValueChangedListener("chk_period",
1427             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1428    configuration.addValueChangedListener("chk_max_items",
1429             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1430    configuration.addValueChangedListener("max_checkpoints",
1431             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1432    configuration.addValueChangedListener("item_num_based_new_chk",
1433             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1434    configuration.addValueChangedListener("keep_closed_chks",
1435             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1436}
1437
1438CheckpointConfig::CheckpointConfig(EventuallyPersistentEngine &e) {
1439    Configuration &config = e.getConfiguration();
1440    checkpointPeriod = config.getChkPeriod();
1441    checkpointMaxItems = config.getChkMaxItems();
1442    maxCheckpoints = config.getMaxCheckpoints();
1443    itemNumBasedNewCheckpoint = config.isItemNumBasedNewChk();
1444    keepClosedCheckpoints = config.isKeepClosedChks();
1445}
1446
1447bool CheckpointConfig::validateCheckpointMaxItemsParam(size_t
1448                                                       checkpoint_max_items) {
1449    if (checkpoint_max_items < MIN_CHECKPOINT_ITEMS ||
1450        checkpoint_max_items > MAX_CHECKPOINT_ITEMS) {
1451        std::stringstream ss;
1452        ss << "New checkpoint_max_items param value " << checkpoint_max_items
1453           << " is not ranged between the min allowed value " <<
1454           MIN_CHECKPOINT_ITEMS
1455           << " and max value " << MAX_CHECKPOINT_ITEMS;
1456        LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
1457        return false;
1458    }
1459    return true;
1460}
1461
1462bool CheckpointConfig::validateCheckpointPeriodParam(
1463                                                   size_t checkpoint_period) {
1464    if (checkpoint_period < MIN_CHECKPOINT_PERIOD ||
1465        checkpoint_period > MAX_CHECKPOINT_PERIOD) {
1466        std::stringstream ss;
1467        ss << "New checkpoint_period param value " << checkpoint_period
1468           << " is not ranged between the min allowed value " <<
1469              MIN_CHECKPOINT_PERIOD
1470           << " and max value " << MAX_CHECKPOINT_PERIOD;
1471        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1472        return false;
1473    }
1474    return true;
1475}
1476
1477bool CheckpointConfig::validateMaxCheckpointsParam(size_t max_checkpoints) {
1478    if (max_checkpoints < DEFAULT_MAX_CHECKPOINTS ||
1479        max_checkpoints > MAX_CHECKPOINTS_UPPER_BOUND) {
1480        std::stringstream ss;
1481        ss << "New max_checkpoints param value " << max_checkpoints
1482           << " is not ranged between the min allowed value " <<
1483              DEFAULT_MAX_CHECKPOINTS
1484           << " and max value " << MAX_CHECKPOINTS_UPPER_BOUND;
1485        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1486        return false;
1487    }
1488    return true;
1489}
1490
1491void CheckpointConfig::setCheckpointPeriod(size_t value) {
1492    if (!validateCheckpointPeriodParam(value)) {
1493        value = DEFAULT_CHECKPOINT_PERIOD;
1494    }
1495    checkpointPeriod = static_cast<rel_time_t>(value);
1496}
1497
1498void CheckpointConfig::setCheckpointMaxItems(size_t value) {
1499    if (!validateCheckpointMaxItemsParam(value)) {
1500        value = DEFAULT_CHECKPOINT_ITEMS;
1501    }
1502    checkpointMaxItems = value;
1503}
1504
1505void CheckpointConfig::setMaxCheckpoints(size_t value) {
1506    if (!validateMaxCheckpointsParam(value)) {
1507        value = DEFAULT_MAX_CHECKPOINTS;
1508    }
1509    maxCheckpoints = value;
1510}
1511
1512void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
1513    LockHolder lh(queueLock);
1514    char buf[256];
1515
1516    snprintf(buf, sizeof(buf), "vb_%d:open_checkpoint_id", vbucketId);
1517    add_casted_stat(buf, getOpenCheckpointId_UNLOCKED(), add_stat, cookie);
1518    snprintf(buf, sizeof(buf), "vb_%d:last_closed_checkpoint_id", vbucketId);
1519    add_casted_stat(buf, getLastClosedCheckpointId_UNLOCKED(),
1520    add_stat, cookie);
1521    snprintf(buf, sizeof(buf), "vb_%d:persistence_seqno", vbucketId);
1522    add_casted_stat(buf, pCursorSeqno, add_stat, cookie);
1523    snprintf(buf, sizeof(buf), "vb_%d:num_tap_cursors", vbucketId);
1524    add_casted_stat(buf, tapCursors.size(), add_stat, cookie);
1525    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoint_items", vbucketId);
1526    add_casted_stat(buf, numItems, add_stat, cookie);
1527    snprintf(buf, sizeof(buf), "vb_%d:num_open_checkpoint_items", vbucketId);
1528    add_casted_stat(buf, checkpointList.empty() ? 0 :
1529                    checkpointList.back()->getNumItems(),
1530                    add_stat, cookie);
1531    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoints", vbucketId);
1532    add_casted_stat(buf, checkpointList.size(), add_stat, cookie);
1533    snprintf(buf, sizeof(buf), "vb_%d:num_items_for_persistence", vbucketId);
1534    add_casted_stat(buf, getNumItemsForPersistence_UNLOCKED(),
1535                    add_stat, cookie);
1536
1537    cursor_index::iterator tap_it = tapCursors.begin();
1538    for (; tap_it != tapCursors.end(); ++tap_it) {
1539        snprintf(buf, sizeof(buf),
1540                 "vb_%d:%s:cursor_checkpoint_id", vbucketId,
1541                 tap_it->first.c_str());
1542        add_casted_stat(buf, (*(tap_it->second.currentCheckpoint))->getId(),
1543                        add_stat, cookie);
1544    }
1545}
1546