xref: /5.5.2/couchstore/src/couch_create.cc (revision 30b18fb0)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18//
19// couch_create, a program for 'offline' generation of Couchbase compatible
20// couchstore files.
21//
22
23#include <getopt.h>
24#include <inttypes.h>
25#include <stddef.h>
26#include <stdint.h>
27#include <atomic>
28#include <chrono>
29#include <climits>
30#include <condition_variable>
31#include <cstdlib>
32#include <cstring>
33#include <deque>
34#include <exception>
35#include <iostream>
36#include <memory>
37#include <mutex>
38#include <set>
39#include <sstream>
40#include <string>
41#include <thread>
42#include <vector>
43#include "crc32.h"
44
45#include <platform/make_unique.h>
46#include "libcouchstore/couch_db.h"
47
// State this program assigns to a vbucket. The explicit values match the
// implicit numbering, so a value-initialised VBucketState is VB_ACTIVE.
enum VBucketState {
    VB_ACTIVE = 0, // vbucket file is written with state "active"
    VB_REPLICA = 1, // vbucket file is written with state "replica"
    VB_UNMANAGED = 2 // vbucket is skipped by this program
};
49
// Kind of document body to generate; selects the content_meta flags and
// whether couchstore is asked to compress the bodies.
enum DocType {
    BINARY_DOC = 0, // non-JSON body, stored as-is
    BINARY_DOC_COMPRESSED = 1, // non-JSON body, stored compressed
    JSON_DOC = 2, // JSON body, stored as-is
    JSON_DOC_COMPRESSED = 3 // JSON body, stored compressed
};
56
57//
58// ProgramParameters:
59// An object to process argv/argc and carry around the parameters to operate
60// with.
61//
62class ProgramParameters {
63public:
64    // Define all program defaults as static
65    static const bool reuse_couch_files_default = false;
66    static const int vbc_default = 1024;
67    static const uint64_t key_count_default = 0;
68    static const int keys_per_flush_default = 512;
69    static const int doc_len_default = 256;
70    static const int keys_per_vbucket_default = false;
71    static const uint64_t start_key_default = 0;
72    static const bool low_compression_default = false;
73    static const DocType doc_type_default = BINARY_DOC_COMPRESSED;
74    static const int flusher_count_default = 8;
75
76    //
77    // Construct a program parameters, all parameters assigned default settings
78    //
79    ProgramParameters()
80        : reuse_couch_files(reuse_couch_files_default),
81          vbc(vbc_default),
82          key_count(key_count_default),
83          keys_per_vbucket(keys_per_vbucket_default),
84          keys_per_flush(keys_per_flush_default),
85          doc_len(doc_len_default),
86          doc_type(doc_type_default),
87          vbuckets(vbc_default),
88          vbuckets_managed(0),
89          start_key(start_key_default),
90          low_compression(low_compression_default),
91          flusher_count(flusher_count_default) {
92        fill(vbuckets.begin(), vbuckets.end(), VB_UNMANAGED);
93    }
94
95    void load(int argc, char** argv) {
96        const int KEYS_PER_VBUCKET = 1000;
97        while (1) {
98            static struct option long_options[] = {
99                    {"reuse", no_argument, 0, 'r'},
100                    {"vbc", required_argument, 0, 'v'},
101                    {"keys", required_argument, 0, 'k'},
102                    {"keys-per-vbucket", no_argument, 0, KEYS_PER_VBUCKET},
103                    {"keys-per-flush", required_argument, 0, 'f'},
104                    {"doc-len", required_argument, 0, 'd'},
105                    {"doc-type", required_argument, 0, 't'},
106                    {"start-key", required_argument, 0, 's'},
107                    {"low-compression", no_argument, 0, 'l'},
108                    {0, 0, 0, 0}};
109            /* getopt_long stores the option index here. */
110            int option_index = 0;
111
112            int c = getopt_long(
113                    argc, argv, "s:v:k:f:d:t:rl", long_options, &option_index);
114
115            /* Detect the end of the options. */
116            if (c == -1) {
117                break;
118            }
119
120            switch (c) {
121            case 'v': {
122                vbc = static_cast<int16_t>(atoi(optarg));
123                vbuckets.resize(vbc);
124                break;
125            }
126
127            case 'k': {
128                key_count = strtoull(optarg, 0, 10);
129                break;
130            }
131
132            case 'f': {
133                keys_per_flush = atoi(optarg);
134                break;
135            }
136
137            case 'd': {
138                doc_len = atoi(optarg);
139                break;
140            }
141
142            case 'r': {
143                reuse_couch_files = true;
144                break;
145            }
146
147            case 'l': {
148                low_compression = true;
149                break;
150            }
151
152            case 's': {
153                start_key = strtoull(optarg, 0, 10);
154                break;
155            }
156
157            case 't': {
158                if (strcmp(optarg, "binary") == 0) {
159                    doc_type = BINARY_DOC;
160                } else if (strcmp(optarg, "binarycompressed") == 0) {
161                    doc_type = BINARY_DOC_COMPRESSED;
162                }
163                break;
164            }
165
166            case KEYS_PER_VBUCKET: {
167                keys_per_vbucket = true;
168                break;
169            }
170
171            default: { usage(1); }
172            }
173        } // end of option parsing
174
175        // Now are we managing all vbuckets, or a list?
176        if (optind < argc) {
177            while (optind < argc) {
178                int i = atoi(argv[optind]);
179                if (i < vbc) {
180                    // a or r present?
181                    VBucketState s = VB_ACTIVE;
182                    for (size_t i = 0; i < strlen(argv[optind]); i++) {
183                        if (argv[optind][i] == 'a') {
184                            s = VB_ACTIVE;
185                        } else if (argv[optind][i] == 'r') {
186                            s = VB_REPLICA;
187                        }
188                    }
189                    vbuckets[i] = s;
190                    vbuckets_managed++; // keep track of how many we are
191                    // managing
192                    std::cout << "Managing VB " << i;
193                    if (s == VB_ACTIVE) {
194                        std::cout << " active" << std::endl;
195                    } else {
196                        std::cout << " replica" << std::endl;
197                    }
198
199                    optind++;
200                }
201            }
202        } else {
203            for (int i = 0; i < vbc; i++) {
204                vbuckets[i] = VB_ACTIVE;
205                vbuckets_managed++;
206            }
207        }
208    }
209
210    //
211    // return true if the current parameters are good, else print an error and
212    // return false.
213    //
214    bool validate() const {
215        if (vbc <= 0) {
216            std::cerr << "Error: vbc less than or equal to 0 - " << vbc
217                      << std::endl;
218            return false;
219        }
220
221        // this ensures that the program doesn't run away with no args...
222        if (key_count == 0) {
223            std::cerr << "Key count 0 or not specified, use -k to set key "
224                         "count to "
225                         "greater than 0"
226                      << std::endl;
227            return false;
228        }
229        return true;
230    }
231
232    int16_t get_vbc() const {
233        return vbc;
234    }
235
236    uint64_t get_key_count() const {
237        return key_count;
238    }
239
240    int get_keys_per_flush() const {
241        return keys_per_flush;
242    }
243
244    int get_doc_len() const {
245        return doc_len;
246    }
247
248    bool get_reuse_couch_files() const {
249        return reuse_couch_files;
250    }
251
252    std::string get_doc_type_string() const {
253        switch (doc_type) {
254        case BINARY_DOC: {
255            return std::string("binary");
256            break;
257        }
258        case BINARY_DOC_COMPRESSED: {
259            return std::string("binary compressed");
260            break;
261        }
262        case JSON_DOC: {
263            return std::string("JSON");
264            break;
265        }
266        case JSON_DOC_COMPRESSED: {
267            return std::string("JSON compressed");
268            break;
269        }
270        }
271        return std::string("getDocTypeString failure");
272    }
273
274    DocType get_doc_type() const {
275        return doc_type;
276    }
277
278    bool is_keys_per_vbucket() const {
279        return keys_per_vbucket;
280    }
281
282    bool is_vbucket_managed(int vb) const {
283        if (vb > vbc) {
284            return false;
285        }
286        return vbuckets[vb] != VB_UNMANAGED;
287    }
288
289    int get_vbuckets_managed() {
290        return vbuckets_managed;
291    }
292
293    uint64_t get_start_key() {
294        return start_key;
295    }
296
297    VBucketState get_vbucket_state(int vb) const {
298        return vbuckets[vb];
299    }
300
301    void disable_vbucket(int vb) {
302        static std::mutex lock;
303        std::unique_lock<std::mutex> lck(lock);
304        vbuckets[vb] = VB_UNMANAGED;
305        vbuckets_managed--;
306    }
307
308    bool is_low_compression() {
309        return low_compression;
310    }
311
312    int get_flusher_count() {
313        return flusher_count;
314    }
315
316    static void usage(int exit_code) {
317        std::cerr << std::endl;
318        std::cerr << "couch_create <options> <vbucket list>" << std::endl;
319        std::cerr << "options:" << std::endl;
320        std::cerr << "    --reuse,-r: Reuse couch-files (any re-used file must "
321                     "have "
322                     "a vbstate document) (default "
323                  << reuse_couch_files_default << ")." << std::endl;
324        std::cerr << "    --vbc, -v <integer>:  Number of vbuckets (default "
325                  << vbc_default << ")." << std::endl;
326        std::cerr << "    --keys, -k <integer>:  Number of keys to create "
327                     "(default "
328                  << key_count_default << ")." << std::endl;
329        std::cerr << "    --keys-per-vbucket:  The keys value is how many keys "
330                     "for "
331                     "each vbucket default "
332                  << keys_per_vbucket_default << ")." << std::endl;
333        std::cerr << "    --keys-per-flush, -f <integer>:  Number of keys per "
334                     "vbucket before committing to disk (default "
335                  << keys_per_flush_default << ")." << std::endl;
336        std::cerr << "    --doc-len,-d <integer>:  Number of bytes for the "
337                     "document "
338                     "body (default "
339                  << doc_len_default << ")." << std::endl;
340        std::cerr << "    --doc-type,-t <binary|binarycompressed>:  Document "
341                     "type."
342                  << std::endl;
343        std::cerr << "    --start-key,-s <integer>:  Specify the first key "
344                     "number "
345                     "(default "
346                  << start_key_default << ")." << std::endl;
347        std::cerr << "    --low-compression,-l: Generate documents that don't "
348                     "compress well (default "
349                  << low_compression_default << ")." << std::endl;
350
351        std::cerr << std::endl
352                  << "vbucket list (optional space separated values):"
353                  << std::endl;
354        std::cerr
355                << "    Specify a list of vbuckets to manage and optionally "
356                   "the "
357                   "state. "
358                << std::endl
359                << "E.g. VB 1 can be specified as '1' (defaults to active when "
360                   "creating vbuckets) or '1a' (for active) or '1r' (for "
361                   "replica)."
362                << std::endl
363                << "Omiting the vbucket list means all vbuckets will be "
364                   "created."
365                << std::endl;
366
367        std::cerr
368                << "Two modes of operation:" << std::endl
369                << "    1) Re-use vbuckets (--reuse or -r) \"Automatic mode\":"
370                << std::endl
371                << "    In this mode of operation the program will only write "
372                   "key/values into vbucket files it finds in the current "
373                   "directory."
374                << std::endl
375                << "    Ideally the vbucket files are empty of documents, but "
376                   "must have a vbstate local doc."
377                << std::endl
378                << "    The intent of this mode is for a cluster and bucket to "
379                   "be "
380                   "pre-created, but empty and then to simply "
381                << std::endl
382                << "    populate the files found on each node without having "
383                   "to "
384                   "consider which are active/replica."
385                << std::endl
386                << std::endl;
387        ;
388
389        std::cerr
390                << "    2) Create vbuckets:" << std::endl
391                << "    In this mode of operation the program will create new "
392                   "vbucket files. The user must make the decision about what "
393                   "is "
394                   "active/replica"
395                << std::endl
396                << std::endl;
397
398        std::cerr << "Examples: " << std::endl;
399        std::cerr
400                << "  Create 1024 active vbuckets containing 10,000, 256 byte "
401                   "binary documents."
402                << std::endl;
403        std::cerr << "    > ./couch_create -k 10000" << std::endl << std::endl;
404        std::cerr << "  Iterate over 10,000 keys, but only generate vbuckets "
405                     "0, 1, "
406                     "2 and 3 with a mix of active/replica"
407                  << std::endl;
408        std::cerr << "    > ./couch_create -k 10000 0a 1r 2a 3r" << std::endl
409                  << std::endl;
410        std::cerr
411                << "  Iterate over 10,000 keys and re-use existing couch-files"
412                << std::endl;
413        std::cerr << "    > ./couch_create -k 10000 -r" << std::endl
414                  << std::endl;
415        std::cerr << "  Create 10000 keys for each vbucket and re-use existing "
416                     "couch-files"
417                  << std::endl;
418        std::cerr << "    > ./couch_create -k 10000 --keys-per-vbucket -r"
419                  << std::endl
420                  << std::endl;
421
422        exit(exit_code);
423    }
424
425private:
426    bool reuse_couch_files;
427    int16_t vbc;
428    uint64_t key_count;
429    bool keys_per_vbucket;
430    int keys_per_flush;
431    int doc_len;
432    DocType doc_type;
433    std::vector<VBucketState> vbuckets;
434    int vbuckets_managed;
435    uint64_t start_key;
436    bool low_compression;
437    int flusher_count;
438};
439
440//
441// Class representing a single couchstore document
442//
443class Document {
444    class Meta {
445    public:
446        // Create the meta, cas is a millisecond timestamp
447        Meta(std::chrono::time_point<std::chrono::high_resolution_clock>
448                     casTime,
449             uint32_t e,
450             uint32_t f)
451            : cas(std::chrono::duration_cast<std::chrono::microseconds>(
452                          casTime.time_since_epoch())
453                          .count() &
454                  0xFFFF),
455              exptime(e),
456              flags(f),
457              flex_meta_code(0x01),
458              flex_value(0x0) {
459        }
460
461        void set_exptime(uint32_t exptime) {
462            this->exptime = exptime;
463        }
464
465        void set_flags(uint32_t flags) {
466            this->flags = flags;
467        }
468
469        size_t get_size() const {
470            // Not safe to use sizeof(Meta) due to trailing padding
471            return sizeof(cas) + sizeof(exptime) + sizeof(flags) +
472                   sizeof(flex_meta_code) + sizeof(flex_value);
473        }
474
475    public:
476        uint64_t cas;
477        uint32_t exptime;
478        uint32_t flags;
479        uint8_t flex_meta_code;
480        uint8_t flex_value;
481    };
482
483public:
484    Document(const char* k, int klen, ProgramParameters& params, int dlen)
485        : meta(std::chrono::high_resolution_clock::now(), 0, 0),
486          key_len(klen),
487          key(NULL),
488          data_len(dlen),
489          data(NULL),
490          parameters(params),
491          doc_created(0) {
492        key = new char[klen];
493        data = new char[dlen];
494        set_doc(k, klen, dlen);
495        memset(&doc_info, 0, sizeof(DocInfo));
496        memset(&doc, 0, sizeof(Doc));
497        doc.id.buf = key;
498        doc.id.size = klen;
499        doc.data.buf = data;
500        doc.data.size = dlen;
501        doc_info.id = doc.id;
502        doc_info.size = doc.data.size;
503        doc_info.db_seq = 0; // db_seq;
504        doc_info.rev_seq = 1; // ++db_seq;
505
506        if (params.get_doc_type() == BINARY_DOC_COMPRESSED) {
507            doc_info.content_meta =
508                    COUCH_DOC_NON_JSON_MODE | COUCH_DOC_IS_COMPRESSED;
509        } else if (params.get_doc_type() == BINARY_DOC) {
510            doc_info.content_meta = COUCH_DOC_NON_JSON_MODE;
511        } else if (params.get_doc_type() == JSON_DOC_COMPRESSED) {
512            doc_info.content_meta = COUCH_DOC_IS_JSON | COUCH_DOC_IS_COMPRESSED;
513        } else if (params.get_doc_type() == JSON_DOC) {
514            doc_info.content_meta = COUCH_DOC_IS_JSON;
515        } else {
516            doc_info.content_meta = COUCH_DOC_NON_JSON_MODE;
517        }
518
519        doc_info.rev_meta.buf = reinterpret_cast<char*>(&meta);
520        doc_info.rev_meta.size = meta.get_size();
521        doc_info.deleted = 0;
522    }
523
524    ~Document() {
525        delete[] key;
526        delete[] data;
527    }
528
529    void set_doc(const char* k, int klen, int dlen) {
530        if (klen > key_len) {
531            delete key;
532            key = new char[klen];
533            doc.id.buf = key;
534            doc.id.size = klen;
535            doc_info.id = doc.id;
536        }
537        if (dlen > data_len) {
538            delete data;
539            data = new char[dlen];
540            doc.data.buf = data;
541            doc.data.size = dlen;
542        }
543
544        memcpy(key, k, klen);
545        // generate doc body only if size has changed.
546        if (doc_created != dlen) {
547            if (parameters.is_low_compression()) {
548                srand(0);
549                for (int data_index = 0; data_index < dlen; data_index++) {
550                    char data_value = (rand() % 255) % ('Z' - '0');
551                    data[data_index] = data_value + '0';
552                }
553            } else {
554                char data_value = 0;
555                for (int data_index = 0; data_index < dlen; data_index++) {
556                    data[data_index] = data_value + '0';
557                    data_value = (data_value + 1) % ('Z' - '0');
558                }
559            }
560            doc_created = dlen;
561        }
562    }
563
564    Doc* get_doc() {
565        return &doc;
566    }
567
568    DocInfo* get_doc_info() {
569        return &doc_info;
570    }
571
572private:
573    Doc doc;
574    DocInfo doc_info;
575    Meta meta;
576
577    int key_len;
578    char* key;
579    int data_len;
580    char* data;
581    ProgramParameters& parameters;
582    int doc_created;
583    static uint64_t db_seq;
584};
585
586uint64_t Document::db_seq = 0;
587
588//
589// A class representing a VBucket.
590// This object holds a queue of key/values (documents) and manages their writing
591// to the couch-file.
592//
593class VBucket {
594public:
595    class Exception1 : public std::exception {
596        virtual const char* what() const throw() {
597            return "Found an existing couch-file with vbstate and --reuse/-r "
598                   "is not set.";
599        }
600    } exception1;
601
602    class Exception2 : public std::exception {
603        virtual const char* what() const throw() {
604            return "Didn't find valid couch-file (or found file with no "
605                   "vbstate) and --reuse/-r is set.";
606        }
607    } exception2;
608
609    class Exception3 : public std::exception {
610        virtual const char* what() const throw() {
611            return "Error opening couch_file (check ulimit -n).";
612        }
613    } exception3;
614
615    //
616    // Constructor opens file and validates the state.
617    // throws exceptions if not safe to continue
618    //
619    VBucket(char* filename,
620            int vb,
621            uint64_t& saved_counter,
622            ProgramParameters& params_ref)
623        : handle(NULL),
624          next_free_doc(0),
625          flush_threshold(params_ref.get_keys_per_flush()),
626          docs(params_ref.get_keys_per_flush()),
627          pending_documents(0),
628          documents_saved(saved_counter),
629          params(params_ref),
630          vbid(vb),
631          doc_count(0),
632          got_vbstate(false),
633          vb_seq(0),
634          ok_to_set_vbstate(true) {
635        int flags = params.get_reuse_couch_files()
636                            ? COUCHSTORE_OPEN_FLAG_RDONLY
637                            : COUCHSTORE_OPEN_FLAG_CREATE;
638
639        couchstore_error_t err = couchstore_open_db(filename, flags, &handle);
640        if (err != COUCHSTORE_SUCCESS) {
641            throw exception3;
642        }
643
644        if (read_vbstate()) {
645            // A vbstate document exists.
646            // Can only proceed if we're in reuse mode
647            if (!params.get_reuse_couch_files()) {
648                destroy();
649                throw exception1;
650            } else {
651                // VB exists and is valid, close and open in write mode.
652                destroy();
653                couchstore_error_t err = couchstore_open_db(
654                        filename, COUCHSTORE_OPEN_FLAG_CREATE, &handle);
655                if (err != COUCHSTORE_SUCCESS) {
656                    throw exception3;
657                }
658            }
659        } else {
660            if (params.get_reuse_couch_files()) {
661                destroy();
662                throw exception2;
663            }
664        }
665        ok_to_set_vbstate = true;
666    }
667
668    ~VBucket() {
669        save_docs();
670        if (ok_to_set_vbstate) {
671            set_vbstate(); // set/update local vbstate
672        }
673        docs.clear();
674        destroy();
675    }
676
677    //
678    // Return true if the special vbstate document is present.
679    //
680    bool read_vbstate() {
681        LocalDoc* local_doc = nullptr;
682        couchstore_error_t errCode =
683                couchstore_open_local_document(handle,
684                                               "_local/vbstate",
685                                               sizeof("_local/vbstate") - 1,
686                                               &local_doc);
687        if (local_doc) {
688            got_vbstate = true;
689            vbstate_data.assign(local_doc->json.buf, local_doc->json.size);
690            couchstore_free_local_document(local_doc);
691        }
692        return errCode == COUCHSTORE_SUCCESS;
693    }
694
695    //
696    // Set the special vbstate document
697    //
698    void set_vbstate() {
699        std::stringstream jsonState;
700        std::string state_string;
701        if (got_vbstate) {
702            if (vbstate_data.find("replica") != std::string::npos) {
703                state_string = "replica";
704            } else {
705                state_string = "active";
706            }
707        } else {
708            state_string = params.get_vbucket_state(vbid) == VB_ACTIVE
709                                   ? "active"
710                                   : "replica";
711        }
712
713        // Set max_cas to a timestamp
714        uint64_t max_cas =
715                std::chrono::duration_cast<std::chrono::microseconds>(
716                        std::chrono::high_resolution_clock::now()
717                                .time_since_epoch())
718                        .count() &
719                0xFFFF;
720
721        jsonState << "{\"state\": \"" << state_string << "\""
722                  << ",\"checkpoint_id\": \"0\""
723                  << ",\"max_deleted_seqno\": \"0\""
724                  << ",\"snap_start\": \"" << vb_seq << "\""
725                  << ",\"snap_end\": \"" << vb_seq << "\""
726                  << ",\"max_cas\": \"" << max_cas << "\""
727                  << ",\"drift_counter\": \"0\""
728                  << "}";
729
730        auto vbstate_json = jsonState.str();
731        LocalDoc vbstate;
732        vbstate.id.buf = (char*)"_local/vbstate";
733        vbstate.id.size = sizeof("_local/vbstate") - 1;
734        vbstate.json.buf = (char*)vbstate_json.c_str();
735        vbstate.json.size = vbstate_json.size();
736        vbstate.deleted = 0;
737
738        couchstore_error_t errCode =
739                couchstore_save_local_document(handle, &vbstate);
740        if (errCode != COUCHSTORE_SUCCESS) {
741            std::cerr << "Warning: couchstore_save_local_document failed error="
742                      << couchstore_strerror(errCode) << std::endl;
743        }
744        couchstore_commit(handle);
745    }
746
747    //
748    // Add a new key/value to the queue
749    // Flushes the queue if has reached the flush_threshold.
750    //
751    void add_doc(char* k, int klen, int dlen) {
752        if (docs[next_free_doc] == nullptr) {
753            docs[next_free_doc] =
754                    std::make_unique<Document>(k, klen, params, dlen);
755        }
756
757        docs[next_free_doc]->set_doc(k, klen, dlen);
758        pending_documents++;
759        doc_count++;
760        vb_seq++;
761
762        if (pending_documents == flush_threshold) {
763            save_docs();
764        } else {
765            next_free_doc++;
766        }
767    }
768
769    //
770    // Save any pending documents to the couch file
771    //
772    void save_docs() {
773        if (pending_documents) {
774            std::vector<Doc*> doc_array(pending_documents);
775            std::vector<DocInfo*> doc_info_array(pending_documents);
776
777            for (int i = 0; i < pending_documents; i++) {
778                doc_array[i] = docs[i]->get_doc();
779                doc_info_array[i] = docs[i]->get_doc_info();
780            }
781
782            int flags = 0;
783
784            if (params.get_doc_type() == JSON_DOC_COMPRESSED ||
785                params.get_doc_type() == BINARY_DOC_COMPRESSED) {
786                flags = COMPRESS_DOC_BODIES;
787            }
788
789            couchstore_save_documents(handle,
790                                      doc_array.data(),
791                                      doc_info_array.data(),
792                                      pending_documents,
793                                      flags);
794            couchstore_commit(handle);
795            documents_saved += pending_documents;
796            next_free_doc = 0;
797            pending_documents = 0;
798        }
799    }
800
801    uint64_t get_doc_count() {
802        return doc_count;
803    }
804
805private:
806    void destroy() {
807        couchstore_close_file(handle);
808        couchstore_free_db(handle);
809    }
810
811    Db* handle;
812    int next_free_doc;
813    int flush_threshold;
814    std::vector<std::unique_ptr<Document>> docs;
815    int pending_documents;
816    uint64_t& documents_saved;
817    ProgramParameters& params;
818    int vbid;
819    uint64_t doc_count;
820    std::string vbstate_data;
821    bool got_vbstate;
822    uint64_t vb_seq;
823    bool ok_to_set_vbstate;
824};
825
826class VBucketFlusher {
827public:
828    VBucketFlusher(ProgramParameters& params)
829        : started(false),
830          documents_saved(0),
831          parameters(params),
832          task_thread(&VBucketFlusher::run, this) {
833    }
834
835    ~VBucketFlusher() {
836    }
837
838    void add_vbucket(uint16_t vbid) {
839        my_vbuckets.insert(vbid);
840    }
841
842    void start() {
843        while (!started) {
844            std::this_thread::sleep_for(std::chrono::milliseconds(100));
845        }
846        cvar.notify_one();
847    }
848
849    void join() {
850        task_thread.join();
851    }
852
853    uint64_t get_documents_saved() {
854        return documents_saved;
855    }
856
857private:
858    void run() {
859        std::unique_lock<std::mutex> lck(lock);
860        started = true;
861        cvar.wait(lck);
862
863        std::vector<std::unique_ptr<VBucket>> vb_handles(parameters.get_vbc());
864        uint64_t key_value = 0;
865
866        bool start_counting_vbuckets = false;
867
868        char key[64];
869        uint64_t key_max = parameters.is_keys_per_vbucket()
870                                   ? ULLONG_MAX
871                                   : parameters.get_key_count();
872
873        // Loop through the key space and create keys, test each key to see if
874        // we are managing the vbucket it maps to.
875        for (uint64_t ii = parameters.get_start_key();
876             ii < (key_max + parameters.get_start_key());
877             ii++) {
878            int key_len = snprintf(key, 64, "K%020" PRId64, key_value);
879            int vbid = client_hash_crc32(reinterpret_cast<const uint8_t*>(key),
880                                         key_len) %
881                       (parameters.get_vbc());
882
883            // Only if the vbucket is managed generate the doc
884            if (my_vbuckets.count(vbid) > 0 &&
885                parameters.is_vbucket_managed(vbid)) {
886                if (vb_handles[vbid] == nullptr) {
887                    char filename[32];
888                    snprintf(filename, 32, "%d.couch.1", vbid);
889                    start_counting_vbuckets = true;
890                    try {
891                        vb_handles[vbid] = std::make_unique<VBucket>(
892                                filename, vbid, documents_saved, parameters);
893                    } catch (std::exception& e) {
894                        std::unique_lock<std::mutex> print_lck(print_lock);
895                        std::cerr << "Not creating a VB handler for "
896                                  << filename << " \"" << e.what() << "\""
897                                  << std::endl;
898                        parameters.disable_vbucket(vbid);
899                        vb_handles[vbid].reset();
900                    }
901                }
902
903                // if there's now a handle, go for it
904                if (vb_handles[vbid] != nullptr) {
905                    vb_handles[vbid]->add_doc(
906                            key, key_len, parameters.get_doc_len());
907
908                    // If we're generating keys per vbucket, stop managing this
909                    // vbucket when we've it the limit
910                    if (parameters.is_keys_per_vbucket() &&
911                        (vb_handles[vbid]->get_doc_count() ==
912                         parameters.get_key_count())) {
913                        vb_handles[vbid].reset(); // done with this VB
914                        parameters.disable_vbucket(vbid);
915                    }
916                }
917            }
918            key_value++;
919
920            // Stop when there's no more vbuckets managed, yet we're past
921            // starting
922            if (start_counting_vbuckets &&
923                parameters.get_vbuckets_managed() == 0) {
924                break;
925            }
926        }
927
928        vb_handles.clear();
929    }
930
931    std::atomic<bool> started;
932    uint64_t documents_saved;
933    ProgramParameters& parameters;
934    std::set<uint16_t> my_vbuckets;
935    std::deque<VBucket*> work_queue;
936    std::mutex lock;
937    static std::mutex print_lock;
938    std::condition_variable cvar;
939    std::thread task_thread;
940};
941
// Out-of-class definition of the static mutex declared inside VBucketFlusher.
// The flusher code holds it while writing diagnostics to std::cerr.
942std::mutex VBucketFlusher::print_lock;
943
944int main(int argc, char** argv) {
945    ProgramParameters parameters;
946    parameters.load(argc, argv);
947    if (!parameters.validate()) {
948        return 1;
949    }
950
951    char key[64];
952    uint64_t key_max = parameters.is_keys_per_vbucket()
953                               ? ULLONG_MAX
954                               : parameters.get_key_count();
955    std::cout << "Generating " << parameters.get_key_count() << " keys ";
956    if (parameters.is_keys_per_vbucket()) {
957        std::cout << "per VB";
958    }
959    std::cout << std::endl;
960    if (parameters.get_key_count() > 1) {
961        std::cout << "Key pattern is ";
962        snprintf(key,
963                 64,
964                 "K%020" PRId64,
965                 (uint64_t)0 + parameters.get_start_key());
966        std::cout << key << " to ";
967        snprintf(key,
968                 64,
969                 "K%020" PRId64,
970                 (uint64_t)((key_max + parameters.get_start_key()) - 1));
971        std::cout << key << std::endl;
972    } else {
973        std::cout << "Key pattern is K00000000000000000000" << std::endl;
974    }
975
976    std::cout << "vbucket count set to " << parameters.get_vbc() << std::endl;
977    std::cout << "keys per flush set to " << parameters.get_keys_per_flush()
978              << std::endl;
979    std::cout << "Document type is " << parameters.get_doc_len() << " bytes "
980              << parameters.get_doc_type_string() << std::endl;
981
982    if (parameters.get_reuse_couch_files()) {
983        std::cout << "Re-using any existing couch-files" << std::endl;
984    } else {
985        std::cout << "Creating new couch-files" << std::endl;
986    }
987
988    std::vector<std::unique_ptr<VBucketFlusher>> vb_flushers(
989            parameters.get_flusher_count());
990    uint64_t documents_saved = 0;
991
992    // loop through VBs and distribute to flushers.
993    for (int vbid = 0; vbid < parameters.get_vbc(); vbid++) {
994        if (parameters.is_vbucket_managed(vbid)) {
995            if (vb_flushers[vbid % parameters.get_flusher_count()] == nullptr) {
996                vb_flushers[vbid % parameters.get_flusher_count()] =
997                        std::make_unique<VBucketFlusher>(parameters);
998            }
999            vb_flushers[vbid % parameters.get_flusher_count()]->add_vbucket(
1000                    vbid);
1001        }
1002    }
1003
1004    for (auto& flusher : vb_flushers) {
1005        if (flusher.get()) {
1006            flusher->start();
1007        }
1008    }
1009
1010    for (auto& flusher : vb_flushers) {
1011        if (flusher.get()) {
1012            flusher->join();
1013            documents_saved += flusher->get_documents_saved();
1014        }
1015    }
1016
1017    std::cout << "Saved " << documents_saved << " documents " << std::endl;
1018
1019    vb_flushers.clear();
1020    return 0;
1021}