1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2018 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "checkpoint_remover_test.h"
19
20#include "checkpoint_utils.h"
21
22#include <engines/ep/src/checkpoint_remover.h>
23
24#include "../mock/mock_dcp.h"
25#include "../mock/mock_dcp_consumer.h"
26#include "../mock/mock_dcp_producer.h"
27#include "checkpoint.h"
28#include "test_helpers.h"
29
30size_t CheckpointRemoverTest::getMaxCheckpointItems(VBucket& vb) {
31    return vb.checkpointManager->getCheckpointConfig().getCheckpointMaxItems();
32}
33
34/**
35 * Check that the VBucketMap.getActiveVBucketsSortedByChkMgrMem() returns the
36 * correct ordering of vBuckets, sorted from largest memory usage to smallest.
37 */
38TEST_F(CheckpointRemoverEPTest, GetActiveVBucketsSortedByChkMgrMem) {
39    for (uint16_t i = 0; i < 3; i++) {
40        setVBucketStateAndRunPersistTask(i, vbucket_state_active);
41        for (uint16_t j = 0; j < i; j++) {
42            std::string doc_key =
43                    "key_" + std::to_string(i) + "_" + std::to_string(j);
44            store_item(i, makeStoredDocKey(doc_key), "value");
45        }
46    }
47
48    auto map = store->getVBuckets().getActiveVBucketsSortedByChkMgrMem();
49
50    // The map should be 3 elements long, since we created 3 vBuckets
51    ASSERT_EQ(3, map.size());
52
53    for (size_t i = 1; i < map.size(); i++) {
54        auto this_vbucket = map[i];
55        auto prev_vbucket = map[i - 1];
56        // This vBucket should have a greater memory usage than the one previous
57        // to it in the map
58        ASSERT_GE(this_vbucket.second, prev_vbucket.second);
59    }
60}
61
62/**
63 * Check that the CheckpointManager memory usage calculation is correct and
64 * accurate based on the size of the checkpoints in it.
65 */
66TEST_F(CheckpointRemoverEPTest, CheckpointManagerMemoryUsage) {
67    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
68    auto vb = store->getVBuckets().getBucket(vbid);
69    auto& checkpointManager = vb->checkpointManager;
70
71    // We should have one checkpoint which is for the state change
72    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
73
74    // Check that the expected memory usage of the checkpoints is correct
75    size_t expected_size = 0;
76    for (auto& checkpoint :
77         CheckpointManagerTestIntrospector::public_getCheckpointList(
78                 *checkpointManager)) {
79        for (auto itr = checkpoint->begin(); itr != checkpoint->end(); ++itr) {
80            expected_size += (*itr)->size();
81        }
82    }
83    ASSERT_EQ(expected_size, checkpointManager->getMemoryUsage());
84
85    // Check that the new checkpoint memory usage is equal to the previous
86    // amount + size of new item
87    Item item = store_item(vbid, makeStoredDocKey("key0"), "value");
88    ASSERT_EQ(expected_size + item.size(), checkpointManager->getMemoryUsage());
89}
90
91/**
92 * Test CheckpointManager correctly returns which cursors we are eligible to
93 * drop. We should not be allowed to drop any cursors in a checkpoint when the
94 * persistence cursor is present.
95 */
96TEST_F(CheckpointRemoverEPTest, CursorsEligibleToDrop) {
97    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
98    auto vb = store->getVBuckets().getBucket(vbid);
99    auto& checkpointManager = vb->checkpointManager;
100
101    // We should have one checkpoint which is for the state change
102    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
103    // We should only have one cursor, which is for persistence
104    ASSERT_EQ(1, checkpointManager->getNumOfCursors());
105
106    auto producer =
107            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
108
109    // The persistence cursor is still within the current checkpoint,
110    // so we should not be allowed to drop any cursors at this time
111    auto cursors = checkpointManager->getListOfCursorsToDrop();
112    ASSERT_EQ(0, cursors.size());
113
114    // Create a DCP stream for the vBucket, and check that we now have 2 cursors
115    // registered
116    createDcpStream(*producer);
117    ASSERT_EQ(2, checkpointManager->getNumOfCursors());
118
119    // Insert items to the vBucket so we create a new checkpoint by going over
120    // the max items limit by 10
121    for (size_t i = 0; i < getMaxCheckpointItems(*vb) + 10; i++) {
122        std::string doc_key = "key_" + std::to_string(i);
123        store_item(vbid, makeStoredDocKey(doc_key), "value");
124    }
125
126    // We should now have 2 checkpoints for this vBucket
127    ASSERT_EQ(2, checkpointManager->getNumCheckpoints());
128
129    // Run the persistence task for this vBucket, this should advance the
130    // persistence cursor out of the first checkpoint
131    flush_vbucket_to_disk(vbid, getMaxCheckpointItems(*vb) + 10);
132
133    // We should now be eligible to drop the user created DCP stream from the
134    // checkpoint
135    cursors = checkpointManager->getListOfCursorsToDrop();
136    ASSERT_EQ(1, cursors.size());
137    ActiveStream& activeStream =
138            reinterpret_cast<ActiveStream&>(*producer->findStream(vbid));
139    ASSERT_EQ(activeStream.getCursorName(), cursors[0]);
140}
141
142/**
143 * Check that the memory of unreferenced checkpoints after we drop all cursors
144 * in a checkpoint is equal to the size of the items that were contained within
145 * it.
146 */
147TEST_F(CheckpointRemoverEPTest, CursorDropMemoryFreed) {
148    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
149    auto vb = store->getVBuckets().getBucket(vbid);
150    auto& checkpointManager = vb->checkpointManager;
151
152    // We should have one checkpoint which is for the state change
153    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
154    // We should only have one cursor, which is for persistence
155    ASSERT_EQ(1, checkpointManager->getNumOfCursors());
156
157    auto initialSize = checkpointManager->getMemoryUsage();
158
159    auto producer =
160            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
161
162    createDcpStream(*producer);
163
164    auto expectedFreedMemoryFromItems = 0;
165    for (size_t i = 0; i < getMaxCheckpointItems(*vb); i++) {
166        std::string doc_key = "key_" + std::to_string(i);
167        Item item = store_item(vbid, makeStoredDocKey(doc_key), "value");
168        expectedFreedMemoryFromItems += item.size();
169    }
170    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
171    ASSERT_EQ(getMaxCheckpointItems(*vb) + 2, checkpointManager->getNumItems());
172    ASSERT_NE(0, expectedFreedMemoryFromItems);
173
174    // Insert a new item, this will create a new checkpoint
175    store_item(vbid, makeStoredDocKey("Banana"), "value");
176    ASSERT_EQ(2, checkpointManager->getNumCheckpoints());
177
178    // Run the persistence task for this vBucket, this should advance the
179    // persistence cursor out of the first checkpoint
180    flush_vbucket_to_disk(vbid, getMaxCheckpointItems(*vb) + 1);
181
182    auto cursors = checkpointManager->getListOfCursorsToDrop();
183    ASSERT_EQ(1, cursors.size());
184    ActiveStream& activeStream =
185            reinterpret_cast<ActiveStream&>(*producer->findStream(vbid));
186    ASSERT_EQ(activeStream.getCursorName(), cursors[0]);
187
188    // Manually handle the slow stream, this is the same logic as the checkpoint
189    // remover task uses, just without the overhead of setting up the task
190    auto memoryOverhead = checkpointManager->getMemoryOverhead();
191    if (engine->getDcpConnMap().handleSlowStream(vbid, cursors[0])) {
192        ASSERT_EQ(expectedFreedMemoryFromItems + initialSize,
193                  checkpointManager->getMemoryUsageOfUnrefCheckpoints());
194        // Check that the memory of unreferenced checkpoints is greater than or
195        // equal to the pre-cursor-dropped memory overhead.
196        //
197        // This is the least amount of memory we expect to be able to free,
198        // as it is all internal and independent from the HashTable.
199        ASSERT_GE(checkpointManager->getMemoryUsageOfUnrefCheckpoints(),
200                  memoryOverhead);
201    } else {
202        ASSERT_FALSE(producer->isCursorDroppingEnabled());
203    }
204
205    // There should only be the one checkpoint cursor now for persistence
206    ASSERT_EQ(1, checkpointManager->getNumOfCursors());
207}
208