1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
19 #define SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_ 1
20 
21 #include "config.h"
22 #include "libcouchstore/couch_db.h"
23 
24 #include <map>
25 #include <string>
26 #include <vector>
27 
28 #include "configuration.h"
29 #include "couch-kvstore/couch-fs-stats.h"
30 #include "histo.h"
31 #include "item.h"
32 #include "kvstore.h"
33 #include "stats.h"
34 #include "tasks.h"
35 #include "atomicqueue.h"
36 
// Value meaning "no option flags set" for couchstore option arguments;
// used below as the default for CouchKVStore::loadDB()'s options parameter.
#define COUCHSTORE_NO_OPTIONS 0
38 
39 /**
40  * Stats and timings for couchKVStore
41  */
42 class CouchKVStoreStats {
43 
44 public:
45     /**
46      * Default constructor
47      */
CouchKVStoreStats()48     CouchKVStoreStats() :
49       docsCommitted(0), numOpen(0), numClose(0),
50       numLoadedVb(0), numGetFailure(0), numSetFailure(0),
51       numDelFailure(0), numOpenFailure(0), numVbSetFailure(0),
52       readSizeHisto(ExponentialGenerator<size_t>(1, 2), 25),
53       writeSizeHisto(ExponentialGenerator<size_t>(1, 2), 25) {
54     }
55 
reset()56     void reset() {
57         docsCommitted.store(0);
58         numOpen.store(0);
59         numClose.store(0);
60         numLoadedVb.store(0);
61         numGetFailure.store(0);
62         numSetFailure.store(0);
63         numDelFailure.store(0);
64         numOpenFailure.store(0);
65         numVbSetFailure.store(0);
66 
67         readTimeHisto.reset();
68         readSizeHisto.reset();
69         writeTimeHisto.reset();
70         writeSizeHisto.reset();
71         delTimeHisto.reset();
72         compactHisto.reset();
73         commitHisto.reset();
74         saveDocsHisto.reset();
75         batchSize.reset();
76         fsStats.reset();
77     }
78 
79     // the number of docs committed
80     AtomicValue<size_t> docsCommitted;
81     // the number of open() calls
82     AtomicValue<size_t> numOpen;
83     // the number of close() calls
84     AtomicValue<size_t> numClose;
85     // the number of vbuckets loaded
86     AtomicValue<size_t> numLoadedVb;
87 
88     //stats tracking failures
89     AtomicValue<size_t> numGetFailure;
90     AtomicValue<size_t> numSetFailure;
91     AtomicValue<size_t> numDelFailure;
92     AtomicValue<size_t> numOpenFailure;
93     AtomicValue<size_t> numVbSetFailure;
94 
95     /* for flush and vb delete, no error handling in CouchKVStore, such
96      * failure should be tracked in MC-engine  */
97 
98     // How long it takes us to complete a read
99     Histogram<hrtime_t> readTimeHisto;
100     // How big are our reads?
101     Histogram<size_t> readSizeHisto;
102     // How long it takes us to complete a write
103     Histogram<hrtime_t> writeTimeHisto;
104     // How big are our writes?
105     Histogram<size_t> writeSizeHisto;
106     // Time spent in delete() calls.
107     Histogram<hrtime_t> delTimeHisto;
108     // Time spent in couchstore commit
109     Histogram<hrtime_t> commitHisto;
110     // Time spent in couchstore compaction
111     Histogram<hrtime_t> compactHisto;
112     // Time spent in couchstore save documents
113     Histogram<hrtime_t> saveDocsHisto;
114     // Batch size of saveDocs calls
115     Histogram<size_t> batchSize;
116 
117     // Stats from the underlying OS file operations done by couchstore.
118     CouchstoreStats fsStats;
119 };
120 
121 class EventuallyPersistentEngine;
122 class EPStats;
123 
124 typedef union {
125     Callback <mutation_result> *setCb;
126     Callback <int> *delCb;
127 } CouchRequestCallback;
128 
129 // Additional 2 Bytes included: 1 for flex_meta_code and the other for datatype field
130 const size_t COUCHSTORE_METADATA_SIZE(2 * sizeof(uint32_t) + sizeof(uint64_t) +
131                                       FLEX_DATA_OFFSET + EXT_META_LEN);
132 
133 /**
134  * Class representing a document to be persisted in couchstore.
135  */
136 class CouchRequest
137 {
138 public:
139     /**
140      * Constructor
141      *
142      * @param it Item instance to be persisted
143      * @param rev vbucket database revision number
144      * @param cb persistence callback
145      * @param del flag indicating if it is an item deletion or not
146      */
147     CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &cb, bool del);
148 
149     /**
150      * Get the vbucket id of a document to be persisted
151      *
152      * @return vbucket id of a document
153      */
getVBucketId(void)154     uint16_t getVBucketId(void) {
155         return vbucketId;
156     }
157 
158     /**
159      * Get the revision number of the vbucket database file
160      * where the document is persisted
161      *
162      * @return revision number of the corresponding vbucket database file
163      */
getRevNum(void)164     uint64_t getRevNum(void) {
165         return fileRevNum;
166     }
167 
168     /**
169      * Get the couchstore Doc instance of a document to be persisted
170      *
171      * @return pointer to the couchstore Doc instance of a document
172      */
getDbDoc(void)173     Doc *getDbDoc(void) {
174         if (deleteItem) {
175             return NULL;
176         } else {
177             return &dbDoc;
178         }
179     }
180 
181     /**
182      * Get the couchstore DocInfo instance of a document to be persisted
183      *
184      * @return pointer to the couchstore DocInfo instance of a document
185      */
getDbDocInfo(void)186     DocInfo *getDbDocInfo(void) {
187         return &dbDocInfo;
188     }
189 
190     /**
191      * Get the callback instance for SET
192      *
193      * @return callback instance for SET
194      */
getSetCallback(void)195     Callback<mutation_result> *getSetCallback(void) {
196         return callback.setCb;
197     }
198 
199     /**
200      * Get the callback instance for DELETE
201      *
202      * @return callback instance for DELETE
203      */
getDelCallback(void)204     Callback<int> *getDelCallback(void) {
205         return callback.delCb;
206     }
207 
208     /**
209      * Get the time in ns elapsed since the creation of this instance
210      *
211      * @return time in ns elapsed since the creation of this instance
212      */
getDelta()213     hrtime_t getDelta() {
214         return (gethrtime() - start) / 1000;
215     }
216 
217     /**
218      * Get the length of a document body to be persisted
219      *
220      * @return length of a document body
221      */
getNBytes()222     size_t getNBytes() {
223         return dbDocInfo.rev_meta.size + dbDocInfo.size;
224     }
225 
226     /**
227      * Return true if the document to be persisted is for DELETE
228      *
229      * @return true if the document to be persisted is for DELETE
230      */
isDelete()231     bool isDelete() {
232         return deleteItem;
233     };
234 
235     /**
236      * Get the key of a document to be persisted
237      *
238      * @return key of a document to be persisted
239      */
getKey(void) const240     const std::string& getKey(void) const {
241         return key;
242     }
243 
244 private :
245     value_t value;
246     uint8_t meta[COUCHSTORE_METADATA_SIZE];
247     uint16_t vbucketId;
248     uint64_t fileRevNum;
249     std::string key;
250     Doc dbDoc;
251     DocInfo dbDocInfo;
252     bool deleteItem;
253     CouchRequestCallback callback;
254 
255     hrtime_t start;
256 };
257 
258 /**
259  * KVStore with couchstore as the underlying storage system
260  */
261 class CouchKVStore : public KVStore
262 {
263 public:
264     /**
265      * Constructor
266      *
267      * @param stats     Engine stats
268      * @param config    Configuration information
269      * @param read_only flag indicating if this kvstore instance is for read-only operations
270      */
271     CouchKVStore(EPStats &stats, Configuration &config, bool read_only = false);
272 
273     /**
274      * Copy constructor
275      *
276      * @param from the source kvstore instance
277      */
278     CouchKVStore(const CouchKVStore &from);
279 
280     /**
281      * Deconstructor
282      */
283     ~CouchKVStore();
284 
285     /**
286      * Reset database to a clean state.
287      */
288     void reset(uint16_t vbucketId);
289 
290     /**
291      * Begin a transaction (if not already in one).
292      *
293      * @return true if the transaction is started successfully
294      */
begin(void)295     bool begin(void) {
296         cb_assert(!isReadOnly());
297         intransaction = true;
298         return intransaction;
299     }
300 
301     /**
302      * Commit a transaction (unless not currently in one).
303      *
304      * @return true if the commit is completed successfully.
305      */
306     bool commit(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
307                 uint64_t snapEndSeqno);
308 
309     /**
310      * Rollback a transaction (unless not currently in one).
311      */
rollback(void)312     void rollback(void) {
313         cb_assert(!isReadOnly());
314         if (intransaction) {
315             intransaction = false;
316         }
317     }
318 
319     /**
320      * Query the properties of the underlying storage.
321      *
322      * @return properties of the underlying storage system
323      */
324     StorageProperties getStorageProperties(void);
325 
326     /**
327      * Insert or update a given document.
328      *
329      * @param itm instance representing the document to be inserted or updated
330      * @param cb callback instance for SET
331      */
332     void set(const Item &itm, Callback<mutation_result> &cb);
333 
334     /**
335      * Retrieve the document with a given key from the underlying storage system.
336      *
337      * @param key the key of a document to be retrieved
338      * @param rowid the sequence number of a document
339      * @param vb vbucket id of a document
340      * @param cb callback instance for GET
341      * @param fetchDelete True if we want to retrieve a deleted item if it not
342      *        purged yet.
343      */
344     void get(const std::string &key, uint64_t rowid,
345              uint16_t vb, Callback<GetValue> &cb, bool fetchDelete = false);
346 
347     void getWithHeader(void *dbHandle, const std::string &key,
348                        uint16_t vb, Callback<GetValue> &cb,
349                        bool fetchDelete = false);
350 
351     /**
352      * Retrieve the multiple documents from the underlying storage system at once.
353      *
354      * @param vb vbucket id of a document
355      * @param itms list of items whose documents are going to be retrieved
356      */
357     void getMulti(uint16_t vb, vb_bgfetch_queue_t &itms);
358 
359     /**
360      * Delete a given document from the underlying storage system.
361      *
362      * @param itm instance representing the document to be deleted
363      * @param cb callback instance for DELETE
364      */
365     void del(const Item &itm, Callback<int> &cb);
366 
367     /**
368      * Delete a given vbucket database instance from the underlying storage system
369      *
370      * @param vbucket vbucket id
371      * @param recreate flag to re-create vbucket after deletion
372      */
373     void delVBucket(uint16_t vbucket);
374 
375     /**
376      * Retrieve the list of persisted vbucket states
377      *
378      * @return vbucket state vector instance where key is vbucket id and
379      * value is vbucket state
380      */
381    std::vector<vbucket_state *>  listPersistedVbuckets(void);
382 
383     /**
384      * Retrieve ths list of persisted engine stats
385      *
386      * @param stats map instance where the persisted engine stats will be added
387      */
388     void getPersistedStats(std::map<std::string, std::string> &stats);
389 
390     /**
391      * Persist a snapshot of the engine stats in the underlying storage.
392      *
393      * @param engine_stats map instance that contains all the engine stats
394      * @return true if the snapshot is done successfully
395      */
396     bool snapshotStats(const std::map<std::string, std::string> &engine_stats);
397 
398     /**
399      * Persist a snapshot of the vbucket states in the underlying storage system.
400      *
401      * @param vbucketId vbucket id
402      * @param vbstate vbucket state
403      * @param cb - call back for updating kv stats
404      * @return true if the snapshot is done successfully
405      */
406     bool snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
407                          Callback<kvstats_ctx> *cb);
408 
409      /**
410      * Compact a vbucket in the underlying storage system.
411      *
412      * @param vbid   - which vbucket needs to be compacted
413      * @param hook_ctx - details of vbucket which needs to be compacted
414      * @param cb - callback to help process newly expired items
415      * @param kvcb - callback to update kvstore stats
416      */
417     void compactVBucket(const uint16_t vbid, compaction_ctx *cookie,
418                         Callback<compaction_ctx> &cb,
419                         Callback<kvstats_ctx> &kvcb);
420 
421     /**
422      * Retrieve selected documents from the underlying storage system.
423      *
424      * @param vbids list of vbucket ids whose document keys are going to be retrieved
425      * @param cb callback instance to process each document retrieved
426      * @param cl callback to see if we need to read the value from disk
427      */
428     void dump(std::vector<uint16_t> &vbids, shared_ptr<Callback<GetValue> > cb,
429               shared_ptr<Callback<CacheLookup> > cl);
430 
431     /**
432      * Retrieve all the documents for a given vbucket from the storage system.
433      *
434      * @param vb vbucket id
435      * @param cb callback instance to process each document retrieved
436      * @param cl callback to see if we need to read the value from disk
437      * @param sr callback to notify the caller what the range of the backfill is
438      */
439     void dump(uint16_t vb, uint64_t stSeqno,
440               shared_ptr<Callback<GetValue> > cb,
441               shared_ptr<Callback<CacheLookup> > cl,
442               shared_ptr<Callback<SeqnoRange> > sr);
443 
444     /**
445      * Retrieve all the keys from the underlying storage system.
446      *
447      * @param vbids list of vbucket ids whose document keys are going to be retrieved
448      * @param cb callback instance to process each key retrieved
449      */
450     void dumpKeys(std::vector<uint16_t> &vbids,  shared_ptr<Callback<GetValue> > cb);
451 
452     /**
453      * Retrieve the list of keys and their meta data for a given
454      * vbucket, which were deleted.
455      * @param vb vbucket id
456      * @param cb callback instance to process each key and its meta data
457      */
458     void dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
459                      shared_ptr<Callback<GetValue> > cb);
460 
461     /**
462      * Does the underlying storage system support key-only retrieval operations?
463      *
464      * @return true if key-only retrieval is supported
465      */
isKeyDumpSupported()466     bool isKeyDumpSupported() {
467         return true;
468     }
469 
470     /**
471      * Get the estimated number of items that are going to be loaded during warmup.
472      *
473      * @return the number of estimated items to be loaded during warmup
474      */
475     size_t getEstimatedItemCount(std::vector<uint16_t> &vbs);
476 
477     /**
478      * Get the number of deleted items that are persisted to a vbucket file
479      *
480      * @param vbid The vbucket if of the file to get the number of deletes for
481      */
482     size_t getNumPersistedDeletes(uint16_t vbid);
483 
484     /**
485      * Get the number of non-deleted items from a vbucket database file
486      *
487      * @param vbid The vbucket of the file to get the number of docs for
488      */
489     size_t getNumItems(uint16_t vbid);
490 
491     /**
492      * Get the number of non-deleted items from a vbucket database file
493      *
494      * @param vbid The vbucket of the file to get the number of docs for
495      * @param min_seq The sequence number to start the count from
496      * @param max_seq The sequence number to stop the count at
497      */
498     size_t getNumItems(uint16_t vbid, uint64_t min_seq, uint64_t max_seq);
499 
500     /**
501      * Do a rollback to the specified seqNo on the particular vbucket
502      *
503      * @param vbid The vbucket of the file that's to be rolled back
504      * @param rollbackSeqno The sequence number upto which the engine needs
505      * to be rolled back
506      * @param cb getvalue callback
507      */
508     RollbackResult rollback(uint16_t vbid, uint64_t rollbackSeqno,
509                             shared_ptr<RollbackCB> cb);
510 
511     /**
512      * Perform the pre-optimizations before persisting dirty items
513      *
514      * @param items list of dirty items that can be pre-optimized
515      */
516     void optimizeWrites(std::vector<queued_item> &items);
517 
518     /**
519      * Perform pending tasks after persisting dirty items
520      */
521     void pendingTasks();
522 
523     /**
524      * Add all the kvstore stats to the stat response
525      *
526      * @param prefix stat name prefix
527      * @param add_stat upstream function that allows us to add a stat to the response
528      * @param cookie upstream connection cookie
529      */
530     void addStats(const std::string &prefix, ADD_STAT add_stat, const void *cookie);
531 
532     /**
533      * Add all the kvstore timings stats to the stat response
534      *
535      * @param prefix stat name prefix
536      * @param add_stat upstream function that allows us to add a stat to the response
537      * @param cookie upstream connection cookie
538      */
539     void addTimingStats(const std::string &prefix, ADD_STAT add_stat,
540                         const void *c);
541 
542     /**
543      * Resets couchstore stats
544      */
resetStats()545     void resetStats() {
546         st.reset();
547     }
548 
549     static int recordDbDump(Db *db, DocInfo *docinfo, void *ctx);
550     static int recordDbStat(Db *db, DocInfo *docinfo, void *ctx);
551     static int getMultiCb(Db *db, DocInfo *docinfo, void *ctx);
552     void readVBState(Db *db, uint16_t vbId);
553 
554     couchstore_error_t fetchDoc(Db *db, DocInfo *docinfo,
555                                 GetValue &docValue, uint16_t vbId,
556                                 bool metaOnly, bool fetchDelete = false);
557     ENGINE_ERROR_CODE couchErr2EngineErr(couchstore_error_t errCode);
558 
getCKVStoreStat(void)559     CouchKVStoreStats &getCKVStoreStat(void) { return st; }
560 
561     uint64_t getLastPersistedSeqno(uint16_t vbid);
562 
563     /**
564      * Get all_docs API, to return the list of all keys in the store
565      */
566     ENGINE_ERROR_CODE getAllKeys(uint16_t vbid, std::string &start_key,
567                                  uint32_t count, AllKeysCB *cb);
568 
569 protected:
570     void loadDB(shared_ptr<Callback<GetValue> > cb,
571                 shared_ptr<Callback<CacheLookup> > cl,
572                 shared_ptr<Callback<SeqnoRange> > sr,
573                 bool keysOnly, uint16_t vbid,
574                 uint64_t startSeqno,
575                 couchstore_docinfos_options options=COUCHSTORE_NO_OPTIONS);
576     bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
577                          Callback<kvstats_ctx> *cb);
resetVBucket(uint16_t vbucketId, vbucket_state &vbstate)578     bool resetVBucket(uint16_t vbucketId, vbucket_state &vbstate) {
579         cachedDocCount[vbucketId] = 0;
580         return setVBucketState(vbucketId, vbstate, NULL);
581     }
582 
583     template <typename T>
584     void addStat(const std::string &prefix, const char *nm, T &val,
585                  ADD_STAT add_stat, const void *c);
586 
587 private:
588 
589     void operator=(const CouchKVStore &from);
590 
591     void open();
592     void close();
593     bool commit2couchstore(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
594                            uint64_t snapEndSeqno);
595 
596     uint64_t checkNewRevNum(std::string &dbname, bool newFile = false);
597     void populateFileNameMap(std::vector<std::string> &filenames,
598                              std::vector<uint16_t> *vbids);
599     void remVBucketFromDbFileMap(uint16_t vbucketId);
600     void updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev);
601     couchstore_error_t openDB(uint16_t vbucketId, uint64_t fileRev, Db **db,
602                               uint64_t options, uint64_t *newFileRev = NULL);
603     couchstore_error_t openDB_retry(std::string &dbfile, uint64_t options,
604                                     const couch_file_ops *ops,
605                                     Db **db, uint64_t *newFileRev);
606     couchstore_error_t saveDocs(uint16_t vbid, uint64_t rev, Doc **docs,
607                                 DocInfo **docinfos, size_t docCount,
608                                 kvstats_ctx &kvctx,
609                                 uint64_t snapStartSeqno,
610                                 uint64_t snapEndSeqno);
611     void commitCallback(std::vector<CouchRequest *> &committedReqs,
612                         kvstats_ctx &kvctx,
613                         couchstore_error_t errCode);
614     couchstore_error_t saveVBState(Db *db, vbucket_state &vbState);
615     void setDocsCommitted(uint16_t docs);
616     void closeDatabaseHandle(Db *db);
617 
618     /**
619      * Unlink selected couch file, which will be removed by the OS,
620      * once all its references close.
621      */
622     void unlinkCouchFile(uint16_t vbucket, uint64_t fRev);
623 
624     /**
625      * Remove compact file
626      *
627      * @param dbname
628      * @param vbucket id
629      * @param current db rev number
630      */
631     void removeCompactFile(const std::string &dbname, uint16_t vbid,
632                            uint64_t currentRev);
633 
634     void removeCompactFile(const std::string &filename);
635 
636     EPStats &epStats;
637     Configuration &configuration;
638     const std::string dbname;
639     std::vector<uint64_t>dbFileRevMap;
640     uint16_t numDbFiles;
641     std::vector<CouchRequest *> pendingReqsQ;
642     bool intransaction;
643     bool dbFileRevMapPopulated;
644 
645     /* all stats */
646     CouchKVStoreStats   st;
647     couch_file_ops statCollectingFileOps;
648     /* vbucket state cache*/
649     std::vector<vbucket_state *> cachedVBStates;
650     /* deleted docs in each file*/
651     unordered_map<uint16_t, size_t> cachedDeleteCount;
652     /* non-deleted docs in each file */
653     unordered_map<uint16_t, size_t> cachedDocCount;
654     /* pending file deletions */
655     AtomicQueue<std::string> pendingFileDeletions;
656 };
657 
658 #endif  // SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
659