1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2016 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18/*
19 * Unit tests for the EventuallyPersistentStore class.
20 *
21 * Note that these test do *not* have the normal Tasks running (BGFetcher,
22 * flusher etc) as we do not initialise EPEngine. This means that such tasks
23 * need to be manually run. This can be very helpful as it essentially gives us
24 * synchronous control of EPStore.
25 */
26
27#include "evp_store_test.h"
28
29#include "bgfetcher.h"
30#include "checkpoint.h"
31#include "checkpoint_remover.h"
32#include "connmap.h"
33#include "dcp/flow-control-manager.h"
34#include "ep_engine.h"
35#include "flusher.h"
36#include "../mock/mock_dcp_producer.h"
37#include "replicationthrottle.h"
38#include "tasks.h"
39
40#include "programs/engine_testapp/mock_server.h"
41
42#include <chrono>
43#include <platform/dirutils.h>
44#include <thread>
45
46SynchronousEPEngine::SynchronousEPEngine(const std::string& extra_config)
47    : EventuallyPersistentEngine(get_mock_server_api) {
48    maxFailoverEntries = 1;
49
50    // Merge any extra config into the main configuration.
51    if (extra_config.size() > 0) {
52        if (!configuration.parseConfiguration(extra_config.c_str(),
53                                              serverApi)) {
54            throw std::invalid_argument("Unable to parse config string: " +
55                                        extra_config);
56        }
57    }
58
59    // workload is needed by EPStore's constructor (to construct the
60    // VBucketMap).
61    workload = new WorkLoadPolicy(/*workers*/1, /*shards*/1);
62
63    // dcpConnMap_ is needed by EPStore's constructor.
64    dcpConnMap_ = new DcpConnMap(*this);
65
66    // tapConnMap is needed by queueDirty.
67    tapConnMap = new TapConnMap(*this);
68
69    // checkpointConfig is needed by CheckpointManager (via EPStore).
70    checkpointConfig = new CheckpointConfig(*this);
71
72    dcpFlowControlManager_ = new DcpFlowControlManager(*this);
73
74    replicationThrottle = new ReplicationThrottle(configuration, stats);
75
76    tapConfig = new TapConfig(*this);
77}
78
79void SynchronousEPEngine::setEPStore(EventuallyPersistentStore* store) {
80    cb_assert(epstore == nullptr);
81    epstore = store;
82}
83
84MockEPStore::MockEPStore(EventuallyPersistentEngine &theEngine)
85    : EventuallyPersistentStore(theEngine) {
86    // Perform a limited set of setup (normally done by EPStore::initialize) -
87    // enough such that objects which are assumed to exist are present.
88
89    // Create the closed checkpoint removed task. Note we do _not_ schedule
90    // it, unlike EPStore::initialize
91    chkTask = new ClosedUnrefCheckpointRemoverTask
92            (&engine, stats, theEngine.getConfiguration().getChkRemoverStime());
93}
94
95VBucketMap& MockEPStore::getVbMap() {
96    return vbMap;
97}
98
99/* Mock Task class. Doesn't actually run() or snooze() - they both do nothing.
100 */
101class MockGlobalTask : public GlobalTask {
102public:
103    MockGlobalTask(Taskable& t, TaskId id)
104        : GlobalTask(t, id) {}
105
106    bool run() override { return false; }
107    std::string getDescription() override { return "MockGlobalTask"; }
108
109    void snooze(const double secs) override {}
110};
111
112void EventuallyPersistentStoreTest::SetUp() {
113    // Paranoia - kill any existing files in case they are left over
114    // from a previous run.
115    CouchbaseDirectoryUtilities::rmrf(test_dbname);
116
117    // Add dbname to config string.
118    std::string config = config_string;
119    if (config.size() > 0) {
120        config += ";";
121    }
122    config += "dbname=" + std::string(test_dbname);
123
124    engine.reset(new SynchronousEPEngine(config));
125    ObjectRegistry::onSwitchThread(engine.get());
126
127    store = new MockEPStore(*engine);
128    engine->setEPStore(store);
129
130    // Ensure that EPEngine is hold about necessary server callbacks
131    // (client disconnect, bucket delete).
132    engine->public_initializeEngineCallbacks();
133
134    // Need to initialize ep_real_time and friends.
135    initialize_time_functions(get_mock_server_api()->core);
136
137    cookie = create_mock_cookie();
138}
139
140void EventuallyPersistentStoreTest::TearDown() {
141    destroy_mock_cookie(cookie);
142    destroy_mock_event_callbacks();
143    engine->getDcpConnMap().manageConnections();
144    engine.reset();
145
146    // Shutdown the ExecutorPool singleton (initialized when we create
147    // an EventuallyPersistentStore object). Must happen after engine
148    // has been destroyed (to allow the tasks the engine has
149    // registered a chance to be unregistered).
150    ExecutorPool::shutdown();
151}
152
153Item EventuallyPersistentStoreTest::store_item(uint16_t vbid,
154                                               const std::string& key,
155                                               const std::string& value) {
156    uint8_t ext_meta[EXT_META_LEN] = {PROTOCOL_BINARY_DATATYPE_JSON};
157
158    Item item(key.c_str(), key.size(), /*flags*/0, /*exp*/0, value.c_str(),
159              value.size(), ext_meta, sizeof(ext_meta));
160    item.setVBucketId(vbid);
161    EXPECT_EQ(ENGINE_SUCCESS, store->set(item, nullptr));
162
163    return item;
164}
165
166void EventuallyPersistentStoreTest::flush_vbucket_to_disk(uint16_t vbid) {
167    int result;
168    const auto time_limit = std::chrono::seconds(10);
169    const auto deadline = std::chrono::steady_clock::now() + time_limit;
170
171    // Need to retry as warmup may not have completed.
172    bool flush_successful = false;
173    do {
174        result = store->flushVBucket(vbid);
175        if (result != RETRY_FLUSH_VBUCKET) {
176            flush_successful = true;
177            break;
178        }
179        std::this_thread::sleep_for(std::chrono::microseconds(100));
180    } while (std::chrono::steady_clock::now() < deadline);
181
182    ASSERT_TRUE(flush_successful)
183        << "Hit timeout (" << time_limit.count() << " seconds) waiting for "
184           "warmup to complete while flushing VBucket.";
185
186    ASSERT_EQ(1, result) << "Failed to flush the one item we have stored.";
187
188    /**
189     * Although a flushVBucket writes the item to the underlying store,
190     * the item is not marked clean until an explicit commit is called
191     * If the underlying store is couchstore, a commit is called with
192     * a flushVBucket but in the case of forestdb, a commit is not
193     * always called, hence call an explicit commit.
194     */
195    uint16_t numShards = store->getVbMap().getNumShards();
196
197    store->commit(vbid % numShards);
198}
199
200
201// Verify that when handling a bucket delete with open DCP
202// connections, we don't deadlock when notifying the front-end
203// connection.
204// This is a potential issue because notify_IO_complete
205// needs to lock the worker thread mutex the connection is assigned
206// to, to update the event list for that connection, which the worker
207// thread itself will have locked while it is running. Normally
208// deadlock is avoided by using a background thread (ConnNotifier),
209// which only calls notify_IO_complete and isnt' involved with any
210// other mutexes, however we cannot use that task as it gets shut down
211// during shutdownAllConnections.
212// This test requires ThreadSanitizer or similar to validate;
213// there's no guarantee we'll actually deadlock on any given run.
214TEST_F(EventuallyPersistentStoreTest, test_mb20751_deadlock_on_disconnect_delete) {
215
216    // Create a new Dcp producer, reserving its cookie.
217    get_mock_server_api()->cookie->reserve(cookie);
218    dcp_producer_t producer = engine->getDcpConnMap().newProducer(
219        cookie, "mb_20716r", /*notifyOnly*/false);
220
221    // Check preconditions.
222    EXPECT_TRUE(producer->isPaused());
223
224    // 1. To check that there's no potential data-race with the
225    //    concurrent connection disconnect on another thread
226    //    (simulating a front-end thread).
227    std::thread frontend_thread_handling_disconnect{[this](){
228            // Frontend thread always runs with the cookie locked, so
229            // lock here to match.
230            lock_mock_cookie(cookie);
231            engine->handleDisconnect(cookie);
232            unlock_mock_cookie(cookie);
233        }};
234
235    // 2. Trigger a bucket deletion.
236    engine->handleDeleteBucket(cookie);
237
238    frontend_thread_handling_disconnect.join();
239}
240
241class EPStoreEvictionTest : public EventuallyPersistentStoreTest,
242                             public ::testing::WithParamInterface<std::string> {
243    void SetUp() override {
244        config_string += std::string{"item_eviction_policy="} + GetParam();
245        EventuallyPersistentStoreTest::SetUp();
246
247        // Have all the objects, activate vBucket zero so we can store data.
248        store->setVBucketState(vbid, vbucket_state_active, false);
249
250    }
251};
252
253// getKeyStats tests //////////////////////////////////////////////////////////
254
255// Check that keystats on resident items works correctly.
256TEST_P(EPStoreEvictionTest, GetKeyStatsResident) {
257    key_stats kstats;
258
259    // Should start with key not existing.
260    EXPECT_EQ(ENGINE_KEY_ENOENT,
261              store->getKeyStats("key", 0, cookie, kstats, /*bgfetch*/true,
262                                 /*wantsDeleted*/false));
263
264    store_item(0, "key", "value");
265    EXPECT_EQ(ENGINE_SUCCESS,
266              store->getKeyStats("key", 0, cookie, kstats, /*bgfetch*/true,
267                                 /*wantsDeleted*/false))
268        << "Expected to get key stats on existing item";
269    EXPECT_EQ(vbucket_state_active, kstats.vb_state);
270    EXPECT_FALSE(kstats.logically_deleted);
271}
272
273// Check that keystats on ejected items. When ejected should return ewouldblock
274// until bgfetch completes.
275TEST_P(EPStoreEvictionTest, GetKeyStatsEjected) {
276    key_stats kstats;
277
278    // Store then eject an item. Note we cannot forcefully evict as we have
279    // to ensure it's own disk so we can later bg fetch from there :)
280    store_item(0, "key", "value");
281
282    // Trigger a flush to disk. We have to retry as the warmup may not be
283    // complete.
284    int result;
285    const auto deadline = std::chrono::steady_clock::now() +
286                          std::chrono::seconds(5);
287    do {
288        result = store->flushVBucket(vbid);
289        if (result != RETRY_FLUSH_VBUCKET) {
290            break;
291        }
292        std::this_thread::sleep_for(std::chrono::microseconds(100));
293    } while (std::chrono::steady_clock::now() < deadline);
294
295    ASSERT_EQ(1,
296              result) << "Failed to flush the one item we have stored.";
297
298    const char* msg;
299    size_t msg_size{sizeof(msg)};
300    EXPECT_EQ(ENGINE_SUCCESS, store->evictKey("key", 0, &msg, &msg_size));
301    EXPECT_EQ("Ejected.", std::string(msg));
302
303    // Setup a lambda for how we want to call getKeyStats (saves repeating the
304    // same arguments for each instance below).
305    auto do_getKeyStats = [this, &kstats]() {
306        return store->getKeyStats("key", vbid, cookie, kstats,
307                                  /*bgfetch*/true, /*wantsDeleted*/false);
308    };
309
310    if (GetParam() == "value_only") {
311        EXPECT_EQ(ENGINE_SUCCESS, do_getKeyStats())
312            << "Expected to get key stats on evicted item";
313
314    } else if (GetParam() == "full_eviction") {
315
316        // Try to get key stats. This should return EWOULDBLOCK (as the whole
317        // item is no longer resident). As we arn't running the full EPEngine
318        // task system, then no BGFetch task will be automatically run, we'll
319        // manually run it.
320
321        EXPECT_EQ(ENGINE_EWOULDBLOCK, do_getKeyStats())
322            << "Expected to need to go to disk to get key stats on fully evicted item";
323
324        // Try a second time - this should detect the already-created temp
325        // item, and re-schedule the bgfetch.
326        EXPECT_EQ(ENGINE_EWOULDBLOCK, do_getKeyStats())
327            << "Expected to need to go to disk to get key stats on fully evicted item (try 2)";
328
329        // Manually run the BGFetcher task; to fetch the two outstanding
330        // requests (for the same key).
331        MockGlobalTask mockTask(engine->getTaskable(),
332                                TaskId::MultiBGFetcherTask);
333        store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
334
335        EXPECT_EQ(ENGINE_SUCCESS, do_getKeyStats())
336            << "Expected to get key stats on evicted item after notify_IO_complete";
337
338    } else {
339        FAIL() << "Unhandled GetParam() value:" << GetParam();
340    }
341}
342
343// Create then delete an item, checking we get keyStats reporting the item as
344// deleted.
345TEST_P(EPStoreEvictionTest, GetKeyStatsDeleted) {
346    auto& epstore = *engine->getEpStore();
347    key_stats kstats;
348
349    store_item(0, "key", "value");
350    uint64_t cas = 0;
351    mutation_descr_t mut_info;
352    EXPECT_EQ(ENGINE_SUCCESS,
353              epstore.deleteItem("key", &cas, /*vbucket*/0, cookie,
354                                 /*force*/false, /*itemMeta*/nullptr,
355                                 &mut_info));
356
357    // Should get ENOENT if we don't ask for deleted items.
358    EXPECT_EQ(ENGINE_KEY_ENOENT,
359              epstore.getKeyStats("key", 0, cookie, kstats, /*bgfetch*/false,
360                                  /*wantsDeleted*/false));
361
362    // Should get success (and item flagged as deleted) if we ask for deleted
363    // items.
364    EXPECT_EQ(ENGINE_SUCCESS,
365              epstore.getKeyStats("key", 0, cookie, kstats, /*bgfetch*/true,
366                                  /*wantsDeleted*/true));
367    EXPECT_EQ(vbucket_state_active, kstats.vb_state);
368    EXPECT_TRUE(kstats.logically_deleted);
369}
370
371// Check incorrect vbucket returns not-my-vbucket.
372TEST_P(EPStoreEvictionTest, GetKeyStatsNMVB) {
373    auto& epstore = *engine->getEpStore();
374    key_stats kstats;
375
376    EXPECT_EQ(ENGINE_NOT_MY_VBUCKET,
377              epstore.getKeyStats("key", 1, cookie, kstats, /*bgfetch*/true,
378                                  /*wantsDeleted*/false));
379}
380
381// Test to ensure all pendingBGfetches are deleted when the
382// VBucketMemoryDeletionTask is run
383TEST_P(EPStoreEvictionTest, MB_21976) {
384    // Store an item, then eject it.
385    std::string key("key");
386    auto item = store_item(vbid, key, "value");
387    flush_vbucket_to_disk(vbid);
388    const char* msg;
389    size_t msg_size{sizeof(msg)};
390    EXPECT_EQ(ENGINE_SUCCESS, store->evictKey(key, 0, &msg, &msg_size));
391
392    // Perform a get, which should EWOULDBLOCK
393    get_options_t options = static_cast<get_options_t>(QUEUE_BG_FETCH |
394                                                       HONOR_STATES |
395                                                       TRACK_REFERENCE |
396                                                       DELETE_TEMP |
397                                                       HIDE_LOCKED_CAS |
398                                                       TRACK_STATISTICS);
399    GetValue gv = store->get(key, vbid, cookie, options);
400    EXPECT_EQ(ENGINE_EWOULDBLOCK,gv.getStatus());
401    // Mark the status of the cookie so that we can see if notify is called
402    struct mock_connstruct* c = (struct mock_connstruct *)cookie;
403    c->status = ENGINE_E2BIG;
404
405    // Manually run the VBucketMemoryDeletionTask task
406    RCPtr<VBucket> vb = store->getVBucket(vbid);
407    VBucketMemoryDeletionTask deletionTask(*engine, vb, /*delay*/0.0);
408    deletionTask.run();
409
410    // Check the status of the cookie to see if the cookie status has changed
411    // to ENGINE_NOT_MY_VBUCKET, which means the notify was sent
412    EXPECT_EQ(ENGINE_NOT_MY_VBUCKET, c->status);
413}
414
415// Test cases which run in both Full and Value eviction
416INSTANTIATE_TEST_CASE_P(FullAndValueEviction,
417                        EPStoreEvictionTest,
418                        ::testing::Values("value_only", "full_eviction"),
419                        [] (const ::testing::TestParamInfo<std::string>& info) {
420                            return info.param;
421                        });
422
423
424const char EventuallyPersistentStoreTest::test_dbname[] = "ep_engine_ep_unit_tests_db";
425