1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2012 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#ifndef SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
19#define SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_ 1
20
21#include "config.h"
22#include "libcouchstore/couch_db.h"
23
24#include <map>
25#include <string>
26#include <vector>
27
28#include "configuration.h"
29#include "couch-kvstore/couch-fs-stats.h"
30#include "couch-kvstore/couch-notifier.h"
31#include "histo.h"
32#include "item.h"
33#include "kvstore.h"
34#include "stats.h"
35#include "tasks.h"
36
37#define COUCHSTORE_NO_OPTIONS 0
38
39/**
40 * Stats and timings for couchKVStore
41 */
42class CouchKVStoreStats {
43
44public:
45    /**
46     * Default constructor
47     */
48    CouchKVStoreStats() :
49      docsCommitted(0), numOpen(0), numClose(0),
50      numLoadedVb(0), numGetFailure(0), numSetFailure(0),
51      numDelFailure(0), numOpenFailure(0), numVbSetFailure(0),
52      readSizeHisto(ExponentialGenerator<size_t>(1, 2), 25),
53      writeSizeHisto(ExponentialGenerator<size_t>(1, 2), 25) {
54    }
55
56    void reset() {
57        docsCommitted.store(0);
58        numOpen.store(0);
59        numClose.store(0);
60        numLoadedVb.store(0);
61        numGetFailure.store(0);
62        numSetFailure.store(0);
63        numDelFailure.store(0);
64        numOpenFailure.store(0);
65        numVbSetFailure.store(0);
66
67        readTimeHisto.reset();
68        readSizeHisto.reset();
69        writeTimeHisto.reset();
70        writeSizeHisto.reset();
71        delTimeHisto.reset();
72        compactHisto.reset();
73        commitHisto.reset();
74        saveDocsHisto.reset();
75        batchSize.reset();
76        fsStats.reset();
77    }
78
79    // the number of docs committed
80    AtomicValue<size_t> docsCommitted;
81    // the number of open() calls
82    AtomicValue<size_t> numOpen;
83    // the number of close() calls
84    AtomicValue<size_t> numClose;
85    // the number of vbuckets loaded
86    AtomicValue<size_t> numLoadedVb;
87
88    //stats tracking failures
89    AtomicValue<size_t> numGetFailure;
90    AtomicValue<size_t> numSetFailure;
91    AtomicValue<size_t> numDelFailure;
92    AtomicValue<size_t> numOpenFailure;
93    AtomicValue<size_t> numVbSetFailure;
94
95    /* for flush and vb delete, no error handling in CouchKVStore, such
96     * failure should be tracked in MC-engine  */
97
98    // How long it takes us to complete a read
99    Histogram<hrtime_t> readTimeHisto;
100    // How big are our reads?
101    Histogram<size_t> readSizeHisto;
102    // How long it takes us to complete a write
103    Histogram<hrtime_t> writeTimeHisto;
104    // How big are our writes?
105    Histogram<size_t> writeSizeHisto;
106    // Time spent in delete() calls.
107    Histogram<hrtime_t> delTimeHisto;
108    // Time spent in couchstore commit
109    Histogram<hrtime_t> commitHisto;
110    // Time spent in couchstore compaction
111    Histogram<hrtime_t> compactHisto;
112    // Time spent in couchstore save documents
113    Histogram<hrtime_t> saveDocsHisto;
114    // Batch size of saveDocs calls
115    Histogram<size_t> batchSize;
116
117    // Stats from the underlying OS file operations done by couchstore.
118    CouchstoreStats fsStats;
119};
120
121class EventuallyPersistentEngine;
122class EPStats;
123
124typedef union {
125    Callback <mutation_result> *setCb;
126    Callback <int> *delCb;
127} CouchRequestCallback;
128
129// Additional 2 Bytes included: 1 for flex_meta_code and the other for datatype field
130const size_t COUCHSTORE_METADATA_SIZE(2 * sizeof(uint32_t) + sizeof(uint64_t) +
131                                      FLEX_DATA_OFFSET + EXT_META_LEN);
132
133/**
134 * Class representing a document to be persisted in couchstore.
135 */
136class CouchRequest
137{
138public:
139    /**
140     * Constructor
141     *
142     * @param it Item instance to be persisted
143     * @param rev vbucket database revision number
144     * @param cb persistence callback
145     * @param del flag indicating if it is an item deletion or not
146     */
147    CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &cb, bool del);
148
149    /**
150     * Get the vbucket id of a document to be persisted
151     *
152     * @return vbucket id of a document
153     */
154    uint16_t getVBucketId(void) {
155        return vbucketId;
156    }
157
158    /**
159     * Get the revision number of the vbucket database file
160     * where the document is persisted
161     *
162     * @return revision number of the corresponding vbucket database file
163     */
164    uint64_t getRevNum(void) {
165        return fileRevNum;
166    }
167
168    /**
169     * Get the couchstore Doc instance of a document to be persisted
170     *
171     * @return pointer to the couchstore Doc instance of a document
172     */
173    Doc *getDbDoc(void) {
174        if (deleteItem) {
175            return NULL;
176        } else {
177            return &dbDoc;
178        }
179    }
180
181    /**
182     * Get the couchstore DocInfo instance of a document to be persisted
183     *
184     * @return pointer to the couchstore DocInfo instance of a document
185     */
186    DocInfo *getDbDocInfo(void) {
187        return &dbDocInfo;
188    }
189
190    /**
191     * Get the callback instance for SET
192     *
193     * @return callback instance for SET
194     */
195    Callback<mutation_result> *getSetCallback(void) {
196        return callback.setCb;
197    }
198
199    /**
200     * Get the callback instance for DELETE
201     *
202     * @return callback instance for DELETE
203     */
204    Callback<int> *getDelCallback(void) {
205        return callback.delCb;
206    }
207
208    /**
209     * Get the time in ns elapsed since the creation of this instance
210     *
211     * @return time in ns elapsed since the creation of this instance
212     */
213    hrtime_t getDelta() {
214        return (gethrtime() - start) / 1000;
215    }
216
217    /**
218     * Get the length of a document body to be persisted
219     *
220     * @return length of a document body
221     */
222    size_t getNBytes() {
223        return dbDocInfo.rev_meta.size + dbDocInfo.size;
224    }
225
226    /**
227     * Return true if the document to be persisted is for DELETE
228     *
229     * @return true if the document to be persisted is for DELETE
230     */
231    bool isDelete() {
232        return deleteItem;
233    };
234
235    /**
236     * Get the key of a document to be persisted
237     *
238     * @return key of a document to be persisted
239     */
240    const std::string& getKey(void) const {
241        return key;
242    }
243
244private :
245    value_t value;
246    uint8_t meta[COUCHSTORE_METADATA_SIZE];
247    uint16_t vbucketId;
248    uint64_t fileRevNum;
249    std::string key;
250    Doc dbDoc;
251    DocInfo dbDocInfo;
252    bool deleteItem;
253    CouchRequestCallback callback;
254
255    hrtime_t start;
256};
257
258/**
259 * KVStore with couchstore as the underlying storage system
260 */
261class CouchKVStore : public KVStore
262{
263public:
264    /**
265     * Constructor
266     *
267     * @param theEngine EventuallyPersistentEngine instance
268     * @param read_only flag indicating if this kvstore instance is for read-only operations
269     */
270    CouchKVStore(EPStats &stats, Configuration &config, bool read_only = false);
271
272    /**
273     * Copy constructor
274     *
275     * @param from the source kvstore instance
276     */
277    CouchKVStore(const CouchKVStore &from);
278
279    /**
280     * Deconstructor
281     */
282    virtual ~CouchKVStore() {
283        close();
284    }
285
286    /**
287     * Reset database to a clean state.
288     */
289    void reset(uint16_t shardId);
290
291    /**
292     * Begin a transaction (if not already in one).
293     *
294     * @return true if the transaction is started successfully
295     */
296    bool begin(void) {
297        cb_assert(!isReadOnly());
298        intransaction = true;
299        return intransaction;
300    }
301
302    /**
303     * Commit a transaction (unless not currently in one).
304     *
305     * @return true if the commit is completed successfully.
306     */
307    bool commit(Callback<kvstats_ctx> *cb);
308
309    /**
310     * Rollback a transaction (unless not currently in one).
311     */
312    void rollback(void) {
313        cb_assert(!isReadOnly());
314        if (intransaction) {
315            intransaction = false;
316        }
317    }
318
319    /**
320     * Query the properties of the underlying storage.
321     *
322     * @return properties of the underlying storage system
323     */
324    StorageProperties getStorageProperties(void);
325
326    /**
327     * Insert or update a given document.
328     *
329     * @param itm instance representing the document to be inserted or updated
330     * @param cb callback instance for SET
331     */
332    void set(const Item &itm, Callback<mutation_result> &cb);
333
334    /**
335     * Retrieve the document with a given key from the underlying storage system.
336     *
337     * @param key the key of a document to be retrieved
338     * @param rowid the sequence number of a document
339     * @param vb vbucket id of a document
340     * @param cb callback instance for GET
341     * @param fetchDelete True if we want to retrieve a deleted item if it not
342     *        purged yet.
343     */
344    void get(const std::string &key, uint64_t rowid,
345             uint16_t vb, Callback<GetValue> &cb, bool fetchDelete = false);
346
347    void getWithHeader(void *dbHandle, const std::string &key,
348                       uint16_t vb, Callback<GetValue> &cb,
349                       bool fetchDelete = false);
350
351    /**
352     * Retrieve the multiple documents from the underlying storage system at once.
353     *
354     * @param vb vbucket id of a document
355     * @param itms list of items whose documents are going to be retrieved
356     */
357    void getMulti(uint16_t vb, vb_bgfetch_queue_t &itms);
358
359    /**
360     * Delete a given document from the underlying storage system.
361     *
362     * @param itm instance representing the document to be deleted
363     * @param cb callback instance for DELETE
364     */
365    void del(const Item &itm, Callback<int> &cb);
366
367    /**
368     * Delete a given vbucket database instance from the underlying storage system
369     *
370     * @param vbucket vbucket id
371     * @param recreate true if we need to create an empty vbucket after deletion
372     * @return true if the vbucket deletion is completed successfully.
373     */
374    bool delVBucket(uint16_t vbucket, bool recreate);
375
376    /**
377     * Retrieve the list of persisted vbucket states
378     *
379     * @return vbucket state map instance where key is vbucket id and
380     * value is vbucket state
381     */
382    vbucket_map_t listPersistedVbuckets(void);
383
384    /**
385     * Retrieve ths list of persisted engine stats
386     *
387     * @param stats map instance where the persisted engine stats will be added
388     */
389    void getPersistedStats(std::map<std::string, std::string> &stats);
390
391    /**
392     * Persist a snapshot of the engine stats in the underlying storage.
393     *
394     * @param engine_stats map instance that contains all the engine stats
395     * @return true if the snapshot is done successfully
396     */
397    bool snapshotStats(const std::map<std::string, std::string> &engine_stats);
398
399    /**
400     * Persist a snapshot of the vbucket states in the underlying storage system.
401     *
402     * @param vb_stats map instance that contains all the vbucket states
403     * @param cb - call back for updating kv stats
404     * @return true if the snapshot is done successfully
405     */
406    bool snapshotVBuckets(const vbucket_map_t &vb_states,
407                          Callback<kvstats_ctx> *cb);
408
409     /**
410     * Compact a vbucket in the underlying storage system.
411     *
412     * @param vbid   - which vbucket needs to be compacted
413     * @param hook_ctx - details of vbucket which needs to be compacted
414     * @param cb - callback to help process newly expired items
415     * @param kvcb - callback to update kvstore stats
416     * @return true if the snapshot is done successfully
417     */
418    bool compactVBucket(const uint16_t vbid, compaction_ctx *cookie,
419                        Callback<compaction_ctx> &cb,
420                        Callback<kvstats_ctx> &kvcb);
421
422    /**
423     * Retrieve selected documents from the underlying storage system.
424     *
425     * @param vbids list of vbucket ids whose document keys are going to be retrieved
426     * @param cb callback instance to process each document retrieved
427     * @param cl callback to see if we need to read the value from disk
428     */
429    void dump(std::vector<uint16_t> &vbids, shared_ptr<Callback<GetValue> > cb,
430              shared_ptr<Callback<CacheLookup> > cl);
431
432    /**
433     * Retrieve all the documents for a given vbucket from the storage system.
434     *
435     * @param vb vbucket id
436     * @param cb callback instance to process each document retrieved
437     * @param cl callback to see if we need to read the value from disk
438     * @param sr callback to notify the caller what the range of the backfill is
439     */
440    void dump(uint16_t vb, uint64_t stSeqno,
441              shared_ptr<Callback<GetValue> > cb,
442              shared_ptr<Callback<CacheLookup> > cl,
443              shared_ptr<Callback<SeqnoRange> > sr);
444
445    /**
446     * Retrieve all the keys from the underlying storage system.
447     *
448     * @param vbids list of vbucket ids whose document keys are going to be retrieved
449     * @param cb callback instance to process each key retrieved
450     */
451    void dumpKeys(std::vector<uint16_t> &vbids,  shared_ptr<Callback<GetValue> > cb);
452
453    /**
454     * Retrieve the list of keys and their meta data for a given
455     * vbucket, which were deleted.
456     * @param vb vbucket id
457     * @param cb callback instance to process each key and its meta data
458     */
459    void dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
460                     shared_ptr<Callback<GetValue> > cb);
461
462    /**
463     * Does the underlying storage system support key-only retrieval operations?
464     *
465     * @return true if key-only retrieval is supported
466     */
467    bool isKeyDumpSupported() {
468        return true;
469    }
470
471    /**
472     * Get the estimated number of items that are going to be loaded during warmup.
473     *
474     * @return the number of estimated items to be loaded during warmup
475     */
476    size_t getEstimatedItemCount(std::vector<uint16_t> &vbs);
477
478    /**
479     * Get the number of deleted items that are persisted to a vbucket file
480     *
481     * @param vbid The vbucket if of the file to get the number of deletes for
482     */
483    size_t getNumPersistedDeletes(uint16_t vbid);
484
485    /**
486     * Get the number of non-deleted items from a vbucket database file
487     *
488     * @param vbid The vbucket of the file to get the number of docs for
489     */
490    size_t getNumItems(uint16_t vbid);
491
492    /**
493     * Get the number of non-deleted items from a vbucket database file
494     *
495     * @param vbid The vbucket of the file to get the number of docs for
496     * @param min_seq The sequence number to start the count from
497     * @param max_seq The sequence number to stop the count at
498     */
499    size_t getNumItems(uint16_t vbid, uint64_t min_seq, uint64_t max_seq);
500
501    /**
502     * Do a rollback to the specified seqNo on the particular vbucket
503     *
504     * @param vbid The vbucket of the file that's to be rolled back
505     * @param rollbackSeqno The sequence number upto which the engine needs
506     * to be rolled back
507     * @param cb getvalue callback
508     */
509    rollback_error_code rollback(uint16_t vbid, uint64_t rollbackSeqno,
510                                 shared_ptr<RollbackCB> cb);
511
512    /**
513     * Perform the pre-optimizations before persisting dirty items
514     *
515     * @param items list of dirty items that can be pre-optimized
516     */
517    void optimizeWrites(std::vector<queued_item> &items);
518
519    /**
520     * Add all the kvstore stats to the stat response
521     *
522     * @param prefix stat name prefix
523     * @param add_stat upstream function that allows us to add a stat to the response
524     * @param cookie upstream connection cookie
525     */
526    void addStats(const std::string &prefix, ADD_STAT add_stat, const void *cookie);
527
528    /**
529     * Add all the kvstore timings stats to the stat response
530     *
531     * @param prefix stat name prefix
532     * @param add_stat upstream function that allows us to add a stat to the response
533     * @param cookie upstream connection cookie
534     */
535    void addTimingStats(const std::string &prefix, ADD_STAT add_stat,
536                        const void *c);
537
538    /**
539     * Resets couchstore stats
540     */
541    void resetStats() {
542        st.reset();
543    }
544
545    static int recordDbDump(Db *db, DocInfo *docinfo, void *ctx);
546    static int recordDbStat(Db *db, DocInfo *docinfo, void *ctx);
547    static int getMultiCb(Db *db, DocInfo *docinfo, void *ctx);
548    static void readVBState(Db *db, uint16_t vbId, vbucket_state &vbState);
549
550    couchstore_error_t fetchDoc(Db *db, DocInfo *docinfo,
551                                GetValue &docValue, uint16_t vbId,
552                                bool metaOnly, bool fetchDelete = false);
553    ENGINE_ERROR_CODE couchErr2EngineErr(couchstore_error_t errCode);
554
555    CouchKVStoreStats &getCKVStoreStat(void) { return st; }
556
557   /**
558     * Get all_docs API, to return the list of all keys in the store
559     */
560    ENGINE_ERROR_CODE getAllKeys(uint16_t vbid, std::string &start_key,
561                                 uint32_t count, AllKeysCB *cb);
562
563protected:
564    void loadDB(shared_ptr<Callback<GetValue> > cb,
565                shared_ptr<Callback<CacheLookup> > cl,
566                shared_ptr<Callback<SeqnoRange> > sr,
567                bool keysOnly, uint16_t vbid,
568                uint64_t startSeqno,
569                couchstore_docinfos_options options=COUCHSTORE_NO_OPTIONS);
570    bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
571                         uint32_t vb_change_type, Callback<kvstats_ctx> *cb,
572                         bool notify = true);
573    bool resetVBucket(uint16_t vbucketId, vbucket_state &vbstate) {
574        cachedDocCount[vbucketId] = 0;
575        return setVBucketState(vbucketId, vbstate, VB_STATE_CHANGED, NULL);
576    }
577
578    template <typename T>
579    void addStat(const std::string &prefix, const char *nm, T &val,
580                 ADD_STAT add_stat, const void *c);
581
582private:
583    /**
584     * Notify the result of Compaction to Mccouch
585     *
586     * @param vbid   - the vbucket id of the bucket where compaction was done
587     * @param rev    - the new file revision of the vbucket
588     * @param result - the result of the compaction attempt
589     * @param header_pos - new header position of the file
590     * @return true if mccouch was notified successfully, false otherwise
591     */
592    bool notifyCompaction(const uint16_t vbid, uint64_t new_rev,
593                          uint32_t result, uint64_t header_pos);
594
595    void operator=(const CouchKVStore &from);
596
597    void open();
598    void close();
599    bool commit2couchstore(Callback<kvstats_ctx> *cb);
600
601    uint64_t checkNewRevNum(std::string &dbname, bool newFile = false);
602    void populateFileNameMap(std::vector<std::string> &filenames);
603    void remVBucketFromDbFileMap(uint16_t vbucketId);
604    void updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev);
605    couchstore_error_t openDB(uint16_t vbucketId, uint64_t fileRev, Db **db,
606                              uint64_t options, uint64_t *newFileRev = NULL);
607    couchstore_error_t openDB_retry(std::string &dbfile, uint64_t options,
608                                    const couch_file_ops *ops,
609                                    Db **db, uint64_t *newFileRev);
610    couchstore_error_t saveDocs(uint16_t vbid, uint64_t rev, Doc **docs,
611                                DocInfo **docinfos, size_t docCount,
612                                kvstats_ctx &kvctx);
613    void commitCallback(std::vector<CouchRequest *> &committedReqs,
614                        kvstats_ctx &kvctx,
615                        couchstore_error_t errCode);
616    couchstore_error_t saveVBState(Db *db, vbucket_state &vbState);
617    void setDocsCommitted(uint16_t docs);
618    void closeDatabaseHandle(Db *db);
619
620    /**
621     * Remove compact file
622     *
623     * @param dbname
624     * @param vbucket id
625     * @param current db rev number
626     */
627    void removeCompactFile(const std::string &dbname, uint16_t vbid,
628                           uint64_t currentRev);
629
630    void removeCompactFile(const std::string &filename);
631
632    EPStats &epStats;
633    Configuration &configuration;
634    const std::string dbname;
635    CouchNotifier *couchNotifier;
636    std::vector<uint64_t>dbFileRevMap;
637    uint16_t numDbFiles;
638    std::vector<CouchRequest *> pendingReqsQ;
639    bool intransaction;
640    bool dbFileRevMapPopulated;
641
642    /* all stats */
643    CouchKVStoreStats   st;
644    couch_file_ops statCollectingFileOps;
645    /* vbucket state cache*/
646    vbucket_map_t cachedVBStates;
647    /* deleted docs in each file*/
648    std::map<uint16_t, size_t> cachedDeleteCount;
649    /* non-deleted docs in each file */
650    unordered_map<uint16_t, size_t> cachedDocCount;
651
652};
653
654#endif  // SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
655