1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2018 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #pragma once 19 20 #include "../objectregistry.h" 21 #include "collections/collection_persisted_stats.h" 22 #include "kvstore.h" 23 #include "kvstore_priv.h" 24 #include "libmagma/magma.h" 25 #include "rollback_result.h" 26 #include "vbucket_bgfetch_item.h" 27 28 #include <platform/dirutils.h> 29 #include <platform/non_negative_counter.h> 30 31 #include <map> 32 #include <shared_mutex> 33 #include <string> 34 #include <vector> 35 36 namespace magma { 37 class Slice; 38 class Status; 39 } // namespace magma 40 41 class MagmaRequest; 42 class MagmaKVStoreConfig; 43 class MagmaCompactionCB; 44 struct kvstats_ctx; 45 struct vbucket_state; 46 47 /** 48 * Magma info is used to mimic info that is stored internally in couchstore. 49 * This info is stored with vbucket_state every time vbucket_state is stored. 50 */ 51 class MagmaInfo { 52 public: 53 MagmaInfo() = default; 54 55 // Note: we don't want to reset the kvstoreRev because it tracks 56 // the kvstore revision which is a monotonically increasing 57 // value each time the kvstore is created. reset()58 void reset() { 59 docCount = 0; 60 persistedDeletes = 0; 61 } 62 cb::NonNegativeCounter<uint64_t> docCount{0}; 63 cb::NonNegativeCounter<uint64_t> persistedDeletes{0}; 64 Monotonic<uint64_t> kvstoreRev{1}; 65 }; 66 67 /** 68 * A persistence store based on magma. 69 */ 70 class MagmaKVStore : public KVStore { 71 public: 72 MagmaKVStore(MagmaKVStoreConfig& config); 73 74 ~MagmaKVStore(); 75 76 void operator=(MagmaKVStore& from) = delete; 77 78 /** 79 * Reset database to a clean state. 80 */ 81 void reset(Vbid vbid) override; 82 83 /** 84 * Begin a transaction (if not already in one). 85 */ 86 bool begin(std::unique_ptr<TransactionContext> txCtx) override; 87 88 /** 89 * Commit a transaction (unless not currently in one). 90 * 91 * Returns false if the commit fails. 92 */ 93 bool commit(Collections::VB::Flush& collectionsFlush) override; 94 95 /** 96 * Rollback a transaction (unless not currently in one). 97 */ 98 void rollback() override; 99 100 StorageProperties getStorageProperties() override; 101 102 /** 103 * Adds a request to a queue for batch processing at commit() 104 */ 105 void set(queued_item itm) override; 106 107 GetValue get(const DiskDocKey& key, Vbid vb) override; 108 109 GetValue getWithHeader(void* dbHandle, 110 const DiskDocKey& key, 111 Vbid vb, 112 GetMetaOnly getMetaOnly) override; 113 114 void getMulti(Vbid vb, vb_bgfetch_queue_t& itms) override; 115 116 void getRange(Vbid vb, 117 const DiskDocKey& startKey, 118 const DiskDocKey& endKey, 119 const GetRangeCb& cb) override; 120 121 void del(queued_item itm) override; 122 123 void delVBucket(Vbid vbucket, uint64_t fileRev) override; 124 125 std::vector<vbucket_state*> listPersistedVbuckets(void) override; 126 127 /** 128 * Take a snapshot of the stats in the main DB. 129 */ snapshotStats(const std::map<std::string, std::string>& m)130 bool snapshotStats(const std::map<std::string, std::string>& m) { 131 // TODO 132 return false; 133 } 134 135 /** 136 * Take a snapshot of the vbucket states in the main DB. 137 */ 138 bool snapshotVBucket(Vbid vbucketId, 139 const vbucket_state& vbstate, 140 VBStatePersist options) override; 141 142 // Compaction in magma is asynchronous. Its triggered by 3 conditions: 143 // - Level compaction 144 // - Expiry compaction 145 // In magma, a histogram of when items will expire is maintained. 146 // Periodically, magma will trigger a compaction to visit those 147 // sstables which have expired items to have them removed. 148 // - Dropped collections removal 149 // At the end of the flusher loop in kv_engine, a call to magma will be 150 // made to trigger asynchronous PurgeRange scans to purge the store 151 // of any dropped collections. For each collectionID, 2 scans will be 152 // triggered, [Default+CollectionID] and 153 // [DurabilityPrepare+CollectionID]. 154 // While a purge scan will identify the sstables containing the scan 155 // key ex.[Default+CollectionID], all items for that CollectionID will 156 // be removed ie. both Default & DurabilityPrepare. This guarantees that 157 // the data from each sstable containing the collectionID is visited 158 // once. 159 // Also during this call from the bg flusher, any completed compaction 160 // data will be picked up. The max_purged_seq will update the vbstate 161 // and any dropped collections will trigger 2 scans (same as above) to 162 // determine if a collection has been removed and if so, the collection 163 // manifest is updated. 164 // 165 // Regardless of which type of compaction is running, all compactions 166 // required a kv_engine callback to pick up compaction_ctx and all 167 // compactions can remove expired items or dropped collection items. 168 // 169 // Synchronous compaction is supported for testing. Normally, kv_engine 170 // with magma store should never call compactDB. But for testing, we 171 // need to support a synchronous call. When compactDB is called, it will 172 // save the compaction_ctx passed in to compactDB and will use it 173 // to perform compaction. 174 // 175 // For DP, we will support only Level and Sychronous compaction. This 176 // alleviates the need for a callback to pick up compaction_ctx. 177 bool compactDB(compaction_ctx*) override; 178 179 Vbid getDBFileId(const cb::mcbp::Request&) override; 180 181 size_t getNumPersistedDeletes(Vbid vbid) override { 182 // TODO 183 return 0; 184 } 185 186 DBFileInfo getDbFileInfo(Vbid vbid) override { 187 // Magma does not support DBFileInfo 188 DBFileInfo vbinfo; 189 return vbinfo; 190 } 191 192 DBFileInfo getAggrDbFileInfo() override { 193 // Magma does not support DBFileInfo 194 DBFileInfo vbinfo; 195 return vbinfo; 196 } 197 198 size_t getItemCount(Vbid vbid) override; 199 200 RollbackResult rollback(Vbid vbid, 201 uint64_t rollbackSeqno, 202 std::shared_ptr<RollbackCB> cb) override; 203 204 void pendingTasks() override { 205 // Magma does not use pendingTasks 206 } 207 208 ENGINE_ERROR_CODE getAllKeys( 209 Vbid vbid, 210 const DiskDocKey& start_key, 211 uint32_t count, 212 std::shared_ptr<Callback<const DiskDocKey&>> cb) override; 213 214 ScanContext* initScanContext( 215 std::shared_ptr<StatusCallback<GetValue>> cb, 216 std::shared_ptr<StatusCallback<CacheLookup>> cl, 217 Vbid vbid, 218 uint64_t startSeqno, 219 DocumentFilter options, 220 ValueFilter valOptions) override; 221 222 scan_error_t scan(ScanContext* sctx) override; 223 224 void destroyScanContext(ScanContext* ctx) override; 225 226 /** 227 * The magmaKVHandle protects magma from a kvstore being dropped 228 * while an API operation is active. This is required because 229 * unlike couchstore which just unlinks the data file, magma 230 * must wait for all threads to exit and then block subsequent 231 * threads from proceeding while the kvstore is being dropped. 232 * Inside the handle is the vbstateMutex. This mutex is used 233 * to protect the vbstate from race conditions when updated. 234 */ 235 struct MagmaKVHandleStruct { 236 std::shared_timed_mutex vbstateMutex; 237 }; 238 using MagmaKVHandle = std::shared_ptr<MagmaKVHandleStruct>; 239 240 std::vector<std::pair<MagmaKVHandle, std::shared_timed_mutex>> 241 magmaKVHandles; 242 getMagmaKVHandle(Vbid vbid)243 const MagmaKVHandle getMagmaKVHandle(Vbid vbid) { 244 std::lock_guard<std::shared_timed_mutex> lock( 245 magmaKVHandles[vbid.get()].second); 246 return magmaKVHandles[vbid.get()].first; 247 } 248 249 class MagmaKVFileHandle : public ::KVFileHandle { 250 public: MagmaKVFileHandle(MagmaKVStore& kvstore, Vbid vbid)251 MagmaKVFileHandle(MagmaKVStore& kvstore, Vbid vbid) 252 : ::KVFileHandle(kvstore), 253 vbid(vbid), 254 kvHandle(kvstore.getMagmaKVHandle(vbid)) { 255 } 256 Vbid vbid; 257 MagmaKVHandle kvHandle; 258 }; 259 260 std::unique_ptr<KVFileHandle, KVFileHandleDeleter> makeFileHandle( 261 Vbid vbid) override; 262 263 void freeFileHandle(KVFileHandle* kvFileHandle) const override { 264 delete kvFileHandle; 265 } 266 267 Collections::VB::PersistedStats getCollectionStats( 268 const KVFileHandle& kvFileHandle, CollectionID collection) override; 269 270 /** 271 * Increment the kvstore revision. 272 */ 273 void prepareToCreateImpl(Vbid vbid) override; 274 275 /** 276 * Soft delete the kvstore. 277 */ 278 uint64_t prepareToDeleteImpl(Vbid vbid) override; 279 280 /** 281 * Retrieve the manifest from the local db. 282 * MagmaKVStore implements this method as a read of 3 _local documents 283 * manifest, open collections, open scopes 284 * 285 * @param vbid vbucket id 286 * @return collections manifest 287 */ 288 Collections::KVStore::Manifest getCollectionsManifest(Vbid vbid) override; 289 290 /** 291 * read local document to get the vector of dropped collections 292 * @param vbid vbucket id 293 * @return a vector of dropped collections (can be empty) 294 */ 295 std::vector<Collections::KVStore::DroppedCollection> getDroppedCollections( 296 Vbid vbid) override; 297 298 /** 299 * This function maintains the set of open collections, adding newly opened 300 * collections and removing those which are dropped. To validate the 301 * creation 302 * of new collections, this method must read the dropped collections. 303 * 304 * @param vbid vbucket id 305 * @param commitBatch current magma commit batch 306 * @param collectionsFlush 307 * @return status magma status 308 */ 309 magma::Status updateCollectionsMeta( 310 Vbid vbid, 311 magma::Magma::CommitBatch& commitBatch, 312 Collections::VB::Flush& collectionsFlush); 313 314 /** 315 * Maintain the current uid committed 316 * 317 * @param vbid vbucket id 318 * @param commitBatch current magma commit batch 319 * @return status magma status 320 */ 321 magma::Status updateManifestUid(magma::Magma::CommitBatch& commitBatch); 322 323 /** 324 * Maintain the list of open collections. The maintenance requires 325 * reading the dropped collections which is passed back to avoid 326 * a reread. 327 * 328 * @param vbid vbucket id 329 * @param commitBatch current magma commit batch 330 * @return pair magma status and dropped collection list 331 */ 332 std::pair<magma::Status, 333 std::vector<Collections::KVStore::DroppedCollection>> 334 updateOpenCollections(Vbid vbid, magma::Magma::CommitBatch& commitBatch); 335 336 /** 337 * Maintain the list of dropped collections 338 * 339 * @param vbid vbucket id 340 * @param commitBatch current magma commit batch 341 * @param dropped This method will only read the dropped collections 342 * from storage if this optional is not initialised 343 * @return status magma status 344 */ 345 magma::Status updateDroppedCollections( 346 Vbid vbid, 347 magma::Magma::CommitBatch& commitBatch, 348 boost::optional< 349 std::vector<Collections::KVStore::DroppedCollection>> 350 dropped); 351 352 /** 353 * Maintain the list of open scopes 354 * 355 * @param vbid vbucket id 356 * @param commitBatch current magma commit batch 357 * @return status magma status 358 */ 359 magma::Status updateScopes(Vbid vbid, 360 magma::Magma::CommitBatch& commitBatch); 361 362 /** 363 * Given a collection id, return the key used to maintain the 364 * collection stats in the local db. 365 * 366 * @param cid Collection ID 367 */ 368 std::string getCollectionsStatsKey(CollectionID cid); 369 370 /** 371 * Save stats for collection cid 372 * 373 * @param commitBatch current magma commit batch 374 * @param cid Collection ID 375 * @param stats The stats that should be persisted 376 */ 377 void saveCollectionStats(magma::Magma::CommitBatch& commitBatch, 378 CollectionID cid, 379 const Collections::VB::PersistedStats& stats); 380 381 /** 382 * Delete the collection stats for the given collection id 383 * 384 * @param commitBatch current magma commit batch 385 * @param cid Collection ID 386 */ 387 magma::Status deleteCollectionStats(magma::Magma::CommitBatch& commitBatch, 388 CollectionID cid); 389 390 /** 391 * Encode a document being stored in the local db by prefixing the 392 * MetaData to the value. 393 * 394 * Magma requires all documents stored in local DB to also include 395 * metadata because it uses the callback functions like getExpiryTime, 396 * isDeleted() and getSeqNum() as part of compaction. 397 */ 398 std::string encodeLocalDoc(Vbid vbid, 399 const std::string& value, 400 bool isDelete); 401 402 /** 403 * Decode a document being stored in the local db by extracting the 404 * MetaData from the value 405 */ 406 std::pair<std::string, bool> decodeLocalDoc(const magma::Slice& valSlice); 407 408 /** 409 * Read from local DB 410 */ 411 std::pair<magma::Status, std::string> readLocalDoc( 412 Vbid vbid, const magma::Slice& keySlice); 413 414 /** 415 * Add a local document to the commitBatch 416 */ 417 magma::Status setLocalDoc(magma::Magma::CommitBatch& commitBatch, 418 const magma::Slice& keySlice, 419 std::string& valBuf, 420 bool deleted = false); 421 422 /** 423 * Encode the cached vbucket_state and magmaInfo into a nlohmann json struct 424 */ 425 nlohmann::json encodeVBState(const vbucket_state& vbstate, 426 MagmaInfo& magmaInfo) const; 427 428 /** 429 * Read the encoded vstate + magmaInfo from the local db into the cache. 430 */ 431 magma::Status readVBStateFromDisk(Vbid vbid); 432 433 /** 434 * Write the encoded vbstate + magmaInfo to the local db. 435 * with the new vbstate as well as write to disk. 436 */ 437 magma::Status writeVBStateToDisk(Vbid vbid, 438 magma::Magma::CommitBatch& commitBatch, 439 vbucket_state& vbs, 440 MagmaInfo& minfo); 441 442 /** 443 * Get vbstate from cache. If cache not populated, read it from disk 444 * and populate cache. If not on disk, return nullptr; 445 * 446 * vbstate and magmaInfo always go together, we should never have a 447 * case where only 1 of them is initialized. So, if vbstate is 448 * uninitilized, assume magmaInfo is as well. 449 */ 450 vbucket_state* getVBucketState(Vbid vbucketId) override; 451 452 // Magma uses a unique logger with a prefix of magma so that all logging 453 // calls from the wrapper thru magma will be prefixed with magma. 454 std::shared_ptr<BucketLogger> logger; 455 456 private: 457 /** 458 * Mamga instance for a shard 459 */ 460 std::unique_ptr<magma::Magma> magma; 461 462 /** 463 * Container for pending Magma requests. 464 * 465 * Using deque as as the expansion behaviour is less aggressive compared to 466 * std::vector (MagmaRequest objects are ~176 bytes in size). 467 */ 468 using PendingRequestQueue = std::deque<MagmaRequest>; 469 470 /* 471 * The DB for each VBucket is created in a separated subfolder of 472 * `configuration.getDBName()`. This function returns the path of the DB 473 * subfolder for the given `vbid`. 474 * 475 * @param vbid vbucket id for the vbucket DB subfolder to return 476 */ 477 std::string getVBDBSubdir(Vbid vbid); 478 479 std::unique_ptr<Item> makeItem(Vbid vb, 480 const magma::Slice& keySlice, 481 const magma::Slice& metaSlice, 482 const magma::Slice& valueSlice, 483 GetMetaOnly getMetaOnly); 484 485 GetValue makeGetValue(Vbid vb, 486 const magma::Slice& keySlice, 487 const magma::Slice& metaSlice, 488 const magma::Slice& valueSlice, 489 GetMetaOnly getMetaOnly = GetMetaOnly::No); 490 491 int saveDocs(Collections::VB::Flush& collectionsFlush, 492 kvstats_ctx& kvctx, 493 const MagmaKVHandle& kvHandle); 494 495 void commitCallback(int status, kvstats_ctx& kvctx); 496 497 static ENGINE_ERROR_CODE magmaErr2EngineErr(magma::Status::Code err, 498 bool found = true); 499 500 // Used for queueing mutation requests (in `set` and `del`) and flushing 501 // them to disk (in `commit`). 502 // unique_ptr for pimpl. 503 std::unique_ptr<PendingRequestQueue> pendingReqs; 504 505 // Magma does *not* need additional synchronisation around 506 // db->Write, but we need to prevent delVBucket racing with 507 // commit, potentially losing data. 508 std::mutex writeLock; 509 510 // This variable is used to verify that the KVStore API is used correctly 511 // when Magma is used as store. "Correctly" means that the caller must 512 // use the API in the following way: 513 // - begin() x1 514 // - set() / del() xN 515 // - commit() 516 bool in_transaction; 517 std::unique_ptr<TransactionContext> transactionCtx; 518 519 // Path to magma files. Include shardID. 520 const std::string magmaPath; 521 522 std::atomic<size_t> scanCounter; // atomic counter for generating scan id 523 524 // Magma does not keep track of docCount, # of persistedDeletes or 525 // revFile internal so we need a mechanism to do that. We use magmaInfo 526 // as the structure to store that and we save magmaInfo with the vbstate. 527 std::vector<std::unique_ptr<MagmaInfo>> cachedMagmaInfo; 528 529 // We need to mimic couchstores ability to turn every batch of items 530 // into a rollback point. This is used for testing only! 531 bool commitPointEveryBatch{false}; 532 533 // Using upsert for Set means we can't keep accurate document totals. 534 // This is used for testing only! 535 bool useUpsertForSet{false}; 536 537 // Get lock on KVHandle and wait for all threads to exit before 538 // returning the lock to the caller. 539 std::unique_lock<std::shared_timed_mutex> getExclusiveKVHandle(Vbid vbid); 540 541 struct MagmaCompactionCtx { MagmaCompactionCtxMagmaKVStore::MagmaCompactionCtx542 MagmaCompactionCtx(compaction_ctx* ctx, MagmaKVHandle kvHandle) 543 : ctx(ctx), kvHandle(kvHandle){}; 544 compaction_ctx* ctx; 545 MagmaKVHandle kvHandle; 546 }; 547 std::vector<std::unique_ptr<MagmaCompactionCtx>> compaction_ctxList; 548 std::mutex compactionCtxMutex; 549 550 class MagmaCompactionCB : public magma::Magma::CompactionCallback { 551 public: 552 MagmaCompactionCB(MagmaKVStore& magmaKVStore); 553 ~MagmaCompactionCB(); operator ()(const magma::Slice& keySlice, const magma::Slice& metaSlice, const magma::Slice& valueSlice)554 bool operator()(const magma::Slice& keySlice, 555 const magma::Slice& metaSlice, 556 const magma::Slice& valueSlice) { 557 return magmaKVStore.compactionCallBack( 558 *this, keySlice, metaSlice, valueSlice); 559 } 560 MagmaKVStore& magmaKVStore; 561 bool initialized = false; 562 compaction_ctx* ctx = nullptr; 563 MagmaKVStore::MagmaKVHandle kvHandle; 564 Vbid vbid; 565 // TODO add code for collections to keep track of # of deletes. 566 // Requires code in ~MagmaCompactionCB() to update magmaInfo docCount 567 }; 568 569 bool compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx, 570 const magma::Slice& keySlice, 571 const magma::Slice& metaSlice, 572 const magma::Slice& valueSlice); 573 574 friend class MagmaCompactionCB; 575 }; 576