1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 //
19 // couch_create, a program for 'offline' generation of Couchbase compatible
20 // couchstore files.
21 //
22 
#include <getopt.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <deque>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include "crc32.h"

#include "libcouchstore/couch_db.h"
46 
// State requested for a vbucket; VB_UNMANAGED means this process ignores it.
enum VBucketState { VB_ACTIVE, VB_REPLICA, VB_UNMANAGED };

// Kind of document body to generate and how it is flagged in couchstore.
enum DocType {
    BINARY_DOC,
    BINARY_DOC_COMPRESSED,
    JSON_DOC,
    JSON_DOC_COMPRESSED
};

//
// ProgramParameters:
// An object to process argv/argc and carry around the parameters to operate
// with.
//
class ProgramParameters {
public:
    // Define all program defaults as static
    static const bool reuse_couch_files_default = false;
    static const int vbc_default = 1024;
    static const uint64_t key_count_default = 0;
    static const int keys_per_flush_default = 512;
    static const int doc_len_default = 256;
    static const bool keys_per_vbucket_default = false;
    static const uint64_t start_key_default = 0;
    static const bool low_compression_default = false;
    static const DocType doc_type_default = BINARY_DOC_COMPRESSED;
    static const int flusher_count_default = 8;

    //
    // Construct a program parameters, all parameters assigned default settings.
    // Every vbucket starts VB_UNMANAGED until load() decides otherwise.
    //
    ProgramParameters()
        : reuse_couch_files(reuse_couch_files_default),
          vbc(vbc_default),
          key_count(key_count_default),
          keys_per_vbucket(keys_per_vbucket_default),
          keys_per_flush(keys_per_flush_default),
          doc_len(doc_len_default),
          doc_type(doc_type_default),
          vbuckets(vbc_default, VB_UNMANAGED),
          vbuckets_managed(0),
          start_key(start_key_default),
          low_compression(low_compression_default),
          flusher_count(flusher_count_default) {
    }

    //
    // Parse argv with getopt_long, then treat any remaining positional
    // arguments as the list of vbuckets to manage ("<vb>", "<vb>a" or
    // "<vb>r"). With no positional arguments every vbucket becomes active.
    // Invalid options/arguments print an error and exit via usage(1).
    // Uses (and advances) the global getopt state `optind`.
    //
    void load(int argc, char** argv) {
        const int KEYS_PER_VBUCKET = 1000;
        static const struct option long_options[] = {
                {"reuse", no_argument, 0, 'r'},
                {"vbc", required_argument, 0, 'v'},
                {"keys", required_argument, 0, 'k'},
                {"keys-per-vbucket", no_argument, 0, KEYS_PER_VBUCKET},
                {"keys-per-flush", required_argument, 0, 'f'},
                {"doc-len", required_argument, 0, 'd'},
                {"doc-type", required_argument, 0, 't'},
                {"start-key", required_argument, 0, 's'},
                {"low-compression", no_argument, 0, 'l'},
                {0, 0, 0, 0}};

        while (true) {
            /* getopt_long stores the option index here. */
            int option_index = 0;
            const int c = getopt_long(
                    argc, argv, "s:v:k:f:d:t:rl", long_options, &option_index);

            /* Detect the end of the options. */
            if (c == -1) {
                break;
            }

            switch (c) {
            case 'v': {
                const long requested = strtol(optarg, nullptr, 10);
                // vbc is stored as int16_t; reject values that would wrap.
                // Non-positive values are reported later by validate().
                if (requested > INT16_MAX || requested < INT16_MIN) {
                    std::cerr << "Error: vbc out of range - " << optarg
                              << std::endl;
                    usage(1);
                }
                vbc = static_cast<int16_t>(requested);
                // Never hand resize() a negative size, and make sure newly
                // added slots are unmanaged (value-init would be VB_ACTIVE).
                vbuckets.resize(vbc > 0 ? vbc : 0, VB_UNMANAGED);
                break;
            }

            case 'k': {
                key_count = strtoull(optarg, 0, 10);
                break;
            }

            case 'f': {
                keys_per_flush = atoi(optarg);
                break;
            }

            case 'd': {
                doc_len = atoi(optarg);
                break;
            }

            case 'r': {
                reuse_couch_files = true;
                break;
            }

            case 'l': {
                low_compression = true;
                break;
            }

            case 's': {
                start_key = strtoull(optarg, 0, 10);
                break;
            }

            case 't': {
                if (strcmp(optarg, "binary") == 0) {
                    doc_type = BINARY_DOC;
                } else if (strcmp(optarg, "binarycompressed") == 0) {
                    doc_type = BINARY_DOC_COMPRESSED;
                } else {
                    std::cerr << "Error: unknown doc type - " << optarg
                              << std::endl;
                    usage(1);
                }
                break;
            }

            case KEYS_PER_VBUCKET: {
                keys_per_vbucket = true;
                break;
            }

            default: { usage(1); }
            }
        } // end of option parsing

        // Now are we managing all vbuckets, or a list?
        if (optind < argc) {
            // Always advance optind, even for bad input, so this terminates.
            for (; optind < argc; optind++) {
                add_managed_vbucket(argv[optind]);
            }
        } else {
            for (int vb = 0; vb < vbc; vb++) {
                if (vbuckets[vb] == VB_UNMANAGED) {
                    vbuckets_managed++;
                }
                vbuckets[vb] = VB_ACTIVE;
            }
        }
    }

    //
    // return true if the current parameters are good, else print an error and
    // return false.
    //
    bool validate() const {
        if (vbc <= 0) {
            std::cerr << "Error: vbc less than or equal to 0 - " << vbc
                      << std::endl;
            return false;
        }

        // this ensures that the program doesn't run away with no args...
        if (key_count == 0) {
            std::cerr << "Key count 0 or not specified, use -k to set key "
                         "count to "
                         "greater than 0"
                      << std::endl;
            return false;
        }

        // Each VBucket keeps keys_per_flush document slots and indexes them
        // directly, so zero/negative would index an empty buffer.
        if (keys_per_flush <= 0) {
            std::cerr << "Error: keys-per-flush less than or equal to 0 - "
                      << keys_per_flush << std::endl;
            return false;
        }

        // Document bodies are allocated with doc_len bytes.
        if (doc_len < 0) {
            std::cerr << "Error: doc-len less than 0 - " << doc_len
                      << std::endl;
            return false;
        }
        return true;
    }

    int16_t get_vbc() const {
        return vbc;
    }

    uint64_t get_key_count() const {
        return key_count;
    }

    int get_keys_per_flush() const {
        return keys_per_flush;
    }

    int get_doc_len() const {
        return doc_len;
    }

    bool get_reuse_couch_files() const {
        return reuse_couch_files;
    }

    // Human readable name of the configured document type.
    std::string get_doc_type_string() const {
        switch (doc_type) {
        case BINARY_DOC:
            return std::string("binary");
        case BINARY_DOC_COMPRESSED:
            return std::string("binary compressed");
        case JSON_DOC:
            return std::string("JSON");
        case JSON_DOC_COMPRESSED:
            return std::string("JSON compressed");
        }
        return std::string("getDocTypeString failure");
    }

    DocType get_doc_type() const {
        return doc_type;
    }

    bool is_keys_per_vbucket() const {
        return keys_per_vbucket;
    }

    // True if vb is in range and this process is managing it.
    // NOTE(review): read without the lock taken by disable_vbucket(), which
    // flusher threads call concurrently.
    bool is_vbucket_managed(int vb) const {
        if (vb < 0 || static_cast<size_t>(vb) >= vbuckets.size()) {
            return false;
        }
        return vbuckets[vb] != VB_UNMANAGED;
    }

    int get_vbuckets_managed() const {
        return vbuckets_managed;
    }

    uint64_t get_start_key() const {
        return start_key;
    }

    // Requested state of vb; vb must be within [0, vbc).
    VBucketState get_vbucket_state(int vb) const {
        return vbuckets[vb];
    }

    // Stop managing vb (e.g. its couch-file could not be used). Writers are
    // serialised by a function-local mutex.
    void disable_vbucket(int vb) {
        static std::mutex lock;
        std::unique_lock<std::mutex> lck(lock);
        vbuckets[vb] = VB_UNMANAGED;
        vbuckets_managed--;
    }

    bool is_low_compression() const {
        return low_compression;
    }

    int get_flusher_count() const {
        return flusher_count;
    }

    // Print usage to stderr and terminate the process with exit_code.
    [[noreturn]] static void usage(int exit_code) {
        std::cerr << std::endl;
        std::cerr << "couch_create <options> <vbucket list>" << std::endl;
        std::cerr << "options:" << std::endl;
        std::cerr << "    --reuse,-r: Reuse couch-files (any re-used file must "
                     "have "
                     "a vbstate document) (default "
                  << reuse_couch_files_default << ")." << std::endl;
        std::cerr << "    --vbc, -v <integer>:  Number of vbuckets (default "
                  << vbc_default << ")." << std::endl;
        std::cerr << "    --keys, -k <integer>:  Number of keys to create "
                     "(default "
                  << key_count_default << ")." << std::endl;
        std::cerr << "    --keys-per-vbucket:  The keys value is how many keys "
                     "for "
                     "each vbucket (default "
                  << keys_per_vbucket_default << ")." << std::endl;
        std::cerr << "    --keys-per-flush, -f <integer>:  Number of keys per "
                     "vbucket before committing to disk (default "
                  << keys_per_flush_default << ")." << std::endl;
        std::cerr << "    --doc-len,-d <integer>:  Number of bytes for the "
                     "document "
                     "body (default "
                  << doc_len_default << ")." << std::endl;
        std::cerr << "    --doc-type,-t <binary|binarycompressed>:  Document "
                     "type."
                  << std::endl;
        std::cerr << "    --start-key,-s <integer>:  Specify the first key "
                     "number "
                     "(default "
                  << start_key_default << ")." << std::endl;
        std::cerr << "    --low-compression,-l: Generate documents that don't "
                     "compress well (default "
                  << low_compression_default << ")." << std::endl;

        std::cerr << std::endl
                  << "vbucket list (optional space separated values):"
                  << std::endl;
        std::cerr
                << "    Specify a list of vbuckets to manage and optionally "
                   "the "
                   "state. "
                << std::endl
                << "E.g. VB 1 can be specified as '1' (defaults to active when "
                   "creating vbuckets) or '1a' (for active) or '1r' (for "
                   "replica)."
                << std::endl
                << "Omitting the vbucket list means all vbuckets will be "
                   "created."
                << std::endl;

        std::cerr
                << "Two modes of operation:" << std::endl
                << "    1) Re-use vbuckets (--reuse or -r) \"Automatic mode\":"
                << std::endl
                << "    In this mode of operation the program will only write "
                   "key/values into vbucket files it finds in the current "
                   "directory."
                << std::endl
                << "    Ideally the vbucket files are empty of documents, but "
                   "must have a vbstate local doc."
                << std::endl
                << "    The intent of this mode is for a cluster and bucket to "
                   "be "
                   "pre-created, but empty and then to simply "
                << std::endl
                << "    populate the files found on each node without having "
                   "to "
                   "consider which are active/replica."
                << std::endl
                << std::endl;

        std::cerr
                << "    2) Create vbuckets:" << std::endl
                << "    In this mode of operation the program will create new "
                   "vbucket files. The user must make the decision about what "
                   "is "
                   "active/replica"
                << std::endl
                << std::endl;

        std::cerr << "Examples: " << std::endl;
        std::cerr
                << "  Create 1024 active vbuckets containing 10,000, 256 byte "
                   "binary documents."
                << std::endl;
        std::cerr << "    > ./couch_create -k 10000" << std::endl << std::endl;
        std::cerr << "  Iterate over 10,000 keys, but only generate vbuckets "
                     "0, 1, "
                     "2 and 3 with a mix of active/replica"
                  << std::endl;
        std::cerr << "    > ./couch_create -k 10000 0a 1r 2a 3r" << std::endl
                  << std::endl;
        std::cerr
                << "  Iterate over 10,000 keys and re-use existing couch-files"
                << std::endl;
        std::cerr << "    > ./couch_create -k 10000 -r" << std::endl
                  << std::endl;
        std::cerr << "  Create 10000 keys for each vbucket and re-use existing "
                     "couch-files"
                  << std::endl;
        std::cerr << "    > ./couch_create -k 10000 --keys-per-vbucket -r"
                  << std::endl
                  << std::endl;

        exit(exit_code);
    }

private:
    // Parse one positional "<vb>[a|r]" token and mark that vbucket managed.
    // The last 'a' or 'r' in the token picks the state (active if neither).
    // Exits via usage(1) when the token has no number or is out of range.
    void add_managed_vbucket(const char* arg) {
        char* end = nullptr;
        const long vb = strtol(arg, &end, 10);
        if (end == arg || vb < 0 || vb >= vbc) {
            std::cerr << "Error: invalid vbucket '" << arg << "' (vbc is "
                      << vbc << ")" << std::endl;
            usage(1);
        }

        VBucketState state = VB_ACTIVE;
        for (const char* p = arg; *p != '\0'; ++p) {
            if (*p == 'a') {
                state = VB_ACTIVE;
            } else if (*p == 'r') {
                state = VB_REPLICA;
            }
        }

        // Count each vbucket once even if it is listed repeatedly.
        if (vbuckets[vb] == VB_UNMANAGED) {
            vbuckets_managed++;
        }
        vbuckets[vb] = state;

        std::cout << "Managing VB " << vb;
        if (state == VB_ACTIVE) {
            std::cout << " active" << std::endl;
        } else {
            std::cout << " replica" << std::endl;
        }
    }

    bool reuse_couch_files;
    int16_t vbc;
    uint64_t key_count;
    bool keys_per_vbucket;
    int keys_per_flush;
    int doc_len;
    DocType doc_type;
    std::vector<VBucketState> vbuckets;
    int vbuckets_managed;
    uint64_t start_key;
    bool low_compression;
    int flusher_count;
};
438 
439 //
440 // Class representing a single couchstore document
441 //
442 class Document {
443     class Meta {
444     public:
445         // Create the meta, cas is a millisecond timestamp
Meta(std::chrono::time_point<std::chrono::high_resolution_clock> casTime, uint32_t e, uint32_t f)446         Meta(std::chrono::time_point<std::chrono::high_resolution_clock>
447                      casTime,
448              uint32_t e,
449              uint32_t f)
450             : cas(std::chrono::duration_cast<std::chrono::microseconds>(
451                           casTime.time_since_epoch())
452                           .count() &
453                   0xFFFF),
454               exptime(e),
455               flags(f),
456               flex_meta_code(0x01),
457               flex_value(0x0) {
458         }
459 
set_exptime(uint32_t exptime)460         void set_exptime(uint32_t exptime) {
461             this->exptime = exptime;
462         }
463 
set_flags(uint32_t flags)464         void set_flags(uint32_t flags) {
465             this->flags = flags;
466         }
467 
get_size() const468         size_t get_size() const {
469             // Not safe to use sizeof(Meta) due to trailing padding
470             return sizeof(cas) + sizeof(exptime) + sizeof(flags) +
471                    sizeof(flex_meta_code) + sizeof(flex_value);
472         }
473 
474     public:
475         uint64_t cas;
476         uint32_t exptime;
477         uint32_t flags;
478         uint8_t flex_meta_code;
479         uint8_t flex_value;
480     };
481 
482 public:
Document(const char* k, int klen, ProgramParameters& params, int dlen)483     Document(const char* k, int klen, ProgramParameters& params, int dlen)
484         : meta(std::chrono::high_resolution_clock::now(), 0, 0),
485           key_len(klen),
486           key(NULL),
487           data_len(dlen),
488           data(NULL),
489           parameters(params),
490           doc_created(0) {
491         key = new char[klen];
492         data = new char[dlen];
493         set_doc(k, klen, dlen);
494         memset(&doc_info, 0, sizeof(DocInfo));
495         memset(&doc, 0, sizeof(Doc));
496         doc.id.buf = key;
497         doc.id.size = klen;
498         doc.data.buf = data;
499         doc.data.size = dlen;
500         doc_info.id = doc.id;
501         doc_info.size = doc.data.size;
502         doc_info.db_seq = 0; // db_seq;
503         doc_info.rev_seq = 1; // ++db_seq;
504 
505         if (params.get_doc_type() == BINARY_DOC_COMPRESSED) {
506             doc_info.content_meta =
507                     COUCH_DOC_NON_JSON_MODE | COUCH_DOC_IS_COMPRESSED;
508         } else if (params.get_doc_type() == BINARY_DOC) {
509             doc_info.content_meta = COUCH_DOC_NON_JSON_MODE;
510         } else if (params.get_doc_type() == JSON_DOC_COMPRESSED) {
511             doc_info.content_meta = COUCH_DOC_IS_JSON | COUCH_DOC_IS_COMPRESSED;
512         } else if (params.get_doc_type() == JSON_DOC) {
513             doc_info.content_meta = COUCH_DOC_IS_JSON;
514         } else {
515             doc_info.content_meta = COUCH_DOC_NON_JSON_MODE;
516         }
517 
518         doc_info.rev_meta.buf = reinterpret_cast<char*>(&meta);
519         doc_info.rev_meta.size = meta.get_size();
520         doc_info.deleted = 0;
521     }
522 
~Document()523     ~Document() {
524         delete[] key;
525         delete[] data;
526     }
527 
set_doc(const char* k, int klen, int dlen)528     void set_doc(const char* k, int klen, int dlen) {
529         if (klen > key_len) {
530             delete key;
531             key = new char[klen];
532             doc.id.buf = key;
533             doc.id.size = klen;
534             doc_info.id = doc.id;
535         }
536         if (dlen > data_len) {
537             delete data;
538             data = new char[dlen];
539             doc.data.buf = data;
540             doc.data.size = dlen;
541         }
542 
543         memcpy(key, k, klen);
544         // generate doc body only if size has changed.
545         if (doc_created != dlen) {
546             if (parameters.is_low_compression()) {
547                 srand(0);
548                 for (int data_index = 0; data_index < dlen; data_index++) {
549                     char data_value = (rand() % 255) % ('Z' - '0');
550                     data[data_index] = data_value + '0';
551                 }
552             } else {
553                 char data_value = 0;
554                 for (int data_index = 0; data_index < dlen; data_index++) {
555                     data[data_index] = data_value + '0';
556                     data_value = (data_value + 1) % ('Z' - '0');
557                 }
558             }
559             doc_created = dlen;
560         }
561     }
562 
get_doc()563     Doc* get_doc() {
564         return &doc;
565     }
566 
get_doc_info()567     DocInfo* get_doc_info() {
568         return &doc_info;
569     }
570 
571 private:
572     Doc doc;
573     DocInfo doc_info;
574     Meta meta;
575 
576     int key_len;
577     char* key;
578     int data_len;
579     char* data;
580     ProgramParameters& parameters;
581     int doc_created;
582     static uint64_t db_seq;
583 };
584 
585 uint64_t Document::db_seq = 0;
586 
587 //
588 // A class representing a VBucket.
589 // This object holds a queue of key/values (documents) and manages their writing
590 // to the couch-file.
591 //
592 class VBucket {
593 public:
594     class Exception1 : public std::exception {
what() const595         virtual const char* what() const throw() {
596             return "Found an existing couch-file with vbstate and --reuse/-r "
597                    "is not set.";
598         }
599     } exception1;
600 
601     class Exception2 : public std::exception {
what() const602         virtual const char* what() const throw() {
603             return "Didn't find valid couch-file (or found file with no "
604                    "vbstate) and --reuse/-r is set.";
605         }
606     } exception2;
607 
608     class Exception3 : public std::exception {
what() const609         virtual const char* what() const throw() {
610             return "Error opening couch_file (check ulimit -n).";
611         }
612     } exception3;
613 
614     //
615     // Constructor opens file and validates the state.
616     // throws exceptions if not safe to continue
617     //
VBucket(char* filename, int vb, uint64_t& saved_counter, ProgramParameters& params_ref)618     VBucket(char* filename,
619             int vb,
620             uint64_t& saved_counter,
621             ProgramParameters& params_ref)
622         : handle(NULL),
623           next_free_doc(0),
624           flush_threshold(params_ref.get_keys_per_flush()),
625           docs(params_ref.get_keys_per_flush()),
626           pending_documents(0),
627           documents_saved(saved_counter),
628           params(params_ref),
629           vbid(vb),
630           doc_count(0),
631           got_vbstate(false),
632           vb_seq(0),
633           ok_to_set_vbstate(true) {
634         int flags = params.get_reuse_couch_files()
635                             ? COUCHSTORE_OPEN_FLAG_RDONLY
636                             : COUCHSTORE_OPEN_FLAG_CREATE;
637 
638         couchstore_error_t err = couchstore_open_db(filename, flags, &handle);
639         if (err != COUCHSTORE_SUCCESS) {
640             throw exception3;
641         }
642 
643         if (read_vbstate()) {
644             // A vbstate document exists.
645             // Can only proceed if we're in reuse mode
646             if (!params.get_reuse_couch_files()) {
647                 destroy();
648                 throw exception1;
649             } else {
650                 // VB exists and is valid, close and open in write mode.
651                 destroy();
652                 couchstore_error_t err = couchstore_open_db(
653                         filename, COUCHSTORE_OPEN_FLAG_CREATE, &handle);
654                 if (err != COUCHSTORE_SUCCESS) {
655                     throw exception3;
656                 }
657             }
658         } else {
659             if (params.get_reuse_couch_files()) {
660                 destroy();
661                 throw exception2;
662             }
663         }
664         ok_to_set_vbstate = true;
665     }
666 
~VBucket()667     ~VBucket() {
668         save_docs();
669         if (ok_to_set_vbstate) {
670             set_vbstate(); // set/update local vbstate
671         }
672         docs.clear();
673         destroy();
674     }
675 
676     //
677     // Return true if the special vbstate document is present.
678     //
read_vbstate()679     bool read_vbstate() {
680         LocalDoc* local_doc = nullptr;
681         couchstore_error_t errCode =
682                 couchstore_open_local_document(handle,
683                                                "_local/vbstate",
684                                                sizeof("_local/vbstate") - 1,
685                                                &local_doc);
686         if (local_doc) {
687             got_vbstate = true;
688             vbstate_data.assign(local_doc->json.buf, local_doc->json.size);
689             couchstore_free_local_document(local_doc);
690         }
691         return errCode == COUCHSTORE_SUCCESS;
692     }
693 
694     //
695     // Set the special vbstate document
696     //
set_vbstate()697     void set_vbstate() {
698         std::stringstream jsonState;
699         std::string state_string;
700         if (got_vbstate) {
701             if (vbstate_data.find("replica") != std::string::npos) {
702                 state_string = "replica";
703             } else {
704                 state_string = "active";
705             }
706         } else {
707             state_string = params.get_vbucket_state(vbid) == VB_ACTIVE
708                                    ? "active"
709                                    : "replica";
710         }
711 
712         // Set max_cas to a timestamp
713         uint64_t max_cas =
714                 std::chrono::duration_cast<std::chrono::microseconds>(
715                         std::chrono::high_resolution_clock::now()
716                                 .time_since_epoch())
717                         .count() &
718                 0xFFFF;
719 
720         jsonState << "{\"state\": \"" << state_string << "\""
721                   << ",\"checkpoint_id\": \"0\""
722                   << ",\"max_deleted_seqno\": \"0\""
723                   << ",\"snap_start\": \"" << vb_seq << "\""
724                   << ",\"snap_end\": \"" << vb_seq << "\""
725                   << ",\"max_cas\": \"" << max_cas << "\""
726                   << ",\"drift_counter\": \"0\""
727                   << "}";
728 
729         auto vbstate_json = jsonState.str();
730         LocalDoc vbstate;
731         vbstate.id.buf = (char*)"_local/vbstate";
732         vbstate.id.size = sizeof("_local/vbstate") - 1;
733         vbstate.json.buf = (char*)vbstate_json.c_str();
734         vbstate.json.size = vbstate_json.size();
735         vbstate.deleted = 0;
736 
737         couchstore_error_t errCode =
738                 couchstore_save_local_document(handle, &vbstate);
739         if (errCode != COUCHSTORE_SUCCESS) {
740             std::cerr << "Warning: couchstore_save_local_document failed error="
741                       << couchstore_strerror(errCode) << std::endl;
742         }
743         couchstore_commit(handle);
744     }
745 
746     //
747     // Add a new key/value to the queue
748     // Flushes the queue if has reached the flush_threshold.
749     //
add_doc(char* k, int klen, int dlen)750     void add_doc(char* k, int klen, int dlen) {
751         if (docs[next_free_doc] == nullptr) {
752             docs[next_free_doc] =
753                     std::make_unique<Document>(k, klen, params, dlen);
754         }
755 
756         docs[next_free_doc]->set_doc(k, klen, dlen);
757         pending_documents++;
758         doc_count++;
759         vb_seq++;
760 
761         if (pending_documents == flush_threshold) {
762             save_docs();
763         } else {
764             next_free_doc++;
765         }
766     }
767 
768     //
769     // Save any pending documents to the couch file
770     //
save_docs()771     void save_docs() {
772         if (pending_documents) {
773             std::vector<Doc*> doc_array(pending_documents);
774             std::vector<DocInfo*> doc_info_array(pending_documents);
775 
776             for (int i = 0; i < pending_documents; i++) {
777                 doc_array[i] = docs[i]->get_doc();
778                 doc_info_array[i] = docs[i]->get_doc_info();
779             }
780 
781             int flags = 0;
782 
783             if (params.get_doc_type() == JSON_DOC_COMPRESSED ||
784                 params.get_doc_type() == BINARY_DOC_COMPRESSED) {
785                 flags = COMPRESS_DOC_BODIES;
786             }
787 
788             couchstore_save_documents(handle,
789                                       doc_array.data(),
790                                       doc_info_array.data(),
791                                       pending_documents,
792                                       flags);
793             couchstore_commit(handle);
794             documents_saved += pending_documents;
795             next_free_doc = 0;
796             pending_documents = 0;
797         }
798     }
799 
get_doc_count()800     uint64_t get_doc_count() {
801         return doc_count;
802     }
803 
804 private:
destroy()805     void destroy() {
806         couchstore_close_file(handle);
807         couchstore_free_db(handle);
808     }
809 
810     Db* handle;
811     int next_free_doc;
812     int flush_threshold;
813     std::vector<std::unique_ptr<Document>> docs;
814     int pending_documents;
815     uint64_t& documents_saved;
816     ProgramParameters& params;
817     int vbid;
818     uint64_t doc_count;
819     std::string vbstate_data;
820     bool got_vbstate;
821     uint64_t vb_seq;
822     bool ok_to_set_vbstate;
823 };
824 
825 class VBucketFlusher {
826 public:
VBucketFlusher(ProgramParameters& params)827     VBucketFlusher(ProgramParameters& params)
828         : started(false),
829           documents_saved(0),
830           parameters(params),
831           task_thread(&VBucketFlusher::run, this) {
832     }
833 
~VBucketFlusher()834     ~VBucketFlusher() {
835     }
836 
add_vbucket(uint16_t vbid)837     void add_vbucket(uint16_t vbid) {
838         my_vbuckets.insert(vbid);
839     }
840 
start()841     void start() {
842         while (!started) {
843             std::this_thread::sleep_for(std::chrono::milliseconds(100));
844         }
845         cvar.notify_one();
846     }
847 
join()848     void join() {
849         task_thread.join();
850     }
851 
get_documents_saved()852     uint64_t get_documents_saved() {
853         return documents_saved;
854     }
855 
856 private:
run()857     void run() {
858         std::unique_lock<std::mutex> lck(lock);
859         started = true;
860         cvar.wait(lck);
861 
862         std::vector<std::unique_ptr<VBucket>> vb_handles(parameters.get_vbc());
863         uint64_t key_value = 0;
864 
865         bool start_counting_vbuckets = false;
866 
867         char key[64];
868         uint64_t key_max = parameters.is_keys_per_vbucket()
869                                    ? ULLONG_MAX
870                                    : parameters.get_key_count();
871 
872         // Loop through the key space and create keys, test each key to see if
873         // we are managing the vbucket it maps to.
874         for (uint64_t ii = parameters.get_start_key();
875              ii < (key_max + parameters.get_start_key());
876              ii++) {
877             int key_len = snprintf(key, 64, "K%020" PRId64, key_value);
878             int vbid = client_hash_crc32(reinterpret_cast<const uint8_t*>(key),
879                                          key_len) %
880                        (parameters.get_vbc());
881 
882             // Only if the vbucket is managed generate the doc
883             if (my_vbuckets.count(vbid) > 0 &&
884                 parameters.is_vbucket_managed(vbid)) {
885                 if (vb_handles[vbid] == nullptr) {
886                     char filename[32];
887                     snprintf(filename, 32, "%d.couch.1", vbid);
888                     start_counting_vbuckets = true;
889                     try {
890                         vb_handles[vbid] = std::make_unique<VBucket>(
891                                 filename, vbid, documents_saved, parameters);
892                     } catch (std::exception& e) {
893                         std::unique_lock<std::mutex> print_lck(print_lock);
894                         std::cerr << "Not creating a VB handler for "
895                                   << filename << " \"" << e.what() << "\""
896                                   << std::endl;
897                         parameters.disable_vbucket(vbid);
898                         vb_handles[vbid].reset();
899                     }
900                 }
901 
902                 // if there's now a handle, go for it
903                 if (vb_handles[vbid] != nullptr) {
904                     vb_handles[vbid]->add_doc(
905                             key, key_len, parameters.get_doc_len());
906 
907                     // If we're generating keys per vbucket, stop managing this
908                     // vbucket when we've it the limit
909                     if (parameters.is_keys_per_vbucket() &&
910                         (vb_handles[vbid]->get_doc_count() ==
911                          parameters.get_key_count())) {
912                         vb_handles[vbid].reset(); // done with this VB
913                         parameters.disable_vbucket(vbid);
914                     }
915                 }
916             }
917             key_value++;
918 
919             // Stop when there's no more vbuckets managed, yet we're past
920             // starting
921             if (start_counting_vbuckets &&
922                 parameters.get_vbuckets_managed() == 0) {
923                 break;
924             }
925         }
926 
927         vb_handles.clear();
928     }
929 
930     std::atomic<bool> started;
931     uint64_t documents_saved;
932     ProgramParameters& parameters;
933     std::set<uint16_t> my_vbuckets;
934     std::deque<VBucket*> work_queue;
935     std::mutex lock;
936     static std::mutex print_lock;
937     std::condition_variable cvar;
938     std::thread task_thread;
939 };
940 
941 std::mutex VBucketFlusher::print_lock;
942 
main(int argc, char** argv)943 int main(int argc, char** argv) {
944     ProgramParameters parameters;
945     parameters.load(argc, argv);
946     if (!parameters.validate()) {
947         return 1;
948     }
949 
950     char key[64];
951     uint64_t key_max = parameters.is_keys_per_vbucket()
952                                ? ULLONG_MAX
953                                : parameters.get_key_count();
954     std::cout << "Generating " << parameters.get_key_count() << " keys ";
955     if (parameters.is_keys_per_vbucket()) {
956         std::cout << "per VB";
957     }
958     std::cout << std::endl;
959     if (parameters.get_key_count() > 1) {
960         std::cout << "Key pattern is ";
961         snprintf(key,
962                  64,
963                  "K%020" PRId64,
964                  (uint64_t)0 + parameters.get_start_key());
965         std::cout << key << " to ";
966         snprintf(key,
967                  64,
968                  "K%020" PRId64,
969                  (uint64_t)((key_max + parameters.get_start_key()) - 1));
970         std::cout << key << std::endl;
971     } else {
972         std::cout << "Key pattern is K00000000000000000000" << std::endl;
973     }
974 
975     std::cout << "vbucket count set to " << parameters.get_vbc() << std::endl;
976     std::cout << "keys per flush set to " << parameters.get_keys_per_flush()
977               << std::endl;
978     std::cout << "Document type is " << parameters.get_doc_len() << " bytes "
979               << parameters.get_doc_type_string() << std::endl;
980 
981     if (parameters.get_reuse_couch_files()) {
982         std::cout << "Re-using any existing couch-files" << std::endl;
983     } else {
984         std::cout << "Creating new couch-files" << std::endl;
985     }
986 
987     std::vector<std::unique_ptr<VBucketFlusher>> vb_flushers(
988             parameters.get_flusher_count());
989     uint64_t documents_saved = 0;
990 
991     // loop through VBs and distribute to flushers.
992     for (int vbid = 0; vbid < parameters.get_vbc(); vbid++) {
993         if (parameters.is_vbucket_managed(vbid)) {
994             if (vb_flushers[vbid % parameters.get_flusher_count()] == nullptr) {
995                 vb_flushers[vbid % parameters.get_flusher_count()] =
996                         std::make_unique<VBucketFlusher>(parameters);
997             }
998             vb_flushers[vbid % parameters.get_flusher_count()]->add_vbucket(
999                     vbid);
1000         }
1001     }
1002 
1003     for (auto& flusher : vb_flushers) {
1004         if (flusher.get()) {
1005             flusher->start();
1006         }
1007     }
1008 
1009     for (auto& flusher : vb_flushers) {
1010         if (flusher.get()) {
1011             flusher->join();
1012             documents_saved += flusher->get_documents_saved();
1013         }
1014     }
1015 
1016     std::cout << "Saved " << documents_saved << " documents " << std::endl;
1017 
1018     vb_flushers.clear();
1019     return 0;
1020 }