1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "checkpoint_test.h"
19
20#include "config.h"
21
22#include <algorithm>
23#include <set>
24#include <thread>
25#include <vector>
26
27#include "checkpoint.h"
28#include "checkpoint_utils.h"
29#include "configuration.h"
30#include "ep_vb.h"
31#include "failover-table.h"
32#include "item_pager.h"
33#include "stats.h"
34#include "tests/module_tests/test_helpers.h"
35#include "thread_gate.h"
36
37#include "../mock/mock_dcp_consumer.h"
38
39#include <engines/ep/src/ep_types.h>
40#include <gmock/gmock.h>
41#include <gtest/gtest.h>
42#include <valgrind/valgrind.h>
43
// Worker-thread counts used by basic_chk_test. The *_VG variants are the
// values picked when RUNNING_ON_VALGRIND evaluates to true.
#define NUM_DCP_THREADS    3
#define NUM_DCP_THREADS_VG 2
#define NUM_SET_THREADS    4
#define NUM_SET_THREADS_VG 2

// Number of mutations queued by every set thread. NUM_ITEMS_VG is presumably
// the Valgrind counterpart (by analogy with the *_VG thread counts above).
#define NUM_ITEMS    500
#define NUM_ITEMS_VG 10

// Prefix of the cursor names registered for the simulated DCP clients.
#define DCP_CURSOR_PREFIX "dcp-client-"
53
54template <typename V>
55CheckpointTest<V>::CheckpointTest()
56    : callback(new DummyCB()),
57      vbucket(new V(0,
58                    vbucket_state_active,
59                    global_stats,
60                    checkpoint_config,
61                    /*kvshard*/ NULL,
62                    /*lastSeqno*/ 1000,
63                    /*lastSnapStart*/ 0,
64                    /*lastSnapEnd*/ 0,
65                    /*table*/ NULL,
66                    callback,
67                    /*newSeqnoCb*/ nullptr,
68                    config,
69                    item_eviction_policy_t::VALUE_ONLY)) {
70    createManager();
71}
72
73template <typename V>
74void CheckpointTest<V>::createManager(int64_t last_seqno) {
75    manager.reset(new CheckpointManager(global_stats,
76                                        this->vbucket->getId(),
77                                        checkpoint_config,
78                                        last_seqno,
79                                        /*lastSnapStart*/ 0,
80                                        /*lastSnapEnd*/ 0,
81                                        callback));
82
83    // Sanity check initial state.
84    ASSERT_EQ(1, this->manager->getNumOfCursors());
85    ASSERT_EQ(0, this->manager->getNumOpenChkItems());
86    ASSERT_EQ(1, this->manager->getNumCheckpoints());
87}
88
89template <typename V>
90bool CheckpointTest<V>::queueNewItem(const std::string& key) {
91    queued_item qi{new Item(makeStoredDocKey(key),
92                            this->vbucket->getId(),
93                            queue_op::mutation,
94                            /*revSeq*/ 0,
95                            /*bySeq*/ 0)};
96    return this->manager->queueDirty(*this->vbucket,
97                                     qi,
98                                     GenerateBySeqno::Yes,
99                                     GenerateCas::Yes,
100                                     /*preLinkDocCtx*/ nullptr);
101}
102
103struct thread_args {
104    VBucket* vbucket;
105    CheckpointManager *checkpoint_manager;
106    std::string name;
107    ThreadGate& gate;
108};
109
110static void launch_persistence_thread(void *arg) {
111    struct thread_args *args = static_cast<struct thread_args *>(arg);
112    args->gate.threadUp();
113
114    bool flush = false;
115    while(true) {
116        size_t itemPos;
117        std::vector<queued_item> items;
118        const std::string cursor(CheckpointManager::pCursorName);
119        args->checkpoint_manager->getAllItemsForCursor(cursor, items);
120        for(itemPos = 0; itemPos < items.size(); ++itemPos) {
121            queued_item qi = items.at(itemPos);
122            if (qi->getOperation() == queue_op::flush) {
123                flush = true;
124                break;
125            }
126        }
127        if (flush) {
128            // Checkpoint start and end operations may have been introduced in
129            // the items queue after the "flush" operation was added. Ignore
130            // these. Anything else will be considered an error.
131            for(size_t i = itemPos + 1; i < items.size(); ++i) {
132                queued_item qi = items.at(i);
133                EXPECT_TRUE(queue_op::checkpoint_start == qi->getOperation() ||
134                            queue_op::checkpoint_end == qi->getOperation())
135                    << "Unexpected operation:" << to_string(qi->getOperation());
136            }
137            break;
138        }
139        // yield to allow set thread to actually do some useful work.
140        std::this_thread::yield();
141    }
142    EXPECT_TRUE(flush);
143}
144
145static void launch_dcp_client_thread(void* arg) {
146    struct thread_args *args = static_cast<struct thread_args *>(arg);
147    args->gate.threadUp();
148
149    bool flush = false;
150    bool isLastItem = false;
151    while(true) {
152        queued_item qi = args->checkpoint_manager->nextItem(args->name,
153                                                            isLastItem);
154        if (qi->getOperation() == queue_op::flush) {
155            flush = true;
156            break;
157        }
158        // yield to allow set thread to actually do some useful work.
159        std::this_thread::yield();
160    }
161    EXPECT_TRUE(flush);
162}
163
164static void launch_checkpoint_cleanup_thread(void *arg) {
165    struct thread_args *args = static_cast<struct thread_args *>(arg);
166    args->gate.threadUp();
167
168    while (args->checkpoint_manager->getNumOfCursors() > 1) {
169        bool newCheckpointCreated;
170        args->checkpoint_manager->removeClosedUnrefCheckpoints(
171                *args->vbucket, newCheckpointCreated);
172        // yield to allow set thread to actually do some useful work.
173        std::this_thread::yield();
174    }
175}
176
177static void launch_set_thread(void *arg) {
178    struct thread_args *args = static_cast<struct thread_args *>(arg);
179    args->gate.threadUp();
180
181    int i(0);
182    for (i = 0; i < NUM_ITEMS; ++i) {
183        std::string key = "key-" + std::to_string(i);
184        queued_item qi(new Item(makeStoredDocKey(key),
185                                args->vbucket->getId(),
186                                queue_op::mutation,
187                                0,
188                                0));
189        args->checkpoint_manager->queueDirty(*args->vbucket,
190                                             qi,
191                                             GenerateBySeqno::Yes,
192                                             GenerateCas::Yes,
193                                             /*preLinkDocCtx*/ nullptr);
194    }
195}
196
197typedef ::testing::Types<EPVBucket> VBucketTypes;
198TYPED_TEST_CASE(CheckpointTest, VBucketTypes);
199
200TYPED_TEST(CheckpointTest, basic_chk_test) {
201    std::shared_ptr<Callback<uint16_t> > cb(new DummyCB());
202    this->vbucket.reset(new TypeParam(0,
203                                      vbucket_state_active,
204                                      this->global_stats,
205                                      this->checkpoint_config,
206                                      NULL,
207                                      0,
208                                      0,
209                                      0,
210                                      NULL,
211                                      cb,
212                                      /*newSeqnoCb*/ nullptr,
213                                      this->config,
214                                      item_eviction_policy_t::VALUE_ONLY));
215
216    this->manager.reset(new CheckpointManager(
217            this->global_stats, 0, this->checkpoint_config, 1, 0, 0, cb));
218
219    const size_t n_set_threads = RUNNING_ON_VALGRIND ? NUM_SET_THREADS_VG :
220                                                       NUM_SET_THREADS;
221
222    const size_t n_dcp_threads =
223            RUNNING_ON_VALGRIND ? NUM_DCP_THREADS_VG : NUM_DCP_THREADS;
224
225    std::vector<cb_thread_t> dcp_threads(n_dcp_threads);
226    std::vector<cb_thread_t> set_threads(n_set_threads);
227    cb_thread_t persistence_thread;
228    cb_thread_t checkpoint_cleanup_thread;
229    int rc(0);
230
231    const size_t n_threads{n_set_threads + n_dcp_threads + 2};
232    ThreadGate gate{n_threads};
233    thread_args t_args{this->vbucket.get(), this->manager.get(), {}, gate};
234
235    std::vector<thread_args> dcp_t_args;
236    for (size_t i = 0; i < n_dcp_threads; ++i) {
237        std::string name(DCP_CURSOR_PREFIX + std::to_string(i));
238        dcp_t_args.emplace_back(thread_args{
239                this->vbucket.get(), this->manager.get(), name, gate});
240        this->manager->registerCursor(
241                name, 1, false, MustSendCheckpointEnd::YES);
242    }
243
244    rc = cb_create_thread(&persistence_thread, launch_persistence_thread, &t_args, 0);
245    EXPECT_EQ(0, rc);
246
247    rc = cb_create_thread(&checkpoint_cleanup_thread,
248                        launch_checkpoint_cleanup_thread, &t_args, 0);
249    EXPECT_EQ(0, rc);
250
251    for (size_t i = 0; i < n_dcp_threads; ++i) {
252        rc = cb_create_thread(
253                &dcp_threads[i], launch_dcp_client_thread, &dcp_t_args[i], 0);
254        EXPECT_EQ(0, rc);
255    }
256
257    for (size_t i = 0; i < n_set_threads; ++i) {
258        rc = cb_create_thread(&set_threads[i], launch_set_thread, &t_args, 0);
259        EXPECT_EQ(0, rc);
260    }
261
262    for (size_t i = 0; i < n_set_threads; ++i) {
263        rc = cb_join_thread(set_threads[i]);
264        EXPECT_EQ(0, rc);
265    }
266
267    // Push the flush command into the queue so that all other threads can be terminated.
268    queued_item qi(new Item(makeStoredDocKey("flush"),
269                            this->vbucket->getId(),
270                            queue_op::flush,
271                            0xffff,
272                            0));
273    this->manager->queueDirty(*this->vbucket,
274                              qi,
275                              GenerateBySeqno::Yes,
276                              GenerateCas::Yes,
277                              /*preLinkDocCtx*/ nullptr);
278
279    rc = cb_join_thread(persistence_thread);
280    EXPECT_EQ(0, rc);
281
282    for (size_t i = 0; i < n_dcp_threads; ++i) {
283        rc = cb_join_thread(dcp_threads[i]);
284        EXPECT_EQ(0, rc);
285        std::stringstream name;
286        name << "dcp-client-" << i;
287        this->manager->removeCursor(name.str());
288    }
289
290    rc = cb_join_thread(checkpoint_cleanup_thread);
291    EXPECT_EQ(0, rc);
292}
293
294TYPED_TEST(CheckpointTest, reset_checkpoint_id) {
295    int i;
296    for (i = 0; i < 10; ++i) {
297        EXPECT_TRUE(this->queueNewItem("key-" + std::to_string(i)));
298    }
299    EXPECT_EQ(10, this->manager->getNumOpenChkItems());
300    EXPECT_EQ(10,
301              this->manager->getNumItemsForCursor(
302                      CheckpointManager::pCursorName));
303
304    EXPECT_EQ(2, this->manager->createNewCheckpoint());
305
306    size_t itemPos;
307    size_t lastMutationId = 0;
308    std::vector<queued_item> items;
309    const std::string cursor(CheckpointManager::pCursorName);
310    auto range = this->manager->getAllItemsForCursor(cursor, items);
311    EXPECT_EQ(0, range.start);
312    EXPECT_EQ(1010, range.end);
313    EXPECT_EQ(13, items.size());
314    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
315    // Check that the next 10 items are all SET operations.
316    for(itemPos = 1; itemPos < 11; ++itemPos) {
317        queued_item qi = items.at(itemPos);
318        EXPECT_EQ(queue_op::mutation, qi->getOperation());
319        size_t mid = qi->getBySeqno();
320        EXPECT_GT(mid, lastMutationId);
321        lastMutationId = qi->getBySeqno();
322    }
323
324    // Check that the following items are checkpoint end, followed by a
325    // checkpoint start.
326    EXPECT_EQ(queue_op::checkpoint_end, items.at(11)->getOperation());
327    EXPECT_EQ(queue_op::checkpoint_start, items.at(12)->getOperation());
328
329    items.clear();
330
331    this->manager->checkAndAddNewCheckpoint(1, *this->vbucket);
332    range = this->manager->getAllItemsForCursor(cursor, items);
333    EXPECT_EQ(1001, range.start);
334    EXPECT_EQ(1010, range.end);
335    EXPECT_EQ(0, items.size());
336}
337
338// Sanity check test fixture
339TYPED_TEST(CheckpointTest, CheckFixture) {
340    // Should intially have a single cursor (persistence).
341    EXPECT_EQ(1, this->manager->getNumOfCursors());
342    EXPECT_EQ(0, this->manager->getNumOpenChkItems());
343    for (auto& cursor : this->manager->getAllCursors()) {
344        EXPECT_EQ(CheckpointManager::pCursorName, cursor.first);
345    }
346    // Should initially be zero items to persist.
347    EXPECT_EQ(0,
348              this->manager->getNumItemsForCursor(
349                      CheckpointManager::pCursorName));
350
351    // Check that the items fetched matches the number we were told to expect.
352    std::vector<queued_item> items;
353    auto result = this->manager->getAllItemsForCursor(
354            CheckpointManager::pCursorName, items);
355    EXPECT_EQ(0, result.start);
356    EXPECT_EQ(0, result.end);
357    EXPECT_EQ(1, items.size());
358    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
359}
360
361MATCHER_P(HasOperation, op, "") { return arg->getOperation() == op; }
362
363// Basic test of a single, open checkpoint.
364TYPED_TEST(CheckpointTest, OneOpenCkpt) {
365    // Queue a set operation.
366    queued_item qi(new Item(makeStoredDocKey("key1"),
367                            this->vbucket->getId(),
368                            queue_op::mutation,
369                            /*revSeq*/ 20,
370                            /*bySeq*/ 0));
371
372    // No set_ops in queue, expect queueDirty to return true (increase
373    // persistence queue size).
374    EXPECT_TRUE(this->manager->queueDirty(*this->vbucket,
375                                          qi,
376                                          GenerateBySeqno::Yes,
377                                          GenerateCas::Yes,
378                                          /*preLinkDocCtx*/ nullptr));
379    EXPECT_EQ(1, this->manager->getNumCheckpoints()); // Single open checkpoint.
380    // 1x op_set
381    EXPECT_EQ(1, this->manager->getNumOpenChkItems());
382    EXPECT_EQ(1001, qi->getBySeqno());
383    EXPECT_EQ(20, qi->getRevSeqno());
384    EXPECT_EQ(1,
385              this->manager->getNumItemsForCursor(
386                      CheckpointManager::pCursorName));
387
388    // Adding the same key again shouldn't increase the size.
389    queued_item qi2(new Item(makeStoredDocKey("key1"),
390                             this->vbucket->getId(),
391                             queue_op::mutation,
392                             /*revSeq*/ 21,
393                             /*bySeq*/ 0));
394    EXPECT_FALSE(this->manager->queueDirty(*this->vbucket,
395                                           qi2,
396                                           GenerateBySeqno::Yes,
397                                           GenerateCas::Yes,
398                                           /*preLinkDocCtx*/ nullptr));
399    EXPECT_EQ(1, this->manager->getNumCheckpoints());
400    EXPECT_EQ(1, this->manager->getNumOpenChkItems());
401    EXPECT_EQ(1002, qi2->getBySeqno());
402    EXPECT_EQ(21, qi2->getRevSeqno());
403    EXPECT_EQ(1,
404              this->manager->getNumItemsForCursor(
405                      CheckpointManager::pCursorName));
406
407    // Adding a different key should increase size.
408    queued_item qi3(new Item(makeStoredDocKey("key2"),
409                             this->vbucket->getId(),
410                             queue_op::mutation,
411                             /*revSeq*/ 0,
412                             /*bySeq*/ 0));
413    EXPECT_TRUE(this->manager->queueDirty(*this->vbucket,
414                                          qi3,
415                                          GenerateBySeqno::Yes,
416                                          GenerateCas::Yes,
417                                          /*preLinkDocCtx*/ nullptr));
418    EXPECT_EQ(1, this->manager->getNumCheckpoints());
419    EXPECT_EQ(2, this->manager->getNumOpenChkItems());
420    EXPECT_EQ(1003, qi3->getBySeqno());
421    EXPECT_EQ(0, qi3->getRevSeqno());
422    EXPECT_EQ(2,
423              this->manager->getNumItemsForCursor(
424                      CheckpointManager::pCursorName));
425
426    // Check that the items fetched matches the number we were told to expect.
427    std::vector<queued_item> items;
428    auto result = this->manager->getAllItemsForCursor(
429            CheckpointManager::pCursorName, items);
430    EXPECT_EQ(0, result.start);
431    EXPECT_EQ(1003, result.end);
432    EXPECT_EQ(3, items.size());
433    EXPECT_THAT(items,
434                testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
435                                     HasOperation(queue_op::mutation),
436                                     HasOperation(queue_op::mutation)));
437}
438
439// Test that enqueuing a single delete works.
440TYPED_TEST(CheckpointTest, Delete) {
441    // Enqueue a single delete.
442    queued_item qi{new Item{makeStoredDocKey("key1"),
443                            this->vbucket->getId(),
444                            queue_op::mutation,
445                            /*revSeq*/ 10,
446                            /*byseq*/ 0}};
447    qi->setDeleted();
448    EXPECT_TRUE(this->manager->queueDirty(*this->vbucket,
449                                          qi,
450                                          GenerateBySeqno::Yes,
451                                          GenerateCas::Yes,
452                                          /*preLinkDocCtx*/ nullptr));
453
454    EXPECT_EQ(1, this->manager->getNumCheckpoints());  // Single open checkpoint.
455    EXPECT_EQ(1, this->manager->getNumOpenChkItems()); // 1x op_del
456    EXPECT_EQ(1001, qi->getBySeqno());
457    EXPECT_EQ(10, qi->getRevSeqno());
458
459    // Check that the items fetched matches what was enqueued.
460    std::vector<queued_item> items;
461    auto result = this->manager->getAllItemsForCursor
462            (CheckpointManager::pCursorName, items);
463
464    EXPECT_EQ(0, result.start);
465    EXPECT_EQ(1001, result.end);
466    ASSERT_EQ(2, items.size());
467    EXPECT_THAT(items,
468                testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
469                                     HasOperation(queue_op::mutation)));
470    EXPECT_TRUE(items[1]->isDeleted());
471}
472
473
474// Test with one open and one closed checkpoint.
475TYPED_TEST(CheckpointTest, OneOpenOneClosed) {
476    // Add some items to the initial (open) checkpoint.
477    for (auto i : {1,2}) {
478        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(i)));
479    }
480    EXPECT_EQ(1, this->manager->getNumCheckpoints());
481    // 2x op_set
482    EXPECT_EQ(2, this->manager->getNumOpenChkItems());
483    const uint64_t ckpt_id1 = this->manager->getOpenCheckpointId();
484
485    // Create a new checkpoint (closing the current open one).
486    const uint64_t ckpt_id2 = this->manager->createNewCheckpoint();
487    EXPECT_NE(ckpt_id1, ckpt_id2) << "New checkpoint ID should differ from old";
488    EXPECT_EQ(ckpt_id1, this->manager->getLastClosedCheckpointId());
489    EXPECT_EQ(0, this->manager->getNumOpenChkItems()); // no items yet
490
491    // Add some items to the newly-opened checkpoint (note same keys as 1st
492    // ckpt).
493    for (auto ii : {1,2}) {
494        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
495    }
496    EXPECT_EQ(2, this->manager->getNumCheckpoints());
497    // 2x op_set
498    EXPECT_EQ(2, this->manager->getNumOpenChkItems());
499
500    // Examine the items - should be 2 lots of two keys.
501    EXPECT_EQ(4,
502              this->manager->getNumItemsForCursor(
503                      CheckpointManager::pCursorName));
504
505    // Check that the items fetched matches the number we were told to expect.
506    std::vector<queued_item> items;
507    auto result = this->manager->getAllItemsForCursor(
508            CheckpointManager::pCursorName, items);
509    EXPECT_EQ(0, result.start);
510    EXPECT_EQ(1004, result.end);
511    EXPECT_EQ(7, items.size());
512    EXPECT_THAT(items,
513                testing::ElementsAre(HasOperation(queue_op::checkpoint_start),
514                                     HasOperation(queue_op::mutation),
515                                     HasOperation(queue_op::mutation),
516                                     HasOperation(queue_op::checkpoint_end),
517                                     HasOperation(queue_op::checkpoint_start),
518                                     HasOperation(queue_op::mutation),
519                                     HasOperation(queue_op::mutation)));
520}
521
522// Test the automatic creation of checkpoints based on the number of items.
523TYPED_TEST(CheckpointTest, ItemBasedCheckpointCreation) {
524    // Size down the default number of items to create a new checkpoint and
525    // recreate the manager
526    this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
527                                               MIN_CHECKPOINT_ITEMS,
528                                               /*numCheckpoints*/ 2,
529                                               /*itemBased*/ true,
530                                               /*keepClosed*/ false,
531                                               /*enableMerge*/ false,
532                                               /*persistenceEnabled*/ true);
533    // TODO: ^^ Consider a variant for Ephemeral testing -
534    // persistenceEnabled:false
535
536    this->createManager();
537
538    // Create one less than the number required to create a new checkpoint.
539    queued_item qi;
540    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
541        EXPECT_EQ(ii, this->manager->getNumOpenChkItems());
542
543        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
544        EXPECT_EQ(1, this->manager->getNumCheckpoints());
545    }
546
547    // Add one more - should create a new checkpoint.
548    EXPECT_TRUE(this->queueNewItem("key_epoch"));
549    EXPECT_EQ(2, this->manager->getNumCheckpoints());
550    EXPECT_EQ(1, this->manager->getNumOpenChkItems()); // 1x op_set
551
552    // Fill up this checkpoint also - note loop for MIN_CHECKPOINT_ITEMS - 1
553    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS - 1; ii++) {
554        EXPECT_EQ(ii + 1,
555                  this->manager->getNumOpenChkItems()); /* +1 initial set */
556
557        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
558
559        EXPECT_EQ(2, this->manager->getNumCheckpoints());
560    }
561
562    // Add one more - as we have hit maximum checkpoints should *not* create a
563    // new one.
564    EXPECT_TRUE(this->queueNewItem("key_epoch2"));
565    EXPECT_EQ(2, this->manager->getNumCheckpoints());
566    EXPECT_EQ(11, // 1x key_epoch, 9x key_X, 1x key_epoch2
567              this->manager->getNumOpenChkItems());
568
569    // Fetch the items associated with the persistence cursor. This
570    // moves the single cursor registered outside of the initial checkpoint,
571    // allowing a new open checkpoint to be created.
572    EXPECT_EQ(1, this->manager->getNumOfCursors());
573    snapshot_range_t range;
574    std::vector<queued_item> items;
575    range = this->manager->getAllItemsForCursor(CheckpointManager::pCursorName,
576                                                items);
577
578    EXPECT_EQ(0, range.start);
579    EXPECT_EQ(1021, range.end);
580    EXPECT_EQ(24, items.size());
581
582    // Should still have the same number of checkpoints and open items.
583    EXPECT_EQ(2, this->manager->getNumCheckpoints());
584    EXPECT_EQ(11, this->manager->getNumOpenChkItems());
585
586    // But adding a new item will create a new one.
587    EXPECT_TRUE(this->queueNewItem("key_epoch3"));
588    EXPECT_EQ(3, this->manager->getNumCheckpoints());
589    EXPECT_EQ(1, this->manager->getNumOpenChkItems()); // 1x op_set
590}
591
592// Test checkpoint and cursor accounting - when checkpoints are closed the
593// offset of cursors is updated as appropriate.
594TYPED_TEST(CheckpointTest, CursorOffsetOnCheckpointClose) {
595    // Add two items to the initial (open) checkpoint.
596    for (auto i : {1,2}) {
597        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(i)));
598    }
599    EXPECT_EQ(1, this->manager->getNumCheckpoints());
600    // 2x op_set
601    EXPECT_EQ(2, this->manager->getNumOpenChkItems());
602
603    // Use the existing persistence cursor for this test:
604    EXPECT_EQ(
605            2,
606            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
607            << "Cursor should initially have two items pending";
608
609    // Check de-dupe counting - after adding another item with the same key,
610    // should still see two items.
611    EXPECT_FALSE(this->queueNewItem("key1")) << "Adding a duplicate key to "
612                                                "open checkpoint should not "
613                                                "increase queue size";
614
615    EXPECT_EQ(
616            2,
617            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
618            << "Expected 2 items for cursor (2x op_set) after adding a "
619               "duplicate.";
620
621    // Create a new checkpoint (closing the current open one).
622    this->manager->createNewCheckpoint();
623    EXPECT_EQ(0, this->manager->getNumOpenChkItems());
624    EXPECT_EQ(2, this->manager->getNumCheckpoints());
625    EXPECT_EQ(
626            2,
627            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
628            << "Expected 2 items for cursor after creating new checkpoint";
629
630    // Advance cursor - first to get the 'checkpoint_start' meta item,
631    // and a second time to get the a 'proper' mutation.
632    bool isLastMutationItem;
633    auto item = this->manager->nextItem(CheckpointManager::pCursorName,
634                                        isLastMutationItem);
635    EXPECT_TRUE(item->isCheckPointMetaItem());
636    EXPECT_FALSE(isLastMutationItem);
637    EXPECT_EQ(
638            2,
639            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
640            << "Expected 2 items for cursor after advancing one item";
641
642    item = this->manager->nextItem(CheckpointManager::pCursorName,
643                                   isLastMutationItem);
644    EXPECT_FALSE(item->isCheckPointMetaItem());
645    EXPECT_FALSE(isLastMutationItem);
646    EXPECT_EQ(
647            1,
648            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
649            << "Expected 1 item for cursor after advancing by 1";
650
651    // Add two items to the newly-opened checkpoint. Same keys as 1st ckpt,
652    // but cannot de-dupe across checkpoints.
653    for (auto ii : {1,2}) {
654        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
655    }
656
657    EXPECT_EQ(
658            3,
659            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
660            << "Expected 3 items for cursor after adding 2 more to new "
661               "checkpoint";
662
663    // Advance the cursor 'out' of the first checkpoint.
664    item = this->manager->nextItem(CheckpointManager::pCursorName,
665                                   isLastMutationItem);
666    EXPECT_FALSE(item->isCheckPointMetaItem());
667    EXPECT_TRUE(isLastMutationItem);
668
669    // Now at the end of the first checkpoint, move into the next checkpoint.
670    item = this->manager->nextItem(CheckpointManager::pCursorName,
671                                   isLastMutationItem);
672    EXPECT_TRUE(item->isCheckPointMetaItem());
673    EXPECT_TRUE(isLastMutationItem);
674    item = this->manager->nextItem(CheckpointManager::pCursorName,
675                                   isLastMutationItem);
676    EXPECT_TRUE(item->isCheckPointMetaItem());
677    EXPECT_FALSE(isLastMutationItem);
678
679    // Tell Checkpoint manager the items have been persisted, so it advances
680    // pCursorPreCheckpointId, which will allow us to remove the closed
681    // unreferenced checkpoints.
682    this->manager->itemsPersisted();
683
684    // Both previous checkpoints are unreferenced. Close them. This will
685    // cause the offset of this cursor to be recalculated.
686    bool new_open_ckpt_created;
687    EXPECT_EQ(2,
688              this->manager->removeClosedUnrefCheckpoints(
689                      *this->vbucket, new_open_ckpt_created));
690
691    EXPECT_EQ(1, this->manager->getNumCheckpoints());
692
693    EXPECT_EQ(2,
694              this->manager->getNumItemsForCursor(
695                      CheckpointManager::pCursorName));
696
697    // Drain the remaining items.
698    item = this->manager->nextItem(CheckpointManager::pCursorName,
699                                   isLastMutationItem);
700    EXPECT_FALSE(item->isCheckPointMetaItem());
701    EXPECT_FALSE(isLastMutationItem);
702    item = this->manager->nextItem(CheckpointManager::pCursorName,
703                                   isLastMutationItem);
704    EXPECT_FALSE(item->isCheckPointMetaItem());
705    EXPECT_TRUE(isLastMutationItem);
706
707    EXPECT_EQ(0,
708              this->manager->getNumItemsForCursor(
709                      CheckpointManager::pCursorName));
710}
711
712// Test the getAllItemsForCursor()
713TYPED_TEST(CheckpointTest, ItemsForCheckpointCursor) {
714    /* We want to have items across 2 checkpoints. Size down the default number
715       of items to create a new checkpoint and recreate the manager */
716    this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
717                                               MIN_CHECKPOINT_ITEMS,
718                                               /*numCheckpoints*/ 2,
719                                               /*itemBased*/ true,
720                                               /*keepClosed*/ false,
721                                               /*enableMerge*/ false,
722                                               /*persistenceEnabled*/ true);
723    // TODO: ^^ Consider a variant for Ephemeral testing -
724    // persistenceEnabled:false
725
726    this->createManager();
727
728    /* Add items such that we have 2 checkpoints */
729    queued_item qi;
730    for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
731        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
732    }
733
734    /* Check if we have desired number of checkpoints and desired number of
735       items */
736    EXPECT_EQ(2, this->manager->getNumCheckpoints());
737    EXPECT_EQ(MIN_CHECKPOINT_ITEMS, this->manager->getNumOpenChkItems());
738
739    /* Register DCP replication cursor */
740    std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
741    this->manager->registerCursorBySeqno(
742            dcp_cursor.c_str(), 0, MustSendCheckpointEnd::NO);
743
744    /* Get items for persistence*/
745    std::vector<queued_item> items;
746    auto range = this->manager->getAllItemsForCursor(
747            CheckpointManager::pCursorName, items);
748
749    /* We should have got (2 * MIN_CHECKPOINT_ITEMS + 3) items. 3 additional are
750       op_ckpt_start, op_ckpt_end and op_ckpt_start */
751    EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
752    EXPECT_EQ(0, range.start);
753    EXPECT_EQ(1000 + 2 * MIN_CHECKPOINT_ITEMS, range.end);
754
755    /* Get items for DCP replication cursor */
756    items.clear();
757    range = this->manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
758    EXPECT_EQ(2 * MIN_CHECKPOINT_ITEMS + 3, items.size());
759    EXPECT_EQ(0, range.start);
760    EXPECT_EQ(1000 + 2 * MIN_CHECKPOINT_ITEMS, range.end);
761}
762
763// Test getAllItemsForCursor() when it is limited to fewer items than exist
764// in total. Cursor should only advanced to the start of the 2nd checkpoint.
765TYPED_TEST(CheckpointTest, ItemsForCheckpointCursorLimited) {
766    /* We want to have items across 2 checkpoints. Size down the default number
767       of items to create a new checkpoint and recreate the manager */
768    this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
769                                               MIN_CHECKPOINT_ITEMS,
770                                               /*numCheckpoints*/ 2,
771                                               /*itemBased*/ true,
772                                               /*keepClosed*/ false,
773                                               /*enableMerge*/ false,
774                                               /*persistenceEnabled*/ true);
775
776    this->createManager();
777
778    /* Add items such that we have 2 checkpoints */
779    queued_item qi;
780    for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
781        ASSERT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
782    }
783
784    /* Verify we have desired number of checkpoints and desired number of
785       items */
786    ASSERT_EQ(2, this->manager->getNumCheckpoints());
787    ASSERT_EQ(MIN_CHECKPOINT_ITEMS, this->manager->getNumOpenChkItems());
788
789    /* Get items for persistence. Specify a limit of 1 so we should only
790     * fetch the first checkpoints' worth.
791     */
792    std::vector<queued_item> items;
793    auto result = this->manager->getItemsForCursor(
794            CheckpointManager::pCursorName, items, 1);
795    EXPECT_EQ(0, result.range.start);
796    EXPECT_EQ(1000 + MIN_CHECKPOINT_ITEMS, result.range.end);
797    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 2, items.size())
798            << "Should have MIN_CHECKPOINT_ITEMS + 2 (ckpt start & end) items";
799    EXPECT_EQ(2,
800              this->manager->getCheckpointIdForCursor(
801                      CheckpointManager::pCursorName))
802            << "Cursor should have moved into second checkpoint.";
803
804}
805
806// Test the checkpoint cursor movement
807TYPED_TEST(CheckpointTest, CursorMovement) {
808    /* We want to have items across 2 checkpoints. Size down the default number
809     of items to create a new checkpoint and recreate the manager */
810    this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
811                                               MIN_CHECKPOINT_ITEMS,
812                                               /*numCheckpoints*/ 2,
813                                               /*itemBased*/ true,
814                                               /*keepClosed*/ false,
815                                               /*enableMerge*/ false,
816                                               /*persistenceEnabled*/true);
817    // TODO: ^^ Consider a variant for Ephemeral testing -
818    // persistenceEnabled:false
819
820    this->createManager();
821
822    /* Add items such that we have 1 full (max items as per config) checkpoint.
823       Adding another would open new checkpoint */
824    queued_item qi;
825    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
826        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
827    }
828
829    /* Check if we have desired number of checkpoints and desired number of
830       items */
831    EXPECT_EQ(1, this->manager->getNumCheckpoints());
832    EXPECT_EQ(MIN_CHECKPOINT_ITEMS, this->manager->getNumOpenChkItems());
833
834    /* Register DCP replication cursor */
835    std::string dcp_cursor(DCP_CURSOR_PREFIX + std::to_string(1));
836    this->manager->registerCursorBySeqno(
837            dcp_cursor.c_str(), 0, MustSendCheckpointEnd::NO);
838
839    /* Get items for persistence cursor */
840    std::vector<queued_item> items;
841    auto range = this->manager->getAllItemsForCursor(
842            CheckpointManager::pCursorName, items);
843
844    /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
845    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
846    EXPECT_EQ(0, range.start);
847    EXPECT_EQ(1000 + MIN_CHECKPOINT_ITEMS, range.end);
848
849    /* Get items for DCP replication cursor */
850    items.clear();
851    range = this->manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
852    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
853    EXPECT_EQ(0, range.start);
854    EXPECT_EQ(1000 + MIN_CHECKPOINT_ITEMS, range.end);
855
856    uint64_t curr_open_chkpt_id = this->manager->getOpenCheckpointId_UNLOCKED();
857
858    /* Run the checkpoint remover so that new open checkpoint is created */
859    bool newCheckpointCreated;
860    this->manager->removeClosedUnrefCheckpoints(*this->vbucket,
861                                                newCheckpointCreated);
862    EXPECT_EQ(curr_open_chkpt_id + 1,
863              this->manager->getOpenCheckpointId_UNLOCKED());
864
865    /* Get items for persistence cursor */
866    EXPECT_EQ(
867            0,
868            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
869            << "Expected to have no normal (only meta) items";
870    items.clear();
871    range = this->manager->getAllItemsForCursor(CheckpointManager::pCursorName,
872                                                items);
873
874    /* We should have got op_ckpt_start item */
875    EXPECT_EQ(1, items.size());
876    EXPECT_EQ(1000 + MIN_CHECKPOINT_ITEMS, range.start);
877    EXPECT_EQ(1000 + MIN_CHECKPOINT_ITEMS, range.end);
878
879    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
880
881    /* Get items for DCP replication cursor */
882    EXPECT_EQ(
883            0,
884            this->manager->getNumItemsForCursor(CheckpointManager::pCursorName))
885            << "Expected to have no normal (only meta) items";
886    items.clear();
887    this->manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
888    /* Expecting only 1 op_ckpt_start item */
889    EXPECT_EQ(1, items.size());
890    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
891}
892
893// Test the checkpoint cursor movement for replica vBuckets (where we can
894// perform more checkpoint collapsing)
895TYPED_TEST(CheckpointTest, CursorMovementReplicaMerge) {
896    this->vbucket->setState(vbucket_state_replica);
897
898    /* We want to have items across 2 checkpoints. Size down the default number
899     of items to create a new checkpoint and recreate the manager */
900    this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
901                                               MIN_CHECKPOINT_ITEMS,
902                                               /*numCheckpoints*/ 2,
903                                               /*itemBased*/ true,
904                                               /*keepClosed*/ false,
905                                               /*enableMerge*/ true,
906                                               /*persistenceEnabled*/true);
907    // TODO: ^^ Consider a variant for Ephemeral testing -
908    // persistenceEnabled:false
909
910    // Add items such that we have a checkpoint at half-capacity.
911    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS / 2; ii++) {
912        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
913    }
914
915    /* Check if we have desired number of checkpoints and desired number of
916        items */
917    EXPECT_EQ(1, this->manager->getNumCheckpoints());
918    EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2), this->manager->getNumOpenChkItems());
919
920    // Register DCP replication cursor, which will be moved into the middle of
921    // first checkpoint and then left there.
922    std::string dcp_cursor{DCP_CURSOR_PREFIX + std::to_string(1)};
923    this->manager->registerCursorBySeqno(
924            dcp_cursor.c_str(), 0, MustSendCheckpointEnd::NO);
925
926    std::vector<queued_item> items;
927    this->manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
928    EXPECT_EQ((MIN_CHECKPOINT_ITEMS / 2) + 1, items.size());
929
930    // Add more items so this checkpoint is now full.
931    for (unsigned int ii = MIN_CHECKPOINT_ITEMS / 2; ii < MIN_CHECKPOINT_ITEMS;
932            ii++) {
933        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
934    }
935    EXPECT_EQ(1, this->manager->getNumCheckpoints())
936            << "Should still only have 1 checkpoint after adding "
937               "MIN_CHECKPOINT_ITEMS total";
938    EXPECT_EQ(MIN_CHECKPOINT_ITEMS, this->manager->getNumOpenChkItems());
939
940    /* Get items for persistence cursor - this will move the persistence cursor
941     * out of the initial checkpoint.
942     */
943    items.clear();
944    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
945
946    /* We should have got (MIN_CHECKPOINT_ITEMS + op_ckpt_start) items. */
947    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
948
949    EXPECT_EQ(1, this->manager->getOpenCheckpointId_UNLOCKED());
950
951    // Create a new checkpoint.
952    EXPECT_EQ(2, this->manager->createNewCheckpoint());
953
954    // Add another MIN_CHECKPOINT_ITEMS. This should fill up the second
955    // checkpoint.
956    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
957        EXPECT_TRUE(this->queueNewItem("keyB_" + std::to_string(ii)));
958    }
959
960    // Move the persistence cursor through these new items.
961    EXPECT_EQ(MIN_CHECKPOINT_ITEMS,
962              this->manager->getNumItemsForCursor(
963                      CheckpointManager::pCursorName));
964    items.clear();
965    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
966    EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 1, items.size());
967
968    // Create a third checkpoint.
969    EXPECT_EQ(3, this->manager->createNewCheckpoint());
970
971    // Move persistence cursor into third checkpoint.
972    items.clear();
973    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
974    EXPECT_EQ(1, items.size())
975        << "Expected to get a single meta item";
976    EXPECT_EQ(queue_op::checkpoint_start, items.at(0)->getOperation());
977
978    // We now have an unoccupied second checkpoint. We should be able to
979    // collapse this, and move the dcp_cursor into the merged checkpoint.
980    bool newCheckpointCreated;
981    this->manager->removeClosedUnrefCheckpoints(*this->vbucket,
982                                                newCheckpointCreated);
983
984    /* Get items for DCP cursor */
985    EXPECT_EQ(MIN_CHECKPOINT_ITEMS / 2 + MIN_CHECKPOINT_ITEMS,
986              this->manager->getNumItemsForCursor(dcp_cursor))
987            << "DCP cursor remaining items should have been recalculated after "
988               "close of unref checkpoints.";
989
990    items.clear();
991    auto range = this->manager->getAllItemsForCursor(dcp_cursor.c_str(), items);
992    EXPECT_EQ(1001, range.start);
993    EXPECT_EQ(1020, range.end);
994
995    // Check we have received correct items (done in chunks because
996    // EXPECT_THAT maxes out at 10 elements).
997    std::vector<queued_item> items_a(items.begin(), items.begin() + 5);
998    // Remainder of first checkpoint.
999    EXPECT_THAT(items_a, testing::Each(HasOperation(queue_op::mutation)));
1000
1001    // Second checkpoint's data- 10x set.
1002    std::vector<queued_item> items_b(items.begin() + 5, items.begin() + 15);
1003    EXPECT_THAT(items_b, testing::Each(HasOperation(queue_op::mutation)));
1004
1005    // end of second checkpoint and start of third.
1006    std::vector<queued_item> items_c(items.begin() + 15, items.end());
1007    EXPECT_THAT(items_c,
1008                testing::ElementsAre(HasOperation(queue_op::checkpoint_end),
1009                                     HasOperation(queue_op::checkpoint_start)));
1010}
1011
1012// MB-25056 - Regression test replicating situation where the seqno returned by
1013// registerCursorBySeqno minus one is greater than the input parameter
1014// startBySeqno but a backfill is not required.
1015TYPED_TEST(CheckpointTest, MB25056_backfill_not_required) {
1016    std::vector<queued_item> items;
1017    this->vbucket->setState(vbucket_state_replica);
1018
1019    ASSERT_TRUE(this->queueNewItem("key0"));
1020    // Add duplicate items, which should cause de-duplication to occur.
1021    for (unsigned int ii = 0; ii < MIN_CHECKPOINT_ITEMS; ii++) {
1022        EXPECT_FALSE(this->queueNewItem("key0"));
1023    }
1024    // Add a number of non duplicate items to the same checkpoint
1025    for (unsigned int ii = 1; ii < MIN_CHECKPOINT_ITEMS; ii++) {
1026        EXPECT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
1027    }
1028
1029    // Register DCP replication cursor
1030    std::string dcp_cursor(DCP_CURSOR_PREFIX);
1031    // Request to register the cursor with a seqno that has been de-duped away
1032    CursorRegResult result = this->manager->registerCursorBySeqno(
1033            dcp_cursor.c_str(), 1005, MustSendCheckpointEnd::NO);
1034    EXPECT_EQ(1011, result.first) << "Returned seqno is not expected value.";
1035    EXPECT_FALSE(result.second) << "Backfill is unexpectedly required.";
1036}
1037
1038//
1039// It's critical that the HLC (CAS) is ordered with seqno generation
1040// otherwise XDCR may drop a newer bySeqno mutation because the CAS is not
1041// higher.
1042//
1043TYPED_TEST(CheckpointTest, SeqnoAndHLCOrdering) {
1044    const int n_threads = 8;
1045    const int n_items = 1000;
1046
1047    // configure so we can store a large number of items
1048    // configure with 1 checkpoint to ensure the time-based closing
1049    // does not split the items over many checkpoints and muddy the final
1050    // data checks.
1051    this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
1052                                               n_threads * n_items,
1053                                               /*numCheckpoints*/ 1,
1054                                               /*itemBased*/ true,
1055                                               /*keepClosed*/ false,
1056                                               /*enableMerge*/ false,
1057                                               /*persistenceEnabled*/true);
1058    // TODO: ^^ Consider a variant for Ephemeral testing -
1059    // persistenceEnabled:false
1060
1061    this->createManager();
1062
1063    std::vector<std::thread> threads;
1064
1065    // vector of pairs, first is seqno, second is CAS
1066    // just do a scatter gather over n_threads
1067    std::vector<std::vector<std::pair<uint64_t, uint64_t> > > threadData(n_threads);
1068    for (int ii = 0; ii < n_threads; ii++) {
1069        auto& threadsData = threadData[ii];
1070        threads.push_back(std::thread([this, ii, n_items, &threadsData](){
1071            std::string key = "key" + std::to_string(ii);
1072            for (int item  = 0; item < n_items; item++) {
1073                queued_item qi(
1074                        new Item(makeStoredDocKey(key + std::to_string(item)),
1075                                 this->vbucket->getId(),
1076                                 queue_op::mutation,
1077                                 /*revSeq*/ 0,
1078                                 /*bySeq*/ 0));
1079                EXPECT_TRUE(
1080                        this->manager->queueDirty(*this->vbucket,
1081                                                  qi,
1082                                                  GenerateBySeqno::Yes,
1083                                                  GenerateCas::Yes,
1084                                                  /*preLinkDocCtx*/ nullptr));
1085
1086                // Save seqno/cas
1087                threadsData.push_back(std::make_pair(qi->getBySeqno(), qi->getCas()));
1088            }
1089        }));
1090    }
1091
1092    // Wait for all threads
1093    for (auto& thread : threads) {
1094        thread.join();
1095    }
1096
1097    // Now combine the data and check HLC is increasing with seqno
1098    std::map<uint64_t, uint64_t> finalData;
1099    for (auto t : threadData) {
1100        for (auto pair : t) {
1101            EXPECT_EQ(finalData.end(), finalData.find(pair.first));
1102            finalData[pair.first] = pair.second;
1103        }
1104    }
1105
1106    auto itr = finalData.begin();
1107    EXPECT_NE(itr, finalData.end());
1108    uint64_t previousCas = (itr++)->second;
1109    EXPECT_NE(itr, finalData.end());
1110    for (; itr != finalData.end(); itr++) {
1111        EXPECT_LT(previousCas, itr->second);
1112        previousCas = itr->second;
1113    }
1114
1115    // Now a final check, iterate the checkpoint and also check for increasing
1116    // HLC.
1117    std::vector<queued_item> items;
1118    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1119
1120    /* We should have got (n_threads*n_items + op_ckpt_start) items. */
1121    EXPECT_EQ(n_threads*n_items + 1, items.size());
1122
1123    previousCas = items[1]->getCas();
1124    for (size_t ii = 2; ii < items.size(); ii++) {
1125        EXPECT_LT(previousCas, items[ii]->getCas());
1126        previousCas = items[ii]->getCas();
1127    }
1128}
1129
1130// Test cursor is correctly updated when enqueuing a key which already exists
1131// in the checkpoint (and needs de-duping), where the cursor points at a
1132// meta-item at the head of the checkpoint:
1133//
1134//  Before:
1135//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET(key), 2:SET_VBSTATE() ]
1136//                                                               ^
1137//                                                            Cursor
1138//
1139//  After:
1140//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 2:SET_VBSTATE(), 2:SET(key) ]
1141//                                                     ^
1142//                                                   Cursor
1143//
1144TYPED_TEST(CheckpointTest, CursorUpdateForExistingItemWithMetaItemAtHead) {
1145    // Setup the checkpoint and cursor.
1146    ASSERT_EQ(1, this->manager->getNumItems());
1147    ASSERT_TRUE(this->queueNewItem("key"));
1148    ASSERT_EQ(2, this->manager->getNumItems());
1149    this->manager->queueSetVBState(*this->vbucket);
1150
1151    ASSERT_EQ(3, this->manager->getNumItems());
1152
1153    // Advance persistence cursor so all items have been consumed.
1154    std::vector<queued_item> items;
1155    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1156    ASSERT_EQ(3, items.size());
1157    ASSERT_EQ(0,
1158              this->manager->getNumItemsForCursor(
1159                      CheckpointManager::pCursorName));
1160
1161    // Queue an item with a duplicate key.
1162    this->queueNewItem("key");
1163
1164    // Test: Should have one item for cursor (the one we just added).
1165    EXPECT_EQ(1,
1166              this->manager->getNumItemsForCursor(
1167                      CheckpointManager::pCursorName));
1168
1169    // Should have another item to read (new version of 'key')
1170    items.clear();
1171    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1172    EXPECT_EQ(1, items.size());
1173}
1174
// Test cursor is correctly updated when enqueuing a key which already exists
// in the checkpoint (and needs de-duping), where the cursor points at a
// meta-item *not* at the head of the checkpoint:
//
//  Before:
//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 1:SET(key) ]
//                                                     ^
//                                                    Cursor
//
//  After:
//      Checkpoint [ 0:EMPTY(), 1:CKPT_START(), 1:SET_VBSTATE(), 2:SET(key) ]
//                                                     ^
//                                                   Cursor
//
1189TYPED_TEST(CheckpointTest, CursorUpdateForExistingItemWithNonMetaItemAtHead) {
1190    // Setup the checkpoint and cursor.
1191    ASSERT_EQ(1, this->manager->getNumItems());
1192    this->manager->queueSetVBState(*this->vbucket);
1193    ASSERT_EQ(2, this->manager->getNumItems());
1194
1195    // Advance persistence cursor so all items have been consumed.
1196    std::vector<queued_item> items;
1197    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1198    ASSERT_EQ(2, items.size());
1199    ASSERT_EQ(0,
1200              this->manager->getNumItemsForCursor(
1201                      CheckpointManager::pCursorName));
1202
1203    // Queue a set (cursor will now be one behind).
1204    ASSERT_TRUE(this->queueNewItem("key"));
1205    ASSERT_EQ(1,
1206              this->manager->getNumItemsForCursor(
1207                      CheckpointManager::pCursorName));
1208
1209    // Test: queue an item with a duplicate key.
1210    this->queueNewItem("key");
1211
1212    // Test: Should have one item for cursor (the one we just added).
1213    EXPECT_EQ(1,
1214              this->manager->getNumItemsForCursor(
1215                      CheckpointManager::pCursorName));
1216
1217    // Should an item to read (new version of 'key')
1218    items.clear();
1219    this->manager->getAllItemsForCursor(CheckpointManager::pCursorName, items);
1220    EXPECT_EQ(1, items.size());
1221    EXPECT_EQ(1002, items.at(0)->getBySeqno());
1222    EXPECT_EQ(makeStoredDocKey("key"), items.at(0)->getKey());
1223}
1224
1225// Regression test for MB-21925 - when a duplicate key is queued and the
1226// persistence cursor is still positioned on the initial dummy key,
1227// should return EXISTING_ITEM.
1228TYPED_TEST(CheckpointTest,
1229           MB21925_QueueDuplicateWithPersistenceCursorOnInitialMetaItem) {
1230    // Need a manager starting from seqno zero.
1231    this->createManager(0);
1232    ASSERT_EQ(0, this->manager->getHighSeqno());
1233    ASSERT_EQ(1, this->manager->getNumItems())
1234            << "Should start with queue_op::empty on checkpoint.";
1235
1236    // Add an item with some new key.
1237    ASSERT_TRUE(this->queueNewItem("key"));
1238
1239    // Test - second item (duplicate key) should return false.
1240    EXPECT_FALSE(this->queueNewItem("key"));
1241}
1242
1243/*
1244 * We always want to close the current open checkpoint on replica-vbuckets
1245 * when the Consumer receives the snapshotEnd mutation of a memory-snapshot.
1246 */
1247TEST_F(SingleThreadedCheckpointTest,
1248       MB30019_CloseReplicaCheckpointOnMemorySnapshotEnd) {
1249    setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1250    auto vb = store->getVBuckets().getBucket(vbid);
1251    auto* ckptMgr = vb->checkpointManager.get();
1252    ASSERT_NE(nullptr, ckptMgr);
1253
1254    // We must have only 1 open checkpoint
1255    ASSERT_EQ(1, ckptMgr->getNumCheckpoints());
1256    // We must have only one cursor (the persistence cursor), as there is no
1257    // DCP producer for vbid
1258    ASSERT_EQ(1, ckptMgr->getNumOfCursors());
1259    // We must have only the checkpoint-open and the vbucket-state meta-items
1260    // in the open checkpoint
1261    ASSERT_EQ(2, ckptMgr->getNumItems());
1262    ASSERT_EQ(0, ckptMgr->getNumOpenChkItems());
1263
1264    auto consumer =
1265            std::make_shared<MockDcpConsumer>(*engine, cookie, "test-consumer");
1266    auto passiveStream = std::static_pointer_cast<MockPassiveStream>(
1267            consumer->makePassiveStream(
1268                    *engine,
1269                    consumer,
1270                    "test-passive-stream",
1271                    0 /* flags */,
1272                    0 /* opaque */,
1273                    vbid,
1274                    0 /* startSeqno */,
1275                    std::numeric_limits<uint64_t>::max() /* endSeqno */,
1276                    0 /* vbUuid */,
1277                    0 /* snapStartSeqno */,
1278                    0 /* snapEndSeqno */,
1279                    0 /* vb_high_seqno */));
1280
1281    const size_t snapshotEnd = 3;
1282    // 1) the consumer receives the snapshot-marker
1283    SnapshotMarker snapshotMarker(
1284            0 /* opaque */,
1285            vbid,
1286            0 /* startSeqno */,
1287            snapshotEnd /* endSeqno */,
1288            dcp_marker_flag_t::MARKER_FLAG_MEMORY /* flags */);
1289    passiveStream->processMarker(&snapshotMarker);
1290
1291    // 2) the consumer receives the mutations until (snapshotEnd -1)
1292    size_t i = 1;
1293    for (; i < snapshotEnd; i++) {
1294        // Queue item
1295        queued_item qi(new Item(makeStoredDocKey("key_" + std::to_string(i)),
1296                                0 /*flags*/,
1297                                0 /*expiry*/,
1298                                "value",
1299                                5 /*valueSize*/,
1300                                PROTOCOL_BINARY_RAW_BYTES,
1301                                0 /*cas*/,
1302                                i /*bySeqno*/,
1303                                vb->getId()));
1304
1305        MutationResponse mutation(std::move(qi), 0 /* opaque */);
1306
1307        // PassiveStream::processMutation does 2 things:
1308        //     1) setWithMeta (which enqueues the item into the checkpoint)
1309        //     2) calls PassiveStream::handleSnapshotEnd (which must close the
1310        //             open checkpoint if the current mutation is the
1311        //             snapshot-end)
1312        passiveStream->processMutation(&mutation);
1313    }
1314    // We must have 2 items in the checkpoint now
1315    ASSERT_EQ(snapshotEnd - 1, ckptMgr->getNumOpenChkItems());
1316    // We still must have only 1 open checkpoint, as the consumer has not
1317    // received the snapshot-end mutation
1318    ASSERT_EQ(1, ckptMgr->getNumCheckpoints());
1319
1320    // 3) the consumer receives the snapshotEnd mutation
1321    queued_item qi(
1322            new Item(makeStoredDocKey("key_" + std::to_string(snapshotEnd)),
1323                     0 /*flags*/,
1324                     0 /*expiry*/,
1325                     "value",
1326                     5 /*valueSize*/,
1327                     PROTOCOL_BINARY_RAW_BYTES,
1328                     0 /*cas*/,
1329                     i /*bySeqno*/,
1330                     vb->getId()));
1331    MutationResponse mutation(std::move(qi), 0 /* opaque */);
1332    passiveStream->processMutation(&mutation);
1333
1334    // The consumer has received the snapshotEnd mutation, now we expect
1335    // that a new (empty) open checkpoint has been created. So we must have
1336    // 2 checkpoints in total (the closed and the new open one).
1337    ASSERT_EQ(2, ckptMgr->getNumCheckpoints());
1338
1339    // Also, the new open checkpoint must be empty (all mutations are in the
1340    // closed one)
1341    const auto& ckptList =
1342            CheckpointManagerTestIntrospector::public_getCheckpointList(
1343                    *ckptMgr);
1344    ASSERT_EQ(ckptList.back()->getId(), ckptList.front()->getId() + 1);
1345    ASSERT_EQ(checkpoint_state::CHECKPOINT_CLOSED,
1346              ckptList.front()->getState_UNLOCKED());
1347    ASSERT_EQ(snapshotEnd, ckptList.front()->getNumItems());
1348    ASSERT_EQ(checkpoint_state::CHECKPOINT_OPEN,
1349              ckptList.back()->getState_UNLOCKED());
1350    ASSERT_EQ(0, ckptList.back()->getNumItems());
1351}
1352