1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2018 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "checkpoint_remover_test.h"
19#include <engines/ep/src/checkpoint_remover.h>
20
21#include "../mock/mock_dcp.h"
22#include "../mock/mock_dcp_consumer.h"
23#include "../mock/mock_dcp_producer.h"
24#include "checkpoint.h"
25#include "test_helpers.h"
26
27size_t CheckpointRemoverTest::getMaxCheckpointItems(VBucket& vb) {
28    return vb.checkpointManager->getCheckpointConfig().getCheckpointMaxItems();
29}
30
31const CheckpointList&
32CheckpointManagerTestIntrospector::public_getCheckpointList(
33        CheckpointManager& checkpointManager) {
34    return checkpointManager.checkpointList;
35}
36
37/**
38 * Check that the VBucketMap.getActiveVBucketsSortedByChkMgrMem() returns the
39 * correct ordering of vBuckets, sorted from largest memory usage to smallest.
40 */
41TEST_F(CheckpointRemoverEPTest, GetActiveVBucketsSortedByChkMgrMem) {
42    for (uint16_t i = 0; i < 3; i++) {
43        setVBucketStateAndRunPersistTask(i, vbucket_state_active);
44        for (uint16_t j = 0; j < i; j++) {
45            std::string doc_key =
46                    "key_" + std::to_string(i) + "_" + std::to_string(j);
47            store_item(i, makeStoredDocKey(doc_key), "value");
48        }
49    }
50
51    auto map = store->getVBuckets().getActiveVBucketsSortedByChkMgrMem();
52
53    // The map should be 3 elements long, since we created 3 vBuckets
54    ASSERT_EQ(3, map.size());
55
56    for (size_t i = 1; i < map.size(); i++) {
57        auto this_vbucket = map[i];
58        auto prev_vbucket = map[i - 1];
59        // This vBucket should have a greater memory usage than the one previous
60        // to it in the map
61        ASSERT_GE(this_vbucket.second, prev_vbucket.second);
62    }
63}
64
65/**
66 * Check that the CheckpointManager memory usage calculation is correct and
67 * accurate based on the size of the checkpoints in it.
68 */
69TEST_F(CheckpointRemoverEPTest, CheckpointManagerMemoryUsage) {
70    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
71    auto vb = store->getVBuckets().getBucket(vbid);
72    auto& checkpointManager = vb->checkpointManager;
73
74    // We should have one checkpoint which is for the state change
75    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
76
77    // Check that the expected memory usage of the checkpoints is correct
78    size_t expected_size = 0;
79    for (auto& checkpoint :
80         CheckpointManagerTestIntrospector::public_getCheckpointList(
81                 *checkpointManager)) {
82        for (auto itr = checkpoint->begin(); itr != checkpoint->end(); ++itr) {
83            expected_size += (*itr)->size();
84        }
85    }
86    ASSERT_EQ(expected_size, checkpointManager->getMemoryUsage());
87
88    // Check that the new checkpoint memory usage is equal to the previous
89    // amount + size of new item
90    Item item = store_item(vbid, makeStoredDocKey("key0"), "value");
91    ASSERT_EQ(expected_size + item.size(), checkpointManager->getMemoryUsage());
92}
93
94/**
95 * Test CheckpointManager correctly returns which cursors we are eligible to
96 * drop. We should not be allowed to drop any cursors in a checkpoint when the
97 * persistence cursor is present.
98 */
99TEST_F(CheckpointRemoverEPTest, CursorsEligibleToDrop) {
100    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
101    auto vb = store->getVBuckets().getBucket(vbid);
102    auto& checkpointManager = vb->checkpointManager;
103
104    // We should have one checkpoint which is for the state change
105    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
106    // We should only have one cursor, which is for persistence
107    ASSERT_EQ(1, checkpointManager->getNumOfCursors());
108
109    auto producer =
110            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
111
112    // The persistence cursor is still within the current checkpoint,
113    // so we should not be allowed to drop any cursors at this time
114    auto cursors = checkpointManager->getListOfCursorsToDrop();
115    ASSERT_EQ(0, cursors.size());
116
117    // Create a DCP stream for the vBucket, and check that we now have 2 cursors
118    // registered
119    createDcpStream(*producer);
120    ASSERT_EQ(2, checkpointManager->getNumOfCursors());
121
122    // Insert items to the vBucket so we create a new checkpoint by going over
123    // the max items limit by 10
124    for (size_t i = 0; i < getMaxCheckpointItems(*vb) + 10; i++) {
125        std::string doc_key = "key_" + std::to_string(i);
126        store_item(vbid, makeStoredDocKey(doc_key), "value");
127    }
128
129    // We should now have 2 checkpoints for this vBucket
130    ASSERT_EQ(2, checkpointManager->getNumCheckpoints());
131
132    // Run the persistence task for this vBucket, this should advance the
133    // persistence cursor out of the first checkpoint
134    flush_vbucket_to_disk(vbid, getMaxCheckpointItems(*vb) + 10);
135
136    // We should now be eligible to drop the user created DCP stream from the
137    // checkpoint
138    cursors = checkpointManager->getListOfCursorsToDrop();
139    ASSERT_EQ(1, cursors.size());
140    ActiveStream& activeStream =
141            reinterpret_cast<ActiveStream&>(*producer->findStream(vbid));
142    ASSERT_EQ(activeStream.getCursorName(), cursors[0]);
143}
144
145/**
146 * Check that the memory of unreferenced checkpoints after we drop all cursors
147 * in a checkpoint is equal to the size of the items that were contained within
148 * it.
149 */
150TEST_F(CheckpointRemoverEPTest, CursorDropMemoryFreed) {
151    setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
152    auto vb = store->getVBuckets().getBucket(vbid);
153    auto& checkpointManager = vb->checkpointManager;
154
155    // We should have one checkpoint which is for the state change
156    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
157    // We should only have one cursor, which is for persistence
158    ASSERT_EQ(1, checkpointManager->getNumOfCursors());
159
160    auto initialSize = checkpointManager->getMemoryUsage();
161
162    auto producer =
163            createDcpProducer(cookie, {}, false, IncludeDeleteTime::Yes);
164
165    createDcpStream(*producer);
166
167    auto expectedFreedMemoryFromItems = 0;
168    for (size_t i = 0; i < getMaxCheckpointItems(*vb); i++) {
169        std::string doc_key = "key_" + std::to_string(i);
170        Item item = store_item(vbid, makeStoredDocKey(doc_key), "value");
171        expectedFreedMemoryFromItems += item.size();
172    }
173    ASSERT_EQ(1, checkpointManager->getNumCheckpoints());
174    ASSERT_EQ(getMaxCheckpointItems(*vb) + 2, checkpointManager->getNumItems());
175    ASSERT_NE(0, expectedFreedMemoryFromItems);
176
177    // Insert a new item, this will create a new checkpoint
178    store_item(vbid, makeStoredDocKey("Banana"), "value");
179    ASSERT_EQ(2, checkpointManager->getNumCheckpoints());
180
181    // Run the persistence task for this vBucket, this should advance the
182    // persistence cursor out of the first checkpoint
183    flush_vbucket_to_disk(vbid, getMaxCheckpointItems(*vb) + 1);
184
185    auto cursors = checkpointManager->getListOfCursorsToDrop();
186    ASSERT_EQ(1, cursors.size());
187    ActiveStream& activeStream =
188            reinterpret_cast<ActiveStream&>(*producer->findStream(vbid));
189    ASSERT_EQ(activeStream.getCursorName(), cursors[0]);
190
191    // Manually handle the slow stream, this is the same logic as the checkpoint
192    // remover task uses, just without the overhead of setting up the task
193    auto memoryOverhead = checkpointManager->getMemoryOverhead();
194    if (engine->getDcpConnMap().handleSlowStream(vbid, cursors[0])) {
195        ASSERT_EQ(expectedFreedMemoryFromItems + initialSize,
196                  checkpointManager->getMemoryUsageOfUnrefCheckpoints());
197        // Check that the memory of unreferenced checkpoints is greater than or
198        // equal to the pre-cursor-dropped memory overhead.
199        //
200        // This is the least amount of memory we expect to be able to free,
201        // as it is all internal and independent from the HashTable.
202        ASSERT_GE(checkpointManager->getMemoryUsageOfUnrefCheckpoints(),
203                  memoryOverhead);
204    } else {
205        ASSERT_FALSE(producer->isCursorDroppingEnabled());
206    }
207
208    // There should only be the one checkpoint cursor now for persistence
209    ASSERT_EQ(1, checkpointManager->getNumOfCursors());
210}