xref: /3.0.3-GA/ep-engine/src/checkpoint.cc (revision 10862e4e)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string>
21#include <utility>
22#include <vector>
23
24#include "checkpoint.h"
25#include "ep_engine.h"
26#define STATWRITER_NAMESPACE checkpoint
27#include "statwriter.h"
28#undef STATWRITER_NAMESPACE
29#include "vbucket.h"
30
31/**
32 * A listener class to update checkpoint related configs at runtime.
33 */
34class CheckpointConfigChangeListener : public ValueChangedListener {
35public:
36    CheckpointConfigChangeListener(CheckpointConfig &c) : config(c) { }
37    virtual ~CheckpointConfigChangeListener() { }
38
39    virtual void sizeValueChanged(const std::string &key, size_t value) {
40        if (key.compare("chk_period") == 0) {
41            config.setCheckpointPeriod(value);
42        } else if (key.compare("chk_max_items") == 0) {
43            config.setCheckpointMaxItems(value);
44        } else if (key.compare("max_checkpoints") == 0) {
45            config.setMaxCheckpoints(value);
46        }
47    }
48
49    virtual void booleanValueChanged(const std::string &key, bool value) {
50        if (key.compare("item_num_based_new_chk") == 0) {
51            config.allowItemNumBasedNewCheckpoint(value);
52        } else if (key.compare("keep_closed_chks") == 0) {
53            config.allowKeepClosedCheckpoints(value);
54        }
55    }
56
57private:
58    CheckpointConfig &config;
59};
60
61Checkpoint::~Checkpoint() {
62    LOG(EXTENSION_LOG_INFO,
63        "Checkpoint %llu for vbucket %d is purged from memory",
64        checkpointId, vbucketId);
65    stats.memOverhead.fetch_sub(memorySize());
66    cb_assert(stats.memOverhead.load() < GIGANTOR);
67}
68
69void Checkpoint::setState(checkpoint_state state) {
70    checkpointState = state;
71}
72
73void Checkpoint::popBackCheckpointEndItem() {
74    if (!toWrite.empty() &&
75        toWrite.back()->getOperation() == queue_op_checkpoint_end) {
76        keyIndex.erase(toWrite.back()->getKey());
77        toWrite.pop_back();
78    }
79}
80
81bool Checkpoint::keyExists(const std::string &key) {
82    return keyIndex.find(key) != keyIndex.end();
83}
84
85queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
86                                     CheckpointManager *checkpointManager) {
87    assert (checkpointState == CHECKPOINT_OPEN);
88    queue_dirty_t rv;
89
90    checkpoint_index::iterator it = keyIndex.find(qi->getKey());
91    // Check if this checkpoint already had an item for the same key.
92    if (it != keyIndex.end()) {
93        rv = EXISTING_ITEM;
94        std::list<queued_item>::iterator currPos = it->second.position;
95        uint64_t currMutationId = it->second.mutation_id;
96        CheckpointCursor &pcursor = checkpointManager->persistenceCursor;
97        queued_item &pqi = *(pcursor.currentPos);
98
99        if (*(pcursor.currentCheckpoint) == this) {
100            // If the existing item is in the left-hand side of the item
101            // pointed by the persistence cursor, decrease the persistence
102            // cursor's offset by 1.
103            const std::string &key = pqi->getKey();
104            checkpoint_index::iterator ita = keyIndex.find(key);
105            if (ita != keyIndex.end()) {
106                uint64_t mutationId = ita->second.mutation_id;
107                if (currMutationId <= mutationId &&
108                    pqi->getOperation() != queue_op_checkpoint_start) {
109                    checkpointManager->decrCursorOffset_UNLOCKED(pcursor, 1);
110                    rv = PERSIST_AGAIN;
111                }
112            }
113            // If the persistence cursor points to the existing item for the
114            // same key,
115            // shift the cursor left by 1.
116            if (pcursor.currentPos == currPos) {
117                checkpointManager->decrCursorPos_UNLOCKED(pcursor);
118            }
119        }
120
121        cursor_index::iterator map_it = checkpointManager->tapCursors.begin();
122        for (; map_it != checkpointManager->tapCursors.end(); ++map_it) {
123
124            if (*(map_it->second.currentCheckpoint) == this) {
125                queued_item &tqi = *(map_it->second.currentPos);
126                const std::string &key = tqi->getKey();
127                checkpoint_index::iterator ita = keyIndex.find(key);
128                if (ita != keyIndex.end()) {
129                    uint64_t mutationId = ita->second.mutation_id;
130                    if (currMutationId <= mutationId &&
131                        tqi->getOperation() != queue_op_checkpoint_start) {
132                        checkpointManager->
133                                  decrCursorOffset_UNLOCKED(map_it->second, 1);
134                    }
135                }
136                // If an TAP cursor points to the existing item for the same
137                // key, shift it left by 1
138                if (map_it->second.currentPos == currPos) {
139                    checkpointManager->decrCursorPos_UNLOCKED(map_it->second);
140                }
141            }
142        }
143
144        toWrite.push_back(qi);
145        // Remove the existing item for the same key from the list.
146        toWrite.erase(currPos);
147    } else {
148        if (qi->getOperation() == queue_op_set ||
149            qi->getOperation() == queue_op_del) {
150            ++numItems;
151        }
152        rv = NEW_ITEM;
153        // Push the new item into the list
154        toWrite.push_back(qi);
155    }
156
157    if (qi->getNKey() > 0) {
158        std::list<queued_item>::iterator last = toWrite.end();
159        // --last is okay as the list is not empty now.
160        index_entry entry = {--last, qi->getBySeqno()};
161        // Set the index of the key to the new item that is pushed back into
162        // the list.
163        keyIndex[qi->getKey()] = entry;
164        if (rv == NEW_ITEM) {
165            size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
166                                  sizeof(queued_item);
167            memOverhead += newEntrySize;
168            stats.memOverhead.fetch_add(newEntrySize);
169            cb_assert(stats.memOverhead.load() < GIGANTOR);
170        }
171    }
172    return rv;
173}
174
175size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
176    size_t numNewItems = 0;
177    size_t newEntryMemOverhead = 0;
178    std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
179
180    LOG(EXTENSION_LOG_INFO,
181        "Collapse the checkpoint %llu into the checkpoint %llu for vbucket %d",
182        pPrevCheckpoint->getId(), checkpointId, vbucketId);
183
184    std::list<queued_item>::iterator itr = toWrite.begin();
185    uint64_t seqno = pPrevCheckpoint->getMutationIdForKey("dummy_key");
186    keyIndex["dummy_key"].mutation_id = seqno;
187    (*itr)->setBySeqno(seqno);
188
189    seqno = pPrevCheckpoint->getMutationIdForKey("checkpoint_start");
190    keyIndex["checkpoint_start"].mutation_id = seqno;
191    ++itr;
192    (*itr)->setBySeqno(seqno);
193
194    for (; rit != pPrevCheckpoint->rend(); ++rit) {
195        const std::string &key = (*rit)->getKey();
196        if ((*rit)->getOperation() != queue_op_del &&
197            (*rit)->getOperation() != queue_op_set) {
198            continue;
199        }
200        checkpoint_index::iterator it = keyIndex.find(key);
201        if (it == keyIndex.end()) {
202            std::list<queued_item>::iterator pos = toWrite.begin();
203            // Skip the first two meta items
204            ++pos; ++pos;
205            toWrite.insert(pos, *rit);
206            index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
207                                                    getMutationIdForKey(key))};
208            keyIndex[key] = entry;
209            newEntryMemOverhead += key.size() + sizeof(index_entry);
210            ++numItems;
211            ++numNewItems;
212        }
213    }
214    memOverhead += newEntryMemOverhead;
215    stats.memOverhead.fetch_add(newEntryMemOverhead);
216    cb_assert(stats.memOverhead.load() < GIGANTOR);
217    return numNewItems;
218}
219
220uint64_t Checkpoint::getMutationIdForKey(const std::string &key) {
221    uint64_t mid = 0;
222    checkpoint_index::iterator it = keyIndex.find(key);
223    if (it != keyIndex.end()) {
224        mid = it->second.mutation_id;
225    }
226    return mid;
227}
228
229CheckpointManager::~CheckpointManager() {
230    LockHolder lh(queueLock);
231    std::list<Checkpoint*>::iterator it = checkpointList.begin();
232    while(it != checkpointList.end()) {
233        delete *it;
234        ++it;
235    }
236}
237
238uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() {
239    if (checkpointList.empty()) {
240        return 0;
241    }
242
243    uint64_t id = checkpointList.back()->getId();
244    return checkpointList.back()->getState() == CHECKPOINT_OPEN ? id : id + 1;
245}
246
247uint64_t CheckpointManager::getOpenCheckpointId() {
248    LockHolder lh(queueLock);
249    return getOpenCheckpointId_UNLOCKED();
250}
251
252uint64_t CheckpointManager::getLastClosedCheckpointId_UNLOCKED() {
253    if (!isCollapsedCheckpoint) {
254        uint64_t id = getOpenCheckpointId_UNLOCKED();
255        lastClosedCheckpointId = id > 0 ? (id - 1) : 0;
256    }
257    return lastClosedCheckpointId;
258}
259
260uint64_t CheckpointManager::getLastClosedCheckpointId() {
261    LockHolder lh(queueLock);
262    return getLastClosedCheckpointId_UNLOCKED();
263}
264
265void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
266    if (!checkpointList.empty()) {
267        // Update the checkpoint_start item with the new Id.
268        std::list<queued_item>::iterator it =
269            ++(checkpointList.back()->begin());
270        (*it)->setRevSeqno(id);
271        if (checkpointList.back()->getId() == 0) {
272            (*it)->setBySeqno(lastBySeqNo + 1);
273        }
274        checkpointList.back()->setId(id);
275        LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %llu "
276            "for vbucket %d, bySeqno is %llu, max is %llu", id, vbucketId,
277            (*it)->getBySeqno(), lastBySeqNo);
278
279    }
280}
281
282bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
283    // This is just for making sure that the current checkpoint should be
284    // closed.
285    if (!checkpointList.empty() &&
286        checkpointList.back()->getState() == CHECKPOINT_OPEN) {
287        closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
288    }
289
290    LOG(EXTENSION_LOG_INFO, "Create a new open checkpoint %llu for vbucket %d",
291        id, vbucketId);
292
293    Checkpoint *checkpoint = new Checkpoint(stats, id, vbucketId,
294                                            CHECKPOINT_OPEN);
295    // Add a dummy item into the new checkpoint, so that any cursor referring
296    // to the actual first
297    // item in this new checkpoint can be safely shifted left by 1 if the
298    // first item is removed
299    // and pushed into the tail.
300    queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
301    checkpoint->queueDirty(qi, this);
302
303    // This item represents the start of the new checkpoint and is also sent to the slave node.
304    qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
305    checkpoint->queueDirty(qi, this);
306    ++numItems;
307    checkpointList.push_back(checkpoint);
308
309    return true;
310}
311
312bool CheckpointManager::addNewCheckpoint(uint64_t id) {
313    LockHolder lh(queueLock);
314    return addNewCheckpoint_UNLOCKED(id);
315}
316
317bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
318    if (checkpointList.empty()) {
319        return false;
320    }
321    if (id != checkpointList.back()->getId() ||
322        checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
323        return true;
324    }
325
326    LOG(EXTENSION_LOG_INFO, "Close the open checkpoint %llu for vbucket %d",
327        id, vbucketId);
328
329    // This item represents the end of the current open checkpoint and is sent to the slave node.
330    queued_item qi = createCheckpointItem(id, vbucketId,
331                                          queue_op_checkpoint_end);
332    checkpointList.back()->queueDirty(qi, this);
333    ++numItems;
334    checkpointList.back()->setState(CHECKPOINT_CLOSED);
335    return true;
336}
337
338bool CheckpointManager::closeOpenCheckpoint(uint64_t id) {
339    LockHolder lh(queueLock);
340    return closeOpenCheckpoint_UNLOCKED(id);
341}
342
343void CheckpointManager::registerPersistenceCursor() {
344    LockHolder lh(queueLock);
345    cb_assert(!checkpointList.empty());
346    persistenceCursor.currentCheckpoint = checkpointList.begin();
347    persistenceCursor.currentPos = checkpointList.front()->begin();
348    checkpointList.front()->registerCursorName(persistenceCursor.name);
349}
350
351bool CheckpointManager::registerTAPCursor(const std::string &name,
352                                          uint64_t checkpointId,
353                                          bool alwaysFromBeginning) {
354    LockHolder lh(queueLock);
355    return registerTAPCursor_UNLOCKED(name,
356                                      checkpointId,
357                                      alwaysFromBeginning);
358}
359
/**
 * (Re-)register the TAP cursor `name` at the position matching
 * `startBySeqno`, taking queueLock itself.
 *
 * Any existing cursor with the same name is removed first. Checkpoints are
 * scanned oldest to newest:
 *  - startBySeqno < checkpoint low seqno: the cursor goes on the
 *    checkpoint's first item.
 *  - startBySeqno == low seqno: the cursor goes on the third item (after the
 *    two meta items).
 *  - startBySeqno <= high seqno: the cursor goes on the item matching or
 *    preceding startBySeqno within that checkpoint.
 *  - otherwise the checkpoint's items (+2 meta items) are added to
 *    `skipped` and the scan continues.
 *
 * When endBySeqno falls in a non-empty checkpoint that is still open, that
 * checkpoint is force-closed via checkOpenCheckpoint_UNLOCKED(true, true).
 *
 * @param name cursor name.
 * @param startBySeqno seqno to start from; UINT64_MAX disables the start
 *        search. Must not exceed the last checkpoint's high seqno (asserted).
 * @param endBySeqno end seqno; UINT64_MAX disables the end search.
 * @return the start seqno computed for the registered cursor, or
 *         std::numeric_limits<uint64_t>::max() if no cursor was registered.
 */
uint64_t CheckpointManager::registerTAPCursorBySeqno(const std::string &name,
                                                     uint64_t startBySeqno,
                                                     uint64_t endBySeqno) {
    LockHolder lh(queueLock);
    cb_assert(!checkpointList.empty());
    cb_assert(checkpointList.back()->getHighSeqno() >= startBySeqno);

    removeTAPCursor_UNLOCKED(name);

    // Items in the checkpoints (and positions) passed over so far.
    size_t skipped = 0;
    uint64_t seqnoToStart = std::numeric_limits<uint64_t>::max();
    bool needToFindStartSeqno =
        startBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
    bool needToFindEndSeqno =
        endBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;

    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
    for (; itr != checkpointList.end(); ++itr) {
        uint64_t en = (*itr)->getHighSeqno();
        uint64_t st = (*itr)->getLowSeqno();
        if (needToFindStartSeqno) {
            if (startBySeqno < st) {
                // Start lies before this checkpoint: begin at its first item.
                tapCursors[name] = CheckpointCursor(name, itr, (*itr)->begin(),
                                                    skipped, false);
                (*itr)->registerCursorName(name);
                seqnoToStart = (*itr)->getLowSeqno();
                needToFindStartSeqno = false;
            } else if (startBySeqno == st) {
                std::list<queued_item>::iterator iitr = (*itr)->begin();

                // Advance the iterator past the two leading meta items to the
                // first regular item of the checkpoint.
                // NOTE(review): if the checkpoint holds only its two meta
                // items, iitr becomes end() and is dereferenced below - confirm
                // this case cannot happen.
                std::advance(iitr, 2);
                // NOTE(review): this overwrites (rather than adds to) the count
                // accumulated from earlier checkpoints - confirm intended.
                skipped = 2;

                tapCursors[name] = CheckpointCursor(name, itr, iitr, skipped,
                                                    false);
                (*itr)->registerCursorName(name);
                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno()) + 1;
                needToFindStartSeqno = false;
            } else if (startBySeqno <= en) {
                // Start lies inside this checkpoint: walk to the first item
                // whose seqno is >= startBySeqno, counting the items passed.
                std::list<queued_item>::iterator iitr = (*itr)->begin();
                while (++iitr != (*itr)->end() &&
                       startBySeqno > static_cast<uint64_t>((*iitr)->getBySeqno())) {
                    skipped++;
                }

                // Step back unless the item found has exactly startBySeqno.
                if (iitr == (*itr)->end() ||
                    startBySeqno < static_cast<uint64_t>((*iitr)->getBySeqno())) {
                    --iitr;
                }

                // NOTE(review): the other branches pass the number of skipped
                // items as the cursor offset, whereas this passes the number
                // of *remaining* items - confirm which meaning is intended.
                size_t remaining = (numItems > skipped) ? numItems - skipped : 0;
                tapCursors[name] = CheckpointCursor(name, itr, iitr, remaining,
                                                    false);
                (*itr)->registerCursorName(name);
                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno());
                needToFindStartSeqno = false;
            } else {
                // Start lies after this checkpoint: count its items plus the
                // checkpoint start/end meta items and keep scanning.
                skipped += (*itr)->getNumItems() + 2;
            }
        }
        if (needToFindEndSeqno) {
            if ((*itr)->getNumItems() > 0 && endBySeqno <= en) {
                if ((*itr)->getState() == CHECKPOINT_OPEN) {
                    // Close the open checkpoint and create a new one by force.
                    checkOpenCheckpoint_UNLOCKED(true, true);
                }
                needToFindEndSeqno = false;
            }
        }
        if (!needToFindStartSeqno && !needToFindEndSeqno) {
            break;
        }
    }

    if (seqnoToStart == std::numeric_limits<uint64_t>::max()) {
        /*
         * We should never get here since this would mean that the sequence number
         * we are looking for is higher than anything currently assigned and there
         * is already an assert above for this case.
         */
        LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
            " for stream '%s' because seqno %llu is too high",
            vbucketId, name.c_str(), startBySeqno);
    }
    return seqnoToStart;
}
448
449bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
450                                                   uint64_t checkpointId,
451                                                   bool alwaysFromBeginning) {
452    cb_assert(!checkpointList.empty());
453
454    bool found = false;
455    std::list<Checkpoint*>::iterator it = checkpointList.begin();
456    for (; it != checkpointList.end(); ++it) {
457        if (checkpointId == (*it)->getId()) {
458            found = true;
459            break;
460        }
461    }
462
463    LOG(EXTENSION_LOG_INFO,
464        "Register the tap cursor with the name \"%s\" for vbucket %d",
465        name.c_str(), vbucketId);
466
467    // If the tap cursor exists, remove its name from the checkpoint that is
468    // currently referenced by the tap cursor.
469    cursor_index::iterator map_it = tapCursors.find(name);
470    if (map_it != tapCursors.end()) {
471        (*(map_it->second.currentCheckpoint))->removeCursorName(name);
472    }
473
474    if (!found) {
475        for (it = checkpointList.begin(); it != checkpointList.end(); ++it) {
476            if (pCursorPreCheckpointId < (*it)->getId() ||
477                pCursorPreCheckpointId == 0) {
478                break;
479            }
480        }
481
482        LOG(EXTENSION_LOG_DEBUG,
483            "Checkpoint %llu for vbucket %d doesn't exist in memory. "
484            "Set the cursor with the name \"%s\" to checkpoint %d.\n",
485            checkpointId, vbucketId, name.c_str(), (*it)->getId());
486
487        cb_assert(it != checkpointList.end());
488
489        size_t offset = 0;
490        std::list<Checkpoint*>::iterator pos = checkpointList.begin();
491        for (; pos != it; ++pos) {
492            offset += (*pos)->getNumItems() + 2;
493        }
494
495        tapCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
496                                            true);
497        (*it)->registerCursorName(name);
498    } else {
499        size_t offset = 0;
500        std::list<queued_item>::iterator curr;
501
502        LOG(EXTENSION_LOG_DEBUG,
503            "Checkpoint %llu for vbucket %d exists in memory. "
504            "Set the cursor with the name \"%s\" to the checkpoint %llu\n",
505            checkpointId, vbucketId, name.c_str(), checkpointId);
506
507        if (!alwaysFromBeginning &&
508            map_it != tapCursors.end() &&
509            (*(map_it->second.currentCheckpoint))->getId() == (*it)->getId()) {
510            // If the cursor is currently in the checkpoint to start with,
511            // simply start from
512            // its current position.
513            curr = map_it->second.currentPos;
514            offset = map_it->second.offset;
515        } else {
516            // Set the cursor's position to the begining of the checkpoint to
517            // start with
518            curr = (*it)->begin();
519            std::list<Checkpoint*>::iterator pos = checkpointList.begin();
520            for (; pos != it; ++pos) {
521                offset += (*pos)->getNumItems() + 2;
522                // 2 is for checkpoint start and end items.
523            }
524        }
525
526        tapCursors[name] = CheckpointCursor(name, it, curr, offset, true);
527        // Register the tap cursor's name to the checkpoint.
528        (*it)->registerCursorName(name);
529    }
530
531    return found;
532}
533
534bool CheckpointManager::removeTAPCursor(const std::string &name) {
535    LockHolder lh(queueLock);
536    return removeTAPCursor_UNLOCKED(name);
537}
538
539bool CheckpointManager::removeTAPCursor_UNLOCKED(const std::string &name) {
540    cursor_index::iterator it = tapCursors.find(name);
541    if (it == tapCursors.end()) {
542        return false;
543    }
544
545    LOG(EXTENSION_LOG_INFO,
546        "Remove the checkpoint cursor with the name \"%s\" from vbucket %d",
547        name.c_str(), vbucketId);
548
549    // We can simply remove the cursor's name from the checkpoint to which it
550    // currently belongs,
551    // by calling
552    // (*(it->second.currentCheckpoint))->removeCursorName(name);
553    // However, we just want to do more sanity checks by looking at each
554    // checkpoint. This won't
555    // cause much overhead because the max number of checkpoints allowed per
556    // vbucket is small.
557    std::list<Checkpoint*>::iterator cit = checkpointList.begin();
558    for (; cit != checkpointList.end(); ++cit) {
559        (*cit)->removeCursorName(name);
560    }
561
562    tapCursors.erase(it);
563    return true;
564}
565
566uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
567                                                     const std::string &name) {
568    LockHolder lh(queueLock);
569    cursor_index::iterator it = tapCursors.find(name);
570    if (it == tapCursors.end()) {
571        return 0;
572    }
573
574    return (*(it->second.currentCheckpoint))->getId();
575}
576
577size_t CheckpointManager::getNumOfTAPCursors() {
578    LockHolder lh(queueLock);
579    return tapCursors.size();
580}
581
582size_t CheckpointManager::getNumCheckpoints() {
583    LockHolder lh(queueLock);
584    return checkpointList.size();
585}
586
587std::list<std::string> CheckpointManager::getTAPCursorNames() {
588    LockHolder lh(queueLock);
589    std::list<std::string> cursor_names;
590    cursor_index::iterator tap_it = tapCursors.begin();
591        for (; tap_it != tapCursors.end(); ++tap_it) {
592        cursor_names.push_back((tap_it->first));
593    }
594    return cursor_names;
595}
596
597bool CheckpointManager::isCheckpointCreationForHighMemUsage(
598                                              const RCPtr<VBucket> &vbucket) {
599    bool forceCreation = false;
600    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
601    // pesistence and tap cursors are all currently in the open checkpoint?
602    bool allCursorsInOpenCheckpoint =
603        (tapCursors.size() + 1) == checkpointList.back()->getNumberOfCursors();
604
605    if (memoryUsed > stats.mem_high_wat &&
606        allCursorsInOpenCheckpoint &&
607        (checkpointList.back()->getNumItems() >= MIN_CHECKPOINT_ITEMS ||
608         checkpointList.back()->getNumItems() == vbucket->ht.getNumInMemoryItems())) {
609        forceCreation = true;
610    }
611    return forceCreation;
612}
613
/**
 * Detach and delete old checkpoints that no cursor references any more.
 *
 * Steps:
 *  1. For an active vbucket, if another checkpoint may be created, a new
 *     open checkpoint may be created via checkOpenCheckpoint_UNLOCKED(); the
 *     result is reported through newOpenCheckpointCreated.
 *  2. When closed checkpoints may be kept, nothing is removed while memory
 *     is below the high watermark and the checkpoint count is within the
 *     configured maximum.
 *  3. Leading checkpoints are collected until one has a cursor or an id
 *     greater than pCursorPreCheckpointId. With keep_closed_chks enabled,
 *     collection also stops once the remaining count fits the maximum.
 *  4. On replica vbuckets (when closed checkpoints are not kept), the
 *     remaining closed checkpoints are collapsed and the disk/dirty queue
 *     sizes are corrected by the change in items left for persistence.
 *  5. The collected checkpoints are deleted after queueLock is released.
 *
 * @param vbucket the vbucket owning this manager (must be non-null).
 * @param newOpenCheckpointCreated set to true if step 1 created a new open
 *        checkpoint (checkOpenCheckpoint_UNLOCKED() returned non-zero).
 * @return number of regular items in the removed checkpoints (meta items
 *         are not included).
 */
size_t CheckpointManager::removeClosedUnrefCheckpoints(
                                              const RCPtr<VBucket> &vbucket,
                                              bool &newOpenCheckpointCreated) {

    // This function is executed periodically by the non-IO dispatcher.
    LockHolder lh(queueLock);
    cb_assert(vbucket);
    uint64_t oldCheckpointId = 0;
    bool canCreateNewCheckpoint = false;
    // A new checkpoint may be created if we are below the max, or exactly at
    // the max with an oldest checkpoint that no cursor references.
    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
         checkpointList.front()->getNumberOfCursors() == 0)) {
        canCreateNewCheckpoint = true;
    }
    if (vbucket->getState() == vbucket_state_active &&
        canCreateNewCheckpoint) {

        bool forceCreation = isCheckpointCreationForHighMemUsage(vbucket);
        // Check if this master active vbucket needs to create a new open
        // checkpoint.
        oldCheckpointId = checkOpenCheckpoint_UNLOCKED(forceCreation, true);
    }
    newOpenCheckpointCreated = oldCheckpointId > 0;

    // When closed checkpoints may be kept, only purge under memory pressure
    // or when there are more checkpoints than allowed.
    if (checkpointConfig.canKeepClosedCheckpoints()) {
        double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
        if (memoryUsed < stats.mem_high_wat &&
            checkpointList.size() <= checkpointConfig.getMaxCheckpoints()) {
            return 0;
        }
    }

    size_t numUnrefItems = 0;
    size_t numMetaItems = 0;
    size_t numCheckpointsRemoved = 0;
    std::list<Checkpoint*> unrefCheckpointList;
    std::list<Checkpoint*>::iterator it = checkpointList.begin();
    // Find the range [begin, it) of leading checkpoints that can be removed.
    for (; it != checkpointList.end(); ++it) {
        removeInvalidCursorsOnCheckpoint(*it);
        if ((*it)->getNumberOfCursors() > 0 ||
            (*it)->getId() > pCursorPreCheckpointId) {
            break;
        } else {
            numUnrefItems += (*it)->getNumItems();
            numMetaItems +=  2; // 2 is for checkpoint start and end items.
            ++numCheckpointsRemoved;
            if (checkpointConfig.canKeepClosedCheckpoints() &&
                (checkpointList.size() - numCheckpointsRemoved) <=
                 checkpointConfig.getMaxCheckpoints()) {
                // Collect unreferenced closed checkpoints until the number
                // of checkpoints is equal to the number of max checkpoints
                // allowed.
                ++it;
                break;
            }
        }
    }
    numItems.fetch_sub(numUnrefItems + numMetaItems);
    // NOTE(review): numItems drops by the meta items too, but cursor offsets
    // are only reduced by numUnrefItems, although registerTAPCursor_UNLOCKED()
    // counts +2 meta items per checkpoint in offsets - confirm intended.
    if (numUnrefItems > 0) {
        decrCursorOffset_UNLOCKED(persistenceCursor, numUnrefItems);
        cursor_index::iterator map_it = tapCursors.begin();
        for (; map_it != tapCursors.end(); ++map_it) {
            decrCursorOffset_UNLOCKED(map_it->second, numUnrefItems);
        }
    }
    // Detach the removable checkpoints; they are deleted after unlocking.
    unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
                               checkpointList.begin(), it);

    // If any cursor on a replica vbucket or downstream active vbucket
    // receiving checkpoints from the upstream master is very slow and causes
    // more closed checkpoints in memory, collapse those closed checkpoints
    // into a single one to reduce the memory overhead.
    // NOTE(review): collapseClosedCheckpoints() presumably adds the collapsed
    // checkpoints to unrefCheckpointList so they are deleted below - confirm.
    if (!checkpointConfig.canKeepClosedCheckpoints() &&
        vbucket->getState() == vbucket_state_replica)
    {
        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
        collapseClosedCheckpoints(unrefCheckpointList);
        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
        // Keep the disk/dirty queue stats in step with the change in items
        // still awaiting persistence.
        if (curr_remains > new_remains) {
            size_t diff = curr_remains - new_remains;
            stats.decrDiskQueueSize(diff);
            vbucket->decrDirtyQueueSize(diff);
        } else if (curr_remains < new_remains) {
            size_t diff = new_remains - curr_remains;
            stats.diskQueueSize.fetch_add(diff);
            vbucket->dirtyQueueSize.fetch_add(diff);
        }
    }
    lh.unlock();

    // Free the detached checkpoints outside the lock.
    std::list<Checkpoint*>::iterator chkpoint_it = unrefCheckpointList.begin();
    for (; chkpoint_it != unrefCheckpointList.end(); ++chkpoint_it) {
        delete *chkpoint_it;
    }

    return numUnrefItems;
}
712
713void CheckpointManager::removeInvalidCursorsOnCheckpoint(
714                                                     Checkpoint *pCheckpoint) {
715    std::list<std::string> invalidCursorNames;
716    const std::set<std::string> &cursors = pCheckpoint->getCursorNameList();
717    std::set<std::string>::const_iterator cit = cursors.begin();
718    for (; cit != cursors.end(); ++cit) {
719        // Check it with persistence cursor
720        if ((*cit).compare(persistenceCursor.name) == 0) {
721            if (pCheckpoint != *(persistenceCursor.currentCheckpoint)) {
722                invalidCursorNames.push_back(*cit);
723            }
724        } else { // Check it with tap cursors
725            cursor_index::iterator mit = tapCursors.find(*cit);
726            if (mit == tapCursors.end() ||
727                pCheckpoint != *(mit->second.currentCheckpoint)) {
728                invalidCursorNames.push_back(*cit);
729            }
730        }
731    }
732
733    std::list<std::string>::iterator it = invalidCursorNames.begin();
734    for (; it != invalidCursorNames.end(); ++it) {
735        pCheckpoint->removeCursorName(*it);
736    }
737}
738
739void CheckpointManager::collapseClosedCheckpoints(
740                                      std::list<Checkpoint*> &collapsedChks) {
741    // If there are one open checkpoint and more than one closed checkpoint,
742    // collapse those
743    // closed checkpoints into one checkpoint to reduce the memory overhead.
744    if (checkpointList.size() > 2) {
745        std::map<std::string, std::pair<uint64_t, bool> > slowCursors;
746        std::set<std::string> fastCursors;
747        std::list<Checkpoint*>::iterator lastClosedChk = checkpointList.end();
748        --lastClosedChk; --lastClosedChk; // Move to the last closed chkpt.
749        std::set<std::string>::iterator nitr =
750                (*lastClosedChk)->getCursorNameList().begin();
751        // Check if there are any cursors in the last closed checkpoint, which
752        // haven't yet visited any regular items belonging to the last closed
753        // checkpoint. If so, then we should skip collapsing checkpoints until
754        // those cursors move to the first regular item. Otherwise, those cursors will
755        // visit old items from collapsed checkpoints again.
756        for (; nitr != (*lastClosedChk)->getCursorNameList().end(); ++nitr) {
757            if (nitr->compare(persistenceCursor.name) == 0) {
758                enum queue_operation qop = (*(persistenceCursor.currentPos))->getOperation();
759                if (qop == queue_op_empty || qop == queue_op_checkpoint_start) {
760                    return;
761                }
762            } else {
763                cursor_index::iterator cc = tapCursors.find(*nitr);
764                if (cc == tapCursors.end()) {
765                    continue;
766                }
767                enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
768                if (qop ==  queue_op_empty || qop == queue_op_checkpoint_start) {
769                    return;
770                }
771            }
772        }
773
774        fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(),
775                           (*lastClosedChk)->getCursorNameList().end());
776        std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
777        ++rit; ++rit; //Move to the second last closed checkpoint.
778        size_t numDuplicatedItems = 0, numMetaItems = 0;
779        for (; rit != checkpointList.rend(); ++rit) {
780            size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit);
781            numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
782            numMetaItems += 2; // checkpoint start and end meta items
783
784            std::set<std::string>::iterator nameItr =
785                (*rit)->getCursorNameList().begin();
786            for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
787                if (nameItr->compare(persistenceCursor.name) == 0) {
788                    const std::string& key =
789                                 (*(persistenceCursor.currentPos))->getKey();
790                    bool cursor_on_chk_start = false;
791                    if ((*(persistenceCursor.currentPos))->getOperation() ==
792                        queue_op_checkpoint_start) {
793                        cursor_on_chk_start = true;
794                    }
795                    slowCursors[*nameItr] =
796                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
797                } else {
798                    cursor_index::iterator cc =
799                        tapCursors.find(*nameItr);
800                    const std::string& key = (*(cc->second.currentPos))->
801                                             getKey();
802                    bool cursor_on_chk_start = false;
803                    if ((*(cc->second.currentPos))->getOperation() ==
804                        queue_op_checkpoint_start) {
805                        cursor_on_chk_start = true;
806                    }
807                    slowCursors[*nameItr] =
808                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
809                }
810            }
811        }
812        putCursorsInCollapsedChk(slowCursors, lastClosedChk);
813
814        numItems.fetch_sub(numDuplicatedItems + numMetaItems);
815        Checkpoint *pOpenCheckpoint = checkpointList.back();
816        const std::set<std::string> &openCheckpointCursors =
817                                    pOpenCheckpoint->getCursorNameList();
818        fastCursors.insert(openCheckpointCursors.begin(),
819                           openCheckpointCursors.end());
820        std::set<std::string>::const_iterator cit = fastCursors.begin();
821        // Update the offset of each fast cursor.
822        for (; cit != fastCursors.end(); ++cit) {
823            if ((*cit).compare(persistenceCursor.name) == 0) {
824                decrCursorOffset_UNLOCKED(persistenceCursor,
825                                          numDuplicatedItems);
826            } else {
827                cursor_index::iterator mit = tapCursors.find(*cit);
828                if (mit != tapCursors.end()) {
829                    decrCursorOffset_UNLOCKED(mit->second,
830                                              numDuplicatedItems);
831                }
832            }
833        }
834        collapsedChks.splice(collapsedChks.end(), checkpointList,
835                             checkpointList.begin(),  lastClosedChk);
836    }
837}
838
839bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
840                                   bool genSeqno) {
841    LockHolder lh(queueLock);
842
843    cb_assert(vb);
844    bool canCreateNewCheckpoint = false;
845    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
846        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
847         checkpointList.front()->getNumberOfCursors() == 0)) {
848        canCreateNewCheckpoint = true;
849    }
850    if (vb->getState() == vbucket_state_active && canCreateNewCheckpoint) {
851        // Only the master active vbucket can create a next open checkpoint.
852        checkOpenCheckpoint_UNLOCKED(false, true);
853    }
854
855    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
856        addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
857    }
858
859    cb_assert(checkpointList.back()->getState() == CHECKPOINT_OPEN);
860
861    if (genSeqno) {
862        qi->setBySeqno(nextBySeqno());
863    } else {
864        lastBySeqNo = qi->getBySeqno();
865    }
866
867    queue_dirty_t result = checkpointList.back()->queueDirty(qi, this);
868    if (result == NEW_ITEM) {
869        ++numItems;
870    }
871
872    if (result != EXISTING_ITEM) {
873        ++stats.totalEnqueued;
874        ++stats.diskQueueSize;
875        vb->doStatsForQueueing(*qi, qi->size());
876    }
877
878    return result != EXISTING_ITEM;
879}
880
881void CheckpointManager::getAllItemsForPersistence(
882                                             std::vector<queued_item> &items) {
883    LockHolder lh(queueLock);
884    // Get all the items up to the end of the current open checkpoint.
885    while (incrCursor(persistenceCursor)) {
886        items.push_back(*(persistenceCursor.currentPos));
887    }
888
889    LOG(EXTENSION_LOG_DEBUG,
890        "Grab %ld items through the persistence cursor from vbucket %d",
891        items.size(), vbucketId);
892}
893
894queued_item CheckpointManager::nextItem(const std::string &name,
895                                        bool &isLastMutationItem,
896                                        uint64_t &endSeqno) {
897    LockHolder lh(queueLock);
898    cursor_index::iterator it = tapCursors.find(name);
899    if (it == tapCursors.end()) {
900        LOG(EXTENSION_LOG_WARNING,
901        "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
902        "%d.\n", name.c_str(), vbucketId);
903        queued_item qi(new Item(std::string(""), 0xffff,
904                                queue_op_empty, 0, 0));
905        return qi;
906    }
907    if (checkpointList.back()->getId() == 0) {
908        LOG(EXTENSION_LOG_INFO,
909            "VBucket %d is still in backfill phase that doesn't allow "
910            " the tap cursor to fetch an item from it's current checkpoint",
911            vbucketId);
912        queued_item qi(new Item(std::string(""), 0xffff,
913                                queue_op_empty, 0, 0));
914        return qi;
915    }
916
917    CheckpointCursor &cursor = it->second;
918    if (incrCursor(cursor)) {
919        isLastMutationItem = isLastMutationItemInCheckpoint(cursor);
920        if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_CLOSED) {
921            endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
922        } else {
923            endSeqno = -1;
924        }
925        return *(cursor.currentPos);
926    } else {
927        isLastMutationItem = false;
928        if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_CLOSED) {
929            endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
930        } else {
931            endSeqno = -1;
932        }
933        queued_item qi(new Item(std::string(""), 0xffff,
934                                queue_op_empty, 0, 0));
935        return qi;
936    }
937}
938
939bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
940    if (++(cursor.currentPos) != (*(cursor.currentCheckpoint))->end()) {
941        queued_item &qi = *(cursor.currentPos);
942        if (qi->getOperation() != queue_op_checkpoint_start &&
943            qi->getOperation() != queue_op_checkpoint_end) {
944            ++(cursor.offset);
945        }
946        return true;
947    } else if (!moveCursorToNextCheckpoint(cursor)) {
948        --(cursor.currentPos);
949        return false;
950    }
951    return incrCursor(cursor);
952}
953
954void CheckpointManager::clear(vbucket_state_t vbState) {
955    LockHolder lh(queueLock);
956    std::list<Checkpoint*>::iterator it = checkpointList.begin();
957    // Remove all the checkpoints.
958    while(it != checkpointList.end()) {
959        delete *it;
960        ++it;
961    }
962    checkpointList.clear();
963    numItems = 0;
964
965    uint64_t checkpointId = vbState == vbucket_state_active ? 1 : 0;
966    // Add a new open checkpoint.
967    addNewCheckpoint_UNLOCKED(checkpointId);
968    resetCursors();
969}
970
971void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
972    // Reset the persistence cursor.
973    if (resetPersistenceCursor) {
974        persistenceCursor.currentCheckpoint = checkpointList.begin();
975        persistenceCursor.currentPos = checkpointList.front()->begin();
976        persistenceCursor.offset = 0;
977        checkpointList.front()->registerCursorName(persistenceCursor.name);
978    }
979
980    // Reset all the TAP cursors.
981    cursor_index::iterator cit = tapCursors.begin();
982    for (; cit != tapCursors.end(); ++cit) {
983        cit->second.currentCheckpoint = checkpointList.begin();
984        cit->second.currentPos = checkpointList.front()->begin();
985        cit->second.offset = 0;
986        checkpointList.front()->registerCursorName(cit->second.name);
987    }
988}
989
990void CheckpointManager::resetTAPCursors(const std::list<std::string> &cursors){
991    LockHolder lh(queueLock);
992    std::list<std::string>::const_iterator it = cursors.begin();
993    for (; it != cursors.end(); ++it) {
994        registerTAPCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
995    }
996}
997
998bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
999    if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_OPEN) {
1000        return false;
1001    } else if ((*(cursor.currentCheckpoint))->getState() ==
1002                                                           CHECKPOINT_CLOSED) {
1003        std::list<Checkpoint*>::iterator currCheckpoint =
1004                                                      cursor.currentCheckpoint;
1005        if (++currCheckpoint == checkpointList.end()) {
1006            return false;
1007        }
1008    }
1009
1010    // Remove the cursor's name from its current checkpoint.
1011    (*(cursor.currentCheckpoint))->removeCursorName(cursor.name);
1012    // Move the cursor to the next checkpoint.
1013    ++(cursor.currentCheckpoint);
1014    cursor.currentPos = (*(cursor.currentCheckpoint))->begin();
1015    // Register the cursor's name to its new current checkpoint.
1016    (*(cursor.currentCheckpoint))->registerCursorName(cursor.name);
1017    return true;
1018}
1019
1020size_t CheckpointManager::getNumOpenChkItems() {
1021    LockHolder lh(queueLock);
1022    if (checkpointList.empty()) {
1023        return 0;
1024    }
1025    return checkpointList.back()->getNumItems() + 1;
1026}
1027
1028uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
1029                                                         bool timeBound) {
1030    int checkpoint_id = 0;
1031
1032    timeBound = timeBound &&
1033                (ep_real_time() - checkpointList.back()->getCreationTime()) >=
1034                checkpointConfig.getCheckpointPeriod();
1035    // Create the new open checkpoint if any of the following conditions is
1036    // satisfied:
1037    // (1) force creation due to online update or high memory usage
1038    // (2) current checkpoint is reached to the max number of items allowed.
1039    // (3) time elapsed since the creation of the current checkpoint is greater
1040    //     than the threshold
1041    if (forceCreation ||
1042        (checkpointConfig.isItemNumBasedNewCheckpoint() &&
1043         checkpointList.back()->getNumItems() >=
1044         checkpointConfig.getCheckpointMaxItems()) ||
1045        (checkpointList.back()->getNumItems() > 0 && timeBound)) {
1046
1047        checkpoint_id = checkpointList.back()->getId();
1048        closeOpenCheckpoint_UNLOCKED(checkpoint_id);
1049        addNewCheckpoint_UNLOCKED(checkpoint_id + 1);
1050    }
1051    return checkpoint_id;
1052}
1053
1054bool CheckpointManager::eligibleForEviction(const std::string &key) {
1055    LockHolder lh(queueLock);
1056    uint64_t smallest_mid;
1057
1058    // Get the mutation id of the item pointed by the slowest cursor.
1059    // This won't cause much overhead as the number of cursors per vbucket is
1060    // usually bounded to 3 (persistence cursor + 2 replicas).
1061    const std::string &pkey = (*(persistenceCursor.currentPos))->getKey();
1062    smallest_mid = (*(persistenceCursor.currentCheckpoint))->
1063                                                     getMutationIdForKey(pkey);
1064    cursor_index::iterator mit = tapCursors.begin();
1065    for (; mit != tapCursors.end(); ++mit) {
1066        const std::string &tkey = (*(mit->second.currentPos))->getKey();
1067        uint64_t mid = (*(mit->second.currentCheckpoint))->
1068                                                     getMutationIdForKey(tkey);
1069        if (mid < smallest_mid) {
1070            smallest_mid = mid;
1071        }
1072    }
1073
1074    bool can_evict = true;
1075    std::list<Checkpoint*>::reverse_iterator it = checkpointList.rbegin();
1076    for (; it != checkpointList.rend(); ++it) {
1077        uint64_t mid = (*it)->getMutationIdForKey(key);
1078        if (mid == 0) { // key doesn't exist in a checkpoint.
1079            continue;
1080        }
1081        if (smallest_mid < mid) { // The slowest cursor is still
1082            can_evict = false;    //sitting behind a given key.
1083            break;
1084        }
1085    }
1086
1087    return can_evict;
1088}
1089
1090size_t CheckpointManager::getNumItemsForTAPConnection(
1091                                                     const std::string &name) {
1092    LockHolder lh(queueLock);
1093    size_t remains = 0;
1094    cursor_index::iterator it = tapCursors.find(name);
1095    if (it != tapCursors.end()) {
1096        remains = (numItems >= it->second.offset) ?
1097                   numItems - it->second.offset : 0;
1098    }
1099    return remains;
1100}
1101
1102size_t CheckpointManager::getNumItemsForPersistence_UNLOCKED() {
1103    size_t num_items = numItems;
1104    size_t offset = persistenceCursor.offset;
1105
1106    // Get the number of meta items that can be skipped by the persistence
1107    // cursor.
1108    size_t meta_items = 0;
1109    std::list<Checkpoint*>::iterator curr_chk =
1110                                           persistenceCursor.currentCheckpoint;
1111    for (; curr_chk != checkpointList.end(); ++curr_chk) {
1112        if (curr_chk == persistenceCursor.currentCheckpoint) {
1113            std::list<queued_item>::iterator curr_pos =
1114                                                  persistenceCursor.currentPos;
1115            ++curr_pos;
1116            if (curr_pos == (*curr_chk)->end()) {
1117                continue;
1118            }
1119            if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
1120                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1121                    meta_items += 2;
1122                } else {
1123                    ++meta_items;
1124                }
1125            } else {
1126                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1127                    ++meta_items;
1128                }
1129            }
1130        } else {
1131            if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1132                meta_items += 2;
1133            } else {
1134                ++meta_items;
1135            }
1136        }
1137    }
1138
1139    offset += meta_items;
1140    return num_items > offset ? num_items - offset : 0;
1141}
1142
1143void CheckpointManager::decrTapCursorFromCheckpointEnd(
1144                                                    const std::string &name) {
1145    LockHolder lh(queueLock);
1146    cursor_index::iterator it = tapCursors.find(name);
1147    if (it != tapCursors.end() &&
1148        (*(it->second.currentPos))->getOperation() ==
1149        queue_op_checkpoint_end) {
1150        decrCursorPos_UNLOCKED(it->second);
1151    }
1152}
1153
1154uint64_t CheckpointManager::getMutationIdForKey(uint64_t chk_id,
1155                                                std::string key) {
1156    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
1157    for (; itr != checkpointList.end(); ++itr) {
1158        if (chk_id == (*itr)->getId()) {
1159            return (*itr)->getMutationIdForKey(key);
1160        }
1161    }
1162    return 0;
1163}
1164
1165bool CheckpointManager::isLastMutationItemInCheckpoint(
1166                                                   CheckpointCursor &cursor) {
1167    std::list<queued_item>::iterator it = cursor.currentPos;
1168    ++it;
1169    if (it == (*(cursor.currentCheckpoint))->end() ||
1170        (*it)->getOperation() == queue_op_checkpoint_end) {
1171        return true;
1172    }
1173    return false;
1174}
1175
1176void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
1177                                               const RCPtr<VBucket> &vbucket) {
1178    LockHolder lh(queueLock);
1179
1180    // Ignore CHECKPOINT_START message with ID 0 as 0 is reserved for
1181    // representing backfill.
1182    if (id == 0) {
1183        return;
1184    }
1185    // If the replica receives a checkpoint start message right after backfill
1186    // completion, simply set the current open checkpoint id to the one
1187    // received from the active vbucket.
1188    if (checkpointList.back()->getId() == 0) {
1189        setOpenCheckpointId_UNLOCKED(id);
1190        resetCursors(false);
1191        return;
1192    }
1193
1194    std::list<Checkpoint*>::iterator it = checkpointList.begin();
1195    // Check if a checkpoint exists with ID >= id.
1196    while (it != checkpointList.end()) {
1197        if (id <= (*it)->getId()) {
1198            break;
1199        }
1200        ++it;
1201    }
1202
1203    if (it == checkpointList.end()) {
1204        if ((checkpointList.back()->getId() + 1) < id) {
1205            isCollapsedCheckpoint = true;
1206            uint64_t oid = getOpenCheckpointId_UNLOCKED();
1207            lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0;
1208        } else if ((checkpointList.back()->getId() + 1) == id) {
1209            isCollapsedCheckpoint = false;
1210        }
1211        if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
1212            checkpointList.back()->getNumItems() == 0) {
1213            // If the current open checkpoint doesn't have any items, simply
1214            // set its id to
1215            // the one from the master node.
1216            setOpenCheckpointId_UNLOCKED(id);
1217            // Reposition all the cursors in the open checkpoint to the
1218            // begining position so that a checkpoint_start message can be
1219            // sent again with the correct id.
1220            const std::set<std::string> &cursors = checkpointList.back()->
1221                                                   getCursorNameList();
1222            std::set<std::string>::const_iterator cit = cursors.begin();
1223            for (; cit != cursors.end(); ++cit) {
1224                if ((*cit).compare(persistenceCursor.name) == 0) {
1225                    // Persistence cursor
1226                    continue;
1227                } else { // TAP cursors
1228                    cursor_index::iterator mit = tapCursors.find(*cit);
1229                    mit->second.currentPos = checkpointList.back()->begin();
1230                }
1231            }
1232        } else {
1233            closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
1234            addNewCheckpoint_UNLOCKED(id);
1235        }
1236    } else {
1237        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
1238        collapseCheckpoints(id);
1239        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
1240        if (curr_remains > new_remains) {
1241            size_t diff = curr_remains - new_remains;
1242            stats.decrDiskQueueSize(diff);
1243            vbucket->decrDirtyQueueSize(diff);
1244        } else if (curr_remains < new_remains) {
1245            size_t diff = new_remains - curr_remains;
1246            stats.diskQueueSize.fetch_add(diff);
1247            vbucket->dirtyQueueSize.fetch_add(diff);
1248        }
1249    }
1250}
1251
1252void CheckpointManager::collapseCheckpoints(uint64_t id) {
1253    cb_assert(!checkpointList.empty());
1254
1255    std::map<std::string, std::pair<uint64_t, bool> > cursorMap;
1256    cursor_index::iterator itr;
1257    for (itr = tapCursors.begin(); itr != tapCursors.end(); itr++) {
1258        Checkpoint* chk = *(itr->second.currentCheckpoint);
1259        const std::string& key = (*(itr->second.currentPos))->getKey();
1260        bool cursor_on_chk_start = false;
1261        if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
1262            cursor_on_chk_start = true;
1263        }
1264        cursorMap[itr->first.c_str()] =
1265            std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1266    }
1267
1268    Checkpoint* chk = *(persistenceCursor.currentCheckpoint);
1269    std::string key = (*(persistenceCursor.currentPos))->getKey();
1270    bool cursor_on_chk_start = false;
1271    if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_start) {
1272        cursor_on_chk_start = true;
1273    }
1274    cursorMap[persistenceCursor.name.c_str()] =
1275        std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1276
1277    setOpenCheckpointId_UNLOCKED(id);
1278
1279    std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
1280    ++rit; // Move to the last closed checkpoint.
1281    size_t numDuplicatedItems = 0, numMetaItems = 0;
1282    // Collapse all checkpoints.
1283    for (; rit != checkpointList.rend(); ++rit) {
1284        size_t numAddedItems = checkpointList.back()->
1285                               mergePrevCheckpoint(*rit);
1286        numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
1287        numMetaItems += 2; // checkpoint start and end meta items
1288        delete *rit;
1289    }
1290    numItems.fetch_sub(numDuplicatedItems + numMetaItems);
1291
1292    if (checkpointList.size() > 1) {
1293        checkpointList.erase(checkpointList.begin(), --checkpointList.end());
1294    }
1295    cb_assert(checkpointList.size() == 1);
1296
1297    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
1298        checkpointList.back()->popBackCheckpointEndItem();
1299        --numItems;
1300        checkpointList.back()->setState(CHECKPOINT_OPEN);
1301    }
1302    putCursorsInCollapsedChk(cursorMap, checkpointList.begin());
1303}
1304
/**
 * Re-position cursors inside the collapsed checkpoint *chkItr.
 *
 * 'cursors' maps a cursor name to the position it had before collapsing:
 *   first  - the mutation id of the key the cursor was pointing at;
 *   second - whether the cursor was sitting on a checkpoint_start item.
 *
 * Walking the checkpoint's items in order, a cursor is placed on the item
 * preceding the first item whose mutation id exceeds the recorded one (or
 * equals it, when the cursor was on a checkpoint_start and the preceding
 * item is a checkpoint_start). The cursor's offset is set to that item's
 * index in the checkpoint. Cursors never matched this way are placed on the
 * checkpoint's last item, except TAP cursors with fromBeginningOnChkCollapse
 * set, which go to chk->begin() with offset 0. Names with no TAP cursor
 * behind them are ignored.
 *
 * Placed entries are erased from 'cursors', so the map is modified. Each
 * placed cursor is registered with the checkpoint; this function does not
 * unregister it from any other checkpoint.
 */
void CheckpointManager::
putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
                         std::list<Checkpoint*>::iterator chkItr) {
    size_t i;
    Checkpoint *chk = *chkItr;
    std::list<queued_item>::iterator cit = chk->begin();
    // 'last' trails 'cit' by one item (both start at begin()).
    std::list<queued_item>::iterator last = chk->begin();
    for (i = 0; cit != chk->end(); ++i, ++cit) {
        uint64_t id = chk->getMutationIdForKey((*cit)->getKey());
        std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
        while (mit != cursors.end()) {
            std::pair<uint64_t, bool> val = mit->second;
            // The cursor belongs just before the current item.
            if (val.first < id || (val.first == id && val.second &&
                                   (*last)->getOperation() == queue_op_checkpoint_start)) {
                if (mit->first.compare(persistenceCursor.name) == 0) {
                    persistenceCursor.currentCheckpoint = chkItr;
                    persistenceCursor.currentPos = last;
                    persistenceCursor.offset = (i > 0) ? i - 1 : 0;
                    chk->registerCursorName(persistenceCursor.name);
                } else {
                    cursor_index::iterator cc = tapCursors.find(mit->first);
                    // Unknown cursors and "from beginning" cursors stay in
                    // the map; the loop after this one handles them.
                    if (cc == tapCursors.end() ||
                        cc->second.fromBeginningOnChkCollapse) {
                        ++mit;
                        continue;
                    }
                    cc->second.currentCheckpoint = chkItr;
                    cc->second.currentPos = last;
                    cc->second.offset = (i > 0) ? i - 1 : 0;
                    chk->registerCursorName(cc->second.name);
                }
                cursors.erase(mit++);
            } else {
                ++mit;
            }
        }

        last = cit;
        if (cursors.empty()) {
            break;
        }
    }

    // Remaining cursors: if the loop ran to completion, 'last' is the final
    // item of the checkpoint and i is its item count.
    std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
    for (; mit != cursors.end(); ++mit) {
        if (mit->first.compare(persistenceCursor.name) == 0) {
            persistenceCursor.currentCheckpoint = chkItr;
            persistenceCursor.currentPos = last;
            persistenceCursor.offset = (i > 0) ? i - 1 : 0;
            chk->registerCursorName(persistenceCursor.name);
        } else {
            cursor_index::iterator cc = tapCursors.find(mit->first);
            if (cc == tapCursors.end()) {
                continue;
            }
            cc->second.currentCheckpoint = chkItr;
            if (cc->second.fromBeginningOnChkCollapse) {
                cc->second.currentPos = chk->begin();
                cc->second.offset = 0;
            } else {
                cc->second.currentPos = last;
                cc->second.offset = (i > 0) ? i - 1 : 0;
            }
            chk->registerCursorName(cc->second.name);
        }
    }
}
1372
1373bool CheckpointManager::hasNext(const std::string &name) {
1374    LockHolder lh(queueLock);
1375    cursor_index::iterator it = tapCursors.find(name);
1376    if (it == tapCursors.end() || getOpenCheckpointId_UNLOCKED() == 0) {
1377        return false;
1378    }
1379
1380    bool hasMore = true;
1381    std::list<queued_item>::iterator curr = it->second.currentPos;
1382    ++curr;
1383    if (curr == (*(it->second.currentCheckpoint))->end() &&
1384        (*(it->second.currentCheckpoint)) == checkpointList.back()) {
1385        hasMore = false;
1386    }
1387    return hasMore;
1388}
1389
1390queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
1391                                          enum queue_operation checkpoint_op) {
1392    cb_assert(checkpoint_op == queue_op_checkpoint_start ||
1393           checkpoint_op == queue_op_checkpoint_end ||
1394           checkpoint_op == queue_op_empty);
1395
1396    uint64_t bySeqno;
1397    std::stringstream key;
1398    if (checkpoint_op == queue_op_checkpoint_start) {
1399        key << "checkpoint_start";
1400        bySeqno = lastBySeqNo + 1;
1401    } else if (checkpoint_op == queue_op_empty) {
1402        key << "dummy_key";
1403        bySeqno = lastBySeqNo;
1404    } else {
1405        key << "checkpoint_end";
1406        bySeqno = lastBySeqNo;
1407    }
1408    queued_item qi(new Item(key.str(), vbid, checkpoint_op, id, bySeqno));
1409    return qi;
1410}
1411
1412bool CheckpointManager::hasNextForPersistence() {
1413    LockHolder lh(queueLock);
1414    bool hasMore = true;
1415    std::list<queued_item>::iterator curr = persistenceCursor.currentPos;
1416    ++curr;
1417    if (curr == (*(persistenceCursor.currentCheckpoint))->end() &&
1418        (*(persistenceCursor.currentCheckpoint)) == checkpointList.back()) {
1419        hasMore = false;
1420    }
1421    return hasMore;
1422}
1423
1424uint64_t CheckpointManager::createNewCheckpoint() {
1425    LockHolder lh(queueLock);
1426    if (checkpointList.back()->getNumItems() > 0) {
1427        uint64_t chk_id = checkpointList.back()->getId();
1428        closeOpenCheckpoint_UNLOCKED(chk_id);
1429        addNewCheckpoint_UNLOCKED(chk_id + 1);
1430    }
1431    return checkpointList.back()->getId();
1432}
1433
1434void CheckpointManager::decrCursorOffset_UNLOCKED(CheckpointCursor &cursor,
1435                                                  size_t decr) {
1436    if (cursor.offset >= decr) {
1437        cursor.offset.fetch_sub(decr);
1438    } else {
1439        cursor.offset = 0;
1440        LOG(EXTENSION_LOG_INFO,
1441            "%s cursor offset is negative. Reset it to 0.",
1442            cursor.name.c_str());
1443    }
1444}
1445
1446void CheckpointManager::decrCursorPos_UNLOCKED(CheckpointCursor &cursor) {
1447    if (cursor.currentPos != (*(cursor.currentCheckpoint))->begin()) {
1448        --(cursor.currentPos);
1449    }
1450}
1451
1452uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
1453    LockHolder lh(queueLock);
1454    return pCursorPreCheckpointId;
1455}
1456
1457uint64_t CheckpointManager::getPersistenceCursorSeqno() {
1458    LockHolder lh(queueLock);
1459    std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
1460    pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
1461
1462    std::list<queued_item>::iterator curr_pos = persistenceCursor.currentPos;
1463    return (*curr_pos)->getBySeqno();
1464}
1465
1466void CheckpointConfig::addConfigChangeListener(
1467                                         EventuallyPersistentEngine &engine) {
1468    Configuration &configuration = engine.getConfiguration();
1469    configuration.addValueChangedListener("chk_period",
1470             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1471    configuration.addValueChangedListener("chk_max_items",
1472             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1473    configuration.addValueChangedListener("max_checkpoints",
1474             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1475    configuration.addValueChangedListener("item_num_based_new_chk",
1476             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1477    configuration.addValueChangedListener("keep_closed_chks",
1478             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1479}
1480
1481CheckpointConfig::CheckpointConfig(EventuallyPersistentEngine &e) {
1482    Configuration &config = e.getConfiguration();
1483    checkpointPeriod = config.getChkPeriod();
1484    checkpointMaxItems = config.getChkMaxItems();
1485    maxCheckpoints = config.getMaxCheckpoints();
1486    itemNumBasedNewCheckpoint = config.isItemNumBasedNewChk();
1487    keepClosedCheckpoints = config.isKeepClosedChks();
1488}
1489
1490bool CheckpointConfig::validateCheckpointMaxItemsParam(size_t
1491                                                       checkpoint_max_items) {
1492    if (checkpoint_max_items < MIN_CHECKPOINT_ITEMS ||
1493        checkpoint_max_items > MAX_CHECKPOINT_ITEMS) {
1494        std::stringstream ss;
1495        ss << "New checkpoint_max_items param value " << checkpoint_max_items
1496           << " is not ranged between the min allowed value " <<
1497           MIN_CHECKPOINT_ITEMS
1498           << " and max value " << MAX_CHECKPOINT_ITEMS;
1499        LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
1500        return false;
1501    }
1502    return true;
1503}
1504
1505bool CheckpointConfig::validateCheckpointPeriodParam(
1506                                                   size_t checkpoint_period) {
1507    if (checkpoint_period < MIN_CHECKPOINT_PERIOD ||
1508        checkpoint_period > MAX_CHECKPOINT_PERIOD) {
1509        std::stringstream ss;
1510        ss << "New checkpoint_period param value " << checkpoint_period
1511           << " is not ranged between the min allowed value " <<
1512              MIN_CHECKPOINT_PERIOD
1513           << " and max value " << MAX_CHECKPOINT_PERIOD;
1514        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1515        return false;
1516    }
1517    return true;
1518}
1519
1520bool CheckpointConfig::validateMaxCheckpointsParam(size_t max_checkpoints) {
1521    if (max_checkpoints < DEFAULT_MAX_CHECKPOINTS ||
1522        max_checkpoints > MAX_CHECKPOINTS_UPPER_BOUND) {
1523        std::stringstream ss;
1524        ss << "New max_checkpoints param value " << max_checkpoints
1525           << " is not ranged between the min allowed value " <<
1526              DEFAULT_MAX_CHECKPOINTS
1527           << " and max value " << MAX_CHECKPOINTS_UPPER_BOUND;
1528        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1529        return false;
1530    }
1531    return true;
1532}
1533
1534void CheckpointConfig::setCheckpointPeriod(size_t value) {
1535    if (!validateCheckpointPeriodParam(value)) {
1536        value = DEFAULT_CHECKPOINT_PERIOD;
1537    }
1538    checkpointPeriod = static_cast<rel_time_t>(value);
1539}
1540
1541void CheckpointConfig::setCheckpointMaxItems(size_t value) {
1542    if (!validateCheckpointMaxItemsParam(value)) {
1543        value = DEFAULT_CHECKPOINT_ITEMS;
1544    }
1545    checkpointMaxItems = value;
1546}
1547
1548void CheckpointConfig::setMaxCheckpoints(size_t value) {
1549    if (!validateMaxCheckpointsParam(value)) {
1550        value = DEFAULT_MAX_CHECKPOINTS;
1551    }
1552    maxCheckpoints = value;
1553}
1554
1555void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
1556    LockHolder lh(queueLock);
1557    char buf[256];
1558
1559    snprintf(buf, sizeof(buf), "vb_%d:open_checkpoint_id", vbucketId);
1560    add_casted_stat(buf, getOpenCheckpointId_UNLOCKED(), add_stat, cookie);
1561    snprintf(buf, sizeof(buf), "vb_%d:last_closed_checkpoint_id", vbucketId);
1562    add_casted_stat(buf, getLastClosedCheckpointId_UNLOCKED(),
1563    add_stat, cookie);
1564    snprintf(buf, sizeof(buf), "vb_%d:num_tap_cursors", vbucketId);
1565    add_casted_stat(buf, tapCursors.size(), add_stat, cookie);
1566    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoint_items", vbucketId);
1567    add_casted_stat(buf, numItems, add_stat, cookie);
1568    snprintf(buf, sizeof(buf), "vb_%d:num_open_checkpoint_items", vbucketId);
1569    add_casted_stat(buf, checkpointList.empty() ? 0 :
1570                    checkpointList.back()->getNumItems(),
1571                    add_stat, cookie);
1572    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoints", vbucketId);
1573    add_casted_stat(buf, checkpointList.size(), add_stat, cookie);
1574    snprintf(buf, sizeof(buf), "vb_%d:num_items_for_persistence", vbucketId);
1575    add_casted_stat(buf, getNumItemsForPersistence_UNLOCKED(),
1576                    add_stat, cookie);
1577
1578    cursor_index::iterator tap_it = tapCursors.begin();
1579    for (; tap_it != tapCursors.end(); ++tap_it) {
1580        snprintf(buf, sizeof(buf),
1581                 "vb_%d:%s:cursor_checkpoint_id", vbucketId,
1582                 tap_it->first.c_str());
1583        add_casted_stat(buf, (*(tap_it->second.currentCheckpoint))->getId(),
1584                        add_stat, cookie);
1585    }
1586}
1587