xref: /3.0.3-GA/ep-engine/src/checkpoint.cc (revision 2fe13250)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string>
21#include <utility>
22#include <vector>
23
24#include "checkpoint.h"
25#include "ep_engine.h"
26#define STATWRITER_NAMESPACE checkpoint
27#include "statwriter.h"
28#undef STATWRITER_NAMESPACE
29#include "vbucket.h"
30
31/**
32 * A listener class to update checkpoint related configs at runtime.
33 */
34class CheckpointConfigChangeListener : public ValueChangedListener {
35public:
36    CheckpointConfigChangeListener(CheckpointConfig &c) : config(c) { }
37    virtual ~CheckpointConfigChangeListener() { }
38
39    virtual void sizeValueChanged(const std::string &key, size_t value) {
40        if (key.compare("chk_period") == 0) {
41            config.setCheckpointPeriod(value);
42        } else if (key.compare("chk_max_items") == 0) {
43            config.setCheckpointMaxItems(value);
44        } else if (key.compare("max_checkpoints") == 0) {
45            config.setMaxCheckpoints(value);
46        }
47    }
48
49    virtual void booleanValueChanged(const std::string &key, bool value) {
50        if (key.compare("item_num_based_new_chk") == 0) {
51            config.allowItemNumBasedNewCheckpoint(value);
52        } else if (key.compare("keep_closed_chks") == 0) {
53            config.allowKeepClosedCheckpoints(value);
54        }
55    }
56
57private:
58    CheckpointConfig &config;
59};
60
61Checkpoint::~Checkpoint() {
62    LOG(EXTENSION_LOG_INFO,
63        "Checkpoint %llu for vbucket %d is purged from memory",
64        checkpointId, vbucketId);
65    stats.memOverhead.fetch_sub(memorySize());
66    cb_assert(stats.memOverhead.load() < GIGANTOR);
67}
68
69void Checkpoint::setState(checkpoint_state state) {
70    checkpointState = state;
71}
72
73void Checkpoint::popBackCheckpointEndItem() {
74    if (!toWrite.empty() &&
75        toWrite.back()->getOperation() == queue_op_checkpoint_end) {
76        keyIndex.erase(toWrite.back()->getKey());
77        toWrite.pop_back();
78    }
79}
80
81bool Checkpoint::keyExists(const std::string &key) {
82    return keyIndex.find(key) != keyIndex.end();
83}
84
85queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
86                                     CheckpointManager *checkpointManager) {
87    assert (checkpointState == CHECKPOINT_OPEN);
88    queue_dirty_t rv;
89
90    checkpoint_index::iterator it = keyIndex.find(qi->getKey());
91    // Check if this checkpoint already had an item for the same key.
92    if (it != keyIndex.end()) {
93        rv = EXISTING_ITEM;
94        std::list<queued_item>::iterator currPos = it->second.position;
95        uint64_t currMutationId = it->second.mutation_id;
96        CheckpointCursor &pcursor = checkpointManager->persistenceCursor;
97        queued_item &pqi = *(pcursor.currentPos);
98
99        if (*(pcursor.currentCheckpoint) == this) {
100            // If the existing item is in the left-hand side of the item
101            // pointed by the persistence cursor, decrease the persistence
102            // cursor's offset by 1.
103            const std::string &key = pqi->getKey();
104            checkpoint_index::iterator ita = keyIndex.find(key);
105            if (ita != keyIndex.end()) {
106                uint64_t mutationId = ita->second.mutation_id;
107                if (currMutationId <= mutationId &&
108                    pqi->getOperation() != queue_op_checkpoint_start) {
109                    checkpointManager->decrCursorOffset_UNLOCKED(pcursor, 1);
110                    rv = PERSIST_AGAIN;
111                }
112            }
113            // If the persistence cursor points to the existing item for the
114            // same key,
115            // shift the cursor left by 1.
116            if (pcursor.currentPos == currPos) {
117                checkpointManager->decrCursorPos_UNLOCKED(pcursor);
118            }
119        }
120
121        cursor_index::iterator map_it = checkpointManager->tapCursors.begin();
122        for (; map_it != checkpointManager->tapCursors.end(); ++map_it) {
123
124            if (*(map_it->second.currentCheckpoint) == this) {
125                queued_item &tqi = *(map_it->second.currentPos);
126                const std::string &key = tqi->getKey();
127                checkpoint_index::iterator ita = keyIndex.find(key);
128                if (ita != keyIndex.end()) {
129                    uint64_t mutationId = ita->second.mutation_id;
130                    if (currMutationId <= mutationId &&
131                        tqi->getOperation() != queue_op_checkpoint_start) {
132                        checkpointManager->
133                                  decrCursorOffset_UNLOCKED(map_it->second, 1);
134                    }
135                }
136                // If an TAP cursor points to the existing item for the same
137                // key, shift it left by 1
138                if (map_it->second.currentPos == currPos) {
139                    checkpointManager->decrCursorPos_UNLOCKED(map_it->second);
140                }
141            }
142        }
143
144        toWrite.push_back(qi);
145        // Remove the existing item for the same key from the list.
146        toWrite.erase(currPos);
147    } else {
148        if (qi->getOperation() == queue_op_set ||
149            qi->getOperation() == queue_op_del) {
150            ++numItems;
151        }
152        rv = NEW_ITEM;
153        // Push the new item into the list
154        toWrite.push_back(qi);
155    }
156
157    if (qi->getNKey() > 0) {
158        std::list<queued_item>::iterator last = toWrite.end();
159        // --last is okay as the list is not empty now.
160        index_entry entry = {--last, qi->getBySeqno()};
161        // Set the index of the key to the new item that is pushed back into
162        // the list.
163        keyIndex[qi->getKey()] = entry;
164        if (rv == NEW_ITEM) {
165            size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
166                                  sizeof(queued_item);
167            memOverhead += newEntrySize;
168            stats.memOverhead.fetch_add(newEntrySize);
169            cb_assert(stats.memOverhead.load() < GIGANTOR);
170        }
171    }
172    return rv;
173}
174
/**
 * Merge the set/delete items of a previous checkpoint into this checkpoint.
 *
 * The mutation ids that the previous checkpoint recorded for its
 * "dummy_key" and "checkpoint_start" entries are copied onto this
 * checkpoint's index entries and onto its first two items. Then every
 * set/delete item of the previous checkpoint whose key is not indexed here
 * yet is inserted right after those two items.
 *
 * @param pPrevCheckpoint checkpoint whose items are merged into this one
 * @return number of items that were added to this checkpoint
 */
size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
    size_t numNewItems = 0;
    size_t newEntryMemOverhead = 0;
    // The previous checkpoint is walked from its last item to its first.
    std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();

    LOG(EXTENSION_LOG_INFO,
        "Collapse the checkpoint %llu into the checkpoint %llu for vbucket %d",
        pPrevCheckpoint->getId(), checkpointId, vbucketId);

    // Take over the previous checkpoint's seqno for the first item
    // (indexed as "dummy_key") ...
    std::list<queued_item>::iterator itr = toWrite.begin();
    uint64_t seqno = pPrevCheckpoint->getMutationIdForKey("dummy_key");
    keyIndex["dummy_key"].mutation_id = seqno;
    (*itr)->setBySeqno(seqno);

    // ... and for the second item (indexed as "checkpoint_start").
    seqno = pPrevCheckpoint->getMutationIdForKey("checkpoint_start");
    keyIndex["checkpoint_start"].mutation_id = seqno;
    ++itr;
    (*itr)->setBySeqno(seqno);

    for (; rit != pPrevCheckpoint->rend(); ++rit) {
        const std::string &key = (*rit)->getKey();
        // Only set and delete operations are merged; other items are skipped.
        if ((*rit)->getOperation() != queue_op_del &&
            (*rit)->getOperation() != queue_op_set) {
            continue;
        }
        checkpoint_index::iterator it = keyIndex.find(key);
        // A key that is already indexed here is left untouched.
        if (it == keyIndex.end()) {
            std::list<queued_item>::iterator pos = toWrite.begin();
            // Skip the first two meta items
            ++pos; ++pos;
            // Always inserting at the third position while walking the
            // previous checkpoint backwards keeps the merged items in their
            // original relative order.
            toWrite.insert(pos, *rit);
            // After the insert, --pos refers to the newly inserted item.
            index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
                                                    getMutationIdForKey(key))};
            keyIndex[key] = entry;
            newEntryMemOverhead += key.size() + sizeof(index_entry);
            ++numItems;
            ++numNewItems;
        }
    }
    // Account for the memory used by the new index entries in one step.
    memOverhead += newEntryMemOverhead;
    stats.memOverhead.fetch_add(newEntryMemOverhead);
    cb_assert(stats.memOverhead.load() < GIGANTOR);
    return numNewItems;
}
219
220uint64_t Checkpoint::getMutationIdForKey(const std::string &key) {
221    uint64_t mid = 0;
222    checkpoint_index::iterator it = keyIndex.find(key);
223    if (it != keyIndex.end()) {
224        mid = it->second.mutation_id;
225    }
226    return mid;
227}
228
229CheckpointManager::~CheckpointManager() {
230    LockHolder lh(queueLock);
231    std::list<Checkpoint*>::iterator it = checkpointList.begin();
232    while(it != checkpointList.end()) {
233        delete *it;
234        ++it;
235    }
236}
237
238uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() {
239    if (checkpointList.empty()) {
240        return 0;
241    }
242
243    uint64_t id = checkpointList.back()->getId();
244    return checkpointList.back()->getState() == CHECKPOINT_OPEN ? id : id + 1;
245}
246
247uint64_t CheckpointManager::getOpenCheckpointId() {
248    LockHolder lh(queueLock);
249    return getOpenCheckpointId_UNLOCKED();
250}
251
252uint64_t CheckpointManager::getLastClosedCheckpointId_UNLOCKED() {
253    if (!isCollapsedCheckpoint) {
254        uint64_t id = getOpenCheckpointId_UNLOCKED();
255        lastClosedCheckpointId = id > 0 ? (id - 1) : 0;
256    }
257    return lastClosedCheckpointId;
258}
259
260uint64_t CheckpointManager::getLastClosedCheckpointId() {
261    LockHolder lh(queueLock);
262    return getLastClosedCheckpointId_UNLOCKED();
263}
264
265void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
266    if (!checkpointList.empty()) {
267        // Update the checkpoint_start item with the new Id.
268        std::list<queued_item>::iterator it =
269            ++(checkpointList.back()->begin());
270        (*it)->setRevSeqno(id);
271        if (checkpointList.back()->getId() == 0) {
272            (*it)->setBySeqno(lastBySeqNo + 1);
273        }
274        checkpointList.back()->setId(id);
275        LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %llu "
276            "for vbucket %d, bySeqno is %llu, max is %llu", id, vbucketId,
277            (*it)->getBySeqno(), lastBySeqNo);
278
279    }
280}
281
282bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
283    // This is just for making sure that the current checkpoint should be
284    // closed.
285    if (!checkpointList.empty() &&
286        checkpointList.back()->getState() == CHECKPOINT_OPEN) {
287        closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
288    }
289
290    LOG(EXTENSION_LOG_INFO, "Create a new open checkpoint %llu for vbucket %d",
291        id, vbucketId);
292
293    Checkpoint *checkpoint = new Checkpoint(stats, id, vbucketId,
294                                            CHECKPOINT_OPEN);
295    // Add a dummy item into the new checkpoint, so that any cursor referring
296    // to the actual first
297    // item in this new checkpoint can be safely shifted left by 1 if the
298    // first item is removed
299    // and pushed into the tail.
300    queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
301    checkpoint->queueDirty(qi, this);
302
303    // This item represents the start of the new checkpoint and is also sent to the slave node.
304    qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
305    checkpoint->queueDirty(qi, this);
306    ++numItems;
307    checkpointList.push_back(checkpoint);
308
309    return true;
310}
311
312bool CheckpointManager::addNewCheckpoint(uint64_t id) {
313    LockHolder lh(queueLock);
314    return addNewCheckpoint_UNLOCKED(id);
315}
316
317bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
318    if (checkpointList.empty()) {
319        return false;
320    }
321    if (id != checkpointList.back()->getId() ||
322        checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
323        return true;
324    }
325
326    LOG(EXTENSION_LOG_INFO, "Close the open checkpoint %llu for vbucket %d",
327        id, vbucketId);
328
329    // This item represents the end of the current open checkpoint and is sent to the slave node.
330    queued_item qi = createCheckpointItem(id, vbucketId,
331                                          queue_op_checkpoint_end);
332    checkpointList.back()->queueDirty(qi, this);
333    ++numItems;
334    checkpointList.back()->setState(CHECKPOINT_CLOSED);
335    return true;
336}
337
338bool CheckpointManager::closeOpenCheckpoint(uint64_t id) {
339    LockHolder lh(queueLock);
340    return closeOpenCheckpoint_UNLOCKED(id);
341}
342
343void CheckpointManager::registerPersistenceCursor() {
344    LockHolder lh(queueLock);
345    cb_assert(!checkpointList.empty());
346    persistenceCursor.currentCheckpoint = checkpointList.begin();
347    persistenceCursor.currentPos = checkpointList.front()->begin();
348    checkpointList.front()->registerCursorName(persistenceCursor.name);
349}
350
351bool CheckpointManager::registerTAPCursor(const std::string &name,
352                                          uint64_t checkpointId,
353                                          bool alwaysFromBeginning) {
354    LockHolder lh(queueLock);
355    return registerTAPCursor_UNLOCKED(name,
356                                      checkpointId,
357                                      alwaysFromBeginning);
358}
359
/**
 * Register the named cursor at the position that corresponds to a start
 * sequence number.
 *
 * The checkpoints are scanned from the first to the last. While the start
 * position has not been found, each checkpoint's low/high seqno range is
 * compared against startBySeqno to decide where the cursor goes. While the
 * end position has not been found, the first non-empty checkpoint whose
 * high seqno is >= endBySeqno ends the search, and if that checkpoint is
 * still open checkOpenCheckpoint_UNLOCKED(true, true) is called on it.
 * Passing the maximum uint64_t value for either seqno disables the
 * corresponding search.
 *
 * NOTE(review): unlike registerTAPCursor_UNLOCKED, an existing cursor with
 * the same name is overwritten without removing its name from the
 * checkpoint it was previously registered with - confirm this is intended.
 *
 * @param name name of the cursor to register
 * @param startBySeqno sequence number the cursor should start from; asserted
 *        to be no higher than the last checkpoint's high seqno
 * @param endBySeqno sequence number used for the end search described above
 * @return the sequence number chosen as the starting point, or the maximum
 *         uint64_t value if no start position was selected
 */
uint64_t CheckpointManager::registerTAPCursorBySeqno(const std::string &name,
                                                     uint64_t startBySeqno,
                                                     uint64_t endBySeqno) {
    LockHolder lh(queueLock);
    cb_assert(!checkpointList.empty());
    cb_assert(checkpointList.back()->getHighSeqno() >= startBySeqno);

    // Running count of the items in front of the cursor's start position.
    size_t skipped = 0;
    uint64_t seqnoToStart = std::numeric_limits<uint64_t>::max();
    // A seqno equal to the maximum uint64_t value means "do not search".
    bool needToFindStartSeqno =
        startBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
    bool needToFindEndSeqno =
        endBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;

    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
    for (; itr != checkpointList.end(); ++itr) {
        uint64_t en = (*itr)->getHighSeqno();
        uint64_t st = (*itr)->getLowSeqno();
        if (needToFindStartSeqno) {
            if (startBySeqno < st) {
                // The requested seqno lies before this checkpoint: start at
                // the very beginning of the checkpoint.
                tapCursors[name] = CheckpointCursor(name, itr, (*itr)->begin(),
                                                    skipped, false);
                (*itr)->registerCursorName(name);
                seqnoToStart = (*itr)->getLowSeqno();
                needToFindStartSeqno = false;
            } else if (startBySeqno == st) {
                std::list<queued_item>::iterator iitr = (*itr)->begin();

                // Advance the iterator past the first two items of the
                // checkpoint's item list.
                std::advance(iitr, 2);
                // NOTE(review): this overwrites the count accumulated from
                // earlier checkpoints instead of adding to it - confirm.
                skipped = 2;

                tapCursors[name] = CheckpointCursor(name, itr, iitr, skipped,
                                                    false);
                (*itr)->registerCursorName(name);
                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno()) + 1;
                needToFindStartSeqno = false;
            } else if (startBySeqno <= en) {
                // The requested seqno falls inside this checkpoint: walk the
                // items (starting from the second one) until an item whose
                // seqno is not below startBySeqno is reached.
                std::list<queued_item>::iterator iitr = (*itr)->begin();
                while (++iitr != (*itr)->end() &&
                       startBySeqno > static_cast<uint64_t>((*iitr)->getBySeqno())) {
                    skipped++;
                }

                // Step back by one if the walk ran off the end or overshot
                // the requested seqno.
                if (iitr == (*itr)->end() ||
                    startBySeqno < static_cast<uint64_t>((*iitr)->getBySeqno())) {
                    --iitr;
                }

                // NOTE(review): the other branches pass `skipped` in this
                // argument position, while this one passes
                // numItems - skipped - confirm which one is intended.
                size_t remaining = (numItems > skipped) ? numItems - skipped : 0;
                tapCursors[name] = CheckpointCursor(name, itr, iitr, remaining,
                                                    false);
                (*itr)->registerCursorName(name);
                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno());
                needToFindStartSeqno = false;
            } else {
                // The whole checkpoint lies before the requested seqno:
                // count its items plus two more and keep looking.
                skipped += (*itr)->getNumItems() + 2;
            }
        }
        if (needToFindEndSeqno) {
            if ((*itr)->getNumItems() > 0 && endBySeqno <= en) {
                if ((*itr)->getState() == CHECKPOINT_OPEN) {
                    // Close the open checkpoint and create a new one by force.
                    checkOpenCheckpoint_UNLOCKED(true, true);
                }
                needToFindEndSeqno = false;
            }
        }
        // Stop as soon as both searches are done.
        if (!needToFindStartSeqno && !needToFindEndSeqno) {
            break;
        }
    }

    if (seqnoToStart == std::numeric_limits<uint64_t>::max()) {
        /*
         * We should never get here since this would mean that the sequence number
         * we are looking for is higher than anything currently assigned and there
         * is already an assert above for this case.
         */
        LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
            " for stream '%s' because seqno %llu is too high",
            vbucketId, name.c_str(), startBySeqno);
    }
    return seqnoToStart;
}
446
447bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
448                                                   uint64_t checkpointId,
449                                                   bool alwaysFromBeginning) {
450    cb_assert(!checkpointList.empty());
451
452    bool found = false;
453    std::list<Checkpoint*>::iterator it = checkpointList.begin();
454    for (; it != checkpointList.end(); ++it) {
455        if (checkpointId == (*it)->getId()) {
456            found = true;
457            break;
458        }
459    }
460
461    LOG(EXTENSION_LOG_INFO,
462        "Register the tap cursor with the name \"%s\" for vbucket %d",
463        name.c_str(), vbucketId);
464
465    // If the tap cursor exists, remove its name from the checkpoint that is
466    // currently referenced by the tap cursor.
467    cursor_index::iterator map_it = tapCursors.find(name);
468    if (map_it != tapCursors.end()) {
469        (*(map_it->second.currentCheckpoint))->removeCursorName(name);
470    }
471
472    if (!found) {
473        for (it = checkpointList.begin(); it != checkpointList.end(); ++it) {
474            if (pCursorPreCheckpointId < (*it)->getId() ||
475                pCursorPreCheckpointId == 0) {
476                break;
477            }
478        }
479
480        LOG(EXTENSION_LOG_DEBUG,
481            "Checkpoint %llu for vbucket %d doesn't exist in memory. "
482            "Set the cursor with the name \"%s\" to checkpoint %d.\n",
483            checkpointId, vbucketId, name.c_str(), (*it)->getId());
484
485        cb_assert(it != checkpointList.end());
486
487        size_t offset = 0;
488        std::list<Checkpoint*>::iterator pos = checkpointList.begin();
489        for (; pos != it; ++pos) {
490            offset += (*pos)->getNumItems() + 2;
491        }
492
493        tapCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
494                                            true);
495        (*it)->registerCursorName(name);
496    } else {
497        size_t offset = 0;
498        std::list<queued_item>::iterator curr;
499
500        LOG(EXTENSION_LOG_DEBUG,
501            "Checkpoint %llu for vbucket %d exists in memory. "
502            "Set the cursor with the name \"%s\" to the checkpoint %llu\n",
503            checkpointId, vbucketId, name.c_str(), checkpointId);
504
505        if (!alwaysFromBeginning &&
506            map_it != tapCursors.end() &&
507            (*(map_it->second.currentCheckpoint))->getId() == (*it)->getId()) {
508            // If the cursor is currently in the checkpoint to start with,
509            // simply start from
510            // its current position.
511            curr = map_it->second.currentPos;
512            offset = map_it->second.offset;
513        } else {
514            // Set the cursor's position to the begining of the checkpoint to
515            // start with
516            curr = (*it)->begin();
517            std::list<Checkpoint*>::iterator pos = checkpointList.begin();
518            for (; pos != it; ++pos) {
519                offset += (*pos)->getNumItems() + 2;
520                // 2 is for checkpoint start and end items.
521            }
522        }
523
524        tapCursors[name] = CheckpointCursor(name, it, curr, offset, true);
525        // Register the tap cursor's name to the checkpoint.
526        (*it)->registerCursorName(name);
527    }
528
529    return found;
530}
531
532bool CheckpointManager::removeTAPCursor(const std::string &name) {
533    LockHolder lh(queueLock);
534
535    cursor_index::iterator it = tapCursors.find(name);
536    if (it == tapCursors.end()) {
537        return false;
538    }
539
540    LOG(EXTENSION_LOG_INFO,
541        "Remove the checkpoint cursor with the name \"%s\" from vbucket %d",
542        name.c_str(), vbucketId);
543
544    // We can simply remove the cursor's name from the checkpoint to which it
545    // currently belongs,
546    // by calling
547    // (*(it->second.currentCheckpoint))->removeCursorName(name);
548    // However, we just want to do more sanity checks by looking at each
549    // checkpoint. This won't
550    // cause much overhead because the max number of checkpoints allowed per
551    // vbucket is small.
552    std::list<Checkpoint*>::iterator cit = checkpointList.begin();
553    for (; cit != checkpointList.end(); ++cit) {
554        (*cit)->removeCursorName(name);
555    }
556
557    tapCursors.erase(it);
558    return true;
559}
560
561uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
562                                                     const std::string &name) {
563    LockHolder lh(queueLock);
564    cursor_index::iterator it = tapCursors.find(name);
565    if (it == tapCursors.end()) {
566        return 0;
567    }
568
569    return (*(it->second.currentCheckpoint))->getId();
570}
571
572size_t CheckpointManager::getNumOfTAPCursors() {
573    LockHolder lh(queueLock);
574    return tapCursors.size();
575}
576
577size_t CheckpointManager::getNumCheckpoints() {
578    LockHolder lh(queueLock);
579    return checkpointList.size();
580}
581
582std::list<std::string> CheckpointManager::getTAPCursorNames() {
583    LockHolder lh(queueLock);
584    std::list<std::string> cursor_names;
585    cursor_index::iterator tap_it = tapCursors.begin();
586        for (; tap_it != tapCursors.end(); ++tap_it) {
587        cursor_names.push_back((tap_it->first));
588    }
589    return cursor_names;
590}
591
592bool CheckpointManager::isCheckpointCreationForHighMemUsage(
593                                              const RCPtr<VBucket> &vbucket) {
594    bool forceCreation = false;
595    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
596    // pesistence and tap cursors are all currently in the open checkpoint?
597    bool allCursorsInOpenCheckpoint =
598        (tapCursors.size() + 1) == checkpointList.back()->getNumberOfCursors();
599
600    if (memoryUsed > stats.mem_high_wat &&
601        allCursorsInOpenCheckpoint &&
602        (checkpointList.back()->getNumItems() >= MIN_CHECKPOINT_ITEMS ||
603         checkpointList.back()->getNumItems() == vbucket->ht.getNumInMemoryItems())) {
604        forceCreation = true;
605    }
606    return forceCreation;
607}
608
/**
 * Remove the leading checkpoints that are no longer referenced by any
 * cursor and delete them.
 *
 * For an active vbucket that is allowed to create another checkpoint,
 * checkOpenCheckpoint_UNLOCKED() is consulted first. When closed checkpoints
 * may be kept, nothing is removed while memory usage is below the high
 * watermark and the number of checkpoints does not exceed the configured
 * maximum. On a replica vbucket that may not keep closed checkpoints, the
 * remaining closed checkpoints are additionally collapsed and the disk/dirty
 * queue sizes are corrected by the resulting change in items left to persist.
 *
 * @param vbucket vbucket this manager works for; asserted to be non-null
 * @param newOpenCheckpointCreated set to true when
 *        checkOpenCheckpoint_UNLOCKED() returned a non-zero id, otherwise
 *        set to false
 * @return the number of (non-meta) items held by the removed checkpoints
 */
size_t CheckpointManager::removeClosedUnrefCheckpoints(
                                              const RCPtr<VBucket> &vbucket,
                                              bool &newOpenCheckpointCreated) {

    // This function is executed periodically by the non-IO dispatcher.
    LockHolder lh(queueLock);
    cb_assert(vbucket);
    uint64_t oldCheckpointId = 0;
    // A new checkpoint may be created while the list is below the configured
    // maximum, or exactly at the maximum with no cursor on the first
    // checkpoint.
    bool canCreateNewCheckpoint = false;
    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
         checkpointList.front()->getNumberOfCursors() == 0)) {
        canCreateNewCheckpoint = true;
    }
    if (vbucket->getState() == vbucket_state_active &&
        canCreateNewCheckpoint) {

        bool forceCreation = isCheckpointCreationForHighMemUsage(vbucket);
        // Check if this master active vbucket needs to create a new open
        // checkpoint.
        oldCheckpointId = checkOpenCheckpoint_UNLOCKED(forceCreation, true);
    }
    newOpenCheckpointCreated = oldCheckpointId > 0;

    // When closed checkpoints may be kept, leave everything in place unless
    // memory is above the high watermark or there are too many checkpoints.
    if (checkpointConfig.canKeepClosedCheckpoints()) {
        double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
        if (memoryUsed < stats.mem_high_wat &&
            checkpointList.size() <= checkpointConfig.getMaxCheckpoints()) {
            return 0;
        }
    }

    size_t numUnrefItems = 0;
    size_t numMetaItems = 0;
    size_t numCheckpointsRemoved = 0;
    std::list<Checkpoint*> unrefCheckpointList;
    // Scan from the first checkpoint and stop at the first one that still
    // has a cursor or whose id is above pCursorPreCheckpointId; `it` ends up
    // one past the last checkpoint to remove.
    std::list<Checkpoint*>::iterator it = checkpointList.begin();
    for (; it != checkpointList.end(); ++it) {
        removeInvalidCursorsOnCheckpoint(*it);
        if ((*it)->getNumberOfCursors() > 0 ||
            (*it)->getId() > pCursorPreCheckpointId) {
            break;
        } else {
            numUnrefItems += (*it)->getNumItems();
            numMetaItems +=  2; // 2 is for checkpoint start and end items.
            ++numCheckpointsRemoved;
            if (checkpointConfig.canKeepClosedCheckpoints() &&
                (checkpointList.size() - numCheckpointsRemoved) <=
                 checkpointConfig.getMaxCheckpoints()) {
                // Collect unreferenced closed checkpoints only until the
                // number of checkpoints is equal to the number of max
                // checkpoints allowed.
                ++it;
                break;
            }
        }
    }
    numItems.fetch_sub(numUnrefItems + numMetaItems);
    // NOTE(review): numItems is reduced by items plus meta items, while the
    // cursor offsets below are reduced by the items only - confirm intended.
    if (numUnrefItems > 0) {
        decrCursorOffset_UNLOCKED(persistenceCursor, numUnrefItems);
        cursor_index::iterator map_it = tapCursors.begin();
        for (; map_it != tapCursors.end(); ++map_it) {
            decrCursorOffset_UNLOCKED(map_it->second, numUnrefItems);
        }
    }
    // Move the checkpoints in [begin, it) out of the live list.
    unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
                               checkpointList.begin(), it);

    // If any cursor on a replica vbucket or downstream active vbucket
    // receiving checkpoints from the upstream master is very slow and causes
    // more closed checkpoints in memory, collapse those closed checkpoints
    // into a single one to reduce the memory overhead.
    if (!checkpointConfig.canKeepClosedCheckpoints() &&
        vbucket->getState() == vbucket_state_replica)
    {
        // Collapsing can change how many items are left to persist; adjust
        // the queue-size statistics by the difference.
        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
        collapseClosedCheckpoints(unrefCheckpointList);
        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
        if (curr_remains > new_remains) {
            size_t diff = curr_remains - new_remains;
            stats.decrDiskQueueSize(diff);
            vbucket->decrDirtyQueueSize(diff);
        } else if (curr_remains < new_remains) {
            size_t diff = new_remains - curr_remains;
            stats.diskQueueSize.fetch_add(diff);
            vbucket->dirtyQueueSize.fetch_add(diff);
        }
    }
    // The detached checkpoints are deleted only after the queue lock has
    // been released.
    lh.unlock();

    std::list<Checkpoint*>::iterator chkpoint_it = unrefCheckpointList.begin();
    for (; chkpoint_it != unrefCheckpointList.end(); ++chkpoint_it) {
        delete *chkpoint_it;
    }

    return numUnrefItems;
}
707
708void CheckpointManager::removeInvalidCursorsOnCheckpoint(
709                                                     Checkpoint *pCheckpoint) {
710    std::list<std::string> invalidCursorNames;
711    const std::set<std::string> &cursors = pCheckpoint->getCursorNameList();
712    std::set<std::string>::const_iterator cit = cursors.begin();
713    for (; cit != cursors.end(); ++cit) {
714        // Check it with persistence cursor
715        if ((*cit).compare(persistenceCursor.name) == 0) {
716            if (pCheckpoint != *(persistenceCursor.currentCheckpoint)) {
717                invalidCursorNames.push_back(*cit);
718            }
719        } else { // Check it with tap cursors
720            cursor_index::iterator mit = tapCursors.find(*cit);
721            if (mit == tapCursors.end() ||
722                pCheckpoint != *(mit->second.currentCheckpoint)) {
723                invalidCursorNames.push_back(*cit);
724            }
725        }
726    }
727
728    std::list<std::string>::iterator it = invalidCursorNames.begin();
729    for (; it != invalidCursorNames.end(); ++it) {
730        pCheckpoint->removeCursorName(*it);
731    }
732}
733
734void CheckpointManager::collapseClosedCheckpoints(
735                                      std::list<Checkpoint*> &collapsedChks) {
736    // If there are one open checkpoint and more than one closed checkpoint,
737    // collapse those
738    // closed checkpoints into one checkpoint to reduce the memory overhead.
739    if (checkpointList.size() > 2) {
740        std::map<std::string, std::pair<uint64_t, bool> > slowCursors;
741        std::set<std::string> fastCursors;
742        std::list<Checkpoint*>::iterator lastClosedChk = checkpointList.end();
743        --lastClosedChk; --lastClosedChk; // Move to the lastest closed chkpt.
744        fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(),
745                           (*lastClosedChk)->getCursorNameList().end());
746        std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
747        ++rit; ++rit;// Move to the second lastest closed checkpoint.
748        size_t numDuplicatedItems = 0, numMetaItems = 0;
749        for (; rit != checkpointList.rend(); ++rit) {
750            size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit);
751            numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
752            numMetaItems += 2; // checkpoint start and end meta items
753
754            std::set<std::string>::iterator nameItr =
755                (*rit)->getCursorNameList().begin();
756            for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
757                if (nameItr->compare(persistenceCursor.name) == 0) {
758                    const std::string& key =
759                                 (*(persistenceCursor.currentPos))->getKey();
760                    bool cursor_on_chk_start = false;
761                    if ((*(persistenceCursor.currentPos))->getOperation() ==
762                        queue_op_checkpoint_start) {
763                        cursor_on_chk_start = true;
764                    }
765                    slowCursors[*nameItr] =
766                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
767                } else {
768                    cursor_index::iterator cc =
769                        tapCursors.find(*nameItr);
770                    const std::string& key = (*(cc->second.currentPos))->
771                                             getKey();
772                    bool cursor_on_chk_start = false;
773                    if ((*(cc->second.currentPos))->getOperation() ==
774                        queue_op_checkpoint_start) {
775                        cursor_on_chk_start = true;
776                    }
777                    slowCursors[*nameItr] =
778                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
779                }
780            }
781        }
782        putCursorsInCollapsedChk(slowCursors, lastClosedChk);
783
784        numItems.fetch_sub(numDuplicatedItems + numMetaItems);
785        Checkpoint *pOpenCheckpoint = checkpointList.back();
786        const std::set<std::string> &openCheckpointCursors =
787                                    pOpenCheckpoint->getCursorNameList();
788        fastCursors.insert(openCheckpointCursors.begin(),
789                           openCheckpointCursors.end());
790        std::set<std::string>::const_iterator cit = fastCursors.begin();
791        // Update the offset of each fast cursor.
792        for (; cit != fastCursors.end(); ++cit) {
793            if ((*cit).compare(persistenceCursor.name) == 0) {
794                decrCursorOffset_UNLOCKED(persistenceCursor,
795                                          numDuplicatedItems);
796            } else {
797                cursor_index::iterator mit = tapCursors.find(*cit);
798                if (mit != tapCursors.end()) {
799                    decrCursorOffset_UNLOCKED(mit->second,
800                                              numDuplicatedItems);
801                }
802            }
803        }
804        collapsedChks.splice(collapsedChks.end(), checkpointList,
805                             checkpointList.begin(),  lastClosedChk);
806    }
807}
808
809bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
810                                   bool genSeqno) {
811    LockHolder lh(queueLock);
812
813    cb_assert(vb);
814    bool canCreateNewCheckpoint = false;
815    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
816        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
817         checkpointList.front()->getNumberOfCursors() == 0)) {
818        canCreateNewCheckpoint = true;
819    }
820    if (vb->getState() == vbucket_state_active && canCreateNewCheckpoint) {
821        // Only the master active vbucket can create a next open checkpoint.
822        checkOpenCheckpoint_UNLOCKED(false, true);
823    }
824
825    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
826        addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
827    }
828
829    cb_assert(checkpointList.back()->getState() == CHECKPOINT_OPEN);
830
831    if (genSeqno) {
832        qi->setBySeqno(nextBySeqno());
833    } else {
834        lastBySeqNo = qi->getBySeqno();
835    }
836
837    queue_dirty_t result = checkpointList.back()->queueDirty(qi, this);
838    if (result == NEW_ITEM) {
839        ++numItems;
840    }
841
842    if (result != EXISTING_ITEM) {
843        ++stats.totalEnqueued;
844        ++stats.diskQueueSize;
845        vb->doStatsForQueueing(*qi, qi->size());
846    }
847
848    return result != EXISTING_ITEM;
849}
850
851void CheckpointManager::itemsPersisted() {
852    LockHolder lh(queueLock);
853    std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
854    pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
855
856    std::list<queued_item>::iterator curr_pos = persistenceCursor.currentPos;
857    pCursorSeqno = (*curr_pos)->getBySeqno();
858}
859
860void CheckpointManager::getAllItemsForPersistence(
861                                             std::vector<queued_item> &items) {
862    LockHolder lh(queueLock);
863    // Get all the items up to the end of the current open checkpoint.
864    while (incrCursor(persistenceCursor)) {
865        items.push_back(*(persistenceCursor.currentPos));
866    }
867
868    LOG(EXTENSION_LOG_DEBUG,
869        "Grab %ld items through the persistence cursor from vbucket %d",
870        items.size(), vbucketId);
871}
872
873queued_item CheckpointManager::nextItem(const std::string &name,
874                                        bool &isLastMutationItem,
875                                        uint64_t &endSeqno) {
876    LockHolder lh(queueLock);
877    cursor_index::iterator it = tapCursors.find(name);
878    if (it == tapCursors.end()) {
879        LOG(EXTENSION_LOG_WARNING,
880        "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
881        "%d.\n", name.c_str(), vbucketId);
882        queued_item qi(new Item(std::string(""), 0xffff,
883                                queue_op_empty, 0, 0));
884        return qi;
885    }
886    if (checkpointList.back()->getId() == 0) {
887        LOG(EXTENSION_LOG_INFO,
888            "VBucket %d is still in backfill phase that doesn't allow "
889            " the tap cursor to fetch an item from it's current checkpoint",
890            vbucketId);
891        queued_item qi(new Item(std::string(""), 0xffff,
892                                queue_op_empty, 0, 0));
893        return qi;
894    }
895
896    CheckpointCursor &cursor = it->second;
897    if (incrCursor(cursor)) {
898        isLastMutationItem = isLastMutationItemInCheckpoint(cursor);
899        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
900        return *(cursor.currentPos);
901    } else {
902        isLastMutationItem = false;
903        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
904        queued_item qi(new Item(std::string(""), 0xffff,
905                                queue_op_empty, 0, 0));
906        return qi;
907    }
908}
909
910bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
911    if (++(cursor.currentPos) != (*(cursor.currentCheckpoint))->end()) {
912        queued_item &qi = *(cursor.currentPos);
913        if (qi->getOperation() != queue_op_checkpoint_start &&
914            qi->getOperation() != queue_op_checkpoint_end) {
915            ++(cursor.offset);
916        }
917        return true;
918    } else if (!moveCursorToNextCheckpoint(cursor)) {
919        --(cursor.currentPos);
920        return false;
921    }
922    return incrCursor(cursor);
923}
924
925void CheckpointManager::clear(vbucket_state_t vbState) {
926    LockHolder lh(queueLock);
927    std::list<Checkpoint*>::iterator it = checkpointList.begin();
928    // Remove all the checkpoints.
929    while(it != checkpointList.end()) {
930        delete *it;
931        ++it;
932    }
933    checkpointList.clear();
934    numItems = 0;
935
936    uint64_t checkpointId = vbState == vbucket_state_active ? 1 : 0;
937    // Add a new open checkpoint.
938    addNewCheckpoint_UNLOCKED(checkpointId);
939    resetCursors();
940}
941
942void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
943    // Reset the persistence cursor.
944    if (resetPersistenceCursor) {
945        persistenceCursor.currentCheckpoint = checkpointList.begin();
946        persistenceCursor.currentPos = checkpointList.front()->begin();
947        persistenceCursor.offset = 0;
948        checkpointList.front()->registerCursorName(persistenceCursor.name);
949    }
950
951    // Reset all the TAP cursors.
952    cursor_index::iterator cit = tapCursors.begin();
953    for (; cit != tapCursors.end(); ++cit) {
954        cit->second.currentCheckpoint = checkpointList.begin();
955        cit->second.currentPos = checkpointList.front()->begin();
956        cit->second.offset = 0;
957        checkpointList.front()->registerCursorName(cit->second.name);
958    }
959}
960
961void CheckpointManager::resetTAPCursors(const std::list<std::string> &cursors){
962    LockHolder lh(queueLock);
963    std::list<std::string>::const_iterator it = cursors.begin();
964    for (; it != cursors.end(); ++it) {
965        registerTAPCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
966    }
967}
968
969bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
970    if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_OPEN) {
971        return false;
972    } else if ((*(cursor.currentCheckpoint))->getState() ==
973                                                           CHECKPOINT_CLOSED) {
974        std::list<Checkpoint*>::iterator currCheckpoint =
975                                                      cursor.currentCheckpoint;
976        if (++currCheckpoint == checkpointList.end()) {
977            return false;
978        }
979    }
980
981    // Remove the cursor's name from its current checkpoint.
982    (*(cursor.currentCheckpoint))->removeCursorName(cursor.name);
983    // Move the cursor to the next checkpoint.
984    ++(cursor.currentCheckpoint);
985    cursor.currentPos = (*(cursor.currentCheckpoint))->begin();
986    // Register the cursor's name to its new current checkpoint.
987    (*(cursor.currentCheckpoint))->registerCursorName(cursor.name);
988    return true;
989}
990
991size_t CheckpointManager::getNumOpenChkItems() {
992    LockHolder lh(queueLock);
993    if (checkpointList.empty()) {
994        return 0;
995    }
996    return checkpointList.back()->getNumItems() + 1;
997}
998
999uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
1000                                                         bool timeBound) {
1001    int checkpoint_id = 0;
1002
1003    timeBound = timeBound &&
1004                (ep_real_time() - checkpointList.back()->getCreationTime()) >=
1005                checkpointConfig.getCheckpointPeriod();
1006    // Create the new open checkpoint if any of the following conditions is
1007    // satisfied:
1008    // (1) force creation due to online update or high memory usage
1009    // (2) current checkpoint is reached to the max number of items allowed.
1010    // (3) time elapsed since the creation of the current checkpoint is greater
1011    //     than the threshold
1012    if (forceCreation ||
1013        (checkpointConfig.isItemNumBasedNewCheckpoint() &&
1014         checkpointList.back()->getNumItems() >=
1015         checkpointConfig.getCheckpointMaxItems()) ||
1016        (checkpointList.back()->getNumItems() > 0 && timeBound)) {
1017
1018        checkpoint_id = checkpointList.back()->getId();
1019        closeOpenCheckpoint_UNLOCKED(checkpoint_id);
1020        addNewCheckpoint_UNLOCKED(checkpoint_id + 1);
1021    }
1022    return checkpoint_id;
1023}
1024
1025bool CheckpointManager::eligibleForEviction(const std::string &key) {
1026    LockHolder lh(queueLock);
1027    uint64_t smallest_mid;
1028
1029    // Get the mutation id of the item pointed by the slowest cursor.
1030    // This won't cause much overhead as the number of cursors per vbucket is
1031    // usually bounded to 3 (persistence cursor + 2 replicas).
1032    const std::string &pkey = (*(persistenceCursor.currentPos))->getKey();
1033    smallest_mid = (*(persistenceCursor.currentCheckpoint))->
1034                                                     getMutationIdForKey(pkey);
1035    cursor_index::iterator mit = tapCursors.begin();
1036    for (; mit != tapCursors.end(); ++mit) {
1037        const std::string &tkey = (*(mit->second.currentPos))->getKey();
1038        uint64_t mid = (*(mit->second.currentCheckpoint))->
1039                                                     getMutationIdForKey(tkey);
1040        if (mid < smallest_mid) {
1041            smallest_mid = mid;
1042        }
1043    }
1044
1045    bool can_evict = true;
1046    std::list<Checkpoint*>::reverse_iterator it = checkpointList.rbegin();
1047    for (; it != checkpointList.rend(); ++it) {
1048        uint64_t mid = (*it)->getMutationIdForKey(key);
1049        if (mid == 0) { // key doesn't exist in a checkpoint.
1050            continue;
1051        }
1052        if (smallest_mid < mid) { // The slowest cursor is still
1053            can_evict = false;    //sitting behind a given key.
1054            break;
1055        }
1056    }
1057
1058    return can_evict;
1059}
1060
1061size_t CheckpointManager::getNumItemsForTAPConnection(
1062                                                     const std::string &name) {
1063    LockHolder lh(queueLock);
1064    size_t remains = 0;
1065    cursor_index::iterator it = tapCursors.find(name);
1066    if (it != tapCursors.end()) {
1067        remains = (numItems >= it->second.offset) ?
1068                   numItems - it->second.offset : 0;
1069    }
1070    return remains;
1071}
1072
1073size_t CheckpointManager::getNumItemsForPersistence_UNLOCKED() {
1074    size_t num_items = numItems;
1075    size_t offset = persistenceCursor.offset;
1076
1077    // Get the number of meta items that can be skipped by the persistence
1078    // cursor.
1079    size_t meta_items = 0;
1080    std::list<Checkpoint*>::iterator curr_chk =
1081                                           persistenceCursor.currentCheckpoint;
1082    for (; curr_chk != checkpointList.end(); ++curr_chk) {
1083        if (curr_chk == persistenceCursor.currentCheckpoint) {
1084            std::list<queued_item>::iterator curr_pos =
1085                                                  persistenceCursor.currentPos;
1086            ++curr_pos;
1087            if (curr_pos == (*curr_chk)->end()) {
1088                continue;
1089            }
1090            if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
1091                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1092                    meta_items += 2;
1093                } else {
1094                    ++meta_items;
1095                }
1096            } else {
1097                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1098                    ++meta_items;
1099                }
1100            }
1101        } else {
1102            if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1103                meta_items += 2;
1104            } else {
1105                ++meta_items;
1106            }
1107        }
1108    }
1109
1110    offset += meta_items;
1111    return num_items > offset ? num_items - offset : 0;
1112}
1113
1114void CheckpointManager::decrTapCursorFromCheckpointEnd(
1115                                                    const std::string &name) {
1116    LockHolder lh(queueLock);
1117    cursor_index::iterator it = tapCursors.find(name);
1118    if (it != tapCursors.end() &&
1119        (*(it->second.currentPos))->getOperation() ==
1120        queue_op_checkpoint_end) {
1121        decrCursorPos_UNLOCKED(it->second);
1122    }
1123}
1124
1125uint64_t CheckpointManager::getMutationIdForKey(uint64_t chk_id,
1126                                                std::string key) {
1127    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
1128    for (; itr != checkpointList.end(); ++itr) {
1129        if (chk_id == (*itr)->getId()) {
1130            return (*itr)->getMutationIdForKey(key);
1131        }
1132    }
1133    return 0;
1134}
1135
1136bool CheckpointManager::isLastMutationItemInCheckpoint(
1137                                                   CheckpointCursor &cursor) {
1138    std::list<queued_item>::iterator it = cursor.currentPos;
1139    ++it;
1140    if (it == (*(cursor.currentCheckpoint))->end() ||
1141        (*it)->getOperation() == queue_op_checkpoint_end) {
1142        return true;
1143    }
1144    return false;
1145}
1146
1147void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
1148                                               const RCPtr<VBucket> &vbucket) {
1149    LockHolder lh(queueLock);
1150
1151    // Ignore CHECKPOINT_START message with ID 0 as 0 is reserved for
1152    // representing backfill.
1153    if (id == 0) {
1154        return;
1155    }
1156    // If the replica receives a checkpoint start message right after backfill
1157    // completion, simply set the current open checkpoint id to the one
1158    // received from the active vbucket.
1159    if (checkpointList.back()->getId() == 0) {
1160        setOpenCheckpointId_UNLOCKED(id);
1161        resetCursors(false);
1162        return;
1163    }
1164
1165    std::list<Checkpoint*>::iterator it = checkpointList.begin();
1166    // Check if a checkpoint exists with ID >= id.
1167    while (it != checkpointList.end()) {
1168        if (id <= (*it)->getId()) {
1169            break;
1170        }
1171        ++it;
1172    }
1173
1174    if (it == checkpointList.end()) {
1175        if ((checkpointList.back()->getId() + 1) < id) {
1176            isCollapsedCheckpoint = true;
1177            uint64_t oid = getOpenCheckpointId_UNLOCKED();
1178            lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0;
1179        } else if ((checkpointList.back()->getId() + 1) == id) {
1180            isCollapsedCheckpoint = false;
1181        }
1182        if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
1183            checkpointList.back()->getNumItems() == 0) {
1184            // If the current open checkpoint doesn't have any items, simply
1185            // set its id to
1186            // the one from the master node.
1187            setOpenCheckpointId_UNLOCKED(id);
1188            // Reposition all the cursors in the open checkpoint to the
1189            // begining position so that a checkpoint_start message can be
1190            // sent again with the correct id.
1191            const std::set<std::string> &cursors = checkpointList.back()->
1192                                                   getCursorNameList();
1193            std::set<std::string>::const_iterator cit = cursors.begin();
1194            for (; cit != cursors.end(); ++cit) {
1195                if ((*cit).compare(persistenceCursor.name) == 0) {
1196                    // Persistence cursor
1197                    continue;
1198                } else { // TAP cursors
1199                    cursor_index::iterator mit = tapCursors.find(*cit);
1200                    mit->second.currentPos = checkpointList.back()->begin();
1201                }
1202            }
1203        } else {
1204            closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
1205            addNewCheckpoint_UNLOCKED(id);
1206        }
1207    } else {
1208        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
1209        collapseCheckpoints(id);
1210        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
1211        if (curr_remains > new_remains) {
1212            size_t diff = curr_remains - new_remains;
1213            stats.decrDiskQueueSize(diff);
1214            vbucket->decrDirtyQueueSize(diff);
1215        } else if (curr_remains < new_remains) {
1216            size_t diff = new_remains - curr_remains;
1217            stats.diskQueueSize.fetch_add(diff);
1218            vbucket->dirtyQueueSize.fetch_add(diff);
1219        }
1220    }
1221}
1222
1223void CheckpointManager::collapseCheckpoints(uint64_t id) {
1224    cb_assert(!checkpointList.empty());
1225
1226    std::map<std::string, std::pair<uint64_t, bool> > cursorMap;
1227    cursor_index::iterator itr;
1228    for (itr = tapCursors.begin(); itr != tapCursors.end(); itr++) {
1229        Checkpoint* chk = *(itr->second.currentCheckpoint);
1230        const std::string& key = (*(itr->second.currentPos))->getKey();
1231        bool cursor_on_chk_start = false;
1232        if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
1233            cursor_on_chk_start = true;
1234        }
1235        cursorMap[itr->first.c_str()] =
1236            std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1237    }
1238
1239    Checkpoint* chk = *(persistenceCursor.currentCheckpoint);
1240    std::string key = (*(persistenceCursor.currentPos))->getKey();
1241    bool cursor_on_chk_start = false;
1242    if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_start) {
1243        cursor_on_chk_start = true;
1244    }
1245    cursorMap[persistenceCursor.name.c_str()] =
1246        std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1247
1248    std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
1249    ++rit; // Move to the last closed checkpoint.
1250    size_t numDuplicatedItems = 0, numMetaItems = 0;
1251    // Collapse all checkpoints.
1252    for (; rit != checkpointList.rend(); ++rit) {
1253        size_t numAddedItems = checkpointList.back()->
1254                               mergePrevCheckpoint(*rit);
1255        numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
1256        numMetaItems += 2; // checkpoint start and end meta items
1257        delete *rit;
1258    }
1259    numItems.fetch_sub(numDuplicatedItems + numMetaItems);
1260
1261    if (checkpointList.size() > 1) {
1262        checkpointList.erase(checkpointList.begin(), --checkpointList.end());
1263    }
1264    cb_assert(checkpointList.size() == 1);
1265
1266    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
1267        checkpointList.back()->popBackCheckpointEndItem();
1268        --numItems;
1269        checkpointList.back()->setState(CHECKPOINT_OPEN);
1270    }
1271    setOpenCheckpointId_UNLOCKED(id);
1272    putCursorsInCollapsedChk(cursorMap, checkpointList.begin());
1273}
1274
1275void CheckpointManager::
1276putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
1277                         std::list<Checkpoint*>::iterator chkItr) {
1278    size_t i;
1279    Checkpoint *chk = *chkItr;
1280    std::list<queued_item>::iterator cit = chk->begin();
1281    std::list<queued_item>::iterator last = chk->begin();
1282    for (i = 0; cit != chk->end(); ++i, ++cit) {
1283        uint64_t id = chk->getMutationIdForKey((*cit)->getKey());
1284        std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
1285        while (mit != cursors.end()) {
1286            std::pair<uint64_t, bool> val = mit->second;
1287            if (val.first < id || (val.first == id && val.second &&
1288                                   (*last)->getOperation() == queue_op_checkpoint_start)) {
1289                if (mit->first.compare(persistenceCursor.name) == 0) {
1290                    persistenceCursor.currentCheckpoint = chkItr;
1291                    persistenceCursor.currentPos = last;
1292                    persistenceCursor.offset = (i > 0) ? i - 1 : 0;
1293                    chk->registerCursorName(persistenceCursor.name);
1294                } else {
1295                    cursor_index::iterator cc = tapCursors.find(mit->first);
1296                    if (cc == tapCursors.end() ||
1297                        cc->second.fromBeginningOnChkCollapse) {
1298                        ++mit;
1299                        continue;
1300                    }
1301                    cc->second.currentCheckpoint = chkItr;
1302                    cc->second.currentPos = last;
1303                    cc->second.offset = (i > 0) ? i - 1 : 0;
1304                    chk->registerCursorName(cc->second.name);
1305                }
1306                cursors.erase(mit++);
1307            } else {
1308                ++mit;
1309            }
1310        }
1311        last = cit;
1312    }
1313
1314    std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
1315    for (; mit != cursors.end(); ++mit) {
1316        if (mit->first.compare(persistenceCursor.name) == 0) {
1317            persistenceCursor.currentCheckpoint = chkItr;
1318            persistenceCursor.currentPos = last;
1319            persistenceCursor.offset = (i > 0) ? i - 1 : 0;
1320            chk->registerCursorName(persistenceCursor.name);
1321        } else {
1322            cursor_index::iterator cc = tapCursors.find(mit->first);
1323            if (cc == tapCursors.end()) {
1324                continue;
1325            }
1326            cc->second.currentCheckpoint = chkItr;
1327            if (cc->second.fromBeginningOnChkCollapse) {
1328                cc->second.currentPos = chk->begin();
1329                cc->second.offset = 0;
1330            } else {
1331                cc->second.currentPos = last;
1332                cc->second.offset = (i > 0) ? i - 1 : 0;
1333            }
1334            chk->registerCursorName(cc->second.name);
1335        }
1336    }
1337}
1338
1339bool CheckpointManager::hasNext(const std::string &name) {
1340    LockHolder lh(queueLock);
1341    cursor_index::iterator it = tapCursors.find(name);
1342    if (it == tapCursors.end() || getOpenCheckpointId_UNLOCKED() == 0) {
1343        return false;
1344    }
1345
1346    bool hasMore = true;
1347    std::list<queued_item>::iterator curr = it->second.currentPos;
1348    ++curr;
1349    if (curr == (*(it->second.currentCheckpoint))->end() &&
1350        (*(it->second.currentCheckpoint)) == checkpointList.back()) {
1351        hasMore = false;
1352    }
1353    return hasMore;
1354}
1355
1356queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
1357                                          enum queue_operation checkpoint_op) {
1358    cb_assert(checkpoint_op == queue_op_checkpoint_start ||
1359           checkpoint_op == queue_op_checkpoint_end ||
1360           checkpoint_op == queue_op_empty);
1361
1362    uint64_t bySeqno;
1363    std::stringstream key;
1364    if (checkpoint_op == queue_op_checkpoint_start) {
1365        key << "checkpoint_start";
1366        bySeqno = lastBySeqNo + 1;
1367    } else if (checkpoint_op == queue_op_empty) {
1368        key << "dummy_key";
1369        bySeqno = lastBySeqNo;
1370    } else {
1371        key << "checkpoint_end";
1372        bySeqno = lastBySeqNo + 1;
1373    }
1374    queued_item qi(new Item(key.str(), vbid, checkpoint_op, id, bySeqno));
1375    return qi;
1376}
1377
1378bool CheckpointManager::hasNextForPersistence() {
1379    LockHolder lh(queueLock);
1380    bool hasMore = true;
1381    std::list<queued_item>::iterator curr = persistenceCursor.currentPos;
1382    ++curr;
1383    if (curr == (*(persistenceCursor.currentCheckpoint))->end() &&
1384        (*(persistenceCursor.currentCheckpoint)) == checkpointList.back()) {
1385        hasMore = false;
1386    }
1387    return hasMore;
1388}
1389
1390uint64_t CheckpointManager::createNewCheckpoint() {
1391    LockHolder lh(queueLock);
1392    if (checkpointList.back()->getNumItems() > 0) {
1393        uint64_t chk_id = checkpointList.back()->getId();
1394        closeOpenCheckpoint_UNLOCKED(chk_id);
1395        addNewCheckpoint_UNLOCKED(chk_id + 1);
1396    }
1397    return checkpointList.back()->getId();
1398}
1399
1400void CheckpointManager::decrCursorOffset_UNLOCKED(CheckpointCursor &cursor,
1401                                                  size_t decr) {
1402    if (cursor.offset >= decr) {
1403        cursor.offset.fetch_sub(decr);
1404    } else {
1405        cursor.offset = 0;
1406        LOG(EXTENSION_LOG_INFO,
1407            "%s cursor offset is negative. Reset it to 0.",
1408            cursor.name.c_str());
1409    }
1410}
1411
1412void CheckpointManager::decrCursorPos_UNLOCKED(CheckpointCursor &cursor) {
1413    if (cursor.currentPos != (*(cursor.currentCheckpoint))->begin()) {
1414        --(cursor.currentPos);
1415    }
1416}
1417
1418uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
1419    LockHolder lh(queueLock);
1420    return pCursorPreCheckpointId;
1421}
1422
1423uint64_t CheckpointManager::getPersistenceCursorSeqno() {
1424    LockHolder lh(queueLock);
1425    return pCursorSeqno;
1426}
1427
1428void CheckpointConfig::addConfigChangeListener(
1429                                         EventuallyPersistentEngine &engine) {
1430    Configuration &configuration = engine.getConfiguration();
1431    configuration.addValueChangedListener("chk_period",
1432             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1433    configuration.addValueChangedListener("chk_max_items",
1434             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1435    configuration.addValueChangedListener("max_checkpoints",
1436             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1437    configuration.addValueChangedListener("item_num_based_new_chk",
1438             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1439    configuration.addValueChangedListener("keep_closed_chks",
1440             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1441}
1442
1443CheckpointConfig::CheckpointConfig(EventuallyPersistentEngine &e) {
1444    Configuration &config = e.getConfiguration();
1445    checkpointPeriod = config.getChkPeriod();
1446    checkpointMaxItems = config.getChkMaxItems();
1447    maxCheckpoints = config.getMaxCheckpoints();
1448    itemNumBasedNewCheckpoint = config.isItemNumBasedNewChk();
1449    keepClosedCheckpoints = config.isKeepClosedChks();
1450}
1451
1452bool CheckpointConfig::validateCheckpointMaxItemsParam(size_t
1453                                                       checkpoint_max_items) {
1454    if (checkpoint_max_items < MIN_CHECKPOINT_ITEMS ||
1455        checkpoint_max_items > MAX_CHECKPOINT_ITEMS) {
1456        std::stringstream ss;
1457        ss << "New checkpoint_max_items param value " << checkpoint_max_items
1458           << " is not ranged between the min allowed value " <<
1459           MIN_CHECKPOINT_ITEMS
1460           << " and max value " << MAX_CHECKPOINT_ITEMS;
1461        LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
1462        return false;
1463    }
1464    return true;
1465}
1466
1467bool CheckpointConfig::validateCheckpointPeriodParam(
1468                                                   size_t checkpoint_period) {
1469    if (checkpoint_period < MIN_CHECKPOINT_PERIOD ||
1470        checkpoint_period > MAX_CHECKPOINT_PERIOD) {
1471        std::stringstream ss;
1472        ss << "New checkpoint_period param value " << checkpoint_period
1473           << " is not ranged between the min allowed value " <<
1474              MIN_CHECKPOINT_PERIOD
1475           << " and max value " << MAX_CHECKPOINT_PERIOD;
1476        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1477        return false;
1478    }
1479    return true;
1480}
1481
1482bool CheckpointConfig::validateMaxCheckpointsParam(size_t max_checkpoints) {
1483    if (max_checkpoints < DEFAULT_MAX_CHECKPOINTS ||
1484        max_checkpoints > MAX_CHECKPOINTS_UPPER_BOUND) {
1485        std::stringstream ss;
1486        ss << "New max_checkpoints param value " << max_checkpoints
1487           << " is not ranged between the min allowed value " <<
1488              DEFAULT_MAX_CHECKPOINTS
1489           << " and max value " << MAX_CHECKPOINTS_UPPER_BOUND;
1490        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1491        return false;
1492    }
1493    return true;
1494}
1495
1496void CheckpointConfig::setCheckpointPeriod(size_t value) {
1497    if (!validateCheckpointPeriodParam(value)) {
1498        value = DEFAULT_CHECKPOINT_PERIOD;
1499    }
1500    checkpointPeriod = static_cast<rel_time_t>(value);
1501}
1502
1503void CheckpointConfig::setCheckpointMaxItems(size_t value) {
1504    if (!validateCheckpointMaxItemsParam(value)) {
1505        value = DEFAULT_CHECKPOINT_ITEMS;
1506    }
1507    checkpointMaxItems = value;
1508}
1509
1510void CheckpointConfig::setMaxCheckpoints(size_t value) {
1511    if (!validateMaxCheckpointsParam(value)) {
1512        value = DEFAULT_MAX_CHECKPOINTS;
1513    }
1514    maxCheckpoints = value;
1515}
1516
1517void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
1518    LockHolder lh(queueLock);
1519    char buf[256];
1520
1521    snprintf(buf, sizeof(buf), "vb_%d:open_checkpoint_id", vbucketId);
1522    add_casted_stat(buf, getOpenCheckpointId_UNLOCKED(), add_stat, cookie);
1523    snprintf(buf, sizeof(buf), "vb_%d:last_closed_checkpoint_id", vbucketId);
1524    add_casted_stat(buf, getLastClosedCheckpointId_UNLOCKED(),
1525    add_stat, cookie);
1526    snprintf(buf, sizeof(buf), "vb_%d:persistence_seqno", vbucketId);
1527    add_casted_stat(buf, pCursorSeqno, add_stat, cookie);
1528    snprintf(buf, sizeof(buf), "vb_%d:num_tap_cursors", vbucketId);
1529    add_casted_stat(buf, tapCursors.size(), add_stat, cookie);
1530    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoint_items", vbucketId);
1531    add_casted_stat(buf, numItems, add_stat, cookie);
1532    snprintf(buf, sizeof(buf), "vb_%d:num_open_checkpoint_items", vbucketId);
1533    add_casted_stat(buf, checkpointList.empty() ? 0 :
1534                    checkpointList.back()->getNumItems(),
1535                    add_stat, cookie);
1536    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoints", vbucketId);
1537    add_casted_stat(buf, checkpointList.size(), add_stat, cookie);
1538    snprintf(buf, sizeof(buf), "vb_%d:num_items_for_persistence", vbucketId);
1539    add_casted_stat(buf, getNumItemsForPersistence_UNLOCKED(),
1540                    add_stat, cookie);
1541
1542    cursor_index::iterator tap_it = tapCursors.begin();
1543    for (; tap_it != tapCursors.end(); ++tap_it) {
1544        snprintf(buf, sizeof(buf),
1545                 "vb_%d:%s:cursor_checkpoint_id", vbucketId,
1546                 tap_it->first.c_str());
1547        add_casted_stat(buf, (*(tap_it->second.currentCheckpoint))->getId(),
1548                        add_stat, cookie);
1549    }
1550}
1551