1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2016 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /*
19  * Unit test for stats
20  */
21 
22 #include "stats_test.h"
23 #include "dcp/dcpconnmap.h"
24 #include "dcp/producer.h"
25 #include "dcp/stream.h"
26 #include "evp_store_single_threaded_test.h"
27 #include "tasks.h"
28 #include "test_helpers.h"
29 #include "thread_gate.h"
30 #include "tracing/trace_helpers.h"
31 
32 #include <gmock/gmock.h>
33 
34 #include <functional>
35 #include <thread>
36 
SetUp()37 void StatTest::SetUp() {
38     SingleThreadedEPBucketTest::SetUp();
39     store->setVBucketState(vbid, vbucket_state_active, false);
40 }
41 
get_stat(const char* statkey)42 std::map<std::string, std::string> StatTest::get_stat(const char* statkey) {
43     // Define a lambda to use as the ADD_STAT callback. Note we cannot use
44     // a capture for the statistics map (as it's a C-style callback), so
45     // instead pass via the cookie.
46     struct StatMap : cb::tracing::Traceable {
47         std::map<std::string, std::string> map;
48     };
49     StatMap stats;
50     auto add_stats = [](const char* key,
51                         const uint16_t klen,
52                         const char* val,
53                         const uint32_t vlen,
54                         gsl::not_null<const void*> cookie) {
55         auto* stats =
56                 reinterpret_cast<StatMap*>(const_cast<void*>(cookie.get()));
57         std::string k(key, klen);
58         std::string v(val, vlen);
59         stats->map[k] = v;
60     };
61 
62     ENGINE_HANDLE* handle = reinterpret_cast<ENGINE_HANDLE*>(engine.get());
63     EXPECT_EQ(
64             ENGINE_SUCCESS,
65             engine->get_stats(handle,
66                               &stats,
67                               {statkey, statkey == NULL ? 0 : strlen(statkey)},
68                               add_stats))
69             << "Failed to get stats.";
70 
71     return stats.map;
72 }
73 
74 class DatatypeStatTest : public StatTest,
75                          public ::testing::WithParamInterface<std::string> {
76 protected:
77     void SetUp() override {
78         config_string += std::string{"item_eviction_policy="} + GetParam();
79         StatTest::SetUp();
80     }
81 };
82 
TEST_F(StatTest, vbucket_seqno_stats_test)83 TEST_F(StatTest, vbucket_seqno_stats_test) {
84     using namespace testing;
85     const std::string vbucket = "vb_" + std::to_string(vbid);
86     auto vals = get_stat("vbucket-seqno");
87 
88     EXPECT_THAT(vals, UnorderedElementsAre(
89             Key(vbucket + ":uuid"),
90             Pair(vbucket + ":high_seqno", "0"),
91             Pair(vbucket + ":abs_high_seqno", "0"),
92             Pair(vbucket + ":last_persisted_seqno", "0"),
93             Pair(vbucket + ":purge_seqno", "0"),
94             Pair(vbucket + ":last_persisted_snap_start", "0"),
95             Pair(vbucket + ":last_persisted_snap_end", "0")));
96 }
97 
98 // Test that if we request takeover stats for stream that does not exist we
99 // return does_not_exist.
TEST_F(StatTest, vbucket_takeover_stats_no_stream)100 TEST_F(StatTest, vbucket_takeover_stats_no_stream) {
101     // Create a new Dcp producer, reserving its cookie.
102     get_mock_server_api()->cookie->reserve(cookie);
103     engine->getDcpConnMap().newProducer(cookie,
104                                         "test_producer",
105                                         /*flags*/ 0);
106 
107     const std::string stat = "dcp-vbtakeover " + std::to_string(vbid) +
108             " test_producer";;
109     auto vals = get_stat(stat.c_str());
110     EXPECT_EQ("does_not_exist", vals["status"]);
111     EXPECT_EQ(0, std::stoi(vals["estimate"]));
112     EXPECT_EQ(0, std::stoi(vals["backfillRemaining"]));
113 }
114 
115 // Test that if we request takeover stats for stream that is not active we
116 // return does_not_exist.
TEST_F(StatTest, vbucket_takeover_stats_stream_not_active)117 TEST_F(StatTest, vbucket_takeover_stats_stream_not_active) {
118     // Create a new Dcp producer, reserving its cookie.
119     get_mock_server_api()->cookie->reserve(cookie);
120     DcpProducer* producer = engine->getDcpConnMap().newProducer(
121             cookie, "test_producer", DCP_OPEN_NOTIFIER);
122 
123     uint64_t rollbackSeqno;
124     const std::string stat = "dcp-vbtakeover " + std::to_string(vbid) +
125             " test_producer";;
126     ASSERT_EQ(ENGINE_SUCCESS, producer->streamRequest(/*flags*/ 0,
127                                           /*opaque*/ 0,
128                                           /*vbucket*/ vbid,
129                                           /*start_seqno*/ 0,
130                                           /*end_seqno*/ 0,
131                                           /*vb_uuid*/ 0,
132                                           /*snap_start*/ 0,
133                                           /*snap_end*/ 0,
134                                           &rollbackSeqno,
135                                           fakeDcpAddFailoverLog));
136 
137     // Ensure its a notifier connection - this means that streams requested will
138     // not be active
139     ASSERT_EQ("notifier", std::string(producer->getType()));
140     auto vals = get_stat(stat.c_str());
141     EXPECT_EQ("does_not_exist", vals["status"]);
142     EXPECT_EQ(0, std::stoi(vals["estimate"]));
143     EXPECT_EQ(0, std::stoi(vals["backfillRemaining"]));
144     producer->closeStream(/*opaque*/ 0, vbid);
145 }
146 
147 
TEST_P(DatatypeStatTest, datatypesInitiallyZero)148 TEST_P(DatatypeStatTest, datatypesInitiallyZero) {
149     // Check that the datatype stats initialise to 0
150     auto vals = get_stat(nullptr);
151     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_snappy"]));
152     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_snappy,json"]));
153     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_snappy,xattr"]));
154     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_json"]));
155     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_json,xattr"]));
156     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_raw"]));
157     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_xattr"]));
158     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_snappy,json,xattr"]));
159 
160     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_snappy"]));
161     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_snappy,json"]));
162     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_snappy,xattr"]));
163     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_json"]));
164     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_json,xattr"]));
165     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_raw"]));
166     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_xattr"]));
167     EXPECT_EQ(0, std::stoi(vals["ep_replica_datatype_snappy,json,xattr"]));
168 }
169 
setDatatypeItem(KVBucket* store, const void* cookie, protocol_binary_datatype_t datatype, std::string name, std::string val = �)170 void setDatatypeItem(KVBucket* store,
171                      const void* cookie,
172                      protocol_binary_datatype_t datatype,
173                      std::string name, std::string val = "[0]") {
174     Item item(make_item(
175             0, {name, DocNamespace::DefaultCollection}, val, 0, datatype));
176     store->set(item, cookie);
177 }
178 
TEST_P(DatatypeStatTest, datatypeJsonToXattr)179 TEST_P(DatatypeStatTest, datatypeJsonToXattr) {
180     setDatatypeItem(store, cookie, PROTOCOL_BINARY_DATATYPE_JSON, "jsonDoc");
181     auto vals = get_stat(nullptr);
182     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json"]));
183 
184     // Check that updating an items datatype works
185     setDatatypeItem(store, cookie, PROTOCOL_BINARY_DATATYPE_XATTR, "jsonDoc");
186     vals = get_stat(nullptr);
187 
188     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_xattr"]));
189     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_json"]));
190 }
191 
TEST_P(DatatypeStatTest, datatypeRawStatTest)192 TEST_P(DatatypeStatTest, datatypeRawStatTest) {
193     setDatatypeItem(store, cookie, 0, "rawDoc");
194     auto vals = get_stat(nullptr);
195     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_raw"]));
196 }
197 
TEST_P(DatatypeStatTest, datatypeXattrStatTest)198 TEST_P(DatatypeStatTest, datatypeXattrStatTest) {
199     setDatatypeItem(store, cookie, PROTOCOL_BINARY_DATATYPE_XATTR, "xattrDoc");
200     auto vals = get_stat(nullptr);
201     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_xattr"]));
202     // Update the same key with a different value. The datatype stat should
203     // stay the same
204     setDatatypeItem(store, cookie, PROTOCOL_BINARY_DATATYPE_XATTR,
205                     "xattrDoc", "[2]");
206     vals = get_stat(nullptr);
207     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_xattr"]));
208 }
209 
TEST_P(DatatypeStatTest, datatypeCompressedStatTest)210 TEST_P(DatatypeStatTest, datatypeCompressedStatTest) {
211     setDatatypeItem(store,
212                     cookie,
213                     PROTOCOL_BINARY_DATATYPE_SNAPPY,
214                     "compressedDoc");
215     auto vals = get_stat(nullptr);
216     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_snappy"]));
217 }
218 
TEST_P(DatatypeStatTest, datatypeCompressedJson)219 TEST_P(DatatypeStatTest, datatypeCompressedJson) {
220     setDatatypeItem(
221             store,
222             cookie,
223             PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_SNAPPY,
224             "jsonCompressedDoc");
225     auto vals = get_stat(nullptr);
226     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_snappy,json"]));
227 }
228 
TEST_P(DatatypeStatTest, datatypeCompressedXattr)229 TEST_P(DatatypeStatTest, datatypeCompressedXattr) {
230     setDatatypeItem(store,
231                     cookie,
232                     PROTOCOL_BINARY_DATATYPE_XATTR |
233                             PROTOCOL_BINARY_DATATYPE_SNAPPY,
234                     "xattrCompressedDoc");
235     auto vals = get_stat(nullptr);
236     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_snappy,xattr"]));
237 }
238 
TEST_P(DatatypeStatTest, datatypeJsonXattr)239 TEST_P(DatatypeStatTest, datatypeJsonXattr) {
240     setDatatypeItem(
241             store,
242             cookie,
243             PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_XATTR,
244             "jsonXattrDoc");
245     auto vals = get_stat(nullptr);
246     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json,xattr"]));
247 }
248 
TEST_P(DatatypeStatTest, datatypeDeletion)249 TEST_P(DatatypeStatTest, datatypeDeletion) {
250     setDatatypeItem(
251             store,
252             cookie,
253             PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_XATTR,
254             "jsonXattrDoc");
255     auto vals = get_stat(nullptr);
256     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json,xattr"]));
257     uint64_t cas = 0;
258     mutation_descr_t mutation_descr;
259     store->deleteItem({"jsonXattrDoc", DocNamespace::DefaultCollection},
260                       cas,
261                       0,
262                       cookie,
263                       nullptr,
264                       mutation_descr);
265     vals = get_stat(nullptr);
266     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_json,xattr"]));
267 }
268 
TEST_P(DatatypeStatTest, datatypeCompressedJsonXattr)269 TEST_P(DatatypeStatTest, datatypeCompressedJsonXattr) {
270     setDatatypeItem(store,
271                     cookie,
272                     PROTOCOL_BINARY_DATATYPE_JSON |
273                             PROTOCOL_BINARY_DATATYPE_SNAPPY |
274                             PROTOCOL_BINARY_DATATYPE_XATTR,
275                     "jsonCompressedXattrDoc");
276     auto vals = get_stat(nullptr);
277     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_snappy,json,xattr"]));
278 }
279 
TEST_P(DatatypeStatTest, datatypeExpireItem)280 TEST_P(DatatypeStatTest, datatypeExpireItem) {
281     Item item(make_item(
282             0, {"expiryDoc", DocNamespace::DefaultCollection}, "[0]", 1,
283             PROTOCOL_BINARY_DATATYPE_JSON));
284     store->set(item, cookie);
285     store->get({"expiryDoc", DocNamespace::DefaultCollection}, 0, cookie, NONE);
286     auto vals = get_stat(nullptr);
287 
288     //Should be 0, becuase the doc should have expired
289     EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_json"]));
290 }
291 
292 
TEST_P(DatatypeStatTest, datatypeEviction)293 TEST_P(DatatypeStatTest, datatypeEviction) {
294     const DocKey key = {"jsonXattrDoc", DocNamespace::DefaultCollection};
295     setDatatypeItem(
296             store,
297             cookie,
298             PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_XATTR,
299             "jsonXattrDoc");
300     auto vals = get_stat(nullptr);
301     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json,xattr"]));
302     getEPBucket().flushVBucket(0);
303     const char* msg;
304     store->evictKey(key, 0, &msg);
305     vals = get_stat(nullptr);
306     if (GetParam() == "value_only"){
307         // Should still be 1 as only value is evicted
308         EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json,xattr"]));
309     } else {
310         // Should be 0 as everything is evicted
311         EXPECT_EQ(0, std::stoi(vals["ep_active_datatype_json,xattr"]));
312     }
313 
314     store->get(key, 0, cookie, QUEUE_BG_FETCH);
315     if (GetParam() == "full_eviction") {
316         // Run the bgfetch to restore the item from disk
317         ExTask task = std::make_shared<SingleBGFetcherTask>(
318                 engine.get(), key, 0, cookie, false, 0, false);
319         task_executor->schedule(task);
320         runNextTask(*task_executor->getLpTaskQ()[READER_TASK_IDX]);
321     }
322     vals = get_stat(nullptr);
323     // The item should be restored to memory, hence added back to the stats
324     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json,xattr"]));
325 }
326 
TEST_P(DatatypeStatTest, MB23892)327 TEST_P(DatatypeStatTest, MB23892) {
328     // This test checks that updating a document with a different datatype is
329     // safe to do after an eviction (where the blob is now null)
330     const DocKey key = {"jsonXattrDoc", DocNamespace::DefaultCollection};
331     setDatatypeItem(
332             store,
333             cookie,
334             PROTOCOL_BINARY_DATATYPE_JSON | PROTOCOL_BINARY_DATATYPE_XATTR,
335             "jsonXattrDoc");
336     auto vals = get_stat(nullptr);
337     EXPECT_EQ(1, std::stoi(vals["ep_active_datatype_json,xattr"]));
338     getEPBucket().flushVBucket(0);
339     const char* msg;
340     store->evictKey(key, 0, &msg);
341     getEPBucket().flushVBucket(0);
342     setDatatypeItem(store, cookie, PROTOCOL_BINARY_DATATYPE_JSON, "jsonXattrDoc", "[1]");
343 }
344 
345 INSTANTIATE_TEST_CASE_P(FullAndValueEviction, DatatypeStatTest,
346                         ::testing::Values("value_only", "full_eviction"), []
347                                 (const ::testing::TestParamInfo<std::string>&
348                                 info) {return info.param;});
349 
350 class TestEpStat : public EPStats {
351 public:
setMemUsedMergeThreshold(int64_t value)352     void setMemUsedMergeThreshold(int64_t value) {
353         memUsedMergeThreshold = value;
354     }
355 };
356 
357 class EpStatsTest : public ::testing::Test {
358 public:
359 };
360 
TEST_F(EpStatsTest, memoryNegative)361 TEST_F(EpStatsTest, memoryNegative) {
362     TestEpStat stats;
363     stats.memoryTrackerEnabled = true;
364 
365     stats.memDeallocated(100);
366     EXPECT_EQ(0, stats.getEstimatedTotalMemoryUsed());
367     EXPECT_EQ(0, stats.getPreciseTotalMemoryUsed());
368     // getPrecise will have merged, check we really have negative
369     EXPECT_EQ(-100, stats.estimatedTotalMemory->load());
370 }
371 
TEST_F(EpStatsTest, memoryNegativeUntracked)372 TEST_F(EpStatsTest, memoryNegativeUntracked) {
373     TestEpStat stats;
374     stats.memoryTrackerEnabled = false;
375 
376     stats.coreLocal.get()->memOverhead.fetch_sub(100);
377     ASSERT_EQ(-100, stats.coreLocal.get()->memOverhead.load());
378 
379     EXPECT_EQ(0, stats.getEstimatedTotalMemoryUsed());
380 }
381 
382 // Create n threads who all allocate the same amount of memory in very different
383 // orders
TEST_F(EpStatsTest, memoryAllocated)384 TEST_F(EpStatsTest, memoryAllocated) {
385     TestEpStat stats;
386     stats.memoryTrackerEnabled = true;
387     stats.setMemUsedMergeThreshold(100);
388 
389     const int nThreads = 4;
390     ThreadGate tg(nThreads);
391     std::vector<std::thread> workers;
392     for (int i = 0; i < nThreads; i++) {
393         workers.push_back(std::thread([i, &tg, &stats]() {
394             std::mt19937 generator(i);
395             const int nAllocs = 250;
396             std::vector<int> inputs1(nAllocs);
397             std::vector<int> inputs2(nAllocs);
398             std::iota(inputs1.begin(), inputs1.end(), 1);
399             std::iota(inputs2.begin(), inputs2.end(), 1);
400 
401             // Shuffle this threads order of updates
402             std::shuffle(inputs1.begin(), inputs1.end(), generator);
403             std::shuffle(inputs2.begin(), inputs2.end(), generator);
404 
405             // Bind to the functions of interest
406             std::function<void(size_t)> f1 = std::bind(
407                     &EPStats::memAllocated, &stats, std::placeholders::_1);
408             std::function<void(size_t)> f2 = std::bind(
409                     &EPStats::memDeallocated, &stats, std::placeholders::_1);
410 
411             // Reorder if thread id is odd
412             if (i & 1) {
413                 f1.swap(f2);
414             }
415 
416             tg.threadUp();
417 
418             // Now run f1 then f2
419             for (size_t i = 0; i < inputs1.size(); i++) {
420                 f1(inputs1.at(i));
421             }
422             for (size_t i = 0; i < inputs2.size(); i++) {
423                 f2(inputs2.at(i));
424             }
425         }));
426     }
427 
428     for (int i = 0; i < nThreads; i++) {
429         workers.at(i).join();
430     }
431 
432     EXPECT_EQ(0, stats.getPreciseTotalMemoryUsed());
433 }
434