1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 /**
19  * Suite of performance tests for ep-engine.
20  *
21  * Uses the same engine_testapp infrastructure as ep_testsuite.
22  *
 * Tests print their performance metrics to stdout; to see this output when
 * running the suite via CTest, do:
25  *
26  *     make test ARGS="--verbose"
27  *
28  * Note this is designed as a relatively quick micro-benchmark suite; tests
29  * are tuned to complete in <2 seconds to maintain the quick turnaround.
30 **/
31 
32 #include "config.h"
33 
34 #include <memcached/engine.h>
35 #include <memcached/engine_testapp.h>
36 
37 #include <algorithm>
38 #include <atomic>
39 #include <condition_variable>
40 #include <fstream>
41 #include <iterator>
42 #include <mutex>
43 #include <numeric>
44 #include <random>
45 #include <thread>
46 #include <type_traits>
47 #include <unordered_map>
48 
49 #include "ep_testsuite_common.h"
50 #include "ep_test_apis.h"
51 
52 #include "mock/mock_dcp.h"
53 
// Default number of iterations for tests. Individual tests may
// override this, but is generally desirable for them to scale the
// default iteration count instead of blindly overriding it.
const size_t ITERATIONS =
#if defined(THREAD_SANITIZER)
    // Reduced iteration count for ThreadSanitizer, as it runs ~20x
    // slower than without TSan.  Note: We don't actually track
    // performance when run under TSan, however the workloads of this
    // testsuite are still useful to run under TSan to expose any data
    // race issues.
    100000 / 20;
#else
    // Set to a value a typical ~2015 laptop can run Baseline in 3s.
    100000;
#endif

// Key of the sentinel document, used to detect the end of a run.
// Added exactly once per run via add_sentinel_doc() below.
const char SENTINEL_KEY[] = "__sentinel__";
72 
// Classifies how long a stats group takes to gather. Used to pick the
// number of get_stats iterations to time (slow groups get fewer
// iterations - see iterations_for_{fast,slow}_stats below).
enum class StatRuntime {
   Slow = true,
   Fast = false
 };
77 
// Flag set describing which background workloads run concurrently with a
// stats benchmark. Values are explicit powers of two so the flags can be
// combined / tested with the bitwise operators defined below.
enum class BackgroundWork {
   // Performing bitwise operations and so explicitly assigning values
   None = 0x0,
   Sets = 0x1,
   Dcp = 0x2
 };

// Combine two BackgroundWork flag sets (bitwise OR).
inline BackgroundWork operator | (BackgroundWork lhs, BackgroundWork rhs) {
    using Raw = std::underlying_type<BackgroundWork>::type;
    return static_cast<BackgroundWork>(static_cast<Raw>(lhs) |
                                       static_cast<Raw>(rhs));
}

// Intersect two BackgroundWork flag sets (bitwise AND).
inline BackgroundWork operator & (BackgroundWork lhs, BackgroundWork rhs) {
    using Raw = std::underlying_type<BackgroundWork>::type;
    return static_cast<BackgroundWork>(static_cast<Raw>(lhs) &
                                       static_cast<Raw>(rhs));
}
96 
// Summary statistics for one set of measured values (e.g. the latencies
// of a single operation type), plus a non-owning pointer back to the raw
// samples. The caller must keep the underlying vector alive for as long
// as the Stats object is in use (renderToText reads *values).
template<typename T>
struct Stats {
    std::string name;
    double mean;
    double median;
    double stddev;
    double pct5;   // 5th percentile
    double pct95;  // 95th percentile
    double pct99;  // 99th percentile
    std::vector<T>* values;  // non-owning; sorted by output_result()
};
108 
// How many timed get_stats calls to make per stat group; slow groups get
// an order of magnitude fewer iterations to bound total runtime.
static const int iterations_for_fast_stats = 100;
static const int iterations_for_slow_stats = 10;


// Describes a single stats test: the key passed to get_stats, whether the
// group is expected to be Slow or Fast to gather, and the recorded
// per-call latencies.
struct StatProperties {
     const std::string key;
     const StatRuntime runtime;
     std::vector<hrtime_t> timings;
};
118 
119 const auto& make_stat_pair = std::make_pair<std::string, StatProperties>;
120 
121 /**
122   * The following table is used as input for each of the stats test.
123   * Each map entry specifies the test name, and then a StatProperities struct,
124   * which contains a key (used as input into the get_stats call),
125   * an enum stating whether the stat takes a long time to return (this
126   * is used to determine the number of iterations of get_stats to invoke),
127   * and finally a vector containing the latency timings for the stat.
128   */
129 std::unordered_map<std::string, StatProperties> stat_tests = {
130         make_stat_pair("engine", {"", StatRuntime::Slow, {}}),
131         make_stat_pair("dcpagg", {"dcpagg _", StatRuntime::Fast, {}}),
132         make_stat_pair("dcp", {"dcp", StatRuntime::Fast, {}}),
133         make_stat_pair("hash", {"hash", StatRuntime::Slow, {}}),
134         make_stat_pair("vbucket", {"vbucket", StatRuntime::Fast, {}}),
135         make_stat_pair("vb-details",
136                        {"vbucket-details", StatRuntime::Slow, {}}),
137         make_stat_pair("vb-details_vb0",
138                        {"vbucket-details 0", StatRuntime::Fast, {}}),
139         make_stat_pair("vb-seqno", {"vbucket-seqno", StatRuntime::Slow, {}}),
140         make_stat_pair("vb-seqno_vb0",
141                        {"vbucket-seqno 0", StatRuntime::Fast, {}}),
142         make_stat_pair("prev-vbucket", {"prev-vbucket", StatRuntime::Fast, {}}),
143         make_stat_pair("checkpoint", {"checkpoint", StatRuntime::Slow, {}}),
144         make_stat_pair("checkpoint_vb0",
145                        {"checkpoint 0", StatRuntime::Fast, {}}),
146         make_stat_pair("timings", {"timings", StatRuntime::Fast, {}}),
147         make_stat_pair("dispatcher", {"dispatcher", StatRuntime::Slow, {}}),
148         make_stat_pair("scheduler", {"scheduler", StatRuntime::Fast, {}}),
149         make_stat_pair("runtimes", {"runtimes", StatRuntime::Fast, {}}),
150         make_stat_pair("memory", {"memory", StatRuntime::Fast, {}}),
151         make_stat_pair("uuid", {"uuid", StatRuntime::Fast, {}}),
152         // We add a document with the key __sentinel__ to vbucket 0 at the
153         // start of the test and hence it is used for the key_vb0 stat.
154         make_stat_pair("key_vb0", {"key example_doc 0", StatRuntime::Fast, {}}),
155         make_stat_pair("kvtimings", {"kvtimings", StatRuntime::Slow, {}}),
156         make_stat_pair("kvstore", {"kvstore", StatRuntime::Fast, {}}),
157         make_stat_pair("info", {"info", StatRuntime::Fast, {}}),
158         make_stat_pair("allocator", {"allocator", StatRuntime::Slow, {}}),
159         make_stat_pair("config", {"config", StatRuntime::Fast, {}}),
160         make_stat_pair("dcp-vbtakeover",
161                        {"dcp-vbtakeover 0 DCP", StatRuntime::Fast, {}}),
162         make_stat_pair("workload", {"workload", StatRuntime::Fast, {}}),
163         make_stat_pair("failovers_vb0", {"failovers 0", StatRuntime::Fast, {}}),
164         make_stat_pair("failovers", {"failovers", StatRuntime::Slow, {}}),
165 };
166 
fillLineWith(const char c, int spaces)167 static void fillLineWith(const char c, int spaces) {
168     for (int i = 0; i < spaces; ++i) {
169         putchar(c);
170     }
171 }
172 
// Render the specified value stats, in human-readable text format.
// Prints a summary table (median/95th/99th/stddev) plus a UTF-8 sparkline
// histogram of the samples for each value set.
// Note: the `name` parameter is accepted for signature symmetry with
// renderToXML but is not used by this renderer; `unit` labels the
// histogram's x-axis.
template<typename T>
void renderToText(const std::string& name,
                  const std::string& description,
                  const std::vector<Stats<T> >& value_stats,
                  const std::string& unit) {

    printf("%s", description.c_str());
    fillLineWith('=', 88 - description.length());

    // From these find the start and end for the spark graphs which covers the
    // a "reasonable sample" of each value set. We define that as from the 5th
    // to the 95th percentile, so we ensure *all* sets have that range covered.
    T spark_start = std::numeric_limits<T>::max();
    T spark_end = 0;
    for (const auto& stats : value_stats) {
        spark_start = (stats.pct5 < spark_start) ? stats.pct5 : spark_start;
        spark_end = (stats.pct95 > spark_end) ? stats.pct95 : spark_end;
    }

    printf("\n\n                                Percentile           \n");
    printf("  %-22s Median     95th     99th  Std Dev  Histogram of samples\n\n", "");
    // Finally, print out each set.
    for (const auto& stats : value_stats) {
        // Values are scaled to microseconds; if the median exceeds 1ms,
        // print in thousands (x1e3) to keep the columns aligned.
        if (stats.median/1e6 < 1) {
            printf("%-22s %8.03f %8.03f %8.03f %8.03f  ",
                    stats.name.c_str(), stats.median/1e3, stats.pct95/1e3,
                    stats.pct99/1e3, stats.stddev/1e3);
        } else {
            printf("%-15s (x1e3) %8.03f %8.03f %8.03f %8.03f  ",
                    stats.name.c_str(), stats.median/1e6, stats.pct95/1e6,
                    stats.pct99/1e6, stats.stddev/1e6);
        }

        // Calculate and render Sparkline (requires UTF-8 terminal).
        // Bin counts are derived via lower_bound on the sample vector:
        // this relies on *stats.values being sorted, which output_result()
        // guarantees before calling the renderers.
        const int nbins = 32;
        int prev_distance = 0;
        std::vector<size_t> histogram;
        for (unsigned int bin = 0; bin < nbins; bin++) {
            const T max_for_bin = (spark_end / nbins) * bin;
            auto it = std::lower_bound(stats.values->begin(),
                                       stats.values->end(),
                                       max_for_bin);
            const int distance = std::distance(stats.values->begin(), it);
            histogram.push_back(distance - prev_distance);
            prev_distance = distance;
        }

        const auto minmax = std::minmax_element(histogram.begin(), histogram.end());
        const size_t range = *minmax.second - *minmax.first + 1;
        const int levels = 8;
        for (const auto& h : histogram) {
            // 0xE2 0x96 0x81..0x88 is the UTF-8 encoding of U+2581..U+2588
            // (block element characters); bar_size picks the bar height.
            int bar_size = ((h - *minmax.first + 1) * (levels - 1)) / range;
            putchar('\xe2');
            putchar('\x96');
            putchar('\x81' + bar_size);
        }
        putchar('\n');
    }
    // Footer: the histogram's value range, converted to `unit`.
    printf("%58s  %-14d %s %14d\n\n", "",
           int(spark_start/1e3), unit.c_str(), int(spark_end/1e3));
}
235 
// Render the specified value stats as a JUnit-style XML report, written
// to "<output_file_prefix><name>.xml". One <testcase> element is emitted
// per metric (median/pct95/pct99) per value set, in milliseconds.
// Note: `description` and `unit` are unused in this output format.
template<typename T>
void renderToXML(const std::string& name, const std::string& description,
                 const std::vector<Stats<T> >& value_stats,
                 const std::string& unit) {
    std::string test_name = testHarness.output_file_prefix;
    test_name += name;
    std::ofstream file(test_name + ".xml");

    time_t now;
    time(&now);
    char timebuf[256];
    // Ideally would use 'put_time' here, but it is not supported until GCC 5
    // NOTE(review): %z combined with gmtime()'s broken-down time is
    // implementation-defined - confirm the intended timezone rendering.
    strftime(timebuf, sizeof timebuf, "%FT%T%z", gmtime(&now));

    file << "<testsuites timestamp=\"" << timebuf << "\">\n";

    std::string classname = "ep-perfsuite";

    // Suffix the suite/class name with the bucket type (if any) so runs
    // against different bucket types are reported separately.
    if (testHarness.bucket_type == "") {
        file << "  <testsuite name=\"ep-perfsuite\">\n";
    } else {
        file << "  <testsuite name=\"ep-perfsuite-" << testHarness.bucket_type
             << "\">\n";
        classname += "-" + testHarness.bucket_type;
    }

    for (const auto& stats : value_stats) {
        file << "    <testcase name=\"" << name << "." << stats.name
             << ".median\" time=\"" << stats.median / 1e3 << "\" classname=\""
             << classname << "\"/>\n"
             << "    <testcase name=\"" << name << "." << stats.name
             << ".pct95\" time=\"" << stats.pct95 / 1e3 << "\" classname=\""
             << classname << "\"/>\n"
             << "    <testcase name=\"" << name << "." << stats.name
             << ".pct99\" time=\"" << stats.pct99 / 1e3 << "\" classname=\""
             << classname << "\"/>\n";
    }
    file << "  </testsuite>\n";
    file << "</testsuites>\n";
}
276 
// Given a vector of values (each a vector<T>) calculate metrics on them
// and print in the format specified by {testharness.output_format}.
// Note: each sample vector is sorted *in place* (the raw vectors passed by
// the caller are modified) so the renderers can binary-search them.
template<typename T>
void output_result(const std::string& name,
                   const std::string& description,
                   std::vector<std::pair<std::string, std::vector<T>* >> values,
                   std::string unit) {
    // First, calculate mean, median, standard deviation and percentiles of
    // each set of values, both for printing and to derive what the range of
    // the graphs should be.
    std::string new_name = name;
    // Spaces are not valid in XML testcase names; normalise to underscores.
    std::replace(new_name.begin(), new_name.end(), ' ', '_');
    std::vector<Stats<T>> value_stats;
    for (const auto &t : values) {
        Stats<T> stats;
        stats.name = t.first;
        stats.values = t.second;
        std::vector <T> &vec = *t.second;

        // Calculate latency percentiles (nearest-rank via direct indexing;
        // requires the vector to be sorted and non-empty).
        std::sort(vec.begin(), vec.end());
        stats.median = vec[(vec.size() * 50) / 100];
        stats.pct5 = vec[(vec.size() * 5) / 100];
        stats.pct95 = vec[(vec.size() * 95) / 100];
        stats.pct99 = vec[(vec.size() * 99) / 100];

        const double sum = std::accumulate(vec.begin(), vec.end(), 0.0);
        stats.mean = sum / vec.size();
        double accum = 0.0;
        std::for_each (vec.begin(), vec.end(),[&](const double d) {
            accum += (d - stats.mean) * (d - stats.mean);
        });
        // Sample standard deviation (n - 1 denominator).
        // NOTE(review): divides by zero if a vector has exactly one sample;
        // presumably callers always supply more - confirm.
        stats.stddev = sqrt(accum / (vec.size() - 1));

        value_stats.push_back(stats);
    }

    // Now render in the given format.
    switch (testHarness.output_format) {
    case OutputFormat::Text:
        renderToText(new_name, description, value_stats, unit);
        break;

    case OutputFormat::XML:
        renderToXML(new_name, description, value_stats, unit);
        break;
    }
}
/* Add a sentinel document (one with a the key SENTINEL_KEY).
 * This can be used by DCP streams to reliably detect the end of
 * a run (sequence numbers are only supported by DCP, and
 * de-duplication complicates simply counting mutations).
 *
 * @param vbid  vBucket to store the sentinel document in.
 */
static void add_sentinel_doc(ENGINE_HANDLE *h,
                             ENGINE_HANDLE_V1 *h1, uint16_t vbid) {
    // Use ADD instead of SET as we only expect to mutate the sentinel
    // doc once per run.
    checkeq(cb::engine_errc::success,
            storeCasVb11(h, h1, nullptr, OPERATION_ADD, SENTINEL_KEY,
                         nullptr, 0, /*flags*/0, 0, vbid).first,
            "Failed to add sentinel document.");
}
339 
340 /*****************************************************************************
341  ** Testcases
342  *****************************************************************************/
343 
344 /*
345  * The perf_latency_core performs add/get/replace/delete against the bucket
346  * associated with h/h1 parameters.
347  *
348  * key_prefix is used to enable multiple threads to operate on a single bucket
349  * in their own key-spaces.
350  * num_docs controls how many of each operation is performed.
351  *
352  * The elapsed time of each operation is pushed to the vector parameters.
353  */
perf_latency_core(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int key_prefix, int num_docs, std::vector<hrtime_t> &add_timings, std::vector<hrtime_t> &get_timings, std::vector<hrtime_t> &replace_timings, std::vector<hrtime_t> &delete_timings)354 static void perf_latency_core(ENGINE_HANDLE *h,
355                               ENGINE_HANDLE_V1 *h1,
356                               int key_prefix,
357                               int num_docs,
358                               std::vector<hrtime_t> &add_timings,
359                               std::vector<hrtime_t> &get_timings,
360                               std::vector<hrtime_t> &replace_timings,
361                               std::vector<hrtime_t> &delete_timings) {
362 
363     const void *cookie = testHarness.create_cookie();
364     const std::string data(100, 'x');
365 
366     // Build vector of keys
367     std::vector<std::string> keys;
368     for (int i = 0; i < num_docs; i++) {
369         keys.push_back(std::to_string(key_prefix) + std::to_string(i));
370     }
371 
372     // Create (add)
373     for (auto& key : keys) {
374         const auto start = ProcessClock::now();
375         checkeq(cb::engine_errc::success,
376                 storeCasVb11(h, h1, cookie, OPERATION_ADD, key.c_str(),
377                              data.c_str(), data.length(), 0, 0,
378                              /*vBucket*/0, 0, 0).first,
379                 "Failed to add a value");
380         const auto end = ProcessClock::now();
381         add_timings.push_back((end - start).count());
382     }
383 
384     // Get
385     for (auto& key : keys) {
386         const auto start = ProcessClock::now();
387         auto ret = get(h, h1, cookie, key, 0);
388         checkeq(cb::engine_errc::success, ret.first, "Failed to get a value");
389         const auto end = ProcessClock::now();
390         get_timings.push_back((end - start).count());
391     }
392 
393     // Update (Replace)
394     for (auto& key : keys) {
395         const auto start = ProcessClock::now();
396         checkeq(cb::engine_errc::success,
397                 storeCasVb11(h, h1, cookie, OPERATION_REPLACE, key.c_str(),
398                              data.c_str(), data.length(), 0, 0,
399                              /*vBucket*/0, 0, 0).first,
400                 "Failed to replace a value");
401         const auto end = ProcessClock::now();
402         replace_timings.push_back((end - start).count());
403     }
404 
405     // Delete
406     for (auto& key : keys) {
407         const auto start = ProcessClock::now();
408         checkeq(ENGINE_SUCCESS,
409                 del(h, h1, key.c_str(), 0, 0, cookie),
410                 "Failed to delete a value");
411         const auto end = ProcessClock::now();
412         delete_timings.push_back((end - start).count());
413     }
414 
415     testHarness.destroy_cookie(cookie);
416 }
417 
perf_latency(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* title, size_t num_docs)418 static enum test_result perf_latency(ENGINE_HANDLE *h,
419                                      ENGINE_HANDLE_V1 *h1,
420                                      const char* title, size_t num_docs) {
421 
422     // Only timing front-end performance, not considering persistence.
423     stop_persistence(h, h1);
424 
425     std::vector<hrtime_t> add_timings, get_timings,
426                           replace_timings, delete_timings;
427     add_timings.reserve(num_docs);
428     get_timings.reserve(num_docs);
429     replace_timings.reserve(num_docs);
430     delete_timings.reserve(num_docs);
431 
432     std::string description(std::string("Latency [") + title + "] - " +
433                             std::to_string(num_docs) + " items (µs)");
434 
435     // run and measure on this thread.
436     perf_latency_core(h, h1, 0, num_docs, add_timings, get_timings,
437                       replace_timings, delete_timings);
438 
439     add_sentinel_doc(h, h1, /*vbid*/0);
440 
441     std::vector<std::pair<std::string, std::vector<hrtime_t>*> > all_timings;
442     all_timings.push_back(std::make_pair("Add", &add_timings));
443     all_timings.push_back(std::make_pair("Get", &get_timings));
444     all_timings.push_back(std::make_pair("Replace", &replace_timings));
445     all_timings.push_back(std::make_pair("Delete", &delete_timings));
446     output_result(title, description, all_timings, "µs");
447     return SUCCESS;
448 }
449 
/* Benchmark the baseline latency (without any tasks running) of ep-engine.
 * Runs the standard single-threaded add/get/replace/delete workload for
 * the default ITERATIONS count.
 */
static enum test_result perf_latency_baseline(ENGINE_HANDLE *h,
                                              ENGINE_HANDLE_V1 *h1) {

    return perf_latency(h, h1, "1_bucket_1_thread_baseline", ITERATIONS);
}
457 
/* Benchmark the baseline latency with the defragmenter enabled.
 * NOTE(review): the title string contains a typo ("defragmention");
 * deliberately left unchanged here as it forms part of the reported
 * test/metric name consumed downstream.
 */
static enum test_result perf_latency_defragmenter(ENGINE_HANDLE *h,
                                                  ENGINE_HANDLE_V1 *h1) {
    return perf_latency(h, h1, "With constant defragmention", ITERATIONS);
}
464 
/* Benchmark the baseline latency with the expiry pager enabled.
 * (Comment previously said "defragmenter" - copy/paste error.)
 */
static enum test_result perf_latency_expiry_pager(ENGINE_HANDLE *h,
                                                  ENGINE_HANDLE_V1 *h1) {
    return perf_latency(h, h1, "With constant Expiry pager", ITERATIONS);
}
471 
// Per-thread state for the multi-threaded latency tests: which bucket to
// drive (non-owning handles), the key namespace and document count to use,
// and the timing vectors that perf_latency_core fills in.
class ThreadArguments {
public:
    // Pre-allocate space for n samples in each timing vector.
    void reserve(int n) {
        add_timings.reserve(n);
        get_timings.reserve(n);
        replace_timings.reserve(n);
        delete_timings.reserve(n);
    }

    // Release the samples once they have been merged into the final result.
    void clear() {
        add_timings.clear();
        get_timings.clear();
        replace_timings.clear();
        delete_timings.clear();
    }

    ENGINE_HANDLE* h;    // non-owning
    ENGINE_HANDLE_V1* h1; // non-owning
    int key_prefix;  // per-thread key namespace (see perf_latency_core)
    int num_docs;
    std::vector<hrtime_t> add_timings;
    std::vector<hrtime_t> get_timings;
    std::vector<hrtime_t> replace_timings;
    std::vector<hrtime_t> delete_timings;
};
497 
extern "C" {
    // Thread entry point (C linkage, as required by cb_create_thread):
    // unpacks the ThreadArguments and runs the core latency benchmark.
    static void perf_latency_thread(void *arg) {
        ThreadArguments* threadArgs = static_cast<ThreadArguments*>(arg);
        // run and measure on this thread.
        perf_latency_core(threadArgs->h,
                          threadArgs->h1,
                          threadArgs->key_prefix,
                          threadArgs->num_docs,
                          threadArgs->add_timings,
                          threadArgs->get_timings,
                          threadArgs->replace_timings,
                          threadArgs->delete_timings);
    }
}
512 
513 //
514 // Test performance of many buckets/threads
515 //
perf_latency_baseline_multi_thread_bucket(engine_test_t* test, int n_buckets, int n_threads, int num_docs)516 static enum test_result perf_latency_baseline_multi_thread_bucket(engine_test_t* test,
517                                                                   int n_buckets,
518                                                                   int n_threads,
519                                                                   int num_docs) {
520     if (n_buckets > n_threads) {
521         // not supporting...
522         fprintf(stderr, "Returning FAIL because n_buckets(%d) > n_threads(%d)\n",
523                 n_buckets, n_threads);
524         return FAIL;
525     }
526 
527     std::vector<BucketHolder> buckets;
528 
529     printf("\n\n");
530     int printed = printf("=== Latency(%d - bucket(s) %d - thread(s)) - %u items(µs)",
531                          n_buckets, n_threads, num_docs);
532 
533     fillLineWith('=', 88-printed);
534 
535     if (create_buckets(test->cfg, n_buckets, buckets) != n_buckets) {
536         destroy_buckets(buckets);
537         return FAIL;
538     }
539 
540     for (int ii = 0; ii < n_buckets; ii++) {
541         // re-use test_setup to wait for ready
542         test_setup(buckets[ii].h, buckets[ii].h1);
543         // Only timing front-end performance, not considering persistence.
544         stop_persistence(buckets[ii].h, buckets[ii].h1);
545     }
546 
547     std::vector<ThreadArguments> thread_args(n_threads);
548     std::vector<cb_thread_t> threads(n_threads);
549 
550     // setup the arguments each thread will use.
551     // just round robin allocate buckets to threads
552     int bucket = 0;
553     for (int ii = 0; ii < n_threads; ii++) {
554         thread_args[ii].h = buckets[bucket].h;
555         thread_args[ii].h1 = buckets[bucket].h1;
556         thread_args[ii].reserve(num_docs);
557         thread_args[ii].num_docs = num_docs;
558         thread_args[ii].key_prefix = ii;
559         if ((++bucket) == n_buckets) {
560             bucket = 0;
561         }
562     }
563 
564     // Now drive bucket(s) from thread(s)
565     for (int i = 0; i < n_threads; i++) {
566         int r = cb_create_thread(&threads[i], perf_latency_thread, &thread_args[i], 0);
567         cb_assert(r == 0);
568     }
569 
570     for (int i = 0; i < n_threads; i++) {
571         int r = cb_join_thread(threads[i]);
572         cb_assert(r == 0);
573     }
574 
575     // destroy the buckets and rm the db path
576     for (int ii = 0; ii < n_buckets; ii++) {
577         testHarness.destroy_bucket(buckets[ii].h, buckets[ii].h1, false);
578         rmdb(buckets[ii].dbpath.c_str());
579     }
580 
581     // For the results, bring all the bucket timings into a single array
582     std::vector<std::pair<std::string, std::vector<hrtime_t>*> > all_timings;
583     std::vector<hrtime_t> add_timings, get_timings, replace_timings,
584                           delete_timings;
585     for (int ii = 0; ii < n_threads; ii++) {
586         add_timings.insert(add_timings.end(),
587                            thread_args[ii].add_timings.begin(),
588                            thread_args[ii].add_timings.end());
589         get_timings.insert(get_timings.end(),
590                            thread_args[ii].get_timings.begin(),
591                            thread_args[ii].get_timings.end());
592         replace_timings.insert(replace_timings.end(),
593                                thread_args[ii].replace_timings.begin(),
594                                thread_args[ii].replace_timings.end());
595         delete_timings.insert(delete_timings.end(),
596                               thread_args[ii].delete_timings.begin(),
597                               thread_args[ii].delete_timings.end());
598         // done with these arrays now
599         thread_args[ii].clear();
600     }
601     all_timings.push_back(std::make_pair("Add", &add_timings));
602     all_timings.push_back(std::make_pair("Get", &get_timings));
603     all_timings.push_back(std::make_pair("Replace", &replace_timings));
604     all_timings.push_back(std::make_pair("Delete", &delete_timings));
605     std::stringstream title;
606     title << n_buckets << "_buckets_" << n_threads << "_threads_baseline";
607     output_result(title.str(), "Timings", all_timings, "µs");
608 
609     return SUCCESS;
610 }
611 
perf_latency_baseline_multi_bucket_2(engine_test_t* test)612 static enum test_result perf_latency_baseline_multi_bucket_2(engine_test_t* test) {
613     return perf_latency_baseline_multi_thread_bucket(test,
614                                                      2, /* buckets */
615                                                      2, /* threads */
616                                                      10000/* documents */);
617 }
618 
perf_latency_baseline_multi_bucket_4(engine_test_t* test)619 static enum test_result perf_latency_baseline_multi_bucket_4(engine_test_t* test) {
620     return perf_latency_baseline_multi_thread_bucket(test,
621                                                      4, /* buckets */
622                                                      4, /* threads */
623                                                      10000/* documents */);
624 }
625 
// Layout of the generated test documents (see genVectorOfValues).
enum class Doc_format {
    JSON_PADDED,   // JSON doc padded with a run of one repeated character
    JSON_RANDOM,   // JSON doc with randomly-generated field contents
    BINARY_RANDOM  // raw random bytes, not JSON
};
631 
// Arguments handed to a worker thread: the engine handles (non-owning),
// the workload description (item count, document format, DCP stream
// name/opaque/vbucket, compression choice) and the per-thread result
// vectors (latency samples and bytes received).
struct Handle_args {
    Handle_args(ENGINE_HANDLE *_h, ENGINE_HANDLE_V1 *_h1, int _count,
                Doc_format _type, std::string _name, uint32_t _opaque,
                uint16_t _vb, bool _getCompressed) :
        h(_h), h1(_h1), itemCount(_count), typeOfData(_type), name(_name),
        opaque(_opaque), vb(_vb), retrieveCompressed(_getCompressed)
    {
        // Reserve up-front so recording samples doesn't reallocate mid-run.
        timings.reserve(_count);
        bytes_received.reserve(_count);
    }

    // NOTE(review): this copy constructor is member-wise identical to the
    // implicitly-generated one and could be removed (Rule of Zero).
    Handle_args(struct Handle_args const &ha) :
        h(ha.h), h1(ha.h1), itemCount(ha.itemCount),
        typeOfData(ha.typeOfData), name(ha.name), opaque(ha.opaque),
        vb(ha.vb), retrieveCompressed(ha.retrieveCompressed),
        timings(ha.timings), bytes_received(ha.bytes_received)
    { }

    ENGINE_HANDLE *h;
    ENGINE_HANDLE_V1 *h1;
    int itemCount;
    Doc_format typeOfData;
    std::string name;
    uint32_t opaque;
    uint16_t vb;
    bool retrieveCompressed;
    std::vector<hrtime_t> timings;
    std::vector<size_t> bytes_received;
};
661 
/* Generates random strings of characters based on the input alphabet.
 */
class UniformCharacterDistribution {
public:
    UniformCharacterDistribution(const std::string& alphabet_)
    : alphabet(alphabet_),
      // uniform_int_distribution's range is inclusive at *both* ends, so
      // the upper bound must be size() - 1. The previous upper bound of
      // size() could yield an index one-past-the-end of the alphabet,
      // occasionally producing '\0' characters instead of alphabet members.
      uid(0, alphabet.size() - 1) {}

    // Returns a character drawn uniformly at random from `alphabet`.
    template< class Generator >
    char operator()(Generator& g) {
        return alphabet[uid(g)];
    }

private:
    // Set of characters to randomly select from. Must be non-empty.
    std::string alphabet;

    // Underlying integer distribution used to select character.
    std::uniform_int_distribution<> uid;
};
682 
683 
684 /* Generates a random string of the given length.
685  */
686 template< class Generator>
make_random_string(UniformCharacterDistribution& dist, Generator& gen, size_t len)687 static std::string make_random_string(UniformCharacterDistribution& dist,
688                                       Generator& gen,
689                                       size_t len) {
690     std::string result(len, 0);
691     std::generate_n(result.begin(), len, [&]() {
692         return dist(gen);
693     });
694     return result;
695 }
696 
genVectorOfValues(Doc_format type, size_t count, size_t maxSize)697 std::vector<std::string> genVectorOfValues(Doc_format type,
698                                            size_t count, size_t maxSize) {
699     static const char alphabet[] =
700         "abcdefghijklmnopqrstuvwxyz"
701         "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
702         "0123456789";
703 
704     size_t len = 0;
705 
706     std::random_device ran;
707     std::mt19937 dre(ran());
708     UniformCharacterDistribution alpha_dist(alphabet);
709 
710     std::vector<std::string> vals;
711     vals.reserve(count);
712     switch (type) {
713         case Doc_format::JSON_PADDED:
714             for (size_t i = 0; i < count; ++i) {
715                 len = ((i + 1) * 10) % maxSize; // Set field length
716                 len = (len == 0) ? 10 : len;    // Adjust field length
717                 std::string str(len, alphabet[i % (sizeof(alphabet) - 1)]);
718                 vals.push_back("{"
719                                "\"one\":\"" + std::to_string(i) + "\", "
720                                "\"two\":\"" + "TWO\", "
721                                "\"three\":\"" + std::to_string(i) + "\", "
722                                "\"four\":\"" + "FOUR\", "
723                                "\"five\":\"" + str + "\""
724                                "}");
725             }
726             break;
727         case Doc_format::JSON_RANDOM:
728             for (size_t i = 0; i < count; ++i) {
729                 // Generate a fixed-format document with random field values.
730                 len = ((i + 1) * 10) % maxSize; // Set field length
731                 len = (len == 0) ? 10 : len;    // Adjust field length
732 
733                 vals.push_back(
734                         "{"
735                         "\"one\":\"" + std::to_string(i) + "\", "
736                         "\"two\":\"" +
737                         make_random_string(alpha_dist, dre, len * 0.003) + "\", "
738                         "\"three\":\"" +
739                         make_random_string(alpha_dist, dre, len * 0.001) + "\", "
740                         "\"four\": \"" +
741                         make_random_string(alpha_dist, dre, len * 0.002) + "\", "
742                         "\"five\":\"" +
743                         make_random_string(alpha_dist, dre, len * 0.05) + "\", "
744                         "\"six\":\"{1, 2, 3, 4, 5}\", "
745                         "\"seven\":\"" +
746                         make_random_string(alpha_dist, dre, len * 0.01) + "\", "
747                         "\"eight\":\"" +
748                         make_random_string(alpha_dist, dre, len * 0.01) + "\", "
749                         "\"nine\":{'abc', 'def', 'ghi'}\", "
750                         "\"ten\":\"0.123456789\""
751                         "}");
752             }
753             break;
754         case Doc_format::BINARY_RANDOM:
755             for (size_t i = 0; i < count; ++i) {
756                 len = ((i + 1) * 10) % maxSize; // Set field length
757                 len = (len == 0) ? 10 : len;    // Adjust field length
758                 std::string str(len, 0);
759                 std::generate_n(str.begin(), len, [&]() {
760                     return dre();
761                 });
762                 vals.push_back(str);
763             }
764             break;
765         default:
766             check(false, "Unknown DATA requested!");
767     }
768     return vals;
769 }
770 
771 /* Function which loads documents into a bucket */
perf_load_client(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, uint16_t vbid, int count, Doc_format typeOfData, std::vector<hrtime_t> &insertTimes)772 static void perf_load_client(ENGINE_HANDLE* h,
773                              ENGINE_HANDLE_V1* h1,
774                              uint16_t vbid,
775                              int count,
776                              Doc_format typeOfData,
777                              std::vector<hrtime_t> &insertTimes) {
778     std::vector<std::string> keys;
779     for (int i = 0; i < count; ++i) {
780         keys.push_back("key" + std::to_string(i));
781     }
782 
783     std::vector<std::string> vals =
784             genVectorOfValues(typeOfData, count, ITERATIONS);
785 
786     for (int i = 0; i < count; ++i) {
787         checkeq(storeCasVb11(h, h1, NULL, OPERATION_SET, keys[i].c_str(),
788                              vals[i].data(), vals[i].size(), /*flags*/9258,
789                              0, vbid).first,
790                 cb::engine_errc::success,
791                 "Failed set.");
792         insertTimes.push_back(ProcessClock::now().time_since_epoch().count());
793     }
794 
795     add_sentinel_doc(h, h1, vbid);
796 
797     wait_for_flusher_to_settle(h, h1);
798 }
799 
800 /* Function which loads documents into a bucket until told to stop*/
perf_background_sets(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, uint16_t vbid, int count, Doc_format typeOfData, std::vector<hrtime_t> &insertTimes, std::condition_variable &cond_var, std::atomic<bool> &setup_benchmark, std::atomic<bool> &running_benchmark)801 static void perf_background_sets(ENGINE_HANDLE* h,
802                                  ENGINE_HANDLE_V1* h1,
803                                  uint16_t vbid,
804                                  int count,
805                                  Doc_format typeOfData,
806                                  std::vector<hrtime_t> &insertTimes,
807                                  std::condition_variable &cond_var,
808                                  std::atomic<bool> &setup_benchmark,
809                                  std::atomic<bool> &running_benchmark) {
810 
811     std::vector<std::string> keys;
812     const void* cookie = testHarness.create_cookie();
813 
814     for (int ii = 0; ii < count; ++ii) {
815         keys.push_back("key" + std::to_string(ii));
816     }
817 
818     const std::vector<std::string> vals =
819             genVectorOfValues(typeOfData, count, ITERATIONS);
820 
821     int ii = 0;
822     // update atomic stating we are ready to run the benchmark
823     setup_benchmark = true;
824     // signal the thread performing the stats calls
825     cond_var.notify_one();
826 
827     while (running_benchmark) {
828         if (ii == count) {
829             ii = 0;
830         }
831         const auto start = ProcessClock::now();
832         checkeq(storeCasVb11(h, h1, cookie, OPERATION_SET, keys[ii].c_str(),
833                              vals[ii].data(), vals[ii].size(), /*flags*/9258,
834                              0, vbid).first,
835                 cb::engine_errc::success, "Failed set.");
836         const auto end = ProcessClock::now();
837         insertTimes.push_back((end - start).count());
838         ++ii;
839     }
840 
841     testHarness.destroy_cookie(cookie);
842 }
843 
844 /*
845  * Function which implements a DCP client sinking mutations from an ep-engine
846  * DCP Producer (i.e. simulating the replica side of a DCP pairing).
847  */
perf_dcp_client(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1, int itemCount, const std::string& name, uint32_t opaque, uint16_t vbid, bool retrieveCompressed, std::vector<hrtime_t>& recv_timings, std::vector<size_t>& bytes_received)848 static void perf_dcp_client(ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1,
849                             int itemCount, const std::string& name,
850                             uint32_t opaque, uint16_t vbid,
851                             bool retrieveCompressed,
852                             std::vector<hrtime_t>& recv_timings,
853                             std::vector<size_t>& bytes_received) {
854     const void *cookie = testHarness.create_cookie();
855 
856     std::string uuid("vb_" + std::to_string(vbid) + ":0:id");
857     uint64_t vb_uuid = get_ull_stat(h, h1, uuid.c_str(), "failovers");
858     uint32_t streamOpaque = opaque;
859 
860     checkeq(h1->dcp.open(
861                     h, cookie, ++streamOpaque, 0, DCP_OPEN_PRODUCER, name, {}),
862             ENGINE_SUCCESS,
863             "Failed dcp producer open connection");
864 
865     checkeq(h1->dcp.control(h, cookie, ++streamOpaque,
866                                 "connection_buffer_size",
867                                 strlen("connection_buffer_size"), "1024", 4),
868             ENGINE_SUCCESS,
869             "Failed to establish connection buffer");
870 
871     if (retrieveCompressed) {
872 
873         testHarness.set_datatype_support(cookie, PROTOCOL_BINARY_DATATYPE_SNAPPY);
874 
875         checkeq(h1->dcp.control(h, cookie, ++streamOpaque,
876                                     "force_value_compression",
877                                     strlen("force_value_compression"), "true", 4),
878                 ENGINE_SUCCESS,
879                 "Failed to force value compression");
880     }
881 
882     // We create a stream from 0 to MAX(seqno), and then rely on encountering the
883     // sentinel document to know when to finish.
884     uint64_t rollback = 0;
885     checkeq(h1->dcp.stream_req(h, cookie, 0, streamOpaque,
886                                    vbid, 0, std::numeric_limits<uint64_t>::max(),
887                                    vb_uuid, 0, 0, &rollback,
888                                    mock_dcp_add_failover_log),
889             ENGINE_SUCCESS,
890             "Failed to initiate stream request");
891 
892     std::unique_ptr<dcp_message_producers> producers(get_dcp_producers(h, h1));
893 
894     bool done = false;
895     uint32_t bytes_read = 0;
896     bool pending_marker_ack = false;
897     uint64_t marker_end = 0;
898 
899     do {
900         if (bytes_read > 512) {
901             checkeq(ENGINE_SUCCESS,
902                     h1->dcp.buffer_acknowledgement(h, cookie, ++streamOpaque,
903                                                    vbid, bytes_read),
904                     "Failed to acknowledge buffer");
905             bytes_read = 0;
906         }
907         ENGINE_ERROR_CODE err = h1->dcp.step(h, cookie, producers.get());
908         switch (err) {
909         case ENGINE_SUCCESS:
910             // No data currently available - wait to be notified when
911             // more available.
912             testHarness.lock_cookie(cookie);
913             testHarness.waitfor_cookie(cookie);
914             testHarness.unlock_cookie(cookie);
915             break;
916 
917         case ENGINE_WANT_MORE:
918             switch (dcp_last_op) {
919                 case PROTOCOL_BINARY_CMD_DCP_MUTATION:
920                 case PROTOCOL_BINARY_CMD_DCP_DELETION:
921                     // Check for sentinel (before adding to timings).
922                     if (dcp_last_key == SENTINEL_KEY) {
923                         done = true;
924                         break;
925                     }
926                     recv_timings.push_back(
927                             ProcessClock::now().time_since_epoch().count());
928                     bytes_received.push_back(dcp_last_value.length());
929                     bytes_read += dcp_last_packet_size;
930                     if (pending_marker_ack && dcp_last_byseqno == marker_end) {
931                         sendDcpAck(h, h1, cookie,
932                                    PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER,
933                                    PROTOCOL_BINARY_RESPONSE_SUCCESS,
934                                    dcp_last_opaque);
935                     }
936 
937                     break;
938 
939                 case PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER:
940                     if (dcp_last_flags & 8) {
941                         pending_marker_ack = true;
942                         marker_end = dcp_last_snap_end_seqno;
943                     }
944                     bytes_read += dcp_last_packet_size;
945                     break;
946 
947                 case 0:
948                     /* Consider case where no messages were ready on the last
949                      * step call so we will just ignore this case. Note that we
950                      * check for 0 because we clear the dcp_last_op value below.
951                      */
952                     break;
953                 default:
954                     fprintf(stderr, "Unexpected DCP event type received: %d\n",
955                             dcp_last_op);
956                     abort();
957             }
958             dcp_last_op = 0;
959             break;
960 
961         default:
962             fprintf(stderr, "Unhandled dcp->step() result: %d\n", err);
963             abort();
964         }
965     } while (!done);
966 
967     testHarness.destroy_cookie(cookie);
968 }
969 
970 struct Ret_vals {
Ret_valsStatRuntime::BackgroundWork::Ret_vals971     Ret_vals(struct Handle_args _ha, size_t n) :
972         ha(_ha)
973     {
974         timings.reserve(n);
975         received.reserve(n);
976     }
977     struct Handle_args ha;
978     std::vector<hrtime_t> timings;
979     std::vector<size_t> received;
980 };
981 
982 /*
983  * Performs a single DCP latency / bandwidth test with the given parameters.
984  * Returns vectors of item timings and recived bytes.
985  */
986 static std::pair<std::vector<hrtime_t>,
987                  std::vector<size_t>>
single_dcp_latency_bw_test(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb, size_t item_count, Doc_format typeOfData, const std::string& name, uint32_t opaque, bool retrieveCompressed)988 single_dcp_latency_bw_test(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
989                            uint16_t vb, size_t item_count,
990                            Doc_format typeOfData, const std::string& name,
991                            uint32_t opaque, bool retrieveCompressed) {
992     std::vector<size_t> received;
993 
994     check(set_vbucket_state(h, h1, vb, vbucket_state_active),
995             "Failed set_vbucket_state for vbucket");
996     wait_for_flusher_to_settle(h, h1);
997 
998     std::vector<hrtime_t> insert_times;
999 
1000     std::thread load_thread{perf_load_client, h, h1, vb, item_count,
1001                             typeOfData, std::ref(insert_times)};
1002 
1003     std::vector<hrtime_t> recv_times;
1004     std::thread dcp_thread{perf_dcp_client, h, h1, item_count, name,
1005                            opaque, vb, retrieveCompressed,
1006                            std::ref(recv_times), std::ref(received)};
1007     load_thread.join();
1008     dcp_thread.join();
1009 
1010     std::vector<hrtime_t> timings;
1011     for (size_t j = 0; j < insert_times.size(); ++j) {
1012         if (insert_times[j] < recv_times[j]) {
1013             timings.push_back(recv_times[j] - insert_times[j]);
1014         } else {
1015             // Since there is no network overhead at all, it is seen
1016             // that sometimes the DCP client actually received the
1017             // mutation before the store from the load client returned
1018             // a SUCCESS.
1019             timings.push_back(0);
1020         }
1021     }
1022 
1023     return {timings, received};
1024 }
1025 
perf_dcp_latency_and_bandwidth(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, std::string title, Doc_format typeOfData, size_t item_count)1026 static enum test_result perf_dcp_latency_and_bandwidth(ENGINE_HANDLE *h,
1027                                                        ENGINE_HANDLE_V1 *h1,
1028                                                        std::string title,
1029                                                        Doc_format typeOfData,
1030                                                        size_t item_count) {
1031 
1032     std::vector<std::pair<std::string, std::vector<hrtime_t>*> > all_timings;
1033     std::vector<std::pair<std::string, std::vector<size_t>*> > all_sizes;
1034 
1035     std::vector<struct Ret_vals> iterations;
1036 
1037     // For Loader & DCP client to get documents as is from vbucket 0
1038     auto as_is_results =
1039             single_dcp_latency_bw_test(h, h1, /*vb*/0, item_count, typeOfData,
1040                                        "As_is", /*opaque*/0xFFFFFF00, false);
1041     all_timings.push_back({"As_is", &as_is_results.first});
1042     all_sizes.push_back({"As_is", &as_is_results.second});
1043 
1044     // For Loader & DCP client to get documents compressed from vbucket 1
1045     auto compress_results =
1046             single_dcp_latency_bw_test(h, h1, /*vb*/1, item_count, typeOfData,
1047                                       "Compress", /*opaque*/0xFF000000, true);
1048     all_timings.push_back({"Compress", &compress_results.first});
1049     all_sizes.push_back({"Compress", &compress_results.second});
1050 
1051     printf("\n\n");
1052 
1053     int printed = printf("=== %s KB Rcvd. - %zu items (KB)", title.c_str(),
1054                          item_count);
1055     fillLineWith('=', 86-printed);
1056 
1057     output_result(title, "Size", all_sizes, "KB");
1058 
1059     fillLineWith('=', 86);
1060 
1061     printed = printf("=== %s Latency - %zu items(µs)", title.c_str(),
1062                      item_count);
1063     fillLineWith('=', 88-printed);
1064 
1065     output_result(title, "Latency", all_timings, "µs");
1066     printf("\n\n");
1067 
1068     return SUCCESS;
1069 }
1070 
perf_dcp_latency_with_padded_json(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1071 static enum test_result perf_dcp_latency_with_padded_json(ENGINE_HANDLE *h,
1072                                                           ENGINE_HANDLE_V1 *h1) {
1073     return perf_dcp_latency_and_bandwidth(h, h1,
1074                             "DCP In-memory (JSON-PADDED) [As_is vs. Compress]",
1075                             Doc_format::JSON_PADDED, ITERATIONS / 10);
1076 }
1077 
perf_dcp_latency_with_random_json(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1078 static enum test_result perf_dcp_latency_with_random_json(ENGINE_HANDLE *h,
1079                                                           ENGINE_HANDLE_V1 *h1) {
1080     return perf_dcp_latency_and_bandwidth(h, h1,
1081                             "DCP In-memory (JSON-RAND) [As_is vs. Compress]",
1082                             Doc_format::JSON_RANDOM, ITERATIONS / 20);
1083 }
1084 
perf_dcp_latency_with_random_binary(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1085 static enum test_result perf_dcp_latency_with_random_binary(ENGINE_HANDLE *h,
1086                                                             ENGINE_HANDLE_V1 *h1) {
1087     return perf_dcp_latency_and_bandwidth(h, h1,
1088                             "DCP In-memory (BINARY-RAND) [As_is vs. Compress]",
1089                             Doc_format::BINARY_RANDOM, ITERATIONS / 20);
1090 }
1091 
perf_multi_thread_latency(engine_test_t* test)1092 static enum test_result perf_multi_thread_latency(engine_test_t* test) {
1093     return perf_latency_baseline_multi_thread_bucket(test,
1094                                                      1, /* bucket */
1095                                                      4, /* threads */
1096                                                      10000/* documents */);
1097 }
1098 
perf_latency_dcp_impact(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1099 static enum test_result perf_latency_dcp_impact(ENGINE_HANDLE *h,
1100                                                 ENGINE_HANDLE_V1 *h1) {
1101     // Spin up a DCP replication background thread, then start the normal
1102     // latency test.
1103     const size_t num_docs = ITERATIONS;
1104     // Perform 3 DCP-visible operations - add, replace, delete:
1105     const size_t num_dcp_ops = num_docs * 3;
1106 
1107     // Don't actually care about send times & bytes for this test.
1108     std::vector<hrtime_t> ignored_send_times;
1109     std::vector<size_t> ignored_send_bytes;
1110     std::thread dcp_thread{perf_dcp_client, h, h1, num_dcp_ops, "DCP",
1111                            /*opaque*/0x1, /*vb*/0, /*compressed*/false,
1112                            std::ref(ignored_send_times),
1113                            std::ref(ignored_send_bytes)};
1114 
1115     enum test_result result = perf_latency(h, h1, "With background DCP",
1116                                            num_docs);
1117 
1118     dcp_thread.join();
1119 
1120     return result;
1121 }
1122 
perf_stat_latency_core(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, int key_prefix, StatRuntime statRuntime)1123 static void perf_stat_latency_core(ENGINE_HANDLE *h,
1124                                    ENGINE_HANDLE_V1 *h1,
1125                                    int key_prefix,
1126                                    StatRuntime statRuntime) {
1127 
1128     const int iterations = (statRuntime == StatRuntime::Slow) ?
1129             iterations_for_slow_stats : iterations_for_fast_stats;
1130 
1131     const void* cookie = testHarness.create_cookie();
1132     // For some of the stats we need to have a document stored
1133     checkeq(cb::engine_errc::success,
1134             storeCasVb11(h, h1, nullptr, OPERATION_ADD, "example_doc", nullptr,
1135                          0, /*flags*/0, 0, /*vbid*/0).first,
1136                          "Failed to add example document.");
1137 
1138     if (isWarmupEnabled(h, h1)) {
1139         // Include warmup-specific stats
1140         stat_tests.insert({"warmup", {"warmup", StatRuntime::Fast, {}} });
1141     }
1142 
1143     if (isPersistentBucket(h, h1)) {
1144         // Include persistence-specific stats
1145         stat_tests.insert(
1146                 {make_stat_pair("diskinfo",
1147                                 {"diskinfo", StatRuntime::Fast, {}}),
1148                  make_stat_pair("diskinfo-detail",
1149                                 {"diskinfo-detail", StatRuntime::Slow, {}}),
1150                  make_stat_pair(
1151                          "vkey_vb0",
1152                          {"vkey example_doc 0", StatRuntime::Fast, {}})});
1153     }
1154 
1155     for (auto& stat : stat_tests) {
1156         if (stat.second.runtime == statRuntime) {
1157             for (int ii = 0; ii < iterations; ii++) {
1158                 auto start = ProcessClock::now();
1159                 if (stat.first.compare("engine") == 0) {
1160                     checkeq(ENGINE_SUCCESS,
1161                             h1->get_stats(h, cookie, {}, add_stats),
1162                             "Failed to get engine stats");
1163                 } else {
1164                     checkeq(ENGINE_SUCCESS,
1165                             h1->get_stats(h,
1166                                           cookie,
1167                                           {stat.second.key.c_str(),
1168                                            stat.second.key.length()},
1169                                           add_stats),
1170                             (std::string("Failed to get stat:") +
1171                              stat.second.key)
1172                                     .c_str());
1173                 }
1174 
1175                 auto end = ProcessClock::now();
1176                 stat.second.timings.push_back((end - start).count());
1177             }
1178         }
1179     }
1180     testHarness.destroy_cookie(cookie);
1181 }
1182 
perf_stat_latency(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* title, StatRuntime statRuntime, BackgroundWork backgroundWork, int active_vbuckets)1183 static enum test_result perf_stat_latency(ENGINE_HANDLE *h,
1184                                           ENGINE_HANDLE_V1 *h1,
1185                                           const char* title,
1186                                           StatRuntime statRuntime,
1187                                           BackgroundWork backgroundWork,
1188                                           int active_vbuckets) {
1189     std::condition_variable cond_var;
1190     std::mutex m;
1191     std::atomic<bool> setup_benchmark { false };
1192     std::atomic<bool> running_benchmark { true };
1193     std::vector<std::pair<std::string, std::vector<hrtime_t>*> > all_timings;
1194     std::vector<hrtime_t> insert_timings;
1195     std::vector<hrtime_t> ignored_send_times;
1196     std::vector<size_t> ignored_send_bytes;
1197     std::thread dcp_thread;
1198 
1199     insert_timings.reserve(iterations_for_fast_stats);
1200 
1201     for (int vb = 0; vb < active_vbuckets; vb++) {
1202         check(set_vbucket_state(h, h1, vb, vbucket_state_active),
1203               "Failed set_vbucket_state for vbucket");
1204     }
1205     if (isPersistentBucket(h, h1)) {
1206         wait_for_stat_to_be(h, h1, "ep_persist_vbstate_total", active_vbuckets);
1207     }
1208 
1209     // Only timing front-end performance, not considering persistence.
1210     stop_persistence(h, h1);
1211 
1212     if ((backgroundWork & BackgroundWork::Sets) == BackgroundWork::Sets) {
1213         std::thread load_thread { perf_background_sets, h, h1, /*vbid*/0,
1214                                   iterations_for_fast_stats,
1215                                   Doc_format::JSON_RANDOM,
1216                                   std::ref(insert_timings),
1217                                   std::ref(cond_var), std::ref(setup_benchmark),
1218                                   std::ref(running_benchmark) };
1219 
1220         if ((backgroundWork & BackgroundWork::Dcp) == BackgroundWork::Dcp) {
1221             std::thread local_dcp_thread{perf_dcp_client, h, h1, 0, "DCP",
1222                 /*opaque*/0x1, /*vb*/0, /*compressed*/false,
1223                 std::ref(ignored_send_times), std::ref(ignored_send_bytes)};
1224             dcp_thread.swap(local_dcp_thread);
1225         }
1226 
1227         std::unique_lock<std::mutex> lock(m);
1228         while (!setup_benchmark) {
1229             cond_var.wait(lock);
1230         }
1231 
1232         // run and measure on this thread.
1233         perf_stat_latency_core(h, h1, 0, statRuntime);
1234 
1235         // Need to tell the thread performing sets to stop
1236         running_benchmark = false;
1237         load_thread.join();
1238         if ((backgroundWork & BackgroundWork::Dcp) == BackgroundWork::Dcp) {
1239             // Need to tell the thread performing DCP to stop
1240             add_sentinel_doc(h, h1, /*vbid*/0);
1241             dcp_thread.join();
1242             all_timings.emplace_back("Sets and DCP (bg)", &insert_timings);
1243         } else {
1244             all_timings.emplace_back("Sets (bg)", &insert_timings);
1245         }
1246     } else {
1247         // run and measure on this thread.
1248         perf_stat_latency_core(h, h1, 0, statRuntime);
1249     }
1250 
1251     for (auto& stat : stat_tests) {
1252         if (statRuntime == stat.second.runtime) {
1253             all_timings.emplace_back(stat.first,
1254                                      &stat.second.timings);
1255         }
1256     }
1257 
1258     const std::string iterations = (statRuntime == StatRuntime::Slow) ?
1259     std::to_string(iterations_for_slow_stats)
1260     : std::to_string(iterations_for_fast_stats);
1261 
1262     std::string description(std::string("Latency [") + title + "] - " +
1263                             iterations + " items (µs)");
1264     output_result(title, description, all_timings, "µs");
1265     return SUCCESS;
1266 }
1267 
1268 /* Benchmark the baseline stats (without any tasks running) of ep-engine */
perf_stat_latency_baseline(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1269 static enum test_result perf_stat_latency_baseline(ENGINE_HANDLE *h,
1270                                               ENGINE_HANDLE_V1 *h1) {
1271     return perf_stat_latency(h, h1, "Baseline Stats",
1272                              StatRuntime::Fast, BackgroundWork::None, 1);
1273 }
1274 
1275 /* Benchmark the stats with 100 active vbuckets */
perf_stat_latency_100vb(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1276 static enum test_result perf_stat_latency_100vb(ENGINE_HANDLE *h,
1277                                               ENGINE_HANDLE_V1 *h1) {
1278     return perf_stat_latency(h, h1, "Stats with 100 vbuckets",
1279                              StatRuntime::Fast, BackgroundWork::None, 100);
1280 }
1281 
1282 /*
1283  * Benchmark the stats with 100 active vbuckets.  And sets and DCP running on
1284  * background thread.
1285  */
perf_stat_latency_100vb_sets_and_dcp(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1286 static enum test_result perf_stat_latency_100vb_sets_and_dcp(ENGINE_HANDLE *h,
1287                                               ENGINE_HANDLE_V1 *h1) {
1288     return perf_stat_latency(h, h1, "Stats with 100 vbuckets and background sets and DCP",
1289                              StatRuntime::Fast, (BackgroundWork::Sets |
1290                                      BackgroundWork::Dcp), 100);
1291 }
1292 
1293 /* Benchmark the baseline slow stats (without any tasks running) of ep-engine */
perf_slow_stat_latency_baseline(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1294 static enum test_result perf_slow_stat_latency_baseline(ENGINE_HANDLE *h,
1295                                               ENGINE_HANDLE_V1 *h1) {
1296     return perf_stat_latency(h, h1, "Baseline Slow Stats",
1297                              StatRuntime::Slow, BackgroundWork::None, 1);
1298 }
1299 
1300 /* Benchmark the slow stats with 100 active vbuckets */
perf_slow_stat_latency_100vb(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1301 static enum test_result perf_slow_stat_latency_100vb(ENGINE_HANDLE *h,
1302                                               ENGINE_HANDLE_V1 *h1) {
1303     return perf_stat_latency(h, h1, "Slow Stats with 100 vbuckets",
1304                              StatRuntime::Slow, BackgroundWork::None, 100);
1305 }
1306 
1307 /*
1308  * Benchmark the slow stats with 100 active vbuckets.  And sets and DCP running
1309  * on background thread.
1310  */
perf_slow_stat_latency_100vb_sets_and_dcp( ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1)1311 static enum test_result perf_slow_stat_latency_100vb_sets_and_dcp(
1312                                       ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
1313     return perf_stat_latency(h, h1, "Slow Stats with 100 vbuckets and background sets and DCP",
1314                              StatRuntime::Slow, (BackgroundWork::Sets |
1315                                      BackgroundWork::Dcp), 100);
1316 }
1317 
1318 /*****************************************************************************
1319  * List of testcases
1320  *****************************************************************************/
1321 
1322 const char *default_dbname = "./perf_test";
1323 
1324 BaseTestCase testsuite_testcases[] = {
1325         TestCase("Baseline latency", perf_latency_baseline,
1326                  test_setup, teardown,
1327                  "backend=couchdb;ht_size=393209",
1328                  prepare, cleanup),
1329         TestCase("Defragmenter latency", perf_latency_defragmenter,
1330                  test_setup, teardown,
1331                  "backend=couchdb;ht_size=393209"
1332                  // Run defragmenter constantly.
1333                  ";defragmenter_interval=0",
1334                  prepare, cleanup),
1335         TestCase("Expiry pager latency", perf_latency_expiry_pager,
1336                  test_setup, teardown,
1337                  "backend=couchdb;ht_size=393209"
1338                  // Run expiry pager constantly.
1339                  ";exp_pager_stime=0",
1340                  prepare, cleanup),
1341         TestCaseV2("Multi bucket latency", perf_latency_baseline_multi_bucket_2,
1342                    NULL, NULL,
1343                    "backend=couchdb;ht_size=393209",
1344                    prepare, cleanup),
1345         TestCaseV2("Multi bucket latency", perf_latency_baseline_multi_bucket_4,
1346                    NULL, NULL,
1347                    "backend=couchdb;ht_size=393209",
1348                    prepare, cleanup),
1349         TestCase("DCP latency (Padded JSON)", perf_dcp_latency_with_padded_json,
1350                  test_setup, teardown,
1351                  "backend=couchdb;ht_size=393209",
1352                  prepare, cleanup),
1353         TestCase("DCP latency (Random JSON)", perf_dcp_latency_with_random_json,
1354                  test_setup, teardown,
1355                  "backend=couchdb;ht_size=393209",
1356                  prepare, cleanup),
1357         TestCase("DCP latency (Random BIN)", perf_dcp_latency_with_random_binary,
1358                  test_setup, teardown,
1359                  "backend=couchdb;ht_size=393209",
1360                  prepare, cleanup),
1361         TestCaseV2("Multi thread latency", perf_multi_thread_latency,
1362                    NULL, NULL,
1363                    "backend=couchdb;ht_size=393209",
1364                    prepare, cleanup),
1365 
1366         TestCase("DCP impact on front-end latency", perf_latency_dcp_impact,
1367                  test_setup, teardown,
1368                  "backend=couchdb;ht_size=393209",
1369                  prepare, cleanup),
1370 
1371         TestCase("Baseline Stat latency", perf_stat_latency_baseline,
1372                  test_setup, teardown,
1373                  "backend=couchdb;ht_size=393209",
1374                  prepare, cleanup),
1375         TestCase("Stat latency with 100 active vbuckets",
1376                  perf_stat_latency_100vb, test_setup, teardown,
1377                  "backend=couchdb;ht_size=393209",
1378                  prepare, cleanup),
1379         TestCase("Stat latency with 100 vbuckets. Also sets & DCP traffic on "
1380                  "separate thread",
1381                  perf_stat_latency_100vb_sets_and_dcp, test_setup, teardown,
1382                  "backend=couchdb;ht_size=393209",
1383                  prepare, cleanup),
1384         TestCase("Baseline Slow Stat latency", perf_slow_stat_latency_baseline,
1385                  test_setup, teardown, "backend=couchdb;ht_size=393209",
1386                  prepare, cleanup),
1387         TestCase("Stat latency with 100 active vbuckets",
1388                  perf_slow_stat_latency_100vb,
1389                  test_setup, teardown, "backend=couchdb;ht_size=393209",
1390                  prepare, cleanup),
1391         TestCase("Stat latency with 100 vbuckets. Also sets & DCP traffic on "
1392                  "separate thread",
1393                  perf_slow_stat_latency_100vb_sets_and_dcp, test_setup,
1394                  teardown, "backend=couchdb;ht_size=393209", prepare, cleanup),
1395 
1396         TestCase(NULL, NULL, NULL, NULL,
1397                  "backend=couchdb", prepare, cleanup)
1398 };
1399