1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2012 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_ 19 #define SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_ 1 20 21 #include "config.h" 22 #include "libcouchstore/couch_db.h" 23 24 #include <map> 25 #include <string> 26 #include <vector> 27 28 #include "configuration.h" 29 #include "couch-kvstore/couch-fs-stats.h" 30 #include "histo.h" 31 #include "item.h" 32 #include "kvstore.h" 33 #include "stats.h" 34 #include "tasks.h" 35 #include "atomicqueue.h" 36 37 #define COUCHSTORE_NO_OPTIONS 0 38 39 /** 40 * Stats and timings for couchKVStore 41 */ 42 class CouchKVStoreStats { 43 44 public: 45 /** 46 * Default constructor 47 */ CouchKVStoreStats()48 CouchKVStoreStats() : 49 docsCommitted(0), numOpen(0), numClose(0), 50 numLoadedVb(0), numGetFailure(0), numSetFailure(0), 51 numDelFailure(0), numOpenFailure(0), numVbSetFailure(0), 52 readSizeHisto(ExponentialGenerator<size_t>(1, 2), 25), 53 writeSizeHisto(ExponentialGenerator<size_t>(1, 2), 25) { 54 } 55 reset()56 void reset() { 57 docsCommitted.store(0); 58 numOpen.store(0); 59 numClose.store(0); 60 numLoadedVb.store(0); 61 numGetFailure.store(0); 62 numSetFailure.store(0); 63 numDelFailure.store(0); 64 numOpenFailure.store(0); 65 numVbSetFailure.store(0); 66 67 readTimeHisto.reset(); 68 readSizeHisto.reset(); 69 writeTimeHisto.reset(); 70 writeSizeHisto.reset(); 71 delTimeHisto.reset(); 72 compactHisto.reset(); 73 commitHisto.reset(); 74 saveDocsHisto.reset(); 75 batchSize.reset(); 76 fsStats.reset(); 77 } 78 79 // the number of docs committed 80 AtomicValue<size_t> docsCommitted; 81 // the number of open() calls 82 AtomicValue<size_t> numOpen; 83 // the number of close() calls 84 AtomicValue<size_t> numClose; 85 // the number of vbuckets loaded 86 AtomicValue<size_t> numLoadedVb; 87 88 //stats tracking failures 89 AtomicValue<size_t> numGetFailure; 90 AtomicValue<size_t> numSetFailure; 91 AtomicValue<size_t> numDelFailure; 92 AtomicValue<size_t> numOpenFailure; 93 AtomicValue<size_t> numVbSetFailure; 94 95 /* for flush and vb delete, no error handling in CouchKVStore, such 96 * failure should be tracked in MC-engine */ 97 98 // How long it takes us to complete a read 99 Histogram<hrtime_t> readTimeHisto; 100 // How big are our reads? 101 Histogram<size_t> readSizeHisto; 102 // How long it takes us to complete a write 103 Histogram<hrtime_t> writeTimeHisto; 104 // How big are our writes? 105 Histogram<size_t> writeSizeHisto; 106 // Time spent in delete() calls. 107 Histogram<hrtime_t> delTimeHisto; 108 // Time spent in couchstore commit 109 Histogram<hrtime_t> commitHisto; 110 // Time spent in couchstore compaction 111 Histogram<hrtime_t> compactHisto; 112 // Time spent in couchstore save documents 113 Histogram<hrtime_t> saveDocsHisto; 114 // Batch size of saveDocs calls 115 Histogram<size_t> batchSize; 116 117 // Stats from the underlying OS file operations done by couchstore. 118 CouchstoreStats fsStats; 119 }; 120 121 class EventuallyPersistentEngine; 122 class EPStats; 123 124 typedef union { 125 Callback <mutation_result> *setCb; 126 Callback <int> *delCb; 127 } CouchRequestCallback; 128 129 // Additional 2 Bytes included: 1 for flex_meta_code and the other for datatype field 130 const size_t COUCHSTORE_METADATA_SIZE(2 * sizeof(uint32_t) + sizeof(uint64_t) + 131 FLEX_DATA_OFFSET + EXT_META_LEN); 132 133 /** 134 * Class representing a document to be persisted in couchstore. 135 */ 136 class CouchRequest 137 { 138 public: 139 /** 140 * Constructor 141 * 142 * @param it Item instance to be persisted 143 * @param rev vbucket database revision number 144 * @param cb persistence callback 145 * @param del flag indicating if it is an item deletion or not 146 */ 147 CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &cb, bool del); 148 149 /** 150 * Get the vbucket id of a document to be persisted 151 * 152 * @return vbucket id of a document 153 */ getVBucketId(void)154 uint16_t getVBucketId(void) { 155 return vbucketId; 156 } 157 158 /** 159 * Get the revision number of the vbucket database file 160 * where the document is persisted 161 * 162 * @return revision number of the corresponding vbucket database file 163 */ getRevNum(void)164 uint64_t getRevNum(void) { 165 return fileRevNum; 166 } 167 168 /** 169 * Get the couchstore Doc instance of a document to be persisted 170 * 171 * @return pointer to the couchstore Doc instance of a document 172 */ getDbDoc(void)173 Doc *getDbDoc(void) { 174 if (deleteItem) { 175 return NULL; 176 } else { 177 return &dbDoc; 178 } 179 } 180 181 /** 182 * Get the couchstore DocInfo instance of a document to be persisted 183 * 184 * @return pointer to the couchstore DocInfo instance of a document 185 */ getDbDocInfo(void)186 DocInfo *getDbDocInfo(void) { 187 return &dbDocInfo; 188 } 189 190 /** 191 * Get the callback instance for SET 192 * 193 * @return callback instance for SET 194 */ getSetCallback(void)195 Callback<mutation_result> *getSetCallback(void) { 196 return callback.setCb; 197 } 198 199 /** 200 * Get the callback instance for DELETE 201 * 202 * @return callback instance for DELETE 203 */ getDelCallback(void)204 Callback<int> *getDelCallback(void) { 205 return callback.delCb; 206 } 207 208 /** 209 * Get the time in ns elapsed since the creation of this instance 210 * 211 * @return time in ns elapsed since the creation of this instance 212 */ getDelta()213 hrtime_t getDelta() { 214 return (gethrtime() - start) / 1000; 215 } 216 217 /** 218 * Get the length of a document body to be persisted 219 * 220 * @return length of a document body 221 */ getNBytes()222 size_t getNBytes() { 223 return dbDocInfo.rev_meta.size + dbDocInfo.size; 224 } 225 226 /** 227 * Return true if the document to be persisted is for DELETE 228 * 229 * @return true if the document to be persisted is for DELETE 230 */ isDelete()231 bool isDelete() { 232 return deleteItem; 233 }; 234 235 /** 236 * Get the key of a document to be persisted 237 * 238 * @return key of a document to be persisted 239 */ getKey(void) const240 const std::string& getKey(void) const { 241 return key; 242 } 243 244 private : 245 value_t value; 246 uint8_t meta[COUCHSTORE_METADATA_SIZE]; 247 uint16_t vbucketId; 248 uint64_t fileRevNum; 249 std::string key; 250 Doc dbDoc; 251 DocInfo dbDocInfo; 252 bool deleteItem; 253 CouchRequestCallback callback; 254 255 hrtime_t start; 256 }; 257 258 /** 259 * KVStore with couchstore as the underlying storage system 260 */ 261 class CouchKVStore : public KVStore 262 { 263 public: 264 /** 265 * Constructor 266 * 267 * @param stats Engine stats 268 * @param config Configuration information 269 * @param read_only flag indicating if this kvstore instance is for read-only operations 270 */ 271 CouchKVStore(EPStats &stats, Configuration &config, bool read_only = false); 272 273 /** 274 * Copy constructor 275 * 276 * @param from the source kvstore instance 277 */ 278 CouchKVStore(const CouchKVStore &from); 279 280 /** 281 * Deconstructor 282 */ 283 ~CouchKVStore(); 284 285 /** 286 * Reset database to a clean state. 287 */ 288 void reset(uint16_t vbucketId); 289 290 /** 291 * Begin a transaction (if not already in one). 292 * 293 * @return true if the transaction is started successfully 294 */ begin(void)295 bool begin(void) { 296 cb_assert(!isReadOnly()); 297 intransaction = true; 298 return intransaction; 299 } 300 301 /** 302 * Commit a transaction (unless not currently in one). 303 * 304 * @return true if the commit is completed successfully. 305 */ 306 bool commit(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno, 307 uint64_t snapEndSeqno); 308 309 /** 310 * Rollback a transaction (unless not currently in one). 311 */ rollback(void)312 void rollback(void) { 313 cb_assert(!isReadOnly()); 314 if (intransaction) { 315 intransaction = false; 316 } 317 } 318 319 /** 320 * Query the properties of the underlying storage. 321 * 322 * @return properties of the underlying storage system 323 */ 324 StorageProperties getStorageProperties(void); 325 326 /** 327 * Insert or update a given document. 328 * 329 * @param itm instance representing the document to be inserted or updated 330 * @param cb callback instance for SET 331 */ 332 void set(const Item &itm, Callback<mutation_result> &cb); 333 334 /** 335 * Retrieve the document with a given key from the underlying storage system. 336 * 337 * @param key the key of a document to be retrieved 338 * @param rowid the sequence number of a document 339 * @param vb vbucket id of a document 340 * @param cb callback instance for GET 341 * @param fetchDelete True if we want to retrieve a deleted item if it not 342 * purged yet. 343 */ 344 void get(const std::string &key, uint64_t rowid, 345 uint16_t vb, Callback<GetValue> &cb, bool fetchDelete = false); 346 347 void getWithHeader(void *dbHandle, const std::string &key, 348 uint16_t vb, Callback<GetValue> &cb, 349 bool fetchDelete = false); 350 351 /** 352 * Retrieve the multiple documents from the underlying storage system at once. 353 * 354 * @param vb vbucket id of a document 355 * @param itms list of items whose documents are going to be retrieved 356 */ 357 void getMulti(uint16_t vb, vb_bgfetch_queue_t &itms); 358 359 /** 360 * Delete a given document from the underlying storage system. 361 * 362 * @param itm instance representing the document to be deleted 363 * @param cb callback instance for DELETE 364 */ 365 void del(const Item &itm, Callback<int> &cb); 366 367 /** 368 * Delete a given vbucket database instance from the underlying storage system 369 * 370 * @param vbucket vbucket id 371 * @param recreate flag to re-create vbucket after deletion 372 */ 373 void delVBucket(uint16_t vbucket); 374 375 /** 376 * Retrieve the list of persisted vbucket states 377 * 378 * @return vbucket state vector instance where key is vbucket id and 379 * value is vbucket state 380 */ 381 std::vector<vbucket_state *> listPersistedVbuckets(void); 382 383 /** 384 * Retrieve ths list of persisted engine stats 385 * 386 * @param stats map instance where the persisted engine stats will be added 387 */ 388 void getPersistedStats(std::map<std::string, std::string> &stats); 389 390 /** 391 * Persist a snapshot of the engine stats in the underlying storage. 392 * 393 * @param engine_stats map instance that contains all the engine stats 394 * @return true if the snapshot is done successfully 395 */ 396 bool snapshotStats(const std::map<std::string, std::string> &engine_stats); 397 398 /** 399 * Persist a snapshot of the vbucket states in the underlying storage system. 400 * 401 * @param vbucketId vbucket id 402 * @param vbstate vbucket state 403 * @param cb - call back for updating kv stats 404 * @return true if the snapshot is done successfully 405 */ 406 bool snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate, 407 Callback<kvstats_ctx> *cb); 408 409 /** 410 * Compact a vbucket in the underlying storage system. 411 * 412 * @param vbid - which vbucket needs to be compacted 413 * @param hook_ctx - details of vbucket which needs to be compacted 414 * @param cb - callback to help process newly expired items 415 * @param kvcb - callback to update kvstore stats 416 */ 417 void compactVBucket(const uint16_t vbid, compaction_ctx *cookie, 418 Callback<compaction_ctx> &cb, 419 Callback<kvstats_ctx> &kvcb); 420 421 /** 422 * Retrieve selected documents from the underlying storage system. 423 * 424 * @param vbids list of vbucket ids whose document keys are going to be retrieved 425 * @param cb callback instance to process each document retrieved 426 * @param cl callback to see if we need to read the value from disk 427 */ 428 void dump(std::vector<uint16_t> &vbids, shared_ptr<Callback<GetValue> > cb, 429 shared_ptr<Callback<CacheLookup> > cl); 430 431 /** 432 * Retrieve all the documents for a given vbucket from the storage system. 433 * 434 * @param vb vbucket id 435 * @param cb callback instance to process each document retrieved 436 * @param cl callback to see if we need to read the value from disk 437 * @param sr callback to notify the caller what the range of the backfill is 438 */ 439 void dump(uint16_t vb, uint64_t stSeqno, 440 shared_ptr<Callback<GetValue> > cb, 441 shared_ptr<Callback<CacheLookup> > cl, 442 shared_ptr<Callback<SeqnoRange> > sr); 443 444 /** 445 * Retrieve all the keys from the underlying storage system. 446 * 447 * @param vbids list of vbucket ids whose document keys are going to be retrieved 448 * @param cb callback instance to process each key retrieved 449 */ 450 void dumpKeys(std::vector<uint16_t> &vbids, shared_ptr<Callback<GetValue> > cb); 451 452 /** 453 * Retrieve the list of keys and their meta data for a given 454 * vbucket, which were deleted. 455 * @param vb vbucket id 456 * @param cb callback instance to process each key and its meta data 457 */ 458 void dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno, 459 shared_ptr<Callback<GetValue> > cb); 460 461 /** 462 * Does the underlying storage system support key-only retrieval operations? 463 * 464 * @return true if key-only retrieval is supported 465 */ isKeyDumpSupported()466 bool isKeyDumpSupported() { 467 return true; 468 } 469 470 /** 471 * Get the estimated number of items that are going to be loaded during warmup. 472 * 473 * @return the number of estimated items to be loaded during warmup 474 */ 475 size_t getEstimatedItemCount(std::vector<uint16_t> &vbs); 476 477 /** 478 * Get the number of deleted items that are persisted to a vbucket file 479 * 480 * @param vbid The vbucket if of the file to get the number of deletes for 481 */ 482 size_t getNumPersistedDeletes(uint16_t vbid); 483 484 /** 485 * Get the number of non-deleted items from a vbucket database file 486 * 487 * @param vbid The vbucket of the file to get the number of docs for 488 */ 489 size_t getNumItems(uint16_t vbid); 490 491 /** 492 * Get the number of non-deleted items from a vbucket database file 493 * 494 * @param vbid The vbucket of the file to get the number of docs for 495 * @param min_seq The sequence number to start the count from 496 * @param max_seq The sequence number to stop the count at 497 */ 498 size_t getNumItems(uint16_t vbid, uint64_t min_seq, uint64_t max_seq); 499 500 /** 501 * Do a rollback to the specified seqNo on the particular vbucket 502 * 503 * @param vbid The vbucket of the file that's to be rolled back 504 * @param rollbackSeqno The sequence number upto which the engine needs 505 * to be rolled back 506 * @param cb getvalue callback 507 */ 508 RollbackResult rollback(uint16_t vbid, uint64_t rollbackSeqno, 509 shared_ptr<RollbackCB> cb); 510 511 /** 512 * Perform the pre-optimizations before persisting dirty items 513 * 514 * @param items list of dirty items that can be pre-optimized 515 */ 516 void optimizeWrites(std::vector<queued_item> &items); 517 518 /** 519 * Perform pending tasks after persisting dirty items 520 */ 521 void pendingTasks(); 522 523 /** 524 * Add all the kvstore stats to the stat response 525 * 526 * @param prefix stat name prefix 527 * @param add_stat upstream function that allows us to add a stat to the response 528 * @param cookie upstream connection cookie 529 */ 530 void addStats(const std::string &prefix, ADD_STAT add_stat, const void *cookie); 531 532 /** 533 * Add all the kvstore timings stats to the stat response 534 * 535 * @param prefix stat name prefix 536 * @param add_stat upstream function that allows us to add a stat to the response 537 * @param cookie upstream connection cookie 538 */ 539 void addTimingStats(const std::string &prefix, ADD_STAT add_stat, 540 const void *c); 541 542 /** 543 * Resets couchstore stats 544 */ resetStats()545 void resetStats() { 546 st.reset(); 547 } 548 549 static int recordDbDump(Db *db, DocInfo *docinfo, void *ctx); 550 static int recordDbStat(Db *db, DocInfo *docinfo, void *ctx); 551 static int getMultiCb(Db *db, DocInfo *docinfo, void *ctx); 552 void readVBState(Db *db, uint16_t vbId); 553 554 couchstore_error_t fetchDoc(Db *db, DocInfo *docinfo, 555 GetValue &docValue, uint16_t vbId, 556 bool metaOnly, bool fetchDelete = false); 557 ENGINE_ERROR_CODE couchErr2EngineErr(couchstore_error_t errCode); 558 getCKVStoreStat(void)559 CouchKVStoreStats &getCKVStoreStat(void) { return st; } 560 561 uint64_t getLastPersistedSeqno(uint16_t vbid); 562 563 /** 564 * Get all_docs API, to return the list of all keys in the store 565 */ 566 ENGINE_ERROR_CODE getAllKeys(uint16_t vbid, std::string &start_key, 567 uint32_t count, AllKeysCB *cb); 568 569 protected: 570 void loadDB(shared_ptr<Callback<GetValue> > cb, 571 shared_ptr<Callback<CacheLookup> > cl, 572 shared_ptr<Callback<SeqnoRange> > sr, 573 bool keysOnly, uint16_t vbid, 574 uint64_t startSeqno, 575 couchstore_docinfos_options options=COUCHSTORE_NO_OPTIONS); 576 bool setVBucketState(uint16_t vbucketId, vbucket_state &vbstate, 577 Callback<kvstats_ctx> *cb); resetVBucket(uint16_t vbucketId, vbucket_state &vbstate)578 bool resetVBucket(uint16_t vbucketId, vbucket_state &vbstate) { 579 cachedDocCount[vbucketId] = 0; 580 return setVBucketState(vbucketId, vbstate, NULL); 581 } 582 583 template <typename T> 584 void addStat(const std::string &prefix, const char *nm, T &val, 585 ADD_STAT add_stat, const void *c); 586 587 private: 588 589 void operator=(const CouchKVStore &from); 590 591 void open(); 592 void close(); 593 bool commit2couchstore(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno, 594 uint64_t snapEndSeqno); 595 596 uint64_t checkNewRevNum(std::string &dbname, bool newFile = false); 597 void populateFileNameMap(std::vector<std::string> &filenames, 598 std::vector<uint16_t> *vbids); 599 void remVBucketFromDbFileMap(uint16_t vbucketId); 600 void updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev); 601 couchstore_error_t openDB(uint16_t vbucketId, uint64_t fileRev, Db **db, 602 uint64_t options, uint64_t *newFileRev = NULL); 603 couchstore_error_t openDB_retry(std::string &dbfile, uint64_t options, 604 const couch_file_ops *ops, 605 Db **db, uint64_t *newFileRev); 606 couchstore_error_t saveDocs(uint16_t vbid, uint64_t rev, Doc **docs, 607 DocInfo **docinfos, size_t docCount, 608 kvstats_ctx &kvctx, 609 uint64_t snapStartSeqno, 610 uint64_t snapEndSeqno); 611 void commitCallback(std::vector<CouchRequest *> &committedReqs, 612 kvstats_ctx &kvctx, 613 couchstore_error_t errCode); 614 couchstore_error_t saveVBState(Db *db, vbucket_state &vbState); 615 void setDocsCommitted(uint16_t docs); 616 void closeDatabaseHandle(Db *db); 617 618 /** 619 * Unlink selected couch file, which will be removed by the OS, 620 * once all its references close. 621 */ 622 void unlinkCouchFile(uint16_t vbucket, uint64_t fRev); 623 624 /** 625 * Remove compact file 626 * 627 * @param dbname 628 * @param vbucket id 629 * @param current db rev number 630 */ 631 void removeCompactFile(const std::string &dbname, uint16_t vbid, 632 uint64_t currentRev); 633 634 void removeCompactFile(const std::string &filename); 635 636 EPStats &epStats; 637 Configuration &configuration; 638 const std::string dbname; 639 std::vector<uint64_t>dbFileRevMap; 640 uint16_t numDbFiles; 641 std::vector<CouchRequest *> pendingReqsQ; 642 bool intransaction; 643 bool dbFileRevMapPopulated; 644 645 /* all stats */ 646 CouchKVStoreStats st; 647 couch_file_ops statCollectingFileOps; 648 /* vbucket state cache*/ 649 std::vector<vbucket_state *> cachedVBStates; 650 /* deleted docs in each file*/ 651 unordered_map<uint16_t, size_t> cachedDeleteCount; 652 /* non-deleted docs in each file */ 653 unordered_map<uint16_t, size_t> cachedDocCount; 654 /* pending file deletions */ 655 AtomicQueue<std::string> pendingFileDeletions; 656 }; 657 658 #endif // SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_ 659