xref: /3.0.3-GA/ep-engine/src/checkpoint.cc (revision 4664d274)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <string>
21#include <utility>
22#include <vector>
23
24#include "checkpoint.h"
25#include "ep_engine.h"
26#define STATWRITER_NAMESPACE checkpoint
27#include "statwriter.h"
28#undef STATWRITER_NAMESPACE
29#include "vbucket.h"
30
31/**
32 * A listener class to update checkpoint related configs at runtime.
33 */
34class CheckpointConfigChangeListener : public ValueChangedListener {
35public:
36    CheckpointConfigChangeListener(CheckpointConfig &c) : config(c) { }
37    virtual ~CheckpointConfigChangeListener() { }
38
39    virtual void sizeValueChanged(const std::string &key, size_t value) {
40        if (key.compare("chk_period") == 0) {
41            config.setCheckpointPeriod(value);
42        } else if (key.compare("chk_max_items") == 0) {
43            config.setCheckpointMaxItems(value);
44        } else if (key.compare("max_checkpoints") == 0) {
45            config.setMaxCheckpoints(value);
46        }
47    }
48
49    virtual void booleanValueChanged(const std::string &key, bool value) {
50        if (key.compare("item_num_based_new_chk") == 0) {
51            config.allowItemNumBasedNewCheckpoint(value);
52        } else if (key.compare("keep_closed_chks") == 0) {
53            config.allowKeepClosedCheckpoints(value);
54        }
55    }
56
57private:
58    CheckpointConfig &config;
59};
60
61Checkpoint::~Checkpoint() {
62    LOG(EXTENSION_LOG_INFO,
63        "Checkpoint %llu for vbucket %d is purged from memory",
64        checkpointId, vbucketId);
65    stats.memOverhead.fetch_sub(memorySize());
66    cb_assert(stats.memOverhead.load() < GIGANTOR);
67}
68
69void Checkpoint::setState(checkpoint_state state) {
70    checkpointState = state;
71}
72
73void Checkpoint::popBackCheckpointEndItem() {
74    if (!toWrite.empty() &&
75        toWrite.back()->getOperation() == queue_op_checkpoint_end) {
76        keyIndex.erase(toWrite.back()->getKey());
77        toWrite.pop_back();
78    }
79}
80
81bool Checkpoint::keyExists(const std::string &key) {
82    return keyIndex.find(key) != keyIndex.end();
83}
84
85queue_dirty_t Checkpoint::queueDirty(const queued_item &qi,
86                                     CheckpointManager *checkpointManager) {
87    assert (checkpointState == CHECKPOINT_OPEN);
88    queue_dirty_t rv;
89
90    checkpoint_index::iterator it = keyIndex.find(qi->getKey());
91    // Check if this checkpoint already had an item for the same key.
92    if (it != keyIndex.end()) {
93        rv = EXISTING_ITEM;
94        std::list<queued_item>::iterator currPos = it->second.position;
95        uint64_t currMutationId = it->second.mutation_id;
96        CheckpointCursor &pcursor = checkpointManager->persistenceCursor;
97        queued_item &pqi = *(pcursor.currentPos);
98
99        if (*(pcursor.currentCheckpoint) == this) {
100            // If the existing item is in the left-hand side of the item
101            // pointed by the persistence cursor, decrease the persistence
102            // cursor's offset by 1.
103            const std::string &key = pqi->getKey();
104            checkpoint_index::iterator ita = keyIndex.find(key);
105            if (ita != keyIndex.end()) {
106                uint64_t mutationId = ita->second.mutation_id;
107                if (currMutationId <= mutationId &&
108                    pqi->getOperation() != queue_op_checkpoint_start) {
109                    checkpointManager->decrCursorOffset_UNLOCKED(pcursor, 1);
110                    rv = PERSIST_AGAIN;
111                }
112            }
113            // If the persistence cursor points to the existing item for the
114            // same key,
115            // shift the cursor left by 1.
116            if (pcursor.currentPos == currPos) {
117                checkpointManager->decrCursorPos_UNLOCKED(pcursor);
118            }
119        }
120
121        cursor_index::iterator map_it = checkpointManager->tapCursors.begin();
122        for (; map_it != checkpointManager->tapCursors.end(); ++map_it) {
123
124            if (*(map_it->second.currentCheckpoint) == this) {
125                queued_item &tqi = *(map_it->second.currentPos);
126                const std::string &key = tqi->getKey();
127                checkpoint_index::iterator ita = keyIndex.find(key);
128                if (ita != keyIndex.end()) {
129                    uint64_t mutationId = ita->second.mutation_id;
130                    if (currMutationId <= mutationId &&
131                        tqi->getOperation() != queue_op_checkpoint_start) {
132                        checkpointManager->
133                                  decrCursorOffset_UNLOCKED(map_it->second, 1);
134                    }
135                }
136                // If an TAP cursor points to the existing item for the same
137                // key, shift it left by 1
138                if (map_it->second.currentPos == currPos) {
139                    checkpointManager->decrCursorPos_UNLOCKED(map_it->second);
140                }
141            }
142        }
143
144        toWrite.push_back(qi);
145        // Remove the existing item for the same key from the list.
146        toWrite.erase(currPos);
147    } else {
148        if (qi->getOperation() == queue_op_set ||
149            qi->getOperation() == queue_op_del) {
150            ++numItems;
151        }
152        rv = NEW_ITEM;
153        // Push the new item into the list
154        toWrite.push_back(qi);
155    }
156
157    if (qi->getNKey() > 0) {
158        std::list<queued_item>::iterator last = toWrite.end();
159        // --last is okay as the list is not empty now.
160        index_entry entry = {--last, qi->getBySeqno()};
161        // Set the index of the key to the new item that is pushed back into
162        // the list.
163        keyIndex[qi->getKey()] = entry;
164        if (rv == NEW_ITEM) {
165            size_t newEntrySize = qi->getNKey() + sizeof(index_entry) +
166                                  sizeof(queued_item);
167            memOverhead += newEntrySize;
168            stats.memOverhead.fetch_add(newEntrySize);
169            cb_assert(stats.memOverhead.load() < GIGANTOR);
170        }
171    }
172    return rv;
173}
174
175size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
176    size_t numNewItems = 0;
177    size_t newEntryMemOverhead = 0;
178    std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
179
180    LOG(EXTENSION_LOG_INFO,
181        "Collapse the checkpoint %llu into the checkpoint %llu for vbucket %d",
182        pPrevCheckpoint->getId(), checkpointId, vbucketId);
183
184    std::list<queued_item>::iterator itr = toWrite.begin();
185    uint64_t seqno = pPrevCheckpoint->getMutationIdForKey("dummy_key");
186    keyIndex["dummy_key"].mutation_id = seqno;
187    (*itr)->setBySeqno(seqno);
188
189    seqno = pPrevCheckpoint->getMutationIdForKey("checkpoint_start");
190    keyIndex["checkpoint_start"].mutation_id = seqno;
191    ++itr;
192    (*itr)->setBySeqno(seqno);
193
194    for (; rit != pPrevCheckpoint->rend(); ++rit) {
195        const std::string &key = (*rit)->getKey();
196        if ((*rit)->getOperation() != queue_op_del &&
197            (*rit)->getOperation() != queue_op_set) {
198            continue;
199        }
200        checkpoint_index::iterator it = keyIndex.find(key);
201        if (it == keyIndex.end()) {
202            std::list<queued_item>::iterator pos = toWrite.begin();
203            // Skip the first two meta items
204            ++pos; ++pos;
205            toWrite.insert(pos, *rit);
206            index_entry entry = {--pos, static_cast<int64_t>(pPrevCheckpoint->
207                                                    getMutationIdForKey(key))};
208            keyIndex[key] = entry;
209            newEntryMemOverhead += key.size() + sizeof(index_entry);
210            ++numItems;
211            ++numNewItems;
212        }
213    }
214    memOverhead += newEntryMemOverhead;
215    stats.memOverhead.fetch_add(newEntryMemOverhead);
216    cb_assert(stats.memOverhead.load() < GIGANTOR);
217    return numNewItems;
218}
219
220uint64_t Checkpoint::getMutationIdForKey(const std::string &key) {
221    uint64_t mid = 0;
222    checkpoint_index::iterator it = keyIndex.find(key);
223    if (it != keyIndex.end()) {
224        mid = it->second.mutation_id;
225    }
226    return mid;
227}
228
229CheckpointManager::~CheckpointManager() {
230    LockHolder lh(queueLock);
231    std::list<Checkpoint*>::iterator it = checkpointList.begin();
232    while(it != checkpointList.end()) {
233        delete *it;
234        ++it;
235    }
236}
237
238uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() {
239    if (checkpointList.empty()) {
240        return 0;
241    }
242
243    uint64_t id = checkpointList.back()->getId();
244    return checkpointList.back()->getState() == CHECKPOINT_OPEN ? id : id + 1;
245}
246
247uint64_t CheckpointManager::getOpenCheckpointId() {
248    LockHolder lh(queueLock);
249    return getOpenCheckpointId_UNLOCKED();
250}
251
252uint64_t CheckpointManager::getLastClosedCheckpointId_UNLOCKED() {
253    if (!isCollapsedCheckpoint) {
254        uint64_t id = getOpenCheckpointId_UNLOCKED();
255        lastClosedCheckpointId = id > 0 ? (id - 1) : 0;
256    }
257    return lastClosedCheckpointId;
258}
259
260uint64_t CheckpointManager::getLastClosedCheckpointId() {
261    LockHolder lh(queueLock);
262    return getLastClosedCheckpointId_UNLOCKED();
263}
264
265void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
266    if (!checkpointList.empty()) {
267        // Update the checkpoint_start item with the new Id.
268        std::list<queued_item>::iterator it =
269            ++(checkpointList.back()->begin());
270        (*it)->setRevSeqno(id);
271        if (checkpointList.back()->getId() == 0) {
272            (*it)->setBySeqno(lastBySeqNo + 1);
273        }
274        checkpointList.back()->setId(id);
275        LOG(EXTENSION_LOG_INFO, "Set the current open checkpoint id to %llu "
276            "for vbucket %d, bySeqno is %llu, max is %llu", id, vbucketId,
277            (*it)->getBySeqno(), lastBySeqNo);
278
279    }
280}
281
282bool CheckpointManager::addNewCheckpoint_UNLOCKED(uint64_t id) {
283    // This is just for making sure that the current checkpoint should be
284    // closed.
285    if (!checkpointList.empty() &&
286        checkpointList.back()->getState() == CHECKPOINT_OPEN) {
287        closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
288    }
289
290    LOG(EXTENSION_LOG_INFO, "Create a new open checkpoint %llu for vbucket %d",
291        id, vbucketId);
292
293    Checkpoint *checkpoint = new Checkpoint(stats, id, vbucketId,
294                                            CHECKPOINT_OPEN);
295    // Add a dummy item into the new checkpoint, so that any cursor referring
296    // to the actual first
297    // item in this new checkpoint can be safely shifted left by 1 if the
298    // first item is removed
299    // and pushed into the tail.
300    queued_item qi = createCheckpointItem(0, 0xffff, queue_op_empty);
301    checkpoint->queueDirty(qi, this);
302
303    // This item represents the start of the new checkpoint and is also sent to the slave node.
304    qi = createCheckpointItem(id, vbucketId, queue_op_checkpoint_start);
305    checkpoint->queueDirty(qi, this);
306    ++numItems;
307    checkpointList.push_back(checkpoint);
308
309    return true;
310}
311
312bool CheckpointManager::addNewCheckpoint(uint64_t id) {
313    LockHolder lh(queueLock);
314    return addNewCheckpoint_UNLOCKED(id);
315}
316
317bool CheckpointManager::closeOpenCheckpoint_UNLOCKED(uint64_t id) {
318    if (checkpointList.empty()) {
319        return false;
320    }
321    if (id != checkpointList.back()->getId() ||
322        checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
323        return true;
324    }
325
326    LOG(EXTENSION_LOG_INFO, "Close the open checkpoint %llu for vbucket %d",
327        id, vbucketId);
328
329    // This item represents the end of the current open checkpoint and is sent to the slave node.
330    queued_item qi = createCheckpointItem(id, vbucketId,
331                                          queue_op_checkpoint_end);
332    checkpointList.back()->queueDirty(qi, this);
333    ++numItems;
334    checkpointList.back()->setState(CHECKPOINT_CLOSED);
335    return true;
336}
337
338bool CheckpointManager::closeOpenCheckpoint(uint64_t id) {
339    LockHolder lh(queueLock);
340    return closeOpenCheckpoint_UNLOCKED(id);
341}
342
343void CheckpointManager::registerPersistenceCursor() {
344    LockHolder lh(queueLock);
345    cb_assert(!checkpointList.empty());
346    persistenceCursor.currentCheckpoint = checkpointList.begin();
347    persistenceCursor.currentPos = checkpointList.front()->begin();
348    checkpointList.front()->registerCursorName(persistenceCursor.name);
349}
350
351bool CheckpointManager::registerTAPCursor(const std::string &name,
352                                          uint64_t checkpointId,
353                                          bool alwaysFromBeginning) {
354    LockHolder lh(queueLock);
355    return registerTAPCursor_UNLOCKED(name,
356                                      checkpointId,
357                                      alwaysFromBeginning);
358}
359
360uint64_t CheckpointManager::registerTAPCursorBySeqno(const std::string &name,
361                                                     uint64_t startBySeqno,
362                                                     uint64_t endBySeqno) {
363    LockHolder lh(queueLock);
364    cb_assert(!checkpointList.empty());
365    cb_assert(checkpointList.back()->getHighSeqno() >= startBySeqno);
366
367    removeTAPCursor_UNLOCKED(name);
368
369    size_t skipped = 0;
370    uint64_t seqnoToStart = std::numeric_limits<uint64_t>::max();
371    bool needToFindStartSeqno =
372        startBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
373    bool needToFindEndSeqno =
374        endBySeqno < std::numeric_limits<uint64_t>::max() ? true : false;
375
376    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
377    for (; itr != checkpointList.end(); ++itr) {
378        uint64_t en = (*itr)->getHighSeqno();
379        uint64_t st = (*itr)->getLowSeqno();
380        if (needToFindStartSeqno) {
381            if (startBySeqno < st) {
382                tapCursors[name] = CheckpointCursor(name, itr, (*itr)->begin(),
383                                                    skipped, false);
384                (*itr)->registerCursorName(name);
385                seqnoToStart = (*itr)->getLowSeqno();
386                needToFindStartSeqno = false;
387            } else if (startBySeqno == st) {
388                std::list<queued_item>::iterator iitr = (*itr)->begin();
389
390                //Advance the iterator to point to the first item in the
391                //checkpoint list
392                std::advance(iitr, 2);
393                skipped = 2;
394
395                tapCursors[name] = CheckpointCursor(name, itr, iitr, skipped,
396                                                    false);
397                (*itr)->registerCursorName(name);
398                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno()) + 1;
399                needToFindStartSeqno = false;
400            } else if (startBySeqno <= en) {
401                std::list<queued_item>::iterator iitr = (*itr)->begin();
402                while (++iitr != (*itr)->end() &&
403                       startBySeqno > static_cast<uint64_t>((*iitr)->getBySeqno())) {
404                    skipped++;
405                }
406
407                if (iitr == (*itr)->end() ||
408                    startBySeqno < static_cast<uint64_t>((*iitr)->getBySeqno())) {
409                    --iitr;
410                }
411
412                size_t remaining = (numItems > skipped) ? numItems - skipped : 0;
413                tapCursors[name] = CheckpointCursor(name, itr, iitr, remaining,
414                                                    false);
415                (*itr)->registerCursorName(name);
416                seqnoToStart = static_cast<uint64_t>((*iitr)->getBySeqno());
417                needToFindStartSeqno = false;
418            } else {
419                skipped += (*itr)->getNumItems() + 2;
420            }
421        }
422        if (needToFindEndSeqno) {
423            if ((*itr)->getNumItems() > 0 && endBySeqno <= en) {
424                if ((*itr)->getState() == CHECKPOINT_OPEN) {
425                    // Closed the open checkpoint and create a new one by force.
426                    checkOpenCheckpoint_UNLOCKED(true, true);
427                }
428                needToFindEndSeqno = false;
429            }
430        }
431        if (!needToFindStartSeqno && !needToFindEndSeqno) {
432            break;
433        }
434    }
435
436    if (seqnoToStart == std::numeric_limits<uint64_t>::max()) {
437        /*
438         * We should never get here since this would mean that the sequence number
439         * we are looking for is higher than anything currently assigned and there
440         * is already an assert above for this case.
441         */
442        LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
443            " for stream '%s' because seqno %llu is too high",
444            vbucketId, name.c_str(), startBySeqno);
445    }
446    return seqnoToStart;
447}
448
449bool CheckpointManager::registerTAPCursor_UNLOCKED(const std::string &name,
450                                                   uint64_t checkpointId,
451                                                   bool alwaysFromBeginning) {
452    cb_assert(!checkpointList.empty());
453
454    bool found = false;
455    std::list<Checkpoint*>::iterator it = checkpointList.begin();
456    for (; it != checkpointList.end(); ++it) {
457        if (checkpointId == (*it)->getId()) {
458            found = true;
459            break;
460        }
461    }
462
463    LOG(EXTENSION_LOG_INFO,
464        "Register the tap cursor with the name \"%s\" for vbucket %d",
465        name.c_str(), vbucketId);
466
467    // If the tap cursor exists, remove its name from the checkpoint that is
468    // currently referenced by the tap cursor.
469    cursor_index::iterator map_it = tapCursors.find(name);
470    if (map_it != tapCursors.end()) {
471        (*(map_it->second.currentCheckpoint))->removeCursorName(name);
472    }
473
474    if (!found) {
475        for (it = checkpointList.begin(); it != checkpointList.end(); ++it) {
476            if (pCursorPreCheckpointId < (*it)->getId() ||
477                pCursorPreCheckpointId == 0) {
478                break;
479            }
480        }
481
482        LOG(EXTENSION_LOG_DEBUG,
483            "Checkpoint %llu for vbucket %d doesn't exist in memory. "
484            "Set the cursor with the name \"%s\" to checkpoint %d.\n",
485            checkpointId, vbucketId, name.c_str(), (*it)->getId());
486
487        cb_assert(it != checkpointList.end());
488
489        size_t offset = 0;
490        std::list<Checkpoint*>::iterator pos = checkpointList.begin();
491        for (; pos != it; ++pos) {
492            offset += (*pos)->getNumItems() + 2;
493        }
494
495        tapCursors[name] = CheckpointCursor(name, it, (*it)->begin(), offset,
496                                            true);
497        (*it)->registerCursorName(name);
498    } else {
499        size_t offset = 0;
500        std::list<queued_item>::iterator curr;
501
502        LOG(EXTENSION_LOG_DEBUG,
503            "Checkpoint %llu for vbucket %d exists in memory. "
504            "Set the cursor with the name \"%s\" to the checkpoint %llu\n",
505            checkpointId, vbucketId, name.c_str(), checkpointId);
506
507        if (!alwaysFromBeginning &&
508            map_it != tapCursors.end() &&
509            (*(map_it->second.currentCheckpoint))->getId() == (*it)->getId()) {
510            // If the cursor is currently in the checkpoint to start with,
511            // simply start from
512            // its current position.
513            curr = map_it->second.currentPos;
514            offset = map_it->second.offset;
515        } else {
516            // Set the cursor's position to the begining of the checkpoint to
517            // start with
518            curr = (*it)->begin();
519            std::list<Checkpoint*>::iterator pos = checkpointList.begin();
520            for (; pos != it; ++pos) {
521                offset += (*pos)->getNumItems() + 2;
522                // 2 is for checkpoint start and end items.
523            }
524        }
525
526        tapCursors[name] = CheckpointCursor(name, it, curr, offset, true);
527        // Register the tap cursor's name to the checkpoint.
528        (*it)->registerCursorName(name);
529    }
530
531    return found;
532}
533
534bool CheckpointManager::removeTAPCursor(const std::string &name) {
535    LockHolder lh(queueLock);
536    return removeTAPCursor_UNLOCKED(name);
537}
538
539bool CheckpointManager::removeTAPCursor_UNLOCKED(const std::string &name) {
540    cursor_index::iterator it = tapCursors.find(name);
541    if (it == tapCursors.end()) {
542        return false;
543    }
544
545    LOG(EXTENSION_LOG_INFO,
546        "Remove the checkpoint cursor with the name \"%s\" from vbucket %d",
547        name.c_str(), vbucketId);
548
549    // We can simply remove the cursor's name from the checkpoint to which it
550    // currently belongs,
551    // by calling
552    // (*(it->second.currentCheckpoint))->removeCursorName(name);
553    // However, we just want to do more sanity checks by looking at each
554    // checkpoint. This won't
555    // cause much overhead because the max number of checkpoints allowed per
556    // vbucket is small.
557    std::list<Checkpoint*>::iterator cit = checkpointList.begin();
558    for (; cit != checkpointList.end(); ++cit) {
559        (*cit)->removeCursorName(name);
560    }
561
562    tapCursors.erase(it);
563    return true;
564}
565
566uint64_t CheckpointManager::getCheckpointIdForTAPCursor(
567                                                     const std::string &name) {
568    LockHolder lh(queueLock);
569    cursor_index::iterator it = tapCursors.find(name);
570    if (it == tapCursors.end()) {
571        return 0;
572    }
573
574    return (*(it->second.currentCheckpoint))->getId();
575}
576
577size_t CheckpointManager::getNumOfTAPCursors() {
578    LockHolder lh(queueLock);
579    return tapCursors.size();
580}
581
582size_t CheckpointManager::getNumCheckpoints() {
583    LockHolder lh(queueLock);
584    return checkpointList.size();
585}
586
587std::list<std::string> CheckpointManager::getTAPCursorNames() {
588    LockHolder lh(queueLock);
589    std::list<std::string> cursor_names;
590    cursor_index::iterator tap_it = tapCursors.begin();
591        for (; tap_it != tapCursors.end(); ++tap_it) {
592        cursor_names.push_back((tap_it->first));
593    }
594    return cursor_names;
595}
596
597bool CheckpointManager::isCheckpointCreationForHighMemUsage(
598                                              const RCPtr<VBucket> &vbucket) {
599    bool forceCreation = false;
600    double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
601    // pesistence and tap cursors are all currently in the open checkpoint?
602    bool allCursorsInOpenCheckpoint =
603        (tapCursors.size() + 1) == checkpointList.back()->getNumberOfCursors();
604
605    if (memoryUsed > stats.mem_high_wat &&
606        allCursorsInOpenCheckpoint &&
607        (checkpointList.back()->getNumItems() >= MIN_CHECKPOINT_ITEMS ||
608         checkpointList.back()->getNumItems() == vbucket->ht.getNumInMemoryItems())) {
609        forceCreation = true;
610    }
611    return forceCreation;
612}
613
614size_t CheckpointManager::removeClosedUnrefCheckpoints(
615                                              const RCPtr<VBucket> &vbucket,
616                                              bool &newOpenCheckpointCreated) {
617
618    // This function is executed periodically by the non-IO dispatcher.
619    LockHolder lh(queueLock);
620    cb_assert(vbucket);
621    uint64_t oldCheckpointId = 0;
622    bool canCreateNewCheckpoint = false;
623    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
624        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
625         checkpointList.front()->getNumberOfCursors() == 0)) {
626        canCreateNewCheckpoint = true;
627    }
628    if (vbucket->getState() == vbucket_state_active &&
629        canCreateNewCheckpoint) {
630
631        bool forceCreation = isCheckpointCreationForHighMemUsage(vbucket);
632        // Check if this master active vbucket needs to create a new open
633        // checkpoint.
634        oldCheckpointId = checkOpenCheckpoint_UNLOCKED(forceCreation, true);
635    }
636    newOpenCheckpointCreated = oldCheckpointId > 0;
637
638    if (checkpointConfig.canKeepClosedCheckpoints()) {
639        double memoryUsed = static_cast<double>(stats.getTotalMemoryUsed());
640        if (memoryUsed < stats.mem_high_wat &&
641            checkpointList.size() <= checkpointConfig.getMaxCheckpoints()) {
642            return 0;
643        }
644    }
645
646    size_t numUnrefItems = 0;
647    size_t numMetaItems = 0;
648    size_t numCheckpointsRemoved = 0;
649    std::list<Checkpoint*> unrefCheckpointList;
650    std::list<Checkpoint*>::iterator it = checkpointList.begin();
651    for (; it != checkpointList.end(); ++it) {
652        removeInvalidCursorsOnCheckpoint(*it);
653        if ((*it)->getNumberOfCursors() > 0 ||
654            (*it)->getId() > pCursorPreCheckpointId) {
655            break;
656        } else {
657            numUnrefItems += (*it)->getNumItems();
658            numMetaItems +=  2; // 2 is for checkpoint start and end items.
659            ++numCheckpointsRemoved;
660            if (checkpointConfig.canKeepClosedCheckpoints() &&
661                (checkpointList.size() - numCheckpointsRemoved) <=
662                 checkpointConfig.getMaxCheckpoints()) {
663                // Collect unreferenced closed checkpoints until the number
664                // of checkpoints is
665                // equal to the number of max checkpoints allowed.
666                ++it;
667                break;
668            }
669        }
670    }
671    numItems.fetch_sub(numUnrefItems + numMetaItems);
672    if (numUnrefItems > 0) {
673        decrCursorOffset_UNLOCKED(persistenceCursor, numUnrefItems);
674        cursor_index::iterator map_it = tapCursors.begin();
675        for (; map_it != tapCursors.end(); ++map_it) {
676            decrCursorOffset_UNLOCKED(map_it->second, numUnrefItems);
677        }
678    }
679    unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
680                               checkpointList.begin(), it);
681
682    // If any cursor on a replica vbucket or downstream active vbucket
683    // receiving checkpoints from
684    // the upstream master is very slow and causes more closed checkpoints in
685    // memory, collapse those closed checkpoints into a single one to reduce
686    // the memory overhead.
687    if (!checkpointConfig.canKeepClosedCheckpoints() &&
688        vbucket->getState() == vbucket_state_replica)
689    {
690        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
691        collapseClosedCheckpoints(unrefCheckpointList);
692        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
693        if (curr_remains > new_remains) {
694            size_t diff = curr_remains - new_remains;
695            stats.decrDiskQueueSize(diff);
696            vbucket->decrDirtyQueueSize(diff);
697        } else if (curr_remains < new_remains) {
698            size_t diff = new_remains - curr_remains;
699            stats.diskQueueSize.fetch_add(diff);
700            vbucket->dirtyQueueSize.fetch_add(diff);
701        }
702    }
703    lh.unlock();
704
705    std::list<Checkpoint*>::iterator chkpoint_it = unrefCheckpointList.begin();
706    for (; chkpoint_it != unrefCheckpointList.end(); ++chkpoint_it) {
707        delete *chkpoint_it;
708    }
709
710    return numUnrefItems;
711}
712
713void CheckpointManager::removeInvalidCursorsOnCheckpoint(
714                                                     Checkpoint *pCheckpoint) {
715    std::list<std::string> invalidCursorNames;
716    const std::set<std::string> &cursors = pCheckpoint->getCursorNameList();
717    std::set<std::string>::const_iterator cit = cursors.begin();
718    for (; cit != cursors.end(); ++cit) {
719        // Check it with persistence cursor
720        if ((*cit).compare(persistenceCursor.name) == 0) {
721            if (pCheckpoint != *(persistenceCursor.currentCheckpoint)) {
722                invalidCursorNames.push_back(*cit);
723            }
724        } else { // Check it with tap cursors
725            cursor_index::iterator mit = tapCursors.find(*cit);
726            if (mit == tapCursors.end() ||
727                pCheckpoint != *(mit->second.currentCheckpoint)) {
728                invalidCursorNames.push_back(*cit);
729            }
730        }
731    }
732
733    std::list<std::string>::iterator it = invalidCursorNames.begin();
734    for (; it != invalidCursorNames.end(); ++it) {
735        pCheckpoint->removeCursorName(*it);
736    }
737}
738
739void CheckpointManager::collapseClosedCheckpoints(
740                                      std::list<Checkpoint*> &collapsedChks) {
741    // If there are one open checkpoint and more than one closed checkpoint,
742    // collapse those
743    // closed checkpoints into one checkpoint to reduce the memory overhead.
744    if (checkpointList.size() > 2) {
745        std::map<std::string, std::pair<uint64_t, bool> > slowCursors;
746        std::set<std::string> fastCursors;
747        std::list<Checkpoint*>::iterator lastClosedChk = checkpointList.end();
748        --lastClosedChk; --lastClosedChk; // Move to the last closed chkpt.
749        std::set<std::string>::iterator nitr =
750                (*lastClosedChk)->getCursorNameList().begin();
751        // Check if there are any cursors in the last closed checkpoint, which
752        // haven't yet visited any regular items belonging to the last closed
753        // checkpoint. If so, then we should skip collapsing checkpoints until
754        // those cursors move to the first regular item. Otherwise, those cursors will
755        // visit old items from collapsed checkpoints again.
756        for (; nitr != (*lastClosedChk)->getCursorNameList().end(); ++nitr) {
757            if (nitr->compare(persistenceCursor.name) == 0) {
758                enum queue_operation qop = (*(persistenceCursor.currentPos))->getOperation();
759                if (qop == queue_op_empty || qop == queue_op_checkpoint_start) {
760                    return;
761                }
762            } else {
763                cursor_index::iterator cc = tapCursors.find(*nitr);
764                if (cc == tapCursors.end()) {
765                    continue;
766                }
767                enum queue_operation qop = (*(cc->second.currentPos))->getOperation();
768                if (qop ==  queue_op_empty || qop == queue_op_checkpoint_start) {
769                    return;
770                }
771            }
772        }
773
774        fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(),
775                           (*lastClosedChk)->getCursorNameList().end());
776        std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
777        ++rit; ++rit; //Move to the second last closed checkpoint.
778        size_t numDuplicatedItems = 0, numMetaItems = 0;
779        for (; rit != checkpointList.rend(); ++rit) {
780            size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit);
781            numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
782            numMetaItems += 2; // checkpoint start and end meta items
783
784            std::set<std::string>::iterator nameItr =
785                (*rit)->getCursorNameList().begin();
786            for (; nameItr != (*rit)->getCursorNameList().end(); ++nameItr) {
787                if (nameItr->compare(persistenceCursor.name) == 0) {
788                    const std::string& key =
789                                 (*(persistenceCursor.currentPos))->getKey();
790                    bool cursor_on_chk_start = false;
791                    if ((*(persistenceCursor.currentPos))->getOperation() ==
792                        queue_op_checkpoint_start) {
793                        cursor_on_chk_start = true;
794                    }
795                    slowCursors[*nameItr] =
796                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
797                } else {
798                    cursor_index::iterator cc =
799                        tapCursors.find(*nameItr);
800                    const std::string& key = (*(cc->second.currentPos))->
801                                             getKey();
802                    bool cursor_on_chk_start = false;
803                    if ((*(cc->second.currentPos))->getOperation() ==
804                        queue_op_checkpoint_start) {
805                        cursor_on_chk_start = true;
806                    }
807                    slowCursors[*nameItr] =
808                        std::make_pair((*rit)->getMutationIdForKey(key), cursor_on_chk_start);
809                }
810            }
811        }
812        putCursorsInCollapsedChk(slowCursors, lastClosedChk);
813
814        numItems.fetch_sub(numDuplicatedItems + numMetaItems);
815        Checkpoint *pOpenCheckpoint = checkpointList.back();
816        const std::set<std::string> &openCheckpointCursors =
817                                    pOpenCheckpoint->getCursorNameList();
818        fastCursors.insert(openCheckpointCursors.begin(),
819                           openCheckpointCursors.end());
820        std::set<std::string>::const_iterator cit = fastCursors.begin();
821        // Update the offset of each fast cursor.
822        for (; cit != fastCursors.end(); ++cit) {
823            if ((*cit).compare(persistenceCursor.name) == 0) {
824                decrCursorOffset_UNLOCKED(persistenceCursor,
825                                          numDuplicatedItems);
826            } else {
827                cursor_index::iterator mit = tapCursors.find(*cit);
828                if (mit != tapCursors.end()) {
829                    decrCursorOffset_UNLOCKED(mit->second,
830                                              numDuplicatedItems);
831                }
832            }
833        }
834        collapsedChks.splice(collapsedChks.end(), checkpointList,
835                             checkpointList.begin(),  lastClosedChk);
836    }
837}
838
839bool CheckpointManager::queueDirty(const RCPtr<VBucket> &vb, queued_item& qi,
840                                   bool genSeqno) {
841    LockHolder lh(queueLock);
842
843    cb_assert(vb);
844    bool canCreateNewCheckpoint = false;
845    if (checkpointList.size() < checkpointConfig.getMaxCheckpoints() ||
846        (checkpointList.size() == checkpointConfig.getMaxCheckpoints() &&
847         checkpointList.front()->getNumberOfCursors() == 0)) {
848        canCreateNewCheckpoint = true;
849    }
850    if (vb->getState() == vbucket_state_active && canCreateNewCheckpoint) {
851        // Only the master active vbucket can create a next open checkpoint.
852        checkOpenCheckpoint_UNLOCKED(false, true);
853    }
854
855    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
856        addNewCheckpoint_UNLOCKED(checkpointList.back()->getId() + 1);
857    }
858
859    cb_assert(checkpointList.back()->getState() == CHECKPOINT_OPEN);
860
861    if (genSeqno) {
862        qi->setBySeqno(nextBySeqno());
863    } else {
864        lastBySeqNo = qi->getBySeqno();
865    }
866
867    queue_dirty_t result = checkpointList.back()->queueDirty(qi, this);
868    if (result == NEW_ITEM) {
869        ++numItems;
870    }
871
872    if (result != EXISTING_ITEM) {
873        ++stats.totalEnqueued;
874        ++stats.diskQueueSize;
875        vb->doStatsForQueueing(*qi, qi->size());
876    }
877
878    return result != EXISTING_ITEM;
879}
880
881void CheckpointManager::getAllItemsForPersistence(
882                                             std::vector<queued_item> &items) {
883    LockHolder lh(queueLock);
884    // Get all the items up to the end of the current open checkpoint.
885    while (incrCursor(persistenceCursor)) {
886        items.push_back(*(persistenceCursor.currentPos));
887    }
888
889    LOG(EXTENSION_LOG_DEBUG,
890        "Grab %ld items through the persistence cursor from vbucket %d",
891        items.size(), vbucketId);
892}
893
894queued_item CheckpointManager::nextItem(const std::string &name,
895                                        bool &isLastMutationItem,
896                                        uint64_t &endSeqno) {
897    LockHolder lh(queueLock);
898    cursor_index::iterator it = tapCursors.find(name);
899    if (it == tapCursors.end()) {
900        LOG(EXTENSION_LOG_WARNING,
901        "The cursor with name \"%s\" is not found in the checkpoint of vbucket"
902        "%d.\n", name.c_str(), vbucketId);
903        queued_item qi(new Item(std::string(""), 0xffff,
904                                queue_op_empty, 0, 0));
905        return qi;
906    }
907    if (checkpointList.back()->getId() == 0) {
908        LOG(EXTENSION_LOG_INFO,
909            "VBucket %d is still in backfill phase that doesn't allow "
910            " the tap cursor to fetch an item from it's current checkpoint",
911            vbucketId);
912        queued_item qi(new Item(std::string(""), 0xffff,
913                                queue_op_empty, 0, 0));
914        return qi;
915    }
916
917    CheckpointCursor &cursor = it->second;
918    if (incrCursor(cursor)) {
919        isLastMutationItem = isLastMutationItemInCheckpoint(cursor);
920        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
921        return *(cursor.currentPos);
922    } else {
923        isLastMutationItem = false;
924        endSeqno = (*(cursor.currentCheckpoint))->getHighSeqno();
925        queued_item qi(new Item(std::string(""), 0xffff,
926                                queue_op_empty, 0, 0));
927        return qi;
928    }
929}
930
931bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
932    if (++(cursor.currentPos) != (*(cursor.currentCheckpoint))->end()) {
933        queued_item &qi = *(cursor.currentPos);
934        if (qi->getOperation() != queue_op_checkpoint_start &&
935            qi->getOperation() != queue_op_checkpoint_end) {
936            ++(cursor.offset);
937        }
938        return true;
939    } else if (!moveCursorToNextCheckpoint(cursor)) {
940        --(cursor.currentPos);
941        return false;
942    }
943    return incrCursor(cursor);
944}
945
946void CheckpointManager::clear(vbucket_state_t vbState) {
947    LockHolder lh(queueLock);
948    std::list<Checkpoint*>::iterator it = checkpointList.begin();
949    // Remove all the checkpoints.
950    while(it != checkpointList.end()) {
951        delete *it;
952        ++it;
953    }
954    checkpointList.clear();
955    numItems = 0;
956
957    uint64_t checkpointId = vbState == vbucket_state_active ? 1 : 0;
958    // Add a new open checkpoint.
959    addNewCheckpoint_UNLOCKED(checkpointId);
960    resetCursors();
961}
962
963void CheckpointManager::resetCursors(bool resetPersistenceCursor) {
964    // Reset the persistence cursor.
965    if (resetPersistenceCursor) {
966        persistenceCursor.currentCheckpoint = checkpointList.begin();
967        persistenceCursor.currentPos = checkpointList.front()->begin();
968        persistenceCursor.offset = 0;
969        checkpointList.front()->registerCursorName(persistenceCursor.name);
970    }
971
972    // Reset all the TAP cursors.
973    cursor_index::iterator cit = tapCursors.begin();
974    for (; cit != tapCursors.end(); ++cit) {
975        cit->second.currentCheckpoint = checkpointList.begin();
976        cit->second.currentPos = checkpointList.front()->begin();
977        cit->second.offset = 0;
978        checkpointList.front()->registerCursorName(cit->second.name);
979    }
980}
981
982void CheckpointManager::resetTAPCursors(const std::list<std::string> &cursors){
983    LockHolder lh(queueLock);
984    std::list<std::string>::const_iterator it = cursors.begin();
985    for (; it != cursors.end(); ++it) {
986        registerTAPCursor_UNLOCKED(*it, getOpenCheckpointId_UNLOCKED(), true);
987    }
988}
989
990bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
991    if ((*(cursor.currentCheckpoint))->getState() == CHECKPOINT_OPEN) {
992        return false;
993    } else if ((*(cursor.currentCheckpoint))->getState() ==
994                                                           CHECKPOINT_CLOSED) {
995        std::list<Checkpoint*>::iterator currCheckpoint =
996                                                      cursor.currentCheckpoint;
997        if (++currCheckpoint == checkpointList.end()) {
998            return false;
999        }
1000    }
1001
1002    // Remove the cursor's name from its current checkpoint.
1003    (*(cursor.currentCheckpoint))->removeCursorName(cursor.name);
1004    // Move the cursor to the next checkpoint.
1005    ++(cursor.currentCheckpoint);
1006    cursor.currentPos = (*(cursor.currentCheckpoint))->begin();
1007    // Register the cursor's name to its new current checkpoint.
1008    (*(cursor.currentCheckpoint))->registerCursorName(cursor.name);
1009    return true;
1010}
1011
1012size_t CheckpointManager::getNumOpenChkItems() {
1013    LockHolder lh(queueLock);
1014    if (checkpointList.empty()) {
1015        return 0;
1016    }
1017    return checkpointList.back()->getNumItems() + 1;
1018}
1019
1020uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation,
1021                                                         bool timeBound) {
1022    int checkpoint_id = 0;
1023
1024    timeBound = timeBound &&
1025                (ep_real_time() - checkpointList.back()->getCreationTime()) >=
1026                checkpointConfig.getCheckpointPeriod();
1027    // Create the new open checkpoint if any of the following conditions is
1028    // satisfied:
1029    // (1) force creation due to online update or high memory usage
1030    // (2) current checkpoint is reached to the max number of items allowed.
1031    // (3) time elapsed since the creation of the current checkpoint is greater
1032    //     than the threshold
1033    if (forceCreation ||
1034        (checkpointConfig.isItemNumBasedNewCheckpoint() &&
1035         checkpointList.back()->getNumItems() >=
1036         checkpointConfig.getCheckpointMaxItems()) ||
1037        (checkpointList.back()->getNumItems() > 0 && timeBound)) {
1038
1039        checkpoint_id = checkpointList.back()->getId();
1040        closeOpenCheckpoint_UNLOCKED(checkpoint_id);
1041        addNewCheckpoint_UNLOCKED(checkpoint_id + 1);
1042    }
1043    return checkpoint_id;
1044}
1045
1046bool CheckpointManager::eligibleForEviction(const std::string &key) {
1047    LockHolder lh(queueLock);
1048    uint64_t smallest_mid;
1049
1050    // Get the mutation id of the item pointed by the slowest cursor.
1051    // This won't cause much overhead as the number of cursors per vbucket is
1052    // usually bounded to 3 (persistence cursor + 2 replicas).
1053    const std::string &pkey = (*(persistenceCursor.currentPos))->getKey();
1054    smallest_mid = (*(persistenceCursor.currentCheckpoint))->
1055                                                     getMutationIdForKey(pkey);
1056    cursor_index::iterator mit = tapCursors.begin();
1057    for (; mit != tapCursors.end(); ++mit) {
1058        const std::string &tkey = (*(mit->second.currentPos))->getKey();
1059        uint64_t mid = (*(mit->second.currentCheckpoint))->
1060                                                     getMutationIdForKey(tkey);
1061        if (mid < smallest_mid) {
1062            smallest_mid = mid;
1063        }
1064    }
1065
1066    bool can_evict = true;
1067    std::list<Checkpoint*>::reverse_iterator it = checkpointList.rbegin();
1068    for (; it != checkpointList.rend(); ++it) {
1069        uint64_t mid = (*it)->getMutationIdForKey(key);
1070        if (mid == 0) { // key doesn't exist in a checkpoint.
1071            continue;
1072        }
1073        if (smallest_mid < mid) { // The slowest cursor is still
1074            can_evict = false;    //sitting behind a given key.
1075            break;
1076        }
1077    }
1078
1079    return can_evict;
1080}
1081
1082size_t CheckpointManager::getNumItemsForTAPConnection(
1083                                                     const std::string &name) {
1084    LockHolder lh(queueLock);
1085    size_t remains = 0;
1086    cursor_index::iterator it = tapCursors.find(name);
1087    if (it != tapCursors.end()) {
1088        remains = (numItems >= it->second.offset) ?
1089                   numItems - it->second.offset : 0;
1090    }
1091    return remains;
1092}
1093
1094size_t CheckpointManager::getNumItemsForPersistence_UNLOCKED() {
1095    size_t num_items = numItems;
1096    size_t offset = persistenceCursor.offset;
1097
1098    // Get the number of meta items that can be skipped by the persistence
1099    // cursor.
1100    size_t meta_items = 0;
1101    std::list<Checkpoint*>::iterator curr_chk =
1102                                           persistenceCursor.currentCheckpoint;
1103    for (; curr_chk != checkpointList.end(); ++curr_chk) {
1104        if (curr_chk == persistenceCursor.currentCheckpoint) {
1105            std::list<queued_item>::iterator curr_pos =
1106                                                  persistenceCursor.currentPos;
1107            ++curr_pos;
1108            if (curr_pos == (*curr_chk)->end()) {
1109                continue;
1110            }
1111            if ((*curr_pos)->getOperation() == queue_op_checkpoint_start) {
1112                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1113                    meta_items += 2;
1114                } else {
1115                    ++meta_items;
1116                }
1117            } else {
1118                if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1119                    ++meta_items;
1120                }
1121            }
1122        } else {
1123            if ((*curr_chk)->getState() == CHECKPOINT_CLOSED) {
1124                meta_items += 2;
1125            } else {
1126                ++meta_items;
1127            }
1128        }
1129    }
1130
1131    offset += meta_items;
1132    return num_items > offset ? num_items - offset : 0;
1133}
1134
1135void CheckpointManager::decrTapCursorFromCheckpointEnd(
1136                                                    const std::string &name) {
1137    LockHolder lh(queueLock);
1138    cursor_index::iterator it = tapCursors.find(name);
1139    if (it != tapCursors.end() &&
1140        (*(it->second.currentPos))->getOperation() ==
1141        queue_op_checkpoint_end) {
1142        decrCursorPos_UNLOCKED(it->second);
1143    }
1144}
1145
1146uint64_t CheckpointManager::getMutationIdForKey(uint64_t chk_id,
1147                                                std::string key) {
1148    std::list<Checkpoint*>::iterator itr = checkpointList.begin();
1149    for (; itr != checkpointList.end(); ++itr) {
1150        if (chk_id == (*itr)->getId()) {
1151            return (*itr)->getMutationIdForKey(key);
1152        }
1153    }
1154    return 0;
1155}
1156
1157bool CheckpointManager::isLastMutationItemInCheckpoint(
1158                                                   CheckpointCursor &cursor) {
1159    std::list<queued_item>::iterator it = cursor.currentPos;
1160    ++it;
1161    if (it == (*(cursor.currentCheckpoint))->end() ||
1162        (*it)->getOperation() == queue_op_checkpoint_end) {
1163        return true;
1164    }
1165    return false;
1166}
1167
1168void CheckpointManager::checkAndAddNewCheckpoint(uint64_t id,
1169                                               const RCPtr<VBucket> &vbucket) {
1170    LockHolder lh(queueLock);
1171
1172    // Ignore CHECKPOINT_START message with ID 0 as 0 is reserved for
1173    // representing backfill.
1174    if (id == 0) {
1175        return;
1176    }
1177    // If the replica receives a checkpoint start message right after backfill
1178    // completion, simply set the current open checkpoint id to the one
1179    // received from the active vbucket.
1180    if (checkpointList.back()->getId() == 0) {
1181        setOpenCheckpointId_UNLOCKED(id);
1182        resetCursors(false);
1183        return;
1184    }
1185
1186    std::list<Checkpoint*>::iterator it = checkpointList.begin();
1187    // Check if a checkpoint exists with ID >= id.
1188    while (it != checkpointList.end()) {
1189        if (id <= (*it)->getId()) {
1190            break;
1191        }
1192        ++it;
1193    }
1194
1195    if (it == checkpointList.end()) {
1196        if ((checkpointList.back()->getId() + 1) < id) {
1197            isCollapsedCheckpoint = true;
1198            uint64_t oid = getOpenCheckpointId_UNLOCKED();
1199            lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0;
1200        } else if ((checkpointList.back()->getId() + 1) == id) {
1201            isCollapsedCheckpoint = false;
1202        }
1203        if (checkpointList.back()->getState() == CHECKPOINT_OPEN &&
1204            checkpointList.back()->getNumItems() == 0) {
1205            // If the current open checkpoint doesn't have any items, simply
1206            // set its id to
1207            // the one from the master node.
1208            setOpenCheckpointId_UNLOCKED(id);
1209            // Reposition all the cursors in the open checkpoint to the
1210            // begining position so that a checkpoint_start message can be
1211            // sent again with the correct id.
1212            const std::set<std::string> &cursors = checkpointList.back()->
1213                                                   getCursorNameList();
1214            std::set<std::string>::const_iterator cit = cursors.begin();
1215            for (; cit != cursors.end(); ++cit) {
1216                if ((*cit).compare(persistenceCursor.name) == 0) {
1217                    // Persistence cursor
1218                    continue;
1219                } else { // TAP cursors
1220                    cursor_index::iterator mit = tapCursors.find(*cit);
1221                    mit->second.currentPos = checkpointList.back()->begin();
1222                }
1223            }
1224        } else {
1225            closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
1226            addNewCheckpoint_UNLOCKED(id);
1227        }
1228    } else {
1229        size_t curr_remains = getNumItemsForPersistence_UNLOCKED();
1230        collapseCheckpoints(id);
1231        size_t new_remains = getNumItemsForPersistence_UNLOCKED();
1232        if (curr_remains > new_remains) {
1233            size_t diff = curr_remains - new_remains;
1234            stats.decrDiskQueueSize(diff);
1235            vbucket->decrDirtyQueueSize(diff);
1236        } else if (curr_remains < new_remains) {
1237            size_t diff = new_remains - curr_remains;
1238            stats.diskQueueSize.fetch_add(diff);
1239            vbucket->dirtyQueueSize.fetch_add(diff);
1240        }
1241    }
1242}
1243
1244void CheckpointManager::collapseCheckpoints(uint64_t id) {
1245    cb_assert(!checkpointList.empty());
1246
1247    std::map<std::string, std::pair<uint64_t, bool> > cursorMap;
1248    cursor_index::iterator itr;
1249    for (itr = tapCursors.begin(); itr != tapCursors.end(); itr++) {
1250        Checkpoint* chk = *(itr->second.currentCheckpoint);
1251        const std::string& key = (*(itr->second.currentPos))->getKey();
1252        bool cursor_on_chk_start = false;
1253        if ((*(itr->second.currentPos))->getOperation() == queue_op_checkpoint_start) {
1254            cursor_on_chk_start = true;
1255        }
1256        cursorMap[itr->first.c_str()] =
1257            std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1258    }
1259
1260    Checkpoint* chk = *(persistenceCursor.currentCheckpoint);
1261    std::string key = (*(persistenceCursor.currentPos))->getKey();
1262    bool cursor_on_chk_start = false;
1263    if ((*(persistenceCursor.currentPos))->getOperation() == queue_op_checkpoint_start) {
1264        cursor_on_chk_start = true;
1265    }
1266    cursorMap[persistenceCursor.name.c_str()] =
1267        std::make_pair(chk->getMutationIdForKey(key), cursor_on_chk_start);
1268
1269    setOpenCheckpointId_UNLOCKED(id);
1270
1271    std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
1272    ++rit; // Move to the last closed checkpoint.
1273    size_t numDuplicatedItems = 0, numMetaItems = 0;
1274    // Collapse all checkpoints.
1275    for (; rit != checkpointList.rend(); ++rit) {
1276        size_t numAddedItems = checkpointList.back()->
1277                               mergePrevCheckpoint(*rit);
1278        numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
1279        numMetaItems += 2; // checkpoint start and end meta items
1280        delete *rit;
1281    }
1282    numItems.fetch_sub(numDuplicatedItems + numMetaItems);
1283
1284    if (checkpointList.size() > 1) {
1285        checkpointList.erase(checkpointList.begin(), --checkpointList.end());
1286    }
1287    cb_assert(checkpointList.size() == 1);
1288
1289    if (checkpointList.back()->getState() == CHECKPOINT_CLOSED) {
1290        checkpointList.back()->popBackCheckpointEndItem();
1291        --numItems;
1292        checkpointList.back()->setState(CHECKPOINT_OPEN);
1293    }
1294    putCursorsInCollapsedChk(cursorMap, checkpointList.begin());
1295}
1296
1297void CheckpointManager::
1298putCursorsInCollapsedChk(std::map<std::string, std::pair<uint64_t, bool> > &cursors,
1299                         std::list<Checkpoint*>::iterator chkItr) {
1300    size_t i;
1301    Checkpoint *chk = *chkItr;
1302    std::list<queued_item>::iterator cit = chk->begin();
1303    std::list<queued_item>::iterator last = chk->begin();
1304    for (i = 0; cit != chk->end(); ++i, ++cit) {
1305        uint64_t id = chk->getMutationIdForKey((*cit)->getKey());
1306        std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
1307        while (mit != cursors.end()) {
1308            std::pair<uint64_t, bool> val = mit->second;
1309            if (val.first < id || (val.first == id && val.second &&
1310                                   (*last)->getOperation() == queue_op_checkpoint_start)) {
1311                if (mit->first.compare(persistenceCursor.name) == 0) {
1312                    persistenceCursor.currentCheckpoint = chkItr;
1313                    persistenceCursor.currentPos = last;
1314                    persistenceCursor.offset = (i > 0) ? i - 1 : 0;
1315                    chk->registerCursorName(persistenceCursor.name);
1316                } else {
1317                    cursor_index::iterator cc = tapCursors.find(mit->first);
1318                    if (cc == tapCursors.end() ||
1319                        cc->second.fromBeginningOnChkCollapse) {
1320                        ++mit;
1321                        continue;
1322                    }
1323                    cc->second.currentCheckpoint = chkItr;
1324                    cc->second.currentPos = last;
1325                    cc->second.offset = (i > 0) ? i - 1 : 0;
1326                    chk->registerCursorName(cc->second.name);
1327                }
1328                cursors.erase(mit++);
1329            } else {
1330                ++mit;
1331            }
1332        }
1333
1334        last = cit;
1335        if (cursors.empty()) {
1336            break;
1337        }
1338    }
1339
1340    std::map<std::string, std::pair<uint64_t, bool> >::iterator mit = cursors.begin();
1341    for (; mit != cursors.end(); ++mit) {
1342        if (mit->first.compare(persistenceCursor.name) == 0) {
1343            persistenceCursor.currentCheckpoint = chkItr;
1344            persistenceCursor.currentPos = last;
1345            persistenceCursor.offset = (i > 0) ? i - 1 : 0;
1346            chk->registerCursorName(persistenceCursor.name);
1347        } else {
1348            cursor_index::iterator cc = tapCursors.find(mit->first);
1349            if (cc == tapCursors.end()) {
1350                continue;
1351            }
1352            cc->second.currentCheckpoint = chkItr;
1353            if (cc->second.fromBeginningOnChkCollapse) {
1354                cc->second.currentPos = chk->begin();
1355                cc->second.offset = 0;
1356            } else {
1357                cc->second.currentPos = last;
1358                cc->second.offset = (i > 0) ? i - 1 : 0;
1359            }
1360            chk->registerCursorName(cc->second.name);
1361        }
1362    }
1363}
1364
1365bool CheckpointManager::hasNext(const std::string &name) {
1366    LockHolder lh(queueLock);
1367    cursor_index::iterator it = tapCursors.find(name);
1368    if (it == tapCursors.end() || getOpenCheckpointId_UNLOCKED() == 0) {
1369        return false;
1370    }
1371
1372    bool hasMore = true;
1373    std::list<queued_item>::iterator curr = it->second.currentPos;
1374    ++curr;
1375    if (curr == (*(it->second.currentCheckpoint))->end() &&
1376        (*(it->second.currentCheckpoint)) == checkpointList.back()) {
1377        hasMore = false;
1378    }
1379    return hasMore;
1380}
1381
1382queued_item CheckpointManager::createCheckpointItem(uint64_t id, uint16_t vbid,
1383                                          enum queue_operation checkpoint_op) {
1384    cb_assert(checkpoint_op == queue_op_checkpoint_start ||
1385           checkpoint_op == queue_op_checkpoint_end ||
1386           checkpoint_op == queue_op_empty);
1387
1388    uint64_t bySeqno;
1389    std::stringstream key;
1390    if (checkpoint_op == queue_op_checkpoint_start) {
1391        key << "checkpoint_start";
1392        bySeqno = lastBySeqNo + 1;
1393    } else if (checkpoint_op == queue_op_empty) {
1394        key << "dummy_key";
1395        bySeqno = lastBySeqNo;
1396    } else {
1397        key << "checkpoint_end";
1398        bySeqno = lastBySeqNo;
1399    }
1400    queued_item qi(new Item(key.str(), vbid, checkpoint_op, id, bySeqno));
1401    return qi;
1402}
1403
1404bool CheckpointManager::hasNextForPersistence() {
1405    LockHolder lh(queueLock);
1406    bool hasMore = true;
1407    std::list<queued_item>::iterator curr = persistenceCursor.currentPos;
1408    ++curr;
1409    if (curr == (*(persistenceCursor.currentCheckpoint))->end() &&
1410        (*(persistenceCursor.currentCheckpoint)) == checkpointList.back()) {
1411        hasMore = false;
1412    }
1413    return hasMore;
1414}
1415
1416uint64_t CheckpointManager::createNewCheckpoint() {
1417    LockHolder lh(queueLock);
1418    if (checkpointList.back()->getNumItems() > 0) {
1419        uint64_t chk_id = checkpointList.back()->getId();
1420        closeOpenCheckpoint_UNLOCKED(chk_id);
1421        addNewCheckpoint_UNLOCKED(chk_id + 1);
1422    }
1423    return checkpointList.back()->getId();
1424}
1425
1426void CheckpointManager::decrCursorOffset_UNLOCKED(CheckpointCursor &cursor,
1427                                                  size_t decr) {
1428    if (cursor.offset >= decr) {
1429        cursor.offset.fetch_sub(decr);
1430    } else {
1431        cursor.offset = 0;
1432        LOG(EXTENSION_LOG_INFO,
1433            "%s cursor offset is negative. Reset it to 0.",
1434            cursor.name.c_str());
1435    }
1436}
1437
1438void CheckpointManager::decrCursorPos_UNLOCKED(CheckpointCursor &cursor) {
1439    if (cursor.currentPos != (*(cursor.currentCheckpoint))->begin()) {
1440        --(cursor.currentPos);
1441    }
1442}
1443
1444uint64_t CheckpointManager::getPersistenceCursorPreChkId() {
1445    LockHolder lh(queueLock);
1446    return pCursorPreCheckpointId;
1447}
1448
1449uint64_t CheckpointManager::getPersistenceCursorSeqno() {
1450    LockHolder lh(queueLock);
1451    std::list<Checkpoint*>::iterator itr = persistenceCursor.currentCheckpoint;
1452    pCursorPreCheckpointId = ((*itr)->getId() > 0) ? (*itr)->getId() - 1 : 0;
1453
1454    std::list<queued_item>::iterator curr_pos = persistenceCursor.currentPos;
1455    return (*curr_pos)->getBySeqno();
1456}
1457
1458void CheckpointConfig::addConfigChangeListener(
1459                                         EventuallyPersistentEngine &engine) {
1460    Configuration &configuration = engine.getConfiguration();
1461    configuration.addValueChangedListener("chk_period",
1462             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1463    configuration.addValueChangedListener("chk_max_items",
1464             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1465    configuration.addValueChangedListener("max_checkpoints",
1466             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1467    configuration.addValueChangedListener("item_num_based_new_chk",
1468             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1469    configuration.addValueChangedListener("keep_closed_chks",
1470             new CheckpointConfigChangeListener(engine.getCheckpointConfig()));
1471}
1472
1473CheckpointConfig::CheckpointConfig(EventuallyPersistentEngine &e) {
1474    Configuration &config = e.getConfiguration();
1475    checkpointPeriod = config.getChkPeriod();
1476    checkpointMaxItems = config.getChkMaxItems();
1477    maxCheckpoints = config.getMaxCheckpoints();
1478    itemNumBasedNewCheckpoint = config.isItemNumBasedNewChk();
1479    keepClosedCheckpoints = config.isKeepClosedChks();
1480}
1481
1482bool CheckpointConfig::validateCheckpointMaxItemsParam(size_t
1483                                                       checkpoint_max_items) {
1484    if (checkpoint_max_items < MIN_CHECKPOINT_ITEMS ||
1485        checkpoint_max_items > MAX_CHECKPOINT_ITEMS) {
1486        std::stringstream ss;
1487        ss << "New checkpoint_max_items param value " << checkpoint_max_items
1488           << " is not ranged between the min allowed value " <<
1489           MIN_CHECKPOINT_ITEMS
1490           << " and max value " << MAX_CHECKPOINT_ITEMS;
1491        LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
1492        return false;
1493    }
1494    return true;
1495}
1496
1497bool CheckpointConfig::validateCheckpointPeriodParam(
1498                                                   size_t checkpoint_period) {
1499    if (checkpoint_period < MIN_CHECKPOINT_PERIOD ||
1500        checkpoint_period > MAX_CHECKPOINT_PERIOD) {
1501        std::stringstream ss;
1502        ss << "New checkpoint_period param value " << checkpoint_period
1503           << " is not ranged between the min allowed value " <<
1504              MIN_CHECKPOINT_PERIOD
1505           << " and max value " << MAX_CHECKPOINT_PERIOD;
1506        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1507        return false;
1508    }
1509    return true;
1510}
1511
1512bool CheckpointConfig::validateMaxCheckpointsParam(size_t max_checkpoints) {
1513    if (max_checkpoints < DEFAULT_MAX_CHECKPOINTS ||
1514        max_checkpoints > MAX_CHECKPOINTS_UPPER_BOUND) {
1515        std::stringstream ss;
1516        ss << "New max_checkpoints param value " << max_checkpoints
1517           << " is not ranged between the min allowed value " <<
1518              DEFAULT_MAX_CHECKPOINTS
1519           << " and max value " << MAX_CHECKPOINTS_UPPER_BOUND;
1520        LOG(EXTENSION_LOG_WARNING, "%s\n", ss.str().c_str());
1521        return false;
1522    }
1523    return true;
1524}
1525
1526void CheckpointConfig::setCheckpointPeriod(size_t value) {
1527    if (!validateCheckpointPeriodParam(value)) {
1528        value = DEFAULT_CHECKPOINT_PERIOD;
1529    }
1530    checkpointPeriod = static_cast<rel_time_t>(value);
1531}
1532
1533void CheckpointConfig::setCheckpointMaxItems(size_t value) {
1534    if (!validateCheckpointMaxItemsParam(value)) {
1535        value = DEFAULT_CHECKPOINT_ITEMS;
1536    }
1537    checkpointMaxItems = value;
1538}
1539
1540void CheckpointConfig::setMaxCheckpoints(size_t value) {
1541    if (!validateMaxCheckpointsParam(value)) {
1542        value = DEFAULT_MAX_CHECKPOINTS;
1543    }
1544    maxCheckpoints = value;
1545}
1546
1547void CheckpointManager::addStats(ADD_STAT add_stat, const void *cookie) {
1548    LockHolder lh(queueLock);
1549    char buf[256];
1550
1551    snprintf(buf, sizeof(buf), "vb_%d:open_checkpoint_id", vbucketId);
1552    add_casted_stat(buf, getOpenCheckpointId_UNLOCKED(), add_stat, cookie);
1553    snprintf(buf, sizeof(buf), "vb_%d:last_closed_checkpoint_id", vbucketId);
1554    add_casted_stat(buf, getLastClosedCheckpointId_UNLOCKED(),
1555    add_stat, cookie);
1556    snprintf(buf, sizeof(buf), "vb_%d:num_tap_cursors", vbucketId);
1557    add_casted_stat(buf, tapCursors.size(), add_stat, cookie);
1558    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoint_items", vbucketId);
1559    add_casted_stat(buf, numItems, add_stat, cookie);
1560    snprintf(buf, sizeof(buf), "vb_%d:num_open_checkpoint_items", vbucketId);
1561    add_casted_stat(buf, checkpointList.empty() ? 0 :
1562                    checkpointList.back()->getNumItems(),
1563                    add_stat, cookie);
1564    snprintf(buf, sizeof(buf), "vb_%d:num_checkpoints", vbucketId);
1565    add_casted_stat(buf, checkpointList.size(), add_stat, cookie);
1566    snprintf(buf, sizeof(buf), "vb_%d:num_items_for_persistence", vbucketId);
1567    add_casted_stat(buf, getNumItemsForPersistence_UNLOCKED(),
1568                    add_stat, cookie);
1569
1570    cursor_index::iterator tap_it = tapCursors.begin();
1571    for (; tap_it != tapCursors.end(); ++tap_it) {
1572        snprintf(buf, sizeof(buf),
1573                 "vb_%d:%s:cursor_checkpoint_id", vbucketId,
1574                 tap_it->first.c_str());
1575        add_casted_stat(buf, (*(tap_it->second.currentCheckpoint))->getId(),
1576                        add_stat, cookie);
1577    }
1578}
1579